Files
alexei.dolgolyov 8065e6effa feat(immich): multi-time-point scheduling for scheduled/periodic/memory
Replace the single comma-separated text box with an add/remove list of
native HH:MM pickers for the Immich scheduled-assets, periodic-summary,
and memory slots. The backend already stored comma-separated *_times and
scheduled one cron job per time; this makes entering several times per
day discoverable and hardens the read/write path.

Backend:
- services/time_list.py: normalize_time_list (validate / dedup / sort /
  cap at 24, raising TimeListError) + lenient parse_hhmm_list; the
  scheduler now uses the shared parser (drops its private copy).
- tracking_configs API normalizes *_times on every write (422 on bad
  input) and rejects enabling a slot whose times list is empty.
- scheduler warns when an enabled slot has zero or dropped fire times,
  restoring the observability lost with the old per-call warning.

Frontend:
- TimeListEditor.svelte: add/remove native time rows, dedupe + sort on
  emit, per-day cap, collapses on-screen duplicates, aria-labelled rows;
  syncs from the value prop only on external changes (untrack guard) so
  keyboard entry isn't clobbered mid-edit.
- Descriptor-driven save guard: an enabled feature section must have at
  least one time.
- i18n (en/ru) keys; refreshed help text; removed dead invalidTimeList.

Tests: time_list normalization/parsing (incl. non-ASCII/odd shapes) and
the enabled-implies-times validation.
2026-05-29 14:57:41 +03:00

1361 lines
50 KiB
Python

"""APScheduler-based polling scheduler for trackers and actions."""
from __future__ import annotations
import logging
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from .time_list import parse_hhmm_list
_LOGGER = logging.getLogger(__name__)
def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo:
    """Resolve an IANA timezone name to a ``ZoneInfo``, falling back to UTC.

    Kept local to avoid importing from api/dispatch layers inside the scheduler
    module (which is loaded at startup, before the API routers).

    Args:
        tz_name: IANA key such as ``"Europe/Berlin"``. ``None`` or an empty
            string selects UTC without logging anything.

    Returns:
        The matching ``ZoneInfo``, or ``ZoneInfo("UTC")`` when the name is
        missing or cannot be loaded.
    """
    if not tz_name:
        return ZoneInfo("UTC")
    try:
        return ZoneInfo(tz_name)
    except (ZoneInfoNotFoundError, ValueError, OSError):
        # ValueError covers malformed keys. OSError covers keys that resolve
        # to something unreadable on disk (e.g. a directory such as "Europe"),
        # which can surface as IsADirectoryError / PermissionError rather than
        # ZoneInfoNotFoundError. Either way the scheduler must keep running.
        _LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name)
        return ZoneInfo("UTC")
