"""APScheduler-based polling scheduler for trackers and actions.""" from __future__ import annotations import logging from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from apscheduler.schedulers.asyncio import AsyncIOScheduler _LOGGER = logging.getLogger(__name__) def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo: """Resolve an IANA tz string to a ZoneInfo, falling back to UTC on any error. Kept local to avoid importing from api/dispatch layers inside the scheduler module (which is loaded at startup, before the API routers). """ if not tz_name: return ZoneInfo("UTC") try: return ZoneInfo(tz_name) except (ZoneInfoNotFoundError, ValueError): _LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name) return ZoneInfo("UTC") async def _load_app_timezone() -> ZoneInfo: """Load the admin-configured app timezone from AppSetting (falls back to UTC).""" from sqlmodel.ext.asyncio.session import AsyncSession from ..api.app_settings import get_setting from ..database.engine import get_engine async with AsyncSession(get_engine()) as session: tz_name = await get_setting(session, "timezone") return _resolve_zoneinfo(tz_name) _scheduler: AsyncIOScheduler | None = None # --------------------------------------------------------------------------- # Adaptive polling (Tier 6 of the big-album optimization plan). # # We don't touch the user-configured ``scan_interval`` — that's still the # authoritative cadence. Instead, we *skip* a growing fraction of scheduled # ticks when a tracker is idle, and reset to 1:1 as soon as it detects # anything. The scheduler keeps running on the user's chosen period, so # response time to the *first* change after an idle stretch is never worse # than one tick — but the steady-state HTTP cost for a fleet of idle # trackers drops by ~75%. # # Thresholds are intentionally conservative: a tracker polling every 30 s # needs 5 min of silence before we halve its effective rate, and 15 min # before we quarter it. 
# Any caller can disable adaptive behavior by passing
# ``adaptive=False`` in the tracker filters dict (checked in ``_poll_tracker``).
# NOTE(review): no ``adaptive`` filter check is visible in ``_poll_tracker`` in
# this module — confirm the opt-out is actually wired up somewhere.
# ---------------------------------------------------------------------------
_ADAPTIVE_HALVE_THRESHOLD = 10  # consecutive empty ticks → 1-in-2
_ADAPTIVE_QUARTER_THRESHOLD = 30  # consecutive empty ticks → 1-in-4
_ADAPTIVE_MAX_SKIP = 4  # hard cap on skip factor

# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
# restart — a short warmup period is fine and avoids persisting what is
# effectively a performance heuristic.
_adaptive_state: dict[int, dict[str, int]] = {}


