This commit is contained in:
Jöran Malek 2023-11-27 01:14:18 +01:00
parent f79341e0c0
commit 3d635ad4c1

View file

@ -21,8 +21,10 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
Share = (FileShare)7, Share = (FileShare)7,
}; };
private readonly Decoder _decoder;
private readonly FileSystemWatcher _fsw; private readonly FileSystemWatcher _fsw;
private readonly string _leaseFile; private readonly string _leaseFile;
private readonly Pipe _pipe;
private Channel<FileSystemEventArgs>? _eventChannel; private Channel<FileSystemEventArgs>? _eventChannel;
private Task? _executeTask; private Task? _executeTask;
private CancellationTokenSource? _stoppingCts; private CancellationTokenSource? _stoppingCts;
@ -45,10 +47,12 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
throw new ArgumentException($"{nameof(options.Leases)} must point to a file in an existing path.", nameof(options)); throw new ArgumentException($"{nameof(options.Leases)} must point to a file in an existing path.", nameof(options));
} }
_decoder = Encoding.UTF8.GetDecoder();
_leaseFile = leaseFile.ToString(); _leaseFile = leaseFile.ToString();
_fsw = new(leaseDirectory, _leaseFile); _fsw = new(leaseDirectory, _leaseFile);
_fsw.Changed += OnLeaseChanged; _fsw.Changed += OnLeaseChanged;
_fsw.Error += OnLeaseError; _fsw.Error += OnLeaseError;
_pipe = new();
} }
public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
@ -82,14 +86,15 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
private async Task Reading(CancellationToken stoppingToken) private async Task Reading(CancellationToken stoppingToken)
{ {
SemaphoreSlim readerLock = new(1, 1);
while (!stoppingToken.IsCancellationRequested) while (!stoppingToken.IsCancellationRequested)
{ {
using CancellationTokenSource loopCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); using CancellationTokenSource loopCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
var loopToken = loopCts.Token; var loopToken = loopCts.Token;
_eventChannel = Channel.CreateUnbounded<FileSystemEventArgs>(); _eventChannel = Channel.CreateUnbounded<FileSystemEventArgs>();
_fsw.EnableRaisingEvents = true; _fsw.EnableRaisingEvents = true;
Task reader = FileReader(loopToken); using SemaphoreSlim waitTrap = new(0, 1);
using SemaphoreSlim changeEvent = new(0, 1);
Task reader = FileReader(waitTrap, changeEvent, loopToken);
try try
{ {
await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopToken)) await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopToken))
@ -108,12 +113,12 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
if (reader is not { IsCompleted: false }) if (reader is not { IsCompleted: false })
{ {
reader = FileReader(loopToken); reader = FileReader(waitTrap, changeEvent, loopToken);
} }
if (@event.ChangeType == WatcherChangeTypes.Changed) if (waitTrap.Wait(0, CancellationToken.None))
{ {
changeEvent.Release();
} }
} }
} }
@ -133,11 +138,9 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
} }
} }
private async Task FileReader(CancellationToken stoppingToken) private async Task FileReader(SemaphoreSlim waitTrap, SemaphoreSlim changeEvent, CancellationToken stoppingToken)
{ {
Pipe buffer = new(); PipeWriter writer = _pipe.Writer;
var writer = buffer.Writer;
var textDecoder = Encoding.UTF8.GetDecoder();
SepReader? reader = null; SepReader? reader = null;
try try
{ {
@ -151,14 +154,14 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
{ {
if (reader is null) if (reader is null)
{ {
reader = Sep.Reader().From(buffer.Reader.AsStream()); reader = Sep.Reader().From(_pipe.Reader.AsStream());
continue; continue;
} }
if (!reader.MoveNext()) if (!reader.MoveNext())
{ {
// TODO Error state. // TODO Error state.
continue; return;
} }
} }
@ -166,20 +169,22 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
int read = await file.ReadAsync(memory, stoppingToken); int read = await file.ReadAsync(memory, stoppingToken);
if (read > 0) if (read > 0)
{ {
CountNewLines(textDecoder, memory[..read], ref newLinesEncountered, ref awaitLineFeed); CountNewLines(_decoder, memory[..read], ref newLinesEncountered, ref awaitLineFeed);
writer.Advance(read); writer.Advance(read);
} }
else else
{ {
// TODO Await. var acquireLock = changeEvent.WaitAsync(stoppingToken);
waitTrap.Release();
await acquireLock.ConfigureAwait(continueOnCapturedContext: false);
} }
} }
} }
catch { }
finally finally
{ {
writer.Complete();
reader?.Dispose(); reader?.Dispose();
writer.Complete();
_pipe.Reset();
} }
static void CountNewLines(Decoder decoder, in Memory<byte> memory, ref int newLinesEncountered, ref bool awaitLineFeed) static void CountNewLines(Decoder decoder, in Memory<byte> memory, ref int newLinesEncountered, ref bool awaitLineFeed)