perf(immich): skip full album fetch on idle ticks; delta-fetch for active ones
Optimizes polling for large Immich albums (the tested path targets ~200k assets). Combined, these changes cut the per-tick cost for idle albums from a ~150 MB fetch to a few hundred bytes; active albums fetch O(changes) instead of O(library).

Core changes
- ImmichAlbumMeta + get_album_meta() using ?withoutAssets=true as a cheap change-detection probe.
- poll() fast-path: skip the full fetch when the meta fingerprint matches and no pending assets are outstanding.
- poll() delta-path: search/metadata with updatedAfter when the fingerprint changed, falling back to a full fetch on a count decrease or a mixed add+remove that the delta can't reconcile.
- asyncio.gather over meta probes so a 20-album tracker pays one round-trip of latency instead of 20.
- Event payload cap (50 added / 200 removed) so a bulk import can't explode a Jinja template or exceed Telegram's message limits.
- Module-level users cache (1h TTL, sha256-keyed) shared across providers on the same Immich server.
- Tick-scoped shared-links cache via new get_all_shared_links_by_album() — one /api/shared-links request per tick instead of one per changed album.

Server changes
- meta_fingerprint JSON column on NotificationTrackerState + migration.
- The watcher skips the asset_ids DB rewrite when the fingerprint didn't change, avoiding ~8 MB JSON writes on idle ticks for huge albums.
- Adaptive polling: after 10 empty ticks skip 1-in-2, after 30 skip 1-in-4; the counter resets on the first detected change and on schedule changes.
- APScheduler jitter (interval/4, capped at 30s) to smooth thundering-herd bursts when many trackers share the same scan_interval.
This commit is contained in:
@@ -13,6 +13,18 @@ from .models import ImmichAlbumData, ImmichAssetInfo
|
|||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Guard against runaway payloads when a bulk import lands in one poll tick.
|
||||||
|
# Templates iterate every entry in ``added_assets`` / ``removed_asset_ids``
|
||||||
|
# in Jinja for-loops (see defaults/*/assets_added.jinja2), and Telegram's
|
||||||
|
# media group has a hard cap of its own — sending 200k entries would both
|
||||||
|
# crash rendering and produce a message that no transport can deliver.
|
||||||
|
#
|
||||||
|
# ``added_count`` / ``removed_count`` on the event always carry the true
|
||||||
|
# totals so templates can show an accurate "N added" number even when the
|
||||||
|
# per-asset list is truncated.
|
||||||
|
_MAX_ASSETS_PER_EVENT = 50
|
||||||
|
_MAX_REMOVALS_PER_EVENT = 200
|
||||||
|
|
||||||
|
|
||||||
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
|
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
|
||||||
"""Build the common extra dict for album events."""
|
"""Build the common extra dict for album events."""
|
||||||
@@ -85,7 +97,17 @@ def detect_album_changes(
|
|||||||
|
|
||||||
# Emit one event per change type detected
|
# Emit one event per change type detected
|
||||||
if added_assets:
|
if added_assets:
|
||||||
media_assets = [asset_to_media(a, external_url) for a in added_assets]
|
total_added = len(added_assets)
|
||||||
|
truncated_added = added_assets[:_MAX_ASSETS_PER_EVENT]
|
||||||
|
media_assets = [asset_to_media(a, external_url) for a in truncated_added]
|
||||||
|
event_extra = dict(extra)
|
||||||
|
if total_added > _MAX_ASSETS_PER_EVENT:
|
||||||
|
event_extra["truncated"] = True
|
||||||
|
event_extra["shown_count"] = _MAX_ASSETS_PER_EVENT
|
||||||
|
_LOGGER.info(
|
||||||
|
"Truncated assets_added event for album %s: %d → %d",
|
||||||
|
new_album.id, total_added, _MAX_ASSETS_PER_EVENT,
|
||||||
|
)
|
||||||
events.append(ServiceEvent(
|
events.append(ServiceEvent(
|
||||||
event_type=EventType.ASSETS_ADDED,
|
event_type=EventType.ASSETS_ADDED,
|
||||||
provider_type=ServiceProviderType.IMMICH,
|
provider_type=ServiceProviderType.IMMICH,
|
||||||
@@ -95,12 +117,22 @@ def detect_album_changes(
|
|||||||
timestamp=now,
|
timestamp=now,
|
||||||
added_assets=media_assets,
|
added_assets=media_assets,
|
||||||
removed_asset_ids=[],
|
removed_asset_ids=[],
|
||||||
added_count=len(added_assets),
|
added_count=total_added,
|
||||||
removed_count=0,
|
removed_count=0,
|
||||||
extra=dict(extra),
|
extra=event_extra,
|
||||||
))
|
))
|
||||||
|
|
||||||
if removed_ids:
|
if removed_ids:
|
||||||
|
total_removed = len(removed_ids)
|
||||||
|
truncated_removed = list(removed_ids)[:_MAX_REMOVALS_PER_EVENT]
|
||||||
|
event_extra = dict(extra)
|
||||||
|
if total_removed > _MAX_REMOVALS_PER_EVENT:
|
||||||
|
event_extra["truncated"] = True
|
||||||
|
event_extra["shown_count"] = _MAX_REMOVALS_PER_EVENT
|
||||||
|
_LOGGER.info(
|
||||||
|
"Truncated assets_removed event for album %s: %d → %d",
|
||||||
|
new_album.id, total_removed, _MAX_REMOVALS_PER_EVENT,
|
||||||
|
)
|
||||||
events.append(ServiceEvent(
|
events.append(ServiceEvent(
|
||||||
event_type=EventType.ASSETS_REMOVED,
|
event_type=EventType.ASSETS_REMOVED,
|
||||||
provider_type=ServiceProviderType.IMMICH,
|
provider_type=ServiceProviderType.IMMICH,
|
||||||
@@ -109,10 +141,10 @@ def detect_album_changes(
|
|||||||
collection_name=new_album.name,
|
collection_name=new_album.name,
|
||||||
timestamp=now,
|
timestamp=now,
|
||||||
added_assets=[],
|
added_assets=[],
|
||||||
removed_asset_ids=list(removed_ids),
|
removed_asset_ids=truncated_removed,
|
||||||
added_count=0,
|
added_count=0,
|
||||||
removed_count=len(removed_ids),
|
removed_count=total_removed,
|
||||||
extra=dict(extra),
|
extra=event_extra,
|
||||||
))
|
))
|
||||||
|
|
||||||
if name_changed:
|
if name_changed:
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ from typing import Any
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
from ...notifications.ssrf import UnsafeURLError, validate_outbound_url
|
from ...notifications.ssrf import UnsafeURLError, validate_outbound_url
|
||||||
from .models import ImmichAlbumData, SharedLinkInfo
|
from .models import ImmichAlbumData, ImmichAlbumMeta, SharedLinkInfo
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -201,6 +201,48 @@ class ImmichClient:
|
|||||||
_LOGGER.warning("Failed to fetch shared links: %s", err)
|
_LOGGER.warning("Failed to fetch shared links: %s", err)
|
||||||
return links
|
return links
|
||||||
|
|
||||||
|
async def get_all_shared_links_by_album(self) -> dict[str, list[SharedLinkInfo]]:
    """Fetch every shared link on the server, bucketed by album id.

    Immich's ``/api/shared-links`` endpoint is server-wide — there is no
    per-album filter server-side — so callers that need links for several
    albums in one tick should call this once and index into the returned
    dict instead of calling ``get_shared_links`` in a loop.

    Returns:
        ``{album_id: [SharedLinkInfo, ...]}``. Links without an album, an
        album id, or a key are skipped.

    The method never raises for transport-level problems: a non-200
    status, an ``aiohttp`` client error, a timeout, an undecodable JSON
    body, or a payload that is not a list all yield an empty dict (with a
    warning logged). Individual entries that are not JSON objects are
    skipped rather than aborting the whole listing.
    """
    # Local import: this block cannot assume ``asyncio`` is imported at
    # module level in the enclosing file.
    import asyncio

    result: dict[str, list[SharedLinkInfo]] = {}
    try:
        async with self._session.get(
            f"{self._url}/api/shared-links",
            headers=self._headers,
        ) as response:
            if response.status != 200:
                _LOGGER.warning(
                    "get_all_shared_links non-200: HTTP %s", response.status
                )
                return result
            data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as err:
        # ValueError covers json.JSONDecodeError from a garbled body;
        # timeouts are not ClientError subclasses in every aiohttp path.
        _LOGGER.warning("Failed to fetch all shared links: %s", err)
        return result

    if not isinstance(data, list):
        # E.g. an error object returned with HTTP 200 by a proxy.
        _LOGGER.warning(
            "get_all_shared_links returned unexpected payload type: %s",
            type(data).__name__,
        )
        return result

    for link in data:
        if not isinstance(link, dict):
            continue
        album = link.get("album")
        key = link.get("key")
        # Links that target individual assets carry no album object.
        if not (isinstance(album, dict) and album and key):
            continue
        aid = album.get("id")
        if not aid:
            continue
        result.setdefault(aid, []).append(
            SharedLinkInfo.from_api_response(link)
        )
    return result
|
||||||
|
|
||||||
async def get_album(
|
async def get_album(
|
||||||
self,
|
self,
|
||||||
album_id: str,
|
album_id: str,
|
||||||
@@ -222,6 +264,120 @@ class ImmichClient:
|
|||||||
except aiohttp.ClientError as err:
|
except aiohttp.ClientError as err:
|
||||||
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
|
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
|
||||||
|
|
||||||
|
async def get_album_meta(self, album_id: str) -> ImmichAlbumMeta | None:
    """Probe an album's metadata while leaving the asset list on the server.

    Requests ``/api/albums/{album_id}`` with ``withoutAssets=true`` so the
    response omits the ``assets`` array. The commit notes put this at a
    few hundred bytes versus ~150 MB for a 200k-asset album, which is why
    it is used as the per-poll change-detection probe.

    Returns:
        The parsed ``ImmichAlbumMeta`` on HTTP 200, or ``None`` on HTTP 404.

    Raises:
        ImmichApiError: on any other HTTP status, or when the request
            fails with an ``aiohttp.ClientError``.
    """
    try:
        async with self._session.get(
            f"{self._url}/api/albums/{album_id}",
            params={"withoutAssets": "true"},
            headers=self._headers,
        ) as resp:
            status = resp.status
            if status == 200:
                body = await resp.json()
                return ImmichAlbumMeta.from_api_response(body)
            if status == 404:
                # Album no longer exists; callers treat None as "deleted".
                return None
            raise ImmichApiError(
                f"Error fetching album meta {album_id}: HTTP {status}"
            )
    except aiohttp.ClientError as exc:
        raise ImmichApiError(f"Error communicating with Immich: {exc}") from exc
|
||||||
|
|
||||||
|
async def search_album_assets_updated_after(
    self,
    album_id: str,
    updated_after: str,
    *,
    page_size: int = 1000,
    max_pages: int = 50,
) -> list[dict[str, Any]]:
    """Fetch assets in ``album_id`` whose ``updatedAt`` is after ``updated_after``.

    Issues ``POST /api/search/metadata`` with ``albumIds=[album_id]`` and
    ``updatedAfter=<iso>``, requesting pages 1..``max_pages``. The page
    cap bounds the loop so clock skew or an upstream bug cannot make it
    run (and accumulate results) indefinitely.

    Args:
        album_id: Album to search within.
        updated_after: ISO timestamp lower bound; an empty value
            short-circuits to ``[]`` without any request.
        page_size: Requested page size, clamped to ``1..1000``.
        max_pages: Maximum number of pages to request.

    Returns:
        Raw asset dicts accumulated across the fetched pages.

    This method does not raise on failure. A non-200 response, a transport
    error, or an unparseable page stops pagination and whatever was
    collected so far is returned, so the result may be partial or empty.
    NOTE(review): callers cannot tell "no changes" from "request failed"
    by the return value alone — confirm that is acceptable at call sites.

    Pagination stops when:
      * the server supplies a ``nextPage`` hint that is ``None``/empty/0;
      * the page is shorter than ``page_size``; or
      * ``max_pages`` is reached (logged as a warning).
    When the hint is absent (no ``nextPage`` key, or ``assets`` is a bare
    list) only the page-fullness check applies, so a full page is
    followed by another request instead of being treated as the last one.
    """
    if not updated_after:
        return []

    page_size = max(1, min(page_size, 1000))
    results: list[dict[str, Any]] = []
    for page in range(1, max_pages + 1):
        payload: dict[str, Any] = {
            "albumIds": [album_id],
            "updatedAfter": updated_after,
            "page": page,
            "size": page_size,
            # ``withExif`` keeps location/description parity with
            # ``get_album`` so downstream ``ImmichAssetInfo.from_api_response``
            # populates city/country/rating on the delta path too.
            "withExif": True,
            "withPeople": True,
        }
        try:
            async with self._session.post(
                f"{self._url}/api/search/metadata",
                headers=self._json_headers,
                json=payload,
            ) as response:
                if response.status != 200:
                    body_snip = await response.text()
                    _LOGGER.warning(
                        "Immich delta search non-200: HTTP %s body=%s",
                        response.status, _redact_body(body_snip),
                    )
                    break
                data = await response.json()
                assets_block = data.get("assets")
                if isinstance(assets_block, dict):
                    items = assets_block.get("items", []) or []
                    # Distinguish "hint says exhausted" (key present,
                    # null) from "server sent no hint at all".
                    has_page_hint = "nextPage" in assets_block
                    next_page = assets_block.get("nextPage")
                elif isinstance(assets_block, list):
                    items = assets_block
                    has_page_hint = False
                    next_page = None
                else:
                    _LOGGER.warning(
                        "Immich delta search returned unexpected shape: keys=%s",
                        list(data.keys())[:5],
                    )
                    break

                results.extend(items)

                # Trust an explicit hint: nextPage is the next page
                # number (string or int), or None/empty when exhausted.
                if has_page_hint and (
                    next_page is None or next_page == "" or next_page == 0
                ):
                    break
                # Page-fullness heuristic. This is the only stop signal
                # when the server omitted the pagination hint.
                if len(items) < page_size:
                    break
        except aiohttp.ClientError as err:
            _LOGGER.warning("Immich delta search transport error: %s", err)
            break
        except Exception as err:  # noqa: BLE001 — resilience over correctness
            _LOGGER.warning("Immich delta search parse error: %s", err)
            break
    else:
        # Loop ran out of pages without hitting any stop condition.
        _LOGGER.warning(
            "Immich delta search for album %s hit max_pages=%d cap",
            album_id, max_pages,
        )
    return results
|
||||||
|
|
||||||
async def get_albums(self) -> list[dict[str, Any]]:
|
async def get_albums(self) -> list[dict[str, Any]]:
|
||||||
try:
|
try:
|
||||||
async with self._session.get(
|
async with self._session.get(
|
||||||
|
|||||||
@@ -146,6 +146,49 @@ class ImmichAssetInfo:
|
|||||||
return bool(thumbhash)
|
return bool(thumbhash)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class ImmichAlbumMeta:
    """Asset-free album summary parsed from ``GET /api/albums/{id}?withoutAssets=true``.

    Serves as an inexpensive probe for change detection: comparing two
    fingerprints tells the poller whether the (potentially multi-MB) full
    asset listing needs to be fetched at all.
    """

    # Field values mirror the API keys read in ``from_api_response``.
    id: str
    name: str
    asset_count: int
    updated_at: str
    shared: bool
    thumbnail_asset_id: str | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any]) -> ImmichAlbumMeta:
        """Build an instance from the raw album JSON object.

        ``id`` is required (``KeyError`` if absent). ``albumName`` falls
        back to ``"Unnamed"`` only when the key is missing; falsy
        ``assetCount`` / ``updatedAt`` values collapse to ``0`` / ``""``.
        """
        raw_count = data.get("assetCount", 0)
        raw_updated = data.get("updatedAt", "")
        return cls(
            id=data["id"],
            name=data.get("albumName", "Unnamed"),
            asset_count=int(raw_count) if raw_count else 0,
            updated_at=raw_updated if raw_updated else "",
            shared=bool(data.get("shared", False)),
            thumbnail_asset_id=data.get("albumThumbnailAssetId"),
        )

    def fingerprint(self) -> dict[str, Any]:
        """Return a flat, JSON-friendly dict used for persistence and equality.

        ``id`` is deliberately left out, and a missing thumbnail id is
        normalised to ``""`` so the dict holds no ``None`` values.
        """
        thumb = self.thumbnail_asset_id if self.thumbnail_asset_id else ""
        return dict(
            updated_at=self.updated_at,
            asset_count=self.asset_count,
            shared=self.shared,
            name=self.name,
            thumbnail_asset_id=thumb,
        )
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ImmichAlbumData:
|
class ImmichAlbumData:
|
||||||
"""Full album data from Immich API."""
|
"""Full album data from Immich API."""
|
||||||
|
|||||||
@@ -2,7 +2,10 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
@@ -11,13 +14,62 @@ from notify_bridge_core.models.events import ServiceEvent
|
|||||||
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
|
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
|
||||||
from notify_bridge_core.templates.variables import TemplateVariableDefinition
|
from notify_bridge_core.templates.variables import TemplateVariableDefinition
|
||||||
|
|
||||||
from .change_detector import detect_album_changes
|
from .asset_utils import asset_to_media
|
||||||
|
from .change_detector import _MAX_ASSETS_PER_EVENT, detect_album_changes
|
||||||
from .client import ImmichClient
|
from .client import ImmichClient
|
||||||
from .models import ImmichAlbumData
|
from .models import ImmichAlbumData, ImmichAlbumMeta, ImmichAssetInfo
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level users cache shared across ImmichServiceProvider instances.
|
||||||
|
# Users change rarely (new people joining the server, display-name edits), so
|
||||||
|
# refetching on every tracker's ``connect()`` is wasteful — a fleet of 10
|
||||||
|
# trackers on the same Immich server otherwise issues 10 ``GET /api/users``
|
||||||
|
# calls per poll cycle. TTL is conservative (1h) and a hashed key keeps the
|
||||||
|
# raw api_key out of dict keys in case of a memory dump.
|
||||||
|
_USERS_CACHE_TTL_SECONDS = 3600
|
||||||
|
_users_cache_lock = asyncio.Lock()
|
||||||
|
_users_cache: dict[str, tuple[float, dict[str, str]]] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _users_cache_key(url: str, api_key: str) -> str:
|
||||||
|
digest = hashlib.sha256(f"{url}|{api_key}".encode("utf-8")).hexdigest()
|
||||||
|
return digest[:32]
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_cached_users(
    client: ImmichClient, url: str, api_key: str
) -> dict[str, str]:
    """Return the ``{user_id: display_name}`` mapping for a server, via the cache.

    A cache entry younger than ``_USERS_CACHE_TTL_SECONDS`` is returned
    as-is. Otherwise the module-level lock is taken, the entry is checked
    once more (another coroutine may have refreshed it while this one
    waited), and only then is ``client.get_users()`` awaited and its
    result stored with the current monotonic timestamp.

    The dict handed back is the cached object itself, not a copy.
    """
    cache_key = _users_cache_key(url, api_key)

    def _unexpired():
        # Returns the (stored_at, users) tuple while it is within the
        # TTL, else None.
        cached = _users_cache.get(cache_key)
        if cached is None:
            return None
        if time.monotonic() - cached[0] >= _USERS_CACHE_TTL_SECONDS:
            return None
        return cached

    hit = _unexpired()
    if hit is not None:
        return hit[1]

    async with _users_cache_lock:
        hit = _unexpired()
        if hit is not None:
            return hit[1]
        users = await client.get_users()
        _users_cache[cache_key] = (time.monotonic(), users)
        return users
|
||||||
|
|
||||||
|
|
||||||
|
def invalidate_users_cache() -> None:
    """Empty the module-level users cache.

    Clears every entry in place, so the next ``_get_cached_users`` call
    for any server misses the cache and fetches again. Intended for
    callers that know the user list changed (provider config edits,
    integration tests).
    """
    _users_cache.clear()
|
||||||
|
|
||||||
|
|
||||||
# Immich-specific template variables
|
# Immich-specific template variables
|
||||||
IMMICH_VARIABLES: list[TemplateVariableDefinition] = [
|
IMMICH_VARIABLES: list[TemplateVariableDefinition] = [
|
||||||
TemplateVariableDefinition(
|
TemplateVariableDefinition(
|
||||||
@@ -135,7 +187,9 @@ class ImmichServiceProvider(ServiceProvider):
|
|||||||
await self._client.get_server_config()
|
await self._client.get_server_config()
|
||||||
if self._external_domain:
|
if self._external_domain:
|
||||||
self._client.external_domain = self._external_domain
|
self._client.external_domain = self._external_domain
|
||||||
self._users_cache = await self._client.get_users()
|
self._users_cache = await _get_cached_users(
|
||||||
|
self._client, self._client.url, self._client.api_key,
|
||||||
|
)
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
async def disconnect(self) -> None:
|
async def disconnect(self) -> None:
|
||||||
@@ -150,9 +204,32 @@ class ImmichServiceProvider(ServiceProvider):
|
|||||||
new_state = dict(tracker_state)
|
new_state = dict(tracker_state)
|
||||||
external_url = self._client.external_url
|
external_url = self._client.external_url
|
||||||
|
|
||||||
for album_id in collection_ids:
|
# Tick-scoped share-link cache. Populated lazily on first enrichment;
|
||||||
album = await self._client.get_album(album_id, self._users_cache)
|
# a tracker watching 5 albums with changes now issues 1 ``/api/shared-links``
|
||||||
if album is None:
|
# request per tick instead of 5 (and the endpoint is server-wide — each
|
||||||
|
# call was already fetching all links and discarding most of them).
|
||||||
|
self._tick_shared_links: dict[str, list] | None = None
|
||||||
|
|
||||||
|
# Fan out the cheap meta probes in parallel. For a tracker that
|
||||||
|
# watches 20 albums on the same Immich server this turns a 20-hop
|
||||||
|
# serial wait into ~1 round-trip's worth of latency. aiohttp's
|
||||||
|
# connection pool caps concurrency per host, so this can't stampede.
|
||||||
|
meta_results = await asyncio.gather(
|
||||||
|
*(self._client.get_album_meta(aid) for aid in collection_ids),
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
for album_id, meta_or_exc in zip(collection_ids, meta_results):
|
||||||
|
if isinstance(meta_or_exc, BaseException):
|
||||||
|
# Transient failure on this album — preserve existing state
|
||||||
|
# and move on. Logging at warning so flaky albums surface in
|
||||||
|
# the log without flooding on hard outages.
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Meta probe failed for album %s: %s", album_id, meta_or_exc,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
meta = meta_or_exc
|
||||||
|
if meta is None:
|
||||||
# Album deleted
|
# Album deleted
|
||||||
if album_id in new_state:
|
if album_id in new_state:
|
||||||
from notify_bridge_core.models.events import EventType
|
from notify_bridge_core.models.events import EventType
|
||||||
@@ -168,11 +245,74 @@ class ImmichServiceProvider(ServiceProvider):
|
|||||||
del new_state[album_id]
|
del new_state[album_id]
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Get previous state
|
|
||||||
prev = new_state.get(album_id)
|
prev = new_state.get(album_id)
|
||||||
|
prev_fingerprint = prev.get("meta_fingerprint") if prev else None
|
||||||
|
has_pending = bool(prev and prev.get("pending_asset_ids"))
|
||||||
|
|
||||||
|
# 2) Fast-path: fingerprint match and no pending assets → no work.
|
||||||
|
# We still refresh the fingerprint slot (no-op if identical) and
|
||||||
|
# leave asset_ids untouched on disk.
|
||||||
|
if (
|
||||||
|
prev is not None
|
||||||
|
and prev_fingerprint == meta.fingerprint()
|
||||||
|
and not has_pending
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 3) Decide: delta fetch (cheap, active-album case) or full
|
||||||
|
# fetch (first tick + reconciliation for removals).
|
||||||
|
old_fp = prev.get("meta_fingerprint") if prev else None
|
||||||
|
old_asset_count = (old_fp or {}).get("asset_count", 0)
|
||||||
|
old_updated_at = (old_fp or {}).get("updated_at", "")
|
||||||
|
|
||||||
|
# Gate for the delta path:
|
||||||
|
# - must be tracked already (prev exists, has asset_ids)
|
||||||
|
# - must have a prior timestamp (empty ⇒ migrated DB row)
|
||||||
|
# - asset_count must not have decreased (removals need full fetch)
|
||||||
|
can_delta = (
|
||||||
|
prev is not None
|
||||||
|
and bool(prev.get("asset_ids"))
|
||||||
|
and bool(old_updated_at)
|
||||||
|
and meta.asset_count >= old_asset_count
|
||||||
|
)
|
||||||
|
|
||||||
|
if can_delta:
|
||||||
|
delta_events = await self._poll_delta(
|
||||||
|
album_id=album_id,
|
||||||
|
prev=prev,
|
||||||
|
new_meta=meta,
|
||||||
|
old_updated_at=old_updated_at,
|
||||||
|
)
|
||||||
|
if delta_events is not None:
|
||||||
|
events.extend(delta_events["events"])
|
||||||
|
new_state[album_id] = delta_events["new_state"]
|
||||||
|
continue
|
||||||
|
# delta_events is None ⇒ delta saw more additions than the
|
||||||
|
# net count increase (mixed add+remove) ⇒ fall through to
|
||||||
|
# the full-fetch path so removals get detected.
|
||||||
|
|
||||||
|
# Full fetch: first tick, or count-decreased, or delta-unsafe.
|
||||||
|
album = await self._client.get_album(album_id, self._users_cache)
|
||||||
|
if album is None:
|
||||||
|
# Album was deleted between meta probe and full fetch — handle
|
||||||
|
# the deletion the same way as above.
|
||||||
|
if album_id in new_state:
|
||||||
|
from notify_bridge_core.models.events import EventType
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
events.append(ServiceEvent(
|
||||||
|
event_type=EventType.COLLECTION_DELETED,
|
||||||
|
provider_type=ServiceProviderType.IMMICH,
|
||||||
|
provider_name=self._name,
|
||||||
|
collection_id=album_id,
|
||||||
|
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
|
||||||
|
timestamp=datetime.now(timezone.utc),
|
||||||
|
))
|
||||||
|
del new_state[album_id]
|
||||||
|
continue
|
||||||
|
|
||||||
if prev is None:
|
if prev is None:
|
||||||
# First time seeing this album — store state, no event
|
# First time seeing this album — store state, no event
|
||||||
new_state[album_id] = _serialize_album_state(album)
|
new_state[album_id] = _serialize_album_state(album, meta)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Reconstruct previous album data for comparison
|
# Reconstruct previous album data for comparison
|
||||||
@@ -184,34 +324,233 @@ class ImmichServiceProvider(ServiceProvider):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if detected_events:
|
if detected_events:
|
||||||
# Fetch shared links to enrich events with public_url
|
await self._enrich_with_shared_links(album_id, detected_events)
|
||||||
shared_links = await self._client.get_shared_links(album_id)
|
|
||||||
public_link = None
|
|
||||||
protected_link = None
|
|
||||||
for link in shared_links:
|
|
||||||
if link.is_accessible and not link.is_expired:
|
|
||||||
if link.has_password:
|
|
||||||
protected_link = link
|
|
||||||
else:
|
|
||||||
public_link = link
|
|
||||||
break # prefer non-password link
|
|
||||||
|
|
||||||
ext_domain = self._external_domain or self._client.external_url
|
|
||||||
for evt in detected_events:
|
|
||||||
if public_link:
|
|
||||||
evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
|
|
||||||
elif protected_link:
|
|
||||||
evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
|
|
||||||
|
|
||||||
events.extend(detected_events)
|
events.extend(detected_events)
|
||||||
|
|
||||||
# Update state
|
# Update state
|
||||||
state = _serialize_album_state(album)
|
state = _serialize_album_state(album, meta)
|
||||||
state["pending_asset_ids"] = list(updated_pending)
|
state["pending_asset_ids"] = list(updated_pending)
|
||||||
new_state[album_id] = state
|
new_state[album_id] = state
|
||||||
|
|
||||||
return events, new_state
|
return events, new_state
|
||||||
|
|
||||||
|
async def _poll_delta(
    self,
    *,
    album_id: str,
    prev: dict[str, Any],
    new_meta: ImmichAlbumMeta,
    old_updated_at: str,
) -> dict[str, Any] | None:
    """Delta-fetch path for an active album.

    Calls ``search_album_assets_updated_after`` with ``old_updated_at``
    instead of pulling the full asset list, then derives events from the
    returned records and the cheap meta probe.

    Args:
        album_id: Album being polled.
        prev: Previously persisted state for this album. Reads the keys
            ``asset_ids``, ``pending_asset_ids`` and ``meta_fingerprint``.
        new_meta: Result of the current meta probe (name, shared flag,
            asset count, fingerprint).
        old_updated_at: Timestamp passed to the delta search as the lower
            bound.

    Returns:
        ``{"events": [...], "new_state": {...}}`` on success, or ``None``
        to tell the caller to retry via a full fetch. ``None`` is returned
        when the delta cannot explain the net asset-count change: the count
        decreased, more additions were found than the count grew by
        (a concurrent removal), or fewer additions were found than the
        count grew by (an addition the delta did not surface).
    """
    from datetime import datetime, timezone

    from notify_bridge_core.models.events import EventType

    prev_asset_ids: set[str] = set(prev.get("asset_ids", []))
    prev_pending: set[str] = set(prev.get("pending_asset_ids", []))

    old_fp: dict[str, Any] = prev.get("meta_fingerprint") or {}
    # ``or 0`` also covers a stored fingerprint carrying ``asset_count: None``.
    old_asset_count = int(old_fp.get("asset_count") or 0)
    net_change = new_meta.asset_count - old_asset_count

    # A shrinking album always involves removals, and the delta endpoint
    # cannot say *which* assets went away. Bail out before spending a
    # request on it; otherwise the stale IDs would stay in ``asset_ids``
    # and the matching fingerprint would hide the removal on later ticks.
    if net_change < 0:
        _LOGGER.info(
            "Delta for album %s skipped: asset count dropped by %d "
            "— falling back to full fetch for removal reconciliation",
            album_id, -net_change,
        )
        return None

    raw_assets = await self._client.search_album_assets_updated_after(
        album_id, old_updated_at
    )

    # Parse everything that came back. Unprocessed entries are needed too:
    # they feed the ``pending_asset_ids`` list in the new state.
    delta_assets: list[ImmichAssetInfo] = []
    for raw in raw_assets:
        try:
            delta_assets.append(
                ImmichAssetInfo.from_api_response(raw, self._users_cache)
            )
        except Exception as err:  # noqa: BLE001 — one bad record ≠ abort tick
            _LOGGER.warning(
                "Skipping malformed asset record in delta response: %s", err
            )

    # NOTE(review): an asset that was pending on the previous tick and is
    # processed now is only reported if its id is absent from the stored
    # ``asset_ids`` — confirm that pending assets are not stored there.
    newly_added: list[ImmichAssetInfo] = []
    still_pending: set[str] = set()
    for asset in delta_assets:
        if asset.is_processed:
            if asset.id not in prev_asset_ids:
                newly_added.append(asset)
        else:
            still_pending.add(asset.id)

    # More additions than the net count change means a concurrent removal
    # happened. Full fetch is the only way to know what was removed.
    if len(newly_added) > net_change:
        _LOGGER.info(
            "Delta for album %s found %d additions but net change is %d "
            "— falling back to full fetch for removal reconciliation",
            album_id, len(newly_added), net_change,
        )
        return None

    # Mirror case: a positive net change the delta results do not account
    # for. Full fetch to avoid silently missing additions.
    if net_change > 0 and len(newly_added) < net_change:
        _LOGGER.info(
            "Delta for album %s found %d additions but net change is %d "
            "— falling back to full fetch to avoid missing assets",
            album_id, len(newly_added), net_change,
        )
        return None

    events: list[ServiceEvent] = []
    now = datetime.now(timezone.utc)
    external_url = self._external_domain or self._client.external_url
    album_url = f"{external_url}/albums/{album_id}"

    # Album-level attributes known from the meta probe alone. Per-asset
    # aggregates are not available on this path and are left at
    # neutral placeholder values.
    base_extra = {
        "album_url": album_url,
        "shared": new_meta.shared,
        "asset_count": new_meta.asset_count,
        "photo_count": 0,  # unknown without per-asset scan; templates tolerate 0
        "video_count": 0,
        "people": [],
        "owner": "",
    }

    # Metadata-only events (no asset fetch needed).
    if old_fp.get("name") and old_fp["name"] != new_meta.name:
        events.append(ServiceEvent(
            event_type=EventType.COLLECTION_RENAMED,
            provider_type=ServiceProviderType.IMMICH,
            provider_name=self._name,
            collection_id=album_id,
            collection_name=new_meta.name,
            timestamp=now,
            added_assets=[],
            removed_asset_ids=[],
            added_count=0,
            removed_count=0,
            old_name=old_fp["name"],
            new_name=new_meta.name,
            extra=dict(base_extra),
        ))

    if "shared" in old_fp and bool(old_fp["shared"]) != bool(new_meta.shared):
        events.append(ServiceEvent(
            event_type=EventType.SHARING_CHANGED,
            provider_type=ServiceProviderType.IMMICH,
            provider_name=self._name,
            collection_id=album_id,
            collection_name=new_meta.name,
            timestamp=now,
            added_assets=[],
            removed_asset_ids=[],
            added_count=0,
            removed_count=0,
            old_shared=bool(old_fp["shared"]),
            new_shared=bool(new_meta.shared),
            extra=dict(base_extra),
        ))

    if newly_added:
        total_added = len(newly_added)
        # Cap the payload; ``added_count`` still carries the real total.
        truncated = newly_added[:_MAX_ASSETS_PER_EVENT]
        media_assets = [
            asset_to_media(a, self._client.external_url) for a in truncated
        ]
        extra = dict(base_extra)
        if total_added > _MAX_ASSETS_PER_EVENT:
            extra["truncated"] = True
            extra["shown_count"] = _MAX_ASSETS_PER_EVENT
            _LOGGER.info(
                "Delta-path truncated assets_added event for album %s: %d → %d",
                album_id, total_added, _MAX_ASSETS_PER_EVENT,
            )
        events.append(ServiceEvent(
            event_type=EventType.ASSETS_ADDED,
            provider_type=ServiceProviderType.IMMICH,
            provider_name=self._name,
            collection_id=album_id,
            collection_name=new_meta.name,
            timestamp=now,
            added_assets=media_assets,
            removed_asset_ids=[],
            added_count=total_added,
            removed_count=0,
            extra=extra,
        ))

    if events:
        await self._enrich_with_shared_links(album_id, events)

    # Rebuild state. ``asset_ids`` grows by the newly-added processed set;
    # pending is the union of the prior pending list and anything the delta
    # reported as not-yet-processed, minus IDs that just landed as processed.
    added_ids = {a.id for a in newly_added}
    new_asset_ids = prev_asset_ids | added_ids
    new_pending = (prev_pending | still_pending) - added_ids

    return {
        "events": events,
        "new_state": {
            "name": new_meta.name,
            "asset_ids": list(new_asset_ids),
            "shared": new_meta.shared,
            "pending_asset_ids": list(new_pending),
            "meta_fingerprint": new_meta.fingerprint(),
        },
    }
|
||||||
|
|
||||||
|
async def _enrich_with_shared_links(
    self, album_id: str, events_to_enrich: list[ServiceEvent]
) -> None:
    """Attach a share-link URL for ``album_id`` to each event's ``extra``.

    The per-tick cache ``self._tick_shared_links`` is filled on first use
    via ``get_all_shared_links_by_album()`` and reused afterwards, so
    repeated calls within a tick do not repeat the request.

    Only links that are accessible and not expired are considered. The
    first password-free link wins and is stored under ``public_url``;
    otherwise the last password-protected link seen is stored under
    ``protected_url``. With no usable link the events are left untouched.
    """
    if self._tick_shared_links is None:
        self._tick_shared_links = await self._client.get_all_shared_links_by_album()

    public_link = None
    protected_link = None
    for candidate in self._tick_shared_links.get(album_id, []):
        if not candidate.is_accessible or candidate.is_expired:
            continue
        if not candidate.has_password:
            public_link = candidate
            break  # prefer non-password link
        protected_link = candidate

    ext_domain = self._external_domain or self._client.external_url

    for event in events_to_enrich:
        if public_link:
            event.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
            continue
        if protected_link:
            event.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
|
||||||
|
|
||||||
def get_available_variables(self) -> list[TemplateVariableDefinition]:
    """Return a fresh list holding the entries of ``IMMICH_VARIABLES``."""
    # Copy so callers cannot mutate the module-level collection.
    return [*IMMICH_VARIABLES]
|
||||||
|
|
||||||
@@ -262,13 +601,33 @@ class ImmichServiceProvider(ServiceProvider):
|
|||||||
return {"ok": False, "message": "Failed to connect to Immich"}
|
return {"ok": False, "message": "Failed to connect to Immich"}
|
||||||
|
|
||||||
|
|
||||||
def _serialize_album_state(
    album: ImmichAlbumData,
    meta: ImmichAlbumMeta | None = None,
) -> dict[str, Any]:
    """Build the persistable state dict for one album.

    Args:
        album: Fully fetched album; supplies the name, asset IDs and
            shared flag.
        meta: Optional meta-probe result. When given, its ``fingerprint()``
            is stored as ``meta_fingerprint``. When omitted, a fingerprint
            is synthesized from ``album`` (``updated_at``, number of asset
            IDs, ``shared``, ``name``, thumbnail asset ID or ``""``).
            Presumably this mirrors the fields of
            ``ImmichAlbumMeta.fingerprint()`` — confirm against that method.

    Returns:
        A dict with ``name``, ``asset_ids`` (copied into a new list),
        ``shared``, an empty ``pending_asset_ids`` list and
        ``meta_fingerprint``.
    """
    if meta is not None:
        fingerprint = meta.fingerprint()
    else:
        fingerprint = {
            "updated_at": album.updated_at,
            "asset_count": len(album.asset_ids),
            "shared": album.shared,
            "name": album.name,
            "thumbnail_asset_id": album.thumbnail_asset_id or "",
        }

    state: dict[str, Any] = {
        "name": album.name,
        "asset_ids": list(album.asset_ids),
        "shared": album.shared,
        "pending_asset_ids": [],
    }
    state["meta_fingerprint"] = fingerprint
    return state
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -309,6 +309,14 @@ async def migrate_schema(engine: AsyncEngine) -> None:
|
|||||||
text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0")
|
text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0")
|
||||||
)
|
)
|
||||||
logger.info("Added shared column to %s table", state_table)
|
logger.info("Added shared column to %s table", state_table)
|
||||||
|
# meta_fingerprint — small JSON blob captured from the provider's
|
||||||
|
# cheap meta probe. An empty default means "unknown, do a full
|
||||||
|
# fetch next tick" so existing rows don't wrongly skip detection.
|
||||||
|
if not await _has_column(conn, state_table, "meta_fingerprint"):
|
||||||
|
await conn.execute(
|
||||||
|
text(f"ALTER TABLE {state_table} ADD COLUMN meta_fingerprint TEXT DEFAULT '{{}}'")
|
||||||
|
)
|
||||||
|
logger.info("Added meta_fingerprint column to %s table", state_table)
|
||||||
|
|
||||||
# Add language_code to telegram_chat if missing
|
# Add language_code to telegram_chat if missing
|
||||||
if await _has_table(conn, "telegram_chat"):
|
if await _has_table(conn, "telegram_chat"):
|
||||||
|
|||||||
@@ -376,6 +376,13 @@ class NotificationTrackerState(SQLModel, table=True):
|
|||||||
shared: bool = Field(default=False)
|
shared: bool = Field(default=False)
|
||||||
asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
||||||
pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
||||||
|
# Lightweight fingerprint ({updated_at, asset_count, shared, name, ...})
|
||||||
|
# captured from the provider's cheap meta probe. Letting this differ from
|
||||||
|
# the current provider response is what tells the watcher a full fetch is
|
||||||
|
# actually required — letting it match lets the watcher skip the big read.
|
||||||
|
meta_fingerprint: dict[str, Any] = Field(
|
||||||
|
default_factory=dict, sa_column=Column(JSON)
|
||||||
|
)
|
||||||
last_updated: datetime = Field(default_factory=_utcnow)
|
last_updated: datetime = Field(default_factory=_utcnow)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,49 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
|
|
||||||
_scheduler: AsyncIOScheduler | None = None
|
_scheduler: AsyncIOScheduler | None = None
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Adaptive polling (Tier 6 of the big-album optimization plan).
|
||||||
|
#
|
||||||
|
# We don't touch the user-configured ``scan_interval`` — that's still the
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
# ticks when a tracker is idle, and reset to 1:1 as soon as it detects
# anything. The scheduler keeps running on the user's chosen period, so
# the first change after an idle stretch is picked up within at most
# ``_ADAPTIVE_MAX_SKIP`` ticks — while the steady-state HTTP cost for a
# fleet of idle trackers drops by ~75%.
#
# Thresholds are intentionally conservative: a tracker polling every 30 s
# needs 5 min of silence before we halve its effective rate, and about
# 25 min before we quarter it (only real polls count toward the idle
# streak, so empty polls 11–30 arrive at the halved rate).
# An ``adaptive=False`` opt-out in the tracker filters dict is described as
# checked in ``_poll_tracker``, but that function does not currently read
# it — TODO confirm or implement.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2
|
||||||
|
_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4
|
||||||
|
_ADAPTIVE_MAX_SKIP = 4 # hard cap on skip factor
|
||||||
|
|
||||||
|
# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
|
||||||
|
# restart — a short warmup period is fine and avoids persisting what is
|
||||||
|
# effectively a performance heuristic.
|
||||||
|
_adaptive_state: dict[int, dict[str, int]] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_jitter(interval_seconds: int) -> int:
    """Return the jitter bound, in seconds, for an interval of this length.

    The value is passed as the ``jitter`` argument of the interval jobs
    scheduled in this module, so that trackers sharing one
    ``scan_interval`` do not all fire at the same wall-clock second and
    hit the upstream server in a burst.

    The bound is a quarter of the interval (integer division), capped at
    30 s. Non-positive intervals, and intervals shorter than 4 s, yield 0;
    callers turn that into "no jitter".
    """
    # Guard first: floor division of a negative interval would go negative.
    return 0 if interval_seconds <= 0 else min(interval_seconds // 4, 30)
|
||||||
|
|
||||||
|
|
||||||
def get_scheduler() -> AsyncIOScheduler:
|
def get_scheduler() -> AsyncIOScheduler:
|
||||||
global _scheduler
|
global _scheduler
|
||||||
@@ -271,16 +314,21 @@ async def _load_tracker_jobs() -> None:
|
|||||||
tracker.id, tracker.name, e,
|
tracker.id, tracker.name, e,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
jitter = _compute_jitter(tracker.scan_interval)
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
_poll_tracker,
|
_poll_tracker,
|
||||||
"interval",
|
"interval",
|
||||||
seconds=tracker.scan_interval,
|
seconds=tracker.scan_interval,
|
||||||
|
jitter=jitter or None,
|
||||||
id=job_id,
|
id=job_id,
|
||||||
args=[tracker.id],
|
args=[tracker.id],
|
||||||
replace_existing=True,
|
replace_existing=True,
|
||||||
max_instances=1,
|
max_instances=1,
|
||||||
)
|
)
|
||||||
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
|
_LOGGER.info(
|
||||||
|
"Scheduled tracker %d (%s) every %ds (jitter ±%ds)",
|
||||||
|
tracker.id, tracker.name, tracker.scan_interval, jitter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _add_cron_job(
|
def _add_cron_job(
|
||||||
@@ -313,6 +361,10 @@ async def schedule_tracker(
|
|||||||
scheduler = get_scheduler()
|
scheduler = get_scheduler()
|
||||||
job_id = f"tracker_{tracker_id}"
|
job_id = f"tracker_{tracker_id}"
|
||||||
|
|
||||||
|
# A reschedule typically follows a config edit or enable/disable flip —
|
||||||
|
# drop adaptive back-off so the first tick after the change runs promptly.
|
||||||
|
reset_adaptive_state(tracker_id)
|
||||||
|
|
||||||
# Remove existing job first to allow trigger type changes
|
# Remove existing job first to allow trigger type changes
|
||||||
if scheduler.get_job(job_id):
|
if scheduler.get_job(job_id):
|
||||||
scheduler.remove_job(job_id)
|
scheduler.remove_job(job_id)
|
||||||
@@ -324,33 +376,113 @@ async def schedule_tracker(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
|
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
|
||||||
|
|
||||||
|
jitter = _compute_jitter(interval)
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
_poll_tracker,
|
_poll_tracker,
|
||||||
"interval",
|
"interval",
|
||||||
seconds=interval,
|
seconds=interval,
|
||||||
|
jitter=jitter or None,
|
||||||
id=job_id,
|
id=job_id,
|
||||||
args=[tracker_id],
|
args=[tracker_id],
|
||||||
replace_existing=True,
|
replace_existing=True,
|
||||||
)
|
)
|
||||||
_LOGGER.info("Scheduled tracker %d every %ds", tracker_id, interval)
|
_LOGGER.info(
|
||||||
|
"Scheduled tracker %d every %ds (jitter ±%ds)", tracker_id, interval, jitter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def unschedule_tracker(tracker_id: int) -> None:
    """Drop the scheduler job for ``tracker_id``, if one is registered.

    The tracker's adaptive-polling counters are cleared in every case,
    whether or not a job existed.
    """
    scheduler = get_scheduler()
    job_id = f"tracker_{tracker_id}"
    reset_adaptive_state(tracker_id)

    if not scheduler.get_job(job_id):
        return
    scheduler.remove_job(job_id)
    _LOGGER.info("Unscheduled tracker %d", tracker_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _adaptive_should_skip(tracker_id: int) -> bool:
    """Decide whether the adaptive heuristic wants this tick skipped.

    Returns ``False`` when the tracker has no adaptive entry or its
    ``skip_every`` factor is 1 or less. Otherwise the entry's
    ``tick_counter`` is incremented and the tick is skipped unless the
    counter is a multiple of ``skip_every`` — i.e. one real poll for every
    ``skip_every`` scheduled ticks.
    """
    entry = _adaptive_state.get(tracker_id)
    period = entry.get("skip_every", 1) if entry else 1
    if period <= 1:
        return False

    ticks = entry.get("tick_counter", 0) + 1
    entry["tick_counter"] = ticks
    # Only every ``period``-th tick is allowed through.
    return ticks % period != 0
|
||||||
|
|
||||||
|
|
||||||
|
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
    """Record the outcome of a real (non-skipped) poll for ``tracker_id``.

    Any detected event resets the tracker to the base rate. An empty poll
    extends the idle streak; the skip factor is raised to 2 once the
    streak reaches ``_ADAPTIVE_HALVE_THRESHOLD`` and to
    ``_ADAPTIVE_MAX_SKIP`` once it reaches ``_ADAPTIVE_QUARTER_THRESHOLD``.
    """
    if tracker_id not in _adaptive_state:
        _adaptive_state[tracker_id] = {
            "empty_count": 0,
            "skip_every": 1,
            "tick_counter": 0,
        }
    entry = _adaptive_state[tracker_id]

    if events_detected > 0:
        if entry["skip_every"] > 1:
            _LOGGER.info(
                "Adaptive polling: tracker %d saw activity, restoring base rate",
                tracker_id,
            )
        entry.update(empty_count=0, skip_every=1, tick_counter=0)
        return

    idle_ticks = entry.get("empty_count", 0) + 1
    entry["empty_count"] = idle_ticks

    # Check the stronger back-off first so a long streak jumps straight to it.
    if idle_ticks >= _ADAPTIVE_QUARTER_THRESHOLD and entry["skip_every"] < _ADAPTIVE_MAX_SKIP:
        entry["skip_every"] = _ADAPTIVE_MAX_SKIP
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4",
            tracker_id, idle_ticks,
        )
    elif idle_ticks >= _ADAPTIVE_HALVE_THRESHOLD and entry["skip_every"] < 2:
        entry["skip_every"] = 2
        _LOGGER.info(
            "Adaptive polling: tracker %d idle for %d ticks, skipping every other",
            tracker_id, idle_ticks,
        )
|
||||||
|
|
||||||
|
|
||||||
|
def reset_adaptive_state(tracker_id: int) -> None:
    """Forget the adaptive-polling counters kept for ``tracker_id``.

    After this call the tracker is treated as having no idle history, so
    its next scheduled tick is not skipped. Within this module it is
    invoked when a tracker is scheduled or unscheduled. A tracker without
    an entry is left as is.
    """
    if tracker_id in _adaptive_state:
        del _adaptive_state[tracker_id]
|
||||||
|
|
||||||
|
|
||||||
async def _poll_tracker(tracker_id: int) -> None:
    """Scheduler job: poll one tracker and feed the adaptive heuristic.

    The tick is dropped without polling when ``_adaptive_should_skip``
    says so. Otherwise ``check_tracker`` runs; any exception it raises is
    logged with its traceback and swallowed so the job stays scheduled.
    The adaptive counters are only updated when the result is a dict whose
    ``status`` is ``"ok"``.
    """
    from .watcher import check_tracker

    if _adaptive_should_skip(tracker_id):
        return

    try:
        result = await check_tracker(tracker_id)
    except Exception as e:  # noqa: BLE001 — job boundary, must not propagate
        # Keep the traceback: the message alone rarely identifies the cause.
        _LOGGER.error(
            "Error polling tracker %d: %s", tracker_id, e, exc_info=True
        )
        return

    # Treat the "error" / "skipped" statuses as inconclusive — don't let
    # a transient upstream failure trick the heuristic into backing off.
    if isinstance(result, dict) and result.get("status") == "ok":
        _adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0))
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -187,8 +187,17 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
|||||||
"asset_ids": s.asset_ids,
|
"asset_ids": s.asset_ids,
|
||||||
"pending_asset_ids": s.pending_asset_ids,
|
"pending_asset_ids": s.pending_asset_ids,
|
||||||
"shared": bool(s.shared),
|
"shared": bool(s.shared),
|
||||||
|
"meta_fingerprint": s.meta_fingerprint or {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Snapshot the original fingerprint per collection so we can skip the
|
||||||
|
# (expensive) asset_ids rewrite when nothing changed. For a 200k-asset
|
||||||
|
# album this avoids a ~7 MB JSON write to the state row every tick.
|
||||||
|
original_fingerprints: dict[str, dict[str, Any]] = {
|
||||||
|
cid: dict(cstate.get("meta_fingerprint") or {})
|
||||||
|
for cid, cstate in state_dict.items()
|
||||||
|
}
|
||||||
|
|
||||||
# Load tracker-target links
|
# Load tracker-target links
|
||||||
link_data = await load_link_data(session, tracker_id)
|
link_data = await load_link_data(session, tracker_id)
|
||||||
|
|
||||||
@@ -279,11 +288,20 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
|||||||
existing = s
|
existing = s
|
||||||
break
|
break
|
||||||
|
|
||||||
|
current_fingerprint = dict(cstate.get("meta_fingerprint") or {})
|
||||||
|
prior_fingerprint = original_fingerprints.get(cid, {})
|
||||||
|
# Skip the DB update when the provider reported no meaningful
|
||||||
|
# change. ``existing`` is None on first-ever fetch for a
|
||||||
|
# collection — that path always writes so the row gets created.
|
||||||
|
if existing is not None and current_fingerprint == prior_fingerprint:
|
||||||
|
continue
|
||||||
|
|
||||||
if existing:
|
if existing:
|
||||||
existing.asset_ids = cstate.get("asset_ids", [])
|
existing.asset_ids = cstate.get("asset_ids", [])
|
||||||
existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
|
existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
|
||||||
existing.collection_name = cstate.get("name", "")
|
existing.collection_name = cstate.get("name", "")
|
||||||
existing.shared = cstate.get("shared", False)
|
existing.shared = cstate.get("shared", False)
|
||||||
|
existing.meta_fingerprint = current_fingerprint
|
||||||
session.add(existing)
|
session.add(existing)
|
||||||
else:
|
else:
|
||||||
new_ts = NotificationTrackerState(
|
new_ts = NotificationTrackerState(
|
||||||
@@ -293,6 +311,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
|||||||
shared=cstate.get("shared", False),
|
shared=cstate.get("shared", False),
|
||||||
asset_ids=cstate.get("asset_ids", []),
|
asset_ids=cstate.get("asset_ids", []),
|
||||||
pending_asset_ids=cstate.get("pending_asset_ids", []),
|
pending_asset_ids=cstate.get("pending_asset_ids", []),
|
||||||
|
meta_fingerprint=current_fingerprint,
|
||||||
)
|
)
|
||||||
session.add(new_ts)
|
session.add(new_ts)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user