perf(detect-anomalies): batch snapshot loads into a single query (HIGH)
DetectAnomaliesUseCase was issuing one ISnapshotRepository.ListByEventAsync call per event each cycle, with each call rehydrating that event's bets via Include(s => s.Bets) — O(N) SQLite round-trips and N Include payloads on every detection cycle. * Add ISnapshotRepository.ListByEventsAsync(IReadOnlyCollection<EventId>, …) returning a per-event dictionary; events with no snapshots in range get Array.Empty<OddsSnapshot>() so the caller doesn't need a presence check. * Implementation uses a single .Where(s => ids.Contains(s.EventCode)) query and groups in memory. * DetectAnomaliesUseCase loads the whole batch once before the foreach, then ProcessEventAsync receives the per-event slice as a parameter. * Tests updated to stub the new method; per-event-failure test now exercises an AddAsync throw rather than a snapshot-load throw, since individual snapshot loads no longer fail per-event.
This commit is contained in:
@@ -22,6 +22,17 @@ public interface ISnapshotRepository
|
|||||||
DateTimeOffset to,
|
DateTimeOffset to,
|
||||||
CancellationToken ct = default);
|
CancellationToken ct = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Batched companion to <see cref="ListByEventAsync"/>: loads snapshots
|
||||||
|
/// for many events in a single query and groups by <see cref="EventId"/>.
|
||||||
|
/// Events with no snapshots in range get an empty list in the result.
|
||||||
|
/// </summary>
|
||||||
|
Task<IReadOnlyDictionary<EventId, IReadOnlyList<OddsSnapshot>>> ListByEventsAsync(
|
||||||
|
IReadOnlyCollection<EventId> eventIds,
|
||||||
|
DateTimeOffset from,
|
||||||
|
DateTimeOffset to,
|
||||||
|
CancellationToken ct = default);
|
||||||
|
|
||||||
Task AddAsync(OddsSnapshot entity, CancellationToken ct = default);
|
Task AddAsync(OddsSnapshot entity, CancellationToken ct = default);
|
||||||
|
|
||||||
Task SaveChangesAsync(CancellationToken ct = default);
|
Task SaveChangesAsync(CancellationToken ct = default);
|
||||||
|
|||||||
@@ -75,13 +75,22 @@ public sealed class DetectAnomaliesUseCase
|
|||||||
// (O(N_events) round-trips). Reviewer W1, Phase 7.
|
// (O(N_events) round-trips). Reviewer W1, Phase 7.
|
||||||
var existingAnomalies = await _anomalyRepo.ListAsync(ct);
|
var existingAnomalies = await _anomalyRepo.ListAsync(ct);
|
||||||
|
|
||||||
|
// Single batched query for all events' snapshots — replaces the prior
|
||||||
|
// per-event ListByEventAsync round-trip (O(N) SQLite hits + N Include(Bets)
|
||||||
|
// payloads). Returns an empty list for events with no snapshots in range.
|
||||||
|
var eventIds = events.Select(e => e.Id).ToList();
|
||||||
|
var snapshotsByEvent = await _snapshotRepo.ListByEventsAsync(eventIds, from, now, ct);
|
||||||
|
|
||||||
foreach (var ev in events)
|
foreach (var ev in events)
|
||||||
{
|
{
|
||||||
ct.ThrowIfCancellationRequested();
|
ct.ThrowIfCancellationRequested();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
newAnomalyCount += await ProcessEventAsync(detector, ev, from, now, existingAnomalies, ct);
|
var snapshots = snapshotsByEvent.TryGetValue(ev.Id, out var found)
|
||||||
|
? found
|
||||||
|
: Array.Empty<OddsSnapshot>();
|
||||||
|
newAnomalyCount += await ProcessEventAsync(detector, ev, snapshots, existingAnomalies, ct);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
{
|
{
|
||||||
@@ -107,13 +116,11 @@ public sealed class DetectAnomaliesUseCase
|
|||||||
private async Task<int> ProcessEventAsync(
|
private async Task<int> ProcessEventAsync(
|
||||||
AnomalyDetector detector,
|
AnomalyDetector detector,
|
||||||
Event ev,
|
Event ev,
|
||||||
DateTimeOffset from,
|
IReadOnlyList<OddsSnapshot> snapshots,
|
||||||
DateTimeOffset to,
|
|
||||||
IReadOnlyList<Anomaly> existingAnomalies,
|
IReadOnlyList<Anomaly> existingAnomalies,
|
||||||
CancellationToken ct)
|
CancellationToken ct)
|
||||||
{
|
{
|
||||||
var snapshots = await _snapshotRepo.ListByEventAsync(ev.Id, from, to, ct);
|
var detected = detector.Detect(ev.Id, snapshots);
|
||||||
var detected = detector.Detect(ev.Id, snapshots);
|
|
||||||
|
|
||||||
if (detected.Count == 0)
|
if (detected.Count == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
@@ -38,6 +38,43 @@ internal sealed class SnapshotRepository : ISnapshotRepository
|
|||||||
return entities.Select(Mapping.ToDomain).ToList().AsReadOnly();
|
return entities.Select(Mapping.ToDomain).ToList().AsReadOnly();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task<IReadOnlyDictionary<EventId, IReadOnlyList<OddsSnapshot>>> ListByEventsAsync(
|
||||||
|
IReadOnlyCollection<EventId> eventIds,
|
||||||
|
DateTimeOffset from,
|
||||||
|
DateTimeOffset to,
|
||||||
|
CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(eventIds);
|
||||||
|
|
||||||
|
var result = new Dictionary<EventId, IReadOnlyList<OddsSnapshot>>(eventIds.Count);
|
||||||
|
if (eventIds.Count == 0)
|
||||||
|
return result;
|
||||||
|
|
||||||
|
var ids = eventIds.Select(e => e.Value).Distinct().ToArray();
|
||||||
|
var fromStr = from.ToString("O");
|
||||||
|
var toStr = to.ToString("O");
|
||||||
|
|
||||||
|
var entities = await _db.Snapshots.AsNoTracking()
|
||||||
|
.Include(s => s.Bets)
|
||||||
|
.Where(s => ids.Contains(s.EventCode)
|
||||||
|
&& s.CapturedAt.CompareTo(fromStr) >= 0
|
||||||
|
&& s.CapturedAt.CompareTo(toStr) <= 0)
|
||||||
|
.ToListAsync(ct);
|
||||||
|
|
||||||
|
var grouped = entities
|
||||||
|
.GroupBy(e => e.EventCode)
|
||||||
|
.ToDictionary(g => g.Key, g => g.Select(Mapping.ToDomain).ToList());
|
||||||
|
|
||||||
|
foreach (var id in eventIds)
|
||||||
|
{
|
||||||
|
result[id] = grouped.TryGetValue(id.Value, out var list)
|
||||||
|
? list.AsReadOnly()
|
||||||
|
: Array.Empty<OddsSnapshot>();
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public async Task AddAsync(OddsSnapshot entity, CancellationToken ct = default)
|
public async Task AddAsync(OddsSnapshot entity, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
var efEntity = Mapping.ToEntity(entity);
|
var efEntity = Mapping.ToEntity(entity);
|
||||||
|
|||||||
@@ -79,9 +79,10 @@ public sealed class DetectAnomaliesUseCaseTests
|
|||||||
.Returns(new[] { ev }.ToList().AsReadOnly());
|
.Returns(new[] { ev }.ToList().AsReadOnly());
|
||||||
|
|
||||||
_snapshotRepo
|
_snapshotRepo
|
||||||
.ListByEventAsync(eventId, Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
.ListByEventsAsync(Arg.Any<IReadOnlyCollection<EventId>>(),
|
||||||
|
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
||||||
Arg.Any<CancellationToken>())
|
Arg.Any<CancellationToken>())
|
||||||
.Returns(BuildFlipTimeline(eventId));
|
.Returns(SnapshotsByEvent(eventId, BuildFlipTimeline(eventId)));
|
||||||
|
|
||||||
// No existing anomalies → dedup will not filter anything.
|
// No existing anomalies → dedup will not filter anything.
|
||||||
_anomalyRepo.ListAsync(Arg.Any<CancellationToken>())
|
_anomalyRepo.ListAsync(Arg.Any<CancellationToken>())
|
||||||
@@ -111,9 +112,10 @@ public sealed class DetectAnomaliesUseCaseTests
|
|||||||
.Returns(new[] { ev }.ToList().AsReadOnly());
|
.Returns(new[] { ev }.ToList().AsReadOnly());
|
||||||
|
|
||||||
_snapshotRepo
|
_snapshotRepo
|
||||||
.ListByEventAsync(eventId, Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
.ListByEventsAsync(Arg.Any<IReadOnlyCollection<EventId>>(),
|
||||||
|
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
||||||
Arg.Any<CancellationToken>())
|
Arg.Any<CancellationToken>())
|
||||||
.Returns(BuildFlipTimeline(eventId));
|
.Returns(SnapshotsByEvent(eventId, BuildFlipTimeline(eventId)));
|
||||||
|
|
||||||
// Existing anomaly with same EventId, Kind=SuspensionFlip, and DetectedAt ≈ now (within dedup window).
|
// Existing anomaly with same EventId, Kind=SuspensionFlip, and DetectedAt ≈ now (within dedup window).
|
||||||
var existingAnomaly = new Anomaly(
|
var existingAnomaly = new Anomaly(
|
||||||
@@ -142,7 +144,9 @@ public sealed class DetectAnomaliesUseCaseTests
|
|||||||
[Fact]
|
[Fact]
|
||||||
public async Task Should_ContinueAfterPerEventFailure_And_ReturnPartialCount()
|
public async Task Should_ContinueAfterPerEventFailure_And_ReturnPartialCount()
|
||||||
{
|
{
|
||||||
// Arrange: two events — first throws on snapshot load, second has a detectable flip.
|
// Arrange: two events — persistence on event 1 throws, event 2 has a detectable flip.
|
||||||
|
// (The batched ListByEventsAsync no longer fails per-event; the per-event try/catch
|
||||||
|
// protects against persistence failures while iterating detected anomalies.)
|
||||||
var ev1Id = new EventId("33333333");
|
var ev1Id = new EventId("33333333");
|
||||||
var ev2Id = new EventId("44444444");
|
var ev2Id = new EventId("44444444");
|
||||||
var ev1 = TestFixtures.MakeEvent(ev1Id.Value);
|
var ev1 = TestFixtures.MakeEvent(ev1Id.Value);
|
||||||
@@ -151,21 +155,23 @@ public sealed class DetectAnomaliesUseCaseTests
|
|||||||
_eventRepo.ListAsync(Arg.Any<CancellationToken>())
|
_eventRepo.ListAsync(Arg.Any<CancellationToken>())
|
||||||
.Returns(new[] { ev1, ev2 }.ToList().AsReadOnly());
|
.Returns(new[] { ev1, ev2 }.ToList().AsReadOnly());
|
||||||
|
|
||||||
// Event 1 — snapshot load throws.
|
// Both events have detectable flip timelines.
|
||||||
_snapshotRepo
|
_snapshotRepo
|
||||||
.ListByEventAsync(ev1Id, Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
.ListByEventsAsync(Arg.Any<IReadOnlyCollection<EventId>>(),
|
||||||
|
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
||||||
Arg.Any<CancellationToken>())
|
Arg.Any<CancellationToken>())
|
||||||
.ThrowsAsync(new InvalidOperationException("DB error for event 1"));
|
.Returns(SnapshotsByEvent(
|
||||||
|
(ev1Id, BuildFlipTimeline(ev1Id)),
|
||||||
// Event 2 — clean flip timeline.
|
(ev2Id, BuildFlipTimeline(ev2Id))));
|
||||||
_snapshotRepo
|
|
||||||
.ListByEventAsync(ev2Id, Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
|
||||||
Arg.Any<CancellationToken>())
|
|
||||||
.Returns(BuildFlipTimeline(ev2Id));
|
|
||||||
|
|
||||||
_anomalyRepo.ListAsync(Arg.Any<CancellationToken>())
|
_anomalyRepo.ListAsync(Arg.Any<CancellationToken>())
|
||||||
.Returns(Array.Empty<Anomaly>().ToList().AsReadOnly());
|
.Returns(Array.Empty<Anomaly>().ToList().AsReadOnly());
|
||||||
|
|
||||||
|
// Event 1's anomaly persistence throws; event 2's succeeds.
|
||||||
|
_anomalyRepo
|
||||||
|
.AddAsync(Arg.Is<Anomaly>(a => a.EventId == ev1Id), Arg.Any<CancellationToken>())
|
||||||
|
.ThrowsAsync(new InvalidOperationException("DB error for event 1 anomaly"));
|
||||||
|
|
||||||
var sut = CreateSut();
|
var sut = CreateSut();
|
||||||
|
|
||||||
// Act — must not throw despite event 1 failing.
|
// Act — must not throw despite event 1 failing.
|
||||||
@@ -193,14 +199,12 @@ public sealed class DetectAnomaliesUseCaseTests
|
|||||||
.Returns(new[] { ev1, ev2 }.ToList().AsReadOnly());
|
.Returns(new[] { ev1, ev2 }.ToList().AsReadOnly());
|
||||||
|
|
||||||
_snapshotRepo
|
_snapshotRepo
|
||||||
.ListByEventAsync(ev1Id, Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
.ListByEventsAsync(Arg.Any<IReadOnlyCollection<EventId>>(),
|
||||||
|
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
||||||
Arg.Any<CancellationToken>())
|
Arg.Any<CancellationToken>())
|
||||||
.Returns(BuildFlipTimeline(ev1Id));
|
.Returns(SnapshotsByEvent(
|
||||||
|
(ev1Id, BuildFlipTimeline(ev1Id)),
|
||||||
_snapshotRepo
|
(ev2Id, BuildFlipTimeline(ev2Id))));
|
||||||
.ListByEventAsync(ev2Id, Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
|
|
||||||
Arg.Any<CancellationToken>())
|
|
||||||
.Returns(BuildFlipTimeline(ev2Id));
|
|
||||||
|
|
||||||
_anomalyRepo.ListAsync(Arg.Any<CancellationToken>())
|
_anomalyRepo.ListAsync(Arg.Any<CancellationToken>())
|
||||||
.Returns(Array.Empty<Anomaly>().ToList().AsReadOnly());
|
.Returns(Array.Empty<Anomaly>().ToList().AsReadOnly());
|
||||||
@@ -214,4 +218,14 @@ public sealed class DetectAnomaliesUseCaseTests
|
|||||||
count.Should().Be(2, "two events, one flip each → 2 new anomalies");
|
count.Should().Be(2, "two events, one flip each → 2 new anomalies");
|
||||||
await _anomalyRepo.Received(2).AddAsync(Arg.Any<Anomaly>(), Arg.Any<CancellationToken>());
|
await _anomalyRepo.Received(2).AddAsync(Arg.Any<Anomaly>(), Arg.Any<CancellationToken>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Helper: build the dictionary returned by ISnapshotRepository.ListByEventsAsync ─
|
||||||
|
|
||||||
|
private static IReadOnlyDictionary<EventId, IReadOnlyList<OddsSnapshot>> SnapshotsByEvent(
|
||||||
|
EventId id, IReadOnlyList<OddsSnapshot> snapshots) =>
|
||||||
|
new Dictionary<EventId, IReadOnlyList<OddsSnapshot>> { [id] = snapshots };
|
||||||
|
|
||||||
|
private static IReadOnlyDictionary<EventId, IReadOnlyList<OddsSnapshot>> SnapshotsByEvent(
|
||||||
|
params (EventId Id, IReadOnlyList<OddsSnapshot> Snapshots)[] entries) =>
|
||||||
|
entries.ToDictionary(e => e.Id, e => e.Snapshots);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user