feat(logging): production-grade logging with context vars, secret masking, and runtime level control

Boot-time logging was a three-line basicConfig stub with no timestamps, no
correlation, and silent drops at every layer of the Telegram send path — a
/random command that delivered text but no media left zero evidence in the
log. This replaces the setup and closes every silent drop encountered end-to-end.

New infrastructure:
- notify_bridge_core.log_context: request_id/command/chat_id/bot_id/dispatch_id
  ContextVars with a bind_log_context() context manager so deep call sites
  (TelegramClient, NotificationDispatcher) inherit the correlation tag without
  threading args through.
- notify_bridge_server.logging_setup: dictConfig-based setup with a
  LogRecordFactory that tags every record, a SecretMaskingFilter that redacts
  /botN:TOKEN plus Authorization/x-api-key/password/secret in messages AND
  tracebacks, a JSON formatter for aggregators, text formatter with grep-friendly
  [req=... cmd=... bot=... chat=... disp=...] prefix, and default dampening
  for sqlalchemy/aiohttp/apscheduler/urllib3/PIL.

Runtime control:
- NOTIFY_BRIDGE_LOG_LEVEL / _FORMAT / _LEVELS env vars (boot).
- DB-backed log_level / log_format / log_levels AppSettings, applied on
  boot after migrations and live via apply_log_levels() when edited in
  the settings UI (format still requires restart, logs a WARN).
- Frontend settings page gains a Logging card (level dropdown, format
  dropdown, per-module overrides); en/ru i18n keys added.

Call-site fixes (/random media-group blind spot and adjacent):
- TelegramClient._fetch_asset: every silent drop now WARN-logs with reason
  (missing url, HTTP non-200, size/dimension limits, ClientError).
- TelegramClient._send_media_group: WARN on "chunk had N items but 0 usable",
  ERROR on sendMediaGroup non-ok/transport with full context; returns
  success=False + "no_items_delivered" instead of success=True with an empty
  message_ids list so callers can distinguish.
- TelegramClient.send_message / _upload_media / _send_from_cache: ERROR on
  non-ok + transport failures with status/code/desc; DEBUG for cache-hit
  fallbacks.
- NotificationDispatcher.dispatch: generates a dispatch_id, binds it, logs
  start/finish with failure count, uses exc_info for target failures.
- commands/handler: missing/failed templates -> ERROR + exc_info; send_reply
  and send_media_group errors upgraded WARNING -> ERROR with chat/error_code
  context; rate-limit and truncation cases logged with full context.
- commands/webhook and services/telegram_poller: bind_log_context(request_id
  =tg:<update_id>, command, chat_id, bot_id), INFO on receive/dispatch/
  completion with duration, exc_info on raise, INFO when commands disabled.
- commands/immich: INFO when album scope is empty; WARN per asset dropped
  from media payload and a summary WARN when "N assets in, 0 out".
