feat: log bot command invocations to the event stream

Bot commands were the only user-initiated path that didn't surface in
the dashboard. They now produce ``command_handled`` /
``command_rate_limited`` / ``command_failed`` rows in ``EventLog``
alongside tracker and action events.

Backend
- ``EventLog`` gains nullable ``command_tracker_id`` / ``telegram_bot_id``
  FKs plus deletion-snapshot name columns (idempotent migration).
- New ``_log_command_event`` helper emits one row per invocation at the
  three branches in ``handle_command``. Logging failures are swallowed
  so they cannot block the user-visible reply.
- Telegram ``from`` is captured in poller and webhook, whitelisted to
  identity fields by ``_normalize_issuer`` (drops ``language_code`` and
  any future PII), persisted under ``details.issuer``.
- ``/api/status`` resolves live ``CommandTracker`` / ``TelegramBot``
  names (mirroring the action pattern) and exposes ``tracker_id``,
  ``command_tracker_id``, ``telegram_bot_id`` so the frontend can
  deep-link.

Frontend
- Event rows are now clickable and open a detail modal with full
  provenance (bot → chat → issuer → provider), raw ``details`` JSON,
  and per-entity action buttons.
- Buttons use the existing ``requestHighlight`` + ``goto`` crosslink
  pattern, so clicking lands on the entity's list page with that
  specific card scrolled into view and pulsing.
- Auto-refresh dropdown (Off / 10s / 30s / 1m / 5m) persisted in
  ``localStorage``; ticker pauses while the tab is hidden.
- Event-type filter, dashboard verb labels, and gradients extended for
  the three new ``command_*`` types.
- Filled in pre-existing missing i18n keys (``common.hide`` /
  ``common.show`` / ``commandConfig.noCommandsForProvider``).

Tests
- New ``test_command_event_logging.py`` covers subject formatting,
  issuer normalization, the three event branches, and graceful failure
  when the DB is unreachable. ``pytest packages/server/tests/`` → 96/96.
