Files
notify-bridge/packages/server/tests/test_bridge_self.py
T
alexei.dolgolyov 8651767112 feat: bridge_self bot commands — status, thresholds, reset, health
Adds bot commands for the bridge_self provider so operators can inspect
and manage bridge health from chat: /status, /thresholds, /reset, /health.
Includes Jinja2 templates for both locales, seed data, capability slots,
and a handler that exposes pending deferred backlog plus per-counter
reset. Also adds .claude/skills/ for project-scoped graph-aware skills.
2026-05-16 03:43:48 +03:00

649 lines
23 KiB
Python

"""Tests for the bridge self-monitoring provider.
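
Covers:
1. ``build_event`` parses a well-formed payload and rejects malformed ones.
2. The threshold-crossing helpers in ``services.bridge_self`` only emit on
   the actual crossing, not on every increment afterwards (anti-spam).
3. ``ensure_bridge_self_provider_for_user`` creates exactly one provider
   per user and is idempotent on re-run.
4. The capability registry exposes the new event/slot definitions.
5. The bot-command context builders (/status, /thresholds, /reset,
   /health), including the ownership check that stops one user from
   resetting another user's counters.
6. ``reset_counter``, command-handler registration, and the shipped
   default notification/command templates for both locales.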
"""
from __future__ import annotations

from collections.abc import AsyncIterator
from datetime import datetime, timezone

import pytest
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
# ---------------------------------------------------------------------------
# Event parser
# ---------------------------------------------------------------------------
def test_build_event_well_formed_payload() -> None:
    """A complete ``poll_failures`` payload is mapped onto the event's fields."""
    from notify_bridge_core.models.events import EventType
    from notify_bridge_core.providers.base import ServiceProviderType
    from notify_bridge_core.providers.bridge_self.event_parser import build_event

    fired_at = datetime(2026, 5, 16, 10, 0, tzinfo=timezone.utc)
    raw = {
        "failure_type": "poll_failures",
        "subject_id": 7,
        "subject_name": "My Tracker",
        "count": 3,
        "threshold": 3,
        "last_error": "Timeout",
        "details": {"tracker_id": 7},
    }

    parsed = build_event(raw, timestamp=fired_at)
    assert parsed is not None

    # Top-level event attributes.
    assert parsed.event_type == EventType.BRIDGE_SELF_POLL_FAILURES
    assert parsed.provider_type == ServiceProviderType.BRIDGE_SELF
    assert parsed.collection_id == "7"
    assert parsed.collection_name == "My Tracker"
    assert parsed.timestamp == fired_at

    # Payload values exposed under ``extra``.
    expected_extra = {
        "count": 3,
        "threshold": 3,
        "last_error": "Timeout",
        "failure_type": "poll_failures",
        "details": {"tracker_id": 7},
    }
    for key, value in expected_extra.items():
        assert parsed.extra[key] == value
def test_build_event_unknown_failure_type_returns_none() -> None:
    """An unrecognised ``failure_type`` yields ``None`` instead of an event."""
    from notify_bridge_core.providers.bridge_self.event_parser import build_event

    result = build_event({"failure_type": "rocket_launch"})
    assert result is None
def test_build_event_non_dict_payload_returns_none() -> None:
    """Payloads that are not dicts (a string, ``None``) yield ``None``."""
    from notify_bridge_core.providers.bridge_self.event_parser import build_event

    for bogus in ("not a dict", None):
        assert build_event(bogus) is None  # type: ignore[arg-type]
def test_build_event_clamps_long_error_messages() -> None:
    """An oversized ``last_error`` is cut down to at most ``_MAX_ERROR_LEN`` chars."""
    from notify_bridge_core.providers.bridge_self.event_parser import (
        _MAX_ERROR_LEN,
        build_event,
    )

    oversized = "X" * (5 * _MAX_ERROR_LEN)
    payload = {
        "failure_type": "target_failures",
        "subject_id": 1,
        "subject_name": "t",
        "count": 5,
        "threshold": 5,
        "last_error": oversized,
    }

    event = build_event(payload)

    assert event is not None
    assert len(event.extra["last_error"]) <= _MAX_ERROR_LEN
# ---------------------------------------------------------------------------
# Threshold-crossing counters
# ---------------------------------------------------------------------------
def test_record_poll_failure_increments_then_success_resets() -> None:
    """Each poll failure bumps the count; a success zeroes count and last error."""
    from notify_bridge_server.services import bridge_self as bs

    # Tracker id reserved for this test, so the module-level dicts shared
    # by every test in this file are not cross-contaminated.
    tracker_id = 9_001
    bs.reset_poll_counter(tracker_id)

    for expected_count in (1, 2, 3):
        assert bs.record_poll_failure(tracker_id, "boom") == expected_count
    assert bs.get_poll_failure_count(tracker_id) == 3
    assert bs.get_poll_last_error(tracker_id) == "boom"

    bs.record_poll_success(tracker_id)
    assert bs.get_poll_failure_count(tracker_id) == 0
    assert bs.get_poll_last_error(tracker_id) == ""
def test_record_target_failure_increments_then_success_resets() -> None:
    """Target failures accumulate until a success resets the count to zero."""
    from notify_bridge_server.services import bridge_self as bs

    target_id = 9_101
    bs.reset_target_counter(target_id)

    for expected_count in range(1, 3):
        assert bs.record_target_failure(target_id, "503") == expected_count
    assert bs.get_target_failure_count(target_id) == 2

    bs.record_target_success(target_id)
    assert bs.get_target_failure_count(target_id) == 0
def test_backlog_state_only_emits_on_crossing() -> None:
    """Only the False -> True transition should report a crossing.

    A sustained backlog must not re-fire on every scan, and a recovered
    backlog re-arms the latch so the next crossing is reported again.
    """
    from notify_bridge_server.services import bridge_self as bs

    user_id = 9_201
    # Remove any latch entry for this user so the test starts from the
    # "no previous reading" state.
    bs._backlog_above_threshold.pop(user_id, None)
    try:
        # First above-threshold reading with no prior entry IS a crossing.
        assert bs.record_backlog_state(user_id, True) is True
        # Sustained above — no second alert.
        assert bs.record_backlog_state(user_id, True) is False
        assert bs.record_backlog_state(user_id, True) is False
        # Drop below — no alert (we don't notify on recovery).
        assert bs.record_backlog_state(user_id, False) is False
        # Cross again — alert.
        assert bs.record_backlog_state(user_id, True) is True
    finally:
        # The latch dict is module-level; don't leave this user's entry
        # behind for later tests.
        bs._backlog_above_threshold.pop(user_id, None)
# ---------------------------------------------------------------------------
# ensure_bridge_self_provider_for_user — DB roundtrip
# ---------------------------------------------------------------------------
@pytest.fixture
async def session() -> AsyncIterator[AsyncSession]:
    """Yield an ``AsyncSession`` on a fresh in-memory SQLite DB with the schema applied.

    This is an async-generator fixture, so it is annotated as an
    ``AsyncIterator``. The engine is disposed in ``finally`` so it is
    released even when schema creation fails during setup.
    """
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    try:
        async with engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)
        async with AsyncSession(engine) as db_session:
            yield db_session
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_ensure_bridge_self_provider_creates_once(session: AsyncSession) -> None:
    """The first call creates a provider with default thresholds; re-runs reuse it."""
    from notify_bridge_server.database.models import ServiceProvider, User
    from notify_bridge_server.database.seeds import (
        ensure_bridge_self_provider_for_user,
    )

    # The provider needs a real owning user row.
    owner = User(username="alice", hashed_password="x", role="user")
    session.add(owner)
    await session.commit()
    await session.refresh(owner)
    owner_id = owner.id

    first = await ensure_bridge_self_provider_for_user(session, owner_id)
    assert first is not None
    # Read every attribute before the commit below expires the instance.
    first_id = first.id
    assert first.type == "bridge_self"
    assert first.user_id == owner_id
    default_thresholds = {
        "poll_failure_threshold": 3,
        "deferred_backlog_threshold": 100,
        "target_failure_threshold": 5,
    }
    for key, value in default_thresholds.items():
        assert first.config[key] == value
    await session.commit()

    # A second call must return the existing row instead of inserting one.
    second = await ensure_bridge_self_provider_for_user(session, owner_id)
    assert second is not None
    assert second.id == first_id
    await session.commit()

    query = select(ServiceProvider).where(
        ServiceProvider.user_id == owner_id,
        ServiceProvider.type == "bridge_self",
    )
    matches = (await session.exec(query)).all()
    assert len(matches) == 1
@pytest.mark.asyncio
async def test_ensure_bridge_self_provider_skips_system_user(session: AsyncSession) -> None:
    """Non-positive user ids (the __system__ placeholder) never get a provider.

    Only id 0 is exercised here.
    """
    from notify_bridge_server.database.seeds import ensure_bridge_self_provider_for_user

    created = await ensure_bridge_self_provider_for_user(session, 0)
    assert created is None
# ---------------------------------------------------------------------------
# Capability registry
# ---------------------------------------------------------------------------
def test_capability_registry_lists_bridge_self() -> None:
    """bridge_self declares three failure events and one message slot for each."""
    from notify_bridge_core.providers.capabilities import (
        get_all_capabilities,
        get_capabilities,
    )

    failure_kinds = ("poll_failures", "deferred_backlog", "target_failures")

    caps = get_capabilities("bridge_self")
    assert caps is not None
    assert caps.provider_type == "bridge_self"
    assert caps.webhook_based is False

    declared_events = {event["name"] for event in caps.events}
    assert declared_events == {f"bridge_self_{kind}" for kind in failure_kinds}

    declared_slots = {slot["name"] for slot in caps.notification_slots}
    assert declared_slots == {f"message_bridge_self_{kind}" for kind in failure_kinds}

    # The provider must also be reachable through the global registry.
    assert "bridge_self" in get_all_capabilities()
def test_default_template_loader_returns_bridge_self_slots() -> None:
    """All three bridge_self slots have shipped Jinja2 default templates.

    Checked for both the ``en`` and ``ru`` locales.
    """
    from notify_bridge_core.templates.defaults.loader import load_default_templates

    expected = {
        "message_bridge_self_poll_failures",
        "message_bridge_self_deferred_backlog",
        "message_bridge_self_target_failures",
    }
    for locale in ("en", "ru"):
        templates = load_default_templates(locale, "bridge_self")
        assert set(templates.keys()) == expected, locale
        # Sanity: each template contains at least one Jinja2 ``{{ ... }}``
        # expression, i.e. it is not purely static text. This does not
        # check which variables are referenced.
        for slot, tpl in templates.items():
            assert "{{" in tpl, f"{locale}/{slot} has no Jinja2 expression"
# ---------------------------------------------------------------------------
# Bot commands — context builders
#
# These tests run against the real (temp-dir) DB via the FastAPI lifespan so
# that ``services.bridge_self`` helpers using ``get_engine()`` resolve to the
# same DB the test seeds rows into. We follow the pattern used by
# test_webhook_status_handler.py — bootstrap once, seed under TestClient.
# ---------------------------------------------------------------------------
def _bootstrap_app():
    """Return the application object from ``notify_bridge_server.main``.

    This only imports ``app``; it does not start it. The lifespan (and
    whatever startup work it does — presumably the migrations against the
    temp DB) runs only once the caller enters ``TestClient(app)``.
    """
    from notify_bridge_server.main import app

    return app
async def _seed_user_and_provider(
    *, username: str, config: dict[str, int],
) -> tuple[int, int]:
    """Create a fresh ``(user_id, provider_id)`` against the live engine.

    ``username`` is used as a prefix and gets a random suffix. A float
    timestamp suffix is not guaranteed unique because it is limited by
    clock resolution; a uuid4 fragment is.

    A copy of ``config`` is stored on the new ``bridge_self`` provider,
    so the caller's dict is never aliased by the ORM row.

    Uses two short-lived sessions to avoid SQLAlchemy auto-expiring the
    first-committed object once a second commit fires on the same session.
    """
    import uuid

    from notify_bridge_server.database.engine import get_engine
    from notify_bridge_server.database.models import ServiceProvider, User

    engine = get_engine()
    async with AsyncSession(engine) as db:
        user = User(
            username=f"{username}_{uuid.uuid4().hex[:12]}",
            hashed_password="x", role="user",
        )
        db.add(user)
        await db.commit()
        await db.refresh(user)
        user_id = int(user.id)
    async with AsyncSession(engine) as db:
        provider = ServiceProvider(
            user_id=user_id, type="bridge_self", name="Bridge",
            config=dict(config),
        )
        db.add(provider)
        await db.commit()
        await db.refresh(provider)
        provider_id = int(provider.id)
    return user_id, provider_id
async def _load_provider(provider_id: int):
    """Fetch the ``ServiceProvider`` with primary key ``provider_id``.

    Uses its own short-lived session on the live engine. As with
    ``AsyncSession.get``, the result is ``None`` when no such row exists.
    """
    from notify_bridge_server.database.engine import get_engine
    from notify_bridge_server.database.models import ServiceProvider

    async with AsyncSession(get_engine()) as db:
        provider = await db.get(ServiceProvider, provider_id)
    return provider
def test_command_status_returns_empty_lists_when_no_failures(tmp_data_dir) -> None:  # noqa: ARG001
    """``/status`` with empty counters lists no failures and no deferred backlog."""
    import asyncio

    from fastapi.testclient import TestClient
    from notify_bridge_server.commands.bridge_self_handler import _build_status_context
    from notify_bridge_server.services import bridge_self as bs

    thresholds = {
        "poll_failure_threshold": 3,
        "deferred_backlog_threshold": 100,
        "target_failure_threshold": 5,
    }

    async def scenario() -> None:
        _, provider_id = await _seed_user_and_provider(
            username="status_user", config=thresholds,
        )
        provider = await _load_provider(provider_id)

        # Start from empty in-memory failure counters.
        bs._poll_failure_counts.clear()
        bs._target_failure_counts.clear()

        status = await _build_status_context(provider)
        assert status["poll_failures"] == []
        assert status["target_failures"] == []
        assert status["deferred_pending"] == 0
        assert status["deferred_threshold"] == 100

    with TestClient(_bootstrap_app()):
        asyncio.run(scenario())
def test_command_thresholds_returns_user_config(tmp_data_dir) -> None:  # noqa: ARG001
    """``/thresholds`` echoes exactly the thresholds stored on the provider."""
    import asyncio

    from fastapi.testclient import TestClient
    from notify_bridge_server.commands.bridge_self_handler import _build_thresholds_context

    configured = {
        "poll_failure_threshold": 7,
        "deferred_backlog_threshold": 250,
        "target_failure_threshold": 11,
    }

    async def scenario() -> None:
        _, provider_id = await _seed_user_and_provider(
            username="thresholds_user", config=configured,
        )
        provider = await _load_provider(provider_id)

        reported = await _build_thresholds_context(provider)
        assert reported == {
            "poll_failure_threshold": 7,
            "deferred_backlog_threshold": 250,
            "target_failure_threshold": 11,
        }

    with TestClient(_bootstrap_app()):
        asyncio.run(scenario())
def test_command_reset_clears_named_counter_and_is_idempotent(tmp_data_dir) -> None:  # noqa: ARG001
    """``/reset`` zeroes an owned tracker's counter and can be repeated safely.

    The test user owns a real NotificationTracker, so ``find_tracker_owner``
    resolves to the same user_id and the reset is allowed. Rejection of a
    subject owned by another user is covered by
    :func:`test_command_reset_rejects_cross_user_subject`.
    """
    import asyncio

    from fastapi.testclient import TestClient
    from notify_bridge_server.commands.bridge_self_handler import (
        _build_reset_context,
        _parse_reset_subject,
    )
    from notify_bridge_server.database.engine import get_engine
    from notify_bridge_server.database.models import NotificationTracker
    from notify_bridge_server.services import bridge_self as bs

    async def scenario() -> None:
        owner_id, provider_id = await _seed_user_and_provider(
            username="reset_user",
            config={
                "poll_failure_threshold": 3,
                "deferred_backlog_threshold": 100,
                "target_failure_threshold": 5,
            },
        )
        provider = await _load_provider(provider_id)

        # An owned tracker lets the ownership check inside
        # _build_reset_context map the subject back to owner_id.
        async with AsyncSession(get_engine()) as db:
            tracker = NotificationTracker(
                user_id=owner_id, provider_id=provider_id,
                name="reset-test", enabled=True,
            )
            db.add(tracker)
            await db.commit()
            await db.refresh(tracker)
            tracker_id = int(tracker.id)

        subject = f"tracker:{tracker_id}"

        bs.reset_poll_counter(tracker_id)
        for _ in range(2):
            bs.record_poll_failure(tracker_id, "boom")
        assert bs.get_poll_failure_count(tracker_id) == 2

        first = await _build_reset_context(subject, provider)
        assert first["success"] is True
        assert first["subject_type"] == "tracker"
        assert first["subject_id"] == tracker_id
        assert first["previous_count"] == 2
        assert first["error_message"] is None
        assert bs.get_poll_failure_count(tracker_id) == 0

        # Repeating the reset still succeeds; there is just nothing to clear.
        second = await _build_reset_context(subject, provider)
        assert second["success"] is True
        assert second["previous_count"] == 0

        # An unparseable subject yields a templated error, not an exception.
        rejected = await _build_reset_context("not a subject", provider)
        assert rejected["success"] is False
        assert rejected["error_message"]

    with TestClient(_bootstrap_app()):
        asyncio.run(scenario())

    # The subject parser is a pure function and needs no DB.
    parser_cases = {
        "all": ("all", None, None),
        "tracker:42": ("tracker", 42, None),
        "target:7": ("target", 7, None),
    }
    for raw_subject, parsed in parser_cases.items():
        assert _parse_reset_subject(raw_subject) == parsed
    _, _, parse_error = _parse_reset_subject("rocket:1")
    assert parse_error is not None
def test_command_reset_rejects_cross_user_subject(tmp_data_dir) -> None:  # noqa: ARG001
    """User A cannot reset a counter belonging to user B's tracker.

    Regression guard for the multi-tenant authorization hole the original
    handler had — ``reset_counter`` was called without verifying that the
    subject's ``user_id`` matched ``provider.user_id``.
    """
    import asyncio

    from fastapi.testclient import TestClient
    from notify_bridge_server.commands.bridge_self_handler import _build_reset_context
    from notify_bridge_server.database.engine import get_engine
    from notify_bridge_server.database.models import NotificationTracker
    from notify_bridge_server.services import bridge_self as bs

    app = _bootstrap_app()
    with TestClient(app):
        async def run() -> None:
            thresholds = {
                "poll_failure_threshold": 3,
                "deferred_backlog_threshold": 100,
                "target_failure_threshold": 5,
            }
            # Only user A's provider is needed; user A's id itself is unused.
            _user_a_id, provider_a_id = await _seed_user_and_provider(
                username="user_a", config=thresholds,
            )
            user_b_id, provider_b_id = await _seed_user_and_provider(
                username="user_b", config=thresholds,
            )
            provider_a = await _load_provider(provider_a_id)

            # Seed a tracker owned by user B.
            engine = get_engine()
            async with AsyncSession(engine) as db:
                tracker_b = NotificationTracker(
                    user_id=user_b_id, provider_id=provider_b_id,
                    name="b-only", enabled=True,
                )
                db.add(tracker_b)
                await db.commit()
                await db.refresh(tracker_b)
                tid_b = int(tracker_b.id)

            bs.reset_poll_counter(tid_b)
            try:
                bs.record_poll_failure(tid_b, "boom")
                assert bs.get_poll_failure_count(tid_b) == 1

                # User A tries to reset user B's tracker — must fail.
                ctx = await _build_reset_context(f"tracker:{tid_b}", provider_a)
                assert ctx["success"] is False
                assert "not found" in (ctx["error_message"] or "").lower()
                # Counter must remain untouched.
                assert bs.get_poll_failure_count(tid_b) == 1
            finally:
                # The counter dicts are module-level and shared with other
                # tests; don't leave user B's failure count behind.
                bs.reset_poll_counter(tid_b)

        asyncio.run(run())
def test_command_health_is_healthy_when_counters_zero(tmp_data_dir) -> None:  # noqa: ARG001
    """With no failure counts and no deferred rows, ``/health`` reports healthy."""
    import asyncio

    from fastapi.testclient import TestClient
    from notify_bridge_server.commands.bridge_self_handler import _build_health_context
    from notify_bridge_server.services import bridge_self as bs

    async def scenario() -> None:
        _, provider_id = await _seed_user_and_provider(
            username="health_user",
            config={
                "poll_failure_threshold": 3,
                "deferred_backlog_threshold": 100,
                "target_failure_threshold": 5,
            },
        )
        provider = await _load_provider(provider_id)

        # Empty in-memory counters; this freshly seeded user has no deferred rows.
        bs._poll_failure_counts.clear()
        bs._target_failure_counts.clear()

        health = await _build_health_context(provider)
        assert health["healthy"] is True
        for zero_field in (
            "failing_tracker_count",
            "failing_target_count",
            "deferred_pending",
        ):
            assert health[zero_field] == 0
        assert "healthy" in health["summary"].lower()

    with TestClient(_bootstrap_app()):
        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# reset_counter direct unit tests ("all" branch and unknown failure types)
# ---------------------------------------------------------------------------
def test_reset_counter_all_clears_every_dict() -> None:
    """``reset_counter("all")`` empties both failure dicts and returns the total cleared."""
    from notify_bridge_server.services import bridge_self as bs

    bs._poll_failure_counts.clear()
    bs._target_failure_counts.clear()
    for tracker_id in (9_401, 9_402):
        bs.record_poll_failure(tracker_id, "boom")
    bs.record_target_failure(9_501, "503")

    # Two poll entries plus one target entry.
    assert bs.reset_counter("all") == 3
    assert bs._poll_failure_counts == {}
    assert bs._target_failure_counts == {}
def test_reset_counter_unknown_failure_type_is_noop() -> None:
    """An unknown failure type clears nothing and reports zero entries."""
    from notify_bridge_server.services import bridge_self as bs

    tracker_id = 9_601
    bs._poll_failure_counts.clear()
    bs.record_poll_failure(tracker_id, "boom")

    cleared = bs.reset_counter("rocket_failures", tracker_id)
    assert cleared == 0
    assert bs.get_poll_failure_count(tracker_id) == 1
# ---------------------------------------------------------------------------
# Capability / handler registration
# ---------------------------------------------------------------------------
def test_capability_registry_lists_bridge_self_commands() -> None:
    """bridge_self advertises its commands plus response and description slots."""
    from notify_bridge_core.providers.capabilities import get_capabilities

    caps = get_capabilities("bridge_self")
    assert caps is not None

    operator_commands = {"status", "thresholds", "reset", "health"}

    command_names = {cmd["name"] for cmd in caps.commands}
    assert (operator_commands | {"help"}).issubset(command_names)

    slot_names = {slot["name"] for slot in caps.command_slots}
    # One response-template slot per operator command ...
    assert operator_commands.issubset(slot_names)
    # ... and one description slot each, so the menu sync registers a
    # description for every operator-facing command.
    assert {f"desc_{name}" for name in operator_commands}.issubset(slot_names)
def test_command_handler_registered_for_bridge_self() -> None:
    """Dispatch resolves a bridge_self handler that exposes exactly four commands."""
    from notify_bridge_server.commands.dispatch import get_handler

    handler = get_handler("bridge_self")
    assert handler is not None
    assert handler.provider_type == "bridge_self"

    expected_commands = {"status", "thresholds", "reset", "health"}
    assert expected_commands == handler.get_provider_commands()
def test_default_command_template_loader_returns_bridge_self_slots() -> None:
    """Both locales ship every required bridge_self command template."""
    from notify_bridge_core.templates.command_defaults.loader import (
        load_default_command_templates,
    )

    required = {"status", "thresholds", "reset", "health", "help", "start"}
    for locale in ("en", "ru"):
        loaded = load_default_command_templates(locale, "bridge_self")
        assert required.issubset(loaded.keys())