feat(server): add /status command handler for webhook providers
The generic-webhook provider has no upstream API, so /status reports DB-derived stats: active/total trackers, provider name, and last event timestamp (formatted via the shared get_last_event_str helper). Includes pytest coverage for handler registration, populated stats with a recent event, the empty-state dash sentinel, and unknown-command fall-through. Template variable docs in command_template_configs.py extended with the new trackers_active/trackers_total keys.
This commit is contained in:
@@ -0,0 +1,195 @@
|
||||
"""Tests for the generic-webhook ``/status`` command handler.
|
||||
|
||||
The webhook provider has no upstream API, so ``/status`` is built from
|
||||
local DB stats:
|
||||
|
||||
* ``trackers_active`` — count of enabled ``NotificationTracker`` rows
|
||||
* ``trackers_total`` — count of all ``NotificationTracker`` rows
|
||||
* ``last_event`` — formatted timestamp of the most recent ``EventLog`` row
|
||||
tied to one of the provider's trackers, or ``-`` when there are none
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
|
||||
_STATUS_TEMPLATE_EN = (
|
||||
"Webhook Status\n"
|
||||
"Trackers active: {{ trackers_active }}/{{ trackers_total }}\n"
|
||||
"Provider: {{ provider_name }}\n"
|
||||
"Last event: {{ last_event }}"
|
||||
)
|
||||
|
||||
|
||||
def _bootstrap_app():
|
||||
"""Bring up the app once so migrations run against the temp DB."""
|
||||
from notify_bridge_server.main import app
|
||||
|
||||
return app
|
||||
|
||||
|
||||
async def _seed_user() -> int:
|
||||
from notify_bridge_server.database.engine import get_engine
|
||||
from notify_bridge_server.database.models import User
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
user = User(username=f"u_{datetime.now(timezone.utc).timestamp()}", hashed_password="x")
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
await session.refresh(user)
|
||||
return user.id
|
||||
|
||||
|
||||
async def _seed_provider(user_id: int, name: str = "WH") -> int:
|
||||
from notify_bridge_server.database.engine import get_engine
|
||||
from notify_bridge_server.database.models import ServiceProvider
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
prov = ServiceProvider(user_id=user_id, type="webhook", name=name, config={})
|
||||
session.add(prov)
|
||||
await session.commit()
|
||||
await session.refresh(prov)
|
||||
return prov.id
|
||||
|
||||
|
||||
async def _seed_tracker(user_id: int, provider_id: int, name: str, *, enabled: bool) -> int:
|
||||
from notify_bridge_server.database.engine import get_engine
|
||||
from notify_bridge_server.database.models import NotificationTracker
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
tr = NotificationTracker(
|
||||
user_id=user_id, provider_id=provider_id, name=name, enabled=enabled,
|
||||
)
|
||||
session.add(tr)
|
||||
await session.commit()
|
||||
await session.refresh(tr)
|
||||
return tr.id
|
||||
|
||||
|
||||
async def _seed_event(tracker_id: int, when: datetime) -> None:
|
||||
from notify_bridge_server.database.engine import get_engine
|
||||
from notify_bridge_server.database.models import EventLog
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
session.add(EventLog(
|
||||
tracker_id=tracker_id,
|
||||
tracker_name="webhook-tr",
|
||||
event_type="webhook_received",
|
||||
collection_id="",
|
||||
collection_name="",
|
||||
created_at=when,
|
||||
))
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def _load_provider(provider_id: int):
|
||||
from notify_bridge_server.database.engine import get_engine
|
||||
from notify_bridge_server.database.models import ServiceProvider
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
return await session.get(ServiceProvider, provider_id)
|
||||
|
||||
|
||||
def test_dispatch_registers_webhook_handler(tmp_data_dir) -> None: # noqa: ARG001
|
||||
"""Auto-registration must wire the webhook handler to provider type 'webhook'."""
|
||||
from notify_bridge_server.commands.dispatch import get_handler
|
||||
|
||||
handler = get_handler("webhook")
|
||||
assert handler is not None, "WebhookCommandHandler must be registered"
|
||||
assert handler.provider_type == "webhook"
|
||||
assert "status" in handler.get_provider_commands()
|
||||
|
||||
|
||||
def test_status_renders_active_total_and_last_event(tmp_data_dir) -> None: # noqa: ARG001
|
||||
"""``/status`` returns active/total counts and formatted last-event timestamp."""
|
||||
from notify_bridge_server.commands.webhook_handler import WebhookCommandHandler
|
||||
|
||||
app = _bootstrap_app()
|
||||
with TestClient(app):
|
||||
async def run() -> None:
|
||||
user_id = await _seed_user()
|
||||
provider_id = await _seed_provider(user_id, "WH1")
|
||||
enabled_id = await _seed_tracker(user_id, provider_id, "tr-on", enabled=True)
|
||||
await _seed_tracker(user_id, provider_id, "tr-off", enabled=False)
|
||||
event_at = datetime(2026, 5, 1, 12, 34, tzinfo=timezone.utc)
|
||||
await _seed_event(enabled_id, event_at)
|
||||
|
||||
provider = await _load_provider(provider_id)
|
||||
handler = WebhookCommandHandler()
|
||||
templates = {"status": {"en": _STATUS_TEMPLATE_EN}}
|
||||
response = await handler.handle(
|
||||
cmd="status", args="", count=5, locale="en",
|
||||
response_mode="text", provider=provider,
|
||||
cmd_templates=templates,
|
||||
bot=None, tracker=None, config=None,
|
||||
)
|
||||
|
||||
assert response is not None
|
||||
assert response.text is not None
|
||||
text = response.text
|
||||
assert "Trackers active: 1/2" in text
|
||||
assert "Provider: WH1" in text
|
||||
assert "2026-05-01 12:34" in text
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
def test_status_with_no_events_shows_dash(tmp_data_dir) -> None: # noqa: ARG001
|
||||
"""Zero events → ``last_event`` renders as '-' (the get_last_event_str sentinel)."""
|
||||
from notify_bridge_server.commands.webhook_handler import WebhookCommandHandler
|
||||
|
||||
app = _bootstrap_app()
|
||||
with TestClient(app):
|
||||
async def run() -> None:
|
||||
user_id = await _seed_user()
|
||||
provider_id = await _seed_provider(user_id, "WH-empty")
|
||||
provider = await _load_provider(provider_id)
|
||||
|
||||
handler = WebhookCommandHandler()
|
||||
templates = {"status": {"en": _STATUS_TEMPLATE_EN}}
|
||||
response = await handler.handle(
|
||||
cmd="status", args="", count=5, locale="en",
|
||||
response_mode="text", provider=provider,
|
||||
cmd_templates=templates,
|
||||
bot=None, tracker=None, config=None,
|
||||
)
|
||||
|
||||
assert response is not None
|
||||
assert "Trackers active: 0/0" in response.text
|
||||
assert "Last event: -" in response.text
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
def test_status_returns_none_for_unknown_command(tmp_data_dir) -> None: # noqa: ARG001
|
||||
"""Commands the webhook handler doesn't own must return None (lets dispatch fall through)."""
|
||||
from notify_bridge_server.commands.webhook_handler import WebhookCommandHandler
|
||||
|
||||
app = _bootstrap_app()
|
||||
with TestClient(app):
|
||||
async def run() -> None:
|
||||
user_id = await _seed_user()
|
||||
provider_id = await _seed_provider(user_id, "WH-noop")
|
||||
provider = await _load_provider(provider_id)
|
||||
|
||||
handler = WebhookCommandHandler()
|
||||
response = await handler.handle(
|
||||
cmd="albums", args="", count=5, locale="en",
|
||||
response_mode="text", provider=provider,
|
||||
cmd_templates={},
|
||||
bot=None, tracker=None, config=None,
|
||||
)
|
||||
assert response is None
|
||||
|
||||
asyncio.run(run())
|
||||
Reference in New Issue
Block a user