Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e307a54bec | |||
| 68f3229c35 | |||
| 005d4e794a |
@@ -0,0 +1,28 @@
|
|||||||
|
using Marathon.Domain.Enums;
|
||||||
|
|
||||||
|
namespace Marathon.Application.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A ready-to-deliver anomaly alert payload, shaped by
|
||||||
|
/// <see cref="UseCases.GetPendingAnomalyNotificationsUseCase"/> so a sink only has to
|
||||||
|
/// format and transmit it.
|
||||||
|
/// </summary>
|
||||||
|
public sealed record AnomalyNotification(
|
||||||
|
Guid AnomalyId,
|
||||||
|
string EventTitle,
|
||||||
|
AnomalyKind Kind,
|
||||||
|
decimal Score,
|
||||||
|
DateTimeOffset DetectedAt);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A channel that delivers anomaly alerts (e.g. Telegram; future: email / Discord).
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Implementations must be resilient: a failure to deliver one notification should be
|
||||||
|
/// logged and swallowed, never thrown into the dispatcher loop. A sink that is not
|
||||||
|
/// configured (e.g. missing credentials) should no-op with a warning.
|
||||||
|
/// </remarks>
|
||||||
|
public interface INotificationSink
|
||||||
|
{
|
||||||
|
Task SendAsync(AnomalyNotification notification, CancellationToken ct);
|
||||||
|
}
|
||||||
@@ -31,6 +31,7 @@ public static class ApplicationModule
|
|||||||
services.AddScoped<ExportToExcelUseCase>();
|
services.AddScoped<ExportToExcelUseCase>();
|
||||||
services.AddScoped<DetectAnomaliesUseCase>();
|
services.AddScoped<DetectAnomaliesUseCase>();
|
||||||
services.AddScoped<EvaluateAnomalyOutcomesUseCase>();
|
services.AddScoped<EvaluateAnomalyOutcomesUseCase>();
|
||||||
|
services.AddScoped<GetPendingAnomalyNotificationsUseCase>();
|
||||||
|
|
||||||
services.AddScoped<RecordPlacedBetUseCase>();
|
services.AddScoped<RecordPlacedBetUseCase>();
|
||||||
services.AddScoped<ResolvePendingBetsUseCase>();
|
services.AddScoped<ResolvePendingBetsUseCase>();
|
||||||
|
|||||||
@@ -44,4 +44,11 @@ public sealed class AnomalyOptions
|
|||||||
/// to flag a steam move. Must be in (0, 1). Default: 0.20 (20 percentage points).
|
/// to flag a steam move. Must be in (0, 1). Default: 0.20 (20 percentage points).
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public decimal SteamMoveDriftThreshold { get; init; } = 0.20m;
|
public decimal SteamMoveDriftThreshold { get; init; } = 0.20m;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maximum normalised implied-probability change across a suspension for it to count
|
||||||
|
/// as a "freeze" (line resumed essentially unchanged). Must be in (0, 1).
|
||||||
|
/// Default: 0.05 (5 percentage points).
|
||||||
|
/// </summary>
|
||||||
|
public decimal SuspensionFreezeThreshold { get; init; } = 0.05m;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,6 +70,10 @@ public sealed class DetectAnomaliesUseCase
|
|||||||
_options.SteamMoveDriftThreshold,
|
_options.SteamMoveDriftThreshold,
|
||||||
_options.MinSnapshotCount,
|
_options.MinSnapshotCount,
|
||||||
_options.SuspensionGapSeconds),
|
_options.SuspensionGapSeconds),
|
||||||
|
new SuspensionFreezeDetector(
|
||||||
|
_options.SuspensionGapSeconds,
|
||||||
|
_options.SuspensionFreezeThreshold,
|
||||||
|
_options.MinSnapshotCount),
|
||||||
};
|
};
|
||||||
|
|
||||||
var events = await _eventRepo.ListAsync(ct);
|
var events = await _eventRepo.ListAsync(ct);
|
||||||
|
|||||||
@@ -0,0 +1,62 @@
|
|||||||
|
using Marathon.Application.Abstractions;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace Marathon.Application.UseCases;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Shapes the anomalies worth alerting on: those detected at or after a caller-supplied
|
||||||
|
/// marker whose score clears a minimum, joined with their event titles. Pure of any
|
||||||
|
/// transport concern — the dispatcher decides cadence and the sink decides delivery.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Results are ordered oldest-first so the caller can advance its "since" marker to the
|
||||||
|
/// last item's <see cref="AnomalyNotification.DetectedAt"/> (plus one tick) without gaps
|
||||||
|
/// or duplicates.
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class GetPendingAnomalyNotificationsUseCase
|
||||||
|
{
|
||||||
|
private readonly IAnomalyRepository _anomalies;
|
||||||
|
private readonly IEventRepository _events;
|
||||||
|
private readonly ILogger<GetPendingAnomalyNotificationsUseCase> _logger;
|
||||||
|
|
||||||
|
public GetPendingAnomalyNotificationsUseCase(
|
||||||
|
IAnomalyRepository anomalies,
|
||||||
|
IEventRepository events,
|
||||||
|
ILogger<GetPendingAnomalyNotificationsUseCase> logger)
|
||||||
|
{
|
||||||
|
_anomalies = anomalies ?? throw new ArgumentNullException(nameof(anomalies));
|
||||||
|
_events = events ?? throw new ArgumentNullException(nameof(events));
|
||||||
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<IReadOnlyList<AnomalyNotification>> ExecuteAsync(
|
||||||
|
DateTimeOffset since,
|
||||||
|
decimal minScore,
|
||||||
|
CancellationToken ct)
|
||||||
|
{
|
||||||
|
// Date filter pushed to SQL; score filter is cheap in memory over the small slice.
|
||||||
|
var recent = await _anomalies.ListByDateRangeAsync(since, to: null, ct).ConfigureAwait(false);
|
||||||
|
var qualifying = recent.Where(a => a.Score >= minScore).ToList();
|
||||||
|
if (qualifying.Count == 0)
|
||||||
|
return Array.Empty<AnomalyNotification>();
|
||||||
|
|
||||||
|
var eventIds = qualifying.Select(a => a.EventId).Distinct().ToList();
|
||||||
|
var events = await _events.GetManyAsync(eventIds, ct).ConfigureAwait(false);
|
||||||
|
|
||||||
|
var notifications = qualifying
|
||||||
|
.OrderBy(a => a.DetectedAt)
|
||||||
|
.Select(a => new AnomalyNotification(
|
||||||
|
AnomalyId: a.Id,
|
||||||
|
EventTitle: events.TryGetValue(a.EventId, out var ev) ? ev.Title : a.EventId.Value,
|
||||||
|
Kind: a.Kind,
|
||||||
|
Score: a.Score,
|
||||||
|
DetectedAt: a.DetectedAt))
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"GetPendingAnomalyNotificationsUseCase: {Count} alert(s) since {Since:O} at minScore {MinScore}",
|
||||||
|
notifications.Count, since, minScore);
|
||||||
|
|
||||||
|
return notifications;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,107 @@
|
|||||||
|
using Marathon.Domain.Entities;
|
||||||
|
using Marathon.Domain.Enums;
|
||||||
|
using Marathon.Domain.ValueObjects;
|
||||||
|
|
||||||
|
namespace Marathon.Domain.AnomalyDetection;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Detects a "suspension freeze": the market was suspended (a gap larger than
|
||||||
|
/// <c>suspensionGapSeconds</c> between adjacent live snapshots) but resumed with
|
||||||
|
/// essentially the same line — the favourite is unchanged and the largest normalised
|
||||||
|
/// implied-probability move is below <c>freezeThreshold</c>.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// This is the mirror image of <see cref="AnomalyDetector"/> (SuspensionFlip): the flip
|
||||||
|
/// fires on a large favourite-changing move across a suspension; the freeze fires when
|
||||||
|
/// the bookmaker paused but did <i>not</i> move — a tell that they were uncertain or
|
||||||
|
/// gathering information rather than repricing.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// Score = how completely the line froze: <c>1 − (maxMove / freezeThreshold)</c>, so a
|
||||||
|
/// perfectly unchanged line scores ~1.0 and one near the threshold scores near 0. The
|
||||||
|
/// shared <see cref="MatchWinEvidence"/> shape (pre ≈ post) conveys the freeze directly,
|
||||||
|
/// and the outcome evaluator grades the unchanged favourite like any other anomaly.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class SuspensionFreezeDetector : IAnomalyDetector
|
||||||
|
{
|
||||||
|
private readonly int _suspensionGapSeconds;
|
||||||
|
private readonly decimal _freezeThreshold;
|
||||||
|
private readonly int _minSnapshotCount;
|
||||||
|
|
||||||
|
/// <param name="suspensionGapSeconds">Minimum adjacent-snapshot gap (seconds) classed as a suspension.</param>
|
||||||
|
/// <param name="freezeThreshold">Maximum normalised probability move to count as frozen; in (0, 1).</param>
|
||||||
|
/// <param name="minSnapshotCount">Minimum live snapshots before detection runs (>= 2).</param>
|
||||||
|
public SuspensionFreezeDetector(int suspensionGapSeconds, decimal freezeThreshold, int minSnapshotCount)
|
||||||
|
{
|
||||||
|
if (suspensionGapSeconds <= 0)
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(suspensionGapSeconds), suspensionGapSeconds, "Must be positive.");
|
||||||
|
if (freezeThreshold is <= 0m or >= 1m)
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(freezeThreshold), freezeThreshold, "Must be in (0, 1).");
|
||||||
|
if (minSnapshotCount < 2)
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(minSnapshotCount), minSnapshotCount, "Must be at least 2.");
|
||||||
|
|
||||||
|
_suspensionGapSeconds = suspensionGapSeconds;
|
||||||
|
_freezeThreshold = freezeThreshold;
|
||||||
|
_minSnapshotCount = minSnapshotCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public IReadOnlyList<Anomaly> Detect(EventId eventId, IReadOnlyList<OddsSnapshot> snapshots)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(eventId);
|
||||||
|
ArgumentNullException.ThrowIfNull(snapshots);
|
||||||
|
|
||||||
|
var live = snapshots
|
||||||
|
.Where(s => s.Source == OddsSource.Live)
|
||||||
|
.OrderBy(s => s.CapturedAt)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
if (live.Count < _minSnapshotCount)
|
||||||
|
return Array.Empty<Anomaly>();
|
||||||
|
|
||||||
|
var suspensionGap = TimeSpan.FromSeconds(_suspensionGapSeconds);
|
||||||
|
var anomalies = new List<Anomaly>();
|
||||||
|
|
||||||
|
for (int i = 0; i < live.Count - 1; i++)
|
||||||
|
{
|
||||||
|
var pre = live[i];
|
||||||
|
var post = live[i + 1];
|
||||||
|
if (post.CapturedAt - pre.CapturedAt <= suspensionGap)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var preProbs = MatchWinEvidence.Extract(pre);
|
||||||
|
var postProbs = MatchWinEvidence.Extract(post);
|
||||||
|
if (preProbs is null || postProbs is null)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
decimal maxMove = Math.Max(
|
||||||
|
Math.Abs(postProbs.P1 - preProbs.P1),
|
||||||
|
Math.Abs(postProbs.P2 - preProbs.P2));
|
||||||
|
if (preProbs.PDraw.HasValue && postProbs.PDraw.HasValue)
|
||||||
|
maxMove = Math.Max(maxMove, Math.Abs(postProbs.PDraw.Value - preProbs.PDraw.Value));
|
||||||
|
|
||||||
|
var favouriteUnchanged =
|
||||||
|
MatchWinEvidence.Favourite(preProbs) == MatchWinEvidence.Favourite(postProbs);
|
||||||
|
|
||||||
|
// Strictly below the threshold so the score stays in (0, 1].
|
||||||
|
if (!favouriteUnchanged || maxMove >= _freezeThreshold)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var score = 1m - (maxMove / _freezeThreshold);
|
||||||
|
var gapSeconds = (int)(post.CapturedAt - pre.CapturedAt).TotalSeconds;
|
||||||
|
var evidenceJson = MatchWinEvidence.BuildJson(gapSeconds, pre, preProbs, post, postProbs);
|
||||||
|
|
||||||
|
anomalies.Add(new Anomaly(
|
||||||
|
Id: Guid.NewGuid(),
|
||||||
|
EventId: eventId,
|
||||||
|
DetectedAt: MoscowTime.Now,
|
||||||
|
Kind: AnomalyKind.SuspensionFreeze,
|
||||||
|
Score: score,
|
||||||
|
EvidenceJson: evidenceJson));
|
||||||
|
}
|
||||||
|
|
||||||
|
return anomalies.AsReadOnly();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,4 +16,10 @@ public enum AnomalyKind
|
|||||||
/// continuous window (no suspension) — money moving the line ("steam").
|
/// continuous window (no suspension) — money moving the line ("steam").
|
||||||
/// </summary>
|
/// </summary>
|
||||||
SteamMove,
|
SteamMove,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The bookmaker suspended the market but resumed with essentially the same line
|
||||||
|
/// (favourite unchanged, negligible price move) — a freeze signalling uncertainty.
|
||||||
|
/// </summary>
|
||||||
|
SuspensionFreeze,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,15 @@
|
|||||||
"SuspensionGapSeconds": 60,
|
"SuspensionGapSeconds": 60,
|
||||||
"OddsFlipThreshold": 0.30,
|
"OddsFlipThreshold": 0.30,
|
||||||
"MinSnapshotCount": 3,
|
"MinSnapshotCount": 3,
|
||||||
"DetectionIntervalSeconds": 60
|
"DetectionIntervalSeconds": 60,
|
||||||
|
"SteamMoveWindowSeconds": 120,
|
||||||
|
"SteamMoveDriftThreshold": 0.20,
|
||||||
|
"SuspensionFreezeThreshold": 0.05
|
||||||
|
},
|
||||||
|
"Notifications": {
|
||||||
|
"Enabled": false,
|
||||||
|
"MinScore": 0.45,
|
||||||
|
"PollIntervalSeconds": 60
|
||||||
},
|
},
|
||||||
"Localization": {
|
"Localization": {
|
||||||
"DefaultCulture": "ru-RU"
|
"DefaultCulture": "ru-RU"
|
||||||
|
|||||||
@@ -0,0 +1,30 @@
|
|||||||
|
namespace Marathon.Infrastructure.Configuration;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Options for outbound anomaly notifications, bound from the <c>Notifications</c>
|
||||||
|
/// config section.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Disabled by default. <see cref="TelegramBotToken"/> and <see cref="TelegramChatId"/>
|
||||||
|
/// are secrets — set them ONLY in <c>appsettings.Local.json</c> (gitignored) or an
|
||||||
|
/// environment variable, never in the committed <c>appsettings.json</c>.
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class NotificationOptions
|
||||||
|
{
|
||||||
|
public const string SectionName = "Notifications";
|
||||||
|
|
||||||
|
/// <summary>Master switch — when false, the dispatcher idles and nothing is sent.</summary>
|
||||||
|
public bool Enabled { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Telegram bot token (secret — Local.json / env only).</summary>
|
||||||
|
public string? TelegramBotToken { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Telegram chat id to deliver alerts to (secret — Local.json / env only).</summary>
|
||||||
|
public string? TelegramChatId { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Minimum anomaly score to alert on. Default: 0.45 (Medium severity).</summary>
|
||||||
|
public decimal MinScore { get; init; } = 0.45m;
|
||||||
|
|
||||||
|
/// <summary>Seconds between dispatcher polls. Default: 60.</summary>
|
||||||
|
public int PollIntervalSeconds { get; init; } = 60;
|
||||||
|
}
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
|
using Marathon.Application.Abstractions;
|
||||||
using Marathon.Application.Configuration;
|
using Marathon.Application.Configuration;
|
||||||
using Marathon.Infrastructure.Configuration;
|
using Marathon.Infrastructure.Configuration;
|
||||||
|
using Marathon.Infrastructure.Notifications;
|
||||||
using Marathon.Infrastructure.Persistence;
|
using Marathon.Infrastructure.Persistence;
|
||||||
using Marathon.Infrastructure.Scraping;
|
using Marathon.Infrastructure.Scraping;
|
||||||
using Marathon.Infrastructure.Workers;
|
using Marathon.Infrastructure.Workers;
|
||||||
@@ -50,11 +52,23 @@ public static class InfrastructureModule
|
|||||||
.AddOptions<ScrapingThrottle>()
|
.AddOptions<ScrapingThrottle>()
|
||||||
.Bind(config.GetSection(ScrapingThrottle.SectionName));
|
.Bind(config.GetSection(ScrapingThrottle.SectionName));
|
||||||
|
|
||||||
|
services
|
||||||
|
.AddOptions<NotificationOptions>()
|
||||||
|
.Bind(config.GetSection(NotificationOptions.SectionName));
|
||||||
|
|
||||||
services.AddHostedService<UpcomingEventsPoller>();
|
services.AddHostedService<UpcomingEventsPoller>();
|
||||||
services.AddHostedService<LiveOddsPoller>();
|
services.AddHostedService<LiveOddsPoller>();
|
||||||
services.AddHostedService<ResultsWatchListPoller>();
|
services.AddHostedService<ResultsWatchListPoller>();
|
||||||
services.AddHostedService<AnomalyDetectionPoller>();
|
services.AddHostedService<AnomalyDetectionPoller>();
|
||||||
|
|
||||||
|
// Outbound anomaly notifications (Telegram). Sink + dispatcher are always
|
||||||
|
// registered; the dispatcher idles until Notifications:Enabled is true and
|
||||||
|
// the sink no-ops until a bot token + chat id are configured.
|
||||||
|
services.AddHttpClient(TelegramNotificationSink.HttpClientName, client =>
|
||||||
|
client.Timeout = TimeSpan.FromSeconds(15));
|
||||||
|
services.AddSingleton<INotificationSink, TelegramNotificationSink>();
|
||||||
|
services.AddHostedService<AnomalyNotificationDispatcher>();
|
||||||
|
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,79 @@
|
|||||||
|
using System.Net.Http.Json;
|
||||||
|
using Marathon.Application.Abstractions;
|
||||||
|
using Marathon.Infrastructure.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
|
||||||
|
namespace Marathon.Infrastructure.Notifications;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Delivers anomaly alerts to a Telegram chat via the Bot API <c>sendMessage</c>
|
||||||
|
/// endpoint, using a plain <see cref="HttpClient"/> (no third-party SDK dependency).
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// No-ops with a warning when the bot token or chat id is not configured, so a
|
||||||
|
/// half-configured deployment degrades gracefully rather than throwing. The bot token
|
||||||
|
/// is never logged (it sits in the request URL only).
|
||||||
|
/// </remarks>
|
||||||
|
internal sealed class TelegramNotificationSink : INotificationSink
|
||||||
|
{
|
||||||
|
public const string HttpClientName = "telegram";
|
||||||
|
|
||||||
|
private readonly IHttpClientFactory _factory;
|
||||||
|
private readonly IOptionsMonitor<NotificationOptions> _opts;
|
||||||
|
private readonly ILogger<TelegramNotificationSink> _logger;
|
||||||
|
|
||||||
|
public TelegramNotificationSink(
|
||||||
|
IHttpClientFactory factory,
|
||||||
|
IOptionsMonitor<NotificationOptions> opts,
|
||||||
|
ILogger<TelegramNotificationSink> logger)
|
||||||
|
{
|
||||||
|
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
|
||||||
|
_opts = opts ?? throw new ArgumentNullException(nameof(opts));
|
||||||
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task SendAsync(AnomalyNotification notification, CancellationToken ct)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(notification);
|
||||||
|
|
||||||
|
var opts = _opts.CurrentValue;
|
||||||
|
if (string.IsNullOrWhiteSpace(opts.TelegramBotToken) || string.IsNullOrWhiteSpace(opts.TelegramChatId))
|
||||||
|
{
|
||||||
|
_logger.LogWarning(
|
||||||
|
"TelegramNotificationSink: bot token / chat id not configured — skipping notification {AnomalyId}.",
|
||||||
|
notification.AnomalyId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload = new
|
||||||
|
{
|
||||||
|
chat_id = opts.TelegramChatId,
|
||||||
|
text = FormatMessage(notification),
|
||||||
|
disable_web_page_preview = true,
|
||||||
|
};
|
||||||
|
|
||||||
|
var client = _factory.CreateClient(HttpClientName);
|
||||||
|
var requestUri = $"https://api.telegram.org/bot{opts.TelegramBotToken}/sendMessage";
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var response = await client.PostAsJsonAsync(requestUri, payload, ct).ConfigureAwait(false);
|
||||||
|
if (!response.IsSuccessStatusCode)
|
||||||
|
{
|
||||||
|
// Never log the URL/token — only the status.
|
||||||
|
_logger.LogWarning(
|
||||||
|
"TelegramNotificationSink: send failed for {AnomalyId} with status {Status}.",
|
||||||
|
notification.AnomalyId, (int)response.StatusCode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex) when (ex is not OperationCanceledException)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex,
|
||||||
|
"TelegramNotificationSink: send threw for {AnomalyId}.", notification.AnomalyId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string FormatMessage(AnomalyNotification n) =>
|
||||||
|
$"⚠ {n.Kind}\n{n.EventTitle}\nScore {n.Score:0.00} · {n.DetectedAt:yyyy-MM-dd HH:mm} MSK";
|
||||||
|
}
|
||||||
@@ -0,0 +1,106 @@
|
|||||||
|
using Marathon.Application.Abstractions;
|
||||||
|
using Marathon.Application.UseCases;
|
||||||
|
using Marathon.Domain.ValueObjects;
|
||||||
|
using Marathon.Infrastructure.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
|
||||||
|
namespace Marathon.Infrastructure.Workers;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Polls for newly detected anomalies above a configured score and pushes each to the
|
||||||
|
/// registered <see cref="INotificationSink"/>. Idle (cheap re-check) while
|
||||||
|
/// <see cref="NotificationOptions.Enabled"/> is false.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// The "since" marker is baselined to startup time so pre-existing anomalies are not
|
||||||
|
/// re-announced on every restart, and is advanced past the newest dispatched item
|
||||||
|
/// (plus one tick) each cycle — gap-free and duplicate-free. The scoped use case is
|
||||||
|
/// resolved per cycle (EF Core DbContext lifetime); the sink is a singleton.
|
||||||
|
/// </remarks>
|
||||||
|
internal sealed class AnomalyNotificationDispatcher : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly IServiceProvider _services;
|
||||||
|
private readonly INotificationSink _sink;
|
||||||
|
private readonly IOptionsMonitor<NotificationOptions> _opts;
|
||||||
|
private readonly ILogger<AnomalyNotificationDispatcher> _logger;
|
||||||
|
|
||||||
|
private DateTimeOffset _since;
|
||||||
|
|
||||||
|
public AnomalyNotificationDispatcher(
|
||||||
|
IServiceProvider services,
|
||||||
|
INotificationSink sink,
|
||||||
|
IOptionsMonitor<NotificationOptions> opts,
|
||||||
|
ILogger<AnomalyNotificationDispatcher> logger)
|
||||||
|
{
|
||||||
|
_services = services ?? throw new ArgumentNullException(nameof(services));
|
||||||
|
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
|
||||||
|
_opts = opts ?? throw new ArgumentNullException(nameof(opts));
|
||||||
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
// Baseline: only alert on anomalies detected after this service started.
|
||||||
|
_since = MoscowTime.Now;
|
||||||
|
_logger.LogInformation("AnomalyNotificationDispatcher: started");
|
||||||
|
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var opts = _opts.CurrentValue;
|
||||||
|
if (!opts.Enabled)
|
||||||
|
{
|
||||||
|
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var scope = _services.CreateAsyncScope();
|
||||||
|
var useCase = scope.ServiceProvider.GetRequiredService<GetPendingAnomalyNotificationsUseCase>();
|
||||||
|
var pending = await useCase.ExecuteAsync(_since, opts.MinScore, stoppingToken);
|
||||||
|
|
||||||
|
var dispatched = 0;
|
||||||
|
foreach (var notification in pending)
|
||||||
|
{
|
||||||
|
stoppingToken.ThrowIfCancellationRequested();
|
||||||
|
await _sink.SendAsync(notification, stoppingToken);
|
||||||
|
// Advance the marker per delivered item (pending is oldest-first) so that
|
||||||
|
// if a future sink ever threw mid-batch, the already-sent alerts are not
|
||||||
|
// re-delivered on the next cycle — only the unsent tail is retried.
|
||||||
|
_since = notification.DetectedAt.AddTicks(1);
|
||||||
|
dispatched++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dispatched > 0)
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"AnomalyNotificationDispatcher: dispatched {Count} alert(s)", dispatched);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex,
|
||||||
|
"AnomalyNotificationDispatcher: cycle failed — will retry after interval");
|
||||||
|
}
|
||||||
|
|
||||||
|
var interval = TimeSpan.FromSeconds(Math.Max(5, opts.PollIntervalSeconds));
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Delay(interval, stoppingToken);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogInformation("AnomalyNotificationDispatcher: stopping");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -213,6 +213,7 @@
|
|||||||
{
|
{
|
||||||
AnomalyKind.SuspensionFlip => L["Anomaly.Kind.SuspensionFlip"],
|
AnomalyKind.SuspensionFlip => L["Anomaly.Kind.SuspensionFlip"],
|
||||||
AnomalyKind.SteamMove => L["Anomaly.Kind.SteamMove"],
|
AnomalyKind.SteamMove => L["Anomaly.Kind.SteamMove"],
|
||||||
|
AnomalyKind.SuspensionFreeze => L["Anomaly.Kind.SuspensionFreeze"],
|
||||||
_ => kind.ToString(),
|
_ => kind.ToString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -107,6 +107,7 @@
|
|||||||
{
|
{
|
||||||
AnomalyKind.SuspensionFlip => L["Anomaly.Kind.SuspensionFlip"],
|
AnomalyKind.SuspensionFlip => L["Anomaly.Kind.SuspensionFlip"],
|
||||||
AnomalyKind.SteamMove => L["Anomaly.Kind.SteamMove"],
|
AnomalyKind.SteamMove => L["Anomaly.Kind.SteamMove"],
|
||||||
|
AnomalyKind.SuspensionFreeze => L["Anomaly.Kind.SuspensionFreeze"],
|
||||||
_ => kind.ToString(),
|
_ => kind.ToString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -167,6 +167,7 @@
|
|||||||
<data name="Anomaly.Live"><value>Anomaly</value></data>
|
<data name="Anomaly.Live"><value>Anomaly</value></data>
|
||||||
<data name="Anomaly.Kind.SuspensionFlip"><value>Suspension flip</value></data>
|
<data name="Anomaly.Kind.SuspensionFlip"><value>Suspension flip</value></data>
|
||||||
<data name="Anomaly.Kind.SteamMove"><value>Steam move</value></data>
|
<data name="Anomaly.Kind.SteamMove"><value>Steam move</value></data>
|
||||||
|
<data name="Anomaly.Kind.SuspensionFreeze"><value>Suspension freeze</value></data>
|
||||||
<data name="Anomaly.Score"><value>Confidence</value></data>
|
<data name="Anomaly.Score"><value>Confidence</value></data>
|
||||||
|
|
||||||
<!-- Phase 7 — Anomaly feed UI -->
|
<!-- Phase 7 — Anomaly feed UI -->
|
||||||
|
|||||||
@@ -180,6 +180,7 @@
|
|||||||
<data name="Anomaly.Live"><value>Аномалия</value></data>
|
<data name="Anomaly.Live"><value>Аномалия</value></data>
|
||||||
<data name="Anomaly.Kind.SuspensionFlip"><value>Разворот после заморозки</value></data>
|
<data name="Anomaly.Kind.SuspensionFlip"><value>Разворот после заморозки</value></data>
|
||||||
<data name="Anomaly.Kind.SteamMove"><value>Движение линии</value></data>
|
<data name="Anomaly.Kind.SteamMove"><value>Движение линии</value></data>
|
||||||
|
<data name="Anomaly.Kind.SuspensionFreeze"><value>Заморозка линии</value></data>
|
||||||
<data name="Anomaly.Score"><value>Уверенность</value></data>
|
<data name="Anomaly.Score"><value>Уверенность</value></data>
|
||||||
|
|
||||||
<!-- Phase 7 — Лента аномалий -->
|
<!-- Phase 7 — Лента аномалий -->
|
||||||
|
|||||||
+95
@@ -0,0 +1,95 @@
|
|||||||
|
using FluentAssertions;
|
||||||
|
using Marathon.Application.Abstractions;
|
||||||
|
using Marathon.Application.UseCases;
|
||||||
|
using Marathon.Domain.Entities;
|
||||||
|
using Marathon.Domain.Enums;
|
||||||
|
using Marathon.Domain.ValueObjects;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NSubstitute;
|
||||||
|
|
||||||
|
namespace Marathon.Application.Tests.UseCases;
|
||||||
|
|
||||||
|
public sealed class GetPendingAnomalyNotificationsUseCaseTests
|
||||||
|
{
|
||||||
|
private readonly IAnomalyRepository _anomalies = Substitute.For<IAnomalyRepository>();
|
||||||
|
private readonly IEventRepository _events = Substitute.For<IEventRepository>();
|
||||||
|
|
||||||
|
private static readonly TimeSpan MoscowOffset = TimeSpan.FromHours(3);
|
||||||
|
private static readonly DateTimeOffset BaseTime = new(2026, 5, 10, 18, 0, 0, MoscowOffset);
|
||||||
|
|
||||||
|
public GetPendingAnomalyNotificationsUseCaseTests()
|
||||||
|
{
|
||||||
|
// Event titles are resolved via the batched GetManyAsync; route it through
|
||||||
|
// the per-id GetAsync stubs each test configures.
|
||||||
|
TestFixtures.BridgeGetMany(_events);
|
||||||
|
}
|
||||||
|
|
||||||
|
private GetPendingAnomalyNotificationsUseCase CreateSut() =>
|
||||||
|
new(_anomalies, _events, NullLogger<GetPendingAnomalyNotificationsUseCase>.Instance);
|
||||||
|
|
||||||
|
private static Anomaly MakeAnomaly(EventId id, decimal score, DateTimeOffset detectedAt) =>
|
||||||
|
new(Guid.NewGuid(), id, detectedAt, AnomalyKind.SuspensionFlip, score, "{}");
|
||||||
|
|
||||||
|
private static Event MakeEvent(EventId id) =>
|
||||||
|
new(id, new SportCode(11), "BY", "L1", "Cat", BaseTime, "Team A", "Team B");
|
||||||
|
|
||||||
|
private void StubAnomalies(params Anomaly[] anomalies) =>
|
||||||
|
_anomalies.ListByDateRangeAsync(
|
||||||
|
Arg.Any<DateTimeOffset?>(), Arg.Any<DateTimeOffset?>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(anomalies.ToList().AsReadOnly());
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Should_ExcludeAnomaliesBelowMinScore()
|
||||||
|
{
|
||||||
|
var id = new EventId("11111111");
|
||||||
|
StubAnomalies(
|
||||||
|
MakeAnomaly(id, 0.30m, BaseTime),
|
||||||
|
MakeAnomaly(id, 0.70m, BaseTime.AddMinutes(1)));
|
||||||
|
_events.GetAsync(id, Arg.Any<CancellationToken>()).Returns(MakeEvent(id));
|
||||||
|
|
||||||
|
var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), minScore: 0.45m, CancellationToken.None);
|
||||||
|
|
||||||
|
result.Should().ContainSingle();
|
||||||
|
result[0].Score.Should().Be(0.70m);
|
||||||
|
result[0].EventTitle.Should().Be("Team A vs Team B");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Should_OrderOldestFirst()
|
||||||
|
{
|
||||||
|
var id1 = new EventId("11111111");
|
||||||
|
var id2 = new EventId("22222222");
|
||||||
|
StubAnomalies(
|
||||||
|
MakeAnomaly(id2, 0.60m, BaseTime.AddMinutes(5)),
|
||||||
|
MakeAnomaly(id1, 0.50m, BaseTime));
|
||||||
|
_events.GetAsync(id1, Arg.Any<CancellationToken>()).Returns(MakeEvent(id1));
|
||||||
|
_events.GetAsync(id2, Arg.Any<CancellationToken>()).Returns(MakeEvent(id2));
|
||||||
|
|
||||||
|
var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), 0.45m, CancellationToken.None);
|
||||||
|
|
||||||
|
result.Select(n => n.DetectedAt).Should().BeInAscendingOrder();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Should_FallBackToEventId_When_EventMissing()
|
||||||
|
{
|
||||||
|
var id = new EventId("orphan00");
|
||||||
|
StubAnomalies(MakeAnomaly(id, 0.55m, BaseTime));
|
||||||
|
_events.GetAsync(id, Arg.Any<CancellationToken>()).Returns((Event?)null);
|
||||||
|
|
||||||
|
var result = await CreateSut().ExecuteAsync(BaseTime.AddHours(-1), 0.45m, CancellationToken.None);
|
||||||
|
|
||||||
|
result.Should().ContainSingle();
|
||||||
|
result[0].EventTitle.Should().Be("orphan00");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Should_ReturnEmpty_When_NothingQualifies()
|
||||||
|
{
|
||||||
|
StubAnomalies(Array.Empty<Anomaly>());
|
||||||
|
|
||||||
|
var result = await CreateSut().ExecuteAsync(BaseTime, 0.45m, CancellationToken.None);
|
||||||
|
|
||||||
|
result.Should().BeEmpty();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,104 @@
|
|||||||
|
using FluentAssertions;
|
||||||
|
using Marathon.Domain.AnomalyDetection;
|
||||||
|
using Marathon.Domain.Entities;
|
||||||
|
using Marathon.Domain.Enums;
|
||||||
|
using Marathon.Domain.ValueObjects;
|
||||||
|
|
||||||
|
namespace Marathon.Domain.Tests.AnomalyDetection;
|
||||||
|
|
||||||
|
public sealed class SuspensionFreezeDetectorTests
|
||||||
|
{
|
||||||
|
private static readonly TimeSpan MoscowOffset = TimeSpan.FromHours(3);
|
||||||
|
private static readonly DateTimeOffset BaseTime = new(2026, 5, 10, 18, 0, 0, MoscowOffset);
|
||||||
|
private static readonly EventId Event = new("26000002");
|
||||||
|
|
||||||
|
// suspension gap 60s, freeze threshold 0.05, min 3 snapshots.
|
||||||
|
private static SuspensionFreezeDetector CreateSut() => new(60, 0.05m, 3);
|
||||||
|
|
||||||
|
private static OddsSnapshot Live(int seconds, decimal r1, decimal r2) =>
|
||||||
|
new(Event, BaseTime.AddSeconds(seconds), OddsSource.Live,
|
||||||
|
new List<Bet>
|
||||||
|
{
|
||||||
|
new(MatchScope.Instance, BetType.Win, Side.Side1, null, new OddsRate(r1)),
|
||||||
|
new(MatchScope.Instance, BetType.Win, Side.Side2, null, new OddsRate(r2)),
|
||||||
|
});
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Should_FlagFreeze_When_SuspensionResumesUnchanged()
|
||||||
|
{
|
||||||
|
// 90s gap between the 2nd and 3rd snapshot; line resumes identical.
|
||||||
|
var snapshots = new[]
|
||||||
|
{
|
||||||
|
Live(0, 1.5m, 3.0m),
|
||||||
|
Live(30, 1.5m, 3.0m),
|
||||||
|
Live(120, 1.5m, 3.0m), // 90s gap, unchanged
|
||||||
|
Live(150, 1.5m, 3.0m),
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = CreateSut().Detect(Event, snapshots);
|
||||||
|
|
||||||
|
result.Should().ContainSingle();
|
||||||
|
result[0].Kind.Should().Be(AnomalyKind.SuspensionFreeze);
|
||||||
|
result[0].Score.Should().Be(1.0m, "an unchanged line is a complete freeze");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Should_NotFlag_When_FlipAcrossSuspension()
|
||||||
|
{
|
||||||
|
// Favourite changes across the gap — that is the SuspensionFlip detector's job.
|
||||||
|
var snapshots = new[]
|
||||||
|
{
|
||||||
|
Live(0, 1.3m, 4.0m),
|
||||||
|
Live(30, 1.3m, 4.0m),
|
||||||
|
Live(120, 4.0m, 1.3m), // 90s gap, flipped
|
||||||
|
Live(150, 4.0m, 1.3m),
|
||||||
|
};
|
||||||
|
|
||||||
|
CreateSut().Detect(Event, snapshots).Should().BeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Should_NotFlag_When_NoSuspensionGap()
|
||||||
|
{
|
||||||
|
// Continuous 30s steps — no gap exceeds the 60s suspension threshold.
|
||||||
|
var snapshots = new[]
|
||||||
|
{
|
||||||
|
Live(0, 1.5m, 3.0m),
|
||||||
|
Live(30, 1.5m, 3.0m),
|
||||||
|
Live(60, 1.5m, 3.0m),
|
||||||
|
};
|
||||||
|
|
||||||
|
CreateSut().Detect(Event, snapshots).Should().BeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Should_ReturnEmpty_When_FewerThanMinSnapshots()
|
||||||
|
{
|
||||||
|
var snapshots = new[] { Live(0, 1.5m, 3.0m), Live(120, 1.5m, 3.0m) };
|
||||||
|
|
||||||
|
CreateSut().Detect(Event, snapshots).Should().BeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Should_EmitParseableEvidence_For_DetectedFreeze()
|
||||||
|
{
|
||||||
|
var snapshots = new[]
|
||||||
|
{
|
||||||
|
Live(0, 1.5m, 3.0m),
|
||||||
|
Live(30, 1.5m, 3.0m),
|
||||||
|
Live(120, 1.5m, 3.0m),
|
||||||
|
};
|
||||||
|
|
||||||
|
var anomaly = CreateSut().Detect(Event, snapshots).First();
|
||||||
|
|
||||||
|
AnomalyEvidenceParser.TryParse(anomaly.EvidenceJson, out var data).Should().BeTrue();
|
||||||
|
data.PreSuspension.Favourite.Should().Be(data.PostSuspension.Favourite, "the favourite is unchanged in a freeze");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Should_Throw_When_FreezeThresholdOutOfRange()
|
||||||
|
{
|
||||||
|
var act = () => new SuspensionFreezeDetector(60, 0m, 3);
|
||||||
|
act.Should().Throw<ArgumentOutOfRangeException>();
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user