Files
notify-bridge/packages/server/src/notify_bridge_server/commands/immich/common.py
T
alexei.dolgolyov 7dae68fd93 fix(commands): match notification cache-key format so writes share one namespace
common._format_assets was passing cache_key=<bare asset UUID>, but the
notification dispatcher writes keys as <host>:<uuid> (derived from the
URL by extract_asset_id_from_url). Result: the two paths populated
different keys for the same asset, so neither could hit the other's
cached file_id and the WebUI stats only ever reflected the notification
side.

Drop the explicit cache_key — TelegramClient derives <host>:<uuid> from
the URL, identical to the notification path, so one file_id cached by
any dispatch or /random / /latest reply is reused by every later send.
2026-04-22 17:00:07 +03:00

163 lines
6.3 KiB
Python

"""Shared helpers, imports, and constants for Immich command handlers."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from notify_bridge_core.notifications.telegram.media import build_telegram_asset_entry
from notify_bridge_core.providers.immich.asset_utils import (
build_asset_media_urls,
get_public_url,
)
from ..handler import _render_cmd_template
_LOGGER = logging.getLogger(__name__)
_IMMICH_COMMANDS = {
"status", "albums", "events", "people",
"search", "find", "person", "place",
"latest", "random", "favorites", "summary", "memory",
}
async def fetch_albums_with_links(
client: Any,
album_ids: list[str],
ext_domain: str,
*,
include_failed: bool = True,
) -> list[dict[str, Any]]:
"""Fetch albums and their shared links concurrently.
Returns a list of album data dicts with keys: name, asset_count, id,
public_url, and ``_album`` (the raw album object for callers that need
asset-level access).
When *include_failed* is True, albums that fail to fetch are included
with placeholder data (``"?"`` for counts). When False, they are
silently skipped.
"""
album_results = await asyncio.gather(
*[client.get_album(aid) for aid in album_ids],
return_exceptions=True,
)
link_results = await asyncio.gather(
*[client.get_shared_links(aid) for aid in album_ids],
return_exceptions=True,
)
albums_data: list[dict[str, Any]] = []
for album_id, result, links in zip(album_ids, album_results, link_results):
if isinstance(result, Exception):
_LOGGER.warning("Failed to fetch album %s: %s", album_id, result)
if include_failed:
albums_data.append({
"name": f"{album_id[:8]}...", "asset_count": "?",
"id": album_id, "public_url": "", "_album": None,
})
continue
if result:
pub_url = ""
if not isinstance(links, Exception) and ext_domain:
pub_url = get_public_url(ext_domain, links) or ""
albums_data.append({
"name": result.name, "asset_count": result.asset_count,
"id": album_id, "public_url": pub_url, "_album": result,
})
return albums_data
def build_asset_dict(
asset: Any,
*,
public_url: str = "",
year: int | None = None,
) -> dict[str, Any]:
"""Build a rich asset dict for command templates from an ImmichAssetInfo or raw dict."""
if isinstance(asset, dict):
# Immich raw search responses nest geo under exifInfo — pull it out so
# templates can use flat asset.city / asset.country.
exif = asset.get("exifInfo") or {}
d = {
"id": asset.get("id", ""),
"originalFileName": asset.get("originalFileName", asset.get("filename", "")),
"type": asset.get("type", "IMAGE"),
"createdAt": asset.get("createdAt", asset.get("created_at", asset.get("fileCreatedAt", ""))),
"city": asset.get("city") or exif.get("city") or "",
"country": asset.get("country") or exif.get("country") or "",
"is_favorite": asset.get("is_favorite", asset.get("isFavorite", False)),
"public_url": asset.get("public_url", public_url),
}
if year or asset.get("year"):
d["year"] = year or asset.get("year")
return d
# ImmichAssetInfo dataclass
return {
"id": asset.id,
"originalFileName": asset.filename,
"type": asset.type,
"createdAt": asset.created_at,
"city": getattr(asset, "city", "") or "",
"country": getattr(asset, "country", "") or "",
"is_favorite": getattr(asset, "is_favorite", False),
"public_url": public_url,
**({"year": year} if year else {}),
}
def _format_assets(
assets: list[dict[str, Any]], cmd: str, query: str,
locale: str, response_mode: str, client: Any,
cmd_templates: dict[str, dict[str, str]],
) -> str | dict[str, Any]:
"""Format asset results as text or a text-plus-media payload.
Returns:
str: rendered text when *response_mode* is ``"text"`` (or no assets).
dict: ``{"text": ..., "media": [...]}`` when *response_mode* is
``"media"`` and assets are present.
"""
if not assets:
return _render_cmd_template(cmd_templates, "no_results", locale, {"command": cmd, "query": query})
slot_map = {"find": "search", "person": "search", "place": "search"}
slot_name = slot_map.get(cmd, cmd)
text = _render_cmd_template(cmd_templates, slot_name, locale, {
"assets": assets, "query": query, "command": cmd, "count": len(assets),
})
if response_mode == "media":
# Reuse the same URL rule (build_asset_media_urls) and entry builder
# (build_telegram_asset_entry) as the notification dispatcher so both
# paths agree on video → /video/playback and photo → thumbnail. When
# these diverged, Telegram rendered a still JPEG for each video in
# the media group instead of the real clip.
#
# We deliberately do NOT pass ``cache_key`` here. TelegramClient
# derives it from the URL as ``<host>:<uuid>`` — identical to what
# the notification dispatcher produces via extract_asset_id_from_url.
# Passing the bare UUID would put command writes in a separate
# namespace from notification writes, so neither path could hit the
# other's cached file_ids (which is what made the cache look empty
# from the WebUI after running /random).
media_items: list[dict[str, Any]] = []
for asset in assets:
asset_id = asset.get("id", "")
asset_type = (asset.get("type") or "").upper()
preview_url, _ = build_asset_media_urls(client.url, asset_id, asset_type)
entry = build_telegram_asset_entry(
url=preview_url,
media_type="video" if asset_type == "VIDEO" else "image",
api_key=client.api_key,
internal_url=client.url,
)
if entry is not None:
media_items.append(entry)
# Return text message + media items — text is sent first, media as reply
return {"text": text, "media": media_items}
return text