"""Album-related Immich bot commands: albums, favorites, summary."""

from __future__ import annotations

import asyncio
import logging
from itertools import islice
from typing import Any

from ...database.models import ServiceProvider
from ...services import make_immich_provider
from ...services.http_session import get_http_session
from ..command_utils import get_trackers_for_provider
from ..handler import _render_cmd_template
from .common import _format_assets, build_asset_dict, fetch_albums_with_links

_LOGGER = logging.getLogger(__name__)

# Upper bounds that keep /favorites cheap: how many albums are fetched and
# how many assets of each fetched album are inspected.
_MAX_FAVORITE_ALBUMS = 10
_MAX_ASSETS_PER_ALBUM = 50


async def _cmd_albums(
    provider: ServiceProvider,
    locale: str,
    *,
    allowed_album_ids: set[str] | None = None,
) -> dict[str, Any]:
    """Build the template context for the /albums command.

    Collects the album IDs of every tracker registered for ``provider``,
    optionally restricts them to ``allowed_album_ids``, and resolves them
    through ``fetch_albums_with_links``.

    Args:
        provider: Provider whose trackers and config are consulted.
        locale: Accepted for signature parity with the other commands;
            not used by this function.
        allowed_album_ids: When not ``None``, only album IDs contained in
            this set are reported. An empty set therefore yields no albums.

    Returns:
        ``{"albums": [...]}``; the list is empty when there are no trackers
        or no album survives the scope filter.
    """
    trackers = await get_trackers_for_provider(provider.id)
    if not trackers:
        return {"albums": []}

    # Deduplicate album IDs while preserving order (dict keys keep
    # insertion order).
    album_ids: list[str] = list(
        dict.fromkeys(
            aid
            for tracker in trackers
            for aid in tracker.collection_ids or []
        )
    )

    # Intersect with the dispatcher-resolved scope (listener override, else
    # derived from notification routing for this chat). Without this,
    # /albums leaks the full tracked-album list into chats never wired up.
    if allowed_album_ids is not None:
        album_ids = [aid for aid in album_ids if aid in allowed_album_ids]

    if not album_ids:
        return {"albums": []}

    # Prefer the external domain for links; fall back to the provider URL.
    # The trailing ``or ""`` guards against a key stored with a None value,
    # which would otherwise make ``.rstrip`` raise AttributeError.
    ext_domain = (
        provider.config.get("external_domain")
        or provider.config.get("url")
        or ""
    ).rstrip("/")

    http = await get_http_session()
    immich = make_immich_provider(http, provider)
    albums_data = await fetch_albums_with_links(immich.client, album_ids, ext_domain)
    return {"albums": albums_data}


async def cmd_favorites(
    providers_map: dict[int, ServiceProvider],
    all_album_ids: list[str],
    count: int,
    locale: str,
    response_mode: str,
    client: Any,
    cmd_templates: dict[str, dict[str, str]],
) -> str | dict[str, Any]:
    """Handle /favorites command with concurrent album fetching.

    Fetches at most ``_MAX_FAVORITE_ALBUMS`` albums concurrently, inspects
    at most ``_MAX_ASSETS_PER_ALBUM`` assets of each, and collects up to
    ``count`` assets flagged as favorite, in album order.

    Args:
        providers_map: Not used by this function; kept for the caller's
            signature.
        all_album_ids: Candidate album IDs; only the leading ones are fetched.
        count: Maximum number of favorite assets to return.
        locale: Forwarded to ``_format_assets``.
        response_mode: Forwarded to ``_format_assets``.
        client: Object exposing an awaitable ``get_album(album_id)``.
        cmd_templates: Forwarded to ``_format_assets``.

    Returns:
        Whatever ``_format_assets`` produces for the collected assets.
    """
    album_ids = all_album_ids[:_MAX_FAVORITE_ALBUMS]
    if not album_ids:
        return _format_assets([], "favorites", "", locale, response_mode, client, cmd_templates)

    results = await asyncio.gather(
        *[client.get_album(aid) for aid in album_ids],
        return_exceptions=True,
    )

    fav_assets: list[dict[str, Any]] = []
    for album_id, result in zip(album_ids, results):
        # With return_exceptions=True gather may also hand back BaseException
        # subclasses (e.g. CancelledError from a cancelled child), so check
        # against BaseException rather than Exception before touching
        # ``result.assets``.
        if isinstance(result, BaseException):
            _LOGGER.warning("Failed to fetch album %s: %s", album_id, result)
            continue
        # Keep looping after the quota is met so fetch failures of later
        # albums are still logged, but skip scanning their assets.
        if not result or len(fav_assets) >= count:
            continue
        for asset in islice(result.assets.values(), _MAX_ASSETS_PER_ALBUM):
            if len(fav_assets) >= count:
                break
            if asset.is_favorite:
                fav_assets.append(build_asset_dict(asset))

    return _format_assets(fav_assets, "favorites", "", locale, response_mode, client, cmd_templates)


async def cmd_summary(
    client: Any,
    all_album_ids: list[str],
    locale: str,
    cmd_templates: dict[str, dict[str, str]],
    external_domain: str = "",
) -> str:
    """Handle /summary command with concurrent album fetching.

    Args:
        client: Passed through to ``fetch_albums_with_links``.
        all_album_ids: Albums to summarise; an empty list renders the
            template with no albums and performs no fetch.
        locale: Locale used to pick the template.
        cmd_templates: Command templates consumed by ``_render_cmd_template``.
        external_domain: Base URL for links; trailing slashes are stripped.

    Returns:
        The rendered "summary" template.
    """
    if not all_album_ids:
        return _render_cmd_template(cmd_templates, "summary", locale, {"albums": []})

    ext = external_domain.rstrip("/")
    albums_data = await fetch_albums_with_links(client, all_album_ids, ext, include_failed=False)
    return _render_cmd_template(cmd_templates, "summary", locale, {"albums": albums_data})