Review watchers

This commit is contained in:
Jöran Malek 2024-01-13 17:38:54 +01:00
parent 9684f15925
commit 99202af871
3 changed files with 42 additions and 40 deletions

View file

@ -144,69 +144,72 @@ public sealed class KeaDhcpLeaseWatcher : IHostedService
_eventChannel.Writer.TryComplete(); _eventChannel.Writer.TryComplete();
if (reader is { IsCompleted: false }) if (reader is { IsCompleted: false })
{ {
try await reader.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
{
await reader.ConfigureAwait(continueOnCapturedContext: false);
}
catch { }
} }
_pipe.Reset();
} }
} }
} }
private async Task FileReader(AutoResetEvent waitHandle, CancellationToken stoppingToken) private async Task FileReader(AutoResetEvent waitHandle, CancellationToken stoppingToken)
{ {
PipeWriter writer = _pipe.Writer;
SepReader? reader = null; SepReader? reader = null;
try try
{ {
using var file = new FileStream(Options.Leases, LeaseFileStreamOptions); PipeWriter writer = _pipe.Writer;
using (stoppingToken.Register(s => ((PipeWriter)s!).Complete(null), writer))
bool awaitLineFeed = false;
int newLinesEncountered = 0;
while (!stoppingToken.IsCancellationRequested)
{ {
for (; newLinesEncountered > 0; newLinesEncountered--) using var file = new FileStream(Options.Leases, LeaseFileStreamOptions);
bool awaitLineFeed = false;
int newLinesEncountered = 0;
while (!stoppingToken.IsCancellationRequested)
{ {
if (reader is null) for (; newLinesEncountered > 0; newLinesEncountered--)
{ {
reader = MemfileReader.From(_pipe.Reader.AsStream()); if (reader is null)
continue; {
reader = await Task.Factory.StartNew(
s => MemfileReader.From((Stream)s!),
_pipe.Reader.AsStream(),
stoppingToken,
TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
continue;
}
if (!reader.MoveNext())
{
// TODO Error state.
return;
}
if (_handler.Handle(reader.Current) is not { } lease)
{
continue;
}
await _queue.Write(lease, stoppingToken).ConfigureAwait(false);
} }
if (!reader.MoveNext()) var memory = writer.GetMemory();
int read = await file.ReadAsync(memory, stoppingToken);
if (read > 0)
{ {
// TODO Error state. CountNewLines(_decoder, memory[..read], ref newLinesEncountered, ref awaitLineFeed);
return; writer.Advance(read);
await writer.FlushAsync(stoppingToken);
} }
else
if (_handler.Handle(reader.Current) is not { } lease)
{ {
continue; await waitHandle.WaitOneAsync(stoppingToken).ConfigureAwait(continueOnCapturedContext: false);
} }
await _queue.Write(lease, stoppingToken).ConfigureAwait(false);
}
var memory = writer.GetMemory();
int read = await file.ReadAsync(memory, stoppingToken);
if (read > 0)
{
CountNewLines(_decoder, memory[..read], ref newLinesEncountered, ref awaitLineFeed);
writer.Advance(read);
await writer.FlushAsync(stoppingToken);
}
else
{
await waitHandle.WaitOneAsync(stoppingToken).ConfigureAwait(continueOnCapturedContext: false);
} }
} }
} }
finally finally
{ {
reader?.Dispose(); reader?.Dispose();
writer.Complete();
_pipe.Reset();
} }
static void CountNewLines(Decoder decoder, in Memory<byte> memory, ref int newLinesEncountered, ref bool awaitLineFeed) static void CountNewLines(Decoder decoder, in Memory<byte> memory, ref int newLinesEncountered, ref bool awaitLineFeed)

View file

@ -65,7 +65,6 @@ builder.WebHost.ConfigureKestrel((context, options) =>
var path = PathEx.ExpandPath(pdnsOptions.Socket); var path = PathEx.ExpandPath(pdnsOptions.Socket);
FileInfo file = new(path); FileInfo file = new(path);
file.Directory!.Create(); file.Directory!.Create();
file.Delete();
options.ListenUnixSocket(path, options => options.ListenUnixSocket(path, options =>
{ {
options.UseConnectionHandler<PowerDnsHandler>(); options.UseConnectionHandler<PowerDnsHandler>();

View file

@ -19,7 +19,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.0" PrivateAssets="all" /> <PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.0" PrivateAssets="all" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="8.0.0" />
<PackageReference Include="Sep" Version="0.3.0" /> <PackageReference Include="Sep" Version="0.4.0" />
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" /> <PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
</ItemGroup> </ItemGroup>