diff --git a/src/Marathon.Application/Abstractions/INotificationSink.cs b/src/Marathon.Application/Abstractions/INotificationSink.cs new file mode 100644 index 0000000..9c613c0 --- /dev/null +++ b/src/Marathon.Application/Abstractions/INotificationSink.cs @@ -0,0 +1,28 @@ +using Marathon.Domain.Enums; + +namespace Marathon.Application.Abstractions; + +/// +/// A ready-to-deliver anomaly alert payload, shaped by +/// so a sink only has to +/// format and transmit it. +/// +public sealed record AnomalyNotification( + Guid AnomalyId, + string EventTitle, + AnomalyKind Kind, + decimal Score, + DateTimeOffset DetectedAt); + +/// +/// A channel that delivers anomaly alerts (e.g. Telegram; future: email / Discord). +/// +/// +/// Implementations must be resilient: a failure to deliver one notification should be +/// logged and swallowed, never thrown into the dispatcher loop. A sink that is not +/// configured (e.g. missing credentials) should no-op with a warning. +/// +public interface INotificationSink +{ + Task SendAsync(AnomalyNotification notification, CancellationToken ct); +} diff --git a/src/Marathon.Application/ApplicationModule.cs b/src/Marathon.Application/ApplicationModule.cs index 0b47a67..90c98ba 100644 --- a/src/Marathon.Application/ApplicationModule.cs +++ b/src/Marathon.Application/ApplicationModule.cs @@ -31,6 +31,7 @@ public static class ApplicationModule services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); diff --git a/src/Marathon.Application/UseCases/GetPendingAnomalyNotificationsUseCase.cs b/src/Marathon.Application/UseCases/GetPendingAnomalyNotificationsUseCase.cs new file mode 100644 index 0000000..908d027 --- /dev/null +++ b/src/Marathon.Application/UseCases/GetPendingAnomalyNotificationsUseCase.cs @@ -0,0 +1,62 @@ +using Marathon.Application.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Marathon.Application.UseCases; + +/// +/// Shapes the anomalies worth alerting on: those detected at or after a caller-supplied +/// marker whose score clears a minimum, joined with their event titles. Pure of any +/// transport concern — the dispatcher decides cadence and the sink decides delivery. +/// +/// +/// Results are ordered oldest-first so the caller can advance its "since" marker to the +/// last item's (plus one tick) without gaps +/// or duplicates. +/// +public sealed class GetPendingAnomalyNotificationsUseCase +{ + private readonly IAnomalyRepository _anomalies; + private readonly IEventRepository _events; + private readonly ILogger _logger; + + public GetPendingAnomalyNotificationsUseCase( + IAnomalyRepository anomalies, + IEventRepository events, + ILogger logger) + { + _anomalies = anomalies ?? throw new ArgumentNullException(nameof(anomalies)); + _events = events ?? throw new ArgumentNullException(nameof(events)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task> ExecuteAsync( + DateTimeOffset since, + decimal minScore, + CancellationToken ct) + { + // Date filter pushed to SQL; score filter is cheap in memory over the small slice. + var recent = await _anomalies.ListByDateRangeAsync(since, to: null, ct).ConfigureAwait(false); + var qualifying = recent.Where(a => a.Score >= minScore).ToList(); + if (qualifying.Count == 0) + return Array.Empty(); + + var eventIds = qualifying.Select(a => a.EventId).Distinct().ToList(); + var events = await _events.GetManyAsync(eventIds, ct).ConfigureAwait(false); + + var notifications = qualifying + .OrderBy(a => a.DetectedAt) + .Select(a => new AnomalyNotification( + AnomalyId: a.Id, + EventTitle: events.TryGetValue(a.EventId, out var ev) ? ev.Title : a.EventId.Value, + Kind: a.Kind, + Score: a.Score, + DetectedAt: a.DetectedAt)) + .ToList(); + + _logger.LogDebug( + "GetPendingAnomalyNotificationsUseCase: {Count} alert(s) since {Since:O} at minScore {MinScore}", + notifications.Count, since, minScore); + + return notifications; + } +} diff --git a/src/Marathon.Hosts.WpfBlazor/appsettings.json b/src/Marathon.Hosts.WpfBlazor/appsettings.json index 21c5052..ee5ae74 100644 --- a/src/Marathon.Hosts.WpfBlazor/appsettings.json +++ b/src/Marathon.Hosts.WpfBlazor/appsettings.json @@ -41,7 +41,14 @@ "SuspensionGapSeconds": 60, "OddsFlipThreshold": 0.30, "MinSnapshotCount": 3, - "DetectionIntervalSeconds": 60 + "DetectionIntervalSeconds": 60, + "SteamMoveWindowSeconds": 120, + "SteamMoveDriftThreshold": 0.20 + }, + "Notifications": { + "Enabled": false, + "MinScore": 0.45, + "PollIntervalSeconds": 60 }, "Localization": { "DefaultCulture": "ru-RU" diff --git a/src/Marathon.Infrastructure/Configuration/NotificationOptions.cs b/src/Marathon.Infrastructure/Configuration/NotificationOptions.cs new file mode 100644 index 0000000..0f9afe3 --- /dev/null +++ b/src/Marathon.Infrastructure/Configuration/NotificationOptions.cs @@ -0,0 +1,30 @@ +namespace Marathon.Infrastructure.Configuration; + +/// +/// Options for outbound anomaly notifications, bound from the Notifications +/// config section. +/// +/// +/// Disabled by default. and +/// are secrets — set them ONLY in appsettings.Local.json (gitignored) or an +/// environment variable, never in the committed appsettings.json. +/// +public sealed class NotificationOptions +{ + public const string SectionName = "Notifications"; + + /// Master switch — when false, the dispatcher idles and nothing is sent. + public bool Enabled { get; init; } + + /// Telegram bot token (secret — Local.json / env only). + public string? TelegramBotToken { get; init; } + + /// Telegram chat id to deliver alerts to (secret — Local.json / env only). + public string? TelegramChatId { get; init; } + + /// Minimum anomaly score to alert on. Default: 0.45 (Medium severity). + public decimal MinScore { get; init; } = 0.45m; + + /// Seconds between dispatcher polls. Default: 60. + public int PollIntervalSeconds { get; init; } = 60; +} diff --git a/src/Marathon.Infrastructure/InfrastructureModule.cs b/src/Marathon.Infrastructure/InfrastructureModule.cs index 7280a7e..fecb1e9 100644 --- a/src/Marathon.Infrastructure/InfrastructureModule.cs +++ b/src/Marathon.Infrastructure/InfrastructureModule.cs @@ -1,5 +1,7 @@ +using Marathon.Application.Abstractions; using Marathon.Application.Configuration; using Marathon.Infrastructure.Configuration; +using Marathon.Infrastructure.Notifications; using Marathon.Infrastructure.Persistence; using Marathon.Infrastructure.Scraping; using Marathon.Infrastructure.Workers; @@ -50,11 +52,22 @@ public static class InfrastructureModule .AddOptions() .Bind(config.GetSection(ScrapingThrottle.SectionName)); + services + .AddOptions() + .Bind(config.GetSection(NotificationOptions.SectionName)); + services.AddHostedService(); services.AddHostedService(); services.AddHostedService(); services.AddHostedService(); + // Outbound anomaly notifications (Telegram). Sink + dispatcher are always + // registered; the dispatcher idles until Notifications:Enabled is true and + // the sink no-ops until a bot token + chat id are configured. + services.AddHttpClient(TelegramNotificationSink.HttpClientName); + services.AddSingleton(); + services.AddHostedService(); + return services; } } diff --git a/src/Marathon.Infrastructure/Notifications/TelegramNotificationSink.cs b/src/Marathon.Infrastructure/Notifications/TelegramNotificationSink.cs new file mode 100644 index 0000000..b9b114a --- /dev/null +++ b/src/Marathon.Infrastructure/Notifications/TelegramNotificationSink.cs @@ -0,0 +1,79 @@ +using System.Net.Http.Json; +using Marathon.Application.Abstractions; +using Marathon.Infrastructure.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Marathon.Infrastructure.Notifications; + +/// +/// Delivers anomaly alerts to a Telegram chat via the Bot API sendMessage +/// endpoint, using a plain (no third-party SDK dependency). +/// +/// +/// No-ops with a warning when the bot token or chat id is not configured, so a +/// half-configured deployment degrades gracefully rather than throwing. The bot token +/// is never logged (it sits in the request URL only). +/// +internal sealed class TelegramNotificationSink : INotificationSink +{ + public const string HttpClientName = "telegram"; + + private readonly IHttpClientFactory _factory; + private readonly IOptionsMonitor _opts; + private readonly ILogger _logger; + + public TelegramNotificationSink( + IHttpClientFactory factory, + IOptionsMonitor opts, + ILogger logger) + { + _factory = factory ?? throw new ArgumentNullException(nameof(factory)); + _opts = opts ?? throw new ArgumentNullException(nameof(opts)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task SendAsync(AnomalyNotification notification, CancellationToken ct) + { + ArgumentNullException.ThrowIfNull(notification); + + var opts = _opts.CurrentValue; + if (string.IsNullOrWhiteSpace(opts.TelegramBotToken) || string.IsNullOrWhiteSpace(opts.TelegramChatId)) + { + _logger.LogWarning( + "TelegramNotificationSink: bot token / chat id not configured — skipping notification {AnomalyId}.", + notification.AnomalyId); + return; + } + + var payload = new + { + chat_id = opts.TelegramChatId, + text = FormatMessage(notification), + disable_web_page_preview = true, + }; + + var client = _factory.CreateClient(HttpClientName); + var requestUri = $"https://api.telegram.org/bot{opts.TelegramBotToken}/sendMessage"; + + try + { + using var response = await client.PostAsJsonAsync(requestUri, payload, ct).ConfigureAwait(false); + if (!response.IsSuccessStatusCode) + { + // Never log the URL/token — only the status. + _logger.LogWarning( + "TelegramNotificationSink: send failed for {AnomalyId} with status {Status}.", + notification.AnomalyId, (int)response.StatusCode); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogWarning(ex, + "TelegramNotificationSink: send threw for {AnomalyId}.", notification.AnomalyId); + } + } + + private static string FormatMessage(AnomalyNotification n) => + $"⚠ {n.Kind}\n{n.EventTitle}\nScore {n.Score:0.00} · {n.DetectedAt:yyyy-MM-dd HH:mm} MSK"; +} diff --git a/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs b/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs new file mode 100644 index 0000000..d2c80e4 --- /dev/null +++ b/src/Marathon.Infrastructure/Workers/AnomalyNotificationDispatcher.cs @@ -0,0 +1,102 @@ +using Marathon.Application.Abstractions; +using Marathon.Application.UseCases; +using Marathon.Domain.ValueObjects; +using Marathon.Infrastructure.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Marathon.Infrastructure.Workers; + +/// +/// Polls for newly detected anomalies above a configured score and pushes each to the +/// registered . Idle (cheap re-check) while +/// is false. +/// +/// +/// The "since" marker is baselined to startup time so pre-existing anomalies are not +/// re-announced on every restart, and is advanced past the newest dispatched item +/// (plus one tick) each cycle — gap-free and duplicate-free. The scoped use case is +/// resolved per cycle (EF Core DbContext lifetime); the sink is a singleton. +/// +internal sealed class AnomalyNotificationDispatcher : BackgroundService +{ + private readonly IServiceProvider _services; + private readonly INotificationSink _sink; + private readonly IOptionsMonitor _opts; + private readonly ILogger _logger; + + private DateTimeOffset _since; + + public AnomalyNotificationDispatcher( + IServiceProvider services, + INotificationSink sink, + IOptionsMonitor opts, + ILogger logger) + { + _services = services ?? throw new ArgumentNullException(nameof(services)); + _sink = sink ?? throw new ArgumentNullException(nameof(sink)); + _opts = opts ?? throw new ArgumentNullException(nameof(opts)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // Baseline: only alert on anomalies detected after this service started. + _since = MoscowTime.Now; + _logger.LogInformation("AnomalyNotificationDispatcher: started"); + + while (!stoppingToken.IsCancellationRequested) + { + var opts = _opts.CurrentValue; + if (!opts.Enabled) + { + await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + continue; + } + + try + { + await using var scope = _services.CreateAsyncScope(); + var useCase = scope.ServiceProvider.GetRequiredService(); + var pending = await useCase.ExecuteAsync(_since, opts.MinScore, stoppingToken); + + foreach (var notification in pending) + { + stoppingToken.ThrowIfCancellationRequested(); + await _sink.SendAsync(notification, stoppingToken); + } + + if (pending.Count > 0) + { + // pending is oldest-first; advance strictly past the newest sent. + _since = pending[^1].DetectedAt.AddTicks(1); + _logger.LogInformation( + "AnomalyNotificationDispatcher: dispatched {Count} alert(s)", pending.Count); + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, + "AnomalyNotificationDispatcher: cycle failed — will retry after interval"); + } + + var interval = TimeSpan.FromSeconds(Math.Max(5, opts.PollIntervalSeconds)); + try + { + await Task.Delay(interval, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + } + + _logger.LogInformation("AnomalyNotificationDispatcher: stopping"); + } +} diff --git a/tests/Marathon.Application.Tests/UseCases/GetPendingAnomalyNotificationsUseCaseTests.cs b/tests/Marathon.Application.Tests/UseCases/GetPendingAnomalyNotificationsUseCaseTests.cs new file mode 100644 index 0000000..7f99da5 --- /dev/null +++ b/tests/Marathon.Application.Tests/UseCases/GetPendingAnomalyNotificationsUseCaseTests.cs @@ -0,0 +1,95 @@ +using FluentAssertions; +using Marathon.Application.Abstractions; +using Marathon.Application.UseCases; +using Marathon.Domain.Entities; +using Marathon.Domain.Enums; +using Marathon.Domain.ValueObjects; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; + +namespace Marathon.Application.Tests.UseCases; + +public sealed class GetPendingAnomalyNotificationsUseCaseTests +{ + private readonly IAnomalyRepository _anomalies = Substitute.For(); + private readonly IEventRepository _events = Substitute.For(); + + private static readonly TimeSpan MoscowOffset = TimeSpan.FromHours(3); + private static readonly DateTimeOffset BaseTime = new(2026, 5, 10, 18, 0, 0, MoscowOffset); + + public GetPendingAnomalyNotificationsUseCaseTests() + { + // Event titles are resolved via the batched GetManyAsync; route it through + // the per-id GetAsync stubs each test configures. + TestFixtures.BridgeGetMany(_events); + } + + private GetPendingAnomalyNotificationsUseCase CreateSut() => + new(_anomalies, _events, NullLogger.Instance); + + private static Anomaly MakeAnomaly(EventId id, decimal score, DateTimeOffset detectedAt) => + new(Guid.NewGuid(), id, detectedAt, AnomalyKind.SuspensionFlip, score, "{}"); + + private static Event MakeEvent(EventId id) => + new(id, new SportCode(11), "BY", "L1", "Cat", BaseTime, "Team A", "Team B"); + + private void StubAnomalies(params Anomaly[] anomalies) => + _anomalies.ListByDateRangeAsync( + Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(anomalies.ToList().AsReadOnly()); + + [Fact] + public async Task Should_ExcludeAnomaliesBelowMinScore() + { + var id = new EventId("11111111"); + StubAnomalies( + MakeAnomaly(id, 0.30m, BaseTime), + MakeAnomaly(id, 0.70m, BaseTime.AddMinutes(1))); + _events.GetAsync(id, Arg.Any()).Returns(MakeEvent(id)); + + var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), minScore: 0.45m, CancellationToken.None); + + result.Should().ContainSingle(); + result[0].Score.Should().Be(0.70m); + result[0].EventTitle.Should().Be("Team A vs Team B"); + } + + [Fact] + public async Task Should_OrderOldestFirst() + { + var id1 = new EventId("11111111"); + var id2 = new EventId("22222222"); + StubAnomalies( + MakeAnomaly(id2, 0.60m, BaseTime.AddMinutes(5)), + MakeAnomaly(id1, 0.50m, BaseTime)); + _events.GetAsync(id1, Arg.Any()).Returns(MakeEvent(id1)); + _events.GetAsync(id2, Arg.Any()).Returns(MakeEvent(id2)); + + var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), 0.45m, CancellationToken.None); + + result.Select(n => n.DetectedAt).Should().BeInAscendingOrder(); + } + + [Fact] + public async Task Should_FallBackToEventId_When_EventMissing() + { + var id = new EventId("orphan00"); + StubAnomalies(MakeAnomaly(id, 0.55m, BaseTime)); + _events.GetAsync(id, Arg.Any()).Returns((Event?)null); + + var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), 0.45m, CancellationToken.None); + + result.Should().ContainSingle(); + result[0].EventTitle.Should().Be("orphan00"); + } + + [Fact] + public async Task Should_ReturnEmpty_When_NothingQualifies() + { + StubAnomalies(Array.Empty()); + + var result = await CreateSut().ExecuteAsync(BaseTime, 0.45m, CancellationToken.None); + + result.Should().BeEmpty(); + } +}