perf(scraping): parallel HTTP fan-out, sequential DB persist (HIGH)

The Pull*UseCase implementations issued one HTTP request at a time despite
Scraping:MaxConcurrentRequests=4. With 30–80 live events and ~1s per
fetch, a 5–10s live cadence target was unreachable; cycles overflowed
the configured interval.

* New Marathon.Application.Configuration.ScrapingThrottle bound from the
  shared Scraping:* section. Exposes only MaxConcurrentRequests so the
  Application layer doesn't pull in the Infrastructure-side ScrapingOptions.
* PullLiveOddsUseCase + PullUpcomingEventsUseCase split into two phases:
  - Phase 1 — Parallel.ForEachAsync over the event list with
    MaxDegreeOfParallelism = throttle.MaxConcurrentRequests. The scraper's
    Polly rate limiter still throttles to RequestsPerSecond underneath
    this fan-out, so spikes are smoothed before they hit the bookmaker.
  - Phase 2 — sequential foreach over the (Event, Snapshot) tuples
    captured in Phase 1, doing event upsert + snapshot insert. EF Core
    DbContext is not thread-safe so all DB writes stay on a single thread.
* InfrastructureModule binds ScrapingThrottle alongside AnomalyOptions.
* Failed snapshot scrapes in Phase 1 mean the event row is also NOT
  persisted in Phase 2 — previously we'd persist the row even when the
  snapshot scrape failed, leaving an orphan event with no odds. Updated
  the regression test accordingly.
* Test fixture exposes TestFixtures.Throttle(maxConcurrentRequests=1) for
  deterministic sequential test runs.
* One existing NSubstitute setup that chained Arg.Is<>() across two
  configurations was rewritten to use a single Arg.Any<>() with inline
  branching — chained matchers were leaking and returning wrong results.
