Files
notify-bridge/packages/server/src/notify_bridge_server/services/bridge_self.py
T
alexei.dolgolyov 8651767112 feat: bridge_self bot commands — status, thresholds, reset, health
Adds bot commands for the bridge_self provider so operators can inspect
and manage bridge health from chat: /status, /thresholds, /reset, /health.
Includes Jinja2 templates for both locales, seed data, capability slots,
and a handler that exposes pending deferred backlog plus per-counter
reset. Also adds .claude/skills/ for project-scoped graph-aware skills.
2026-05-16 03:43:48 +03:00

614 lines
22 KiB
Python

"""Bridge self-monitoring service helpers.
Three subsystems feed into ``emit_bridge_self_event``:
1. The watcher's poll loop, when consecutive provider polls fail.
2. A periodic scheduler job, when the deferred-dispatch backlog crosses
the configured threshold.
3. The notification dispatcher, when consecutive sends to a single target
fail with 5xx / network errors.
The helper looks up the user's ``bridge_self`` provider, builds a
synthetic :class:`ServiceEvent`, and pushes it through the same
``dispatch_provider_event`` pipeline that every other provider uses.
That keeps templates, quiet hours, deferral, target gating, and event
logging consistent with the rest of the system.
We intentionally avoid raising into the caller's flow — a
self-monitoring failure must never break the subsystem it's monitoring.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.providers.bridge_self import build_event
from notify_bridge_core.providers.bridge_self.provider import (
DEFAULT_DEFERRED_BACKLOG_THRESHOLD,
DEFAULT_POLL_FAILURE_THRESHOLD,
DEFAULT_TARGET_FAILURE_THRESHOLD,
)
from ..database.engine import get_engine
from ..database.models import ServiceProvider, User
_LOGGER = logging.getLogger(__name__)

# Payload fields copied into the EventLog.details JSON column for every
# bridge_self event. Same convention the HA subscription and webhook
# routers follow for their ``dispatch_provider_event`` calls.
BRIDGE_SELF_DETAIL_KEYS: tuple[str, ...] = (
    "failure_type",
    "subject_id",
    "subject_name",
    "count",
    "threshold",
    "last_error",
    "details",
)
async def get_bridge_self_provider(
    session: AsyncSession, user_id: int,
) -> ServiceProvider | None:
    """Look up the ``bridge_self`` ServiceProvider owned by ``user_id``.

    Runs on the caller-supplied ``session``. Returns the first matching
    row, or None when the user has not configured a bridge_self provider.
    """
    statement = select(ServiceProvider).where(
        ServiceProvider.user_id == user_id,
        ServiceProvider.type == "bridge_self",
    )
    rows = await session.exec(statement)
    return rows.first()
async def get_user_thresholds(user_id: int) -> dict[str, int]:
    """Return the user's bridge_self thresholds, falling back to defaults.

    Reads in a short-lived session — emission sites should NOT hold a
    transaction across this call.

    Each configured value is coerced with ``int()``. The built-in default
    is used instead when a value:

    - is missing or ``None``;
    - cannot be converted (non-numeric strings, ``nan``/``inf`` floats);
    - converts to something below 1.

    Returns:
        A dict with keys ``poll_failure_threshold``,
        ``deferred_backlog_threshold`` and ``target_failure_threshold``.
    """
    defaults = {
        "poll_failure_threshold": DEFAULT_POLL_FAILURE_THRESHOLD,
        "deferred_backlog_threshold": DEFAULT_DEFERRED_BACKLOG_THRESHOLD,
        "target_failure_threshold": DEFAULT_TARGET_FAILURE_THRESHOLD,
    }
    engine = get_engine()
    async with AsyncSession(engine) as session:
        provider = await get_bridge_self_provider(session, user_id)
        if provider is None:
            return defaults
        # Copy the config out while the session is open; coercion below
        # needs no database access.
        cfg = dict(provider.config or {})

    def _coerce(key: str, fallback: int) -> int:
        raw = cfg.get(key, fallback)
        try:
            value = int(raw)
        # OverflowError: int(float("inf")). Python's json module accepts the
        # non-standard ``Infinity`` literal, so a JSON config can carry it.
        except (TypeError, ValueError, OverflowError):
            return fallback
        return value if value >= 1 else fallback

    return {key: _coerce(key, fallback) for key, fallback in defaults.items()}
async def emit_bridge_self_event(
    *,
    user_id: int,
    failure_type: str,
    subject_id: int,
    subject_name: str,
    count: int,
    threshold: int,
    last_error: str = "",
    details: dict[str, Any] | None = None,
    timestamp: datetime | None = None,
) -> int:
    """Emit a self-monitoring event for ``user_id``.

    Builds a synthetic bridge_self event from the arguments, resolves the
    user's bridge_self provider, and dispatches the event through
    ``dispatch_provider_event``.

    Args:
        user_id: Owner of the bridge_self provider that should receive the event.
        failure_type: Failure category, e.g. ``"poll_failures"``.
        subject_id: Id of the failing subject (tracker/target id, or 0).
        subject_name: Human-readable subject name for templates.
        count: Observed failure count / backlog size.
        threshold: Threshold that was crossed.
        last_error: Most recent error message, if any.
        details: Extra key/value pairs copied into the payload.
        timestamp: Event time; defaults to "now" in UTC.

    Returns:
        The result of ``dispatch_provider_event``: the number of dispatched
        notifications. This may be 0 when no tracker matches or quiet
        hours / event-type gating suppress the event. Returns 0 without
        dispatching when:

        - ``build_event`` rejects the payload;
        - the user has no bridge_self provider;
        - any step raises.

    Never raises. Every internal exception is logged and swallowed so the
    calling subsystem (watcher / scheduler / dispatcher) keeps running.
    This includes failures while building the payload or event, or while
    acquiring the engine.
    """
    try:
        payload = {
            "failure_type": failure_type,
            "subject_id": subject_id,
            "subject_name": subject_name,
            "count": count,
            "threshold": threshold,
            "last_error": last_error,
            "details": dict(details or {}),
        }
        event = build_event(payload, timestamp=timestamp or datetime.now(timezone.utc))
        if event is None:
            _LOGGER.debug("Skipping malformed bridge_self payload: %s", payload)
            return 0

        engine = get_engine()
        # Read the provider in a short-lived session and copy out plain
        # values. The session (and its pooled connection / transaction) is
        # then released before the dispatch pipeline opens its own sessions.
        async with AsyncSession(engine) as session:
            provider = await get_bridge_self_provider(session, user_id)
            if provider is None:
                _LOGGER.debug(
                    "User %s has no bridge_self provider; skipping %s emission",
                    user_id, failure_type,
                )
                return 0
            provider_id = provider.id
            provider_name = provider.name
            provider_config = dict(provider.config or {})

        # Imported lazily to avoid a top-level import cycle with
        # ``event_dispatch``.
        from .event_dispatch import dispatch_provider_event

        return await dispatch_provider_event(
            engine=engine,
            provider_id=provider_id,
            provider_name=provider_name,
            provider_config=provider_config,
            event=event,
            detail_keys=BRIDGE_SELF_DETAIL_KEYS,
            # bridge_self events are never filtered per-tracker.
            filter_fn=lambda _ev, _filters: True,
        )
    except Exception:  # noqa: BLE001
        _LOGGER.exception(
            "bridge_self emission failed (user=%s, failure_type=%s)",
            user_id, failure_type,
        )
        return 0
# ---------------------------------------------------------------------------
# Threshold-crossing trackers (in-memory, per-process).
#
# Consecutive-failure streaks live in plain module-level dicts. They are
# keyed by subject id: tracker_id for polls, target_id for sends. When a
# streak reaches its threshold we emit once and clear the counter, so
# another full streak is required before the next emission.
# ---------------------------------------------------------------------------

# Poll failures, keyed by tracker_id: streak length and most recent error.
_poll_failure_counts: dict[int, int] = dict()
_poll_failure_last_error: dict[int, str] = dict()

# Send failures, keyed by target_id: streak length and most recent error.
_target_failure_counts: dict[int, int] = dict()
_target_failure_last_error: dict[int, str] = dict()

# Per-user backlog latch: True while the last scan saw the user's backlog
# above its threshold, False otherwise. Emission happens only on the
# False -> True edge, so a sustained backlog yields one notification per
# crossing.
_backlog_above_threshold: dict[int, bool] = dict()
def record_poll_success(tracker_id: int) -> None:
    """Clear ``tracker_id``'s failure streak after a poll succeeds.

    Drops both the consecutive-failure count and the remembered last
    error. A tracker with no streak is left untouched.
    """
    for store in (_poll_failure_counts, _poll_failure_last_error):
        store.pop(tracker_id, None)
def record_poll_failure(tracker_id: int, error: str = "") -> int:
    """Extend ``tracker_id``'s failure streak by one; return its new length.

    A non-empty ``error`` replaces the remembered last error. An empty one
    keeps whatever was recorded before.
    """
    streak = _poll_failure_counts.get(tracker_id, 0) + 1
    _poll_failure_counts[tracker_id] = streak
    if error:
        _poll_failure_last_error[tracker_id] = error
    return streak
def reset_poll_counter(tracker_id: int) -> None:
    """Forget ``tracker_id``'s failure streak and last error (no emission)."""
    _poll_failure_last_error.pop(tracker_id, None)
    _poll_failure_counts.pop(tracker_id, None)
