Files
notify-bridge/packages/server/src/notify_bridge_server/services/scheduler.py
T
alexei.dolgolyov 6b2211353d feat: person excludes for auto-organize rules, backup & restore system
Add person exclude criteria to Immich auto-organize — assets containing
excluded persons are filtered out after candidate gathering. Also adds
full backup/restore system with export, import, scheduled backups, and
retention management.
2026-04-02 14:13:42 +03:00

427 lines
14 KiB
Python

"""APScheduler-based polling scheduler for trackers and actions."""
from __future__ import annotations
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
_LOGGER = logging.getLogger(__name__)
_scheduler: AsyncIOScheduler | None = None
def get_scheduler() -> AsyncIOScheduler:
global _scheduler
if _scheduler is None:
_scheduler = AsyncIOScheduler()
return _scheduler
async def start_scheduler() -> None:
scheduler = get_scheduler()
if not scheduler.running:
scheduler.start()
_LOGGER.info("Scheduler started")
await _load_tracker_jobs()
await _load_action_jobs()
# Start Telegram bot polling for bots with active command listeners
from .telegram_poller import start_command_listener_polling
await start_command_listener_polling()
# Schedule daily cleanup of old event log entries
_schedule_event_cleanup()
# Start debounced command auto-sync scheduler
from .command_sync import start_sync_scheduler
start_sync_scheduler()
# Load scheduled backup job if enabled
await _load_backup_job()
def _schedule_event_cleanup() -> None:
"""Schedule a daily job to delete EventLog entries older than 90 days."""
from apscheduler.triggers.cron import CronTrigger
scheduler = get_scheduler()
job_id = "cleanup_old_events"
if scheduler.get_job(job_id):
return
scheduler.add_job(
_cleanup_old_events,
CronTrigger(hour=3, minute=0),
id=job_id,
replace_existing=True,
max_instances=1,
)
_LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")
async def _cleanup_old_events() -> None:
"""Delete EventLog entries older than 90 days."""
from datetime import datetime, timedelta, timezone
from sqlmodel import delete
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import EventLog
cutoff = datetime.now(timezone.utc) - timedelta(days=90)
engine = get_engine()
async with AsyncSession(engine) as session:
await session.exec(delete(EventLog).where(EventLog.created_at < cutoff))
await session.commit()
_LOGGER.info("Cleaned up event log entries older than %s", cutoff.date())
async def _load_tracker_jobs() -> None:
"""Load enabled trackers and schedule polling jobs."""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import NotificationTracker, ServiceProvider as ServiceProviderModel
engine = get_engine()
scheduler = get_scheduler()
async with AsyncSession(engine) as session:
result = await session.exec(select(NotificationTracker).where(NotificationTracker.enabled == True))
trackers = result.all()
# Batch-load provider types for scheduler detection
unique_provider_ids = list({t.provider_id for t in trackers})
provider_types: dict[int, str] = {}
if unique_provider_ids:
provider_result = await session.exec(
select(ServiceProviderModel).where(
ServiceProviderModel.id.in_(unique_provider_ids)
)
)
provider_types = {p.id: p.type for p in provider_result.all()}
for tracker in trackers:
job_id = f"tracker_{tracker.id}"
if scheduler.get_job(job_id):
continue
ptype = provider_types.get(tracker.provider_id, "")
filters = tracker.filters or {}
# Scheduler providers can use cron triggers
if ptype == "scheduler" and filters.get("schedule_type") == "cron":
cron_expr = filters.get("cron_expression", "")
if cron_expr:
try:
_add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name)
continue
except Exception as e:
_LOGGER.error(
"Invalid cron expression for tracker %d (%s): %s — falling back to interval",
tracker.id, tracker.name, e,
)
scheduler.add_job(
_poll_tracker,
"interval",
seconds=tracker.scan_interval,
id=job_id,
args=[tracker.id],
replace_existing=True,
max_instances=1,
)
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
def _add_cron_job(
scheduler: AsyncIOScheduler,
job_id: str,
tracker_id: int,
cron_expression: str,
tracker_name: str,
) -> None:
"""Add a cron-triggered job for a scheduler-type tracker."""
from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(cron_expression)
scheduler.add_job(
_poll_tracker,
trigger,
id=job_id,
args=[tracker_id],
replace_existing=True,
max_instances=1,
)
_LOGGER.info("Scheduled tracker %d (%s) with cron: %s", tracker_id, tracker_name, cron_expression)
async def schedule_tracker(
tracker_id: int,
interval: int,
cron_expression: str | None = None,
) -> None:
"""Add or update a scheduler job for a tracker."""
scheduler = get_scheduler()
job_id = f"tracker_{tracker_id}"
# Remove existing job first to allow trigger type changes
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
if cron_expression:
try:
_add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}")
return
except Exception as e:
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
scheduler.add_job(
_poll_tracker,
"interval",
seconds=interval,
id=job_id,
args=[tracker_id],
replace_existing=True,
)
_LOGGER.info("Scheduled tracker %d every %ds", tracker_id, interval)
async def unschedule_tracker(tracker_id: int) -> None:
"""Remove a scheduler job for a tracker."""
scheduler = get_scheduler()
job_id = f"tracker_{tracker_id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
_LOGGER.info("Unscheduled tracker %d", tracker_id)
async def _poll_tracker(tracker_id: int) -> None:
"""Poll a tracker for changes."""
from .watcher import check_tracker
try:
await check_tracker(tracker_id)
except Exception as e:
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
# ---------------------------------------------------------------------------
# Action scheduling
# ---------------------------------------------------------------------------
async def _load_action_jobs() -> None:
"""Load enabled actions and schedule execution jobs."""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import Action
engine = get_engine()
scheduler = get_scheduler()
async with AsyncSession(engine) as session:
result = await session.exec(
select(Action).where(Action.enabled == True) # noqa: E712
)
actions = result.all()
for action in actions:
job_id = f"action_{action.id}"
if scheduler.get_job(job_id):
continue
if action.schedule_type == "cron" and action.schedule_cron:
try:
from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(action.schedule_cron)
scheduler.add_job(
_run_action,
trigger,
id=job_id,
args=[action.id],
replace_existing=True,
)
_LOGGER.info(
"Scheduled action %d (%s) with cron: %s",
action.id, action.name, action.schedule_cron,
)
continue
except Exception as e:
_LOGGER.error(
"Invalid cron for action %d (%s): %s — falling back to interval",
action.id, action.name, e,
)
scheduler.add_job(
_run_action,
"interval",
seconds=action.schedule_interval,
id=job_id,
args=[action.id],
replace_existing=True,
)
_LOGGER.info(
"Scheduled action %d (%s) every %ds",
action.id, action.name, action.schedule_interval,
)
async def schedule_action(
action_id: int,
schedule_type: str = "interval",
interval: int = 3600,
cron_expression: str = "",
) -> None:
"""Add or update a scheduler job for an action."""
scheduler = get_scheduler()
job_id = f"action_{action_id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
if schedule_type == "cron" and cron_expression:
try:
from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(cron_expression)
scheduler.add_job(
_run_action,
trigger,
id=job_id,
args=[action_id],
replace_existing=True,
)
_LOGGER.info("Scheduled action %d with cron: %s", action_id, cron_expression)
return
except Exception as e:
_LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e)
scheduler.add_job(
_run_action,
"interval",
seconds=interval,
id=job_id,
args=[action_id],
replace_existing=True,
)
_LOGGER.info("Scheduled action %d every %ds", action_id, interval)
async def unschedule_action(action_id: int) -> None:
"""Remove a scheduler job for an action."""
scheduler = get_scheduler()
job_id = f"action_{action_id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
_LOGGER.info("Unscheduled action %d", action_id)
async def _run_action(action_id: int) -> None:
"""Run an action (called by APScheduler)."""
from .action_runner import run_action
try:
await run_action(action_id, trigger="scheduled")
except Exception as e:
_LOGGER.error("Error running action %d: %s", action_id, e)
# ---------------------------------------------------------------------------
# Scheduled backup
# ---------------------------------------------------------------------------
_BACKUP_JOB_ID = "scheduled_backup"
async def _load_backup_job() -> None:
"""Load scheduled backup job from settings if enabled."""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession as _AS
from ..database.engine import get_engine
from ..database.models import AppSetting
engine = get_engine()
async with _AS(engine) as session:
enabled_row = await session.get(AppSetting, "backup_scheduled_enabled")
interval_row = await session.get(AppSetting, "backup_scheduled_interval_hours")
enabled = enabled_row and enabled_row.value == "true"
if not enabled:
return
interval_hours = int(interval_row.value) if interval_row and interval_row.value else 24
scheduler = get_scheduler()
scheduler.add_job(
_run_scheduled_backup,
"interval",
hours=interval_hours,
id=_BACKUP_JOB_ID,
replace_existing=True,
max_instances=1,
)
_LOGGER.info("Scheduled backup every %dh", interval_hours)
async def schedule_backup(interval_hours: int = 24) -> None:
"""Add or update the scheduled backup job."""
scheduler = get_scheduler()
if scheduler.get_job(_BACKUP_JOB_ID):
scheduler.remove_job(_BACKUP_JOB_ID)
scheduler.add_job(
_run_scheduled_backup,
"interval",
hours=interval_hours,
id=_BACKUP_JOB_ID,
replace_existing=True,
max_instances=1,
)
_LOGGER.info("Scheduled backup every %dh", interval_hours)
async def unschedule_backup() -> None:
"""Remove the scheduled backup job."""
scheduler = get_scheduler()
if scheduler.get_job(_BACKUP_JOB_ID):
scheduler.remove_job(_BACKUP_JOB_ID)
_LOGGER.info("Unscheduled backup job")
async def _run_scheduled_backup() -> None:
"""Run a scheduled backup (called by APScheduler)."""
from sqlmodel.ext.asyncio.session import AsyncSession as _AS
from ..database.engine import get_engine
from ..database.models import AppSetting, User
from ..config import settings as app_config
from .backup_schema import SecretsMode
from .backup_service import export_backup_to_file, cleanup_old_backups
try:
engine = get_engine()
async with _AS(engine) as session:
# Read settings
secrets_row = await session.get(AppSetting, "backup_secrets_mode")
retention_row = await session.get(AppSetting, "backup_retention_count")
secrets_mode = SecretsMode(secrets_row.value) if secrets_row and secrets_row.value else SecretsMode.EXCLUDE
retention = int(retention_row.value) if retention_row and retention_row.value else 5
# Find admin user (first admin) for ownership context
from sqlmodel import select
admin_result = await session.exec(
select(User).where(User.role == "admin")
)
admin = admin_result.first()
if not admin:
_LOGGER.warning("No admin user found, skipping scheduled backup")
return
backup_dir = app_config.data_dir / "backups"
await export_backup_to_file(session, admin.id, backup_dir, secrets_mode)
# Cleanup outside the session
cleanup_old_backups(backup_dir, keep=retention)
except Exception as e:
_LOGGER.error("Scheduled backup failed: %s", e)