From e307a54bec4620ebef5b8a4e03ad1aecbc6842b4 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Fri, 29 May 2026 01:08:02 +0300 Subject: [PATCH] harden(notifications): per-item marker advance + Telegram client timeout Review follow-ups: advance the dispatcher's "since" marker after each delivered alert (not once per batch) so a future throwing sink can't re-deliver already-sent alerts; give the Telegram HttpClient a 15s timeout so a hung connection can't stall the dispatch loop. --- src/Marathon.Infrastructure/InfrastructureModule.cs | 3 ++- .../Workers/AnomalyNotificationDispatcher.cs | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/Marathon.Infrastructure/InfrastructureModule.cs b/src/Marathon.Infrastructure/InfrastructureModule.cs index fecb1e9..d70cc75 100644 --- a/src/Marathon.Infrastructure/InfrastructureModule.cs +++ b/src/Marathon.Infrastructure/InfrastructureModule.cs @@ -64,7 +64,8 @@ public static class InfrastructureModule // Outbound anomaly notifications (Telegram). Sink + dispatcher are always // registered; the dispatcher idles until Notifications:Enabled is true and // the sink no-ops until a bot token + chat id are configured. - services.AddHttpClient(TelegramNotificationSink.HttpClientName); + services.AddHttpClient(TelegramNotificationSink.HttpClientName, client => + client.Timeout = TimeSpan.FromSeconds(15)); services.AddSingleton(); services.AddHostedService(); diff --git a/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs b/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs index d2c80e4..5581866 100644 --- a/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs +++ b/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs @@ -62,18 +62,22 @@ internal sealed class AnomalyNotificationDispatcher : BackgroundService var useCase = scope.ServiceProvider.GetRequiredService(); var pending = await useCase.ExecuteAsync(_since, opts.MinScore, stoppingToken); + var dispatched = 0; foreach (var notification in pending) { stoppingToken.ThrowIfCancellationRequested(); await _sink.SendAsync(notification, stoppingToken); + // Advance the marker per delivered item (pending is oldest-first) so that + // if a future sink ever threw mid-batch, the already-sent alerts are not + // re-delivered on the next cycle — only the unsent tail is retried. + _since = notification.DetectedAt.AddTicks(1); + dispatched++; } - if (pending.Count > 0) + if (dispatched > 0) { - // pending is oldest-first; advance strictly past the newest sent. - _since = pending[^1].DetectedAt.AddTicks(1); _logger.LogInformation( - "AnomalyNotificationDispatcher: dispatched {Count} alert(s)", pending.Count); + "AnomalyNotificationDispatcher: dispatched {Count} alert(s)", dispatched); } } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)