286b55986b
The Pull*UseCase implementations issued one HTTP request at a time despite
Scraping:MaxConcurrentRequests=4. With 30–80 live events and ~1s per
fetch, a 5–10s live cadence target was unreachable; cycles overflowed
the configured interval.
* New Marathon.Application.Configuration.ScrapingThrottle bound from the
shared Scraping:* section. Exposes only MaxConcurrentRequests so the
Application layer doesn't pull in the Infrastructure-side ScrapingOptions.
* PullLiveOddsUseCase + PullUpcomingEventsUseCase split into two phases:
- Phase 1 — Parallel.ForEachAsync over the event list with
MaxDegreeOfParallelism = throttle.MaxConcurrentRequests. The scraper's
Polly rate limiter still throttles to RequestsPerSecond underneath
this fan-out, so spikes are smoothed before they hit the bookmaker.
- Phase 2 — sequential foreach over the (Event, Snapshot) tuples
captured in Phase 1, doing event upsert + snapshot insert. EF Core
DbContext is not thread-safe so all DB writes stay on a single thread.
* InfrastructureModule binds ScrapingThrottle alongside AnomalyOptions.
* Failed snapshot scrapes in Phase 1 mean the event row is also NOT
persisted in Phase 2 — previously we'd persist the row even when the
snapshot scrape failed, leaving an orphan event with no odds. Updated
the regression test accordingly.
* Test fixture exposes TestFixtures.Throttle(maxConcurrentRequests=1) for
deterministic sequential test runs.
* One existing NSubstitute setup that chained Arg.Is<>() across two
configurations was rewritten to use a single Arg.Any<>() with inline
branching — chained matchers were leaking and returning wrong results.
147 lines
5.5 KiB
C#
147 lines
5.5 KiB
C#
using System.Collections.Concurrent;
|
|
using Marathon.Application.Abstractions;
|
|
using Marathon.Application.Configuration;
|
|
using Marathon.Domain.Entities;
|
|
using Marathon.Domain.Enums;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
|
|
namespace Marathon.Application.UseCases;
|
|
|
|
/// <summary>
|
|
/// Fetches the current pre-match event listing, persists new events (skipping
|
|
/// duplicates by <see cref="Domain.ValueObjects.EventId"/>), and captures an
|
|
/// initial pre-match odds snapshot for every returned event.
|
|
/// </summary>
|
|
public sealed class PullUpcomingEventsUseCase
|
|
{
|
|
private readonly IOddsScraper _scraper;
|
|
private readonly IEventRepository _eventRepo;
|
|
private readonly ISnapshotRepository _snapshotRepo;
|
|
private readonly IOptionsMonitor<ScrapingThrottle> _throttle;
|
|
private readonly ILogger<PullUpcomingEventsUseCase> _logger;
|
|
|
|
public PullUpcomingEventsUseCase(
|
|
IOddsScraper scraper,
|
|
IEventRepository eventRepo,
|
|
ISnapshotRepository snapshotRepo,
|
|
IOptionsMonitor<ScrapingThrottle> throttle,
|
|
ILogger<PullUpcomingEventsUseCase> logger)
|
|
{
|
|
_scraper = scraper ?? throw new ArgumentNullException(nameof(scraper));
|
|
_eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo));
|
|
_snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo));
|
|
_throttle = throttle ?? throw new ArgumentNullException(nameof(throttle));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executes one polling cycle: scrape → persist new events → capture snapshots.
|
|
/// </summary>
|
|
/// <param name="ct">Cancellation token.</param>
|
|
/// <returns>
|
|
/// A tuple of <c>(EventsProcessed, NewEvents, SnapshotsCaptured)</c>.
|
|
/// <c>EventsProcessed</c> is the total number returned by the scraper.
|
|
/// <c>NewEvents</c> is how many were not already in the DB.
|
|
/// <c>SnapshotsCaptured</c> is how many snapshots were successfully saved.
|
|
/// </returns>
|
|
public async Task<(int EventsProcessed, int NewEvents, int SnapshotsCaptured)> ExecuteAsync(
|
|
CancellationToken ct)
|
|
{
|
|
_logger.LogInformation("PullUpcomingEventsUseCase: cycle started");
|
|
|
|
var events = await _scraper.ScrapeUpcomingAsync(sportFilter: null, ct);
|
|
int eventsProcessed = events.Count;
|
|
|
|
_logger.LogInformation(
|
|
"PullUpcomingEventsUseCase: scraper returned {Count} events",
|
|
eventsProcessed);
|
|
|
|
// Phase 1 — parallel HTTP fan-out. Each event's odds snapshot is scraped
|
|
// concurrently up to MaxConcurrentRequests; the scraper's rate limiter
|
|
// smooths spikes underneath. We do NOT touch the DbContext here — EF Core
|
|
// is single-threaded.
|
|
var scraped = new ConcurrentBag<(Event Event, OddsSnapshot Snapshot)>();
|
|
var maxParallelism = Math.Max(1, _throttle.CurrentValue.MaxConcurrentRequests);
|
|
|
|
var parallelOptions = new ParallelOptions
|
|
{
|
|
MaxDegreeOfParallelism = maxParallelism,
|
|
CancellationToken = ct,
|
|
};
|
|
|
|
await Parallel.ForEachAsync(events, parallelOptions, async (ev, taskCt) =>
|
|
{
|
|
try
|
|
{
|
|
var snapshot = await _scraper.ScrapeEventOddsAsync(ev, OddsSource.PreMatch, taskCt);
|
|
scraped.Add((ev, snapshot));
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
throw;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"PullUpcomingEventsUseCase: failed to capture snapshot for event {EventId} — skipping",
|
|
ev.Id.Value);
|
|
}
|
|
});
|
|
|
|
// Phase 2 — sequential persistence. Upsert event row, then save the
|
|
// captured snapshot. Per-event try/catch keeps a single failure from
|
|
// aborting the whole cycle.
|
|
int newEvents = 0;
|
|
int snapshotsCaptured = 0;
|
|
foreach (var (ev, snapshot) in scraped)
|
|
{
|
|
ct.ThrowIfCancellationRequested();
|
|
|
|
try
|
|
{
|
|
var existing = await _eventRepo.GetAsync(ev.Id, ct);
|
|
if (existing is null)
|
|
{
|
|
await _eventRepo.AddAsync(ev, ct);
|
|
await _eventRepo.SaveChangesAsync(ct);
|
|
newEvents++;
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
throw;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"PullUpcomingEventsUseCase: failed to persist event {EventId} — skipping",
|
|
ev.Id.Value);
|
|
}
|
|
|
|
try
|
|
{
|
|
await _snapshotRepo.AddAsync(snapshot, ct);
|
|
await _snapshotRepo.SaveChangesAsync(ct);
|
|
snapshotsCaptured++;
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
throw;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"PullUpcomingEventsUseCase: failed to persist snapshot for event {EventId} — skipping",
|
|
ev.Id.Value);
|
|
}
|
|
}
|
|
|
|
_logger.LogInformation(
|
|
"PullUpcomingEventsUseCase: cycle done — processed={Processed}, new={New}, snapshots={Snapshots}",
|
|
eventsProcessed, newEvents, snapshotsCaptured);
|
|
|
|
return (eventsProcessed, newEvents, snapshotsCaptured);
|
|
}
|
|
}
|