def _compute_jitter(interval_seconds: int) -> int:
    """Return a jitter bound (in seconds) suitable for an IntervalTrigger.

    Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up
    at the same wall-clock second every minute — that creates a thundering-
    herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter``
    randomizes each tick's firing time by ±jitter seconds.

    We use a quarter of the interval (integer division) up to a 30 s cap. For
    very short intervals (< 4 s) the division yields 0 — that's fine, at those
    cadences a bursty pattern is what the user implicitly opted into.

    Args:
        interval_seconds: The tracker's polling period in seconds.

    Returns:
        ``min(interval_seconds // 4, 30)``, or 0 for non-positive intervals.
    """
    if interval_seconds <= 0:
        return 0
    return min(interval_seconds // 4, 30)


def get_scheduler() -> AsyncIOScheduler:
    """Return the process-wide ``AsyncIOScheduler``, creating it on first use."""
    global _scheduler
    if _scheduler is None:
        # Sensible production defaults applied to every job unless overridden:
        #   * coalesce — collapse a queue of missed runs into one firing after
        #     a restart / pause, instead of bursting to catch up.
        #   * misfire_grace_time — accept firings up to 5 min late without
        #     dropping them silently.
        #   * max_instances=1 — never run two copies of the same tracker tick
        #     concurrently; the scheduler already enforces this on add_job,
        #     but we also set it as the default for safety.
        _scheduler = AsyncIOScheduler(
            job_defaults={
                "coalesce": True,
                "misfire_grace_time": 300,
                "max_instances": 1,
            },
        )
    return _scheduler


async def start_scheduler() -> None:
    """Start the scheduler (if not already running) and register all jobs."""
    scheduler = get_scheduler()
    if not scheduler.running:
        scheduler.start()
        _LOGGER.info("Scheduler started")

    await _load_tracker_jobs()
    await _load_action_jobs()
    await _load_immich_dispatch_jobs()

    # Start Telegram bot polling for bots with active command listeners
    from .telegram_poller import start_command_listener_polling

    await start_command_listener_polling()

    # Schedule daily cleanup of old event log entries
    _schedule_event_cleanup()

    # Schedule periodic Telegram chat title refresh
    _schedule_telegram_chat_sync()

    # Start debounced command auto-sync scheduler
    from .command_sync import start_sync_scheduler

    start_sync_scheduler()

    # Load scheduled backup job if enabled
    await _load_backup_job()


def _schedule_event_cleanup() -> None:
    """Schedule the daily ``_cleanup_old_events`` job at 03:00 UTC.

    No-op when the job is already registered.
    """
    from apscheduler.triggers.cron import CronTrigger

    scheduler = get_scheduler()
    job_id = "cleanup_old_events"
    if scheduler.get_job(job_id):
        return
    scheduler.add_job(
        _cleanup_old_events,
        # Pin the trigger to UTC explicitly: a CronTrigger built without a
        # timezone uses the host's local zone, which would contradict the
        # "03:00 UTC" promise in the log line below.
        CronTrigger(hour=3, minute=0, timezone="UTC"),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")


# Chat-title refresh tuning.
# Sweep runs daily as a fallback — we additionally refresh opportunistically
# on every incoming webhook/long-poll update (``save_chat_from_webhook``), so
# the sweep only catches chats that haven't sent anything recently.
_CHAT_SYNC_INTERVAL_HOURS = 24
_CHAT_SYNC_INITIAL_DELAY_SECONDS = 60
_CHAT_SYNC_CONCURRENCY = 10


def _schedule_telegram_chat_sync() -> None:
    """Schedule periodic refresh of Telegram chat titles via getChat.

    Registers two jobs: a recurring interval job (every
    ``_CHAT_SYNC_INTERVAL_HOURS``) and a one-shot job that fires
    ``_CHAT_SYNC_INITIAL_DELAY_SECONDS`` after this call. No-op when the
    recurring job is already registered.
    """
    from datetime import datetime, timedelta, timezone

    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    job_id = "refresh_telegram_chat_titles"
    if scheduler.get_job(job_id):
        return
    # Do NOT pass ``next_run_time=None`` here: in APScheduler 3.x that adds
    # the job in a paused state, so the recurring sweep would never fire.
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    # Fire once shortly after startup so stale names refresh without waiting a day.
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        "date",
        run_date=datetime.now(timezone.utc)
        + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS),
        id="refresh_telegram_chat_titles_once",
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled Telegram chat title refresh every %sh (concurrency %s)",
        _CHAT_SYNC_INTERVAL_HOURS,
        _CHAT_SYNC_CONCURRENCY,
    )


