Files
notify-bridge/packages/server/src/notify_bridge_server/services/deferred_dispatch.py
T
alexei.dolgolyov 6a8f374678 feat: observability, per-receiver Telegram options, oversized-video fallback
Operability:
- Correlation IDs end-to-end: shared dispatch_id between log lines and
  EventLog rows (event/watcher/scheduled/deferred/action/HA/command paths)
  and a new X-Request-Id middleware that normalizes inbound ids and binds
  request_id into log context.
- dispatch_summary block merged into EventLog.details: per-target
  success/failure counts plus Telegram media delivered/skipped/failed and
  truncated error lists, so partial outcomes surface in the UI.
- Diagnostic mode: admin can flip one module to DEBUG for a bounded
  window with auto-revert (in-memory only; setup_logging() resets on
  boot, lifespan reverts on shutdown). New /diagnostic-mode endpoints
  plus DiagnosticsCassette UI on the settings page.

Telegram:
- Per-receiver options: disable_notification (silent send) and
  message_thread_id (forum-topic routing), wired through the dispatcher
  via a ContextVar so all four send sites (sendMessage / sendPhoto-Video-
  Document / sendMediaGroup / cache-hit POST) pick them up.
- send_large_videos_as_documents target setting: bypass the 50 MB
  sendVideo cap by falling back to sendDocument for oversized videos.
- sendMediaGroup byte-budget enforcement (TELEGRAM_MAX_GROUP_TOTAL_BYTES,
  45 MB) with per-item fallback on chunk failure so a stale file_id no
  longer silently drops a cached asset.

Tests:
- New: diagnostic_mode, dispatch_summary, request_correlation,
  telegram_media_group_partial, telegram_per_send_options.

Docs:
- .claude/reviews/: six-axis production-readiness review of v0.8.1.
- .claude/docs/functional-review-2026-05-28.md: focused review of
  Telegram/Immich/logging subsystems.
2026-05-28 15:19:31 +03:00

845 lines
34 KiB
Python

"""Deferred-dispatch infrastructure for quiet-hours notifications.
When ``evaluate_event_gate`` returns ``QUIET_HOURS`` for a deferrable event
type, the dispatch site calls :func:`defer_event` instead of dropping. That
either inserts a new ``DeferredDispatch`` row or coalesces the event into an
existing pending row for the same ``(link_id, collection_id)`` — asset add
+ matching remove cancels out, asset add + asset add merges set-union.
An APScheduler one-shot ``date`` job per quiet-window-end fires
:func:`drain_deferred_due` which:
1. Re-resolves each pending row's link/target/configs against current state.
2. Drops rows whose link/target was deleted or disabled in the meantime.
3. Re-checks quiet hours (in case the user extended the window mid-flight)
and pushes ``fire_at`` to the new end if still suppressed.
4. Dispatches via the existing ``NotificationDispatcher``.
5. Writes a follow-up ``event_log`` row referencing the original
``event_log_id`` so the dashboard shows "delivered late".
Wall-clock event types (``scheduled_message``) are explicitly NOT in
``_DEFERRABLE_EVENT_TYPES`` — delivering a "good morning" memory at 3 pm is
worse than dropping it. Those keep the legacy drop-on-quiet-hours behavior.
"""
from __future__ import annotations
import asyncio
import dataclasses
import logging
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.orm.attributes import flag_modified
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.log_context import (
bind_log_context,
ensure_dispatch_id,
enrich_details_with_correlation,
)
from notify_bridge_core.models.events import EventType, ServiceEvent
from notify_bridge_core.models.media import MediaAsset, MediaType
from notify_bridge_core.notifications.dispatcher import (
NotificationDispatcher,
TargetConfig,
)
from notify_bridge_core.providers.base import ServiceProviderType
from ..database.engine import get_engine
from ..database.models import (
DeferredDispatch,
EventLog,
NotificationTracker,
ServiceProvider,
)
from .dispatch_helpers import (
GateReason,
apply_tracking_display_filters,
evaluate_event_gate,
get_app_timezone,
load_link_data,
resolve_provider_credential,
)
from .dispatch_summary import summarize_dispatch_results
_LOGGER = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Policy
# ---------------------------------------------------------------------------

# Event types that are queued (rather than discarded) when quiet hours
# suppress them. Each one describes a state change that is still worth
# reporting once the window ends: a photo was added, a PR was opened, the UPS
# went on battery. Wall-clock types such as ``scheduled_message`` are left out
# on purpose: a "good morning" delivered at 3 pm is wrong, so dropping it is
# more correct than delivering it late.
_DEFERRABLE_EVENT_TYPES: frozenset[str] = frozenset((
    # Immich
    "assets_added",
    "assets_removed",
    "collection_renamed",
    "collection_deleted",
    "sharing_changed",
    # Gitea
    "push",
    "issue_opened",
    "issue_closed",
    "issue_commented",
    "pr_opened",
    "pr_closed",
    "pr_merged",
    "pr_commented",
    "release_published",
    # Planka
    "card_created",
    "card_updated",
    "card_moved",
    "card_deleted",
    "card_commented",
    "comment_updated",
    "board_created",
    "board_updated",
    "board_deleted",
    "list_created",
    "list_updated",
    "list_deleted",
    "attachment_created",
    "card_label_added",
    "task_completed",
    # Generic webhook
    "webhook_received",
    # NUT (UPS)
    "ups_online",
    "ups_on_battery",
    "ups_low_battery",
    "ups_battery_restored",
    "ups_comms_lost",
    "ups_comms_restored",
    "ups_replace_battery",
    "ups_overload",
    # Home Assistant: state changes and automations are change-driven, so the
    # underlying state is still relevant after the quiet window.
    "ha_state_changed",
    "ha_automation_triggered",
    "ha_service_called",
    "ha_event_fired",
))

