feat(immich): wire cron-fired scheduled/periodic/memory dispatch

The scheduled_enabled / scheduled_times (and the periodic / memory
counterparts) on TrackingConfig had been wired into the model, the
API, and the test-dispatch path — but no production scheduler ever
read them, so users saw the slot in the UI and only ever got fires
through "Test". This adds the missing cron jobs and the dispatch
fan-out, both keyed off the app-level IANA timezone.

* services/scheduled_dispatch.py — production fan-out reusing the
  test-path event builders, picking the slot template per kind, and
  writing an EventLog row per fire so the dashboard reflects it.
* services/scheduler.py — _load_immich_dispatch_jobs builds one
  CronTrigger per (tracker, kind, HH:MM) from the tracker's default
  TrackingConfig; reschedule_immich_dispatch_jobs rebuilds them all
  on any relevant CRUD or timezone change.
* tracker / link / tracking-config CRUD endpoints now invalidate.

Also: skip dispatch when scheduled/memory yield zero matching assets
(prevents header-only "On this day:" spam), and update the EN/RU
default scheduled_assets templates to surface that the delivery is
a scheduled random selection.
This commit is contained in:
2026-04-24 12:49:47 +03:00
parent 90def11b8d
commit 309dec2b44
7 changed files with 420 additions and 5 deletions
@@ -1,4 +1,4 @@
📸 Photos from {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}: 🗓️ Scheduled delivery — random photos from {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}:
{%- for asset in assets %} {%- for asset in assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}<a href="{{ asset.public_url }}">{{ asset.filename }}</a>{% else %}{{ asset.filename }}{% endif %} • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}<a href="{{ asset.public_url }}">{{ asset.filename }}</a>{% else %}{{ asset.filename }}{% endif %}
{%- endfor %} {%- endfor %}
@@ -1,4 +1,4 @@
📸 Фото из {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}: 🗓️ Доставка по расписанию — случайные фото из {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}:
{%- for asset in assets %} {%- for asset in assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}<a href="{{ asset.public_url }}">{{ asset.filename }}</a>{% else %}{{ asset.filename }}{% endif %} • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}<a href="{{ asset.public_url }}">{{ asset.filename }}</a>{% else %}{{ asset.filename }}{% endif %}
{%- endfor %} {%- endfor %}
@@ -23,6 +23,7 @@ from ..database.models import (
) )
from ..services.notifier import send_test_notification from ..services.notifier import send_test_notification
from ..services.manual_dispatch import dispatch_test_notification from ..services.manual_dispatch import dispatch_test_notification
from ..services.scheduler import reschedule_immich_dispatch_jobs
from .helpers import get_owned_entity from .helpers import get_owned_entity
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -118,6 +119,7 @@ async def create_notification_tracker_target(
session.add(tt) session.add(tt)
await session.commit() await session.commit()
await session.refresh(tt) await session.refresh(tt)
await reschedule_immich_dispatch_jobs()
return await _tt_response(session, tt) return await _tt_response(session, tt)
@@ -164,6 +166,7 @@ async def update_notification_tracker_target(
session.add(tt) session.add(tt)
await session.commit() await session.commit()
await session.refresh(tt) await session.refresh(tt)
await reschedule_immich_dispatch_jobs()
return await _tt_response(session, tt) return await _tt_response(session, tt)
@@ -181,6 +184,7 @@ async def delete_notification_tracker_target(
raise HTTPException(status_code=404, detail="Tracker-target link not found") raise HTTPException(status_code=404, detail="Tracker-target link not found")
await session.delete(tt) await session.delete(tt)
await session.commit() await session.commit()
await reschedule_immich_dispatch_jobs()
@router.post("/{tracker_target_id}/test/{test_type}") @router.post("/{tracker_target_id}/test/{test_type}")
@@ -18,7 +18,11 @@ from ..database.models import (
ServiceProvider, ServiceProvider,
User, User,
) )
from ..services.scheduler import schedule_tracker, unschedule_tracker from ..services.scheduler import (
reschedule_immich_dispatch_jobs,
schedule_tracker,
unschedule_tracker,
)
from .helpers import get_owned_entity from .helpers import get_owned_entity
from .notification_tracker_targets import _tt_response from .notification_tracker_targets import _tt_response
@@ -146,6 +150,7 @@ async def create_notification_tracker(
await session.refresh(tracker) await session.refresh(tracker)
if tracker.enabled: if tracker.enabled:
await schedule_tracker(tracker.id, tracker.scan_interval) await schedule_tracker(tracker.id, tracker.scan_interval)
await reschedule_immich_dispatch_jobs()
return await _tracker_response(session, tracker) return await _tracker_response(session, tracker)
@@ -176,6 +181,7 @@ async def update_notification_tracker(
await schedule_tracker(tracker.id, tracker.scan_interval) await schedule_tracker(tracker.id, tracker.scan_interval)
else: else:
await unschedule_tracker(tracker.id) await unschedule_tracker(tracker.id)
await reschedule_immich_dispatch_jobs()
return await _tracker_response(session, tracker) return await _tracker_response(session, tracker)
@@ -208,6 +214,7 @@ async def delete_notification_tracker(
await session.delete(tracker) await session.delete(tracker)
await session.commit() await session.commit()
await unschedule_tracker(tracker_id) await unschedule_tracker(tracker_id)
await reschedule_immich_dispatch_jobs()
@router.post("/{tracker_id}/trigger") @router.post("/{tracker_id}/trigger")
@@ -10,6 +10,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession
from ..auth.dependencies import get_current_user from ..auth.dependencies import get_current_user
from ..database.engine import get_session from ..database.engine import get_session
from ..database.models import TrackingConfig, User from ..database.models import TrackingConfig, User
from ..services.scheduler import reschedule_immich_dispatch_jobs
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -127,6 +128,8 @@ async def create_config(
session.add(config) session.add(config)
await session.commit() await session.commit()
await session.refresh(config) await session.refresh(config)
if config.provider_type == "immich":
await reschedule_immich_dispatch_jobs()
return _response(config) return _response(config)
@@ -152,6 +155,8 @@ async def update_config(
session.add(config) session.add(config)
await session.commit() await session.commit()
await session.refresh(config) await session.refresh(config)
if config.provider_type == "immich":
await reschedule_immich_dispatch_jobs()
return _response(config) return _response(config)
@@ -164,8 +169,11 @@ async def delete_config(
from .delete_protection import check_tracking_config, raise_if_used from .delete_protection import check_tracking_config, raise_if_used
config = await _get(session, config_id, user.id) config = await _get(session, config_id, user.id)
raise_if_used(await check_tracking_config(session, config.id), config.name) raise_if_used(await check_tracking_config(session, config.id), config.name)
provider_type = config.provider_type
await session.delete(config) await session.delete(config)
await session.commit() await session.commit()
if provider_type == "immich":
await reschedule_immich_dispatch_jobs()
def _response(c: TrackingConfig) -> dict: def _response(c: TrackingConfig) -> dict:
@@ -0,0 +1,242 @@
"""Cron-fired scheduled / periodic / memory dispatch for Immich trackers.
The Immich provider exposes three notification slots that fire on a wall-clock
schedule rather than in response to album changes:
* ``scheduled_assets_message`` — random asset selection at fixed times of day
* ``periodic_summary_message`` — album stats summary at fixed times of day
* ``memory_mode_message`` — "On This Day" memories at fixed times of day
The fire times live on the tracker's default ``TrackingConfig`` as comma-
separated ``HH:MM`` strings (``scheduled_times`` / ``periodic_times`` /
``memory_times``) interpreted in the app-level IANA timezone
(``AppSetting.timezone``). The scheduler module wires the cron jobs; this
module owns the dispatch flow once a job fires.
Note on per-link tracking-config overrides: schedule *times* come from the
tracker's default config — a per-link override may disable the slot for that
link (via ``{kind}_enabled``) but cannot shift its fire time. Consistent with
the test-dispatch path in ``manual_dispatch``.
"""
from __future__ import annotations
import logging
from typing import Literal
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.models.events import EventType
from notify_bridge_core.notifications.dispatcher import (
NotificationDispatcher,
TargetConfig,
)
from ..database.engine import get_engine
from ..database.models import (
EventLog,
NotificationTracker,
ServiceProvider,
TemplateSlot,
TrackingConfig,
)
from .dispatch_helpers import (
event_allowed_by_config,
get_app_timezone,
load_link_data,
)
from .manual_dispatch import _build_immich_event, _build_immich_periodic_event
_LOGGER = logging.getLogger(__name__)
ScheduledKind = Literal["scheduled", "periodic", "memory"]
# Maps the dispatch kind to the DB slot name that holds its template.
# The dispatcher keys templates by ``event.event_type.value`` (always
# ``scheduled_message`` here), so we read the right ``TemplateSlot`` row and
# inject it under that single event-type key — same pattern as the test path.
_SLOT_MAP: dict[ScheduledKind, str] = {
"scheduled": "scheduled_assets_message",
"periodic": "periodic_summary_message",
"memory": "memory_mode_message",
}
async def dispatch_scheduled_for_tracker(
tracker_id: int, kind: ScheduledKind
) -> None:
"""Build the slot's event for ``tracker_id`` and fan out to its links.
Skips silently when the tracker is disabled, the provider is not Immich,
the slot is disabled on the tracker's default tracking config, or no link
has a ``TemplateConfig`` with the corresponding slot row.
"""
engine = get_engine()
async with AsyncSession(engine) as session:
tracker = await session.get(NotificationTracker, tracker_id)
if not tracker or not tracker.enabled:
return
provider = await session.get(ServiceProvider, tracker.provider_id)
if not provider or provider.type != "immich":
return
default_tc: TrackingConfig | None = None
if tracker.default_tracking_config_id:
default_tc = await session.get(
TrackingConfig, tracker.default_tracking_config_id
)
# If the default config disables this kind, nothing to do — schedule
# rebuild only adds jobs when the flag is set, but a stale job from
# a previous DB state could still fire one tick before invalidation.
if default_tc is None or not getattr(default_tc, f"{kind}_enabled", False):
_LOGGER.debug(
"Scheduled %s skipped for tracker %d: kind disabled on default config",
kind, tracker_id,
)
return
# Snapshot every field we need outside the session — after the
# ``async with`` exits the instances are detached and lazy-load
# would fail. Cheaper than re-fetching, safer than touching
# attributes through a closed session.
provider_id = provider.id
provider_config = dict(provider.config)
provider_name = provider.name or provider.type
tracker_user_id = tracker.user_id
tracker_name = tracker.name or ""
collection_ids = list(tracker.collection_ids or [])
app_tz = await get_app_timezone(session)
link_data = await load_link_data(session, tracker_id)
if not link_data:
_LOGGER.info(
"Scheduled %s for tracker %d: no enabled links, skipping",
kind, tracker_id,
)
return
if kind == "periodic":
event = await _build_immich_periodic_event(
provider_config=provider_config,
provider_name=provider_name,
tracker_name=tracker_name,
collection_ids=collection_ids,
)
else:
event = await _build_immich_event(
provider_config=provider_config,
provider_name=provider_name,
tracker_name=tracker_name,
collection_ids=collection_ids,
test_type=kind,
tracking_config=default_tc,
)
if event is None:
_LOGGER.warning(
"Scheduled %s for tracker %d: provider returned no event",
kind, tracker_id,
)
return
# Skip empty payloads for asset-bearing kinds — sending the bare
# "On this day:" / "Scheduled delivery —" header with no items below
# spams chats with title-only messages every day. ``periodic`` is
# different: it's a stats summary that's still meaningful with zero
# assets, so we let it through.
if kind in ("scheduled", "memory") and not event.added_assets:
_LOGGER.info(
"Scheduled %s for tracker %d: 0 assets matched, skipping dispatch",
kind, tracker_id,
)
return
slot_name = _SLOT_MAP[kind]
target_configs: list[TargetConfig] = []
async with AsyncSession(engine) as session:
for ld in link_data:
tc = ld["tracking_config"] or default_tc
tmpl = ld["template_config"]
if tc is not None:
# Per-link override may disable this kind even when the
# default has it on — honour that here.
if not getattr(tc, f"{kind}_enabled", True):
continue
if not event_allowed_by_config(event, tc, app_tz):
continue
if tmpl is None:
continue
slot_rows = (await session.exec(
select(TemplateSlot).where(
TemplateSlot.config_id == tmpl.id,
TemplateSlot.slot_name == slot_name,
)
)).all()
if not slot_rows:
continue
locale_map = {s.locale: s.template for s in slot_rows}
template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map}
target_configs.append(TargetConfig(
type=ld["target_type"],
config=ld["target_config"],
template_slots=template_slots,
date_format=tmpl.date_format,
date_only_format=(
tmpl.date_only_format or "%d.%m.%Y"
),
provider_api_key=provider_config.get("api_key"),
provider_internal_url=provider_config.get("url", ""),
provider_external_url=provider_config.get("external_domain", ""),
receivers=ld["receivers"],
))
if not target_configs:
_LOGGER.info(
"Scheduled %s for tracker %d: no targets after filtering",
kind, tracker_id,
)
return
# Lazy import to break the watcher↔scheduler↔scheduled_dispatch cycle.
from .watcher import _get_telegram_caches
from .http_session import get_http_session
url_cache, asset_cache = await _get_telegram_caches()
http_session = await get_http_session()
dispatcher = NotificationDispatcher(
url_cache=url_cache, asset_cache=asset_cache, session=http_session,
)
_LOGGER.info(
"Dispatching scheduled %s for tracker %d to %d link(s)",
kind, tracker_id, len(target_configs),
)
results = await dispatcher.dispatch(event, target_configs)
# Mirror the watcher's audit trail: surface scheduled fires in EventLog so
# the dashboard shows *why* a notification arrived (otherwise these would
# be invisible to the activity feed).
successes = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
async with AsyncSession(engine) as session:
session.add(EventLog(
user_id=tracker_user_id,
tracker_id=tracker_id,
tracker_name=tracker_name,
provider_id=provider_id,
provider_name=provider_name,
event_type=event.event_type.value,
collection_id=event.collection_id,
collection_name=event.collection_name,
assets_count=event.added_count or 0,
details={
"kind": kind,
"slot": slot_name,
"trigger": "cron",
"timezone": app_tz,
"targets_dispatched": len(target_configs),
"targets_succeeded": successes,
},
))
await session.commit()
@@ -111,6 +111,7 @@ async def start_scheduler() -> None:
await _load_tracker_jobs() await _load_tracker_jobs()
await _load_action_jobs() await _load_action_jobs()
await _load_immich_dispatch_jobs()
# Start Telegram bot polling for bots with active command listeners # Start Telegram bot polling for bots with active command listeners
from .telegram_poller import start_command_listener_polling from .telegram_poller import start_command_listener_polling
@@ -760,6 +761,10 @@ async def reschedule_cron_jobs_for_timezone_change() -> None:
"Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key, "Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
) )
# Immich scheduled/periodic/memory jobs are also CronTrigger-based and
# carry the same frozen-tz problem — rebuild them under the new tz.
await reschedule_immich_dispatch_jobs()
async def _run_action(action_id: int) -> None: async def _run_action(action_id: int) -> None:
"""Run an action (called by APScheduler).""" """Run an action (called by APScheduler)."""
@@ -770,6 +775,155 @@ async def _run_action(action_id: int) -> None:
_LOGGER.error("Error running action %d: %s", action_id, e) _LOGGER.error("Error running action %d: %s", action_id, e)
# ---------------------------------------------------------------------------
# Immich scheduled / periodic / memory dispatch (cron-fired)
#
# These three slots fire on wall-clock schedules taken from the tracker's
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
# app-level IANA timezone. The dispatch flow lives in
# ``services.scheduled_dispatch``; this section just owns scheduling.
# ---------------------------------------------------------------------------
_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"
def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]:
"""Parse ``"09:00,18:30"`` → ``[(9, 0), (18, 30)]``, skipping bad entries.
A typo in one slot must not prevent the others from scheduling — we log
and move on rather than raising.
"""
out: list[tuple[int, int]] = []
for part in (raw or "").split(","):
part = part.strip()
if not part:
continue
try:
h_str, m_str = part.split(":", 1)
hour, minute = int(h_str), int(m_str)
except ValueError:
_LOGGER.warning("Skipping invalid time literal %r", part)
continue
if not (0 <= hour <= 23 and 0 <= minute <= 59):
_LOGGER.warning("Skipping out-of-range time %r", part)
continue
out.append((hour, minute))
return out
async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
"""APScheduler entry point — wraps the dispatch helper to swallow errors."""
from .scheduled_dispatch import dispatch_scheduled_for_tracker
try:
await dispatch_scheduled_for_tracker(tracker_id, kind) # type: ignore[arg-type]
except Exception as err: # noqa: BLE001
_LOGGER.error(
"Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err,
)
async def _load_immich_dispatch_jobs() -> None:
"""Schedule cron jobs for every (tracker, kind, time) where the kind is on.
Reads each enabled Immich tracker's *default* tracking config — per-link
overrides only gate dispatch (handled in ``scheduled_dispatch``), they do
not influence the fire schedule.
"""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from apscheduler.triggers.cron import CronTrigger
from ..database.engine import get_engine
from ..database.models import (
NotificationTracker,
ServiceProvider as ServiceProviderModel,
TrackingConfig,
)
engine = get_engine()
scheduler = get_scheduler()
tz = await _load_app_timezone()
async with AsyncSession(engine) as session:
trackers = (await session.exec(
select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712
)).all()
if not trackers:
return
provider_ids = list({t.provider_id for t in trackers})
provider_types: dict[int, str] = {}
if provider_ids:
rows = await session.exec(
select(ServiceProviderModel).where(
ServiceProviderModel.id.in_(provider_ids)
)
)
provider_types = {p.id: p.type for p in rows.all()}
tc_ids = list({
t.default_tracking_config_id for t in trackers
if t.default_tracking_config_id
})
tc_map: dict[int, TrackingConfig] = {}
if tc_ids:
rows = await session.exec(
select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
)
tc_map = {tc.id: tc for tc in rows.all()}
scheduled = 0
for tracker in trackers:
if provider_types.get(tracker.provider_id) != "immich":
continue
tc = tc_map.get(tracker.default_tracking_config_id) if tracker.default_tracking_config_id else None
if tc is None:
continue
for kind in _IMMICH_DISPATCH_KINDS:
if not getattr(tc, f"{kind}_enabled", False):
continue
times_raw = getattr(tc, f"{kind}_times", "") or ""
for hour, minute in _parse_hhmm_list(times_raw):
job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}"
scheduler.add_job(
_run_immich_dispatch,
CronTrigger(hour=hour, minute=minute, timezone=tz),
id=job_id,
args=[tracker.id, kind],
replace_existing=True,
max_instances=1,
)
scheduled += 1
_LOGGER.info(
"Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
kind, tracker.id, hour, minute, tz.key,
)
if scheduled:
_LOGGER.info(
"Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
scheduled, tz.key,
)
async def reschedule_immich_dispatch_jobs() -> None:
"""Drop and rebuild all Immich scheduled/periodic/memory jobs.
Cheap to call on every relevant mutation — a typical install has only a
handful of trackers. Called from the tracker, link, and tracking-config
CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``.
"""
scheduler = get_scheduler()
for job in list(scheduler.get_jobs()):
if job.id.startswith(_IMMICH_DISPATCH_PREFIX):
scheduler.remove_job(job.id)
await _load_immich_dispatch_jobs()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Scheduled backup # Scheduled backup
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------