async def _refresh_telegram_chat_titles() -> None:
    """Refresh TelegramChat.title/username via getChat for all known chats.

    Runs requests in bounded parallel (``_CHAT_SYNC_CONCURRENCY``) so a fleet
    of 50 chats finishes in ~5 round-trips instead of 50. Telegram's
    ``getChat`` rate limit is well above 10 concurrent per bot, and the cap
    is global across bots so we never flood the shared HTTP session.
    """
    import asyncio
    from collections import defaultdict

    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from notify_bridge_core.notifications.telegram.client import TelegramClient

    from ..database.engine import get_engine
    from ..database.models import TelegramBot, TelegramChat
    from .http_session import get_http_session

    engine = get_engine()
    async with AsyncSession(engine) as session:
        bots = (await session.exec(select(TelegramBot))).all()
        bot_tokens = {b.id: b.token for b in bots if b.token}
        if not bot_tokens:
            return
        chats = (await session.exec(select(TelegramChat))).all()

    # Group chats by owning bot, ignoring chats whose bot has no token.
    by_bot: dict[int, list[TelegramChat]] = defaultdict(list)
    for chat in chats:
        if chat.bot_id in bot_tokens:
            by_bot[chat.bot_id].append(chat)
    if not by_bot:
        return

    http = await get_http_session()
    clients_by_bot = {
        bot_id: TelegramClient(http, token) for bot_id, token in bot_tokens.items()
    }
    sem = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY)

    async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]:
        """Return (chat_row_id, info_dict_or_None, error_message_or_None)."""
        async with sem:
            try:
                res = await clients_by_bot[bot_id].get_chat(chat.chat_id)
            except Exception as err:  # noqa: BLE001
                # Best-effort sweep: one failing chat must not abort the rest.
                return chat.id, None, str(err)
        if not res.get("success"):
            return chat.id, None, res.get("error") or "unknown"
        return chat.id, (res.get("result") or {}), None

    tasks = [
        _fetch(bot_id, chat)
        for bot_id, bot_chats in by_bot.items()
        for chat in bot_chats
    ]
    results = await asyncio.gather(*tasks)

    refreshed = 0
    errors = 0
    # Bucket results first, then fetch all rows in one IN-query instead of
    # per-row ``session.get`` — otherwise a 50-chat fleet issues 50 extra
    # SELECTs before commit.
    successes: dict[int, dict] = {}
    for chat_id, info, err in results:
        if err is not None or info is None:
            errors += 1
            if err:
                _LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err)
            continue
        if chat_id is not None:
            successes[chat_id] = info

    async with AsyncSession(engine) as session:
        if successes:
            rows = (
                await session.exec(
                    select(TelegramChat).where(
                        TelegramChat.id.in_(list(successes.keys()))
                    )
                )
            ).all()
            for merged in rows:
                info = successes.get(merged.id)
                if not info:
                    continue
                # Group chats carry ``title``; private chats only have
                # first/last name. ``or ""`` guards against keys that are
                # present but null, which would break the concatenation.
                title = info.get("title") or (
                    (info.get("first_name") or "")
                    + " "
                    + (info.get("last_name") or "")
                ).strip()
                changed = False
                if title and merged.title != title:
                    merged.title = title
                    changed = True
                new_username = info.get("username")
                if new_username is not None and merged.username != new_username:
                    merged.username = new_username
                    changed = True
                if changed:
                    session.add(merged)
                    refreshed += 1
        await session.commit()
    _LOGGER.info(
        "Telegram chat title refresh: %s updated, %s errors", refreshed, errors
    )


async def _cleanup_old_events() -> None:
    """Delete EventLog / WebhookPayloadLog / ActionExecution rows older than
    the configured retention window. A retention of 0 disables the job.
    """
    from datetime import datetime, timedelta, timezone

    from sqlmodel import delete
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..config import settings
    from ..database.engine import get_engine
    from ..database.models import ActionExecution, EventLog, WebhookPayloadLog

    days = settings.event_log_retention_days
    if days <= 0:
        _LOGGER.debug("Event log retention disabled (days=0); skipping cleanup")
        return

    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    engine = get_engine()
    async with AsyncSession(engine) as session:
        await session.exec(delete(EventLog).where(EventLog.created_at < cutoff))
        await session.exec(
            delete(WebhookPayloadLog).where(WebhookPayloadLog.created_at < cutoff)
        )
        await session.exec(
            delete(ActionExecution).where(ActionExecution.started_at < cutoff)
        )
        await session.commit()
    _LOGGER.info(
        "Cleaned event_log / webhook_payload_log / action_execution older than %s",
        cutoff.date(),
    )


