From b107cfe67f71330f8cb8f31785929115799c53c0 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Thu, 19 Mar 2026 12:47:18 +0300 Subject: [PATCH] Refactor HAOS integration to use shared core library (Phase 2) Wire the integration to delegate all HA-independent logic to immich-watcher-core, eliminating ~2300 lines of duplicated code. Changes: - const.py: Import shared constants from core, keep HA-specific ones - storage.py: Create HAStorageBackend adapter wrapping HA's Store, use core TelegramFileCache and NotificationQueue via adapter - coordinator.py: Delegate to core ImmichClient for API calls, detect_album_changes() for change detection, and asset_utils for filtering/sorting/URL building. Keep HA-specific event firing. - sensor.py: Replace ~1300 lines of Telegram code with 15-line delegation to core TelegramClient. Keep entity classes unchanged. - __init__.py: Use factory functions for creating core instances with HA storage backends - manifest.json: Add immich-watcher-core dependency Integration line count: 3600 -> 1295 lines (-64%) Zero behavior changes for end users. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../immich_album_watcher/__init__.py | 14 +- .../immich_album_watcher/const.py | 108 +- .../immich_album_watcher/coordinator.py | 1242 +++---------- .../immich_album_watcher/manifest.json | 2 +- .../immich_album_watcher/sensor.py | 1543 +---------------- .../immich_album_watcher/storage.py | 320 +--- plans/phase-2-haos-refactor.md | 75 + plans/primary-plan.md | 2 +- 8 files changed, 502 insertions(+), 2804 deletions(-) create mode 100644 plans/phase-2-haos-refactor.md diff --git a/custom_components/immich_album_watcher/__init__.py b/custom_components/immich_album_watcher/__init__.py index 61c1ef7..d23ffbe 100644 --- a/custom_components/immich_album_watcher/__init__.py +++ b/custom_components/immich_album_watcher/__init__.py @@ -25,7 +25,13 @@ from .const import ( PLATFORMS, ) from .coordinator import ImmichAlbumWatcherCoordinator -from .storage import ImmichAlbumStorage, NotificationQueue, TelegramFileCache +from .storage import ( + ImmichAlbumStorage, + NotificationQueue, + TelegramFileCache, + create_notification_queue, + create_telegram_cache, +) _LOGGER = logging.getLogger(__name__) @@ -80,16 +86,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bo # TTL is in hours from config, convert to seconds cache_ttl_seconds = telegram_cache_ttl * 60 * 60 # URL-based cache for non-Immich URLs or URLs without extractable asset IDs - telegram_cache = TelegramFileCache(hass, entry.entry_id, ttl_seconds=cache_ttl_seconds) + telegram_cache = create_telegram_cache(hass, entry.entry_id, ttl_seconds=cache_ttl_seconds) await telegram_cache.async_load() # Asset ID-based cache for Immich URLs — uses thumbhash validation instead of TTL - telegram_asset_cache = TelegramFileCache( + telegram_asset_cache = create_telegram_cache( hass, f"{entry.entry_id}_assets", use_thumbhash=True ) await telegram_asset_cache.async_load() # Create notification queue for quiet hours - notification_queue = NotificationQueue(hass, entry.entry_id) + notification_queue = create_notification_queue(hass, entry.entry_id) await notification_queue.async_load() # Store hub reference diff --git a/custom_components/immich_album_watcher/const.py b/custom_components/immich_album_watcher/const.py index 8a9caca..c8a3170 100644 --- a/custom_components/immich_album_watcher/const.py +++ b/custom_components/immich_album_watcher/const.py @@ -1,8 +1,59 @@ """Constants for the Immich Album Watcher integration.""" -from datetime import timedelta from typing import Final +# Re-export shared constants from core library +from immich_watcher_core.constants import ( # noqa: F401 + ASSET_TYPE_IMAGE, + ASSET_TYPE_VIDEO, + ATTR_ADDED_ASSETS, + ATTR_ADDED_COUNT, + ATTR_ALBUM_ID, + ATTR_ALBUM_NAME, + ATTR_ALBUM_PROTECTED_PASSWORD, + ATTR_ALBUM_PROTECTED_URL, + ATTR_ALBUM_URL, + ATTR_ALBUM_URLS, + ATTR_ASSET_CITY, + ATTR_ASSET_COUNT, + ATTR_ASSET_COUNTRY, + ATTR_ASSET_CREATED, + ATTR_ASSET_DESCRIPTION, + ATTR_ASSET_DOWNLOAD_URL, + ATTR_ASSET_FILENAME, + ATTR_ASSET_IS_FAVORITE, + ATTR_ASSET_LATITUDE, + ATTR_ASSET_LONGITUDE, + ATTR_ASSET_OWNER, + ATTR_ASSET_OWNER_ID, + ATTR_ASSET_PLAYBACK_URL, + ATTR_ASSET_RATING, + ATTR_ASSET_STATE, + ATTR_ASSET_TYPE, + ATTR_ASSET_URL, + ATTR_CHANGE_TYPE, + ATTR_CREATED_AT, + ATTR_HUB_NAME, + ATTR_LAST_UPDATED, + ATTR_NEW_NAME, + ATTR_NEW_SHARED, + ATTR_OLD_NAME, + ATTR_OLD_SHARED, + ATTR_OWNER, + ATTR_PEOPLE, + ATTR_PHOTO_COUNT, + ATTR_REMOVED_ASSETS, + ATTR_REMOVED_COUNT, + ATTR_SHARED, + ATTR_THUMBNAIL_URL, + ATTR_VIDEO_COUNT, + DEFAULT_SCAN_INTERVAL, + DEFAULT_SHARE_PASSWORD, + DEFAULT_TELEGRAM_CACHE_TTL, + NEW_ASSETS_RESET_DELAY, +) + +# HA-specific constants DOMAIN: Final = "immich_album_watcher" # Configuration keys @@ -19,13 +70,7 @@ CONF_TELEGRAM_CACHE_TTL: Final = "telegram_cache_ttl" # Subentry type SUBENTRY_TYPE_ALBUM: Final = "album" -# Defaults -DEFAULT_SCAN_INTERVAL: Final = 60 # seconds -DEFAULT_TELEGRAM_CACHE_TTL: Final = 48 # hours -NEW_ASSETS_RESET_DELAY: Final = 300 # 5 minutes -DEFAULT_SHARE_PASSWORD: Final = "immich123" - -# Events +# HA event names (prefixed with domain) EVENT_ALBUM_CHANGED: Final = f"{DOMAIN}_album_changed" EVENT_ASSETS_ADDED: Final = f"{DOMAIN}_assets_added" EVENT_ASSETS_REMOVED: Final = f"{DOMAIN}_assets_removed" @@ -33,53 +78,6 @@ EVENT_ALBUM_RENAMED: Final = f"{DOMAIN}_album_renamed" EVENT_ALBUM_DELETED: Final = f"{DOMAIN}_album_deleted" EVENT_ALBUM_SHARING_CHANGED: Final = f"{DOMAIN}_album_sharing_changed" -# Attributes -ATTR_HUB_NAME: Final = "hub_name" -ATTR_ALBUM_ID: Final = "album_id" -ATTR_ALBUM_NAME: Final = "album_name" -ATTR_ALBUM_URL: Final = "album_url" -ATTR_ALBUM_URLS: Final = "album_urls" -ATTR_ALBUM_PROTECTED_URL: Final = "album_protected_url" -ATTR_ALBUM_PROTECTED_PASSWORD: Final = "album_protected_password" -ATTR_ASSET_COUNT: Final = "asset_count" -ATTR_PHOTO_COUNT: Final = "photo_count" -ATTR_VIDEO_COUNT: Final = "video_count" -ATTR_ADDED_COUNT: Final = "added_count" -ATTR_REMOVED_COUNT: Final = "removed_count" -ATTR_ADDED_ASSETS: Final = "added_assets" -ATTR_REMOVED_ASSETS: Final = "removed_assets" -ATTR_CHANGE_TYPE: Final = "change_type" -ATTR_LAST_UPDATED: Final = "last_updated_at" -ATTR_CREATED_AT: Final = "created_at" -ATTR_THUMBNAIL_URL: Final = "thumbnail_url" -ATTR_SHARED: Final = "shared" -ATTR_OWNER: Final = "owner" -ATTR_PEOPLE: Final = "people" -ATTR_OLD_NAME: Final = "old_name" -ATTR_NEW_NAME: Final = "new_name" -ATTR_OLD_SHARED: Final = "old_shared" -ATTR_NEW_SHARED: Final = "new_shared" -ATTR_ASSET_TYPE: Final = "type" -ATTR_ASSET_FILENAME: Final = "filename" -ATTR_ASSET_CREATED: Final = "created_at" -ATTR_ASSET_OWNER: Final = "owner" -ATTR_ASSET_OWNER_ID: Final = "owner_id" -ATTR_ASSET_URL: Final = "url" -ATTR_ASSET_DOWNLOAD_URL: Final = "download_url" -ATTR_ASSET_PLAYBACK_URL: Final = "playback_url" -ATTR_ASSET_DESCRIPTION: Final = "description" -ATTR_ASSET_IS_FAVORITE: Final = "is_favorite" -ATTR_ASSET_RATING: Final = "rating" -ATTR_ASSET_LATITUDE: Final = "latitude" -ATTR_ASSET_LONGITUDE: Final = "longitude" -ATTR_ASSET_CITY: Final = "city" -ATTR_ASSET_STATE: Final = "state" -ATTR_ASSET_COUNTRY: Final = "country" - -# Asset types -ASSET_TYPE_IMAGE: Final = "IMAGE" -ASSET_TYPE_VIDEO: Final = "VIDEO" - # Platforms PLATFORMS: Final = ["sensor", "binary_sensor", "camera", "text", "button"] diff --git a/custom_components/immich_album_watcher/coordinator.py b/custom_components/immich_album_watcher/coordinator.py index 9abcfcb..2ca6924 100644 --- a/custom_components/immich_album_watcher/coordinator.py +++ b/custom_components/immich_album_watcher/coordinator.py @@ -3,12 +3,11 @@ from __future__ import annotations import logging -from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from .storage import ImmichAlbumStorage, TelegramFileCache + from .storage import ImmichAlbumStorage import aiohttp @@ -16,317 +15,73 @@ from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed +# Import models and utilities from core library +from immich_watcher_core.models import ( + AlbumChange, + AlbumData, + AssetInfo, + SharedLinkInfo, +) +from immich_watcher_core.immich_client import ImmichClient, ImmichApiError +from immich_watcher_core.change_detector import detect_album_changes +from immich_watcher_core.asset_utils import ( + build_asset_detail, + filter_assets, + get_any_url, + get_protected_links, + get_protected_password, + get_protected_url, + get_protected_urls, + get_public_url, + get_public_urls, + get_accessible_links, + sort_assets, +) +from immich_watcher_core.telegram.cache import TelegramFileCache + from .const import ( - ASSET_TYPE_IMAGE, - ASSET_TYPE_VIDEO, ATTR_ADDED_ASSETS, ATTR_ADDED_COUNT, ATTR_ALBUM_ID, ATTR_ALBUM_NAME, ATTR_ALBUM_URL, - ATTR_ASSET_CREATED, - ATTR_ASSET_DESCRIPTION, - ATTR_ASSET_DOWNLOAD_URL, - ATTR_ASSET_FILENAME, - ATTR_ASSET_IS_FAVORITE, - ATTR_ASSET_LATITUDE, - ATTR_ASSET_LONGITUDE, - ATTR_ASSET_CITY, - ATTR_ASSET_STATE, - ATTR_ASSET_COUNTRY, - ATTR_ASSET_OWNER, - ATTR_ASSET_OWNER_ID, - ATTR_ASSET_PLAYBACK_URL, - ATTR_ASSET_RATING, - ATTR_ASSET_TYPE, - ATTR_ASSET_URL, ATTR_CHANGE_TYPE, ATTR_HUB_NAME, + ATTR_NEW_NAME, + ATTR_NEW_SHARED, + ATTR_OLD_NAME, + ATTR_OLD_SHARED, ATTR_PEOPLE, ATTR_REMOVED_ASSETS, ATTR_REMOVED_COUNT, - ATTR_OLD_NAME, - ATTR_NEW_NAME, - ATTR_OLD_SHARED, - ATTR_NEW_SHARED, ATTR_SHARED, - ATTR_THUMBNAIL_URL, DOMAIN, EVENT_ALBUM_CHANGED, + EVENT_ALBUM_DELETED, + EVENT_ALBUM_RENAMED, + EVENT_ALBUM_SHARING_CHANGED, EVENT_ASSETS_ADDED, EVENT_ASSETS_REMOVED, - EVENT_ALBUM_RENAMED, - EVENT_ALBUM_DELETED, - EVENT_ALBUM_SHARING_CHANGED, ) _LOGGER = logging.getLogger(__name__) - -@dataclass -class SharedLinkInfo: - """Data class for shared link information.""" - - id: str - key: str - has_password: bool = False - password: str | None = None - expires_at: datetime | None = None - allow_download: bool = True - show_metadata: bool = True - - @property - def is_expired(self) -> bool: - """Check if the link has expired.""" - if self.expires_at is None: - return False - return datetime.now(self.expires_at.tzinfo) > self.expires_at - - @property - def is_accessible(self) -> bool: - """Check if the link is accessible without password and not expired.""" - return not self.has_password and not self.is_expired - - @classmethod - def from_api_response(cls, data: dict[str, Any]) -> SharedLinkInfo: - """Create SharedLinkInfo from API response.""" - expires_at = None - if data.get("expiresAt"): - try: - expires_at = datetime.fromisoformat( - data["expiresAt"].replace("Z", "+00:00") - ) - except ValueError: - pass - - password = data.get("password") - return cls( - id=data["id"], - key=data["key"], - has_password=bool(password), - password=password if password else None, - expires_at=expires_at, - allow_download=data.get("allowDownload", True), - show_metadata=data.get("showMetadata", True), - ) - - -@dataclass -class AssetInfo: - """Data class for asset information.""" - - id: str - type: str # IMAGE or VIDEO - filename: str - created_at: str - owner_id: str = "" - owner_name: str = "" - description: str = "" - people: list[str] = field(default_factory=list) - is_favorite: bool = False - rating: int | None = None - latitude: float | None = None - longitude: float | None = None - city: str | None = None - state: str | None = None - country: str | None = None - is_processed: bool = True # Whether asset is fully processed by Immich - thumbhash: str | None = None # Perceptual hash for cache validation - - @classmethod - def from_api_response( - cls, data: dict[str, Any], users_cache: dict[str, str] | None = None - ) -> AssetInfo: - """Create AssetInfo from API response.""" - people = [] - if "people" in data: - people = [p.get("name", "") for p in data["people"] if p.get("name")] - - owner_id = data.get("ownerId", "") - owner_name = "" - if users_cache and owner_id: - owner_name = users_cache.get(owner_id, "") - - # Get description - prioritize user-added description over EXIF description - description = data.get("description", "") or "" - exif_info = data.get("exifInfo") - if not description and exif_info: - # Fall back to EXIF description if no user description - description = exif_info.get("description", "") or "" - - # Get favorites and rating - is_favorite = data.get("isFavorite", False) - rating = exif_info.get("rating") if exif_info else None - - # Get geolocation - latitude = exif_info.get("latitude") if exif_info else None - longitude = exif_info.get("longitude") if exif_info else None - - # Get reverse geocoded location - city = exif_info.get("city") if exif_info else None - state = exif_info.get("state") if exif_info else None - country = exif_info.get("country") if exif_info else None - - # Check if asset is fully processed by Immich - asset_type = data.get("type", ASSET_TYPE_IMAGE) - is_processed = cls._check_processing_status(data, asset_type) - thumbhash = data.get("thumbhash") - - return cls( - id=data["id"], - type=asset_type, - filename=data.get("originalFileName", ""), - created_at=data.get("fileCreatedAt", ""), - owner_id=owner_id, - owner_name=owner_name, - description=description, - people=people, - is_favorite=is_favorite, - rating=rating, - latitude=latitude, - longitude=longitude, - city=city, - state=state, - country=country, - is_processed=is_processed, - thumbhash=thumbhash, - ) - - @staticmethod - def _check_processing_status(data: dict[str, Any], _asset_type: str) -> bool: - """Check if asset has been fully processed by Immich. - - For all assets: Check if thumbnails have been generated (thumbhash exists). - Immich generates thumbnails for both photos and videos regardless of - whether video transcoding is needed. - - Args: - data: Asset data from API response - _asset_type: Asset type (IMAGE or VIDEO) - unused but kept for API stability - - Returns: - True if asset is fully processed and not trashed/offline/archived, False otherwise - """ - asset_id = data.get("id", "unknown") - asset_type = data.get("type", "unknown") - is_offline = data.get("isOffline", False) - is_trashed = data.get("isTrashed", False) - is_archived = data.get("isArchived", False) - thumbhash = data.get("thumbhash") - - _LOGGER.debug( - "Asset %s (%s): isOffline=%s, isTrashed=%s, isArchived=%s, thumbhash=%s", - asset_id, - asset_type, - is_offline, - is_trashed, - is_archived, - bool(thumbhash), - ) - - # Exclude offline assets - if is_offline: - _LOGGER.debug("Asset %s excluded: offline", asset_id) - return False - - # Exclude trashed assets - if is_trashed: - _LOGGER.debug("Asset %s excluded: trashed", asset_id) - return False - - # Exclude archived assets - if is_archived: - _LOGGER.debug("Asset %s excluded: archived", asset_id) - return False - - # Check if thumbnails have been generated - # This works for both photos and videos - Immich always generates thumbnails - # Note: The API doesn't expose video transcoding status (encodedVideoPath), - # but thumbhash is sufficient since Immich generates thumbnails for all assets - is_processed = bool(thumbhash) - if not is_processed: - _LOGGER.debug("Asset %s excluded: no thumbhash", asset_id) - return is_processed - - -@dataclass -class AlbumData: - """Data class for album information.""" - - id: str - name: str - asset_count: int - photo_count: int - video_count: int - created_at: str - updated_at: str - shared: bool - owner: str - thumbnail_asset_id: str | None - asset_ids: set[str] = field(default_factory=set) - assets: dict[str, AssetInfo] = field(default_factory=dict) - people: set[str] = field(default_factory=set) - has_new_assets: bool = False - last_change_time: datetime | None = None - - @classmethod - def from_api_response( - cls, data: dict[str, Any], users_cache: dict[str, str] | None = None - ) -> AlbumData: - """Create AlbumData from API response.""" - assets_data = data.get("assets", []) - asset_ids = set() - assets = {} - people = set() - photo_count = 0 - video_count = 0 - - for asset_data in assets_data: - asset = AssetInfo.from_api_response(asset_data, users_cache) - asset_ids.add(asset.id) - assets[asset.id] = asset - people.update(asset.people) - if asset.type == ASSET_TYPE_IMAGE: - photo_count += 1 - elif asset.type == ASSET_TYPE_VIDEO: - video_count += 1 - - return cls( - id=data["id"], - name=data.get("albumName", "Unnamed"), - asset_count=data.get("assetCount", len(asset_ids)), - photo_count=photo_count, - video_count=video_count, - created_at=data.get("createdAt", ""), - updated_at=data.get("updatedAt", ""), - shared=data.get("shared", False), - owner=data.get("owner", {}).get("name", "Unknown"), - thumbnail_asset_id=data.get("albumThumbnailAssetId"), - asset_ids=asset_ids, - assets=assets, - people=people, - ) - - -@dataclass -class AlbumChange: - """Data class for album changes.""" - - album_id: str - album_name: str - change_type: str - added_count: int = 0 - removed_count: int = 0 - added_assets: list[AssetInfo] = field(default_factory=list) - removed_asset_ids: list[str] = field(default_factory=list) - old_name: str | None = None - new_name: str | None = None - old_shared: bool | None = None - new_shared: bool | None = None +# Re-export models for backward compatibility with other integration files +__all__ = [ + "AlbumChange", + "AlbumData", + "AssetInfo", + "SharedLinkInfo", + "ImmichAlbumWatcherCoordinator", +] class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): - """Coordinator for fetching Immich album data.""" + """Coordinator for fetching Immich album data. + + Delegates to ImmichClient from core library for API calls and uses + core's change detection, filtering, and URL utilities. + """ def __init__( self, @@ -348,22 +103,32 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): name=f"{DOMAIN}_{album_id}", update_interval=timedelta(seconds=scan_interval), ) - self._url = url.rstrip("/") - self._api_key = api_key self._album_id = album_id self._album_name = album_name self._hub_name = hub_name self._previous_state: AlbumData | None = None - self._session: aiohttp.ClientSession | None = None - self._people_cache: dict[str, str] = {} # person_id -> name - self._users_cache: dict[str, str] = {} # user_id -> name - self._shared_links: list[SharedLinkInfo] = [] self._storage = storage self._telegram_cache = telegram_cache self._telegram_asset_cache = telegram_asset_cache self._persisted_asset_ids: set[str] | None = None - self._external_domain: str | None = None # Fetched from server config - self._pending_asset_ids: set[str] = set() # Assets detected but not yet processed + self._pending_asset_ids: set[str] = set() + + # Core library client (session injected lazily) + self._client: ImmichClient | None = None + self._url = url + self._api_key = api_key + + # Caches managed by the client + self._users_cache: dict[str, str] = {} + self._shared_links: list[SharedLinkInfo] = [] + self._server_config_fetched = False + + def _get_client(self) -> ImmichClient: + """Get or create the Immich client (lazy init with HA session).""" + if self._client is None: + session = async_get_clientsession(self.hass) + self._client = ImmichClient(session, self._url, self._api_key) + return self._client @property def immich_url(self) -> str: @@ -372,32 +137,8 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): @property def external_url(self) -> str: - """Return the external URL for links. - - Uses externalDomain from Immich server config if set, - otherwise falls back to the connection URL. - """ - if self._external_domain: - return self._external_domain.rstrip("/") - return self._url - - def get_internal_download_url(self, url: str) -> str: - """Convert an external URL to internal URL for faster downloads. - - If the URL starts with the external domain, replace it with the - internal connection URL to download via local network. - - Args: - url: The URL to convert (may be external or internal) - - Returns: - URL using internal connection for downloads - """ - if self._external_domain: - external = self._external_domain.rstrip("/") - if url.startswith(external): - return url.replace(external, self._url, 1) - return url + """Return the external URL for links.""" + return self._get_client().external_url @property def api_key(self) -> str: @@ -424,6 +165,10 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): """Return the Telegram asset cache (asset ID-based).""" return self._telegram_asset_cache + def get_internal_download_url(self, url: str) -> str: + """Convert an external URL to internal URL for faster downloads.""" + return self._get_client().get_internal_download_url(url) + def get_asset_thumbhash(self, asset_id: str) -> str | None: """Get the current thumbhash for an asset from coordinator data.""" if self.data and asset_id in self.data.assets: @@ -439,11 +184,7 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): await self.async_request_refresh() async def async_load_persisted_state(self) -> None: - """Load persisted asset IDs from storage. - - This should be called before the first refresh to enable - detection of changes that occurred during downtime. - """ + """Load persisted asset IDs from storage.""" if self._storage: self._persisted_asset_ids = self._storage.get_album_asset_ids( self._album_id @@ -471,279 +212,69 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): state: str | None = None, country: str | None = None, ) -> list[dict[str, Any]]: - """Get assets from the album with optional filtering and ordering. - - Args: - limit: Maximum number of assets to return (1-100) - offset: Number of assets to skip before returning results (for pagination) - favorite_only: Filter to show only favorite assets - filter_min_rating: Minimum rating for assets (1-5) - order_by: Field to sort by - 'date', 'rating', or 'name' - order: Sort direction - 'ascending', 'descending', or 'random' - asset_type: Asset type filter - 'all', 'photo', or 'video' - min_date: Filter assets created on or after this date (ISO 8601 format) - max_date: Filter assets created on or before this date (ISO 8601 format) - memory_date: Filter assets by matching month and day, excluding the same year (memories filter) - city: Filter assets by city (case-insensitive substring match) - state: Filter assets by state/region (case-insensitive substring match) - country: Filter assets by country (case-insensitive substring match) - - Returns: - List of asset data dictionaries - """ + """Get assets from the album with optional filtering and ordering.""" if self.data is None: return [] - # Start with all processed assets only - assets = [a for a in self.data.assets.values() if a.is_processed] - - # Apply favorite filter - if favorite_only: - assets = [a for a in assets if a.is_favorite] - - # Apply rating filter - if filter_min_rating > 1: - assets = [a for a in assets if a.rating is not None and a.rating >= filter_min_rating] - - # Apply asset type filtering - if asset_type == "photo": - assets = [a for a in assets if a.type == ASSET_TYPE_IMAGE] - elif asset_type == "video": - assets = [a for a in assets if a.type == ASSET_TYPE_VIDEO] - - # Apply date filtering - if min_date: - assets = [a for a in assets if a.created_at >= min_date] - if max_date: - assets = [a for a in assets if a.created_at <= max_date] - - # Apply memory date filtering (match month and day, exclude same year) - if memory_date: - try: - # Parse the reference date (supports ISO 8601 format) - ref_date = datetime.fromisoformat(memory_date.replace("Z", "+00:00")) - ref_year = ref_date.year - ref_month = ref_date.month - ref_day = ref_date.day - - def matches_memory(asset: AssetInfo) -> bool: - """Check if asset matches memory criteria (same month/day, different year).""" - try: - asset_date = datetime.fromisoformat( - asset.created_at.replace("Z", "+00:00") - ) - # Match month and day, but exclude same year (true memories behavior) - return ( - asset_date.month == ref_month - and asset_date.day == ref_day - and asset_date.year != ref_year - ) - except (ValueError, AttributeError): - return False - - assets = [a for a in assets if matches_memory(a)] - except ValueError: - _LOGGER.warning("Invalid memory_date format: %s", memory_date) - - # Apply geolocation filtering (case-insensitive substring match) - if city: - city_lower = city.lower() - assets = [a for a in assets if a.city and city_lower in a.city.lower()] - if state: - state_lower = state.lower() - assets = [a for a in assets if a.state and state_lower in a.state.lower()] - if country: - country_lower = country.lower() - assets = [a for a in assets if a.country and country_lower in a.country.lower()] - - # Apply ordering - if order_by == "random": - import random - random.shuffle(assets) - elif order_by == "rating": - # Sort by rating, putting None values last - assets = sorted( - assets, - key=lambda a: (a.rating is None, a.rating if a.rating is not None else 0), - reverse=(order == "descending") - ) - elif order_by == "name": - assets = sorted( - assets, - key=lambda a: a.filename.lower(), - reverse=(order == "descending") - ) - else: # date (default) - assets = sorted( - assets, - key=lambda a: a.created_at, - reverse=(order == "descending") - ) + # Use core library for filtering and sorting + assets = list(self.data.assets.values()) + assets = filter_assets( + assets, + favorite_only=favorite_only, + min_rating=filter_min_rating, + asset_type=asset_type, + min_date=min_date, + max_date=max_date, + memory_date=memory_date, + city=city, + state=state, + country=country, + processed_only=True, + ) + assets = sort_assets(assets, order_by=order_by, order=order) # Apply offset and limit for pagination assets = assets[offset : offset + limit] - # Build result with all available asset data (matching event data) - result = [] - for asset in assets: - asset_data = self._build_asset_detail(asset, include_thumbnail=True) - result.append(asset_data) - return result + # Build result using core utility + return [ + build_asset_detail( + asset, self.external_url, self._shared_links, include_thumbnail=True + ) + for asset in assets + ] async def async_fetch_people(self) -> dict[str, str]: """Fetch all people from Immich.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) + client = self._get_client() + result = await client.get_people() + return result - headers = {"x-api-key": self._api_key} - try: - async with self._session.get( - f"{self._url}/api/people", - headers=headers, - ) as response: - if response.status == 200: - data = await response.json() - people_list = data.get("people", data) if isinstance(data, dict) else data - self._people_cache = { - p["id"]: p.get("name", "") - for p in people_list - if p.get("name") - } - except aiohttp.ClientError as err: - _LOGGER.warning("Failed to fetch people: %s", err) - - return self._people_cache - - async def _async_fetch_users(self) -> dict[str, str]: - """Fetch all users from Immich and cache them.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) - - headers = {"x-api-key": self._api_key} - try: - async with self._session.get( - f"{self._url}/api/users", - headers=headers, - ) as response: - if response.status == 200: - data = await response.json() - self._users_cache = { - u["id"]: u.get("name", u.get("email", "Unknown")) - for u in data - if u.get("id") - } - except aiohttp.ClientError as err: - _LOGGER.warning("Failed to fetch users: %s", err) - - return self._users_cache - - async def _async_fetch_server_config(self) -> None: - """Fetch server config from Immich to get external domain.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) - - headers = {"x-api-key": self._api_key} - try: - async with self._session.get( - f"{self._url}/api/server/config", - headers=headers, - ) as response: - if response.status == 200: - data = await response.json() - external_domain = data.get("externalDomain", "") or "" - self._external_domain = external_domain - if external_domain: - _LOGGER.debug( - "Using external domain from Immich: %s", external_domain - ) - else: - _LOGGER.debug( - "No external domain configured in Immich, using connection URL" - ) - else: - _LOGGER.warning( - "Failed to fetch server config: HTTP %s", response.status - ) - except aiohttp.ClientError as err: - _LOGGER.warning("Failed to fetch server config: %s", err) - - async def _async_fetch_shared_links(self) -> list[SharedLinkInfo]: - """Fetch shared links for this album from Immich.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) - - headers = {"x-api-key": self._api_key} - self._shared_links = [] - - try: - async with self._session.get( - f"{self._url}/api/shared-links", - headers=headers, - ) as response: - if response.status == 200: - data = await response.json() - for link in data: - album = link.get("album") - key = link.get("key") - if album and key and album.get("id") == self._album_id: - link_info = SharedLinkInfo.from_api_response(link) - self._shared_links.append(link_info) - _LOGGER.debug( - "Found shared link for album: key=%s, has_password=%s", - key[:8], - link_info.has_password, - ) - except aiohttp.ClientError as err: - _LOGGER.warning("Failed to fetch shared links: %s", err) - - return self._shared_links - - def _get_accessible_links(self) -> list[SharedLinkInfo]: - """Get all accessible (no password, not expired) shared links.""" - return [link for link in self._shared_links if link.is_accessible] - - def _get_protected_links(self) -> list[SharedLinkInfo]: - """Get password-protected but not expired shared links.""" - return [link for link in self._shared_links if link.has_password and not link.is_expired] + # --- Shared link URL helpers (delegate to core asset_utils) --- def get_public_url(self) -> str | None: """Get the public URL if album has an accessible shared link.""" - accessible_links = self._get_accessible_links() - if accessible_links: - return f"{self.external_url}/share/{accessible_links[0].key}" - return None + return get_public_url(self.external_url, self._shared_links) def get_any_url(self) -> str | None: """Get any non-expired URL (prefers accessible, falls back to protected).""" - accessible_links = self._get_accessible_links() - if accessible_links: - return f"{self.external_url}/share/{accessible_links[0].key}" - non_expired = [link for link in self._shared_links if not link.is_expired] - if non_expired: - return f"{self.external_url}/share/{non_expired[0].key}" - return None + return get_any_url(self.external_url, self._shared_links) def get_protected_url(self) -> str | None: """Get a protected URL if any password-protected link exists.""" - protected_links = self._get_protected_links() - if protected_links: - return f"{self.external_url}/share/{protected_links[0].key}" - return None + return get_protected_url(self.external_url, self._shared_links) def get_protected_urls(self) -> list[str]: """Get all password-protected URLs.""" - return [f"{self.external_url}/share/{link.key}" for link in self._get_protected_links()] + return get_protected_urls(self.external_url, self._shared_links) def get_protected_password(self) -> str | None: """Get the password for the first protected link.""" - protected_links = self._get_protected_links() - if protected_links and protected_links[0].password: - return protected_links[0].password - return None + return get_protected_password(self._shared_links) def get_public_urls(self) -> list[str]: """Get all accessible public URLs.""" - return [f"{self.external_url}/share/{link.key}" for link in self._get_accessible_links()] + return get_public_urls(self.external_url, self._shared_links) def get_shared_links_info(self) -> list[dict[str, Any]]: """Get detailed info about all shared links.""" @@ -758,321 +289,182 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): for link in self._shared_links ] - def _get_asset_public_url(self, asset_id: str) -> str | None: - """Get the public viewer URL for an asset (web page).""" - accessible_links = self._get_accessible_links() - if accessible_links: - return f"{self.external_url}/share/{accessible_links[0].key}/photos/{asset_id}" - non_expired = [link for link in self._shared_links if not link.is_expired] - if non_expired: - return f"{self.external_url}/share/{non_expired[0].key}/photos/{asset_id}" + def get_protected_link_id(self) -> str | None: + """Get the ID of the first protected link.""" + protected = get_protected_links(self._shared_links) + if protected: + return protected[0].id return None - def _get_asset_download_url(self, asset_id: str) -> str | None: - """Get the direct download URL for an asset (media file).""" - accessible_links = self._get_accessible_links() - if accessible_links: - return f"{self.external_url}/api/assets/{asset_id}/original?key={accessible_links[0].key}" - non_expired = [link for link in self._shared_links if not link.is_expired] - if non_expired: - return f"{self.external_url}/api/assets/{asset_id}/original?key={non_expired[0].key}" + def has_unprotected_link(self) -> bool: + """Check if album has an unprotected (accessible) shared link.""" + return len(get_accessible_links(self._shared_links)) > 0 + + def has_protected_link(self) -> bool: + """Check if album has a protected (password) shared link.""" + return len(get_protected_links(self._shared_links)) > 0 + + def get_unprotected_link_id(self) -> str | None: + """Get the ID of the first unprotected link.""" + accessible = get_accessible_links(self._shared_links) + if accessible: + return accessible[0].id return None - def _get_asset_video_url(self, asset_id: str) -> str | None: - """Get the transcoded video playback URL for a video asset.""" - accessible_links = self._get_accessible_links() - if accessible_links: - return f"{self.external_url}/api/assets/{asset_id}/video/playback?key={accessible_links[0].key}" - non_expired = [link for link in self._shared_links if not link.is_expired] - if non_expired: - return f"{self.external_url}/api/assets/{asset_id}/video/playback?key={non_expired[0].key}" - return None + # --- Shared link management (delegate to core client) --- - def _get_asset_photo_url(self, asset_id: str) -> str | None: - """Get the preview-sized thumbnail URL for a photo asset.""" - accessible_links = self._get_accessible_links() - if accessible_links: - return f"{self.external_url}/api/assets/{asset_id}/thumbnail?size=preview&key={accessible_links[0].key}" - non_expired = [link for link in self._shared_links if not link.is_expired] - if non_expired: - return f"{self.external_url}/api/assets/{asset_id}/thumbnail?size=preview&key={non_expired[0].key}" - return None + async def async_create_shared_link(self, password: str | None = None) -> bool: + """Create a new shared link for the album.""" + client = self._get_client() + result = await client.create_shared_link(self._album_id, password) + if result: + self._shared_links = await client.get_shared_links(self._album_id) + return result - def _build_asset_detail( - self, asset: AssetInfo, include_thumbnail: bool = True - ) -> dict[str, Any]: - """Build asset detail dictionary with all available data. + async def async_delete_shared_link(self, link_id: str) -> bool: + """Delete a shared link.""" + client = self._get_client() + result = await client.delete_shared_link(link_id) + if result: + self._shared_links = await client.get_shared_links(self._album_id) + return result - Args: - asset: AssetInfo object - include_thumbnail: If True, include thumbnail_url + async def async_set_shared_link_password( + self, link_id: str, password: str | None + ) -> bool: + """Update the password for a shared link.""" + client = self._get_client() + result = await client.set_shared_link_password(link_id, password) + if result: + self._shared_links = await client.get_shared_links(self._album_id) + return result - Returns: - Dictionary with asset details (using ATTR_* constants for consistency) - """ - # Base asset data - asset_detail = { - "id": asset.id, - ATTR_ASSET_TYPE: asset.type, - ATTR_ASSET_FILENAME: asset.filename, - ATTR_ASSET_CREATED: asset.created_at, - ATTR_ASSET_OWNER: asset.owner_name, - ATTR_ASSET_OWNER_ID: asset.owner_id, - ATTR_ASSET_DESCRIPTION: asset.description, - ATTR_PEOPLE: asset.people, - ATTR_ASSET_IS_FAVORITE: asset.is_favorite, - ATTR_ASSET_RATING: asset.rating, - ATTR_ASSET_LATITUDE: asset.latitude, - ATTR_ASSET_LONGITUDE: asset.longitude, - ATTR_ASSET_CITY: asset.city, - ATTR_ASSET_STATE: asset.state, - ATTR_ASSET_COUNTRY: asset.country, - } + def clear_new_assets_flag(self) -> None: + """Clear the new assets flag.""" + if self.data: + self.data.has_new_assets = False + self.data.last_change_time = None - # Add thumbnail URL if requested - if include_thumbnail: - asset_detail[ATTR_THUMBNAIL_URL] = f"{self.external_url}/api/assets/{asset.id}/thumbnail" - - # Add public viewer URL (web page) - asset_url = self._get_asset_public_url(asset.id) - if asset_url: - asset_detail[ATTR_ASSET_URL] = asset_url - - # Add download URL (direct media file) - asset_download_url = self._get_asset_download_url(asset.id) - if asset_download_url: - asset_detail[ATTR_ASSET_DOWNLOAD_URL] = asset_download_url - - # Add type-specific URLs - if asset.type == ASSET_TYPE_VIDEO: - asset_video_url = self._get_asset_video_url(asset.id) - if asset_video_url: - asset_detail[ATTR_ASSET_PLAYBACK_URL] = asset_video_url - elif asset.type == ASSET_TYPE_IMAGE: - asset_photo_url = self._get_asset_photo_url(asset.id) - if asset_photo_url: - asset_detail["photo_url"] = asset_photo_url # TODO: Add ATTR_ASSET_PHOTO_URL constant - - return asset_detail + # --- Data update (main polling loop) --- async def _async_update_data(self) -> AlbumData | None: """Fetch data from Immich API.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) + client = self._get_client() # Fetch server config to get external domain (once) - if self._external_domain is None: - await self._async_fetch_server_config() + if not self._server_config_fetched: + await client.get_server_config() + self._server_config_fetched = True # Fetch users to resolve owner names if not self._users_cache: - await self._async_fetch_users() + self._users_cache = await client.get_users() # Fetch shared links (refresh each time as links can change) - await self._async_fetch_shared_links() - - headers = {"x-api-key": self._api_key} + self._shared_links = await client.get_shared_links(self._album_id) try: - async with self._session.get( - f"{self._url}/api/albums/{self._album_id}", - headers=headers, - ) as response: - if response.status == 404: - _LOGGER.warning("Album %s not found", self._album_id) - # Fire album_deleted event if we had previous state (album was deleted) - if self._previous_state: - event_data = { - ATTR_HUB_NAME: self._hub_name, - ATTR_ALBUM_ID: self._album_id, - ATTR_ALBUM_NAME: self._previous_state.name, - } - self.hass.bus.async_fire(EVENT_ALBUM_DELETED, event_data) - _LOGGER.info("Album '%s' was deleted", self._previous_state.name) - return None - if response.status != 200: - raise UpdateFailed( - f"Error fetching album {self._album_id}: HTTP {response.status}" - ) + album = await client.get_album(self._album_id, self._users_cache) + except ImmichApiError as err: + raise UpdateFailed(str(err)) from err - data = await response.json() - album = AlbumData.from_api_response(data, self._users_cache) - - # Detect changes - if self._previous_state: - change = self._detect_change(self._previous_state, album) - if change: - album.has_new_assets = change.added_count > 0 - album.last_change_time = datetime.now() - self._fire_events(change, album) - elif self._persisted_asset_ids is not None: - # First refresh after restart - compare with persisted state - added_ids = album.asset_ids - self._persisted_asset_ids - removed_ids = self._persisted_asset_ids - album.asset_ids - - if added_ids or removed_ids: - change_type = "changed" - if added_ids and not removed_ids: - change_type = "assets_added" - elif removed_ids and not added_ids: - change_type = "assets_removed" - - added_assets = [] - for aid in added_ids: - if aid not in album.assets: - continue - asset = album.assets[aid] - if asset.is_processed: - added_assets.append(asset) - else: - # Track unprocessed assets for later - self._pending_asset_ids.add(aid) - - change = AlbumChange( - album_id=album.id, - album_name=album.name, - change_type=change_type, - added_count=len(added_ids), - removed_count=len(removed_ids), - added_assets=added_assets, - removed_asset_ids=list(removed_ids), - ) - album.has_new_assets = change.added_count > 0 - album.last_change_time = datetime.now() - self._fire_events(change, album) - _LOGGER.info( - "Detected changes during downtime for album '%s': +%d -%d", - album.name, - len(added_ids), - len(removed_ids), - ) - else: - album.has_new_assets = False - - # Clear persisted state after first comparison - self._persisted_asset_ids = None - else: - album.has_new_assets = False - - # Preserve has_new_assets from previous state if still within window - if self._previous_state: - prev = self._previous_state - if prev.has_new_assets and prev.last_change_time: - album.last_change_time = prev.last_change_time - if not album.has_new_assets: - album.has_new_assets = prev.has_new_assets - - # Update previous state - self._previous_state = album - - # Persist current state for recovery after restart - if self._storage: - await self._storage.async_save_album_state( - self._album_id, album.asset_ids - ) - - return album - - except aiohttp.ClientError as err: - raise UpdateFailed(f"Error communicating with Immich: {err}") from err - - def _detect_change( - self, old_state: AlbumData, new_state: AlbumData - ) -> AlbumChange | None: - """Detect changes between two album states.""" - added_ids = new_state.asset_ids - old_state.asset_ids - removed_ids = old_state.asset_ids - new_state.asset_ids - - _LOGGER.debug( - "Change detection: added_ids=%d, removed_ids=%d, pending=%d", - len(added_ids), - len(removed_ids), - len(self._pending_asset_ids), - ) - - # Track new unprocessed assets and collect processed ones - added_assets = [] - for aid in added_ids: - if aid not in new_state.assets: - _LOGGER.debug("Asset %s: not in assets dict", aid) - continue - asset = new_state.assets[aid] - _LOGGER.debug( - "New asset %s (%s): is_processed=%s, filename=%s", - aid, - asset.type, - asset.is_processed, - asset.filename, - ) - if asset.is_processed: - added_assets.append(asset) - else: - # Track unprocessed assets for later - self._pending_asset_ids.add(aid) - _LOGGER.debug("Asset %s added to pending (not yet processed)", aid) - - # Check if any pending assets are now processed - newly_processed = [] - for aid in list(self._pending_asset_ids): - if aid not in new_state.assets: - # Asset was removed, no longer pending - self._pending_asset_ids.discard(aid) - continue - asset = new_state.assets[aid] - if asset.is_processed: - _LOGGER.debug( - "Pending asset %s (%s) is now processed: filename=%s", - aid, - asset.type, - asset.filename, - ) - newly_processed.append(asset) - self._pending_asset_ids.discard(aid) - - # Include newly processed pending assets - added_assets.extend(newly_processed) - - # Detect metadata changes - name_changed = old_state.name != new_state.name - sharing_changed = old_state.shared != new_state.shared - - # Return None only if nothing changed at all - if not added_assets and not removed_ids and not name_changed and not sharing_changed: + if album is None: + # Album was deleted (404) + if self._previous_state: + event_data = { + ATTR_HUB_NAME: self._hub_name, + ATTR_ALBUM_ID: self._album_id, + ATTR_ALBUM_NAME: self._previous_state.name, + } + self.hass.bus.async_fire(EVENT_ALBUM_DELETED, event_data) + _LOGGER.info("Album '%s' was deleted", self._previous_state.name) return None - # Determine primary change type (use added_assets not added_ids) - change_type = "changed" - if name_changed and not added_assets and not removed_ids and not sharing_changed: - change_type = "album_renamed" - elif sharing_changed and not added_assets and not removed_ids and not name_changed: - change_type = "album_sharing_changed" - elif added_assets and not removed_ids and not name_changed and not sharing_changed: - change_type = "assets_added" - elif removed_ids and not added_assets and not name_changed and not sharing_changed: - change_type = "assets_removed" + # Detect changes using core library + if self._previous_state: + change, self._pending_asset_ids = detect_album_changes( + self._previous_state, album, self._pending_asset_ids + ) + if change: + album.has_new_assets = change.added_count > 0 + album.last_change_time = datetime.now() + self._fire_events(change, album) + elif self._persisted_asset_ids is not None: + # First refresh after restart - compare with persisted state + added_ids = album.asset_ids - self._persisted_asset_ids + removed_ids = self._persisted_asset_ids - album.asset_ids - return AlbumChange( - album_id=new_state.id, - album_name=new_state.name, - change_type=change_type, - added_count=len(added_assets), # Count only processed assets - removed_count=len(removed_ids), - added_assets=added_assets, - removed_asset_ids=list(removed_ids), - old_name=old_state.name if name_changed else None, - new_name=new_state.name if name_changed else None, - old_shared=old_state.shared if sharing_changed else None, - new_shared=new_state.shared if sharing_changed else None, - ) + if added_ids or removed_ids: + change_type = "changed" + if added_ids and not removed_ids: + change_type = "assets_added" + elif removed_ids and not added_ids: + change_type = "assets_removed" + + added_assets = [] + for aid in added_ids: + if aid not in album.assets: + continue + asset = album.assets[aid] + if asset.is_processed: + added_assets.append(asset) + else: + self._pending_asset_ids.add(aid) + + change = AlbumChange( + album_id=album.id, + album_name=album.name, + change_type=change_type, + added_count=len(added_ids), + removed_count=len(removed_ids), + added_assets=added_assets, + removed_asset_ids=list(removed_ids), + ) + album.has_new_assets = change.added_count > 0 + album.last_change_time = datetime.now() + self._fire_events(change, album) + _LOGGER.info( + "Detected changes during downtime for album '%s': +%d -%d", + album.name, + len(added_ids), + len(removed_ids), + ) + else: + album.has_new_assets = False + + # Clear persisted state after first comparison + self._persisted_asset_ids = None + else: + album.has_new_assets = False + + # Preserve has_new_assets from previous state if still within window + if self._previous_state: + prev = self._previous_state + if prev.has_new_assets and prev.last_change_time: + album.last_change_time = prev.last_change_time + if not album.has_new_assets: + album.has_new_assets = prev.has_new_assets + + # Update previous state + self._previous_state = album + + # Persist current state for recovery after restart + if self._storage: + await self._storage.async_save_album_state( + self._album_id, album.asset_ids + ) + + return album def _fire_events(self, change: AlbumChange, album: AlbumData) -> None: """Fire Home Assistant events for album changes.""" added_assets_detail = [] for asset in change.added_assets: - # Only include fully processed assets if not asset.is_processed: continue - asset_detail = self._build_asset_detail(asset, include_thumbnail=False) + asset_detail = build_asset_detail( + asset, self.external_url, self._shared_links, include_thumbnail=False + ) added_assets_detail.append(asset_detail) event_data = { @@ -1088,7 +480,6 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): ATTR_SHARED: album.shared, } - # Add metadata change attributes if applicable if change.old_name is not None: event_data[ATTR_OLD_NAME] = change.old_name event_data[ATTR_NEW_NAME] = change.new_name @@ -1116,7 +507,6 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): if change.removed_count > 0: self.hass.bus.async_fire(EVENT_ASSETS_REMOVED, event_data) - # Fire specific events for metadata changes if change.old_name is not None: self.hass.bus.async_fire(EVENT_ALBUM_RENAMED, event_data) _LOGGER.info( @@ -1133,139 +523,3 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): change.old_shared, change.new_shared, ) - - def get_protected_link_id(self) -> str | None: - """Get the ID of the first protected link.""" - protected_links = self._get_protected_links() - if protected_links: - return protected_links[0].id - return None - - async def async_set_shared_link_password( - self, link_id: str, password: str | None - ) -> bool: - """Update the password for a shared link via Immich API.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) - - headers = { - "x-api-key": self._api_key, - "Content-Type": "application/json", - } - - payload = {"password": password if password else None} - - try: - async with self._session.patch( - f"{self._url}/api/shared-links/{link_id}", - headers=headers, - json=payload, - ) as response: - if response.status == 200: - _LOGGER.info("Successfully updated shared link password") - await self._async_fetch_shared_links() - return True - else: - _LOGGER.error( - "Failed to update shared link password: HTTP %s", - response.status, - ) - return False - except aiohttp.ClientError as err: - _LOGGER.error("Error updating shared link password: %s", err) - return False - - def clear_new_assets_flag(self) -> None: - """Clear the new assets flag.""" - if self.data: - self.data.has_new_assets = False - self.data.last_change_time = None - - def has_unprotected_link(self) -> bool: - """Check if album has an unprotected (accessible) shared link.""" - return len(self._get_accessible_links()) > 0 - - def has_protected_link(self) -> bool: - """Check if album has a protected (password) shared link.""" - return len(self._get_protected_links()) > 0 - - def get_unprotected_link_id(self) -> str | None: - """Get the ID of the first unprotected link.""" - accessible_links = self._get_accessible_links() - if accessible_links: - return accessible_links[0].id - return None - - async def async_create_shared_link(self, password: str | None = None) -> bool: - """Create a new shared link for the album via Immich API.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) - - headers = { - "x-api-key": self._api_key, - "Content-Type": "application/json", - } - - payload: dict[str, Any] = { - "albumId": self._album_id, - "type": "ALBUM", - "allowDownload": True, - "allowUpload": False, - "showMetadata": True, - } - - if password: - payload["password"] = password - - try: - async with self._session.post( - f"{self._url}/api/shared-links", - headers=headers, - json=payload, - ) as response: - if response.status == 201: - _LOGGER.info( - "Successfully created shared link for album %s", - self._album_name, - ) - await self._async_fetch_shared_links() - return True - else: - error_text = await response.text() - _LOGGER.error( - "Failed to create shared link: HTTP %s - %s", - response.status, - error_text, - ) - return False - except aiohttp.ClientError as err: - _LOGGER.error("Error creating shared link: %s", err) - return False - - async def async_delete_shared_link(self, link_id: str) -> bool: - """Delete a shared link via Immich API.""" - if self._session is None: - self._session = async_get_clientsession(self.hass) - - headers = {"x-api-key": self._api_key} - - try: - async with self._session.delete( - f"{self._url}/api/shared-links/{link_id}", - headers=headers, - ) as response: - if response.status == 200: - _LOGGER.info("Successfully deleted shared link") - await self._async_fetch_shared_links() - return True - else: - error_text = await response.text() - _LOGGER.error( - "Failed to delete shared link: HTTP %s - %s", - response.status, - error_text, - ) - return False - except aiohttp.ClientError as err: - _LOGGER.error("Error deleting shared link: %s", err) - return False diff --git a/custom_components/immich_album_watcher/manifest.json b/custom_components/immich_album_watcher/manifest.json index b9b04c5..1507659 100644 --- a/custom_components/immich_album_watcher/manifest.json +++ b/custom_components/immich_album_watcher/manifest.json @@ -7,6 +7,6 @@ "documentation": "https://github.com/DolgolyovAlexei/haos-hacs-immich-album-watcher", "iot_class": "cloud_polling", "issue_tracker": "https://github.com/DolgolyovAlexei/haos-hacs-immich-album-watcher/issues", - "requirements": [], + "requirements": ["immich-watcher-core==0.1.0"], "version": "2.8.0" } diff --git a/custom_components/immich_album_watcher/sensor.py b/custom_components/immich_album_watcher/sensor.py index 3d84466..6c1c2d1 100644 --- a/custom_components/immich_album_watcher/sensor.py +++ b/custom_components/immich_album_watcher/sensor.py @@ -2,9 +2,7 @@ from __future__ import annotations -import asyncio import logging -import re from datetime import datetime from typing import Any @@ -18,12 +16,16 @@ from homeassistant.components.sensor import ( from homeassistant.config_entries import ConfigEntry, ConfigSubentry from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse, callback from homeassistant.helpers import entity_platform +from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.device_registry import DeviceEntryType from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.util import slugify +from immich_watcher_core.models import AlbumData +from immich_watcher_core.telegram.client import TelegramClient + from .const import ( ATTR_ALBUM_ID, ATTR_ALBUM_NAME, @@ -47,126 +49,28 @@ from .const import ( SERVICE_REFRESH, SERVICE_SEND_TELEGRAM_NOTIFICATION, ) -from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator -from .storage import NotificationQueue, TelegramFileCache +from .coordinator import ImmichAlbumWatcherCoordinator +from .storage import NotificationQueue _LOGGER = logging.getLogger(__name__) -# Telegram constants -TELEGRAM_API_BASE_URL = "https://api.telegram.org/bot" -TELEGRAM_MAX_PHOTO_SIZE = 10 * 1024 * 1024 # 10 MB - Telegram's max photo size -TELEGRAM_MAX_VIDEO_SIZE = 50 * 1024 * 1024 # 50 MB - Telegram's max video/document upload size -TELEGRAM_MAX_DIMENSION_SUM = 10000 # Maximum sum of width + height in pixels - -# Regex pattern for Immich asset ID (UUID format) -_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$") - -# Regex patterns to extract asset ID from Immich URLs -# Matches patterns like: -# - /api/assets/{asset_id}/original -# - /api/assets/{asset_id}/thumbnail -# - /api/assets/{asset_id}/video/playback -# - /share/{key}/photos/{asset_id} -_IMMICH_ASSET_ID_PATTERNS = [ - re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"), - re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"), -] - - -def _is_asset_id(value: str) -> bool: - """Check if a string is a valid Immich asset ID (UUID format). - - Args: - value: The string to check - - Returns: - True if the string matches the UUID format, False otherwise - """ - return bool(_ASSET_ID_PATTERN.match(value)) - - -def _extract_asset_id_from_url(url: str) -> str | None: - """Extract asset ID from Immich URL if possible. - - Supports the following URL patterns: - - /api/assets/{asset_id}/original?... - - /api/assets/{asset_id}/thumbnail?... - - /api/assets/{asset_id}/video/playback?... - - /share/{key}/photos/{asset_id} - - Args: - url: The URL to extract asset ID from - - Returns: - The asset ID if found, None otherwise - """ - if not url: - return None - - for pattern in _IMMICH_ASSET_ID_PATTERNS: - match = pattern.search(url) - if match: - return match.group(1) - - return None - - -def _split_media_by_upload_size( - media_items: list[tuple], max_upload_size: int -) -> list[list[tuple]]: - """Split media items into sub-groups respecting upload size limit. - - Cached items (file_id references) don't count toward upload size since - they are sent as lightweight JSON references, not uploaded data. - - Args: - media_items: List of (media_type, media_ref, filename, cache_key, is_cached, content_type) - max_upload_size: Maximum total upload bytes per sub-group - - Returns: - List of sub-groups, each a list of media_items tuples - """ - if not media_items: - return [] - - groups: list[list[tuple]] = [] - current_group: list[tuple] = [] - current_upload_size = 0 - - for item in media_items: - _, media_ref, _, _, is_cached, _ = item - item_upload_size = 0 if is_cached else len(media_ref) - - if current_group and current_upload_size + item_upload_size > max_upload_size: - groups.append(current_group) - current_group = [item] - current_upload_size = item_upload_size - else: - current_group.append(item) - current_upload_size += item_upload_size - - if current_group: - groups.append(current_group) - - return groups - async def async_setup_entry( hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: - """Set up Immich Album Watcher sensors from a config entry.""" - # Iterate through all album subentries + """Set up sensor entities for all album subentries.""" + entry_data = hass.data[DOMAIN][entry.entry_id] + for subentry_id, subentry in entry.subentries.items(): - subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id) + subentry_data = entry_data["subentries"].get(subentry_id) if not subentry_data: - _LOGGER.error("Subentry data not found for %s", subentry_id) continue coordinator = subentry_data.coordinator - entities: list[SensorEntity] = [ + entities = [ ImmichAlbumIdSensor(coordinator, entry, subentry), ImmichAlbumAssetCountSensor(coordinator, entry, subentry), ImmichAlbumPhotoCountSensor(coordinator, entry, subentry), @@ -270,7 +174,6 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se self._album_id = subentry.data[CONF_ALBUM_ID] self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album") self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich") - # Generate unique_id prefix: {hub_name}_album_{album_name} self._unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}") @property @@ -355,7 +258,6 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se if start_time <= end_time: return start_time <= now < end_time else: - # Crosses midnight (e.g., 22:00 - 08:00) return now >= start_time or now < end_time async def async_send_telegram_notification( @@ -376,24 +278,10 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se quiet_hours_start: str | None = None, quiet_hours_end: str | None = None, ) -> ServiceResponse: - """Send notification to Telegram. - - Supports: - - Empty assets: sends a simple text message - - Single photo: uses sendPhoto API - - Single video: uses sendVideo API - - Multiple items: uses sendMediaGroup API (splits into multiple groups if needed) - - Each item in assets should be a dict with 'url', optional 'type' (photo/video/document), - and optional 'cache_key' (custom key for caching instead of URL). - Downloads media and uploads to Telegram to bypass CORS restrictions. - - If wait_for_response is False, the task will be executed in the background - and the service will return immediately. - """ + """Send notification to Telegram.""" # Check quiet hours — queue notification if active if self._is_quiet_hours(quiet_hours_start, quiet_hours_end): - from . import _register_queue_timers, ImmichConfigEntry + from . import _register_queue_timers queue: NotificationQueue = self.hass.data[DOMAIN][self._entry.entry_id]["notification_queue"] await queue.async_enqueue({ "entity_id": self.entity_id, @@ -412,7 +300,6 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se "quiet_hours_start": quiet_hours_start, "quiet_hours_end": quiet_hours_end, }) - # Register timer for this end time if not already registered _register_queue_timers(self.hass, self._entry) return {"success": True, "status": "queued_quiet_hours"} @@ -436,7 +323,6 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se ) return {"success": True, "status": "queued", "message": "Notification queued for background processing"} - # Blocking mode - execute and return result return await self._execute_telegram_notification( chat_id=chat_id, assets=assets, @@ -467,12 +353,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se send_large_photos_as_documents: bool = False, chat_action: str | None = "typing", ) -> ServiceResponse: - """Execute the Telegram notification (internal method).""" - import json - import aiohttp - from aiohttp import FormData - from homeassistant.helpers.aiohttp_client import async_get_clientsession - + """Execute the Telegram notification using core TelegramClient.""" # Get bot token from parameter or config token = bot_token or self._entry.options.get(CONF_TELEGRAM_BOT_TOKEN) if not token: @@ -483,1274 +364,29 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se session = async_get_clientsession(self.hass) - # Handle empty assets - send simple text message (no typing indicator needed) - if not assets: - return await self._send_telegram_message( - session, token, chat_id, caption or "", reply_to_message_id, disable_web_page_preview, parse_mode - ) - - # Start chat action indicator for media notifications (before downloading assets) - typing_task = None - if chat_action: - typing_task = self._start_typing_indicator(session, token, chat_id, chat_action) - - try: - # Handle single photo - if len(assets) == 1 and assets[0].get("type") == "photo": - return await self._send_telegram_photo( - session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode, - max_asset_data_size, send_large_photos_as_documents, assets[0].get("content_type"), - assets[0].get("cache_key") - ) - - # Handle single video - if len(assets) == 1 and assets[0].get("type") == "video": - return await self._send_telegram_video( - session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode, - max_asset_data_size, assets[0].get("content_type"), assets[0].get("cache_key") - ) - - # Handle single document (default type) - if len(assets) == 1 and assets[0].get("type", "document") == "document": - url = assets[0].get("url") - if not url: - return {"success": False, "error": "Missing 'url' for document"} - item_content_type = assets[0].get("content_type") - item_cache_key = assets[0].get("cache_key") - try: - download_url = self.coordinator.get_internal_download_url(url) - async with session.get(download_url) as resp: - if resp.status != 200: - return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"} - data = await resp.read() - if max_asset_data_size is not None and len(data) > max_asset_data_size: - return {"success": False, "error": f"Media size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)"} - # Detect filename from URL or use generic name - filename = url.split("/")[-1].split("?")[0] or "file" - return await self._send_telegram_document( - session, token, chat_id, data, filename, caption, reply_to_message_id, parse_mode, - url, item_content_type, item_cache_key - ) - except aiohttp.ClientError as err: - return {"success": False, "error": f"Failed to download media: {err}"} - - # Handle multiple items - send as media group(s) - return await self._send_telegram_media_group( - session, token, chat_id, assets, caption, reply_to_message_id, max_group_size, chunk_delay, parse_mode, - max_asset_data_size, send_large_photos_as_documents - ) - finally: - # Stop chat action indicator when done (success or error) - if typing_task: - typing_task.cancel() - try: - await typing_task - except asyncio.CancelledError: - pass - - async def _send_telegram_message( - self, - session: Any, - token: str, - chat_id: str, - text: str, - reply_to_message_id: int | None = None, - disable_web_page_preview: bool | None = None, - parse_mode: str = "HTML", - ) -> ServiceResponse: - """Send a simple text message to Telegram.""" - import aiohttp - - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMessage" - - payload: dict[str, Any] = { - "chat_id": chat_id, - "text": text or "Notification from Home Assistant", - "parse_mode": parse_mode, - } - - if reply_to_message_id: - payload["reply_to_message_id"] = reply_to_message_id - - if disable_web_page_preview is not None: - payload["disable_web_page_preview"] = disable_web_page_preview - - try: - _LOGGER.debug("Sending text message to Telegram") - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) - if response.status == 200 and result.get("ok"): - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - } - else: - _LOGGER.error("Telegram API error: %s", result) - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram message send failed: %s", err) - return {"success": False, "error": str(err)} - - async def _send_telegram_chat_action( - self, - session: Any, - token: str, - chat_id: str, - action: str = "typing", - ) -> bool: - """Send a chat action to Telegram (e.g., typing indicator). - - Args: - session: aiohttp client session - token: Telegram bot token - chat_id: Target chat ID - action: Chat action type (typing, upload_photo, upload_video, etc.) - - Returns: - True if successful, False otherwise - """ - import aiohttp - - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendChatAction" - payload = {"chat_id": chat_id, "action": action} - - try: - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - _LOGGER.debug("Sent chat action '%s' to chat %s", action, chat_id) - return True - else: - _LOGGER.debug("Failed to send chat action: %s", result.get("description")) - return False - except aiohttp.ClientError as err: - _LOGGER.debug("Chat action request failed: %s", err) - return False - - def _start_typing_indicator( - self, - session: Any, - token: str, - chat_id: str, - action: str = "typing", - ) -> asyncio.Task: - """Start a background task that sends chat action indicator periodically. - - The chat action indicator expires after ~5 seconds, so we refresh it every 4 seconds. - - Args: - session: aiohttp client session - token: Telegram bot token - chat_id: Target chat ID - action: Chat action type (typing, upload_photo, upload_video, etc.) - - Returns: - The background task (cancel it when done) - """ - - async def action_loop() -> None: - """Keep sending chat action until cancelled.""" - try: - while True: - await self._send_telegram_chat_action(session, token, chat_id, action) - await asyncio.sleep(4) - except asyncio.CancelledError: - _LOGGER.debug("Chat action indicator stopped for action '%s'", action) - - return asyncio.create_task(action_loop()) - - def _log_telegram_error( - self, - error_code: int | None, - description: str, - data: bytes | None = None, - media_type: str = "photo", - ) -> None: - """Log detailed Telegram API error with diagnostics. - - Args: - error_code: Telegram error code - description: Error description from Telegram - data: Media data bytes (optional, for size diagnostics) - media_type: Type of media (photo/video) - """ - error_msg = f"Telegram API error ({error_code}): {description}" - - # Add diagnostic information based on error type - if data: - error_msg += f" | Media size: {len(data)} bytes ({len(data) / (1024 * 1024):.2f} MB)" - - # Check dimensions for photos - if media_type == "photo": - try: - from PIL import Image - import io - - img = Image.open(io.BytesIO(data)) - width, height = img.size - dimension_sum = width + height - error_msg += f" | Dimensions: {width}x{height} (sum={dimension_sum})" - - # Highlight limit violations - if len(data) > TELEGRAM_MAX_PHOTO_SIZE: - error_msg += f" | EXCEEDS size limit ({TELEGRAM_MAX_PHOTO_SIZE / (1024 * 1024):.0f} MB)" - if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM: - error_msg += f" | EXCEEDS dimension limit ({TELEGRAM_MAX_DIMENSION_SUM})" - except Exception: - pass - - # Check size limit for videos - if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: - error_msg += f" | EXCEEDS Telegram upload limit ({TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB)" - - # Provide suggestions based on error description - suggestions = [] - if "dimension" in description.lower() or "PHOTO_INVALID_DIMENSIONS" in description: - suggestions.append("Photo dimensions too large - consider setting send_large_photos_as_documents=true") - elif "too large" in description.lower() or error_code == 413: - suggestions.append("File size too large - consider setting send_large_photos_as_documents=true or max_asset_data_size to skip large files") - elif "entity too large" in description.lower(): - suggestions.append("Request entity too large - reduce max_group_size or set max_asset_data_size") - - if suggestions: - error_msg += f" | Suggestions: {'; '.join(suggestions)}" - - _LOGGER.error(error_msg) - - def _check_telegram_photo_limits( - self, - data: bytes, - ) -> tuple[bool, str | None, int | None, int | None]: - """Check if photo data exceeds Telegram photo limits. - - Telegram limits for photos: - - Max file size: 10 MB - - Max dimension sum: ~10,000 pixels (width + height) - - Returns: - Tuple of (exceeds_limits, reason, width, height) - - exceeds_limits: True if photo exceeds limits - - reason: Human-readable reason (None if within limits) - - width: Image width in pixels (None if PIL not available) - - height: Image height in pixels (None if PIL not available) - """ - # Check file size - if len(data) > TELEGRAM_MAX_PHOTO_SIZE: - return True, f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", None, None - - # Try to check dimensions using PIL - try: - from PIL import Image - import io - - img = Image.open(io.BytesIO(data)) - width, height = img.size - dimension_sum = width + height - - if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM: - return True, f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", width, height - - return False, None, width, height - except ImportError: - # PIL not available, can't check dimensions - _LOGGER.debug("PIL not available, skipping dimension check") - return False, None, None, None - except Exception as e: - # Failed to check dimensions - _LOGGER.debug("Failed to check photo dimensions: %s", e) - return False, None, None, None - - def _get_telegram_cache_and_key( - self, - url: str | None, - cache_key: str | None = None, - ) -> tuple[TelegramFileCache | None, str | None, str | None]: - """Determine which Telegram cache, key, and thumbhash to use. - - Priority: custom cache_key -> direct asset ID -> extracted asset ID from URL -> URL - - Args: - url: The URL of the media (or asset ID directly) - cache_key: Optional custom cache key provided by user - - Returns: - Tuple of (cache instance, cache key, thumbhash) to use. - thumbhash is only populated when using the asset cache. - """ - if cache_key: - # Custom cache_key uses URL cache (no thumbhash) - return self.coordinator.telegram_cache, cache_key, None - - if url: - # Check if url is already an asset ID (UUID format) - if _is_asset_id(url): - thumbhash = self.coordinator.get_asset_thumbhash(url) - return self.coordinator.telegram_asset_cache, url, thumbhash - # Try to extract asset ID from URL - asset_id = _extract_asset_id_from_url(url) - if asset_id: - # Extracted asset ID uses asset cache - thumbhash = self.coordinator.get_asset_thumbhash(asset_id) - return self.coordinator.telegram_asset_cache, asset_id, thumbhash - # Fallback to URL cache with URL as key (no thumbhash) - return self.coordinator.telegram_cache, url, None - - return None, None, None - - async def _send_telegram_photo( - self, - session: Any, - token: str, - chat_id: str, - url: str | None, - caption: str | None = None, - reply_to_message_id: int | None = None, - parse_mode: str = "HTML", - max_asset_data_size: int | None = None, - send_large_photos_as_documents: bool = False, - content_type: str | None = None, - cache_key: str | None = None, - ) -> ServiceResponse: - """Send a single photo to Telegram.""" - import aiohttp - from aiohttp import FormData - - # Use provided content type or default to image/jpeg - if not content_type: - content_type = "image/jpeg" - - if not url: - return {"success": False, "error": "Missing 'url' for photo"} - - # Determine which cache to use and the cache key - effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key) - - # Check cache for file_id - cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None - - if cached and cached.get("file_id") and effective_cache_key: - # Use cached file_id - no download needed - file_id = cached["file_id"] - _LOGGER.debug("Using cached Telegram file_id for photo (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key) - - payload = { - "chat_id": chat_id, - "photo": file_id, - "parse_mode": parse_mode, - } - if caption: - payload["caption"] = caption - if reply_to_message_id: - payload["reply_to_message_id"] = reply_to_message_id - - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendPhoto" - try: - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - "cached": True, - } - else: - # Cache might be stale, fall through to upload - _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description")) - except aiohttp.ClientError as err: - _LOGGER.debug("Cached file_id request failed: %s", err) - - try: - # Download the photo using internal URL for faster local network transfer - download_url = self.coordinator.get_internal_download_url(url) - _LOGGER.debug("Downloading photo from %s", download_url[:80]) - async with session.get(download_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download photo: HTTP {resp.status}", - } - data = await resp.read() - _LOGGER.debug("Downloaded photo: %d bytes", len(data)) - - # Check if photo exceeds max size limit (user-defined limit) - if max_asset_data_size is not None and len(data) > max_asset_data_size: - _LOGGER.warning( - "Photo size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", - len(data), max_asset_data_size - ) - return { - "success": False, - "error": f"Photo size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)", - "skipped": True, - } - - # Check if photo exceeds Telegram's photo limits - exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data) - if exceeds_limits: - if send_large_photos_as_documents: - # Send as document instead - _LOGGER.info("Photo %s, sending as document", reason) - return await self._send_telegram_document( - session, token, chat_id, data, "photo.jpg", - caption, reply_to_message_id, parse_mode, url, None, cache_key - ) - else: - # Skip oversized photo - _LOGGER.warning("Photo %s, skipping (set send_large_photos_as_documents=true to send as document)", reason) - return { - "success": False, - "error": f"Photo {reason}", - "skipped": True, - } - - # Build multipart form - form = FormData() - form.add_field("chat_id", chat_id) - form.add_field("photo", data, filename="photo.jpg", content_type=content_type) - form.add_field("parse_mode", parse_mode) - - if caption: - form.add_field("caption", caption) - - if reply_to_message_id: - form.add_field("reply_to_message_id", str(reply_to_message_id)) - - # Send to Telegram - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendPhoto" - - _LOGGER.debug("Uploading photo to Telegram") - async with session.post(telegram_url, data=form) as response: - result = await response.json() - _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) - if response.status == 200 and result.get("ok"): - # Extract and cache file_id - photos = result.get("result", {}).get("photo", []) - if photos and effective_cache and effective_cache_key: - # Use the largest photo's file_id - file_id = photos[-1].get("file_id") - if file_id: - await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash) - - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - } - else: - # Log detailed error with diagnostics - self._log_telegram_error( - error_code=result.get("error_code"), - description=result.get("description", "Unknown Telegram error"), - data=data, - media_type="photo", - ) - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram photo upload failed: %s", err) - return {"success": False, "error": str(err)} - - async def _send_telegram_video( - self, - session: Any, - token: str, - chat_id: str, - url: str | None, - caption: str | None = None, - reply_to_message_id: int | None = None, - parse_mode: str = "HTML", - max_asset_data_size: int | None = None, - content_type: str | None = None, - cache_key: str | None = None, - ) -> ServiceResponse: - """Send a single video to Telegram.""" - import aiohttp - from aiohttp import FormData - - # Use provided content type or default to video/mp4 - if not content_type: - content_type = "video/mp4" - - if not url: - return {"success": False, "error": "Missing 'url' for video"} - - # Determine which cache to use and the cache key - effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key) - - # Check cache for file_id - cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None - - if cached and cached.get("file_id") and effective_cache_key: - # Use cached file_id - no download needed - file_id = cached["file_id"] - _LOGGER.debug("Using cached Telegram file_id for video (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key) - - payload = { - "chat_id": chat_id, - "video": file_id, - "parse_mode": parse_mode, - } - if caption: - payload["caption"] = caption - if reply_to_message_id: - payload["reply_to_message_id"] = reply_to_message_id - - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendVideo" - try: - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - "cached": True, - } - else: - # Cache might be stale, fall through to upload - _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description")) - except aiohttp.ClientError as err: - _LOGGER.debug("Cached file_id request failed: %s", err) - - try: - # Download the video using internal URL for faster local network transfer - download_url = self.coordinator.get_internal_download_url(url) - _LOGGER.debug("Downloading video from %s", download_url[:80]) - async with session.get(download_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download video: HTTP {resp.status}", - } - data = await resp.read() - _LOGGER.debug("Downloaded video: %d bytes", len(data)) - - # Check if video exceeds max size limit (user-defined limit) - if max_asset_data_size is not None and len(data) > max_asset_data_size: - _LOGGER.warning( - "Video size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", - len(data), max_asset_data_size - ) - return { - "success": False, - "error": f"Video size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)", - "skipped": True, - } - - # Check if video exceeds Telegram's upload limit (50 MB) - if len(data) > TELEGRAM_MAX_VIDEO_SIZE: - _LOGGER.warning( - "Video size (%d bytes, %.1f MB) exceeds Telegram's %d bytes (%.0f MB) upload limit, skipping", - len(data), len(data) / (1024 * 1024), - TELEGRAM_MAX_VIDEO_SIZE, TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024), - ) - return { - "success": False, - "error": f"Video size ({len(data) / (1024 * 1024):.1f} MB) exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB upload limit", - "skipped": True, - } - - # Build multipart form - form = FormData() - form.add_field("chat_id", chat_id) - form.add_field("video", data, filename="video.mp4", content_type=content_type) - form.add_field("parse_mode", parse_mode) - - if caption: - form.add_field("caption", caption) - - if reply_to_message_id: - form.add_field("reply_to_message_id", str(reply_to_message_id)) - - # Send to Telegram - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendVideo" - - _LOGGER.debug("Uploading video to Telegram") - async with session.post(telegram_url, data=form) as response: - result = await response.json() - _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) - if response.status == 200 and result.get("ok"): - # Extract and cache file_id - video = result.get("result", {}).get("video", {}) - if video and effective_cache and effective_cache_key: - file_id = video.get("file_id") - if file_id: - await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash) - - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - } - else: - # Log detailed error with diagnostics - self._log_telegram_error( - error_code=result.get("error_code"), - description=result.get("description", "Unknown Telegram error"), - data=data, - media_type="video", - ) - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram video upload failed: %s", err) - return {"success": False, "error": str(err)} - - async def _send_telegram_document( - self, - session: Any, - token: str, - chat_id: str, - data: bytes, - filename: str = "file", - caption: str | None = None, - reply_to_message_id: int | None = None, - parse_mode: str = "HTML", - source_url: str | None = None, - content_type: str | None = None, - cache_key: str | None = None, - ) -> ServiceResponse: - """Send a file as a document to Telegram.""" - import aiohttp - import mimetypes - from aiohttp import FormData - - # Use provided content type or detect from filename - if not content_type: - content_type, _ = mimetypes.guess_type(filename) - if not content_type: - content_type = "application/octet-stream" - - # Determine which cache and key to use - effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(source_url, cache_key) - - # Check cache for file_id - if effective_cache and effective_cache_key: - cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) - if cached and cached.get("file_id") and cached.get("type") == "document": - # Use cached file_id - file_id = cached["file_id"] - _LOGGER.debug("Using cached Telegram file_id for document (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key) - - payload = { - "chat_id": chat_id, - "document": file_id, - "parse_mode": parse_mode, - } - if caption: - payload["caption"] = caption - if reply_to_message_id: - payload["reply_to_message_id"] = reply_to_message_id - - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendDocument" - try: - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - "cached": True, - } - else: - _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description")) - except aiohttp.ClientError as err: - _LOGGER.debug("Cached file_id request failed: %s", err) - - try: - # Build multipart form - form = FormData() - form.add_field("chat_id", chat_id) - form.add_field("document", data, filename=filename, content_type=content_type) - form.add_field("parse_mode", parse_mode) - - if caption: - form.add_field("caption", caption) - - if reply_to_message_id: - form.add_field("reply_to_message_id", str(reply_to_message_id)) - - # Send to Telegram - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendDocument" - - _LOGGER.debug("Uploading document to Telegram (%d bytes, %s)", len(data), content_type) - async with session.post(telegram_url, data=form) as response: - result = await response.json() - _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) - if response.status == 200 and result.get("ok"): - # Extract and cache file_id - if effective_cache_key and effective_cache: - document = result.get("result", {}).get("document", {}) - file_id = document.get("file_id") - if file_id: - await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash) - - return { - "success": True, - "message_id": result.get("result", {}).get("message_id"), - } - else: - # Log detailed error with diagnostics - self._log_telegram_error( - error_code=result.get("error_code"), - description=result.get("description", "Unknown Telegram error"), - data=data, - media_type="document", - ) - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram document upload failed: %s", err) - return {"success": False, "error": str(err)} - - async def _send_telegram_media_group( - self, - session: Any, - token: str, - chat_id: str, - assets: list[dict[str, str]], - caption: str | None = None, - reply_to_message_id: int | None = None, - max_group_size: int = 10, - chunk_delay: int = 0, - parse_mode: str = "HTML", - max_asset_data_size: int | None = None, - send_large_photos_as_documents: bool = False, - ) -> ServiceResponse: - """Send media assets to Telegram as media group(s). - - If assets list exceeds max_group_size, splits into multiple media groups. - For chunks with single items, uses sendPhoto/sendVideo APIs. - Applies chunk_delay (in milliseconds) between groups if specified. - """ - import json - import asyncio - import aiohttp - from aiohttp import FormData - - # Split assets into chunks based on max_group_size - chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)] - all_message_ids = [] - - _LOGGER.debug("Sending %d media items in %d chunk(s) of max %d items (delay: %dms)", - len(assets), len(chunks), max_group_size, chunk_delay) - - for chunk_idx, chunk in enumerate(chunks): - # Add delay before sending subsequent chunks - if chunk_idx > 0 and chunk_delay > 0: - delay_seconds = chunk_delay / 1000 - _LOGGER.debug("Waiting %dms (%ss) before sending chunk %d/%d", - chunk_delay, delay_seconds, chunk_idx + 1, len(chunks)) - await asyncio.sleep(delay_seconds) - - # Optimize: Use single-item APIs for chunks with 1 item - if len(chunk) == 1: - item = chunk[0] - media_type = item.get("type", "document") - url = item.get("url") - item_content_type = item.get("content_type") - item_cache_key = item.get("cache_key") - - # Only apply caption and reply_to to the first chunk - chunk_caption = caption if chunk_idx == 0 else None - chunk_reply_to = reply_to_message_id if chunk_idx == 0 else None - result = None - - if media_type == "photo": - _LOGGER.debug("Sending chunk %d/%d as single photo", chunk_idx + 1, len(chunks)) - result = await self._send_telegram_photo( - session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode, - max_asset_data_size, send_large_photos_as_documents, item_content_type, item_cache_key - ) - elif media_type == "video": - _LOGGER.debug("Sending chunk %d/%d as single video", chunk_idx + 1, len(chunks)) - result = await self._send_telegram_video( - session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode, - max_asset_data_size, item_content_type, item_cache_key - ) - else: # document - _LOGGER.debug("Sending chunk %d/%d as single document", chunk_idx + 1, len(chunks)) - if not url: - return {"success": False, "error": "Missing 'url' for document", "failed_at_chunk": chunk_idx + 1} - try: - download_url = self.coordinator.get_internal_download_url(url) - async with session.get(download_url) as resp: - if resp.status != 200: - return {"success": False, "error": f"Failed to download media: HTTP {resp.status}", "failed_at_chunk": chunk_idx + 1} - data = await resp.read() - if max_asset_data_size is not None and len(data) > max_asset_data_size: - _LOGGER.warning("Media size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", len(data), max_asset_data_size) - continue - filename = url.split("/")[-1].split("?")[0] or "file" - result = await self._send_telegram_document( - session, token, chat_id, data, filename, chunk_caption, chunk_reply_to, parse_mode, - url, item_content_type, item_cache_key - ) - except aiohttp.ClientError as err: - return {"success": False, "error": f"Failed to download media: {err}", "failed_at_chunk": chunk_idx + 1} - - if result is None: - # Document was skipped (e.g., exceeded max_asset_data_size) - continue - - if not result.get("success"): - result["failed_at_chunk"] = chunk_idx + 1 - return result - - all_message_ids.append(result.get("message_id")) - continue - # Multi-item chunk: use sendMediaGroup - _LOGGER.debug("Sending chunk %d/%d as media group (%d items)", chunk_idx + 1, len(chunks), len(chunk)) - - # Helper to get the appropriate cache for a cache key - def get_cache_for_key(key: str, is_asset: bool | None = None) -> TelegramFileCache | None: - """Return asset cache if key is a UUID, otherwise URL cache.""" - if is_asset is None: - is_asset = _is_asset_id(key) - return self.coordinator.telegram_asset_cache if is_asset else self.coordinator.telegram_cache - - # Collect media items - either from cache (file_id) or by downloading - # Each item: (type, media_ref, filename, cache_key, is_cached, content_type) - # media_ref is either file_id (str) or data (bytes) - media_items: list[tuple[str, str | bytes, str, str, bool, str | None]] = [] - oversized_photos: list[tuple[bytes, str | None, str, str | None]] = [] # (data, caption, url, cache_key) - documents_to_send: list[tuple[bytes, str | None, str, str | None, str, str | None]] = [] # (data, caption, url, cache_key, filename, content_type) - skipped_count = 0 - - for i, item in enumerate(chunk): - url = item.get("url") - if not url: - return { - "success": False, - "error": f"Missing 'url' in item {chunk_idx * max_group_size + i}", - } - - media_type = item.get("type", "document") - item_content_type = item.get("content_type") - # Determine cache key: custom cache_key -> extracted asset ID -> URL - custom_cache_key = item.get("cache_key") - extracted_asset_id = _extract_asset_id_from_url(url) if not custom_cache_key else None - item_cache_key = custom_cache_key or extracted_asset_id or url - - if media_type not in ("photo", "video", "document"): - return { - "success": False, - "error": f"Invalid type '{media_type}' in item {chunk_idx * max_group_size + i}. Must be 'photo', 'video', or 'document'.", - } - - # Documents can't be in media groups - collect them for separate sending - if media_type == "document": - try: - download_url = self.coordinator.get_internal_download_url(url) - async with session.get(download_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}", - } - data = await resp.read() - if max_asset_data_size is not None and len(data) > max_asset_data_size: - _LOGGER.warning( - "Media %d size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", - chunk_idx * max_group_size + i, len(data), max_asset_data_size - ) - skipped_count += 1 - continue - # Caption only on first item of first chunk if no media items yet - doc_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 and len(documents_to_send) == 0 else None - filename = url.split("/")[-1].split("?")[0] or f"file_{i}" - documents_to_send.append((data, doc_caption, url, custom_cache_key, filename, item_content_type)) - except aiohttp.ClientError as err: - return { - "success": False, - "error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}", - } - continue - - # Check cache first for photos/videos - is_asset = _is_asset_id(item_cache_key) - item_cache = get_cache_for_key(item_cache_key, is_asset) - item_thumbhash = self.coordinator.get_asset_thumbhash(item_cache_key) if is_asset else None - cached = item_cache.get(item_cache_key, thumbhash=item_thumbhash) if item_cache else None - if cached and cached.get("file_id"): - # Use cached file_id - ext = "jpg" if media_type == "photo" else "mp4" - filename = f"media_{chunk_idx * max_group_size + i}.{ext}" - media_items.append((media_type, cached["file_id"], filename, item_cache_key, True, item_content_type)) - _LOGGER.debug("Using cached file_id for media %d", chunk_idx * max_group_size + i) - continue - - try: - # Download using internal URL for faster local network transfer - download_url = self.coordinator.get_internal_download_url(url) - _LOGGER.debug("Downloading media %d from %s", chunk_idx * max_group_size + i, download_url[:80]) - async with session.get(download_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}", - } - data = await resp.read() - _LOGGER.debug("Downloaded media %d: %d bytes", chunk_idx * max_group_size + i, len(data)) - - # Check if media exceeds max_asset_data_size limit (user-defined limit for skipping) - if max_asset_data_size is not None and len(data) > max_asset_data_size: - _LOGGER.warning( - "Media %d size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", - chunk_idx * max_group_size + i, len(data), max_asset_data_size - ) - skipped_count += 1 - continue - - # For videos, check Telegram upload limit (50 MB) - if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: - _LOGGER.warning( - "Video %d size (%d bytes, %.1f MB) exceeds Telegram's %.0f MB upload limit, skipping", - chunk_idx * max_group_size + i, len(data), - len(data) / (1024 * 1024), - TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024), - ) - skipped_count += 1 - continue - - # For photos, check Telegram limits - if media_type == "photo": - exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data) - if exceeds_limits: - if send_large_photos_as_documents: - # Separate this photo to send as document later - # Caption only on first item of first chunk - photo_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 else None - oversized_photos.append((data, photo_caption, url, custom_cache_key)) - _LOGGER.info("Photo %d %s, will send as document", i, reason) - continue - else: - # Skip oversized photo - _LOGGER.warning("Photo %d %s, skipping (set send_large_photos_as_documents=true to send as document)", i, reason) - skipped_count += 1 - continue - - ext = "jpg" if media_type == "photo" else "mp4" - filename = f"media_{chunk_idx * max_group_size + i}.{ext}" - media_items.append((media_type, data, filename, item_cache_key, False, item_content_type)) - except aiohttp.ClientError as err: - return { - "success": False, - "error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}", - } - - # Skip this chunk if all files were filtered out - if not media_items and not oversized_photos and not documents_to_send: - _LOGGER.info("Chunk %d/%d: all %d media items skipped", - chunk_idx + 1, len(chunks), len(chunk)) - continue - - # Split media items into sub-groups respecting Telegram's upload size limit - # This ensures the total upload data per sendMediaGroup call stays under 50 MB - if media_items: - media_sub_groups = _split_media_by_upload_size(media_items, TELEGRAM_MAX_VIDEO_SIZE) - if len(media_sub_groups) > 1: - _LOGGER.debug( - "Chunk %d/%d: split %d media items into %d sub-groups by upload size", - chunk_idx + 1, len(chunks), len(media_items), len(media_sub_groups), - ) - - first_caption_used = False - for sub_idx, sub_group_items in enumerate(media_sub_groups): - is_first = chunk_idx == 0 and sub_idx == 0 - sub_caption = caption if is_first and not first_caption_used and not oversized_photos else None - sub_reply_to = reply_to_message_id if is_first else None - - # Add delay between sub-groups (not before the first one) - if sub_idx > 0 and chunk_delay > 0: - await asyncio.sleep(chunk_delay / 1000) - - # Single item - use sendPhoto/sendVideo (sendMediaGroup requires 2+ items) - if len(sub_group_items) == 1: - sg_type, sg_ref, sg_fname, sg_ck, sg_cached, sg_ct = sub_group_items[0] - if sg_type == "photo": - api_method = "sendPhoto" - media_field = "photo" - else: - api_method = "sendVideo" - media_field = "video" - - try: - if sg_cached: - sg_payload: dict[str, Any] = { - "chat_id": chat_id, - media_field: sg_ref, - "parse_mode": parse_mode, - } - if sub_caption: - sg_payload["caption"] = sub_caption - if sub_reply_to: - sg_payload["reply_to_message_id"] = sub_reply_to - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}" - async with session.post(telegram_url, json=sg_payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - all_message_ids.append(result["result"].get("message_id")) - if sub_caption: - first_caption_used = True - else: - _LOGGER.debug("Cached file_id failed in sub-group, will re-upload: %s", result.get("description")) - sg_cached = False # Fall through to upload - if not sg_cached: - sg_form = FormData() - sg_form.add_field("chat_id", chat_id) - sg_content_type = sg_ct or ("image/jpeg" if sg_type == "photo" else "video/mp4") - sg_form.add_field(media_field, sg_ref, filename=sg_fname, content_type=sg_content_type) - sg_form.add_field("parse_mode", parse_mode) - if sub_caption: - sg_form.add_field("caption", sub_caption) - if sub_reply_to: - sg_form.add_field("reply_to_message_id", str(sub_reply_to)) - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}" - async with session.post(telegram_url, data=sg_form) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - all_message_ids.append(result["result"].get("message_id")) - if sub_caption: - first_caption_used = True - # Cache the uploaded file_id - sg_is_asset = _is_asset_id(sg_ck) - sg_cache = get_cache_for_key(sg_ck, sg_is_asset) - if sg_cache: - sg_thumbhash = self.coordinator.get_asset_thumbhash(sg_ck) if sg_is_asset else None - result_data = result.get("result", {}) - if sg_type == "photo": - photos = result_data.get("photo", []) - if photos: - await sg_cache.async_set(sg_ck, photos[-1].get("file_id"), "photo", thumbhash=sg_thumbhash) - elif sg_type == "video": - video = result_data.get("video", {}) - if video.get("file_id"): - await sg_cache.async_set(sg_ck, video["file_id"], "video", thumbhash=sg_thumbhash) - else: - self._log_telegram_error( - error_code=result.get("error_code"), - description=result.get("description", "Unknown Telegram error"), - data=sg_ref if isinstance(sg_ref, bytes) else None, - media_type=sg_type, - ) - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - "failed_at_chunk": chunk_idx + 1, - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err) - return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} - continue - - # Multiple items - use sendMediaGroup - all_cached = all(is_cached for _, _, _, _, is_cached, _ in sub_group_items) - - if all_cached: - _LOGGER.debug("Sub-group %d/%d: all %d items cached, using file_ids", - sub_idx + 1, len(media_sub_groups), len(sub_group_items)) - media_json = [] - for i, (media_type, file_id, _, _, _, _) in enumerate(sub_group_items): - media_item_json: dict[str, Any] = { - "type": media_type, - "media": file_id, - } - if i == 0 and sub_caption and not first_caption_used: - media_item_json["caption"] = sub_caption - media_item_json["parse_mode"] = parse_mode - media_json.append(media_item_json) - - payload: dict[str, Any] = { - "chat_id": chat_id, - "media": media_json, - } - if sub_reply_to: - payload["reply_to_message_id"] = sub_reply_to - - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup" - try: - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - chunk_message_ids = [ - msg.get("message_id") for msg in result.get("result", []) - ] - all_message_ids.extend(chunk_message_ids) - if sub_caption: - first_caption_used = True - else: - # Cache might be stale - fall through to upload path - _LOGGER.debug("Cached file_ids failed, will re-upload: %s", result.get("description")) - all_cached = False # Force re-upload - except aiohttp.ClientError as err: - _LOGGER.debug("Cached file_ids request failed: %s", err) - all_cached = False - - if not all_cached: - # Build multipart form with mix of cached file_ids and uploaded data - form = FormData() - form.add_field("chat_id", chat_id) - - if sub_reply_to: - form.add_field("reply_to_message_id", str(sub_reply_to)) - - # Build media JSON - use file_id for cached, attach:// for uploaded - media_json = [] - upload_idx = 0 - # (cache_key, result_idx, type, is_asset, thumbhash) - keys_to_cache: list[tuple[str, int, str, bool, str | None]] = [] - - for i, (media_type, media_ref, filename, item_cache_key, is_cached, item_content_type) in enumerate(sub_group_items): - if is_cached: - # Use file_id directly - media_item_json: dict[str, Any] = { - "type": media_type, - "media": media_ref, # file_id - } - else: - # Upload this file - attach_name = f"file{upload_idx}" - media_item_json = { - "type": media_type, - "media": f"attach://{attach_name}", - } - # Use provided content_type or default based on media type - content_type = item_content_type or ("image/jpeg" if media_type == "photo" else "video/mp4") - form.add_field(attach_name, media_ref, filename=filename, content_type=content_type) - ck_is_asset = _is_asset_id(item_cache_key) - ck_thumbhash = self.coordinator.get_asset_thumbhash(item_cache_key) if ck_is_asset else None - keys_to_cache.append((item_cache_key, i, media_type, ck_is_asset, ck_thumbhash)) - upload_idx += 1 - - if i == 0 and sub_caption and not first_caption_used: - media_item_json["caption"] = sub_caption - media_item_json["parse_mode"] = parse_mode - media_json.append(media_item_json) - - form.add_field("media", json.dumps(media_json)) - - # Send to Telegram - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup" - - try: - _LOGGER.debug("Uploading media group sub-group %d/%d (%d files, %d cached) to Telegram", - sub_idx + 1, len(media_sub_groups), len(sub_group_items), len(sub_group_items) - upload_idx) - async with session.post(telegram_url, data=form) as response: - result = await response.json() - _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) - if response.status == 200 and result.get("ok"): - chunk_message_ids = [ - msg.get("message_id") for msg in result.get("result", []) - ] - all_message_ids.extend(chunk_message_ids) - if sub_caption: - first_caption_used = True - - # Cache the newly uploaded file_ids (batched per cache instance) - if keys_to_cache: - result_messages = result.get("result", []) - # Group entries by cache instance for batch writes - cache_batches: dict[int, tuple[TelegramFileCache, list[tuple[str, str, str, str | None]]]] = {} - for ck, result_idx, m_type, ck_is_asset, ck_thumbhash in keys_to_cache: - ck_cache = get_cache_for_key(ck, ck_is_asset) - if result_idx >= len(result_messages) or not ck_cache: - continue - msg = result_messages[result_idx] - file_id = None - if m_type == "photo": - photos = msg.get("photo", []) - if photos: - file_id = photos[-1].get("file_id") - elif m_type == "video": - video = msg.get("video", {}) - file_id = video.get("file_id") - if file_id: - cache_id = id(ck_cache) - if cache_id not in cache_batches: - cache_batches[cache_id] = (ck_cache, []) - cache_batches[cache_id][1].append((ck, file_id, m_type, ck_thumbhash)) - for ck_cache, batch_entries in cache_batches.values(): - await ck_cache.async_set_many(batch_entries) - else: - # Log detailed error for media group with total size info - uploaded_data = [m for m in sub_group_items if not m[4]] - total_size = sum(len(d) for _, d, _, _, _, _ in uploaded_data if isinstance(d, bytes)) - _LOGGER.error( - "Telegram API error for sub-group %d/%d: %s | Media count: %d | Uploaded size: %d bytes (%.2f MB)", - sub_idx + 1, len(media_sub_groups), - result.get("description", "Unknown Telegram error"), - len(sub_group_items), - total_size, - total_size / (1024 * 1024) if total_size else 0 - ) - # Log detailed diagnostics for the first photo in the group - for media_type, media_ref, _, _, is_cached, _ in sub_group_items: - if media_type == "photo" and not is_cached and isinstance(media_ref, bytes): - self._log_telegram_error( - error_code=result.get("error_code"), - description=result.get("description", "Unknown Telegram error"), - data=media_ref, - media_type="photo", - ) - break # Only log details for first photo - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - "failed_at_chunk": chunk_idx + 1, - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err) - return { - "success": False, - "error": str(err), - "failed_at_chunk": chunk_idx + 1, - } - - # Send oversized photos as documents - for i, (data, photo_caption, photo_url, photo_cache_key) in enumerate(oversized_photos): - _LOGGER.debug("Sending oversized photo %d/%d as document", i + 1, len(oversized_photos)) - result = await self._send_telegram_document( - session, token, chat_id, data, f"photo_{i}.jpg", - photo_caption, None, parse_mode, photo_url, None, photo_cache_key - ) - if result.get("success"): - all_message_ids.append(result.get("message_id")) - else: - _LOGGER.error("Failed to send oversized photo as document: %s", result.get("error")) - # Continue with other photos even if one fails - - # Send documents (can't be in media groups) - for i, (data, doc_caption, doc_url, doc_cache_key, filename, doc_content_type) in enumerate(documents_to_send): - _LOGGER.debug("Sending document %d/%d", i + 1, len(documents_to_send)) - result = await self._send_telegram_document( - session, token, chat_id, data, filename, - doc_caption, None, parse_mode, doc_url, doc_content_type, doc_cache_key - ) - if result.get("success"): - all_message_ids.append(result.get("message_id")) - else: - _LOGGER.error("Failed to send document: %s", result.get("error")) - # Continue with other documents even if one fails - - return { - "success": True, - "message_ids": all_message_ids, - "chunks_sent": len(chunks), - } + # Create core TelegramClient with HA-managed session and coordinator caches + telegram = TelegramClient( + session, + token, + url_cache=self.coordinator.telegram_cache, + asset_cache=self.coordinator.telegram_asset_cache, + url_resolver=self.coordinator.get_internal_download_url, + thumbhash_resolver=self.coordinator.get_asset_thumbhash, + ) + + return await telegram.send_notification( + chat_id=chat_id, + assets=assets, + caption=caption, + reply_to_message_id=reply_to_message_id, + disable_web_page_preview=disable_web_page_preview, + parse_mode=parse_mode, + max_group_size=max_group_size, + chunk_delay=chunk_delay, + max_asset_data_size=max_asset_data_size, + send_large_photos_as_documents=send_large_photos_as_documents, + chat_action=chat_action, + ) class ImmichAlbumIdSensor(ImmichAlbumBaseSensor): @@ -1759,41 +395,29 @@ class ImmichAlbumIdSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:identifier" _attr_translation_key = "album_id" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_album_id" @property def native_value(self) -> str | None: - """Return the album ID.""" if self._album_data: return self._album_data.id return None @property def extra_state_attributes(self) -> dict[str, Any]: - """Return extra state attributes.""" if not self._album_data: return {} - attrs: dict[str, Any] = { ATTR_ALBUM_NAME: self._album_data.name, ATTR_ASSET_COUNT: self._album_data.asset_count, ATTR_LAST_UPDATED: self._album_data.updated_at, ATTR_CREATED_AT: self._album_data.created_at, } - - # Primary share URL (prefers public, falls back to protected) share_url = self.coordinator.get_any_url() if share_url: attrs["share_url"] = share_url - return attrs @@ -1804,29 +428,20 @@ class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:image-album" _attr_translation_key = "album_asset_count" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_asset_count" @property def native_value(self) -> int | None: - """Return the state of the sensor (asset count).""" if self._album_data: return self._album_data.asset_count return None @property def extra_state_attributes(self) -> dict[str, Any]: - """Return extra state attributes.""" if not self._album_data: return {} - attrs = { ATTR_ALBUM_ID: self._album_data.id, ATTR_ASSET_COUNT: self._album_data.asset_count, @@ -1838,13 +453,11 @@ class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor): ATTR_OWNER: self._album_data.owner, ATTR_PEOPLE: list(self._album_data.people), } - if self._album_data.thumbnail_asset_id: attrs[ATTR_THUMBNAIL_URL] = ( f"{self.coordinator.immich_url}/api/assets/" f"{self._album_data.thumbnail_asset_id}/thumbnail" ) - return attrs @@ -1855,19 +468,12 @@ class ImmichAlbumPhotoCountSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:image" _attr_translation_key = "album_photo_count" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_photo_count" @property def native_value(self) -> int | None: - """Return the state of the sensor (photo count).""" if self._album_data: return self._album_data.photo_count return None @@ -1880,19 +486,12 @@ class ImmichAlbumVideoCountSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:video" _attr_translation_key = "album_video_count" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_video_count" @property def native_value(self) -> int | None: - """Return the state of the sensor (video count).""" if self._album_data: return self._album_data.video_count return None @@ -1905,19 +504,12 @@ class ImmichAlbumLastUpdatedSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:clock-outline" _attr_translation_key = "album_last_updated" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_last_updated" @property def native_value(self) -> datetime | None: - """Return the state of the sensor (last updated datetime).""" if self._album_data and self._album_data.updated_at: try: return datetime.fromisoformat( @@ -1935,19 +527,12 @@ class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:calendar-plus" _attr_translation_key = "album_created" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_created" @property def native_value(self) -> datetime | None: - """Return the state of the sensor (creation datetime).""" if self._album_data and self._album_data.created_at: try: return datetime.fromisoformat( @@ -1964,42 +549,30 @@ class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:link-variant" _attr_translation_key = "album_public_url" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_public_url" @property def native_value(self) -> str | None: - """Return the state of the sensor (public URL).""" if self._album_data: return self.coordinator.get_public_url() return None @property def extra_state_attributes(self) -> dict[str, Any]: - """Return extra state attributes.""" if not self._album_data: return {} - attrs = { ATTR_ALBUM_ID: self._album_data.id, ATTR_SHARED: self._album_data.shared, } - all_urls = self.coordinator.get_public_urls() if len(all_urls) > 1: attrs[ATTR_ALBUM_URLS] = all_urls - links_info = self.coordinator.get_shared_links_info() if links_info: attrs["shared_links"] = links_info - return attrs @@ -2009,37 +582,24 @@ class ImmichAlbumProtectedUrlSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:link-lock" _attr_translation_key = "album_protected_url" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_protected_url" @property def native_value(self) -> str | None: - """Return the state of the sensor (protected URL).""" if self._album_data: return self.coordinator.get_protected_url() return None @property def extra_state_attributes(self) -> dict[str, Any]: - """Return extra state attributes.""" if not self._album_data: return {} - - attrs = { - ATTR_ALBUM_ID: self._album_data.id, - } - + attrs = {ATTR_ALBUM_ID: self._album_data.id} all_urls = self.coordinator.get_protected_urls() if len(all_urls) > 1: attrs["protected_urls"] = all_urls - return attrs @@ -2049,29 +609,20 @@ class ImmichAlbumProtectedPasswordSensor(ImmichAlbumBaseSensor): _attr_icon = "mdi:key" _attr_translation_key = "album_protected_password" - def __init__( - self, - coordinator: ImmichAlbumWatcherCoordinator, - entry: ConfigEntry, - subentry: ConfigSubentry, - ) -> None: - """Initialize the sensor.""" + def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_protected_password" @property def native_value(self) -> str | None: - """Return the state of the sensor (protected link password).""" if self._album_data: return self.coordinator.get_protected_password() return None @property def extra_state_attributes(self) -> dict[str, Any]: - """Return extra state attributes.""" if not self._album_data: return {} - return { ATTR_ALBUM_ID: self._album_data.id, ATTR_ALBUM_PROTECTED_URL: self.coordinator.get_protected_url(), diff --git a/custom_components/immich_album_watcher/storage.py b/custom_components/immich_album_watcher/storage.py index f19eb8b..486498f 100644 --- a/custom_components/immich_album_watcher/storage.py +++ b/custom_components/immich_album_watcher/storage.py @@ -9,17 +9,51 @@ from typing import Any from homeassistant.core import HomeAssistant from homeassistant.helpers.storage import Store +from immich_watcher_core.notifications.queue import ( + NotificationQueue as CoreNotificationQueue, +) +from immich_watcher_core.telegram.cache import TelegramFileCache as CoreTelegramFileCache + _LOGGER = logging.getLogger(__name__) STORAGE_VERSION = 1 STORAGE_KEY_PREFIX = "immich_album_watcher" -# Default TTL for Telegram file_id cache (48 hours in seconds) -DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60 + +class HAStorageBackend: + """Home Assistant storage backend adapter. + + Wraps homeassistant.helpers.storage.Store to satisfy the + StorageBackend protocol from immich_watcher_core. + """ + + def __init__(self, hass: HomeAssistant, key: str) -> None: + """Initialize with HA store. + + Args: + hass: Home Assistant instance + key: Storage key (e.g. "immich_album_watcher.telegram_cache.xxx") + """ + self._store: Store[dict[str, Any]] = Store(hass, STORAGE_VERSION, key) + + async def load(self) -> dict[str, Any] | None: + """Load data from HA storage.""" + return await self._store.async_load() + + async def save(self, data: dict[str, Any]) -> None: + """Save data to HA storage.""" + await self._store.async_save(data) + + async def remove(self) -> None: + """Remove all stored data.""" + await self._store.async_remove() class ImmichAlbumStorage: - """Handles persistence of album state across restarts.""" + """Handles persistence of album state across restarts. + + This remains HA-native as it manages HA-specific album tracking state. + """ def __init__(self, hass: HomeAssistant, entry_id: str) -> None: """Initialize the storage.""" @@ -68,260 +102,40 @@ class ImmichAlbumStorage: self._data = None -class TelegramFileCache: - """Cache for Telegram file_ids to avoid re-uploading media. +# Convenience factory functions for creating core classes with HA backends - When a file is uploaded to Telegram, it returns a file_id that can be reused - to send the same file without re-uploading. This cache stores these file_ids - keyed by the source URL or asset ID. - Supports two validation modes: - - TTL mode (default): entries expire after a configured time-to-live - - Thumbhash mode: entries are validated by comparing stored thumbhash with - the current asset thumbhash from Immich +def create_telegram_cache( + hass: HomeAssistant, + entry_id: str, + ttl_seconds: int = 48 * 60 * 60, + use_thumbhash: bool = False, +) -> CoreTelegramFileCache: + """Create a TelegramFileCache with HA storage backend. + + Args: + hass: Home Assistant instance + entry_id: Config entry ID for scoping + ttl_seconds: TTL for cache entries (TTL mode only) + use_thumbhash: Use thumbhash validation instead of TTL """ - - def __init__( - self, - hass: HomeAssistant, - entry_id: str, - ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL, - use_thumbhash: bool = False, - ) -> None: - """Initialize the Telegram file cache. - - Args: - hass: Home Assistant instance - entry_id: Config entry ID for scoping the cache (per hub) - ttl_seconds: Time-to-live for cache entries in seconds (TTL mode only) - use_thumbhash: Use thumbhash-based validation instead of TTL - """ - self._store: Store[dict[str, Any]] = Store( - hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}" - ) - self._data: dict[str, Any] | None = None - self._ttl_seconds = ttl_seconds - self._use_thumbhash = use_thumbhash - - async def async_load(self) -> None: - """Load cache data from storage.""" - self._data = await self._store.async_load() or {"files": {}} - # Clean up expired entries on load (TTL mode only) - await self._cleanup_expired() - mode = "thumbhash" if self._use_thumbhash else "TTL" - _LOGGER.debug( - "Loaded Telegram file cache with %d entries (mode: %s)", - len(self._data.get("files", {})), - mode, - ) - - # Maximum number of entries to keep in thumbhash mode to prevent unbounded growth - THUMBHASH_MAX_ENTRIES = 2000 - - async def _cleanup_expired(self) -> None: - """Remove expired cache entries (TTL mode) or trim old entries (thumbhash mode).""" - if self._use_thumbhash: - files = self._data.get("files", {}) if self._data else {} - if len(files) > self.THUMBHASH_MAX_ENTRIES: - sorted_keys = sorted( - files, key=lambda k: files[k].get("cached_at", "") - ) - keys_to_remove = sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES] - for key in keys_to_remove: - del files[key] - await self._store.async_save(self._data) - _LOGGER.debug( - "Trimmed thumbhash cache from %d to %d entries", - len(keys_to_remove) + self.THUMBHASH_MAX_ENTRIES, - self.THUMBHASH_MAX_ENTRIES, - ) - return - - if not self._data or "files" not in self._data: - return - - now = datetime.now(timezone.utc) - expired_keys = [] - - for url, entry in self._data["files"].items(): - cached_at_str = entry.get("cached_at") - if cached_at_str: - cached_at = datetime.fromisoformat(cached_at_str) - age_seconds = (now - cached_at).total_seconds() - if age_seconds > self._ttl_seconds: - expired_keys.append(url) - - if expired_keys: - for key in expired_keys: - del self._data["files"][key] - await self._store.async_save(self._data) - _LOGGER.debug("Cleaned up %d expired Telegram cache entries", len(expired_keys)) - - def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None: - """Get cached file_id for a key. - - Args: - key: The cache key (URL or asset ID) - thumbhash: Current thumbhash for validation (thumbhash mode only). - If provided, compares with stored thumbhash. Mismatch = cache miss. - - Returns: - Dict with 'file_id' and 'type' if cached and valid, None otherwise - """ - if not self._data or "files" not in self._data: - return None - - entry = self._data["files"].get(key) - if not entry: - return None - - if self._use_thumbhash: - # Thumbhash-based validation - if thumbhash is not None: - stored_thumbhash = entry.get("thumbhash") - if stored_thumbhash and stored_thumbhash != thumbhash: - _LOGGER.debug( - "Cache miss for %s: thumbhash changed, removing stale entry", - key[:36], - ) - del self._data["files"][key] - return None - # If no thumbhash provided (asset not in monitored album), - # return cached entry anyway — self-heals on Telegram rejection - else: - # TTL-based validation - cached_at_str = entry.get("cached_at") - if cached_at_str: - cached_at = datetime.fromisoformat(cached_at_str) - age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds() - if age_seconds > self._ttl_seconds: - return None - - return { - "file_id": entry.get("file_id"), - "type": entry.get("type"), - } - - async def async_set( - self, key: str, file_id: str, media_type: str, thumbhash: str | None = None - ) -> None: - """Store a file_id for a key. - - Args: - key: The cache key (URL or asset ID) - file_id: The Telegram file_id - media_type: The type of media ('photo', 'video', 'document') - thumbhash: Current thumbhash to store alongside file_id (thumbhash mode only) - """ - if self._data is None: - self._data = {"files": {}} - - entry_data: dict[str, Any] = { - "file_id": file_id, - "type": media_type, - "cached_at": datetime.now(timezone.utc).isoformat(), - } - if thumbhash is not None: - entry_data["thumbhash"] = thumbhash - - self._data["files"][key] = entry_data - await self._store.async_save(self._data) - _LOGGER.debug("Cached Telegram file_id for key (type: %s)", media_type) - - async def async_set_many( - self, entries: list[tuple[str, str, str, str | None]] - ) -> None: - """Store multiple file_ids in a single disk write. - - Args: - entries: List of (key, file_id, media_type, thumbhash) tuples - """ - if not entries: - return - - if self._data is None: - self._data = {"files": {}} - - now_iso = datetime.now(timezone.utc).isoformat() - for key, file_id, media_type, thumbhash in entries: - entry_data: dict[str, Any] = { - "file_id": file_id, - "type": media_type, - "cached_at": now_iso, - } - if thumbhash is not None: - entry_data["thumbhash"] = thumbhash - self._data["files"][key] = entry_data - - await self._store.async_save(self._data) - _LOGGER.debug("Batch cached %d Telegram file_ids", len(entries)) - - async def async_remove(self) -> None: - """Remove all cache data.""" - await self._store.async_remove() - self._data = None + suffix = f"_assets" if use_thumbhash else "" + backend = HAStorageBackend( + hass, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}{suffix}" + ) + return CoreTelegramFileCache(backend, ttl_seconds=ttl_seconds, use_thumbhash=use_thumbhash) -class NotificationQueue: - """Persistent queue for notifications deferred during quiet hours. +def create_notification_queue( + hass: HomeAssistant, entry_id: str +) -> CoreNotificationQueue: + """Create a NotificationQueue with HA storage backend.""" + backend = HAStorageBackend( + hass, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}" + ) + return CoreNotificationQueue(backend) - Stores full service call parameters so notifications can be replayed - exactly as they were originally called. - """ - def __init__(self, hass: HomeAssistant, entry_id: str) -> None: - """Initialize the notification queue.""" - self._store: Store[dict[str, Any]] = Store( - hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}" - ) - self._data: dict[str, Any] | None = None - - async def async_load(self) -> None: - """Load queue data from storage.""" - self._data = await self._store.async_load() or {"queue": []} - _LOGGER.debug( - "Loaded notification queue with %d items", - len(self._data.get("queue", [])), - ) - - async def async_enqueue(self, notification_params: dict[str, Any]) -> None: - """Add a notification to the queue.""" - if self._data is None: - self._data = {"queue": []} - - self._data["queue"].append({ - "params": notification_params, - "queued_at": datetime.now(timezone.utc).isoformat(), - }) - await self._store.async_save(self._data) - _LOGGER.debug("Queued notification during quiet hours (total: %d)", len(self._data["queue"])) - - def get_all(self) -> list[dict[str, Any]]: - """Get all queued notifications.""" - if not self._data: - return [] - return list(self._data.get("queue", [])) - - def has_pending(self) -> bool: - """Check if there are pending notifications.""" - return bool(self._data and self._data.get("queue")) - - async def async_remove_indices(self, indices: list[int]) -> None: - """Remove specific items by index (indices must be in descending order).""" - if not self._data or not indices: - return - for idx in indices: - if 0 <= idx < len(self._data["queue"]): - del self._data["queue"][idx] - await self._store.async_save(self._data) - - async def async_clear(self) -> None: - """Clear all queued notifications.""" - if self._data: - self._data["queue"] = [] - await self._store.async_save(self._data) - - async def async_remove(self) -> None: - """Remove all queue data.""" - await self._store.async_remove() - self._data = None +# Re-export core types for backward compatibility +TelegramFileCache = CoreTelegramFileCache +NotificationQueue = CoreNotificationQueue diff --git a/plans/phase-2-haos-refactor.md b/plans/phase-2-haos-refactor.md new file mode 100644 index 0000000..dd6d2a2 --- /dev/null +++ b/plans/phase-2-haos-refactor.md @@ -0,0 +1,75 @@ +# Phase 2: Wire Core Library into HAOS Integration + +**Status**: In progress +**Parent**: [primary-plan.md](primary-plan.md) + +--- + +## Goal + +Refactor the HAOS integration to delegate to `immich-watcher-core` for all HA-independent logic, reducing duplication and preparing for the standalone server. + +--- + +## Important: HACS Compatibility + +HACS requires `custom_components//` at the repository root. We **cannot** move it to `packages/haos/`. Instead: + +- `custom_components/` stays at repo root +- The integration imports from `immich_watcher_core` (the core library) +- `manifest.json` lists `immich-watcher-core` in `requirements` (for future PyPI publish) +- During development, `pip install -e packages/core` makes imports work +- For HACS distribution, we'll publish the core to PyPI + +--- + +## Tasks + +### 1. Update manifest.json `[ ]` +- Add `immich-watcher-core` to requirements list +- Do NOT bump version (only plans/core changed, not integration content yet) + +### 2. Refactor const.py `[ ]` +- Import shared constants from `immich_watcher_core.constants` +- Keep HA-specific constants (DOMAIN, CONF_*, PLATFORMS, SERVICE_*) local +- Re-export shared constants for backward compatibility with other integration files + +### 3. Refactor storage.py `[ ]` +- Create `HAStorageBackend` adapter wrapping `homeassistant.helpers.storage.Store` + that satisfies `StorageBackend` protocol from core +- Replace `TelegramFileCache` with core's version using `HAStorageBackend` +- Replace `NotificationQueue` with core's version using `HAStorageBackend` +- Keep `ImmichAlbumStorage` as-is (HA-specific album state management) + +### 4. Refactor coordinator.py `[ ]` +- Remove dataclass definitions (SharedLinkInfo, AssetInfo, AlbumData, AlbumChange) — import from core +- Replace Immich API methods with `ImmichClient` from core +- Replace `_detect_change()` with `detect_album_changes()` from core +- Replace `_build_asset_detail()` and URL helpers with `asset_utils` from core +- Replace `async_get_assets()` filtering/sorting with `filter_assets()` + `sort_assets()` from core +- Keep HA-specific: `DataUpdateCoordinator` subclass, `_fire_events()`, `async_get_clientsession()` + +### 5. Refactor sensor.py `[ ]` +- Remove Telegram constants, helper functions, and all `_send_telegram_*` methods +- Replace with `TelegramClient` from core in `_execute_telegram_notification()` +- Keep HA-specific: entity classes, service registration, platform setup + +### 6. Update __init__.py `[ ]` +- Update imports for new storage classes (HAStorageBackend adapter) +- Create TelegramFileCache instances using core class + HA adapter + +### 7. Verify `[ ]` +- All existing entities, services, and events work identically +- Telegram notifications work with caching +- Quiet hours queueing works +- No HA import in core library (verify with grep) + +--- + +## Acceptance Criteria + +- [ ] Integration imports and delegates to `immich_watcher_core` +- [ ] Zero behavior changes for end users +- [ ] No duplicated logic between core and integration +- [ ] Core library has no HA imports (verified) +- [ ] `ImmichAlbumStorage` is the only storage class still HA-native diff --git a/plans/primary-plan.md b/plans/primary-plan.md index 7a1ca6e..b9f6f47 100644 --- a/plans/primary-plan.md +++ b/plans/primary-plan.md @@ -179,7 +179,7 @@ async def _execute_telegram_notification(self, ...): - Write unit tests for all extracted modules - **Subplan**: `plans/phase-1-core-library.md` -### Phase 2: Wire Core into HAOS Integration `[ ]` +### Phase 2: Wire Core into HAOS Integration `[x]` - Move integration to `packages/haos/` - Refactor coordinator.py, sensor.py, storage.py to use core library - Update manifest.json, hacs.json for new structure