"""Telegram bot management API routes."""

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

import aiohttp

from immich_watcher_core.telegram.media import TELEGRAM_API_BASE_URL

from ..ai.commands import register_commands_with_telegram
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TelegramBot, TelegramChat, User

router = APIRouter(prefix="/api/telegram-bots", tags=["telegram-bots"])


class BotCreate(BaseModel):
    """Request body for registering a new bot."""

    name: str
    token: str


class BotUpdate(BaseModel):
    """Request body for updating a bot; fields left as ``None`` are not changed."""

    name: str | None = None
    commands_config: dict | None = None


@router.get("")
async def list_bots(
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """List the Telegram bots registered by the current user."""
    result = await session.exec(
        select(TelegramBot).where(TelegramBot.user_id == user.id)
    )
    return [_bot_response(b) for b in result.all()]


@router.post("", status_code=status.HTTP_201_CREATED)
async def create_bot(
    body: BotCreate,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Register a new Telegram bot (validates token via getMe).

    Raises:
        HTTPException: 400 when getMe does not return usable bot info.
    """
    bot_info = await _get_me(body.token)
    if not bot_info:
        raise HTTPException(status_code=400, detail="Invalid bot token")

    bot = TelegramBot(
        user_id=user.id,
        name=body.name,
        token=body.token,
        bot_username=bot_info.get("username", ""),
        bot_id=bot_info.get("id", 0),
    )
    session.add(bot)
    await session.commit()
    await session.refresh(bot)
    return _bot_response(bot)


@router.put("/{bot_id}")
async def update_bot(
    bot_id: int,
    body: BotUpdate,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Update a bot's display name and/or commands config.

    Raises:
        HTTPException: 404 when the bot does not exist or belongs to another user.
    """
    bot = await _get_user_bot(session, bot_id, user.id)
    if body.name is not None:
        bot.name = body.name
    if body.commands_config is not None:
        bot.commands_config = body.commands_config
    session.add(bot)
    await session.commit()
    await session.refresh(bot)
    return _bot_response(bot)


@router.delete("/{bot_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_bot(
    bot_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Delete a registered bot and its chats.

    Raises:
        HTTPException: 404 when the bot does not exist or belongs to another user.
    """
    bot = await _get_user_bot(session, bot_id, user.id)

    # Delete associated chats
    result = await session.exec(
        select(TelegramChat).where(TelegramChat.bot_id == bot_id)
    )
    for chat in result.all():
        await session.delete(chat)

    await session.delete(bot)
    await session.commit()


@router.get("/{bot_id}/token")
async def get_bot_token(
    bot_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Get the full bot token (used by frontend to construct target config).

    Raises:
        HTTPException: 404 when the bot does not exist or belongs to another user.
    """
    bot = await _get_user_bot(session, bot_id, user.id)
    return {"token": bot.token}


# --- Chat management ---


@router.get("/{bot_id}/chats")
async def list_bot_chats(
    bot_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """List persisted chats for a bot.

    Raises:
        HTTPException: 404 when the bot does not exist or belongs to another user.
    """
    await _get_user_bot(session, bot_id, user.id)  # Auth check
    result = await session.exec(
        select(TelegramChat).where(TelegramChat.bot_id == bot_id)
    )
    return [_chat_response(c) for c in result.all()]


@router.post("/{bot_id}/chats/discover")
async def discover_chats(
    bot_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Discover new chats via Telegram getUpdates and persist them.

    Merges newly discovered chats with existing ones (no duplicates).
    Returns the full updated chat list.

    Raises:
        HTTPException: 404 when the bot does not exist or belongs to another user.
    """
    bot = await _get_user_bot(session, bot_id, user.id)
    discovered = await _fetch_chats_from_telegram(bot.token)

    # Load existing chats to avoid duplicates
    result = await session.exec(
        select(TelegramChat).where(TelegramChat.bot_id == bot_id)
    )
    existing = {c.chat_id: c for c in result.all()}

    for chat_data in discovered:
        # chat_id is compared as a string: Telegram ids arrive as ints and are
        # stringified here before the lookup and before being stored.
        cid = str(chat_data["id"])
        if cid in existing:
            # Update title/username if changed
            existing_chat = existing[cid]
            existing_chat.title = chat_data.get("title", existing_chat.title)
            existing_chat.username = chat_data.get(
                "username", existing_chat.username
            )
            session.add(existing_chat)
        else:
            session.add(
                TelegramChat(
                    bot_id=bot_id,
                    chat_id=cid,
                    title=chat_data.get("title", ""),
                    chat_type=chat_data.get("type", "private"),
                    username=chat_data.get("username", ""),
                )
            )

    await session.commit()

    # Return full list
    result = await session.exec(
        select(TelegramChat).where(TelegramChat.bot_id == bot_id)
    )
    return [_chat_response(c) for c in result.all()]


@router.delete(
    "/{bot_id}/chats/{chat_db_id}", status_code=status.HTTP_204_NO_CONTENT
)
async def delete_chat(
    bot_id: int,
    chat_db_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Delete a persisted chat entry.

    Raises:
        HTTPException: 404 when the bot is not the user's, or the chat does
            not exist or belongs to a different bot.
    """
    await _get_user_bot(session, bot_id, user.id)  # Auth check
    chat = await session.get(TelegramChat, chat_db_id)
    if not chat or chat.bot_id != bot_id:
        raise HTTPException(status_code=404, detail="Chat not found")
    await session.delete(chat)
    await session.commit()


# --- Commands ---


@router.post("/{bot_id}/sync-commands")
async def sync_commands(
    bot_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Register bot commands with Telegram BotFather API.

    Raises:
        HTTPException: 404 when the bot is not the user's; 500 when
            ``register_commands_with_telegram`` reports failure.
    """
    bot = await _get_user_bot(session, bot_id, user.id)
    success = await register_commands_with_telegram(bot)
    if not success:
        raise HTTPException(
            status_code=500, detail="Failed to register commands with Telegram"
        )
    return {"success": True}


# --- Helpers ---


def _chat_title(chat: dict) -> str:
    """Return a display title for a Telegram chat dict.

    Uses ``title`` when it is non-empty; otherwise joins ``first_name`` and
    ``last_name`` with a single space, trimming the result so a missing part
    leaves no stray whitespace.
    """
    title = chat.get("title")
    if title:
        return title
    first = chat.get("first_name") or ""
    last = chat.get("last_name") or ""
    return f"{first} {last}".strip()


async def _get_me(token: str) -> dict | None:
    """Call Telegram getMe to validate token and get bot info.

    Returns the ``result`` object when the response has ``ok`` set, otherwise
    ``None`` (also on any ``aiohttp.ClientError``).
    """
    try:
        async with aiohttp.ClientSession() as http:
            async with http.get(f"{TELEGRAM_API_BASE_URL}{token}/getMe") as resp:
                data = await resp.json()
                if data.get("ok"):
                    return data.get("result", {})
    except aiohttp.ClientError:
        # Best-effort: a transport failure is reported as "no bot info".
        # The exception is deliberately not logged because the request URL
        # embeds the bot token.
        pass
    return None


async def _fetch_chats_from_telegram(token: str) -> list[dict]:
    """Fetch chats from Telegram getUpdates API.

    Returns one dict per distinct chat id seen in the ``message`` updates,
    with keys ``id``, ``title``, ``type`` and ``username``. Returns an empty
    list when the response is not ``ok``; on ``aiohttp.ClientError`` returns
    whatever was collected before the error.
    """
    seen: dict[int, dict] = {}
    try:
        async with aiohttp.ClientSession() as http:
            async with http.get(
                f"{TELEGRAM_API_BASE_URL}{token}/getUpdates",
                params={"limit": 100, "allowed_updates": '["message"]'},
            ) as resp:
                data = await resp.json()
                if not data.get("ok"):
                    return []
                for update in data.get("result", []):
                    msg = update.get("message", {})
                    chat = msg.get("chat", {})
                    chat_id = chat.get("id")
                    # First occurrence wins; later updates for the same chat
                    # are ignored.
                    if chat_id and chat_id not in seen:
                        seen[chat_id] = {
                            "id": chat_id,
                            "title": _chat_title(chat),
                            "type": chat.get("type", "private"),
                            "username": chat.get("username", ""),
                        }
    except aiohttp.ClientError:
        # Best-effort discovery; not logged because the URL embeds the token.
        pass
    return list(seen.values())


def _chat_response(c: TelegramChat) -> dict:
    """Serialize a persisted chat for API responses."""
    return {
        "id": c.id,
        "chat_id": c.chat_id,
        "title": c.title,
        "type": c.chat_type,
        "username": c.username,
        "discovered_at": c.discovered_at.isoformat(),
    }


def _bot_response(b: TelegramBot) -> dict:
    """Serialize a bot for API responses without exposing the full token."""
    return {
        "id": b.id,
        "name": b.name,
        "bot_username": b.bot_username,
        "bot_id": b.bot_id,
        # Only show head/tail of the token; short tokens are fully masked.
        "token_preview": (
            f"{b.token[:8]}...{b.token[-4:]}" if len(b.token) > 12 else "***"
        ),
        "commands_config": b.commands_config,
        "created_at": b.created_at.isoformat(),
    }


async def _get_user_bot(
    session: AsyncSession, bot_id: int, user_id: int
) -> TelegramBot:
    """Load a bot by id and verify it belongs to ``user_id``.

    Raises:
        HTTPException: 404 when the bot is missing or owned by another user
            (the same response for both, so ownership is not revealed).
    """
    bot = await session.get(TelegramBot, bot_id)
    if not bot or bot.user_id != user_id:
        raise HTTPException(status_code=404, detail="Bot not found")
    return bot


async def save_chat_from_webhook(
    session: AsyncSession, bot_id: int, chat_data: dict
) -> None:
    """Save or update a chat entry from an incoming webhook message.

    Called by the webhook handler to auto-persist chats. The change is only
    added to ``session``; this function does not commit.
    """
    raw_id = chat_data.get("id")
    # Guard before stringifying: str(None) would be the truthy string "None".
    if raw_id is None:
        return
    chat_id = str(raw_id)
    if not chat_id:
        return

    result = await session.exec(
        select(TelegramChat).where(
            TelegramChat.bot_id == bot_id,
            TelegramChat.chat_id == chat_id,
        )
    )
    existing = result.first()

    title = _chat_title(chat_data)

    if existing:
        existing.title = title
        existing.username = chat_data.get("username", existing.username)
        session.add(existing)
    else:
        session.add(
            TelegramChat(
                bot_id=bot_id,
                chat_id=chat_id,
                title=title,
                chat_type=chat_data.get("type", "private"),
                username=chat_data.get("username", ""),
            )
        )