async def _load_tracker_jobs() -> None:
    """Load enabled trackers and schedule polling jobs.

    Trackers on a ``scheduler``-type provider with ``schedule_type == "cron"``
    get a cron trigger in the app timezone; everything else (including cron
    expressions that fail to parse) gets a jittered interval trigger. Trackers
    that already have a job are left untouched.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()

    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )
        trackers = result.all()

        # Batch-load provider types for scheduler detection
        unique_provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if unique_provider_ids:
            provider_result = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(unique_provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in provider_result.all()}

    tz = await _load_app_timezone()

    for tracker in trackers:
        job_id = f"tracker_{tracker.id}"
        if scheduler.get_job(job_id):
            continue

        ptype = provider_types.get(tracker.provider_id, "")
        filters = tracker.filters or {}

        # Scheduler providers can use cron triggers
        if ptype == "scheduler" and filters.get("schedule_type") == "cron":
            cron_expr = filters.get("cron_expression", "")
            if cron_expr:
                try:
                    _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                    continue
                except Exception as e:
                    _LOGGER.error(
                        "Invalid cron expression for tracker %d (%s): %s — falling back to interval",
                        tracker.id, tracker.name, e,
                    )

        jitter = _compute_jitter(tracker.scan_interval)
        scheduler.add_job(
            _poll_tracker,
            "interval",
            seconds=tracker.scan_interval,
            jitter=jitter or None,  # APScheduler expects None, not 0, for "no jitter"
            id=job_id,
            args=[tracker.id],
            replace_existing=True,
            max_instances=1,
        )
        _LOGGER.info(
            "Scheduled tracker %d (%s) every %ds (jitter ±%ds)",
            tracker.id, tracker.name, tracker.scan_interval, jitter,
        )


def _add_cron_job(
    scheduler: AsyncIOScheduler,
    job_id: str,
    tracker_id: int,
    cron_expression: str,
    tracker_name: str,
    tz: ZoneInfo,
) -> None:
    """Add a cron-triggered job for a scheduler-type tracker.

    ``tz`` is the user-configured app timezone; without it APScheduler
    interprets the crontab in the host's local timezone, which surfaces as
    events firing at the "wrong" wall-clock time for operators in a non-UTC
    zone (see the companion fix in ``update_settings`` which reschedules on
    timezone changes).

    Raises:
        Whatever ``CronTrigger.from_crontab`` raises for an expression it
        cannot parse; callers catch this and fall back to an interval trigger.
    """
    from apscheduler.triggers.cron import CronTrigger

    trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
    scheduler.add_job(
        _poll_tracker,
        trigger,
        id=job_id,
        args=[tracker_id],
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled tracker %d (%s) with cron: %s [tz=%s]",
        tracker_id, tracker_name, cron_expression, tz.key,
    )


async def schedule_tracker(
    tracker_id: int,
    interval: int,
    cron_expression: str | None = None,
) -> None:
    """Add or update a scheduler job for a tracker.

    Args:
        tracker_id: Tracker primary key; the job id is ``tracker_<id>``.
        interval: Polling period in seconds, used when no (valid) cron
            expression is supplied.
        cron_expression: Optional crontab string; when it fails to apply the
            job falls back to the interval trigger.
    """
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"

    # A reschedule typically follows a config edit or enable/disable flip —
    # drop adaptive back-off so the first tick after the change runs promptly.
    reset_adaptive_state(tracker_id)

    # Remove existing job first to allow trigger type changes
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    if cron_expression:
        try:
            tz = await _load_app_timezone()
            _add_cron_job(
                scheduler, job_id, tracker_id, cron_expression,
                f"tracker-{tracker_id}", tz,
            )
            return
        except Exception as e:
            _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)

    jitter = _compute_jitter(interval)
    scheduler.add_job(
        _poll_tracker,
        "interval",
        seconds=interval,
        jitter=jitter or None,
        id=job_id,
        args=[tracker_id],
        replace_existing=True,
        max_instances=1,  # matches the job created by _load_tracker_jobs
    )
    _LOGGER.info(
        "Scheduled tracker %d every %ds (jitter ±%ds)",
        tracker_id, interval, jitter,
    )


async def unschedule_tracker(tracker_id: int) -> None:
    """Remove a scheduler job for a tracker and drop its adaptive state."""
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"
    reset_adaptive_state(tracker_id)
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
        _LOGGER.info("Unscheduled tracker %d", tracker_id)


def _adaptive_should_skip(tracker_id: int) -> bool:
    """Return True when the adaptive heuristic says to skip this tick.

    Run-length skip: if we're in 1-in-K mode, skip (K-1) ticks between
    each real poll. The only state mutated here is the tracker's
    ``tick_counter``, which is bumped on every call made while K > 1.
    """
    state = _adaptive_state.get(tracker_id)
    if not state:
        return False
    skip_every = state.get("skip_every", 1)
    if skip_every <= 1:
        return False
    state["tick_counter"] = state.get("tick_counter", 0) + 1
    # Fire on ticks where counter % skip_every == 0; skip the rest.
    return (state["tick_counter"] % skip_every) != 0


def _adaptive_update(tracker_id: int, events_detected: int) -> None:
    """Update the adaptive counter after a real tick ran.

    Any detected event resets the tracker to its base rate. Otherwise the
    empty-poll counter is incremented and the skip factor is raised once a
    threshold is crossed. Only polls that actually ran are counted, so
    skipped ticks do not advance ``empty_count``.
    """
    state = _adaptive_state.setdefault(
        tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
    )
    if events_detected > 0:
        if state["skip_every"] > 1:
            _LOGGER.info(
                "Adaptive polling: tracker %d saw activity, restoring base rate",
                tracker_id,
            )
        state["empty_count"] = 0
        state["skip_every"] = 1
        state["tick_counter"] = 0
        return

    state["empty_count"] = state.get("empty_count", 0) + 1
    if (
        state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD
        and state["skip_every"] < _ADAPTIVE_MAX_SKIP
    ):
        state["skip_every"] = _ADAPTIVE_MAX_SKIP
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4",
            tracker_id, state["empty_count"],
        )
    elif (
        state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD
        and state["skip_every"] < 2
    ):
        state["skip_every"] = 2
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping every other",
            tracker_id, state["empty_count"],
        )


def reset_adaptive_state(tracker_id: int) -> None:
    """Drop cached adaptive counters for a tracker.

    Used by API callers that make changes requiring the tracker to run
    promptly on the next scheduled tick (enable/disable, config edits,
    manual "check now" actions).
    """
    _adaptive_state.pop(tracker_id, None)


async def _poll_tracker(tracker_id: int) -> None:
    """Poll a tracker for changes (APScheduler job target).

    Honors the adaptive skip heuristic, swallows and logs any exception from
    ``check_tracker``, and feeds successful results back into the heuristic.
    """
    from .watcher import check_tracker

    if _adaptive_should_skip(tracker_id):
        return

    try:
        result = await check_tracker(tracker_id)
    except Exception as e:
        _LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
        return

    # Treat the "error" / "skipped" statuses as inconclusive — don't let
    # a transient upstream failure trick the heuristic into backing off.
    if isinstance(result, dict) and result.get("status") == "ok":
        _adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0))


# ---------------------------------------------------------------------------
# Action scheduling
# ---------------------------------------------------------------------------


async def _load_action_jobs() -> None:
    """Load enabled actions and schedule execution jobs.

    Actions with ``schedule_type == "cron"`` and a cron string get a cron
    trigger in the app timezone; all others (and unparsable cron strings)
    get an interval trigger. Actions that already have a job are skipped.
    """
    from apscheduler.triggers.cron import CronTrigger
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import Action

    engine = get_engine()
    scheduler = get_scheduler()

    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )
        actions = result.all()

    tz = await _load_app_timezone()

    for action in actions:
        job_id = f"action_{action.id}"
        if scheduler.get_job(job_id):
            continue

        if action.schedule_type == "cron" and action.schedule_cron:
            try:
                trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
                scheduler.add_job(
                    _run_action,
                    trigger,
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                _LOGGER.info(
                    "Scheduled action %d (%s) with cron: %s [tz=%s]",
                    action.id, action.name, action.schedule_cron, tz.key,
                )
                continue
            except Exception as e:
                _LOGGER.error(
                    "Invalid cron for action %d (%s): %s — falling back to interval",
                    action.id, action.name, e,
                )

        scheduler.add_job(
            _run_action,
            "interval",
            seconds=action.schedule_interval,
            id=job_id,
            args=[action.id],
            replace_existing=True,
        )
        _LOGGER.info(
            "Scheduled action %d (%s) every %ds",
            action.id, action.name, action.schedule_interval,
        )


async def schedule_action(
    action_id: int,
    schedule_type: str = "interval",
    interval: int = 3600,
    cron_expression: str = "",
) -> None:
    """Add or update a scheduler job for an action.

    Args:
        action_id: Action primary key; the job id is ``action_<id>``.
        schedule_type: ``"cron"`` to use ``cron_expression``; anything else
            uses the interval trigger.
        interval: Period in seconds for the interval trigger.
        cron_expression: Crontab string; on failure the interval is used.
    """
    from apscheduler.triggers.cron import CronTrigger

    scheduler = get_scheduler()
    job_id = f"action_{action_id}"

    # Remove first so a trigger-type change takes effect.
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    if schedule_type == "cron" and cron_expression:
        try:
            tz = await _load_app_timezone()
            trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
            scheduler.add_job(
                _run_action,
                trigger,
                id=job_id,
                args=[action_id],
                replace_existing=True,
            )
            _LOGGER.info(
                "Scheduled action %d with cron: %s [tz=%s]",
                action_id, cron_expression, tz.key,
            )
            return
        except Exception as e:
            _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e)

    scheduler.add_job(
        _run_action,
        "interval",
        seconds=interval,
        id=job_id,
        args=[action_id],
        replace_existing=True,
    )
    _LOGGER.info("Scheduled action %d every %ds", action_id, interval)


async def unschedule_action(action_id: int) -> None:
    """Remove a scheduler job for an action."""
    scheduler = get_scheduler()
    job_id = f"action_{action_id}"
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
        _LOGGER.info("Unscheduled action %d", action_id)


async def reschedule_cron_jobs_for_timezone_change() -> None:
    """Re-add every cron-triggered tracker/action job under the new app timezone.

    Called by the admin settings endpoint after the ``timezone`` AppSetting is
    updated. APScheduler's ``CronTrigger`` freezes its timezone at construction
    time, so a timezone change has no effect on jobs already in the scheduler —
    we have to rebuild those jobs. Interval-triggered jobs are tz-agnostic and
    are left alone.
    """
    from apscheduler.triggers.cron import CronTrigger
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()

    rescheduled = 0
    async with AsyncSession(engine) as session:
        # Trackers with cron scheduling (scheduler provider + schedule_type=cron).
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            rows = await session.exec(
                select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids))
            )
            provider_types = {p.id: p.type for p in rows.all()}

        for tracker in trackers:
            if provider_types.get(tracker.provider_id) != "scheduler":
                continue
            filters = tracker.filters or {}
            if filters.get("schedule_type") != "cron":
                continue
            cron_expr = filters.get("cron_expression", "")
            if not cron_expr:
                continue
            job_id = f"tracker_{tracker.id}"
            if scheduler.get_job(job_id):
                scheduler.remove_job(job_id)
            try:
                _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for tracker %d on tz change: %s",
                    tracker.id, e,
                )

        # Actions with cron schedules.
        actions = (await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )).all()

        for action in actions:
            if action.schedule_type != "cron" or not action.schedule_cron:
                continue
            job_id = f"action_{action.id}"
            if scheduler.get_job(job_id):
                scheduler.remove_job(job_id)
            try:
                scheduler.add_job(
                    _run_action,
                    CronTrigger.from_crontab(action.schedule_cron, timezone=tz),
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for action %d on tz change: %s",
                    action.id, e,
                )

    _LOGGER.info(
        "Rescheduled %d cron job(s) for new app timezone %s",
        rescheduled, tz.key,
    )

    # Immich scheduled/periodic/memory jobs are also CronTrigger-based and
    # carry the same frozen-tz problem — rebuild them under the new tz.
    await reschedule_immich_dispatch_jobs()


async def _run_action(action_id: int) -> None:
    """Run an action (called by APScheduler); errors are logged, not raised."""
    from .action_runner import run_action

    try:
        await run_action(action_id, trigger="scheduled")
    except Exception as e:
        _LOGGER.error("Error running action %d: %s", action_id, e)


# ---------------------------------------------------------------------------
# Immich scheduled / periodic / memory dispatch (cron-fired)
#
# These three slots fire on wall-clock schedules taken from the tracker's
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
# app-level IANA timezone. The dispatch flow lives in
# ``services.scheduled_dispatch``; this section just owns scheduling.
# ---------------------------------------------------------------------------

_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"


def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]:
    """Parse ``"09:00,18:30"`` → ``[(9, 0), (18, 30)]``, skipping bad entries.

    A typo in one slot must not prevent the others from scheduling — we log
    and move on rather than raising.

    Args:
        raw: Comma-separated ``HH:MM`` literals; ``None``/empty yields ``[]``.

    Returns:
        ``(hour, minute)`` tuples in input order, with malformed or
        out-of-range entries dropped.
    """
    out: list[tuple[int, int]] = []
    for part in (raw or "").split(","):
        part = part.strip()
        if not part:
            continue
        try:
            # A missing ":" makes the unpack fail with ValueError too.
            h_str, m_str = part.split(":", 1)
            hour, minute = int(h_str), int(m_str)
        except ValueError:
            _LOGGER.warning("Skipping invalid time literal %r", part)
            continue
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            _LOGGER.warning("Skipping out-of-range time %r", part)
            continue
        out.append((hour, minute))
    return out


async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
    """APScheduler entry point — wraps the dispatch helper to swallow errors."""
    from .scheduled_dispatch import dispatch_scheduled_for_tracker

    try:
        await dispatch_scheduled_for_tracker(tracker_id, kind)  # type: ignore[arg-type]
    except Exception as err:  # noqa: BLE001
        _LOGGER.error(
            "Immich %s dispatch for tracker %d failed: %s",
            kind, tracker_id, err,
        )


async def _load_immich_dispatch_jobs() -> None:
    """Schedule cron jobs for every (tracker, kind, time) where the kind is on.

    Reads each enabled Immich tracker's *default* tracking config — per-link
    overrides only gate dispatch (handled in ``scheduled_dispatch``), they do
    not influence the fire schedule.
    """
    from apscheduler.triggers.cron import CronTrigger
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import (
        NotificationTracker,
        ServiceProvider as ServiceProviderModel,
        TrackingConfig,
    )

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()

    async with AsyncSession(engine) as session:
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        if not trackers:
            return

        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            rows = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in rows.all()}

        tc_ids = list({
            t.default_tracking_config_id
            for t in trackers
            if t.default_tracking_config_id
        })
        tc_map: dict[int, TrackingConfig] = {}
        if tc_ids:
            rows = await session.exec(
                select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
            )
            tc_map = {tc.id: tc for tc in rows.all()}

        scheduled = 0
        for tracker in trackers:
            if provider_types.get(tracker.provider_id) != "immich":
                continue
            tc = (
                tc_map.get(tracker.default_tracking_config_id)
                if tracker.default_tracking_config_id
                else None
            )
            if tc is None:
                continue
            for kind in _IMMICH_DISPATCH_KINDS:
                if not getattr(tc, f"{kind}_enabled", False):
                    continue
                times_raw = getattr(tc, f"{kind}_times", "") or ""
                for hour, minute in _parse_hhmm_list(times_raw):
                    # One job per (kind, tracker, HHMM) so duplicates in the
                    # time list collapse via replace_existing.
                    job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}"
                    scheduler.add_job(
                        _run_immich_dispatch,
                        CronTrigger(hour=hour, minute=minute, timezone=tz),
                        id=job_id,
                        args=[tracker.id, kind],
                        replace_existing=True,
                        max_instances=1,
                    )
                    scheduled += 1
                    _LOGGER.info(
                        "Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
                        kind, tracker.id, hour, minute, tz.key,
                    )

    if scheduled:
        _LOGGER.info(
            "Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
            scheduled, tz.key,
        )


async def reschedule_immich_dispatch_jobs() -> None:
    """Drop and rebuild all Immich scheduled/periodic/memory jobs.

    Cheap to call on every relevant mutation — a typical install has only a
    handful of trackers. Called from the tracker, link, and tracking-config
    CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``.
    """
    scheduler = get_scheduler()
    # Snapshot the job list first: we remove entries while walking it.
    for job in list(scheduler.get_jobs()):
        if job.id.startswith(_IMMICH_DISPATCH_PREFIX):
            scheduler.remove_job(job.id)
    await _load_immich_dispatch_jobs()


# ---------------------------------------------------------------------------
# Scheduled backup
# ---------------------------------------------------------------------------

_BACKUP_JOB_ID = "scheduled_backup"


async def _load_backup_job() -> None:
    """Load scheduled backup job from settings if enabled.

    Reads ``backup_scheduled_enabled`` and ``backup_scheduled_interval_hours``
    from ``AppSetting``. A missing, non-numeric or non-positive interval falls
    back to 24 hours instead of aborting scheduler startup.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..database.engine import get_engine
    from ..database.models import AppSetting

    engine = get_engine()
    async with _AS(engine) as session:
        enabled_row = await session.get(AppSetting, "backup_scheduled_enabled")
        interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours")

    enabled = enabled_row and enabled_row.value == "true"
    if not enabled:
        return

    raw_interval = interval_row.value if interval_row else None
    interval_hours = 24
    if raw_interval:
        try:
            parsed = int(raw_interval)
        except (TypeError, ValueError):
            parsed = 0
        if parsed > 0:
            interval_hours = parsed
        else:
            # A zero-length interval would make APScheduler fire every second;
            # a non-numeric value would otherwise crash start_scheduler().
            _LOGGER.warning(
                "Invalid backup_scheduled_interval_hours %r; using %dh",
                raw_interval, interval_hours,
            )

    scheduler = get_scheduler()
    scheduler.add_job(
        _run_scheduled_backup,
        "interval",
        hours=interval_hours,
        id=_BACKUP_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled backup every %dh", interval_hours)


async def schedule_backup(interval_hours: int = 24) -> None:
    """Add or update the scheduled backup job."""
    scheduler = get_scheduler()
    if scheduler.get_job(_BACKUP_JOB_ID):
        scheduler.remove_job(_BACKUP_JOB_ID)
    scheduler.add_job(
        _run_scheduled_backup,
        "interval",
        hours=interval_hours,
        id=_BACKUP_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled backup every %dh", interval_hours)


async def unschedule_backup() -> None:
    """Remove the scheduled backup job."""
    scheduler = get_scheduler()
    if scheduler.get_job(_BACKUP_JOB_ID):
        scheduler.remove_job(_BACKUP_JOB_ID)
        _LOGGER.info("Unscheduled backup job")


async def _run_scheduled_backup() -> None:
    """Run a scheduled backup (called by APScheduler).

    Exports a backup owned by the first admin user, then prunes old backup
    files down to the configured retention count. Any failure is logged and
    swallowed so the recurring job keeps running.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..config import settings as app_config
    from ..database.engine import get_engine
    from ..database.models import AppSetting, User
    from .backup_schema import SecretsMode
    from .backup_service import export_backup_to_file, cleanup_old_backups

    try:
        engine = get_engine()
        async with _AS(engine) as session:
            # Read settings
            secrets_row = await session.get(AppSetting, "backup_secrets_mode")
            retention_row = await session.get(AppSetting, "backup_retention_count")
            secrets_mode = (
                SecretsMode(secrets_row.value)
                if secrets_row and secrets_row.value
                else SecretsMode.EXCLUDE
            )
            retention = (
                int(retention_row.value)
                if retention_row and retention_row.value
                else 5
            )

            # Find admin user (first admin) for ownership context
            admin_result = await session.exec(
                select(User).where(User.role == "admin")
            )
            admin = admin_result.first()
            if not admin:
                _LOGGER.warning("No admin user found, skipping scheduled backup")
                return

            backup_dir = app_config.data_dir / "backups"
            await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)

        # Cleanup outside the session
        cleanup_old_backups(backup_dir, keep=retention)
    except Exception as e:
        _LOGGER.error("Scheduled backup failed: %s", e)