From 3d635ad4c184f1f611c7ccf7daf1a5605c6af13b Mon Sep 17 00:00:00 2001 From: AliveDevil Date: Mon, 27 Nov 2023 01:14:18 +0100 Subject: [PATCH] Sync --- src/pdns-dhcp/Kea/KeaLeaseWatcher.cs | 35 ++++++++++++++++------------ 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs b/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs index 31ca565..8565c91 100644 --- a/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs +++ b/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs @@ -21,8 +21,10 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService Share = (FileShare)7, }; + private readonly Decoder _decoder; private readonly FileSystemWatcher _fsw; private readonly string _leaseFile; + private readonly Pipe _pipe; private Channel? _eventChannel; private Task? _executeTask; private CancellationTokenSource? _stoppingCts; @@ -45,10 +47,12 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService throw new ArgumentException($"{nameof(options.Leases)} must point to a file in an existing path.", nameof(options)); } + _decoder = Encoding.UTF8.GetDecoder(); _leaseFile = leaseFile.ToString(); _fsw = new(leaseDirectory, _leaseFile); _fsw.Changed += OnLeaseChanged; _fsw.Error += OnLeaseError; + _pipe = new(); } public Task StartAsync(CancellationToken cancellationToken) @@ -82,14 +86,15 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService private async Task Reading(CancellationToken stoppingToken) { - SemaphoreSlim readerLock = new(1, 1); while (!stoppingToken.IsCancellationRequested) { using CancellationTokenSource loopCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); var loopToken = loopCts.Token; _eventChannel = Channel.CreateUnbounded(); _fsw.EnableRaisingEvents = true; - Task reader = FileReader(loopToken); + using SemaphoreSlim waitTrap = new(0, 1); + using SemaphoreSlim changeEvent = new(0, 1); + Task reader = FileReader(waitTrap, changeEvent, loopToken); try { await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopToken)) @@ -108,12 +113,12 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService if (reader is not { IsCompleted: false }) { - reader = FileReader(loopToken); + reader = FileReader(waitTrap, changeEvent, loopToken); } - if (@event.ChangeType == WatcherChangeTypes.Changed) + if (waitTrap.Wait(0, CancellationToken.None)) { - + changeEvent.Release(); } } } @@ -133,11 +138,9 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService } } - private async Task FileReader(CancellationToken stoppingToken) + private async Task FileReader(SemaphoreSlim waitTrap, SemaphoreSlim changeEvent, CancellationToken stoppingToken) { - Pipe buffer = new(); - var writer = buffer.Writer; - var textDecoder = Encoding.UTF8.GetDecoder(); + PipeWriter writer = _pipe.Writer; SepReader? reader = null; try { @@ -151,14 +154,14 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService { if (reader is null) { - reader = Sep.Reader().From(buffer.Reader.AsStream()); + reader = Sep.Reader().From(_pipe.Reader.AsStream()); continue; } if (!reader.MoveNext()) { // TODO Error state. - continue; + return; } } @@ -166,20 +169,22 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService int read = await file.ReadAsync(memory, stoppingToken); if (read > 0) { - CountNewLines(textDecoder, memory[..read], ref newLinesEncountered, ref awaitLineFeed); + CountNewLines(_decoder, memory[..read], ref newLinesEncountered, ref awaitLineFeed); writer.Advance(read); } else { - // TODO Await. + var acquireLock = changeEvent.WaitAsync(stoppingToken); + waitTrap.Release(); + await acquireLock.ConfigureAwait(continueOnCapturedContext: false); } } } - catch { } finally { - writer.Complete(); reader?.Dispose(); + writer.Complete(); + _pipe.Reset(); } static void CountNewLines(Decoder decoder, in Memory memory, ref int newLinesEncountered, ref bool awaitLineFeed)