def record_target_success(target_id: int) -> None:
    """Clear ``target_id``'s failure streak after a send succeeds.

    Drops both the consecutive-failure count and the remembered last
    error. A target with no streak is left untouched.
    """
    for store in (_target_failure_counts, _target_failure_last_error):
        store.pop(target_id, None)
def record_target_failure(target_id: int, error: str = "") -> int:
    """Extend ``target_id``'s failure streak by one; return its new length.

    A non-empty ``error`` replaces the remembered last error. An empty one
    keeps whatever was recorded before.
    """
    streak = _target_failure_counts.get(target_id, 0) + 1
    _target_failure_counts[target_id] = streak
    if error:
        _target_failure_last_error[target_id] = error
    return streak
def reset_target_counter(target_id: int) -> None:
    """Forget ``target_id``'s failure streak and last error (no emission)."""
    _target_failure_last_error.pop(target_id, None)
    _target_failure_counts.pop(target_id, None)
def record_backlog_state(user_id: int, above_threshold: bool) -> bool:
    """Store the latest backlog state for ``user_id``; report an upward crossing.

    The result is truthy only when ``above_threshold`` is true and the
    previously stored state was not. A user never seen before counts as
    "below", so a process that starts with an already-large backlog still
    emits once.
    """
    was_above = _backlog_above_threshold.get(user_id, False)
    _backlog_above_threshold[user_id] = above_threshold
    crossed_up = above_threshold and not was_above
    return crossed_up