This commit is contained in:
2026-05-07 22:22:41 +03:00
parent 632e4c1aa3
commit 35a3008896
13 changed files with 952 additions and 50 deletions
@@ -118,6 +118,31 @@ async def get_status(
)).all()
action_name_map = {aid: aname for aid, aname in action_rows}
# Live-resolve command tracker and bot names for command_* events
# (mirrors the action/tracker pattern above). Falls back to the
# snapshot stored on the EventLog when the entity has been deleted.
cmd_tracker_ids = {
e.command_tracker_id for e in event_rows if e.command_tracker_id is not None
}
cmd_tracker_name_map: dict[int, str] = {}
if cmd_tracker_ids:
cmd_tracker_rows = (await session.exec(
select(CommandTracker.id, CommandTracker.name).where(
CommandTracker.id.in_(cmd_tracker_ids)
)
)).all()
cmd_tracker_name_map = {tid: tname for tid, tname in cmd_tracker_rows}
bot_ids = {
e.telegram_bot_id for e in event_rows if e.telegram_bot_id is not None
}
bot_name_map: dict[int, str] = {}
if bot_ids:
bot_rows = (await session.exec(
select(TelegramBot.id, TelegramBot.name).where(TelegramBot.id.in_(bot_ids))
)).all()
bot_name_map = {bid: bname for bid, bname in bot_rows}
def _display_tracker_name(e: EventLog) -> str:
if e.tracker_id is not None and e.tracker_id in tracker_name_map:
return tracker_name_map[e.tracker_id]
@@ -135,11 +160,30 @@ async def get_status(
return f"(deleted) {e.action_name}"
return ""
def _display_command_tracker_name(e: EventLog) -> str:
if (
e.command_tracker_id is not None
and e.command_tracker_id in cmd_tracker_name_map
):
return cmd_tracker_name_map[e.command_tracker_id]
if e.command_tracker_name:
return f"(deleted) {e.command_tracker_name}"
return ""
def _display_bot_name(e: EventLog) -> str:
if e.telegram_bot_id is not None and e.telegram_bot_id in bot_name_map:
return bot_name_map[e.telegram_bot_id]
if e.bot_name:
return f"(deleted) {e.bot_name}"
return ""
def _display_subject(e: EventLog) -> str:
"""The primary label shown on the event row.
For action events the ``collection_name`` stores the action name;
use the live-resolved action name when available so renames show.
For command events the ``collection_name`` already stores the
rendered ``/cmd args`` string so we just pass it through.
"""
if e.action_id is not None or (e.event_type or "").startswith("action_"):
return _display_action_name(e) or e.collection_name
@@ -155,9 +199,14 @@ async def get_status(
"id": e.id,
"event_type": e.event_type,
"collection_name": _display_subject(e),
"tracker_id": e.tracker_id,
"tracker_name": _display_tracker_name(e),
"action_id": e.action_id,
"action_name": _display_action_name(e),
"command_tracker_id": e.command_tracker_id,
"command_tracker_name": _display_command_tracker_name(e),
"telegram_bot_id": e.telegram_bot_id,
"bot_name": _display_bot_name(e),
"provider_name": _display_provider_name(e),
"provider_id": e.provider_id,
"assets_count": e.assets_count or 0,
@@ -262,6 +262,101 @@ def _merge_enabled_commands(
return sorted(enabled), merged_limits
# ---------------------------------------------------------------------------
# Event logging
# ---------------------------------------------------------------------------
def _format_command_subject(cmd: str, args: str) -> str:
"""Render the dashboard ``collection_name`` for a command event."""
args = (args or "").strip()
return f"/{cmd} {args}".rstrip() if args else f"/{cmd}"
def _normalize_issuer(issuer: dict[str, Any] | None) -> dict[str, Any] | None:
"""Strip a Telegram ``from`` payload to the fields the dashboard needs.
Telegram's ``from`` carries plenty we don't want to persist (premium
badge, language code already captured elsewhere, etc.). Keep just
the identity bits and drop anything else so future Telegram changes
can't accidentally start logging extra PII.
"""
if not issuer:
return None
keep = ("id", "username", "first_name", "last_name", "is_bot")
out = {k: issuer[k] for k in keep if k in issuer and issuer[k] not in (None, "")}
return out or None
async def _log_command_event(
*,
bot: TelegramBot,
chat_id: str,
cmd: str,
args: str,
locale: str,
event_type: str,
responses: list[CommandResponse],
ctx_tuples: list[
tuple[CommandTracker, CommandConfig, ServiceProvider, CommandTrackerListener]
],
extra_details: dict[str, Any] | None = None,
issuer: dict[str, Any] | None = None,
) -> None:
"""Persist a single ``EventLog`` row for a bot-command invocation.
One row per user invocation. Per-tracker breakdown lives in ``details``
(``tracker_count`` / ``responses_count``). Best-effort: a logging
failure must never block the user-visible reply, so we swallow.
"""
try:
first_tracker: CommandTracker | None = None
first_provider: ServiceProvider | None = None
if ctx_tuples:
first_tracker, _, first_provider, _ = ctx_tuples[0]
media_total = sum(len(r.media or []) for r in responses)
details: dict[str, Any] = {
"command": cmd,
"args": args or "",
"chat_id": chat_id,
"locale": locale,
"tracker_count": len(ctx_tuples),
"responses_count": len(responses),
}
normalized_issuer = _normalize_issuer(issuer)
if normalized_issuer:
details["issuer"] = normalized_issuer
if extra_details:
details.update(extra_details)
engine = get_engine()
async with AsyncSession(engine) as session:
session.add(EventLog(
user_id=bot.user_id,
tracker_id=None,
tracker_name="",
action_id=None,
action_name="",
command_tracker_id=first_tracker.id if first_tracker else None,
command_tracker_name=first_tracker.name if first_tracker else "",
telegram_bot_id=bot.id,
bot_name=bot.name or "",
provider_id=first_provider.id if first_provider else None,
provider_name=(first_provider.name if first_provider else "") or "",
event_type=event_type,
collection_id=str(chat_id),
collection_name=_format_command_subject(cmd, args),
assets_count=media_total,
details=details,
))
await session.commit()
except Exception: # noqa: BLE001 — diagnostic only, never block reply
_LOGGER.exception(
"Failed to log command event bot=%d chat=%s cmd=/%s",
bot.id, chat_id, cmd,
)
# ---------------------------------------------------------------------------
# Main dispatcher
# ---------------------------------------------------------------------------
@@ -271,12 +366,18 @@ async def handle_command(
chat_id: str,
text: str,
language_code: str = "",
*,
issuer: dict[str, Any] | None = None,
) -> list[CommandResponse] | None:
"""Handle a bot command. Routes to provider-specific handlers.
Returns a list of CommandResponse objects (one per tracker), or None.
Universal commands (/start, /help) return a single-element list.
Provider-specific commands dispatch per-tracker with per-tracker config.
``issuer`` is the Telegram ``from`` object (``{id, username,
first_name, last_name, language_code}``) when known. Stored on the
EventLog row so the dashboard can show *who* invoked the command.
"""
cmd, args, count_override = parse_command(text)
if not cmd:
@@ -292,10 +393,20 @@ async def handle_command(
# Merged templates for universal commands
merged_templates = _merge_all_templates(templates_by_config_id)
# Universal commands have no tracker/provider context.
if cmd == "start":
text_resp = _render_cmd_template(merged_templates, "start", locale, {"bot_name": bot.name})
return [CommandResponse(text=text_resp)]
responses = [CommandResponse(text=text_resp)]
await _log_command_event(
bot=bot, chat_id=chat_id, cmd=cmd, args=args, locale=locale,
event_type="command_handled", responses=responses,
ctx_tuples=[], issuer=issuer,
)
return responses
# Unknown / disabled command — caller treats this the same as "no
# match" and we deliberately do NOT log it (avoids dashboard spam
# from random ``/foo`` traffic).
if cmd not in enabled and cmd != "start":
return None
@@ -307,13 +418,26 @@ async def handle_command(
cmd, bot.id, chat_id, wait,
)
text_resp = _render_cmd_template(merged_templates, "rate_limited", locale, {"wait": wait})
return [CommandResponse(text=text_resp)]
responses = [CommandResponse(text=text_resp)]
await _log_command_event(
bot=bot, chat_id=chat_id, cmd=cmd, args=args, locale=locale,
event_type="command_rate_limited", responses=responses,
ctx_tuples=ctx_tuples, extra_details={"wait_seconds": wait},
issuer=issuer,
)
return responses
# Universal commands — single merged response
if cmd == "help":
ctx = _cmd_help(enabled, locale, merged_templates)
text_resp = _render_cmd_template(merged_templates, "help", locale, ctx)
return [CommandResponse(text=text_resp)]
responses = [CommandResponse(text=text_resp)]
await _log_command_event(
bot=bot, chat_id=chat_id, cmd=cmd, args=args, locale=locale,
event_type="command_handled", responses=responses,
ctx_tuples=ctx_tuples, issuer=issuer,
)
return responses
# Provider-specific dispatch — per-tracker
from .dispatch import get_handler
@@ -329,48 +453,69 @@ async def handle_command(
from .command_utils import resolve_chat_album_scope
responses: list[CommandResponse] = []
for tracker, config, provider, listener in ctx_tuples:
if len(responses) >= _MAX_RESPONSES_PER_COMMAND:
_LOGGER.warning(
"Truncated command responses at %d for bot=%d chat=%s cmd=/%s (listener context size=%d)",
_MAX_RESPONSES_PER_COMMAND, bot.id, chat_id, cmd, len(ctx_tuples),
dispatched_ctx: list[
tuple[CommandTracker, CommandConfig, ServiceProvider, CommandTrackerListener]
] = []
try:
for tracker, config, provider, listener in ctx_tuples:
if len(responses) >= _MAX_RESPONSES_PER_COMMAND:
_LOGGER.warning(
"Truncated command responses at %d for bot=%d chat=%s cmd=/%s (listener context size=%d)",
_MAX_RESPONSES_PER_COMMAND, bot.id, chat_id, cmd, len(ctx_tuples),
)
break
handler = get_handler(provider.type)
if not handler or cmd not in handler.get_provider_commands():
continue
tracker_templates = _templates_for_config(templates_by_config_id, config)
count = min(count_override or config.default_count or 5, 20)
response_mode = config.response_mode or "media"
# Resolve the album scope for this (provider, bot, chat) triple.
# - Explicit ``listener.allowed_album_ids`` override wins as-is.
# - Otherwise derive from notification routing: only albums that
# already deliver notifications to this chat are queryable from
# it. Prevents commands leaking the full album catalog into
# chats that were never set up to receive from those trackers.
if listener is not None and listener.allowed_album_ids is not None:
allowed_album_ids: set[str] = set(listener.allowed_album_ids)
else:
allowed_album_ids = await resolve_chat_album_scope(
provider_id=provider.id,
bot_id=bot.id,
chat_id=chat_id,
)
result = await handler.handle(
cmd, args, count, locale, response_mode,
provider, tracker_templates, bot, tracker, config,
listener=listener,
allowed_album_ids=allowed_album_ids,
page=page,
)
break
handler = get_handler(provider.type)
if not handler or cmd not in handler.get_provider_commands():
continue
tracker_templates = _templates_for_config(templates_by_config_id, config)
count = min(count_override or config.default_count or 5, 20)
response_mode = config.response_mode or "media"
# Resolve the album scope for this (provider, bot, chat) triple.
# - Explicit ``listener.allowed_album_ids`` override wins as-is.
# - Otherwise derive from notification routing: only albums that
# already deliver notifications to this chat are queryable from
# it. Prevents commands leaking the full album catalog into
# chats that were never set up to receive from those trackers.
if listener is not None and listener.allowed_album_ids is not None:
allowed_album_ids: set[str] = set(listener.allowed_album_ids)
else:
allowed_album_ids = await resolve_chat_album_scope(
provider_id=provider.id,
bot_id=bot.id,
chat_id=chat_id,
)
result = await handler.handle(
cmd, args, count, locale, response_mode,
provider, tracker_templates, bot, tracker, config,
listener=listener,
allowed_album_ids=allowed_album_ids,
page=page,
if result is not None:
responses.append(result)
dispatched_ctx.append((tracker, config, provider, listener))
except Exception as exc: # noqa: BLE001 — log then re-raise
await _log_command_event(
bot=bot, chat_id=chat_id, cmd=cmd, args=args, locale=locale,
event_type="command_failed", responses=responses,
ctx_tuples=ctx_tuples,
extra_details={"error": f"{type(exc).__name__}: {exc}"},
issuer=issuer,
)
if result is not None:
responses.append(result)
raise
return responses if responses else None
if responses:
await _log_command_event(
bot=bot, chat_id=chat_id, cmd=cmd, args=args, locale=locale,
event_type="command_handled", responses=responses,
ctx_tuples=dispatched_ctx, issuer=issuer,
)
return responses
return None
def _cmd_help(
@@ -120,7 +120,11 @@ async def telegram_webhook(
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
responses = await handle_command(
bot, chat_id, text,
language_code=effective_lang,
issuer=from_user or None,
)
if not responses:
_LOGGER.info(
"Command produced no response (cmd=%r) after %.0f ms",
@@ -90,6 +90,10 @@ async def migrate_schema(engine: AsyncEngine) -> None:
("user_id", "ALTER TABLE event_log ADD COLUMN user_id INTEGER"),
("action_id", "ALTER TABLE event_log ADD COLUMN action_id INTEGER"),
("action_name", "ALTER TABLE event_log ADD COLUMN action_name TEXT DEFAULT ''"),
("command_tracker_id", "ALTER TABLE event_log ADD COLUMN command_tracker_id INTEGER"),
("command_tracker_name", "ALTER TABLE event_log ADD COLUMN command_tracker_name TEXT DEFAULT ''"),
("telegram_bot_id", "ALTER TABLE event_log ADD COLUMN telegram_bot_id INTEGER"),
("bot_name", "ALTER TABLE event_log ADD COLUMN bot_name TEXT DEFAULT ''"),
]:
if not await _has_column(conn, "event_log", col):
await conn.execute(text(sql))
@@ -105,6 +109,8 @@ async def migrate_schema(engine: AsyncEngine) -> None:
("ix_event_log_user_id", "user_id"),
("ix_event_log_action_id", "action_id"),
("ix_event_log_provider_id", "provider_id"),
("ix_event_log_command_tracker_id", "command_tracker_id"),
("ix_event_log_telegram_bot_id", "telegram_bot_id"),
]:
await conn.execute(
text(f"CREATE INDEX IF NOT EXISTS {idx_name} ON event_log ({col})")
@@ -519,6 +519,17 @@ class EventLog(SQLModel, table=True):
default=None, foreign_key="action.id", index=True,
)
action_name: str = Field(default="")
# Bot command provenance. Populated when ``event_type`` starts with
# ``command_`` so the dashboard can render command activity alongside
# tracker and action events. NULL for non-command rows.
command_tracker_id: int | None = Field(
default=None, foreign_key="command_tracker.id", index=True,
)
command_tracker_name: str = Field(default="")
telegram_bot_id: int | None = Field(
default=None, foreign_key="telegram_bot.id", index=True,
)
bot_name: str = Field(default="")
provider_id: int | None = Field(default=None, index=True)
provider_name: str = Field(default="")
event_type: str = Field(index=True)
@@ -232,7 +232,9 @@ async def _poll_bot(bot_id: int) -> None:
# Copy attributes before session closes to avoid detached-instance errors
from types import SimpleNamespace
bot_token = bot.token
bot_obj = SimpleNamespace(id=bot.id, name=bot.name, token=bot.token)
bot_obj = SimpleNamespace(
id=bot.id, name=bot.name, token=bot.token, user_id=bot.user_id,
)
offset = _last_update_id.get(bot_id, 0)
@@ -331,7 +333,11 @@ async def _poll_bot(bot_id: int) -> None:
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
responses = await handle_command(
bot_obj, chat_id, text,
language_code=effective_lang,
issuer=from_user or None,
)
if not responses:
_LOGGER.info(
"Command produced no response (cmd=%r, poll) after %.0f ms",
@@ -0,0 +1,227 @@
"""Bot command invocations must be logged to ``EventLog``.
Covers the three branches in ``handle_command``:
* ``command_handled`` a successful invocation (here exercised via the
helper directly so the test stays focused on the persistence shape).
* ``command_rate_limited`` caller hit the cooldown.
* ``command_failed`` an exception bubbled out of dispatch.
The dashboard reads these rows via ``GET /api/status`` so the test also
asserts the row is filterable by ``event_type=command_*``.
"""
from __future__ import annotations
import pytest
from fastapi.testclient import TestClient
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
def _bootstrap_app():
"""Bring up the app once so migrations run against the temp DB."""
from notify_bridge_server.main import app
return app
async def _seed_user_and_bot(name: str = "Test bot"):
"""Create a User + TelegramBot, return the bot row."""
from notify_bridge_server.database.engine import get_engine
from notify_bridge_server.database.models import TelegramBot, User
engine = get_engine()
async with AsyncSession(engine) as session:
user = User(username=f"u_{name}", hashed_password="x")
session.add(user)
await session.commit()
await session.refresh(user)
bot = TelegramBot(user_id=user.id, name=name, token="dummy")
session.add(bot)
await session.commit()
await session.refresh(bot)
return bot
async def _read_events(event_type: str, bot_id: int):
"""Filter by bot_id so tests don't leak rows into each other.
The temp DB is shared across tests in this module without this
filter a row left by an earlier test would make the next assertion
flaky depending on collection order.
"""
from notify_bridge_server.database.engine import get_engine
from notify_bridge_server.database.models import EventLog
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
select(EventLog)
.where(EventLog.event_type == event_type)
.where(EventLog.telegram_bot_id == bot_id)
)
return list(result.all())
def test_format_command_subject_no_args(tmp_data_dir) -> None: # noqa: ARG001
from notify_bridge_server.commands.handler import _format_command_subject
assert _format_command_subject("latest", "") == "/latest"
assert _format_command_subject("help", None) == "/help"
def test_format_command_subject_with_args(tmp_data_dir) -> None: # noqa: ARG001
from notify_bridge_server.commands.handler import _format_command_subject
assert _format_command_subject("search", "sunset") == "/search sunset"
# Trailing whitespace must not leak into the dashboard label.
assert _format_command_subject("search", "sunset ") == "/search sunset"
def test_normalize_issuer_keeps_identity_drops_extras(tmp_data_dir) -> None: # noqa: ARG001
"""Telegram ``from`` is whitelisted to identity fields only."""
from notify_bridge_server.commands.handler import _normalize_issuer
assert _normalize_issuer(None) is None
assert _normalize_issuer({}) is None
raw = {
"id": 1234,
"username": "alex",
"first_name": "Alex",
"last_name": "",
"language_code": "ru", # already captured separately — must drop
"is_premium": True, # must not leak into our log
}
assert _normalize_issuer(raw) == {
"id": 1234,
"username": "alex",
"first_name": "Alex",
}
def test_log_command_handled_persists_row(tmp_data_dir) -> None: # noqa: ARG001
"""``command_handled`` row carries bot + provenance + media count."""
import asyncio
from notify_bridge_server.commands.base import CommandResponse
from notify_bridge_server.commands.handler import _log_command_event
app = _bootstrap_app()
with TestClient(app):
async def run() -> None:
bot = await _seed_user_and_bot("HandledBot")
await _log_command_event(
bot=bot,
chat_id="123456",
cmd="latest",
args="",
locale="en",
event_type="command_handled",
responses=[CommandResponse(text="ok", media=[{"type": "photo"}])],
ctx_tuples=[], # universal command path: no tracker context
)
rows = await _read_events("command_handled", bot.id)
assert len(rows) == 1
row = rows[0]
assert row.user_id == bot.user_id
assert row.telegram_bot_id == bot.id
assert row.bot_name == "HandledBot"
assert row.collection_id == "123456"
assert row.collection_name == "/latest"
assert row.assets_count == 1
assert row.details["command"] == "latest"
assert row.details["chat_id"] == "123456"
assert row.details["responses_count"] == 1
asyncio.run(run())
def test_log_command_rate_limited_carries_wait_seconds(tmp_data_dir) -> None: # noqa: ARG001
import asyncio
from notify_bridge_server.commands.base import CommandResponse
from notify_bridge_server.commands.handler import _log_command_event
app = _bootstrap_app()
with TestClient(app):
async def run() -> None:
bot = await _seed_user_and_bot("ThrottledBot")
await _log_command_event(
bot=bot,
chat_id="42",
cmd="random",
args="",
locale="en",
event_type="command_rate_limited",
responses=[CommandResponse(text="cooldown")],
ctx_tuples=[],
extra_details={"wait_seconds": 7},
)
rows = await _read_events("command_rate_limited", bot.id)
assert len(rows) == 1
assert rows[0].details["wait_seconds"] == 7
assert rows[0].assets_count == 0 # text-only response
asyncio.run(run())
def test_log_command_failed_records_error(tmp_data_dir) -> None: # noqa: ARG001
import asyncio
from notify_bridge_server.commands.handler import _log_command_event
app = _bootstrap_app()
with TestClient(app):
async def run() -> None:
bot = await _seed_user_and_bot("BrokenBot")
await _log_command_event(
bot=bot,
chat_id="9",
cmd="albums",
args="",
locale="ru",
event_type="command_failed",
responses=[],
ctx_tuples=[],
extra_details={"error": "RuntimeError: boom"},
)
rows = await _read_events("command_failed", bot.id)
assert len(rows) == 1
assert rows[0].details["error"] == "RuntimeError: boom"
assert rows[0].details["locale"] == "ru"
asyncio.run(run())
def test_log_command_event_handles_db_error_gracefully(tmp_data_dir, monkeypatch) -> None: # noqa: ARG001
"""A logging failure must NOT raise — the user still gets their reply."""
import asyncio
from notify_bridge_server.commands import handler as handler_mod
from notify_bridge_server.commands.base import CommandResponse
app = _bootstrap_app()
with TestClient(app):
async def run() -> None:
bot = await _seed_user_and_bot("StillRepliesBot")
def boom() -> object:
raise RuntimeError("db gone")
monkeypatch.setattr(handler_mod, "get_engine", boom)
# Must not raise.
await handler_mod._log_command_event(
bot=bot,
chat_id="1",
cmd="help",
args="",
locale="en",
event_type="command_handled",
responses=[CommandResponse(text="hi")],
ctx_tuples=[],
)
asyncio.run(run())