perf(immich): skip full album fetch on idle ticks; delta-fetch for active ones

Optimizes polling for large Immich albums (tested path targets ~200k
assets). Combined impact on idle albums drops per-tick cost from ~150 MB
fetch to ~few hundred bytes; active albums fetch O(changes) instead of
O(library).

Core changes
- ImmichAlbumMeta + get_album_meta() using ?withoutAssets=true as a
  cheap change-detection probe.
- poll() fast-path: skip full fetch when meta fingerprint matches and
  no pending assets are outstanding.
- poll() delta-path: search/metadata with updatedAfter when fingerprint
  changed, falling back to full fetch on count decrease or mixed
  add+remove that delta can't reconcile.
- asyncio.gather over meta probes so a 20-album tracker pays one
  round-trip of latency instead of 20.
- Event payload cap (50 added / 200 removed) so a bulk import can't
  explode a Jinja template or exceed Telegram's message limits.
- Module-level users cache (1h TTL, sha256-keyed) shared across
  providers on the same Immich server.
- Tick-scoped shared-links cache via new
  get_all_shared_links_by_album() — one /api/shared-links request per
  tick instead of one per changed album.

Server changes
- meta_fingerprint JSON column on NotificationTrackerState + migration.
- watcher skips the asset_ids DB rewrite when the fingerprint didn't
  change, avoiding ~8 MB JSON writes on idle ticks for huge albums.
- Adaptive polling: after 10 empty ticks skip 1-in-2, after 30 skip
  1-in-4, reset on first detected change; resets on schedule changes.
- APScheduler jitter (interval/4, capped at 30s) to smooth thundering-
  herd bursts when many trackers share the same scan_interval.