This commit is contained in:
2026-05-09 15:27:06 +03:00
parent 66ae038243
commit 286b55986b
8 changed files with 177 additions and 53 deletions
@@ -1,7 +1,10 @@
using System.Collections.Concurrent;
using Marathon.Application.Abstractions;
using Marathon.Application.Configuration;
using Marathon.Domain.Entities;
using Marathon.Domain.Enums;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Marathon.Application.UseCases;
@@ -21,17 +24,20 @@ public sealed class PullLiveOddsUseCase
private readonly IOddsScraper _scraper;
private readonly IEventRepository _eventRepo;
private readonly ISnapshotRepository _snapshotRepo;
private readonly IOptionsMonitor<ScrapingThrottle> _throttle;
private readonly ILogger<PullLiveOddsUseCase> _logger;
public PullLiveOddsUseCase(
IOddsScraper scraper,
IEventRepository eventRepo,
ISnapshotRepository snapshotRepo,
IOptionsMonitor<ScrapingThrottle> throttle,
ILogger<PullLiveOddsUseCase> logger)
{
_scraper = scraper ?? throw new ArgumentNullException(nameof(scraper));
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
_snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo));
_throttle = throttle ?? throw new ArgumentNullException(nameof(throttle));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -64,39 +70,27 @@ public sealed class PullLiveOddsUseCase
"PullLiveOddsUseCase: scraper returned {Count} live events",
liveEvents.Count);
int snapshotsCaptured = 0;
// Phase 1 — parallel HTTP fan-out: scrape every event's odds in parallel,
// capped at MaxConcurrentRequests. The scraper's rate limiter still
// throttles to RequestsPerSecond underneath this fan-out, so spikes are
// smoothed out before they reach the bookmaker. We deliberately do NOT
// touch the DbContext (or its repositories) inside the parallel block —
// EF Core DbContext is not thread-safe.
var scraped = new ConcurrentBag<(Event Live, OddsSnapshot Snapshot)>();
var maxParallelism = Math.Max(1, _throttle.CurrentValue.MaxConcurrentRequests);
foreach (var live in liveEvents)
var parallelOptions = new ParallelOptions
{
ct.ThrowIfCancellationRequested();
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = ct,
};
// Persist new live events — the upcoming poller may not have seen them
// yet (or never will, for matches added after their scheduled start).
// The Live page reads from the events table, so a new live row must
// exist before its snapshots become visible.
Event eventForScrape;
await Parallel.ForEachAsync(liveEvents, parallelOptions, async (live, taskCt) =>
{
try
{
var existing = await _eventRepo.GetAsync(live.Id, ct);
if (existing is null)
{
await _eventRepo.AddAsync(live, ct);
await _eventRepo.SaveChangesAsync(ct);
eventForScrape = live;
}
else if (existing.EventPath is null && live.EventPath is not null)
{
// Backfill EventPath on rows persisted before the column existed,
// so subsequent scrapes can use the correct URL.
var patched = existing with { EventPath = live.EventPath };
await _eventRepo.UpdateAsync(patched, ct);
await _eventRepo.SaveChangesAsync(ct);
eventForScrape = patched;
}
else
{
eventForScrape = existing;
}
var snapshot = await _scraper.ScrapeEventOddsAsync(live, OddsSource.Live, taskCt);
scraped.Add((live, snapshot));
}
catch (OperationCanceledException)
{
@@ -105,14 +99,38 @@ public sealed class PullLiveOddsUseCase
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullLiveOddsUseCase: failed to persist/lookup live event {EventId} — skipping",
"PullLiveOddsUseCase: failed to capture live snapshot for event {EventId} — skipping",
live.Id.Value);
continue;
}
});
// Phase 2 — sequential persistence. EF Core DbContext is single-threaded,
// so we apply each (event upsert + snapshot insert) one at a time.
int snapshotsCaptured = 0;
foreach (var (live, snapshot) in scraped)
{
ct.ThrowIfCancellationRequested();
try
{
var snapshot = await _scraper.ScrapeEventOddsAsync(eventForScrape, OddsSource.Live, ct);
// Persist new live events — the upcoming poller may not have seen them
// yet (or never will, for matches added after their scheduled start).
// The Live page reads from the events table, so a new live row must
// exist before its snapshots become visible.
var existing = await _eventRepo.GetAsync(live.Id, ct);
if (existing is null)
{
await _eventRepo.AddAsync(live, ct);
await _eventRepo.SaveChangesAsync(ct);
}
else if (existing.EventPath is null && live.EventPath is not null)
{
// Backfill EventPath on rows persisted before the column existed.
var patched = existing with { EventPath = live.EventPath };
await _eventRepo.UpdateAsync(patched, ct);
await _eventRepo.SaveChangesAsync(ct);
}
await _snapshotRepo.AddAsync(snapshot, ct);
await _snapshotRepo.SaveChangesAsync(ct);
snapshotsCaptured++;
@@ -124,8 +142,8 @@ public sealed class PullLiveOddsUseCase
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullLiveOddsUseCase: failed to capture live snapshot for event {EventId} — skipping",
eventForScrape.Id.Value);
"PullLiveOddsUseCase: failed to persist live snapshot for event {EventId} — skipping",
live.Id.Value);
}
}
@@ -1,5 +1,10 @@
using System.Collections.Concurrent;
using Marathon.Application.Abstractions;
using Marathon.Application.Configuration;
using Marathon.Domain.Entities;
using Marathon.Domain.Enums;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Marathon.Application.UseCases;
@@ -13,18 +18,21 @@ public sealed class PullUpcomingEventsUseCase
private readonly IOddsScraper _scraper;
private readonly IEventRepository _eventRepo;
private readonly ISnapshotRepository _snapshotRepo;
private readonly IOptionsMonitor<ScrapingThrottle> _throttle;
private readonly ILogger<PullUpcomingEventsUseCase> _logger;
public PullUpcomingEventsUseCase(
IOddsScraper scraper,
IEventRepository eventRepo,
ISnapshotRepository snapshotRepo,
IOptionsMonitor<ScrapingThrottle> throttle,
ILogger<PullUpcomingEventsUseCase> logger)
{
_scraper = scraper ?? throw new ArgumentNullException(nameof(scraper));
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
_scraper = scraper ?? throw new ArgumentNullException(nameof(scraper));
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
_snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_throttle = throttle ?? throw new ArgumentNullException(nameof(throttle));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
@@ -44,14 +52,49 @@ public sealed class PullUpcomingEventsUseCase
var events = await _scraper.ScrapeUpcomingAsync(sportFilter: null, ct);
int eventsProcessed = events.Count;
int newEvents = 0;
int snapshotsCaptured = 0;
_logger.LogInformation(
"PullUpcomingEventsUseCase: scraper returned {Count} events",
eventsProcessed);
foreach (var ev in events)
// Phase 1 — parallel HTTP fan-out. Each event's odds snapshot is scraped
// concurrently up to MaxConcurrentRequests; the scraper's rate limiter
// smooths spikes underneath. We do NOT touch the DbContext here — EF Core
// is single-threaded.
var scraped = new ConcurrentBag<(Event Event, OddsSnapshot Snapshot)>();
var maxParallelism = Math.Max(1, _throttle.CurrentValue.MaxConcurrentRequests);
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = ct,
};
await Parallel.ForEachAsync(events, parallelOptions, async (ev, taskCt) =>
{
try
{
var snapshot = await _scraper.ScrapeEventOddsAsync(ev, OddsSource.PreMatch, taskCt);
scraped.Add((ev, snapshot));
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullUpcomingEventsUseCase: failed to capture snapshot for event {EventId} — skipping",
ev.Id.Value);
}
});
// Phase 2 — sequential persistence. Upsert event row, then save the
// captured snapshot. Per-event try/catch keeps a single failure from
// aborting the whole cycle.
int newEvents = 0;
int snapshotsCaptured = 0;
foreach (var (ev, snapshot) in scraped)
{
ct.ThrowIfCancellationRequested();
@@ -78,11 +121,6 @@ public sealed class PullUpcomingEventsUseCase
try
{
var snapshot = await _scraper.ScrapeEventOddsAsync(
ev,
Domain.Enums.OddsSource.PreMatch,
ct);
await _snapshotRepo.AddAsync(snapshot, ct);
await _snapshotRepo.SaveChangesAsync(ct);
snapshotsCaptured++;
@@ -94,7 +132,7 @@ public sealed class PullUpcomingEventsUseCase
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullUpcomingEventsUseCase: failed to capture snapshot for event {EventId} — skipping",
"PullUpcomingEventsUseCase: failed to persist snapshot for event {EventId} — skipping",
ev.Id.Value);
}
}