Dhcp Lease Queue worker

This commit is contained in:
Jöran Malek 2023-12-29 18:31:13 +01:00
parent 39020c75ec
commit 0f9f127494
14 changed files with 164 additions and 74 deletions

View file

@ -7,7 +7,23 @@ namespace pdns_dhcp.Dhcp;
public class DhcpLeaseQueue public class DhcpLeaseQueue
{ {
private readonly Channel<DhcpLeaseChange> _pipe = Channel.CreateUnbounded<DhcpLeaseChange>(); private readonly Channel<DhcpLeaseChange> _pipe;
private readonly ChannelReader<DhcpLeaseChange> _reader;
private readonly ChannelWriter<DhcpLeaseChange> _writer;
public ref readonly ChannelReader<DhcpLeaseChange> Reader => ref _reader;
public DhcpLeaseQueue()
{
_pipe = Channel.CreateUnbounded<DhcpLeaseChange>();
_reader = _pipe.Reader;
_writer = _pipe.Writer;
}
public ValueTask Write(DhcpLeaseChange change, CancellationToken cancellationToken = default)
{
return _writer.WriteAsync(change, cancellationToken);
}
} }
public readonly record struct DhcpLeaseChange(IPAddress Address, string FQDN, DhcpLeaseIdentifier Identifier, TimeSpan Lifetime) public readonly record struct DhcpLeaseChange(IPAddress Address, string FQDN, DhcpLeaseIdentifier Identifier, TimeSpan Lifetime)

View file