# Upper bound on pending rows per tracker. A short, misconfigured quiet window
# combined with a chatty upstream (e.g. a mass-imported album) could otherwise
# grow the queue without limit. When the cap is exceeded the oldest rows are
# dropped first, so recent events still get delivered.
_MAX_PENDING_PER_TRACKER = 1000

# Timeout, in seconds, applied to each row's dispatch during a drain. It keeps
# one hanging Telegram/SMTP call from stalling the rest of the queue. The
# value is generous because legitimate large media uploads can take minutes.
_DRAIN_DISPATCH_TIMEOUT_SECONDS = 120
def is_deferrable(event_type: str) -> bool:
    """Report whether *event_type* is queued instead of dropped in quiet hours.

    The only criterion is membership in ``_DEFERRABLE_EVENT_TYPES``.
    """
    deferrable = event_type in _DEFERRABLE_EVENT_TYPES
    return deferrable
# ---------------------------------------------------------------------------
# ServiceEvent (de)serialization
# ---------------------------------------------------------------------------
#
# JSON column stores ``dataclasses.asdict(event)`` plus a normalisation pass
# for datetimes (ISO strings) and enums (string values). Round-trip via the
# reverse pass below.
def _normalize_for_json(value: Any) -> Any:
    """Recursively rewrite *value* into a JSON-friendly structure.

    * ``datetime`` becomes its ISO-8601 string.
    * ``EventType`` / ``MediaType`` / ``ServiceProviderType`` members become
      their ``.value``.
    * dict values are normalised (keys are left untouched).
    * lists and tuples both become lists of normalised items.
    * anything else is returned unchanged.
    """
    # The datetime test runs first, ahead of the enum test.
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (EventType, MediaType, ServiceProviderType)):
        return value.value
    if isinstance(value, dict):
        normalized: dict[Any, Any] = {}
        for key, item in value.items():
            normalized[key] = _normalize_for_json(item)
        return normalized
    if isinstance(value, (list, tuple)):
        return list(map(_normalize_for_json, value))
    return value
def serialize_event(event: ServiceEvent) -> dict[str, Any]:
    """Turn a ``ServiceEvent`` into the JSON-safe dict stored in ``DeferredDispatch.event_payload``.

    The event is flattened with ``dataclasses.asdict`` and the result is run
    through ``_normalize_for_json``.
    """
    flattened = dataclasses.asdict(event)
    return _normalize_for_json(flattened)
def _parse_dt(s: Any) -> datetime:
    """Return *s* as a ``datetime``.

    An existing ``datetime`` is passed through untouched; any other value is
    handed to ``datetime.fromisoformat``.
    """
    return s if isinstance(s, datetime) else datetime.fromisoformat(s)
def _deserialize_asset(data: dict[str, Any]) -> MediaAsset:
    """Rebuild a ``MediaAsset`` from its serialized dict form.

    ``id``, ``type``, ``filename`` and ``created_at`` are required keys; the
    remaining fields fall back to ``None`` (or an empty list / dict for
    ``tags`` / ``extra``) when missing or null.
    """
    # Plain pass-through fields that default to None when absent.
    passthrough = {
        field: data.get(field)
        for field in (
            "owner_name",
            "description",
            "thumbnail_url",
            "preview_url",
            "full_url",
        )
    }
    return MediaAsset(
        id=data["id"],
        type=MediaType(data["type"]),
        filename=data["filename"],
        created_at=_parse_dt(data["created_at"]),
        tags=list(data.get("tags") or []),
        extra=dict(data.get("extra") or {}),
        **passthrough,
    )
def deserialize_event(data: dict[str, Any]) -> ServiceEvent:
    """Rebuild a ``ServiceEvent`` from a dict produced by :func:`serialize_event`.

    The identifying fields (event/provider type, provider name, collection
    id/name, timestamp) are required; asset lists, counts and ``extra`` fall
    back to empty / zero, and the rename / sharing fields to ``None``.
    """
    added_assets = [
        _deserialize_asset(raw_asset)
        for raw_asset in data.get("added_assets") or []
    ]
    # Optional scalar fields copied verbatim (None when absent).
    optional_fields = {
        field: data.get(field)
        for field in ("old_name", "new_name", "old_shared", "new_shared")
    }
    return ServiceEvent(
        event_type=EventType(data["event_type"]),
        provider_type=ServiceProviderType(data["provider_type"]),
        provider_name=data["provider_name"],
        collection_id=data["collection_id"],
        collection_name=data["collection_name"],
        timestamp=_parse_dt(data["timestamp"]),
        added_assets=added_assets,
        removed_asset_ids=list(data.get("removed_asset_ids") or []),
        added_count=int(data.get("added_count") or 0),
        removed_count=int(data.get("removed_count") or 0),
        extra=dict(data.get("extra") or {}),
        **optional_fields,
    )
