"""APScheduler-based polling scheduler for trackers and actions.""" from __future__ import annotations import logging from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from apscheduler.schedulers.asyncio import AsyncIOScheduler _LOGGER = logging.getLogger(__name__) def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo: """Resolve an IANA tz string to a ZoneInfo, falling back to UTC on any error. Kept local to avoid importing from api/dispatch layers inside the scheduler module (which is loaded at startup, before the API routers). """ if not tz_name: return ZoneInfo("UTC") try: return ZoneInfo(tz_name) except (ZoneInfoNotFoundError, ValueError): _LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name) return ZoneInfo("UTC") async def _load_app_timezone() -> ZoneInfo: """Load the admin-configured app timezone from AppSetting (falls back to UTC).""" from sqlmodel.ext.asyncio.session import AsyncSession from ..api.app_settings import get_setting from ..database.engine import get_engine async with AsyncSession(get_engine()) as session: tz_name = await get_setting(session, "timezone") return _resolve_zoneinfo(tz_name) _scheduler: AsyncIOScheduler | None = None # --------------------------------------------------------------------------- # Adaptive polling (Tier 6 of the big-album optimization plan). # # We don't touch the user-configured ``scan_interval`` — that's still the # authoritative cadence. Instead, we *skip* a growing fraction of scheduled # ticks when a tracker is idle, and reset to 1:1 as soon as it detects # anything. The scheduler keeps running on the user's chosen period, so # response time to the *first* change after an idle stretch is never worse # than one tick — but the steady-state HTTP cost for a fleet of idle # trackers drops by ~75%. # # Opt-in per tracker via the ``adaptive_max_skip`` column: # * NULL or 0 → adaptive polling disabled, every tick runs (default) # * 2 → skip at most 1-in-2 ticks after long idle # * 3, 4, ... 
→ up to (N-1)-in-N skipping # Thresholds are intentionally conservative: a tracker polling every 30 s # needs 5 min of silence before we halve its effective rate, and 15 min # before we quarter it. # --------------------------------------------------------------------------- _ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2 _ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4 # Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process # restart — a short warmup period is fine and avoids persisting what is # effectively a performance heuristic. _adaptive_state: dict[int, dict[str, int]] = {} # Per-tracker cap on the skip factor, mirrored from the DB column at # schedule time. Absence of an entry (or 0) means adaptive polling is off # for that tracker — ``_adaptive_should_skip`` returns False immediately. _adaptive_max_skip: dict[int, int] = {} def set_adaptive_max_skip(tracker_id: int, max_skip: int | None) -> None: """Register/clear the adaptive cap for a tracker. Called by the scheduling helpers so the tick-fast-path in ``_adaptive_should_skip`` doesn't need to re-query the DB. Values ≤ 1 disable back-off for the tracker — every scheduled tick runs. """ if max_skip and max_skip > 1: _adaptive_max_skip[tracker_id] = int(max_skip) else: _adaptive_max_skip.pop(tracker_id, None) # Opting in/out mid-session should drop any prior counters so the # new behavior applies from the next tick, not N ticks later. _adaptive_state.pop(tracker_id, None) def _compute_jitter(interval_seconds: int) -> int: """Return a jitter bound (in seconds) suitable for an IntervalTrigger. Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up at the same wall-clock second every minute — that creates a thundering- herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter`` randomizes each tick's firing time by ±jitter seconds. We use a quarter of the interval up to a 30 s cap. 
For short intervals (≤8 s) jitter would round to 0 — that's fine, at those cadences a bursty pattern is what the user implicitly opted into. """ if interval_seconds <= 0: return 0 return min(interval_seconds // 4, 30) def get_scheduler() -> AsyncIOScheduler: global _scheduler if _scheduler is None: # Sensible production defaults applied to every job unless overridden: # * coalesce — collapse a queue of missed runs into one firing after # a restart / pause, instead of bursting to catch up. # * misfire_grace_time — accept firings up to 5 min late without # dropping them silently. # * max_instances=1 — never run two copies of the same tracker tick # concurrently; the scheduler already enforces this on add_job, # but we also set it as the default for safety. _scheduler = AsyncIOScheduler( job_defaults={ "coalesce": True, "misfire_grace_time": 300, "max_instances": 1, }, ) return _scheduler async def start_scheduler() -> None: scheduler = get_scheduler() if not scheduler.running: scheduler.start() _LOGGER.info("Scheduler started") await _load_tracker_jobs() await _load_action_jobs() await _load_immich_dispatch_jobs() # Start Telegram bot polling for bots with active command listeners from .telegram_poller import start_command_listener_polling await start_command_listener_polling() # Schedule daily cleanup of old event log entries _schedule_event_cleanup() # Schedule periodic Telegram chat title refresh _schedule_telegram_chat_sync() # Start debounced command auto-sync scheduler from .command_sync import start_sync_scheduler start_sync_scheduler() # Load scheduled backup job if enabled await _load_backup_job() # Re-arm any deferred-dispatch drains that were pending across restart. from .deferred_dispatch import load_pending_drain_jobs await load_pending_drain_jobs() # And install the periodic safety-net catch-up scan. _schedule_drain_catchup() # Schedule the upstream release-check probe. 
await _schedule_release_check() # Schedule the bridge_self deferred-backlog scan (every 5 min). _schedule_bridge_self_backlog_scan() def _schedule_event_cleanup() -> None: """Schedule a daily job to delete EventLog entries older than 90 days.""" from apscheduler.triggers.cron import CronTrigger scheduler = get_scheduler() job_id = "cleanup_old_events" if scheduler.get_job(job_id): return scheduler.add_job( _cleanup_old_events, CronTrigger(hour=3, minute=0), id=job_id, replace_existing=True, max_instances=1, ) _LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC") # Chat-title refresh tuning. # Sweep runs daily as a fallback — we additionally refresh opportunistically # on every incoming webhook/long-poll update (``save_chat_from_webhook``), so # the sweep only catches chats that haven't sent anything recently. _CHAT_SYNC_INTERVAL_HOURS = 24 _CHAT_SYNC_INITIAL_DELAY_SECONDS = 60 _CHAT_SYNC_CONCURRENCY = 10 def _schedule_telegram_chat_sync() -> None: """Schedule periodic refresh of Telegram chat titles via getChat.""" from apscheduler.triggers.interval import IntervalTrigger scheduler = get_scheduler() job_id = "refresh_telegram_chat_titles" if scheduler.get_job(job_id): return scheduler.add_job( _refresh_telegram_chat_titles, IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS), id=job_id, replace_existing=True, max_instances=1, next_run_time=None, ) # Fire once shortly after startup so stale names refresh without waiting a day. from datetime import datetime, timedelta, timezone scheduler.add_job( _refresh_telegram_chat_titles, "date", run_date=datetime.now(timezone.utc) + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS), id="refresh_telegram_chat_titles_once", replace_existing=True, max_instances=1, ) _LOGGER.info( "Scheduled Telegram chat title refresh every %sh (concurrency %s)", _CHAT_SYNC_INTERVAL_HOURS, _CHAT_SYNC_CONCURRENCY, ) async def _refresh_telegram_chat_titles() -> None: """Refresh TelegramChat.title/username via getChat for all known chats. 
Runs requests in bounded parallel (``_CHAT_SYNC_CONCURRENCY``) so a fleet of 50 chats finishes in ~5 round-trips instead of 50. Telegram's ``getChat`` rate limit is well above 10 concurrent per bot, and the cap is global across bots so we never flood the shared HTTP session. """ import asyncio from collections import defaultdict from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.notifications.telegram.client import TelegramClient from ..database.engine import get_engine from ..database.models import TelegramBot, TelegramChat from .http_session import get_http_session engine = get_engine() async with AsyncSession(engine) as session: bots = (await session.exec(select(TelegramBot))).all() bot_tokens = {b.id: b.token for b in bots if b.token} if not bot_tokens: return chats = (await session.exec(select(TelegramChat))).all() by_bot: dict[int, list[TelegramChat]] = defaultdict(list) for chat in chats: if chat.bot_id in bot_tokens: by_bot[chat.bot_id].append(chat) if not by_bot: return http = await get_http_session() clients_by_bot = { bot_id: TelegramClient(http, token) for bot_id, token in bot_tokens.items() } sem = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY) async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]: """Return (chat_row_id, info_dict_or_None, error_message_or_None).""" async with sem: try: res = await clients_by_bot[bot_id].get_chat(chat.chat_id) except Exception as err: # noqa: BLE001 return chat.id, None, str(err) if not res.get("success"): return chat.id, None, res.get("error") or "unknown" return chat.id, (res.get("result") or {}), None tasks = [ _fetch(bot_id, chat) for bot_id, bot_chats in by_bot.items() for chat in bot_chats ] results = await asyncio.gather(*tasks) refreshed = 0 errors = 0 # Bucket results first, then fetch all rows in one IN-query instead of # per-row ``session.get`` — otherwise a 50-chat fleet issues 50 extra # SELECTs before commit. 
successes: dict[int, dict] = {} for chat_id, info, err in results: if err is not None or info is None: errors += 1 if err: _LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err) continue if chat_id is not None: successes[chat_id] = info async with AsyncSession(engine) as session: if successes: rows = (await session.exec( select(TelegramChat).where(TelegramChat.id.in_(list(successes.keys()))) )).all() for merged in rows: info = successes.get(merged.id) if not info: continue title = info.get("title") or ( (info.get("first_name", "") + " " + info.get("last_name", "")).strip() ) changed = False if title and merged.title != title: merged.title = title changed = True new_username = info.get("username") if new_username is not None and merged.username != new_username: merged.username = new_username changed = True if changed: session.add(merged) refreshed += 1 await session.commit() _LOGGER.info( "Telegram chat title refresh: %s updated, %s errors", refreshed, errors ) async def _cleanup_old_events() -> None: """Delete EventLog / WebhookPayloadLog / ActionExecution rows older than the configured retention window. A retention of 0 disables the job. 
""" from datetime import datetime, timedelta, timezone from sqlmodel import delete from sqlmodel.ext.asyncio.session import AsyncSession from ..config import settings from ..database.engine import get_engine from ..database.models import ActionExecution, EventLog, WebhookPayloadLog days = settings.event_log_retention_days if days <= 0: _LOGGER.debug("Event log retention disabled (days=0); skipping cleanup") return cutoff = datetime.now(timezone.utc) - timedelta(days=days) engine = get_engine() async with AsyncSession(engine) as session: await session.exec(delete(EventLog).where(EventLog.created_at < cutoff)) await session.exec( delete(WebhookPayloadLog).where(WebhookPayloadLog.created_at < cutoff) ) await session.exec( delete(ActionExecution).where(ActionExecution.started_at < cutoff) ) await session.commit() _LOGGER.info( "Cleaned event_log / webhook_payload_log / action_execution older than %s", cutoff.date(), ) async def _load_tracker_jobs() -> None: """Load enabled trackers and schedule polling jobs.""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from ..database.engine import get_engine from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel engine = get_engine() scheduler = get_scheduler() async with AsyncSession(engine) as session: result = await session.exec(select(NotificationTracker).where(NotificationTracker.enabled == True)) trackers = result.all() # Batch-load provider types for scheduler detection unique_provider_ids = list({t.provider_id for t in trackers}) provider_types: dict[int, str] = {} if unique_provider_ids: provider_result = await session.exec( select(ServiceProviderModel).where( ServiceProviderModel.id.in_(unique_provider_ids) ) ) provider_types = {p.id: p.type for p in provider_result.all()} tz = await _load_app_timezone() from notify_bridge_core.providers.capabilities import get_capabilities for tracker in trackers: job_id = f"tracker_{tracker.id}" if 
scheduler.get_job(job_id): continue ptype = provider_types.get(tracker.provider_id, "") filters = tracker.filters or {} # Webhook-based providers receive events via inbound HTTP — there is # nothing to poll. Scheduling an interval job for them just wakes up # check_tracker every scan_interval seconds to immediately return, # wasting CPU and DB queries for no work. caps = get_capabilities(ptype) if ptype else None if caps and caps.webhook_based: _LOGGER.debug( "Skipping interval scheduling for webhook tracker %d (%s, type=%s)", tracker.id, tracker.name, ptype, ) continue # Scheduler providers can use cron triggers if ptype == "scheduler" and filters.get("schedule_type") == "cron": cron_expr = filters.get("cron_expression", "") if cron_expr: try: _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz) continue except Exception as e: _LOGGER.error( "Invalid cron expression for tracker %d (%s): %s — falling back to interval", tracker.id, tracker.name, e, ) jitter = _compute_jitter(tracker.scan_interval) scheduler.add_job( _poll_tracker, "interval", seconds=tracker.scan_interval, jitter=jitter or None, id=job_id, args=[tracker.id], replace_existing=True, max_instances=1, ) set_adaptive_max_skip(tracker.id, tracker.adaptive_max_skip) _LOGGER.info( "Scheduled tracker %d (%s) every %ds (jitter ±%ds, adaptive_max_skip=%s)", tracker.id, tracker.name, tracker.scan_interval, jitter, tracker.adaptive_max_skip, ) def _add_cron_job( scheduler: AsyncIOScheduler, job_id: str, tracker_id: int, cron_expression: str, tracker_name: str, tz: ZoneInfo, ) -> None: """Add a cron-triggered job for a scheduler-type tracker. ``tz`` is the user-configured app timezone; without it APScheduler interprets the crontab in the host's local timezone, which surfaces as events firing at the "wrong" wall-clock time for operators in a non-UTC zone (see the companion fix in ``update_settings`` which reschedules on timezone changes). 
""" from apscheduler.triggers.cron import CronTrigger trigger = CronTrigger.from_crontab(cron_expression, timezone=tz) scheduler.add_job( _poll_tracker, trigger, id=job_id, args=[tracker_id], replace_existing=True, max_instances=1, ) _LOGGER.info( "Scheduled tracker %d (%s) with cron: %s [tz=%s]", tracker_id, tracker_name, cron_expression, tz.key, ) async def _is_webhook_tracker(tracker_id: int) -> bool: """Return True iff the tracker's provider type is webhook-based. Looks up provider type once via the capabilities registry. Used by ``schedule_tracker`` to short-circuit interval scheduling. """ from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.providers.capabilities import get_capabilities from ..database.engine import get_engine from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel async with AsyncSession(get_engine()) as session: tracker = await session.get(NotificationTracker, tracker_id) if tracker is None: return False provider = await session.get(ServiceProviderModel, tracker.provider_id) if provider is None: return False caps = get_capabilities(provider.type) return bool(caps and caps.webhook_based) async def schedule_tracker( tracker_id: int, interval: int, cron_expression: str | None = None, adaptive_max_skip: int | None = None, ) -> None: """Add or update a scheduler job for a tracker. ``adaptive_max_skip`` mirrors the DB column and is registered with the adaptive module-state so tick-time skip decisions don't re-query the DB. Pass ``None`` or ``0`` to disable back-off for the tracker. Webhook-based providers receive events via inbound HTTP and have nothing to poll, so this no-ops for them — preventing scan_interval from creating useless wakeups via the API create/update path. 
""" scheduler = get_scheduler() job_id = f"tracker_{tracker_id}" # A reschedule typically follows a config edit or enable/disable flip — # drop adaptive back-off so the first tick after the change runs promptly. reset_adaptive_state(tracker_id) set_adaptive_max_skip(tracker_id, adaptive_max_skip) # Remove existing job first to allow trigger type changes if scheduler.get_job(job_id): scheduler.remove_job(job_id) # Webhook-based providers don't poll — skip job creation entirely. if await _is_webhook_tracker(tracker_id): _LOGGER.debug( "Skipping interval scheduling for webhook tracker %d", tracker_id, ) return if cron_expression: try: tz = await _load_app_timezone() _add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}", tz) return except Exception as e: _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e) jitter = _compute_jitter(interval) scheduler.add_job( _poll_tracker, "interval", seconds=interval, jitter=jitter or None, id=job_id, args=[tracker_id], replace_existing=True, ) _LOGGER.info( "Scheduled tracker %d every %ds (jitter ±%ds, adaptive_max_skip=%s)", tracker_id, interval, jitter, adaptive_max_skip, ) async def unschedule_tracker(tracker_id: int) -> None: """Remove a scheduler job for a tracker.""" scheduler = get_scheduler() job_id = f"tracker_{tracker_id}" reset_adaptive_state(tracker_id) _adaptive_max_skip.pop(tracker_id, None) if scheduler.get_job(job_id): scheduler.remove_job(job_id) _LOGGER.info("Unscheduled tracker %d", tracker_id) def _adaptive_should_skip(tracker_id: int) -> bool: """Return True when the adaptive heuristic says to skip this tick. Short-circuits to False for trackers without a registered cap (adaptive off). Otherwise: if we're in 1-in-K mode, skip (K-1) ticks between each real poll. 
""" if tracker_id not in _adaptive_max_skip: return False state = _adaptive_state.get(tracker_id) if not state: return False skip_every = state.get("skip_every", 1) if skip_every <= 1: return False state["tick_counter"] = state.get("tick_counter", 0) + 1 # Fire on ticks where counter % skip_every == 0; skip the rest. return (state["tick_counter"] % skip_every) != 0 def _adaptive_update(tracker_id: int, events_detected: int) -> None: """Update the adaptive counter after a real tick ran. No-op when the tracker has adaptive polling disabled — otherwise we'd build up empty counters for trackers that will never use them. """ cap = _adaptive_max_skip.get(tracker_id) if not cap or cap <= 1: return state = _adaptive_state.setdefault( tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0} ) if events_detected > 0: if state["skip_every"] > 1: _LOGGER.info( "Adaptive polling: tracker %d saw activity, restoring base rate", tracker_id, ) state["empty_count"] = 0 state["skip_every"] = 1 state["tick_counter"] = 0 return state["empty_count"] = state.get("empty_count", 0) + 1 target_quarter = min(cap, 4) if ( state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD and state["skip_every"] < target_quarter ): state["skip_every"] = target_quarter _LOGGER.info( "Adaptive polling: tracker %d idle for %d ticks, skipping %d of %d", tracker_id, state["empty_count"], target_quarter - 1, target_quarter, ) elif ( state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD and state["skip_every"] < min(cap, 2) ): state["skip_every"] = min(cap, 2) _LOGGER.info( "Adaptive polling: tracker %d idle for %d ticks, skipping every other", tracker_id, state["empty_count"], ) def reset_adaptive_state(tracker_id: int) -> None: """Drop cached adaptive counters for a tracker. Used by API callers that make changes requiring the tracker to run promptly on the next scheduled tick (enable/disable, config edits, manual "check now" actions). 
Does NOT clear the configured cap — use ``set_adaptive_max_skip(..., None)`` for that. """ _adaptive_state.pop(tracker_id, None) async def _poll_tracker(tracker_id: int) -> None: """Poll a tracker for changes.""" from .watcher import check_tracker if _adaptive_should_skip(tracker_id): return try: result = await check_tracker(tracker_id) except Exception as e: _LOGGER.error("Error polling tracker %d: %s", tracker_id, e) return # Treat the "error" / "skipped" statuses as inconclusive — don't let # a transient upstream failure trick the heuristic into backing off. if isinstance(result, dict) and result.get("status") == "ok": _adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0)) # --------------------------------------------------------------------------- # Action scheduling # --------------------------------------------------------------------------- async def _load_action_jobs() -> None: """Load enabled actions and schedule execution jobs.""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from ..database.engine import get_engine from ..database.models import Action engine = get_engine() scheduler = get_scheduler() async with AsyncSession(engine) as session: result = await session.exec( select(Action).where(Action.enabled == True) # noqa: E712 ) actions = result.all() tz = await _load_app_timezone() for action in actions: job_id = f"action_{action.id}" if scheduler.get_job(job_id): continue if action.schedule_type == "cron" and action.schedule_cron: try: from apscheduler.triggers.cron import CronTrigger trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz) scheduler.add_job( _run_action, trigger, id=job_id, args=[action.id], replace_existing=True, ) _LOGGER.info( "Scheduled action %d (%s) with cron: %s [tz=%s]", action.id, action.name, action.schedule_cron, tz.key, ) continue except Exception as e: _LOGGER.error( "Invalid cron for action %d (%s): %s — falling back to interval", action.id, 
action.name, e, ) scheduler.add_job( _run_action, "interval", seconds=action.schedule_interval, id=job_id, args=[action.id], replace_existing=True, ) _LOGGER.info( "Scheduled action %d (%s) every %ds", action.id, action.name, action.schedule_interval, ) async def schedule_action( action_id: int, schedule_type: str = "interval", interval: int = 3600, cron_expression: str = "", ) -> None: """Add or update a scheduler job for an action.""" scheduler = get_scheduler() job_id = f"action_{action_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) if schedule_type == "cron" and cron_expression: try: from apscheduler.triggers.cron import CronTrigger tz = await _load_app_timezone() trigger = CronTrigger.from_crontab(cron_expression, timezone=tz) scheduler.add_job( _run_action, trigger, id=job_id, args=[action_id], replace_existing=True, ) _LOGGER.info( "Scheduled action %d with cron: %s [tz=%s]", action_id, cron_expression, tz.key, ) return except Exception as e: _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e) scheduler.add_job( _run_action, "interval", seconds=interval, id=job_id, args=[action_id], replace_existing=True, ) _LOGGER.info("Scheduled action %d every %ds", action_id, interval) async def unschedule_action(action_id: int) -> None: """Remove a scheduler job for an action.""" scheduler = get_scheduler() job_id = f"action_{action_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) _LOGGER.info("Unscheduled action %d", action_id) async def reschedule_cron_jobs_for_timezone_change() -> None: """Re-add every cron-triggered tracker/action job under the new app timezone. Called by the admin settings endpoint after the ``timezone`` AppSetting is updated. APScheduler's ``CronTrigger`` freezes its timezone at construction time, so a timezone change has no effect on jobs already in the scheduler — we have to rebuild those jobs. Interval-triggered jobs are tz-agnostic and are left alone. 
""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from ..database.engine import get_engine from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel engine = get_engine() scheduler = get_scheduler() tz = await _load_app_timezone() rescheduled = 0 async with AsyncSession(engine) as session: # Trackers with cron scheduling (scheduler provider + schedule_type=cron). trackers = (await session.exec( select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712 )).all() provider_ids = list({t.provider_id for t in trackers}) provider_types: dict[int, str] = {} if provider_ids: rows = await session.exec( select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids)) ) provider_types = {p.id: p.type for p in rows.all()} for tracker in trackers: if provider_types.get(tracker.provider_id) != "scheduler": continue filters = tracker.filters or {} if filters.get("schedule_type") != "cron": continue cron_expr = filters.get("cron_expression", "") if not cron_expr: continue job_id = f"tracker_{tracker.id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) try: _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz) rescheduled += 1 except Exception as e: # noqa: BLE001 _LOGGER.error( "Failed to re-apply cron for tracker %d on tz change: %s", tracker.id, e, ) # Actions with cron schedules. 
actions = (await session.exec( select(Action).where(Action.enabled == True) # noqa: E712 )).all() from apscheduler.triggers.cron import CronTrigger for action in actions: if action.schedule_type != "cron" or not action.schedule_cron: continue job_id = f"action_{action.id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) try: scheduler.add_job( _run_action, CronTrigger.from_crontab(action.schedule_cron, timezone=tz), id=job_id, args=[action.id], replace_existing=True, ) rescheduled += 1 except Exception as e: # noqa: BLE001 _LOGGER.error( "Failed to re-apply cron for action %d on tz change: %s", action.id, e, ) _LOGGER.info( "Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key, ) # Immich scheduled/periodic/memory jobs are also CronTrigger-based and # carry the same frozen-tz problem — rebuild them under the new tz. await reschedule_immich_dispatch_jobs() async def _run_action(action_id: int) -> None: """Run an action (called by APScheduler).""" from .action_runner import run_action try: await run_action(action_id, trigger="scheduled") except Exception as e: _LOGGER.error("Error running action %d: %s", action_id, e) # --------------------------------------------------------------------------- # Immich scheduled / periodic / memory dispatch (cron-fired) # # These three slots fire on wall-clock schedules taken from the tracker's # default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``, # ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the # app-level IANA timezone. The dispatch flow lives in # ``services.scheduled_dispatch``; this section just owns scheduling. # --------------------------------------------------------------------------- _IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory") _IMMICH_DISPATCH_PREFIX = "immich_dispatch_" def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]: """Parse ``"09:00,18:30"`` → ``[(9, 0), (18, 30)]``, skipping bad entries. 
A typo in one slot must not prevent the others from scheduling — we log and move on rather than raising. """ out: list[tuple[int, int]] = [] for part in (raw or "").split(","): part = part.strip() if not part: continue try: h_str, m_str = part.split(":", 1) hour, minute = int(h_str), int(m_str) except ValueError: _LOGGER.warning("Skipping invalid time literal %r", part) continue if not (0 <= hour <= 23 and 0 <= minute <= 59): _LOGGER.warning("Skipping out-of-range time %r", part) continue out.append((hour, minute)) return out async def _run_immich_dispatch(tracker_id: int, kind: str) -> None: """APScheduler entry point — wraps the dispatch helper to swallow errors.""" from .scheduled_dispatch import dispatch_scheduled_for_tracker try: await dispatch_scheduled_for_tracker(tracker_id, kind) # type: ignore[arg-type] except Exception as err: # noqa: BLE001 _LOGGER.error( "Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err, ) async def _load_immich_dispatch_jobs() -> None: """Schedule cron jobs for every (tracker, kind, time) where the kind is on. Reads each enabled Immich tracker's *default* tracking config — per-link overrides only gate dispatch (handled in ``scheduled_dispatch``), they do not influence the fire schedule. 
""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from apscheduler.triggers.cron import CronTrigger from ..database.engine import get_engine from ..database.models import ( NotificationTracker, ServiceProvider as ServiceProviderModel, TrackingConfig, ) engine = get_engine() scheduler = get_scheduler() tz = await _load_app_timezone() async with AsyncSession(engine) as session: trackers = (await session.exec( select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712 )).all() if not trackers: return provider_ids = list({t.provider_id for t in trackers}) provider_types: dict[int, str] = {} if provider_ids: rows = await session.exec( select(ServiceProviderModel).where( ServiceProviderModel.id.in_(provider_ids) ) ) provider_types = {p.id: p.type for p in rows.all()} tc_ids = list({ t.default_tracking_config_id for t in trackers if t.default_tracking_config_id }) tc_map: dict[int, TrackingConfig] = {} if tc_ids: rows = await session.exec( select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids)) ) tc_map = {tc.id: tc for tc in rows.all()} scheduled = 0 for tracker in trackers: if provider_types.get(tracker.provider_id) != "immich": continue tc = tc_map.get(tracker.default_tracking_config_id) if tracker.default_tracking_config_id else None if tc is None: continue for kind in _IMMICH_DISPATCH_KINDS: if not getattr(tc, f"{kind}_enabled", False): continue times_raw = getattr(tc, f"{kind}_times", "") or "" for hour, minute in _parse_hhmm_list(times_raw): job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}" scheduler.add_job( _run_immich_dispatch, CronTrigger(hour=hour, minute=minute, timezone=tz), id=job_id, args=[tracker.id, kind], replace_existing=True, max_instances=1, ) scheduled += 1 _LOGGER.info( "Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]", kind, tracker.id, hour, minute, tz.key, ) if scheduled: _LOGGER.info( "Loaded %d Immich scheduled/periodic/memory job(s) 
[tz=%s]", scheduled, tz.key, ) async def reschedule_immich_dispatch_jobs() -> None: """Drop and rebuild all Immich scheduled/periodic/memory jobs. Cheap to call on every relevant mutation — a typical install has only a handful of trackers. Called from the tracker, link, and tracking-config CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``. """ scheduler = get_scheduler() for job in list(scheduler.get_jobs()): if job.id.startswith(_IMMICH_DISPATCH_PREFIX): scheduler.remove_job(job.id) await _load_immich_dispatch_jobs() # --------------------------------------------------------------------------- # Scheduled backup # --------------------------------------------------------------------------- _BACKUP_JOB_ID = "scheduled_backup" async def _load_backup_job() -> None: """Load scheduled backup job from settings if enabled.""" from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession as _AS from ..database.engine import get_engine from ..database.models import AppSetting engine = get_engine() async with _AS(engine) as session: enabled_row = await session.get(AppSetting, "backup_scheduled_enabled") interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours") enabled = enabled_row and enabled_row.value == "true" if not enabled: return interval_hours = int(interval_row.value) if interval_row and interval_row.value else 24 scheduler = get_scheduler() scheduler.add_job( _run_scheduled_backup, "interval", hours=interval_hours, id=_BACKUP_JOB_ID, replace_existing=True, max_instances=1, ) _LOGGER.info("Scheduled backup every %dh", interval_hours) async def schedule_backup(interval_hours: int = 24) -> None: """Add or update the scheduled backup job.""" scheduler = get_scheduler() if scheduler.get_job(_BACKUP_JOB_ID): scheduler.remove_job(_BACKUP_JOB_ID) scheduler.add_job( _run_scheduled_backup, "interval", hours=interval_hours, id=_BACKUP_JOB_ID, replace_existing=True, max_instances=1, ) _LOGGER.info("Scheduled backup 
every %dh", interval_hours) async def unschedule_backup() -> None: """Remove the scheduled backup job.""" scheduler = get_scheduler() if scheduler.get_job(_BACKUP_JOB_ID): scheduler.remove_job(_BACKUP_JOB_ID) _LOGGER.info("Unscheduled backup job") # --------------------------------------------------------------------------- # Deferred-dispatch drain # --------------------------------------------------------------------------- # # When ``defer_event`` enqueues a quiet-hours notification, the calling site # asks us to add a one-shot ``date`` job at ``quiet_hours_end_at``. We key the # job id by the minute-rounded end time so multiple defers that share the same # window-end share a single drain job (idempotent via ``replace_existing``). # # At fire time the job runs ``drain_deferred_due`` which scans all pending # rows and dispatches whatever is ready. # # A periodic catch-up scan runs every ``_DRAIN_CATCHUP_INTERVAL_SECONDS`` as # the safety net for failure modes the one-shot job can't cover: # * APScheduler's misfire grace exceeded (event loop blocked past fire_at; # the date job is silently discarded by the scheduler) # * Process killed between the deferred-row DB commit and the # ``schedule_deferred_drain`` call — row exists, job doesn't # * Clock drift / DST seam edge cases _DEFERRED_DRAIN_PREFIX = "deferred_drain_" _DEFERRED_DRAIN_CATCHUP_JOB = "deferred_drain_catchup" # Generous so a temporarily-blocked event loop doesn't make the scheduler # discard our drain job. Once discarded the deferred rows would wait for the # next process restart or the catch-up scan below — survivable but visibly # late from the user's perspective. _DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS = 3600 # 5 min trade-off between "promptness of late delivery" and "extra DB churn". # The scan is a single indexed lookup on (status, fire_at). 
_DRAIN_CATCHUP_INTERVAL_SECONDS = 300


def _drain_job_id_for(fire_at_utc: datetime) -> str:
    """Return the APScheduler job id for a drain job firing at ``fire_at_utc``.

    The id embeds the fire time at one-second resolution. Every defer that
    shares an exact window-end therefore maps onto one job, which
    ``replace_existing`` keeps unique.
    """
    from datetime import timezone

    # Normalize to UTC first. ``strftime`` renders the wall-clock fields of
    # whatever tz the value carries, so without this two *different* instants
    # (e.g. 06:00+02:00 and 06:00+00:00) would share an id, and
    # ``replace_existing`` would silently drop one of the jobs. Naive values
    # are treated as UTC, matching ``schedule_deferred_drain``.
    if fire_at_utc.tzinfo is None:
        fire_at_utc = fire_at_utc.replace(tzinfo=timezone.utc)
    else:
        fire_at_utc = fire_at_utc.astimezone(timezone.utc)
    # Include seconds — two trackers with quiet windows that end at the same
    # minute but different seconds (e.g. user-set 06:00:00 vs 06:00:30) would
    # otherwise collide on a single APScheduler job id, and ``replace_existing``
    # would silently drop the second one.
    return f"{_DEFERRED_DRAIN_PREFIX}{fire_at_utc.strftime('%Y%m%d%H%M%S')}"


def schedule_deferred_drain(fire_at_utc: datetime) -> None:
    """Add an idempotent one-shot drain job for ``fire_at_utc``.

    Naive datetimes are treated as UTC. Past times schedule a near-immediate
    firing (now+1s). The drain query handles ``fire_at <= now`` regardless of
    which job fired, so a near-miss still picks up the work.
    """
    from datetime import datetime, timedelta, timezone

    if fire_at_utc.tzinfo is None:
        fire_at_utc = fire_at_utc.replace(tzinfo=timezone.utc)

    scheduler = get_scheduler()
    job_id = _drain_job_id_for(fire_at_utc)

    now = datetime.now(timezone.utc)
    run_at = fire_at_utc
    if run_at <= now:
        run_at = now + timedelta(seconds=1)

    scheduler.add_job(
        _run_deferred_drain,
        "date",
        run_date=run_at,
        id=job_id,
        args=[fire_at_utc.isoformat()],
        replace_existing=True,
        max_instances=1,
        # Override the global 5-min grace — see module-level comment.
        misfire_grace_time=_DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS,
    )
    _LOGGER.debug("Scheduled deferred drain %s (fire_at=%s)", job_id, fire_at_utc.isoformat())


def _schedule_drain_catchup() -> None:
    """Install the periodic catch-up scan (no-op if it is already registered).

    See the deferred-dispatch drain section comment for the failure modes it
    covers.
    """
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    if scheduler.get_job(_DEFERRED_DRAIN_CATCHUP_JOB):
        return
    scheduler.add_job(
        _run_deferred_drain_catchup,
        IntervalTrigger(seconds=_DRAIN_CATCHUP_INTERVAL_SECONDS),
        id=_DEFERRED_DRAIN_CATCHUP_JOB,
        replace_existing=True,
        max_instances=1,
        coalesce=True,
    )
    _LOGGER.info(
        "Scheduled deferred-dispatch catch-up scan every %ds",
        _DRAIN_CATCHUP_INTERVAL_SECONDS,
    )


async def _run_deferred_drain(fire_at_iso: str) -> None:
    """APScheduler entry point — log the original fire_at then drain due rows.

    The ``fire_at_iso`` arg is only used for logging; the drain itself picks
    up every pending row whose ``fire_at`` has passed.
    """
    from .deferred_dispatch import drain_deferred_due

    try:
        stats = await drain_deferred_due()
        _LOGGER.info("Deferred drain (fire_at=%s) stats: %s", fire_at_iso, stats)
    except Exception as err:  # noqa: BLE001
        _LOGGER.exception("Deferred drain (fire_at=%s) failed: %s", fire_at_iso, err)


async def _run_deferred_drain_catchup() -> None:
    """Periodic safety-net drain.

    Distinct from the per-fire-at job only in cadence and log line; calls the
    same ``drain_deferred_due``.
    """
    from .deferred_dispatch import drain_deferred_due

    try:
        stats = await drain_deferred_due()
        # Quiet at debug level when nothing happened — every 5 min is too
        # noisy at info on an idle system.
        if stats.get("fired") or stats.get("dropped") or stats.get("errors"):
            _LOGGER.info("Deferred catch-up stats: %s", stats)
        else:
            _LOGGER.debug("Deferred catch-up stats: %s", stats)
    except Exception as err:  # noqa: BLE001
        _LOGGER.exception("Deferred catch-up drain failed: %s", err)


async def _run_scheduled_backup() -> None:
    """Run a scheduled backup (called by APScheduler).

    Reads ``backup_secrets_mode`` (default ``SecretsMode.EXCLUDE``) and
    ``backup_retention_count`` (default 5). It writes the backup under
    ``<data_dir>/backups`` using the lowest-id admin as owner, then prunes old
    backups. Any failure is logged with a traceback and swallowed, so one bad
    run doesn't kill the scheduler job.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..config import settings as app_config
    from ..database.engine import get_engine
    from ..database.models import AppSetting, User
    from .backup_schema import SecretsMode
    from .backup_service import cleanup_old_backups, export_backup_to_file

    try:
        engine = get_engine()
        async with _AS(engine) as session:
            # Read settings
            secrets_row = await session.get(AppSetting, "backup_secrets_mode")
            retention_row = await session.get(AppSetting, "backup_retention_count")
            secrets_mode = (
                SecretsMode(secrets_row.value)
                if secrets_row and secrets_row.value
                else SecretsMode.EXCLUDE
            )
            retention = 5
            if retention_row and retention_row.value:
                try:
                    retention = int(retention_row.value)
                except ValueError:
                    # A malformed retention count shouldn't block the backup.
                    _LOGGER.warning(
                        "Invalid backup_retention_count %r; keeping %d backups",
                        retention_row.value, retention,
                    )

            # Ownership context: the *first* admin. Order explicitly so the
            # choice is deterministic across databases.
            admin_result = await session.exec(
                select(User).where(User.role == "admin").order_by(User.id)
            )
            admin = admin_result.first()
            if not admin:
                _LOGGER.warning("No admin user found, skipping scheduled backup")
                return

            backup_dir = app_config.data_dir / "backups"
            await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)

        # Cleanup outside the session
        cleanup_old_backups(backup_dir, keep=retention)
    except Exception as e:  # noqa: BLE001
        # ``exception`` (not ``error``) so the traceback reaches the log,
        # matching the other scheduler entry points in this module.
        _LOGGER.exception("Scheduled backup failed: %s", e)


