"""Telegram long-polling service for bots in polling mode. Uses APScheduler to run getUpdates periodically for each bot with update_mode == "polling". Processes updates identically to the webhook handler (auto-save chat, dispatch commands). Ref-counted: only starts/stops polling for bots that have active CommandTrackerListeners with enabled CommandTrackers. """ from __future__ import annotations import logging from typing import Any import aiohttp from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.notifications.telegram.client import TelegramClient from ..database.engine import get_engine from ..database.models import CommandTracker, CommandTrackerListener, TelegramBot from ..services.telegram import save_chat_from_webhook from .scheduler import get_scheduler _LOGGER = logging.getLogger(__name__) # Track last update_id per bot to use as offset _last_update_id: dict[int, int] = {} async def _get_bot_ids_with_active_listeners() -> set[int]: """Return bot IDs that have at least one active command tracker listener. A bot is "active" if there is a CommandTrackerListener with listener_type="telegram_bot" pointing to it, AND the associated CommandTracker is enabled. """ engine = get_engine() async with AsyncSession(engine) as session: result = await session.exec( select(CommandTrackerListener).where( CommandTrackerListener.listener_type == "telegram_bot" ) ) listeners = result.all() active_bot_ids: set[int] = set() for listener in listeners: tracker = await session.get(CommandTracker, listener.command_tracker_id) if tracker and tracker.enabled: active_bot_ids.add(listener.listener_id) return active_bot_ids async def start_command_listener_polling() -> None: """Schedule polling jobs only for bots with active command tracker listeners.""" active_bot_ids = await _get_bot_ids_with_active_listeners() if not active_bot_ids: _LOGGER.info("No bots with active command listeners to poll") return engine = get_engine() async with AsyncSession(engine) as session: result = await session.exec( select(TelegramBot).where( TelegramBot.update_mode == "polling", TelegramBot.id.in_(active_bot_ids), ) ) bots = result.all() for bot in bots: schedule_bot_polling(bot.id) _LOGGER.info("Started command listener polling for %d bot(s)", len(bots)) async def start_bot_polling() -> None: """Schedule polling jobs for all bots with update_mode == 'polling'. Deprecated: prefer start_command_listener_polling() which only starts bots with active command tracker listeners. """ await start_command_listener_polling() async def start_bot_if_needed(bot_id: int) -> None: """Start polling for a bot if it has active listeners and is not already running.""" engine = get_engine() async with AsyncSession(engine) as session: bot = await session.get(TelegramBot, bot_id) if not bot or bot.update_mode != "polling": return active_bot_ids = await _get_bot_ids_with_active_listeners() if bot_id in active_bot_ids: schedule_bot_polling(bot_id) async def stop_bot_if_unused(bot_id: int) -> None: """Stop polling for a bot if it has no enabled command tracker listeners.""" active_bot_ids = await _get_bot_ids_with_active_listeners() if bot_id not in active_bot_ids: unschedule_bot_polling(bot_id) def schedule_bot_polling(bot_id: int) -> None: """Add a polling job for a bot (idempotent).""" scheduler = get_scheduler() job_id = f"telegram_poll_{bot_id}" if scheduler.get_job(job_id): return scheduler.add_job( _poll_bot, "interval", seconds=3, id=job_id, args=[bot_id], replace_existing=True, max_instances=1, ) _LOGGER.info("Started polling for bot %d", bot_id) def unschedule_bot_polling(bot_id: int) -> None: """Remove polling job for a bot.""" scheduler = get_scheduler() job_id = f"telegram_poll_{bot_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) _LOGGER.info("Stopped polling for bot %d", bot_id) async def _poll_bot(bot_id: int) -> None: """Fetch updates from Telegram and process them.""" engine = get_engine() # Eagerly load bot data and close session before aiohttp work # (cannot nest aiohttp inside active SQLAlchemy async session) async with AsyncSession(engine) as session: bot = await session.get(TelegramBot, bot_id) if not bot or bot.update_mode != "polling": unschedule_bot_polling(bot_id) return # Extract what we need before closing session bot_token = bot.token bot_obj = bot offset = _last_update_id.get(bot_id, 0) try: async with aiohttp.ClientSession() as http: client = TelegramClient(http, bot_token) result = await client.get_updates( offset=offset + 1 if offset else None, limit=50, ) if not result.get("success"): return updates = result.get("result", []) except Exception as e: _LOGGER.debug("Polling error for bot %d: %s", bot_id, e) return if not updates: return # Update offset to latest _last_update_id[bot_id] = updates[-1]["update_id"] # Process each update from ..commands.handler import handle_command, send_media_group, send_reply for update in updates: message = update.get("message") if not message: continue chat_info = message.get("chat", {}) chat_id = str(chat_info.get("id", "")) text = message.get("text", "") from_user = message.get("from", {}) msg_language = from_user.get("language_code", "") if not chat_id: continue # Auto-persist chat (fresh session per save) try: async with AsyncSession(engine) as save_session: await save_chat_from_webhook(save_session, bot_obj.id, chat_info, language_code=msg_language) await save_session.commit() except Exception: _LOGGER.debug("Failed to auto-save chat %s", chat_id, exc_info=True) # Dispatch commands if text and text.startswith("/"): try: message_id = message.get("message_id") cmd_response = await handle_command(bot_obj, chat_id, text, language_code=msg_language) if cmd_response is not None: if isinstance(cmd_response, list): await send_media_group(bot_token, chat_id, cmd_response, reply_to_message_id=message_id) else: await send_reply(bot_token, chat_id, cmd_response, reply_to_message_id=message_id) except Exception: _LOGGER.error("Error handling command from bot %d", bot_id, exc_info=True)