Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5604c733d1 | |||
| 3b7808aa9c | |||
| 155d25edf9 | |||
| 69711bbc84 | |||
| fe38d20b96 | |||
| d02616069d | |||
| 7dae68fd93 | |||
| e6481605ca | |||
| 6de9a1289e | |||
| 325eabd751 | |||
| fab6169cf9 | |||
| 85311684d9 |
+12
-8
@@ -1,19 +1,23 @@
|
||||
# v0.2.5 (2026-04-22)
|
||||
# v0.3.1 (2026-04-22)
|
||||
|
||||
Hotfix release on top of v0.2.4 — the settings page couldn't save numeric
|
||||
fields after the cache-TTL / max-entries rework. See v0.2.4 notes for the
|
||||
main changes (thumbhash-validated cache, settings UX overhaul, mobile-nav
|
||||
parity).
|
||||
Follow-up perf pass on top of v0.3.0's polling overhaul — extends the same
|
||||
caching discipline to the bot-command read paths so repeat `/random`,
|
||||
`/latest`, `/memory`, etc. against the same album don't each refetch a
|
||||
multi-megabyte album body or pay for a full server-wide `/api/shared-links`
|
||||
listing.
|
||||
|
||||
## Bug Fixes
|
||||
## Performance
|
||||
|
||||
- **Accept numeric values in settings update payload** — Svelte's `bind:value` on `<input type="number">` coerces to a JS number, and Pydantic v2 wouldn't auto-coerce `int → str`, producing a 422 on every save that touched a numeric setting (TTL, max entries) after v0.2.4. Widened numeric fields to `int | str | None` in `SettingsUpdate` and normalized to `str` before persisting. ([d7d0a5d](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/d7d0a5d))
|
||||
- **TTL-cache `GET /api/albums/{id}` responses** — 60 s TTL, 32-entry FIFO cap, keyed by `(server_digest, album_id)`. Module-scoped rather than instance-scoped because `ImmichClient` is constructed fresh per request in several places (`api/providers.py`, `services/action_runner.py`, command handlers), so an instance cache would never survive a second caller. Mirrors the existing `_users_cache` pattern. ([3b7808a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b7808a))
|
||||
- **TTL-cache the bucketed shared-links map** — 60 s TTL, keyed by server digest. `/api/shared-links` has no per-album filter, so every `get_shared_links(album_id)` call was already paying for the full server-wide list; now one fetch serves every album until the TTL elapses. ([3b7808a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b7808a))
|
||||
- **Collapse concurrent cache misses to one fetch** — async lock with an under-lock re-check around the album / shared-links populate step, so a burst of parallel commands hitting the same cold key issues one HTTP call instead of N. ([3b7808a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b7808a))
|
||||
- **`use_cache=False` escape hatch on mutation / event-detection paths** — `ImmichActionExecutor.execute` (which diffs the current album state to decide what to add) and `ImmichServiceProvider.poll`'s full-fetch path (where a stale entry would silently delay asset-removal events) explicitly bypass the cache. Non-cached fetches still populate it for subsequent readers. ([3b7808a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b7808a))
|
||||
|
||||
---
|
||||
|
||||
<details>
|
||||
<summary>All Commits</summary>
|
||||
|
||||
- [d7d0a5d](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/d7d0a5d) — fix(settings): accept numeric values in update payload *(alexei.dolgolyov)*
|
||||
- [3b7808a](https://git.dolgolyov-family.by/alexei.dolgolyov/notify-bridge/commit/3b7808a) — perf(immich): TTL cache for album bodies and shared-link listings *(alexei.dolgolyov)*
|
||||
|
||||
</details>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "notify-bridge-frontend",
|
||||
"private": true,
|
||||
"version": "0.2.5",
|
||||
"version": "0.3.1",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "vite dev",
|
||||
|
||||
@@ -117,6 +117,14 @@
|
||||
return form.slots[slotName]?.[activeLocale] || '';
|
||||
}
|
||||
|
||||
/** Resolve variable reference for a slot, preferring provider-specific over shared. */
|
||||
function getVarsFor(slotName: string) {
|
||||
const providerVars = varsRef[form.provider_type];
|
||||
return providerVars?.[slotName] ?? varsRef[slotName];
|
||||
}
|
||||
|
||||
let modalVars = $derived(showVarsFor ? getVarsFor(showVarsFor) : null);
|
||||
|
||||
/** Set slot template for current locale (immutable update). */
|
||||
function setSlotValue(slotName: string, value: string) {
|
||||
form.slots = {
|
||||
@@ -369,7 +377,7 @@
|
||||
{t('templateConfig.preview')}
|
||||
</button>
|
||||
{/if}
|
||||
{#if varsRef[slot.name]}
|
||||
{#if getVarsFor(slot.name)}
|
||||
<button type="button" onclick={() => showVarsFor = slot.name}
|
||||
class="text-xs text-[var(--color-muted-foreground)] hover:underline">{t('templateConfig.variables')}</button>
|
||||
{/if}
|
||||
@@ -385,7 +393,7 @@
|
||||
onchange={(v: string) => { setSlotValue(slot.name, v); validateSlot(slot.name, v); }}
|
||||
rows={3}
|
||||
errorLine={slotErrorLines[slot.name] || null}
|
||||
variables={varsRef[slot.name] || undefined}
|
||||
variables={getVarsFor(slot.name) || undefined}
|
||||
/>
|
||||
{/if}
|
||||
|
||||
@@ -468,11 +476,11 @@
|
||||
|
||||
<!-- Variables reference modal -->
|
||||
<Modal open={showVarsFor !== null} title="{t('templateConfig.variables')}: /{showVarsFor || ''}" onclose={() => showVarsFor = null}>
|
||||
{#if showVarsFor && varsRef[showVarsFor]}
|
||||
<p class="text-sm text-[var(--color-muted-foreground)] mb-3">{varsRef[showVarsFor].description}</p>
|
||||
{#if showVarsFor && modalVars}
|
||||
<p class="text-sm text-[var(--color-muted-foreground)] mb-3">{modalVars.description}</p>
|
||||
<div class="space-y-1">
|
||||
<p class="text-xs font-medium mb-1">{t('templateConfig.variables')}:</p>
|
||||
{#each Object.entries(varsRef[showVarsFor].variables || {}) as [name, desc]}
|
||||
{#each Object.entries(modalVars.variables || {}) as [name, desc]}
|
||||
<div class="flex items-start gap-2 text-sm">
|
||||
<code class="text-xs bg-[var(--color-muted)] px-1 py-0.5 rounded font-mono whitespace-nowrap">{'{{ ' + name + ' }}'}</code>
|
||||
<span class="text-xs text-[var(--color-muted-foreground)]">{desc}</span>
|
||||
@@ -484,11 +492,19 @@
|
||||
['album_fields', 'album', 'Album fields'],
|
||||
['command_fields', 'cmd', 'Command fields'],
|
||||
['event_fields', 'event', 'Event fields'],
|
||||
['repo_fields', 'repo', 'Repository fields'],
|
||||
['issue_fields', 'issue', 'Issue fields'],
|
||||
['pr_fields', 'pr', 'Pull request fields'],
|
||||
['commit_fields', 'c', 'Commit fields'],
|
||||
['board_fields', 'board', 'Board fields'],
|
||||
['card_fields', 'card', 'Card fields'],
|
||||
['list_fields', 'lst', 'List fields'],
|
||||
['device_fields', 'd', 'Device fields'],
|
||||
] as [fieldKey, prefix, title]}
|
||||
{#if varsRef[showVarsFor][fieldKey]}
|
||||
{#if modalVars[fieldKey]}
|
||||
<div class="mt-3 pt-3 border-t border-[var(--color-border)]">
|
||||
<p class="text-xs font-medium mb-1">{title} <span class="font-normal text-[var(--color-muted-foreground)]">(use {prefix}.field)</span>:</p>
|
||||
{#each Object.entries(varsRef[showVarsFor][fieldKey]) as [name, desc]}
|
||||
{#each Object.entries(modalVars[fieldKey]) as [name, desc]}
|
||||
<div class="flex items-start gap-2 text-sm">
|
||||
<code class="text-xs bg-[var(--color-muted)] px-1 py-0.5 rounded font-mono whitespace-nowrap">{'{{ ' + prefix + '.' + name + ' }}'}</code>
|
||||
<span class="text-xs text-[var(--color-muted-foreground)]">{desc}</span>
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "notify-bridge-core"
|
||||
version = "0.2.5"
|
||||
version = "0.3.1"
|
||||
description = "Core library for Notify Bridge — service provider abstractions, models, notifications, and templates"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
|
||||
@@ -46,6 +46,7 @@ from .receiver import (
|
||||
from .telegram.cache import TelegramFileCache
|
||||
from .telegram.client import TelegramClient
|
||||
from .telegram.media import (
|
||||
build_telegram_asset_entry,
|
||||
extract_asset_id_from_url,
|
||||
is_asset_cache_key,
|
||||
is_asset_id,
|
||||
@@ -266,23 +267,19 @@ class NotificationDispatcher:
|
||||
# Prefer internal URL for fetching (LAN speed vs public internet)
|
||||
internal_url = (target.provider_internal_url or "").rstrip("/")
|
||||
external_url = (target.provider_external_url or "").rstrip("/")
|
||||
provider_urls = [u for u in (internal_url, external_url) if u]
|
||||
assets = []
|
||||
media_assets: list[Any] = [] # aligned with `assets` for preload
|
||||
for asset in event.added_assets[:max_media]:
|
||||
url = asset.preview_url or asset.thumbnail_url or asset.full_url
|
||||
if url:
|
||||
# Rewrite external URL to internal for faster LAN fetching
|
||||
if internal_url and external_url and url.startswith(external_url):
|
||||
url = internal_url + url[len(external_url):]
|
||||
asset_type = "video" if asset.type.value == "video" else "photo"
|
||||
asset_headers = {}
|
||||
if target.provider_api_key and any(url.startswith(u) for u in provider_urls):
|
||||
asset_headers["x-api-key"] = target.provider_api_key
|
||||
asset_entry: dict[str, Any] = {"url": url, "type": asset_type, "headers": asset_headers}
|
||||
# Pass explicit cache_key if set by provider (e.g. Google Photos)
|
||||
if asset.extra.get("cache_key"):
|
||||
asset_entry["cache_key"] = asset.extra["cache_key"]
|
||||
asset_entry = build_telegram_asset_entry(
|
||||
url=url or "",
|
||||
media_type=asset.type.value,
|
||||
api_key=target.provider_api_key,
|
||||
internal_url=internal_url,
|
||||
external_url=external_url,
|
||||
cache_key=asset.extra.get("cache_key"),
|
||||
)
|
||||
if asset_entry is not None:
|
||||
assets.append(asset_entry)
|
||||
media_assets.append(asset)
|
||||
|
||||
|
||||
@@ -89,6 +89,18 @@ class TelegramClient:
|
||||
self, url: str | None, cache_key: str | None = None,
|
||||
) -> tuple[TelegramFileCache | None, str | None, str | None]:
|
||||
if cache_key:
|
||||
# Route asset-UUID cache keys to the asset cache so single-item
|
||||
# sends hit the same cache the media-group path uses. Without
|
||||
# this, a command returning one photo stored file_ids in the
|
||||
# URL cache and a command returning multiple stored them in
|
||||
# the asset cache — repeated sends never hit.
|
||||
if is_asset_cache_key(cache_key):
|
||||
bare_id = asset_id_from_cache_key(cache_key)
|
||||
thumbhash = (
|
||||
self._thumbhash_resolver(bare_id)
|
||||
if self._thumbhash_resolver else None
|
||||
)
|
||||
return self._asset_cache, cache_key, thumbhash
|
||||
return self._url_cache, cache_key, None
|
||||
if url:
|
||||
if is_asset_id(url):
|
||||
@@ -217,7 +229,7 @@ class TelegramClient:
|
||||
|
||||
typing_task = None
|
||||
if chat_action:
|
||||
typing_task = self._start_typing_indicator(chat_id, chat_action)
|
||||
typing_task = self.start_chat_action_keepalive(chat_id, chat_action)
|
||||
|
||||
try:
|
||||
if len(assets) == 1 and assets[0].get("type") == "photo":
|
||||
@@ -328,7 +340,13 @@ class TelegramClient:
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
def _start_typing_indicator(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
||||
def start_chat_action_keepalive(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
||||
"""Repeatedly post ``action`` every 4s until the returned task is cancelled.
|
||||
|
||||
Telegram chat actions expire after ~5s, so callers that want the hint
|
||||
to persist through longer work (fetching assets, multi-chunk uploads)
|
||||
need a keep-alive. Cancel the task in a ``finally`` to stop it.
|
||||
"""
|
||||
async def action_loop() -> None:
|
||||
try:
|
||||
while True:
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Final
|
||||
from typing import Any, Final
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# Telegram constants
|
||||
@@ -52,6 +52,65 @@ def extract_asset_id_from_url(url: str) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
def build_telegram_asset_entry(
|
||||
*,
|
||||
url: str,
|
||||
media_type: str,
|
||||
api_key: str | None = None,
|
||||
internal_url: str = "",
|
||||
external_url: str = "",
|
||||
cache_key: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Build a ``TelegramClient.send_notification`` asset dict from raw fields.
|
||||
|
||||
Shared by the notification dispatcher and provider command handlers so
|
||||
both paths agree on media typing, URL rewriting, and auth headers. In
|
||||
particular: video assets MUST be typed ``"video"`` and point at a real
|
||||
video endpoint (e.g. Immich ``/video/playback``) — if they are sent as
|
||||
``"photo"`` pointing at a thumbnail URL, Telegram delivers a still image
|
||||
for every video in a media group and the user sees a dead poster frame
|
||||
instead of a playable clip.
|
||||
|
||||
Args:
|
||||
url: Source URL for the asset bytes. Prefer a transcoded/preview
|
||||
URL for videos (``/video/playback``) and a preview-sized
|
||||
thumbnail for photos.
|
||||
media_type: Case-insensitive type token. Accepts ``"video"``/
|
||||
``"VIDEO"``/``MediaType.VIDEO`` or any photo-like string.
|
||||
api_key: Optional API key. Attached as ``x-api-key`` iff the URL is
|
||||
served by one of the provider hosts in ``internal_url`` /
|
||||
``external_url`` (prevents leaking the key to unrelated hosts).
|
||||
internal_url: LAN-facing provider URL. Used to rewrite
|
||||
``external_url`` prefixes so Docker-host downloads stay on the
|
||||
LAN instead of egressing to the public domain.
|
||||
external_url: Public provider URL the notification URL was built
|
||||
from. Only used for the LAN rewrite and the api-key scope check.
|
||||
cache_key: Optional explicit cache key. Providers whose URLs don't
|
||||
embed a stable asset id (Google Photos) pass one through so the
|
||||
file_id cache still works.
|
||||
|
||||
Returns ``None`` iff ``url`` is empty.
|
||||
"""
|
||||
if not url:
|
||||
return None
|
||||
|
||||
if internal_url and external_url and url.startswith(external_url):
|
||||
url = internal_url + url[len(external_url):]
|
||||
|
||||
normalized_type = str(media_type or "").lower()
|
||||
entry_type = "video" if normalized_type == "video" else "photo"
|
||||
|
||||
headers: dict[str, str] = {}
|
||||
provider_urls = [u for u in (internal_url, external_url) if u]
|
||||
if api_key and (not provider_urls or any(url.startswith(u) for u in provider_urls)):
|
||||
headers["x-api-key"] = api_key
|
||||
|
||||
entry: dict[str, Any] = {"url": url, "type": entry_type, "headers": headers}
|
||||
if cache_key:
|
||||
entry["cache_key"] = cache_key
|
||||
return entry
|
||||
|
||||
|
||||
def split_media_by_upload_size(
|
||||
media_items: list[tuple], max_upload_size: int
|
||||
) -> list[list[tuple]]:
|
||||
|
||||
@@ -177,7 +177,9 @@ class ImmichActionExecutor(ActionExecutor):
|
||||
needs_thumbnail = album_id in album_created_now
|
||||
|
||||
if album_id and album_id != "__dry_run_new__":
|
||||
album = await self._client.get_album(album_id)
|
||||
# Actions diff the current album state to decide what to
|
||||
# add — must observe fresh data, not a cached view.
|
||||
album = await self._client.get_album(album_id, use_cache=False)
|
||||
if album is None and create_if_missing and create_album_name:
|
||||
if not dry_run:
|
||||
created = await self._client.create_album(create_album_name)
|
||||
|
||||
@@ -193,6 +193,27 @@ def get_asset_video_url(
|
||||
return None
|
||||
|
||||
|
||||
def build_asset_media_urls(
|
||||
external_url: str, asset_id: str, asset_type: str,
|
||||
) -> tuple[str, str]:
|
||||
"""Return ``(preview_url, full_url)`` for an Immich asset.
|
||||
|
||||
Single source of truth for the photo-vs-video endpoint rule. Used by
|
||||
``asset_to_media`` (notification path) and the bot command handlers
|
||||
(command path) so both always pick the transcoded ``/video/playback``
|
||||
for videos and the preview-sized thumbnail for photos — if they
|
||||
diverge, Telegram ends up delivering a still JPEG for videos in a
|
||||
media group.
|
||||
"""
|
||||
is_video = asset_type == ASSET_TYPE_VIDEO
|
||||
if is_video:
|
||||
preview_url = f"{external_url}/api/assets/{asset_id}/video/playback"
|
||||
else:
|
||||
preview_url = f"{external_url}/api/assets/{asset_id}/thumbnail?size=preview"
|
||||
full_url = f"{external_url}/api/assets/{asset_id}/original"
|
||||
return preview_url, full_url
|
||||
|
||||
|
||||
def build_asset_detail(
|
||||
asset: ImmichAssetInfo,
|
||||
external_url: str,
|
||||
@@ -246,12 +267,7 @@ def asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
|
||||
# preview_url is what the notification dispatcher feeds to Telegram as the
|
||||
# actual media bytes — for videos it must be the transcoded playback (mp4),
|
||||
# not the JPEG thumbnail, or Telegram receives a JPEG labeled as video/mp4.
|
||||
if asset.type == ASSET_TYPE_VIDEO:
|
||||
preview_url = f"{external_url}/api/assets/{asset.id}/video/playback"
|
||||
full_url = f"{external_url}/api/assets/{asset.id}/original"
|
||||
else:
|
||||
preview_url = f"{external_url}/api/assets/{asset.id}/thumbnail?size=preview"
|
||||
full_url = f"{external_url}/api/assets/{asset.id}/original"
|
||||
preview_url, full_url = build_asset_media_urls(external_url, asset.id, asset.type)
|
||||
|
||||
return MediaAsset(
|
||||
id=asset.id,
|
||||
|
||||
@@ -13,6 +13,18 @@ from .models import ImmichAlbumData, ImmichAssetInfo
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Guard against runaway payloads when a bulk import lands in one poll tick.
|
||||
# Templates iterate every entry in ``added_assets`` / ``removed_asset_ids``
|
||||
# in Jinja for-loops (see defaults/*/assets_added.jinja2), and Telegram's
|
||||
# media group has a hard cap of its own — sending 200k entries would both
|
||||
# crash rendering and produce a message that no transport can deliver.
|
||||
#
|
||||
# ``added_count`` / ``removed_count`` on the event always carry the true
|
||||
# totals so templates can show an accurate "N added" number even when the
|
||||
# per-asset list is truncated.
|
||||
_MAX_ASSETS_PER_EVENT = 50
|
||||
_MAX_REMOVALS_PER_EVENT = 200
|
||||
|
||||
|
||||
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
|
||||
"""Build the common extra dict for album events."""
|
||||
@@ -85,7 +97,17 @@ def detect_album_changes(
|
||||
|
||||
# Emit one event per change type detected
|
||||
if added_assets:
|
||||
media_assets = [asset_to_media(a, external_url) for a in added_assets]
|
||||
total_added = len(added_assets)
|
||||
truncated_added = added_assets[:_MAX_ASSETS_PER_EVENT]
|
||||
media_assets = [asset_to_media(a, external_url) for a in truncated_added]
|
||||
event_extra = dict(extra)
|
||||
if total_added > _MAX_ASSETS_PER_EVENT:
|
||||
event_extra["truncated"] = True
|
||||
event_extra["shown_count"] = _MAX_ASSETS_PER_EVENT
|
||||
_LOGGER.info(
|
||||
"Truncated assets_added event for album %s: %d → %d",
|
||||
new_album.id, total_added, _MAX_ASSETS_PER_EVENT,
|
||||
)
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.ASSETS_ADDED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
@@ -95,12 +117,22 @@ def detect_album_changes(
|
||||
timestamp=now,
|
||||
added_assets=media_assets,
|
||||
removed_asset_ids=[],
|
||||
added_count=len(added_assets),
|
||||
added_count=total_added,
|
||||
removed_count=0,
|
||||
extra=dict(extra),
|
||||
extra=event_extra,
|
||||
))
|
||||
|
||||
if removed_ids:
|
||||
total_removed = len(removed_ids)
|
||||
truncated_removed = list(removed_ids)[:_MAX_REMOVALS_PER_EVENT]
|
||||
event_extra = dict(extra)
|
||||
if total_removed > _MAX_REMOVALS_PER_EVENT:
|
||||
event_extra["truncated"] = True
|
||||
event_extra["shown_count"] = _MAX_REMOVALS_PER_EVENT
|
||||
_LOGGER.info(
|
||||
"Truncated assets_removed event for album %s: %d → %d",
|
||||
new_album.id, total_removed, _MAX_REMOVALS_PER_EVENT,
|
||||
)
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.ASSETS_REMOVED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
@@ -109,10 +141,10 @@ def detect_album_changes(
|
||||
collection_name=new_album.name,
|
||||
timestamp=now,
|
||||
added_assets=[],
|
||||
removed_asset_ids=list(removed_ids),
|
||||
removed_asset_ids=truncated_removed,
|
||||
added_count=0,
|
||||
removed_count=len(removed_ids),
|
||||
extra=dict(extra),
|
||||
removed_count=total_removed,
|
||||
extra=event_extra,
|
||||
))
|
||||
|
||||
if name_changed:
|
||||
|
||||
@@ -2,14 +2,17 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
from ...notifications.ssrf import UnsafeURLError, validate_outbound_url
|
||||
from .models import ImmichAlbumData, SharedLinkInfo
|
||||
from .models import ImmichAlbumData, ImmichAlbumMeta, SharedLinkInfo
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -18,6 +21,51 @@ _LOGGER = logging.getLogger(__name__)
|
||||
MAX_SEARCH_QUERY_LEN = 256
|
||||
MAX_SEARCH_PERSON_IDS = 50
|
||||
|
||||
# Module-level TTL caches for album bodies and shared-link listings. The
|
||||
# Immich ``GET /api/albums/{id}`` response can be tens or hundreds of MB on a
|
||||
# large album, and bot commands like /random, /latest, /memory all refetch
|
||||
# the same album in quick succession. A short TTL makes repeat runs nearly
|
||||
# instant and deduplicates concurrent fetches so a burst of commands issues
|
||||
# one HTTP call instead of N.
|
||||
#
|
||||
# Caches are module-scoped (not instance-scoped) because ``ImmichClient`` is
|
||||
# constructed fresh per request in several places (api/providers.py,
|
||||
# services/action_runner.py, command handlers), so an instance cache would
|
||||
# never survive to serve a second caller. This mirrors ``_users_cache`` in
|
||||
# ``provider.py``.
|
||||
_ALBUM_CACHE_TTL_SECONDS = 60
|
||||
_SHARED_LINKS_CACHE_TTL_SECONDS = 60
|
||||
# Guard rail against runaway memory — a 200k-asset album response can be
|
||||
# ~150 MB, so even modest caps bound the worst case.
|
||||
_ALBUM_CACHE_MAX_ENTRIES = 32
|
||||
_album_cache_lock = asyncio.Lock()
|
||||
# key = (server_digest, album_id); value = (monotonic_ts, raw_api_dict)
|
||||
# Store the raw dict rather than the parsed ``ImmichAlbumData`` so callers
|
||||
# that pass a ``users_cache`` still get owner-name enrichment on cache hits.
|
||||
_album_cache: dict[tuple[str, str], tuple[float, dict[str, Any]]] = {}
|
||||
_shared_links_cache_lock = asyncio.Lock()
|
||||
# key = server_digest; value = (monotonic_ts, {album_id: [SharedLinkInfo, ...]})
|
||||
# The underlying ``/api/shared-links`` endpoint has no per-album filter, so
|
||||
# every call was already paying for the full server-wide list. Caching the
|
||||
# bucketed result once per server turns N per-album calls into one fetch.
|
||||
_shared_links_cache: dict[str, tuple[float, dict[str, list[SharedLinkInfo]]]] = {}
|
||||
|
||||
|
||||
def _server_digest(url: str, api_key: str) -> str:
|
||||
"""Hashed key that avoids putting raw api_key into cache dict keys."""
|
||||
return hashlib.sha256(f"{url}|{api_key}".encode("utf-8")).hexdigest()[:32]
|
||||
|
||||
|
||||
def invalidate_album_cache() -> None:
|
||||
"""Drop every cached album body. Call after mutations that invalidate
|
||||
the cached view (e.g. integration tests, manual /refresh commands)."""
|
||||
_album_cache.clear()
|
||||
|
||||
|
||||
def invalidate_shared_links_cache() -> None:
|
||||
"""Drop every cached shared-link listing."""
|
||||
_shared_links_cache.clear()
|
||||
|
||||
# User-facing error bodies — Immich responses may leak internal paths,
|
||||
# hostnames, or headers injected by intermediary proxies. These helpers keep
|
||||
# only a short, scrubbed summary; full bodies are logged server-side only.
|
||||
@@ -184,28 +232,100 @@ class ImmichClient:
|
||||
return {}
|
||||
|
||||
async def get_shared_links(self, album_id: str) -> list[SharedLinkInfo]:
|
||||
links: list[SharedLinkInfo] = []
|
||||
bucketed = await self._get_shared_links_bucketed()
|
||||
return list(bucketed.get(album_id, []))
|
||||
|
||||
async def _get_shared_links_bucketed(self) -> dict[str, list[SharedLinkInfo]]:
|
||||
"""Return ``{album_id: [SharedLinkInfo, ...]}`` for the server, hitting
|
||||
the module-level TTL cache first. Underlying Immich endpoint has no
|
||||
per-album filter, so one server-wide fetch serves every caller until
|
||||
the TTL elapses.
|
||||
"""
|
||||
digest = _server_digest(self._url, self._api_key)
|
||||
now = time.monotonic()
|
||||
entry = _shared_links_cache.get(digest)
|
||||
if entry is not None and (now - entry[0]) < _SHARED_LINKS_CACHE_TTL_SECONDS:
|
||||
return entry[1]
|
||||
|
||||
async with _shared_links_cache_lock:
|
||||
# Re-check under the lock — another coroutine may have refreshed
|
||||
# while we waited.
|
||||
entry = _shared_links_cache.get(digest)
|
||||
if entry is not None and (time.monotonic() - entry[0]) < _SHARED_LINKS_CACHE_TTL_SECONDS:
|
||||
return entry[1]
|
||||
fresh = await self.get_all_shared_links_by_album()
|
||||
_shared_links_cache[digest] = (time.monotonic(), fresh)
|
||||
return fresh
|
||||
|
||||
async def get_all_shared_links_by_album(self) -> dict[str, list[SharedLinkInfo]]:
|
||||
"""Fetch every shared link on the server, bucketed by album id.
|
||||
|
||||
Immich's ``/api/shared-links`` endpoint is server-wide — there's no
|
||||
per-album filter server-side — so every call that wanted the links
|
||||
for a single album was already paying the cost of the full listing
|
||||
and then discarding most of the response. Callers that need links
|
||||
for multiple albums in one tick should use this method and index
|
||||
into the returned dict instead of hitting ``get_shared_links`` in
|
||||
a loop.
|
||||
|
||||
Returns an empty dict on any error (matches the silent-failure
|
||||
contract of ``get_shared_links`` so callers don't need to branch
|
||||
on transient outages).
|
||||
"""
|
||||
result: dict[str, list[SharedLinkInfo]] = {}
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/shared-links",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
for link in data:
|
||||
album = link.get("album")
|
||||
key = link.get("key")
|
||||
if album and key and album.get("id") == album_id:
|
||||
links.append(SharedLinkInfo.from_api_response(link))
|
||||
if response.status != 200:
|
||||
_LOGGER.warning(
|
||||
"get_all_shared_links non-200: HTTP %s", response.status
|
||||
)
|
||||
return result
|
||||
data = await response.json()
|
||||
for link in data:
|
||||
album = link.get("album")
|
||||
key = link.get("key")
|
||||
if not (album and key):
|
||||
continue
|
||||
aid = album.get("id")
|
||||
if not aid:
|
||||
continue
|
||||
result.setdefault(aid, []).append(
|
||||
SharedLinkInfo.from_api_response(link)
|
||||
)
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Failed to fetch shared links: %s", err)
|
||||
return links
|
||||
_LOGGER.warning("Failed to fetch all shared links: %s", err)
|
||||
return result
|
||||
|
||||
async def get_album(
|
||||
self,
|
||||
album_id: str,
|
||||
users_cache: dict[str, str] | None = None,
|
||||
*,
|
||||
use_cache: bool = True,
|
||||
) -> ImmichAlbumData | None:
|
||||
"""Fetch an album by id, optionally serving from the module-level
|
||||
TTL cache. Pass ``use_cache=False`` from paths that must observe the
|
||||
current server state (e.g. the notification poll loop's full-fetch
|
||||
path, where a stale cached entry would delay asset-removal events).
|
||||
Non-cached fetches still populate the cache for subsequent readers.
|
||||
"""
|
||||
cache_key = (_server_digest(self._url, self._api_key), album_id)
|
||||
if use_cache:
|
||||
entry = _album_cache.get(cache_key)
|
||||
if entry is not None and (time.monotonic() - entry[0]) < _ALBUM_CACHE_TTL_SECONDS:
|
||||
# Rehydrate per-call so ``users_cache`` enrichment is applied
|
||||
# with the caller's dict, not whichever one was live when the
|
||||
# cache was populated.
|
||||
return ImmichAlbumData.from_api_response(entry[1], users_cache)
|
||||
|
||||
# Deliberately fetch without holding a lock so concurrent calls for
|
||||
# *different* album_ids (the common case from asyncio.gather in
|
||||
# fetch_albums_with_links) stay parallel. The worst case is a small
|
||||
# duplicate-fetch stampede when two requests miss the same album at
|
||||
# the same instant — acceptable for our scale.
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/albums/{album_id}",
|
||||
@@ -218,10 +338,132 @@ class ImmichClient:
|
||||
f"Error fetching album {album_id}: HTTP {response.status}"
|
||||
)
|
||||
data = await response.json()
|
||||
return ImmichAlbumData.from_api_response(data, users_cache)
|
||||
except aiohttp.ClientError as err:
|
||||
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
|
||||
|
||||
async with _album_cache_lock:
|
||||
# Evict the oldest entry if we're at the cap — simple FIFO is fine
|
||||
# for our access pattern (commands touch a small working set).
|
||||
if len(_album_cache) >= _ALBUM_CACHE_MAX_ENTRIES and cache_key not in _album_cache:
|
||||
oldest = min(_album_cache.items(), key=lambda kv: kv[1][0])[0]
|
||||
_album_cache.pop(oldest, None)
|
||||
_album_cache[cache_key] = (time.monotonic(), data)
|
||||
return ImmichAlbumData.from_api_response(data, users_cache)
|
||||
|
||||
async def get_album_meta(self, album_id: str) -> ImmichAlbumMeta | None:
|
||||
"""Fetch album metadata without the assets array.
|
||||
|
||||
Uses Immich's ``?withoutAssets=true`` query param, which skips the
|
||||
(potentially huge) ``assets`` field. A 200k-asset album response
|
||||
drops from ~150 MB to a few hundred bytes, so this is cheap enough
|
||||
to run on every poll as a change-detection probe.
|
||||
"""
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/albums/{album_id}",
|
||||
params={"withoutAssets": "true"},
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 404:
|
||||
return None
|
||||
if response.status != 200:
|
||||
raise ImmichApiError(
|
||||
f"Error fetching album meta {album_id}: HTTP {response.status}"
|
||||
)
|
||||
data = await response.json()
|
||||
return ImmichAlbumMeta.from_api_response(data)
|
||||
except aiohttp.ClientError as err:
|
||||
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
|
||||
|
||||
async def search_album_assets_updated_after(
|
||||
self,
|
||||
album_id: str,
|
||||
updated_after: str,
|
||||
*,
|
||||
page_size: int = 1000,
|
||||
max_pages: int = 50,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Fetch assets in ``album_id`` whose ``updatedAt`` is after ``updated_after``.
|
||||
|
||||
Uses ``POST /api/search/metadata`` with ``albumIds=[album_id]`` and
|
||||
``updatedAfter=<iso>``. Paginates through up to ``max_pages`` pages —
|
||||
the cap exists so a clock-skew or upstream bug cannot produce an
|
||||
infinite loop that exhausts memory on a 200k-asset album. In practice
|
||||
an active album sees a few hundred updated assets per tick and
|
||||
terminates after one page.
|
||||
|
||||
Returns raw Immich asset dicts (same shape as ``album.assets[*]``
|
||||
from ``get_album``), so callers can feed them into
|
||||
``ImmichAssetInfo.from_api_response`` directly.
|
||||
"""
|
||||
if not updated_after:
|
||||
return []
|
||||
|
||||
page_size = max(1, min(page_size, 1000))
|
||||
results: list[dict[str, Any]] = []
|
||||
for page in range(1, max_pages + 1):
|
||||
payload: dict[str, Any] = {
|
||||
"albumIds": [album_id],
|
||||
"updatedAfter": updated_after,
|
||||
"page": page,
|
||||
"size": page_size,
|
||||
# ``withExif`` keeps location/description parity with
|
||||
# ``get_album`` so downstream ``ImmichAssetInfo.from_api_response``
|
||||
# populates city/country/rating on the delta path too.
|
||||
"withExif": True,
|
||||
"withPeople": True,
|
||||
}
|
||||
try:
|
||||
async with self._session.post(
|
||||
f"{self._url}/api/search/metadata",
|
||||
headers=self._json_headers,
|
||||
json=payload,
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
body_snip = await response.text()
|
||||
_LOGGER.warning(
|
||||
"Immich delta search non-200: HTTP %s body=%s",
|
||||
response.status, _redact_body(body_snip),
|
||||
)
|
||||
break
|
||||
data = await response.json()
|
||||
assets_block = data.get("assets")
|
||||
if isinstance(assets_block, dict):
|
||||
items = assets_block.get("items", []) or []
|
||||
next_page = assets_block.get("nextPage")
|
||||
elif isinstance(assets_block, list):
|
||||
items = assets_block
|
||||
next_page = None
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
"Immich delta search returned unexpected shape: keys=%s",
|
||||
list(data.keys())[:5],
|
||||
)
|
||||
break
|
||||
|
||||
results.extend(items)
|
||||
|
||||
# Stop early on the last page. Immich returns nextPage as
|
||||
# the next page number (string or int) or None/empty when
|
||||
# exhausted. Fall back to page-fullness heuristic if the
|
||||
# server omits the pagination hint.
|
||||
if next_page is None or next_page == "" or next_page == 0:
|
||||
break
|
||||
if len(items) < page_size:
|
||||
break
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Immich delta search transport error: %s", err)
|
||||
break
|
||||
except Exception as err: # noqa: BLE001 — resilience over correctness
|
||||
_LOGGER.warning("Immich delta search parse error: %s", err)
|
||||
break
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
"Immich delta search for album %s hit max_pages=%d cap",
|
||||
album_id, max_pages,
|
||||
)
|
||||
return results
|
||||
|
||||
async def get_albums(self) -> list[dict[str, Any]]:
|
||||
try:
|
||||
async with self._session.get(
|
||||
|
||||
@@ -146,6 +146,49 @@ class ImmichAssetInfo:
|
||||
return bool(thumbhash)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ImmichAlbumMeta:
|
||||
"""Lightweight album metadata from ``GET /api/albums/{id}?withoutAssets=true``.
|
||||
|
||||
Used as a cheap change-detection probe so we can skip the multi-MB
|
||||
full-asset fetch when nothing interesting has changed. Large albums
|
||||
(tens to hundreds of thousands of assets) would otherwise re-serialize
|
||||
the entire asset list on every poll interval.
|
||||
"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
asset_count: int
|
||||
updated_at: str
|
||||
shared: bool
|
||||
thumbnail_asset_id: str | None = None
|
||||
|
||||
@classmethod
|
||||
def from_api_response(cls, data: dict[str, Any]) -> ImmichAlbumMeta:
|
||||
return cls(
|
||||
id=data["id"],
|
||||
name=data.get("albumName", "Unnamed"),
|
||||
asset_count=int(data.get("assetCount", 0) or 0),
|
||||
updated_at=data.get("updatedAt", "") or "",
|
||||
shared=bool(data.get("shared", False)),
|
||||
thumbnail_asset_id=data.get("albumThumbnailAssetId"),
|
||||
)
|
||||
|
||||
def fingerprint(self) -> dict[str, Any]:
|
||||
"""Return a minimal serializable dict for persistence + equality checks.
|
||||
|
||||
We purposefully exclude ``id`` (known from the state row) and keep the
|
||||
dict flat so JSON round-trips are cheap and stable for equality.
|
||||
"""
|
||||
return {
|
||||
"updated_at": self.updated_at,
|
||||
"asset_count": self.asset_count,
|
||||
"shared": self.shared,
|
||||
"name": self.name,
|
||||
"thumbnail_asset_id": self.thumbnail_asset_id or "",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImmichAlbumData:
|
||||
"""Full album data from Immich API."""
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
@@ -11,13 +14,62 @@ from notify_bridge_core.models.events import ServiceEvent
|
||||
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
|
||||
from notify_bridge_core.templates.variables import TemplateVariableDefinition
|
||||
|
||||
from .change_detector import detect_album_changes
|
||||
from .asset_utils import asset_to_media
|
||||
from .change_detector import _MAX_ASSETS_PER_EVENT, detect_album_changes
|
||||
from .client import ImmichClient
|
||||
from .models import ImmichAlbumData
|
||||
from .models import ImmichAlbumData, ImmichAlbumMeta, ImmichAssetInfo
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Module-level users cache shared across ImmichServiceProvider instances.
|
||||
# Users change rarely (new people joining the server, display-name edits), so
|
||||
# refetching on every tracker's ``connect()`` is wasteful — a fleet of 10
|
||||
# trackers on the same Immich server otherwise issues 10 ``GET /api/users``
|
||||
# calls per poll cycle. TTL is conservative (1h) and a hashed key keeps the
|
||||
# raw api_key out of dict keys in case of a memory dump.
|
||||
_USERS_CACHE_TTL_SECONDS = 3600
|
||||
_users_cache_lock = asyncio.Lock()
|
||||
_users_cache: dict[str, tuple[float, dict[str, str]]] = {}
|
||||
|
||||
|
||||
def _users_cache_key(url: str, api_key: str) -> str:
|
||||
digest = hashlib.sha256(f"{url}|{api_key}".encode("utf-8")).hexdigest()
|
||||
return digest[:32]
|
||||
|
||||
|
||||
async def _get_cached_users(
|
||||
client: ImmichClient, url: str, api_key: str
|
||||
) -> dict[str, str]:
|
||||
"""Return ``{user_id: display_name}`` for the server, reusing cache entries
|
||||
whose TTL has not elapsed. Misses and stale hits fall through to a real
|
||||
fetch under a single lock so concurrent polls don't stampede the server.
|
||||
"""
|
||||
key = _users_cache_key(url, api_key)
|
||||
now = time.monotonic()
|
||||
entry = _users_cache.get(key)
|
||||
if entry is not None and (now - entry[0]) < _USERS_CACHE_TTL_SECONDS:
|
||||
return entry[1]
|
||||
|
||||
async with _users_cache_lock:
|
||||
# Re-check after acquiring the lock — another coroutine may have
|
||||
# refreshed the entry while we waited.
|
||||
entry = _users_cache.get(key)
|
||||
if entry is not None and (time.monotonic() - entry[0]) < _USERS_CACHE_TTL_SECONDS:
|
||||
return entry[1]
|
||||
fresh = await client.get_users()
|
||||
_users_cache[key] = (time.monotonic(), fresh)
|
||||
return fresh
|
||||
|
||||
|
||||
def invalidate_users_cache() -> None:
|
||||
"""Drop every cached users dict. Exposed for callers that mutate users
|
||||
(e.g. provider config changes, integration tests) and need the next
|
||||
``connect()`` to re-fetch.
|
||||
"""
|
||||
_users_cache.clear()
|
||||
|
||||
|
||||
# Immich-specific template variables
|
||||
IMMICH_VARIABLES: list[TemplateVariableDefinition] = [
|
||||
TemplateVariableDefinition(
|
||||
@@ -135,7 +187,9 @@ class ImmichServiceProvider(ServiceProvider):
|
||||
await self._client.get_server_config()
|
||||
if self._external_domain:
|
||||
self._client.external_domain = self._external_domain
|
||||
self._users_cache = await self._client.get_users()
|
||||
self._users_cache = await _get_cached_users(
|
||||
self._client, self._client.url, self._client.api_key,
|
||||
)
|
||||
return ok
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
@@ -150,9 +204,32 @@ class ImmichServiceProvider(ServiceProvider):
|
||||
new_state = dict(tracker_state)
|
||||
external_url = self._client.external_url
|
||||
|
||||
for album_id in collection_ids:
|
||||
album = await self._client.get_album(album_id, self._users_cache)
|
||||
if album is None:
|
||||
# Tick-scoped share-link cache. Populated lazily on first enrichment;
|
||||
# a tracker watching 5 albums with changes now issues 1 ``/api/shared-links``
|
||||
# request per tick instead of 5 (and the endpoint is server-wide — each
|
||||
# call was already fetching all links and discarding most of them).
|
||||
self._tick_shared_links: dict[str, list] | None = None
|
||||
|
||||
# Fan out the cheap meta probes in parallel. For a tracker that
|
||||
# watches 20 albums on the same Immich server this turns a 20-hop
|
||||
# serial wait into ~1 round-trip's worth of latency. aiohttp's
|
||||
# connection pool caps concurrency per host, so this can't stampede.
|
||||
meta_results = await asyncio.gather(
|
||||
*(self._client.get_album_meta(aid) for aid in collection_ids),
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
for album_id, meta_or_exc in zip(collection_ids, meta_results):
|
||||
if isinstance(meta_or_exc, BaseException):
|
||||
# Transient failure on this album — preserve existing state
|
||||
# and move on. Logging at warning so flaky albums surface in
|
||||
# the log without flooding on hard outages.
|
||||
_LOGGER.warning(
|
||||
"Meta probe failed for album %s: %s", album_id, meta_or_exc,
|
||||
)
|
||||
continue
|
||||
meta = meta_or_exc
|
||||
if meta is None:
|
||||
# Album deleted
|
||||
if album_id in new_state:
|
||||
from notify_bridge_core.models.events import EventType
|
||||
@@ -168,11 +245,80 @@ class ImmichServiceProvider(ServiceProvider):
|
||||
del new_state[album_id]
|
||||
continue
|
||||
|
||||
# Get previous state
|
||||
prev = new_state.get(album_id)
|
||||
prev_fingerprint = prev.get("meta_fingerprint") if prev else None
|
||||
has_pending = bool(prev and prev.get("pending_asset_ids"))
|
||||
|
||||
# 2) Fast-path: fingerprint match and no pending assets → no work.
|
||||
# We still refresh the fingerprint slot (no-op if identical) and
|
||||
# leave asset_ids untouched on disk.
|
||||
if (
|
||||
prev is not None
|
||||
and prev_fingerprint == meta.fingerprint()
|
||||
and not has_pending
|
||||
):
|
||||
continue
|
||||
|
||||
# 3) Decide: delta fetch (cheap, active-album case) or full
|
||||
# fetch (first tick + reconciliation for removals).
|
||||
old_fp = prev.get("meta_fingerprint") if prev else None
|
||||
old_asset_count = (old_fp or {}).get("asset_count", 0)
|
||||
old_updated_at = (old_fp or {}).get("updated_at", "")
|
||||
|
||||
# Gate for the delta path:
|
||||
# - must be tracked already (prev exists, has asset_ids)
|
||||
# - must have a prior timestamp (empty ⇒ migrated DB row)
|
||||
# - asset_count must not have decreased (removals need full fetch)
|
||||
can_delta = (
|
||||
prev is not None
|
||||
and bool(prev.get("asset_ids"))
|
||||
and bool(old_updated_at)
|
||||
and meta.asset_count >= old_asset_count
|
||||
)
|
||||
|
||||
if can_delta:
|
||||
delta_events = await self._poll_delta(
|
||||
album_id=album_id,
|
||||
prev=prev,
|
||||
new_meta=meta,
|
||||
old_updated_at=old_updated_at,
|
||||
)
|
||||
if delta_events is not None:
|
||||
events.extend(delta_events["events"])
|
||||
new_state[album_id] = delta_events["new_state"]
|
||||
continue
|
||||
# delta_events is None ⇒ delta saw more additions than the
|
||||
# net count increase (mixed add+remove) ⇒ fall through to
|
||||
# the full-fetch path so removals get detected.
|
||||
|
||||
# Full fetch: first tick, or count-decreased, or delta-unsafe.
|
||||
# Bypass the module-level album cache — this path runs when we
|
||||
# specifically need the current server state (e.g. to detect
|
||||
# asset removals), so a stale cached entry would silently delay
|
||||
# the event.
|
||||
album = await self._client.get_album(
|
||||
album_id, self._users_cache, use_cache=False,
|
||||
)
|
||||
if album is None:
|
||||
# Album was deleted between meta probe and full fetch — handle
|
||||
# the deletion the same way as above.
|
||||
if album_id in new_state:
|
||||
from notify_bridge_core.models.events import EventType
|
||||
from datetime import datetime, timezone
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.COLLECTION_DELETED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=self._name,
|
||||
collection_id=album_id,
|
||||
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
))
|
||||
del new_state[album_id]
|
||||
continue
|
||||
|
||||
if prev is None:
|
||||
# First time seeing this album — store state, no event
|
||||
new_state[album_id] = _serialize_album_state(album)
|
||||
new_state[album_id] = _serialize_album_state(album, meta)
|
||||
continue
|
||||
|
||||
# Reconstruct previous album data for comparison
|
||||
@@ -184,34 +330,233 @@ class ImmichServiceProvider(ServiceProvider):
|
||||
)
|
||||
|
||||
if detected_events:
|
||||
# Fetch shared links to enrich events with public_url
|
||||
shared_links = await self._client.get_shared_links(album_id)
|
||||
public_link = None
|
||||
protected_link = None
|
||||
for link in shared_links:
|
||||
if link.is_accessible and not link.is_expired:
|
||||
if link.has_password:
|
||||
protected_link = link
|
||||
else:
|
||||
public_link = link
|
||||
break # prefer non-password link
|
||||
|
||||
ext_domain = self._external_domain or self._client.external_url
|
||||
for evt in detected_events:
|
||||
if public_link:
|
||||
evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
|
||||
elif protected_link:
|
||||
evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
|
||||
|
||||
await self._enrich_with_shared_links(album_id, detected_events)
|
||||
events.extend(detected_events)
|
||||
|
||||
# Update state
|
||||
state = _serialize_album_state(album)
|
||||
state = _serialize_album_state(album, meta)
|
||||
state["pending_asset_ids"] = list(updated_pending)
|
||||
new_state[album_id] = state
|
||||
|
||||
return events, new_state
|
||||
|
||||
async def _poll_delta(
|
||||
self,
|
||||
*,
|
||||
album_id: str,
|
||||
prev: dict[str, Any],
|
||||
new_meta: ImmichAlbumMeta,
|
||||
old_updated_at: str,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Delta-fetch path for an active album.
|
||||
|
||||
Calls ``search/metadata`` with ``updatedAfter`` instead of pulling
|
||||
the full asset list. Returns a dict with ``events`` and ``new_state``
|
||||
on success, or ``None`` to signal the caller to retry via full fetch
|
||||
(used when a mixed add+remove is detected — the delta endpoint can't
|
||||
tell us *what* was removed, only that additions alone don't account
|
||||
for the net count change).
|
||||
|
||||
Trades strict detection of removals-during-mixed-changes for a
|
||||
drastic reduction in bytes fetched per tick. On a 200k-asset album
|
||||
where 50 were just added, we fetch ~50 asset records instead of
|
||||
200 000.
|
||||
"""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from notify_bridge_core.models.events import EventType
|
||||
|
||||
prev_asset_ids: set[str] = set(prev.get("asset_ids", []))
|
||||
prev_pending: set[str] = set(prev.get("pending_asset_ids", []))
|
||||
|
||||
raw_assets = await self._client.search_album_assets_updated_after(
|
||||
album_id, old_updated_at
|
||||
)
|
||||
|
||||
# Parse everything that came back. We need unprocessed entries too
|
||||
# (they feed the ``pending_asset_ids`` list used by the original
|
||||
# change detector's processed-later logic).
|
||||
delta_assets: list[ImmichAssetInfo] = []
|
||||
for raw in raw_assets:
|
||||
try:
|
||||
delta_assets.append(
|
||||
ImmichAssetInfo.from_api_response(raw, self._users_cache)
|
||||
)
|
||||
except Exception as err: # noqa: BLE001 — one bad record ≠ abort tick
|
||||
_LOGGER.warning(
|
||||
"Skipping malformed asset record in delta response: %s", err
|
||||
)
|
||||
|
||||
newly_added: list[ImmichAssetInfo] = []
|
||||
still_pending: set[str] = set()
|
||||
for asset in delta_assets:
|
||||
if asset.is_processed:
|
||||
if asset.id not in prev_asset_ids:
|
||||
newly_added.append(asset)
|
||||
else:
|
||||
still_pending.add(asset.id)
|
||||
|
||||
old_asset_count = int((prev.get("meta_fingerprint") or {}).get("asset_count", 0))
|
||||
net_change = new_meta.asset_count - old_asset_count
|
||||
|
||||
# If delta found more "added" assets than the net count change,
|
||||
# a concurrent removal happened. Full fetch is the only way to
|
||||
# know what was removed — bail out so the caller retries.
|
||||
if net_change >= 0 and len(newly_added) > net_change:
|
||||
_LOGGER.info(
|
||||
"Delta for album %s found %d additions but net change is %d "
|
||||
"— falling back to full fetch for removal reconciliation",
|
||||
album_id, len(newly_added), net_change,
|
||||
)
|
||||
return None
|
||||
|
||||
# Mirror case: positive net change we couldn't account for with the
|
||||
# delta results (possibly clock skew on ``updated_at``, or an asset
|
||||
# whose timestamp is before ``old_updated_at`` yet the album's
|
||||
# ``updatedAt`` bumped). Full fetch to avoid silently missing adds.
|
||||
if net_change > 0 and len(newly_added) < net_change:
|
||||
_LOGGER.info(
|
||||
"Delta for album %s found %d additions but net change is %d "
|
||||
"— falling back to full fetch to avoid missing assets",
|
||||
album_id, len(newly_added), net_change,
|
||||
)
|
||||
return None
|
||||
|
||||
events: list[ServiceEvent] = []
|
||||
now = datetime.now(timezone.utc)
|
||||
external_url = self._external_domain or self._client.external_url
|
||||
album_url = f"{external_url}/albums/{album_id}"
|
||||
|
||||
# Carry album-level attributes we know from the cheap meta probe.
|
||||
# Shared-link enrichment happens further down only if we emitted
|
||||
# any asset events.
|
||||
base_extra = {
|
||||
"album_url": album_url,
|
||||
"shared": new_meta.shared,
|
||||
"asset_count": new_meta.asset_count,
|
||||
"photo_count": 0, # unknown without per-asset scan; templates tolerate 0
|
||||
"video_count": 0,
|
||||
"people": [],
|
||||
"owner": "",
|
||||
}
|
||||
|
||||
# Metadata-only events (no asset fetch needed)
|
||||
old_fp = prev.get("meta_fingerprint") or {}
|
||||
if old_fp.get("name") and old_fp["name"] != new_meta.name:
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.COLLECTION_RENAMED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=self._name,
|
||||
collection_id=album_id,
|
||||
collection_name=new_meta.name,
|
||||
timestamp=now,
|
||||
added_assets=[],
|
||||
removed_asset_ids=[],
|
||||
added_count=0,
|
||||
removed_count=0,
|
||||
old_name=old_fp["name"],
|
||||
new_name=new_meta.name,
|
||||
extra=dict(base_extra),
|
||||
))
|
||||
|
||||
if "shared" in old_fp and bool(old_fp["shared"]) != bool(new_meta.shared):
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.SHARING_CHANGED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=self._name,
|
||||
collection_id=album_id,
|
||||
collection_name=new_meta.name,
|
||||
timestamp=now,
|
||||
added_assets=[],
|
||||
removed_asset_ids=[],
|
||||
added_count=0,
|
||||
removed_count=0,
|
||||
old_shared=bool(old_fp["shared"]),
|
||||
new_shared=bool(new_meta.shared),
|
||||
extra=dict(base_extra),
|
||||
))
|
||||
|
||||
if newly_added:
|
||||
total_added = len(newly_added)
|
||||
truncated = newly_added[:_MAX_ASSETS_PER_EVENT]
|
||||
media_assets = [
|
||||
asset_to_media(a, self._client.external_url) for a in truncated
|
||||
]
|
||||
extra = dict(base_extra)
|
||||
if total_added > _MAX_ASSETS_PER_EVENT:
|
||||
extra["truncated"] = True
|
||||
extra["shown_count"] = _MAX_ASSETS_PER_EVENT
|
||||
_LOGGER.info(
|
||||
"Delta-path truncated assets_added event for album %s: %d → %d",
|
||||
album_id, total_added, _MAX_ASSETS_PER_EVENT,
|
||||
)
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.ASSETS_ADDED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=self._name,
|
||||
collection_id=album_id,
|
||||
collection_name=new_meta.name,
|
||||
timestamp=now,
|
||||
added_assets=media_assets,
|
||||
removed_asset_ids=[],
|
||||
added_count=total_added,
|
||||
removed_count=0,
|
||||
extra=extra,
|
||||
))
|
||||
|
||||
if events:
|
||||
await self._enrich_with_shared_links(album_id, events)
|
||||
|
||||
# Rebuild state. asset_ids grows by the newly-added processed set.
|
||||
# pending is the union of the prior pending list (things still in
|
||||
# flight) and anything the delta confirmed as not-yet-processed.
|
||||
# When net_change is 0 or negative we trust the meta count over
|
||||
# our bookkeeping — skip-path will fix drift on the next full fetch.
|
||||
new_asset_ids = prev_asset_ids | {a.id for a in newly_added}
|
||||
# Discard any previously-pending IDs that just landed as processed.
|
||||
new_pending = (prev_pending | still_pending) - {a.id for a in newly_added}
|
||||
|
||||
return {
|
||||
"events": events,
|
||||
"new_state": {
|
||||
"name": new_meta.name,
|
||||
"asset_ids": list(new_asset_ids),
|
||||
"shared": new_meta.shared,
|
||||
"pending_asset_ids": list(new_pending),
|
||||
"meta_fingerprint": new_meta.fingerprint(),
|
||||
},
|
||||
}
|
||||
|
||||
async def _enrich_with_shared_links(
|
||||
self, album_id: str, events_to_enrich: list[ServiceEvent]
|
||||
) -> None:
|
||||
"""Attach public/protected share link URLs to events for this album.
|
||||
|
||||
Uses the tick-scoped bulk cache populated lazily on first call, so a
|
||||
tracker with changes across N albums makes one ``/api/shared-links``
|
||||
request per tick instead of N.
|
||||
"""
|
||||
if self._tick_shared_links is None:
|
||||
self._tick_shared_links = await self._client.get_all_shared_links_by_album()
|
||||
|
||||
shared_links = self._tick_shared_links.get(album_id, [])
|
||||
public_link = None
|
||||
protected_link = None
|
||||
for link in shared_links:
|
||||
if link.is_accessible and not link.is_expired:
|
||||
if link.has_password:
|
||||
protected_link = link
|
||||
else:
|
||||
public_link = link
|
||||
break # prefer non-password link
|
||||
|
||||
ext_domain = self._external_domain or self._client.external_url
|
||||
for evt in events_to_enrich:
|
||||
if public_link:
|
||||
evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
|
||||
elif protected_link:
|
||||
evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
|
||||
|
||||
def get_available_variables(self) -> list[TemplateVariableDefinition]:
|
||||
return list(IMMICH_VARIABLES)
|
||||
|
||||
@@ -262,13 +607,33 @@ class ImmichServiceProvider(ServiceProvider):
|
||||
return {"ok": False, "message": "Failed to connect to Immich"}
|
||||
|
||||
|
||||
def _serialize_album_state(album: ImmichAlbumData) -> dict[str, Any]:
|
||||
"""Serialize album state for persistence."""
|
||||
def _serialize_album_state(
|
||||
album: ImmichAlbumData,
|
||||
meta: ImmichAlbumMeta | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Serialize album state for persistence.
|
||||
|
||||
``meta`` carries the fingerprint used for cheap no-change detection on
|
||||
subsequent polls. When omitted (legacy callers, tests) we synthesize a
|
||||
best-effort fingerprint from the full album — it will still match on the
|
||||
next tick if nothing changed, which is what matters.
|
||||
"""
|
||||
if meta is None:
|
||||
fingerprint = {
|
||||
"updated_at": album.updated_at,
|
||||
"asset_count": len(album.asset_ids),
|
||||
"shared": album.shared,
|
||||
"name": album.name,
|
||||
"thumbnail_asset_id": album.thumbnail_asset_id or "",
|
||||
}
|
||||
else:
|
||||
fingerprint = meta.fingerprint()
|
||||
return {
|
||||
"name": album.name,
|
||||
"asset_ids": list(album.asset_ids),
|
||||
"shared": album.shared,
|
||||
"pending_asset_ids": [],
|
||||
"meta_fingerprint": fingerprint,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "notify-bridge-server"
|
||||
version = "0.2.5"
|
||||
version = "0.3.1"
|
||||
description = "Standalone Notify Bridge server — FastAPI REST API with SQLite database"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
|
||||
@@ -100,6 +100,13 @@ async def update_settings(
|
||||
if value is None:
|
||||
continue
|
||||
value_str = str(value)
|
||||
# GET masks the webhook secret as "***<last4>" so the real value is
|
||||
# never exposed to the frontend. If the client sends the mask back
|
||||
# (which happens on every save, since bind:value holds whatever GET
|
||||
# returned), treat it as "unchanged" — otherwise we'd overwrite the
|
||||
# real secret with its mask, silently breaking webhook HMAC.
|
||||
if key == "telegram_webhook_secret" and value_str.startswith("***"):
|
||||
continue
|
||||
row = await session.get(AppSetting, key)
|
||||
if row:
|
||||
row.value = value_str
|
||||
|
||||
@@ -41,6 +41,36 @@ _rate_limits: TTLCache = TTLCache(maxsize=10000, ttl=3600)
|
||||
# Maximum responses per command to avoid Telegram rate limits
|
||||
_MAX_RESPONSES_PER_COMMAND = 5
|
||||
|
||||
# Commands that fetch assets from the service provider and usually reply
|
||||
# with media — "uploading photo" is the accurate UX hint while we wait on
|
||||
# the provider API + Telegram upload.
|
||||
_UPLOAD_PHOTO_COMMANDS = frozenset({
|
||||
"latest", "random", "favorites", "memory",
|
||||
"search", "find", "person", "place",
|
||||
})
|
||||
|
||||
# Commands that fetch from the provider but reply with text only.
|
||||
# "typing" is accurate; we still want an indicator because the fetch is slow.
|
||||
_TYPING_COMMANDS = frozenset({"summary"})
|
||||
|
||||
|
||||
def classify_command_chat_action(text: str) -> str | None:
|
||||
"""Return the Telegram chat-action hint to show for this command, or None.
|
||||
|
||||
The classification is by command name alone — good enough for the
|
||||
cases where a chat action is worthwhile (slow provider fetches). Fast
|
||||
DB-only commands (``/status``, ``/albums``, ``/events``, ``/people``)
|
||||
return ``None`` and skip the indicator entirely.
|
||||
"""
|
||||
cmd, _, _ = parse_command(text)
|
||||
if not cmd:
|
||||
return None
|
||||
if cmd in _UPLOAD_PHOTO_COMMANDS:
|
||||
return "upload_photo"
|
||||
if cmd in _TYPING_COMMANDS:
|
||||
return "typing"
|
||||
return None
|
||||
|
||||
|
||||
def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int]) -> int | None:
|
||||
"""Check rate limit. Returns seconds to wait, or None if OK."""
|
||||
@@ -367,20 +397,23 @@ async def send_reply(
|
||||
bot_token: str, chat_id: str, text: str, reply_to_message_id: int | None = None,
|
||||
session: aiohttp.ClientSession | None = None,
|
||||
) -> None:
|
||||
"""Send a text reply via TelegramClient.
|
||||
"""Send a text reply to a chat.
|
||||
|
||||
Command responses are listings (albums, people, events, ...) that embed
|
||||
multiple links; Telegram's default behavior of rendering a preview of
|
||||
the first URL is almost never what the user wants and clashes with the
|
||||
"Disable link previews" toggle operators set on their Telegram target.
|
||||
We always pass ``disable_web_page_preview=True`` here.
|
||||
Thin wrapper that goes through the single ``services.telegram_send``
|
||||
entry point so commands and notifications share one routine — same
|
||||
HTTP session pool, same file_id caches.
|
||||
|
||||
Command responses are listings (albums, people, events, ...) that
|
||||
embed multiple links; Telegram's default behavior of rendering a
|
||||
preview of the first URL is almost never what the user wants and
|
||||
clashes with the "Disable link previews" toggle operators set on
|
||||
their Telegram target. We always pass
|
||||
``disable_web_page_preview=True`` here.
|
||||
"""
|
||||
if session is None:
|
||||
from ..services.http_session import get_http_session
|
||||
session = await get_http_session()
|
||||
client = TelegramClient(session, bot_token)
|
||||
result = await client.send_message(
|
||||
chat_id, text,
|
||||
from ..services.telegram_send import send_telegram_message
|
||||
|
||||
result = await send_telegram_message(
|
||||
bot_token, chat_id, text,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
disable_web_page_preview=True,
|
||||
)
|
||||
@@ -393,30 +426,28 @@ async def send_media_group(
|
||||
reply_to_message_id: int | None = None,
|
||||
session: aiohttp.ClientSession | None = None,
|
||||
) -> None:
|
||||
"""Send media items via TelegramClient.send_notification."""
|
||||
"""Send media items via the shared Telegram routine.
|
||||
|
||||
``media_items`` must already be in TelegramClient asset format — each
|
||||
entry contains ``type`` (``"photo"``/``"video"``/``"document"``),
|
||||
``url``, optional ``cache_key``, and optional ``headers``. Provider
|
||||
command handlers build this format via
|
||||
``build_telegram_asset_entry`` — the same helper the notification
|
||||
dispatcher uses — so videos keep their ``"video"`` type and point at
|
||||
a real video URL instead of a still thumbnail.
|
||||
|
||||
Uses ``services.telegram_send.send_telegram_media`` so the URL cache
|
||||
and asset cache are wired in exactly like the notification path.
|
||||
Repeated ``/latest`` / ``/random`` commands that match previously-sent
|
||||
assets hit the cache and skip the re-upload.
|
||||
"""
|
||||
if not media_items:
|
||||
return
|
||||
|
||||
# Convert command handler media format to TelegramClient asset format
|
||||
assets = []
|
||||
for item in media_items:
|
||||
assets.append({
|
||||
"type": "photo",
|
||||
"url": item.get("thumbnail_url", ""),
|
||||
"cache_key": item.get("asset_id", ""),
|
||||
"headers": {"x-api-key": item.get("api_key", "")},
|
||||
})
|
||||
from ..services.telegram_send import send_telegram_media
|
||||
|
||||
# Build caption from first item
|
||||
captions = [item.get("caption", "") for item in media_items if item.get("caption")]
|
||||
caption = "\n".join(captions) if captions else None
|
||||
|
||||
if session is None:
|
||||
from ..services.http_session import get_http_session
|
||||
session = await get_http_session()
|
||||
client = TelegramClient(session, bot_token)
|
||||
result = await client.send_notification(
|
||||
chat_id, assets=assets, caption=caption,
|
||||
result = await send_telegram_media(
|
||||
bot_token, chat_id, media_items,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
chat_action=None,
|
||||
)
|
||||
|
||||
@@ -6,7 +6,11 @@ import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from notify_bridge_core.providers.immich.asset_utils import get_public_url
|
||||
from notify_bridge_core.notifications.telegram.media import build_telegram_asset_entry
|
||||
from notify_bridge_core.providers.immich.asset_utils import (
|
||||
build_asset_media_urls,
|
||||
get_public_url,
|
||||
)
|
||||
|
||||
from ..handler import _render_cmd_template
|
||||
|
||||
@@ -74,13 +78,16 @@ def build_asset_dict(
|
||||
) -> dict[str, Any]:
|
||||
"""Build a rich asset dict for command templates from an ImmichAssetInfo or raw dict."""
|
||||
if isinstance(asset, dict):
|
||||
# Immich raw search responses nest geo under exifInfo — pull it out so
|
||||
# templates can use flat asset.city / asset.country.
|
||||
exif = asset.get("exifInfo") or {}
|
||||
d = {
|
||||
"id": asset.get("id", ""),
|
||||
"originalFileName": asset.get("originalFileName", asset.get("filename", "")),
|
||||
"type": asset.get("type", "IMAGE"),
|
||||
"createdAt": asset.get("createdAt", asset.get("created_at", asset.get("fileCreatedAt", ""))),
|
||||
"city": asset.get("city", ""),
|
||||
"country": asset.get("country", ""),
|
||||
"city": asset.get("city") or exif.get("city") or "",
|
||||
"country": asset.get("country") or exif.get("country") or "",
|
||||
"is_favorite": asset.get("is_favorite", asset.get("isFavorite", False)),
|
||||
"public_url": asset.get("public_url", public_url),
|
||||
}
|
||||
@@ -123,16 +130,32 @@ def _format_assets(
|
||||
})
|
||||
|
||||
if response_mode == "media":
|
||||
# Reuse the same URL rule (build_asset_media_urls) and entry builder
|
||||
# (build_telegram_asset_entry) as the notification dispatcher so both
|
||||
# paths agree on video → /video/playback and photo → thumbnail. When
|
||||
# these diverged, Telegram rendered a still JPEG for each video in
|
||||
# the media group instead of the real clip.
|
||||
#
|
||||
# We deliberately do NOT pass ``cache_key`` here. TelegramClient
|
||||
# derives it from the URL as ``<host>:<uuid>`` — identical to what
|
||||
# the notification dispatcher produces via extract_asset_id_from_url.
|
||||
# Passing the bare UUID would put command writes in a separate
|
||||
# namespace from notification writes, so neither path could hit the
|
||||
# other's cached file_ids (which is what made the cache look empty
|
||||
# from the WebUI after running /random).
|
||||
media_items: list[dict[str, Any]] = []
|
||||
for asset in assets:
|
||||
asset_id = asset.get("id", "")
|
||||
media_items.append({
|
||||
"type": "photo",
|
||||
"asset_id": asset_id,
|
||||
"caption": "",
|
||||
"thumbnail_url": f"{client.url}/api/assets/{asset_id}/thumbnail?size=preview",
|
||||
"api_key": client.api_key,
|
||||
})
|
||||
asset_type = (asset.get("type") or "").upper()
|
||||
preview_url, _ = build_asset_media_urls(client.url, asset_id, asset_type)
|
||||
entry = build_telegram_asset_entry(
|
||||
url=preview_url,
|
||||
media_type="video" if asset_type == "VIDEO" else "image",
|
||||
api_key=client.api_key,
|
||||
internal_url=client.url,
|
||||
)
|
||||
if entry is not None:
|
||||
media_items.append(entry)
|
||||
# Return text message + media items — text is sent first, media as reply
|
||||
return {"text": text, "media": media_items}
|
||||
|
||||
|
||||
@@ -5,17 +5,14 @@ from __future__ import annotations
|
||||
from typing import Any
|
||||
|
||||
from ..handler import _render_cmd_template
|
||||
from .common import _format_assets
|
||||
from .common import _format_assets, build_asset_dict
|
||||
|
||||
|
||||
def _enrich_assets(assets: list[dict[str, Any]], asset_public_urls: dict[str, str]) -> list[dict[str, Any]]:
|
||||
"""Add public_url to assets from the pre-built map. Returns new list without mutating inputs."""
|
||||
if not asset_public_urls:
|
||||
return assets
|
||||
"""Normalize raw Immich assets and attach public_url from the pre-built map."""
|
||||
pub = asset_public_urls or {}
|
||||
return [
|
||||
{**asset, "public_url": asset_public_urls.get(asset.get("id", ""), "")}
|
||||
if asset.get("id", "") in asset_public_urls and not asset.get("public_url")
|
||||
else asset
|
||||
build_asset_dict(asset, public_url=pub.get(asset.get("id", ""), ""))
|
||||
for asset in assets
|
||||
]
|
||||
|
||||
|
||||
@@ -15,8 +15,9 @@ from notify_bridge_core.notifications.telegram.client import TelegramClient
|
||||
from ..database.engine import get_session
|
||||
from ..database.models import TelegramBot, TelegramChat
|
||||
from ..services.telegram import save_chat_from_webhook
|
||||
from ..services.telegram_send import telegram_chat_action
|
||||
from .base import CommandResponse
|
||||
from .handler import handle_command, send_media_group, send_reply
|
||||
from .handler import classify_command_chat_action, handle_command, send_media_group, send_reply
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -95,14 +96,17 @@ async def telegram_webhook(
|
||||
return {"ok": True, "skipped": "commands_disabled"}
|
||||
effective_lang = chat_row.language_override or msg_language
|
||||
message_id = message.get("message_id")
|
||||
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
return {"ok": True}
|
||||
async with telegram_chat_action(
|
||||
bot_token, chat_id, classify_command_chat_action(text),
|
||||
):
|
||||
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
return {"ok": True}
|
||||
|
||||
return {"ok": True, "skipped": "not_a_command"}
|
||||
|
||||
|
||||
@@ -309,6 +309,14 @@ async def migrate_schema(engine: AsyncEngine) -> None:
|
||||
text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0")
|
||||
)
|
||||
logger.info("Added shared column to %s table", state_table)
|
||||
# meta_fingerprint — small JSON blob captured from the provider's
|
||||
# cheap meta probe. An empty default means "unknown, do a full
|
||||
# fetch next tick" so existing rows don't wrongly skip detection.
|
||||
if not await _has_column(conn, state_table, "meta_fingerprint"):
|
||||
await conn.execute(
|
||||
text(f"ALTER TABLE {state_table} ADD COLUMN meta_fingerprint TEXT DEFAULT '{{}}'")
|
||||
)
|
||||
logger.info("Added meta_fingerprint column to %s table", state_table)
|
||||
|
||||
# Add language_code to telegram_chat if missing
|
||||
if await _has_table(conn, "telegram_chat"):
|
||||
|
||||
@@ -376,6 +376,13 @@ class NotificationTrackerState(SQLModel, table=True):
|
||||
shared: bool = Field(default=False)
|
||||
asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
||||
pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
||||
# Lightweight fingerprint ({updated_at, asset_count, shared, name, ...})
|
||||
# captured from the provider's cheap meta probe. Letting this differ from
|
||||
# the current provider response is what tells the watcher a full fetch is
|
||||
# actually required — letting it match lets the watcher skip the big read.
|
||||
meta_fingerprint: dict[str, Any] = Field(
|
||||
default_factory=dict, sa_column=Column(JSON)
|
||||
)
|
||||
last_updated: datetime = Field(default_factory=_utcnow)
|
||||
|
||||
|
||||
|
||||
@@ -10,6 +10,49 @@ _LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_scheduler: AsyncIOScheduler | None = None
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Adaptive polling (Tier 6 of the big-album optimization plan).
|
||||
#
|
||||
# We don't touch the user-configured ``scan_interval`` — that's still the
|
||||
# authoritative cadence. Instead, we *skip* a growing fraction of scheduled
|
||||
# ticks when a tracker is idle, and reset to 1:1 as soon as it detects
|
||||
# anything. The scheduler keeps running on the user's chosen period, so
|
||||
# response time to the *first* change after an idle stretch is never worse
|
||||
# than one tick — but the steady-state HTTP cost for a fleet of idle
|
||||
# trackers drops by ~75%.
|
||||
#
|
||||
# Thresholds are intentionally conservative: a tracker polling every 30 s
|
||||
# needs 5 min of silence before we halve its effective rate, and 15 min
|
||||
# before we quarter it. Any caller can disable adaptive behavior by passing
|
||||
# ``adaptive=False`` in the tracker filters dict (checked in ``_poll_tracker``).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2
|
||||
_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4
|
||||
_ADAPTIVE_MAX_SKIP = 4 # hard cap on skip factor
|
||||
|
||||
# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
|
||||
# restart — a short warmup period is fine and avoids persisting what is
|
||||
# effectively a performance heuristic.
|
||||
_adaptive_state: dict[int, dict[str, int]] = {}
|
||||
|
||||
|
||||
def _compute_jitter(interval_seconds: int) -> int:
|
||||
"""Return a jitter bound (in seconds) suitable for an IntervalTrigger.
|
||||
|
||||
Without jitter, a fleet of N trackers all on ``scan_interval=60`` wake up
|
||||
at the same wall-clock second every minute — that creates a thundering-
|
||||
herd on the upstream Immich/Gitea/etc. server. APScheduler's ``jitter``
|
||||
randomizes each tick's firing time by ±jitter seconds.
|
||||
|
||||
We use a quarter of the interval up to a 30 s cap. For short intervals
|
||||
(≤8 s) jitter would round to 0 — that's fine, at those cadences a
|
||||
bursty pattern is what the user implicitly opted into.
|
||||
"""
|
||||
if interval_seconds <= 0:
|
||||
return 0
|
||||
return min(interval_seconds // 4, 30)
|
||||
|
||||
|
||||
def get_scheduler() -> AsyncIOScheduler:
|
||||
global _scheduler
|
||||
@@ -271,16 +314,21 @@ async def _load_tracker_jobs() -> None:
|
||||
tracker.id, tracker.name, e,
|
||||
)
|
||||
|
||||
jitter = _compute_jitter(tracker.scan_interval)
|
||||
scheduler.add_job(
|
||||
_poll_tracker,
|
||||
"interval",
|
||||
seconds=tracker.scan_interval,
|
||||
jitter=jitter or None,
|
||||
id=job_id,
|
||||
args=[tracker.id],
|
||||
replace_existing=True,
|
||||
max_instances=1,
|
||||
)
|
||||
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
|
||||
_LOGGER.info(
|
||||
"Scheduled tracker %d (%s) every %ds (jitter ±%ds)",
|
||||
tracker.id, tracker.name, tracker.scan_interval, jitter,
|
||||
)
|
||||
|
||||
|
||||
def _add_cron_job(
|
||||
@@ -313,6 +361,10 @@ async def schedule_tracker(
|
||||
scheduler = get_scheduler()
|
||||
job_id = f"tracker_{tracker_id}"
|
||||
|
||||
# A reschedule typically follows a config edit or enable/disable flip —
|
||||
# drop adaptive back-off so the first tick after the change runs promptly.
|
||||
reset_adaptive_state(tracker_id)
|
||||
|
||||
# Remove existing job first to allow trigger type changes
|
||||
if scheduler.get_job(job_id):
|
||||
scheduler.remove_job(job_id)
|
||||
@@ -324,33 +376,113 @@ async def schedule_tracker(
|
||||
except Exception as e:
|
||||
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
|
||||
|
||||
jitter = _compute_jitter(interval)
|
||||
scheduler.add_job(
|
||||
_poll_tracker,
|
||||
"interval",
|
||||
seconds=interval,
|
||||
jitter=jitter or None,
|
||||
id=job_id,
|
||||
args=[tracker_id],
|
||||
replace_existing=True,
|
||||
)
|
||||
_LOGGER.info("Scheduled tracker %d every %ds", tracker_id, interval)
|
||||
_LOGGER.info(
|
||||
"Scheduled tracker %d every %ds (jitter ±%ds)", tracker_id, interval, jitter,
|
||||
)
|
||||
|
||||
|
||||
async def unschedule_tracker(tracker_id: int) -> None:
|
||||
"""Remove a scheduler job for a tracker."""
|
||||
scheduler = get_scheduler()
|
||||
job_id = f"tracker_{tracker_id}"
|
||||
reset_adaptive_state(tracker_id)
|
||||
if scheduler.get_job(job_id):
|
||||
scheduler.remove_job(job_id)
|
||||
_LOGGER.info("Unscheduled tracker %d", tracker_id)
|
||||
|
||||
|
||||
def _adaptive_should_skip(tracker_id: int) -> bool:
|
||||
"""Return True when the adaptive heuristic says to skip this tick.
|
||||
|
||||
Run-length skip: if we're in 1-in-K mode, skip (K-1) ticks between each
|
||||
real poll. Stateless about the *current* tick counter except for the
|
||||
``tick_counter`` we bump here.
|
||||
"""
|
||||
state = _adaptive_state.get(tracker_id)
|
||||
if not state:
|
||||
return False
|
||||
skip_every = state.get("skip_every", 1)
|
||||
if skip_every <= 1:
|
||||
return False
|
||||
state["tick_counter"] = state.get("tick_counter", 0) + 1
|
||||
# Fire on ticks where counter % skip_every == 0; skip the rest.
|
||||
return (state["tick_counter"] % skip_every) != 0
|
||||
|
||||
|
||||
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
|
||||
"""Update the adaptive counter after a real tick ran."""
|
||||
state = _adaptive_state.setdefault(
|
||||
tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
|
||||
)
|
||||
if events_detected > 0:
|
||||
if state["skip_every"] > 1:
|
||||
_LOGGER.info(
|
||||
"Adaptive polling: tracker %d saw activity, restoring base rate",
|
||||
tracker_id,
|
||||
)
|
||||
state["empty_count"] = 0
|
||||
state["skip_every"] = 1
|
||||
state["tick_counter"] = 0
|
||||
return
|
||||
|
||||
state["empty_count"] = state.get("empty_count", 0) + 1
|
||||
if (
|
||||
state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD
|
||||
and state["skip_every"] < _ADAPTIVE_MAX_SKIP
|
||||
):
|
||||
state["skip_every"] = _ADAPTIVE_MAX_SKIP
|
||||
_LOGGER.info(
|
||||
"Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4",
|
||||
tracker_id, state["empty_count"],
|
||||
)
|
||||
elif (
|
||||
state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD
|
||||
and state["skip_every"] < 2
|
||||
):
|
||||
state["skip_every"] = 2
|
||||
_LOGGER.info(
|
||||
"Adaptive polling: tracker %d idle for %d ticks, skipping every other",
|
||||
tracker_id, state["empty_count"],
|
||||
)
|
||||
|
||||
|
||||
def reset_adaptive_state(tracker_id: int) -> None:
|
||||
"""Drop cached adaptive counters for a tracker.
|
||||
|
||||
Used by API callers that make changes requiring the tracker to run
|
||||
promptly on the next scheduled tick (enable/disable, config edits,
|
||||
manual "check now" actions).
|
||||
"""
|
||||
_adaptive_state.pop(tracker_id, None)
|
||||
|
||||
|
||||
async def _poll_tracker(tracker_id: int) -> None:
|
||||
"""Poll a tracker for changes."""
|
||||
from .watcher import check_tracker
|
||||
|
||||
if _adaptive_should_skip(tracker_id):
|
||||
return
|
||||
|
||||
try:
|
||||
await check_tracker(tracker_id)
|
||||
result = await check_tracker(tracker_id)
|
||||
except Exception as e:
|
||||
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
|
||||
return
|
||||
|
||||
# Treat the "error" / "skipped" statuses as inconclusive — don't let
|
||||
# a transient upstream failure trick the heuristic into backing off.
|
||||
if isinstance(result, dict) and result.get("status") == "ok":
|
||||
_adaptive_update(tracker_id, int(result.get("events_detected", 0) or 0))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -257,7 +257,13 @@ async def _poll_bot(bot_id: int) -> None:
|
||||
_last_update_id[bot_id] = updates[-1]["update_id"]
|
||||
|
||||
# Process each update
|
||||
from ..commands.handler import handle_command, send_media_group, send_reply
|
||||
from ..commands.handler import (
|
||||
classify_command_chat_action,
|
||||
handle_command,
|
||||
send_media_group,
|
||||
send_reply,
|
||||
)
|
||||
from .telegram_send import telegram_chat_action
|
||||
|
||||
for update in updates:
|
||||
message = update.get("message")
|
||||
@@ -295,13 +301,16 @@ async def _poll_bot(bot_id: int) -> None:
|
||||
continue
|
||||
effective_lang = chat_row.language_override or msg_language
|
||||
message_id = message.get("message_id")
|
||||
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
async with telegram_chat_action(
|
||||
bot_token, chat_id, classify_command_chat_action(text),
|
||||
):
|
||||
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
except Exception:
|
||||
_LOGGER.error("Error handling command from bot %d", bot_id, exc_info=True)
|
||||
|
||||
|
||||
@@ -0,0 +1,149 @@
|
||||
"""Single entry point for all Telegram send operations.
|
||||
|
||||
Both the notification dispatcher (event-driven) and the bot command
|
||||
handlers (user-driven) funnel their Telegram API calls through this
|
||||
module. Keeping construction in one place means:
|
||||
|
||||
* The shared aiohttp session is always reused (one TCP pool for the
|
||||
whole process).
|
||||
* The Telegram file_id caches (URL cache + asset cache) are always
|
||||
wired in, so repeated sends — whether from a scheduled tracker or
|
||||
a ``/latest`` command — reuse cached file_ids instead of re-uploading
|
||||
the same bytes.
|
||||
* Future cross-cutting concerns (rate limiting, telemetry, retries)
|
||||
have exactly one place to live.
|
||||
|
||||
The actual Telegram API routine is still ``TelegramClient`` in core —
|
||||
this module just guarantees every caller gets a properly-wired client.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
from typing import Any, AsyncIterator, Callable
|
||||
|
||||
import aiohttp
|
||||
|
||||
from notify_bridge_core.notifications.telegram.client import (
|
||||
NotificationResult,
|
||||
TelegramClient,
|
||||
)
|
||||
|
||||
from .http_session import get_http_session
|
||||
from .watcher import _get_telegram_caches
|
||||
|
||||
|
||||
async def get_telegram_client(
|
||||
bot_token: str,
|
||||
*,
|
||||
session: aiohttp.ClientSession | None = None,
|
||||
thumbhash_resolver: Callable[[str], str | None] | None = None,
|
||||
) -> TelegramClient:
|
||||
"""Return a ``TelegramClient`` wired to shared session + shared caches.
|
||||
|
||||
Every Telegram send in the process should acquire its client from
|
||||
here — constructing ``TelegramClient`` directly skips the caches and
|
||||
silently halves cache hit rate.
|
||||
|
||||
Args:
|
||||
bot_token: The bot's API token.
|
||||
session: Optional explicit aiohttp session. Defaults to the
|
||||
process-wide shared session.
|
||||
thumbhash_resolver: Optional asset-id → thumbhash lookup. The
|
||||
notification dispatcher passes one so asset-cache entries
|
||||
invalidate on visual change; the command path doesn't need it
|
||||
(commands always ask for a fresh result).
|
||||
"""
|
||||
if session is None:
|
||||
session = await get_http_session()
|
||||
url_cache, asset_cache = await _get_telegram_caches()
|
||||
return TelegramClient(
|
||||
session, bot_token,
|
||||
url_cache=url_cache,
|
||||
asset_cache=asset_cache,
|
||||
thumbhash_resolver=thumbhash_resolver,
|
||||
)
|
||||
|
||||
|
||||
async def send_telegram_message(
|
||||
bot_token: str,
|
||||
chat_id: str,
|
||||
text: str,
|
||||
*,
|
||||
reply_to_message_id: int | None = None,
|
||||
disable_web_page_preview: bool = True,
|
||||
parse_mode: str = "HTML",
|
||||
) -> NotificationResult:
|
||||
"""Send a plain-text Telegram message with caches wired in."""
|
||||
client = await get_telegram_client(bot_token)
|
||||
return await client.send_message(
|
||||
chat_id, text,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
disable_web_page_preview=disable_web_page_preview,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
|
||||
|
||||
async def send_telegram_media(
|
||||
bot_token: str,
|
||||
chat_id: str,
|
||||
assets: list[dict[str, Any]],
|
||||
*,
|
||||
caption: str | None = None,
|
||||
reply_to_message_id: int | None = None,
|
||||
max_group_size: int = 10,
|
||||
chunk_delay: int = 0,
|
||||
max_asset_data_size: int | None = None,
|
||||
send_large_photos_as_documents: bool = False,
|
||||
chat_action: str | None = "typing",
|
||||
thumbhash_resolver: Callable[[str], str | None] | None = None,
|
||||
) -> NotificationResult:
|
||||
"""Send a Telegram media group (or single asset) with caches wired in.
|
||||
|
||||
``assets`` must be in ``TelegramClient`` format — see
|
||||
``notify_bridge_core.notifications.telegram.media.build_telegram_asset_entry``
|
||||
for the canonical builder.
|
||||
"""
|
||||
client = await get_telegram_client(
|
||||
bot_token, thumbhash_resolver=thumbhash_resolver,
|
||||
)
|
||||
return await client.send_notification(
|
||||
chat_id,
|
||||
assets=assets,
|
||||
caption=caption,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
max_group_size=max_group_size,
|
||||
chunk_delay=chunk_delay,
|
||||
max_asset_data_size=max_asset_data_size,
|
||||
send_large_photos_as_documents=send_large_photos_as_documents,
|
||||
chat_action=chat_action,
|
||||
)
|
||||
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def telegram_chat_action(
|
||||
bot_token: str,
|
||||
chat_id: str,
|
||||
action: str | None,
|
||||
) -> AsyncIterator[None]:
|
||||
"""Hold a Telegram chat action (e.g. ``upload_photo``) for the block's duration.
|
||||
|
||||
Used by the command path to show ``typing`` / ``uploading photo`` while
|
||||
the bot fetches assets from the service (Immich, etc.) AND uploads them
|
||||
to Telegram — i.e. for the whole user-visible wait, not just the upload.
|
||||
|
||||
A ``None`` action makes this a no-op so callers don't have to branch.
|
||||
"""
|
||||
if not action:
|
||||
yield
|
||||
return
|
||||
|
||||
client = await get_telegram_client(bot_token)
|
||||
task = client.start_chat_action_keepalive(chat_id, action)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
@@ -187,8 +187,17 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||
"asset_ids": s.asset_ids,
|
||||
"pending_asset_ids": s.pending_asset_ids,
|
||||
"shared": bool(s.shared),
|
||||
"meta_fingerprint": s.meta_fingerprint or {},
|
||||
}
|
||||
|
||||
# Snapshot the original fingerprint per collection so we can skip the
|
||||
# (expensive) asset_ids rewrite when nothing changed. For a 200k-asset
|
||||
# album this avoids a ~7 MB JSON write to the state row every tick.
|
||||
original_fingerprints: dict[str, dict[str, Any]] = {
|
||||
cid: dict(cstate.get("meta_fingerprint") or {})
|
||||
for cid, cstate in state_dict.items()
|
||||
}
|
||||
|
||||
# Load tracker-target links
|
||||
link_data = await load_link_data(session, tracker_id)
|
||||
|
||||
@@ -279,11 +288,20 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||
existing = s
|
||||
break
|
||||
|
||||
current_fingerprint = dict(cstate.get("meta_fingerprint") or {})
|
||||
prior_fingerprint = original_fingerprints.get(cid, {})
|
||||
# Skip the DB update when the provider reported no meaningful
|
||||
# change. ``existing`` is None on first-ever fetch for a
|
||||
# collection — that path always writes so the row gets created.
|
||||
if existing is not None and current_fingerprint == prior_fingerprint:
|
||||
continue
|
||||
|
||||
if existing:
|
||||
existing.asset_ids = cstate.get("asset_ids", [])
|
||||
existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
|
||||
existing.collection_name = cstate.get("name", "")
|
||||
existing.shared = cstate.get("shared", False)
|
||||
existing.meta_fingerprint = current_fingerprint
|
||||
session.add(existing)
|
||||
else:
|
||||
new_ts = NotificationTrackerState(
|
||||
@@ -293,6 +311,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||
shared=cstate.get("shared", False),
|
||||
asset_ids=cstate.get("asset_ids", []),
|
||||
pending_asset_ids=cstate.get("pending_asset_ids", []),
|
||||
meta_fingerprint=current_fingerprint,
|
||||
)
|
||||
session.add(new_ts)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user