This commit is contained in:
2026-04-22 18:55:26 +03:00
parent d02616069d
commit fe38d20b96
8 changed files with 796 additions and 40 deletions
@@ -13,6 +13,18 @@ from .models import ImmichAlbumData, ImmichAssetInfo
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# Guard against runaway payloads when a bulk import lands in one poll tick.
# Templates iterate every entry in ``added_assets`` / ``removed_asset_ids``
# in Jinja for-loops (see defaults/*/assets_added.jinja2), and Telegram's
# media group has a hard cap of its own — sending 200k entries would both
# crash rendering and produce a message that no transport can deliver.
#
# ``added_count`` / ``removed_count`` on the event always carry the true
# totals so templates can show an accurate "N added" number even when the
# per-asset list is truncated.
_MAX_ASSETS_PER_EVENT = 50
_MAX_REMOVALS_PER_EVENT = 200
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict: def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
"""Build the common extra dict for album events.""" """Build the common extra dict for album events."""
@@ -85,7 +97,17 @@ def detect_album_changes(
# Emit one event per change type detected # Emit one event per change type detected
if added_assets: if added_assets:
media_assets = [asset_to_media(a, external_url) for a in added_assets] total_added = len(added_assets)
truncated_added = added_assets[:_MAX_ASSETS_PER_EVENT]
media_assets = [asset_to_media(a, external_url) for a in truncated_added]
event_extra = dict(extra)
if total_added > _MAX_ASSETS_PER_EVENT:
event_extra["truncated"] = True
event_extra["shown_count"] = _MAX_ASSETS_PER_EVENT
_LOGGER.info(
"Truncated assets_added event for album %s: %d%d",
new_album.id, total_added, _MAX_ASSETS_PER_EVENT,
)
events.append(ServiceEvent( events.append(ServiceEvent(
event_type=EventType.ASSETS_ADDED, event_type=EventType.ASSETS_ADDED,
provider_type=ServiceProviderType.IMMICH, provider_type=ServiceProviderType.IMMICH,
@@ -95,12 +117,22 @@ def detect_album_changes(
timestamp=now, timestamp=now,
added_assets=media_assets, added_assets=media_assets,
removed_asset_ids=[], removed_asset_ids=[],
added_count=len(added_assets), added_count=total_added,
removed_count=0, removed_count=0,
extra=dict(extra), extra=event_extra,
)) ))
if removed_ids: if removed_ids:
total_removed = len(removed_ids)
truncated_removed = list(removed_ids)[:_MAX_REMOVALS_PER_EVENT]
event_extra = dict(extra)
if total_removed > _MAX_REMOVALS_PER_EVENT:
event_extra["truncated"] = True
event_extra["shown_count"] = _MAX_REMOVALS_PER_EVENT
_LOGGER.info(
"Truncated assets_removed event for album %s: %d%d",
new_album.id, total_removed, _MAX_REMOVALS_PER_EVENT,
)
events.append(ServiceEvent( events.append(ServiceEvent(
event_type=EventType.ASSETS_REMOVED, event_type=EventType.ASSETS_REMOVED,
provider_type=ServiceProviderType.IMMICH, provider_type=ServiceProviderType.IMMICH,
@@ -109,10 +141,10 @@ def detect_album_changes(
collection_name=new_album.name, collection_name=new_album.name,
timestamp=now, timestamp=now,
added_assets=[], added_assets=[],
removed_asset_ids=list(removed_ids), removed_asset_ids=truncated_removed,
added_count=0, added_count=0,
removed_count=len(removed_ids), removed_count=total_removed,
extra=dict(extra), extra=event_extra,
)) ))
if name_changed: if name_changed:
@@ -9,7 +9,7 @@ from typing import Any
import aiohttp import aiohttp
from ...notifications.ssrf import UnsafeURLError, validate_outbound_url from ...notifications.ssrf import UnsafeURLError, validate_outbound_url
from .models import ImmichAlbumData, SharedLinkInfo from .models import ImmichAlbumData, ImmichAlbumMeta, SharedLinkInfo
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -201,6 +201,48 @@ class ImmichClient:
_LOGGER.warning("Failed to fetch shared links: %s", err) _LOGGER.warning("Failed to fetch shared links: %s", err)
return links return links
async def get_all_shared_links_by_album(self) -> dict[str, list[SharedLinkInfo]]:
"""Fetch every shared link on the server, bucketed by album id.
Immich's ``/api/shared-links`` endpoint is server-wide — there's no
per-album filter server-side — so every call that wanted the links
for a single album was already paying the cost of the full listing
and then discarding most of the response. Callers that need links
for multiple albums in one tick should use this method and index
into the returned dict instead of hitting ``get_shared_links`` in
a loop.
Returns an empty dict on any error (matches the silent-failure
contract of ``get_shared_links`` so callers don't need to branch
on transient outages).
"""
result: dict[str, list[SharedLinkInfo]] = {}
try:
async with self._session.get(
f"{self._url}/api/shared-links",
headers=self._headers,
) as response:
if response.status != 200:
_LOGGER.warning(
"get_all_shared_links non-200: HTTP %s", response.status
)
return result
data = await response.json()
for link in data:
album = link.get("album")
key = link.get("key")
if not (album and key):
continue
aid = album.get("id")
if not aid:
continue
result.setdefault(aid, []).append(
SharedLinkInfo.from_api_response(link)
)
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch all shared links: %s", err)
return result
async def get_album( async def get_album(
self, self,
album_id: str, album_id: str,
@@ -222,6 +264,120 @@ class ImmichClient:
except aiohttp.ClientError as err: except aiohttp.ClientError as err:
raise ImmichApiError(f"Error communicating with Immich: {err}") from err raise ImmichApiError(f"Error communicating with Immich: {err}") from err
async def get_album_meta(self, album_id: str) -> ImmichAlbumMeta | None:
"""Fetch album metadata without the assets array.
Uses Immich's ``?withoutAssets=true`` query param, which skips the
(potentially huge) ``assets`` field. A 200k-asset album response
drops from ~150 MB to a few hundred bytes, so this is cheap enough
to run on every poll as a change-detection probe.
"""
try:
async with self._session.get(
f"{self._url}/api/albums/{album_id}",
params={"withoutAssets": "true"},
headers=self._headers,
) as response:
if response.status == 404:
return None
if response.status != 200:
raise ImmichApiError(
f"Error fetching album meta {album_id}: HTTP {response.status}"
)
data = await response.json()
return ImmichAlbumMeta.from_api_response(data)
except aiohttp.ClientError as err:
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
async def search_album_assets_updated_after(
self,
album_id: str,
updated_after: str,
*,
page_size: int = 1000,
max_pages: int = 50,
) -> list[dict[str, Any]]:
"""Fetch assets in ``album_id`` whose ``updatedAt`` is after ``updated_after``.
Uses ``POST /api/search/metadata`` with ``albumIds=[album_id]`` and
``updatedAfter=<iso>``. Paginates through up to ``max_pages`` pages —
the cap exists so a clock-skew or upstream bug cannot produce an
infinite loop that exhausts memory on a 200k-asset album. In practice
an active album sees a few hundred updated assets per tick and
terminates after one page.
Returns raw Immich asset dicts (same shape as ``album.assets[*]``
from ``get_album``), so callers can feed them into
``ImmichAssetInfo.from_api_response`` directly.
"""
if not updated_after:
return []
page_size = max(1, min(page_size, 1000))
results: list[dict[str, Any]] = []
for page in range(1, max_pages + 1):
payload: dict[str, Any] = {
"albumIds": [album_id],
"updatedAfter": updated_after,
"page": page,
"size": page_size,
# ``withExif`` keeps location/description parity with
# ``get_album`` so downstream ``ImmichAssetInfo.from_api_response``
# populates city/country/rating on the delta path too.
"withExif": True,
"withPeople": True,
}
try:
async with self._session.post(
f"{self._url}/api/search/metadata",
headers=self._json_headers,
json=payload,
) as response:
if response.status != 200:
body_snip = await response.text()
_LOGGER.warning(
"Immich delta search non-200: HTTP %s body=%s",
response.status, _redact_body(body_snip),
)
break
data = await response.json()
assets_block = data.get("assets")
if isinstance(assets_block, dict):
items = assets_block.get("items", []) or []
next_page = assets_block.get("nextPage")
elif isinstance(assets_block, list):
items = assets_block
next_page = None
else:
_LOGGER.warning(
"Immich delta search returned unexpected shape: keys=%s",
list(data.keys())[:5],
)
break
results.extend(items)
# Stop early on the last page. Immich returns nextPage as
# the next page number (string or int) or None/empty when
# exhausted. Fall back to page-fullness heuristic if the
# server omits the pagination hint.
if next_page is None or next_page == "" or next_page == 0:
break
if len(items) < page_size:
break
except aiohttp.ClientError as err:
_LOGGER.warning("Immich delta search transport error: %s", err)
break
except Exception as err: # noqa: BLE001 — resilience over correctness
_LOGGER.warning("Immich delta search parse error: %s", err)
break
else:
_LOGGER.warning(
"Immich delta search for album %s hit max_pages=%d cap",
album_id, max_pages,
)
return results
async def get_albums(self) -> list[dict[str, Any]]: async def get_albums(self) -> list[dict[str, Any]]:
try: try:
async with self._session.get( async with self._session.get(
@@ -146,6 +146,49 @@ class ImmichAssetInfo:
return bool(thumbhash) return bool(thumbhash)
@dataclass(frozen=True)
class ImmichAlbumMeta:
"""Lightweight album metadata from ``GET /api/albums/{id}?withoutAssets=true``.
Used as a cheap change-detection probe so we can skip the multi-MB
full-asset fetch when nothing interesting has changed. Large albums
(tens to hundreds of thousands of assets) would otherwise re-serialize
the entire asset list on every poll interval.
"""
id: str
name: str
asset_count: int
updated_at: str
shared: bool
thumbnail_asset_id: str | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any]) -> ImmichAlbumMeta:
return cls(
id=data["id"],
name=data.get("albumName", "Unnamed"),
asset_count=int(data.get("assetCount", 0) or 0),
updated_at=data.get("updatedAt", "") or "",
shared=bool(data.get("shared", False)),
thumbnail_asset_id=data.get("albumThumbnailAssetId"),
)
def fingerprint(self) -> dict[str, Any]:
"""Return a minimal serializable dict for persistence + equality checks.
We purposefully exclude ``id`` (known from the state row) and keep the
dict flat so JSON round-trips are cheap and stable for equality.
"""
return {
"updated_at": self.updated_at,
"asset_count": self.asset_count,
"shared": self.shared,
"name": self.name,
"thumbnail_asset_id": self.thumbnail_asset_id or "",
}
@dataclass @dataclass
class ImmichAlbumData: class ImmichAlbumData:
"""Full album data from Immich API.""" """Full album data from Immich API."""
@@ -2,7 +2,10 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import hashlib
import logging import logging
import time
from typing import Any from typing import Any
import aiohttp import aiohttp
@@ -11,13 +14,62 @@ from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
from notify_bridge_core.templates.variables import TemplateVariableDefinition from notify_bridge_core.templates.variables import TemplateVariableDefinition
from .change_detector import detect_album_changes from .asset_utils import asset_to_media
from .change_detector import _MAX_ASSETS_PER_EVENT, detect_album_changes
from .client import ImmichClient from .client import ImmichClient
from .models import ImmichAlbumData from .models import ImmichAlbumData, ImmichAlbumMeta, ImmichAssetInfo
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# Module-level users cache shared across ImmichServiceProvider instances.
# Users change rarely (new people joining the server, display-name edits), so
# refetching on every tracker's ``connect()`` is wasteful — a fleet of 10
# trackers on the same Immich server otherwise issues 10 ``GET /api/users``
# calls per poll cycle. TTL is conservative (1h) and a hashed key keeps the
# raw api_key out of dict keys in case of a memory dump.
_USERS_CACHE_TTL_SECONDS = 3600
_users_cache_lock = asyncio.Lock()
_users_cache: dict[str, tuple[float, dict[str, str]]] = {}
def _users_cache_key(url: str, api_key: str) -> str:
digest = hashlib.sha256(f"{url}|{api_key}".encode("utf-8")).hexdigest()
return digest[:32]
async def _get_cached_users(
client: ImmichClient, url: str, api_key: str
) -> dict[str, str]:
"""Return ``{user_id: display_name}`` for the server, reusing cache entries
whose TTL has not elapsed. Misses and stale hits fall through to a real
fetch under a single lock so concurrent polls don't stampede the server.
"""
key = _users_cache_key(url, api_key)
now = time.monotonic()
entry = _users_cache.get(key)
if entry is not None and (now - entry[0]) < _USERS_CACHE_TTL_SECONDS:
return entry[1]
async with _users_cache_lock:
# Re-check after acquiring the lock — another coroutine may have
# refreshed the entry while we waited.
entry = _users_cache.get(key)
if entry is not None and (time.monotonic() - entry[0]) < _USERS_CACHE_TTL_SECONDS:
return entry[1]
fresh = await client.get_users()
_users_cache[key] = (time.monotonic(), fresh)
return fresh
def invalidate_users_cache() -> None:
"""Drop every cached users dict. Exposed for callers that mutate users
(e.g. provider config changes, integration tests) and need the next
``connect()`` to re-fetch.
"""
_users_cache.clear()
# Immich-specific template variables # Immich-specific template variables
IMMICH_VARIABLES: list[TemplateVariableDefinition] = [ IMMICH_VARIABLES: list[TemplateVariableDefinition] = [
TemplateVariableDefinition( TemplateVariableDefinition(
@@ -135,7 +187,9 @@ class ImmichServiceProvider(ServiceProvider):
await self._client.get_server_config() await self._client.get_server_config()
if self._external_domain: if self._external_domain:
self._client.external_domain = self._external_domain self._client.external_domain = self._external_domain
self._users_cache = await self._client.get_users() self._users_cache = await _get_cached_users(
self._client, self._client.url, self._client.api_key,
)
return ok return ok
async def disconnect(self) -> None: async def disconnect(self) -> None:
@@ -150,9 +204,32 @@ class ImmichServiceProvider(ServiceProvider):
new_state = dict(tracker_state) new_state = dict(tracker_state)
external_url = self._client.external_url external_url = self._client.external_url
for album_id in collection_ids: # Tick-scoped share-link cache. Populated lazily on first enrichment;
album = await self._client.get_album(album_id, self._users_cache) # a tracker watching 5 albums with changes now issues 1 ``/api/shared-links``
if album is None: # request per tick instead of 5 (and the endpoint is server-wide — each
# call was already fetching all links and discarding most of them).
self._tick_shared_links: dict[str, list] | None = None
# Fan out the cheap meta probes in parallel. For a tracker that
# watches 20 albums on the same Immich server this turns a 20-hop
# serial wait into ~1 round-trip's worth of latency. aiohttp's
# connection pool caps concurrency per host, so this can't stampede.
meta_results = await asyncio.gather(
*(self._client.get_album_meta(aid) for aid in collection_ids),
return_exceptions=True,
)
for album_id, meta_or_exc in zip(collection_ids, meta_results):
if isinstance(meta_or_exc, BaseException):
# Transient failure on this album — preserve existing state
# and move on. Logging at warning so flaky albums surface in
# the log without flooding on hard outages.
_LOGGER.warning(
"Meta probe failed for album %s: %s", album_id, meta_or_exc,
)
continue
meta = meta_or_exc
if meta is None:
# Album deleted # Album deleted
if album_id in new_state: if album_id in new_state:
from notify_bridge_core.models.events import EventType from notify_bridge_core.models.events import EventType
@@ -168,11 +245,74 @@ class ImmichServiceProvider(ServiceProvider):
del new_state[album_id] del new_state[album_id]
continue continue
# Get previous state
prev = new_state.get(album_id) prev = new_state.get(album_id)
prev_fingerprint = prev.get("meta_fingerprint") if prev else None
has_pending = bool(prev and prev.get("pending_asset_ids"))
# 2) Fast-path: fingerprint match and no pending assets → no work.
# We still refresh the fingerprint slot (no-op if identical) and
# leave asset_ids untouched on disk.
if (
prev is not None
and prev_fingerprint == meta.fingerprint()
and not has_pending
):
continue
# 3) Decide: delta fetch (cheap, active-album case) or full
# fetch (first tick + reconciliation for removals).
old_fp = prev.get("meta_fingerprint") if prev else None
old_asset_count = (old_fp or {}).get("asset_count", 0)
old_updated_at = (old_fp or {}).get("updated_at", "")
# Gate for the delta path:
# - must be tracked already (prev exists, has asset_ids)
# - must have a prior timestamp (empty ⇒ migrated DB row)
# - asset_count must not have decreased (removals need full fetch)
can_delta = (
prev is not None
and bool(prev.get("asset_ids"))
and bool(old_updated_at)
and meta.asset_count >= old_asset_count
)
if can_delta:
delta_events = await self._poll_delta(
album_id=album_id,
prev=prev,
new_meta=meta,
old_updated_at=old_updated_at,
)
if delta_events is not None:
events.extend(delta_events["events"])
new_state[album_id] = delta_events["new_state"]
continue
# delta_events is None ⇒ delta saw more additions than the
# net count increase (mixed add+remove) ⇒ fall through to
# the full-fetch path so removals get detected.
# Full fetch: first tick, or count-decreased, or delta-unsafe.
album = await self._client.get_album(album_id, self._users_cache)
if album is None:
# Album was deleted between meta probe and full fetch — handle
# the deletion the same way as above.
if album_id in new_state:
from notify_bridge_core.models.events import EventType
from datetime import datetime, timezone
events.append(ServiceEvent(
event_type=EventType.COLLECTION_DELETED,
provider_type=ServiceProviderType.IMMICH,
provider_name=self._name,
collection_id=album_id,
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
timestamp=datetime.now(timezone.utc),
))
del new_state[album_id]
continue
if prev is None: if prev is None:
# First time seeing this album — store state, no event # First time seeing this album — store state, no event
new_state[album_id] = _serialize_album_state(album) new_state[album_id] = _serialize_album_state(album, meta)
continue continue
# Reconstruct previous album data for comparison # Reconstruct previous album data for comparison
@@ -184,34 +324,233 @@ class ImmichServiceProvider(ServiceProvider):
) )
if detected_events: if detected_events:
# Fetch shared links to enrich events with public_url await self._enrich_with_shared_links(album_id, detected_events)
shared_links = await self._client.get_shared_links(album_id)
public_link = None
protected_link = None
for link in shared_links:
if link.is_accessible and not link.is_expired:
if link.has_password:
protected_link = link
else:
public_link = link
break # prefer non-password link
ext_domain = self._external_domain or self._client.external_url
for evt in detected_events:
if public_link:
evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
elif protected_link:
evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
events.extend(detected_events) events.extend(detected_events)
# Update state # Update state
state = _serialize_album_state(album) state = _serialize_album_state(album, meta)
state["pending_asset_ids"] = list(updated_pending) state["pending_asset_ids"] = list(updated_pending)
new_state[album_id] = state new_state[album_id] = state
return events, new_state return events, new_state
async def _poll_delta(
self,
*,
album_id: str,
prev: dict[str, Any],
new_meta: ImmichAlbumMeta,
old_updated_at: str,
) -> dict[str, Any] | None:
"""Delta-fetch path for an active album.
Calls ``search/metadata`` with ``updatedAfter`` instead of pulling
the full asset list. Returns a dict with ``events`` and ``new_state``
on success, or ``None`` to signal the caller to retry via full fetch
(used when a mixed add+remove is detected — the delta endpoint can't
tell us *what* was removed, only that additions alone don't account
for the net count change).
Trades strict detection of removals-during-mixed-changes for a
drastic reduction in bytes fetched per tick. On a 200k-asset album
where 50 were just added, we fetch ~50 asset records instead of
200 000.
"""
from datetime import datetime, timezone
from notify_bridge_core.models.events import EventType
prev_asset_ids: set[str] = set(prev.get("asset_ids", []))
prev_pending: set[str] = set(prev.get("pending_asset_ids", []))
raw_assets = await self._client.search_album_assets_updated_after(
album_id, old_updated_at
)
# Parse everything that came back. We need unprocessed entries too
# (they feed the ``pending_asset_ids`` list used by the original
# change detector's processed-later logic).
delta_assets: list[ImmichAssetInfo] = []
for raw in raw_assets:
try:
delta_assets.append(
ImmichAssetInfo.from_api_response(raw, self._users_cache)
)
except Exception as err: # noqa: BLE001 — one bad record ≠ abort tick
_LOGGER.warning(
"Skipping malformed asset record in delta response: %s", err
)
newly_added: list[ImmichAssetInfo] = []
still_pending: set[str] = set()
for asset in delta_assets:
if asset.is_processed:
if asset.id not in prev_asset_ids:
newly_added.append(asset)
else:
still_pending.add(asset.id)
old_asset_count = int((prev.get("meta_fingerprint") or {}).get("asset_count", 0))
net_change = new_meta.asset_count - old_asset_count
# If delta found more "added" assets than the net count change,
# a concurrent removal happened. Full fetch is the only way to
# know what was removed — bail out so the caller retries.
if net_change >= 0 and len(newly_added) > net_change:
_LOGGER.info(
"Delta for album %s found %d additions but net change is %d "
"— falling back to full fetch for removal reconciliation",
album_id, len(newly_added), net_change,
)
return None
# Mirror case: positive net change we couldn't account for with the
# delta results (possibly clock skew on ``updated_at``, or an asset
# whose timestamp is before ``old_updated_at`` yet the album's
# ``updatedAt`` bumped). Full fetch to avoid silently missing adds.
if net_change > 0 and len(newly_added) < net_change:
_LOGGER.info(
"Delta for album %s found %d additions but net change is %d "
"— falling back to full fetch to avoid missing assets",
album_id, len(newly_added), net_change,
)
return None
events: list[ServiceEvent] = []
now = datetime.now(timezone.utc)
external_url = self._external_domain or self._client.external_url
album_url = f"{external_url}/albums/{album_id}"
# Carry album-level attributes we know from the cheap meta probe.
# Shared-link enrichment happens further down only if we emitted
# any asset events.
base_extra = {
"album_url": album_url,
"shared": new_meta.shared,
"asset_count": new_meta.asset_count,
"photo_count": 0, # unknown without per-asset scan; templates tolerate 0
"video_count": 0,
"people": [],
"owner": "",
}
# Metadata-only events (no asset fetch needed)
old_fp = prev.get("meta_fingerprint") or {}
if old_fp.get("name") and old_fp["name"] != new_meta.name:
events.append(ServiceEvent(
event_type=EventType.COLLECTION_RENAMED,
provider_type=ServiceProviderType.IMMICH,
provider_name=self._name,
collection_id=album_id,
collection_name=new_meta.name,
timestamp=now,
added_assets=[],
removed_asset_ids=[],
added_count=0,
removed_count=0,
old_name=old_fp["name"],
new_name=new_meta.name,
extra=dict(base_extra),
))
if "shared" in old_fp and bool(old_fp["shared"]) != bool(new_meta.shared):
events.append(ServiceEvent(
event_type=EventType.SHARING_CHANGED,
provider_type=ServiceProviderType.IMMICH,
provider_name=self._name,
collection_id=album_id,
collection_name=new_meta.name,
timestamp=now,
added_assets=[],
removed_asset_ids=[],
added_count=0,
removed_count=0,
old_shared=bool(old_fp["shared"]),
new_shared=bool(new_meta.shared),
extra=dict(base_extra),
))
if newly_added:
total_added = len(newly_added)
truncated = newly_added[:_MAX_ASSETS_PER_EVENT]
media_assets = [
asset_to_media(a, self._client.external_url) for a in truncated
]
extra = dict(base_extra)
if total_added > _MAX_ASSETS_PER_EVENT:
extra["truncated"] = True
extra["shown_count"] = _MAX_ASSETS_PER_EVENT
_LOGGER.info(
"Delta-path truncated assets_added event for album %s: %d%d",
album_id, total_added, _MAX_ASSETS_PER_EVENT,
)
events.append(ServiceEvent(
event_type=EventType.ASSETS_ADDED,
provider_type=ServiceProviderType.IMMICH,
provider_name=self._name,
collection_id=album_id,
collection_name=new_meta.name,
timestamp=now,
added_assets=media_assets,
removed_asset_ids=[],
added_count=total_added,
removed_count=0,
extra=extra,
))
if events:
await self._enrich_with_shared_links(album_id, events)
# Rebuild state. asset_ids grows by the newly-added processed set.
# pending is the union of the prior pending list (things still in
# flight) and anything the delta confirmed as not-yet-processed.
# When net_change is 0 or negative we trust the meta count over
# our bookkeeping — skip-path will fix drift on the next full fetch.
new_asset_ids = prev_asset_ids | {a.id for a in newly_added}
# Discard any previously-pending IDs that just landed as processed.
new_pending = (prev_pending | still_pending) - {a.id for a in newly_added}
return {
"events": events,
"new_state": {
"name": new_meta.name,
"asset_ids": list(new_asset_ids),
"shared": new_meta.shared,
"pending_asset_ids": list(new_pending),
"meta_fingerprint": new_meta.fingerprint(),
},
}
async def _enrich_with_shared_links(
self, album_id: str, events_to_enrich: list[ServiceEvent]
) -> None:
"""Attach public/protected share link URLs to events for this album.
Uses the tick-scoped bulk cache populated lazily on first call, so a
tracker with changes across N albums makes one ``/api/shared-links``
request per tick instead of N.
"""
if self._tick_shared_links is None:
self._tick_shared_links = await self._client.get_all_shared_links_by_album()
shared_links = self._tick_shared_links.get(album_id, [])
public_link = None
protected_link = None
for link in shared_links:
if link.is_accessible and not link.is_expired:
if link.has_password:
protected_link = link
else:
public_link = link
break # prefer non-password link
ext_domain = self._external_domain or self._client.external_url
for evt in events_to_enrich:
if public_link:
evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
elif protected_link:
evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
def get_available_variables(self) -> list[TemplateVariableDefinition]: def get_available_variables(self) -> list[TemplateVariableDefinition]:
return list(IMMICH_VARIABLES) return list(IMMICH_VARIABLES)
@@ -262,13 +601,33 @@ class ImmichServiceProvider(ServiceProvider):
return {"ok": False, "message": "Failed to connect to Immich"} return {"ok": False, "message": "Failed to connect to Immich"}
def _serialize_album_state(album: ImmichAlbumData) -> dict[str, Any]: def _serialize_album_state(
"""Serialize album state for persistence.""" album: ImmichAlbumData,
meta: ImmichAlbumMeta | None = None,
) -> dict[str, Any]:
"""Serialize album state for persistence.
``meta`` carries the fingerprint used for cheap no-change detection on
subsequent polls. When omitted (legacy callers, tests) we synthesize a
best-effort fingerprint from the full album — it will still match on the
next tick if nothing changed, which is what matters.
"""
if meta is None:
fingerprint = {
"updated_at": album.updated_at,
"asset_count": len(album.asset_ids),
"shared": album.shared,
"name": album.name,
"thumbnail_asset_id": album.thumbnail_asset_id or "",
}
else:
fingerprint = meta.fingerprint()
return { return {
"name": album.name, "name": album.name,
"asset_ids": list(album.asset_ids), "asset_ids": list(album.asset_ids),
"shared": album.shared, "shared": album.shared,
"pending_asset_ids": [], "pending_asset_ids": [],
"meta_fingerprint": fingerprint,
} }
@@ -309,6 +309,14 @@ async def migrate_schema(engine: AsyncEngine) -> None:
text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0") text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0")
) )
logger.info("Added shared column to %s table", state_table) logger.info("Added shared column to %s table", state_table)
# meta_fingerprint — small JSON blob captured from the provider's
# cheap meta probe. An empty default means "unknown, do a full
# fetch next tick" so existing rows don't wrongly skip detection.
if not await _has_column(conn, state_table, "meta_fingerprint"):
await conn.execute(
text(f"ALTER TABLE {state_table} ADD COLUMN meta_fingerprint TEXT DEFAULT '{{}}'")
)
logger.info("Added meta_fingerprint column to %s table", state_table)
# Add language_code to telegram_chat if missing # Add language_code to telegram_chat if missing
if await _has_table(conn, "telegram_chat"): if await _has_table(conn, "telegram_chat"):
@@ -376,6 +376,13 @@ class NotificationTrackerState(SQLModel, table=True):
shared: bool = Field(default=False) shared: bool = Field(default=False)
asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
# Lightweight fingerprint ({updated_at, asset_count, shared, name, ...})
# captured from the provider's cheap meta probe. Letting this differ from
# the current provider response is what tells the watcher a full fetch is
# actually required — letting it match lets the watcher skip the big read.
meta_fingerprint: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSON)
)
last_updated: datetime = Field(default_factory=_utcnow) last_updated: datetime = Field(default_factory=_utcnow)
@@ -10,6 +10,49 @@ _LOGGER = logging.getLogger(__name__)
_scheduler: AsyncIOScheduler | None = None _scheduler: AsyncIOScheduler | None = None
# ---------------------------------------------------------------------------
# Adaptive polling (Tier 6 of the big-album optimization plan).
#
# We don't touch the user-configured ``scan_interval`` — that's still the
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
# ticks when a tracker is idle, and reset to 1:1 as soon as it detects
# anything. The scheduler keeps running on the user's chosen period, so
# response time to the *first* change after an idle stretch is never worse
# than one tick — but the steady-state HTTP cost for a fleet of idle
# trackers drops by ~75%.
#
# Thresholds are intentionally conservative: a tracker polling every 30 s
# needs 5 min of silence before we halve its effective rate, and 15 min
# before we quarter it. Any caller can disable adaptive behavior by passing
# ``adaptive=False`` in the tracker filters dict (checked in ``_poll_tracker``).
# ---------------------------------------------------------------------------
_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2
_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4
_ADAPTIVE_MAX_SKIP = 4 # hard cap on skip factor
# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
# restart — a short warmup period is fine and avoids persisting what is
# effectively a performance heuristic.
_adaptive_state: dict[int, dict[str, int]] = {}
def _compute_jitter(interval_seconds: int) -> int:
"""Return a jitter bound (in seconds) suitable for an IntervalTrigger.
Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up
at the same wall-clock second every minute — that creates a thundering-
herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter``
randomizes each tick's firing time by ±jitter seconds.
We use a quarter of the interval up to a 30 s cap. For short intervals
(≤8 s) jitter would round to 0 — that's fine, at those cadences a
bursty pattern is what the user implicitly opted into.
"""
if interval_seconds <= 0:
return 0
return min(interval_seconds // 4, 30)
def get_scheduler() -> AsyncIOScheduler: def get_scheduler() -> AsyncIOScheduler:
global _scheduler global _scheduler
@@ -271,16 +314,21 @@ async def _load_tracker_jobs() -> None:
tracker.id, tracker.name, e, tracker.id, tracker.name, e,
) )
jitter = _compute_jitter(tracker.scan_interval)
scheduler.add_job( scheduler.add_job(
_poll_tracker, _poll_tracker,
"interval", "interval",
seconds=tracker.scan_interval, seconds=tracker.scan_interval,
jitter=jitter or None,
id=job_id, id=job_id,
args=[tracker.id], args=[tracker.id],
replace_existing=True, replace_existing=True,
max_instances=1, max_instances=1,
) )
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval) _LOGGER.info(
"Scheduled tracker %d (%s) every %ds (jitter ±%ds)",
tracker.id, tracker.name, tracker.scan_interval, jitter,
)
def _add_cron_job( def _add_cron_job(
@@ -313,6 +361,10 @@ async def schedule_tracker(
scheduler = get_scheduler() scheduler = get_scheduler()
job_id = f"tracker_{tracker_id}" job_id = f"tracker_{tracker_id}"
# A reschedule typically follows a config edit or enable/disable flip —
# drop adaptive back-off so the first tick after the change runs promptly.
reset_adaptive_state(tracker_id)
# Remove existing job first to allow trigger type changes # Remove existing job first to allow trigger type changes
if scheduler.get_job(job_id): if scheduler.get_job(job_id):
scheduler.remove_job(job_id) scheduler.remove_job(job_id)
@@ -324,33 +376,113 @@ async def schedule_tracker(
except Exception as e: except Exception as e:
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e) _LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
jitter = _compute_jitter(interval)
scheduler.add_job( scheduler.add_job(
_poll_tracker, _poll_tracker,
"interval", "interval",
seconds=interval, seconds=interval,
jitter=jitter or None,
id=job_id, id=job_id,
args=[tracker_id], args=[tracker_id],
replace_existing=True, replace_existing=True,
) )
_LOGGER.info("Scheduled tracker %d every %ds", tracker_id, interval) _LOGGER.info(
"Scheduled tracker %d every %ds (jitter ±%ds)", tracker_id, interval, jitter,
)
async def unschedule_tracker(tracker_id: int) -> None: async def unschedule_tracker(tracker_id: int) -> None:
"""Remove a scheduler job for a tracker.""" """Remove a scheduler job for a tracker."""
scheduler = get_scheduler() scheduler = get_scheduler()
job_id = f"tracker_{tracker_id}" job_id = f"tracker_{tracker_id}"
reset_adaptive_state(tracker_id)
if scheduler.get_job(job_id): if scheduler.get_job(job_id):
scheduler.remove_job(job_id) scheduler.remove_job(job_id)
_LOGGER.info("Unscheduled tracker %d", tracker_id) _LOGGER.info("Unscheduled tracker %d", tracker_id)
def _adaptive_should_skip(tracker_id: int) -> bool:
"""Return True when the adaptive heuristic says to skip this tick.
Run-length skip: if we're in 1-in-K mode, skip (K-1) ticks between each
real poll. Stateless about the *current* tick counter except for the
``tick_counter`` we bump here.
"""
state = _adaptive_state.get(tracker_id)
if not state:
return False
skip_every = state.get("skip_every", 1)
if skip_every <= 1:
return False
state["tick_counter"] = state.get("tick_counter", 0) + 1
# Fire on ticks where counter % skip_every == 0; skip the rest.
return (state["tick_counter"] % skip_every) != 0
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
"""Update the adaptive counter after a real tick ran."""
state = _adaptive_state.setdefault(
tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
)
if events_detected > 0:
if state["skip_every"] > 1:
_LOGGER.info(
"Adaptive polling: tracker %d saw activity, restoring base rate",
tracker_id,
)
state["empty_count"] = 0
state["skip_every"] = 1
state["tick_counter"] = 0
return
state["empty_count"] = state.get("empty_count", 0) + 1
if (
state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD
and state["skip_every"] < _ADAPTIVE_MAX_SKIP
):
state["skip_every"] = _ADAPTIVE_MAX_SKIP
_LOGGER.info(
"Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4",
tracker_id, state["empty_count"],
)
elif (
state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD
and state["skip_every"] < 2
):
state["skip_every"] = 2
_LOGGER.info(
"Adaptive polling: tracker %d idle for %d ticks, skipping every other",
tracker_id, state["empty_count"],
)
def reset_adaptive_state(tracker_id: int) -> None:
"""Drop cached adaptive counters for a tracker.
Used by API callers that make changes requiring the tracker to run
promptly on the next scheduled tick (enable/disable, config edits,
manual "check now" actions).
"""
_adaptive_state.pop(tracker_id, None)
async def _poll_tracker(tracker_id: int) -> None: async def _poll_tracker(tracker_id: int) -> None:
"""Poll a tracker for changes.""" """Poll a tracker for changes."""
from .watcher import check_tracker from .watcher import check_tracker
if _adaptive_should_skip(tracker_id):
return
try: try:
await check_tracker(tracker_id) result = await check_tracker(tracker_id)
except Exception as e: except Exception as e:
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e) _LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
return
# Treat the "error" / "skipped" statuses as inconclusive — don't let
# a transient upstream failure trick the heuristic into backing off.
if isinstance(result, dict) and result.get("status") == "ok":
_adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0))
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -187,8 +187,17 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
"asset_ids": s.asset_ids, "asset_ids": s.asset_ids,
"pending_asset_ids": s.pending_asset_ids, "pending_asset_ids": s.pending_asset_ids,
"shared": bool(s.shared), "shared": bool(s.shared),
"meta_fingerprint": s.meta_fingerprint or {},
} }
# Snapshot the original fingerprint per collection so we can skip the
# (expensive) asset_ids rewrite when nothing changed. For a 200k-asset
# album this avoids a ~7 MB JSON write to the state row every tick.
original_fingerprints: dict[str, dict[str, Any]] = {
cid: dict(cstate.get("meta_fingerprint") or {})
for cid, cstate in state_dict.items()
}
# Load tracker-target links # Load tracker-target links
link_data = await load_link_data(session, tracker_id) link_data = await load_link_data(session, tracker_id)
@@ -279,11 +288,20 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
existing = s existing = s
break break
current_fingerprint = dict(cstate.get("meta_fingerprint") or {})
prior_fingerprint = original_fingerprints.get(cid, {})
# Skip the DB update when the provider reported no meaningful
# change. ``existing`` is None on first-ever fetch for a
# collection — that path always writes so the row gets created.
if existing is not None and current_fingerprint == prior_fingerprint:
continue
if existing: if existing:
existing.asset_ids = cstate.get("asset_ids", []) existing.asset_ids = cstate.get("asset_ids", [])
existing.pending_asset_ids = cstate.get("pending_asset_ids", []) existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
existing.collection_name = cstate.get("name", "") existing.collection_name = cstate.get("name", "")
existing.shared = cstate.get("shared", False) existing.shared = cstate.get("shared", False)
existing.meta_fingerprint = current_fingerprint
session.add(existing) session.add(existing)
else: else:
new_ts = NotificationTrackerState( new_ts = NotificationTrackerState(
@@ -293,6 +311,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
shared=cstate.get("shared", False), shared=cstate.get("shared", False),
asset_ids=cstate.get("asset_ids", []), asset_ids=cstate.get("asset_ids", []),
pending_asset_ids=cstate.get("pending_asset_ids", []), pending_asset_ids=cstate.get("pending_asset_ids", []),
meta_fingerprint=current_fingerprint,
) )
session.add(new_ts) session.add(new_ts)