def get_poll_failure_count(tracker_id: int) -> int:
    """Return ``tracker_id``'s current consecutive poll-failure count (0 if none)."""
    try:
        return _poll_failure_counts[tracker_id]
    except KeyError:
        return 0
def get_target_failure_count(target_id: int) -> int:
    """Return ``target_id``'s current consecutive send-failure count (0 if none)."""
    try:
        return _target_failure_counts[target_id]
    except KeyError:
        return 0
def get_poll_last_error(tracker_id: int) -> str:
    """Return the last recorded poll error for ``tracker_id`` ("" if none)."""
    try:
        return _poll_failure_last_error[tracker_id]
    except KeyError:
        return ""
def get_target_last_error(target_id: int) -> str:
    """Return the last recorded send error for ``target_id`` ("" if none)."""
    try:
        return _target_failure_last_error[target_id]
    except KeyError:
        return ""
def get_all_poll_failures() -> dict[int, int]:
    """Snapshot every live poll-failure streak as ``{tracker_id: count}``.

    Zero counts are filtered out. The result is a fresh dict, so callers
    can iterate or mutate it without touching the module-level state.
    """
    snapshot: dict[int, int] = {}
    for tracker_id, streak in _poll_failure_counts.items():
        if streak > 0:
            snapshot[tracker_id] = streak
    return snapshot
def get_all_target_failures() -> dict[int, int]:
    """Snapshot every live send-failure streak as ``{target_id: count}``.

    Zero counts are filtered out; the result is a fresh dict.
    """
    snapshot: dict[int, int] = {}
    for target_id, streak in _target_failure_counts.items():
        if streak > 0:
            snapshot[target_id] = streak
    return snapshot
def reset_counter(failure_type: str, subject_id: int | None = None) -> int:
    """Clear bridge_self failure counters.

    Args:
        failure_type: ``"poll_failures"``, ``"target_failures"`` or
            ``"all"``. Any other value does nothing.
        subject_id: The tracker_id (for ``"poll_failures"``) or target_id
            (for ``"target_failures"``) whose counter is cleared. Ignored
            for ``"all"``. When it is None for a single-type reset, nothing
            is cleared.

    Returns:
        For a single-subject reset, the count that was stored before
        clearing (0 if there was none, so repeated calls are harmless).
        For ``"all"``, the number of poll + target counter entries that
        were removed. The per-user backlog latch is not affected.
    """
    if failure_type == "all":
        total = len(_poll_failure_counts) + len(_target_failure_counts)
        for store in (
            _poll_failure_counts,
            _poll_failure_last_error,
            _target_failure_counts,
            _target_failure_last_error,
        ):
            store.clear()
        return total

    if subject_id is None:
        return 0

    if failure_type == "poll_failures":
        counts, clear_subject = _poll_failure_counts, reset_poll_counter
    elif failure_type == "target_failures":
        counts, clear_subject = _target_failure_counts, reset_target_counter
    else:
        return 0

    previous = counts.get(subject_id, 0)
    clear_subject(subject_id)
    return previous