# ---------------------------------------------------------------------------
# Coalescing
# ---------------------------------------------------------------------------
def _added_ids(payload: dict[str, Any]) -> list[str]:
    """Collect the ``id`` of every entry in ``payload["added_assets"]``.

    Entries without an ``id`` key are skipped; a missing or null
    ``added_assets`` yields an empty list.
    """
    collected: list[str] = []
    for asset in payload.get("added_assets") or []:
        if "id" in asset:
            collected.append(asset["id"])
    return collected
def _coalesce_assets_added(
    new_event: ServiceEvent,
    existing_added_row: DeferredDispatch | None,
    existing_removed_row: DeferredDispatch | None,
) -> tuple[str, DeferredDispatch | None, DeferredDispatch | None]:
    """Apply remove-then-re-add cancellation and add-then-add union.

    Args:
        new_event: The incoming ``assets_added`` event.
        existing_added_row: Pending ``assets_added`` row for the same
            link+collection, if any. Mutated in place on a union.
        existing_removed_row: Pending ``assets_removed`` row for the same
            link+collection, if any. Mutated in place when some of its IDs are
            re-added; its ``status`` is set to ``"cancelled"`` when every ID
            it held was re-added.

    Returns:
        ``(action, added_row, removed_row)``. ``action`` is ``"insert"`` when
        the caller must create a new ``assets_added`` row, or ``"merge"`` when
        everything was folded into (or cancelled against) the existing rows.
        The returned rows are the ones the caller must ``session.add`` or, if
        marked ``"cancelled"``, delete.
    """
    new_ids = [a.id for a in new_event.added_assets]
    new_ids_set = set(new_ids)

    # 1) A pending removal of the same IDs means this event is a re-add:
    #    subtract the overlap from the removal row.
    if existing_removed_row is not None:
        removed_ids = list(existing_removed_row.event_payload.get("removed_asset_ids") or [])
        kept = [rid for rid in removed_ids if rid not in new_ids_set]
        if len(kept) != len(removed_ids):
            payload = dict(existing_removed_row.event_payload)
            payload["removed_asset_ids"] = kept
            payload["removed_count"] = len(kept)
            existing_removed_row.event_payload = payload
            # Belt-and-braces: SQLAlchemy's mutation tracker sometimes misses
            # JSON-typed reassignments depending on dialect / column config.
            # An explicit flag_modified guarantees the dirty bit is set for
            # the upcoming flush.
            flag_modified(existing_removed_row, "event_payload")
            if not kept:
                # Every previously-removed ID is being re-added, so the whole
                # removal is cancelled. Mark it for the caller to delete.
                existing_removed_row.status = "cancelled"
            # The re-added IDs are accounted for by the cancellation; only the
            # IDs that were NOT in the removal list still need to be queued.
            # Build the lookup set once rather than once per candidate ID.
            removed_lookup = set(removed_ids)
            new_ids = [nid for nid in new_ids if nid not in removed_lookup]
            new_ids_set = set(new_ids)

    if not new_ids:
        # Every added ID cancelled an existing removal: nothing to enqueue.
        return ("merge", None, existing_removed_row)
    if existing_added_row is None:
        return ("insert", None, existing_removed_row)

    # 2) Union with the existing assets_added row. Its fire_at is left alone,
    #    so the earliest fire_at wins.
    payload = dict(existing_added_row.event_payload)
    existing_assets = list(payload.get("added_assets") or [])
    seen = {a.get("id") for a in existing_assets}
    new_serialized = serialize_event(new_event)
    for a in new_serialized.get("added_assets") or []:
        asset_id = a.get("id")
        if asset_id in new_ids_set and asset_id not in seen:
            existing_assets.append(a)
            seen.add(asset_id)
    payload["added_assets"] = existing_assets
    payload["added_count"] = len(existing_assets)
    existing_added_row.event_payload = payload
    flag_modified(existing_added_row, "event_payload")
    return ("merge", existing_added_row, existing_removed_row)
