"""Generic webhook provider bot command handler. The generic webhook provider has no upstream API to query — its only runtime signal is the stream of incoming webhook payloads recorded as ``EventLog`` rows. ``/status`` therefore reports DB-derived stats: * ``trackers_active`` — enabled ``NotificationTracker`` rows for the provider * ``trackers_total`` — all ``NotificationTracker`` rows for the provider * ``provider_name`` — the provider's display name * ``last_event`` — formatted timestamp of the most recent received event, or ``-`` if nothing has been received yet """ from __future__ import annotations from typing import Any from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from ..database.engine import get_engine from ..database.models import ( CommandConfig, CommandTracker, CommandTrackerListener, NotificationTracker, ServiceProvider, TelegramBot, ) from .base import CommandResponse, ProviderCommandHandler from .command_utils import get_last_event_str from .handler import _render_cmd_template _WEBHOOK_COMMANDS = {"status"} async def _cmd_status(provider: ServiceProvider) -> dict[str, Any]: """Build the context for ``/status`` on a webhook provider.""" engine = get_engine() async with AsyncSession(engine) as session: result = await session.exec( select(NotificationTracker).where( NotificationTracker.provider_id == provider.id ) ) trackers = list(result.all()) active = [t for t in trackers if t.enabled] last_str = await get_last_event_str([t.id for t in active]) return { "trackers_active": len(active), "trackers_total": len(trackers), "provider_name": provider.name or "", "last_event": last_str, } class WebhookCommandHandler(ProviderCommandHandler): """Handles ``/status`` for generic-webhook providers.""" provider_type = "webhook" def get_provider_commands(self) -> set[str]: return _WEBHOOK_COMMANDS async def handle( self, cmd: str, args: str, count: int, locale: str, response_mode: str, provider: ServiceProvider, cmd_templates: dict[str, dict[str, str]], bot: TelegramBot, tracker: CommandTracker, config: CommandConfig, *, listener: CommandTrackerListener | None = None, allowed_album_ids: set[str] | None = None, # noqa: ARG002 — webhook has no album scope page: int = 1, ) -> CommandResponse | None: if cmd != "status": return None ctx = await _cmd_status(provider) return CommandResponse( text=_render_cmd_template(cmd_templates, "status", locale, ctx), )