async def _load_app_timezone() -> ZoneInfo:
    """Return the admin-configured app timezone, or UTC when unset/invalid.

    Reads the ``"timezone"`` setting through ``get_setting`` and hands the raw
    value to ``_resolve_zoneinfo``, which owns the fallback behavior.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..api.app_settings import get_setting
    from ..database.engine import get_engine

    async with AsyncSession(get_engine()) as db:
        configured_name = await get_setting(db, "timezone")
    return _resolve_zoneinfo(configured_name)
# Process-wide scheduler instance; created lazily by ``get_scheduler``.
_scheduler: AsyncIOScheduler | None = None
# ---------------------------------------------------------------------------
# Adaptive polling (Tier 6 of the big-album optimization plan).
#
# We don't touch the user-configured ``scan_interval`` — that's still the
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
# ticks when a tracker is idle, and reset to 1:1 as soon as it detects
# anything. The scheduler keeps running on the user's chosen period; once
# backed off, a change is picked up on the next non-skipped tick, i.e. at
# most ``skip_every`` scheduled ticks later. The steady-state HTTP cost for
# a fleet of idle trackers drops by up to ~75%.
#
# Opt-in per tracker via the ``adaptive_max_skip`` column:
#   * NULL, 0 or 1 → adaptive polling disabled, every tick runs (default)
#   * 2            → skip at most 1-in-2 ticks after long idle
#   * 3            → skip at most 2-in-3 ticks after long idle
#   * 4 or more    → skip at most 3-in-4 ticks (back-off never exceeds 1-in-4)
# Thresholds are intentionally conservative: a tracker polling every 30 s
# needs 5 min of silence before we halve its effective rate. The quarter
# step needs 30 empty polls in total; skipped ticks are not counted, so once
# the rate has been halved this takes longer than 15 min of wall-clock time.
# ---------------------------------------------------------------------------
_ADAPTIVE_HALVE_THRESHOLD = 10  # consecutive empty polls → 1-in-2
_ADAPTIVE_QUARTER_THRESHOLD = 30  # consecutive empty polls → up to 1-in-4
# Per-tracker adaptive state, keyed by tracker_id. Each value holds the
# ``empty_count``, ``skip_every`` and ``tick_counter`` integers. Rebuilt on
# process restart — a short warmup period is fine and avoids persisting what
# is effectively a performance heuristic.
_adaptive_state: dict[int, dict[str, int]] = {}
# Per-tracker cap on the skip factor, mirrored from the DB column at
# schedule time. Absence of an entry means adaptive polling is off for that
# tracker — ``_adaptive_should_skip`` returns False immediately.
_adaptive_max_skip: dict[int, int] = {}
def set_adaptive_max_skip(tracker_id: int, max_skip: int | None) -> None:
    """Store or remove the adaptive back-off cap for one tracker.

    The scheduling helpers call this so that the per-tick check in
    ``_adaptive_should_skip`` can work from memory instead of the DB. A cap
    of ``None``, 0 or 1 turns back-off off for the tracker, so every
    scheduled tick runs.
    """
    backoff_enabled = bool(max_skip) and max_skip > 1
    if not backoff_enabled:
        _adaptive_max_skip.pop(tracker_id, None)
    else:
        _adaptive_max_skip[tracker_id] = int(max_skip)
    # Whether opting in or out, stale counters must go so the new setting
    # takes effect on the very next tick.
    _adaptive_state.pop(tracker_id, None)
def _compute_jitter(interval_seconds: int) -> int:
    """Return the jitter bound, in seconds, for an interval trigger.

    Trackers sharing the same ``scan_interval`` would otherwise all fire on
    the same wall-clock second and hit the upstream server together; the
    scheduler's ``jitter`` option spreads each firing by up to this many
    seconds.

    The bound is a quarter of the interval (integer division), capped at
    30 s. Non-positive intervals and intervals shorter than 4 s yield 0,
    i.e. no jitter.
    """
    quarter = interval_seconds // 4 if interval_seconds > 0 else 0
    return quarter if quarter < 30 else 30
def get_scheduler() -> AsyncIOScheduler:
    """Return the shared ``AsyncIOScheduler``, building it on first call."""
    global _scheduler
    if _scheduler is not None:
        return _scheduler
    # Defaults applied to every job that does not override them:
    #   coalesce            — a backlog of missed runs (restart / pause)
    #                         collapses into a single firing.
    #   misfire_grace_time  — firings up to 5 min late are still executed
    #                         rather than silently dropped.
    #   max_instances       — never run two copies of the same job at once.
    job_defaults = {
        "coalesce": True,
        "misfire_grace_time": 300,
        "max_instances": 1,
    }
    _scheduler = AsyncIOScheduler(job_defaults=job_defaults)
    return _scheduler
async def start_scheduler() -> None:
    """Start the shared scheduler if needed and register every background job.

    The steps run strictly in the order written: tracker, action and Immich
    dispatch jobs first, then the Telegram, maintenance, backup,
    deferred-dispatch, release-check and bridge_self jobs.
    """
    scheduler = get_scheduler()
    if not scheduler.running:
        scheduler.start()
        _LOGGER.info("Scheduler started")
    # Register the polling, action and Immich dispatch jobs.
    await _load_tracker_jobs()
    await _load_action_jobs()
    await _load_immich_dispatch_jobs()
    # Start Telegram bot polling for bots with active command listeners
    # (imported here rather than at module level).
    from .telegram_poller import start_command_listener_polling
    await start_command_listener_polling()
    # Schedule daily cleanup of old event log entries
    _schedule_event_cleanup()
    # Schedule periodic Telegram chat title refresh
    _schedule_telegram_chat_sync()
    # Start debounced command auto-sync scheduler
    from .command_sync import start_sync_scheduler
    start_sync_scheduler()
    # Load scheduled backup job if enabled
    await _load_backup_job()
    # Re-arm any deferred-dispatch drains that were pending across restart.
    from .deferred_dispatch import load_pending_drain_jobs
    await load_pending_drain_jobs()
    # And install the periodic safety-net catch-up scan.
    _schedule_drain_catchup()
    # Schedule the upstream release-check probe.
    await _schedule_release_check()
    # Schedule the bridge_self deferred-backlog scan (every 5 min).
    _schedule_bridge_self_backlog_scan()
def _schedule_event_cleanup() -> None:
    """Schedule the daily log-retention cleanup job at 03:00 UTC.

    The job runs ``_cleanup_old_events``, which applies the configured
    retention window. Does nothing if the job is already registered.
    """
    from apscheduler.triggers.cron import CronTrigger

    scheduler = get_scheduler()
    job_id = "cleanup_old_events"
    if scheduler.get_job(job_id):
        return
    scheduler.add_job(
        _cleanup_old_events,
        # Pin the trigger to UTC explicitly. Without a timezone the trigger
        # falls back to the scheduler's zone, so the job would fire at 03:00
        # local time while the log line below claims UTC.
        CronTrigger(hour=3, minute=0, timezone=ZoneInfo("UTC")),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")
# Chat-title refresh tuning.
# Sweep runs daily as a fallback — we additionally refresh opportunistically
# on every incoming webhook/long-poll update (``save_chat_from_webhook``), so
# the sweep only catches chats that haven't sent anything recently.
# Period of the recurring sweep, in hours.
_CHAT_SYNC_INTERVAL_HOURS = 24
# Delay before the one-off refresh that runs shortly after startup, in seconds.
_CHAT_SYNC_INITIAL_DELAY_SECONDS = 60
# Maximum number of ``getChat`` requests in flight at once during a sweep.
_CHAT_SYNC_CONCURRENCY = 10
def _schedule_telegram_chat_sync() -> None:
    """Schedule the Telegram chat-title refresh (recurring sweep + startup run).

    Two jobs are registered, both running ``_refresh_telegram_chat_titles``:

    * an interval job firing every ``_CHAT_SYNC_INTERVAL_HOURS`` hours, and
    * a one-off job ``_CHAT_SYNC_INITIAL_DELAY_SECONDS`` seconds from now, so
      stale names refresh without waiting a full interval.

    Does nothing if the interval job is already registered.
    """
    from datetime import datetime, timedelta, timezone

    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    job_id = "refresh_telegram_chat_titles"
    if scheduler.get_job(job_id):
        return
    # Deliberately no ``next_run_time`` argument: passing ``None`` adds the
    # job in the *paused* state, which would mean the recurring sweep never
    # fires. Left unset, the first run is one full interval after
    # registration; the one-off job below covers the startup refresh.
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    # Fire once shortly after startup so stale names refresh without waiting
    # a day.
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        "date",
        run_date=datetime.now(timezone.utc)
        + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS),
        id="refresh_telegram_chat_titles_once",
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled Telegram chat title refresh every %sh (concurrency %s)",
        _CHAT_SYNC_INTERVAL_HOURS, _CHAT_SYNC_CONCURRENCY,
    )
async def _refresh_telegram_chat_titles() -> None:
    """Refresh ``TelegramChat.title`` / ``username`` via ``getChat``.

    Works in three phases: snapshot bots and chats from the DB, call
    ``getChat`` for every chat whose bot has a token (at most
    ``_CHAT_SYNC_CONCURRENCY`` requests in flight, shared across all bots),
    then write changed titles/usernames back in a single session.
    """
    import asyncio
    from collections import defaultdict
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from notify_bridge_core.notifications.telegram.client import TelegramClient
    from ..database.engine import get_engine
    from ..database.models import TelegramBot, TelegramChat
    from .http_session import get_http_session

    engine = get_engine()

    # Phase 1 — snapshot the bots that have a token, and all known chats.
    async with AsyncSession(engine) as session:
        bot_rows = (await session.exec(select(TelegramBot))).all()
        bot_tokens = {bot.id: bot.token for bot in bot_rows if bot.token}
        if not bot_tokens:
            return
        chat_rows = (await session.exec(select(TelegramChat))).all()

    by_bot: dict[int, list[TelegramChat]] = defaultdict(list)
    for chat_row in chat_rows:
        if chat_row.bot_id in bot_tokens:
            by_bot[chat_row.bot_id].append(chat_row)
    if not by_bot:
        return

    # Phase 2 — query Telegram with bounded parallelism.
    http = await get_http_session()
    clients_by_bot = {
        bot_id: TelegramClient(http, token) for bot_id, token in bot_tokens.items()
    }
    limiter = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY)

    async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]:
        """Return (chat_row_id, info_dict_or_None, error_message_or_None)."""
        async with limiter:
            try:
                reply = await clients_by_bot[bot_id].get_chat(chat.chat_id)
            except Exception as err:  # noqa: BLE001
                return chat.id, None, str(err)
        if reply.get("success"):
            return chat.id, (reply.get("result") or {}), None
        return chat.id, None, reply.get("error") or "unknown"

    pending = []
    for bot_id, bot_chats in by_bot.items():
        for chat in bot_chats:
            pending.append(_fetch(bot_id, chat))
    outcomes = await asyncio.gather(*pending)

    refreshed = 0
    errors = 0
    # Bucket the outcomes first so the rows can be loaded with one IN-query
    # rather than one SELECT per chat.
    successes: dict[int, dict] = {}
    for row_id, info, failure in outcomes:
        if failure is None and info is not None:
            if row_id is not None:
                successes[row_id] = info
            continue
        errors += 1
        if failure:
            _LOGGER.debug("getChat failed for chat row %s: %s", row_id, failure)

    # Phase 3 — persist whatever actually changed.
    async with AsyncSession(engine) as session:
        if successes:
            wanted_ids = list(successes.keys())
            rows = (await session.exec(
                select(TelegramChat).where(TelegramChat.id.in_(wanted_ids))
            )).all()
            for row in rows:
                info = successes.get(row.id)
                if not info:
                    continue
                # Group/channel chats carry a title; private chats only have
                # first/last name, so fall back to joining those.
                title = info.get("title") or (
                    (info.get("first_name", "") + " " + info.get("last_name", "")).strip()
                )
                new_username = info.get("username")
                title_changed = bool(title) and row.title != title
                username_changed = (
                    new_username is not None and row.username != new_username
                )
                if title_changed:
                    row.title = title
                if username_changed:
                    row.username = new_username
                if title_changed or username_changed:
                    session.add(row)
                    refreshed += 1
        await session.commit()
    _LOGGER.info(
        "Telegram chat title refresh: %s updated, %s errors", refreshed, errors
    )
async def _cleanup_old_events() -> None:
    """Purge log rows that have aged past the configured retention window.

    Affects ``EventLog``, ``WebhookPayloadLog`` (both by ``created_at``) and
    ``ActionExecution`` (by ``started_at``). A retention of zero or less
    turns the job into a no-op.
    """
    from datetime import datetime, timedelta, timezone
    from sqlmodel import delete
    from sqlmodel.ext.asyncio.session import AsyncSession
    from ..config import settings
    from ..database.engine import get_engine
    from ..database.models import ActionExecution, EventLog, WebhookPayloadLog

    retention_days = settings.event_log_retention_days
    if retention_days <= 0:
        _LOGGER.debug("Event log retention disabled (days=0); skipping cleanup")
        return

    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    # (table, timestamp column) pairs, purged in this order in one transaction.
    purge_targets = (
        (EventLog, EventLog.created_at),
        (WebhookPayloadLog, WebhookPayloadLog.created_at),
        (ActionExecution, ActionExecution.started_at),
    )
    async with AsyncSession(get_engine()) as session:
        for model, stamp_column in purge_targets:
            await session.exec(delete(model).where(stamp_column < cutoff))
        await session.commit()
    _LOGGER.info(
        "Cleaned event_log / webhook_payload_log / action_execution older than %s",
        cutoff.date(),
    )
async def _load_tracker_jobs() -> None:
    """Register a polling job for every enabled tracker that needs one.

    Trackers that already have a job are left alone. Webhook-based providers
    are skipped. ``scheduler``-type trackers configured for cron get a cron
    job; everything else — including a cron tracker whose expression is
    empty or rejected — gets a jittered interval job.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()
    async with AsyncSession(engine) as session:
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        # Resolve provider types in one query instead of one per tracker.
        unique_provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if unique_provider_ids:
            provider_rows = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(unique_provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in provider_rows.all()}

    tz = await _load_app_timezone()
    from notify_bridge_core.providers.capabilities import get_capabilities

    for tracker in trackers:
        job_id = f"tracker_{tracker.id}"
        if scheduler.get_job(job_id):
            continue
        ptype = provider_types.get(tracker.provider_id, "")
        filters = tracker.filters or {}

        # Webhook providers are pushed to over HTTP; an interval job would
        # only wake check_tracker up to return immediately.
        caps = get_capabilities(ptype) if ptype else None
        if caps and caps.webhook_based:
            _LOGGER.debug(
                "Skipping interval scheduling for webhook tracker %d (%s, type=%s)",
                tracker.id, tracker.name, ptype,
            )
            continue

        # Only scheduler-type providers may run on a cron trigger.
        wants_cron = ptype == "scheduler" and filters.get("schedule_type") == "cron"
        cron_expr = filters.get("cron_expression", "") if wants_cron else ""
        if cron_expr:
            try:
                _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
            except Exception as e:
                _LOGGER.error(
                    "Invalid cron expression for tracker %d (%s): %s — falling back to interval",
                    tracker.id, tracker.name, e,
                )
            else:
                continue

        jitter = _compute_jitter(tracker.scan_interval)
        scheduler.add_job(
            _poll_tracker,
            "interval",
            seconds=tracker.scan_interval,
            jitter=jitter or None,
            id=job_id,
            args=[tracker.id],
            replace_existing=True,
            max_instances=1,
        )
        set_adaptive_max_skip(tracker.id, tracker.adaptive_max_skip)
        _LOGGER.info(
            "Scheduled tracker %d (%s) every %ds (jitter ±%ds, adaptive_max_skip=%s)",
            tracker.id, tracker.name, tracker.scan_interval, jitter,
            tracker.adaptive_max_skip,
        )