async def get_pending_deferred_count(user_id: int) -> int | None:
    """Return the count of pending DeferredDispatch rows for a user.

    Used by command handlers to render the current backlog in /status and
    /health responses. Returns ``None`` on any error so command templates
    can render "unknown" instead of a misleading "0". Operators checking
    bridge health while the bridge is unhealthy must not be told
    everything is fine.

    Acquiring the engine also happens inside the guarded region, so an
    engine-initialisation failure yields ``None`` instead of an exception.
    """
    from sqlalchemy import func

    from ..database.models import DeferredDispatch

    try:
        engine = get_engine()
        async with AsyncSession(engine) as session:
            result = await session.exec(
                select(func.count(DeferredDispatch.id))
                .where(DeferredDispatch.status == "pending")
                .where(DeferredDispatch.user_id == user_id)
            )
            count = result.first()
        return int(count or 0)
    except Exception:  # noqa: BLE001 — never block a command reply
        _LOGGER.exception("get_pending_deferred_count failed for user_id=%s", user_id)
        return None
async def get_tracker_name(tracker_id: int) -> str:
    """Return the display name of a NotificationTracker, or ``"tracker {id}"``.

    The placeholder is returned when:

    - the tracker does not exist;
    - its name is empty;
    - the lookup fails for any reason, including acquiring the engine.

    Errors are logged, never raised.
    """
    from ..database.models import NotificationTracker

    fallback = f"tracker {tracker_id}"
    try:
        engine = get_engine()
        async with AsyncSession(engine) as session:
            tracker = await session.get(NotificationTracker, tracker_id)
            if tracker is not None and tracker.name:
                return tracker.name
    except Exception:  # noqa: BLE001
        _LOGGER.exception("get_tracker_name failed for tracker_id=%s", tracker_id)
    return fallback
async def get_target_name(target_id: int) -> str:
    """Return the display name of a NotificationTarget, or ``"target {id}"``.

    The placeholder is returned when:

    - the target does not exist;
    - its name is empty;
    - the lookup fails for any reason, including acquiring the engine.

    Errors are logged, never raised.
    """
    from ..database.models import NotificationTarget

    fallback = f"target {target_id}"
    try:
        engine = get_engine()
        async with AsyncSession(engine) as session:
            target = await session.get(NotificationTarget, target_id)
            if target is not None and target.name:
                return target.name
    except Exception:  # noqa: BLE001
        _LOGGER.exception("get_target_name failed for target_id=%s", target_id)
    return fallback
# ---------------------------------------------------------------------------
# User-level helpers
# ---------------------------------------------------------------------------
async def list_user_ids() -> list[int]:
    """Return every real user id, excluding the __system__ placeholder (id 0)."""
    engine = get_engine()
    async with AsyncSession(engine) as session:
        rows = await session.exec(select(User.id).where(User.id != 0))
        user_ids: list[int] = []
        for uid in rows.all():
            if uid is not None:
                user_ids.append(int(uid))
        return user_ids
async def find_tracker_owner(tracker_id: int) -> int | None:
    """Return the user_id owning ``tracker_id``, or None when no such tracker exists."""
    from ..database.models import NotificationTracker

    async with AsyncSession(get_engine()) as session:
        tracker = await session.get(NotificationTracker, tracker_id)
        return None if tracker is None else int(tracker.user_id)
async def find_target_owner(target_id: int) -> int | None:
    """Return the user_id owning ``target_id``, or None when no such target exists."""
    from ..database.models import NotificationTarget

    async with AsyncSession(get_engine()) as session:
        target = await session.get(NotificationTarget, target_id)
        return None if target is None else int(target.user_id)