@ -40,10 +40,10 @@ public class DnsRepository
cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
} }
public async ValueTask Record(DhcpLeaseChange leaseChange) public async ValueTask Record(DhcpLeaseChange leaseChange, CancellationToken cancellationToken = default)
{ {
// just lock that thing. // just lock that thing.
using (await _recordLock.AcquireLockAsync(CancellationToken.None).ConfigureAwait(false)) using (await _recordLock.AcquireLockAsync(cancellationToken).ConfigureAwait(false))
{ {
RecordContinuation(leaseChange); RecordContinuation(leaseChange);
} }

View file

@ -1,8 +1,10 @@
using nietras.SeparatedValues; using nietras.SeparatedValues;
using pdns_dhcp.Dhcp;
namespace pdns_dhcp.Kea; namespace pdns_dhcp.Kea;
public interface IKeaDhcpLeaseHandler public interface IKeaDhcpLeaseHandler
{ {
void Handle(in SepReader.Row row); DhcpLeaseChange? Handle(in SepReader.Row row);
} }

View file

@ -0,0 +1,14 @@
using pdns_dhcp.Options;
using Stl.Interception;
namespace pdns_dhcp.Kea;
public interface IKeaFactory : IRequiresFullProxy
{
KeaDhcp4LeaseHandler CreateHandler4();
KeaDhcp6LeaseHandler CreateHandler6();
KeaDhcpLeaseWatcher CreateWatcher(IKeaDhcpLeaseHandler handler, KeaDhcpServerOptions options);
}

View file

@ -1,11 +1,15 @@
using nietras.SeparatedValues; using nietras.SeparatedValues;
using pdns_dhcp.Dhcp;
namespace pdns_dhcp.Kea; namespace pdns_dhcp.Kea;
public class KeaDhcp4LeaseHandler : IKeaDhcpLeaseHandler public class KeaDhcp4LeaseHandler : IKeaDhcpLeaseHandler
{ {
public void Handle(in SepReader.Row row) public DhcpLeaseChange? Handle(in SepReader.Row row)
{ {
KeaDhcp4Lease lease = KeaDhcp4Lease.Parse(row); KeaDhcp4Lease lease = KeaDhcp4Lease.Parse(row);
return default;
} }
} }

View file

@ -1,11 +1,15 @@
using nietras.SeparatedValues; using nietras.SeparatedValues;
using pdns_dhcp.Dhcp;
namespace pdns_dhcp.Kea; namespace pdns_dhcp.Kea;
public class KeaDhcp6LeaseHandler : IKeaDhcpLeaseHandler public class KeaDhcp6LeaseHandler : IKeaDhcpLeaseHandler
{ {
public void Handle(in SepReader.Row row) public DhcpLeaseChange? Handle(in SepReader.Row row)
{ {
KeaDhcp6Lease lease = KeaDhcp6Lease.Parse(row); KeaDhcp6Lease lease = KeaDhcp6Lease.Parse(row);
return default;
} }
} }

View file

@ -7,36 +7,38 @@ using Microsoft.Extensions.Hosting;
using nietras.SeparatedValues; using nietras.SeparatedValues;
using pdns_dhcp.Dhcp;
using pdns_dhcp.Options; using pdns_dhcp.Options;
namespace pdns_dhcp.Kea; namespace pdns_dhcp.Kea;
public sealed class KeaDhcpLeaseWatcher<T> : IHostedService public sealed class KeaDhcpLeaseWatcher : IHostedService
where T : IKeaDhcpLeaseHandler
{ {
private static readonly FileStreamOptions LeaseFileStreamOptions = new() private static readonly FileStreamOptions LeaseFileStreamOptions = new()
{ {
Access = FileAccess.Read, Access = FileAccess.Read,
Mode = FileMode.Open, Mode = FileMode.Open,
Options = FileOptions.SequentialScan, Options = FileOptions.SequentialScan | FileOptions.Asynchronous,
Share = (FileShare)7, Share = (FileShare)7,
}; };
private readonly Decoder _decoder; private readonly Decoder _decoder;
private readonly FileSystemWatcher _fsw; private readonly FileSystemWatcher _fsw;
private readonly T _handler; private readonly IKeaDhcpLeaseHandler _handler;
private readonly string _leaseFile; private readonly string _leaseFile;
private readonly Pipe _pipe; private readonly Pipe _pipe;
private readonly DhcpLeaseQueue _queue;
private Channel<FileSystemEventArgs>? _eventChannel; private Channel<FileSystemEventArgs>? _eventChannel;
private Task? _executeTask; private Task? _executeTask;
private CancellationTokenSource? _stoppingCts; private CancellationTokenSource? _stoppingCts;
private KeaDhcpServerOptions Options { get; } private KeaDhcpServerOptions Options { get; }
public KeaDhcpLeaseWatcher(KeaDhcpServerOptions options, T handler) public KeaDhcpLeaseWatcher(KeaDhcpServerOptions options, IKeaDhcpLeaseHandler handler, DhcpLeaseQueue queue)
{ {
Options = options = options with { Leases = PathEx.ExpandPath(options.Leases) }; Options = options = options with { Leases = PathEx.ExpandPath(options.Leases) };
_handler = handler; _handler = handler;
_queue = queue;
var leases = options.Leases.AsSpan(); var leases = options.Leases.AsSpan();
if (leases.IsWhiteSpace()) if (leases.IsWhiteSpace())
@ -177,7 +179,12 @@ public sealed class KeaDhcpLeaseWatcher<T> : IHostedService
return; return;
} }
_handler.Handle(reader.Current); if (_handler.Handle(reader.Current) is not { } lease)
{
continue;
}
await _queue.Write(lease, stoppingToken).ConfigureAwait(false);
} }
var memory = writer.GetMemory(); var memory = writer.GetMemory();

View file

@ -0,0 +1,50 @@
using System.Collections.Immutable;
using Microsoft.Extensions.Hosting;
using pdns_dhcp.Options;
namespace pdns_dhcp.Kea;
public class KeaService : IHostedService
{
private readonly ImmutableArray<IHostedService> _services;
public KeaService(KeaDhcpOptions options, IKeaFactory factory)
{
if (options.Dhcp4 is { } dhcp4Options)
{
_services.Add(factory.CreateWatcher(factory.CreateHandler4(), dhcp4Options));
}
if (options.Dhcp6 is { } dhcp6Options)
{
_services.Add(factory.CreateWatcher(factory.CreateHandler6(), dhcp6Options));
}
}
public Task StartAsync(CancellationToken cancellationToken = default)
{
Task[] tasks = new Task[_services.Length];
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = _services[i].StartAsync(cancellationToken);
}
return Task.WhenAll(tasks);
}
public async Task StopAsync(CancellationToken cancellationToken = default)
{
Task[] tasks = new Task[_services.Length];
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = _services[i].StopAsync(cancellationToken);
}
var waitTask = Task.WhenAll(tasks);
TaskCompletionSource taskCompletionSource = new();
using var registration = cancellationToken.Register(s => ((TaskCompletionSource)s!).SetCanceled(), taskCompletionSource);
await Task.WhenAny(waitTask, taskCompletionSource.Task).ConfigureAwait(continueOnCapturedContext: false);
}
}

View file