def _add_cron_job(
    scheduler: AsyncIOScheduler,
    job_id: str,
    tracker_id: int,
    cron_expression: str,
    tracker_name: str,
    tz: ZoneInfo,
) -> None:
    """Register a crontab-driven polling job for a scheduler-type tracker.

    ``tz`` is the user-configured app timezone. It is passed to the trigger
    explicitly so the crontab is evaluated in that zone rather than the
    host's local one, which would make events fire at the "wrong" wall-clock
    time for operators outside UTC.

    Any exception raised while parsing ``cron_expression`` propagates to the
    caller; no job is added in that case.
    """
    from apscheduler.triggers.cron import CronTrigger

    scheduler.add_job(
        _poll_tracker,
        CronTrigger.from_crontab(cron_expression, timezone=tz),
        id=job_id,
        args=[tracker_id],
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled tracker %d (%s) with cron: %s [tz=%s]",
        tracker_id, tracker_name, cron_expression, tz.key,
    )
async def _is_webhook_tracker(tracker_id: int) -> bool:
    """Tell whether the tracker's provider is webhook-based.

    Loads the tracker and its provider, then consults the capabilities
    registry for the provider type. Returns ``False`` when either row is
    missing. ``schedule_tracker`` uses this to avoid creating polling jobs
    that would have nothing to poll.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from notify_bridge_core.providers.capabilities import get_capabilities
    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    async with AsyncSession(get_engine()) as session:
        tracker = await session.get(NotificationTracker, tracker_id)
        provider = (
            await session.get(ServiceProviderModel, tracker.provider_id)
            if tracker is not None
            else None
        )
        if provider is None:
            return False
        caps = get_capabilities(provider.type)
        return bool(caps and caps.webhook_based)
async def schedule_tracker(
    tracker_id: int,
    interval: int,
    cron_expression: str | None = None,
    adaptive_max_skip: int | None = None,
) -> None:
    """Create or replace the polling job for a single tracker.

    Args:
        tracker_id: Tracker whose job is (re)built.
        interval: Polling period in seconds for the interval trigger.
        cron_expression: Optional crontab. When given and valid, a cron job
            is used instead of the interval; when invalid, the error is
            logged and the interval is used.
        adaptive_max_skip: Mirror of the DB column, cached in module state
            so tick-time skip decisions need no DB access. ``None`` or ``0``
            disables back-off.

    Webhook-based providers get no job at all: they are pushed to over HTTP,
    so a polling job would only cause pointless wakeups.
    """
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"

    # A reschedule normally follows an edit or an enable/disable flip, so
    # forget any back-off and let the next tick run promptly.
    reset_adaptive_state(tracker_id)
    set_adaptive_max_skip(tracker_id, adaptive_max_skip)

    # Drop the old job up front; this lets the trigger type change.
    if scheduler.get_job(job_id) is not None:
        scheduler.remove_job(job_id)

    is_webhook = await _is_webhook_tracker(tracker_id)
    if is_webhook:
        _LOGGER.debug(
            "Skipping interval scheduling for webhook tracker %d", tracker_id,
        )
        return

    if cron_expression:
        try:
            tz = await _load_app_timezone()
            _add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}", tz)
        except Exception as e:
            _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
        else:
            return

    jitter = _compute_jitter(interval)
    scheduler.add_job(
        _poll_tracker,
        "interval",
        seconds=interval,
        jitter=jitter or None,
        id=job_id,
        args=[tracker_id],
        replace_existing=True,
    )
    _LOGGER.info(
        "Scheduled tracker %d every %ds (jitter ±%ds, adaptive_max_skip=%s)",
        tracker_id, interval, jitter, adaptive_max_skip,
    )
async def unschedule_tracker(tracker_id: int) -> None:
    """Drop a tracker's polling job together with its adaptive bookkeeping."""
    # Clear the cached counters and the cap regardless of whether a job
    # currently exists.
    reset_adaptive_state(tracker_id)
    _adaptive_max_skip.pop(tracker_id, None)

    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"
    if scheduler.get_job(job_id) is None:
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled tracker %d", tracker_id)
def _adaptive_should_skip(tracker_id: int) -> bool:
    """Decide whether the adaptive heuristic wants this tick skipped.

    Returns ``False`` straight away when the tracker has no registered cap,
    no recorded state, or is running at the base rate. In 1-in-K mode the
    tick counter is advanced and only every K-th tick is allowed through.
    """
    if tracker_id not in _adaptive_max_skip:
        return False
    state = _adaptive_state.get(tracker_id)
    if not state:
        return False
    period = state.get("skip_every", 1)
    if period <= 1:
        return False
    ticks = state.get("tick_counter", 0) + 1
    state["tick_counter"] = ticks
    # Ticks that are a multiple of the period run; all others are skipped.
    return ticks % period != 0
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
    """Feed the result of a real poll into the adaptive back-off state.

    Does nothing for trackers without an effective cap, so no counters pile
    up for trackers that never back off. Any detected event restores the
    base rate; otherwise the empty-poll count grows and, on crossing a
    threshold, the skip factor is raised (never above ``min(cap, 4)``).
    """
    cap = _adaptive_max_skip.get(tracker_id)
    if not cap or cap <= 1:
        return
    state = _adaptive_state.setdefault(
        tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
    )

    if events_detected > 0:
        if state["skip_every"] > 1:
            _LOGGER.info(
                "Adaptive polling: tracker %d saw activity, restoring base rate",
                tracker_id,
            )
        state.update(empty_count=0, skip_every=1, tick_counter=0)
        return

    idle_polls = state.get("empty_count", 0) + 1
    state["empty_count"] = idle_polls
    current_factor = state["skip_every"]
    quarter_factor = min(cap, 4)
    halve_factor = min(cap, 2)

    if idle_polls >= _ADAPTIVE_QUARTER_THRESHOLD and current_factor < quarter_factor:
        state["skip_every"] = quarter_factor
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping %d of %d",
            tracker_id, idle_polls,
            quarter_factor - 1, quarter_factor,
        )
    elif idle_polls >= _ADAPTIVE_HALVE_THRESHOLD and current_factor < halve_factor:
        state["skip_every"] = halve_factor
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping every other",
            tracker_id, idle_polls,
        )
