From 4e7ab4a4528f9f70323fa7509f90e0c10a525a5d Mon Sep 17 00:00:00 2001 From: AliveDevil Date: Mon, 27 Nov 2023 23:50:37 +0100 Subject: [PATCH] Handle synchronization --- src/pdns-dhcp/Kea/KeaLeaseWatcher.cs | 31 +++++---- .../System/Threading/Tasks/AsyncWaitHandle.cs | 68 +++++++++++++++++++ 2 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 src/pdns-dhcp/System/Threading/Tasks/AsyncWaitHandle.cs diff --git a/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs b/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs index 8565c91..92bf812 100644 --- a/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs +++ b/src/pdns-dhcp/Kea/KeaLeaseWatcher.cs @@ -92,20 +92,22 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService var loopToken = loopCts.Token; _eventChannel = Channel.CreateUnbounded(); _fsw.EnableRaisingEvents = true; - using SemaphoreSlim waitTrap = new(0, 1); - using SemaphoreSlim changeEvent = new(0, 1); - Task reader = FileReader(waitTrap, changeEvent, loopToken); + using AutoResetEvent resetEvent = new(false); + Task reader = Task.CompletedTask; try { - await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopToken)) + for ( + EventArgs readArgs = EventArgs.Empty; + !loopToken.IsCancellationRequested; + readArgs = await _eventChannel.Reader.ReadAsync(loopToken)) { // Guard for Deleted and renamed away events, // both have to stop this reader immediately. // Just wait for the file being created/renamed to _leaseFile. // Described in [The LFC Process](https://kea.readthedocs.io/en/latest/arm/lfc.html#kea-lfc) - switch (@event) + switch (readArgs) { - case { ChangeType: WatcherChangeTypes.Deleted }: + case FileSystemEventArgs { ChangeType: WatcherChangeTypes.Deleted }: case RenamedEventArgs renamed when renamed.OldName == _leaseFile: loopCts.Cancel(); continue; @@ -113,12 +115,15 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService if (reader is not { IsCompleted: false }) { - reader = FileReader(waitTrap, changeEvent, loopToken); + // In any case that the reader failed (for whatever reason) + // restart now. + // Incoming event could be Changed/Created/Renamed + // This doesn't care, as we already lost the file handle. + reader = FileReader(resetEvent, stoppingToken); } - - if (waitTrap.Wait(0, CancellationToken.None)) + else { - changeEvent.Release(); + resetEvent.Set(); } } } @@ -138,7 +143,7 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService } } - private async Task FileReader(SemaphoreSlim waitTrap, SemaphoreSlim changeEvent, CancellationToken stoppingToken) + private async Task FileReader(AutoResetEvent waitHandle, CancellationToken stoppingToken) { PipeWriter writer = _pipe.Writer; SepReader? reader = null; @@ -174,9 +179,7 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService } else { - var acquireLock = changeEvent.WaitAsync(stoppingToken); - waitTrap.Release(); - await acquireLock.ConfigureAwait(continueOnCapturedContext: false); + await waitHandle.WaitOneAsync(stoppingToken).ConfigureAwait(continueOnCapturedContext: false); } } } diff --git a/src/pdns-dhcp/System/Threading/Tasks/AsyncWaitHandle.cs b/src/pdns-dhcp/System/Threading/Tasks/AsyncWaitHandle.cs new file mode 100644 index 0000000..fa42fd9 --- /dev/null +++ b/src/pdns-dhcp/System/Threading/Tasks/AsyncWaitHandle.cs @@ -0,0 +1,68 @@ +namespace System.Threading.Tasks; + +public static class AsyncWaitHandle +{ + public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken = default) + { + WaitOneAsyncState state = new(waitHandle, cancellationToken); + return state.WaitOneAsync(); + } + + private class WaitOneAsyncState : IDisposable + { + private readonly CancellationTokenRegistration _cancellationTokenRegistration; + private readonly RegisteredWaitHandle _registeredWaitHandle; + private readonly TaskCompletionSource _tcs; + private bool _disposed; + + public WaitOneAsyncState(WaitHandle waitHandle, CancellationToken cancellationToken) + { + _tcs = new(); + _cancellationTokenRegistration = cancellationToken.Register((state, token) => ((WaitOneAsyncState)state!).Canceled(token), this); + _registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(waitHandle, (state, timeout) => ((WaitOneAsyncState)state!).Signaled(timeout), this, Timeout.Infinite, true); + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _registeredWaitHandle.Unregister(default); + _cancellationTokenRegistration.Dispose(); + + _disposed = true; + } + + public Task WaitOneAsync() + { + const TaskContinuationOptions options = TaskContinuationOptions.AttachedToParent | TaskContinuationOptions.ExecuteSynchronously; + return _tcs.Task.ContinueWith((upstream, state) => ((WaitOneAsyncState)state!).Continuation(upstream), this, options).Unwrap(); + } + + private void Canceled(CancellationToken token) + { + _tcs.SetCanceled(token); + this.Dispose(); + } + + private Task Continuation(Task task) + { + this.Dispose(); + return task; + } + + private void Signaled(bool timeout) + { + if (timeout) + { + Canceled(CancellationToken.None); + } + else + { + _tcs.SetResult(); + } + } + } +}