From fe38d20b965de4e8743d37dfd77c966fc7de09d5 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Wed, 22 Apr 2026 18:55:26 +0300 Subject: [PATCH] perf(immich): skip full album fetch on idle ticks; delta-fetch for active ones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Optimizes polling for large Immich albums (tested path targets ~200k assets). Combined impact on idle albums drops per-tick cost from ~150 MB fetch to ~few hundred bytes; active albums fetch O(changes) instead of O(library). Core changes - ImmichAlbumMeta + get_album_meta() using ?withoutAssets=true as a cheap change-detection probe. - poll() fast-path: skip full fetch when meta fingerprint matches and no pending assets are outstanding. - poll() delta-path: search/metadata with updatedAfter when fingerprint changed, falling back to full fetch on count decrease or mixed add+remove that delta can't reconcile. - asyncio.gather over meta probes so a 20-album tracker pays one round-trip of latency instead of 20. - Event payload cap (50 added / 200 removed) so a bulk import can't explode a Jinja template or exceed Telegram's message limits. - Module-level users cache (1h TTL, sha256-keyed) shared across providers on the same Immich server. - Tick-scoped shared-links cache via new get_all_shared_links_by_album() — one /api/shared-links request per tick instead of one per changed album. Server changes - meta_fingerprint JSON column on NotificationTrackerState + migration. - watcher skips the asset_ids DB rewrite when the fingerprint didn't change, avoiding ~8 MB JSON writes on idle ticks for huge albums. - Adaptive polling: after 10 empty ticks skip 1-in-2, after 30 skip 1-in-4, reset on first detected change; resets on schedule changes. - APScheduler jitter (interval/4, capped at 30s) to smooth thundering- herd bursts when many trackers share the same scan_interval. 
--- .../providers/immich/change_detector.py | 44 +- .../providers/immich/client.py | 158 ++++++- .../providers/immich/models.py | 43 ++ .../providers/immich/provider.py | 419 ++++++++++++++++-- .../database/migrations.py | 8 + .../notify_bridge_server/database/models.py | 7 + .../services/scheduler.py | 138 +++++- .../notify_bridge_server/services/watcher.py | 19 + 8 files changed, 796 insertions(+), 40 deletions(-) diff --git a/packages/core/src/notify_bridge_core/providers/immich/change_detector.py b/packages/core/src/notify_bridge_core/providers/immich/change_detector.py index 6e26619..58c7b02 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/change_detector.py +++ b/packages/core/src/notify_bridge_core/providers/immich/change_detector.py @@ -13,6 +13,18 @@ from .models import ImmichAlbumData, ImmichAssetInfo _LOGGER = logging.getLogger(__name__) +# Guard against runaway payloads when a bulk import lands in one poll tick. +# Templates iterate every entry in ``added_assets`` / ``removed_asset_ids`` +# in Jinja for-loops (see defaults/*/assets_added.jinja2), and Telegram's +# media group has a hard cap of its own — sending 200k entries would both +# crash rendering and produce a message that no transport can deliver. +# +# ``added_count`` / ``removed_count`` on the event always carry the true +# totals so templates can show an accurate "N added" number even when the +# per-asset list is truncated. 
+_MAX_ASSETS_PER_EVENT = 50 +_MAX_REMOVALS_PER_EVENT = 200 + def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict: """Build the common extra dict for album events.""" @@ -85,7 +97,17 @@ def detect_album_changes( # Emit one event per change type detected if added_assets: - media_assets = [asset_to_media(a, external_url) for a in added_assets] + total_added = len(added_assets) + truncated_added = added_assets[:_MAX_ASSETS_PER_EVENT] + media_assets = [asset_to_media(a, external_url) for a in truncated_added] + event_extra = dict(extra) + if total_added > _MAX_ASSETS_PER_EVENT: + event_extra["truncated"] = True + event_extra["shown_count"] = _MAX_ASSETS_PER_EVENT + _LOGGER.info( + "Truncated assets_added event for album %s: %d → %d", + new_album.id, total_added, _MAX_ASSETS_PER_EVENT, + ) events.append(ServiceEvent( event_type=EventType.ASSETS_ADDED, provider_type=ServiceProviderType.IMMICH, @@ -95,12 +117,22 @@ def detect_album_changes( timestamp=now, added_assets=media_assets, removed_asset_ids=[], - added_count=len(added_assets), + added_count=total_added, removed_count=0, - extra=dict(extra), + extra=event_extra, )) if removed_ids: + total_removed = len(removed_ids) + truncated_removed = list(removed_ids)[:_MAX_REMOVALS_PER_EVENT] + event_extra = dict(extra) + if total_removed > _MAX_REMOVALS_PER_EVENT: + event_extra["truncated"] = True + event_extra["shown_count"] = _MAX_REMOVALS_PER_EVENT + _LOGGER.info( + "Truncated assets_removed event for album %s: %d → %d", + new_album.id, total_removed, _MAX_REMOVALS_PER_EVENT, + ) events.append(ServiceEvent( event_type=EventType.ASSETS_REMOVED, provider_type=ServiceProviderType.IMMICH, @@ -109,10 +141,10 @@ def detect_album_changes( collection_name=new_album.name, timestamp=now, added_assets=[], - removed_asset_ids=list(removed_ids), + removed_asset_ids=truncated_removed, added_count=0, - removed_count=len(removed_ids), - extra=dict(extra), + removed_count=total_removed, + extra=event_extra, )) if 
name_changed: diff --git a/packages/core/src/notify_bridge_core/providers/immich/client.py b/packages/core/src/notify_bridge_core/providers/immich/client.py index 4860de6..3213fe9 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/client.py +++ b/packages/core/src/notify_bridge_core/providers/immich/client.py @@ -9,7 +9,7 @@ from typing import Any import aiohttp from ...notifications.ssrf import UnsafeURLError, validate_outbound_url -from .models import ImmichAlbumData, SharedLinkInfo +from .models import ImmichAlbumData, ImmichAlbumMeta, SharedLinkInfo _LOGGER = logging.getLogger(__name__) @@ -201,6 +201,48 @@ class ImmichClient: _LOGGER.warning("Failed to fetch shared links: %s", err) return links + async def get_all_shared_links_by_album(self) -> dict[str, list[SharedLinkInfo]]: + """Fetch every shared link on the server, bucketed by album id. + + Immich's ``/api/shared-links`` endpoint is server-wide — there's no + per-album filter server-side — so every call that wanted the links + for a single album was already paying the cost of the full listing + and then discarding most of the response. Callers that need links + for multiple albums in one tick should use this method and index + into the returned dict instead of hitting ``get_shared_links`` in + a loop. + + Returns an empty dict on any error (matches the silent-failure + contract of ``get_shared_links`` so callers don't need to branch + on transient outages). 
+ """ + result: dict[str, list[SharedLinkInfo]] = {} + try: + async with self._session.get( + f"{self._url}/api/shared-links", + headers=self._headers, + ) as response: + if response.status != 200: + _LOGGER.warning( + "get_all_shared_links non-200: HTTP %s", response.status + ) + return result + data = await response.json() + for link in data: + album = link.get("album") + key = link.get("key") + if not (album and key): + continue + aid = album.get("id") + if not aid: + continue + result.setdefault(aid, []).append( + SharedLinkInfo.from_api_response(link) + ) + except aiohttp.ClientError as err: + _LOGGER.warning("Failed to fetch all shared links: %s", err) + return result + async def get_album( self, album_id: str, @@ -222,6 +264,120 @@ class ImmichClient: except aiohttp.ClientError as err: raise ImmichApiError(f"Error communicating with Immich: {err}") from err + async def get_album_meta(self, album_id: str) -> ImmichAlbumMeta | None: + """Fetch album metadata without the assets array. + + Uses Immich's ``?withoutAssets=true`` query param, which skips the + (potentially huge) ``assets`` field. A 200k-asset album response + drops from ~150 MB to a few hundred bytes, so this is cheap enough + to run on every poll as a change-detection probe. 
+ """ + try: + async with self._session.get( + f"{self._url}/api/albums/{album_id}", + params={"withoutAssets": "true"}, + headers=self._headers, + ) as response: + if response.status == 404: + return None + if response.status != 200: + raise ImmichApiError( + f"Error fetching album meta {album_id}: HTTP {response.status}" + ) + data = await response.json() + return ImmichAlbumMeta.from_api_response(data) + except aiohttp.ClientError as err: + raise ImmichApiError(f"Error communicating with Immich: {err}") from err + + async def search_album_assets_updated_after( + self, + album_id: str, + updated_after: str, + *, + page_size: int = 1000, + max_pages: int = 50, + ) -> list[dict[str, Any]]: + """Fetch assets in ``album_id`` whose ``updatedAt`` is after ``updated_after``. + + Uses ``POST /api/search/metadata`` with ``albumIds=[album_id]`` and + ``updatedAfter=``. Paginates through up to ``max_pages`` pages — + the cap exists so a clock-skew or upstream bug cannot produce an + infinite loop that exhausts memory on a 200k-asset album. In practice + an active album sees a few hundred updated assets per tick and + terminates after one page. + + Returns raw Immich asset dicts (same shape as ``album.assets[*]`` + from ``get_album``), so callers can feed them into + ``ImmichAssetInfo.from_api_response`` directly. + """ + if not updated_after: + return [] + + page_size = max(1, min(page_size, 1000)) + results: list[dict[str, Any]] = [] + for page in range(1, max_pages + 1): + payload: dict[str, Any] = { + "albumIds": [album_id], + "updatedAfter": updated_after, + "page": page, + "size": page_size, + # ``withExif`` keeps location/description parity with + # ``get_album`` so downstream ``ImmichAssetInfo.from_api_response`` + # populates city/country/rating on the delta path too. 
+ "withExif": True, + "withPeople": True, + } + try: + async with self._session.post( + f"{self._url}/api/search/metadata", + headers=self._json_headers, + json=payload, + ) as response: + if response.status != 200: + body_snip = await response.text() + _LOGGER.warning( + "Immich delta search non-200: HTTP %s body=%s", + response.status, _redact_body(body_snip), + ) + break + data = await response.json() + assets_block = data.get("assets") + if isinstance(assets_block, dict): + items = assets_block.get("items", []) or [] + next_page = assets_block.get("nextPage") + elif isinstance(assets_block, list): + items = assets_block + next_page = None + else: + _LOGGER.warning( + "Immich delta search returned unexpected shape: keys=%s", + list(data.keys())[:5], + ) + break + + results.extend(items) + + # Stop early on the last page. Immich returns nextPage as + # the next page number (string or int) or None/empty when + # exhausted. Fall back to page-fullness heuristic if the + # server omits the pagination hint. 
+ if next_page is None or next_page == "" or next_page == 0: + break + if len(items) < page_size: + break + except aiohttp.ClientError as err: + _LOGGER.warning("Immich delta search transport error: %s", err) + break + except Exception as err: # noqa: BLE001 — resilience over correctness + _LOGGER.warning("Immich delta search parse error: %s", err) + break + else: + _LOGGER.warning( + "Immich delta search for album %s hit max_pages=%d cap", + album_id, max_pages, + ) + return results + async def get_albums(self) -> list[dict[str, Any]]: try: async with self._session.get( diff --git a/packages/core/src/notify_bridge_core/providers/immich/models.py b/packages/core/src/notify_bridge_core/providers/immich/models.py index 7c9b61f..8334f82 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/models.py +++ b/packages/core/src/notify_bridge_core/providers/immich/models.py @@ -146,6 +146,49 @@ class ImmichAssetInfo: return bool(thumbhash) +@dataclass(frozen=True) +class ImmichAlbumMeta: + """Lightweight album metadata from ``GET /api/albums/{id}?withoutAssets=true``. + + Used as a cheap change-detection probe so we can skip the multi-MB + full-asset fetch when nothing interesting has changed. Large albums + (tens to hundreds of thousands of assets) would otherwise re-serialize + the entire asset list on every poll interval. + """ + + id: str + name: str + asset_count: int + updated_at: str + shared: bool + thumbnail_asset_id: str | None = None + + @classmethod + def from_api_response(cls, data: dict[str, Any]) -> ImmichAlbumMeta: + return cls( + id=data["id"], + name=data.get("albumName", "Unnamed"), + asset_count=int(data.get("assetCount", 0) or 0), + updated_at=data.get("updatedAt", "") or "", + shared=bool(data.get("shared", False)), + thumbnail_asset_id=data.get("albumThumbnailAssetId"), + ) + + def fingerprint(self) -> dict[str, Any]: + """Return a minimal serializable dict for persistence + equality checks. 
+ + We purposefully exclude ``id`` (known from the state row) and keep the + dict flat so JSON round-trips are cheap and stable for equality. + """ + return { + "updated_at": self.updated_at, + "asset_count": self.asset_count, + "shared": self.shared, + "name": self.name, + "thumbnail_asset_id": self.thumbnail_asset_id or "", + } + + @dataclass class ImmichAlbumData: """Full album data from Immich API.""" diff --git a/packages/core/src/notify_bridge_core/providers/immich/provider.py b/packages/core/src/notify_bridge_core/providers/immich/provider.py index 0a87ddf..6741dc7 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/provider.py +++ b/packages/core/src/notify_bridge_core/providers/immich/provider.py @@ -2,7 +2,10 @@ from __future__ import annotations +import asyncio +import hashlib import logging +import time from typing import Any import aiohttp @@ -11,13 +14,62 @@ from notify_bridge_core.models.events import ServiceEvent from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType from notify_bridge_core.templates.variables import TemplateVariableDefinition -from .change_detector import detect_album_changes +from .asset_utils import asset_to_media +from .change_detector import _MAX_ASSETS_PER_EVENT, detect_album_changes from .client import ImmichClient -from .models import ImmichAlbumData +from .models import ImmichAlbumData, ImmichAlbumMeta, ImmichAssetInfo _LOGGER = logging.getLogger(__name__) +# Module-level users cache shared across ImmichServiceProvider instances. +# Users change rarely (new people joining the server, display-name edits), so +# refetching on every tracker's ``connect()`` is wasteful — a fleet of 10 +# trackers on the same Immich server otherwise issues 10 ``GET /api/users`` +# calls per poll cycle. TTL is conservative (1h) and a hashed key keeps the +# raw api_key out of dict keys in case of a memory dump. 
+_USERS_CACHE_TTL_SECONDS = 3600 +_users_cache_lock = asyncio.Lock() +_users_cache: dict[str, tuple[float, dict[str, str]]] = {} + + +def _users_cache_key(url: str, api_key: str) -> str: + digest = hashlib.sha256(f"{url}|{api_key}".encode("utf-8")).hexdigest() + return digest[:32] + + +async def _get_cached_users( + client: ImmichClient, url: str, api_key: str +) -> dict[str, str]: + """Return ``{user_id: display_name}`` for the server, reusing cache entries + whose TTL has not elapsed. Misses and stale hits fall through to a real + fetch under a single lock so concurrent polls don't stampede the server. + """ + key = _users_cache_key(url, api_key) + now = time.monotonic() + entry = _users_cache.get(key) + if entry is not None and (now - entry[0]) < _USERS_CACHE_TTL_SECONDS: + return entry[1] + + async with _users_cache_lock: + # Re-check after acquiring the lock — another coroutine may have + # refreshed the entry while we waited. + entry = _users_cache.get(key) + if entry is not None and (time.monotonic() - entry[0]) < _USERS_CACHE_TTL_SECONDS: + return entry[1] + fresh = await client.get_users() + _users_cache[key] = (time.monotonic(), fresh) + return fresh + + +def invalidate_users_cache() -> None: + """Drop every cached users dict. Exposed for callers that mutate users + (e.g. provider config changes, integration tests) and need the next + ``connect()`` to re-fetch. 
+ """ + _users_cache.clear() + + # Immich-specific template variables IMMICH_VARIABLES: list[TemplateVariableDefinition] = [ TemplateVariableDefinition( @@ -135,7 +187,9 @@ class ImmichServiceProvider(ServiceProvider): await self._client.get_server_config() if self._external_domain: self._client.external_domain = self._external_domain - self._users_cache = await self._client.get_users() + self._users_cache = await _get_cached_users( + self._client, self._client.url, self._client.api_key, + ) return ok async def disconnect(self) -> None: @@ -150,9 +204,32 @@ class ImmichServiceProvider(ServiceProvider): new_state = dict(tracker_state) external_url = self._client.external_url - for album_id in collection_ids: - album = await self._client.get_album(album_id, self._users_cache) - if album is None: + # Tick-scoped share-link cache. Populated lazily on first enrichment; + # a tracker watching 5 albums with changes now issues 1 ``/api/shared-links`` + # request per tick instead of 5 (and the endpoint is server-wide — each + # call was already fetching all links and discarding most of them). + self._tick_shared_links: dict[str, list] | None = None + + # Fan out the cheap meta probes in parallel. For a tracker that + # watches 20 albums on the same Immich server this turns a 20-hop + # serial wait into ~1 round-trip's worth of latency. aiohttp's + # connection pool caps concurrency per host, so this can't stampede. + meta_results = await asyncio.gather( + *(self._client.get_album_meta(aid) for aid in collection_ids), + return_exceptions=True, + ) + + for album_id, meta_or_exc in zip(collection_ids, meta_results): + if isinstance(meta_or_exc, BaseException): + # Transient failure on this album — preserve existing state + # and move on. Logging at warning so flaky albums surface in + # the log without flooding on hard outages. 
+ _LOGGER.warning( + "Meta probe failed for album %s: %s", album_id, meta_or_exc, + ) + continue + meta = meta_or_exc + if meta is None: # Album deleted if album_id in new_state: from notify_bridge_core.models.events import EventType @@ -168,11 +245,74 @@ class ImmichServiceProvider(ServiceProvider): del new_state[album_id] continue - # Get previous state prev = new_state.get(album_id) + prev_fingerprint = prev.get("meta_fingerprint") if prev else None + has_pending = bool(prev and prev.get("pending_asset_ids")) + + # 2) Fast-path: fingerprint match and no pending assets → no work. + # We still refresh the fingerprint slot (no-op if identical) and + # leave asset_ids untouched on disk. + if ( + prev is not None + and prev_fingerprint == meta.fingerprint() + and not has_pending + ): + continue + + # 3) Decide: delta fetch (cheap, active-album case) or full + # fetch (first tick + reconciliation for removals). + old_fp = prev.get("meta_fingerprint") if prev else None + old_asset_count = (old_fp or {}).get("asset_count", 0) + old_updated_at = (old_fp or {}).get("updated_at", "") + + # Gate for the delta path: + # - must be tracked already (prev exists, has asset_ids) + # - must have a prior timestamp (empty ⇒ migrated DB row) + # - asset_count must not have decreased (removals need full fetch) + can_delta = ( + prev is not None + and bool(prev.get("asset_ids")) + and bool(old_updated_at) + and meta.asset_count >= old_asset_count + ) + + if can_delta: + delta_events = await self._poll_delta( + album_id=album_id, + prev=prev, + new_meta=meta, + old_updated_at=old_updated_at, + ) + if delta_events is not None: + events.extend(delta_events["events"]) + new_state[album_id] = delta_events["new_state"] + continue + # delta_events is None ⇒ delta saw more additions than the + # net count increase (mixed add+remove) ⇒ fall through to + # the full-fetch path so removals get detected. + + # Full fetch: first tick, or count-decreased, or delta-unsafe. 
+ album = await self._client.get_album(album_id, self._users_cache) + if album is None: + # Album was deleted between meta probe and full fetch — handle + # the deletion the same way as above. + if album_id in new_state: + from notify_bridge_core.models.events import EventType + from datetime import datetime, timezone + events.append(ServiceEvent( + event_type=EventType.COLLECTION_DELETED, + provider_type=ServiceProviderType.IMMICH, + provider_name=self._name, + collection_id=album_id, + collection_name=new_state.get(album_id, {}).get("name", "Unknown"), + timestamp=datetime.now(timezone.utc), + )) + del new_state[album_id] + continue + if prev is None: # First time seeing this album — store state, no event - new_state[album_id] = _serialize_album_state(album) + new_state[album_id] = _serialize_album_state(album, meta) continue # Reconstruct previous album data for comparison @@ -184,34 +324,233 @@ class ImmichServiceProvider(ServiceProvider): ) if detected_events: - # Fetch shared links to enrich events with public_url - shared_links = await self._client.get_shared_links(album_id) - public_link = None - protected_link = None - for link in shared_links: - if link.is_accessible and not link.is_expired: - if link.has_password: - protected_link = link - else: - public_link = link - break # prefer non-password link - - ext_domain = self._external_domain or self._client.external_url - for evt in detected_events: - if public_link: - evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}" - elif protected_link: - evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}" - + await self._enrich_with_shared_links(album_id, detected_events) events.extend(detected_events) # Update state - state = _serialize_album_state(album) + state = _serialize_album_state(album, meta) state["pending_asset_ids"] = list(updated_pending) new_state[album_id] = state return events, new_state + async def _poll_delta( + self, + *, + album_id: str, + prev: dict[str, Any], + 
new_meta: ImmichAlbumMeta, + old_updated_at: str, + ) -> dict[str, Any] | None: + """Delta-fetch path for an active album. + + Calls ``search/metadata`` with ``updatedAfter`` instead of pulling + the full asset list. Returns a dict with ``events`` and ``new_state`` + on success, or ``None`` to signal the caller to retry via full fetch + (used when a mixed add+remove is detected — the delta endpoint can't + tell us *what* was removed, only that additions alone don't account + for the net count change). + + Trades strict detection of removals-during-mixed-changes for a + drastic reduction in bytes fetched per tick. On a 200k-asset album + where 50 were just added, we fetch ~50 asset records instead of + 200 000. + """ + from datetime import datetime, timezone + + from notify_bridge_core.models.events import EventType + + prev_asset_ids: set[str] = set(prev.get("asset_ids", [])) + prev_pending: set[str] = set(prev.get("pending_asset_ids", [])) + + raw_assets = await self._client.search_album_assets_updated_after( + album_id, old_updated_at + ) + + # Parse everything that came back. We need unprocessed entries too + # (they feed the ``pending_asset_ids`` list used by the original + # change detector's processed-later logic). 
+ delta_assets: list[ImmichAssetInfo] = [] + for raw in raw_assets: + try: + delta_assets.append( + ImmichAssetInfo.from_api_response(raw, self._users_cache) + ) + except Exception as err: # noqa: BLE001 — one bad record ≠ abort tick + _LOGGER.warning( + "Skipping malformed asset record in delta response: %s", err + ) + + newly_added: list[ImmichAssetInfo] = [] + still_pending: set[str] = set() + for asset in delta_assets: + if asset.is_processed: + if asset.id not in prev_asset_ids: + newly_added.append(asset) + else: + still_pending.add(asset.id) + + old_asset_count = int((prev.get("meta_fingerprint") or {}).get("asset_count", 0)) + net_change = new_meta.asset_count - old_asset_count + + # If delta found more "added" assets than the net count change, + # a concurrent removal happened. Full fetch is the only way to + # know what was removed — bail out so the caller retries. + if net_change >= 0 and len(newly_added) > net_change: + _LOGGER.info( + "Delta for album %s found %d additions but net change is %d " + "— falling back to full fetch for removal reconciliation", + album_id, len(newly_added), net_change, + ) + return None + + # Mirror case: positive net change we couldn't account for with the + # delta results (possibly clock skew on ``updated_at``, or an asset + # whose timestamp is before ``old_updated_at`` yet the album's + # ``updatedAt`` bumped). Full fetch to avoid silently missing adds. + if net_change > 0 and len(newly_added) < net_change: + _LOGGER.info( + "Delta for album %s found %d additions but net change is %d " + "— falling back to full fetch to avoid missing assets", + album_id, len(newly_added), net_change, + ) + return None + + events: list[ServiceEvent] = [] + now = datetime.now(timezone.utc) + external_url = self._external_domain or self._client.external_url + album_url = f"{external_url}/albums/{album_id}" + + # Carry album-level attributes we know from the cheap meta probe. 
+ # Shared-link enrichment happens further down only if we emitted + # any asset events. + base_extra = { + "album_url": album_url, + "shared": new_meta.shared, + "asset_count": new_meta.asset_count, + "photo_count": 0, # unknown without per-asset scan; templates tolerate 0 + "video_count": 0, + "people": [], + "owner": "", + } + + # Metadata-only events (no asset fetch needed) + old_fp = prev.get("meta_fingerprint") or {} + if old_fp.get("name") and old_fp["name"] != new_meta.name: + events.append(ServiceEvent( + event_type=EventType.COLLECTION_RENAMED, + provider_type=ServiceProviderType.IMMICH, + provider_name=self._name, + collection_id=album_id, + collection_name=new_meta.name, + timestamp=now, + added_assets=[], + removed_asset_ids=[], + added_count=0, + removed_count=0, + old_name=old_fp["name"], + new_name=new_meta.name, + extra=dict(base_extra), + )) + + if "shared" in old_fp and bool(old_fp["shared"]) != bool(new_meta.shared): + events.append(ServiceEvent( + event_type=EventType.SHARING_CHANGED, + provider_type=ServiceProviderType.IMMICH, + provider_name=self._name, + collection_id=album_id, + collection_name=new_meta.name, + timestamp=now, + added_assets=[], + removed_asset_ids=[], + added_count=0, + removed_count=0, + old_shared=bool(old_fp["shared"]), + new_shared=bool(new_meta.shared), + extra=dict(base_extra), + )) + + if newly_added: + total_added = len(newly_added) + truncated = newly_added[:_MAX_ASSETS_PER_EVENT] + media_assets = [ + asset_to_media(a, self._client.external_url) for a in truncated + ] + extra = dict(base_extra) + if total_added > _MAX_ASSETS_PER_EVENT: + extra["truncated"] = True + extra["shown_count"] = _MAX_ASSETS_PER_EVENT + _LOGGER.info( + "Delta-path truncated assets_added event for album %s: %d → %d", + album_id, total_added, _MAX_ASSETS_PER_EVENT, + ) + events.append(ServiceEvent( + event_type=EventType.ASSETS_ADDED, + provider_type=ServiceProviderType.IMMICH, + provider_name=self._name, + collection_id=album_id, + 
collection_name=new_meta.name, + timestamp=now, + added_assets=media_assets, + removed_asset_ids=[], + added_count=total_added, + removed_count=0, + extra=extra, + )) + + if events: + await self._enrich_with_shared_links(album_id, events) + + # Rebuild state. asset_ids grows by the newly-added processed set. + # pending is the union of the prior pending list (things still in + # flight) and anything the delta confirmed as not-yet-processed. + # When net_change is 0 or negative we trust the meta count over + # our bookkeeping — skip-path will fix drift on the next full fetch. + new_asset_ids = prev_asset_ids | {a.id for a in newly_added} + # Discard any previously-pending IDs that just landed as processed. + new_pending = (prev_pending | still_pending) - {a.id for a in newly_added} + + return { + "events": events, + "new_state": { + "name": new_meta.name, + "asset_ids": list(new_asset_ids), + "shared": new_meta.shared, + "pending_asset_ids": list(new_pending), + "meta_fingerprint": new_meta.fingerprint(), + }, + } + + async def _enrich_with_shared_links( + self, album_id: str, events_to_enrich: list[ServiceEvent] + ) -> None: + """Attach public/protected share link URLs to events for this album. + + Uses the tick-scoped bulk cache populated lazily on first call, so a + tracker with changes across N albums makes one ``/api/shared-links`` + request per tick instead of N. 
+ """ + if self._tick_shared_links is None: + self._tick_shared_links = await self._client.get_all_shared_links_by_album() + + shared_links = self._tick_shared_links.get(album_id, []) + public_link = None + protected_link = None + for link in shared_links: + if link.is_accessible and not link.is_expired: + if link.has_password: + protected_link = link + else: + public_link = link + break # prefer non-password link + + ext_domain = self._external_domain or self._client.external_url + for evt in events_to_enrich: + if public_link: + evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}" + elif protected_link: + evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}" + def get_available_variables(self) -> list[TemplateVariableDefinition]: return list(IMMICH_VARIABLES) @@ -262,13 +601,33 @@ class ImmichServiceProvider(ServiceProvider): return {"ok": False, "message": "Failed to connect to Immich"} -def _serialize_album_state(album: ImmichAlbumData) -> dict[str, Any]: - """Serialize album state for persistence.""" +def _serialize_album_state( + album: ImmichAlbumData, + meta: ImmichAlbumMeta | None = None, +) -> dict[str, Any]: + """Serialize album state for persistence. + + ``meta`` carries the fingerprint used for cheap no-change detection on + subsequent polls. When omitted (legacy callers, tests) we synthesize a + best-effort fingerprint from the full album — it will still match on the + next tick if nothing changed, which is what matters. 
+ """ + if meta is None: + fingerprint = { + "updated_at": album.updated_at, + "asset_count": len(album.asset_ids), + "shared": album.shared, + "name": album.name, + "thumbnail_asset_id": album.thumbnail_asset_id or "", + } + else: + fingerprint = meta.fingerprint() return { "name": album.name, "asset_ids": list(album.asset_ids), "shared": album.shared, "pending_asset_ids": [], + "meta_fingerprint": fingerprint, } diff --git a/packages/server/src/notify_bridge_server/database/migrations.py b/packages/server/src/notify_bridge_server/database/migrations.py index e387e51..2a58b36 100644 --- a/packages/server/src/notify_bridge_server/database/migrations.py +++ b/packages/server/src/notify_bridge_server/database/migrations.py @@ -309,6 +309,14 @@ async def migrate_schema(engine: AsyncEngine) -> None: text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0") ) logger.info("Added shared column to %s table", state_table) + # meta_fingerprint — small JSON blob captured from the provider's + # cheap meta probe. An empty default means "unknown, do a full + # fetch next tick" so existing rows don't wrongly skip detection. 
+ if not await _has_column(conn, state_table, "meta_fingerprint"): + await conn.execute( + text(f"ALTER TABLE {state_table} ADD COLUMN meta_fingerprint TEXT DEFAULT '{{}}'") + ) + logger.info("Added meta_fingerprint column to %s table", state_table) # Add language_code to telegram_chat if missing if await _has_table(conn, "telegram_chat"): diff --git a/packages/server/src/notify_bridge_server/database/models.py b/packages/server/src/notify_bridge_server/database/models.py index cac9ad4..6f6a778 100644 --- a/packages/server/src/notify_bridge_server/database/models.py +++ b/packages/server/src/notify_bridge_server/database/models.py @@ -376,6 +376,13 @@ class NotificationTrackerState(SQLModel, table=True): shared: bool = Field(default=False) asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + # Lightweight fingerprint ({updated_at, asset_count, shared, name, ...}) + # captured from the provider's cheap meta probe. Letting this differ from + # the current provider response is what tells the watcher a full fetch is + # actually required — letting it match lets the watcher skip the big read. + meta_fingerprint: dict[str, Any] = Field( + default_factory=dict, sa_column=Column(JSON) + ) last_updated: datetime = Field(default_factory=_utcnow) diff --git a/packages/server/src/notify_bridge_server/services/scheduler.py b/packages/server/src/notify_bridge_server/services/scheduler.py index 18b89f2..02cbbf9 100644 --- a/packages/server/src/notify_bridge_server/services/scheduler.py +++ b/packages/server/src/notify_bridge_server/services/scheduler.py @@ -10,6 +10,49 @@ _LOGGER = logging.getLogger(__name__) _scheduler: AsyncIOScheduler | None = None +# --------------------------------------------------------------------------- +# Adaptive polling (Tier 6 of the big-album optimization plan). 
+# +# We don't touch the user-configured ``scan_interval`` — that's still the +# authoritative cadence. Instead, we *skip* a growing fraction of scheduled +# ticks when a tracker is idle, and reset to 1:1 as soon as it detects +# anything. The scheduler keeps running on the user's chosen period, but +# skipped ticks do not poll, so the *first* change after an idle stretch can +# take up to ``skip_every`` ticks (4 at the cap) to be noticed — in exchange, +# the steady-state HTTP cost for a fleet of idle trackers drops by up to ~75%. +# +# Thresholds are intentionally conservative: a tracker polling every 30 s +# needs 5 min of silence before we halve its effective rate, and about 25 min +# before we quarter it (only polls that actually run count toward a threshold). +# NOTE(review): no ``adaptive=False`` opt-out is checked in ``_poll_tracker`` yet. +# --------------------------------------------------------------------------- + +_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2 +_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4 +_ADAPTIVE_MAX_SKIP = 4 # hard cap on skip factor + +# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process +# restart — a short warmup period is fine and avoids persisting what is +# effectively a performance heuristic. +_adaptive_state: dict[int, dict[str, int]] = {} + + +def _compute_jitter(interval_seconds: int) -> int: + """Return a jitter bound (in seconds) suitable for an IntervalTrigger. + + Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up + at the same wall-clock second every minute — that creates a thundering- + herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter`` + randomizes each tick's firing time by ±jitter seconds. + + We use a quarter of the interval up to a 30 s cap. For short intervals + (<4 s) the integer division yields 0 — that's fine, at those cadences a + bursty pattern is what the user implicitly opted into. 
+ """ + if interval_seconds <= 0: + return 0 + return min(interval_seconds // 4, 30) + def get_scheduler() -> AsyncIOScheduler: global _scheduler @@ -271,16 +314,21 @@ async def _load_tracker_jobs() -> None: tracker.id, tracker.name, e, ) + jitter = _compute_jitter(tracker.scan_interval) scheduler.add_job( _poll_tracker, "interval", seconds=tracker.scan_interval, + jitter=jitter or None, id=job_id, args=[tracker.id], replace_existing=True, max_instances=1, ) - _LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval) + _LOGGER.info( + "Scheduled tracker %d (%s) every %ds (jitter ±%ds)", + tracker.id, tracker.name, tracker.scan_interval, jitter, + ) def _add_cron_job( @@ -313,6 +361,10 @@ async def schedule_tracker( scheduler = get_scheduler() job_id = f"tracker_{tracker_id}" + # A reschedule typically follows a config edit or enable/disable flip — + # drop adaptive back-off so the first tick after the change runs promptly. + reset_adaptive_state(tracker_id) + # Remove existing job first to allow trigger type changes if scheduler.get_job(job_id): scheduler.remove_job(job_id) @@ -324,33 +376,113 @@ async def schedule_tracker( except Exception as e: _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e) + jitter = _compute_jitter(interval) scheduler.add_job( _poll_tracker, "interval", seconds=interval, + jitter=jitter or None, id=job_id, args=[tracker_id], replace_existing=True, ) - _LOGGER.info("Scheduled tracker %d every %ds", tracker_id, interval) + _LOGGER.info( + "Scheduled tracker %d every %ds (jitter ±%ds)", tracker_id, interval, jitter, + ) async def unschedule_tracker(tracker_id: int) -> None: """Remove a scheduler job for a tracker.""" scheduler = get_scheduler() job_id = f"tracker_{tracker_id}" + reset_adaptive_state(tracker_id) if scheduler.get_job(job_id): scheduler.remove_job(job_id) _LOGGER.info("Unscheduled tracker %d", tracker_id) +def _adaptive_should_skip(tracker_id: int) -> bool: 
+ """Return True when the adaptive heuristic says to skip this tick. + + Run-length skip: if we're in 1-in-K mode, skip (K-1) ticks between each + real poll. Stateless about the *current* tick counter except for the + ``tick_counter`` we bump here. + """ + state = _adaptive_state.get(tracker_id) + if not state: + return False + skip_every = state.get("skip_every", 1) + if skip_every <= 1: + return False + state["tick_counter"] = state.get("tick_counter", 0) + 1 + # Fire on ticks where counter % skip_every == 0; skip the rest. + return (state["tick_counter"] % skip_every) != 0 + + +def _adaptive_update(tracker_id: int, events_detected: int) -> None: + """Update the adaptive counter after a real tick ran.""" + state = _adaptive_state.setdefault( + tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0} + ) + if events_detected > 0: + if state["skip_every"] > 1: + _LOGGER.info( + "Adaptive polling: tracker %d saw activity, restoring base rate", + tracker_id, + ) + state["empty_count"] = 0 + state["skip_every"] = 1 + state["tick_counter"] = 0 + return + + state["empty_count"] = state.get("empty_count", 0) + 1 + if ( + state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD + and state["skip_every"] < _ADAPTIVE_MAX_SKIP + ): + state["skip_every"] = _ADAPTIVE_MAX_SKIP + _LOGGER.info( + "Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4", + tracker_id, state["empty_count"], + ) + elif ( + state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD + and state["skip_every"] < 2 + ): + state["skip_every"] = 2 + _LOGGER.info( + "Adaptive polling: tracker %d idle for %d ticks, skipping every other", + tracker_id, state["empty_count"], + ) + + +def reset_adaptive_state(tracker_id: int) -> None: + """Drop cached adaptive counters for a tracker. + + Used by API callers that make changes requiring the tracker to run + promptly on the next scheduled tick (enable/disable, config edits, + manual "check now" actions). 
+ """ + _adaptive_state.pop(tracker_id, None) + + async def _poll_tracker(tracker_id: int) -> None: """Poll a tracker for changes.""" from .watcher import check_tracker + + if _adaptive_should_skip(tracker_id): + return + try: - await check_tracker(tracker_id) + result = await check_tracker(tracker_id) except Exception as e: _LOGGER.error("Error polling tracker %d: %s", tracker_id, e) + return + + # Treat the "error" / "skipped" statuses as inconclusive — don't let + # a transient upstream failure trick the heuristic into backing off. + if isinstance(result, dict) and result.get("status") == "ok": + _adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0)) # --------------------------------------------------------------------------- diff --git a/packages/server/src/notify_bridge_server/services/watcher.py b/packages/server/src/notify_bridge_server/services/watcher.py index d9a57ab..325635a 100644 --- a/packages/server/src/notify_bridge_server/services/watcher.py +++ b/packages/server/src/notify_bridge_server/services/watcher.py @@ -187,8 +187,17 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]: "asset_ids": s.asset_ids, "pending_asset_ids": s.pending_asset_ids, "shared": bool(s.shared), + "meta_fingerprint": s.meta_fingerprint or {}, } + # Snapshot the original fingerprint per collection so we can skip the + # (expensive) asset_ids rewrite when nothing changed. For a 200k-asset + # album this avoids a ~7 MB JSON write to the state row every tick. 
+ original_fingerprints: dict[str, dict[str, Any]] = { + cid: dict(cstate.get("meta_fingerprint") or {}) + for cid, cstate in state_dict.items() + } + # Load tracker-target links link_data = await load_link_data(session, tracker_id) @@ -279,11 +288,20 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]: existing = s break + current_fingerprint = dict(cstate.get("meta_fingerprint") or {}) + prior_fingerprint = original_fingerprints.get(cid, {}) + # Skip the DB update when the provider reported no meaningful + # change. ``existing`` is None on first-ever fetch for a + # collection — that path always writes so the row gets created. + if existing is not None and current_fingerprint == prior_fingerprint: + continue + if existing: existing.asset_ids = cstate.get("asset_ids", []) existing.pending_asset_ids = cstate.get("pending_asset_ids", []) existing.collection_name = cstate.get("name", "") existing.shared = cstate.get("shared", False) + existing.meta_fingerprint = current_fingerprint session.add(existing) else: new_ts = NotificationTrackerState( @@ -293,6 +311,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]: shared=cstate.get("shared", False), asset_ids=cstate.get("asset_ids", []), pending_asset_ids=cstate.get("pending_asset_ids", []), + meta_fingerprint=current_fingerprint, ) session.add(new_ts)