def reset_adaptive_state(tracker_id: int) -> None:
    """Forget the adaptive counters cached for a tracker.

    Intended for API callers whose change should make the tracker run on its
    next scheduled tick (enable/disable, config edits, manual "check now").
    The configured cap is left in place; clear that with
    ``set_adaptive_max_skip(..., None)``.
    """
    if tracker_id in _adaptive_state:
        del _adaptive_state[tracker_id]
async def _poll_tracker(tracker_id: int) -> None:
    """Scheduler entry point: run one poll for a tracker unless backed off.

    Exceptions from ``check_tracker`` are logged and swallowed so that a
    failing tracker cannot take the job down.
    """
    from .watcher import check_tracker

    if _adaptive_should_skip(tracker_id):
        return
    try:
        outcome = await check_tracker(tracker_id)
    except Exception as e:
        _LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
        return
    # Only a clean "ok" result feeds the heuristic. "error" / "skipped" are
    # inconclusive, and a transient upstream failure must not cause back-off.
    if not isinstance(outcome, dict) or outcome.get("status") != "ok":
        return
    detected = outcome.get("events_detected", 0) or 0
    _adaptive_update(tracker_id, int(detected))
# ---------------------------------------------------------------------------
# Action scheduling
# ---------------------------------------------------------------------------
async def _load_action_jobs() -> None:
    """Register an execution job for every enabled action lacking one.

    Actions with ``schedule_type == "cron"`` and a non-empty crontab get a
    cron job in the app timezone. All others — and cron actions whose
    expression is rejected — run on ``schedule_interval`` seconds.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from ..database.engine import get_engine
    from ..database.models import Action

    engine = get_engine()
    scheduler = get_scheduler()
    async with AsyncSession(engine) as session:
        enabled_actions = (await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )).all()
    tz = await _load_app_timezone()

    for action in enabled_actions:
        job_id = f"action_{action.id}"
        if scheduler.get_job(job_id):
            continue

        if action.schedule_type == "cron" and action.schedule_cron:
            try:
                from apscheduler.triggers.cron import CronTrigger
                scheduler.add_job(
                    _run_action,
                    CronTrigger.from_crontab(action.schedule_cron, timezone=tz),
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                _LOGGER.info(
                    "Scheduled action %d (%s) with cron: %s [tz=%s]",
                    action.id, action.name, action.schedule_cron, tz.key,
                )
            except Exception as e:
                _LOGGER.error(
                    "Invalid cron for action %d (%s): %s — falling back to interval",
                    action.id, action.name, e,
                )
            else:
                continue

        scheduler.add_job(
            _run_action,
            "interval",
            seconds=action.schedule_interval,
            id=job_id,
            args=[action.id],
            replace_existing=True,
        )
        _LOGGER.info(
            "Scheduled action %d (%s) every %ds",
            action.id, action.name, action.schedule_interval,
        )
async def schedule_action(
    action_id: int,
    schedule_type: str = "interval",
    interval: int = 3600,
    cron_expression: str = "",
) -> None:
    """Create or replace the execution job for a single action.

    Args:
        action_id: Action whose job is (re)built.
        schedule_type: ``"cron"`` selects the crontab path; anything else
            uses the interval.
        interval: Period in seconds for the interval trigger.
        cron_expression: Crontab used when ``schedule_type`` is ``"cron"``.
            If empty or rejected, the interval is used instead.
    """
    scheduler = get_scheduler()
    job_id = f"action_{action_id}"
    if scheduler.get_job(job_id) is not None:
        scheduler.remove_job(job_id)

    wants_cron = schedule_type == "cron" and bool(cron_expression)
    if wants_cron:
        try:
            from apscheduler.triggers.cron import CronTrigger
            tz = await _load_app_timezone()
            scheduler.add_job(
                _run_action,
                CronTrigger.from_crontab(cron_expression, timezone=tz),
                id=job_id,
                args=[action_id],
                replace_existing=True,
            )
            _LOGGER.info(
                "Scheduled action %d with cron: %s [tz=%s]",
                action_id, cron_expression, tz.key,
            )
        except Exception as e:
            _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e)
        else:
            return

    scheduler.add_job(
        _run_action,
        "interval",
        seconds=interval,
        id=job_id,
        args=[action_id],
        replace_existing=True,
    )
    _LOGGER.info("Scheduled action %d every %ds", action_id, interval)
async def unschedule_action(action_id: int) -> None:
    """Drop the execution job of an action, if one is registered."""
    scheduler = get_scheduler()
    job_id = f"action_{action_id}"
    if scheduler.get_job(job_id) is None:
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled action %d", action_id)
async def reschedule_cron_jobs_for_timezone_change() -> None:
    """Re-add every cron-triggered tracker/action job under the new app timezone.

    Called by the admin settings endpoint after the ``timezone`` AppSetting is
    updated. A ``CronTrigger`` keeps the timezone it was constructed with, so
    a timezone change has no effect on jobs already in the scheduler — those
    jobs have to be rebuilt. Interval-triggered jobs are tz-agnostic and are
    left alone.

    Existing jobs are replaced in place (``replace_existing=True``) and are
    never removed up front. If a crontab is rejected, the entity keeps
    whatever job it had — typically the interval fallback installed at load
    time — instead of being left with no job at all.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from apscheduler.triggers.cron import CronTrigger
    from ..database.engine import get_engine
    from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()
    rescheduled = 0
    async with AsyncSession(engine) as session:
        # Trackers with cron scheduling (scheduler provider + schedule_type=cron).
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            rows = await session.exec(
                select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids))
            )
            provider_types = {p.id: p.type for p in rows.all()}
        for tracker in trackers:
            if provider_types.get(tracker.provider_id) != "scheduler":
                continue
            filters = tracker.filters or {}
            if filters.get("schedule_type") != "cron":
                continue
            cron_expr = filters.get("cron_expression", "")
            if not cron_expr:
                continue
            job_id = f"tracker_{tracker.id}"
            try:
                # _add_cron_job parses the crontab before touching the
                # scheduler, so a failure leaves the current job untouched.
                _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for tracker %d on tz change: %s",
                    tracker.id, e,
                )

        # Actions with cron schedules.
        actions = (await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )).all()
        for action in actions:
            if action.schedule_type != "cron" or not action.schedule_cron:
                continue
            job_id = f"action_{action.id}"
            try:
                # Build the trigger first: if parsing fails nothing has been
                # replaced yet and the existing job stays in place.
                trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
                scheduler.add_job(
                    _run_action,
                    trigger,
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for action %d on tz change: %s",
                    action.id, e,
                )
    _LOGGER.info(
        "Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
    )
    # Immich scheduled/periodic/memory jobs are also CronTrigger-based and
    # carry the same frozen-tz problem — rebuild them under the new tz.
    await reschedule_immich_dispatch_jobs()
