feat(phase-7-backend): implement anomaly detection — SuspensionFlip detector, use case, poller, and tests
- AnomalyDetector (pure domain): detects odds-flip pattern from live snapshot timelines using implied-probability vectors (p=1/rate, normalised), flip score = max(|p_post−p_pre|), gated by both threshold AND favourite-changed test - SuspensionInterval record: typed pair of (pre, post) OddsSnapshot bracketing a gap - AnomalyOptions POCO (Application layer): bound to Anomaly:* config section with four fields (SuspensionGapSeconds=60, OddsFlipThreshold=0.30, MinSnapshotCount=3, DetectionIntervalSeconds=60) - DetectAnomaliesUseCase: iterates all events, loads last-24h live snapshots, runs detector, persists new anomalies with 1-minute dedup window - AnomalyDetectionPoller: BackgroundService polling every DetectionIntervalSeconds, gated by WorkerOptions.AnomalyDetectionEnabled (default true) - DI wiring: DetectAnomaliesUseCase registered Scoped in ApplicationModule; AnomalyOptions bound + AnomalyDetectionPoller hosted in InfrastructureModule - WorkerOptions.AnomalyDetectionEnabled added; appsettings.json updated - 13 domain tests + 4 application tests; total 245/245 passing (no regression)
This commit is contained in:
@@ -0,0 +1,146 @@
|
||||
using Marathon.Application.Abstractions;
|
||||
using Marathon.Application.Configuration;
|
||||
using Marathon.Domain.AnomalyDetection;
|
||||
using Marathon.Domain.Entities;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Marathon.Application.UseCases;
|
||||
|
||||
/// <summary>
|
||||
/// Orchestrates one anomaly-detection cycle:
|
||||
/// <list type="number">
|
||||
/// <item>Loads all tracked events.</item>
|
||||
/// <item>For each event, fetches its last-24-hour live snapshots.</item>
|
||||
/// <item>Runs <see cref="AnomalyDetector"/> over the snapshot timeline.</item>
|
||||
/// <item>Persists any new anomalies that have not already been stored (dedup by EventId + DetectedAt minute-window).</item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// 🟡 Optimisation opportunity (Phase 8/9): currently iterates ALL events and loads 24 h of
|
||||
/// snapshots per event. A future improvement is to track a "last detection run" timestamp per
|
||||
/// event so we only load new snapshots. This is intentionally deferred to keep Phase 7 scope
|
||||
/// focused on the detection algorithm.
|
||||
/// </remarks>
|
||||
public sealed class DetectAnomaliesUseCase
|
||||
{
|
||||
private static readonly TimeSpan MoscowOffset = TimeSpan.FromHours(3);
|
||||
private static readonly TimeSpan SnapshotLookback = TimeSpan.FromHours(24);
|
||||
|
||||
// Dedup window: two anomalies for the same event within this window are considered duplicates.
|
||||
private static readonly TimeSpan DedupWindow = TimeSpan.FromMinutes(1);
|
||||
|
||||
private readonly IEventRepository _eventRepo;
|
||||
private readonly ISnapshotRepository _snapshotRepo;
|
||||
private readonly IAnomalyRepository _anomalyRepo;
|
||||
private readonly AnomalyOptions _options;
|
||||
private readonly ILogger<DetectAnomaliesUseCase> _logger;
|
||||
|
||||
public DetectAnomaliesUseCase(
|
||||
IEventRepository eventRepo,
|
||||
ISnapshotRepository snapshotRepo,
|
||||
IAnomalyRepository anomalyRepo,
|
||||
IOptions<AnomalyOptions> options,
|
||||
ILogger<DetectAnomaliesUseCase> logger)
|
||||
{
|
||||
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
|
||||
_snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo));
|
||||
_anomalyRepo = anomalyRepo ?? throw new ArgumentNullException(nameof(anomalyRepo));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Executes one detection cycle.
|
||||
/// </summary>
|
||||
/// <param name="ct">Cancellation token.</param>
|
||||
/// <returns>Number of new anomalies persisted during this cycle.</returns>
|
||||
public async Task<int> ExecuteAsync(CancellationToken ct)
|
||||
{
|
||||
_logger.LogInformation("DetectAnomaliesUseCase: cycle started");
|
||||
|
||||
var detector = new AnomalyDetector(
|
||||
_options.SuspensionGapSeconds,
|
||||
_options.OddsFlipThreshold,
|
||||
_options.MinSnapshotCount);
|
||||
|
||||
var events = await _eventRepo.ListAsync(ct);
|
||||
int newAnomalyCount = 0;
|
||||
|
||||
var now = DateTimeOffset.UtcNow.ToOffset(MoscowOffset);
|
||||
var from = now - SnapshotLookback;
|
||||
|
||||
foreach (var ev in events)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
try
|
||||
{
|
||||
newAnomalyCount += await ProcessEventAsync(detector, ev, from, now, ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"DetectAnomaliesUseCase: failed to process event {EventId} — skipping",
|
||||
ev.Id.Value);
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation(
|
||||
"DetectAnomaliesUseCase: cycle done — {NewAnomalies} new anomalies across {TotalEvents} events",
|
||||
newAnomalyCount, events.Count);
|
||||
|
||||
return newAnomalyCount;
|
||||
}
|
||||
|
||||
// ── Private helpers ───────────────────────────────────────────────────────
|
||||
|
||||
private async Task<int> ProcessEventAsync(
|
||||
AnomalyDetector detector,
|
||||
Event ev,
|
||||
DateTimeOffset from,
|
||||
DateTimeOffset to,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var snapshots = await _snapshotRepo.ListByEventAsync(ev.Id, from, to, ct);
|
||||
var detected = detector.Detect(ev.Id, snapshots);
|
||||
|
||||
if (detected.Count == 0)
|
||||
return 0;
|
||||
|
||||
// Load existing anomalies for this event so we can deduplicate.
|
||||
var existing = await _anomalyRepo.ListAsync(ct);
|
||||
var existingForEvent = existing
|
||||
.Where(a => a.EventId == ev.Id)
|
||||
.ToList();
|
||||
|
||||
int persisted = 0;
|
||||
foreach (var anomaly in detected)
|
||||
{
|
||||
if (IsDuplicate(anomaly, existingForEvent))
|
||||
continue;
|
||||
|
||||
await _anomalyRepo.AddAsync(anomaly, ct);
|
||||
await _anomalyRepo.SaveChangesAsync(ct);
|
||||
existingForEvent.Add(anomaly); // Keep local list in sync so the same cycle doesn't re-add.
|
||||
persisted++;
|
||||
}
|
||||
|
||||
return persisted;
|
||||
}
|
||||
|
||||
private static bool IsDuplicate(Anomaly candidate, IReadOnlyList<Anomaly> existing)
|
||||
{
|
||||
// Two anomalies are considered duplicates if they share the same EventId, Kind,
|
||||
// and their DetectedAt timestamps fall within the dedup window.
|
||||
return existing.Any(a =>
|
||||
a.EventId == candidate.EventId &&
|
||||
a.Kind == candidate.Kind &&
|
||||
Math.Abs((a.DetectedAt - candidate.DetectedAt).TotalMinutes) <=
|
||||
DedupWindow.TotalMinutes);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user