"""Tests for the bridge self-monitoring provider. Covers: 1. ``build_event`` parses a well-formed payload and rejects malformed ones. 2. The threshold-crossing helpers in ``services.bridge_self`` only emit on the actual crossing, not on every increment afterwards (anti-spam). 3. ``ensure_bridge_self_provider_for_user`` creates exactly one provider per user and is idempotent on re-run. 4. The capability registry exposes the new event/slot definitions. """ from __future__ import annotations from datetime import datetime, timezone import pytest from sqlmodel import SQLModel, select from sqlmodel.ext.asyncio.session import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine # --------------------------------------------------------------------------- # Event parser # --------------------------------------------------------------------------- def test_build_event_well_formed_payload() -> None: from notify_bridge_core.providers.bridge_self.event_parser import build_event from notify_bridge_core.models.events import EventType from notify_bridge_core.providers.base import ServiceProviderType payload = { "failure_type": "poll_failures", "subject_id": 7, "subject_name": "My Tracker", "count": 3, "threshold": 3, "last_error": "Timeout", "details": {"tracker_id": 7}, } when = datetime(2026, 5, 16, 10, 0, tzinfo=timezone.utc) event = build_event(payload, timestamp=when) assert event is not None assert event.event_type == EventType.BRIDGE_SELF_POLL_FAILURES assert event.provider_type == ServiceProviderType.BRIDGE_SELF assert event.collection_id == "7" assert event.collection_name == "My Tracker" assert event.timestamp == when assert event.extra["count"] == 3 assert event.extra["threshold"] == 3 assert event.extra["last_error"] == "Timeout" assert event.extra["failure_type"] == "poll_failures" assert event.extra["details"] == {"tracker_id": 7} def test_build_event_unknown_failure_type_returns_none() -> None: from notify_bridge_core.providers.bridge_self.event_parser import build_event assert build_event({"failure_type": "rocket_launch"}) is None def test_build_event_non_dict_payload_returns_none() -> None: from notify_bridge_core.providers.bridge_self.event_parser import build_event assert build_event("not a dict") is None # type: ignore[arg-type] assert build_event(None) is None # type: ignore[arg-type] def test_build_event_clamps_long_error_messages() -> None: from notify_bridge_core.providers.bridge_self.event_parser import ( build_event, _MAX_ERROR_LEN, ) huge = "X" * (_MAX_ERROR_LEN * 5) event = build_event({ "failure_type": "target_failures", "subject_id": 1, "subject_name": "t", "count": 5, "threshold": 5, "last_error": huge, }) assert event is not None assert len(event.extra["last_error"]) <= _MAX_ERROR_LEN # --------------------------------------------------------------------------- # Threshold-crossing counters # --------------------------------------------------------------------------- def test_record_poll_failure_increments_then_success_resets() -> None: from notify_bridge_server.services import bridge_self as bs # Use a tracker_id we know is unique to this test to avoid pollution # across tests sharing the module-level dicts. tid = 9_001 bs.reset_poll_counter(tid) assert bs.record_poll_failure(tid, "boom") == 1 assert bs.record_poll_failure(tid, "boom") == 2 assert bs.record_poll_failure(tid, "boom") == 3 assert bs.get_poll_failure_count(tid) == 3 assert bs.get_poll_last_error(tid) == "boom" bs.record_poll_success(tid) assert bs.get_poll_failure_count(tid) == 0 assert bs.get_poll_last_error(tid) == "" def test_record_target_failure_increments_then_success_resets() -> None: from notify_bridge_server.services import bridge_self as bs tid = 9_101 bs.reset_target_counter(tid) assert bs.record_target_failure(tid, "503") == 1 assert bs.record_target_failure(tid, "503") == 2 assert bs.get_target_failure_count(tid) == 2 bs.record_target_success(tid) assert bs.get_target_failure_count(tid) == 0 def test_backlog_state_only_emits_on_crossing() -> None: """Only the False -> True transition should report a crossing. A sustained backlog must not re-fire on every scan, and a recovered backlog re-arms the latch so the next crossing is reported again. """ from notify_bridge_server.services import bridge_self as bs user_id = 9_201 # Reset latch by going through a False reading first. bs._backlog_above_threshold.pop(user_id, None) # Initial above-threshold reading IS a crossing (None -> True latch). assert bs.record_backlog_state(user_id, True) is True # Sustained above — no second alert. assert bs.record_backlog_state(user_id, True) is False assert bs.record_backlog_state(user_id, True) is False # Drop below — no alert (we don't notify on recovery). assert bs.record_backlog_state(user_id, False) is False # Cross again — alert. assert bs.record_backlog_state(user_id, True) is True # --------------------------------------------------------------------------- # ensure_bridge_self_provider_for_user — DB roundtrip # --------------------------------------------------------------------------- @pytest.fixture async def session() -> AsyncSession: """Fresh in-memory DB with the SQLModel schema applied.""" engine = create_async_engine("sqlite+aiosqlite:///:memory:") async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) async with AsyncSession(engine) as session: yield session await engine.dispose() @pytest.mark.asyncio async def test_ensure_bridge_self_provider_creates_once(session: AsyncSession) -> None: from notify_bridge_server.database.models import ServiceProvider, User from notify_bridge_server.database.seeds import ( ensure_bridge_self_provider_for_user, ) # Create a real user. user = User(username="alice", hashed_password="x", role="user") session.add(user) await session.commit() await session.refresh(user) user_id = user.id p1 = await ensure_bridge_self_provider_for_user(session, user_id) assert p1 is not None p1_id = p1.id assert p1.type == "bridge_self" assert p1.user_id == user_id assert p1.config["poll_failure_threshold"] == 3 assert p1.config["deferred_backlog_threshold"] == 100 assert p1.config["target_failure_threshold"] == 5 await session.commit() # Idempotent: second call returns the same row, no duplicates. p2 = await ensure_bridge_self_provider_for_user(session, user_id) assert p2 is not None assert p2.id == p1_id await session.commit() rows = ( await session.exec( select(ServiceProvider).where( ServiceProvider.user_id == user_id, ServiceProvider.type == "bridge_self", ) ) ).all() assert len(rows) == 1 @pytest.mark.asyncio async def test_ensure_bridge_self_provider_skips_system_user(session: AsyncSession) -> None: """user_id <= 0 is the __system__ placeholder — never gets a provider.""" from notify_bridge_server.database.seeds import ( ensure_bridge_self_provider_for_user, ) result = await ensure_bridge_self_provider_for_user(session, 0) assert result is None # --------------------------------------------------------------------------- # Capability registry # --------------------------------------------------------------------------- def test_capability_registry_lists_bridge_self() -> None: from notify_bridge_core.providers.capabilities import ( get_capabilities, get_all_capabilities, ) caps = get_capabilities("bridge_self") assert caps is not None assert caps.provider_type == "bridge_self" assert caps.webhook_based is False event_names = {e["name"] for e in caps.events} assert event_names == { "bridge_self_poll_failures", "bridge_self_deferred_backlog", "bridge_self_target_failures", } slot_names = {s["name"] for s in caps.notification_slots} assert slot_names == { "message_bridge_self_poll_failures", "message_bridge_self_deferred_backlog", "message_bridge_self_target_failures", } # And it shows up in the global registry. assert "bridge_self" in get_all_capabilities() def test_default_template_loader_returns_bridge_self_slots() -> None: """All three bridge_self slots have shipped Jinja2 default templates.""" from notify_bridge_core.templates.defaults.loader import load_default_templates en = load_default_templates("en", "bridge_self") ru = load_default_templates("ru", "bridge_self") expected = { "message_bridge_self_poll_failures", "message_bridge_self_deferred_backlog", "message_bridge_self_target_failures", } assert set(en.keys()) == expected assert set(ru.keys()) == expected # Sanity: each template references at least one of the bridge_self vars. for tpl in list(en.values()) + list(ru.values()): assert "{{" in tpl # --------------------------------------------------------------------------- # Bot commands — context builders # # These tests run against the real (temp-dir) DB via the FastAPI lifespan so # that ``services.bridge_self`` helpers using ``get_engine()`` resolve to the # same DB the test seeds rows into. We follow the pattern used by # test_webhook_status_handler.py — bootstrap once, seed under TestClient. # --------------------------------------------------------------------------- def _bootstrap_app(): """Bring up the app once so migrations run against the temp DB.""" from notify_bridge_server.main import app return app async def _seed_user_and_provider( *, username: str, config: dict[str, int], ) -> tuple[int, int]: """Create a fresh ``(user_id, provider_id)`` against the live engine. Uses two short-lived sessions to avoid SQLAlchemy auto-expiring the first-committed object once a second commit fires on the same session. """ from notify_bridge_server.database.engine import get_engine from notify_bridge_server.database.models import ServiceProvider, User engine = get_engine() async with AsyncSession(engine) as db: user = User( username=f"{username}_{datetime.now(timezone.utc).timestamp()}", hashed_password="x", role="user", ) db.add(user) await db.commit() await db.refresh(user) user_id = int(user.id) async with AsyncSession(engine) as db: provider = ServiceProvider( user_id=user_id, type="bridge_self", name="Bridge", config=dict(config), ) db.add(provider) await db.commit() await db.refresh(provider) provider_id = int(provider.id) return user_id, provider_id async def _load_provider(provider_id: int): from notify_bridge_server.database.engine import get_engine from notify_bridge_server.database.models import ServiceProvider engine = get_engine() async with AsyncSession(engine) as db: return await db.get(ServiceProvider, provider_id) def test_command_status_returns_empty_lists_when_no_failures(tmp_data_dir) -> None: # noqa: ARG001 import asyncio from fastapi.testclient import TestClient from notify_bridge_server.commands.bridge_self_handler import ( _build_status_context, ) from notify_bridge_server.services import bridge_self as bs app = _bootstrap_app() with TestClient(app): async def run() -> None: _user_id, provider_id = await _seed_user_and_provider( username="status_user", config={ "poll_failure_threshold": 3, "deferred_backlog_threshold": 100, "target_failure_threshold": 5, }, ) provider = await _load_provider(provider_id) # Make sure the in-memory dicts contain nothing. bs._poll_failure_counts.clear() bs._target_failure_counts.clear() ctx = await _build_status_context(provider) assert ctx["poll_failures"] == [] assert ctx["target_failures"] == [] assert ctx["deferred_pending"] == 0 assert ctx["deferred_threshold"] == 100 asyncio.run(run()) def test_command_thresholds_returns_user_config(tmp_data_dir) -> None: # noqa: ARG001 import asyncio from fastapi.testclient import TestClient from notify_bridge_server.commands.bridge_self_handler import ( _build_thresholds_context, ) app = _bootstrap_app() with TestClient(app): async def run() -> None: _user_id, provider_id = await _seed_user_and_provider( username="thresholds_user", config={ "poll_failure_threshold": 7, "deferred_backlog_threshold": 250, "target_failure_threshold": 11, }, ) provider = await _load_provider(provider_id) ctx = await _build_thresholds_context(provider) assert ctx == { "poll_failure_threshold": 7, "deferred_backlog_threshold": 250, "target_failure_threshold": 11, } asyncio.run(run()) def test_command_reset_clears_named_counter_and_is_idempotent(tmp_data_dir) -> None: # noqa: ARG001 """``/reset`` clears the in-memory counter and is idempotent. Now ownership-aware: we seed a real NotificationTracker owned by the test user so ``find_tracker_owner`` returns a matching user_id and the reset proceeds. The cross-user-rejection case is covered by :func:`test_command_reset_rejects_cross_user_subject` below. """ import asyncio from fastapi.testclient import TestClient from notify_bridge_server.commands.bridge_self_handler import ( _build_reset_context, _parse_reset_subject, ) from notify_bridge_server.database.engine import get_engine from notify_bridge_server.database.models import NotificationTracker from notify_bridge_server.services import bridge_self as bs app = _bootstrap_app() with TestClient(app): async def run() -> None: user_id, provider_id = await _seed_user_and_provider( username="reset_user", config={ "poll_failure_threshold": 3, "deferred_backlog_threshold": 100, "target_failure_threshold": 5, }, ) provider = await _load_provider(provider_id) # Seed an owned NotificationTracker so the ownership check # in _build_reset_context can match it back to user_id. engine = get_engine() async with AsyncSession(engine) as db: tracker = NotificationTracker( user_id=user_id, provider_id=provider_id, name="reset-test", enabled=True, ) db.add(tracker) await db.commit() await db.refresh(tracker) tid = int(tracker.id) bs.reset_poll_counter(tid) bs.record_poll_failure(tid, "boom") bs.record_poll_failure(tid, "boom") assert bs.get_poll_failure_count(tid) == 2 ctx = await _build_reset_context(f"tracker:{tid}", provider) assert ctx["success"] is True assert ctx["subject_type"] == "tracker" assert ctx["subject_id"] == tid assert ctx["previous_count"] == 2 assert ctx["error_message"] is None assert bs.get_poll_failure_count(tid) == 0 # Idempotent — second call still succeeds with previous=0. ctx2 = await _build_reset_context(f"tracker:{tid}", provider) assert ctx2["success"] is True assert ctx2["previous_count"] == 0 # Parse error → templated error, no exception. bad_ctx = await _build_reset_context("not a subject", provider) assert bad_ctx["success"] is False assert bad_ctx["error_message"] asyncio.run(run()) # Parser direct sanity-checks (pure function, no DB needed). assert _parse_reset_subject("all") == ("all", None, None) assert _parse_reset_subject("tracker:42") == ("tracker", 42, None) assert _parse_reset_subject("target:7") == ("target", 7, None) _, _, err = _parse_reset_subject("rocket:1") assert err is not None def test_command_reset_rejects_cross_user_subject(tmp_data_dir) -> None: # noqa: ARG001 """User A cannot reset a counter belonging to user B's tracker. Regression guard for the multi-tenant data-leak hole the original handler had — ``reset_counter`` was called without verifying the subject's ``user_id`` matched ``provider.user_id``. """ import asyncio from fastapi.testclient import TestClient from notify_bridge_server.commands.bridge_self_handler import _build_reset_context from notify_bridge_server.database.engine import get_engine from notify_bridge_server.database.models import NotificationTracker from notify_bridge_server.services import bridge_self as bs app = _bootstrap_app() with TestClient(app): async def run() -> None: user_a_id, provider_a_id = await _seed_user_and_provider( username="user_a", config={ "poll_failure_threshold": 3, "deferred_backlog_threshold": 100, "target_failure_threshold": 5, }, ) user_b_id, provider_b_id = await _seed_user_and_provider( username="user_b", config={ "poll_failure_threshold": 3, "deferred_backlog_threshold": 100, "target_failure_threshold": 5, }, ) provider_a = await _load_provider(provider_a_id) # Seed a tracker owned by user B and increment its counter. engine = get_engine() async with AsyncSession(engine) as db: tracker_b = NotificationTracker( user_id=user_b_id, provider_id=provider_b_id, name="b-only", enabled=True, ) db.add(tracker_b) await db.commit() await db.refresh(tracker_b) tid_b = int(tracker_b.id) bs.reset_poll_counter(tid_b) bs.record_poll_failure(tid_b, "boom") assert bs.get_poll_failure_count(tid_b) == 1 # User A tries to reset user B's tracker — must fail. ctx = await _build_reset_context(f"tracker:{tid_b}", provider_a) assert ctx["success"] is False assert "not found" in (ctx["error_message"] or "").lower() # Counter must remain untouched. assert bs.get_poll_failure_count(tid_b) == 1 asyncio.run(run()) def test_command_health_is_healthy_when_counters_zero(tmp_data_dir) -> None: # noqa: ARG001 import asyncio from fastapi.testclient import TestClient from notify_bridge_server.commands.bridge_self_handler import ( _build_health_context, ) from notify_bridge_server.services import bridge_self as bs app = _bootstrap_app() with TestClient(app): async def run() -> None: _user_id, provider_id = await _seed_user_and_provider( username="health_user", config={ "poll_failure_threshold": 3, "deferred_backlog_threshold": 100, "target_failure_threshold": 5, }, ) provider = await _load_provider(provider_id) # Empty counters and no deferred rows for this user. bs._poll_failure_counts.clear() bs._target_failure_counts.clear() ctx = await _build_health_context(provider) assert ctx["healthy"] is True assert ctx["failing_tracker_count"] == 0 assert ctx["failing_target_count"] == 0 assert ctx["deferred_pending"] == 0 assert "healthy" in ctx["summary"].lower() asyncio.run(run()) # --------------------------------------------------------------------------- # reset_counter direct unit test (covers the "all" branch) # --------------------------------------------------------------------------- def test_reset_counter_all_clears_every_dict() -> None: from notify_bridge_server.services import bridge_self as bs # Seed both dicts with a couple of entries. bs._poll_failure_counts.clear() bs._target_failure_counts.clear() bs.record_poll_failure(9_401, "boom") bs.record_poll_failure(9_402, "boom") bs.record_target_failure(9_501, "503") cleared = bs.reset_counter("all") # 2 poll + 1 target = 3 entries cleared. assert cleared == 3 assert bs._poll_failure_counts == {} assert bs._target_failure_counts == {} def test_reset_counter_unknown_failure_type_is_noop() -> None: from notify_bridge_server.services import bridge_self as bs bs._poll_failure_counts.clear() bs.record_poll_failure(9_601, "boom") # Unknown type returns 0 and leaves dicts intact. assert bs.reset_counter("rocket_failures", 9_601) == 0 assert bs.get_poll_failure_count(9_601) == 1 # --------------------------------------------------------------------------- # Capability / handler registration # --------------------------------------------------------------------------- def test_capability_registry_lists_bridge_self_commands() -> None: from notify_bridge_core.providers.capabilities import get_capabilities caps = get_capabilities("bridge_self") assert caps is not None cmd_names = {c["name"] for c in caps.commands} assert {"status", "thresholds", "reset", "health", "help"}.issubset(cmd_names) slot_names = {s["name"] for s in caps.command_slots} # Response slots assert {"status", "thresholds", "reset", "health"}.issubset(slot_names) # Description slots — needed so the menu sync registers a description # for every operator-facing command. assert { "desc_status", "desc_thresholds", "desc_reset", "desc_health", }.issubset(slot_names) def test_command_handler_registered_for_bridge_self() -> None: """Auto-registration wires the bridge_self handler into dispatch.""" from notify_bridge_server.commands.dispatch import get_handler handler = get_handler("bridge_self") assert handler is not None assert handler.provider_type == "bridge_self" assert {"status", "thresholds", "reset", "health"} == handler.get_provider_commands() def test_default_command_template_loader_returns_bridge_self_slots() -> None: """All shipped command-template slots load for both locales.""" from notify_bridge_core.templates.command_defaults.loader import ( load_default_command_templates, ) en = load_default_command_templates("en", "bridge_self") ru = load_default_command_templates("ru", "bridge_self") required = {"status", "thresholds", "reset", "health", "help", "start"} assert required.issubset(en.keys()) assert required.issubset(ru.keys())