This commit is contained in:
2026-04-23 14:41:26 +03:00
parent 1f880daa0c
commit f50d465c0e
15 changed files with 831 additions and 63 deletions
+7
View File
@@ -695,6 +695,13 @@
"locales": "Template Languages",
"supportedLocales": "Supported Locales",
"supportedLocalesHint": "Languages available when authoring notification and command templates. Built-in defaults ship for English and Russian; other languages start empty.",
"logging": "Logging",
"logLevel": "Log Level",
"logLevelHint": "Root log level for the server. Raise to DEBUG while investigating; keep at INFO in production. WARNING/ERROR hide per-command progress lines.",
"logFormat": "Log Format",
"logFormatHint": "Output format. 'text' is human-readable; 'json' emits one object per line for log aggregators (Loki, ELK). Changing this requires a server restart.",
"logLevels": "Per-Module Overrides",
"logLevelsHint": "Comma-separated 'module=LEVEL' pairs to silence noisy modules or drill into one area. Example: sqlalchemy.engine=WARNING,notify_bridge_core.notifications.telegram.client=DEBUG",
"saved": "Settings saved"
},
"hints": {
+7
View File
@@ -695,6 +695,13 @@
"locales": "Языки шаблонов",
"supportedLocales": "Поддерживаемые локали",
"supportedLocalesHint": "Языки, доступные для редактирования шаблонов уведомлений и команд. Встроенные шаблоны поставляются для английского и русского; другие языки начинают с пустых.",
"logging": "Логирование",
"logLevel": "Уровень логов",
"logLevelHint": "Уровень логирования сервера. Поднимайте до DEBUG при отладке; оставляйте INFO в продакшене. WARNING/ERROR скрывают пошаговые строки по командам.",
"logFormat": "Формат логов",
"logFormatHint": "Формат вывода. 'text' — читаемый человеком; 'json' — по одному объекту в строке для агрегаторов (Loki, ELK). Смена требует перезапуска сервера.",
"logLevels": "Переопределения по модулям",
"logLevelsHint": "Пары 'модуль=УРОВЕНЬ' через запятую, чтобы приглушить шумные модули или углубиться в один. Пример: sqlalchemy.engine=WARNING,notify_bridge_core.notifications.telegram.client=DEBUG",
"saved": "Настройки сохранены"
},
"hints": {
+37
View File
@@ -37,6 +37,9 @@
telegram_asset_cache_max_entries: '5000',
supported_locales: 'en,ru',
timezone: 'UTC',
log_level: 'INFO',
log_format: 'text',
log_levels: '',
});
let cacheStats = $state<CacheStats | null>(null);
@@ -204,6 +207,40 @@
</div>
</Card>
<!-- Logging section -->
<Card>
<h3 class="text-sm font-semibold mb-4 flex items-center gap-2">
<MdiIcon name="mdiTextBoxOutline" size={18} />
{t('settings.logging')}
</h3>
<div class="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div>
<label class="block text-xs font-medium mb-1">{t('settings.logLevel')}<Hint text={t('settings.logLevelHint')} /></label>
<select bind:value={settings.log_level}
class="w-full px-3 py-1.5 text-sm border border-[var(--color-border)] rounded-md bg-[var(--color-background)]">
<option value="DEBUG">DEBUG</option>
<option value="INFO">INFO</option>
<option value="WARNING">WARNING</option>
<option value="ERROR">ERROR</option>
</select>
</div>
<div>
<label class="block text-xs font-medium mb-1">{t('settings.logFormat')}<Hint text={t('settings.logFormatHint')} /></label>
<select bind:value={settings.log_format}
class="w-full px-3 py-1.5 text-sm border border-[var(--color-border)] rounded-md bg-[var(--color-background)]">
<option value="text">text</option>
<option value="json">json</option>
</select>
</div>
<div class="sm:col-span-2">
<label class="block text-xs font-medium mb-1">{t('settings.logLevels')}<Hint text={t('settings.logLevelsHint')} /></label>
<input bind:value={settings.log_levels}
placeholder="sqlalchemy.engine=WARNING,notify_bridge_core.notifications.telegram.client=DEBUG"
class="w-full px-3 py-1.5 text-sm border border-[var(--color-border)] rounded-md bg-[var(--color-background)] font-mono" />
</div>
</div>
</Card>
<Button onclick={save} disabled={saving}>
{saving ? t('common.loading') : t('common.save')}
</Button>
@@ -0,0 +1,66 @@
"""Request-scoped ContextVars that propagate into log records.
The server sets these at entry points (Telegram webhook, scheduler dispatch,
REST call) and they propagate through async calls automatically. A
``LogRecordFactory`` installed by ``notify_bridge_server.logging_setup``
reads them so every log line is tagged (``request_id``, ``command``,
``chat_id``, ``bot_id``, ``dispatch_id``) without each call site having
to pass the values explicitly.
Kept in ``notify_bridge_core`` so core modules (``TelegramClient``,
``NotificationDispatcher``) can *set* additional context (e.g. a
``dispatch_id``) without depending on the server package.
"""
from __future__ import annotations
from contextlib import contextmanager
from contextvars import ContextVar, Token
from typing import Any, Iterator
request_id_var: ContextVar[str | None] = ContextVar("request_id", default=None)
command_var: ContextVar[str | None] = ContextVar("command", default=None)
chat_id_var: ContextVar[str | None] = ContextVar("chat_id", default=None)
bot_id_var: ContextVar[int | None] = ContextVar("bot_id", default=None)
dispatch_id_var: ContextVar[str | None] = ContextVar("dispatch_id", default=None)
_VAR_MAP: dict[str, ContextVar[Any]] = {
"request_id": request_id_var,
"command": command_var,
"chat_id": chat_id_var,
"bot_id": bot_id_var,
"dispatch_id": dispatch_id_var,
}
@contextmanager
def bind_log_context(**kwargs: Any) -> Iterator[None]:
"""Bind the given context fields for the duration of the ``with`` block.
Unknown keys are ignored so callers can pass whatever they want without
an ``if`` ladder. Values are reset on exit even if the block raises.
Example:
``with bind_log_context(request_id="abc", command="random"): ...``
"""
tokens: list[tuple[ContextVar[Any], Token]] = []
try:
for key, value in kwargs.items():
var = _VAR_MAP.get(key)
if var is None:
continue
tokens.append((var, var.set(value)))
yield
finally:
for var, tok in tokens:
var.reset(tok)
def current_log_context() -> dict[str, Any]:
"""Return a snapshot of the currently-bound context values (non-None)."""
snap: dict[str, Any] = {}
for key, var in _VAR_MAP.items():
val = var.get()
if val is not None:
snap[key] = val
return snap
@@ -4,11 +4,13 @@ from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any
import aiohttp
from notify_bridge_core.log_context import bind_log_context, dispatch_id_var
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.templates.context import build_template_context
from notify_bridge_core.templates.renderer import render_template
@@ -95,18 +97,40 @@ class NotificationDispatcher:
Returns list of results (one per target).
"""
raw_results = await asyncio.gather(
*[self._send_to_target(event, t) for t in targets],
return_exceptions=True,
)
results = []
for raw in raw_results:
if isinstance(raw, Exception):
_LOGGER.error("Failed to dispatch to target: %s", raw)
results.append({"success": False, "error": str(raw)})
else:
results.append(raw)
return results
# Bind a dispatch_id so every log line emitted by the target sends
# (including deep in TelegramClient) can be correlated to the same
# upstream event.
new_id = dispatch_id_var.get() or f"disp:{uuid.uuid4().hex[:12]}"
with bind_log_context(dispatch_id=new_id):
_LOGGER.info(
"Dispatching event %s (collection=%r) to %d target(s)",
event.event_type.value if hasattr(event.event_type, "value") else event.event_type,
getattr(event, "collection_name", None), len(targets),
)
raw_results = await asyncio.gather(
*[self._send_to_target(event, t) for t in targets],
return_exceptions=True,
)
results = []
failures = 0
for target, raw in zip(targets, raw_results):
if isinstance(raw, Exception):
failures += 1
_LOGGER.error(
"Dispatch to target type=%s failed: %s",
target.type, raw, exc_info=raw,
)
results.append({"success": False, "error": str(raw)})
else:
if isinstance(raw, dict) and not raw.get("success"):
failures += 1
results.append(raw)
_LOGGER.info(
"Dispatch finished: %d target(s), %d failure(s)",
len(targets), failures,
)
return results
def _resolve_template(
self, event: ServiceEvent, target: TargetConfig, locale: str,
@@ -162,8 +162,20 @@ class TelegramClient:
"message_id": result.get("result", {}).get("message_id"),
"cached": True,
}
except aiohttp.ClientError:
pass
# Non-ok from a cached send — file_id stale or file deleted on
# Telegram's side. Log at DEBUG so operators who are hunting
# "why didn't the cached send work?" can see it, but the
# caller will fall through to a fresh upload.
_LOGGER.debug(
"Telegram %s (cached) returned non-ok: status=%s code=%s desc=%r — falling back to fresh upload",
kind.api_method, response.status, result.get("error_code"),
result.get("description"),
)
except aiohttp.ClientError as err:
_LOGGER.debug(
"Telegram %s (cached) transport error — falling back to fresh upload: %s",
kind.api_method, err,
)
return None
async def _upload_media(
@@ -203,8 +215,17 @@ class TelegramClient:
thumbhash=thumbhash, size=len(data),
)
return {"success": True, "message_id": res.get("message_id")}
_LOGGER.error(
"Telegram %s failed: status=%s code=%s desc=%r bytes=%d",
kind.api_method, response.status, result.get("error_code"),
result.get("description", "Unknown"), len(data),
)
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
except aiohttp.ClientError as err:
_LOGGER.error(
"Telegram %s transport error (bytes=%d): %s",
kind.api_method, len(data), err, exc_info=True,
)
return {"success": False, "error": str(err)}
async def send_notification(
@@ -327,8 +348,14 @@ class TelegramClient:
retry_result = await retry_resp.json()
if retry_resp.status == 200 and retry_result.get("ok"):
return {"success": True, "message_id": retry_result.get("result", {}).get("message_id")}
_LOGGER.error(
"Telegram sendMessage failed: status=%s code=%s desc=%r",
response.status, result.get("error_code"),
result.get("description", "Unknown"),
)
return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram sendMessage transport error: %s", err, exc_info=True)
return {"success": False, "error": str(err)}
async def send_chat_action(self, chat_id: str, action: str = "typing") -> bool:
@@ -513,11 +540,14 @@ class TelegramClient:
# Tuple is (cache_key, media_type, thumbhash, uploaded_size).
media_cache_info: list[tuple[str, str, str | None, int] | None] = []
# Resolve cache hits and collect download tasks in parallel
# Resolve cache hits and collect download tasks in parallel.
# Each drop site logs the reason — otherwise a filtered asset
# disappears silently and the media group silently shrinks.
async def _fetch_asset(idx: int, item: dict) -> tuple[int, dict | None, bytes | None]:
"""Return (index, cache_entry_or_None, downloaded_bytes_or_None)."""
url = item.get("url")
if not url:
_LOGGER.warning("Media skipped: missing url (idx=%d type=%s)", idx, item.get("type"))
return idx, None, None
media_type = item.get("type", "photo")
custom_cache_key = item.get("cache_key")
@@ -537,12 +567,24 @@ class TelegramClient:
if preloaded is not None:
data = preloaded
if max_asset_data_size and len(data) > max_asset_data_size:
_LOGGER.warning(
"Media skipped: preloaded size %d exceeds max_asset_data_size %d (idx=%d type=%s url=%s)",
len(data), max_asset_data_size, idx, media_type, url,
)
return idx, None, None
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
_LOGGER.warning(
"Media skipped: preloaded video %d bytes exceeds Telegram limit %d (idx=%d url=%s)",
len(data), TELEGRAM_MAX_VIDEO_SIZE, idx, url,
)
return idx, None, None
if media_type == "photo":
exceeds, _, _, _ = check_photo_limits(data)
exceeds, reason, _, _ = check_photo_limits(data)
if exceeds:
_LOGGER.warning(
"Media skipped: preloaded photo %s (idx=%d url=%s)",
reason, idx, url,
)
return idx, None, None
return idx, None, data
@@ -551,18 +593,38 @@ class TelegramClient:
dl_headers = item.get("headers") or {}
async with self._session.get(download_url, headers=dl_headers) as resp:
if resp.status != 200:
_LOGGER.warning(
"Media skipped: download HTTP %d (idx=%d type=%s url=%s)",
resp.status, idx, media_type, url,
)
return idx, None, None
data = await resp.read()
if max_asset_data_size and len(data) > max_asset_data_size:
_LOGGER.warning(
"Media skipped: downloaded size %d exceeds max_asset_data_size %d (idx=%d type=%s url=%s)",
len(data), max_asset_data_size, idx, media_type, url,
)
return idx, None, None
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
_LOGGER.warning(
"Media skipped: video %d bytes exceeds Telegram %d-byte limit (idx=%d url=%s)",
len(data), TELEGRAM_MAX_VIDEO_SIZE, idx, url,
)
return idx, None, None
if media_type == "photo":
exceeds, _, _, _ = check_photo_limits(data)
exceeds, reason, _, _ = check_photo_limits(data)
if exceeds:
_LOGGER.warning(
"Media skipped: photo %s (idx=%d url=%s)",
reason, idx, url,
)
return idx, None, None
return idx, None, data
except aiohttp.ClientError:
except aiohttp.ClientError as err:
_LOGGER.warning(
"Media skipped: download failed (idx=%d type=%s url=%s): %s",
idx, media_type, url, err,
)
return idx, None, None
results = await asyncio.gather(
@@ -602,6 +664,14 @@ class TelegramClient:
media_json.append(mij)
if not media_json:
# Every asset in this chunk was filtered out (size, download
# failure, etc.). Without this log, sendMediaGroup returns
# success=True with zero message_ids and nobody knows why
# the user sees only the text reply and no media.
_LOGGER.warning(
"sendMediaGroup skipped — chunk %d/%d had %d input items but 0 usable (all filtered/failed)",
chunk_idx + 1, len(chunks), len(chunk),
)
continue
form.add_field("media", json.dumps(media_json))
@@ -638,10 +708,35 @@ class TelegramClient:
if eff_cache:
await eff_cache.async_set_many(cache_entries)
else:
return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1}
_LOGGER.error(
"Telegram sendMediaGroup failed: status=%s code=%s desc=%r chunk=%d/%d items=%d",
response.status, result.get("error_code"),
result.get("description", "Unknown"),
chunk_idx + 1, len(chunks), len(media_json),
)
return {
"success": False,
"error": result.get("description", "Unknown"),
"error_code": result.get("error_code"),
"failed_at_chunk": chunk_idx + 1,
}
except aiohttp.ClientError as err:
_LOGGER.error(
"Telegram sendMediaGroup transport error on chunk %d/%d (%d items): %s",
chunk_idx + 1, len(chunks), len(media_json), err,
exc_info=True,
)
return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
# Distinguish "posted something" from "posted nothing" so the caller
# can surface an ERROR when a command produced a caption reply but no
# media ever reached Telegram.
if not all_message_ids:
_LOGGER.warning(
"sendMediaGroup completed with 0 message_ids across %d chunk(s) — nothing was delivered",
len(chunks),
)
return {"success": False, "error": "no_items_delivered", "chunks_sent": len(chunks)}
return {"success": True, "message_ids": all_message_ids, "chunks_sent": len(chunks)}
# ------------------------------------------------------------------
@@ -24,6 +24,10 @@ _SETTING_KEYS = {
"telegram_asset_cache_max_entries": None, # LRU cap for both caches
"supported_locales": None, # comma-separated locale codes
"timezone": "NOTIFY_BRIDGE_TIMEZONE", # IANA tz (e.g. "Europe/Warsaw"); empty = UTC
# Logging — applied live via apply_log_levels() when changed.
"log_level": "NOTIFY_BRIDGE_LOG_LEVEL", # DEBUG/INFO/WARNING/ERROR
"log_format": "NOTIFY_BRIDGE_LOG_FORMAT", # text|json (requires restart to switch)
"log_levels": "NOTIFY_BRIDGE_LOG_LEVELS", # module=LEVEL,module2=LEVEL
}
_DEFAULTS = {
@@ -35,12 +39,20 @@ _DEFAULTS = {
"telegram_asset_cache_max_entries": "5000",
"supported_locales": "en,ru",
"timezone": "UTC",
"log_level": "INFO",
"log_format": "text",
"log_levels": "",
}
# Settings whose changes require dropping in-memory Telegram caches so the
# next dispatch rebuilds them with the new parameters. Files are preserved.
_CACHE_SETTING_KEYS = {"telegram_cache_ttl_hours", "telegram_asset_cache_max_entries"}
# Settings that change logging behaviour. ``log_level`` + ``log_levels`` apply
# live via apply_log_levels(); ``log_format`` requires a restart because
# changing it means swapping the handler formatter entirely.
_LOG_SETTING_KEYS = {"log_level", "log_levels", "log_format"}
async def get_setting(session: AsyncSession, key: str) -> str:
"""Read a setting from DB, falling back to env var then default."""
@@ -66,6 +78,9 @@ class SettingsUpdate(BaseModel):
telegram_asset_cache_max_entries: int | str | None = None
supported_locales: str | None = None
timezone: str | None = None
log_level: str | None = None
log_format: str | None = None
log_levels: str | None = None
@router.get("")
@@ -95,6 +110,7 @@ async def update_settings(
old_secret = await get_setting(session, "telegram_webhook_secret")
old_cache_values = {k: await get_setting(session, k) for k in _CACHE_SETTING_KEYS}
old_timezone = await get_setting(session, "timezone")
old_log_values = {k: await get_setting(session, k) for k in _LOG_SETTING_KEYS}
for key in _SETTING_KEYS:
value = getattr(body, key, None)
@@ -130,6 +146,25 @@ async def update_settings(
new_base_url = await get_setting(session, "external_url")
new_secret = await get_setting(session, "telegram_webhook_secret")
new_timezone = await get_setting(session, "timezone")
new_log_values = {k: await get_setting(session, k) for k in _LOG_SETTING_KEYS}
# Apply live log-level changes (log_format still needs a restart).
if (new_log_values["log_level"] != old_log_values["log_level"]
or new_log_values["log_levels"] != old_log_values["log_levels"]):
from ..logging_setup import apply_log_levels
apply_log_levels(
level=new_log_values["log_level"] or None,
per_module_levels=new_log_values["log_levels"],
)
_LOGGER.info(
"Log levels updated: root=%s overrides=%r",
new_log_values["log_level"], new_log_values["log_levels"],
)
if new_log_values["log_format"] != old_log_values["log_format"]:
_LOGGER.warning(
"log_format changed from %r to %r — restart the server for it to take effect",
old_log_values["log_format"], new_log_values["log_format"],
)
# Cron triggers freeze their timezone at construction time, so a tz change
# has no effect until the jobs are rebuilt — do that here, before we
@@ -199,7 +234,10 @@ async def _reregister_webhooks(
if res.get("success"):
_LOGGER.info("Re-registered webhook for bot %d (%s)", bot.id, bot.name)
else:
_LOGGER.warning(
"Failed to re-register webhook for bot %d: %s",
bot.id, res.get("error"),
# Webhook re-register failure means the bot silently stops
# delivering updates — this is operational visibility for an
# admin, ERROR is appropriate.
_LOGGER.error(
"Failed to re-register webhook for bot %d (%s): %s",
bot.id, bot.name, res.get("error"),
)
@@ -108,13 +108,18 @@ def _render_cmd_template(
"""Render a locale-aware command template. Falls back to 'en'."""
template_str = _resolve_template(templates, slot_name, locale)
if not template_str:
_LOGGER.warning("No command template found for slot '%s' locale '%s'", slot_name, locale)
# Missing template = user sees "[No template: X]" — this is an ERROR,
# not a warning. Broken replies must stand out in production logs.
_LOGGER.error("No command template found for slot '%s' locale '%s'", slot_name, locale)
return f"[No template: {slot_name}]"
try:
tmpl = _compile_template(template_str)
return tmpl.render(**context)
except Exception as e:
_LOGGER.warning("Failed to render command template '%s': %s", slot_name, e)
except Exception:
_LOGGER.error(
"Failed to render command template '%s' locale=%s — user will see a broken reply",
slot_name, locale, exc_info=True,
)
return f"[Template error: {slot_name}]"
@@ -296,6 +301,10 @@ async def handle_command(
# Rate limit check (once per command, shared across all trackers)
wait = _check_rate_limit(bot.id, chat_id, cmd, rate_limits)
if wait is not None:
_LOGGER.info(
"Rate-limited /%s for bot=%d chat=%s%ds cooldown remaining",
cmd, bot.id, chat_id, wait,
)
text_resp = _render_cmd_template(merged_templates, "rate_limited", locale, {"wait": wait})
return [CommandResponse(text=text_resp)]
@@ -322,8 +331,8 @@ async def handle_command(
for tracker, config, provider, listener in ctx_tuples:
if len(responses) >= _MAX_RESPONSES_PER_COMMAND:
_LOGGER.warning(
"Truncated command responses at %d for bot %d cmd /%s",
_MAX_RESPONSES_PER_COMMAND, bot.id, cmd,
"Truncated command responses at %d for bot=%d chat=%s cmd=/%s (listener context size=%d)",
_MAX_RESPONSES_PER_COMMAND, bot.id, chat_id, cmd, len(ctx_tuples),
)
break
@@ -418,7 +427,12 @@ async def send_reply(
disable_web_page_preview=True,
)
if not result.get("success"):
_LOGGER.warning("Telegram reply failed: %s", result.get("error"))
# User-visible failure: the bot's reply never reached the chat.
_LOGGER.error(
"Telegram reply failed (chat=%s reply_to=%s len=%d): code=%s error=%r",
chat_id, reply_to_message_id, len(text or ""),
result.get("error_code"), result.get("error"),
)
async def send_media_group(
@@ -442,6 +456,14 @@ async def send_media_group(
assets hit the cache and skip the re-upload.
"""
if not media_items:
# This is what happened in the /random blind spot: the text reply
# was sent, but the media follow-up was silently skipped because
# the caller passed an empty media list. Surface it so we can see
# it in the log and correlate with the text message.
_LOGGER.warning(
"send_media_group called with 0 items (chat=%s reply_to=%s) — no media will be delivered",
chat_id, reply_to_message_id,
)
return
from ..services.telegram_send import send_telegram_media
@@ -452,7 +474,13 @@ async def send_media_group(
chat_action=None,
)
if not result.get("success"):
_LOGGER.warning("Telegram media group failed: %s", result.get("error"))
# User-visible failure: media promised by the text reply never arrived.
_LOGGER.error(
"Telegram media group failed (chat=%s items=%d reply_to=%s): code=%s error=%r failed_at_chunk=%s",
chat_id, len(media_items), reply_to_message_id,
result.get("error_code"), result.get("error"),
result.get("failed_at_chunk"),
)
async def register_commands_with_telegram(bot: TelegramBot) -> bool:
@@ -144,6 +144,7 @@ def _format_assets(
# other's cached file_ids (which is what made the cache look empty
# from the WebUI after running /random).
media_items: list[dict[str, Any]] = []
dropped = 0
for asset in assets:
asset_id = asset.get("id", "")
asset_type = (asset.get("type") or "").upper()
@@ -156,6 +157,20 @@ def _format_assets(
)
if entry is not None:
media_items.append(entry)
else:
dropped += 1
_LOGGER.warning(
"Dropped asset from /%s media payload: id=%s type=%s (empty preview URL)",
cmd, asset_id, asset_type,
)
if not media_items and assets:
# All assets were filtered out before reaching Telegram. The user
# will see the text reply but no media — surface it here so the
# log shows WHY the media group ended up empty.
_LOGGER.warning(
"/%s media payload empty: %d asset(s) in, 0 out (all dropped)",
cmd, len(assets),
)
# Return text message + media items — text is sent first, media as reply
return {"text": text, "media": media_items}
@@ -143,7 +143,16 @@ async def _cmd_immich(
# chat). ``None`` = no filter (rare); empty set = show nothing (common
# when the chat has no tracker routing).
if allowed_album_ids is not None:
before = len(all_album_ids)
all_album_ids = [aid for aid in all_album_ids if aid in allowed_album_ids]
if not all_album_ids:
# A command that sees zero albums is a routing/tracker config issue
# the operator needs to notice — otherwise the user gets
# "no results" with no hint at why.
_LOGGER.info(
"Command /%s has empty album scope for provider=%d (had %d trackers, chat scope allowed %d)",
cmd, provider.id, before, len(allowed_album_ids),
)
ext_domain = (provider.config.get("external_domain") or provider.config.get("url", "")).rstrip("/")
@@ -4,12 +4,14 @@ from __future__ import annotations
import hmac
import logging
import time
from typing import Any
from fastapi import APIRouter, Depends, Header, HTTPException, Request
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.log_context import bind_log_context
from notify_bridge_core.notifications.telegram.client import TelegramClient
from ..database.engine import get_session
@@ -18,6 +20,7 @@ from ..services.telegram import save_chat_from_webhook
from ..services.telegram_send import telegram_chat_action
from .base import CommandResponse
from .handler import classify_command_chat_action, handle_command, send_media_group, send_reply
from .parser import parse_command
_LOGGER = logging.getLogger(__name__)
@@ -93,20 +96,62 @@ async def telegram_webhook(
)
)).first()
if not chat_row or not chat_row.commands_enabled:
_LOGGER.info(
"Command ignored — commands disabled for bot=%s chat=%s text=%r",
bot_id, chat_id, text[:64],
)
return {"ok": True, "skipped": "commands_disabled"}
effective_lang = chat_row.language_override or msg_language
message_id = message.get("message_id")
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
cmd_name, _, _ = parse_command(text)
update_id = update.get("update_id")
request_id = f"tg:{update_id}" if update_id is not None else f"tg:msg{message_id}"
with bind_log_context(
request_id=request_id,
command=cmd_name or "-",
chat_id=chat_id,
bot_id=bot_id,
):
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
if responses:
for resp in responses:
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
return {"ok": True}
started = time.monotonic()
_LOGGER.info("Command received: /%s args=%r lang=%s", cmd_name, text[:200], effective_lang)
try:
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
if not responses:
_LOGGER.info(
"Command produced no response (cmd=%r) after %.0f ms",
cmd_name, (time.monotonic() - started) * 1000,
)
return {"ok": True, "skipped": "no_response"}
text_count = sum(1 for r in responses if r.text)
media_count = sum(len(r.media or []) for r in responses)
_LOGGER.info(
"Command dispatching %d response(s): text=%d media_items=%d",
len(responses), text_count, media_count,
)
for idx, resp in enumerate(responses):
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
_LOGGER.info(
"Command /%s completed in %.0f ms (responses=%d media=%d)",
cmd_name, (time.monotonic() - started) * 1000,
len(responses), media_count,
)
return {"ok": True}
except Exception:
_LOGGER.exception(
"Command /%s raised after %.0f ms",
cmd_name, (time.monotonic() - started) * 1000,
)
# Return 200 so Telegram doesn't retry the same update — we
# already logged the failure and can't usefully reprocess.
return {"ok": True, "error": "handler_exception"}
return {"ok": True, "skipped": "not_a_command"}
@@ -48,6 +48,19 @@ class Settings(BaseSettings):
static_dir: str = ""
"""Path to frontend static files. Set to serve SvelteKit build via FastAPI (e.g. /app/static in Docker)."""
# --- Logging ---
# Boot-time logging configuration. DB AppSetting rows (``log_level`` /
# ``log_levels`` / ``log_format``) override these after startup, letting
# operators adjust levels from the settings UI without a restart.
log_level: str = "INFO"
"""Root log level for the app loggers (``DEBUG``/``INFO``/``WARNING``/``ERROR``)."""
log_format: str = "text"
"""Log output format: ``text`` (human-readable) or ``json`` (one object per line)."""
log_levels: str = ""
"""Comma-separated per-module overrides, e.g. ``notify_bridge_core.notifications.telegram.client=DEBUG,sqlalchemy.engine=INFO``."""
model_config = {"env_prefix": "NOTIFY_BRIDGE_"}
@property
@@ -0,0 +1,324 @@
"""Production-grade logging configuration.
Installs one ``dictConfig`` layout with:
* A ``LogRecordFactory`` that pulls request-scoped identifiers from
``notify_bridge_core.log_context`` onto every record, so logs can be
filtered/correlated by ``request_id``, ``command``, ``chat_id``,
``bot_id``, ``dispatch_id`` without each call site passing them.
* A ``SecretMaskingFilter`` that redacts Telegram bot tokens and common
``Authorization`` / ``x-api-key`` headers so an accidental ``repr`` or
dumped request doesn't leak credentials into the log aggregator.
* A text formatter (default) or a JSON formatter (one line per record)
selectable via ``NOTIFY_BRIDGE_LOG_FORMAT`` / app setting.
Levels are configurable three ways (later wins):
1. ``NOTIFY_BRIDGE_LOG_LEVEL`` env var (root) plus
``NOTIFY_BRIDGE_LOG_LEVELS`` (``mod=LEVEL,mod2=LEVEL``).
2. DB ``AppSetting`` rows ``log_level`` / ``log_levels`` / ``log_format``,
applied after migrations during startup.
3. Live edits via the settings API — ``apply_log_levels()`` updates
existing loggers in place without a server restart.
"""
from __future__ import annotations
import json
import logging
import logging.config
import re
import sys
from typing import Any
from notify_bridge_core.log_context import (
bot_id_var,
chat_id_var,
command_var,
dispatch_id_var,
request_id_var,
)
# ---------------------------------------------------------------------------
# Secret masking
# ---------------------------------------------------------------------------
# Telegram bot tokens: /bot<digits>:<alnum with dashes/underscores>
_TELEGRAM_TOKEN_RE = re.compile(r"/bot\d+:[A-Za-z0-9_-]{20,}")
# Header-style secrets: Authorization: Bearer xxx, x-api-key=xxx, etc.
# Only matches reasonably long tokens so short legitimate values don't trip.
_HEADER_SECRET_RE = re.compile(
r"(?i)(authorization|x-api-key|api[_-]?key|password|secret|access[_-]?token|refresh[_-]?token)"
r"([\"']?\s*[:=]\s*[\"']?)"
r"([A-Za-z0-9._+/=\-]{12,})"
)
def _mask(text: str) -> str:
redacted = _TELEGRAM_TOKEN_RE.sub("/bot***", text)
redacted = _HEADER_SECRET_RE.sub(r"\1\2***", redacted)
return redacted
class SecretMaskingFilter(logging.Filter):
"""Redact likely secrets from every log message before it's emitted.
Covers three surfaces where a leaked token can end up in the log:
the formatted message, a cached exception traceback (``exc_text``),
and a cached stack frame dump (``stack_info``). The formatter still
expands ``exc_info`` for us when ``exc_text`` is None, so we also
pre-render + mask on first emission.
"""
def filter(self, record: logging.LogRecord) -> bool:
try:
msg = record.getMessage()
except Exception:
return True
redacted = _mask(msg)
if redacted != msg:
# Replace the formatted message and drop args so the handler
# doesn't re-format with the original values.
record.msg = redacted
record.args = ()
if record.exc_info and not record.exc_text:
# Pre-render so we can mask before the formatter caches it.
fmt = logging.Formatter()
record.exc_text = fmt.formatException(record.exc_info)
if record.exc_text:
record.exc_text = _mask(record.exc_text)
if record.stack_info:
record.stack_info = _mask(record.stack_info)
return True
# ---------------------------------------------------------------------------
# Record factory — injects context identifiers onto every record
# ---------------------------------------------------------------------------
_CONTEXT_FIELDS = ("request_id", "command", "chat_id", "bot_id", "dispatch_id")
_PLACEHOLDER = "-"
_original_factory = logging.getLogRecordFactory()
def _context_record_factory(*args: Any, **kwargs: Any) -> logging.LogRecord:
record = _original_factory(*args, **kwargs)
record.request_id = request_id_var.get() or _PLACEHOLDER
record.command = command_var.get() or _PLACEHOLDER
record.chat_id = chat_id_var.get() or _PLACEHOLDER
bid = bot_id_var.get()
record.bot_id = str(bid) if bid is not None else _PLACEHOLDER
record.dispatch_id = dispatch_id_var.get() or _PLACEHOLDER
return record
# ---------------------------------------------------------------------------
# JSON formatter
# ---------------------------------------------------------------------------
class JsonFormatter(logging.Formatter):
"""Emit one JSON object per log record."""
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S") + f".{int(record.msecs):03d}",
"level": record.levelname,
"logger": record.name,
"module": record.module,
"line": record.lineno,
"msg": record.getMessage(),
}
for field in _CONTEXT_FIELDS:
val = getattr(record, field, None)
if val and val != _PLACEHOLDER:
payload[field] = val
# Prefer the pre-masked exc_text cached by SecretMaskingFilter over
# re-formatting from exc_info, which would bypass the mask.
if record.exc_text:
payload["exc"] = record.exc_text
elif record.exc_info:
payload["exc"] = self.formatException(record.exc_info)
if record.stack_info:
payload["stack"] = record.stack_info
return json.dumps(payload, ensure_ascii=False, default=str)
# ---------------------------------------------------------------------------
# Text formatter
# ---------------------------------------------------------------------------
# Keeps all context fields on one line so grep-by-field works. Empty values
# are rendered as "-" by the record factory to avoid KeyError if a record
# arrives without the filter.
_TEXT_FORMAT = (
"%(asctime)s %(levelname)-7s %(name)s:%(lineno)d "
"[req=%(request_id)s cmd=%(command)s bot=%(bot_id)s chat=%(chat_id)s disp=%(dispatch_id)s] "
"%(message)s"
)
# ---------------------------------------------------------------------------
# Level override parsing
# ---------------------------------------------------------------------------
_VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "NOTSET"}
def parse_level_overrides(raw: str) -> dict[str, str]:
"""Parse ``module=LEVEL,module2=LEVEL`` into a mapping of validated levels.
Invalid entries (bad format, unknown level) are silently dropped —
a malformed env var or DB setting must not crash boot.
"""
result: dict[str, str] = {}
for chunk in (raw or "").split(","):
chunk = chunk.strip()
if not chunk or "=" not in chunk:
continue
mod, _, lvl = chunk.partition("=")
mod = mod.strip()
lvl = lvl.strip().upper()
if not mod or lvl not in _VALID_LEVELS:
continue
result[mod] = lvl
return result
def _normalize_level(level: str | None, default: str = "INFO") -> str:
if not level:
return default
up = level.strip().upper()
return up if up in _VALID_LEVELS else default
# ---------------------------------------------------------------------------
# Setup + live apply
# ---------------------------------------------------------------------------
# Libraries we quiet by default — noisy at DEBUG and almost always irrelevant
# to a service issue. Override via LOG_LEVELS=sqlalchemy.engine=DEBUG when
# actually debugging.
_NOISY_LIBRARY_DEFAULTS: dict[str, str] = {
"sqlalchemy": "WARNING",
"sqlalchemy.engine": "WARNING",
"sqlalchemy.pool": "WARNING",
"aiohttp": "WARNING",
"aiohttp.access": "WARNING",
"aiohttp.client": "WARNING",
"aiohttp.server": "WARNING",
"apscheduler": "WARNING",
"apscheduler.scheduler": "WARNING",
"apscheduler.executors.default": "WARNING",
"urllib3": "WARNING",
"asyncio": "WARNING",
"httpx": "WARNING",
"httpcore": "WARNING",
"PIL": "WARNING",
"uvicorn.access": "WARNING",
}
def setup_logging(
*,
level: str = "INFO",
fmt: str = "text",
per_module_levels: str = "",
) -> None:
"""Install the logging configuration. Safe to call more than once.
Args:
level: Root log level (applied to ``notify_bridge_*`` loggers).
fmt: ``"text"`` (default) or ``"json"``.
per_module_levels: ``mod=LEVEL,mod2=LEVEL`` overrides. Wins over the
root level for the listed loggers.
"""
root_level = _normalize_level(level, "INFO")
overrides = parse_level_overrides(per_module_levels)
# Install the context-aware record factory (idempotent — setting the same
# factory twice is fine because ``_original_factory`` is captured at
# import time).
logging.setLogRecordFactory(_context_record_factory)
if fmt == "json":
formatters = {"default": {"()": f"{__name__}.JsonFormatter"}}
else:
formatters = {
"default": {
"format": _TEXT_FORMAT,
"datefmt": "%Y-%m-%d %H:%M:%S",
}
}
# Start with noisy-library defaults, then layer user overrides on top so
# the user can raise them to DEBUG when actually debugging.
loggers: dict[str, dict[str, Any]] = {}
for mod, lvl in _NOISY_LIBRARY_DEFAULTS.items():
loggers[mod] = {"level": lvl, "propagate": True}
# App loggers follow the root level unless overridden.
loggers["notify_bridge_server"] = {"level": root_level, "propagate": True}
loggers["notify_bridge_core"] = {"level": root_level, "propagate": True}
# User overrides win.
for mod, lvl in overrides.items():
loggers[mod] = {"level": lvl, "propagate": True}
config: dict[str, Any] = {
"version": 1,
"disable_existing_loggers": False,
"filters": {
"mask_secrets": {"()": f"{__name__}.SecretMaskingFilter"},
},
"formatters": formatters,
"handlers": {
"stderr": {
"class": "logging.StreamHandler",
"stream": sys.stderr,
"formatter": "default",
"filters": ["mask_secrets"],
},
},
"root": {
"level": root_level,
"handlers": ["stderr"],
},
"loggers": loggers,
}
logging.config.dictConfig(config)
def apply_log_levels(
*,
level: str | None,
per_module_levels: str | None,
) -> None:
"""Update existing logger levels in-place without rebuilding handlers.
Called when an admin changes the log settings at runtime. Setting
``level`` to None leaves the root untouched; setting it to a valid
level applies to ``notify_bridge_server`` / ``notify_bridge_core``.
``per_module_levels`` is treated as an exclusive set — loggers that
previously had an override but aren't in the new string are reset
*toward* the root level so a removed override actually takes effect.
"""
if level:
lvl = _normalize_level(level, "INFO")
logging.getLogger("notify_bridge_server").setLevel(lvl)
logging.getLogger("notify_bridge_core").setLevel(lvl)
# NOTSET on root is almost never what you want — keep root where it is
# unless the caller explicitly set something.
logging.getLogger().setLevel(lvl)
if per_module_levels is not None:
overrides = parse_level_overrides(per_module_levels)
# Apply new overrides
for mod, lvl in overrides.items():
logging.getLogger(mod).setLevel(lvl)
# Reset noisy libs that aren't in the new overrides back to defaults
for mod, default_lvl in _NOISY_LIBRARY_DEFAULTS.items():
if mod not in overrides:
logging.getLogger(mod).setLevel(default_lvl)
@@ -9,13 +9,18 @@ from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
# Ensure app-level loggers are visible
logging.basicConfig(level=logging.INFO)
from .config import settings as _log_cfg
_log_level = logging.DEBUG if _log_cfg.debug else logging.INFO
logging.getLogger("notify_bridge_server").setLevel(_log_level)
logging.getLogger("notify_bridge_core").setLevel(_log_level)
from .logging_setup import setup_logging
# Boot logging from env-based config. DB-backed AppSetting rows (``log_level`` /
# ``log_levels`` / ``log_format``) override this after migrations — see the
# lifespan block below.
setup_logging(
level="DEBUG" if _log_cfg.debug else _log_cfg.log_level,
fmt=_log_cfg.log_format,
per_module_levels=_log_cfg.log_levels,
)
_LOGGER = logging.getLogger(__name__)
from .database.engine import init_db
from .database.models import * # noqa: F401,F403 — ensure all models registered
@@ -66,6 +71,24 @@ async def lifespan(app: FastAPI):
await migrate_user_token_version(engine)
from .database.seeds import seed_all
await seed_all()
# Apply DB-backed logging settings (override env-based boot config).
# log_format still needs a restart — changing it means swapping the
# handler formatter entirely.
try:
from sqlmodel.ext.asyncio.session import AsyncSession as _AS_log
from .api.app_settings import get_setting as _get_log_setting
from .logging_setup import apply_log_levels
async with _AS_log(engine) as _log_session:
db_level = await _get_log_setting(_log_session, "log_level")
db_levels = await _get_log_setting(_log_session, "log_levels")
apply_log_levels(level=db_level or None, per_module_levels=db_levels)
_LOGGER.info(
"Logging initialized: level=%s overrides=%r format=%s",
db_level or _log_cfg.log_level, db_levels or _log_cfg.log_levels,
_log_cfg.log_format,
)
except Exception: # pragma: no cover — never let logging setup abort boot
_LOGGER.exception("Failed to apply DB-backed log settings; keeping env-based levels")
# Apply any pending restore staged via /api/backup/prepare-restore
from .services.pending_restore import apply_pending_restore_if_any
await apply_pending_restore_if_any()
@@ -11,11 +11,13 @@ CommandTrackerListeners with enabled CommandTrackers.
from __future__ import annotations
import logging
import time
from typing import Any
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.log_context import bind_log_context
from notify_bridge_core.notifications.telegram.client import TelegramClient
from ..database.engine import get_engine
@@ -289,29 +291,64 @@ async def _poll_bot(bot_id: int) -> None:
# Dispatch commands (only if chat has commands enabled)
if text and text.startswith("/"):
try:
async with AsyncSession(engine) as cmd_session:
chat_row = (await cmd_session.exec(
select(TelegramChat).where(
TelegramChat.bot_id == bot_obj.id,
TelegramChat.chat_id == chat_id,
from ..commands.parser import parse_command
cmd_name, _, _ = parse_command(text)
update_id = update.get("update_id")
message_id = message.get("message_id")
request_id = f"tg:{update_id}" if update_id is not None else f"tg:msg{message_id}"
with bind_log_context(
request_id=request_id,
command=cmd_name or "-",
chat_id=chat_id,
bot_id=bot_obj.id,
):
started = time.monotonic()
try:
async with AsyncSession(engine) as cmd_session:
chat_row = (await cmd_session.exec(
select(TelegramChat).where(
TelegramChat.bot_id == bot_obj.id,
TelegramChat.chat_id == chat_id,
)
)).first()
if not chat_row or not chat_row.commands_enabled:
_LOGGER.info(
"Command ignored — commands disabled (poll) for bot=%s chat=%s",
bot_obj.id, chat_id,
)
continue
effective_lang = chat_row.language_override or msg_language
_LOGGER.info("Command received (poll): /%s args=%r lang=%s", cmd_name, text[:200], effective_lang)
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
if not responses:
_LOGGER.info(
"Command produced no response (cmd=%r, poll) after %.0f ms",
cmd_name, (time.monotonic() - started) * 1000,
)
continue
text_count = sum(1 for r in responses if r.text)
media_count = sum(len(r.media or []) for r in responses)
_LOGGER.info(
"Command dispatching %d response(s): text=%d media_items=%d",
len(responses), text_count, media_count,
)
)).first()
if not chat_row or not chat_row.commands_enabled:
continue
effective_lang = chat_row.language_override or msg_language
message_id = message.get("message_id")
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
if responses:
for resp in responses:
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
except Exception:
_LOGGER.error("Error handling command from bot %d", bot_id, exc_info=True)
_LOGGER.info(
"Command /%s completed in %.0f ms (responses=%d media=%d)",
cmd_name, (time.monotonic() - started) * 1000,
len(responses), media_count,
)
except Exception:
_LOGGER.exception(
"Error handling command /%s from bot %d after %.0f ms",
cmd_name, bot_id, (time.monotonic() - started) * 1000,
)