feat(notifications): config-gated Telegram anomaly alerts

- INotificationSink + AnomalyNotification (Application) and a testable
  GetPendingAnomalyNotificationsUseCase (date+score filter, event-title join,
  oldest-first). 4 use-case tests.
- TelegramNotificationSink posts to the Bot API via HttpClient (no SDK dependency);
  no-ops with a warning when unconfigured and never logs the token.
- AnomalyNotificationDispatcher BackgroundService: startup-baselined marker advanced
  past the newest sent (gap- and dup-free); idles until Notifications:Enabled.
- Wire options + named client + sink + dispatcher in InfrastructureModule. Add a
  secret-free Notifications section + steam-move tuning to appsettings.json
  (bot token + chat id go in appsettings.Local.json only).
This commit is contained in:
2026-05-29 00:59:57 +03:00
parent 2e53dff853
commit 005d4e794a
9 changed files with 418 additions and 1 deletions
@@ -0,0 +1,28 @@
using Marathon.Domain.Enums;
namespace Marathon.Application.Abstractions;
/// <summary>
/// A ready-to-deliver anomaly alert payload, shaped by
/// <see cref="UseCases.GetPendingAnomalyNotificationsUseCase"/> so a sink only has to
/// format and transmit it.
/// </summary>
public sealed record AnomalyNotification(
Guid AnomalyId,
string EventTitle,
AnomalyKind Kind,
decimal Score,
DateTimeOffset DetectedAt);
/// <summary>
/// A channel that delivers anomaly alerts (e.g. Telegram; future: email / Discord).
/// </summary>
/// <remarks>
/// Implementations must be resilient: a failure to deliver one notification should be
/// logged and swallowed, never thrown into the dispatcher loop. A sink that is not
/// configured (e.g. missing credentials) should no-op with a warning.
/// </remarks>
public interface INotificationSink
{
Task SendAsync(AnomalyNotification notification, CancellationToken ct);
}
@@ -31,6 +31,7 @@ public static class ApplicationModule
services.AddScoped<ExportToExcelUseCase>();
services.AddScoped<DetectAnomaliesUseCase>();
services.AddScoped<EvaluateAnomalyOutcomesUseCase>();
services.AddScoped<GetPendingAnomalyNotificationsUseCase>();
services.AddScoped<RecordPlacedBetUseCase>();
services.AddScoped<ResolvePendingBetsUseCase>();
@@ -0,0 +1,62 @@
using Marathon.Application.Abstractions;
using Microsoft.Extensions.Logging;
namespace Marathon.Application.UseCases;
/// <summary>
/// Shapes the anomalies worth alerting on: those detected at or after a caller-supplied
/// marker whose score clears a minimum, joined with their event titles. Pure of any
/// transport concern — the dispatcher decides cadence and the sink decides delivery.
/// </summary>
/// <remarks>
/// Results are ordered oldest-first so the caller can advance its "since" marker to the
/// last item's <see cref="AnomalyNotification.DetectedAt"/> (plus one tick) without gaps
/// or duplicates.
/// </remarks>
public sealed class GetPendingAnomalyNotificationsUseCase
{
private readonly IAnomalyRepository _anomalies;
private readonly IEventRepository _events;
private readonly ILogger<GetPendingAnomalyNotificationsUseCase> _logger;
public GetPendingAnomalyNotificationsUseCase(
IAnomalyRepository anomalies,
IEventRepository events,
ILogger<GetPendingAnomalyNotificationsUseCase> logger)
{
_anomalies = anomalies ?? throw new ArgumentNullException(nameof(anomalies));
_events = events ?? throw new ArgumentNullException(nameof(events));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<IReadOnlyList<AnomalyNotification>> ExecuteAsync(
DateTimeOffset since,
decimal minScore,
CancellationToken ct)
{
// Date filter pushed to SQL; score filter is cheap in memory over the small slice.
var recent = await _anomalies.ListByDateRangeAsync(since, to: null, ct).ConfigureAwait(false);
var qualifying = recent.Where(a => a.Score >= minScore).ToList();
if (qualifying.Count == 0)
return Array.Empty<AnomalyNotification>();
var eventIds = qualifying.Select(a => a.EventId).Distinct().ToList();
var events = await _events.GetManyAsync(eventIds, ct).ConfigureAwait(false);
var notifications = qualifying
.OrderBy(a => a.DetectedAt)
.Select(a => new AnomalyNotification(
AnomalyId: a.Id,
EventTitle: events.TryGetValue(a.EventId, out var ev) ? ev.Title : a.EventId.Value,
Kind: a.Kind,
Score: a.Score,
DetectedAt: a.DetectedAt))
.ToList();
_logger.LogDebug(
"GetPendingAnomalyNotificationsUseCase: {Count} alert(s) since {Since:O} at minScore {MinScore}",
notifications.Count, since, minScore);
return notifications;
}
}
@@ -41,7 +41,14 @@
"SuspensionGapSeconds": 60,
"OddsFlipThreshold": 0.30,
"MinSnapshotCount": 3,
"DetectionIntervalSeconds": 60
"DetectionIntervalSeconds": 60,
"SteamMoveWindowSeconds": 120,
"SteamMoveDriftThreshold": 0.20
},
"Notifications": {
"Enabled": false,
"MinScore": 0.45,
"PollIntervalSeconds": 60
},
"Localization": {
"DefaultCulture": "ru-RU"
@@ -0,0 +1,30 @@
namespace Marathon.Infrastructure.Configuration;
/// <summary>
/// Options for outbound anomaly notifications, bound from the <c>Notifications</c>
/// config section.
/// </summary>
/// <remarks>
/// Disabled by default. <see cref="TelegramBotToken"/> and <see cref="TelegramChatId"/>
/// are secrets — set them ONLY in <c>appsettings.Local.json</c> (gitignored) or an
/// environment variable, never in the committed <c>appsettings.json</c>.
/// </remarks>
public sealed class NotificationOptions
{
public const string SectionName = "Notifications";
/// <summary>Master switch — when false, the dispatcher idles and nothing is sent.</summary>
public bool Enabled { get; init; }
/// <summary>Telegram bot token (secret — Local.json / env only).</summary>
public string? TelegramBotToken { get; init; }
/// <summary>Telegram chat id to deliver alerts to (secret — Local.json / env only).</summary>
public string? TelegramChatId { get; init; }
/// <summary>Minimum anomaly score to alert on. Default: 0.45 (Medium severity).</summary>
public decimal MinScore { get; init; } = 0.45m;
/// <summary>Seconds between dispatcher polls. Default: 60.</summary>
public int PollIntervalSeconds { get; init; } = 60;
}
@@ -1,5 +1,7 @@
using Marathon.Application.Abstractions;
using Marathon.Application.Configuration;
using Marathon.Infrastructure.Configuration;
using Marathon.Infrastructure.Notifications;
using Marathon.Infrastructure.Persistence;
using Marathon.Infrastructure.Scraping;
using Marathon.Infrastructure.Workers;
@@ -50,11 +52,22 @@ public static class InfrastructureModule
.AddOptions<ScrapingThrottle>()
.Bind(config.GetSection(ScrapingThrottle.SectionName));
services
.AddOptions<NotificationOptions>()
.Bind(config.GetSection(NotificationOptions.SectionName));
services.AddHostedService<UpcomingEventsPoller>();
services.AddHostedService<LiveOddsPoller>();
services.AddHostedService<ResultsWatchListPoller>();
services.AddHostedService<AnomalyDetectionPoller>();
// Outbound anomaly notifications (Telegram). Sink + dispatcher are always
// registered; the dispatcher idles until Notifications:Enabled is true and
// the sink no-ops until a bot token + chat id are configured.
services.AddHttpClient(TelegramNotificationSink.HttpClientName);
services.AddSingleton<INotificationSink, TelegramNotificationSink>();
services.AddHostedService<AnomalyNotificationDispatcher>();
return services;
}
}
@@ -0,0 +1,79 @@
using System.Net.Http.Json;
using Marathon.Application.Abstractions;
using Marathon.Infrastructure.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Marathon.Infrastructure.Notifications;
/// <summary>
/// Delivers anomaly alerts to a Telegram chat via the Bot API <c>sendMessage</c>
/// endpoint, using a plain <see cref="HttpClient"/> (no third-party SDK dependency).
/// </summary>
/// <remarks>
/// No-ops with a warning when the bot token or chat id is not configured, so a
/// half-configured deployment degrades gracefully rather than throwing. The bot token
/// is never logged (it sits in the request URL only).
/// </remarks>
internal sealed class TelegramNotificationSink : INotificationSink
{
public const string HttpClientName = "telegram";
private readonly IHttpClientFactory _factory;
private readonly IOptionsMonitor<NotificationOptions> _opts;
private readonly ILogger<TelegramNotificationSink> _logger;
public TelegramNotificationSink(
IHttpClientFactory factory,
IOptionsMonitor<NotificationOptions> opts,
ILogger<TelegramNotificationSink> logger)
{
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
_opts = opts ?? throw new ArgumentNullException(nameof(opts));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task SendAsync(AnomalyNotification notification, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(notification);
var opts = _opts.CurrentValue;
if (string.IsNullOrWhiteSpace(opts.TelegramBotToken) || string.IsNullOrWhiteSpace(opts.TelegramChatId))
{
_logger.LogWarning(
"TelegramNotificationSink: bot token / chat id not configured — skipping notification {AnomalyId}.",
notification.AnomalyId);
return;
}
var payload = new
{
chat_id = opts.TelegramChatId,
text = FormatMessage(notification),
disable_web_page_preview = true,
};
var client = _factory.CreateClient(HttpClientName);
var requestUri = $"https://api.telegram.org/bot{opts.TelegramBotToken}/sendMessage";
try
{
using var response = await client.PostAsJsonAsync(requestUri, payload, ct).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
// Never log the URL/token — only the status.
_logger.LogWarning(
"TelegramNotificationSink: send failed for {AnomalyId} with status {Status}.",
notification.AnomalyId, (int)response.StatusCode);
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogWarning(ex,
"TelegramNotificationSink: send threw for {AnomalyId}.", notification.AnomalyId);
}
}
private static string FormatMessage(AnomalyNotification n) =>
$"⚠ {n.Kind}\n{n.EventTitle}\nScore {n.Score:0.00} · {n.DetectedAt:yyyy-MM-dd HH:mm} MSK";
}
@@ -0,0 +1,102 @@
using Marathon.Application.Abstractions;
using Marathon.Application.UseCases;
using Marathon.Domain.ValueObjects;
using Marathon.Infrastructure.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Marathon.Infrastructure.Workers;
/// <summary>
/// Polls for newly detected anomalies above a configured score and pushes each to the
/// registered <see cref="INotificationSink"/>. Idle (cheap re-check) while
/// <see cref="NotificationOptions.Enabled"/> is false.
/// </summary>
/// <remarks>
/// The "since" marker is baselined to startup time so pre-existing anomalies are not
/// re-announced on every restart, and is advanced past the newest dispatched item
/// (plus one tick) each cycle — gap-free and duplicate-free. The scoped use case is
/// resolved per cycle (EF Core DbContext lifetime); the sink is a singleton.
/// </remarks>
internal sealed class AnomalyNotificationDispatcher : BackgroundService
{
private readonly IServiceProvider _services;
private readonly INotificationSink _sink;
private readonly IOptionsMonitor<NotificationOptions> _opts;
private readonly ILogger<AnomalyNotificationDispatcher> _logger;
private DateTimeOffset _since;
public AnomalyNotificationDispatcher(
IServiceProvider services,
INotificationSink sink,
IOptionsMonitor<NotificationOptions> opts,
ILogger<AnomalyNotificationDispatcher> logger)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
_opts = opts ?? throw new ArgumentNullException(nameof(opts));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Baseline: only alert on anomalies detected after this service started.
_since = MoscowTime.Now;
_logger.LogInformation("AnomalyNotificationDispatcher: started");
while (!stoppingToken.IsCancellationRequested)
{
var opts = _opts.CurrentValue;
if (!opts.Enabled)
{
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
continue;
}
try
{
await using var scope = _services.CreateAsyncScope();
var useCase = scope.ServiceProvider.GetRequiredService<GetPendingAnomalyNotificationsUseCase>();
var pending = await useCase.ExecuteAsync(_since, opts.MinScore, stoppingToken);
foreach (var notification in pending)
{
stoppingToken.ThrowIfCancellationRequested();
await _sink.SendAsync(notification, stoppingToken);
}
if (pending.Count > 0)
{
// pending is oldest-first; advance strictly past the newest sent.
_since = pending[^1].DetectedAt.AddTicks(1);
_logger.LogInformation(
"AnomalyNotificationDispatcher: dispatched {Count} alert(s)", pending.Count);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex,
"AnomalyNotificationDispatcher: cycle failed — will retry after interval");
}
var interval = TimeSpan.FromSeconds(Math.Max(5, opts.PollIntervalSeconds));
try
{
await Task.Delay(interval, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
}
_logger.LogInformation("AnomalyNotificationDispatcher: stopping");
}
}
@@ -0,0 +1,95 @@
using FluentAssertions;
using Marathon.Application.Abstractions;
using Marathon.Application.UseCases;
using Marathon.Domain.Entities;
using Marathon.Domain.Enums;
using Marathon.Domain.ValueObjects;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
namespace Marathon.Application.Tests.UseCases;
public sealed class GetPendingAnomalyNotificationsUseCaseTests
{
private readonly IAnomalyRepository _anomalies = Substitute.For<IAnomalyRepository>();
private readonly IEventRepository _events = Substitute.For<IEventRepository>();
private static readonly TimeSpan MoscowOffset = TimeSpan.FromHours(3);
private static readonly DateTimeOffset BaseTime = new(2026, 5, 10, 18, 0, 0, MoscowOffset);
public GetPendingAnomalyNotificationsUseCaseTests()
{
// Event titles are resolved via the batched GetManyAsync; route it through
// the per-id GetAsync stubs each test configures.
TestFixtures.BridgeGetMany(_events);
}
private GetPendingAnomalyNotificationsUseCase CreateSut() =>
new(_anomalies, _events, NullLogger<GetPendingAnomalyNotificationsUseCase>.Instance);
private static Anomaly MakeAnomaly(EventId id, decimal score, DateTimeOffset detectedAt) =>
new(Guid.NewGuid(), id, detectedAt, AnomalyKind.SuspensionFlip, score, "{}");
private static Event MakeEvent(EventId id) =>
new(id, new SportCode(11), "BY", "L1", "Cat", BaseTime, "Team A", "Team B");
private void StubAnomalies(params Anomaly[] anomalies) =>
_anomalies.ListByDateRangeAsync(
Arg.Any<DateTimeOffset?>(), Arg.Any<DateTimeOffset?>(), Arg.Any<CancellationToken>())
.Returns(anomalies.ToList().AsReadOnly());
[Fact]
public async Task Should_ExcludeAnomaliesBelowMinScore()
{
var id = new EventId("11111111");
StubAnomalies(
MakeAnomaly(id, 0.30m, BaseTime),
MakeAnomaly(id, 0.70m, BaseTime.AddMinutes(1)));
_events.GetAsync(id, Arg.Any<CancellationToken>()).Returns(MakeEvent(id));
var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), minScore: 0.45m, CancellationToken.None);
result.Should().ContainSingle();
result[0].Score.Should().Be(0.70m);
result[0].EventTitle.Should().Be("Team A vs Team B");
}
[Fact]
public async Task Should_OrderOldestFirst()
{
var id1 = new EventId("11111111");
var id2 = new EventId("22222222");
StubAnomalies(
MakeAnomaly(id2, 0.60m, BaseTime.AddMinutes(5)),
MakeAnomaly(id1, 0.50m, BaseTime));
_events.GetAsync(id1, Arg.Any<CancellationToken>()).Returns(MakeEvent(id1));
_events.GetAsync(id2, Arg.Any<CancellationToken>()).Returns(MakeEvent(id2));
var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), 0.45m, CancellationToken.None);
result.Select(n => n.DetectedAt).Should().BeInAscendingOrder();
}
[Fact]
public async Task Should_FallBackToEventId_When_EventMissing()
{
var id = new EventId("orphan00");
StubAnomalies(MakeAnomaly(id, 0.55m, BaseTime));
_events.GetAsync(id, Arg.Any<CancellationToken>()).Returns((Event?)null);
var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), 0.45m, CancellationToken.None);
result.Should().ContainSingle();
result[0].EventTitle.Should().Be("orphan00");
}
[Fact]
public async Task Should_ReturnEmpty_When_NothingQualifies()
{
StubAnomalies(Array.Empty<Anomaly>());
var result = await CreateSut().ExecuteAsync(BaseTime, 0.45m, CancellationToken.None);
result.Should().BeEmpty();
}
}