def _coalesce_assets_removed(
    new_event: ServiceEvent,
    existing_added_row: DeferredDispatch | None,
    existing_removed_row: DeferredDispatch | None,
) -> tuple[str, DeferredDispatch | None, DeferredDispatch | None]:
    """Fold an ``assets_removed`` event into the pending rows.

    Counterpart of :func:`_coalesce_assets_added`: IDs that are still waiting
    in a pending ``assets_added`` row are struck from it (add-then-remove is
    a wash), and whatever is left is unioned into the pending
    ``assets_removed`` row. Returns ``(action, added_row, removed_row)`` with
    ``action`` being ``"insert"`` or ``"merge"``.
    """
    pending_removals = list(new_event.removed_asset_ids)
    removal_lookup = set(pending_removals)

    # Step 1: strike add-then-remove pairs from the pending assets_added row.
    if existing_added_row is not None:
        queued_assets = list(existing_added_row.event_payload.get("added_assets") or [])
        survivors = []
        wash_ids = set()
        for asset in queued_assets:
            asset_id = asset.get("id")
            if asset_id in removal_lookup:
                wash_ids.add(asset_id)
            else:
                survivors.append(asset)
        if len(survivors) != len(queued_assets):
            trimmed = dict(existing_added_row.event_payload)
            trimmed["added_assets"] = survivors
            trimmed["added_count"] = len(survivors)
            existing_added_row.event_payload = trimmed
            flag_modified(existing_added_row, "event_payload")
            if not survivors:
                # Nothing left to announce: the caller deletes this row.
                existing_added_row.status = "cancelled"
        # IDs added during the window never need a removal notice.
        pending_removals = [rid for rid in pending_removals if rid not in wash_ids]

    if not pending_removals:
        return ("merge", existing_added_row, None)
    if existing_removed_row is None:
        return ("insert", existing_added_row, None)

    # Step 2: union into the pending assets_removed row; its fire_at is not
    # touched, so the earliest fire_at wins.
    merged_payload = dict(existing_removed_row.event_payload)
    merged_ids = list(merged_payload.get("removed_asset_ids") or [])
    known = set(merged_ids)
    for rid in pending_removals:
        if rid in known:
            continue
        merged_ids.append(rid)
        known.add(rid)
    merged_payload["removed_asset_ids"] = merged_ids
    merged_payload["removed_count"] = len(merged_ids)
    existing_removed_row.event_payload = merged_payload
    flag_modified(existing_removed_row, "event_payload")
    return ("merge", existing_added_row, existing_removed_row)
async def _find_pending_asset_rows(
    session: AsyncSession,
    link_id: int,
    collection_id: str,
) -> tuple[DeferredDispatch | None, DeferredDispatch | None]:
    """Look up the pending asset rows for one link+collection.

    Returns ``(assets_added_row, assets_removed_row)``; either slot is
    ``None`` when no pending row of that type exists. If the query yields
    several rows of the same type, the last one iterated is returned.
    """
    query = select(DeferredDispatch).where(
        DeferredDispatch.link_id == link_id,
        DeferredDispatch.collection_id == collection_id,
        DeferredDispatch.status == "pending",
        DeferredDispatch.event_type.in_(["assets_added", "assets_removed"]),
    )
    result = await session.exec(query)
    # Later rows overwrite earlier ones of the same event type.
    by_type: dict[str, DeferredDispatch] = {}
    for candidate in result.all():
        by_type[candidate.event_type] = candidate
    return by_type.get("assets_added"), by_type.get("assets_removed")
async def _trim_queue_if_needed(
    session: AsyncSession,
    tracker_id: int,
) -> None:
    """Enforce ``_MAX_PENDING_PER_TRACKER`` by dropping the oldest pending rows.

    Pending rows are ordered by ``fire_at`` then ``id``; anything beyond the
    cap is handed to ``_mark_dropped`` with reason ``"queue_overflow"``. The
    tracker (and its provider) are loaded so the drop log rows carry
    ``tracker_name`` / ``provider_id`` / ``provider_name`` and group correctly
    in the dashboard's "by tracker" view instead of an unattributed bucket.
    """
    pending_query = (
        select(DeferredDispatch)
        .where(
            DeferredDispatch.tracker_id == tracker_id,
            DeferredDispatch.status == "pending",
        )
        .order_by(DeferredDispatch.fire_at.asc(), DeferredDispatch.id.asc())
    )
    pending = (await session.exec(pending_query)).all()
    excess = len(pending) - _MAX_PENDING_PER_TRACKER
    if excess <= 0:
        return

    _LOGGER.warning(
        "Deferred queue for tracker %d exceeds cap (%d > %d); dropping %d oldest",
        tracker_id, len(pending), _MAX_PENDING_PER_TRACKER, excess,
    )

    # Resolve attribution once; it is identical for every dropped row.
    tracker = await session.get(NotificationTracker, tracker_id)
    if tracker:
        tracker_name = tracker.name
        provider_id = tracker.provider_id
    else:
        tracker_name = ""
        provider_id = None
    provider_name = ""
    if tracker is not None and provider_id is not None:
        provider = await session.get(ServiceProvider, provider_id)
        if provider is not None:
            provider_name = provider.name

    for stale_row in pending[:excess]:
        await _mark_dropped(
            session,
            stale_row,
            tracker_name=tracker_name,
            provider_id=provider_id,
            provider_name=provider_name,
            reason="queue_overflow",
        )
