feat(phase-8-backend): per-event results harvesting + EventPath plumbing

Implements Phase 8 Amendment 1: marathonbet.by has no public results archive
endpoint, so results must be harvested per-event by re-fetching the event
detail page until eventJsonInfo.matchIsComplete=true.

Backend changes:

* IOddsScraper:
  - ScrapeResultsAsync(DateRange) replaced with ScrapeEventResultAsync(Event)
    returning a nullable EventResult — null when match still in progress.
  - ScrapeEventOddsAsync now takes the full Event (so EventPath drives URL
    construction) instead of bare EventId.
  - New ScrapeLiveAsync() for the /su/live listing.

* Domain:
  - Event gains EventPath (nullable string) — the data-event-path attribute
    captured during scraping; required for reliable URL construction.

* Infrastructure:
  - New migration 20260506000000_AddEventPath adds the column.
  - EventEntity / EventConfiguration / Mapping / model-snapshot updated.
  - MarathonbetScraper: new ScrapeLiveAsync + ScrapeEventResultAsync; URL
    builder prefers EventPath, falls back to numeric ID for legacy rows.
  - EventListingParserBase extracts data-event-path on every listing row.

* Application:
  - PullResultsUseCase: branches on selection vs date-range, emits IProgress<
    PullResultsProgress>, returns ResultLoadOutcome (Loaded / AlreadyLoaded /
    NotYetComplete / Failed); idempotent (skips events whose result already
    exists).
  - PullLiveOddsUseCase now drives off the live listing (auto-discovers
    events that go live without ever appearing in the upcoming list) and
    backfills EventPath on legacy rows.
  - PullUpcomingEventsUseCase wires EventPath on persisted events.

* Workers: UpcomingEventsPoller updates persistence path accordingly.

* Tests: 17 net-new tests across Application + Infrastructure + Domain;
  all 293 still pass.