async def _run_action(action_id: int) -> None:
    """Scheduler entry point: execute an action with the "scheduled" trigger.

    Failures are logged and swallowed so the job stays registered.
    """
    from .action_runner import run_action

    try:
        await run_action(action_id, trigger="scheduled")
    except Exception as err:
        _LOGGER.error("Error running action %d: %s", action_id, err)
# ---------------------------------------------------------------------------
# Immich scheduled / periodic / memory dispatch (cron-fired)
#
# These three slots fire on wall-clock schedules taken from the tracker's
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
# app-level IANA timezone. The dispatch flow lives in
# ``services.scheduled_dispatch``; this section just owns scheduling.
# ---------------------------------------------------------------------------
# Slot names; each one maps to ``<kind>_enabled`` / ``<kind>_times`` on the
# tracking config.
_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
# Common job-id prefix, used to find these jobs again when rebuilding them.
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"
async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
    """Scheduler entry point for one Immich slot firing.

    Delegates to ``dispatch_scheduled_for_tracker`` and logs — rather than
    raises — any failure.
    """
    from .scheduled_dispatch import dispatch_scheduled_for_tracker

    try:
        await dispatch_scheduled_for_tracker(tracker_id, kind)  # type: ignore[arg-type]
    except Exception as exc:  # noqa: BLE001
        _LOGGER.error(
            "Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, exc,
        )
