From 66ae0382435ea75d88cfdaff3543909f3bc7c878 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Sat, 9 May 2026 15:17:49 +0300 Subject: [PATCH] perf(detect-anomalies): batch snapshot loads into a single query (HIGH) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DetectAnomaliesUseCase was issuing one ISnapshotRepository.ListByEventAsync call per event each cycle, with each call rehydrating that event's bets via Include(s => s.Bets) — O(N) SQLite round-trips and N Include payloads on every detection cycle. * Add ISnapshotRepository.ListByEventsAsync(IReadOnlyCollection, …) returning a per-event dictionary; events with no snapshots in range get Array.Empty() so the caller doesn't need a presence check. * Implementation uses a single .Where(s => ids.Contains(s.EventCode)) query and groups in memory. * DetectAnomaliesUseCase loads the whole batch once before the foreach, then ProcessEventAsync receives the per-event slice as a parameter. * Tests updated to stub the new method; per-event-failure test now exercises an AddAsync throw rather than a snapshot-load throw, since individual snapshot loads no longer fail per-event. --- .../Abstractions/ISnapshotRepository.cs | 11 ++++ .../UseCases/DetectAnomaliesUseCase.cs | 17 ++++-- .../Repositories/SnapshotRepository.cs | 37 ++++++++++++ .../UseCases/DetectAnomaliesUseCaseTests.cs | 56 ++++++++++++------- 4 files changed, 95 insertions(+), 26 deletions(-) diff --git a/src/Marathon.Application/Abstractions/ISnapshotRepository.cs b/src/Marathon.Application/Abstractions/ISnapshotRepository.cs index 895ba5d..7bc8440 100644 --- a/src/Marathon.Application/Abstractions/ISnapshotRepository.cs +++ b/src/Marathon.Application/Abstractions/ISnapshotRepository.cs @@ -22,6 +22,17 @@ public interface ISnapshotRepository DateTimeOffset to, CancellationToken ct = default); + /// + /// Batched companion to : loads snapshots + /// for many events in a single query and groups by . + /// Events with no snapshots in range get an empty list in the result. + /// + Task>> ListByEventsAsync( + IReadOnlyCollection eventIds, + DateTimeOffset from, + DateTimeOffset to, + CancellationToken ct = default); + Task AddAsync(OddsSnapshot entity, CancellationToken ct = default); Task SaveChangesAsync(CancellationToken ct = default); diff --git a/src/Marathon.Application/UseCases/DetectAnomaliesUseCase.cs b/src/Marathon.Application/UseCases/DetectAnomaliesUseCase.cs index 0fa93e3..4e4692d 100644 --- a/src/Marathon.Application/UseCases/DetectAnomaliesUseCase.cs +++ b/src/Marathon.Application/UseCases/DetectAnomaliesUseCase.cs @@ -75,13 +75,22 @@ public sealed class DetectAnomaliesUseCase // (O(N_events) round-trips). Reviewer W1, Phase 7. var existingAnomalies = await _anomalyRepo.ListAsync(ct); + // Single batched query for all events' snapshots — replaces the prior + // per-event ListByEventAsync round-trip (O(N) SQLite hits + N Include(Bets) + // payloads). Returns an empty list for events with no snapshots in range. + var eventIds = events.Select(e => e.Id).ToList(); + var snapshotsByEvent = await _snapshotRepo.ListByEventsAsync(eventIds, from, now, ct); + foreach (var ev in events) { ct.ThrowIfCancellationRequested(); try { - newAnomalyCount += await ProcessEventAsync(detector, ev, from, now, existingAnomalies, ct); + var snapshots = snapshotsByEvent.TryGetValue(ev.Id, out var found) + ? found + : Array.Empty(); + newAnomalyCount += await ProcessEventAsync(detector, ev, snapshots, existingAnomalies, ct); } catch (OperationCanceledException) { @@ -107,13 +116,11 @@ public sealed class DetectAnomaliesUseCase private async Task ProcessEventAsync( AnomalyDetector detector, Event ev, - DateTimeOffset from, - DateTimeOffset to, + IReadOnlyList snapshots, IReadOnlyList existingAnomalies, CancellationToken ct) { - var snapshots = await _snapshotRepo.ListByEventAsync(ev.Id, from, to, ct); - var detected = detector.Detect(ev.Id, snapshots); + var detected = detector.Detect(ev.Id, snapshots); if (detected.Count == 0) return 0; diff --git a/src/Marathon.Infrastructure/Persistence/Repositories/SnapshotRepository.cs b/src/Marathon.Infrastructure/Persistence/Repositories/SnapshotRepository.cs index f3d2f1c..8654737 100644 --- a/src/Marathon.Infrastructure/Persistence/Repositories/SnapshotRepository.cs +++ b/src/Marathon.Infrastructure/Persistence/Repositories/SnapshotRepository.cs @@ -38,6 +38,43 @@ internal sealed class SnapshotRepository : ISnapshotRepository return entities.Select(Mapping.ToDomain).ToList().AsReadOnly(); } + public async Task>> ListByEventsAsync( + IReadOnlyCollection eventIds, + DateTimeOffset from, + DateTimeOffset to, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(eventIds); + + var result = new Dictionary>(eventIds.Count); + if (eventIds.Count == 0) + return result; + + var ids = eventIds.Select(e => e.Value).Distinct().ToArray(); + var fromStr = from.ToString("O"); + var toStr = to.ToString("O"); + + var entities = await _db.Snapshots.AsNoTracking() + .Include(s => s.Bets) + .Where(s => ids.Contains(s.EventCode) + && s.CapturedAt.CompareTo(fromStr) >= 0 + && s.CapturedAt.CompareTo(toStr) <= 0) + .ToListAsync(ct); + + var grouped = entities + .GroupBy(e => e.EventCode) + .ToDictionary(g => g.Key, g => g.Select(Mapping.ToDomain).ToList()); + + foreach (var id in eventIds) + { + result[id] = grouped.TryGetValue(id.Value, out var list) + ? list.AsReadOnly() + : Array.Empty(); + } + + return result; + } + public async Task AddAsync(OddsSnapshot entity, CancellationToken ct = default) { var efEntity = Mapping.ToEntity(entity); diff --git a/tests/Marathon.Application.Tests/UseCases/DetectAnomaliesUseCaseTests.cs b/tests/Marathon.Application.Tests/UseCases/DetectAnomaliesUseCaseTests.cs index 6312058..90ad976 100644 --- a/tests/Marathon.Application.Tests/UseCases/DetectAnomaliesUseCaseTests.cs +++ b/tests/Marathon.Application.Tests/UseCases/DetectAnomaliesUseCaseTests.cs @@ -79,9 +79,10 @@ public sealed class DetectAnomaliesUseCaseTests .Returns(new[] { ev }.ToList().AsReadOnly()); _snapshotRepo - .ListByEventAsync(eventId, Arg.Any(), Arg.Any(), + .ListByEventsAsync(Arg.Any>(), + Arg.Any(), Arg.Any(), Arg.Any()) - .Returns(BuildFlipTimeline(eventId)); + .Returns(SnapshotsByEvent(eventId, BuildFlipTimeline(eventId))); // No existing anomalies → dedup will not filter anything. _anomalyRepo.ListAsync(Arg.Any()) @@ -111,9 +112,10 @@ public sealed class DetectAnomaliesUseCaseTests .Returns(new[] { ev }.ToList().AsReadOnly()); _snapshotRepo - .ListByEventAsync(eventId, Arg.Any(), Arg.Any(), + .ListByEventsAsync(Arg.Any>(), + Arg.Any(), Arg.Any(), Arg.Any()) - .Returns(BuildFlipTimeline(eventId)); + .Returns(SnapshotsByEvent(eventId, BuildFlipTimeline(eventId))); // Existing anomaly with same EventId, Kind=SuspensionFlip, and DetectedAt ≈ now (within dedup window). var existingAnomaly = new Anomaly( @@ -142,7 +144,9 @@ public sealed class DetectAnomaliesUseCaseTests [Fact] public async Task Should_ContinueAfterPerEventFailure_And_ReturnPartialCount() { - // Arrange: two events — first throws on snapshot load, second has a detectable flip. + // Arrange: two events — persistence on event 1 throws, event 2 has a detectable flip. + // (The batched ListByEventsAsync no longer fails per-event; the per-event try/catch + // protects against persistence failures while iterating detected anomalies.) var ev1Id = new EventId("33333333"); var ev2Id = new EventId("44444444"); var ev1 = TestFixtures.MakeEvent(ev1Id.Value); @@ -151,21 +155,23 @@ public sealed class DetectAnomaliesUseCaseTests _eventRepo.ListAsync(Arg.Any()) .Returns(new[] { ev1, ev2 }.ToList().AsReadOnly()); - // Event 1 — snapshot load throws. + // Both events have detectable flip timelines. _snapshotRepo - .ListByEventAsync(ev1Id, Arg.Any(), Arg.Any(), + .ListByEventsAsync(Arg.Any>(), + Arg.Any(), Arg.Any(), Arg.Any()) - .ThrowsAsync(new InvalidOperationException("DB error for event 1")); - - // Event 2 — clean flip timeline. - _snapshotRepo - .ListByEventAsync(ev2Id, Arg.Any(), Arg.Any(), - Arg.Any()) - .Returns(BuildFlipTimeline(ev2Id)); + .Returns(SnapshotsByEvent( + (ev1Id, BuildFlipTimeline(ev1Id)), + (ev2Id, BuildFlipTimeline(ev2Id)))); _anomalyRepo.ListAsync(Arg.Any()) .Returns(Array.Empty().ToList().AsReadOnly()); + // Event 1's anomaly persistence throws; event 2's succeeds. + _anomalyRepo + .AddAsync(Arg.Is(a => a.EventId == ev1Id), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("DB error for event 1 anomaly")); + var sut = CreateSut(); // Act — must not throw despite event 1 failing. @@ -193,14 +199,12 @@ public sealed class DetectAnomaliesUseCaseTests .Returns(new[] { ev1, ev2 }.ToList().AsReadOnly()); _snapshotRepo - .ListByEventAsync(ev1Id, Arg.Any(), Arg.Any(), + .ListByEventsAsync(Arg.Any>(), + Arg.Any(), Arg.Any(), Arg.Any()) - .Returns(BuildFlipTimeline(ev1Id)); - - _snapshotRepo - .ListByEventAsync(ev2Id, Arg.Any(), Arg.Any(), - Arg.Any()) - .Returns(BuildFlipTimeline(ev2Id)); + .Returns(SnapshotsByEvent( + (ev1Id, BuildFlipTimeline(ev1Id)), + (ev2Id, BuildFlipTimeline(ev2Id)))); _anomalyRepo.ListAsync(Arg.Any()) .Returns(Array.Empty().ToList().AsReadOnly()); @@ -214,4 +218,14 @@ public sealed class DetectAnomaliesUseCaseTests count.Should().Be(2, "two events, one flip each → 2 new anomalies"); await _anomalyRepo.Received(2).AddAsync(Arg.Any(), Arg.Any()); } + + // ── Helper: build the dictionary returned by ISnapshotRepository.ListByEventsAsync ─ + + private static IReadOnlyDictionary> SnapshotsByEvent( + EventId id, IReadOnlyList snapshots) => + new Dictionary> { [id] = snapshots }; + + private static IReadOnlyDictionary> SnapshotsByEvent( + params (EventId Id, IReadOnlyList Snapshots)[] entries) => + entries.ToDictionary(e => e.Id, e => e.Snapshots); }