"""Storage helpers for Immich Album Watcher.""" from __future__ import annotations import logging from datetime import datetime, timezone from typing import Any from homeassistant.core import HomeAssistant from homeassistant.helpers.storage import Store _LOGGER = logging.getLogger(__name__) STORAGE_VERSION = 1 STORAGE_KEY_PREFIX = "immich_album_watcher" # Default TTL for Telegram file_id cache (48 hours in seconds) DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60 class ImmichAlbumStorage: """Handles persistence of album state across restarts.""" def __init__(self, hass: HomeAssistant, entry_id: str) -> None: """Initialize the storage.""" self._store: Store[dict[str, Any]] = Store( hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.{entry_id}" ) self._data: dict[str, Any] | None = None async def async_load(self) -> dict[str, Any]: """Load data from storage.""" self._data = await self._store.async_load() or {"albums": {}} _LOGGER.debug("Loaded storage data with %d albums", len(self._data.get("albums", {}))) return self._data async def async_save_album_state(self, album_id: str, asset_ids: set[str]) -> None: """Save album asset IDs to storage.""" if self._data is None: self._data = {"albums": {}} self._data["albums"][album_id] = { "asset_ids": list(asset_ids), "last_updated": datetime.now(timezone.utc).isoformat(), } await self._store.async_save(self._data) def get_album_asset_ids(self, album_id: str) -> set[str] | None: """Get persisted asset IDs for an album. Returns None if no persisted state exists for the album. """ if self._data and "albums" in self._data: album_data = self._data["albums"].get(album_id) if album_data: return set(album_data.get("asset_ids", [])) return None async def async_remove_album(self, album_id: str) -> None: """Remove an album from storage.""" if self._data and "albums" in self._data: self._data["albums"].pop(album_id, None) await self._store.async_save(self._data) async def async_remove(self) -> None: """Remove all storage data.""" await self._store.async_remove() self._data = None class TelegramFileCache: """Cache for Telegram file_ids to avoid re-uploading media. When a file is uploaded to Telegram, it returns a file_id that can be reused to send the same file without re-uploading. This cache stores these file_ids keyed by the source URL or asset ID. Supports two validation modes: - TTL mode (default): entries expire after a configured time-to-live - Thumbhash mode: entries are validated by comparing stored thumbhash with the current asset thumbhash from Immich """ def __init__( self, hass: HomeAssistant, entry_id: str, ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL, use_thumbhash: bool = False, ) -> None: """Initialize the Telegram file cache. Args: hass: Home Assistant instance entry_id: Config entry ID for scoping the cache (per hub) ttl_seconds: Time-to-live for cache entries in seconds (TTL mode only) use_thumbhash: Use thumbhash-based validation instead of TTL """ self._store: Store[dict[str, Any]] = Store( hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}" ) self._data: dict[str, Any] | None = None self._ttl_seconds = ttl_seconds self._use_thumbhash = use_thumbhash async def async_load(self) -> None: """Load cache data from storage.""" self._data = await self._store.async_load() or {"files": {}} # Clean up expired entries on load (TTL mode only) await self._cleanup_expired() mode = "thumbhash" if self._use_thumbhash else "TTL" _LOGGER.debug( "Loaded Telegram file cache with %d entries (mode: %s)", len(self._data.get("files", {})), mode, ) # Maximum number of entries to keep in thumbhash mode to prevent unbounded growth THUMBHASH_MAX_ENTRIES = 2000 async def _cleanup_expired(self) -> None: """Remove expired cache entries (TTL mode) or trim old entries (thumbhash mode).""" if self._use_thumbhash: files = self._data.get("files", {}) if self._data else {} if len(files) > self.THUMBHASH_MAX_ENTRIES: sorted_keys = sorted( files, key=lambda k: files[k].get("cached_at", "") ) keys_to_remove = sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES] for key in keys_to_remove: del files[key] await self._store.async_save(self._data) _LOGGER.debug( "Trimmed thumbhash cache from %d to %d entries", len(keys_to_remove) + self.THUMBHASH_MAX_ENTRIES, self.THUMBHASH_MAX_ENTRIES, ) return if not self._data or "files" not in self._data: return now = datetime.now(timezone.utc) expired_keys = [] for url, entry in self._data["files"].items(): cached_at_str = entry.get("cached_at") if cached_at_str: cached_at = datetime.fromisoformat(cached_at_str) age_seconds = (now - cached_at).total_seconds() if age_seconds > self._ttl_seconds: expired_keys.append(url) if expired_keys: for key in expired_keys: del self._data["files"][key] await self._store.async_save(self._data) _LOGGER.debug("Cleaned up %d expired Telegram cache entries", len(expired_keys)) def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None: """Get cached file_id for a key. Args: key: The cache key (URL or asset ID) thumbhash: Current thumbhash for validation (thumbhash mode only). If provided, compares with stored thumbhash. Mismatch = cache miss. Returns: Dict with 'file_id' and 'type' if cached and valid, None otherwise """ if not self._data or "files" not in self._data: return None entry = self._data["files"].get(key) if not entry: return None if self._use_thumbhash: # Thumbhash-based validation if thumbhash is not None: stored_thumbhash = entry.get("thumbhash") if stored_thumbhash and stored_thumbhash != thumbhash: _LOGGER.debug( "Cache miss for %s: thumbhash changed, removing stale entry", key[:36], ) del self._data["files"][key] return None # If no thumbhash provided (asset not in monitored album), # return cached entry anyway — self-heals on Telegram rejection else: # TTL-based validation cached_at_str = entry.get("cached_at") if cached_at_str: cached_at = datetime.fromisoformat(cached_at_str) age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds() if age_seconds > self._ttl_seconds: return None return { "file_id": entry.get("file_id"), "type": entry.get("type"), } async def async_set( self, key: str, file_id: str, media_type: str, thumbhash: str | None = None ) -> None: """Store a file_id for a key. Args: key: The cache key (URL or asset ID) file_id: The Telegram file_id media_type: The type of media ('photo', 'video', 'document') thumbhash: Current thumbhash to store alongside file_id (thumbhash mode only) """ if self._data is None: self._data = {"files": {}} entry_data: dict[str, Any] = { "file_id": file_id, "type": media_type, "cached_at": datetime.now(timezone.utc).isoformat(), } if thumbhash is not None: entry_data["thumbhash"] = thumbhash self._data["files"][key] = entry_data await self._store.async_save(self._data) _LOGGER.debug("Cached Telegram file_id for key (type: %s)", media_type) async def async_set_many( self, entries: list[tuple[str, str, str, str | None]] ) -> None: """Store multiple file_ids in a single disk write. Args: entries: List of (key, file_id, media_type, thumbhash) tuples """ if not entries: return if self._data is None: self._data = {"files": {}} now_iso = datetime.now(timezone.utc).isoformat() for key, file_id, media_type, thumbhash in entries: entry_data: dict[str, Any] = { "file_id": file_id, "type": media_type, "cached_at": now_iso, } if thumbhash is not None: entry_data["thumbhash"] = thumbhash self._data["files"][key] = entry_data await self._store.async_save(self._data) _LOGGER.debug("Batch cached %d Telegram file_ids", len(entries)) async def async_remove(self) -> None: """Remove all cache data.""" await self._store.async_remove() self._data = None class NotificationQueue: """Persistent queue for notifications deferred during quiet hours. Stores full service call parameters so notifications can be replayed exactly as they were originally called. """ def __init__(self, hass: HomeAssistant, entry_id: str) -> None: """Initialize the notification queue.""" self._store: Store[dict[str, Any]] = Store( hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}" ) self._data: dict[str, Any] | None = None async def async_load(self) -> None: """Load queue data from storage.""" self._data = await self._store.async_load() or {"queue": []} _LOGGER.debug( "Loaded notification queue with %d items", len(self._data.get("queue", [])), ) async def async_enqueue(self, notification_params: dict[str, Any]) -> None: """Add a notification to the queue.""" if self._data is None: self._data = {"queue": []} self._data["queue"].append({ "params": notification_params, "queued_at": datetime.now(timezone.utc).isoformat(), }) await self._store.async_save(self._data) _LOGGER.debug("Queued notification during quiet hours (total: %d)", len(self._data["queue"])) def get_all(self) -> list[dict[str, Any]]: """Get all queued notifications.""" if not self._data: return [] return list(self._data.get("queue", [])) def has_pending(self) -> bool: """Check if there are pending notifications.""" return bool(self._data and self._data.get("queue")) async def async_clear(self) -> None: """Clear all queued notifications.""" if self._data: self._data["queue"] = [] await self._store.async_save(self._data) async def async_remove(self) -> None: """Remove all queue data.""" await self._store.async_remove() self._data = None