async def _load_immich_dispatch_jobs() -> None:
    """Register one cron job per (tracker, kind, time) for enabled Immich slots.

    The schedule comes from each enabled Immich tracker's *default* tracking
    config. Per-link overrides only gate the dispatch itself (handled in
    ``scheduled_dispatch``) and do not affect when jobs fire.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from apscheduler.triggers.cron import CronTrigger
    from ..database.engine import get_engine
    from ..database.models import (
        NotificationTracker,
        ServiceProvider as ServiceProviderModel,
        TrackingConfig,
    )

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()

    async with AsyncSession(engine) as session:
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        if not trackers:
            return

        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            provider_rows = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in provider_rows.all()}

        tc_ids = list({
            t.default_tracking_config_id for t in trackers
            if t.default_tracking_config_id
        })
        tc_map: dict[int, TrackingConfig] = {}
        if tc_ids:
            config_rows = await session.exec(
                select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
            )
            tc_map = {tc.id: tc for tc in config_rows.all()}

    scheduled = 0
    for tracker in trackers:
        if provider_types.get(tracker.provider_id) != "immich":
            continue
        config_id = tracker.default_tracking_config_id
        tc = tc_map.get(config_id) if config_id else None
        if tc is None:
            continue

        for kind in _IMMICH_DISPATCH_KINDS:
            if not getattr(tc, f"{kind}_enabled", False):
                continue
            times_raw = getattr(tc, f"{kind}_times", "") or ""
            parsed = parse_hhmm_list(times_raw)

            # Surface bad or legacy data: the lenient parser drops tokens it
            # cannot read without saying so, and an enabled slot with no
            # usable times would otherwise silently never fire.
            token_count = sum(1 for piece in times_raw.split(",") if piece.strip())
            dropped = token_count - len(parsed)
            if dropped > 0:
                _LOGGER.warning(
                    "Tracker %d %s: dropped %d unparseable time(s) from %r",
                    tracker.id, kind, dropped, times_raw,
                )
            if not parsed:
                _LOGGER.warning(
                    "Tracker %d has %s enabled but no valid fire times (%r); "
                    "slot will not fire",
                    tracker.id, kind, times_raw,
                )
                continue

            for hour, minute in parsed:
                job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}"
                scheduler.add_job(
                    _run_immich_dispatch,
                    CronTrigger(hour=hour, minute=minute, timezone=tz),
                    id=job_id,
                    args=[tracker.id, kind],
                    replace_existing=True,
                    max_instances=1,
                )
                scheduled += 1
                _LOGGER.info(
                    "Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
                    kind, tracker.id, hour, minute, tz.key,
                )

    if scheduled:
        _LOGGER.info(
            "Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
            scheduled, tz.key,
        )
async def reschedule_immich_dispatch_jobs() -> None:
    """Rebuild every Immich scheduled/periodic/memory job from scratch.

    Removes each job whose id starts with ``_IMMICH_DISPATCH_PREFIX`` and then
    lets ``_load_immich_dispatch_jobs`` register them again.

    Cheap to call on every relevant mutation — a typical install has only a
    handful of trackers. Called from the tracker, link, and tracking-config
    CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``.
    """
    scheduler = get_scheduler()
    # Snapshot the matching ids first, then remove them one by one.
    stale_ids = [
        job.id
        for job in scheduler.get_jobs()
        if job.id.startswith(_IMMICH_DISPATCH_PREFIX)
    ]
    for stale_id in stale_ids:
        scheduler.remove_job(stale_id)
    await _load_immich_dispatch_jobs()
# ---------------------------------------------------------------------------
# Scheduled backup
# ---------------------------------------------------------------------------
# Fixed APScheduler job id for the scheduled backup. ``_load_backup_job``,
# ``schedule_backup`` and ``unschedule_backup`` all address the job through it.
_BACKUP_JOB_ID = "scheduled_backup"
async def _load_backup_job() -> None:
    """Register the scheduled backup job when it is enabled in settings.

    Reads the ``backup_scheduled_enabled`` and
    ``backup_scheduled_interval_hours`` rows from ``AppSetting``. Nothing is
    scheduled unless the enabled value is exactly ``"true"``. A missing or
    empty interval defaults to 24 hours; a value that is not an integer, or is
    below 1, is logged and replaced by the same default so that one bad
    setting cannot abort the load or produce a degenerate interval.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..database.engine import get_engine
    from ..database.models import AppSetting

    default_interval_hours = 24

    engine = get_engine()
    async with _AS(engine) as session:
        enabled_row = await session.get(AppSetting, "backup_scheduled_enabled")
        interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours")
        # Copy the plain values out while the session is still open.
        enabled_value = enabled_row.value if enabled_row else None
        interval_value = interval_row.value if interval_row else None

    if enabled_value != "true":
        return

    interval_hours = default_interval_hours
    if interval_value:
        try:
            parsed = int(interval_value)
        except (TypeError, ValueError):
            parsed = None
        if parsed is not None and parsed >= 1:
            interval_hours = parsed
        else:
            # Stored settings are plain strings; do not let a malformed or
            # non-positive value crash the load or reach the interval trigger.
            _LOGGER.warning(
                "Invalid backup_scheduled_interval_hours %r; falling back to %dh",
                interval_value, default_interval_hours,
            )

    scheduler = get_scheduler()
    scheduler.add_job(
        _run_scheduled_backup,
        "interval",
        hours=interval_hours,
        id=_BACKUP_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled backup every %dh", interval_hours)
async def schedule_backup(interval_hours: int = 24) -> None:
    """Install the scheduled backup job, replacing any job already registered.

    Args:
        interval_hours: Number of hours between backup runs.
    """
    scheduler = get_scheduler()

    # Drop the previous registration first so the new interval takes over.
    previous = scheduler.get_job(_BACKUP_JOB_ID)
    if previous:
        scheduler.remove_job(_BACKUP_JOB_ID)

    scheduler.add_job(
        _run_scheduled_backup,
        trigger="interval",
        id=_BACKUP_JOB_ID,
        hours=interval_hours,
        max_instances=1,
        replace_existing=True,
    )
    _LOGGER.info("Scheduled backup every %dh", interval_hours)
async def unschedule_backup() -> None:
    """Remove the scheduled backup job if one is currently registered."""
    scheduler = get_scheduler()
    if not scheduler.get_job(_BACKUP_JOB_ID):
        # Nothing registered under the backup id; nothing to remove.
        return
    scheduler.remove_job(_BACKUP_JOB_ID)
    _LOGGER.info("Unscheduled backup job")
# ---------------------------------------------------------------------------
# Deferred-dispatch drain
# ---------------------------------------------------------------------------
#
# When ``defer_event`` enqueues a quiet-hours notification, the calling site
# asks us to add a one-shot ``date`` job at ``quiet_hours_end_at``. We key the
# job id by the end time at one-second resolution so multiple defers that share
# the exact same window-end share a single drain job (idempotent via
# ``replace_existing``).
#
# At fire time the job runs ``drain_deferred_due`` which scans all pending
# rows and dispatches whatever is ready.
#
# A periodic catch-up scan runs every ``_DRAIN_CATCHUP_INTERVAL_SECONDS`` as
# the safety net for failure modes the one-shot job can't cover:
# * APScheduler's misfire grace exceeded (event loop blocked past fire_at;
# the date job is silently discarded by the scheduler)
# * Process killed between the deferred-row DB commit and the
# ``schedule_deferred_drain`` call — row exists, job doesn't
# * Clock drift / DST seam edge cases
# Id prefix of the one-shot drain jobs; ``_drain_job_id_for`` appends the
# formatted fire time to it.
_DEFERRED_DRAIN_PREFIX = "deferred_drain_"
# Job id of the periodic catch-up scan installed by ``_schedule_drain_catchup``.
# NOTE(review): this id also starts with ``_DEFERRED_DRAIN_PREFIX``, so any
# prefix-based job filter would match the catch-up job as well — confirm
# that is intended wherever such a filter is used.
_DEFERRED_DRAIN_CATCHUP_JOB = "deferred_drain_catchup"
# Misfire grace (in seconds) passed to each one-shot drain job.
# Generous so a temporarily-blocked event loop doesn't make the scheduler
# discard our drain job. Once discarded the deferred rows would wait for the
# next process restart or the catch-up scan below — survivable but visibly
# late from the user's perspective.
_DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS = 3600
# Period (in seconds) of the catch-up scan.
# 5 min trade-off between "promptness of late delivery" and "extra DB churn".
# The scan is a single indexed lookup on (status, fire_at).
_DRAIN_CATCHUP_INTERVAL_SECONDS = 300
def _drain_job_id_for(fire_at_utc: datetime) -> str:
    """Build the APScheduler job id of the drain job for ``fire_at_utc``.

    The id is ``_DEFERRED_DRAIN_PREFIX`` followed by the timestamp rendered as
    ``YYYYmmddHHMMSS``.
    """
    # Second-level resolution is deliberate: two trackers whose quiet windows
    # end in the same minute but at different seconds (e.g. user-set 06:00:00
    # vs 06:00:30) would otherwise share one APScheduler job id, and
    # ``replace_existing`` would silently drop the second one.
    stamp = fire_at_utc.strftime("%Y%m%d%H%M%S")
    return f"{_DEFERRED_DRAIN_PREFIX}{stamp}"
def schedule_deferred_drain(fire_at_utc: datetime) -> None:
    """Register a one-shot drain job for ``fire_at_utc``.

    The job id is derived from the fire time and the job is added with
    ``replace_existing=True``, so repeated calls for the same time keep a
    single job. A naive ``fire_at_utc`` is treated as UTC.

    A fire time that is already due is scheduled one second from now instead —
    the drain query handles ``fire_at <= now`` regardless of which job fired,
    so a near-miss still picks up the work.
    """
    from datetime import datetime, timedelta, timezone

    utc = timezone.utc
    if fire_at_utc.tzinfo is None:
        fire_at_utc = fire_at_utc.replace(tzinfo=utc)

    scheduler = get_scheduler()
    job_id = _drain_job_id_for(fire_at_utc)

    if fire_at_utc <= datetime.now(utc):
        # Already due: fire almost immediately rather than at a past instant.
        run_at = datetime.now(utc) + timedelta(seconds=1)
    else:
        run_at = fire_at_utc

    scheduler.add_job(
        _run_deferred_drain,
        trigger="date",
        run_date=run_at,
        args=[fire_at_utc.isoformat()],
        id=job_id,
        max_instances=1,
        replace_existing=True,
        # Override the global 5-min grace — see module-level comment.
        misfire_grace_time=_DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS,
    )
    _LOGGER.debug("Scheduled deferred drain %s (fire_at=%s)", job_id, fire_at_utc.isoformat())
def _schedule_drain_catchup() -> None:
    """Register the periodic deferred-dispatch catch-up scan.

    Does nothing when a job with id ``_DEFERRED_DRAIN_CATCHUP_JOB`` is already
    registered. Otherwise adds ``_run_deferred_drain_catchup`` on an interval
    of ``_DRAIN_CATCHUP_INTERVAL_SECONDS`` seconds.
    """
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    if not scheduler.get_job(_DEFERRED_DRAIN_CATCHUP_JOB):
        every = IntervalTrigger(seconds=_DRAIN_CATCHUP_INTERVAL_SECONDS)
        scheduler.add_job(
            _run_deferred_drain_catchup,
            trigger=every,
            id=_DEFERRED_DRAIN_CATCHUP_JOB,
            coalesce=True,
            max_instances=1,
            replace_existing=True,
        )
        _LOGGER.info(
            "Scheduled deferred-dispatch catch-up scan every %ds",
            _DRAIN_CATCHUP_INTERVAL_SECONDS,
        )
async def _run_deferred_drain(fire_at_iso: str) -> None:
    """Drain due deferred rows; APScheduler entry point for the one-shot job.

    Args:
        fire_at_iso: ISO string of the fire time the job was scheduled for.
            It only appears in log lines; ``drain_deferred_due`` is called
            without arguments.

    Any exception is logged with its traceback and not re-raised.
    """
    from .deferred_dispatch import drain_deferred_due

    try:
        result = await drain_deferred_due()
        _LOGGER.info("Deferred drain (fire_at=%s) stats: %s", fire_at_iso, result)
    except Exception as exc:  # noqa: BLE001
        _LOGGER.exception("Deferred drain (fire_at=%s) failed: %s", fire_at_iso, exc)
async def _run_deferred_drain_catchup() -> None:
    """Run the periodic safety-net drain; APScheduler entry point.

    Calls the same ``drain_deferred_due`` as the per-fire-at job; only the
    cadence and the log line differ. Any exception is logged with its
    traceback and not re-raised.
    """
    from .deferred_dispatch import drain_deferred_due

    try:
        result = await drain_deferred_due()
        noteworthy = any(result.get(key) for key in ("fired", "dropped", "errors"))
        # Stay at debug level when nothing happened — every 5 min is too
        # noisy at info on an idle system.
        level = logging.INFO if noteworthy else logging.DEBUG
        _LOGGER.log(level, "Deferred catch-up stats: %s", result)
    except Exception as exc:  # noqa: BLE001
        _LOGGER.exception("Deferred catch-up drain failed: %s", exc)
async def _run_scheduled_backup() -> None:
    """Run a scheduled backup (called by APScheduler).

    Reads ``backup_secrets_mode`` and ``backup_retention_count`` from
    ``AppSetting`` (defaults: ``SecretsMode.EXCLUDE`` and 5), exports a backup
    into ``<data_dir>/backups`` using the id of the first admin user returned
    by the query, then calls ``cleanup_old_backups`` with ``keep=retention``.
    When no admin user exists the run is skipped with a warning.

    Any failure is logged together with its traceback and swallowed, matching
    the other APScheduler entry points in this module.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..database.engine import get_engine
    from ..database.models import AppSetting, User
    from ..config import settings as app_config
    from .backup_schema import SecretsMode
    from .backup_service import export_backup_to_file, cleanup_old_backups

    try:
        engine = get_engine()
        async with _AS(engine) as session:
            # Read settings, falling back to defaults for missing/empty rows.
            secrets_row = await session.get(AppSetting, "backup_secrets_mode")
            retention_row = await session.get(AppSetting, "backup_retention_count")
            secrets_mode = (
                SecretsMode(secrets_row.value)
                if secrets_row and secrets_row.value
                else SecretsMode.EXCLUDE
            )
            retention = int(retention_row.value) if retention_row and retention_row.value else 5

            # Find admin user (first admin) for ownership context.
            admin_result = await session.exec(
                select(User).where(User.role == "admin")
            )
            admin = admin_result.first()
            if not admin:
                _LOGGER.warning("No admin user found, skipping scheduled backup")
                return

            backup_dir = app_config.data_dir / "backups"
            await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)

        # Cleanup outside the session.
        cleanup_old_backups(backup_dir, keep=retention)
    except Exception as err:  # noqa: BLE001
        # ``exception`` keeps the traceback; a bare message is rarely enough
        # to diagnose a failed unattended backup.
        _LOGGER.exception("Scheduled backup failed: %s", err)
# --- Release-check probe -----------------------------------------------------
# Job id of the recurring release-check interval job.
_RELEASE_CHECK_JOB_ID = "upstream_release_check"
# Job id of the one-shot release-check probe queued alongside the interval job.
_RELEASE_CHECK_ONESHOT_JOB_ID = "upstream_release_check_oneshot"
# Delay, in seconds, between scheduling and the one-shot probe's run time.
_RELEASE_CHECK_ONESHOT_DELAY_SECONDS = 30
async def _schedule_release_check() -> None:
    """Register the recurring release-check job plus a one-shot probe.

    The interval is read from the ``release_check_interval_hours`` setting and
    passed through ``parse_interval_hours``. Idempotent — both jobs are added
    with ``replace_existing=True``, so APScheduler de-dupes them by id.
    """
    from datetime import datetime, timedelta, timezone

    from apscheduler.triggers.interval import IntervalTrigger
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..api.app_settings import get_setting
    from ..database.engine import get_engine
    from .release_check import parse_interval_hours, run_check

    async with AsyncSession(get_engine()) as session:
        raw = await get_setting(session, "release_check_interval_hours")
    interval_hours = parse_interval_hours(raw)

    scheduler = get_scheduler()
    shared_options = {"replace_existing": True, "max_instances": 1}

    scheduler.add_job(
        run_check,
        IntervalTrigger(hours=interval_hours),
        id=_RELEASE_CHECK_JOB_ID,
        **shared_options,
    )

    # One-shot probe shortly after start so admins see a fresh status without
    # waiting for the first interval tick. Mirrors the chat-title sync.
    first_probe_at = datetime.now(timezone.utc) + timedelta(
        seconds=_RELEASE_CHECK_ONESHOT_DELAY_SECONDS
    )
    scheduler.add_job(
        run_check,
        trigger="date",
        run_date=first_probe_at,
        id=_RELEASE_CHECK_ONESHOT_JOB_ID,
        **shared_options,
    )

    _LOGGER.info(
        "Scheduled release-check every %sh (one-shot in %ss)",
        interval_hours,
        _RELEASE_CHECK_ONESHOT_DELAY_SECONDS,
    )
# ---------------------------------------------------------------------------
# Bridge self-monitoring — deferred-backlog scan
# ---------------------------------------------------------------------------
# Job id of the periodic bridge_self deferred-backlog scan.
_BRIDGE_SELF_BACKLOG_JOB_ID = "bridge_self_deferred_backlog_scan"
# Period (in seconds) of that scan.
# 5 min trade-off between "operator notices the backlog quickly" and "extra
# DB churn on a quiet system". The scan is one indexed GROUP BY query.
_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS = 300
def _schedule_bridge_self_backlog_scan() -> None:
    """Register the periodic deferred-backlog scan for bridge_self.

    Does nothing when a job with id ``_BRIDGE_SELF_BACKLOG_JOB_ID`` is already
    registered. Otherwise adds ``_run_bridge_self_backlog_scan`` on an interval
    of ``_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS`` seconds.
    """
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    if not scheduler.get_job(_BRIDGE_SELF_BACKLOG_JOB_ID):
        every = IntervalTrigger(seconds=_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS)
        scheduler.add_job(
            _run_bridge_self_backlog_scan,
            trigger=every,
            id=_BRIDGE_SELF_BACKLOG_JOB_ID,
            coalesce=True,
            max_instances=1,
            replace_existing=True,
        )
        _LOGGER.info(
            "Scheduled bridge_self deferred-backlog scan every %ds",
            _BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS,
        )
async def _run_bridge_self_backlog_scan() -> None:
    """Run the bridge_self deferred-backlog check; APScheduler entry point.

    Logs the returned stats at info level when they report ``crossings`` and
    at debug level otherwise. Any exception is logged with its traceback and
    not re-raised.
    """
    from .bridge_self import check_deferred_backlog

    try:
        result = await check_deferred_backlog()
        level = logging.INFO if result.get("crossings") else logging.DEBUG
        _LOGGER.log(level, "bridge_self backlog scan stats: %s", result)
    except Exception as exc:  # noqa: BLE001
        _LOGGER.exception("bridge_self backlog scan failed: %s", exc)
async def reschedule_release_check() -> None:
    """Re-arm the release-check jobs after settings changed.

    Called from the PUT /settings handler when the interval or provider config
    changes. Removes the interval job and the one-shot job if they exist, then
    lets ``_schedule_release_check`` re-read the setting and register both
    again, so the new config takes effect within seconds rather than at the
    next interval tick.
    """
    scheduler = get_scheduler()
    for job_id in (_RELEASE_CHECK_JOB_ID, _RELEASE_CHECK_ONESHOT_JOB_ID):
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)
    await _schedule_release_check()