# --- Release-check probe -----------------------------------------------------

_RELEASE_CHECK_JOB_ID = "upstream_release_check"
_RELEASE_CHECK_ONESHOT_JOB_ID = "upstream_release_check_oneshot"
_RELEASE_CHECK_ONESHOT_DELAY_SECONDS = 30


async def _schedule_release_check() -> None:
    """Register the interval + one-shot release-check jobs.

    Reads the configured interval from AppSettings each time it is called.
    Idempotent — APScheduler de-dupes via ``replace_existing=True``.
    """
    from datetime import datetime, timedelta, timezone

    from apscheduler.triggers.interval import IntervalTrigger
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..api.app_settings import get_setting
    from ..database.engine import get_engine
    from .release_check import parse_interval_hours, run_check

    async with AsyncSession(get_engine()) as session:
        raw = await get_setting(session, "release_check_interval_hours")
    interval_hours = parse_interval_hours(raw)

    scheduler = get_scheduler()
    scheduler.add_job(
        run_check,
        IntervalTrigger(hours=interval_hours),
        id=_RELEASE_CHECK_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    # One-shot probe shortly after start so admins see a fresh status without
    # waiting for the first interval tick. Mirrors the chat-title sync.
    scheduler.add_job(
        run_check,
        "date",
        run_date=datetime.now(timezone.utc) + timedelta(seconds=_RELEASE_CHECK_ONESHOT_DELAY_SECONDS),
        id=_RELEASE_CHECK_ONESHOT_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled release-check every %sh (one-shot in %ss)",
        interval_hours, _RELEASE_CHECK_ONESHOT_DELAY_SECONDS,
    )


# ---------------------------------------------------------------------------
# Bridge self-monitoring — deferred-backlog scan
# ---------------------------------------------------------------------------

_BRIDGE_SELF_BACKLOG_JOB_ID = "bridge_self_deferred_backlog_scan"

# 5 min trade-off between "operator notices the backlog quickly" and "extra
# DB churn on a quiet system". The scan is one indexed GROUP BY query.
_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS = 300


def _schedule_bridge_self_backlog_scan() -> None:
    """Install the periodic deferred-backlog scan for bridge_self.

    No-op when the job is already registered.
    """
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    if scheduler.get_job(_BRIDGE_SELF_BACKLOG_JOB_ID):
        return
    scheduler.add_job(
        _run_bridge_self_backlog_scan,
        IntervalTrigger(seconds=_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS),
        id=_BRIDGE_SELF_BACKLOG_JOB_ID,
        replace_existing=True,
        max_instances=1,
        coalesce=True,
    )
    _LOGGER.info(
        "Scheduled bridge_self deferred-backlog scan every %ds",
        _BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS,
    )


async def _run_bridge_self_backlog_scan() -> None:
    """APScheduler entry point — scan the deferred backlog and emit if needed.

    Logs at info only when the scan reports ``crossings``; otherwise at
    debug. Exceptions are logged with a traceback and swallowed.
    """
    from .bridge_self import check_deferred_backlog

    try:
        stats = await check_deferred_backlog()
        if stats.get("crossings"):
            _LOGGER.info("bridge_self backlog scan stats: %s", stats)
        else:
            _LOGGER.debug("bridge_self backlog scan stats: %s", stats)
    except Exception as err:  # noqa: BLE001
        _LOGGER.exception("bridge_self backlog scan failed: %s", err)


async def reschedule_release_check() -> None:
    """Re-arm the release-check jobs after settings changed.

    Called from the PUT /settings handler when the interval or provider config
    changes. ``_schedule_release_check`` re-reads the setting and re-adds both
    the interval job and a fresh one-shot with ``replace_existing=True``. The
    new config therefore takes effect within seconds rather than at the next
    interval tick.
    """
    # Deliberately no remove-then-add: replace_existing already swaps the jobs
    # in place. Removing first would leave *no* release-check jobs if the
    # settings read inside ``_schedule_release_check`` raised.
    await _schedule_release_check()