async def get_user_poll_failures(user_id: int) -> list[dict[str, Any]]:
    """List failing trackers owned by ``user_id`` as ``{"id", "name", "count"}`` dicts.

    One batched query resolves names for every failing tracker at once
    (no per-tracker lookups). It also filters on ownership, so a user
    never sees another user's failures. Returns ``[]`` without touching
    the database when no tracker currently has a non-zero streak.
    """
    from ..database.models import NotificationTracker

    failing = {
        tracker_id: streak
        for tracker_id, streak in _poll_failure_counts.items()
        if streak > 0
    }
    if not failing:
        return []

    owned_query = select(NotificationTracker.id, NotificationTracker.name).where(
        NotificationTracker.id.in_(list(failing.keys())),
        NotificationTracker.user_id == user_id,
    )
    async with AsyncSession(get_engine()) as session:
        rows = list((await session.exec(owned_query)).all())

    entries: list[dict[str, Any]] = []
    for raw_id, name in rows:
        tracker_id = int(raw_id)
        entries.append({
            "id": tracker_id,
            "name": name or f"tracker {raw_id}",
            "count": failing[tracker_id],
        })
    return entries
async def get_user_target_failures(user_id: int) -> list[dict[str, Any]]:
    """List failing targets owned by ``user_id`` as ``{"id", "name", "count"}`` dicts.

    Uses a single ownership-filtered query. Returns ``[]`` without touching
    the database when no target currently has a non-zero streak.
    """
    from ..database.models import NotificationTarget

    failing = {
        target_id: streak
        for target_id, streak in _target_failure_counts.items()
        if streak > 0
    }
    if not failing:
        return []

    owned_query = select(NotificationTarget.id, NotificationTarget.name).where(
        NotificationTarget.id.in_(list(failing.keys())),
        NotificationTarget.user_id == user_id,
    )
    async with AsyncSession(get_engine()) as session:
        rows = list((await session.exec(owned_query)).all())

    entries: list[dict[str, Any]] = []
    for raw_id, name in rows:
        target_id = int(raw_id)
        entries.append({
            "id": target_id,
            "name": name or f"target {raw_id}",
            "count": failing[target_id],
        })
    return entries
async def reset_user_counters(user_id: int) -> int:
    """Clear poll/target failure counters for subjects owned by ``user_id``.

    Only trackers and targets that belong to ``user_id`` are touched;
    other users' counters survive. Contrast ``reset_counter("all")``, which
    wipes every user's state. Returns how many counter entries
    (trackers + targets) were actually removed.
    """
    owned_polls = await get_user_poll_failures(user_id)
    owned_targets = await get_user_target_failures(user_id)

    cleared = 0
    for entries, counts, last_errors in (
        (owned_polls, _poll_failure_counts, _poll_failure_last_error),
        (owned_targets, _target_failure_counts, _target_failure_last_error),
    ):
        for entry in entries:
            subject_id = entry["id"]
            if counts.pop(subject_id, None) is not None:
                cleared += 1
            last_errors.pop(subject_id, None)
    return cleared
# ---------------------------------------------------------------------------
# Backlog scan
# ---------------------------------------------------------------------------
async def _pending_counts_by_user() -> dict[int, int]:
    """Return ``{user_id: pending DeferredDispatch rows}`` for users with pending rows."""
    from sqlalchemy import func

    from ..database.models import DeferredDispatch

    engine = get_engine()
    async with AsyncSession(engine) as session:
        # One GROUP BY query covers every user. Rows whose user_id is NULL
        # are legacy / orphaned and have no bridge_self provider to alert.
        rows = (
            await session.exec(
                select(
                    DeferredDispatch.user_id,
                    func.count(DeferredDispatch.id),
                )
                .where(DeferredDispatch.status == "pending")
                .where(DeferredDispatch.user_id.is_not(None))
                .group_by(DeferredDispatch.user_id)
            )
        ).all()
    # The session is closed here, so the per-user work in the caller does
    # not run while this query's connection / transaction is still held.
    return {
        int(uid): int(count or 0)
        for uid, count in rows
        if uid is not None
    }


