"""APScheduler-based polling scheduler for trackers and actions.""" from __future__ import annotations import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler _LOGGER = logging.getLogger(__name__) _scheduler: AsyncIOScheduler | None = None def get_scheduler() -> AsyncIOScheduler: global _scheduler if _scheduler is None: _scheduler = AsyncIOScheduler() return _scheduler async def start_scheduler() -> None: scheduler = get_scheduler() if not scheduler.running: scheduler.start() _LOGGER.info("Scheduler started") await _load_tracker_jobs() await _load_action_jobs() # Start Telegram bot polling for bots with active command listeners from .telegram_poller import start_command_listener_polling await start_command_listener_polling() # Start debounced command auto-sync scheduler from .command_sync import start_sync_scheduler start_sync_scheduler() async def _load_tracker_jobs() -> None: """Load enabled trackers and schedule polling jobs.""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from ..database.engine import get_engine from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel engine = get_engine() scheduler = get_scheduler() async with AsyncSession(engine) as session: result = await session.exec(select(NotificationTracker).where(NotificationTracker.enabled == True)) trackers = result.all() # Pre-load provider types for scheduler detection provider_types: dict[int, str] = {} for tracker in trackers: if tracker.provider_id not in provider_types: provider = await session.get(ServiceProviderModel, tracker.provider_id) if provider: provider_types[tracker.provider_id] = provider.type for tracker in trackers: job_id = f"tracker_{tracker.id}" if scheduler.get_job(job_id): continue ptype = provider_types.get(tracker.provider_id, "") filters = tracker.filters or {} # Scheduler providers can use cron triggers if ptype == "scheduler" and filters.get("schedule_type") == "cron": cron_expr = filters.get("cron_expression", "") if cron_expr: try: _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name) continue except Exception as e: _LOGGER.error( "Invalid cron expression for tracker %d (%s): %s — falling back to interval", tracker.id, tracker.name, e, ) scheduler.add_job( _poll_tracker, "interval", seconds=tracker.scan_interval, id=job_id, args=[tracker.id], replace_existing=True, ) _LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval) def _add_cron_job( scheduler: AsyncIOScheduler, job_id: str, tracker_id: int, cron_expression: str, tracker_name: str, ) -> None: """Add a cron-triggered job for a scheduler-type tracker.""" from apscheduler.triggers.cron import CronTrigger trigger = CronTrigger.from_crontab(cron_expression) scheduler.add_job( _poll_tracker, trigger, id=job_id, args=[tracker_id], replace_existing=True, ) _LOGGER.info("Scheduled tracker %d (%s) with cron: %s", tracker_id, tracker_name, cron_expression) async def schedule_tracker( tracker_id: int, interval: int, cron_expression: str | None = None, ) -> None: """Add or update a scheduler job for a tracker.""" scheduler = get_scheduler() job_id = f"tracker_{tracker_id}" # Remove existing job first to allow trigger type changes if scheduler.get_job(job_id): scheduler.remove_job(job_id) if cron_expression: try: _add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}") return except Exception as e: _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e) scheduler.add_job( _poll_tracker, "interval", seconds=interval, id=job_id, args=[tracker_id], replace_existing=True, ) _LOGGER.info("Scheduled tracker %d every %ds", tracker_id, interval) async def unschedule_tracker(tracker_id: int) -> None: """Remove a scheduler job for a tracker.""" scheduler = get_scheduler() job_id = f"tracker_{tracker_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) _LOGGER.info("Unscheduled tracker %d", tracker_id) async def _poll_tracker(tracker_id: int) -> None: """Poll a tracker for changes.""" from .watcher import check_tracker try: await check_tracker(tracker_id) except Exception as e: _LOGGER.error("Error polling tracker %d: %s", tracker_id, e) # --------------------------------------------------------------------------- # Action scheduling # --------------------------------------------------------------------------- async def _load_action_jobs() -> None: """Load enabled actions and schedule execution jobs.""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from ..database.engine import get_engine from ..database.models import Action engine = get_engine() scheduler = get_scheduler() async with AsyncSession(engine) as session: result = await session.exec( select(Action).where(Action.enabled == True) # noqa: E712 ) actions = result.all() for action in actions: job_id = f"action_{action.id}" if scheduler.get_job(job_id): continue if action.schedule_type == "cron" and action.schedule_cron: try: from apscheduler.triggers.cron import CronTrigger trigger = CronTrigger.from_crontab(action.schedule_cron) scheduler.add_job( _run_action, trigger, id=job_id, args=[action.id], replace_existing=True, ) _LOGGER.info( "Scheduled action %d (%s) with cron: %s", action.id, action.name, action.schedule_cron, ) continue except Exception as e: _LOGGER.error( "Invalid cron for action %d (%s): %s — falling back to interval", action.id, action.name, e, ) scheduler.add_job( _run_action, "interval", seconds=action.schedule_interval, id=job_id, args=[action.id], replace_existing=True, ) _LOGGER.info( "Scheduled action %d (%s) every %ds", action.id, action.name, action.schedule_interval, ) async def schedule_action( action_id: int, schedule_type: str = "interval", interval: int = 3600, cron_expression: str = "", ) -> None: """Add or update a scheduler job for an action.""" scheduler = get_scheduler() job_id = f"action_{action_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) if schedule_type == "cron" and cron_expression: try: from apscheduler.triggers.cron import CronTrigger trigger = CronTrigger.from_crontab(cron_expression) scheduler.add_job( _run_action, trigger, id=job_id, args=[action_id], replace_existing=True, ) _LOGGER.info("Scheduled action %d with cron: %s", action_id, cron_expression) return except Exception as e: _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e) scheduler.add_job( _run_action, "interval", seconds=interval, id=job_id, args=[action_id], replace_existing=True, ) _LOGGER.info("Scheduled action %d every %ds", action_id, interval) async def unschedule_action(action_id: int) -> None: """Remove a scheduler job for an action.""" scheduler = get_scheduler() job_id = f"action_{action_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) _LOGGER.info("Unscheduled action %d", action_id) async def _run_action(action_id: int) -> None: """Run an action (called by APScheduler).""" from .action_runner import run_action try: await run_action(action_id, trigger="scheduled") except Exception as e: _LOGGER.error("Error running action %d: %s", action_id, e)