# ---------------------------------------------------------------------------
# Enqueue (called from dispatch sites when gate returns QUIET_HOURS)
# ---------------------------------------------------------------------------
async def defer_event(
    session: AsyncSession,
    *,
    event: ServiceEvent,
    user_id: int | None,
    tracker_id: int,
    link_id: int,
    event_log_id: int | None,
    fire_at: datetime,
) -> str:
    """Persist a deferred dispatch (or coalesce into an existing one).

    The caller is responsible for committing the session.

    Args:
        session: Open session the rows are added to / deleted from.
        event: The suppressed event.
        user_id: Owner recorded on the new row.
        tracker_id: Tracker the event belongs to (also the queue-cap scope).
        link_id: Link used, with ``event.collection_id``, as coalescing key.
        event_log_id: Id of the original event_log row, stored for follow-up.
        fire_at: When the row becomes due. Naive values are taken as UTC.

    Returns:
        * ``"inserted"`` — a fresh DeferredDispatch row was created.
        * ``"merged"`` — coalesced into an existing row (union or partial
          cancel).
        * ``"cancelled"`` — the new event fully cancelled an existing pending
          one (add-then-remove or remove-then-readd of the same asset IDs).
          Both sides are gone after this call.
        * ``"non_deferrable"`` — event type is wall-clock; caller should drop
          it with a ``"suppressed_quiet_hours_nondeferrable"`` event_log row.
    """
    event_type = event.event_type.value
    if not is_deferrable(event_type):
        return "non_deferrable"

    if fire_at.tzinfo:
        fire_at_utc = fire_at.astimezone(timezone.utc)
    else:
        fire_at_utc = fire_at.replace(tzinfo=timezone.utc)

    # IDs held by the opposite-side pending row. Any of them that also appear
    # in this event cancel out and must not be stored in a newly inserted row.
    offsetting_ids: set[Any] = set()

    # Asset events get set-merging across the same link+collection. Everything
    # else just gets a new row: those events aren't naturally cancellable.
    if event_type in ("assets_added", "assets_removed"):
        added_row, removed_row = await _find_pending_asset_rows(
            session, link_id, event.collection_id,
        )
        # Snapshot BEFORE coalescing: the helpers mutate the pending rows, so
        # the overlap can no longer be recovered afterwards.
        if event_type == "assets_added":
            if removed_row is not None:
                offsetting_ids = set(
                    removed_row.event_payload.get("removed_asset_ids") or []
                )
            action, upd_added, upd_removed = _coalesce_assets_added(
                event, added_row, removed_row,
            )
        else:
            if added_row is not None:
                offsetting_ids = {
                    a.get("id")
                    for a in added_row.event_payload.get("added_assets") or []
                }
            action, upd_added, upd_removed = _coalesce_assets_removed(
                event, added_row, removed_row,
            )

        # Apply pending updates. ``status="cancelled"`` rows are deleted
        # outright so the drain doesn't see them.
        fully_cancelled = False
        for row in (upd_added, upd_removed):
            if row is None:
                continue
            if row.status == "cancelled":
                await session.delete(row)
                fully_cancelled = True
            else:
                session.add(row)

        if action != "insert":
            # "merge": either updated existing rows or fully cancelled. It is
            # only "cancelled" when no surviving row was returned.
            nothing_survives = all(
                row is None or row.status == "cancelled"
                for row in (upd_added, upd_removed)
            )
            return "cancelled" if fully_cancelled and nothing_survives else "merged"

    payload = serialize_event(event)
    if offsetting_ids:
        # Keep the inserted row consistent with the merge path: IDs that just
        # cancelled against the opposite pending row are a wash and must not
        # be announced. Counts are only rewritten when something was removed.
        if event_type == "assets_added":
            original_assets = payload.get("added_assets") or []
            kept_assets = [
                a for a in original_assets if a.get("id") not in offsetting_ids
            ]
            if len(kept_assets) != len(original_assets):
                payload["added_assets"] = kept_assets
                payload["added_count"] = len(kept_assets)
        else:
            original_ids = payload.get("removed_asset_ids") or []
            kept_ids = [rid for rid in original_ids if rid not in offsetting_ids]
            if len(kept_ids) != len(original_ids):
                payload["removed_asset_ids"] = kept_ids
                payload["removed_count"] = len(kept_ids)

    session.add(DeferredDispatch(
        user_id=user_id,
        tracker_id=tracker_id,
        link_id=link_id,
        event_log_id=event_log_id,
        event_type=event_type,
        collection_id=event.collection_id,
        event_payload=payload,
        fire_at=fire_at_utc,
        status="pending",
    ))
    await _trim_queue_if_needed(session, tracker_id)
    return "inserted"
