"""Deferred-dispatch infrastructure for quiet-hours notifications. When ``evaluate_event_gate`` returns ``QUIET_HOURS`` for a deferrable event type, the dispatch site calls :func:`defer_event` instead of dropping. That either inserts a new ``DeferredDispatch`` row or coalesces the event into an existing pending row for the same ``(link_id, collection_id)`` — asset add + matching remove cancels out, asset add + asset add merges set-union. An APScheduler one-shot ``date`` job per quiet-window-end fires :func:`drain_deferred_due` which: 1. Re-resolves each pending row's link/target/configs against current state. 2. Drops rows whose link/target was deleted or disabled in the meantime. 3. Re-checks quiet hours (in case the user extended the window mid-flight) and pushes ``fire_at`` to the new end if still suppressed. 4. Dispatches via the existing ``NotificationDispatcher``. 5. Writes a follow-up ``event_log`` row referencing the original ``event_log_id`` so the dashboard shows "delivered late". Wall-clock event types (``scheduled_message``) are explicitly NOT in ``_DEFERRABLE_EVENT_TYPES`` — delivering a "good morning" memory at 3 pm is worse than dropping it. Those keep the legacy drop-on-quiet-hours behavior. 
""" from __future__ import annotations import asyncio import dataclasses import logging from datetime import datetime, timezone from typing import Any from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.models.events import EventType, ServiceEvent from notify_bridge_core.models.media import MediaAsset, MediaType from notify_bridge_core.notifications.dispatcher import ( NotificationDispatcher, TargetConfig, ) from notify_bridge_core.providers.base import ServiceProviderType from ..database.engine import get_engine from ..database.models import ( DeferredDispatch, EventLog, NotificationTracker, ServiceProvider, ) from .dispatch_helpers import ( GateReason, apply_tracking_display_filters, evaluate_event_gate, get_app_timezone, load_link_data, ) _LOGGER = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Policy # --------------------------------------------------------------------------- # Change-driven event types that are safe to deliver after the quiet window # ends — the underlying state change (a photo was added, a PR was opened, the # UPS went on battery) remains relevant even hours later. Wall-clock event # types (``scheduled_message``) are deliberately excluded: a "good morning" # delivered at 3 pm is wrong, drop is more correct than late delivery. 
_DEFERRABLE_EVENT_TYPES: frozenset[str] = frozenset((
    # Immich
    "assets_added",
    "assets_removed",
    "collection_renamed",
    "collection_deleted",
    "sharing_changed",
    # Gitea
    "push",
    "issue_opened",
    "issue_closed",
    "issue_commented",
    "pr_opened",
    "pr_closed",
    "pr_merged",
    "pr_commented",
    "release_published",
    # Planka
    "card_created",
    "card_updated",
    "card_moved",
    "card_deleted",
    "card_commented",
    "comment_updated",
    "board_created",
    "board_updated",
    "board_deleted",
    "list_created",
    "list_updated",
    "list_deleted",
    "attachment_created",
    "card_label_added",
    "task_completed",
    # Generic webhook
    "webhook_received",
    # NUT (UPS)
    "ups_online",
    "ups_on_battery",
    "ups_low_battery",
    "ups_battery_restored",
    "ups_comms_lost",
    "ups_comms_restored",
    "ups_replace_battery",
    "ups_overload",
))

# Upper bound on pending rows per tracker. A short, misconfigured quiet window
# combined with a chatty upstream (e.g. a mass-imported album) would otherwise
# let the queue grow without limit. When the cap is exceeded the oldest rows
# are dropped first (FIFO) so that recent events are the ones that survive.
_MAX_PENDING_PER_TRACKER = 1000

# Timeout applied to each row's dispatch during a drain. It keeps one hanging
# Telegram/SMTP call from stalling the remainder of the queue. The value is
# generous because legitimate large media uploads can take minutes.
_DRAIN_DISPATCH_TIMEOUT_SECONDS = 120


def is_deferrable(event_type: str) -> bool:
    """Tell whether *event_type* is deferred rather than dropped in quiet hours.

    Args:
        event_type: The event type's string value (e.g. ``"assets_added"``).

    Returns:
        ``True`` when the type is listed in ``_DEFERRABLE_EVENT_TYPES``.
    """
    deferrable = event_type in _DEFERRABLE_EVENT_TYPES
    return deferrable