async def check_deferred_backlog() -> dict[str, Any]:
    """Scan the deferred_dispatch table and emit a backlog event if needed.

    Counts pending rows per user and compares each count against that
    user's ``deferred_backlog_threshold`` (``count >= threshold`` means
    "above"). For users that just crossed up, it emits a
    ``deferred_backlog`` bridge_self event. Users whose backlog is gone
    have their latch cleared.

    A failure to load one user's thresholds is logged and that user is
    skipped, so the other users are still checked and the latch reset
    still runs. A failure of the initial count query propagates to the
    caller.

    Returns:
        ``{"users_scanned": <users with pending rows>, "crossings": <upward crossings>}``.
    """
    counts_by_user = await _pending_counts_by_user()
    crossings = 0

    for user_id, count in counts_by_user.items():
        try:
            thresholds = await get_user_thresholds(user_id)
        except Exception:  # noqa: BLE001 — isolate per-user failures
            _LOGGER.exception(
                "bridge_self backlog threshold lookup failed (user=%s)", user_id,
            )
            continue
        threshold = thresholds["deferred_backlog_threshold"]
        # record_backlog_state also flips the latch back to False for users
        # still in the result set but now under threshold.
        if not record_backlog_state(user_id, count >= threshold):
            continue
        crossings += 1
        await emit_bridge_self_event(
            user_id=user_id,
            failure_type="deferred_backlog",
            subject_id=0,
            subject_name="Deferred dispatch queue",
            count=count,
            threshold=threshold,
            details={"pending": count},
        )

    # Users whose backlog drained to zero have no row in the GROUP BY result.
    # Clear their latch so the next backlog can trigger a fresh notification.
    for user_id in list(_backlog_above_threshold):
        if user_id not in counts_by_user:
            _backlog_above_threshold[user_id] = False

    return {"users_scanned": len(counts_by_user), "crossings": crossings}
# ---------------------------------------------------------------------------
# Threshold-aware emission wrappers (used by watcher / dispatcher).
# ---------------------------------------------------------------------------
async def maybe_emit_poll_failure(
    *, tracker_id: int, tracker_name: str, error: str = "",
) -> None:
    """Record a poll failure for ``tracker_id``; alert once the streak hits the threshold.

    The counter is always incremented. When the count reaches the owning
    user's ``poll_failure_threshold``, a ``poll_failures`` bridge_self
    event is emitted and the counter is reset. The next alert then needs
    another full streak.

    Never raises. Errors while resolving the tracker owner or loading
    thresholds are logged and swallowed, so the watcher's poll loop keeps
    running. Self-monitoring must not break the subsystem it monitors.
    """
    count = record_poll_failure(tracker_id, error)
    try:
        user_id = await find_tracker_owner(tracker_id)
        if user_id is None:
            return
        thresholds = await get_user_thresholds(user_id)
    except Exception:  # noqa: BLE001 — never break the watcher's poll loop
        _LOGGER.exception(
            "bridge_self poll-failure check failed (tracker_id=%s)", tracker_id,
        )
        return

    threshold = thresholds["poll_failure_threshold"]
    if count < threshold:
        return

    last_err = get_poll_last_error(tracker_id) or error
    # emit_bridge_self_event swallows its own errors.
    await emit_bridge_self_event(
        user_id=user_id,
        failure_type="poll_failures",
        subject_id=tracker_id,
        subject_name=tracker_name or f"tracker {tracker_id}",
        count=count,
        threshold=threshold,
        last_error=last_err,
        details={"tracker_id": tracker_id},
    )
    # Reset so the next emission requires another full streak. Without this
    # the same tracker would fire on EVERY tick once it crosses the
    # threshold, drowning the operator.
    reset_poll_counter(tracker_id)
async def maybe_emit_target_failure(
    *, target_id: int, target_name: str, target_type: str, error: str = "",
) -> None:
    """Record a send failure for ``target_id``; alert once the streak hits the threshold.

    The counter is always incremented. When the count reaches the owning
    user's ``target_failure_threshold``, a ``target_failures`` bridge_self
    event is emitted and the counter is reset.

    Never raises. Errors while resolving the target owner or loading
    thresholds are logged and swallowed, so the notification dispatcher
    keeps running.
    """
    count = record_target_failure(target_id, error)
    try:
        user_id = await find_target_owner(target_id)
        if user_id is None:
            return
        thresholds = await get_user_thresholds(user_id)
    except Exception:  # noqa: BLE001 — never break the dispatcher
        _LOGGER.exception(
            "bridge_self target-failure check failed (target_id=%s)", target_id,
        )
        return

    threshold = thresholds["target_failure_threshold"]
    if count < threshold:
        return

    last_err = get_target_last_error(target_id) or error
    # emit_bridge_self_event swallows its own errors.
    await emit_bridge_self_event(
        user_id=user_id,
        failure_type="target_failures",
        subject_id=target_id,
        subject_name=target_name or f"target {target_id}",
        count=count,
        threshold=threshold,
        last_error=last_err,
        details={"target_id": target_id, "target_type": target_type},
    )
    # Reset so another full streak is needed before the next alert.
    reset_target_counter(target_id)