harden(notifications): per-item marker advance + Telegram client timeout

Review follow-ups: advance the dispatcher's "since" marker after each delivered
alert (not once per batch) so a future throwing sink can't re-deliver already-sent
alerts; give the Telegram HttpClient a 15s timeout so a hung connection can't stall
the dispatch loop.
This commit is contained in:
2026-05-29 01:08:02 +03:00
parent 68f3229c35
commit e307a54bec
2 changed files with 10 additions and 5 deletions
@@ -64,7 +64,8 @@ public static class InfrastructureModule
// Outbound anomaly notifications (Telegram). Sink + dispatcher are always // Outbound anomaly notifications (Telegram). Sink + dispatcher are always
// registered; the dispatcher idles until Notifications:Enabled is true and // registered; the dispatcher idles until Notifications:Enabled is true and
// the sink no-ops until a bot token + chat id are configured. // the sink no-ops until a bot token + chat id are configured.
services.AddHttpClient(TelegramNotificationSink.HttpClientName); services.AddHttpClient(TelegramNotificationSink.HttpClientName, client =>
client.Timeout = TimeSpan.FromSeconds(15));
services.AddSingleton<INotificationSink, TelegramNotificationSink>(); services.AddSingleton<INotificationSink, TelegramNotificationSink>();
services.AddHostedService<AnomalyNotificationDispatcher>(); services.AddHostedService<AnomalyNotificationDispatcher>();
@@ -62,18 +62,22 @@ internal sealed class AnomalyNotificationDispatcher : BackgroundService
var useCase = scope.ServiceProvider.GetRequiredService<GetPendingAnomalyNotificationsUseCase>(); var useCase = scope.ServiceProvider.GetRequiredService<GetPendingAnomalyNotificationsUseCase>();
var pending = await useCase.ExecuteAsync(_since, opts.MinScore, stoppingToken); var pending = await useCase.ExecuteAsync(_since, opts.MinScore, stoppingToken);
var dispatched = 0;
foreach (var notification in pending) foreach (var notification in pending)
{ {
stoppingToken.ThrowIfCancellationRequested(); stoppingToken.ThrowIfCancellationRequested();
await _sink.SendAsync(notification, stoppingToken); await _sink.SendAsync(notification, stoppingToken);
// Advance the marker per delivered item (pending is oldest-first) so that
// if a future sink ever threw mid-batch, the already-sent alerts are not
// re-delivered on the next cycle — only the unsent tail is retried.
_since = notification.DetectedAt.AddTicks(1);
dispatched++;
} }
if (pending.Count > 0) if (dispatched > 0)
{ {
// pending is oldest-first; advance strictly past the newest sent.
_since = pending[^1].DetectedAt.AddTicks(1);
_logger.LogInformation( _logger.LogInformation(
"AnomalyNotificationDispatcher: dispatched {Count} alert(s)", pending.Count); "AnomalyNotificationDispatcher: dispatched {Count} alert(s)", dispatched);
} }
} }
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)