@ -9,6 +9,8 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using pdns_dhcp.Connections; using pdns_dhcp.Connections;
using pdns_dhcp.Dhcp;
using pdns_dhcp.Dns;
using pdns_dhcp.Kea; using pdns_dhcp.Kea;
using pdns_dhcp.Options; using pdns_dhcp.Options;
using pdns_dhcp.PowerDns; using pdns_dhcp.PowerDns;
@ -21,13 +23,14 @@ var builder = WebApplication.CreateBuilder(args);
builder.Services.Configure<DhcpOptions>(builder.Configuration.GetRequiredSection("Dhcp")); builder.Services.Configure<DhcpOptions>(builder.Configuration.GetRequiredSection("Dhcp"));
builder.Services.Configure<PowerDnsOptions>(builder.Configuration.GetRequiredSection("PowerDns")); builder.Services.Configure<PowerDnsOptions>(builder.Configuration.GetRequiredSection("PowerDns"));
builder.Services.AddHostedService<DhcpLeaseWatcher>(); builder.Services.AddHostedService<DhcpWatcher>();
builder.Services.AddHostedService<PowerDnsBackend>(); builder.Services.AddHostedService<DnsQueueWorker>();
builder.Services.AddTypedFactory<IDhcpLeaseWatcherFactory>(); builder.Services.AddSingleton<DhcpLeaseQueue>();
builder.Services.AddSingleton<DnsRepository>();
builder.Services.AddTransient<KeaDhcp4LeaseHandler>(); builder.Services.AddTypedFactory<IDhcpWatcherFactory>();
builder.Services.AddTransient<KeaDhcp6LeaseHandler>(); builder.Services.AddTypedFactory<IKeaFactory>();
builder.Services.Configure<SocketTransportOptions>(options => builder.Services.Configure<SocketTransportOptions>(options =>
{ {

View file

@ -7,31 +7,23 @@ using pdns_dhcp.Options;
namespace pdns_dhcp.Services; namespace pdns_dhcp.Services;
public class DhcpLeaseWatcher : IHostedService public class DhcpWatcher : IHostedService
{ {
private readonly ImmutableArray<IHostedService> _services; private readonly ImmutableArray<IHostedService> _services;
public DhcpLeaseWatcher(IOptions<DhcpOptions> options, IDhcpLeaseWatcherFactory factory) public DhcpWatcher(IOptions<DhcpOptions> options, IDhcpWatcherFactory factory)
{ {
var dhcpOptions = options.Value; var dhcpOptions = options.Value;
var services = ImmutableArray.CreateBuilder<IHostedService>(); var services = ImmutableArray.CreateBuilder<IHostedService>();
if (dhcpOptions.Kea is { } keaOptions) if (dhcpOptions.Kea is { } keaOptions)
{ {
if (keaOptions.Dhcp4 is { } dhcp4Options) services.Add(factory.KeaService(keaOptions));
{
services.Add(factory.KeaDhcp4Watcher(dhcp4Options));
}
if (keaOptions.Dhcp6 is { } dhcp6Options)
{
services.Add(factory.KeaDhcp6Watcher(dhcp6Options));
}
} }
_services = services.DrainToImmutable(); _services = services.DrainToImmutable();
} }
public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken = default)
{ {
Task[] tasks = new Task[_services.Length]; Task[] tasks = new Task[_services.Length];
for (int i = 0; i < tasks.Length; i++) for (int i = 0; i < tasks.Length; i++)
@ -42,7 +34,7 @@ public class DhcpLeaseWatcher : IHostedService
return Task.WhenAll(tasks); return Task.WhenAll(tasks);
} }
public async Task StopAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken = default)
{ {
Task[] tasks = new Task[_services.Length]; Task[] tasks = new Task[_services.Length];
for (int i = 0; i < tasks.Length; i++) for (int i = 0; i < tasks.Length; i++)

View file

@ -0,0 +1,31 @@
using System.Threading.Channels;
using Microsoft.Extensions.Hosting;
using pdns_dhcp.Dhcp;
using pdns_dhcp.Dns;
namespace pdns_dhcp.Services;
public class DnsQueueWorker : BackgroundService
{
private readonly ChannelReader<DhcpLeaseChange> _channelReader;
private readonly DnsRepository _repository;
public DnsQueueWorker(DhcpLeaseQueue queue, DnsRepository repository)
{
_channelReader = queue.Reader;
_repository = repository;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (await _channelReader.WaitToReadAsync(stoppingToken).ConfigureAwait(false))
{
while (_channelReader.TryRead(out var lease))
{
await _repository.Record(default, stoppingToken).ConfigureAwait(false);
}
}
}
}

View file

@ -1,13 +0,0 @@
using pdns_dhcp.Kea;
using pdns_dhcp.Options;
using Stl.Interception;
namespace pdns_dhcp.Services;
public interface IDhcpLeaseWatcherFactory : IRequiresFullProxy
{
KeaDhcpLeaseWatcher<KeaDhcp4LeaseHandler> KeaDhcp4Watcher(KeaDhcpServerOptions options);
KeaDhcpLeaseWatcher<KeaDhcp6LeaseHandler> KeaDhcp6Watcher(KeaDhcpServerOptions options);
}

View file

@ -0,0 +1,11 @@
using pdns_dhcp.Kea;
using pdns_dhcp.Options;
using Stl.Interception;
namespace pdns_dhcp.Services;
public interface IDhcpWatcherFactory : IRequiresFullProxy
{
KeaService KeaService(KeaDhcpOptions options);
}

View file

@ -1,31 +0,0 @@
using Microsoft.Extensions.Hosting;
namespace pdns_dhcp.Services;
public class PowerDnsBackend : BackgroundService
{
public PowerDnsBackend()
{
}
~PowerDnsBackend()
{
DisposeCore();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
}
public override void Dispose()
{
base.Dispose();
DisposeCore();
GC.SuppressFinalize(this);
}
private void DisposeCore()
{
}
}