# ---------------------------------------------------------------------------
# Drain (called by APScheduler date job at quiet_hours_end_at)
# ---------------------------------------------------------------------------
async def drain_deferred_due(now: datetime | None = None) -> dict[str, int]:
    """Dispatch all pending DeferredDispatch rows whose ``fire_at <= now``.

    Re-resolves link/target/configs against current DB state so config edits
    between suppression and drain time take effect.

    Args:
        now: Reference time; defaults to the current UTC time. A naive value
            is taken as UTC.

    Returns:
        Counters keyed ``"fired"``, ``"dropped"``, ``"rescheduled"`` and
        ``"errors"``.

    Implementation note: rows are *re-fetched* by id inside each per-tracker
    session rather than carried across session boundaries. Carrying a row
    instance to a new session and calling ``session.add(row)`` on a detached
    PK-bearing instance triggers an INSERT (collision with the existing PK)
    on flush — a class of bug that's invisible until the first session
    closes, hence the up-front re-fetch.
    """
    now_utc = now or datetime.now(timezone.utc)
    if now_utc.tzinfo is None:
        now_utc = now_utc.replace(tzinfo=timezone.utc)
    stats = {"fired": 0, "dropped": 0, "rescheduled": 0, "errors": 0}
    engine = get_engine()

    async with AsyncSession(engine) as session:
        # Only pull the row identity + grouping key. Loading the full ORM
        # objects in a session that's about to close just wastes work — fresh
        # attached instances are re-fetched in the per-tracker session below.
        ident_rows = (await session.exec(
            select(DeferredDispatch.id, DeferredDispatch.tracker_id).where(
                DeferredDispatch.status == "pending",
                DeferredDispatch.fire_at <= now_utc,
            ).order_by(DeferredDispatch.fire_at.asc())
        )).all()

    if not ident_rows:
        _LOGGER.debug("drain_deferred_due: no pending rows due")
        return stats
    _LOGGER.info(
        "Draining %d deferred dispatches due at %s",
        len(ident_rows), now_utc.isoformat(),
    )

    # Group by tracker so a single per-tracker session can re-fetch its rows
    # (attached) and re-resolve link state once.
    ids_by_tracker: dict[int, list[int]] = {}
    for row_id, tracker_id in ident_rows:
        if row_id is None:
            continue
        ids_by_tracker.setdefault(tracker_id, []).append(row_id)

    from .watcher import _get_telegram_caches
    from .http_session import get_http_session
    url_cache, asset_cache = await _get_telegram_caches()
    shared_session = await get_http_session()
    dispatcher = NotificationDispatcher(
        url_cache=url_cache, asset_cache=asset_cache, session=shared_session,
    )

    for tracker_id, row_ids in ids_by_tracker.items():
        async with AsyncSession(engine) as session:
            tracker = await session.get(NotificationTracker, tracker_id)
            # Re-fetch rows freshly attached to THIS session. The status
            # filter skips rows that were fired/dropped/cancelled since the
            # identity query ran (overlapping drain, queue trim), so they are
            # not dispatched a second time; the ordering restores the
            # oldest-first order that an ``IN`` lookup does not guarantee.
            rows = (await session.exec(
                select(DeferredDispatch).where(
                    DeferredDispatch.id.in_(row_ids),
                    DeferredDispatch.status == "pending",
                ).order_by(
                    DeferredDispatch.fire_at.asc(), DeferredDispatch.id.asc(),
                )
            )).all()
            if not rows:
                continue

            if tracker is None or not tracker.enabled:
                # Tracker deleted or disabled between defer and drain — drop
                # all pending rows for it. Disable matches the live-path
                # invariant (watcher / webhooks / scheduled_dispatch all
                # short-circuit when ``tracker.enabled`` is False).
                reason = "tracker_removed" if tracker is None else "tracker_disabled_after_defer"
                # Resolve the provider so the drop rows are attributed the
                # same way as the queue-overflow drop rows.
                drop_provider_id = tracker.provider_id if tracker is not None else None
                drop_provider_name = ""
                if drop_provider_id is not None:
                    drop_provider = await session.get(ServiceProvider, drop_provider_id)
                    if drop_provider is not None:
                        drop_provider_name = drop_provider.name
                for row in rows:
                    await _mark_dropped(
                        session, row,
                        tracker=tracker,
                        provider_id=drop_provider_id,
                        provider_name=drop_provider_name,
                        reason=reason,
                    )
                    stats["dropped"] += 1
                await session.commit()
                continue

            provider = await session.get(ServiceProvider, tracker.provider_id)
            provider_config = dict(provider.config) if provider else {}
            provider_id = provider.id if provider else tracker.provider_id
            provider_name = provider.name if provider else ""
            app_tz = await get_app_timezone(session)

            # Reload current link state. Broadcast links emit ONE entry per
            # child target sharing the SAME parent ``link_id`` — a plain
            # ``{link_id: ld}`` dict would silently drop N-1 children. The
            # drain dispatches to every expanded entry for the parent.
            link_data = await load_link_data(session, tracker_id)
            link_by_id: dict[int, list[dict[str, Any]]] = {}
            for ld in link_data:
                key = ld.get("link_id")
                if key is None:
                    continue
                link_by_id.setdefault(key, []).append(ld)

            for row in rows:
                # Read the id up front so the error log does not have to touch
                # the ORM instance after a failure.
                current_row_id = row.id
                try:
                    await _process_row(
                        session, row, tracker, provider_id, provider_name,
                        provider_config, app_tz, link_by_id, dispatcher, stats,
                    )
                except Exception as err:  # noqa: BLE001 — keep draining other rows
                    _LOGGER.exception(
                        "Drain failed for deferred dispatch id=%s: %s",
                        current_row_id, err,
                    )
                    stats["errors"] += 1
            await session.commit()

    _LOGGER.info("Drain complete: %s", stats)
    return stats
