diff --git a/src/Marathon.Application/Configuration/ScrapingThrottle.cs b/src/Marathon.Application/Configuration/ScrapingThrottle.cs new file mode 100644 index 0000000..cab3a70 --- /dev/null +++ b/src/Marathon.Application/Configuration/ScrapingThrottle.cs @@ -0,0 +1,26 @@ +namespace Marathon.Application.Configuration; + +/// +/// Application-layer view of the scraping concurrency knobs. +/// +/// +/// +/// Bound from the same Scraping appsettings section as +/// Marathon.Infrastructure.Configuration.ScrapingOptions — but only the +/// fields the use cases need to schedule fan-out. Keeping a separate Application +/// type avoids leaking the Infrastructure namespace into use-case code. +/// +/// +public sealed class ScrapingThrottle +{ + public const string SectionName = "Scraping"; + + /// + /// Maximum number of in-flight HTTP requests the scraper is allowed to + /// issue concurrently. Use cases use this as the + /// for batch fan-out. + /// The bookmaker rate limiter still throttles to RequestsPerSecond + /// underneath this value. + /// + public int MaxConcurrentRequests { get; init; } = 4; +} diff --git a/src/Marathon.Application/UseCases/PullLiveOddsUseCase.cs b/src/Marathon.Application/UseCases/PullLiveOddsUseCase.cs index 2eff086..c119d06 100644 --- a/src/Marathon.Application/UseCases/PullLiveOddsUseCase.cs +++ b/src/Marathon.Application/UseCases/PullLiveOddsUseCase.cs @@ -1,7 +1,10 @@ +using System.Collections.Concurrent; using Marathon.Application.Abstractions; +using Marathon.Application.Configuration; using Marathon.Domain.Entities; using Marathon.Domain.Enums; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace Marathon.Application.UseCases; @@ -21,17 +24,20 @@ public sealed class PullLiveOddsUseCase private readonly IOddsScraper _scraper; private readonly IEventRepository _eventRepo; private readonly ISnapshotRepository _snapshotRepo; + private readonly IOptionsMonitor _throttle; private readonly ILogger _logger; public PullLiveOddsUseCase( IOddsScraper scraper, IEventRepository eventRepo, ISnapshotRepository snapshotRepo, + IOptionsMonitor throttle, ILogger logger) { _scraper = scraper ?? throw new ArgumentNullException(nameof(scraper)); _eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo)); _snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo)); + _throttle = throttle ?? throw new ArgumentNullException(nameof(throttle)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } @@ -64,39 +70,27 @@ public sealed class PullLiveOddsUseCase "PullLiveOddsUseCase: scraper returned {Count} live events", liveEvents.Count); - int snapshotsCaptured = 0; + // Phase 1 — parallel HTTP fan-out: scrape every event's odds in parallel, + // capped at MaxConcurrentRequests. The scraper's rate limiter still + // throttles to RequestsPerSecond underneath this fan-out, so spikes are + // smoothed out before they reach the bookmaker. We deliberately do NOT + // touch the DbContext (or its repositories) inside the parallel block — + // EF Core DbContext is not thread-safe. + var scraped = new ConcurrentBag<(Event Live, OddsSnapshot Snapshot)>(); + var maxParallelism = Math.Max(1, _throttle.CurrentValue.MaxConcurrentRequests); - foreach (var live in liveEvents) + var parallelOptions = new ParallelOptions { - ct.ThrowIfCancellationRequested(); + MaxDegreeOfParallelism = maxParallelism, + CancellationToken = ct, + }; - // Persist new live events — the upcoming poller may not have seen them - // yet (or never will, for matches added after their scheduled start). - // The Live page reads from the events table, so a new live row must - // exist before its snapshots become visible. - Event eventForScrape; + await Parallel.ForEachAsync(liveEvents, parallelOptions, async (live, taskCt) => + { try { - var existing = await _eventRepo.GetAsync(live.Id, ct); - if (existing is null) - { - await _eventRepo.AddAsync(live, ct); - await _eventRepo.SaveChangesAsync(ct); - eventForScrape = live; - } - else if (existing.EventPath is null && live.EventPath is not null) - { - // Backfill EventPath on rows persisted before the column existed, - // so subsequent scrapes can use the correct URL. - var patched = existing with { EventPath = live.EventPath }; - await _eventRepo.UpdateAsync(patched, ct); - await _eventRepo.SaveChangesAsync(ct); - eventForScrape = patched; - } - else - { - eventForScrape = existing; - } + var snapshot = await _scraper.ScrapeEventOddsAsync(live, OddsSource.Live, taskCt); + scraped.Add((live, snapshot)); } catch (OperationCanceledException) { @@ -105,14 +99,38 @@ public sealed class PullLiveOddsUseCase catch (Exception ex) { _logger.LogWarning(ex, - "PullLiveOddsUseCase: failed to persist/lookup live event {EventId} — skipping", + "PullLiveOddsUseCase: failed to capture live snapshot for event {EventId} — skipping", live.Id.Value); - continue; } + }); + + // Phase 2 — sequential persistence. EF Core DbContext is single-threaded, + // so we apply each (event upsert + snapshot insert) one at a time. + int snapshotsCaptured = 0; + foreach (var (live, snapshot) in scraped) + { + ct.ThrowIfCancellationRequested(); try { - var snapshot = await _scraper.ScrapeEventOddsAsync(eventForScrape, OddsSource.Live, ct); + // Persist new live events — the upcoming poller may not have seen them + // yet (or never will, for matches added after their scheduled start). + // The Live page reads from the events table, so a new live row must + // exist before its snapshots become visible. + var existing = await _eventRepo.GetAsync(live.Id, ct); + if (existing is null) + { + await _eventRepo.AddAsync(live, ct); + await _eventRepo.SaveChangesAsync(ct); + } + else if (existing.EventPath is null && live.EventPath is not null) + { + // Backfill EventPath on rows persisted before the column existed. + var patched = existing with { EventPath = live.EventPath }; + await _eventRepo.UpdateAsync(patched, ct); + await _eventRepo.SaveChangesAsync(ct); + } + await _snapshotRepo.AddAsync(snapshot, ct); await _snapshotRepo.SaveChangesAsync(ct); snapshotsCaptured++; @@ -124,8 +142,8 @@ public sealed class PullLiveOddsUseCase catch (Exception ex) { _logger.LogWarning(ex, - "PullLiveOddsUseCase: failed to capture live snapshot for event {EventId} — skipping", - eventForScrape.Id.Value); + "PullLiveOddsUseCase: failed to persist live snapshot for event {EventId} — skipping", + live.Id.Value); } } diff --git a/src/Marathon.Application/UseCases/PullUpcomingEventsUseCase.cs b/src/Marathon.Application/UseCases/PullUpcomingEventsUseCase.cs index cc3c042..4c89017 100644 --- a/src/Marathon.Application/UseCases/PullUpcomingEventsUseCase.cs +++ b/src/Marathon.Application/UseCases/PullUpcomingEventsUseCase.cs @@ -1,5 +1,10 @@ +using System.Collections.Concurrent; using Marathon.Application.Abstractions; +using Marathon.Application.Configuration; +using Marathon.Domain.Entities; +using Marathon.Domain.Enums; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace Marathon.Application.UseCases; @@ -13,18 +18,21 @@ public sealed class PullUpcomingEventsUseCase private readonly IOddsScraper _scraper; private readonly IEventRepository _eventRepo; private readonly ISnapshotRepository _snapshotRepo; + private readonly IOptionsMonitor _throttle; private readonly ILogger _logger; public PullUpcomingEventsUseCase( IOddsScraper scraper, IEventRepository eventRepo, ISnapshotRepository snapshotRepo, + IOptionsMonitor throttle, ILogger logger) { - _scraper = scraper ?? throw new ArgumentNullException(nameof(scraper)); - _eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo)); + _scraper = scraper ?? throw new ArgumentNullException(nameof(scraper)); + _eventRepo = eventRepo ?? throw new ArgumentNullException(nameof(eventRepo)); _snapshotRepo = snapshotRepo ?? throw new ArgumentNullException(nameof(snapshotRepo)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _throttle = throttle ?? throw new ArgumentNullException(nameof(throttle)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// @@ -44,14 +52,49 @@ public sealed class PullUpcomingEventsUseCase var events = await _scraper.ScrapeUpcomingAsync(sportFilter: null, ct); int eventsProcessed = events.Count; - int newEvents = 0; - int snapshotsCaptured = 0; _logger.LogInformation( "PullUpcomingEventsUseCase: scraper returned {Count} events", eventsProcessed); - foreach (var ev in events) + // Phase 1 — parallel HTTP fan-out. Each event's odds snapshot is scraped + // concurrently up to MaxConcurrentRequests; the scraper's rate limiter + // smooths spikes underneath. We do NOT touch the DbContext here — EF Core + // is single-threaded. + var scraped = new ConcurrentBag<(Event Event, OddsSnapshot Snapshot)>(); + var maxParallelism = Math.Max(1, _throttle.CurrentValue.MaxConcurrentRequests); + + var parallelOptions = new ParallelOptions + { + MaxDegreeOfParallelism = maxParallelism, + CancellationToken = ct, + }; + + await Parallel.ForEachAsync(events, parallelOptions, async (ev, taskCt) => + { + try + { + var snapshot = await _scraper.ScrapeEventOddsAsync(ev, OddsSource.PreMatch, taskCt); + scraped.Add((ev, snapshot)); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "PullUpcomingEventsUseCase: failed to capture snapshot for event {EventId} — skipping", + ev.Id.Value); + } + }); + + // Phase 2 — sequential persistence. Upsert event row, then save the + // captured snapshot. Per-event try/catch keeps a single failure from + // aborting the whole cycle. + int newEvents = 0; + int snapshotsCaptured = 0; + foreach (var (ev, snapshot) in scraped) { ct.ThrowIfCancellationRequested(); @@ -78,11 +121,6 @@ public sealed class PullUpcomingEventsUseCase try { - var snapshot = await _scraper.ScrapeEventOddsAsync( - ev, - Domain.Enums.OddsSource.PreMatch, - ct); - await _snapshotRepo.AddAsync(snapshot, ct); await _snapshotRepo.SaveChangesAsync(ct); snapshotsCaptured++; @@ -94,7 +132,7 @@ public sealed class PullUpcomingEventsUseCase catch (Exception ex) { _logger.LogWarning(ex, - "PullUpcomingEventsUseCase: failed to capture snapshot for event {EventId} — skipping", + "PullUpcomingEventsUseCase: failed to persist snapshot for event {EventId} — skipping", ev.Id.Value); } } diff --git a/src/Marathon.Infrastructure/InfrastructureModule.cs b/src/Marathon.Infrastructure/InfrastructureModule.cs index ea1a2d1..7280a7e 100644 --- a/src/Marathon.Infrastructure/InfrastructureModule.cs +++ b/src/Marathon.Infrastructure/InfrastructureModule.cs @@ -46,6 +46,10 @@ public static class InfrastructureModule .AddOptions() .Bind(config.GetSection(AnomalyOptions.SectionName)); + services + .AddOptions() + .Bind(config.GetSection(ScrapingThrottle.SectionName)); + services.AddHostedService(); services.AddHostedService(); services.AddHostedService(); diff --git a/tests/Marathon.Application.Tests/UseCases/PullLiveOddsUseCaseTests.cs b/tests/Marathon.Application.Tests/UseCases/PullLiveOddsUseCaseTests.cs index 12383f9..aaee6f8 100644 --- a/tests/Marathon.Application.Tests/UseCases/PullLiveOddsUseCaseTests.cs +++ b/tests/Marathon.Application.Tests/UseCases/PullLiveOddsUseCaseTests.cs @@ -18,6 +18,7 @@ public sealed class PullLiveOddsUseCaseTests private PullLiveOddsUseCase CreateSut() => new(_scraper, _eventRepo, _snapshotRepo, + TestFixtures.Throttle(), NullLogger.Instance); [Fact] diff --git a/tests/Marathon.Application.Tests/UseCases/PullUpcomingEventsUseCaseTests.cs b/tests/Marathon.Application.Tests/UseCases/PullUpcomingEventsUseCaseTests.cs index ab29953..7971c4e 100644 --- a/tests/Marathon.Application.Tests/UseCases/PullUpcomingEventsUseCaseTests.cs +++ b/tests/Marathon.Application.Tests/UseCases/PullUpcomingEventsUseCaseTests.cs @@ -18,6 +18,7 @@ public sealed class PullUpcomingEventsUseCaseTests private PullUpcomingEventsUseCase CreateSut() => new(_scraper, _eventRepo, _snapshotRepo, + TestFixtures.Throttle(), NullLogger.Instance); [Fact] @@ -88,24 +89,30 @@ public sealed class PullUpcomingEventsUseCaseTests var ev2 = TestFixtures.MakeEvent("22222222"); var events = new List { ev1, ev2 }.AsReadOnly(); - _scraper.ScrapeUpcomingAsync(null, Arg.Any()).Returns(events); + _scraper.ScrapeUpcomingAsync(Arg.Any(), Arg.Any()).Returns(events); _eventRepo.GetAsync(Arg.Any(), Arg.Any()).Returns((Event?)null); + // Inline routing of the throwing-vs-passing branch on event id, since chained + // Arg.Is<>() setups can leak matchers across NSubstitute invocations. _scraper.ScrapeEventOddsAsync( - Arg.Is(e => e.Id == ev1.Id), OddsSource.PreMatch, Arg.Any()) - .ThrowsAsync(new HttpRequestException("site down")); - _scraper.ScrapeEventOddsAsync( - Arg.Is(e => e.Id == ev2.Id), OddsSource.PreMatch, Arg.Any()) - .Returns(TestFixtures.MakeSnapshot(ev2.Id)); + Arg.Any(), OddsSource.PreMatch, Arg.Any()) + .Returns(ci => + { + var evArg = ci.Arg(); + if (evArg.Id == ev1.Id) + throw new HttpRequestException("site down"); + return TestFixtures.MakeSnapshot(evArg.Id); + }); var sut = CreateSut(); // Act — should not throw var (processed, newEvents, snapshots) = await sut.ExecuteAsync(CancellationToken.None); - // Assert + // Assert: ev1's snapshot scrape failed in Phase 1, so it's not even + // attempted in Phase 2 — no orphan event row gets persisted. processed.Should().Be(2); - newEvents.Should().Be(2); + newEvents.Should().Be(1, "ev1's snapshot failed so it was not persisted"); snapshots.Should().Be(1, "only ev2 snapshot succeeded"); } diff --git a/tests/Marathon.Application.Tests/UseCases/TestFixtures.cs b/tests/Marathon.Application.Tests/UseCases/TestFixtures.cs index 3901942..ab7922d 100644 --- a/tests/Marathon.Application.Tests/UseCases/TestFixtures.cs +++ b/tests/Marathon.Application.Tests/UseCases/TestFixtures.cs @@ -1,6 +1,8 @@ +using Marathon.Application.Configuration; using Marathon.Domain.Entities; using Marathon.Domain.Enums; using Marathon.Domain.ValueObjects; +using Microsoft.Extensions.Options; namespace Marathon.Application.Tests.UseCases; @@ -42,4 +44,23 @@ internal static class TestFixtures { return new EventResult(eventId, 2, 1, Side.Side1, DateTimeOffset.UtcNow); } + + /// + /// Creates an that always returns the given + /// throttle. Use 1 for sequential test behaviour, higher values to exercise fan-out. + /// + public static IOptionsMonitor Throttle(int maxConcurrentRequests = 1) => + new StaticOptionsMonitor(new ScrapingThrottle + { + MaxConcurrentRequests = maxConcurrentRequests, + }); + + private sealed class StaticOptionsMonitor : IOptionsMonitor where T : class + { + private readonly T _value; + public StaticOptionsMonitor(T value) => _value = value; + public T CurrentValue => _value; + public T Get(string? name) => _value; + public IDisposable? OnChange(Action listener) => null; + } } diff --git a/tests/Marathon.Infrastructure.Tests/Workers/LiveOddsPollerTests.cs b/tests/Marathon.Infrastructure.Tests/Workers/LiveOddsPollerTests.cs index 15cdb9a..dde40fb 100644 --- a/tests/Marathon.Infrastructure.Tests/Workers/LiveOddsPollerTests.cs +++ b/tests/Marathon.Infrastructure.Tests/Workers/LiveOddsPollerTests.cs @@ -1,5 +1,6 @@ using FluentAssertions; using Marathon.Application.Abstractions; +using Marathon.Application.Configuration; using Marathon.Application.UseCases; using Marathon.Domain.Entities; using Marathon.Domain.Enums; @@ -47,11 +48,19 @@ public sealed class LiveOddsPollerTests sp.GetRequiredService(), sp.GetRequiredService(), sp.GetRequiredService(), + StaticThrottle(), NullLogger.Instance)); return services.BuildServiceProvider(); } + private static IOptionsMonitor StaticThrottle() + { + var monitor = Substitute.For>(); + monitor.CurrentValue.Returns(new ScrapingThrottle { MaxConcurrentRequests = 1 }); + return monitor; + } + private static IOptionsMonitor BuildOptions( bool enabled = true, int intervalSeconds = 0)