feat(commands): per-chat album scope derived from notification routing

The "per-chat album scope" feature stored on CommandTrackerListener was
really per-bot: listener_id = bot.id, and every chat that bot served
shared the same scope.  Commands like /albums, /random, /status,
/events leaked the full provider catalog into chats that were never
wired up to receive notifications from those trackers.

New model: the album scope for /commands in a given chat is derived
from the notification-routing graph.  For a (provider, bot, chat_id)
triple we walk TargetReceiver (chat_id match, enabled) →
NotificationTarget (telegram or broadcast parent) →
NotificationTrackerTarget → NotificationTracker (provider match) and
union their collection_ids.  That's the natural "what does this chat
get notifications about" set, and it becomes the command scope.

- New helper: command_utils.resolve_chat_album_scope(provider_id,
  bot_id, chat_id) -> set[str].  Empty set is the default for chats
  with no routing — commands return nothing rather than leaking the
  provider's catalog.
- Dispatcher computes the scope per (tracker, bot, chat) and threads
  it through handler.handle(..., allowed_album_ids=...).  Explicit
  CommandTrackerListener.allowed_album_ids override, when set, still
  wins verbatim (kept as an escape hatch for users who want a divergent
  scope for a whole bot).
- /status, /albums, /events, and all /_cmd_immich-routed commands
  (/random, /search, /find, /latest, /memory, /summary, /favorites,
  /place, /person) now intersect with the resolved scope.
- UI scope modal relabeled: it's an explicit *override for this bot*,
  not a per-chat setting.  Default is "derive from notification
  routing", which matches what users already configured elsewhere.

Also:
- /search, /find, /person, /place — _enrich_assets return value was
  discarded, dropping public_url enrichment.  Assign the return value.
- search_smart / search_metadata — consolidated into _search_items
  helper that logs non-200 responses and transport errors instead of
  silently returning [].  Makes "always no results" bugs actually
  diagnosable.  Also accepts the alternate {"assets": [...]} flat-list
  shape from older Immich versions.
- Immich search error bodies go through _redact_body so credentials
  echoed by authenticating proxies don't land in server logs.
