"""Telegram webhook handler for bot commands.""" from __future__ import annotations import hmac import logging from typing import Any from fastapi import APIRouter, Depends, Header, HTTPException, Request from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.notifications.telegram.client import TelegramClient from ..database.engine import get_session from ..database.models import TelegramBot, TelegramChat from ..services.telegram import save_chat_from_webhook from ..services.telegram_send import telegram_chat_action from .base import CommandResponse from .handler import classify_command_chat_action, handle_command, send_media_group, send_reply _LOGGER = logging.getLogger(__name__) router = APIRouter(prefix="/api/telegram", tags=["telegram-webhook"]) # Webhook secret — set via NOTIFY_BRIDGE_TELEGRAM_WEBHOOK_SECRET env var _webhook_secret: str | None = None def set_webhook_secret(secret: str | None) -> None: global _webhook_secret _webhook_secret = secret @router.post("/webhook/{webhook_id}") async def telegram_webhook( webhook_id: str, request: Request, x_telegram_bot_api_secret_token: str | None = Header(default=None), session: AsyncSession = Depends(get_session), ): """Handle incoming Telegram messages — route commands to handlers.""" # Validate webhook secret if configured if _webhook_secret: if not hmac.compare_digest(x_telegram_bot_api_secret_token or "", _webhook_secret): raise HTTPException(status_code=403, detail="Invalid webhook secret") # Find bot by opaque webhook path ID (not by token — token must not appear in URLs) bot_result = await session.exec( select(TelegramBot).where(TelegramBot.webhook_path_id == webhook_id) ) bot = bot_result.first() if not bot: raise HTTPException(status_code=403, detail="Unknown webhook") try: update = await request.json() except Exception: return {"ok": True, "error": "invalid_json"} message = update.get("message") if not message: return {"ok": True, "skipped": "no_message"} chat_info = message.get("chat", {}) chat_id = str(chat_info.get("id", "")) text = message.get("text", "") if not chat_id or not text: return {"ok": True, "skipped": "empty"} # Auto-persist chat from incoming message from_user = message.get("from", {}) msg_language = from_user.get("language_code", "") # Snapshot bot identity before commit — AsyncSession expires instances # on commit, and implicit lazy-load of `bot.id` / `bot.token` later would # raise sqlalchemy.exc.MissingGreenlet. bot_id = bot.id bot_token = bot.token try: await save_chat_from_webhook(session, bot_id, chat_info, language_code=msg_language) await session.commit() await session.refresh(bot) except Exception: _LOGGER.warning("Failed to auto-save chat %s", chat_id, exc_info=True) # Handle commands (only if chat has commands enabled) if text.startswith("/"): chat_row = (await session.exec( select(TelegramChat).where( TelegramChat.bot_id == bot_id, TelegramChat.chat_id == chat_id, ) )).first() if not chat_row or not chat_row.commands_enabled: return {"ok": True, "skipped": "commands_disabled"} effective_lang = chat_row.language_override or msg_language message_id = message.get("message_id") async with telegram_chat_action( bot_token, chat_id, classify_command_chat_action(text), ): responses = await handle_command(bot, chat_id, text, language_code=effective_lang) if responses: for resp in responses: if resp.text: await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id) if resp.media: await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id) return {"ok": True} return {"ok": True, "skipped": "not_a_command"} async def register_webhook(bot_token: str, webhook_url: str, secret: str | None = None) -> dict: """Register webhook URL with Telegram Bot API via TelegramClient.""" from ..services.http_session import get_http_session http = await get_http_session() client = TelegramClient(http, bot_token) return await client.set_webhook(webhook_url, secret=secret) async def unregister_webhook(bot_token: str) -> dict: """Remove webhook from Telegram Bot API via TelegramClient.""" from ..services.http_session import get_http_session http = await get_http_session() client = TelegramClient(http, bot_token) return await client.delete_webhook()