8065e6effa
Replace the single comma-separated text box with an add/remove list of native HH:MM pickers for the Immich scheduled-assets, periodic-summary, and memory slots. The backend already stored comma-separated *_times and scheduled one cron job per time; this makes entering several times per day discoverable and hardens the read/write path. Backend: - services/time_list.py: normalize_time_list (validate / dedup / sort / cap at 24, raising TimeListError) + lenient parse_hhmm_list; the scheduler now uses the shared parser (drops its private copy). - tracking_configs API normalizes *_times on every write (422 on bad input) and rejects enabling a slot whose times list is empty. - scheduler warns when an enabled slot has zero or dropped fire times, restoring the observability lost with the old per-call warning. Frontend: - TimeListEditor.svelte: add/remove native time rows, dedupe + sort on emit, per-day cap, collapses on-screen duplicates, aria-labelled rows; syncs from the value prop only on external changes (untrack guard) so keyboard entry isn't clobbered mid-edit. - Descriptor-driven save guard: an enabled feature section must have at least one time. - i18n (en/ru) keys; refreshed help text; removed dead invalidTimeList. Tests: time_list normalization/parsing (incl. non-ASCII/odd shapes) and the enabled-implies-times validation.
1361 lines
50 KiB
Python
1361 lines
50 KiB
Python
"""APScheduler-based polling scheduler for trackers and actions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
from .time_list import parse_hhmm_list
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo:
    """Resolve an IANA tz string to a ZoneInfo, falling back to UTC on any error.

    Kept local to avoid importing from api/dispatch layers inside the scheduler
    module (which is loaded at startup, before the API routers).

    Args:
        tz_name: IANA zone key such as ``"Europe/Berlin"``. ``None`` or an
            empty string selects UTC.

    Returns:
        The matching ``ZoneInfo``, or ``ZoneInfo("UTC")`` when the name is
        missing or cannot be loaded.
    """
    if not tz_name:
        return ZoneInfo("UTC")
    try:
        return ZoneInfo(tz_name)
    except (ZoneInfoNotFoundError, ValueError, OSError):
        # ZoneInfoNotFoundError / ValueError cover unknown and malformed keys.
        # OSError is caught as well because the lookup reads the tz database
        # from disk: a key that names something unreadable there (on some
        # Python versions a bare directory key such as "America") surfaces as
        # an OSError subclass, and a bad admin setting must not abort
        # scheduler startup.
        _LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name)
        return ZoneInfo("UTC")
|
|
|
|
|
|
async def _load_app_timezone() -> ZoneInfo:
    """Return the admin-configured app timezone.

    Reads the ``timezone`` AppSetting through a short-lived session and hands
    the raw value to ``_resolve_zoneinfo``, which substitutes UTC when the
    setting is empty or cannot be resolved.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..api.app_settings import get_setting
    from ..database.engine import get_engine

    db_engine = get_engine()
    async with AsyncSession(db_engine) as db:
        configured_name = await get_setting(db, "timezone")
        resolved = _resolve_zoneinfo(configured_name)
    return resolved
|
|
|
|
# Process-wide scheduler singleton; created lazily by ``get_scheduler``.
_scheduler: AsyncIOScheduler | None = None
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adaptive polling (Tier 6 of the big-album optimization plan).
|
|
#
|
|
# We don't touch the user-configured ``scan_interval`` — that's still the
|
|
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
|
|
# ticks when a tracker is idle, and reset to 1:1 as soon as it detects
|
|
# anything. The scheduler keeps running on the user's chosen period, so
|
|
# response time to the *first* change after an idle stretch is never worse
|
|
# than one tick — but the steady-state HTTP cost for a fleet of idle
|
|
# trackers drops by ~75%.
|
|
#
|
|
# Opt-in per tracker via the ``adaptive_max_skip`` column:
|
|
# * NULL or 0 → adaptive polling disabled, every tick runs (default)
|
|
# * 2 → skip at most 1-in-2 ticks after long idle
|
|
# * 3, 4, ... → up to (N-1)-in-N skipping
|
|
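# Thresholds are intentionally conservative and count consecutive empty
# *polls* (skipped ticks do not advance the counter): a tracker polling
# every 30 s needs 5 min of silence before we halve its effective rate, and
# roughly 25 min in total before we quarter it (the remaining 20 empty polls
# run at the halved rate).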
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2
|
|
_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4
|
|
|
|
# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
|
|
# restart — a short warmup period is fine and avoids persisting what is
|
|
# effectively a performance heuristic.
|
|
_adaptive_state: dict[int, dict[str, int]] = {}
|
|
|
|
# Per-tracker cap on the skip factor, mirrored from the DB column at
|
|
# schedule time. Absence of an entry (or 0) means adaptive polling is off
|
|
# for that tracker — ``_adaptive_should_skip`` returns False immediately.
|
|
_adaptive_max_skip: dict[int, int] = {}
|
|
|
|
|
|
def set_adaptive_max_skip(tracker_id: int, max_skip: int | None) -> None:
    """Store or remove the adaptive skip cap for ``tracker_id``.

    The scheduling helpers call this so that ``_adaptive_should_skip`` can
    decide from in-memory state instead of querying the DB on every tick.
    A cap of ``None``, 0 or 1 removes the entry, which turns back-off off
    for the tracker. Larger values are stored as ``int``.
    """
    backoff_enabled = bool(max_skip) and max_skip > 1
    if not backoff_enabled:
        _adaptive_max_skip.pop(tracker_id, None)
    else:
        _adaptive_max_skip[tracker_id] = int(max_skip)
    # Whether the tracker opted in or out, forget the old counters so the
    # new setting takes effect on the very next tick.
    _adaptive_state.pop(tracker_id, None)
|
|
|
|
|
|
def _compute_jitter(interval_seconds: int) -> int:
    """Pick the jitter bound (seconds) to pass to an interval trigger.

    A fleet of trackers sharing the same ``scan_interval`` would otherwise
    wake up on the same wall-clock second and hit the upstream server
    (Immich, Gitea, ...) all at once. APScheduler's ``jitter`` spreads each
    firing by up to ±jitter seconds.

    The bound is a quarter of the interval (integer division), capped at
    30 s. Intervals below 4 s therefore get 0, i.e. no jitter. At such
    cadences a bursty pattern is what the user implicitly opted into.
    Non-positive intervals also yield 0.
    """
    if interval_seconds > 0:
        quarter = interval_seconds // 4
        return quarter if quarter < 30 else 30
    return 0
|
|
|
|
|
|
def get_scheduler() -> AsyncIOScheduler:
    """Return the module-wide ``AsyncIOScheduler``, building it on first use.

    The instance is created with job defaults that apply to every job unless
    the job overrides them:

    * ``coalesce=True``: a backlog of missed runs (after a restart or pause)
      collapses into a single firing instead of a catch-up burst.
    * ``misfire_grace_time=300``: firings up to 5 min late are still run
      rather than silently dropped.
    * ``max_instances=1``: two copies of the same job never run
      concurrently. Several ``add_job`` calls set this explicitly too; the
      default is a safety net.
    """
    global _scheduler
    if _scheduler is not None:
        return _scheduler
    production_defaults = {
        "coalesce": True,
        "misfire_grace_time": 300,
        "max_instances": 1,
    }
    _scheduler = AsyncIOScheduler(job_defaults=production_defaults)
    return _scheduler
|
|
|
|
|
|
async def start_scheduler() -> None:
    """Start the shared scheduler (if not running) and register all jobs.

    After starting the scheduler this loads tracker, action and Immich
    dispatch jobs, starts Telegram command-listener polling, and installs the
    housekeeping jobs listed below. The individual helpers are invoked in the
    order written here.
    """
    scheduler = get_scheduler()
    if not scheduler.running:
        scheduler.start()
        _LOGGER.info("Scheduler started")

    await _load_tracker_jobs()
    await _load_action_jobs()
    await _load_immich_dispatch_jobs()

    # Start Telegram bot polling for bots with active command listeners
    # (imported here rather than at module top, like the other service imports
    # in this function).
    from .telegram_poller import start_command_listener_polling
    await start_command_listener_polling()

    # Schedule daily cleanup of old event log entries
    _schedule_event_cleanup()

    # Schedule periodic Telegram chat title refresh
    _schedule_telegram_chat_sync()

    # Start debounced command auto-sync scheduler
    from .command_sync import start_sync_scheduler
    start_sync_scheduler()

    # Load scheduled backup job if enabled
    await _load_backup_job()

    # Re-arm any deferred-dispatch drains that were pending across restart.
    from .deferred_dispatch import load_pending_drain_jobs
    await load_pending_drain_jobs()

    # And install the periodic safety-net catch-up scan.
    _schedule_drain_catchup()

    # Schedule the upstream release-check probe.
    await _schedule_release_check()

    # Schedule the bridge_self deferred-backlog scan (every 5 min).
    _schedule_bridge_self_backlog_scan()
|
|
|
|
|
|
def _schedule_event_cleanup() -> None:
    """Schedule the daily log-retention sweep (``_cleanup_old_events``) at 03:00 UTC.

    Only the trigger is registered here. The retention window is read by the
    job itself each time it runs. Does nothing when the job is already
    registered.
    """
    from apscheduler.triggers.cron import CronTrigger

    scheduler = get_scheduler()
    job_id = "cleanup_old_events"
    if scheduler.get_job(job_id):
        return
    scheduler.add_job(
        _cleanup_old_events,
        # Pin the trigger to UTC explicitly. A CronTrigger built without a
        # timezone is evaluated in the host's local timezone, which would
        # contradict the "03:00 UTC" stated in the log line below.
        CronTrigger(hour=3, minute=0, timezone=ZoneInfo("UTC")),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")
|
|
|
|
|
|
# Chat-title refresh tuning.
|
|
# Sweep runs daily as a fallback — we additionally refresh opportunistically
|
|
# on every incoming webhook/long-poll update (``save_chat_from_webhook``), so
|
|
# the sweep only catches chats that haven't sent anything recently.
|
|
_CHAT_SYNC_INTERVAL_HOURS = 24  # period of the recurring title-refresh job
_CHAT_SYNC_INITIAL_DELAY_SECONDS = 60  # delay before the one-shot refresh after startup
_CHAT_SYNC_CONCURRENCY = 10  # max getChat requests in flight at once
|
|
|
|
|
|
def _schedule_telegram_chat_sync() -> None:
    """Schedule periodic refresh of Telegram chat titles via getChat.

    Registers two jobs unless the recurring one already exists:

    * ``refresh_telegram_chat_titles``: runs every
      ``_CHAT_SYNC_INTERVAL_HOURS`` hours.
    * ``refresh_telegram_chat_titles_once``: a one-shot run
      ``_CHAT_SYNC_INITIAL_DELAY_SECONDS`` after startup, so stale names are
      refreshed without waiting a full interval.
    """
    from datetime import datetime, timedelta, timezone

    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    job_id = "refresh_telegram_chat_titles"
    if scheduler.get_job(job_id):
        return
    # Do NOT pass ``next_run_time=None`` here: APScheduler treats an explicit
    # None as "add the job paused", which would leave the recurring sweep
    # dormant forever. Omitting the argument lets the interval trigger compute
    # the first run (one interval from now).
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS),
        id=job_id,
        replace_existing=True,
        max_instances=1,
    )
    # Fire once shortly after startup so stale names refresh without waiting a day.
    scheduler.add_job(
        _refresh_telegram_chat_titles,
        "date",
        run_date=datetime.now(timezone.utc) + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS),
        id="refresh_telegram_chat_titles_once",
        replace_existing=True,
        max_instances=1,
    )
    _LOGGER.info(
        "Scheduled Telegram chat title refresh every %sh (concurrency %s)",
        _CHAT_SYNC_INTERVAL_HOURS, _CHAT_SYNC_CONCURRENCY,
    )
|
|
|
|
|
|
async def _refresh_telegram_chat_titles() -> None:
    """Refresh TelegramChat.title/username via getChat for all known chats.

    Runs requests in bounded parallel (``_CHAT_SYNC_CONCURRENCY``) so a fleet
    of 50 chats finishes in ~5 round-trips instead of 50. Telegram's
    ``getChat`` rate limit is well above 10 concurrent per bot, and the cap is
    global across bots so we never flood the shared HTTP session.

    Flow: load bots and chats, fetch chat info concurrently, then write
    changed titles/usernames back in a second session. Returns early when no
    bot has a token or no chat belongs to such a bot.
    """
    import asyncio
    from collections import defaultdict

    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from notify_bridge_core.notifications.telegram.client import TelegramClient

    from ..database.engine import get_engine
    from ..database.models import TelegramBot, TelegramChat
    from .http_session import get_http_session

    engine = get_engine()
    # Phase 1: read bots and chats. This session is closed before any HTTP
    # request is made.
    async with AsyncSession(engine) as session:
        bots = (await session.exec(select(TelegramBot))).all()
        # Only bots with a non-empty token can call getChat.
        bot_tokens = {b.id: b.token for b in bots if b.token}
        if not bot_tokens:
            return
        chats = (await session.exec(select(TelegramChat))).all()

    # Group chats by owning bot, dropping chats whose bot has no usable token.
    by_bot: dict[int, list[TelegramChat]] = defaultdict(list)
    for chat in chats:
        if chat.bot_id in bot_tokens:
            by_bot[chat.bot_id].append(chat)
    if not by_bot:
        return

    http = await get_http_session()
    # One client per bot, all sharing the same HTTP session object.
    clients_by_bot = {
        bot_id: TelegramClient(http, token) for bot_id, token in bot_tokens.items()
    }

    # A single semaphore shared by every fetch, regardless of bot.
    sem = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY)

    async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]:
        """Return (chat_row_id, info_dict_or_None, error_message_or_None)."""
        async with sem:
            try:
                res = await clients_by_bot[bot_id].get_chat(chat.chat_id)
            except Exception as err:  # noqa: BLE001
                # Failures are reported per chat so one bad chat does not
                # abort the whole gather below.
                return chat.id, None, str(err)
            if not res.get("success"):
                return chat.id, None, res.get("error") or "unknown"
            return chat.id, (res.get("result") or {}), None

    tasks = [
        _fetch(bot_id, chat)
        for bot_id, bot_chats in by_bot.items()
        for chat in bot_chats
    ]
    results = await asyncio.gather(*tasks)

    refreshed = 0
    errors = 0
    # Bucket results first, then fetch all rows in one IN-query instead of
    # per-row ``session.get`` — otherwise a 50-chat fleet issues 50 extra
    # SELECTs before commit.
    successes: dict[int, dict] = {}
    for chat_id, info, err in results:
        if err is not None or info is None:
            errors += 1
            if err:
                _LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err)
            continue
        if chat_id is not None:
            successes[chat_id] = info
    # Phase 2: apply the fetched values in a fresh session.
    async with AsyncSession(engine) as session:
        if successes:
            rows = (await session.exec(
                select(TelegramChat).where(TelegramChat.id.in_(list(successes.keys())))
            )).all()
            for merged in rows:
                info = successes.get(merged.id)
                if not info:
                    # An empty info dict carries nothing to apply.
                    continue
                # Use "title" when present, otherwise fall back to
                # "first_name last_name".
                title = info.get("title") or (
                    (info.get("first_name", "") + " " + info.get("last_name", "")).strip()
                )
                changed = False
                if title and merged.title != title:
                    merged.title = title
                    changed = True
                new_username = info.get("username")
                # A missing username leaves the stored value untouched.
                if new_username is not None and merged.username != new_username:
                    merged.username = new_username
                    changed = True
                if changed:
                    session.add(merged)
                    refreshed += 1
        await session.commit()
    _LOGGER.info(
        "Telegram chat title refresh: %s updated, %s errors", refreshed, errors
    )
|
|
|
|
|
|
async def _cleanup_old_events() -> None:
    """Purge log rows that have aged out of the retention window.

    Deletes ``EventLog`` and ``WebhookPayloadLog`` rows by ``created_at`` and
    ``ActionExecution`` rows by ``started_at`` when they are older than
    ``settings.event_log_retention_days`` days, all in one transaction. A
    retention of 0 (or less) disables the job.
    """
    from datetime import datetime, timedelta, timezone

    from sqlmodel import delete
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..config import settings
    from ..database.engine import get_engine
    from ..database.models import ActionExecution, EventLog, WebhookPayloadLog

    retention_days = settings.event_log_retention_days
    if retention_days <= 0:
        _LOGGER.debug("Event log retention disabled (days=0); skipping cleanup")
        return

    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    db_engine = get_engine()
    async with AsyncSession(db_engine) as db:
        # Same order as the tables are named in the log line below.
        await db.exec(delete(EventLog).where(EventLog.created_at < cutoff))
        payload_purge = delete(WebhookPayloadLog).where(
            WebhookPayloadLog.created_at < cutoff
        )
        await db.exec(payload_purge)
        execution_purge = delete(ActionExecution).where(
            ActionExecution.started_at < cutoff
        )
        await db.exec(execution_purge)
        await db.commit()
    _LOGGER.info(
        "Cleaned event_log / webhook_payload_log / action_execution older than %s",
        cutoff.date(),
    )
|
|
|
|
|
|
async def _load_tracker_jobs() -> None:
    """Load enabled trackers and schedule polling jobs.

    For each enabled tracker that has no job yet: webhook-based providers are
    skipped, ``scheduler`` providers with a cron filter get a cron job, and
    everything else (including cron trackers whose expression is rejected)
    gets an interval job with jitter and its adaptive cap registered.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()

    async with AsyncSession(engine) as session:
        result = await session.exec(select(NotificationTracker).where(NotificationTracker.enabled == True))  # noqa: E712
        trackers = result.all()

        # Batch-load provider types for scheduler detection
        unique_provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if unique_provider_ids:
            provider_result = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(unique_provider_ids)
                )
            )
            provider_types = {p.id: p.type for p in provider_result.all()}

    # Loaded once and shared by every cron job created below.
    tz = await _load_app_timezone()

    from notify_bridge_core.providers.capabilities import get_capabilities

    for tracker in trackers:
        job_id = f"tracker_{tracker.id}"
        # Jobs that already exist are left untouched.
        if scheduler.get_job(job_id):
            continue

        # An unknown provider maps to "" and is treated as a plain polling
        # tracker.
        ptype = provider_types.get(tracker.provider_id, "")
        filters = tracker.filters or {}

        # Webhook-based providers receive events via inbound HTTP — there is
        # nothing to poll. Scheduling an interval job for them just wakes up
        # check_tracker every scan_interval seconds to immediately return,
        # wasting CPU and DB queries for no work.
        caps = get_capabilities(ptype) if ptype else None
        if caps and caps.webhook_based:
            _LOGGER.debug(
                "Skipping interval scheduling for webhook tracker %d (%s, type=%s)",
                tracker.id, tracker.name, ptype,
            )
            continue

        # Scheduler providers can use cron triggers
        if ptype == "scheduler" and filters.get("schedule_type") == "cron":
            cron_expr = filters.get("cron_expression", "")
            if cron_expr:
                try:
                    _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                    continue
                except Exception as e:
                    # Deliberate best-effort: a bad expression degrades to
                    # the interval job below instead of leaving the tracker
                    # unscheduled.
                    _LOGGER.error(
                        "Invalid cron expression for tracker %d (%s): %s — falling back to interval",
                        tracker.id, tracker.name, e,
                    )

        jitter = _compute_jitter(tracker.scan_interval)
        scheduler.add_job(
            _poll_tracker,
            "interval",
            seconds=tracker.scan_interval,
            # A computed jitter of 0 is passed as None (no jitter).
            jitter=jitter or None,
            id=job_id,
            args=[tracker.id],
            replace_existing=True,
            max_instances=1,
        )
        # Mirror the DB cap into module state for the tick-time skip check.
        set_adaptive_max_skip(tracker.id, tracker.adaptive_max_skip)
        _LOGGER.info(
            "Scheduled tracker %d (%s) every %ds (jitter ±%ds, adaptive_max_skip=%s)",
            tracker.id, tracker.name, tracker.scan_interval, jitter,
            tracker.adaptive_max_skip,
        )
|
|
|
|
|
|
def _add_cron_job(
    scheduler: AsyncIOScheduler,
    job_id: str,
    tracker_id: int,
    cron_expression: str,
    tracker_name: str,
    tz: ZoneInfo,
) -> None:
    """Register a crontab-driven ``_poll_tracker`` job for a scheduler-type tracker.

    ``tz`` is the user-configured app timezone. If it were omitted,
    APScheduler would evaluate the crontab in the host's local timezone and
    operators in a non-UTC zone would see events fire at the "wrong"
    wall-clock time (``update_settings`` carries the companion fix that
    reschedules jobs when the timezone changes).

    Any exception raised while parsing ``cron_expression`` propagates to the
    caller.
    """
    from apscheduler.triggers.cron import CronTrigger

    cron_trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
    job_options = {
        "id": job_id,
        "args": [tracker_id],
        "replace_existing": True,
        "max_instances": 1,
    }
    scheduler.add_job(_poll_tracker, cron_trigger, **job_options)
    _LOGGER.info(
        "Scheduled tracker %d (%s) with cron: %s [tz=%s]",
        tracker_id, tracker_name, cron_expression, tz.key,
    )
|
|
|
|
|
|
async def _is_webhook_tracker(tracker_id: int) -> bool:
    """Tell whether ``tracker_id`` belongs to a webhook-based provider.

    Loads the tracker and its provider, then asks the capabilities registry
    about the provider type. A missing tracker or provider counts as "not
    webhook-based". ``schedule_tracker`` uses the answer to skip interval
    scheduling.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from notify_bridge_core.providers.capabilities import get_capabilities
    from ..database.engine import get_engine
    from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel

    async with AsyncSession(get_engine()) as db:
        tracker_row = await db.get(NotificationTracker, tracker_id)
        if tracker_row is not None:
            provider_row = await db.get(ServiceProviderModel, tracker_row.provider_id)
            if provider_row is not None:
                capabilities = get_capabilities(provider_row.type)
                return bool(capabilities and capabilities.webhook_based)
        return False
|
|
|
|
|
|
async def schedule_tracker(
    tracker_id: int,
    interval: int,
    cron_expression: str | None = None,
    adaptive_max_skip: int | None = None,
) -> None:
    """Create or replace the scheduler job of one tracker.

    ``adaptive_max_skip`` mirrors the DB column. It is registered in the
    adaptive module state so skip decisions at tick time need no DB query.
    ``None`` or ``0`` switches back-off off for the tracker.

    Webhook-based providers get their events over inbound HTTP and have
    nothing to poll, so for them any existing job is removed and no new one is
    created. This keeps the API create/update path from producing useless
    ``scan_interval`` wakeups.

    When ``cron_expression`` is given, a cron job is attempted first. If that
    fails, the error is logged and an interval job is created instead.
    """
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"

    # Rescheduling normally follows a config edit or an enable/disable flip:
    # clear the back-off so the first tick after the change is not skipped.
    reset_adaptive_state(tracker_id)
    set_adaptive_max_skip(tracker_id, adaptive_max_skip)

    # Drop the old job up front so the trigger type can change freely.
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    # Nothing to poll for webhook providers: stop before creating a job.
    is_webhook = await _is_webhook_tracker(tracker_id)
    if is_webhook:
        _LOGGER.debug(
            "Skipping interval scheduling for webhook tracker %d", tracker_id,
        )
        return

    if cron_expression:
        try:
            app_tz = await _load_app_timezone()
            _add_cron_job(
                scheduler, job_id, tracker_id, cron_expression,
                f"tracker-{tracker_id}", app_tz,
            )
        except Exception as err:
            _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, err)
        else:
            return

    jitter = _compute_jitter(interval)
    interval_options = {
        "seconds": interval,
        "jitter": jitter or None,
        "id": job_id,
        "args": [tracker_id],
        "replace_existing": True,
    }
    scheduler.add_job(_poll_tracker, "interval", **interval_options)
    _LOGGER.info(
        "Scheduled tracker %d every %ds (jitter ±%ds, adaptive_max_skip=%s)",
        tracker_id, interval, jitter, adaptive_max_skip,
    )
|
|
|
|
|
|
async def unschedule_tracker(tracker_id: int) -> None:
    """Drop a tracker's scheduler job together with its adaptive state and cap.

    The adaptive bookkeeping is always cleared. The job removal (and its log
    line) only happens when a job is actually registered.
    """
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"
    reset_adaptive_state(tracker_id)
    _adaptive_max_skip.pop(tracker_id, None)
    if not scheduler.get_job(job_id):
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled tracker %d", tracker_id)
|
|
|
|
|
|
def _adaptive_should_skip(tracker_id: int) -> bool:
    """Decide whether the adaptive heuristic wants this tick skipped.

    Always ``False`` for trackers with no registered cap (adaptive off), with
    no recorded state, or whose ``skip_every`` is 1 or less. In 1-in-K mode
    the per-tracker tick counter is advanced and only every K-th tick is let
    through, so K-1 ticks are skipped between real polls.
    """
    tracked = tracker_id in _adaptive_max_skip
    state = _adaptive_state.get(tracker_id) if tracked else None
    if not state:
        return False
    period = state.get("skip_every", 1)
    if period <= 1:
        return False
    ticks = state.get("tick_counter", 0) + 1
    state["tick_counter"] = ticks
    # A poll happens when the counter is a multiple of the period.
    return bool(ticks % period)
|
|
|
|
|
|
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
    """Feed the result of a real (non-skipped) tick into the adaptive state.

    Does nothing for trackers without a cap above 1, so trackers that never
    use adaptive polling accumulate no counters. Any detected event resets
    the tracker to the base rate. An empty tick bumps the idle counter and,
    once a threshold is crossed, raises ``skip_every`` to 2 and later to
    ``min(cap, 4)``. It is never lowered here.
    """
    cap = _adaptive_max_skip.get(tracker_id)
    if not cap or cap <= 1:
        return

    fresh_state = {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
    state = _adaptive_state.setdefault(tracker_id, fresh_state)

    if events_detected > 0:
        if state["skip_every"] > 1:
            _LOGGER.info(
                "Adaptive polling: tracker %d saw activity, restoring base rate",
                tracker_id,
            )
        state.update(empty_count=0, skip_every=1, tick_counter=0)
        return

    idle_ticks = state.get("empty_count", 0) + 1
    state["empty_count"] = idle_ticks
    current_period = state["skip_every"]
    quarter_period = min(cap, 4)
    half_period = min(cap, 2)

    if idle_ticks >= _ADAPTIVE_QUARTER_THRESHOLD and current_period < quarter_period:
        state["skip_every"] = quarter_period
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping %d of %d",
            tracker_id, idle_ticks,
            quarter_period - 1, quarter_period,
        )
    elif idle_ticks >= _ADAPTIVE_HALVE_THRESHOLD and current_period < half_period:
        state["skip_every"] = half_period
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping every other",
            tracker_id, idle_ticks,
        )
|
|
|
|
|
|
def reset_adaptive_state(tracker_id: int) -> None:
    """Forget the cached adaptive counters of a tracker.

    For API callers whose change should make the tracker run promptly on the
    next scheduled tick (enable/disable, config edits, manual "check now").
    The configured cap is left in place. Clear that with
    ``set_adaptive_max_skip(..., None)``.
    """
    try:
        del _adaptive_state[tracker_id]
    except KeyError:
        # No counters recorded for this tracker, so there is nothing to reset.
        pass
|
|
|
|
|
|
async def _poll_tracker(tracker_id: int) -> None:
    """Scheduler entry point: poll one tracker unless adaptive back-off skips it.

    Exceptions from ``check_tracker`` are logged and swallowed so a failing
    tracker cannot break the scheduler. Only results with status ``"ok"``
    feed the adaptive heuristic.
    """
    from .watcher import check_tracker

    if _adaptive_should_skip(tracker_id):
        return

    try:
        outcome = await check_tracker(tracker_id)
    except Exception as err:
        _LOGGER.error("Error polling tracker %d: %s", tracker_id, err)
        return

    # "error" / "skipped" results are inconclusive: a transient upstream
    # failure must not push the heuristic towards backing off.
    if not isinstance(outcome, dict) or outcome.get("status") != "ok":
        return
    detected = outcome.get("events_detected", 0) or 0
    _adaptive_update(tracker_id, int(detected))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Action scheduling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _load_action_jobs() -> None:
    """Load enabled actions and schedule execution jobs.

    Each enabled action without an existing job gets a cron job when
    ``schedule_type`` is ``"cron"`` and a cron string is set. Otherwise, or
    when the cron string is rejected, it gets an interval job based on
    ``schedule_interval``.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from ..database.engine import get_engine
    from ..database.models import Action

    engine = get_engine()
    scheduler = get_scheduler()

    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )
        actions = result.all()

    # Loaded once and shared by every cron trigger built below.
    tz = await _load_app_timezone()

    for action in actions:
        job_id = f"action_{action.id}"
        # Jobs that already exist are left untouched.
        if scheduler.get_job(job_id):
            continue

        if action.schedule_type == "cron" and action.schedule_cron:
            try:
                from apscheduler.triggers.cron import CronTrigger
                trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
                scheduler.add_job(
                    _run_action,
                    trigger,
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                _LOGGER.info(
                    "Scheduled action %d (%s) with cron: %s [tz=%s]",
                    action.id, action.name, action.schedule_cron, tz.key,
                )
                continue
            except Exception as e:
                # Deliberate best-effort: fall through to the interval job.
                _LOGGER.error(
                    "Invalid cron for action %d (%s): %s — falling back to interval",
                    action.id, action.name, e,
                )

        scheduler.add_job(
            _run_action,
            "interval",
            seconds=action.schedule_interval,
            id=job_id,
            args=[action.id],
            replace_existing=True,
        )
        _LOGGER.info(
            "Scheduled action %d (%s) every %ds",
            action.id, action.name, action.schedule_interval,
        )
|
|
|
|
|
|
async def schedule_action(
    action_id: int,
    schedule_type: str = "interval",
    interval: int = 3600,
    cron_expression: str = "",
) -> None:
    """Create or replace the scheduler job of one action.

    Any existing job is removed first. With ``schedule_type == "cron"`` and a
    non-empty ``cron_expression`` a cron job in the app timezone is attempted.
    If that fails, the error is logged and an interval job running every
    ``interval`` seconds is registered instead, the same job used for plain
    interval schedules.
    """
    scheduler = get_scheduler()
    job_id = f"action_{action_id}"

    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    shared_options = {"id": job_id, "args": [action_id], "replace_existing": True}

    if schedule_type == "cron" and cron_expression:
        try:
            from apscheduler.triggers.cron import CronTrigger

            app_tz = await _load_app_timezone()
            cron_trigger = CronTrigger.from_crontab(cron_expression, timezone=app_tz)
            scheduler.add_job(_run_action, cron_trigger, **shared_options)
            _LOGGER.info(
                "Scheduled action %d with cron: %s [tz=%s]",
                action_id, cron_expression, app_tz.key,
            )
        except Exception as err:
            _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, err)
        else:
            return

    scheduler.add_job(_run_action, "interval", seconds=interval, **shared_options)
    _LOGGER.info("Scheduled action %d every %ds", action_id, interval)
|
|
|
|
|
|
async def unschedule_action(action_id: int) -> None:
    """Drop the scheduler job of an action, if one is registered."""
    job_id = f"action_{action_id}"
    scheduler = get_scheduler()
    if not scheduler.get_job(job_id):
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled action %d", action_id)
|
|
|
|
|
|
async def reschedule_cron_jobs_for_timezone_change() -> None:
    """Re-add every cron-triggered tracker/action job under the new app timezone.

    Called by the admin settings endpoint after the ``timezone`` AppSetting is
    updated. APScheduler's ``CronTrigger`` freezes its timezone at construction
    time, so a timezone change has no effect on jobs already in the scheduler
    — we have to rebuild those jobs. Interval-triggered jobs are tz-agnostic
    and are left alone.

    Failures for individual trackers/actions are logged and do not stop the
    remaining jobs from being rebuilt.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..database.engine import get_engine
    from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel

    engine = get_engine()
    scheduler = get_scheduler()
    # The timezone is read fresh, so this reflects the just-saved setting.
    tz = await _load_app_timezone()
    rescheduled = 0  # successful re-adds only; reported in the log below

    async with AsyncSession(engine) as session:
        # Trackers with cron scheduling (scheduler provider + schedule_type=cron).
        trackers = (await session.exec(
            select(NotificationTracker).where(NotificationTracker.enabled == True)  # noqa: E712
        )).all()
        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            rows = await session.exec(
                select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids))
            )
            provider_types = {p.id: p.type for p in rows.all()}

        for tracker in trackers:
            # Only "scheduler" providers with a non-empty cron filter carry a
            # CronTrigger; everything else is skipped.
            if provider_types.get(tracker.provider_id) != "scheduler":
                continue
            filters = tracker.filters or {}
            if filters.get("schedule_type") != "cron":
                continue
            cron_expr = filters.get("cron_expression", "")
            if not cron_expr:
                continue
            job_id = f"tracker_{tracker.id}"
            # The old job is removed before the re-add is attempted; if the
            # re-add fails the tracker stays unscheduled and the error is
            # logged.
            if scheduler.get_job(job_id):
                scheduler.remove_job(job_id)
            try:
                _add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for tracker %d on tz change: %s",
                    tracker.id, e,
                )

        # Actions with cron schedules.
        actions = (await session.exec(
            select(Action).where(Action.enabled == True)  # noqa: E712
        )).all()

        from apscheduler.triggers.cron import CronTrigger
        for action in actions:
            if action.schedule_type != "cron" or not action.schedule_cron:
                continue
            job_id = f"action_{action.id}"
            if scheduler.get_job(job_id):
                scheduler.remove_job(job_id)
            try:
                scheduler.add_job(
                    _run_action,
                    CronTrigger.from_crontab(action.schedule_cron, timezone=tz),
                    id=job_id,
                    args=[action.id],
                    replace_existing=True,
                )
                rescheduled += 1
            except Exception as e:  # noqa: BLE001
                _LOGGER.error(
                    "Failed to re-apply cron for action %d on tz change: %s",
                    action.id, e,
                )

    _LOGGER.info(
        "Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
    )

    # Immich scheduled/periodic/memory jobs are also CronTrigger-based and
    # carry the same frozen-tz problem — rebuild them under the new tz.
    await reschedule_immich_dispatch_jobs()
|
|
|
|
|
|
async def _run_action(action_id: int) -> None:
    """APScheduler entry point for actions.

    Delegates to ``run_action`` with ``trigger="scheduled"``. Any exception is
    logged and swallowed rather than propagated to the scheduler.
    """
    from .action_runner import run_action

    try:
        await run_action(action_id, trigger="scheduled")
    except Exception as err:
        _LOGGER.error("Error running action %d: %s", action_id, err)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Immich scheduled / periodic / memory dispatch (cron-fired)
|
|
#
|
|
# These three slots fire on wall-clock schedules taken from the tracker's
|
|
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
|
|
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
|
|
# app-level IANA timezone. The dispatch flow lives in
|
|
# ``services.scheduled_dispatch``; this section just owns scheduling.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
|
|
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"
|
|
|
|
|
|
async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
    """APScheduler entry point for one Immich dispatch slot.

    Calls ``scheduled_dispatch.dispatch_scheduled_for_tracker`` and swallows any
    exception it raises, logging it with its traceback.

    Args:
        tracker_id: Id of the tracker whose slot fired.
        kind: Slot name; jobs are registered with the values in
            ``_IMMICH_DISPATCH_KINDS``.
    """
    from .scheduled_dispatch import dispatch_scheduled_for_tracker

    try:
        # ``kind`` is a plain ``str`` here; presumably the callee declares a
        # narrower type, hence the ignore — TODO confirm.
        await dispatch_scheduled_for_tracker(tracker_id, kind)  # type: ignore[arg-type]
    except Exception as err:  # noqa: BLE001
        # ``exception`` keeps the traceback; the error goes no further than here.
        _LOGGER.exception(
            "Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err,
        )
|
|
|
|
|
|
async def _load_immich_dispatch_jobs() -> None:
    """Register one cron job per (tracker, kind, time) for every enabled slot.

    Only enabled trackers whose provider type is ``"immich"`` and that have a
    default tracking config are considered. Fire times come from that
    *default* config; per-link overrides only gate dispatch (handled in
    ``scheduled_dispatch``) and do not influence the fire schedule. Triggers
    are built in the app-level timezone.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession

    from apscheduler.triggers.cron import CronTrigger

    from ..database.engine import get_engine
    from ..database.models import (
        NotificationTracker,
        ServiceProvider as ServiceProviderModel,
        TrackingConfig,
    )

    engine = get_engine()
    scheduler = get_scheduler()
    tz = await _load_app_timezone()

    async with AsyncSession(engine) as session:
        tracker_query = select(NotificationTracker).where(
            NotificationTracker.enabled == True  # noqa: E712
        )
        trackers = (await session.exec(tracker_query)).all()
        if not trackers:
            return

        # Provider id -> provider type, fetched in a single query.
        provider_ids = list({t.provider_id for t in trackers})
        provider_types: dict[int, str] = {}
        if provider_ids:
            provider_rows = await session.exec(
                select(ServiceProviderModel).where(
                    ServiceProviderModel.id.in_(provider_ids)
                )
            )
            provider_types = {row.id: row.type for row in provider_rows.all()}

        # Default tracking configs, keyed by id, fetched in a single query.
        tc_ids = list({
            t.default_tracking_config_id
            for t in trackers
            if t.default_tracking_config_id
        })
        tc_map: dict[int, TrackingConfig] = {}
        if tc_ids:
            config_rows = await session.exec(
                select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
            )
            tc_map = {row.id: row for row in config_rows.all()}

        job_count = 0
        for tracker in trackers:
            if provider_types.get(tracker.provider_id) != "immich":
                continue
            config_id = tracker.default_tracking_config_id
            config = tc_map.get(config_id) if config_id else None
            if config is None:
                continue

            for kind in _IMMICH_DISPATCH_KINDS:
                if not getattr(config, f"{kind}_enabled", False):
                    continue
                times_raw = getattr(config, f"{kind}_times", "") or ""
                fire_times = parse_hhmm_list(times_raw)

                # Observability for misconfigured/legacy data: the parser's
                # result is compared with the count of non-blank tokens so
                # dropped entries get reported, and an enabled slot with no
                # fire times at all is called out because it will never fire.
                token_count = sum(1 for tok in times_raw.split(",") if tok.strip())
                dropped = token_count - len(fire_times)
                if dropped > 0:
                    _LOGGER.warning(
                        "Tracker %d %s: dropped %d unparseable time(s) from %r",
                        tracker.id, kind, dropped, times_raw,
                    )
                if not fire_times:
                    _LOGGER.warning(
                        "Tracker %d has %s enabled but no valid fire times (%r); "
                        "slot will not fire",
                        tracker.id, kind, times_raw,
                    )
                    continue

                for hour, minute in fire_times:
                    # One job per wall-clock time; the id encodes kind,
                    # tracker and HHMM.
                    job_id = (
                        f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}"
                        f"_{hour:02d}{minute:02d}"
                    )
                    trigger = CronTrigger(hour=hour, minute=minute, timezone=tz)
                    scheduler.add_job(
                        _run_immich_dispatch,
                        trigger,
                        id=job_id,
                        args=[tracker.id, kind],
                        replace_existing=True,
                        max_instances=1,
                    )
                    job_count += 1
                    _LOGGER.info(
                        "Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
                        kind, tracker.id, hour, minute, tz.key,
                    )

        if job_count:
            _LOGGER.info(
                "Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
                job_count, tz.key,
            )
|
|
|
|
|
|
async def reschedule_immich_dispatch_jobs() -> None:
    """Remove every Immich scheduled/periodic/memory job, then rebuild them.

    Cheap enough to call on every relevant mutation — a typical install has
    only a handful of trackers. Called from the tracker, link, and
    tracking-config CRUD endpoints, and from
    ``reschedule_cron_jobs_for_timezone_change``.
    """
    scheduler = get_scheduler()
    # Collect ids first so removal never runs while walking the job list.
    stale_ids = [
        job.id
        for job in scheduler.get_jobs()
        if job.id.startswith(_IMMICH_DISPATCH_PREFIX)
    ]
    for stale_id in stale_ids:
        scheduler.remove_job(stale_id)
    await _load_immich_dispatch_jobs()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scheduled backup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_BACKUP_JOB_ID = "scheduled_backup"
|
|
|
|
|
|
async def _load_backup_job() -> None:
    """Install the scheduled backup job from persisted settings, if enabled.

    Reads the ``backup_scheduled_enabled`` and
    ``backup_scheduled_interval_hours`` rows from ``AppSetting``. Does nothing
    unless the enabled value is exactly ``"true"``. A missing or empty
    interval defaults to 24 hours; a non-integer or non-positive interval is
    ignored with a warning and the same default is used, so a bad stored
    value cannot abort loading or produce a degenerate interval.
    """
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS
    from ..database.engine import get_engine
    from ..database.models import AppSetting

    default_hours = 24

    async with _AS(get_engine()) as session:
        enabled_row = await session.get(AppSetting, "backup_scheduled_enabled")
        interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours")
        # Copy the plain values out while the session is still open.
        enabled = enabled_row is not None and enabled_row.value == "true"
        raw_interval = interval_row.value if interval_row else None

    if not enabled:
        return

    interval_hours = default_hours
    if raw_interval:
        try:
            parsed = int(raw_interval)
        except (TypeError, ValueError):
            parsed = 0  # treated as invalid below
        if parsed >= 1:
            interval_hours = parsed
        else:
            _LOGGER.warning(
                "Ignoring invalid backup_scheduled_interval_hours %r; using %dh",
                raw_interval, default_hours,
            )

    # Reuse the public helper so the job is registered in exactly one place.
    await schedule_backup(interval_hours)
|
|
|
|
|
|
async def schedule_backup(interval_hours: int = 24) -> None:
    """Install the scheduled backup job, replacing any existing one.

    Args:
        interval_hours: Hours between backup runs.
    """
    scheduler = get_scheduler()
    existing_job = scheduler.get_job(_BACKUP_JOB_ID)
    if existing_job:
        scheduler.remove_job(_BACKUP_JOB_ID)

    job_options = {
        "id": _BACKUP_JOB_ID,
        "replace_existing": True,
        "max_instances": 1,
    }
    scheduler.add_job(
        _run_scheduled_backup,
        "interval",
        hours=interval_hours,
        **job_options,
    )
    _LOGGER.info("Scheduled backup every %dh", interval_hours)
|
|
|
|
|
|
async def unschedule_backup() -> None:
    """Remove the scheduled backup job if one is registered."""
    scheduler = get_scheduler()
    if not scheduler.get_job(_BACKUP_JOB_ID):
        # Nothing registered — nothing to remove or report.
        return
    scheduler.remove_job(_BACKUP_JOB_ID)
    _LOGGER.info("Unscheduled backup job")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Deferred-dispatch drain
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# When ``defer_event`` enqueues a quiet-hours notification, the calling site
# asks us to add a one-shot ``date`` job at ``quiet_hours_end_at``. We key the
# job id by the end time at one-second resolution, so multiple defers that
# share the same window-end share a single drain job (idempotent via
# ``replace_existing``).
|
|
#
|
|
# At fire time the job runs ``drain_deferred_due`` which scans all pending
|
|
# rows and dispatches whatever is ready.
|
|
#
|
|
# A periodic catch-up scan runs every ``_DRAIN_CATCHUP_INTERVAL_SECONDS`` as
|
|
# the safety net for failure modes the one-shot job can't cover:
|
|
# * APScheduler's misfire grace exceeded (event loop blocked past fire_at;
|
|
# the date job is silently discarded by the scheduler)
|
|
# * Process killed between the deferred-row DB commit and the
|
|
# ``schedule_deferred_drain`` call — row exists, job doesn't
|
|
# * Clock drift / DST seam edge cases
|
|
|
|
_DEFERRED_DRAIN_PREFIX = "deferred_drain_"
|
|
_DEFERRED_DRAIN_CATCHUP_JOB = "deferred_drain_catchup"
|
|
# Generous so a temporarily-blocked event loop doesn't make the scheduler
|
|
# discard our drain job. Once discarded the deferred rows would wait for the
|
|
# next process restart or the catch-up scan below — survivable but visibly
|
|
# late from the user's perspective.
|
|
_DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS = 3600
|
|
# 5 min trade-off between "promptness of late delivery" and "extra DB churn".
|
|
# The scan is a single indexed lookup on (status, fire_at).
|
|
_DRAIN_CATCHUP_INTERVAL_SECONDS = 300
|
|
|
|
|
|
def _drain_job_id_for(fire_at_utc: datetime) -> str:
    """Return the APScheduler job id for the drain firing at ``fire_at_utc``.

    The id is the drain prefix followed by the timestamp formatted down to the
    second. Seconds are included on purpose: two trackers whose quiet windows
    end in the same minute but at different seconds (e.g. user-set 06:00:00
    vs 06:00:30) would otherwise share one job id, and ``replace_existing``
    would silently drop the second one.
    """
    stamp = fire_at_utc.strftime("%Y%m%d%H%M%S")
    return f"{_DEFERRED_DRAIN_PREFIX}{stamp}"
|
|
|
|
|
|
def schedule_deferred_drain(fire_at_utc: datetime) -> None:
    """Add an idempotent one-shot drain job for ``fire_at_utc``.

    Past times schedule a near-immediate firing (now+1s) — the drain query
    handles ``fire_at <= now`` regardless of which job fired, so a near-miss
    still picks up the work.

    Args:
        fire_at_utc: When the drain should run. A naive value is taken to be
            UTC; an aware value in another zone is converted to UTC so the
            job id always reflects the instant rather than a local wall clock.
    """
    from datetime import datetime, timedelta, timezone

    if fire_at_utc.tzinfo is None:
        fire_at_utc = fire_at_utc.replace(tzinfo=timezone.utc)
    else:
        # Same instant, UTC wall clock: keeps the job id stable per instant
        # and stops unrelated instants from colliding on one id.
        fire_at_utc = fire_at_utc.astimezone(timezone.utc)

    scheduler = get_scheduler()
    job_id = _drain_job_id_for(fire_at_utc)

    # Read the clock once so the comparison and the fallback agree.
    now = datetime.now(timezone.utc)
    run_at = fire_at_utc if fire_at_utc > now else now + timedelta(seconds=1)

    scheduler.add_job(
        _run_deferred_drain,
        "date",
        run_date=run_at,
        id=job_id,
        # The original fire time is passed along for logging only.
        args=[fire_at_utc.isoformat()],
        replace_existing=True,
        max_instances=1,
        # Override the global 5-min grace — see the comment on
        # ``_DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS``.
        misfire_grace_time=_DEFERRED_DRAIN_MISFIRE_GRACE_SECONDS,
    )
    _LOGGER.debug("Scheduled deferred drain %s (fire_at=%s)", job_id, fire_at_utc.isoformat())
|
|
|
|
|
|
def _schedule_drain_catchup() -> None:
    """Install the periodic deferred-dispatch catch-up scan, once.

    Does nothing when the catch-up job is already registered. Otherwise adds
    an interval job running every ``_DRAIN_CATCHUP_INTERVAL_SECONDS`` seconds.
    """
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    already_installed = scheduler.get_job(_DEFERRED_DRAIN_CATCHUP_JOB)
    if not already_installed:
        scan_trigger = IntervalTrigger(seconds=_DRAIN_CATCHUP_INTERVAL_SECONDS)
        scheduler.add_job(
            _run_deferred_drain_catchup,
            scan_trigger,
            id=_DEFERRED_DRAIN_CATCHUP_JOB,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        _LOGGER.info(
            "Scheduled deferred-dispatch catch-up scan every %ds",
            _DRAIN_CATCHUP_INTERVAL_SECONDS,
        )
|
|
|
|
|
|
async def _run_deferred_drain(fire_at_iso: str) -> None:
    """APScheduler entry point for a one-shot deferred drain.

    Runs ``drain_deferred_due`` and logs the stats it returns. Failures are
    logged with a traceback and swallowed.

    Args:
        fire_at_iso: ISO timestamp the job was scheduled for. Used in log
            lines only; it is not passed to the drain.
    """
    from .deferred_dispatch import drain_deferred_due

    try:
        drain_stats = await drain_deferred_due()
        _LOGGER.info("Deferred drain (fire_at=%s) stats: %s", fire_at_iso, drain_stats)
    except Exception as err:  # noqa: BLE001
        _LOGGER.exception("Deferred drain (fire_at=%s) failed: %s", fire_at_iso, err)
|
|
|
|
|
|
async def _run_deferred_drain_catchup() -> None:
    """Periodic safety-net drain.

    Calls the same ``drain_deferred_due`` as the one-shot job; it differs only
    in cadence and in how the result is logged. Failures are logged with a
    traceback and swallowed.
    """
    from .deferred_dispatch import drain_deferred_due

    try:
        drain_stats = await drain_deferred_due()
        had_activity = (
            drain_stats.get("fired")
            or drain_stats.get("dropped")
            or drain_stats.get("errors")
        )
        # Info only when something happened; a line every 5 minutes is too
        # noisy at info level on an idle system.
        log_line = _LOGGER.info if had_activity else _LOGGER.debug
        log_line("Deferred catch-up stats: %s", drain_stats)
    except Exception as err:  # noqa: BLE001
        _LOGGER.exception("Deferred catch-up drain failed: %s", err)
|
|
|
|
|
|
async def _run_scheduled_backup() -> None:
    """Run one scheduled backup (called by APScheduler).

    Reads ``backup_secrets_mode`` (default ``SecretsMode.EXCLUDE``) and
    ``backup_retention_count`` (default 5) from ``AppSetting``, exports a
    backup into ``<data_dir>/backups`` under the first admin user found, then
    prunes old backups down to the retention count. If no admin user exists
    the run is skipped with a warning. Any exception is logged with its
    traceback and swallowed.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS
    from ..database.engine import get_engine
    from ..database.models import AppSetting, User
    from ..config import settings as app_config
    from .backup_schema import SecretsMode
    from .backup_service import export_backup_to_file, cleanup_old_backups

    try:
        engine = get_engine()
        async with _AS(engine) as session:
            # Read settings, falling back to defaults for missing/empty rows.
            secrets_row = await session.get(AppSetting, "backup_secrets_mode")
            retention_row = await session.get(AppSetting, "backup_retention_count")

            secrets_mode = (
                SecretsMode(secrets_row.value)
                if secrets_row and secrets_row.value
                else SecretsMode.EXCLUDE
            )
            retention = (
                int(retention_row.value)
                if retention_row and retention_row.value
                else 5
            )

            # Find an admin user (first returned) for ownership context.
            admin_result = await session.exec(
                select(User).where(User.role == "admin")
            )
            admin = admin_result.first()
            if not admin:
                _LOGGER.warning("No admin user found, skipping scheduled backup")
                return

            backup_dir = app_config.data_dir / "backups"
            await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)

        # Cleanup outside the session
        cleanup_old_backups(backup_dir, keep=retention)

    except Exception as e:  # noqa: BLE001
        # ``exception`` keeps the traceback; without it a failed backup leaves
        # only the message text in the log.
        _LOGGER.exception("Scheduled backup failed: %s", e)
|
|
|
|
|
|
# --- Release-check probe -----------------------------------------------------
|
|
|
|
_RELEASE_CHECK_JOB_ID = "upstream_release_check"
|
|
_RELEASE_CHECK_ONESHOT_JOB_ID = "upstream_release_check_oneshot"
|
|
_RELEASE_CHECK_ONESHOT_DELAY_SECONDS = 30
|
|
|
|
|
|
async def _schedule_release_check() -> None:
    """Register the recurring release-check job and a one-shot probe.

    The interval is read from the ``release_check_interval_hours`` setting and
    passed through ``parse_interval_hours``. Both jobs are added with
    ``replace_existing=True``, so calling this again replaces them rather
    than duplicating them.
    """
    from apscheduler.triggers.interval import IntervalTrigger
    from datetime import datetime, timedelta, timezone
    from sqlmodel.ext.asyncio.session import AsyncSession

    from ..api.app_settings import get_setting
    from ..database.engine import get_engine
    from .release_check import parse_interval_hours, run_check

    async with AsyncSession(get_engine()) as session:
        raw_interval = await get_setting(session, "release_check_interval_hours")
    interval_hours = parse_interval_hours(raw_interval)

    scheduler = get_scheduler()
    shared_options = {"replace_existing": True, "max_instances": 1}

    scheduler.add_job(
        run_check,
        IntervalTrigger(hours=interval_hours),
        id=_RELEASE_CHECK_JOB_ID,
        **shared_options,
    )

    # One-shot probe shortly after start so admins see a fresh status without
    # waiting for the first interval tick.
    first_probe_at = datetime.now(timezone.utc) + timedelta(
        seconds=_RELEASE_CHECK_ONESHOT_DELAY_SECONDS
    )
    scheduler.add_job(
        run_check,
        "date",
        run_date=first_probe_at,
        id=_RELEASE_CHECK_ONESHOT_JOB_ID,
        **shared_options,
    )

    _LOGGER.info(
        "Scheduled release-check every %sh (one-shot in %ss)",
        interval_hours,
        _RELEASE_CHECK_ONESHOT_DELAY_SECONDS,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bridge self-monitoring — deferred-backlog scan
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_BRIDGE_SELF_BACKLOG_JOB_ID = "bridge_self_deferred_backlog_scan"
|
|
# 5 min trade-off between "operator notices the backlog quickly" and "extra
|
|
# DB churn on a quiet system". The scan is one indexed GROUP BY query.
|
|
_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS = 300
|
|
|
|
|
|
def _schedule_bridge_self_backlog_scan() -> None:
    """Install the periodic bridge_self deferred-backlog scan, once.

    Does nothing when the job is already registered. Otherwise adds an
    interval job running every ``_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS``
    seconds.
    """
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = get_scheduler()
    already_installed = scheduler.get_job(_BRIDGE_SELF_BACKLOG_JOB_ID)
    if not already_installed:
        scan_trigger = IntervalTrigger(seconds=_BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS)
        scheduler.add_job(
            _run_bridge_self_backlog_scan,
            scan_trigger,
            id=_BRIDGE_SELF_BACKLOG_JOB_ID,
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        _LOGGER.info(
            "Scheduled bridge_self deferred-backlog scan every %ds",
            _BRIDGE_SELF_BACKLOG_INTERVAL_SECONDS,
        )
|
|
|
|
|
|
async def _run_bridge_self_backlog_scan() -> None:
    """APScheduler entry point for the bridge_self deferred-backlog scan.

    Runs ``bridge_self.check_deferred_backlog`` and logs the returned stats:
    at info level when the stats report ``crossings``, at debug level
    otherwise. Failures are logged with a traceback and swallowed.
    """
    from .bridge_self import check_deferred_backlog

    try:
        scan_stats = await check_deferred_backlog()
        log_line = _LOGGER.info if scan_stats.get("crossings") else _LOGGER.debug
        log_line("bridge_self backlog scan stats: %s", scan_stats)
    except Exception as err:  # noqa: BLE001
        _LOGGER.exception("bridge_self backlog scan failed: %s", err)
|
|
|
|
|
|
async def reschedule_release_check() -> None:
    """Re-arm the release-check jobs after settings changed.

    Called from the PUT /settings handler when the interval or provider config
    changes. Removes the recurring job and the one-shot probe if present, then
    lets ``_schedule_release_check`` re-read the setting and register both
    again, so the new config takes effect within seconds rather than at the
    next interval tick.
    """
    scheduler = get_scheduler()
    for job_id in (_RELEASE_CHECK_JOB_ID, _RELEASE_CHECK_ONESHOT_JOB_ID):
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)
    await _schedule_release_check()
|