using Marathon.Application.Abstractions;
using Marathon.Application.Configuration;
using Marathon.Domain.AnomalyDetection;
using Marathon.Domain.Entities;
using Marathon.Domain.ValueObjects;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Marathon.Application.UseCases;
///
/// Orchestrates one anomaly-detection cycle:
///
/// - Loads all tracked events.
/// - For each event, fetches its last-24-hour live snapshots.
/// - Runs over the snapshot timeline.
/// - Persists any new anomalies that have not already been stored (dedup by EventId + DetectedAt minute-window).
///
///
///
/// 🟡 Optimisation opportunity (Phase 8/9): currently iterates ALL events and loads 24 h of
/// snapshots per event. A future improvement is to track a "last detection run" timestamp per
/// event so we only load new snapshots. This is intentionally deferred to keep Phase 7 scope
/// focused on the detection algorithm.
///
public sealed class DetectAnomaliesUseCase
{
private static readonly TimeSpan SnapshotLookback = TimeSpan.FromHours(24);
// Dedup window: two anomalies for the same event within this window are considered duplicates.
private static readonly TimeSpan DedupWindow = TimeSpan.FromMinutes(1);
private readonly IEventRepository _eventRepo;
private readonly ISnapshotRepository _snapshotRepo;
private readonly IAnomalyRepository _anomalyRepo;
private readonly AnomalyOptions _options;
private readonly ILogger _logger;
public DetectAnomaliesUseCase(
IEventRepository eventRepo,
ISnapshotRepository snapshotRepo,
IAnomalyRepository anomalyRepo,
IOptions options,
ILogger logger)
{
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
_snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo));
_anomalyRepo = anomalyRepo ?? throw new ArgumentNullException(nameof(anomalyRepo));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
///
/// Executes one detection cycle.
///
/// Cancellation token.
/// Number of new anomalies persisted during this cycle.
public async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("DetectAnomaliesUseCase: cycle started");
var detectors = new IAnomalyDetector[]
{
new AnomalyDetector(
_options.SuspensionGapSeconds,
_options.OddsFlipThreshold,
_options.MinSnapshotCount),
new SteamMoveDetector(
_options.SteamMoveWindowSeconds,
_options.SteamMoveDriftThreshold,
_options.MinSnapshotCount,
_options.SuspensionGapSeconds),
new SuspensionFreezeDetector(
_options.SuspensionGapSeconds,
_options.SuspensionFreezeThreshold,
_options.MinSnapshotCount),
};
var events = await _eventRepo.ListAsync(ct);
int newAnomalyCount = 0;
var now = MoscowTime.Now;
var from = now - SnapshotLookback;
// Hoisted outside the per-event loop: load existing anomalies ONCE per cycle
// and index them by event so dedup is O(1) per event instead of scanning the
// whole list each time (was O(events × anomalies)). Reviewer W1, Phase 7.
var existingAnomalies = await _anomalyRepo.ListAsync(ct);
var existingByEvent = existingAnomalies
.GroupBy(a => a.EventId)
.ToDictionary(g => g.Key, g => g.ToList());
// Single batched query for all events' snapshots — replaces the prior
// per-event ListByEventAsync round-trip (O(N) SQLite hits + N Include(Bets)
// payloads). Returns an empty list for events with no snapshots in range.
var eventIds = events.Select(e => e.Id).ToList();
var snapshotsByEvent = await _snapshotRepo.ListByEventsAsync(eventIds, from, now, ct);
foreach (var ev in events)
{
ct.ThrowIfCancellationRequested();
try
{
var snapshots = snapshotsByEvent.TryGetValue(ev.Id, out var found)
? found
: Array.Empty();
var existingForEvent = existingByEvent.TryGetValue(ev.Id, out var slice)
? slice
: new List();
newAnomalyCount += await ProcessEventAsync(detectors, ev, snapshots, existingForEvent, ct);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"DetectAnomaliesUseCase: failed to process event {EventId} — skipping",
ev.Id.Value);
}
}
_logger.LogInformation(
"DetectAnomaliesUseCase: cycle done — {NewAnomalies} new anomalies across {TotalEvents} events",
newAnomalyCount, events.Count);
return newAnomalyCount;
}
// ── Private helpers ───────────────────────────────────────────────────────
private async Task ProcessEventAsync(
IReadOnlyList detectors,
Event ev,
IReadOnlyList snapshots,
List existingForEvent,
CancellationToken ct)
{
// Fan out over every detector kind; dedup below keys on EventId + Kind so the
// flip and steam signals for one event persist independently.
var detected = detectors
.SelectMany(d => d.Detect(ev.Id, snapshots))
.ToList();
if (detected.Count == 0)
return 0;
int persisted = 0;
foreach (var anomaly in detected)
{
if (IsDuplicate(anomaly, existingForEvent))
continue;
await _anomalyRepo.AddAsync(anomaly, ct);
await _anomalyRepo.SaveChangesAsync(ct);
existingForEvent.Add(anomaly); // Keep local list in sync so the same cycle doesn't re-add.
persisted++;
}
return persisted;
}
private static bool IsDuplicate(Anomaly candidate, IReadOnlyList existing)
{
// Two anomalies are considered duplicates if they share the same EventId, Kind,
// and their DetectedAt timestamps fall within the dedup window.
return existing.Any(a =>
a.EventId == candidate.EventId &&
a.Kind == candidate.Kind &&
Math.Abs((a.DetectedAt - candidate.DetectedAt).TotalMinutes) <=
DedupWindow.TotalMinutes);
}
}