This commit is contained in:
2026-05-09 15:10:27 +03:00
parent 1bbf4fcfed
commit 9c5d3df1f2
18 changed files with 638 additions and 217 deletions
@@ -1,13 +1,21 @@
using Marathon.Application.Abstractions;
using Marathon.Domain.Entities;
using Marathon.Domain.Enums;
using Microsoft.Extensions.Logging;
namespace Marathon.Application.UseCases;
/// <summary>
/// For each currently-live event in the database, fetches a fresh odds snapshot
/// via the scraper and persists it.
/// Discovers currently-live events from the bookmaker's <c>/su/live</c> listing,
/// persists any not yet known to the database, and captures a fresh
/// <see cref="OddsSource.Live"/> snapshot for each.
/// </summary>
/// <remarks>
/// Live discovery is authoritative: events that go live without ever appearing
/// in the upcoming list (late-added matches, in-play markets opened on demand)
/// are picked up here. Pre-match-only events are NOT scraped by this use case —
/// they would just be wasted requests against the bookmaker.
/// </remarks>
public sealed class PullLiveOddsUseCase
{
private readonly IOddsScraper _scraper;
@@ -31,27 +39,80 @@ public sealed class PullLiveOddsUseCase
/// Executes one live-odds polling cycle.
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <returns>Number of snapshots successfully captured.</returns>
/// <returns>Number of live snapshots successfully captured.</returns>
public async Task<int> ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("PullLiveOddsUseCase: cycle started");
// Refresh odds for every event we already track. The "live vs pre-match"
// distinction is recorded by stamping each snapshot with OddsSource.Live.
// TODO(phase-6/8): once IEventRepository.ListLiveAsync(cutoff) ships, swap
// this for a filter that only returns currently-live events to avoid
// hammering the scraper with non-live IDs.
var allEvents = await _eventRepo.ListAsync(ct);
IReadOnlyList<Event> liveEvents;
try
{
liveEvents = await _scraper.ScrapeLiveAsync(ct);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogError(ex,
"PullLiveOddsUseCase: failed to fetch live event listing — skipping cycle");
return 0;
}
_logger.LogInformation(
"PullLiveOddsUseCase: scraper returned {Count} live events",
liveEvents.Count);
int snapshotsCaptured = 0;
foreach (var ev in allEvents)
foreach (var live in liveEvents)
{
ct.ThrowIfCancellationRequested();
// Persist new live events — the upcoming poller may not have seen them
// yet (or never will, for matches added after their scheduled start).
// The Live page reads from the events table, so a new live row must
// exist before its snapshots become visible.
Event eventForScrape;
try
{
var snapshot = await _scraper.ScrapeEventOddsAsync(ev.Id, OddsSource.Live, ct);
var existing = await _eventRepo.GetAsync(live.Id, ct);
if (existing is null)
{
await _eventRepo.AddAsync(live, ct);
await _eventRepo.SaveChangesAsync(ct);
eventForScrape = live;
}
else if (existing.EventPath is null && live.EventPath is not null)
{
// Backfill EventPath on rows persisted before the column existed,
// so subsequent scrapes can use the correct URL.
var patched = existing with { EventPath = live.EventPath };
await _eventRepo.UpdateAsync(patched, ct);
await _eventRepo.SaveChangesAsync(ct);
eventForScrape = patched;
}
else
{
eventForScrape = existing;
}
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullLiveOddsUseCase: failed to persist/lookup live event {EventId} — skipping",
live.Id.Value);
continue;
}
try
{
var snapshot = await _scraper.ScrapeEventOddsAsync(eventForScrape, OddsSource.Live, ct);
await _snapshotRepo.AddAsync(snapshot, ct);
await _snapshotRepo.SaveChangesAsync(ct);
snapshotsCaptured++;
@@ -64,13 +125,13 @@ public sealed class PullLiveOddsUseCase
{
_logger.LogWarning(ex,
"PullLiveOddsUseCase: failed to capture live snapshot for event {EventId} — skipping",
ev.Id.Value);
eventForScrape.Id.Value);
}
}
_logger.LogInformation(
"PullLiveOddsUseCase: cycle done — snapshots captured for {Count}/{Total} events",
snapshotsCaptured, allEvents.Count);
"PullLiveOddsUseCase: cycle done — snapshots captured for {Count}/{Total} live events",
snapshotsCaptured, liveEvents.Count);
return snapshotsCaptured;
}
@@ -1,26 +1,62 @@
using Marathon.Application.Abstractions;
using Marathon.Application.Storage;
using Marathon.Domain.Entities;
using Microsoft.Extensions.Logging;
using DomainEventId = Marathon.Domain.ValueObjects.EventId;
namespace Marathon.Application.UseCases;
/// <summary>
/// Scaffolded results loader — inspects events for completion and persists
/// <see cref="Domain.Entities.EventResult"/>s when detected.
/// Per-event progress emitted by <see cref="PullResultsUseCase.ExecuteAsync"/>.
/// Used by the UI to render a progress bar and the running list of loaded
/// results — each tick is fired AFTER the bookmaker has been queried for
/// <see cref="EventId"/>, so the UI sees one tick per inspected event.
/// </summary>
/// <param name="Processed">Total events processed so far (1-based at the first tick).</param>
/// <param name="Total">Total candidates in this run.</param>
/// <param name="EventId">The event just processed.</param>
/// <param name="Outcome">What happened — see <see cref="ResultLoadOutcome"/>.</param>
/// <param name="Result">The persisted <see cref="EventResult"/> when <paramref name="Outcome"/> is <see cref="ResultLoadOutcome.Loaded"/>; otherwise null.</param>
public sealed record PullResultsProgress(
int Processed,
int Total,
DomainEventId EventId,
ResultLoadOutcome Outcome,
EventResult? Result);
/// <summary>What happened to a single candidate event during a results load.</summary>
public enum ResultLoadOutcome
{
/// <summary>A new <see cref="EventResult"/> was scraped and persisted.</summary>
Loaded,
/// <summary>The event already had a stored result — no work was done.</summary>
AlreadyLoaded,
/// <summary>The match isn't complete yet — try again later.</summary>
NotYetComplete,
/// <summary>The scrape failed (HTTP, parse, etc.). Logged at warning.</summary>
Failed,
}
/// <summary>
/// Loads completed-event results into the database.
/// </summary>
/// <remarks>
/// <para>
/// <b>Phase 4 scaffold:</b> This implementation is intentionally minimal.
/// The formal watch-list polling strategy lands in Phase 8, when
/// <c>IOddsScraper.ScrapeResultsAsync</c> will be replaced with real
/// per-event polling against <c>IResultsParser</c>.
/// For each candidate event, the use case:
/// </para>
/// <list type="number">
/// <item>Skips it if a result is already stored (idempotent).</item>
/// <item>Calls <see cref="IOddsScraper.ScrapeEventResultAsync"/>, which returns
/// a non-null <see cref="EventResult"/> only when the bookmaker reports
/// <c>matchIsComplete=true</c>.</item>
/// <item>Persists the result and increments the loaded count.</item>
/// </list>
/// <para>
/// Current behaviour: calls <c>IOddsScraper.ScrapeResultsAsync</c> (which
/// returns an empty list and logs a warning per Phase 3), so
/// <c>ResultsLoaded</c> will always be 0 until Phase 8.
/// All events with existing results are skipped (idempotent).
/// Candidates are either an explicit <paramref name="selection"/> list or — when
/// null/empty — every event scheduled in <c>range</c>.
/// </para>
/// </remarks>
public sealed class PullResultsUseCase
@@ -45,90 +81,51 @@ public sealed class PullResultsUseCase
/// <summary>
/// Inspects events for completion and persists results.
/// </summary>
/// <param name="range">Date range to scope the event search.</param>
/// <param name="range">Date range used when <paramref name="selection"/> is null or empty.</param>
/// <param name="selection">
/// When non-null, only these event IDs are inspected.
/// When null, all events in <paramref name="range"/> without a result row are inspected.
/// When non-empty, only these event IDs are inspected.
/// When null or empty, all events in <paramref name="range"/> without a stored
/// result are inspected.
/// </param>
/// <param name="progress">
/// Optional progress sink. Receives one update per candidate AFTER the scrape
/// has resolved. Suitable for binding to a UI progress indicator.
/// </param>
/// <param name="ct">Cancellation token.</param>
/// <returns>
/// A tuple of <c>(Inspected, ResultsLoaded, Skipped)</c> where:
/// <list type="bullet">
/// <item><c>Inspected</c>: total candidates examined.</item>
/// <item><c>ResultsLoaded</c>: results that were persisted this cycle.</item>
/// <item><c>Skipped</c>: events already with a result (idempotency guard).</item>
/// </list>
/// </returns>
public async Task<(int Inspected, int ResultsLoaded, int Skipped)> ExecuteAsync(
DateRange range,
IReadOnlyList<DomainEventId>? selection,
IProgress<PullResultsProgress>? progress,
CancellationToken ct)
{
_logger.LogInformation(
"PullResultsUseCase: cycle started — range={From:O}..{To:O}, selection={SelectionCount}",
range.From, range.To, selection?.Count.ToString() ?? "all");
// Resolve the candidate event IDs.
IReadOnlyList<Domain.Entities.Event> candidates;
if (selection is { Count: > 0 })
{
var selected = new List<Domain.Entities.Event>(selection.Count);
foreach (var id in selection)
{
ct.ThrowIfCancellationRequested();
var ev = await _eventRepo.GetAsync(id, ct);
if (ev is not null)
selected.Add(ev);
}
candidates = selected;
}
else
{
candidates = await _eventRepo.ListByDateRangeAsync(range, ct);
}
var candidates = await ResolveCandidatesAsync(range, selection, ct).ConfigureAwait(false);
int inspected = 0;
int resultsLoaded = 0;
int skipped = 0;
// Use the scraper's results endpoint (currently a no-op in Phase 3 — returns []).
var scraped = await _scraper.ScrapeResultsAsync(range, ct);
var scrapedByEventId = scraped.ToDictionary(r => r.EventId.Value, r => r);
foreach (var ev in candidates)
{
ct.ThrowIfCancellationRequested();
inspected++;
try
var (outcome, persisted) = await ProcessOneAsync(ev, ct).ConfigureAwait(false);
switch (outcome)
{
// Idempotency: skip events that already have a result stored.
var existingResult = await _resultRepo.GetAsync(ev.Id, ct);
if (existingResult is not null)
{
skipped++;
continue;
}
case ResultLoadOutcome.Loaded: resultsLoaded++; break;
case ResultLoadOutcome.AlreadyLoaded: skipped++; break;
}
// Check if the scraper returned a result for this event.
if (scrapedByEventId.TryGetValue(ev.Id.Value, out var result))
{
await _resultRepo.AddAsync(result, ct);
await _resultRepo.SaveChangesAsync(ct);
resultsLoaded++;
}
// Phase 8: else → add to watch list for next poll cycle.
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullResultsUseCase: error processing event {EventId} — skipping",
ev.Id.Value);
}
progress?.Report(new PullResultsProgress(
Processed: inspected,
Total: candidates.Count,
EventId: ev.Id,
Outcome: outcome,
Result: persisted));
}
_logger.LogInformation(
@@ -137,4 +134,67 @@ public sealed class PullResultsUseCase
return (inspected, resultsLoaded, skipped);
}
/// <summary>Convenience overload without progress reporting (worker callers).</summary>
public Task<(int Inspected, int ResultsLoaded, int Skipped)> ExecuteAsync(
DateRange range,
IReadOnlyList<DomainEventId>? selection,
CancellationToken ct)
=> ExecuteAsync(range, selection, progress: null, ct);
private async Task<IReadOnlyList<Event>> ResolveCandidatesAsync(
DateRange range,
IReadOnlyList<DomainEventId>? selection,
CancellationToken ct)
{
if (selection is { Count: > 0 })
{
var resolved = new List<Event>(selection.Count);
foreach (var id in selection)
{
ct.ThrowIfCancellationRequested();
var ev = await _eventRepo.GetAsync(id, ct).ConfigureAwait(false);
if (ev is not null)
resolved.Add(ev);
}
return resolved;
}
return await _eventRepo.ListByDateRangeAsync(range, ct).ConfigureAwait(false);
}
private async Task<(ResultLoadOutcome Outcome, EventResult? Persisted)> ProcessOneAsync(
Event ev,
CancellationToken ct)
{
try
{
var existing = await _resultRepo.GetAsync(ev.Id, ct).ConfigureAwait(false);
if (existing is not null)
{
return (ResultLoadOutcome.AlreadyLoaded, null);
}
var scraped = await _scraper.ScrapeEventResultAsync(ev, ct).ConfigureAwait(false);
if (scraped is null)
{
return (ResultLoadOutcome.NotYetComplete, null);
}
await _resultRepo.AddAsync(scraped, ct).ConfigureAwait(false);
await _resultRepo.SaveChangesAsync(ct).ConfigureAwait(false);
return (ResultLoadOutcome.Loaded, scraped);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"PullResultsUseCase: error processing event {EventId} — skipping",
ev.Id.Value);
return (ResultLoadOutcome.Failed, null);
}
}
}
@@ -79,7 +79,7 @@ public sealed class PullUpcomingEventsUseCase
try
{
var snapshot = await _scraper.ScrapeEventOddsAsync(
ev.Id,
ev,
Domain.Enums.OddsSource.PreMatch,
ct);