perf: batch repository reads, index snapshots, centralize date encoding
- Add IEventRepository/IResultRepository.GetManyAsync to kill N+1 lookups at 6 sites (backtest, outcome eval, both bet-journal paths, anomaly browsing, results selection); guarded by a Received(1).GetManyAsync test. - Add EventRepository.QueryAsync to push date+sport filtering to SQL (was load-whole-range-then-filter); search/sort stay in-memory for Cyrillic order. - Add AnomalyRepository.CountSinceAsync (unread badge) + ListByDateRangeAsync (feed date filter); add Event/Snapshot count methods for the dashboard. - Add composite indexes IX_Snapshots_EventCode_CapturedAt and _EventCode_Source_CapturedAt via a new migration + model snapshot. - Introduce SqliteDateText as the single source of the O-format date encoding shared by Mapping (read/write) and the repositories' range predicates. - Fix LiveOddsPoller cadence drift (budget sleep against cycle time); make DetectAnomalies dedup O(1) per event; add Event.Title to dedup the title join. Tests adapted to the batched GetManyAsync via a TestFixtures bridge.
This commit is contained in:
@@ -54,16 +54,17 @@ public sealed class BuildBetJournalReportUseCase
|
||||
|
||||
var distinctEventIds = bets.Select(b => b.EventId).Distinct().ToList();
|
||||
|
||||
// Resolve closing snapshot per event using a single-row repo call —
|
||||
// pushes the ORDER BY / LIMIT 1 down to SQLite rather than materialising
|
||||
// every snapshot in a 30-day window.
|
||||
// Batch the event loads (was N+1). The closing-snapshot lookup stays per-event
|
||||
// because it pushes ORDER BY / LIMIT 1 down to SQLite (one indexed row each)
|
||||
// and is parameterised by that event's ScheduledAt.
|
||||
var events = await _events.GetManyAsync(distinctEventIds, ct).ConfigureAwait(false);
|
||||
|
||||
var closingByEvent = new Dictionary<DomainEventId, OddsSnapshot?>(distinctEventIds.Count);
|
||||
foreach (var eventId in distinctEventIds)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
var ev = await _events.GetAsync(eventId, ct).ConfigureAwait(false);
|
||||
if (ev is null)
|
||||
if (!events.TryGetValue(eventId, out var ev))
|
||||
{
|
||||
closingByEvent[eventId] = null;
|
||||
continue;
|
||||
|
||||
@@ -30,10 +30,10 @@ public sealed class DetectAnomaliesUseCase
|
||||
// Dedup window: two anomalies for the same event within this window are considered duplicates.
|
||||
private static readonly TimeSpan DedupWindow = TimeSpan.FromMinutes(1);
|
||||
|
||||
private readonly IEventRepository _eventRepo;
|
||||
private readonly IEventRepository _eventRepo;
|
||||
private readonly ISnapshotRepository _snapshotRepo;
|
||||
private readonly IAnomalyRepository _anomalyRepo;
|
||||
private readonly AnomalyOptions _options;
|
||||
private readonly IAnomalyRepository _anomalyRepo;
|
||||
private readonly AnomalyOptions _options;
|
||||
private readonly ILogger<DetectAnomaliesUseCase> _logger;
|
||||
|
||||
public DetectAnomaliesUseCase(
|
||||
@@ -43,11 +43,11 @@ public sealed class DetectAnomaliesUseCase
|
||||
IOptions<AnomalyOptions> options,
|
||||
ILogger<DetectAnomaliesUseCase> logger)
|
||||
{
|
||||
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
|
||||
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
|
||||
_snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo));
|
||||
_anomalyRepo = anomalyRepo ?? throw new ArgumentNullException(nameof(anomalyRepo));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_anomalyRepo = anomalyRepo ?? throw new ArgumentNullException(nameof(anomalyRepo));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -67,13 +67,16 @@ public sealed class DetectAnomaliesUseCase
|
||||
var events = await _eventRepo.ListAsync(ct);
|
||||
int newAnomalyCount = 0;
|
||||
|
||||
var now = MoscowTime.Now;
|
||||
var now = MoscowTime.Now;
|
||||
var from = now - SnapshotLookback;
|
||||
|
||||
// Hoisted outside the per-event loop: load existing anomalies ONCE per cycle
|
||||
// and slice per-event in the loop. Previously this was reloaded per event
|
||||
// (O(N_events) round-trips). Reviewer W1, Phase 7.
|
||||
// and index them by event so dedup is O(1) per event instead of scanning the
|
||||
// whole list each time (was O(events × anomalies)). Reviewer W1, Phase 7.
|
||||
var existingAnomalies = await _anomalyRepo.ListAsync(ct);
|
||||
var existingByEvent = existingAnomalies
|
||||
.GroupBy(a => a.EventId)
|
||||
.ToDictionary(g => g.Key, g => g.ToList());
|
||||
|
||||
// Single batched query for all events' snapshots — replaces the prior
|
||||
// per-event ListByEventAsync round-trip (O(N) SQLite hits + N Include(Bets)
|
||||
@@ -90,7 +93,10 @@ public sealed class DetectAnomaliesUseCase
|
||||
var snapshots = snapshotsByEvent.TryGetValue(ev.Id, out var found)
|
||||
? found
|
||||
: Array.Empty<OddsSnapshot>();
|
||||
newAnomalyCount += await ProcessEventAsync(detector, ev, snapshots, existingAnomalies, ct);
|
||||
var existingForEvent = existingByEvent.TryGetValue(ev.Id, out var slice)
|
||||
? slice
|
||||
: new List<Anomaly>();
|
||||
newAnomalyCount += await ProcessEventAsync(detector, ev, snapshots, existingForEvent, ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@@ -117,7 +123,7 @@ public sealed class DetectAnomaliesUseCase
|
||||
AnomalyDetector detector,
|
||||
Event ev,
|
||||
IReadOnlyList<OddsSnapshot> snapshots,
|
||||
IReadOnlyList<Anomaly> existingAnomalies,
|
||||
List<Anomaly> existingForEvent,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var detected = detector.Detect(ev.Id, snapshots);
|
||||
@@ -125,11 +131,6 @@ public sealed class DetectAnomaliesUseCase
|
||||
if (detected.Count == 0)
|
||||
return 0;
|
||||
|
||||
// Slice the cycle-wide existing-anomaly list to just this event for dedup.
|
||||
var existingForEvent = existingAnomalies
|
||||
.Where(a => a.EventId == ev.Id)
|
||||
.ToList();
|
||||
|
||||
int persisted = 0;
|
||||
foreach (var anomaly in detected)
|
||||
{
|
||||
@@ -151,7 +152,7 @@ public sealed class DetectAnomaliesUseCase
|
||||
// and their DetectedAt timestamps fall within the dedup window.
|
||||
return existing.Any(a =>
|
||||
a.EventId == candidate.EventId &&
|
||||
a.Kind == candidate.Kind &&
|
||||
a.Kind == candidate.Kind &&
|
||||
Math.Abs((a.DetectedAt - candidate.DetectedAt).TotalMinutes) <=
|
||||
DedupWindow.TotalMinutes);
|
||||
}
|
||||
|
||||
@@ -75,29 +75,16 @@ public sealed class EvaluateAnomalyOutcomesUseCase
|
||||
return EmptyReport();
|
||||
}
|
||||
|
||||
// Build event + result lookups — distinct keys only to avoid quadratic loads.
|
||||
// TODO (perf, future): batch via IEventRepository.GetManyAsync / IResultRepository.GetManyAsync
|
||||
// once the repositories expose them. Today the per-event GetAsync round-trip is acceptable
|
||||
// because anomaly volumes are bounded (1 row per suspension interval per event).
|
||||
// Batched lookups — a single query each, replacing the prior per-event
|
||||
// GetAsync round-trip (N+1 against SQLite).
|
||||
var distinctEventIds = anomalies.Select(a => a.EventId).Distinct().ToList();
|
||||
|
||||
var eventLookup = new Dictionary<DomainEventId, Event>(distinctEventIds.Count);
|
||||
var resultLookup = new Dictionary<DomainEventId, EventResult>(distinctEventIds.Count);
|
||||
var eventTitles = new Dictionary<DomainEventId, string>(distinctEventIds.Count);
|
||||
foreach (var id in distinctEventIds)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
var eventLookup = await _events.GetManyAsync(distinctEventIds, ct).ConfigureAwait(false);
|
||||
var resultLookup = await _results.GetManyAsync(distinctEventIds, ct).ConfigureAwait(false);
|
||||
|
||||
var ev = await _events.GetAsync(id, ct).ConfigureAwait(false);
|
||||
if (ev is not null)
|
||||
{
|
||||
eventLookup[id] = ev;
|
||||
eventTitles[id] = string.Concat(ev.Side1Name, " vs ", ev.Side2Name);
|
||||
}
|
||||
|
||||
var res = await _results.GetAsync(id, ct).ConfigureAwait(false);
|
||||
if (res is not null) resultLookup[id] = res;
|
||||
}
|
||||
var eventTitles = new Dictionary<DomainEventId, string>(eventLookup.Count);
|
||||
foreach (var (id, ev) in eventLookup)
|
||||
eventTitles[id] = ev.Title;
|
||||
|
||||
// Evaluate every anomaly through the pure domain function.
|
||||
var resolved = new List<ResolvedAnomaly>();
|
||||
|
||||
@@ -72,10 +72,10 @@ public sealed class PullResultsUseCase
|
||||
IResultRepository resultRepo,
|
||||
ILogger<PullResultsUseCase> logger)
|
||||
{
|
||||
_scraper = scraper ?? throw new ArgumentNullException(nameof(scraper));
|
||||
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
|
||||
_scraper = scraper ?? throw new ArgumentNullException(nameof(scraper));
|
||||
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
|
||||
_resultRepo = resultRepo ?? throw new ArgumentNullException(nameof(resultRepo));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -149,12 +149,13 @@ public sealed class PullResultsUseCase
|
||||
{
|
||||
if (selection is { Count: > 0 })
|
||||
{
|
||||
// Batched load (was N+1); preserve the caller's selection order and
|
||||
// silently drop ids with no stored event.
|
||||
var events = await _eventRepo.GetManyAsync(selection, ct).ConfigureAwait(false);
|
||||
var resolved = new List<Event>(selection.Count);
|
||||
foreach (var id in selection)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
var ev = await _eventRepo.GetAsync(id, ct).ConfigureAwait(false);
|
||||
if (ev is not null)
|
||||
if (events.TryGetValue(id, out var ev))
|
||||
resolved.Add(ev);
|
||||
}
|
||||
return resolved;
|
||||
|
||||
@@ -63,29 +63,16 @@ public sealed class RunBacktestUseCase
|
||||
return BacktestSimulator.Run(strategy, Array.Empty<BacktestCandidate>());
|
||||
}
|
||||
|
||||
// Distinct event lookups — minimises repo calls.
|
||||
// TODO (perf, future): batch via IEventRepository.GetManyAsync /
|
||||
// IResultRepository.GetManyAsync once those exist — currently shared
|
||||
// with EvaluateAnomalyOutcomesUseCase, acceptable at expected volumes.
|
||||
// Batched lookups — a single query each, replacing the prior per-event
|
||||
// GetAsync round-trip (N+1 against SQLite).
|
||||
var distinctEventIds = anomalies.Select(a => a.EventId).Distinct().ToList();
|
||||
|
||||
var eventLookup = new Dictionary<DomainEventId, Event>(distinctEventIds.Count);
|
||||
var resultLookup = new Dictionary<DomainEventId, EventResult>(distinctEventIds.Count);
|
||||
var titles = new Dictionary<DomainEventId, string>(distinctEventIds.Count);
|
||||
foreach (var id in distinctEventIds)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
var eventLookup = await _events.GetManyAsync(distinctEventIds, ct).ConfigureAwait(false);
|
||||
var resultLookup = await _results.GetManyAsync(distinctEventIds, ct).ConfigureAwait(false);
|
||||
|
||||
var ev = await _events.GetAsync(id, ct).ConfigureAwait(false);
|
||||
if (ev is not null)
|
||||
{
|
||||
eventLookup[id] = ev;
|
||||
titles[id] = string.Concat(ev.Side1Name, " vs ", ev.Side2Name);
|
||||
}
|
||||
|
||||
var res = await _results.GetAsync(id, ct).ConfigureAwait(false);
|
||||
if (res is not null) resultLookup[id] = res;
|
||||
}
|
||||
var titles = new Dictionary<DomainEventId, string>(eventLookup.Count);
|
||||
foreach (var (id, ev) in eventLookup)
|
||||
titles[id] = ev.Title;
|
||||
|
||||
var candidates = new List<BacktestCandidate>(anomalies.Count);
|
||||
foreach (var anomaly in anomalies)
|
||||
|
||||
Reference in New Issue
Block a user