async def _mark_dropped(
    session: AsyncSession,
    row: DeferredDispatch,
    *,
    tracker: NotificationTracker | None = None,
    tracker_name: str = "",
    provider_id: int | None = None,
    provider_name: str = "",
    reason: str,
) -> None:
    """Record a drop on the deferred row and emit a follow-up event_log entry.

    Args:
        session: Session the updated row and the new ``EventLog`` are added to
            (nothing is committed here).
        row: The deferred row; its ``status`` becomes ``"dropped"`` and
            ``fired_at`` is set to the current UTC time.
        tracker: Optional tracker used to fill in ``tracker_name`` and
            ``provider_id`` when those were not passed explicitly.
        tracker_name: Name stored on the log row.
        provider_id: Provider id stored on the log row.
        provider_name: Provider name stored on the log row.
        reason: Machine-readable drop reason stored in ``details["reason"]``.

    ``tracker``/``tracker_name``/``provider_id``/``provider_name`` populate
    the new event_log row's owner/provider columns so the dashboard "by
    tracker" grouping works for the drop path. Without these the row would
    have empty strings and slot into the "unknown" bucket.
    """
    if tracker is not None:
        tracker_name = tracker_name or tracker.name
        if provider_id is None:
            provider_id = tracker.provider_id

    payload = row.event_payload if isinstance(row.event_payload, dict) else {}

    row.status = "dropped"
    row.fired_at = datetime.now(timezone.utc)
    session.add(row)

    # ``or 0`` / ``or ""`` tolerate keys that are present but null, matching
    # the coercion used when a payload is deserialized back into an event.
    assets_count = (
        int(payload.get("added_count") or 0)
        or int(payload.get("removed_count") or 0)
    )
    session.add(EventLog(
        user_id=row.user_id,
        tracker_id=row.tracker_id,
        tracker_name=tracker_name,
        provider_id=provider_id,
        provider_name=provider_name,
        event_type=row.event_type,
        collection_id=row.collection_id,
        collection_name=payload.get("collection_name") or "",
        assets_count=assets_count,
        details=enrich_details_with_correlation({
            "dispatch_status": "deferred_then_dropped",
            "reason": reason,
            "original_event_log_id": row.event_log_id,
            "provider_type": payload.get("provider_type", ""),
        }),
    ))
async def _process_row(
    session: AsyncSession,
    row: DeferredDispatch,
    tracker: NotificationTracker,
    provider_id: int,
    provider_name: str,
    provider_config: dict[str, Any],
    app_tz: str,
    link_by_id: dict[int, list[dict[str, Any]]],
    dispatcher: NotificationDispatcher,
    stats: dict[str, int],
) -> None:
    """Drain one deferred row inside its own log-correlation scope.

    ``link_by_id`` maps a parent link_id to its expanded entries: one per
    broadcast child, or a single-element list for a regular target. All
    arguments are forwarded unchanged to ``_process_row_impl``, which does
    the re-resolve / re-gate / dispatch work.
    """
    # A dispatch_id is bound for the duration of this row so the EventLog rows
    # and the dispatcher's log lines it produces share one id; every deferred
    # row counts as a separate dispatch attempt.
    dispatch_id = ensure_dispatch_id()
    with bind_log_context(dispatch_id=dispatch_id):
        await _process_row_impl(
            session,
            row,
            tracker,
            provider_id,
            provider_name,
            provider_config,
            app_tz,
            link_by_id,
            dispatcher,
            stats,
        )
def _ensure_utc(value: datetime | None) -> datetime | None:
    """Return *value* as an aware datetime, treating naive values as UTC."""
    if value is None:
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


