diff --git a/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2 index 3749dff..c69df11 100644 --- a/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2 +++ b/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2 @@ -1,4 +1,4 @@ -📸 Photos from {% if public_url %}{{ album_name }}{% else %}"{{ album_name }}"{% endif %}: +🗓️ Scheduled delivery — random photos from {% if public_url %}{{ album_name }}{% else %}"{{ album_name }}"{% endif %}: {%- for asset in assets %} • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %} -{%- endfor %} \ No newline at end of file +{%- endfor %} diff --git a/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2 index 04f86df..fb44dd5 100644 --- a/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2 +++ b/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2 @@ -1,4 +1,4 @@ -📸 Фото из {% if public_url %}{{ album_name }}{% else %}"{{ album_name }}"{% endif %}: +🗓️ Доставка по расписанию — случайные фото из {% if public_url %}{{ album_name }}{% else %}"{{ album_name }}"{% endif %}: {%- for asset in assets %} • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %} -{%- endfor %} \ No newline at end of file +{%- endfor %} diff --git a/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py b/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py index 43a4afe..15a209f 100644 --- a/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py +++ b/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py @@ -23,6 +23,7 @@ from ..database.models import ( ) from ..services.notifier import send_test_notification from ..services.manual_dispatch import dispatch_test_notification +from ..services.scheduler import reschedule_immich_dispatch_jobs from .helpers import get_owned_entity _LOGGER = logging.getLogger(__name__) @@ -118,6 +119,7 @@ async def create_notification_tracker_target( session.add(tt) await session.commit() await session.refresh(tt) + await reschedule_immich_dispatch_jobs() return await _tt_response(session, tt) @@ -164,6 +166,7 @@ async def update_notification_tracker_target( session.add(tt) await session.commit() await session.refresh(tt) + await reschedule_immich_dispatch_jobs() return await _tt_response(session, tt) @@ -181,6 +184,7 @@ async def delete_notification_tracker_target( raise HTTPException(status_code=404, detail="Tracker-target link not found") await session.delete(tt) await session.commit() + await reschedule_immich_dispatch_jobs() @router.post("/{tracker_target_id}/test/{test_type}") diff --git a/packages/server/src/notify_bridge_server/api/notification_trackers.py b/packages/server/src/notify_bridge_server/api/notification_trackers.py index eed6e07..96bc46d 100644 --- a/packages/server/src/notify_bridge_server/api/notification_trackers.py +++ b/packages/server/src/notify_bridge_server/api/notification_trackers.py @@ -18,7 +18,11 @@ from ..database.models import ( ServiceProvider, User, ) -from ..services.scheduler import schedule_tracker, unschedule_tracker +from ..services.scheduler import ( + reschedule_immich_dispatch_jobs, + schedule_tracker, + unschedule_tracker, +) from .helpers import get_owned_entity from .notification_tracker_targets import _tt_response @@ -146,6 +150,7 @@ async def create_notification_tracker( await session.refresh(tracker) if tracker.enabled: await schedule_tracker(tracker.id, tracker.scan_interval) + await reschedule_immich_dispatch_jobs() return await _tracker_response(session, tracker) @@ -176,6 +181,7 @@ async def update_notification_tracker( await schedule_tracker(tracker.id, tracker.scan_interval) else: await unschedule_tracker(tracker.id) + await reschedule_immich_dispatch_jobs() return await _tracker_response(session, tracker) @@ -208,6 +214,7 @@ async def delete_notification_tracker( await session.delete(tracker) await session.commit() await unschedule_tracker(tracker_id) + await reschedule_immich_dispatch_jobs() @router.post("/{tracker_id}/trigger") diff --git a/packages/server/src/notify_bridge_server/api/tracking_configs.py b/packages/server/src/notify_bridge_server/api/tracking_configs.py index 88098a3..abc7822 100644 --- a/packages/server/src/notify_bridge_server/api/tracking_configs.py +++ b/packages/server/src/notify_bridge_server/api/tracking_configs.py @@ -10,6 +10,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession from ..auth.dependencies import get_current_user from ..database.engine import get_session from ..database.models import TrackingConfig, User +from ..services.scheduler import reschedule_immich_dispatch_jobs _LOGGER = logging.getLogger(__name__) @@ -127,6 +128,8 @@ async def create_config( session.add(config) await session.commit() await session.refresh(config) + if config.provider_type == "immich": + await reschedule_immich_dispatch_jobs() return _response(config) @@ -152,6 +155,8 @@ async def update_config( session.add(config) await session.commit() await session.refresh(config) + if config.provider_type == "immich": + await reschedule_immich_dispatch_jobs() return _response(config) @@ -164,8 +169,11 @@ async def delete_config( from .delete_protection import check_tracking_config, raise_if_used config = await _get(session, config_id, user.id) raise_if_used(await check_tracking_config(session, config.id), config.name) + provider_type = config.provider_type await session.delete(config) await session.commit() + if provider_type == "immich": + await reschedule_immich_dispatch_jobs() def _response(c: TrackingConfig) -> dict: diff --git a/packages/server/src/notify_bridge_server/services/scheduled_dispatch.py b/packages/server/src/notify_bridge_server/services/scheduled_dispatch.py new file mode 100644 index 0000000..8c23730 --- /dev/null +++ b/packages/server/src/notify_bridge_server/services/scheduled_dispatch.py @@ -0,0 +1,242 @@ +"""Cron-fired scheduled / periodic / memory dispatch for Immich trackers. + +The Immich provider exposes three notification slots that fire on a wall-clock +schedule rather than in response to album changes: + +* ``scheduled_assets_message`` — random asset selection at fixed times of day +* ``periodic_summary_message`` — album stats summary at fixed times of day +* ``memory_mode_message`` — "On This Day" memories at fixed times of day + +The fire times live on the tracker's default ``TrackingConfig`` as comma- +separated ``HH:MM`` strings (``scheduled_times`` / ``periodic_times`` / +``memory_times``) interpreted in the app-level IANA timezone +(``AppSetting.timezone``). The scheduler module wires the cron jobs; this +module owns the dispatch flow once a job fires. + +Note on per-link tracking-config overrides: schedule *times* come from the +tracker's default config — a per-link override may disable the slot for that +link (via ``{kind}_enabled``) but cannot shift its fire time. Consistent with +the test-dispatch path in ``manual_dispatch``. +""" + +from __future__ import annotations + +import logging +from typing import Literal + +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from notify_bridge_core.models.events import EventType +from notify_bridge_core.notifications.dispatcher import ( + NotificationDispatcher, + TargetConfig, +) + +from ..database.engine import get_engine +from ..database.models import ( + EventLog, + NotificationTracker, + ServiceProvider, + TemplateSlot, + TrackingConfig, +) +from .dispatch_helpers import ( + event_allowed_by_config, + get_app_timezone, + load_link_data, +) +from .manual_dispatch import _build_immich_event, _build_immich_periodic_event + +_LOGGER = logging.getLogger(__name__) + +ScheduledKind = Literal["scheduled", "periodic", "memory"] + +# Maps the dispatch kind to the DB slot name that holds its template. +# The dispatcher keys templates by ``event.event_type.value`` (always +# ``scheduled_message`` here), so we read the right ``TemplateSlot`` row and +# inject it under that single event-type key — same pattern as the test path. +_SLOT_MAP: dict[ScheduledKind, str] = { + "scheduled": "scheduled_assets_message", + "periodic": "periodic_summary_message", + "memory": "memory_mode_message", +} + + +async def dispatch_scheduled_for_tracker( + tracker_id: int, kind: ScheduledKind +) -> None: + """Build the slot's event for ``tracker_id`` and fan out to its links. + + Skips silently when the tracker is disabled, the provider is not Immich, + the slot is disabled on the tracker's default tracking config, or no link + has a ``TemplateConfig`` with the corresponding slot row. + """ + engine = get_engine() + async with AsyncSession(engine) as session: + tracker = await session.get(NotificationTracker, tracker_id) + if not tracker or not tracker.enabled: + return + provider = await session.get(ServiceProvider, tracker.provider_id) + if not provider or provider.type != "immich": + return + + default_tc: TrackingConfig | None = None + if tracker.default_tracking_config_id: + default_tc = await session.get( + TrackingConfig, tracker.default_tracking_config_id + ) + # If the default config disables this kind, nothing to do — schedule + # rebuild only adds jobs when the flag is set, but a stale job from + # a previous DB state could still fire one tick before invalidation. + if default_tc is None or not getattr(default_tc, f"{kind}_enabled", False): + _LOGGER.debug( + "Scheduled %s skipped for tracker %d: kind disabled on default config", + kind, tracker_id, + ) + return + + # Snapshot every field we need outside the session — after the + # ``async with`` exits the instances are detached and lazy-load + # would fail. Cheaper than re-fetching, safer than touching + # attributes through a closed session. + provider_id = provider.id + provider_config = dict(provider.config) + provider_name = provider.name or provider.type + tracker_user_id = tracker.user_id + tracker_name = tracker.name or "" + collection_ids = list(tracker.collection_ids or []) + + app_tz = await get_app_timezone(session) + link_data = await load_link_data(session, tracker_id) + + if not link_data: + _LOGGER.info( + "Scheduled %s for tracker %d: no enabled links, skipping", + kind, tracker_id, + ) + return + + if kind == "periodic": + event = await _build_immich_periodic_event( + provider_config=provider_config, + provider_name=provider_name, + tracker_name=tracker_name, + collection_ids=collection_ids, + ) + else: + event = await _build_immich_event( + provider_config=provider_config, + provider_name=provider_name, + tracker_name=tracker_name, + collection_ids=collection_ids, + test_type=kind, + tracking_config=default_tc, + ) + if event is None: + _LOGGER.warning( + "Scheduled %s for tracker %d: provider returned no event", + kind, tracker_id, + ) + return + + # Skip empty payloads for asset-bearing kinds — sending the bare + # "On this day:" / "Scheduled delivery —" header with no items below + # spams chats with title-only messages every day. ``periodic`` is + # different: it's a stats summary that's still meaningful with zero + # assets, so we let it through. + if kind in ("scheduled", "memory") and not event.added_assets: + _LOGGER.info( + "Scheduled %s for tracker %d: 0 assets matched, skipping dispatch", + kind, tracker_id, + ) + return + + slot_name = _SLOT_MAP[kind] + target_configs: list[TargetConfig] = [] + async with AsyncSession(engine) as session: + for ld in link_data: + tc = ld["tracking_config"] or default_tc + tmpl = ld["template_config"] + if tc is not None: + # Per-link override may disable this kind even when the + # default has it on — honour that here. + if not getattr(tc, f"{kind}_enabled", True): + continue + if not event_allowed_by_config(event, tc, app_tz): + continue + if tmpl is None: + continue + + slot_rows = (await session.exec( + select(TemplateSlot).where( + TemplateSlot.config_id == tmpl.id, + TemplateSlot.slot_name == slot_name, + ) + )).all() + if not slot_rows: + continue + locale_map = {s.locale: s.template for s in slot_rows} + template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map} + + target_configs.append(TargetConfig( + type=ld["target_type"], + config=ld["target_config"], + template_slots=template_slots, + date_format=tmpl.date_format, + date_only_format=( + tmpl.date_only_format or "%d.%m.%Y" + ), + provider_api_key=provider_config.get("api_key"), + provider_internal_url=provider_config.get("url", ""), + provider_external_url=provider_config.get("external_domain", ""), + receivers=ld["receivers"], + )) + + if not target_configs: + _LOGGER.info( + "Scheduled %s for tracker %d: no targets after filtering", + kind, tracker_id, + ) + return + + # Lazy import to break the watcher↔scheduler↔scheduled_dispatch cycle. + from .watcher import _get_telegram_caches + from .http_session import get_http_session + + url_cache, asset_cache = await _get_telegram_caches() + http_session = await get_http_session() + dispatcher = NotificationDispatcher( + url_cache=url_cache, asset_cache=asset_cache, session=http_session, + ) + _LOGGER.info( + "Dispatching scheduled %s for tracker %d to %d link(s)", + kind, tracker_id, len(target_configs), + ) + results = await dispatcher.dispatch(event, target_configs) + + # Mirror the watcher's audit trail: surface scheduled fires in EventLog so + # the dashboard shows *why* a notification arrived (otherwise these would + # be invisible to the activity feed). + successes = sum(1 for r in results if isinstance(r, dict) and r.get("success")) + async with AsyncSession(engine) as session: + session.add(EventLog( + user_id=tracker_user_id, + tracker_id=tracker_id, + tracker_name=tracker_name, + provider_id=provider_id, + provider_name=provider_name, + event_type=event.event_type.value, + collection_id=event.collection_id, + collection_name=event.collection_name, + assets_count=event.added_count or 0, + details={ + "kind": kind, + "slot": slot_name, + "trigger": "cron", + "timezone": app_tz, + "targets_dispatched": len(target_configs), + "targets_succeeded": successes, + }, + )) + await session.commit() diff --git a/packages/server/src/notify_bridge_server/services/scheduler.py b/packages/server/src/notify_bridge_server/services/scheduler.py index 9402bf4..3784f48 100644 --- a/packages/server/src/notify_bridge_server/services/scheduler.py +++ b/packages/server/src/notify_bridge_server/services/scheduler.py @@ -111,6 +111,7 @@ async def start_scheduler() -> None: await _load_tracker_jobs() await _load_action_jobs() + await _load_immich_dispatch_jobs() # Start Telegram bot polling for bots with active command listeners from .telegram_poller import start_command_listener_polling @@ -760,6 +761,10 @@ async def reschedule_cron_jobs_for_timezone_change() -> None: "Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key, ) + # Immich scheduled/periodic/memory jobs are also CronTrigger-based and + # carry the same frozen-tz problem — rebuild them under the new tz. + await reschedule_immich_dispatch_jobs() + async def _run_action(action_id: int) -> None: """Run an action (called by APScheduler).""" @@ -770,6 +775,155 @@ async def _run_action(action_id: int) -> None: _LOGGER.error("Error running action %d: %s", action_id, e) +# --------------------------------------------------------------------------- +# Immich scheduled / periodic / memory dispatch (cron-fired) +# +# These three slots fire on wall-clock schedules taken from the tracker's +# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``, +# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the +# app-level IANA timezone. The dispatch flow lives in +# ``services.scheduled_dispatch``; this section just owns scheduling. +# --------------------------------------------------------------------------- + +_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory") +_IMMICH_DISPATCH_PREFIX = "immich_dispatch_" + + +def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]: + """Parse ``"09:00,18:30"`` → ``[(9, 0), (18, 30)]``, skipping bad entries. + + A typo in one slot must not prevent the others from scheduling — we log + and move on rather than raising. + """ + out: list[tuple[int, int]] = [] + for part in (raw or "").split(","): + part = part.strip() + if not part: + continue + try: + h_str, m_str = part.split(":", 1) + hour, minute = int(h_str), int(m_str) + except ValueError: + _LOGGER.warning("Skipping invalid time literal %r", part) + continue + if not (0 <= hour <= 23 and 0 <= minute <= 59): + _LOGGER.warning("Skipping out-of-range time %r", part) + continue + out.append((hour, minute)) + return out + + +async def _run_immich_dispatch(tracker_id: int, kind: str) -> None: + """APScheduler entry point — wraps the dispatch helper to swallow errors.""" + from .scheduled_dispatch import dispatch_scheduled_for_tracker + try: + await dispatch_scheduled_for_tracker(tracker_id, kind) # type: ignore[arg-type] + except Exception as err: # noqa: BLE001 + _LOGGER.error( + "Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err, + ) + + +async def _load_immich_dispatch_jobs() -> None: + """Schedule cron jobs for every (tracker, kind, time) where the kind is on. + + Reads each enabled Immich tracker's *default* tracking config — per-link + overrides only gate dispatch (handled in ``scheduled_dispatch``), they do + not influence the fire schedule. + """ + from sqlmodel import select + from sqlmodel.ext.asyncio.session import AsyncSession + + from apscheduler.triggers.cron import CronTrigger + + from ..database.engine import get_engine + from ..database.models import ( + NotificationTracker, + ServiceProvider as ServiceProviderModel, + TrackingConfig, + ) + + engine = get_engine() + scheduler = get_scheduler() + tz = await _load_app_timezone() + + async with AsyncSession(engine) as session: + trackers = (await session.exec( + select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712 + )).all() + if not trackers: + return + + provider_ids = list({t.provider_id for t in trackers}) + provider_types: dict[int, str] = {} + if provider_ids: + rows = await session.exec( + select(ServiceProviderModel).where( + ServiceProviderModel.id.in_(provider_ids) + ) + ) + provider_types = {p.id: p.type for p in rows.all()} + + tc_ids = list({ + t.default_tracking_config_id for t in trackers + if t.default_tracking_config_id + }) + tc_map: dict[int, TrackingConfig] = {} + if tc_ids: + rows = await session.exec( + select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids)) + ) + tc_map = {tc.id: tc for tc in rows.all()} + + scheduled = 0 + for tracker in trackers: + if provider_types.get(tracker.provider_id) != "immich": + continue + tc = tc_map.get(tracker.default_tracking_config_id) if tracker.default_tracking_config_id else None + if tc is None: + continue + + for kind in _IMMICH_DISPATCH_KINDS: + if not getattr(tc, f"{kind}_enabled", False): + continue + times_raw = getattr(tc, f"{kind}_times", "") or "" + for hour, minute in _parse_hhmm_list(times_raw): + job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}" + scheduler.add_job( + _run_immich_dispatch, + CronTrigger(hour=hour, minute=minute, timezone=tz), + id=job_id, + args=[tracker.id, kind], + replace_existing=True, + max_instances=1, + ) + scheduled += 1 + _LOGGER.info( + "Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]", + kind, tracker.id, hour, minute, tz.key, + ) + + if scheduled: + _LOGGER.info( + "Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]", + scheduled, tz.key, + ) + + +async def reschedule_immich_dispatch_jobs() -> None: + """Drop and rebuild all Immich scheduled/periodic/memory jobs. + + Cheap to call on every relevant mutation — a typical install has only a + handful of trackers. Called from the tracker, link, and tracking-config + CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``. + """ + scheduler = get_scheduler() + for job in list(scheduler.get_jobs()): + if job.id.startswith(_IMMICH_DISPATCH_PREFIX): + scheduler.remove_job(job.id) + await _load_immich_dispatch_jobs() + + # --------------------------------------------------------------------------- # Scheduled backup # ---------------------------------------------------------------------------