This commit is contained in:
Jöran Malek 2023-11-26 01:50:32 +01:00
parent 7131259eba
commit 4508324ff5

View file

@ -2,12 +2,23 @@ using System.Threading.Channels;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using nietras.SeparatedValues;
using pdns_dhcp.Options; using pdns_dhcp.Options;
using Stl.Async;
namespace pdns_dhcp.Kea; namespace pdns_dhcp.Kea;
public abstract class KeaDhcpLeaseWatcher : IHostedService public abstract class KeaDhcpLeaseWatcher : IHostedService
{ {
private static readonly FileStreamOptions _leaseFileStreamOptions = new()
{
Access = FileAccess.Read,
Mode = FileMode.Open,
Options = FileOptions.SequentialScan,
Share = (FileShare)7,
};
private readonly FileSystemWatcher _fsw; private readonly FileSystemWatcher _fsw;
private readonly string _leaseFile; private readonly string _leaseFile;
private Channel<FileSystemEventArgs>? _eventChannel; private Channel<FileSystemEventArgs>? _eventChannel;
@ -69,30 +80,38 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
private async Task Reading(CancellationToken stoppingToken) private async Task Reading(CancellationToken stoppingToken)
{ {
SemaphoreSlim readerLock = new(1, 1);
while (!stoppingToken.IsCancellationRequested) while (!stoppingToken.IsCancellationRequested)
{ {
using CancellationTokenSource loopCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); using CancellationTokenSource loopCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
var loopToken = loopCts.Token;
_eventChannel = Channel.CreateUnbounded<FileSystemEventArgs>(); _eventChannel = Channel.CreateUnbounded<FileSystemEventArgs>();
_fsw.EnableRaisingEvents = true; _fsw.EnableRaisingEvents = true;
ValueTask reader = default; Task reader = FileReader(loopToken);
try try
{ {
await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopCts.Token)) await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopToken))
{ {
// Guard for Deleted-events and moved-away events,
// both have to stop this reader immediately.
// Just wait for the file being created/moved to _leaseFile.
// Described in [The LFC Process](https://kea.readthedocs.io/en/latest/arm/lfc.html#kea-lfc)
switch (@event) switch (@event)
{ {
case { ChangeType: WatcherChangeTypes.Created }:
case RenamedEventArgs { ChangeType: WatcherChangeTypes.Renamed } renamed when renamed.Name == _leaseFile:
reader = FileReader(loopCts.Token);
break;
case { ChangeType: WatcherChangeTypes.Deleted }: case { ChangeType: WatcherChangeTypes.Deleted }:
case RenamedEventArgs { ChangeType: WatcherChangeTypes.Renamed } renamed when renamed.OldName == _leaseFile: case RenamedEventArgs renamed when renamed.OldName == _leaseFile:
loopCts.Cancel(); loopCts.Cancel();
break; continue;
}
if (reader is not { IsCompleted: false })
{
reader = FileReader(loopToken);
}
if (@event.ChangeType == WatcherChangeTypes.Changed)
{
case { ChangeType: WatcherChangeTypes.Changed }:
break;
} }
} }
} }
@ -100,18 +119,25 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
finally finally
{ {
_eventChannel.Writer.TryComplete(); _eventChannel.Writer.TryComplete();
try if (reader is { IsCompleted: false })
{ {
await reader.ConfigureAwait(continueOnCapturedContext: false); try
{
await reader.ConfigureAwait(continueOnCapturedContext: false);
}
catch { }
} }
catch { }
} }
} }
}
ValueTask FileReader(CancellationToken stoppingToken) private async Task FileReader(CancellationToken stoppingToken)
{
try
{ {
return ValueTask.CompletedTask; using var fileStream = new FileStream(Options.Leases, _leaseFileStreamOptions);
} }
catch { }
} }
private void OnLeaseChanged(object sender, FileSystemEventArgs e) private void OnLeaseChanged(object sender, FileSystemEventArgs e)
@ -122,13 +148,16 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
return; return;
} }
var write = writer.WriteAsync(e); #pragma warning disable CA2012 // Task is awaited immediately.
if (write.IsCompleted) if (writer.WriteAsync(e) is { IsCompleted: false } task)
{ {
return; try
{
task.GetAwaiter().GetResult();
}
catch { }
} }
#pragma warning restore
write.GetAwaiter().GetResult();
} }
private void OnLeaseError(object sender, ErrorEventArgs e) private void OnLeaseError(object sender, ErrorEventArgs e)