notify-bridge/packages/server/src/notify_bridge_server/services/telegram_poller.py
alexei.dolgolyov 920920bc67
Build and Test / test-frontend (push) Successful in 9m37s
Build and Test / test-backend (push) Successful in 10m53s
Build and Test / build-image (push) Failing after 14m52s
feat: production-readiness hardening across security, async, DB, ops
Security
- SSRF: async DNS resolver; allow_redirects=False on all outbound clients;
  matrix homeserver_url validated on create/update/test; update_provider
  and email_bot merge incoming config and reject ***-masked secrets.
- Auth: bcrypt offloaded to asyncio.to_thread; JWT now carries iss/aud +
  leeway and rejects missing claims; setup TOCTOU closed inside a
  transaction; rate limits extended (default 600/min, 10/min on password
  change, 30/min on needs-setup); constant-time login to prevent username
  enumeration.
- Config: rejects known dev secret keys; validates CORS origin schemes,
  port range, token lifetimes.
- Webhook handlers stream-read body with a 1 MiB cap; Discord 429 retries
  bounded (3 attempts, Retry-After capped at 60 s).
- CSP + HSTS added to SecurityHeadersMiddleware.
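The SSRF bullet above combines an async DNS resolver with scheme and private-range checks. Below is a minimal standard-library sketch of that kind of guard. `assert_public_url` and `UnsafeURLError` are hypothetical names, not the project's API, and treating every address that is not `is_global` as forbidden is an assumed policy.

```python
import asyncio
import ipaddress
import socket
from urllib.parse import urlsplit

# Hypothetical policy: plain HTTP(S) only.
ALLOWED_SCHEMES = {"http", "https"}


class UnsafeURLError(ValueError):
    """Raised when a URL fails the outbound-request policy (hypothetical)."""


async def assert_public_url(url: str) -> list[str]:
    """Validate scheme and resolved addresses; return the vetted IPs."""
    parts = urlsplit(url)
    if parts.scheme not in ALLOWED_SCHEMES:
        raise UnsafeURLError(f"scheme not allowed: {parts.scheme!r}")
    host = parts.hostname
    if not host:
        raise UnsafeURLError("URL has no host")
    port = parts.port or (443 if parts.scheme == "https" else 80)

    # loop.getaddrinfo runs the lookup in the loop's executor, so the
    # coroutine never blocks the event loop on DNS.
    loop = asyncio.get_running_loop()
    infos = await loop.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    addresses = sorted({info[4][0] for info in infos})
    if not addresses:
        raise UnsafeURLError(f"{host} did not resolve")

    # Every resolved address must be publicly routable; a single private
    # answer is enough to reject the URL.
    for addr in addresses:
        ip = ipaddress.ip_address(addr.split("%", 1)[0])  # drop IPv6 scope id
        if not ip.is_global:
            raise UnsafeURLError(f"{host} resolves to non-public address {ip}")
    return addresses
```

Checking once and then letting the HTTP client resolve the name again leaves a DNS-rebinding window, so a hardened client would connect to the vetted address directly. Disabling redirects matters for the same reason: a redirect target would never pass through this check.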

Async / runtime
- SQLite engine: WAL, synchronous=NORMAL, foreign_keys=ON, busy_timeout,
  pool_pre_ping, dispose on shutdown.
- Lifespan shutdown now stops scheduler before closing HTTP session and
  disposing the engine.
- Shared aiohttp session locked against concurrent first-caller races;
  core NotificationDispatcher accepts and reuses it.
- Storage and scheduled backup writes wrapped in asyncio.to_thread.
- NUT client writes bounded by asyncio.wait_for.
- Telegram poller switched from 3 s short-poll to 30 s interval + 25 s
  long-poll (~10x fewer API calls).
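The "~10x fewer API calls" figure for the Telegram poller comes from issuing one `getUpdates` call per scheduler tick. The sketch below models that cadence. `PollCadence` is a hypothetical helper; it assumes each tick issues exactly one request (as `_poll_bot` in this file does) and ignores processing time.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PollCadence:
    """Hypothetical model: one getUpdates call per scheduler tick."""

    interval_s: float   # scheduler interval between ticks
    long_poll_s: float  # getUpdates timeout; 0 means a plain short-poll

    def calls_per_minute(self) -> float:
        # Exactly one request per tick, whether or not updates arrived.
        return 60.0 / self.interval_s

    def open_fraction(self) -> float:
        # Share of each tick during which an idle request is held open.
        return min(self.long_poll_s, self.interval_s) / self.interval_s

    def worst_case_wait_s(self) -> float:
        # An update arriving just after a request returned waits for the
        # next tick, so the bound is roughly one full interval.
        return self.interval_s


OLD = PollCadence(interval_s=3, long_poll_s=0)
NEW = PollCadence(interval_s=30, long_poll_s=25)
```

The same model exposes the trade-off: 20 calls per minute drop to 2, but an update that lands just after a request has returned can now wait about 30 s instead of about 3 s.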

Database
- New performance-indexes migration covers every FK/owner column and
  hot-path composite (notification_tracker(provider_id, enabled);
  event_log(user_id, created_at DESC); webhook_payload_log(provider_id,
  created_at DESC); action_execution(action_id, started_at DESC)).
- New schema_version table for future upgrade gating.
- __system__ placeholder user (id=0) seeded so user_id=0 system defaults
  satisfy the newly enforced FK; filtered out of /auth/needs-setup,
  /api/users, and setup.
- list_notification_trackers rewritten to batched loads (was 1+N+N*M).
- Retention job extended to event_log, webhook_payload_log, and
  action_execution; retention days exposed as a setting.
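A batched load replaces the 1+N+N*M pattern with one query per level and then attaches children to parents in memory. The sketch below shows the technique with the standard-library `sqlite3` module. The `tracker` / `target` / `rule` tables are hypothetical stand-ins, not the project's schema, which uses SQLModel.

```python
import sqlite3


def _in_clause(ids: list[int]) -> str:
    # One "?" placeholder per id, e.g. "?,?,?".
    return ",".join("?" for _ in ids)


def load_trackers_batched(conn: sqlite3.Connection) -> list[dict]:
    """Load trackers -> targets -> rules with one SELECT per level (3 total)."""
    trackers = [
        {"id": tid, "name": name, "targets": []}
        for tid, name in conn.execute("SELECT id, name FROM tracker ORDER BY id")
    ]
    if not trackers:
        return trackers
    tracker_by_id = {t["id"]: t for t in trackers}

    # Level 2: every target of every tracker in a single IN (...) query.
    ids = list(tracker_by_id)
    target_by_id: dict[int, dict] = {}
    for target_id, tracker_id, name in conn.execute(
        f"SELECT id, tracker_id, name FROM target "
        f"WHERE tracker_id IN ({_in_clause(ids)}) ORDER BY id",
        ids,
    ):
        target = {"id": target_id, "name": name, "rules": []}
        target_by_id[target_id] = target
        tracker_by_id[tracker_id]["targets"].append(target)
    if not target_by_id:
        return trackers

    # Level 3: every rule of every target, again in one query.
    target_ids = list(target_by_id)
    for target_id, rule in conn.execute(
        f"SELECT target_id, rule FROM rule "
        f"WHERE target_id IN ({_in_clause(target_ids)}) ORDER BY id",
        target_ids,
    ):
        target_by_id[target_id]["rules"].append(rule)
    return trackers
```

With many parent rows, the `IN (...)` list can exceed SQLite's bound-parameter limit, so a production version would split the id lists into chunks.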

Scheduler
- AsyncIOScheduler job_defaults: coalesce, misfire_grace_time=300,
  max_instances=1.

Ops
- uvicorn runs with proxy_headers, forwarded_allow_ips,
  timeout_graceful_shutdown; access log suppressed in non-debug.
- FastAPI version string now reads from importlib.metadata.
- New /api/ready endpoint separate from /api/health.
- docker-compose drops the ALLOW_PRIVATE_URLS=1 default, adds mem/cpu/pid
  limits, read_only + tmpfs, cap_drop:ALL, no-new-privileges; healthcheck
  targets /api/ready.
- CI now runs on push/PR with backend pytest, frontend svelte-check +
  build, and a non-push image build; release workflow gated on tests,
  publishes immutable sha-<commit> image tag, adds Trivy scan.

Tests
- New packages/server/tests/ with 29 passing tests: config validation,
  JWT round-trip + aud/alg=none rejection, SSRF scheme and private-range
  enforcement (sync + async), Discord bounded retry, and a lifespan-level
  /api/health + /api/ready smoke check.
- Renamed the misnamed services/test_dispatch.py to manual_dispatch.py so
  pytest never auto-collects production code.
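The bounded Discord retry these tests cover is described under Security as 3 attempts with Retry-After capped at 60 s. Below is a transport-agnostic sketch of that policy. `Response`, `send_with_bounded_retry`, and the 1 s fallback used when Retry-After is absent are hypothetical and are not taken from the project's Discord client.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

MAX_ATTEMPTS = 3             # total sends, including the first one
MAX_RETRY_AFTER_S = 60.0     # ceiling on a server-requested wait
DEFAULT_RETRY_AFTER_S = 1.0  # hypothetical fallback when the header is absent


@dataclass
class Response:
    """Minimal stand-in for an HTTP response (hypothetical)."""

    status: int
    retry_after: Optional[float] = None  # parsed Retry-After, in seconds


async def send_with_bounded_retry(
    send: Callable[[], Awaitable[Response]],
    *,
    max_attempts: int = MAX_ATTEMPTS,
    max_retry_after: float = MAX_RETRY_AFTER_S,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Response:
    """Retry only on 429, with at most max_attempts sends and a capped wait."""
    response = await send()
    for _ in range(max_attempts - 1):
        if response.status != 429:
            break
        requested = response.retry_after
        if requested is None:
            requested = DEFAULT_RETRY_AFTER_S
        # Clamp into [0, max_retry_after] so a hostile or buggy header
        # cannot stall the caller indefinitely.
        await sleep(min(max(requested, 0.0), max_retry_after))
        response = await send()
    # A 429 here means the retry budget ran out; the caller decides what to do.
    return response
```

Injecting `sleep` keeps the policy testable without real waits, which is how a bounded-retry test can run instantly.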

Frontend
- /login now redirects already-authenticated users to / and shows a distinct
  'backend unreachable' banner (en/ru) when /auth/needs-setup fails.
2026-04-23 19:44:56 +03:00

364 lines
14 KiB
Python

"""Telegram long-polling service for bots in polling mode.
Uses APScheduler to run getUpdates periodically for each bot
with update_mode == "polling". Processes updates identically
to the webhook handler (auto-save chat, dispatch commands).
Ref-counted: only starts/stops polling for bots that have active
CommandTrackerListeners with enabled CommandTrackers.
"""
from __future__ import annotations
import logging
import time
from typing import Any
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.log_context import bind_log_context
from notify_bridge_core.notifications.telegram.client import TelegramClient
from ..database.engine import get_engine
from ..database.models import CommandTracker, CommandTrackerListener, TelegramBot, TelegramChat
from ..services.telegram import save_chat_from_webhook
from .scheduler import get_scheduler
_LOGGER = logging.getLogger(__name__)
# Track last update_id per bot to use as offset
_last_update_id: dict[int, int] = {}
# Throttle auto-reclaim attempts so we don't hammer deleteWebhook when a
# stubborn external instance keeps re-setting the webhook. (bot_id → unix ts)
_last_webhook_reclaim_at: dict[int, float] = {}
_WEBHOOK_RECLAIM_COOLDOWN_SECONDS = 60.0
# Phrase Telegram uses in the 409 response description for the
# "webhook is active" conflict. Matched case-insensitively so we don't
# depend on exact wording.
_WEBHOOK_CONFLICT_PHRASE = "webhook is active"


async def _get_bot_ids_with_active_listeners() -> set[int]:
    """Return bot IDs that have at least one active command tracker listener.

    A bot is "active" if there is a CommandTrackerListener with
    listener_type="telegram_bot" pointing to it, AND the associated
    CommandTracker is enabled.
    """
    engine = get_engine()
    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(CommandTrackerListener).where(
                CommandTrackerListener.listener_type == "telegram_bot"
            )
        )
        listeners = result.all()
        active_bot_ids: set[int] = set()
        tracker_ids = list({l.command_tracker_id for l in listeners})
        if tracker_ids:
            tracker_result = await session.exec(
                select(CommandTracker).where(
                    CommandTracker.id.in_(tracker_ids),
                    CommandTracker.enabled == True,  # noqa: E712
                )
            )
            enabled_tracker_ids = {t.id for t in tracker_result.all()}
            for listener in listeners:
                if listener.command_tracker_id in enabled_tracker_ids:
                    active_bot_ids.add(listener.listener_id)
    return active_bot_ids


async def start_command_listener_polling() -> None:
    """Schedule polling jobs only for bots with active command tracker listeners."""
    active_bot_ids = await _get_bot_ids_with_active_listeners()
    if not active_bot_ids:
        _LOGGER.info("No bots with active command listeners to poll")
        return

    engine = get_engine()
    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(TelegramBot).where(
                TelegramBot.update_mode == "polling",
                TelegramBot.id.in_(active_bot_ids),
            )
        )
        bots = result.all()

    for bot in bots:
        schedule_bot_polling(bot.id)
    _LOGGER.info("Started command listener polling for %d bot(s)", len(bots))


async def start_bot_polling() -> None:
    """Deprecated alias for start_command_listener_polling().

    Despite the name, this no longer polls every bot with
    update_mode == 'polling': it delegates to start_command_listener_polling(),
    which only starts bots with active command tracker listeners.
    """
    await start_command_listener_polling()


async def start_bot_if_needed(bot_id: int) -> None:
    """Start polling for a bot if it has active listeners and is not already running."""
    engine = get_engine()
    async with AsyncSession(engine) as session:
        bot = await session.get(TelegramBot, bot_id)
        if not bot or bot.update_mode != "polling":
            return

    active_bot_ids = await _get_bot_ids_with_active_listeners()
    if bot_id in active_bot_ids:
        schedule_bot_polling(bot_id)


async def stop_bot_if_unused(bot_id: int) -> None:
    """Stop polling for a bot if it has no enabled command tracker listeners."""
    active_bot_ids = await _get_bot_ids_with_active_listeners()
    if bot_id not in active_bot_ids:
        unschedule_bot_polling(bot_id)


def schedule_bot_polling(bot_id: int) -> None:
    """Add a polling job for a bot (idempotent).

    The job runs on a 30 s interval, and each tick calls ``getUpdates`` with
    ``timeout=25``: Telegram holds the connection open until an update
    arrives or the timeout elapses. The first update of an idle window is
    therefore delivered with sub-second latency, while updates that arrive
    after a long-poll has already returned wait for the next tick (up to
    ~30 s). Each bot costs ~2 API calls per minute, down from 20 under the
    old 3 s short-poll.
    """
    scheduler = get_scheduler()
    job_id = f"telegram_poll_{bot_id}"
    if scheduler.get_job(job_id):
        return
    scheduler.add_job(
        _poll_bot,
        "interval",
        seconds=30,
        id=job_id,
        args=[bot_id],
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Started polling for bot %d (long-poll, 25s timeout)", bot_id)


def unschedule_bot_polling(bot_id: int) -> None:
    """Remove polling job for a bot."""
    scheduler = get_scheduler()
    job_id = f"telegram_poll_{bot_id}"
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
        _LOGGER.info("Stopped polling for bot %d", bot_id)


async def _handle_webhook_conflict(bot_id: int, bot_token: str, description: str) -> None:
    """Reclaim a bot stuck behind an active webhook set by another instance.

    Telegram's ``getUpdates`` returns 409 ``Conflict: can't use getUpdates
    method while webhook is active`` whenever a webhook is currently
    registered for the bot. Since this bot row has ``update_mode="polling"``
    in our DB (that's the only reason we're polling it), the user's intent
    is polling, so we drop the webhook and resume. Throttled to once per
    minute per bot so a rival instance constantly re-registering the
    webhook doesn't trigger a reclaim storm.
    """
    # ``time`` is already imported at module level.
    now = time.time()
    last = _last_webhook_reclaim_at.get(bot_id, 0.0)
    if now - last < _WEBHOOK_RECLAIM_COOLDOWN_SECONDS:
        # Already logged recently; stay quiet until the cooldown expires so
        # the user gets one clear warning line per minute, not one per poll tick.
        return
    _last_webhook_reclaim_at[bot_id] = now

    from .http_session import get_http_session
    http = await get_http_session()
    client = TelegramClient(http, bot_token)

    # Surface which URL stole the bot so the user can tell where it came from.
    conflicting_url = ""
    try:
        info = await client.get_webhook_info()
        if info.get("success"):
            conflicting_url = info.get("result", {}).get("url", "") or ""
    except Exception as err:  # noqa: BLE001
        _LOGGER.debug("getWebhookInfo during conflict recovery failed: %s", err)

    _LOGGER.warning(
        "Bot %d: webhook is active (url=%r) but this instance is in polling "
        "mode — calling deleteWebhook to reclaim. Telegram said: %s",
        bot_id, conflicting_url, description,
    )
    try:
        del_result = await client.delete_webhook()
        if del_result.get("success"):
            _LOGGER.warning(
                "Bot %d: webhook cleared; polling will resume on next tick",
                bot_id,
            )
            # Reset offset so we don't skip updates that accumulated during the
            # conflict window (Telegram held them until a client acknowledged).
            _last_update_id.pop(bot_id, None)
        else:
            _LOGGER.error(
                "Bot %d: deleteWebhook failed: %s",
                bot_id, del_result.get("error"),
            )
    except Exception as err:  # noqa: BLE001
        _LOGGER.error("Bot %d: deleteWebhook raised: %s", bot_id, err)


async def _poll_bot(bot_id: int) -> None:
    """Fetch updates from Telegram and process them."""
    engine = get_engine()

    # Eagerly load bot data and close session before aiohttp work
    # (cannot nest aiohttp inside active SQLAlchemy async session)
    async with AsyncSession(engine) as session:
        bot = await session.get(TelegramBot, bot_id)
        if not bot or bot.update_mode != "polling":
            unschedule_bot_polling(bot_id)
            return
        # Copy attributes before session closes to avoid detached-instance errors
        from types import SimpleNamespace
        bot_token = bot.token
        bot_obj = SimpleNamespace(id=bot.id, name=bot.name, token=bot.token)

    offset = _last_update_id.get(bot_id, 0)

    try:
        from .http_session import get_http_session
        http = await get_http_session()
        client = TelegramClient(http, bot_token)
        # Long-poll: hold connection open until an update arrives or 25 s
        # elapse. Drastically cuts API calls vs. 3 s short-poll.
        result = await client.get_updates(
            offset=offset + 1 if offset else None, limit=50, timeout=25,
        )
        if not result.get("success"):
            err_text = str(result.get("error") or "")
            # Detect the webhook-is-active conflict: another instance (or a
            # stale registration) owns this bot's delivery, so getUpdates
            # returns 409 and we get zero updates forever. Reclaim it,
            # but only for bots the user explicitly set to polling mode.
            if _WEBHOOK_CONFLICT_PHRASE in err_text.lower():
                await _handle_webhook_conflict(bot_id, bot_token, err_text)
            else:
                _LOGGER.debug("Polling error for bot %d: %s", bot_id, err_text)
            return
        updates = result.get("result", [])
    except Exception as e:
        _LOGGER.debug("Polling error for bot %d: %s", bot_id, e)
        return

    if not updates:
        return

    # Update offset to latest
    _last_update_id[bot_id] = updates[-1]["update_id"]

    # Process each update
    from ..commands.handler import (
        classify_command_chat_action,
        handle_command,
        send_media_group,
        send_reply,
    )
    from .telegram_send import telegram_chat_action

    for update in updates:
        message = update.get("message")
        if not message:
            continue
        chat_info = message.get("chat", {})
        chat_id = str(chat_info.get("id", ""))
        text = message.get("text", "")
        from_user = message.get("from", {})
        msg_language = from_user.get("language_code", "")
        if not chat_id:
            continue

        # Auto-persist chat (fresh session per save)
        try:
            async with AsyncSession(engine) as save_session:
                await save_chat_from_webhook(save_session, bot_obj.id, chat_info, language_code=msg_language)
                await save_session.commit()
        except Exception:
            _LOGGER.debug("Failed to auto-save chat %s", chat_id, exc_info=True)

        # Dispatch commands (only if chat has commands enabled)
        if text and text.startswith("/"):
            from ..commands.parser import parse_command
            cmd_name, _, _ = parse_command(text)
            update_id = update.get("update_id")
            message_id = message.get("message_id")
            request_id = f"tg:{update_id}" if update_id is not None else f"tg:msg{message_id}"
            with bind_log_context(
                request_id=request_id,
                command=cmd_name or "-",
                chat_id=chat_id,
                bot_id=bot_obj.id,
            ):
                started = time.monotonic()
                try:
                    # Short-lived session: closed before any aiohttp work below.
                    async with AsyncSession(engine) as cmd_session:
                        chat_row = (await cmd_session.exec(
                            select(TelegramChat).where(
                                TelegramChat.bot_id == bot_obj.id,
                                TelegramChat.chat_id == chat_id,
                            )
                        )).first()
                    if not chat_row or not chat_row.commands_enabled:
                        _LOGGER.info(
                            "Command ignored — commands disabled (poll) for bot=%s chat=%s",
                            bot_obj.id, chat_id,
                        )
                        continue
                    effective_lang = chat_row.language_override or msg_language
                    _LOGGER.info("Command received (poll): /%s args=%r lang=%s", cmd_name, text[:200], effective_lang)
                    async with telegram_chat_action(
                        bot_token, chat_id, classify_command_chat_action(text),
                    ):
                        responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
                    if not responses:
                        _LOGGER.info(
                            "Command produced no response (cmd=%r, poll) after %.0f ms",
                            cmd_name, (time.monotonic() - started) * 1000,
                        )
                        continue
                    text_count = sum(1 for r in responses if r.text)
                    media_count = sum(len(r.media or []) for r in responses)
                    _LOGGER.info(
                        "Command dispatching %d response(s): text=%d media_items=%d",
                        len(responses), text_count, media_count,
                    )
                    for resp in responses:
                        if resp.text:
                            await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
                        if resp.media:
                            await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
                    _LOGGER.info(
                        "Command /%s completed in %.0f ms (responses=%d media=%d)",
                        cmd_name, (time.monotonic() - started) * 1000,
                        len(responses), media_count,
                    )
                except Exception:
                    _LOGGER.exception(
                        "Error handling command /%s from bot %d after %.0f ms",
                        cmd_name, bot_id, (time.monotonic() - started) * 1000,
                    )