42af7a6551
- Gitea: NotificationTracker now exposes sender allowlist / blocklist filters via MultiEntitySelect, populated from Gitea /users/search merged with past EventLog senders so the picker is useful before the first webhook arrives. - Webhook providers (gitea, planka, webhook): stop scheduling interval polling jobs on tracker create/update/startup; hide the "every Xs" indicator in the tracker list since there is no polling. - Dashboard: stat cards are now <a> links that route to providers, trackers, targets, command-trackers, or scroll to the events panel. Provider deck rows highlight the target provider on click. - Command trackers / command configs: auto-reselect the right config when the provider type changes (matches notification-tracker behavior). - Migration: drop legacy batch_duration column from notification_tracker — the field is gone from the model but its NOT NULL constraint blocked inserts on older DBs. - Docs: refresh entity-relationships.md with current NotificationTracker fields (filters, adaptive_max_skip, default_*_config_id).
1119 lines
40 KiB
Python
1119 lines
40 KiB
Python
"""APScheduler-based polling scheduler for trackers and actions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
# Module-level logger shared by the scheduling helpers in this file.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo:
|
|
"""Resolve an IANA tz string to a ZoneInfo, falling back to UTC on any error.
|
|
|
|
Kept local to avoid importing from api/dispatch layers inside the scheduler
|
|
module (which is loaded at startup, before the API routers).
|
|
"""
|
|
if not tz_name:
|
|
return ZoneInfo("UTC")
|
|
try:
|
|
return ZoneInfo(tz_name)
|
|
except (ZoneInfoNotFoundError, ValueError):
|
|
_LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name)
|
|
return ZoneInfo("UTC")
|
|
|
|
|
|
async def _load_app_timezone() -> ZoneInfo:
    """Return the app-wide timezone held in the ``timezone`` AppSetting.

    Missing or unrecognised values resolve to UTC through
    ``_resolve_zoneinfo``.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..api.app_settings import get_setting
    from ..database.engine import get_engine

    engine = get_engine()
    async with AsyncSession(engine) as db:
        configured_name = await get_setting(db, "timezone")
    return _resolve_zoneinfo(configured_name)
|
|
|
|
# Lazily-created process-wide scheduler: ``get_scheduler`` builds it on first
# use and hands back the same instance afterwards.
_scheduler: AsyncIOScheduler | None = None
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adaptive polling (Tier 6 of the big-album optimization plan).
|
|
#
|
|
# We don't touch the user-configured ``scan_interval`` — that's still the
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
# ticks when a tracker is idle, and reset to 1:1 as soon as a real poll
# detects anything. The scheduler keeps firing on the user's chosen period,
# so the first change after an idle stretch is picked up within at most
# ``skip_every`` ticks (never more than 4, given the 1-in-4 ceiling) — while
# the steady-state HTTP cost of an idle tracker backed off to 1-in-4 drops by
# ~75%.
|
|
#
|
|
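# Opt-in per tracker via the ``adaptive_max_skip`` column:
# * NULL, 0 or 1 → adaptive polling disabled, every tick runs (default)
# * 2 → skip at most 1 of every 2 ticks after long idle
# * 3 → skip at most 2 of every 3 ticks
# * 4 or more → skip at most 3 of every 4 ticks (the skip factor never
#   exceeds 4, however large the configured value)
# Thresholds count consecutive empty *real* polls (skipped ticks don't count)
# and are intentionally conservative: a tracker polling every 30 s needs
# 5 min of silence (10 polls) before we halve its effective rate, plus 20
# more polls at that halved rate (~20 min, so ~25 min in total) before it
# drops to its final step.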
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Length of an empty-poll streak that moves an idle tracker to the next step.
_ADAPTIVE_HALVE_THRESHOLD = 10  # consecutive empty real polls → 1-in-2
_ADAPTIVE_QUARTER_THRESHOLD = 30  # consecutive empty real polls → 1-in-4

# tracker_id → {"empty_count", "skip_every", "tick_counter"}. Kept in memory
# only: after a restart every tracker starts from scratch, an acceptable
# warmup for what is purely a performance heuristic.
_adaptive_state: dict[int, dict[str, int]] = {}

# tracker_id → skip-factor cap, copied from the DB column when the tracker is
# scheduled. Only caps greater than 1 are stored; a missing entry means the
# tracker polls on every tick (``_adaptive_should_skip`` bails out early).
_adaptive_max_skip: dict[int, int] = {}


def set_adaptive_max_skip(tracker_id: int, max_skip: int | None) -> None:
    """Record (or forget) the adaptive back-off cap for one tracker.

    The scheduling helpers call this so the per-tick check in
    ``_adaptive_should_skip`` reads an in-memory dict instead of the DB.
    ``None`` or any value ≤ 1 turns back-off off, so every scheduled tick
    runs. In both cases the tracker's accumulated counters are discarded, so
    the new setting takes effect from the very next tick.
    """
    enabled = bool(max_skip) and max_skip > 1
    if not enabled:
        _adaptive_max_skip.pop(tracker_id, None)
    else:
        _adaptive_max_skip[tracker_id] = int(max_skip)
    _adaptive_state.pop(tracker_id, None)
|
|
|
|
|
|
def _compute_jitter(interval_seconds: int) -> int:
|
|
"""Return a jitter bound (in seconds) suitable for an IntervalTrigger.
|
|
|
|
Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up
|
|
at the same wall-clock second every minute — that creates a thundering-
|
|
herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter``
|
|
randomizes each tick's firing time by ±jitter seconds.
|
|
|
|
We use a quarter of the interval up to a 30 s cap. For short intervals
|
|
(≤8 s) jitter would round to 0 — that's fine, at those cadences a
|
|
bursty pattern is what the user implicitly opted into.
|
|
"""
|
|
if interval_seconds <= 0:
|
|
return 0
|
|
return min(interval_seconds // 4, 30)
|
|
|
|
|
|
def get_scheduler() -> AsyncIOScheduler:
    """Return the shared AsyncIOScheduler, constructing it on first use.

    Every job inherits these defaults unless it overrides them:

    * ``coalesce`` — after a restart/pause, a backlog of missed runs collapses
      into a single firing instead of a catch-up burst.
    * ``misfire_grace_time=300`` — firings up to 5 min late still run rather
      than being dropped silently.
    * ``max_instances=1`` — two copies of the same job (e.g. one tracker's
      tick) never run concurrently; set here explicitly for safety.
    """
    global _scheduler
    if _scheduler is not None:
        return _scheduler
    defaults = {
        "coalesce": True,
        "misfire_grace_time": 300,
        "max_instances": 1,
    }
    _scheduler = AsyncIOScheduler(job_defaults=defaults)
    return _scheduler
|
|
|
|
|
|
async def start_scheduler() -> None:
    """Start the shared scheduler (if not yet running) and register all work.

    Registers, in order: tracker, action and Immich dispatch jobs; Telegram
    command-listener polling; the daily event-log cleanup; the periodic
    Telegram chat-title refresh; the debounced command auto-sync scheduler;
    and finally the scheduled backup job when it is enabled.
    """
    scheduler = get_scheduler()
    if not scheduler.running:
        scheduler.start()
        _LOGGER.info("Scheduler started")

    # Jobs persisted in the database.
    for load_jobs in (_load_tracker_jobs, _load_action_jobs, _load_immich_dispatch_jobs):
        await load_jobs()

    # Telegram bot polling, for bots that have active command listeners.
    from .telegram_poller import start_command_listener_polling
    await start_command_listener_polling()

    # Housekeeping: daily event-log cleanup and periodic chat-title refresh.
    _schedule_event_cleanup()
    _schedule_telegram_chat_sync()

    # Debounced command auto-sync.
    from .command_sync import start_sync_scheduler
    start_sync_scheduler()

    # Scheduled backup job, if enabled.
    await _load_backup_job()
|
|
|
|
|
|
def _schedule_event_cleanup() -> None:
    """Register the daily retention sweep (``_cleanup_old_events``) at 03:00 UTC.

    The sweep itself applies the configured retention window
    (``settings.event_log_retention_days``); this function only owns the
    schedule. It is a no-op if the job is already registered.

    The trigger is pinned to UTC explicitly: a ``CronTrigger`` built without
    a timezone uses the host's local zone, so on a non-UTC host the sweep
    would run at 03:00 *local* time while the log line below says 03:00 UTC.
    """
    from datetime import timezone

    from apscheduler.triggers.cron import CronTrigger

    scheduler = get_scheduler()
    job_id = "cleanup_old_events"
    if scheduler.get_job(job_id):
        return
    scheduler.add_job(
        _cleanup_old_events,
        CronTrigger(hour=3, minute=0, timezone=timezone.utc),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")
|
|
|
|
|
|
# Chat-title refresh tuning.
# The sweep runs daily as a fallback — titles are additionally refreshed
# opportunistically on every incoming webhook/long-poll update
# (``save_chat_from_webhook``), so the sweep only catches chats that haven't
# sent anything recently.
_CHAT_SYNC_INTERVAL_HOURS = 24
_CHAT_SYNC_INITIAL_DELAY_SECONDS = 60
_CHAT_SYNC_CONCURRENCY = 10


def _schedule_telegram_chat_sync() -> None:
    """Schedule periodic refresh of Telegram chat titles via getChat.

    Registers two jobs running ``_refresh_telegram_chat_titles``:

    * ``refresh_telegram_chat_titles`` — recurring every
      ``_CHAT_SYNC_INTERVAL_HOURS``. With the IntervalTrigger defaults its
      first run is one full interval after registration.
    * ``refresh_telegram_chat_titles_once`` — a one-shot run
      ``_CHAT_SYNC_INITIAL_DELAY_SECONDS`` after startup, so stale names are
      fixed without waiting a whole interval.

    No-op if the recurring job is already registered.

    The recurring job must NOT be added with ``next_run_time=None``:
    APScheduler treats that as "add the job paused", which left the daily
    sweep registered but never firing.
    """
    from datetime import datetime, timedelta, timezone

    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    job_id = "refresh_telegram_chat_titles"
    if scheduler.get_job(job_id):
        return
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    # Fire once shortly after startup so stale names refresh without waiting a day.
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        "date",
        run_date=datetime.now(timezone.utc) + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS),
        id="refresh_telegram_chat_titles_once",
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled Telegram chat title refresh every %sh (concurrency %s)",
        _CHAT_SYNC_INTERVAL_HOURS, _CHAT_SYNC_CONCURRENCY,
    )
|
|
|
|
|
|
async def _refresh_telegram_chat_titles() -> None:
    """Re-read title/username for every known TelegramChat via ``getChat``.

    Only chats whose bot has a token are queried. Lookups run concurrently
    behind one ``_CHAT_SYNC_CONCURRENCY`` semaphore shared by all bots, so a
    fleet of ~50 chats needs ~5 round-trips rather than 50 without flooding
    the shared HTTP session. Successful lookups are written back in a single
    session that loads the affected rows with one ``IN`` query (not one
    ``session.get`` per chat). Failures are counted; those with an error
    message are logged at DEBUG.
    """
    import asyncio
    from collections import defaultdict

    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from notify_bridge_core.notifications.telegram.client import TelegramClient

    from ..database.engine import get_engine
    from ..database.models import TelegramBot, TelegramChat
    from .http_session import get_http_session

    engine = get_engine()

    # 1. Snapshot bots that have a token, and all chats.
    async with AsyncSession(engine) as session:
        bot_rows = (await session.exec(select(TelegramBot))).all()
        token_by_bot = {bot.id: bot.token for bot in bot_rows if bot.token}
        if not token_by_bot:
            return
        chat_rows = (await session.exec(select(TelegramChat))).all()

    chats_by_bot: dict[int, list[TelegramChat]] = defaultdict(list)
    for tg_chat in chat_rows:
        if tg_chat.bot_id in token_by_bot:
            chats_by_bot[tg_chat.bot_id].append(tg_chat)
    if not chats_by_bot:
        return

    # 2. Fan out getChat calls under one shared concurrency limit.
    http = await get_http_session()
    client_by_bot = {
        bot_id: TelegramClient(http, token) for bot_id, token in token_by_bot.items()
    }
    gate = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY)

    async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]:
        """Yield ``(row_id, info, None)`` on success, ``(row_id, None, error)`` otherwise."""
        async with gate:
            try:
                reply = await client_by_bot[bot_id].get_chat(chat.chat_id)
            except Exception as exc:  # noqa: BLE001
                return chat.id, None, str(exc)
            if reply.get("success"):
                return chat.id, (reply.get("result") or {}), None
            return chat.id, None, reply.get("error") or "unknown"

    outcomes = await asyncio.gather(
        *(
            _fetch(bot_id, chat)
            for bot_id, bot_chats in chats_by_bot.items()
            for chat in bot_chats
        )
    )

    # 3. Tally outcomes; keep successful payloads keyed by chat row id.
    failures = 0
    info_by_row: dict[int, dict] = {}
    for row_id, info, problem in outcomes:
        if problem is None and info is not None:
            if row_id is not None:
                info_by_row[row_id] = info
            continue
        failures += 1
        if problem:
            _LOGGER.debug("getChat failed for chat row %s: %s", row_id, problem)

    # 4. Persist changed titles/usernames in one session.
    updated = 0
    async with AsyncSession(engine) as session:
        if info_by_row:
            stored = (await session.exec(
                select(TelegramChat).where(TelegramChat.id.in_(list(info_by_row)))
            )).all()
            for row in stored:
                info = info_by_row.get(row.id)
                if not info:
                    continue
                new_title = info.get("title") or (
                    (info.get("first_name", "") + " " + info.get("last_name", "")).strip()
                )
                dirty = False
                if new_title and row.title != new_title:
                    row.title = new_title
                    dirty = True
                new_username = info.get("username")
                if new_username is not None and row.username != new_username:
                    row.username = new_username
                    dirty = True
                if dirty:
                    session.add(row)
                    updated += 1
        await session.commit()

    _LOGGER.info(
        "Telegram chat title refresh: %s updated, %s errors", updated, failures
    )
|
|
|
|
|
|
async def _cleanup_old_events() -> None:
    """Purge EventLog, WebhookPayloadLog and ActionExecution rows past retention.

    The window comes from ``settings.event_log_retention_days``; a value of 0
    (or any non-positive value) turns the purge off. EventLog and
    WebhookPayloadLog are aged by ``created_at``, ActionExecution by
    ``started_at``. All three deletes are committed together.
    """
    from datetime import datetime, timedelta, timezone

    from sqlmodel import delete
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..config import settings
    from ..database.engine import get_engine
    from ..database.models import ActionExecution, EventLog, WebhookPayloadLog

    retention_days = settings.event_log_retention_days
    if retention_days <= 0:
        _LOGGER.debug("Event log retention disabled (days=0); skipping cleanup")
        return

    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    purges = (
        delete(EventLog).where(EventLog.created_at < cutoff),
        delete(WebhookPayloadLog).where(WebhookPayloadLog.created_at < cutoff),
        delete(ActionExecution).where(ActionExecution.started_at < cutoff),
    )
    async with AsyncSession(get_engine()) as session:
        for statement in purges:
            await session.exec(statement)
        await session.commit()

    _LOGGER.info(
        "Cleaned event_log / webhook_payload_log / action_execution older than %s",
        cutoff.date(),
    )
|
|
|
|
|
|
async def _load_tracker_jobs() -> None:
    """Schedule polling jobs for every enabled tracker at process startup.

    For each tracker, in order:

    * skip it if a ``tracker_{id}`` job already exists;
    * skip it if its provider is webhook-based (events arrive over inbound
      HTTP, so there is nothing to poll);
    * for ``scheduler`` providers with ``schedule_type == "cron"`` and a
      non-empty ``cron_expression``, add a cron job in the app timezone,
      falling back to interval scheduling if the expression is invalid;
    * otherwise add an interval job every ``scan_interval`` seconds (with
      jitter) and register the tracker's ``adaptive_max_skip`` cap.

    Any other failure while scheduling one tracker is logged with its
    traceback and only that tracker is skipped. Letting it escape the loop
    would leave every later tracker unscheduled and abort the rest of
    ``start_scheduler`` (actions, Telegram polling, cleanup, ...).
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()

    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )
        trackers = result.all()

        # Batch-load provider types (one IN query) for webhook / cron detection.
        unique_provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if unique_provider_ids:
            provider_result = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(unique_provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in provider_result.all()}

    tz = await _load_app_timezone()

    from notify_bridge_core.providers.capabilities import get_capabilities

    for tracker in trackers:
        job_id = f"tracker_{tracker.id}"
        if scheduler.get_job(job_id):
            continue

        try:
            ptype = provider_types.get(tracker.provider_id, "")
            filters = tracker.filters or {}

            # Webhook-based providers receive events via inbound HTTP — there
            # is nothing to poll. An interval job would just wake check_tracker
            # every scan_interval seconds to return immediately, wasting CPU
            # and DB queries.
            caps = get_capabilities(ptype) if ptype else None
            if caps and caps.webhook_based:
                _LOGGER.debug(
                    "Skipping interval scheduling for webhook tracker %d (%s, type=%s)",
                    tracker.id, tracker.name, ptype,
                )
                continue

            # Scheduler providers can use cron triggers.
            if ptype == "scheduler" and filters.get("schedule_type") == "cron":
                cron_expr = filters.get("cron_expression", "")
                if cron_expr:
                    try:
                        _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                        continue
                    except Exception as e:
                        _LOGGER.error(
                            "Invalid cron expression for tracker %d (%s): %s — falling back to interval",
                            tracker.id, tracker.name, e,
                        )

            jitter = _compute_jitter(tracker.scan_interval)
            scheduler.add_job(
                _poll_tracker,
                "interval",
                seconds=tracker.scan_interval,
                jitter=jitter or None,
                id=job_id,
                args=[tracker.id],
                replace_existing=True,
                max_instances=1,
            )
            set_adaptive_max_skip(tracker.id, tracker.adaptive_max_skip)
            _LOGGER.info(
                "Scheduled tracker %d (%s) every %ds (jitter ±%ds, adaptive_max_skip=%s)",
                tracker.id, tracker.name, tracker.scan_interval, jitter,
                tracker.adaptive_max_skip,
            )
        except Exception:  # noqa: BLE001 — one bad row must not abort startup
            _LOGGER.exception(
                "Failed to schedule tracker %d (%s); skipping it",
                tracker.id, tracker.name,
            )
|
|
|
|
|
|
def _add_cron_job(
    scheduler: AsyncIOScheduler,
    job_id: str,
    tracker_id: int,
    cron_expression: str,
    tracker_name: str,
    tz: ZoneInfo,
) -> None:
    """Register (or replace) a crontab-driven polling job for one tracker.

    The expression is evaluated in ``tz``, the admin-configured app timezone.
    If no timezone were given, APScheduler would read the crontab in the
    host's local zone, and operators in another zone would see events fire at
    the "wrong" wall-clock time (``update_settings`` reschedules these jobs
    whenever the timezone setting changes). A malformed expression raises
    from ``CronTrigger.from_crontab`` before the scheduler is touched.
    """
    from apscheduler.triggers.cron import CronTrigger

    cron_trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
    job_options = {
        "id": job_id,
        "args": [tracker_id],
        "replace_existing": True,
        "max_instances": 1,
    }
    scheduler.add_job(_poll_tracker, cron_trigger, **job_options)
    _LOGGER.info(
        "Scheduled tracker %d (%s) with cron: %s [tz=%s]",
        tracker_id, tracker_name, cron_expression, tz.key,
    )
|
|
|
|
|
|
async def _is_webhook_tracker(tracker_id: int) -> bool:
    """Tell whether ``tracker_id`` belongs to a webhook-based provider.

    A missing tracker or provider row counts as "not webhook". The answer
    comes from the provider type's entry in the capabilities registry;
    ``schedule_tracker`` uses it to avoid creating pointless interval jobs.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from notify_bridge_core.providers.capabilities import get_capabilities
    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    async with AsyncSession(get_engine()) as session:
        tracker = await session.get(NotificationTracker, tracker_id)
        provider = (
            await session.get(ServiceProviderModel, tracker.provider_id)
            if tracker is not None
            else None
        )
    if provider is None:
        return False
    capabilities = get_capabilities(provider.type)
    return bool(capabilities and capabilities.webhook_based)
|
|
|
|
|
|
async def schedule_tracker(
    tracker_id: int,
    interval: int,
    cron_expression: str | None = None,
    adaptive_max_skip: int | None = None,
) -> None:
    """Create or replace the polling job for one tracker.

    ``adaptive_max_skip`` mirrors the DB column and is copied into the
    adaptive module state so per-tick skip decisions never hit the DB;
    ``None``/``0`` disables back-off. With a ``cron_expression`` a cron job
    is tried first (app timezone); an invalid expression falls back to an
    interval job every ``interval`` seconds.

    Webhook-based providers get no job at all — events reach them over
    inbound HTTP, so scheduling ``scan_interval`` wakeups from the API
    create/update path would be wasted work.
    """
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"

    # Config edits and enable/disable flips land here; clearing the back-off
    # makes the first tick after the change run promptly.
    reset_adaptive_state(tracker_id)
    set_adaptive_max_skip(tracker_id, adaptive_max_skip)

    # Drop the current job up front so the trigger type is free to change.
    if scheduler.get_job(job_id) is not None:
        scheduler.remove_job(job_id)

    if await _is_webhook_tracker(tracker_id):
        _LOGGER.debug("Skipping interval scheduling for webhook tracker %d", tracker_id)
        return

    if cron_expression:
        try:
            tz = await _load_app_timezone()
            _add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}", tz)
        except Exception as e:
            _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
        else:
            return

    spread = _compute_jitter(interval)
    scheduler.add_job(
        _poll_tracker,
        "interval",
        seconds=interval,
        jitter=spread or None,
        id=job_id,
        args=[tracker_id],
        replace_existing=True,
    )
    _LOGGER.info(
        "Scheduled tracker %d every %ds (jitter ±%ds, adaptive_max_skip=%s)",
        tracker_id, interval, spread, adaptive_max_skip,
    )
|
|
|
|
|
|
async def unschedule_tracker(tracker_id: int) -> None:
    """Remove a tracker's polling job and forget its adaptive bookkeeping."""
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"
    reset_adaptive_state(tracker_id)
    _adaptive_max_skip.pop(tracker_id, None)
    if not scheduler.get_job(job_id):
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled tracker %d", tracker_id)
|
|
|
|
|
|
def _adaptive_should_skip(tracker_id: int) -> bool:
    """Decide whether the current scheduled tick should be skipped.

    Returns False straight away when adaptive polling is off for the tracker
    (no registered cap), when it has no back-off state yet, or when it is at
    the 1:1 rate. In 1-in-K mode it advances the tracker's tick counter and
    lets only every K-th tick through, skipping the K-1 in between.
    """
    if tracker_id not in _adaptive_max_skip:
        return False
    state = _adaptive_state.get(tracker_id)
    skip_every = state.get("skip_every", 1) if state else 1
    if skip_every <= 1:
        return False
    counter = state.get("tick_counter", 0) + 1
    state["tick_counter"] = counter
    # Run only when the counter lands on a multiple of K; skip otherwise.
    return counter % skip_every != 0
|
|
|
|
|
|
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
    """Feed the outcome of a real (non-skipped) poll into the back-off state.

    Does nothing for trackers without an adaptive cap > 1, so no counters pile
    up for trackers that will never use them. Any detected event restores
    polling on every tick. Otherwise the empty-poll streak grows, and at
    ``_ADAPTIVE_HALVE_THRESHOLD`` / ``_ADAPTIVE_QUARTER_THRESHOLD`` the skip
    factor steps up to ``min(cap, 2)`` / ``min(cap, 4)``.
    """
    cap = _adaptive_max_skip.get(tracker_id)
    if not cap or cap <= 1:
        return
    state = _adaptive_state.setdefault(
        tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
    )

    if events_detected > 0:
        # Activity snaps the tracker straight back to its base rate.
        if state["skip_every"] > 1:
            _LOGGER.info(
                "Adaptive polling: tracker %d saw activity, restoring base rate",
                tracker_id,
            )
        state.update(empty_count=0, skip_every=1, tick_counter=0)
        return

    idle_polls = state.get("empty_count", 0) + 1
    state["empty_count"] = idle_polls
    quarter_step = min(cap, 4)
    half_step = min(cap, 2)

    if idle_polls >= _ADAPTIVE_QUARTER_THRESHOLD and state["skip_every"] < quarter_step:
        state["skip_every"] = quarter_step
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping %d of %d",
            tracker_id, idle_polls,
            quarter_step - 1, quarter_step,
        )
    elif idle_polls >= _ADAPTIVE_HALVE_THRESHOLD and state["skip_every"] < half_step:
        state["skip_every"] = half_step
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping every other",
            tracker_id, idle_polls,
        )
|
|
|
|
|
|
def reset_adaptive_state(tracker_id: int) -> None:
    """Forget a tracker's adaptive counters, keeping its configured cap.

    API callers use this when a change should make the tracker run promptly
    on its next scheduled tick (enable/disable, config edits, manual "check
    now"). To remove the cap itself, call ``set_adaptive_max_skip(..., None)``.
    """
    if tracker_id in _adaptive_state:
        del _adaptive_state[tracker_id]
|
|
|
|
|
|
async def _poll_tracker(tracker_id: int) -> None:
    """APScheduler callback: poll one tracker, honouring adaptive back-off."""
    from .watcher import check_tracker

    if _adaptive_should_skip(tracker_id):
        return

    try:
        outcome = await check_tracker(tracker_id)
    except Exception as e:
        _LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
        return

    # Only a clean "ok" run feeds the heuristic: an "error"/"skipped" result
    # says nothing about idleness, and a transient upstream failure must not
    # push the tracker into back-off.
    if not isinstance(outcome, dict) or outcome.get("status") != "ok":
        return
    _adaptive_update(tracker_id, int(outcome.get("events_detected", 0) or 0))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Action scheduling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _load_action_jobs() -> None:
    """Register a job for every enabled action at startup.

    Actions with ``schedule_type == "cron"`` and a ``schedule_cron`` get a
    cron job in the app timezone; an unparsable expression (or any other
    failure while adding it) falls back to an interval job every
    ``schedule_interval`` seconds, as do all other actions. Actions that
    already have an ``action_{id}`` job are left alone.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import Action

    engine = get_engine()
    scheduler = get_scheduler()

    async with AsyncSession(engine) as session:
        enabled_actions = (await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )).all()

    tz = await _load_app_timezone()

    for action in enabled_actions:
        job_id = f"action_{action.id}"
        if scheduler.get_job(job_id):
            continue

        if action.schedule_type == "cron" and action.schedule_cron:
            try:
                from apscheduler.triggers.cron import CronTrigger
                cron_trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
                scheduler.add_job(
                    _run_action,
                    cron_trigger,
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                _LOGGER.info(
                    "Scheduled action %d (%s) with cron: %s [tz=%s]",
                    action.id, action.name, action.schedule_cron, tz.key,
                )
            except Exception as e:
                _LOGGER.error(
                    "Invalid cron for action %d (%s): %s — falling back to interval",
                    action.id, action.name, e,
                )
            else:
                continue

        scheduler.add_job(
            _run_action,
            "interval",
            seconds=action.schedule_interval,
            id=job_id,
            args=[action.id],
            replace_existing=True,
        )
        _LOGGER.info(
            "Scheduled action %d (%s) every %ds",
            action.id, action.name, action.schedule_interval,
        )
|
|
|
|
|
|
async def schedule_action(
    action_id: int,
    schedule_type: str = "interval",
    interval: int = 3600,
    cron_expression: str = "",
) -> None:
    """Create or replace the job that runs one action.

    Any existing ``action_{id}`` job is removed first. ``schedule_type ==
    "cron"`` with a non-empty expression yields a cron job in the app
    timezone; on failure (or for any other schedule type) an interval job
    every ``interval`` seconds is added instead.
    """
    scheduler = get_scheduler()
    job_id = f"action_{action_id}"

    if scheduler.get_job(job_id) is not None:
        scheduler.remove_job(job_id)

    if schedule_type == "cron" and cron_expression:
        try:
            from apscheduler.triggers.cron import CronTrigger
            tz = await _load_app_timezone()
            cron_trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
            scheduler.add_job(
                _run_action,
                cron_trigger,
                id=job_id,
                args=[action_id],
                replace_existing=True,
            )
            _LOGGER.info(
                "Scheduled action %d with cron: %s [tz=%s]",
                action_id, cron_expression, tz.key,
            )
        except Exception as e:
            _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e)
        else:
            return

    scheduler.add_job(
        _run_action,
        "interval",
        seconds=interval,
        id=job_id,
        args=[action_id],
        replace_existing=True,
    )
    _LOGGER.info("Scheduled action %d every %ds", action_id, interval)
|
|
|
|
|
|
async def unschedule_action(action_id: int) -> None:
    """Remove an action's scheduler job, if it has one."""
    scheduler = get_scheduler()
    job_id = f"action_{action_id}"
    if not scheduler.get_job(job_id):
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled action %d", action_id)
|
|
|
|
|
|
async def reschedule_cron_jobs_for_timezone_change() -> None:
    """Re-add every cron-triggered tracker/action job under the new app timezone.

    Called by the admin settings endpoint after the ``timezone`` AppSetting is
    updated. APScheduler's ``CronTrigger`` freezes its timezone at construction
    time, so a timezone change has no effect on jobs already in the scheduler
    — we have to rebuild those jobs. Interval-triggered jobs are tz-agnostic
    and are left alone.

    Replacements are added with ``replace_existing=True`` and their trigger is
    built *before* the scheduler is touched. The old job is therefore only
    swapped out once the new one is known to be valid. A stored expression
    that no longer parses leaves the tracker/action with whatever job it
    already had (for a tracker, typically the interval fallback from load
    time). Removing the job first would leave it with no job at all.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()
    rescheduled = 0

    async with AsyncSession(engine) as session:
        # Trackers with cron scheduling (scheduler provider + schedule_type=cron).
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            rows = await session.exec(
                select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids))
            )
            provider_types = {p.id: p.type for p in rows.all()}

        for tracker in trackers:
            if provider_types.get(tracker.provider_id) != "scheduler":
                continue
            filters = tracker.filters or {}
            if filters.get("schedule_type") != "cron":
                continue
            cron_expr = filters.get("cron_expression", "")
            if not cron_expr:
                continue
            job_id = f"tracker_{tracker.id}"
            try:
                # _add_cron_job builds the trigger first and adds with
                # replace_existing=True, so a bad expression raises here
                # without disturbing the tracker's current job.
                _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for tracker %d on tz change: %s",
                    tracker.id, e,
                )

        # Actions with cron schedules.
        actions = (await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )).all()

        from apscheduler.triggers.cron import CronTrigger
        for action in actions:
            if action.schedule_type != "cron" or not action.schedule_cron:
                continue
            job_id = f"action_{action.id}"
            try:
                # Build the trigger before touching the scheduler (see above).
                trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
                scheduler.add_job(
                    _run_action,
                    trigger,
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for action %d on tz change: %s",
                    action.id, e,
                )

    _LOGGER.info(
        "Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
    )

    # Immich scheduled/periodic/memory jobs are also CronTrigger-based and
    # carry the same frozen-tz problem — rebuild them under the new tz.
    await reschedule_immich_dispatch_jobs()
|
|
|
|
|
|
async def _run_action(action_id: int) -> None:
    """APScheduler callback: execute one action with ``trigger="scheduled"``.

    Exceptions are logged rather than propagated to APScheduler.
    """
    from .action_runner import run_action

    try:
        await run_action(action_id, trigger="scheduled")
    except Exception as exc:
        _LOGGER.error("Error running action %d: %s", action_id, exc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Immich scheduled / periodic / memory dispatch (cron-fired)
|
|
#
|
|
# These three slots fire on wall-clock schedules taken from the tracker's
|
|
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
|
|
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
|
|
# app-level IANA timezone. The dispatch flow lives in
|
|
# ``services.scheduled_dispatch``; this section just owns scheduling.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
|
|
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"
|
|
|
|
|
|
def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]:
    """Turn a comma-separated ``HH:MM`` list into ``(hour, minute)`` tuples.

    ``"09:00,18:30"`` becomes ``[(9, 0), (18, 30)]``. Blank segments are
    ignored. Malformed or out-of-range segments are logged and skipped rather
    than raised, so one typo does not stop the other times from scheduling.
    Valid entries keep their input order, and duplicates are retained.
    """
    parsed: list[tuple[int, int]] = []
    for chunk in (raw or "").split(","):
        literal = chunk.strip()
        if literal == "":
            continue
        try:
            hh, mm = literal.split(":", 1)
            hour = int(hh)
            minute = int(mm)
        except ValueError:
            # Missing ':' (unpack fails) or a non-integer component.
            _LOGGER.warning("Skipping invalid time literal %r", literal)
            continue
        if hour < 0 or hour > 23 or minute < 0 or minute > 59:
            _LOGGER.warning("Skipping out-of-range time %r", literal)
            continue
        parsed.append((hour, minute))
    return parsed
|
|
|
|
|
|
async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
    """APScheduler entry point for one Immich scheduled/periodic/memory slot.

    Delegates to ``dispatch_scheduled_for_tracker``. Any exception is logged
    with its traceback and swallowed, so it does not propagate out of the
    scheduler job.

    Args:
        tracker_id: Primary key of the notification tracker to dispatch for.
        kind: One of the slot names (``"scheduled"``, ``"periodic"``,
            ``"memory"``), passed through to the dispatch helper.
    """
    from .scheduled_dispatch import dispatch_scheduled_for_tracker

    try:
        await dispatch_scheduled_for_tracker(tracker_id, kind)  # type: ignore[arg-type]
    except Exception as err:  # noqa: BLE001
        # Top-level job boundary: ``exception`` keeps the stack trace that
        # ``error(..., err)`` would have discarded.
        _LOGGER.exception(
            "Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err,
        )
|
|
|
|
|
|
async def _load_immich_dispatch_jobs() -> None:
    """Schedule cron jobs for every (tracker, kind, time) where the kind is on.

    Reads each enabled Immich tracker's *default* tracking config. Per-link
    overrides only gate dispatch (handled in ``scheduled_dispatch``); they do
    not influence the fire schedule.

    Each job runs ``_run_immich_dispatch(tracker_id, kind)`` at ``HH:MM`` in
    the app-level timezone. Job ids are derived from (kind, tracker, time)
    and added with ``replace_existing=True``. Repeated times within one slot
    list therefore collapse to a single job, counted and logged once. If
    adding one job fails, the error is logged and the remaining jobs are
    still scheduled.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from apscheduler.triggers.cron import CronTrigger

    from ..database.engine import get_engine
    from ..database.models import (
        NotificationTracker,
        ServiceProvider as ServiceProviderModel,
        TrackingConfig,
    )

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()

    async with AsyncSession(engine) as session:
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        if not trackers:
            return

        # Resolve provider types in one query so non-Immich trackers can be skipped.
        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            rows = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in rows.all()}

        # Load every referenced default tracking config in one query.
        tc_ids = list({
            t.default_tracking_config_id for t in trackers
            if t.default_tracking_config_id
        })
        tc_map: dict[int, TrackingConfig] = {}
        if tc_ids:
            rows = await session.exec(
                select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
            )
            tc_map = {tc.id: tc for tc in rows.all()}

    scheduled = 0
    for tracker in trackers:
        if provider_types.get(tracker.provider_id) != "immich":
            continue
        # A missing/falsy id is never a key in tc_map, so ``get`` yields None.
        tc = tc_map.get(tracker.default_tracking_config_id)
        if tc is None:
            continue

        for kind in _IMMICH_DISPATCH_KINDS:
            if not getattr(tc, f"{kind}_enabled", False):
                continue
            times_raw = getattr(tc, f"{kind}_times", "") or ""
            # De-duplicate while preserving order. Repeated times map to the
            # same job id, so without this the job would just be replaced,
            # but still counted and logged again.
            for hour, minute in dict.fromkeys(_parse_hhmm_list(times_raw)):
                job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}"
                try:
                    scheduler.add_job(
                        _run_immich_dispatch,
                        CronTrigger(hour=hour, minute=minute, timezone=tz),
                        id=job_id,
                        args=[tracker.id, kind],
                        replace_existing=True,
                        max_instances=1,
                    )
                except Exception as err:  # noqa: BLE001
                    # Keep going: one bad job must not block the others.
                    _LOGGER.error(
                        "Failed to schedule Immich %s for tracker %d at %02d:%02d: %s",
                        kind, tracker.id, hour, minute, err,
                    )
                else:
                    scheduled += 1
                    _LOGGER.info(
                        "Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
                        kind, tracker.id, hour, minute, tz.key,
                    )

    if scheduled:
        _LOGGER.info(
            "Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
            scheduled, tz.key,
        )
|
|
|
|
|
|
async def reschedule_immich_dispatch_jobs() -> None:
    """Rebuild every Immich scheduled/periodic/memory cron job from scratch.

    First removes every job whose id carries the Immich dispatch prefix, then
    reloads the schedule from the database. The work is small (a typical
    install has only a handful of trackers), so callers invoke it on every
    relevant change. Callers are the tracker, link and tracking-config CRUD
    endpoints, plus ``reschedule_cron_jobs_for_timezone_change``.
    """
    scheduler = get_scheduler()
    # Snapshot the ids first so removal never mutates what we iterate.
    stale_ids = [
        job.id
        for job in scheduler.get_jobs()
        if job.id.startswith(_IMMICH_DISPATCH_PREFIX)
    ]
    for job_id in stale_ids:
        scheduler.remove_job(job_id)
    await _load_immich_dispatch_jobs()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Scheduled backup
# ---------------------------------------------------------------------------


# APScheduler job id of the single recurring backup job. The load, schedule
# and unschedule helpers below all use it, so they address the same job.
_BACKUP_JOB_ID = "scheduled_backup"
|
|
|
|
|
|
def _parse_backup_interval_hours(raw: str | None, default: int = 24) -> int:
    """Parse the stored backup interval in hours, falling back to *default*.

    Returns *default* in three cases, logging a warning for the last two:
    the value is missing or empty, it is not an integer, or it is below 1.
    A zero-length interval would make the interval trigger fire
    (near-)continuously, so it must never reach ``add_job``.
    """
    if not raw:
        return default
    try:
        hours = int(raw)
    except ValueError:
        _LOGGER.warning(
            "Invalid backup_scheduled_interval_hours %r; using %dh", raw, default,
        )
        return default
    if hours < 1:
        _LOGGER.warning(
            "Non-positive backup_scheduled_interval_hours %r; using %dh", raw, default,
        )
        return default
    return hours


async def _load_backup_job() -> None:
    """Register the recurring backup job from stored settings if enabled.

    Reads ``backup_scheduled_enabled`` (enabled only when exactly ``"true"``)
    and ``backup_scheduled_interval_hours`` from ``AppSetting``. The interval
    falls back to 24 hours when missing, non-integer or non-positive, instead
    of raising or producing a zero/negative interval job.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..database.engine import get_engine
    from ..database.models import AppSetting

    engine = get_engine()
    async with _AS(engine) as session:
        enabled_row = await session.get(AppSetting, "backup_scheduled_enabled")
        interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours")

    if not (enabled_row and enabled_row.value == "true"):
        return

    interval_hours = _parse_backup_interval_hours(
        interval_row.value if interval_row else None,
    )
    scheduler = get_scheduler()
    scheduler.add_job(
        _run_scheduled_backup,
        "interval",
        hours=interval_hours,
        id=_BACKUP_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled backup every %dh", interval_hours)
|
|
|
|
|
|
async def schedule_backup(interval_hours: int = 24) -> None:
    """Create or recreate the recurring backup job.

    Any job already registered under the backup id is dropped before the new
    interval job is added, so calling this again just changes the period.

    Args:
        interval_hours: Hours between consecutive backup runs.
    """
    scheduler = get_scheduler()
    existing = scheduler.get_job(_BACKUP_JOB_ID)
    if existing:
        scheduler.remove_job(_BACKUP_JOB_ID)

    # At most one backup may run at a time.
    job_options = dict(
        id=_BACKUP_JOB_ID,
        replace_existing=True,
        max_instances=1,
    )
    scheduler.add_job(_run_scheduled_backup, "interval", hours=interval_hours, **job_options)
    _LOGGER.info("Scheduled backup every %dh", interval_hours)
|
|
|
|
|
|
async def unschedule_backup() -> None:
    """Drop the recurring backup job; does nothing when none is registered."""
    scheduler = get_scheduler()
    if not scheduler.get_job(_BACKUP_JOB_ID):
        return
    scheduler.remove_job(_BACKUP_JOB_ID)
    _LOGGER.info("Unscheduled backup job")
|
|
|
|
|
|
async def _run_scheduled_backup() -> None:
    """Run a scheduled backup (called by APScheduler).

    Steps:
    1. Read ``backup_secrets_mode`` (default ``SecretsMode.EXCLUDE``) and
       ``backup_retention_count`` (default 5) from ``AppSetting``.
    2. Export a backup into ``<data_dir>/backups`` using the lowest-id admin
       user as the ownership context.
    3. Call ``cleanup_old_backups`` with the retention count.

    Skips the run with a warning when no admin user exists. Any other failure
    is logged with its traceback and swallowed.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS

    from ..database.engine import get_engine
    from ..database.models import AppSetting, User
    from ..config import settings as app_config
    from .backup_schema import SecretsMode
    from .backup_service import export_backup_to_file, cleanup_old_backups

    try:
        engine = get_engine()
        async with _AS(engine) as session:
            # Read settings
            secrets_row = await session.get(AppSetting, "backup_secrets_mode")
            retention_row = await session.get(AppSetting, "backup_retention_count")

            secrets_mode = SecretsMode(secrets_row.value) if secrets_row and secrets_row.value else SecretsMode.EXCLUDE
            retention = int(retention_row.value) if retention_row and retention_row.value else 5

            # Use the first admin (by id) as the ownership context. ORDER BY
            # is required: without it the database may return any admin, so
            # the backup owner could differ between runs.
            admin = (await session.exec(
                select(User).where(User.role == "admin").order_by(User.id)
            )).first()
            if admin is None:
                _LOGGER.warning("No admin user found, skipping scheduled backup")
                return

            backup_dir = app_config.data_dir / "backups"
            await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)

        # Cleanup outside the session
        cleanup_old_backups(backup_dir, keep=retention)

    except Exception as e:  # noqa: BLE001
        # Top-level job boundary: keep the traceback for diagnosis.
        _LOGGER.exception("Scheduled backup failed: %s", e)
|