async def _process_row_impl(
    session: AsyncSession,
    row: DeferredDispatch,
    tracker: NotificationTracker,
    provider_id: int,
    provider_name: str,
    provider_config: dict[str, Any],
    app_tz: str,
    link_by_id: dict[int, list[dict[str, Any]]],
    dispatcher: NotificationDispatcher,
    stats: dict[str, int],
) -> None:
    """Re-resolve, re-gate and dispatch a single deferred row.

    Outcomes (each bumps the matching ``stats`` counter):

    * link no longer present in ``link_by_id``, event type disabled, or every
      asset filtered out → row dropped via ``_mark_dropped``.
    * still inside quiet hours → ``fire_at`` pushed to the new window end and
      another drain scheduled (``"rescheduled"``).
    * otherwise the event is dispatched to every expanded entry of the link;
      the row becomes ``"fired"`` if any target succeeded, else ``"dropped"``,
      and a follow-up ``EventLog`` row is added.

    Nothing is committed here; the caller owns the transaction.
    """
    expanded = link_by_id.get(row.link_id)
    if not expanded:
        # Link removed/disabled between defer and drain.
        await _mark_dropped(
            session, row,
            tracker=tracker, provider_id=provider_id, provider_name=provider_name,
            reason="link_removed",
        )
        stats["dropped"] += 1
        return

    # Every expanded entry for a parent link shares the same tracking_config,
    # so the gate decision and ``apply_tracking_display_filters`` shaping are
    # made once. Only the target_configs differ across children.
    tc = expanded[0].get("tracking_config")
    event = deserialize_event(row.event_payload)
    if tc is not None:
        outcome = evaluate_event_gate(event, tc, app_tz)
        if outcome.reason is GateReason.EVENT_TYPE_DISABLED:
            await _mark_dropped(
                session, row,
                tracker=tracker, provider_id=provider_id, provider_name=provider_name,
                reason="event_type_disabled_after_defer",
            )
            stats["dropped"] += 1
            return
        if outcome.reason is GateReason.QUIET_HOURS and outcome.quiet_hours_end_at is not None:
            row.fire_at = outcome.quiet_hours_end_at
            session.add(row)
            stats["rescheduled"] += 1
            try:
                from .scheduler import schedule_deferred_drain
                schedule_deferred_drain(outcome.quiet_hours_end_at)
            except Exception:  # noqa: BLE001
                _LOGGER.exception(
                    "Failed to reschedule drain for %s", outcome.quiet_hours_end_at,
                )
            return

    shaped = apply_tracking_display_filters(event, tc)
    if shaped is None:
        # ``notify_favorites_only`` (or another display filter) dropped every
        # asset from the event. Route through the same "dropped + event_log"
        # pathway as link_removed so the dashboard shows why.
        await _mark_dropped(
            session, row,
            tracker=tracker, provider_id=provider_id, provider_name=provider_name,
            reason="filtered_after_defer",
        )
        stats["dropped"] += 1
        return

    # Provider-level values are identical for every child target: resolve once.
    provider_api_key = resolve_provider_credential(provider_config)
    provider_internal_url = provider_config.get("url", "")
    provider_external_url = provider_config.get("external_domain", "") or provider_internal_url

    # Build one target_config per expanded child (regular targets → length 1;
    # broadcast → length N children).
    target_configs: list[TargetConfig] = []
    for ld in expanded:
        tmpl = ld.get("template_config")
        target_configs.append(TargetConfig(
            type=ld["target_type"],
            config=ld["target_config"],
            template_slots=ld.get("template_slots"),
            date_format=tmpl.date_format if tmpl else "%d.%m.%Y, %H:%M UTC",
            date_only_format=(tmpl.date_only_format if tmpl and tmpl.date_only_format else "%d.%m.%Y"),
            provider_api_key=provider_api_key,
            provider_internal_url=provider_internal_url,
            provider_external_url=provider_external_url,
            receivers=ld["receivers"],
        ))

    # Per-row timeout — a single hanging remote call (Telegram outage, slow
    # SMTP) must not stall the rest of the queue.
    try:
        results = await asyncio.wait_for(
            dispatcher.dispatch(shaped, target_configs),
            timeout=_DRAIN_DISPATCH_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        _LOGGER.warning(
            "Drain dispatch for row %s timed out after %ds",
            row.id, _DRAIN_DISPATCH_TIMEOUT_SECONDS,
        )
        results = [{"success": False, "error": f"timeout after {_DRAIN_DISPATCH_TIMEOUT_SECONDS}s"}]

    success = any(r.get("success") for r in results)
    fired_at = datetime.now(timezone.utc)
    row.status = "fired" if success else "dropped"
    row.fired_at = fired_at
    session.add(row)
    summary = summarize_dispatch_results(results)

    details: dict[str, Any]
    if success:
        stats["fired"] += 1
        # ``created_at`` may come back from the database without tzinfo;
        # subtracting it from an aware timestamp would raise TypeError after
        # the row has already been marked fired. Naive values are taken as
        # UTC, as everywhere else in this module.
        created_at = _ensure_utc(row.created_at)
        deferred_for_seconds = (
            int((fired_at - created_at).total_seconds())
            if created_at is not None else None
        )
        details = {
            "dispatch_status": "delivered_after_quiet_hours",
            "original_event_log_id": row.event_log_id,
            "deferred_for_seconds": deferred_for_seconds,
            "provider_type": event.provider_type.value,
            "dispatch_summary": summary,
        }
    else:
        stats["dropped"] += 1
        first_err = next(
            (r.get("error") for r in results if not r.get("success")), None,
        )
        details = {
            "dispatch_status": "deferred_then_failed",
            # A failure without an error message is reported as "unknown"
            # rather than the literal string "None".
            "reason": str(first_err or "unknown")[:200],
            "original_event_log_id": row.event_log_id,
            "provider_type": event.provider_type.value,
            "dispatch_summary": summary,
        }

    session.add(EventLog(
        user_id=row.user_id,
        tracker_id=row.tracker_id,
        tracker_name=tracker.name,
        provider_id=provider_id,
        provider_name=provider_name,
        event_type=row.event_type,
        collection_id=row.collection_id,
        collection_name=event.collection_name,
        assets_count=event.added_count or event.removed_count or 0,
        details=enrich_details_with_correlation(details),
    ))
# ---------------------------------------------------------------------------
# Startup: reschedule pending drain jobs found in the DB
# ---------------------------------------------------------------------------
async def load_pending_drain_jobs() -> int:
    """Re-create drain jobs at startup for every pending ``DeferredDispatch`` row.

    Collects the ``fire_at`` of all pending rows, normalises each to aware
    UTC (naive values are taken as UTC), de-duplicates by exact timestamp and
    calls ``schedule_deferred_drain`` once per distinct value.

    Returns the number of drain jobs requested (0 when nothing is pending).

    NOTE(review): no minute-rounding and no special handling of already-past
    timestamps happens in this function; presumably
    ``schedule_deferred_drain`` takes care of both — confirm.
    """
    from .scheduler import schedule_deferred_drain

    engine = get_engine()
    async with AsyncSession(engine) as session:
        pending_query = select(DeferredDispatch.fire_at).where(
            DeferredDispatch.status == "pending",
        )
        fire_ats = (await session.exec(pending_query)).all()

    if not fire_ats:
        return 0

    distinct_times: set[datetime] = set()
    for stamp in fire_ats:
        if not isinstance(stamp, datetime):
            continue
        if stamp.tzinfo:
            distinct_times.add(stamp.astimezone(timezone.utc))
        else:
            distinct_times.add(stamp.replace(tzinfo=timezone.utc))

    for when in distinct_times:
        schedule_deferred_drain(when)

    job_count = len(distinct_times)
    _LOGGER.info(
        "Loaded %d pending deferred dispatches; scheduled %d drain job(s)",
        len(fire_ats), job_count,
    )
    return job_count