# ---------------------------------------------------------------------------
# ServiceEvent (de)serialization
# ---------------------------------------------------------------------------
#
# The JSON column holds ``dataclasses.asdict(event)`` after a normalisation
# pass that turns datetimes into ISO strings and enums into their string
# values. The functions below perform that pass and its inverse.
def _normalize_for_json(value: Any) -> Any:
    """Recursively rewrite *value* into JSON-encodable primitives.

    Datetimes become ISO-8601 strings, the three known enum types collapse to
    their ``.value``, dicts are rebuilt with normalised values, and lists and
    tuples both come back as lists. Anything else is returned untouched.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (EventType, MediaType, ServiceProviderType)):
        return value.value
    if isinstance(value, dict):
        return {key: _normalize_for_json(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_normalize_for_json(item) for item in value]
    return value


def serialize_event(event: ServiceEvent) -> dict[str, Any]:
    """Turn a ``ServiceEvent`` into the JSON-safe dict stored in ``DeferredDispatch.event_payload``."""
    raw = dataclasses.asdict(event)
    return _normalize_for_json(raw)


def _parse_dt(s: Any) -> datetime:
    """Return *s* unchanged when it is already a datetime, else parse it as ISO-8601."""
    return s if isinstance(s, datetime) else datetime.fromisoformat(s)


def _deserialize_asset(data: dict[str, Any]) -> MediaAsset:
    """Rebuild a ``MediaAsset`` from its serialized dict.

    ``id``, ``type``, ``filename`` and ``created_at`` are required keys; every
    other field falls back to ``None`` or an empty container when absent.
    """
    get = data.get
    tags = list(get("tags") or [])
    extra = dict(get("extra") or {})
    return MediaAsset(
        id=data["id"],
        type=MediaType(data["type"]),
        filename=data["filename"],
        created_at=_parse_dt(data["created_at"]),
        owner_name=get("owner_name"),
        description=get("description"),
        tags=tags,
        thumbnail_url=get("thumbnail_url"),
        preview_url=get("preview_url"),
        full_url=get("full_url"),
        extra=extra,
    )


def deserialize_event(data: dict[str, Any]) -> ServiceEvent:
    """Rebuild a ``ServiceEvent`` from a dict produced by :func:`serialize_event`."""
    get = data.get
    added_assets = [_deserialize_asset(item) for item in get("added_assets") or []]
    removed_asset_ids = list(get("removed_asset_ids") or [])
    return ServiceEvent(
        event_type=EventType(data["event_type"]),
        provider_type=ServiceProviderType(data["provider_type"]),
        provider_name=data["provider_name"],
        collection_id=data["collection_id"],
        collection_name=data["collection_name"],
        timestamp=_parse_dt(data["timestamp"]),
        added_assets=added_assets,
        removed_asset_ids=removed_asset_ids,
        added_count=int(get("added_count") or 0),
        removed_count=int(get("removed_count") or 0),
        old_name=get("old_name"),
        new_name=get("new_name"),
        old_shared=get("old_shared"),
        new_shared=get("new_shared"),
        extra=dict(get("extra") or {}),
    )


# ---------------------------------------------------------------------------
#
# Coalescing
# ---------------------------------------------------------------------------


def _as_utc(value: datetime) -> datetime:
    """Return *value* as an aware UTC datetime.

    Naive inputs are treated as already being UTC (the convention this module
    uses for datetimes read back from the database); aware inputs are
    converted to UTC.
    """
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def _added_ids(payload: dict[str, Any]) -> list[str]:
    """Return the ``id`` of every serialized asset under ``added_assets``.

    Entries that carry no ``id`` key are skipped.
    """
    return [a["id"] for a in payload.get("added_assets") or [] if "id" in a]


def _without_cancelled_ids(
    payload: dict[str, Any],
    event_type: str,
    cancelled_ids: set[Any],
) -> dict[str, Any]:
    """Strip asset IDs that were already netted out against a pending row.

    Used on the insert path so that a freshly created row carries only the IDs
    that did NOT cancel against the opposite pending row — the same rule the
    merge path applies.

    Args:
        payload: Serialized event (output of ``serialize_event``).
        event_type: ``"assets_added"`` or ``"assets_removed"``; any other
            value leaves the payload content unchanged.
        cancelled_ids: IDs held by the opposite pending row before coalescing.

    Returns:
        *payload* itself when there is nothing to strip, otherwise a shallow
        copy with the asset list and its matching count reduced.
    """
    if not cancelled_ids:
        return payload
    trimmed = dict(payload)
    if event_type == "assets_added":
        assets = list(trimmed.get("added_assets") or [])
        kept_assets = [a for a in assets if a.get("id") not in cancelled_ids]
        if len(kept_assets) != len(assets):
            trimmed["added_assets"] = kept_assets
            trimmed["added_count"] = len(kept_assets)
    elif event_type == "assets_removed":
        ids = list(trimmed.get("removed_asset_ids") or [])
        kept_ids = [rid for rid in ids if rid not in cancelled_ids]
        if len(kept_ids) != len(ids):
            trimmed["removed_asset_ids"] = kept_ids
            trimmed["removed_count"] = len(kept_ids)
    return trimmed


def _coalesce_assets_added(
    new_event: ServiceEvent,
    existing_added_row: DeferredDispatch | None,
    existing_removed_row: DeferredDispatch | None,
) -> tuple[str, DeferredDispatch | None, DeferredDispatch | None]:
    """Apply remove-then-re-add cancellation and add-then-add union.

    Mutates the passed rows in place (replacing ``event_payload`` with an
    updated copy, and setting ``status = "cancelled"`` on a removal row that
    ends up empty).

    Returns:
        ``(action, updated_added_row, updated_removed_row)`` where ``action``
        is ``"insert"`` (caller must create a new row for the IDs that were
        not cancelled) or ``"merge"`` (existing rows were updated in place —
        caller must ``session.add`` / delete them).
    """
    new_ids = [a.id for a in new_event.added_assets]
    new_ids_set = set(new_ids)

    # 1) A pending assets_removed row for the same IDs means this is a re-add:
    #    subtract the overlap from the removal.
    if existing_removed_row is not None:
        removed_ids = list(existing_removed_row.event_payload.get("removed_asset_ids") or [])
        kept = [rid for rid in removed_ids if rid not in new_ids_set]
        if len(kept) != len(removed_ids):
            payload = dict(existing_removed_row.event_payload)
            payload["removed_asset_ids"] = kept
            payload["removed_count"] = len(kept)
            existing_removed_row.event_payload = payload
            if not kept:
                # Every previously-removed ID is being re-added, so the whole
                # removal is cancelled. Mark it for the caller to delete.
                existing_removed_row.status = "cancelled"
            # The overlapping re-adds are accounted for by the cancellation;
            # only IDs that were NOT in the removal list still need to land
            # in an assets_added row. The set is built once, not per element.
            removed_set = set(removed_ids)
            new_ids = [nid for nid in new_ids if nid not in removed_set]
            new_ids_set = set(new_ids)
            if not new_ids:
                # Every new ID cancelled an existing remove: nothing to enqueue.
                return ("merge", None, existing_removed_row)

    if existing_added_row is None:
        return ("insert", None, existing_removed_row)

    # 2) Union with the existing assets_added row. The row's fire_at is left
    #    untouched here.
    payload = dict(existing_added_row.event_payload)
    existing_assets = list(payload.get("added_assets") or [])
    seen = {a.get("id") for a in existing_assets}
    new_serialized = serialize_event(new_event)
    for asset in new_serialized.get("added_assets") or []:
        asset_id = asset.get("id")
        if asset_id in new_ids_set and asset_id not in seen:
            existing_assets.append(asset)
            seen.add(asset_id)
    payload["added_assets"] = existing_assets
    payload["added_count"] = len(existing_assets)
    existing_added_row.event_payload = payload
    return ("merge", existing_added_row, existing_removed_row)


def _coalesce_assets_removed(
    new_event: ServiceEvent,
    existing_added_row: DeferredDispatch | None,
    existing_removed_row: DeferredDispatch | None,
) -> tuple[str, DeferredDispatch | None, DeferredDispatch | None]:
    """Mirror of :func:`_coalesce_assets_added` for removal events.

    Mutates the passed rows in place and returns
    ``(action, updated_added_row, updated_removed_row)`` with ``action`` being
    ``"insert"`` or ``"merge"``.
    """
    new_ids = list(new_event.removed_asset_ids)
    new_ids_set = set(new_ids)

    # 1) A pending assets_added row holding the same IDs means the asset was
    #    added and removed within the window: cancel both sides.
    if existing_added_row is not None:
        added = list(existing_added_row.event_payload.get("added_assets") or [])
        kept_assets = [a for a in added if a.get("id") not in new_ids_set]
        if len(kept_assets) != len(added):
            payload = dict(existing_added_row.event_payload)
            payload["added_assets"] = kept_assets
            payload["added_count"] = len(kept_assets)
            existing_added_row.event_payload = payload
            if not kept_assets:
                existing_added_row.status = "cancelled"
            # IDs added during the window are a wash and must not flow into
            # the assets_removed row.
            cancelled_ids = {a.get("id") for a in added if a.get("id") in new_ids_set}
            new_ids = [nid for nid in new_ids if nid not in cancelled_ids]
            new_ids_set = set(new_ids)
            if not new_ids:
                return ("merge", existing_added_row, None)

    if existing_removed_row is None:
        return ("insert", existing_added_row, None)

    # 2) Union with the existing assets_removed row. The row's fire_at is left
    #    untouched here.
    payload = dict(existing_removed_row.event_payload)
    existing_ids = list(payload.get("removed_asset_ids") or [])
    seen = set(existing_ids)
    for rid in new_ids:
        if rid not in seen:
            existing_ids.append(rid)
            seen.add(rid)
    payload["removed_asset_ids"] = existing_ids
    payload["removed_count"] = len(existing_ids)
    existing_removed_row.event_payload = payload
    return ("merge", existing_added_row, existing_removed_row)


async def _find_pending_asset_rows(
    session: AsyncSession,
    link_id: int,
    collection_id: str,
) -> tuple[DeferredDispatch | None, DeferredDispatch | None]:
    """Return ``(assets_added_row, assets_removed_row)`` pending for this link+collection.

    If several pending rows of the same type exist, the last one returned by
    the query wins.
    """
    result = await session.exec(
        select(DeferredDispatch).where(
            DeferredDispatch.link_id == link_id,
            DeferredDispatch.collection_id == collection_id,
            DeferredDispatch.status == "pending",
            DeferredDispatch.event_type.in_(["assets_added", "assets_removed"]),
        )
    )
    added_row: DeferredDispatch | None = None
    removed_row: DeferredDispatch | None = None
    for row in result.all():
        if row.event_type == "assets_added":
            added_row = row
        elif row.event_type == "assets_removed":
            removed_row = row
    return added_row, removed_row


async def _trim_queue_if_needed(
    session: AsyncSession,
    tracker_id: int,
) -> None:
    """Drop oldest pending rows beyond the per-tracker cap with a log row each.

    Loads the parent tracker (and its provider) so the emitted event_log rows
    carry ``tracker_name``/``provider_id``/``provider_name`` and slot into the
    dashboard's "by tracker" grouping instead of an unattributed bucket.
    """
    rows = (await session.exec(
        select(DeferredDispatch).where(
            DeferredDispatch.tracker_id == tracker_id,
            DeferredDispatch.status == "pending",
        ).order_by(DeferredDispatch.fire_at.asc(), DeferredDispatch.id.asc())
    )).all()
    overflow = len(rows) - _MAX_PENDING_PER_TRACKER
    if overflow <= 0:
        return

    _LOGGER.warning(
        "Deferred queue for tracker %d exceeds cap (%d > %d); dropping %d oldest",
        tracker_id, len(rows), _MAX_PENDING_PER_TRACKER, overflow,
    )
    tracker = await session.get(NotificationTracker, tracker_id)
    tracker_name = tracker.name if tracker else ""
    provider_id = tracker.provider_id if tracker else None
    provider_name = ""
    if tracker is not None and provider_id is not None:
        provider = await session.get(ServiceProvider, provider_id)
        if provider is not None:
            provider_name = provider.name

    # Rows are sorted oldest-first, so the overflow prefix is what gets dropped.
    for row in rows[:overflow]:
        await _mark_dropped(
            session, row,
            tracker_name=tracker_name,
            provider_id=provider_id,
            provider_name=provider_name,
            reason="queue_overflow",
        )


# ---------------------------------------------------------------------------
# Enqueue (called from dispatch sites when gate returns QUIET_HOURS)
# ---------------------------------------------------------------------------


async def defer_event(
    session: AsyncSession,
    *,
    event: ServiceEvent,
    user_id: int | None,
    tracker_id: int,
    link_id: int,
    event_log_id: int | None,
    fire_at: datetime,
) -> str:
    """Persist a deferred dispatch (or coalesce into an existing one).

    Caller is responsible for committing the session.

    Args:
        session: Open session the rows are added to / deleted from.
        event: The suppressed event.
        user_id: Owner stored on the new row.
        tracker_id: Tracker the row belongs to (also the queue-cap scope).
        link_id: Link the event was suppressed for (coalescing scope).
        event_log_id: Original event_log row, referenced by follow-up logs.
        fire_at: When the row becomes due; naive values are taken as UTC.

    Returns one of:

    * ``"inserted"`` — a fresh DeferredDispatch row was created.
    * ``"merged"`` — coalesced into an existing row (union or partial cancel).
    * ``"cancelled"`` — the new event fully cancelled an existing pending one
      (add-then-remove or remove-then-readd of the same asset IDs). Both sides
      are gone after this call.
    * ``"non_deferrable"`` — event type is wall-clock; caller should drop it
      with a ``"suppressed_quiet_hours_nondeferrable"`` event_log row.
    """
    event_type = event.event_type.value
    if not is_deferrable(event_type):
        return "non_deferrable"

    fire_at_utc = _as_utc(fire_at)
    payload = serialize_event(event)

    # Asset events get set-merging across the same link+collection. Everything
    # else just gets a new row — those events aren't naturally cancellable.
    if event_type in ("assets_added", "assets_removed"):
        added_row, removed_row = await _find_pending_asset_rows(
            session, link_id, event.collection_id,
        )
        # Snapshot the opposite side's IDs BEFORE coalescing rewrites the
        # rows: any of the new event's IDs found here are netted out and must
        # not be re-announced by a freshly inserted row.
        if event_type == "assets_added":
            opposite_ids = (
                set(removed_row.event_payload.get("removed_asset_ids") or [])
                if removed_row is not None else set()
            )
            action, upd_added, upd_removed = _coalesce_assets_added(
                event, added_row, removed_row,
            )
        else:
            opposite_ids = (
                set(_added_ids(added_row.event_payload))
                if added_row is not None else set()
            )
            action, upd_added, upd_removed = _coalesce_assets_removed(
                event, added_row, removed_row,
            )

        # Apply pending updates. ``status="cancelled"`` rows are deleted
        # outright so the drain doesn't see them.
        fully_cancelled = False
        for row in (upd_added, upd_removed):
            if row is None:
                continue
            if row.status == "cancelled":
                await session.delete(row)
                fully_cancelled = True
            else:
                session.add(row)

        if action != "insert":
            # action == "merge" — either updated existing rows or fully
            # cancelled. "cancelled" only when nothing survived on either side.
            nothing_left = (
                fully_cancelled
                and (upd_added is None or upd_added.status == "cancelled")
                and (upd_removed is None or upd_removed.status == "cancelled")
            )
            return "cancelled" if nothing_left else "merged"

        payload = _without_cancelled_ids(payload, event_type, opposite_ids)

    # Fresh row: either a non-asset event (never coalesced) or the remainder
    # of an asset event that had no same-type pending row to merge into.
    session.add(DeferredDispatch(
        user_id=user_id,
        tracker_id=tracker_id,
        link_id=link_id,
        event_log_id=event_log_id,
        event_type=event_type,
        collection_id=event.collection_id,
        event_payload=payload,
        fire_at=fire_at_utc,
        status="pending",
    ))
    await _trim_queue_if_needed(session, tracker_id)
    return "inserted"


# ---------------------------------------------------------------------------
# Drain (called by APScheduler date job at quiet_hours_end_at)
# ---------------------------------------------------------------------------


async def drain_deferred_due(now: datetime | None = None) -> dict[str, int]:
    """Dispatch all pending DeferredDispatch rows whose ``fire_at <= now``.

    Re-resolves link/target/configs against current DB state so config edits
    between suppression and drain time take effect.

    Args:
        now: Cut-off instant; defaults to the current UTC time. Naive values
            are taken as UTC.

    Returns:
        Counters keyed ``fired`` / ``dropped`` / ``rescheduled`` / ``errors``.

    Implementation note: rows are *re-fetched* by id inside each per-tracker
    session rather than carried across session boundaries. Carrying a row
    instance to a new session and calling ``session.add(row)`` on a detached
    PK-bearing instance triggers an INSERT (collision with the existing PK)
    on flush — a class of bug that's invisible until the first session closes,
    hence the up-front re-fetch.
    """
    now_utc = _as_utc(now or datetime.now(timezone.utc))

    stats = {"fired": 0, "dropped": 0, "rescheduled": 0, "errors": 0}
    engine = get_engine()

    async with AsyncSession(engine) as session:
        # Only pull the row identity + grouping key. Loading the full ORM
        # objects in a session that's about to close just wastes work — we
        # re-fetch fresh attached instances in the per-tracker session below.
        ident_rows = (await session.exec(
            select(DeferredDispatch.id, DeferredDispatch.tracker_id).where(
                DeferredDispatch.status == "pending",
                DeferredDispatch.fire_at <= now_utc,
            ).order_by(DeferredDispatch.fire_at.asc())
        )).all()

    if not ident_rows:
        _LOGGER.debug("drain_deferred_due: no pending rows due")
        return stats

    _LOGGER.info(
        "Draining %d deferred dispatches due at %s",
        len(ident_rows), now_utc.isoformat(),
    )

    # Group by tracker so a single per-tracker session can re-fetch its rows
    # (attached) and re-resolve link state once.
    ids_by_tracker: dict[int, list[int]] = {}
    for row_id, tracker_id in ident_rows:
        if row_id is None:
            continue
        ids_by_tracker.setdefault(tracker_id, []).append(row_id)

    from .watcher import _get_telegram_caches
    from .http_session import get_http_session

    url_cache, asset_cache = await _get_telegram_caches()
    shared_session = await get_http_session()
    dispatcher = NotificationDispatcher(
        url_cache=url_cache,
        asset_cache=asset_cache,
        session=shared_session,
    )

    for tracker_id, row_ids in ids_by_tracker.items():
        async with AsyncSession(engine) as session:
            tracker = await session.get(NotificationTracker, tracker_id)
            # Re-fetch rows freshly attached to THIS session. The status is
            # re-checked so a row already settled by an overlapping drain is
            # not dispatched a second time, and the oldest-first order of the
            # identity query is preserved.
            rows = (await session.exec(
                select(DeferredDispatch).where(
                    DeferredDispatch.id.in_(row_ids),
                    DeferredDispatch.status == "pending",
                ).order_by(DeferredDispatch.fire_at.asc(), DeferredDispatch.id.asc())
            )).all()

            if tracker is None or not tracker.enabled:
                # Tracker deleted or disabled between defer and drain — drop
                # all pending rows for it. Disable matches the live-path
                # invariant (watcher / webhooks / scheduled_dispatch all
                # short-circuit when ``tracker.enabled`` is False).
                reason = "tracker_removed" if tracker is None else "tracker_disabled_after_defer"
                for row in rows:
                    await _mark_dropped(
                        session, row, tracker=tracker, reason=reason,
                    )
                    stats["dropped"] += 1
                await session.commit()
                continue

            provider = await session.get(ServiceProvider, tracker.provider_id)
            provider_config = dict(provider.config) if provider else {}
            provider_id = provider.id if provider else tracker.provider_id
            provider_name = provider.name if provider else ""
            app_tz = await get_app_timezone(session)

            # Reload current link state. Broadcast links emit ONE entry per
            # child target sharing the SAME parent ``link_id`` — a plain
            # ``{link_id: ld}`` dict would silently drop N-1 children. The
            # drain dispatches to every expanded entry for the parent.
            link_data = await load_link_data(session, tracker_id)
            link_by_id: dict[int, list[dict[str, Any]]] = {}
            for ld in link_data:
                key = ld.get("link_id")
                if key is None:
                    continue
                link_by_id.setdefault(key, []).append(ld)

            for row in rows:
                try:
                    await _process_row(
                        session, row, tracker, provider_id, provider_name,
                        provider_config, app_tz, link_by_id, dispatcher, stats,
                    )
                except Exception as err:  # noqa: BLE001 — keep draining other rows
                    _LOGGER.exception(
                        "Drain failed for deferred dispatch id=%s: %s", row.id, err,
                    )
                    stats["errors"] += 1
            await session.commit()

    _LOGGER.info("Drain complete: %s", stats)
    return stats


async def _mark_dropped(
    session: AsyncSession,
    row: DeferredDispatch,
    *,
    tracker: NotificationTracker | None = None,
    tracker_name: str = "",
    provider_id: int | None = None,
    provider_name: str = "",
    reason: str,
) -> None:
    """Record a drop on the deferred row and emit a follow-up event_log entry.

    ``tracker``/``tracker_name``/``provider_id``/``provider_name`` populate the
    new event_log row's owner/provider columns so the dashboard "by tracker"
    grouping works for the drop path. When ``tracker`` is given it fills in a
    missing ``tracker_name`` and ``provider_id``. The session is not committed
    here.
    """
    if tracker is not None:
        tracker_name = tracker_name or tracker.name
        if provider_id is None:
            provider_id = tracker.provider_id

    payload = row.event_payload if isinstance(row.event_payload, dict) else {}
    # ``or 0`` guards against a key that is present but null, which would
    # otherwise make ``int(None)`` raise.
    assets_count = int(payload.get("added_count") or 0) or int(payload.get("removed_count") or 0)

    row.status = "dropped"
    row.fired_at = datetime.now(timezone.utc)
    session.add(row)
    session.add(EventLog(
        user_id=row.user_id,
        tracker_id=row.tracker_id,
        tracker_name=tracker_name,
        provider_id=provider_id,
        provider_name=provider_name,
        event_type=row.event_type,
        collection_id=row.collection_id,
        collection_name=payload.get("collection_name", ""),
        assets_count=assets_count,
        details={
            "dispatch_status": "deferred_then_dropped",
            "reason": reason,
            "original_event_log_id": row.event_log_id,
            "provider_type": payload.get("provider_type", ""),
        },
    ))


async def _process_row(
    session: AsyncSession,
    row: DeferredDispatch,
    tracker: NotificationTracker,
    provider_id: int,
    provider_name: str,
    provider_config: dict[str, Any],
    app_tz: str,
    link_by_id: dict[int, list[dict[str, Any]]],
    dispatcher: NotificationDispatcher,
    stats: dict[str, int],
) -> None:
    """Drain a single row: re-resolve link, re-evaluate gate, dispatch.

    ``link_by_id`` maps parent link_id → list of expanded entries (one per
    broadcast child, or a single-element list for regular targets). Every
    entry produces its own target_config so a broadcast deferred row fans
    out to all current children at drain time.

    The row ends up ``fired``, ``dropped`` or — when quiet hours still apply —
    stays pending with a new ``fire_at``. ``stats`` is updated in place; the
    session is not committed here.
    """
    expanded = link_by_id.get(row.link_id)
    if not expanded:
        # Link removed/disabled between defer and drain.
        await _mark_dropped(
            session, row,
            tracker=tracker, provider_id=provider_id, provider_name=provider_name,
            reason="link_removed",
        )
        stats["dropped"] += 1
        return

    # Every expanded entry for a parent link shares the same tracking_config,
    # so the gate decision and ``apply_tracking_display_filters`` shaping are
    # made once. Only the target_configs differ across children.
    tc = expanded[0].get("tracking_config")
    event = deserialize_event(row.event_payload)

    if tc is not None:
        outcome = evaluate_event_gate(event, tc, app_tz)
        if outcome.reason is GateReason.EVENT_TYPE_DISABLED:
            await _mark_dropped(
                session, row,
                tracker=tracker, provider_id=provider_id, provider_name=provider_name,
                reason="event_type_disabled_after_defer",
            )
            stats["dropped"] += 1
            return
        if outcome.reason is GateReason.QUIET_HOURS and outcome.quiet_hours_end_at is not None:
            # Still suppressed: push the row to the new window end. Stored in
            # UTC, matching what ``defer_event`` writes for new rows.
            row.fire_at = _as_utc(outcome.quiet_hours_end_at)
            session.add(row)
            stats["rescheduled"] += 1
            try:
                from .scheduler import schedule_deferred_drain
                schedule_deferred_drain(outcome.quiet_hours_end_at)
            except Exception:  # noqa: BLE001
                _LOGGER.exception(
                    "Failed to reschedule drain for %s", outcome.quiet_hours_end_at,
                )
            return

    shaped = apply_tracking_display_filters(event, tc)
    if shaped is None:
        # ``notify_favorites_only`` (or another display filter) dropped every
        # asset from the event. This is routed through the same
        # "dropped + event_log" pathway as link_removed so the dashboard
        # shows why.
        await _mark_dropped(
            session, row,
            tracker=tracker, provider_id=provider_id, provider_name=provider_name,
            reason="filtered_after_defer",
        )
        stats["dropped"] += 1
        return

    # Build one target_config per expanded child (regular targets → length 1;
    # broadcast → length N children).
    target_configs: list[TargetConfig] = []
    for ld in expanded:
        tmpl = ld.get("template_config")
        target_configs.append(TargetConfig(
            type=ld["target_type"],
            config=ld["target_config"],
            template_slots=ld.get("template_slots"),
            date_format=tmpl.date_format if tmpl else "%d.%m.%Y, %H:%M UTC",
            date_only_format=(tmpl.date_only_format if tmpl and tmpl.date_only_format else "%d.%m.%Y"),
            provider_api_key=provider_config.get("api_key") or provider_config.get("api_token"),
            provider_internal_url=provider_config.get("url", ""),
            provider_external_url=provider_config.get("external_domain", "") or provider_config.get("url", ""),
            receivers=ld["receivers"],
        ))

    # Per-row timeout — a single hanging remote call (Telegram outage, slow
    # SMTP) must not stall the rest of the queue.
    try:
        results = await asyncio.wait_for(
            dispatcher.dispatch(shaped, target_configs),
            timeout=_DRAIN_DISPATCH_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        _LOGGER.warning(
            "Drain dispatch for row %s timed out after %ds",
            row.id, _DRAIN_DISPATCH_TIMEOUT_SECONDS,
        )
        results = [{"success": False, "error": f"timeout after {_DRAIN_DISPATCH_TIMEOUT_SECONDS}s"}]

    # One successful target is enough to count the row as delivered.
    success = any(r.get("success") for r in results)
    fired_at = datetime.now(timezone.utc)
    row.status = "fired" if success else "dropped"
    row.fired_at = fired_at
    session.add(row)

    if success:
        stats["fired"] += 1
        # ``created_at`` read back from the DB may be naive; normalise it so
        # the subtraction against the aware ``fired_at`` cannot raise
        # TypeError after the notification has already gone out.
        deferred_for = int((fired_at - _as_utc(row.created_at)).total_seconds())
        session.add(EventLog(
            user_id=row.user_id,
            tracker_id=row.tracker_id,
            tracker_name=tracker.name,
            provider_id=provider_id,
            provider_name=provider_name,
            event_type=row.event_type,
            collection_id=row.collection_id,
            collection_name=event.collection_name,
            assets_count=event.added_count or event.removed_count or 0,
            details={
                "dispatch_status": "delivered_after_quiet_hours",
                "original_event_log_id": row.event_log_id,
                "deferred_for_seconds": deferred_for,
                "provider_type": event.provider_type.value,
            },
        ))
    else:
        stats["dropped"] += 1
        first_err = next((r.get("error") for r in results if not r.get("success")), "unknown")
        session.add(EventLog(
            user_id=row.user_id,
            tracker_id=row.tracker_id,
            tracker_name=tracker.name,
            provider_id=provider_id,
            provider_name=provider_name,
            event_type=row.event_type,
            collection_id=row.collection_id,
            collection_name=event.collection_name,
            assets_count=event.added_count or event.removed_count or 0,
            details={
                "dispatch_status": "deferred_then_failed",
                "reason": str(first_err)[:200],
                "original_event_log_id": row.event_log_id,
                "provider_type": event.provider_type.value,
            },
        ))


# ---------------------------------------------------------------------------
# Startup: reschedule pending drain jobs found in the DB
# ---------------------------------------------------------------------------


async def load_pending_drain_jobs() -> int:
    """At startup, scan ``DeferredDispatch`` for pending rows and (re)schedule drains.

    Rows whose ``fire_at`` already passed share a single immediate-fire job —
    one drain picks up every due row, so one job per past-due timestamp would
    only produce redundant, overlapping drains. The rest get one job per
    distinct ``fire_at``.

    NOTE(review): any minute-rounding or job de-duplication is assumed to
    happen inside ``schedule_deferred_drain`` — confirm against the scheduler.

    Returns:
        The number of drain jobs requested from the scheduler.
    """
    from .scheduler import schedule_deferred_drain

    engine = get_engine()
    async with AsyncSession(engine) as session:
        rows = (await session.exec(
            select(DeferredDispatch.fire_at).where(
                DeferredDispatch.status == "pending",
            )
        )).all()

    if not rows:
        return 0

    unique_fire_ats = {_as_utc(fa) for fa in rows if isinstance(fa, datetime)}

    now_utc = datetime.now(timezone.utc)
    past_due = [fa for fa in unique_fire_ats if fa <= now_utc]
    to_schedule = sorted(fa for fa in unique_fire_ats if fa > now_utc)
    if past_due:
        # Collapse every overdue timestamp into one job, using the most
        # recent overdue value as its nominal run time.
        to_schedule.insert(0, max(past_due))

    for fa in to_schedule:
        schedule_deferred_drain(fa)

    _LOGGER.info(
        "Loaded %d pending deferred dispatches; scheduled %d drain job(s)",
        len(rows), len(to_schedule),
    )
    return len(to_schedule)