309dec2b44
The scheduled_enabled / scheduled_times (and the periodic / memory counterparts) on TrackingConfig had been wired into the model, the API, and the test-dispatch path — but no production scheduler ever read them, so users saw the slot in the UI and only ever got fires through "Test". This adds the missing cron jobs and the dispatch fan-out, both keyed off the app-level IANA timezone. * services/scheduled_dispatch.py — production fan-out reusing the test-path event builders, picking the slot template per kind, and writing an EventLog row per fire so the dashboard reflects it. * services/scheduler.py — _load_immich_dispatch_jobs builds one CronTrigger per (tracker, kind, HH:MM) from the tracker's default TrackingConfig; reschedule_immich_dispatch_jobs rebuilds them all on any relevant CRUD or timezone change. * tracker / link / tracking-config CRUD endpoints now invalidate. Also: skip dispatch when scheduled/memory yield zero matching assets (prevents header-only "On this day:" spam), and update the EN/RU default scheduled_assets templates to surface that the delivery is a scheduled random selection.
1025 lines
36 KiB
Python
1025 lines
36 KiB
Python
"""APScheduler-based polling scheduler for trackers and actions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo:
|
|
"""Resolve an IANA tz string to a ZoneInfo, falling back to UTC on any error.
|
|
|
|
Kept local to avoid importing from api/dispatch layers inside the scheduler
|
|
module (which is loaded at startup, before the API routers).
|
|
"""
|
|
if not tz_name:
|
|
return ZoneInfo("UTC")
|
|
try:
|
|
return ZoneInfo(tz_name)
|
|
except (ZoneInfoNotFoundError, ValueError):
|
|
_LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name)
|
|
return ZoneInfo("UTC")
|
|
|
|
|
|
async def _load_app_timezone() -> ZoneInfo:
|
|
"""Load the admin-configured app timezone from AppSetting (falls back to UTC)."""
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from ..api.app_settings import get_setting
|
|
from ..database.engine import get_engine
|
|
|
|
async with AsyncSession(get_engine()) as session:
|
|
tz_name = await get_setting(session, "timezone")
|
|
return _resolve_zoneinfo(tz_name)
|
|
|
|
_scheduler: AsyncIOScheduler | None = None
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adaptive polling (Tier 6 of the big-album optimization plan).
|
|
#
|
|
# We don't touch the user-configured ``scan_interval`` — that's still the
|
|
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
|
|
# ticks when a tracker is idle, and reset to 1:1 as soon as it detects
|
|
# anything. The scheduler keeps running on the user's chosen period, so
|
|
# response time to the *first* change after an idle stretch is never worse
|
|
# than one tick — but the steady-state HTTP cost for a fleet of idle
|
|
# trackers drops by ~75%.
|
|
#
|
|
# Thresholds are intentionally conservative: a tracker polling every 30 s
|
|
# needs 5 min of silence before we halve its effective rate, and 15 min
|
|
# before we quarter it. Any caller can disable adaptive behavior by passing
|
|
# ``adaptive=False`` in the tracker filters dict (checked in ``_poll_tracker``).
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2
|
|
_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4
|
|
_ADAPTIVE_MAX_SKIP = 4 # hard cap on skip factor
|
|
|
|
# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
|
|
# restart — a short warmup period is fine and avoids persisting what is
|
|
# effectively a performance heuristic.
|
|
_adaptive_state: dict[int, dict[str, int]] = {}
|
|
|
|
|
|
def _compute_jitter(interval_seconds: int) -> int:
|
|
"""Return a jitter bound (in seconds) suitable for an IntervalTrigger.
|
|
|
|
Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up
|
|
at the same wall-clock second every minute — that creates a thundering-
|
|
herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter``
|
|
randomizes each tick's firing time by ±jitter seconds.
|
|
|
|
We use a quarter of the interval up to a 30 s cap. For short intervals
|
|
(≤8 s) jitter would round to 0 — that's fine, at those cadences a
|
|
bursty pattern is what the user implicitly opted into.
|
|
"""
|
|
if interval_seconds <= 0:
|
|
return 0
|
|
return min(interval_seconds // 4, 30)
|
|
|
|
|
|
def get_scheduler() -> AsyncIOScheduler:
|
|
global _scheduler
|
|
if _scheduler is None:
|
|
# Sensible production defaults applied to every job unless overridden:
|
|
# * coalesce — collapse a queue of missed runs into one firing after
|
|
# a restart / pause, instead of bursting to catch up.
|
|
# * misfire_grace_time — accept firings up to 5 min late without
|
|
# dropping them silently.
|
|
# * max_instances=1 — never run two copies of the same tracker tick
|
|
# concurrently; the scheduler already enforces this on add_job,
|
|
# but we also set it as the default for safety.
|
|
_scheduler = AsyncIOScheduler(
|
|
job_defaults={
|
|
"coalesce": True,
|
|
"misfire_grace_time": 300,
|
|
"max_instances": 1,
|
|
},
|
|
)
|
|
return _scheduler
|
|
|
|
|
|
async def start_scheduler() -> None:
|
|
scheduler = get_scheduler()
|
|
if not scheduler.running:
|
|
scheduler.start()
|
|
_LOGGER.info("Scheduler started")
|
|
|
|
await _load_tracker_jobs()
|
|
await _load_action_jobs()
|
|
await _load_immich_dispatch_jobs()
|
|
|
|
# Start Telegram bot polling for bots with active command listeners
|
|
from .telegram_poller import start_command_listener_polling
|
|
await start_command_listener_polling()
|
|
|
|
# Schedule daily cleanup of old event log entries
|
|
_schedule_event_cleanup()
|
|
|
|
# Schedule periodic Telegram chat title refresh
|
|
_schedule_telegram_chat_sync()
|
|
|
|
# Start debounced command auto-sync scheduler
|
|
from .command_sync import start_sync_scheduler
|
|
start_sync_scheduler()
|
|
|
|
# Load scheduled backup job if enabled
|
|
await _load_backup_job()
|
|
|
|
|
|
def _schedule_event_cleanup() -> None:
|
|
"""Schedule a daily job to delete EventLog entries older than 90 days."""
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
scheduler = get_scheduler()
|
|
job_id = "cleanup_old_events"
|
|
if scheduler.get_job(job_id):
|
|
return
|
|
scheduler.add_job(
|
|
_cleanup_old_events,
|
|
CronTrigger(hour=3, minute=0),
|
|
id=job_id,
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
_LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")
|
|
|
|
|
|
# Chat-title refresh tuning.
|
|
# Sweep runs daily as a fallback — we additionally refresh opportunistically
|
|
# on every incoming webhook/long-poll update (``save_chat_from_webhook``), so
|
|
# the sweep only catches chats that haven't sent anything recently.
|
|
_CHAT_SYNC_INTERVAL_HOURS = 24
|
|
_CHAT_SYNC_INITIAL_DELAY_SECONDS = 60
|
|
_CHAT_SYNC_CONCURRENCY = 10
|
|
|
|
|
|
def _schedule_telegram_chat_sync() -> None:
|
|
"""Schedule periodic refresh of Telegram chat titles via getChat."""
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
scheduler = get_scheduler()
|
|
job_id = "refresh_telegram_chat_titles"
|
|
if scheduler.get_job(job_id):
|
|
return
|
|
scheduler.add_job(
|
|
_refresh_telegram_chat_titles,
|
|
IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS),
|
|
id=job_id,
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
next_run_time=None,
|
|
)
|
|
# Fire once shortly after startup so stale names refresh without waiting a day.
|
|
from datetime import datetime, timedelta, timezone
|
|
scheduler.add_job(
|
|
_refresh_telegram_chat_titles,
|
|
"date",
|
|
run_date=datetime.now(timezone.utc) + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS),
|
|
id="refresh_telegram_chat_titles_once",
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled Telegram chat title refresh every %sh (concurrency %s)",
|
|
_CHAT_SYNC_INTERVAL_HOURS, _CHAT_SYNC_CONCURRENCY,
|
|
)
|
|
|
|
|
|
async def _refresh_telegram_chat_titles() -> None:
|
|
"""Refresh TelegramChat.title/username via getChat for all known chats.
|
|
|
|
Runs requests in bounded parallel (``_CHAT_SYNC_CONCURRENCY``) so a fleet
|
|
of 50 chats finishes in ~5 round-trips instead of 50. Telegram's
|
|
``getChat`` rate limit is well above 10 concurrent per bot, and the cap is
|
|
global across bots so we never flood the shared HTTP session.
|
|
"""
|
|
import asyncio
|
|
from collections import defaultdict
|
|
|
|
from sqlmodel import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from notify_bridge_core.notifications.telegram.client import TelegramClient
|
|
|
|
from ..database.engine import get_engine
|
|
from ..database.models import TelegramBot, TelegramChat
|
|
from .http_session import get_http_session
|
|
|
|
engine = get_engine()
|
|
async with AsyncSession(engine) as session:
|
|
bots = (await session.exec(select(TelegramBot))).all()
|
|
bot_tokens = {b.id: b.token for b in bots if b.token}
|
|
if not bot_tokens:
|
|
return
|
|
chats = (await session.exec(select(TelegramChat))).all()
|
|
|
|
by_bot: dict[int, list[TelegramChat]] = defaultdict(list)
|
|
for chat in chats:
|
|
if chat.bot_id in bot_tokens:
|
|
by_bot[chat.bot_id].append(chat)
|
|
if not by_bot:
|
|
return
|
|
|
|
http = await get_http_session()
|
|
clients_by_bot = {
|
|
bot_id: TelegramClient(http, token) for bot_id, token in bot_tokens.items()
|
|
}
|
|
|
|
sem = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY)
|
|
|
|
async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]:
|
|
"""Return (chat_row_id, info_dict_or_None, error_message_or_None)."""
|
|
async with sem:
|
|
try:
|
|
res = await clients_by_bot[bot_id].get_chat(chat.chat_id)
|
|
except Exception as err: # noqa: BLE001
|
|
return chat.id, None, str(err)
|
|
if not res.get("success"):
|
|
return chat.id, None, res.get("error") or "unknown"
|
|
return chat.id, (res.get("result") or {}), None
|
|
|
|
tasks = [
|
|
_fetch(bot_id, chat)
|
|
for bot_id, bot_chats in by_bot.items()
|
|
for chat in bot_chats
|
|
]
|
|
results = await asyncio.gather(*tasks)
|
|
|
|
refreshed = 0
|
|
errors = 0
|
|
# Bucket results first, then fetch all rows in one IN-query instead of
|
|
# per-row ``session.get`` — otherwise a 50-chat fleet issues 50 extra
|
|
# SELECTs before commit.
|
|
successes: dict[int, dict] = {}
|
|
for chat_id, info, err in results:
|
|
if err is not None or info is None:
|
|
errors += 1
|
|
if err:
|
|
_LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err)
|
|
continue
|
|
if chat_id is not None:
|
|
successes[chat_id] = info
|
|
async with AsyncSession(engine) as session:
|
|
if successes:
|
|
rows = (await session.exec(
|
|
select(TelegramChat).where(TelegramChat.id.in_(list(successes.keys())))
|
|
)).all()
|
|
for merged in rows:
|
|
info = successes.get(merged.id)
|
|
if not info:
|
|
continue
|
|
title = info.get("title") or (
|
|
(info.get("first_name", "") + " " + info.get("last_name", "")).strip()
|
|
)
|
|
changed = False
|
|
if title and merged.title != title:
|
|
merged.title = title
|
|
changed = True
|
|
new_username = info.get("username")
|
|
if new_username is not None and merged.username != new_username:
|
|
merged.username = new_username
|
|
changed = True
|
|
if changed:
|
|
session.add(merged)
|
|
refreshed += 1
|
|
await session.commit()
|
|
_LOGGER.info(
|
|
"Telegram chat title refresh: %s updated, %s errors", refreshed, errors
|
|
)
|
|
|
|
|
|
async def _cleanup_old_events() -> None:
|
|
"""Delete EventLog / WebhookPayloadLog / ActionExecution rows older than the
|
|
configured retention window. A retention of 0 disables the job.
|
|
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlmodel import delete
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from ..config import settings
|
|
from ..database.engine import get_engine
|
|
from ..database.models import ActionExecution, EventLog, WebhookPayloadLog
|
|
|
|
days = settings.event_log_retention_days
|
|
if days <= 0:
|
|
_LOGGER.debug("Event log retention disabled (days=0); skipping cleanup")
|
|
return
|
|
|
|
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
|
engine = get_engine()
|
|
async with AsyncSession(engine) as session:
|
|
await session.exec(delete(EventLog).where(EventLog.created_at < cutoff))
|
|
await session.exec(
|
|
delete(WebhookPayloadLog).where(WebhookPayloadLog.created_at < cutoff)
|
|
)
|
|
await session.exec(
|
|
delete(ActionExecution).where(ActionExecution.started_at < cutoff)
|
|
)
|
|
await session.commit()
|
|
_LOGGER.info(
|
|
"Cleaned event_log / webhook_payload_log / action_execution older than %s",
|
|
cutoff.date(),
|
|
)
|
|
|
|
|
|
async def _load_tracker_jobs() -> None:
|
|
"""Load enabled trackers and schedule polling jobs."""
|
|
from sqlmodel import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
from ..database.engine import get_engine
|
|
from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel
|
|
|
|
engine = get_engine()
|
|
scheduler = get_scheduler()
|
|
|
|
async with AsyncSession(engine) as session:
|
|
result = await session.exec(select(NotificationTracker).where(NotificationTracker.enabled == True))
|
|
trackers = result.all()
|
|
|
|
# Batch-load provider types for scheduler detection
|
|
unique_provider_ids = list({t.provider_id for t in trackers})
|
|
provider_types: dict[int, str] = {}
|
|
if unique_provider_ids:
|
|
provider_result = await session.exec(
|
|
select(ServiceProviderModel).where(
|
|
ServiceProviderModel.id.in_(unique_provider_ids)
|
|
)
|
|
)
|
|
provider_types = {p.id: p.type for p in provider_result.all()}
|
|
|
|
tz = await _load_app_timezone()
|
|
|
|
for tracker in trackers:
|
|
job_id = f"tracker_{tracker.id}"
|
|
if scheduler.get_job(job_id):
|
|
continue
|
|
|
|
ptype = provider_types.get(tracker.provider_id, "")
|
|
filters = tracker.filters or {}
|
|
|
|
# Scheduler providers can use cron triggers
|
|
if ptype == "scheduler" and filters.get("schedule_type") == "cron":
|
|
cron_expr = filters.get("cron_expression", "")
|
|
if cron_expr:
|
|
try:
|
|
_add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
|
|
continue
|
|
except Exception as e:
|
|
_LOGGER.error(
|
|
"Invalid cron expression for tracker %d (%s): %s — falling back to interval",
|
|
tracker.id, tracker.name, e,
|
|
)
|
|
|
|
jitter = _compute_jitter(tracker.scan_interval)
|
|
scheduler.add_job(
|
|
_poll_tracker,
|
|
"interval",
|
|
seconds=tracker.scan_interval,
|
|
jitter=jitter or None,
|
|
id=job_id,
|
|
args=[tracker.id],
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled tracker %d (%s) every %ds (jitter ±%ds)",
|
|
tracker.id, tracker.name, tracker.scan_interval, jitter,
|
|
)
|
|
|
|
|
|
def _add_cron_job(
|
|
scheduler: AsyncIOScheduler,
|
|
job_id: str,
|
|
tracker_id: int,
|
|
cron_expression: str,
|
|
tracker_name: str,
|
|
tz: ZoneInfo,
|
|
) -> None:
|
|
"""Add a cron-triggered job for a scheduler-type tracker.
|
|
|
|
``tz`` is the user-configured app timezone; without it APScheduler
|
|
interprets the crontab in the host's local timezone, which surfaces as
|
|
events firing at the "wrong" wall-clock time for operators in a non-UTC
|
|
zone (see the companion fix in ``update_settings`` which reschedules on
|
|
timezone changes).
|
|
"""
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
|
|
scheduler.add_job(
|
|
_poll_tracker,
|
|
trigger,
|
|
id=job_id,
|
|
args=[tracker_id],
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled tracker %d (%s) with cron: %s [tz=%s]",
|
|
tracker_id, tracker_name, cron_expression, tz.key,
|
|
)
|
|
|
|
|
|
async def schedule_tracker(
|
|
tracker_id: int,
|
|
interval: int,
|
|
cron_expression: str | None = None,
|
|
) -> None:
|
|
"""Add or update a scheduler job for a tracker."""
|
|
scheduler = get_scheduler()
|
|
job_id = f"tracker_{tracker_id}"
|
|
|
|
# A reschedule typically follows a config edit or enable/disable flip —
|
|
# drop adaptive back-off so the first tick after the change runs promptly.
|
|
reset_adaptive_state(tracker_id)
|
|
|
|
# Remove existing job first to allow trigger type changes
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
|
|
if cron_expression:
|
|
try:
|
|
tz = await _load_app_timezone()
|
|
_add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}", tz)
|
|
return
|
|
except Exception as e:
|
|
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
|
|
|
|
jitter = _compute_jitter(interval)
|
|
scheduler.add_job(
|
|
_poll_tracker,
|
|
"interval",
|
|
seconds=interval,
|
|
jitter=jitter or None,
|
|
id=job_id,
|
|
args=[tracker_id],
|
|
replace_existing=True,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled tracker %d every %ds (jitter ±%ds)", tracker_id, interval, jitter,
|
|
)
|
|
|
|
|
|
async def unschedule_tracker(tracker_id: int) -> None:
|
|
"""Remove a scheduler job for a tracker."""
|
|
scheduler = get_scheduler()
|
|
job_id = f"tracker_{tracker_id}"
|
|
reset_adaptive_state(tracker_id)
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
_LOGGER.info("Unscheduled tracker %d", tracker_id)
|
|
|
|
|
|
def _adaptive_should_skip(tracker_id: int) -> bool:
|
|
"""Return True when the adaptive heuristic says to skip this tick.
|
|
|
|
Run-length skip: if we're in 1-in-K mode, skip (K-1) ticks between each
|
|
real poll. Stateless about the *current* tick counter except for the
|
|
``tick_counter`` we bump here.
|
|
"""
|
|
state = _adaptive_state.get(tracker_id)
|
|
if not state:
|
|
return False
|
|
skip_every = state.get("skip_every", 1)
|
|
if skip_every <= 1:
|
|
return False
|
|
state["tick_counter"] = state.get("tick_counter", 0) + 1
|
|
# Fire on ticks where counter % skip_every == 0; skip the rest.
|
|
return (state["tick_counter"] % skip_every) != 0
|
|
|
|
|
|
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
|
|
"""Update the adaptive counter after a real tick ran."""
|
|
state = _adaptive_state.setdefault(
|
|
tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
|
|
)
|
|
if events_detected > 0:
|
|
if state["skip_every"] > 1:
|
|
_LOGGER.info(
|
|
"Adaptive polling: tracker %d saw activity, restoring base rate",
|
|
tracker_id,
|
|
)
|
|
state["empty_count"] = 0
|
|
state["skip_every"] = 1
|
|
state["tick_counter"] = 0
|
|
return
|
|
|
|
state["empty_count"] = state.get("empty_count", 0) + 1
|
|
if (
|
|
state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD
|
|
and state["skip_every"] < _ADAPTIVE_MAX_SKIP
|
|
):
|
|
state["skip_every"] = _ADAPTIVE_MAX_SKIP
|
|
_LOGGER.info(
|
|
"Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4",
|
|
tracker_id, state["empty_count"],
|
|
)
|
|
elif (
|
|
state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD
|
|
and state["skip_every"] < 2
|
|
):
|
|
state["skip_every"] = 2
|
|
_LOGGER.info(
|
|
"Adaptive polling: tracker %d idle for %d ticks, skipping every other",
|
|
tracker_id, state["empty_count"],
|
|
)
|
|
|
|
|
|
def reset_adaptive_state(tracker_id: int) -> None:
|
|
"""Drop cached adaptive counters for a tracker.
|
|
|
|
Used by API callers that make changes requiring the tracker to run
|
|
promptly on the next scheduled tick (enable/disable, config edits,
|
|
manual "check now" actions).
|
|
"""
|
|
_adaptive_state.pop(tracker_id, None)
|
|
|
|
|
|
async def _poll_tracker(tracker_id: int) -> None:
|
|
"""Poll a tracker for changes."""
|
|
from .watcher import check_tracker
|
|
|
|
if _adaptive_should_skip(tracker_id):
|
|
return
|
|
|
|
try:
|
|
result = await check_tracker(tracker_id)
|
|
except Exception as e:
|
|
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
|
|
return
|
|
|
|
# Treat the "error" / "skipped" statuses as inconclusive — don't let
|
|
# a transient upstream failure trick the heuristic into backing off.
|
|
if isinstance(result, dict) and result.get("status") == "ok":
|
|
_adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Action scheduling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _load_action_jobs() -> None:
|
|
"""Load enabled actions and schedule execution jobs."""
|
|
from sqlmodel import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
from ..database.engine import get_engine
|
|
from ..database.models import Action
|
|
|
|
engine = get_engine()
|
|
scheduler = get_scheduler()
|
|
|
|
async with AsyncSession(engine) as session:
|
|
result = await session.exec(
|
|
select(Action).where(Action.enabled == True) # noqa: E712
|
|
)
|
|
actions = result.all()
|
|
|
|
tz = await _load_app_timezone()
|
|
|
|
for action in actions:
|
|
job_id = f"action_{action.id}"
|
|
if scheduler.get_job(job_id):
|
|
continue
|
|
|
|
if action.schedule_type == "cron" and action.schedule_cron:
|
|
try:
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
|
|
scheduler.add_job(
|
|
_run_action,
|
|
trigger,
|
|
id=job_id,
|
|
args=[action.id],
|
|
replace_existing=True,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled action %d (%s) with cron: %s [tz=%s]",
|
|
action.id, action.name, action.schedule_cron, tz.key,
|
|
)
|
|
continue
|
|
except Exception as e:
|
|
_LOGGER.error(
|
|
"Invalid cron for action %d (%s): %s — falling back to interval",
|
|
action.id, action.name, e,
|
|
)
|
|
|
|
scheduler.add_job(
|
|
_run_action,
|
|
"interval",
|
|
seconds=action.schedule_interval,
|
|
id=job_id,
|
|
args=[action.id],
|
|
replace_existing=True,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled action %d (%s) every %ds",
|
|
action.id, action.name, action.schedule_interval,
|
|
)
|
|
|
|
|
|
async def schedule_action(
|
|
action_id: int,
|
|
schedule_type: str = "interval",
|
|
interval: int = 3600,
|
|
cron_expression: str = "",
|
|
) -> None:
|
|
"""Add or update a scheduler job for an action."""
|
|
scheduler = get_scheduler()
|
|
job_id = f"action_{action_id}"
|
|
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
|
|
if schedule_type == "cron" and cron_expression:
|
|
try:
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
tz = await _load_app_timezone()
|
|
trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
|
|
scheduler.add_job(
|
|
_run_action,
|
|
trigger,
|
|
id=job_id,
|
|
args=[action_id],
|
|
replace_existing=True,
|
|
)
|
|
_LOGGER.info(
|
|
"Scheduled action %d with cron: %s [tz=%s]",
|
|
action_id, cron_expression, tz.key,
|
|
)
|
|
return
|
|
except Exception as e:
|
|
_LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e)
|
|
|
|
scheduler.add_job(
|
|
_run_action,
|
|
"interval",
|
|
seconds=interval,
|
|
id=job_id,
|
|
args=[action_id],
|
|
replace_existing=True,
|
|
)
|
|
_LOGGER.info("Scheduled action %d every %ds", action_id, interval)
|
|
|
|
|
|
async def unschedule_action(action_id: int) -> None:
|
|
"""Remove a scheduler job for an action."""
|
|
scheduler = get_scheduler()
|
|
job_id = f"action_{action_id}"
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
_LOGGER.info("Unscheduled action %d", action_id)
|
|
|
|
|
|
async def reschedule_cron_jobs_for_timezone_change() -> None:
|
|
"""Re-add every cron-triggered tracker/action job under the new app timezone.
|
|
|
|
Called by the admin settings endpoint after the ``timezone`` AppSetting is
|
|
updated. APScheduler's ``CronTrigger`` freezes its timezone at construction
|
|
time, so a timezone change has no effect on jobs already in the scheduler
|
|
— we have to rebuild those jobs. Interval-triggered jobs are tz-agnostic
|
|
and are left alone.
|
|
"""
|
|
from sqlmodel import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from ..database.engine import get_engine
|
|
from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel
|
|
|
|
engine = get_engine()
|
|
scheduler = get_scheduler()
|
|
tz = await _load_app_timezone()
|
|
rescheduled = 0
|
|
|
|
async with AsyncSession(engine) as session:
|
|
# Trackers with cron scheduling (scheduler provider + schedule_type=cron).
|
|
trackers = (await session.exec(
|
|
select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712
|
|
)).all()
|
|
provider_ids = list({t.provider_id for t in trackers})
|
|
provider_types: dict[int, str] = {}
|
|
if provider_ids:
|
|
rows = await session.exec(
|
|
select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids))
|
|
)
|
|
provider_types = {p.id: p.type for p in rows.all()}
|
|
|
|
for tracker in trackers:
|
|
if provider_types.get(tracker.provider_id) != "scheduler":
|
|
continue
|
|
filters = tracker.filters or {}
|
|
if filters.get("schedule_type") != "cron":
|
|
continue
|
|
cron_expr = filters.get("cron_expression", "")
|
|
if not cron_expr:
|
|
continue
|
|
job_id = f"tracker_{tracker.id}"
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
try:
|
|
_add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
|
|
rescheduled += 1
|
|
except Exception as e: # noqa: BLE001
|
|
_LOGGER.error(
|
|
"Failed to re-apply cron for tracker %d on tz change: %s",
|
|
tracker.id, e,
|
|
)
|
|
|
|
# Actions with cron schedules.
|
|
actions = (await session.exec(
|
|
select(Action).where(Action.enabled == True) # noqa: E712
|
|
)).all()
|
|
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
for action in actions:
|
|
if action.schedule_type != "cron" or not action.schedule_cron:
|
|
continue
|
|
job_id = f"action_{action.id}"
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
try:
|
|
scheduler.add_job(
|
|
_run_action,
|
|
CronTrigger.from_crontab(action.schedule_cron, timezone=tz),
|
|
id=job_id,
|
|
args=[action.id],
|
|
replace_existing=True,
|
|
)
|
|
rescheduled += 1
|
|
except Exception as e: # noqa: BLE001
|
|
_LOGGER.error(
|
|
"Failed to re-apply cron for action %d on tz change: %s",
|
|
action.id, e,
|
|
)
|
|
|
|
_LOGGER.info(
|
|
"Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
|
|
)
|
|
|
|
# Immich scheduled/periodic/memory jobs are also CronTrigger-based and
|
|
# carry the same frozen-tz problem — rebuild them under the new tz.
|
|
await reschedule_immich_dispatch_jobs()
|
|
|
|
|
|
async def _run_action(action_id: int) -> None:
|
|
"""Run an action (called by APScheduler)."""
|
|
from .action_runner import run_action
|
|
try:
|
|
await run_action(action_id, trigger="scheduled")
|
|
except Exception as e:
|
|
_LOGGER.error("Error running action %d: %s", action_id, e)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Immich scheduled / periodic / memory dispatch (cron-fired)
|
|
#
|
|
# These three slots fire on wall-clock schedules taken from the tracker's
|
|
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
|
|
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
|
|
# app-level IANA timezone. The dispatch flow lives in
|
|
# ``services.scheduled_dispatch``; this section just owns scheduling.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
|
|
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"
|
|
|
|
|
|
def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]:
|
|
"""Parse ``"09:00,18:30"`` → ``[(9, 0), (18, 30)]``, skipping bad entries.
|
|
|
|
A typo in one slot must not prevent the others from scheduling — we log
|
|
and move on rather than raising.
|
|
"""
|
|
out: list[tuple[int, int]] = []
|
|
for part in (raw or "").split(","):
|
|
part = part.strip()
|
|
if not part:
|
|
continue
|
|
try:
|
|
h_str, m_str = part.split(":", 1)
|
|
hour, minute = int(h_str), int(m_str)
|
|
except ValueError:
|
|
_LOGGER.warning("Skipping invalid time literal %r", part)
|
|
continue
|
|
if not (0 <= hour <= 23 and 0 <= minute <= 59):
|
|
_LOGGER.warning("Skipping out-of-range time %r", part)
|
|
continue
|
|
out.append((hour, minute))
|
|
return out
|
|
|
|
|
|
async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
|
|
"""APScheduler entry point — wraps the dispatch helper to swallow errors."""
|
|
from .scheduled_dispatch import dispatch_scheduled_for_tracker
|
|
try:
|
|
await dispatch_scheduled_for_tracker(tracker_id, kind) # type: ignore[arg-type]
|
|
except Exception as err: # noqa: BLE001
|
|
_LOGGER.error(
|
|
"Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err,
|
|
)
|
|
|
|
|
|
async def _load_immich_dispatch_jobs() -> None:
|
|
"""Schedule cron jobs for every (tracker, kind, time) where the kind is on.
|
|
|
|
Reads each enabled Immich tracker's *default* tracking config — per-link
|
|
overrides only gate dispatch (handled in ``scheduled_dispatch``), they do
|
|
not influence the fire schedule.
|
|
"""
|
|
from sqlmodel import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
from ..database.engine import get_engine
|
|
from ..database.models import (
|
|
NotificationTracker,
|
|
ServiceProvider as ServiceProviderModel,
|
|
TrackingConfig,
|
|
)
|
|
|
|
engine = get_engine()
|
|
scheduler = get_scheduler()
|
|
tz = await _load_app_timezone()
|
|
|
|
async with AsyncSession(engine) as session:
|
|
trackers = (await session.exec(
|
|
select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712
|
|
)).all()
|
|
if not trackers:
|
|
return
|
|
|
|
provider_ids = list({t.provider_id for t in trackers})
|
|
provider_types: dict[int, str] = {}
|
|
if provider_ids:
|
|
rows = await session.exec(
|
|
select(ServiceProviderModel).where(
|
|
ServiceProviderModel.id.in_(provider_ids)
|
|
)
|
|
)
|
|
provider_types = {p.id: p.type for p in rows.all()}
|
|
|
|
tc_ids = list({
|
|
t.default_tracking_config_id for t in trackers
|
|
if t.default_tracking_config_id
|
|
})
|
|
tc_map: dict[int, TrackingConfig] = {}
|
|
if tc_ids:
|
|
rows = await session.exec(
|
|
select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
|
|
)
|
|
tc_map = {tc.id: tc for tc in rows.all()}
|
|
|
|
scheduled = 0
|
|
for tracker in trackers:
|
|
if provider_types.get(tracker.provider_id) != "immich":
|
|
continue
|
|
tc = tc_map.get(tracker.default_tracking_config_id) if tracker.default_tracking_config_id else None
|
|
if tc is None:
|
|
continue
|
|
|
|
for kind in _IMMICH_DISPATCH_KINDS:
|
|
if not getattr(tc, f"{kind}_enabled", False):
|
|
continue
|
|
times_raw = getattr(tc, f"{kind}_times", "") or ""
|
|
for hour, minute in _parse_hhmm_list(times_raw):
|
|
job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}"
|
|
scheduler.add_job(
|
|
_run_immich_dispatch,
|
|
CronTrigger(hour=hour, minute=minute, timezone=tz),
|
|
id=job_id,
|
|
args=[tracker.id, kind],
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
scheduled += 1
|
|
_LOGGER.info(
|
|
"Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
|
|
kind, tracker.id, hour, minute, tz.key,
|
|
)
|
|
|
|
if scheduled:
|
|
_LOGGER.info(
|
|
"Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
|
|
scheduled, tz.key,
|
|
)
|
|
|
|
|
|
async def reschedule_immich_dispatch_jobs() -> None:
|
|
"""Drop and rebuild all Immich scheduled/periodic/memory jobs.
|
|
|
|
Cheap to call on every relevant mutation — a typical install has only a
|
|
handful of trackers. Called from the tracker, link, and tracking-config
|
|
CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``.
|
|
"""
|
|
scheduler = get_scheduler()
|
|
for job in list(scheduler.get_jobs()):
|
|
if job.id.startswith(_IMMICH_DISPATCH_PREFIX):
|
|
scheduler.remove_job(job.id)
|
|
await _load_immich_dispatch_jobs()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scheduled backup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_BACKUP_JOB_ID = "scheduled_backup"
|
|
|
|
|
|
async def _load_backup_job() -> None:
|
|
"""Load scheduled backup job from settings if enabled."""
|
|
from sqlmodel import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession as _AS
|
|
from ..database.engine import get_engine
|
|
from ..database.models import AppSetting
|
|
|
|
engine = get_engine()
|
|
async with _AS(engine) as session:
|
|
enabled_row = await session.get(AppSetting, "backup_scheduled_enabled")
|
|
interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours")
|
|
|
|
enabled = enabled_row and enabled_row.value == "true"
|
|
if not enabled:
|
|
return
|
|
|
|
interval_hours = int(interval_row.value) if interval_row and interval_row.value else 24
|
|
scheduler = get_scheduler()
|
|
scheduler.add_job(
|
|
_run_scheduled_backup,
|
|
"interval",
|
|
hours=interval_hours,
|
|
id=_BACKUP_JOB_ID,
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
_LOGGER.info("Scheduled backup every %dh", interval_hours)
|
|
|
|
|
|
async def schedule_backup(interval_hours: int = 24) -> None:
|
|
"""Add or update the scheduled backup job."""
|
|
scheduler = get_scheduler()
|
|
if scheduler.get_job(_BACKUP_JOB_ID):
|
|
scheduler.remove_job(_BACKUP_JOB_ID)
|
|
|
|
scheduler.add_job(
|
|
_run_scheduled_backup,
|
|
"interval",
|
|
hours=interval_hours,
|
|
id=_BACKUP_JOB_ID,
|
|
replace_existing=True,
|
|
max_instances=1,
|
|
)
|
|
_LOGGER.info("Scheduled backup every %dh", interval_hours)
|
|
|
|
|
|
async def unschedule_backup() -> None:
|
|
"""Remove the scheduled backup job."""
|
|
scheduler = get_scheduler()
|
|
if scheduler.get_job(_BACKUP_JOB_ID):
|
|
scheduler.remove_job(_BACKUP_JOB_ID)
|
|
_LOGGER.info("Unscheduled backup job")
|
|
|
|
|
|
async def _run_scheduled_backup() -> None:
|
|
"""Run a scheduled backup (called by APScheduler)."""
|
|
from sqlmodel.ext.asyncio.session import AsyncSession as _AS
|
|
from ..database.engine import get_engine
|
|
from ..database.models import AppSetting, User
|
|
from ..config import settings as app_config
|
|
from .backup_schema import SecretsMode
|
|
from .backup_service import export_backup_to_file, cleanup_old_backups
|
|
|
|
try:
|
|
engine = get_engine()
|
|
async with _AS(engine) as session:
|
|
# Read settings
|
|
secrets_row = await session.get(AppSetting, "backup_secrets_mode")
|
|
retention_row = await session.get(AppSetting, "backup_retention_count")
|
|
|
|
secrets_mode = SecretsMode(secrets_row.value) if secrets_row and secrets_row.value else SecretsMode.EXCLUDE
|
|
retention = int(retention_row.value) if retention_row and retention_row.value else 5
|
|
|
|
# Find admin user (first admin) for ownership context
|
|
from sqlmodel import select
|
|
admin_result = await session.exec(
|
|
select(User).where(User.role == "admin")
|
|
)
|
|
admin = admin_result.first()
|
|
if not admin:
|
|
_LOGGER.warning("No admin user found, skipping scheduled backup")
|
|
return
|
|
|
|
backup_dir = app_config.data_dir / "backups"
|
|
await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)
|
|
|
|
# Cleanup outside the session
|
|
cleanup_old_backups(backup_dir, keep=retention)
|
|
|
|
except Exception as e:
|
|
_LOGGER.error("Scheduled backup failed: %s", e)
|