Handle synchronization

This commit is contained in:
Jöran Malek 2023-11-27 23:50:37 +01:00
parent 3d635ad4c1
commit 4e7ab4a452
2 changed files with 85 additions and 14 deletions

View file

@ -92,20 +92,22 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
var loopToken = loopCts.Token;
_eventChannel = Channel.CreateUnbounded<FileSystemEventArgs>();
_fsw.EnableRaisingEvents = true;
using SemaphoreSlim waitTrap = new(0, 1);
using SemaphoreSlim changeEvent = new(0, 1);
Task reader = FileReader(waitTrap, changeEvent, loopToken);
using AutoResetEvent resetEvent = new(false);
Task reader = Task.CompletedTask;
try
{
await foreach (var @event in _eventChannel.Reader.ReadAllAsync(loopToken))
for (
EventArgs readArgs = EventArgs.Empty;
!loopToken.IsCancellationRequested;
readArgs = await _eventChannel.Reader.ReadAsync(loopToken))
{
// Guard for Deleted and renamed away events,
// both have to stop this reader immediately.
// Just wait for the file being created/renamed to _leaseFile.
// Described in [The LFC Process](https://kea.readthedocs.io/en/latest/arm/lfc.html#kea-lfc)
switch (@event)
switch (readArgs)
{
case { ChangeType: WatcherChangeTypes.Deleted }:
case FileSystemEventArgs { ChangeType: WatcherChangeTypes.Deleted }:
case RenamedEventArgs renamed when renamed.OldName == _leaseFile:
loopCts.Cancel();
continue;
@ -113,12 +115,15 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
if (reader is not { IsCompleted: false })
{
reader = FileReader(waitTrap, changeEvent, loopToken);
// In any case that the reader failed (for whatever reason)
// restart now.
// Incoming event could be Changed/Created/Renamed
// This doesn't care, as we already lost the file handle.
reader = FileReader(resetEvent, stoppingToken);
}
if (waitTrap.Wait(0, CancellationToken.None))
else
{
changeEvent.Release();
resetEvent.Set();
}
}
}
@ -138,7 +143,7 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
}
}
private async Task FileReader(SemaphoreSlim waitTrap, SemaphoreSlim changeEvent, CancellationToken stoppingToken)
private async Task FileReader(AutoResetEvent waitHandle, CancellationToken stoppingToken)
{
PipeWriter writer = _pipe.Writer;
SepReader? reader = null;
@ -174,9 +179,7 @@ public abstract class KeaDhcpLeaseWatcher : IHostedService
}
else
{
var acquireLock = changeEvent.WaitAsync(stoppingToken);
waitTrap.Release();
await acquireLock.ConfigureAwait(continueOnCapturedContext: false);
await waitHandle.WaitOneAsync(stoppingToken).ConfigureAwait(continueOnCapturedContext: false);
}
}
}

View file

@ -0,0 +1,68 @@
namespace System.Threading.Tasks;
public static class AsyncWaitHandle
{
public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken = default)
{
WaitOneAsyncState state = new(waitHandle, cancellationToken);
return state.WaitOneAsync();
}
private class WaitOneAsyncState : IDisposable
{
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
private readonly RegisteredWaitHandle _registeredWaitHandle;
private readonly TaskCompletionSource _tcs;
private bool _disposed;
public WaitOneAsyncState(WaitHandle waitHandle, CancellationToken cancellationToken)
{
_tcs = new();
_cancellationTokenRegistration = cancellationToken.Register((state, token) => ((WaitOneAsyncState)state!).Canceled(token), this);
_registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(waitHandle, (state, timeout) => ((WaitOneAsyncState)state!).Signaled(timeout), this, Timeout.Infinite, true);
}
public void Dispose()
{
if (_disposed)
{
return;
}
_registeredWaitHandle.Unregister(default);
_cancellationTokenRegistration.Dispose();
_disposed = true;
}
public Task WaitOneAsync()
{
const TaskContinuationOptions options = TaskContinuationOptions.AttachedToParent | TaskContinuationOptions.ExecuteSynchronously;
return _tcs.Task.ContinueWith((upstream, state) => ((WaitOneAsyncState)state!).Continuation(upstream), this, options).Unwrap();
}
private void Canceled(CancellationToken token)
{
_tcs.SetCanceled(token);
this.Dispose();
}
private Task Continuation(Task task)
{
this.Dispose();
return task;
}
private void Signaled(bool timeout)
{
if (timeout)
{
Canceled(CancellationToken.None);
}
else
{
_tcs.SetResult();
}
}
}
}