This commit is contained in:
2026-04-22 03:20:51 +03:00
parent 4ff3876e49
commit 3b76a09759
13 changed files with 242 additions and 58 deletions
@@ -297,19 +297,9 @@ class ImmichClient:
payload: dict[str, Any] = {"query": query, "page": max(1, page), "size": min(max(1, limit), 100)}
if album_ids:
payload["albumIds"] = album_ids[:MAX_SEARCH_PERSON_IDS]
try:
async with self._session.post(
f"{self._url}/api/search/smart",
headers=self._json_headers,
json=payload,
) as response:
if response.status == 200:
data = await response.json()
items = data.get("assets", {}).get("items", [])
return items[:limit]
except aiohttp.ClientError:
pass
return []
return await self._search_items(
f"{self._url}/api/search/smart", payload, limit, "smart",
)
async def search_metadata(
self,
@@ -322,18 +312,57 @@ class ImmichClient:
payload: dict[str, Any] = {"originalFileName": query, "page": max(1, page), "size": min(max(1, limit), 100)}
if album_ids:
payload["albumIds"] = album_ids[:MAX_SEARCH_PERSON_IDS]
return await self._search_items(
f"{self._url}/api/search/metadata", payload, limit, "metadata",
)
async def _search_items(
self,
url: str,
payload: dict[str, Any],
limit: int,
kind: str,
) -> list[dict[str, Any]]:
"""Shared POST-and-extract-items helper with error logging.
Returns an empty list on any error; previously these paths swallowed
non-200s silently, making "/search always returns no results" on
misbehaving Immich deployments impossible to diagnose without a
network trace. Logging keeps the empty-list contract but tells the
operator *why* it's empty.
"""
try:
async with self._session.post(
f"{self._url}/api/search/metadata",
url,
headers=self._json_headers,
json=payload,
) as response:
if response.status == 200:
data = await response.json()
items = data.get("assets", {}).get("items", [])
return items[:limit]
except aiohttp.ClientError:
pass
if response.status != 200:
body_snip = await response.text()
_LOGGER.warning(
"Immich %s search non-200: HTTP %s body=%s",
kind, response.status, _redact_body(body_snip),
)
return []
data = await response.json()
# Modern Immich: {"assets": {"items": [...], ...}}
assets_block = data.get("assets")
if isinstance(assets_block, dict):
items = assets_block.get("items", []) or []
elif isinstance(assets_block, list):
# Older/alternate shape — flat list of assets.
items = assets_block
else:
_LOGGER.warning(
"Immich %s search returned unexpected shape: keys=%s",
kind, list(data.keys())[:5],
)
items = []
return items[:limit]
except aiohttp.ClientError as err:
_LOGGER.warning("Immich %s search transport error: %s", kind, err)
except Exception as err: # noqa: BLE001 — don't crash caller on unexpected JSON
_LOGGER.warning("Immich %s search parse error: %s", kind, err)
return []
async def search_by_person(
@@ -56,6 +56,7 @@ class ProviderCommandHandler(ABC):
config: CommandConfig,
*,
listener: CommandTrackerListener | None = None,
allowed_album_ids: set[str] | None = None,
page: int = 1,
) -> CommandResponse | None:
"""Handle a provider-specific command for a single tracker.
@@ -71,6 +72,13 @@ class ProviderCommandHandler(ABC):
bot: The Telegram bot instance.
tracker: The command tracker being dispatched.
config: The command config for this tracker.
listener: The listener row for this (tracker, bot) pair.
allowed_album_ids: Precomputed album scope for this (bot, chat)
pair. Resolved by the dispatcher from the listener override
(if set) or the notification-routing graph. ``None`` means
"no scope restriction" (rarely the right default for album
providers — empty set is the common case).
page: 1-based page number for paginated commands (/search, /find).
Returns:
A CommandResponse, or None if unhandled.
@@ -8,7 +8,14 @@ from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import EventLog, NotificationTracker, ServiceProvider
from ..database.models import (
EventLog,
NotificationTarget,
NotificationTracker,
NotificationTrackerTarget,
ServiceProvider,
TargetReceiver,
)
_LOGGER = logging.getLogger(__name__)
@@ -20,25 +27,125 @@ async def get_trackers_for_provider(provider_id: int) -> list[NotificationTracke
return await _get_notification_trackers_for_providers({provider_id})
async def get_last_event_str(tracker_ids: list[int]) -> str:
async def get_last_event_str(
tracker_ids: list[int],
*,
allowed_album_ids: set[str] | None = None,
) -> str:
"""Get formatted timestamp of most recent event for given trackers.
Returns a 'YYYY-MM-DD HH:MM' string, or '-' if no events exist.
When ``allowed_album_ids`` is provided, only events whose
``collection_id`` is in the set are considered — matches the per-chat
scope applied via ``CommandTrackerListener.allowed_album_ids``.
"""
if not tracker_ids:
return "-"
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
query = (
select(EventLog)
.where(EventLog.tracker_id.in_(tracker_ids))
.order_by(EventLog.created_at.desc())
.limit(1)
)
if allowed_album_ids is not None:
query = query.where(EventLog.collection_id.in_(list(allowed_album_ids)))
result = await session.exec(query.limit(1))
last_event = result.first()
return last_event.created_at.strftime("%Y-%m-%d %H:%M") if last_event else "-"
async def resolve_chat_album_scope(
*,
provider_id: int,
bot_id: int,
chat_id: str,
) -> set[str]:
"""Compute the album scope for a (provider, bot, chat) triple.
Walks the notification-routing graph: find every notification tracker for
``provider_id`` that ultimately delivers to a Telegram receiver matching
this ``(bot_id, chat_id)``, then union their ``collection_ids``. The
result is the set of albums this specific chat legitimately sees
notifications for — which is the natural "allowed albums" for commands
issued in that chat.
Returns:
set of album ids. Empty set = "no tracker routes to this chat"
caller should treat as "show nothing" (defense in depth); otherwise
a bot's chats would leak the provider's full album catalog.
Notes:
- Only enabled ``TargetReceiver`` rows are considered.
- Both direct Telegram targets and broadcast targets that fan out
to a Telegram child target are resolved.
- Explicit ``CommandTrackerListener.allowed_album_ids`` override is
NOT applied here — that's the dispatcher's job. This helper is
the "derived" fallback.
"""
engine = get_engine()
async with AsyncSession(engine) as session:
# 1. Telegram receivers in this chat (directly or via broadcast).
direct_rows = (await session.exec(
select(TargetReceiver, NotificationTarget)
.join(
NotificationTarget,
TargetReceiver.target_id == NotificationTarget.id,
)
.where(
TargetReceiver.enabled == True, # noqa: E712
NotificationTarget.type == "telegram",
)
)).all()
target_ids: set[int] = set()
for recv, target in direct_rows:
rc_chat = str(recv.config.get("chat_id", "") or "")
rc_bot = target.config.get("bot_id")
if rc_chat == str(chat_id) and rc_bot == bot_id:
target_ids.add(target.id)
# Follow broadcast parents: any broadcast target whose
# child_target_ids includes one of our direct Telegram target_ids
# also counts as "routes to this chat".
broadcast_rows = (await session.exec(
select(NotificationTarget).where(NotificationTarget.type == "broadcast")
)).all()
for b in broadcast_rows:
children = set(b.config.get("child_target_ids", []) or [])
disabled = set(b.config.get("disabled_child_ids", []) or [])
if (children - disabled) & target_ids:
target_ids.add(b.id)
if not target_ids:
return set()
# 2. Trackers pointing at those targets.
tracker_target_rows = (await session.exec(
select(NotificationTrackerTarget).where(
NotificationTrackerTarget.target_id.in_(target_ids)
)
)).all()
tracker_ids = {tt.tracker_id for tt in tracker_target_rows}
if not tracker_ids:
return set()
# 3. Filter trackers by provider and collect collection_ids.
trackers = (await session.exec(
select(NotificationTracker).where(
NotificationTracker.id.in_(tracker_ids),
NotificationTracker.provider_id == provider_id,
)
)).all()
scope: set[str] = set()
for tr in trackers:
for aid in (tr.collection_ids or []):
if aid:
scope.add(aid)
return scope
def get_tracked_collection_ids(
provider: ServiceProvider,
trackers: list[NotificationTracker],
@@ -79,6 +79,7 @@ class GiteaCommandHandler(ProviderCommandHandler):
config: CommandConfig,
*,
listener: Any = None,
allowed_album_ids: set[str] | None = None, # noqa: ARG002 — unused (Gitea has no album model)
page: int = 1,
) -> CommandResponse | None:
fn = _TEXT_COMMANDS.get(cmd)
@@ -286,6 +286,8 @@ async def handle_command(
page = max(1, count_override)
count_override = None
from .command_utils import resolve_chat_album_scope
responses: list[CommandResponse] = []
for tracker, config, provider, listener in ctx_tuples:
if len(responses) >= _MAX_RESPONSES_PER_COMMAND:
@@ -303,10 +305,27 @@ async def handle_command(
count = min(count_override or config.default_count or 5, 20)
response_mode = config.response_mode or "media"
# Resolve the album scope for this (provider, bot, chat) triple.
# - Explicit ``listener.allowed_album_ids`` override wins as-is.
# - Otherwise derive from notification routing: only albums that
# already deliver notifications to this chat are queryable from
# it. Prevents commands leaking the full album catalog into
# chats that were never set up to receive from those trackers.
if listener is not None and listener.allowed_album_ids is not None:
allowed_album_ids: set[str] = set(listener.allowed_album_ids)
else:
allowed_album_ids = await resolve_chat_album_scope(
provider_id=provider.id,
bot_id=bot.id,
chat_id=chat_id,
)
result = await handler.handle(
cmd, args, count, locale, response_mode,
provider, tracker_templates, bot, tracker, config,
listener=listener, page=page,
listener=listener,
allowed_album_ids=allowed_album_ids,
page=page,
)
if result is not None:
responses.append(result)
@@ -6,7 +6,7 @@ import asyncio
import logging
from typing import Any
from ...database.models import CommandTrackerListener, ServiceProvider
from ...database.models import ServiceProvider
from ...services import make_immich_provider
from ...services.http_session import get_http_session
from ..command_utils import get_trackers_for_provider
@@ -20,7 +20,7 @@ async def _cmd_albums(
provider: ServiceProvider,
locale: str,
*,
listener: CommandTrackerListener | None = None,
allowed_album_ids: set[str] | None = None,
) -> dict[str, Any]:
trackers = await get_trackers_for_provider(provider.id)
if not trackers:
@@ -35,12 +35,11 @@ async def _cmd_albums(
seen.add(aid)
album_ids.append(aid)
# Per-chat album scope — match what _cmd_immich does for media commands.
# Without this, /albums leaks the full list of tracked albums into chats
# that were explicitly scoped to a subset.
if listener is not None and listener.allowed_album_ids is not None:
allowed = set(listener.allowed_album_ids)
album_ids = [aid for aid in album_ids if aid in allowed]
# Intersect with the dispatcher-resolved scope (listener override, else
# derived from notification routing for this chat). Without this,
# /albums leaks the full tracked-album list into chats never wired up.
if allowed_album_ids is not None:
album_ids = [aid for aid in album_ids if aid in allowed_album_ids]
if not album_ids:
return {"albums": []}
@@ -27,6 +27,8 @@ _LOGGER = logging.getLogger(__name__)
async def _cmd_events(
provider: ServiceProvider,
count: int, locale: str,
*,
allowed_album_ids: set[str] | None = None,
) -> dict[str, Any]:
trackers = await get_trackers_for_provider(provider.id)
tracker_ids = [t.id for t in trackers]
@@ -35,12 +37,14 @@ async def _cmd_events(
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
query = (
select(EventLog)
.where(EventLog.tracker_id.in_(tracker_ids))
.order_by(EventLog.created_at.desc())
.limit(count)
)
if allowed_album_ids is not None:
query = query.where(EventLog.collection_id.in_(list(allowed_album_ids)))
result = await session.exec(query.limit(count))
events = result.all()
events_data = [
@@ -25,14 +25,26 @@ _LOGGER = logging.getLogger(__name__)
async def _cmd_status(
provider: ServiceProvider, locale: str,
*,
allowed_album_ids: set[str] | None = None,
) -> dict[str, Any]:
trackers = await get_trackers_for_provider(provider.id)
active = sum(1 for t in trackers if t.enabled)
total = len(trackers)
total_albums = sum(len(t.collection_ids or []) for t in trackers)
# Count only albums visible to this chat. Without the scope filter,
# /status in a restricted chat leaks the full album count across the
# provider. ``None`` = no filter; empty set = show nothing.
total_albums = 0
for t in trackers:
for aid in (t.collection_ids or []):
if allowed_album_ids is None or aid in allowed_album_ids:
total_albums += 1
tracker_ids = [t.id for t in trackers]
last_str = await get_last_event_str(tracker_ids)
last_str = await get_last_event_str(
tracker_ids, allowed_album_ids=allowed_album_ids,
)
return {
"trackers_active": active, "trackers_total": total,
@@ -80,16 +92,17 @@ class ImmichCommandHandler(ProviderCommandHandler):
config: CommandConfig,
*,
listener: CommandTrackerListener | None = None,
allowed_album_ids: set[str] | None = None,
page: int = 1,
) -> CommandResponse | None:
if cmd == "status":
ctx = await _cmd_status(provider, locale)
ctx = await _cmd_status(provider, locale, allowed_album_ids=allowed_album_ids)
return CommandResponse(text=_render_cmd_template(cmd_templates, "status", locale, ctx))
if cmd == "albums":
ctx = await _cmd_albums(provider, locale, listener=listener)
ctx = await _cmd_albums(provider, locale, allowed_album_ids=allowed_album_ids)
return CommandResponse(text=_render_cmd_template(cmd_templates, "albums", locale, ctx))
if cmd == "events":
ctx = await _cmd_events(provider, count, locale)
ctx = await _cmd_events(provider, count, locale, allowed_album_ids=allowed_album_ids)
return CommandResponse(text=_render_cmd_template(cmd_templates, "events", locale, ctx))
if cmd == "people":
ctx = await _cmd_people(provider, locale)
@@ -99,7 +112,7 @@ class ImmichCommandHandler(ProviderCommandHandler):
return await _cmd_immich(
cmd, args, count, locale, response_mode,
provider, cmd_templates,
listener=listener, page=page,
allowed_album_ids=allowed_album_ids, page=page,
)
return None
@@ -109,7 +122,7 @@ async def _cmd_immich(
response_mode: str, provider: ServiceProvider,
cmd_templates: dict[str, dict[str, str]],
*,
listener: CommandTrackerListener | None = None,
allowed_album_ids: set[str] | None = None,
page: int = 1,
) -> CommandResponse | None:
"""Handle commands that need Immich API access and may return media."""
@@ -123,10 +136,12 @@ async def _cmd_immich(
seen.add(aid)
all_album_ids.append(aid)
# Per-chat album scope: intersect with listener.allowed_album_ids when set.
if listener is not None and listener.allowed_album_ids is not None:
allowed = set(listener.allowed_album_ids)
all_album_ids = [aid for aid in all_album_ids if aid in allowed]
# Intersect with the scope resolved by the dispatcher (from the listener
# override if set, otherwise from the notification-routing graph for this
# chat). ``None`` = no filter (rare); empty set = show nothing (common
# when the chat has no tracker routing).
if allowed_album_ids is not None:
all_album_ids = [aid for aid in all_album_ids if aid in allowed_album_ids]
ext_domain = (provider.config.get("external_domain") or provider.config.get("url", "")).rstrip("/")
@@ -31,7 +31,7 @@ async def cmd_search(
if not args:
return _render_cmd_template(cmd_templates, "no_results", locale, {"command": "search", "query": ""})
assets = await client.search_smart(args, album_ids=all_album_ids, limit=count, page=page)
_enrich_assets(assets, asset_public_urls or {})
assets = _enrich_assets(assets, asset_public_urls or {})
return _format_assets(assets, "search", args, locale, response_mode, client, cmd_templates)
@@ -46,7 +46,7 @@ async def cmd_find(
if not args:
return _render_cmd_template(cmd_templates, "no_results", locale, {"command": "find", "query": ""})
assets = await client.search_metadata(args, album_ids=all_album_ids, limit=count, page=page)
_enrich_assets(assets, asset_public_urls or {})
assets = _enrich_assets(assets, asset_public_urls or {})
return _format_assets(assets, "find", args, locale, response_mode, client, cmd_templates)
@@ -68,7 +68,7 @@ async def cmd_person(
if not person_id:
return _render_cmd_template(cmd_templates, "no_results", locale, {"command": "person", "query": args})
assets = await client.search_by_person(person_id, limit=count)
_enrich_assets(assets, asset_public_urls or {})
assets = _enrich_assets(assets, asset_public_urls or {})
return _format_assets(assets, "person", args, locale, response_mode, client, cmd_templates)
@@ -84,5 +84,5 @@ async def cmd_place(
assets = await client.search_smart(
f"photos taken in {args}", album_ids=all_album_ids, limit=count
)
_enrich_assets(assets, asset_public_urls or {})
assets = _enrich_assets(assets, asset_public_urls or {})
return _format_assets(assets, "place", args, locale, response_mode, client, cmd_templates)
@@ -54,6 +54,7 @@ class NutCommandHandler(ProviderCommandHandler):
config: CommandConfig,
*,
listener: Any = None,
allowed_album_ids: set[str] | None = None, # noqa: ARG002 — unused (NUT has no album model)
page: int = 1,
) -> CommandResponse | None:
fn = _TEXT_COMMANDS.get(cmd)
@@ -71,6 +71,7 @@ class PlankaCommandHandler(ProviderCommandHandler):
config: CommandConfig,
*,
listener: Any = None,
allowed_album_ids: set[str] | None = None, # noqa: ARG002 — unused (Planka has no album model)
page: int = 1,
) -> CommandResponse | None:
fn = _TEXT_COMMANDS.get(cmd)