Compare commits

...

2 Commits

Author SHA1 Message Date
alexei.dolgolyov 461fb495d7 chore: release v0.5.0
Release / release (push) Successful in 1m10s
2026-04-24 14:16:34 +03:00
alexei.dolgolyov 309dec2b44 feat(immich): wire cron-fired scheduled/periodic/memory dispatch
The scheduled_enabled / scheduled_times (and the periodic / memory
counterparts) on TrackingConfig had been wired into the model, the
API, and the test-dispatch path — but no production scheduler ever
read them, so users saw the slot in the UI and only ever got fires
through "Test". This adds the missing cron jobs and the dispatch
fan-out, both keyed off the app-level IANA timezone.

* services/scheduled_dispatch.py — production fan-out reusing the
  test-path event builders, picking the slot template per kind, and
  writing an EventLog row per fire so the dashboard reflects it.
* services/scheduler.py — _load_immich_dispatch_jobs builds one
  CronTrigger per (tracker, kind, HH:MM) from the tracker's default
  TrackingConfig; reschedule_immich_dispatch_jobs rebuilds them all
  on any relevant CRUD or timezone change.
* tracker / link / tracking-config CRUD endpoints now invalidate.

Also: skip dispatch when scheduled/memory yield zero matching assets
(prevents header-only "On this day:" spam), and update the EN/RU
default scheduled_assets templates to surface that the delivery is
a scheduled random selection.
2026-04-24 12:49:47 +03:00
11 changed files with 436 additions and 62 deletions
+13 -54
View File
@@ -1,69 +1,28 @@
# v0.4.0 (2026-04-23)
# v0.5.0 (2026-04-24)
A production-readiness release focused on hardening the service for real-world deployment: end-to-end structured logging with runtime controls, a broad security and runtime review across the HTTP, auth, DB, and scheduler layers, and a new pre-migration database snapshot that makes upgrades recoverable with a single file restore. Release CI and the Docker image build were also reworked for speed and reliability.
A small but impactful release that finally makes the Immich scheduled / periodic / memory dispatch fire on its own. The slot was already visible in the tracker UI and the "Test" button worked — but no production scheduler was reading the config, so users only ever saw fires through manual tests. This release wires the missing cron jobs end-to-end.
## Features
- **Production-grade logging** with per-request correlation (`request_id` / `command` / `chat_id` / `bot_id` / `dispatch_id`), secret masking in both messages and tracebacks, JSON or text format, runtime log level + per-module overrides editable from the settings UI, and env-var boot overrides (`NOTIFY_BRIDGE_LOG_LEVEL` / `_FORMAT` / `_LEVELS`). Closes every silent drop in the Telegram send path — `/random` and media-group failures now log `WARN` / `ERROR` with full context instead of disappearing ([f50d465](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/f50d465))
- **Production-readiness hardening across security, async, DB, and ops** ([920920b](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/920920b)):
- *Security:* async SSRF-safe DNS resolver; `allow_redirects=False` on all outbound clients; Matrix `homeserver_url` validation; rejection of `***`-masked secrets on provider / email-bot updates; bcrypt moved off the event loop; JWT `iss` / `aud` + leeway with strict claim rejection; setup TOCTOU closed inside a transaction; expanded rate limits; constant-time login; config rejects known dev secret keys and validates CORS / ports / token lifetimes; webhook bodies capped at 1 MiB; Discord 429 retries bounded; CSP + HSTS headers added.
- *Async / runtime:* SQLite engine tuned (WAL, `synchronous=NORMAL`, `foreign_keys=ON`, busy timeout, pool pre-ping); ordered lifespan shutdown; shared `aiohttp` session race-free; blocking storage / backup writes offloaded to threads; NUT client timeouts; Telegram poller switched from 3 s short-poll to 30 s + 25 s long-poll (~10x fewer API calls).
- *Database:* new performance-index migration covering every FK and hot-path composite; new `schema_version` table; `__system__` placeholder user (`id=0`) seeded to satisfy FKs; `list_notification_trackers` rewritten from `1+N+N*M` to batched loads; retention job extended to event / webhook / action-execution logs.
- *Scheduler:* `AsyncIOScheduler` job defaults set (`coalesce`, `misfire_grace_time=300`, `max_instances=1`).
- *Ops:* uvicorn runs with `proxy_headers` / `forwarded_allow_ips` / graceful shutdown timeout; access log suppressed outside debug; FastAPI version read from `importlib.metadata`; new `/api/ready` endpoint; docker-compose adds resource / PID limits, `read_only` + tmpfs, `cap_drop: ALL`, `no-new-privileges`, drops the `ALLOW_PRIVATE_URLS=1` default, and points healthcheck at `/api/ready`.
- *Frontend:* `/login` redirects already-authenticated users to `/` and shows a distinct "backend unreachable" banner (en / ru) when `/auth/needs-setup` fails.
- **Pre-migration SQLite snapshots** via `VACUUM INTO` at lifespan startup — takes a consistent, atomic copy of the DB before migrations run, so a botched upgrade is recoverable by restoring a single file. Safe under WAL; best-effort (failures log but never raise); configurable via `NOTIFY_BRIDGE_PRE_MIGRATE_SNAPSHOT_KEEP` (default 5; 0 disables). Snapshots land in `data_dir/backups/pre-migrate-<ts>.db` and the N oldest are pruned each boot ([7cbb02b](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/7cbb02b))
## Bug Fixes
- Allow `unsafe-inline` scripts in CSP so SvelteKit's hydration bootstrap inline `<script>` runs in production — without it the frontend failed to hydrate under the hardened CSP introduced in this release ([8f0346e](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/8f0346e))
- **Cron-fired Immich dispatch for scheduled / periodic / memory slots** — adds the missing production fan-out so `scheduled_enabled` / `scheduled_times` (and the periodic / memory counterparts) on `TrackingConfig` actually fire on their own, not only through "Test" ([309dec2](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/309dec2)):
- New `services/scheduled_dispatch.py` reuses the test-path event builders, picks the slot template per kind (`scheduled_assets` / `periodic_assets` / `memory_assets`), and writes an `EventLog` row per fire so the dashboard reflects it.
- `services/scheduler.py` gains `_load_immich_dispatch_jobs`, which builds one `CronTrigger` per `(tracker, kind, HH:MM)` from each tracker's default `TrackingConfig`, all keyed off the app-level IANA timezone. `reschedule_immich_dispatch_jobs` rebuilds the job set on any relevant CRUD or timezone change.
- Tracker / link / tracking-config CRUD endpoints now invalidate the schedule, so edits take effect immediately without a restart.
- Dispatch is skipped when scheduled / memory queries yield zero matching assets — prevents header-only "On this day:" spam when nothing qualifies.
- EN / RU default `scheduled_assets` templates updated to surface that the delivery is a scheduled random selection.
## Upgrade Notes
- `ALLOW_PRIVATE_URLS=1` is no longer set by default in `docker-compose.yml`. If your deployment targets private network URLs, set it explicitly.
- Docker healthchecks now probe `/api/ready` (separate from `/api/health`); update any external monitors accordingly.
- Config startup now rejects known dev secret keys — set real values (e.g. `JWT_SECRET`) before upgrading.
- Log format and level can now be changed at runtime from the settings UI; the `log_format` field still requires a restart to apply (a `WARN` is logged noting this).
---
## Development / Internal
### Tests
- New `packages/server/tests/` suite with 29 passing tests: config validation; JWT round-trip and `aud` / `alg=none` rejection; SSRF scheme and private-range enforcement (sync + async); Discord bounded retry; a lifespan-level `/api/health` + `/api/ready` smoke check. `services/test_dispatch.py` renamed to `manual_dispatch.py` so pytest no longer auto-collects production code ([920920b](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/920920b))
### CI / Build
- CI now runs on push / PR with frontend `svelte-check` + build, and a non-push image build. Release workflow is gated on tests and publishes an immutable `sha-<commit>` image tag ([920920b](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/920920b))
- Install editable packages inside a venv ([2bec253](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/2bec253))
- Cache pip downloads and collapse install into a single pip call ([3b683ce](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b683ce))
- Drop backend pytest from Gitea CI — editable install is too slow on the hosted runner ([f904037](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/f904037))
- Skip `build.yml` on release commits to avoid redundant runs ([bbcdf1c](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/bbcdf1c))
- Drop Trivy scan from release (output was discarded and it never failed) ([19036a9](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/19036a9))
### Performance
- Split external Docker deps into a cacheable layer and swap pip for uv for faster image builds ([592e1b6](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/592e1b6))
- Install uv from PyPI instead of the ghcr.io distroless image to avoid slow GHCR pulls on CI ([a6a854a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/a6a854a))
- No config changes required. Existing `scheduled_enabled` / `scheduled_times` / `periodic_*` / `memory_*` settings on tracking configs will start firing automatically on the next startup.
- If you had been relying on the "Test" button as a workaround, you can stop doing that now.
---
<details>
<summary>All Commits</summary>
| Hash | Message | Author |
| ---- | ------- | ------ |
| [8f0346e](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/8f0346e) | fix(csp): allow unsafe-inline scripts for SvelteKit hydration bootstrap | alexei.dolgolyov |
| [a6a854a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/a6a854a) | perf(docker): install uv from PyPI instead of ghcr.io (avoid slow GHCR pulls) | alexei.dolgolyov |
| [19036a9](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/19036a9) | ci: drop trivy scan from release (never failed, output discarded) | alexei.dolgolyov |
| [592e1b6](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/592e1b6) | perf(docker): split external deps into a cacheable layer, swap pip for uv | alexei.dolgolyov |
| [bbcdf1c](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/bbcdf1c) | ci: skip build.yml on release commits | alexei.dolgolyov |
| [f904037](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/f904037) | ci: drop backend pytest stage (too slow on hosted runner) | alexei.dolgolyov |
| [3b683ce](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b683ce) | ci: cache pip downloads and collapse install into one pip call | alexei.dolgolyov |
| [2bec253](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/2bec253) | ci: install editable packages inside a venv | alexei.dolgolyov |
| [7cbb02b](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/7cbb02b) | feat(db): pre-migration SQLite snapshots via VACUUM INTO | alexei.dolgolyov |
| [920920b](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/920920b) | feat: production-readiness hardening across security, async, DB, ops | alexei.dolgolyov |
| [f50d465](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/f50d465) | feat(logging): production-grade logging with context vars, secret masking, and runtime level control | alexei.dolgolyov |
| Hash | Message | Author |
|------------------------------------------------------------------------------------------|------------------------------------------------------------------|------------------|
| [309dec2](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/309dec2) | feat(immich): wire cron-fired scheduled/periodic/memory dispatch | alexei.dolgolyov |
</details>
+1 -1
View File
@@ -1,7 +1,7 @@
{
"name": "notify-bridge-frontend",
"private": true,
"version": "0.4.0",
"version": "0.5.0",
"type": "module",
"scripts": {
"dev": "vite dev",
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "notify-bridge-core"
version = "0.4.0"
version = "0.5.0"
description = "Core library for Notify Bridge — service provider abstractions, models, notifications, and templates"
requires-python = ">=3.12"
dependencies = [
@@ -1,4 +1,4 @@
📸 Photos from {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}:
🗓️ Scheduled delivery — random photos from {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}:
{%- for asset in assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}<a href="{{ asset.public_url }}">{{ asset.filename }}</a>{% else %}{{ asset.filename }}{% endif %}
{%- endfor %}
{%- endfor %}
@@ -1,4 +1,4 @@
📸 Фото из {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}:
🗓️ Доставка по расписанию — случайные фото из {% if public_url %}<a href="{{ public_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}:
{%- for asset in assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}<a href="{{ asset.public_url }}">{{ asset.filename }}</a>{% else %}{{ asset.filename }}{% endif %}
{%- endfor %}
{%- endfor %}
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "notify-bridge-server"
version = "0.4.0"
version = "0.5.0"
description = "Standalone Notify Bridge server — FastAPI REST API with SQLite database"
requires-python = ">=3.12"
dependencies = [
@@ -23,6 +23,7 @@ from ..database.models import (
)
from ..services.notifier import send_test_notification
from ..services.manual_dispatch import dispatch_test_notification
from ..services.scheduler import reschedule_immich_dispatch_jobs
from .helpers import get_owned_entity
_LOGGER = logging.getLogger(__name__)
@@ -118,6 +119,7 @@ async def create_notification_tracker_target(
session.add(tt)
await session.commit()
await session.refresh(tt)
await reschedule_immich_dispatch_jobs()
return await _tt_response(session, tt)
@@ -164,6 +166,7 @@ async def update_notification_tracker_target(
session.add(tt)
await session.commit()
await session.refresh(tt)
await reschedule_immich_dispatch_jobs()
return await _tt_response(session, tt)
@@ -181,6 +184,7 @@ async def delete_notification_tracker_target(
raise HTTPException(status_code=404, detail="Tracker-target link not found")
await session.delete(tt)
await session.commit()
await reschedule_immich_dispatch_jobs()
@router.post("/{tracker_target_id}/test/{test_type}")
@@ -18,7 +18,11 @@ from ..database.models import (
ServiceProvider,
User,
)
from ..services.scheduler import schedule_tracker, unschedule_tracker
from ..services.scheduler import (
reschedule_immich_dispatch_jobs,
schedule_tracker,
unschedule_tracker,
)
from .helpers import get_owned_entity
from .notification_tracker_targets import _tt_response
@@ -146,6 +150,7 @@ async def create_notification_tracker(
await session.refresh(tracker)
if tracker.enabled:
await schedule_tracker(tracker.id, tracker.scan_interval)
await reschedule_immich_dispatch_jobs()
return await _tracker_response(session, tracker)
@@ -176,6 +181,7 @@ async def update_notification_tracker(
await schedule_tracker(tracker.id, tracker.scan_interval)
else:
await unschedule_tracker(tracker.id)
await reschedule_immich_dispatch_jobs()
return await _tracker_response(session, tracker)
@@ -208,6 +214,7 @@ async def delete_notification_tracker(
await session.delete(tracker)
await session.commit()
await unschedule_tracker(tracker_id)
await reschedule_immich_dispatch_jobs()
@router.post("/{tracker_id}/trigger")
@@ -10,6 +10,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TrackingConfig, User
from ..services.scheduler import reschedule_immich_dispatch_jobs
_LOGGER = logging.getLogger(__name__)
@@ -127,6 +128,8 @@ async def create_config(
session.add(config)
await session.commit()
await session.refresh(config)
if config.provider_type == "immich":
await reschedule_immich_dispatch_jobs()
return _response(config)
@@ -152,6 +155,8 @@ async def update_config(
session.add(config)
await session.commit()
await session.refresh(config)
if config.provider_type == "immich":
await reschedule_immich_dispatch_jobs()
return _response(config)
@@ -164,8 +169,11 @@ async def delete_config(
from .delete_protection import check_tracking_config, raise_if_used
config = await _get(session, config_id, user.id)
raise_if_used(await check_tracking_config(session, config.id), config.name)
provider_type = config.provider_type
await session.delete(config)
await session.commit()
if provider_type == "immich":
await reschedule_immich_dispatch_jobs()
def _response(c: TrackingConfig) -> dict:
@@ -0,0 +1,242 @@
"""Cron-fired scheduled / periodic / memory dispatch for Immich trackers.
The Immich provider exposes three notification slots that fire on a wall-clock
schedule rather than in response to album changes:
* ``scheduled_assets_message`` — random asset selection at fixed times of day
* ``periodic_summary_message`` — album stats summary at fixed times of day
* ``memory_mode_message`` — "On This Day" memories at fixed times of day
The fire times live on the tracker's default ``TrackingConfig`` as comma-
separated ``HH:MM`` strings (``scheduled_times`` / ``periodic_times`` /
``memory_times``) interpreted in the app-level IANA timezone
(``AppSetting.timezone``). The scheduler module wires the cron jobs; this
module owns the dispatch flow once a job fires.
Note on per-link tracking-config overrides: schedule *times* come from the
tracker's default config — a per-link override may disable the slot for that
link (via ``{kind}_enabled``) but cannot shift its fire time. Consistent with
the test-dispatch path in ``manual_dispatch``.
"""
from __future__ import annotations
import logging
from typing import Literal
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.models.events import EventType
from notify_bridge_core.notifications.dispatcher import (
NotificationDispatcher,
TargetConfig,
)
from ..database.engine import get_engine
from ..database.models import (
EventLog,
NotificationTracker,
ServiceProvider,
TemplateSlot,
TrackingConfig,
)
from .dispatch_helpers import (
event_allowed_by_config,
get_app_timezone,
load_link_data,
)
from .manual_dispatch import _build_immich_event, _build_immich_periodic_event
_LOGGER = logging.getLogger(__name__)
ScheduledKind = Literal["scheduled", "periodic", "memory"]
# Maps the dispatch kind to the DB slot name that holds its template.
# The dispatcher keys templates by ``event.event_type.value`` (always
# ``scheduled_message`` here), so we read the right ``TemplateSlot`` row and
# inject it under that single event-type key — same pattern as the test path.
_SLOT_MAP: dict[ScheduledKind, str] = {
"scheduled": "scheduled_assets_message",
"periodic": "periodic_summary_message",
"memory": "memory_mode_message",
}
async def dispatch_scheduled_for_tracker(
tracker_id: int, kind: ScheduledKind
) -> None:
"""Build the slot's event for ``tracker_id`` and fan out to its links.
Skips silently when the tracker is disabled, the provider is not Immich,
the slot is disabled on the tracker's default tracking config, or no link
has a ``TemplateConfig`` with the corresponding slot row.
"""
engine = get_engine()
async with AsyncSession(engine) as session:
tracker = await session.get(NotificationTracker, tracker_id)
if not tracker or not tracker.enabled:
return
provider = await session.get(ServiceProvider, tracker.provider_id)
if not provider or provider.type != "immich":
return
default_tc: TrackingConfig | None = None
if tracker.default_tracking_config_id:
default_tc = await session.get(
TrackingConfig, tracker.default_tracking_config_id
)
# If the default config disables this kind, nothing to do — schedule
# rebuild only adds jobs when the flag is set, but a stale job from
# a previous DB state could still fire one tick before invalidation.
if default_tc is None or not getattr(default_tc, f"{kind}_enabled", False):
_LOGGER.debug(
"Scheduled %s skipped for tracker %d: kind disabled on default config",
kind, tracker_id,
)
return
# Snapshot every field we need outside the session — after the
# ``async with`` exits the instances are detached and lazy-load
# would fail. Cheaper than re-fetching, safer than touching
# attributes through a closed session.
provider_id = provider.id
provider_config = dict(provider.config)
provider_name = provider.name or provider.type
tracker_user_id = tracker.user_id
tracker_name = tracker.name or ""
collection_ids = list(tracker.collection_ids or [])
app_tz = await get_app_timezone(session)
link_data = await load_link_data(session, tracker_id)
if not link_data:
_LOGGER.info(
"Scheduled %s for tracker %d: no enabled links, skipping",
kind, tracker_id,
)
return
if kind == "periodic":
event = await _build_immich_periodic_event(
provider_config=provider_config,
provider_name=provider_name,
tracker_name=tracker_name,
collection_ids=collection_ids,
)
else:
event = await _build_immich_event(
provider_config=provider_config,
provider_name=provider_name,
tracker_name=tracker_name,
collection_ids=collection_ids,
test_type=kind,
tracking_config=default_tc,
)
if event is None:
_LOGGER.warning(
"Scheduled %s for tracker %d: provider returned no event",
kind, tracker_id,
)
return
# Skip empty payloads for asset-bearing kinds — sending the bare
# "On this day:" / "Scheduled delivery —" header with no items below
# spams chats with title-only messages every day. ``periodic`` is
# different: it's a stats summary that's still meaningful with zero
# assets, so we let it through.
if kind in ("scheduled", "memory") and not event.added_assets:
_LOGGER.info(
"Scheduled %s for tracker %d: 0 assets matched, skipping dispatch",
kind, tracker_id,
)
return
slot_name = _SLOT_MAP[kind]
target_configs: list[TargetConfig] = []
async with AsyncSession(engine) as session:
for ld in link_data:
tc = ld["tracking_config"] or default_tc
tmpl = ld["template_config"]
if tc is not None:
# Per-link override may disable this kind even when the
# default has it on — honour that here.
if not getattr(tc, f"{kind}_enabled", True):
continue
if not event_allowed_by_config(event, tc, app_tz):
continue
if tmpl is None:
continue
slot_rows = (await session.exec(
select(TemplateSlot).where(
TemplateSlot.config_id == tmpl.id,
TemplateSlot.slot_name == slot_name,
)
)).all()
if not slot_rows:
continue
locale_map = {s.locale: s.template for s in slot_rows}
template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map}
target_configs.append(TargetConfig(
type=ld["target_type"],
config=ld["target_config"],
template_slots=template_slots,
date_format=tmpl.date_format,
date_only_format=(
tmpl.date_only_format or "%d.%m.%Y"
),
provider_api_key=provider_config.get("api_key"),
provider_internal_url=provider_config.get("url", ""),
provider_external_url=provider_config.get("external_domain", ""),
receivers=ld["receivers"],
))
if not target_configs:
_LOGGER.info(
"Scheduled %s for tracker %d: no targets after filtering",
kind, tracker_id,
)
return
# Lazy import to break the watcher↔scheduler↔scheduled_dispatch cycle.
from .watcher import _get_telegram_caches
from .http_session import get_http_session
url_cache, asset_cache = await _get_telegram_caches()
http_session = await get_http_session()
dispatcher = NotificationDispatcher(
url_cache=url_cache, asset_cache=asset_cache, session=http_session,
)
_LOGGER.info(
"Dispatching scheduled %s for tracker %d to %d link(s)",
kind, tracker_id, len(target_configs),
)
results = await dispatcher.dispatch(event, target_configs)
# Mirror the watcher's audit trail: surface scheduled fires in EventLog so
# the dashboard shows *why* a notification arrived (otherwise these would
# be invisible to the activity feed).
successes = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
async with AsyncSession(engine) as session:
session.add(EventLog(
user_id=tracker_user_id,
tracker_id=tracker_id,
tracker_name=tracker_name,
provider_id=provider_id,
provider_name=provider_name,
event_type=event.event_type.value,
collection_id=event.collection_id,
collection_name=event.collection_name,
assets_count=event.added_count or 0,
details={
"kind": kind,
"slot": slot_name,
"trigger": "cron",
"timezone": app_tz,
"targets_dispatched": len(target_configs),
"targets_succeeded": successes,
},
))
await session.commit()
@@ -111,6 +111,7 @@ async def start_scheduler() -> None:
await _load_tracker_jobs()
await _load_action_jobs()
await _load_immich_dispatch_jobs()
# Start Telegram bot polling for bots with active command listeners
from .telegram_poller import start_command_listener_polling
@@ -760,6 +761,10 @@ async def reschedule_cron_jobs_for_timezone_change() -> None:
"Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
)
# Immich scheduled/periodic/memory jobs are also CronTrigger-based and
# carry the same frozen-tz problem — rebuild them under the new tz.
await reschedule_immich_dispatch_jobs()
async def _run_action(action_id: int) -> None:
"""Run an action (called by APScheduler)."""
@@ -770,6 +775,155 @@ async def _run_action(action_id: int) -> None:
_LOGGER.error("Error running action %d: %s", action_id, e)
# ---------------------------------------------------------------------------
# Immich scheduled / periodic / memory dispatch (cron-fired)
#
# These three slots fire on wall-clock schedules taken from the tracker's
# default ``TrackingConfig`` (``scheduled_times``, ``periodic_times``,
# ``memory_times`` — comma-separated ``HH:MM`` strings) interpreted in the
# app-level IANA timezone. The dispatch flow lives in
# ``services.scheduled_dispatch``; this section just owns scheduling.
# ---------------------------------------------------------------------------
_IMMICH_DISPATCH_KINDS = ("scheduled", "periodic", "memory")
_IMMICH_DISPATCH_PREFIX = "immich_dispatch_"
def _parse_hhmm_list(raw: str) -> list[tuple[int, int]]:
"""Parse ``"09:00,18:30"`` → ``[(9, 0), (18, 30)]``, skipping bad entries.
A typo in one slot must not prevent the others from scheduling — we log
and move on rather than raising.
"""
out: list[tuple[int, int]] = []
for part in (raw or "").split(","):
part = part.strip()
if not part:
continue
try:
h_str, m_str = part.split(":", 1)
hour, minute = int(h_str), int(m_str)
except ValueError:
_LOGGER.warning("Skipping invalid time literal %r", part)
continue
if not (0 <= hour <= 23 and 0 <= minute <= 59):
_LOGGER.warning("Skipping out-of-range time %r", part)
continue
out.append((hour, minute))
return out
async def _run_immich_dispatch(tracker_id: int, kind: str) -> None:
"""APScheduler entry point — wraps the dispatch helper to swallow errors."""
from .scheduled_dispatch import dispatch_scheduled_for_tracker
try:
await dispatch_scheduled_for_tracker(tracker_id, kind) # type: ignore[arg-type]
except Exception as err: # noqa: BLE001
_LOGGER.error(
"Immich %s dispatch for tracker %d failed: %s", kind, tracker_id, err,
)
async def _load_immich_dispatch_jobs() -> None:
"""Schedule cron jobs for every (tracker, kind, time) where the kind is on.
Reads each enabled Immich tracker's *default* tracking config — per-link
overrides only gate dispatch (handled in ``scheduled_dispatch``), they do
not influence the fire schedule.
"""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from apscheduler.triggers.cron import CronTrigger
from ..database.engine import get_engine
from ..database.models import (
NotificationTracker,
ServiceProvider as ServiceProviderModel,
TrackingConfig,
)
engine = get_engine()
scheduler = get_scheduler()
tz = await _load_app_timezone()
async with AsyncSession(engine) as session:
trackers = (await session.exec(
select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712
)).all()
if not trackers:
return
provider_ids = list({t.provider_id for t in trackers})
provider_types: dict[int, str] = {}
if provider_ids:
rows = await session.exec(
select(ServiceProviderModel).where(
ServiceProviderModel.id.in_(provider_ids)
)
)
provider_types = {p.id: p.type for p in rows.all()}
tc_ids = list({
t.default_tracking_config_id for t in trackers
if t.default_tracking_config_id
})
tc_map: dict[int, TrackingConfig] = {}
if tc_ids:
rows = await session.exec(
select(TrackingConfig).where(TrackingConfig.id.in_(tc_ids))
)
tc_map = {tc.id: tc for tc in rows.all()}
scheduled = 0
for tracker in trackers:
if provider_types.get(tracker.provider_id) != "immich":
continue
tc = tc_map.get(tracker.default_tracking_config_id) if tracker.default_tracking_config_id else None
if tc is None:
continue
for kind in _IMMICH_DISPATCH_KINDS:
if not getattr(tc, f"{kind}_enabled", False):
continue
times_raw = getattr(tc, f"{kind}_times", "") or ""
for hour, minute in _parse_hhmm_list(times_raw):
job_id = f"{_IMMICH_DISPATCH_PREFIX}{kind}_{tracker.id}_{hour:02d}{minute:02d}"
scheduler.add_job(
_run_immich_dispatch,
CronTrigger(hour=hour, minute=minute, timezone=tz),
id=job_id,
args=[tracker.id, kind],
replace_existing=True,
max_instances=1,
)
scheduled += 1
_LOGGER.info(
"Scheduled Immich %s for tracker %d at %02d:%02d [tz=%s]",
kind, tracker.id, hour, minute, tz.key,
)
if scheduled:
_LOGGER.info(
"Loaded %d Immich scheduled/periodic/memory job(s) [tz=%s]",
scheduled, tz.key,
)
async def reschedule_immich_dispatch_jobs() -> None:
"""Drop and rebuild all Immich scheduled/periodic/memory jobs.
Cheap to call on every relevant mutation — a typical install has only a
handful of trackers. Called from the tracker, link, and tracking-config
CRUD endpoints, and from ``reschedule_cron_jobs_for_timezone_change``.
"""
scheduler = get_scheduler()
for job in list(scheduler.get_jobs()):
if job.id.startswith(_IMMICH_DISPATCH_PREFIX):
scheduler.remove_job(job.id)
await _load_immich_dispatch_jobs()
# ---------------------------------------------------------------------------
# Scheduled backup
# ---------------------------------------------------------------------------