From dd7032b4115e285a4129915b0f14c1854b1f39d0 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Mon, 16 Feb 2026 12:28:33 +0300 Subject: [PATCH] Replace TTL with thumbhash-based cache validation and add Telegram video size limits - Asset cache now validates entries by comparing stored thumbhash with current Immich thumbhash instead of using TTL expiration. This makes cache invalidation precise (only when content actually changes) and eliminates unnecessary re-uploads. URL-based cache retains TTL for non-Immich URLs. - Add TELEGRAM_MAX_VIDEO_SIZE (50 MB) check to skip oversized videos in both single-video and media-group paths, preventing entire groups from failing. - Split media groups into sub-groups by cumulative upload size to ensure each sendMediaGroup request stays under Telegram's 50 MB upload limit. Co-Authored-By: Claude Opus 4.6 --- README.md | 51 +- .../immich_album_watcher/__init__.py | 23 +- .../immich_album_watcher/coordinator.py | 18 +- .../immich_album_watcher/manifest.json | 2 +- .../immich_album_watcher/sensor.py | 697 +++++++++++++----- .../immich_album_watcher/services.yaml | 6 +- .../immich_album_watcher/storage.py | 85 ++- .../immich_album_watcher/translations/en.json | 6 +- .../immich_album_watcher/translations/ru.json | 6 +- 9 files changed, 637 insertions(+), 257 deletions(-) diff --git a/README.md b/README.md index 3d75e1e..68ee558 100644 --- a/README.md +++ b/README.md @@ -334,15 +334,29 @@ data: Send notifications to Telegram. Supports multiple formats: -- **Text message** - When `urls` is empty or not provided -- **Single document** - When `urls` contains one document (default type) -- **Single photo** - When `urls` contains one photo (`type: photo`) -- **Single video** - When `urls` contains one video (`type: video`) -- **Media group** - When `urls` contains multiple photos/videos (documents are sent separately) +- **Text message** - When `assets` is empty or not provided +- **Single document** - When `assets` contains one document (default type) +- **Single photo** - When `assets` contains one photo (`type: photo`) +- **Single video** - When `assets` contains one video (`type: video`) +- **Media group** - When `assets` contains multiple photos/videos (documents are sent separately) The service downloads media from Immich and uploads it to Telegram, bypassing any CORS restrictions. Large lists of photos and videos are automatically split into multiple media groups based on the `max_group_size` parameter (default: 10 items per group). Documents cannot be grouped and are sent individually. -**File ID Caching:** When media is uploaded to Telegram, the service caches the returned `file_id`. Subsequent sends of the same media will use the cached `file_id` instead of re-uploading, significantly improving performance. The cache TTL is configurable in hub options (default: 48 hours, range: 1-168 hours). The cache is persistent across Home Assistant restarts and is stored per album. +**File ID Caching:** When media is uploaded to Telegram, the service caches the returned `file_id`. Subsequent sends of the same media will use the cached `file_id` instead of re-uploading, significantly improving performance. The cache TTL is configurable in hub options (default: 48 hours, range: 1-168 hours). The cache is persistent across Home Assistant restarts and is shared across all albums in the hub. + +**Dual Cache System:** The integration maintains two separate caches for optimal performance: + +- **Asset ID Cache** - For Immich assets with extractable asset IDs (UUIDs). The same asset accessed via different URL types (thumbnail, original, video playback, share links) shares the same cache entry. +- **URL Cache** - For non-Immich URLs or URLs without extractable asset IDs. Also used when a custom `cache_key` is provided. + +**Smart Cache Keys:** The service automatically extracts asset IDs from Immich URLs. Supported URL patterns: + +- `/api/assets/{asset_id}/original` +- `/api/assets/{asset_id}/thumbnail` +- `/api/assets/{asset_id}/video/playback` +- `/share/{key}/photos/{asset_id}` + +You can provide a custom `cache_key` per asset to override this behavior (stored in URL cache). **Examples:** @@ -366,7 +380,7 @@ target: entity_id: sensor.album_name_asset_limit data: chat_id: "-1001234567890" - urls: + assets: - url: "https://immich.example.com/api/assets/xxx/original?key=yyy" content_type: "image/heic" # Optional: explicit MIME type caption: "Original file" @@ -380,7 +394,7 @@ target: entity_id: sensor.album_name_asset_limit data: chat_id: "-1001234567890" - urls: + assets: - url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy" type: photo caption: "Beautiful sunset!" @@ -394,7 +408,7 @@ target: entity_id: sensor.album_name_asset_limit data: chat_id: "-1001234567890" - urls: + assets: - url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy" type: photo - url: "https://immich.example.com/api/assets/zzz/video/playback?key=yyy" @@ -426,17 +440,32 @@ target: entity_id: sensor.album_name_asset_limit data: chat_id: "-1001234567890" - urls: + assets: - url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy" type: photo caption: "Quick notification" wait_for_response: false # Automation continues immediately ``` +Using custom cache_key (useful when same media has different URLs): + +```yaml +service: immich_album_watcher.send_telegram_notification +target: + entity_id: sensor.album_name_asset_limit +data: + chat_id: "-1001234567890" + assets: + - url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy" + type: photo + cache_key: "asset_xxx" # Custom key for caching instead of URL + caption: "Photo with custom cache key" +``` + | Field | Description | Required | |-------|-------------|----------| | `chat_id` | Telegram chat ID to send to | Yes | -| `urls` | List of media items with `url`, optional `type` (document/photo/video, default: document), and optional `content_type` (MIME type, e.g., `image/jpeg`). Empty for text message. Photos and videos can be grouped; documents are sent separately. | No | +| `assets` | List of media items with `url`, optional `type` (document/photo/video, default: document), optional `content_type` (MIME type, e.g., `image/jpeg`), and optional `cache_key` (custom key for caching). Empty for text message. Photos and videos can be grouped; documents are sent separately. | No | | `bot_token` | Telegram bot token (uses configured token if not provided) | No | | `caption` | For media: caption applied to first item. For text: the message text. Supports HTML formatting by default. | No | | `reply_to_message_id` | Message ID to reply to | No | diff --git a/custom_components/immich_album_watcher/__init__.py b/custom_components/immich_album_watcher/__init__.py index cb9aa54..cb78544 100644 --- a/custom_components/immich_album_watcher/__init__.py +++ b/custom_components/immich_album_watcher/__init__.py @@ -73,11 +73,25 @@ async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bo storage = ImmichAlbumStorage(hass, entry.entry_id) await storage.async_load() + # Create and load Telegram file caches once per hub (shared across all albums) + # TTL is in hours from config, convert to seconds + cache_ttl_seconds = telegram_cache_ttl * 60 * 60 + # URL-based cache for non-Immich URLs or URLs without extractable asset IDs + telegram_cache = TelegramFileCache(hass, entry.entry_id, ttl_seconds=cache_ttl_seconds) + await telegram_cache.async_load() + # Asset ID-based cache for Immich URLs — uses thumbhash validation instead of TTL + telegram_asset_cache = TelegramFileCache( + hass, f"{entry.entry_id}_assets", use_thumbhash=True + ) + await telegram_asset_cache.async_load() + # Store hub reference hass.data[DOMAIN][entry.entry_id] = { "hub": entry.runtime_data, "subentries": {}, "storage": storage, + "telegram_cache": telegram_cache, + "telegram_asset_cache": telegram_asset_cache, } # Track loaded subentries to detect changes @@ -109,15 +123,11 @@ async def _async_setup_subentry_coordinator( album_id = subentry.data[CONF_ALBUM_ID] album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album") storage: ImmichAlbumStorage = hass.data[DOMAIN][entry.entry_id]["storage"] + telegram_cache: TelegramFileCache = hass.data[DOMAIN][entry.entry_id]["telegram_cache"] + telegram_asset_cache: TelegramFileCache = hass.data[DOMAIN][entry.entry_id]["telegram_asset_cache"] _LOGGER.debug("Setting up coordinator for album: %s (%s)", album_name, album_id) - # Create and load Telegram file cache for this album - # TTL is in hours from config, convert to seconds - cache_ttl_seconds = hub_data.telegram_cache_ttl * 60 * 60 - telegram_cache = TelegramFileCache(hass, album_id, ttl_seconds=cache_ttl_seconds) - await telegram_cache.async_load() - # Create coordinator for this album coordinator = ImmichAlbumWatcherCoordinator( hass, @@ -129,6 +139,7 @@ async def _async_setup_subentry_coordinator( hub_name=hub_data.name, storage=storage, telegram_cache=telegram_cache, + telegram_asset_cache=telegram_asset_cache, ) # Load persisted state before first refresh to detect changes during downtime diff --git a/custom_components/immich_album_watcher/coordinator.py b/custom_components/immich_album_watcher/coordinator.py index b2c87d7..9abcfcb 100644 --- a/custom_components/immich_album_watcher/coordinator.py +++ b/custom_components/immich_album_watcher/coordinator.py @@ -131,6 +131,7 @@ class AssetInfo: state: str | None = None country: str | None = None is_processed: bool = True # Whether asset is fully processed by Immich + thumbhash: str | None = None # Perceptual hash for cache validation @classmethod def from_api_response( @@ -169,6 +170,7 @@ class AssetInfo: # Check if asset is fully processed by Immich asset_type = data.get("type", ASSET_TYPE_IMAGE) is_processed = cls._check_processing_status(data, asset_type) + thumbhash = data.get("thumbhash") return cls( id=data["id"], @@ -187,6 +189,7 @@ class AssetInfo: state=state, country=country, is_processed=is_processed, + thumbhash=thumbhash, ) @staticmethod @@ -336,6 +339,7 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): hub_name: str = "Immich", storage: ImmichAlbumStorage | None = None, telegram_cache: TelegramFileCache | None = None, + telegram_asset_cache: TelegramFileCache | None = None, ) -> None: """Initialize the coordinator.""" super().__init__( @@ -356,6 +360,7 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): self._shared_links: list[SharedLinkInfo] = [] self._storage = storage self._telegram_cache = telegram_cache + self._telegram_asset_cache = telegram_asset_cache self._persisted_asset_ids: set[str] | None = None self._external_domain: str | None = None # Fetched from server config self._pending_asset_ids: set[str] = set() # Assets detected but not yet processed @@ -411,9 +416,20 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]): @property def telegram_cache(self) -> TelegramFileCache | None: - """Return the Telegram file cache.""" + """Return the Telegram file cache (URL-based).""" return self._telegram_cache + @property + def telegram_asset_cache(self) -> TelegramFileCache | None: + """Return the Telegram asset cache (asset ID-based).""" + return self._telegram_asset_cache + + def get_asset_thumbhash(self, asset_id: str) -> str | None: + """Get the current thumbhash for an asset from coordinator data.""" + if self.data and asset_id in self.data.assets: + return self.data.assets[asset_id].thumbhash + return None + def update_scan_interval(self, scan_interval: int) -> None: """Update the scan interval.""" self.update_interval = timedelta(seconds=scan_interval) diff --git a/custom_components/immich_album_watcher/manifest.json b/custom_components/immich_album_watcher/manifest.json index e4448cc..b9b04c5 100644 --- a/custom_components/immich_album_watcher/manifest.json +++ b/custom_components/immich_album_watcher/manifest.json @@ -8,5 +8,5 @@ "iot_class": "cloud_polling", "issue_tracker": "https://github.com/DolgolyovAlexei/haos-hacs-immich-album-watcher/issues", "requirements": [], - "version": "2.7.1" + "version": "2.8.0" } diff --git a/custom_components/immich_album_watcher/sensor.py b/custom_components/immich_album_watcher/sensor.py index b742f18..ed1001d 100644 --- a/custom_components/immich_album_watcher/sensor.py +++ b/custom_components/immich_album_watcher/sensor.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import logging +import re from datetime import datetime from typing import Any @@ -47,14 +48,108 @@ from .const import ( SERVICE_SEND_TELEGRAM_NOTIFICATION, ) from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator +from .storage import TelegramFileCache _LOGGER = logging.getLogger(__name__) # Telegram constants TELEGRAM_API_BASE_URL = "https://api.telegram.org/bot" TELEGRAM_MAX_PHOTO_SIZE = 10 * 1024 * 1024 # 10 MB - Telegram's max photo size +TELEGRAM_MAX_VIDEO_SIZE = 50 * 1024 * 1024 # 50 MB - Telegram's max video/document upload size TELEGRAM_MAX_DIMENSION_SUM = 10000 # Maximum sum of width + height in pixels +# Regex pattern for Immich asset ID (UUID format) +_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$") + +# Regex patterns to extract asset ID from Immich URLs +# Matches patterns like: +# - /api/assets/{asset_id}/original +# - /api/assets/{asset_id}/thumbnail +# - /api/assets/{asset_id}/video/playback +# - /share/{key}/photos/{asset_id} +_IMMICH_ASSET_ID_PATTERNS = [ + re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"), + re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"), +] + + +def _is_asset_id(value: str) -> bool: + """Check if a string is a valid Immich asset ID (UUID format). + + Args: + value: The string to check + + Returns: + True if the string matches the UUID format, False otherwise + """ + return bool(_ASSET_ID_PATTERN.match(value)) + + +def _extract_asset_id_from_url(url: str) -> str | None: + """Extract asset ID from Immich URL if possible. + + Supports the following URL patterns: + - /api/assets/{asset_id}/original?... + - /api/assets/{asset_id}/thumbnail?... + - /api/assets/{asset_id}/video/playback?... + - /share/{key}/photos/{asset_id} + + Args: + url: The URL to extract asset ID from + + Returns: + The asset ID if found, None otherwise + """ + if not url: + return None + + for pattern in _IMMICH_ASSET_ID_PATTERNS: + match = pattern.search(url) + if match: + return match.group(1) + + return None + + +def _split_media_by_upload_size( + media_items: list[tuple], max_upload_size: int +) -> list[list[tuple]]: + """Split media items into sub-groups respecting upload size limit. + + Cached items (file_id references) don't count toward upload size since + they are sent as lightweight JSON references, not uploaded data. + + Args: + media_items: List of (media_type, media_ref, filename, cache_key, is_cached, content_type) + max_upload_size: Maximum total upload bytes per sub-group + + Returns: + List of sub-groups, each a list of media_items tuples + """ + if not media_items: + return [] + + groups: list[list[tuple]] = [] + current_group: list[tuple] = [] + current_upload_size = 0 + + for item in media_items: + _, media_ref, _, _, is_cached, _ = item + item_upload_size = 0 if is_cached else len(media_ref) + + if current_group and current_upload_size + item_upload_size > max_upload_size: + groups.append(current_group) + current_group = [item] + current_upload_size = item_upload_size + else: + current_group.append(item) + current_upload_size += item_upload_size + + if current_group: + groups.append(current_group) + + return groups + async def async_setup_entry( hass: HomeAssistant, @@ -130,7 +225,7 @@ async def async_setup_entry( { vol.Optional("bot_token"): str, vol.Required("chat_id"): vol.Coerce(str), - vol.Optional("urls"): list, + vol.Optional("assets"): list, vol.Optional("caption"): str, vol.Optional("reply_to_message_id"): vol.Coerce(int), vol.Optional("disable_web_page_preview"): bool, @@ -242,7 +337,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se async def async_send_telegram_notification( self, chat_id: str, - urls: list[dict[str, str]] | None = None, + assets: list[dict[str, str]] | None = None, bot_token: str | None = None, caption: str | None = None, reply_to_message_id: int | None = None, @@ -258,12 +353,13 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se """Send notification to Telegram. Supports: - - Empty URLs: sends a simple text message + - Empty assets: sends a simple text message - Single photo: uses sendPhoto API - Single video: uses sendVideo API - Multiple items: uses sendMediaGroup API (splits into multiple groups if needed) - Each item in urls should be a dict with 'url' and 'type' (photo/video). + Each item in assets should be a dict with 'url', optional 'type' (photo/video/document), + and optional 'cache_key' (custom key for caching instead of URL). Downloads media and uploads to Telegram to bypass CORS restrictions. If wait_for_response is False, the task will be executed in the background @@ -274,7 +370,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se self.hass.async_create_task( self._execute_telegram_notification( chat_id=chat_id, - urls=urls, + assets=assets, bot_token=bot_token, caption=caption, reply_to_message_id=reply_to_message_id, @@ -292,7 +388,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se # Blocking mode - execute and return result return await self._execute_telegram_notification( chat_id=chat_id, - urls=urls, + assets=assets, bot_token=bot_token, caption=caption, reply_to_message_id=reply_to_message_id, @@ -308,7 +404,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se async def _execute_telegram_notification( self, chat_id: str, - urls: list[dict[str, str]] | None = None, + assets: list[dict[str, str]] | None = None, bot_token: str | None = None, caption: str | None = None, reply_to_message_id: int | None = None, @@ -336,8 +432,8 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se session = async_get_clientsession(self.hass) - # Handle empty URLs - send simple text message (no typing indicator needed) - if not urls: + # Handle empty assets - send simple text message (no typing indicator needed) + if not assets: return await self._send_telegram_message( session, token, chat_id, caption or "", reply_to_message_id, disable_web_page_preview, parse_mode ) @@ -349,23 +445,27 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se try: # Handle single photo - if len(urls) == 1 and urls[0].get("type") == "photo": + if len(assets) == 1 and assets[0].get("type") == "photo": return await self._send_telegram_photo( - session, token, chat_id, urls[0].get("url"), caption, reply_to_message_id, parse_mode, - max_asset_data_size, send_large_photos_as_documents, urls[0].get("content_type") + session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode, + max_asset_data_size, send_large_photos_as_documents, assets[0].get("content_type"), + assets[0].get("cache_key") ) # Handle single video - if len(urls) == 1 and urls[0].get("type") == "video": + if len(assets) == 1 and assets[0].get("type") == "video": return await self._send_telegram_video( - session, token, chat_id, urls[0].get("url"), caption, reply_to_message_id, parse_mode, - max_asset_data_size, urls[0].get("content_type") + session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode, + max_asset_data_size, assets[0].get("content_type"), assets[0].get("cache_key") ) # Handle single document (default type) - if len(urls) == 1 and urls[0].get("type", "document") == "document": - url = urls[0].get("url") - item_content_type = urls[0].get("content_type") + if len(assets) == 1 and assets[0].get("type", "document") == "document": + url = assets[0].get("url") + if not url: + return {"success": False, "error": "Missing 'url' for document"} + item_content_type = assets[0].get("content_type") + item_cache_key = assets[0].get("cache_key") try: download_url = self.coordinator.get_internal_download_url(url) async with session.get(download_url) as resp: @@ -377,14 +477,15 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se # Detect filename from URL or use generic name filename = url.split("/")[-1].split("?")[0] or "file" return await self._send_telegram_document( - session, token, chat_id, data, filename, caption, reply_to_message_id, parse_mode, url, item_content_type + session, token, chat_id, data, filename, caption, reply_to_message_id, parse_mode, + url, item_content_type, item_cache_key ) except aiohttp.ClientError as err: return {"success": False, "error": f"Failed to download media: {err}"} # Handle multiple items - send as media group(s) return await self._send_telegram_media_group( - session, token, chat_id, urls, caption, reply_to_message_id, max_group_size, chunk_delay, parse_mode, + session, token, chat_id, assets, caption, reply_to_message_id, max_group_size, chunk_delay, parse_mode, max_asset_data_size, send_large_photos_as_documents ) finally: @@ -552,6 +653,10 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se except Exception: pass + # Check size limit for videos + if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: + error_msg += f" | EXCEEDS Telegram upload limit ({TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB)" + # Provide suggestions based on error description suggestions = [] if "dimension" in description.lower() or "PHOTO_INVALID_DIMENSIONS" in description: @@ -609,6 +714,42 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se _LOGGER.debug("Failed to check photo dimensions: %s", e) return False, None, None, None + def _get_telegram_cache_and_key( + self, + url: str | None, + cache_key: str | None = None, + ) -> tuple[TelegramFileCache | None, str | None, str | None]: + """Determine which Telegram cache, key, and thumbhash to use. + + Priority: custom cache_key -> direct asset ID -> extracted asset ID from URL -> URL + + Args: + url: The URL of the media (or asset ID directly) + cache_key: Optional custom cache key provided by user + + Returns: + Tuple of (cache instance, cache key, thumbhash) to use. + thumbhash is only populated when using the asset cache. + """ + if cache_key: + # Custom cache_key uses URL cache (no thumbhash) + return self.coordinator.telegram_cache, cache_key, None + + if url: + # Check if url is already an asset ID (UUID format) + if _is_asset_id(url): + thumbhash = self.coordinator.get_asset_thumbhash(url) + return self.coordinator.telegram_asset_cache, url, thumbhash + # Try to extract asset ID from URL + asset_id = _extract_asset_id_from_url(url) + if asset_id: + # Extracted asset ID uses asset cache + thumbhash = self.coordinator.get_asset_thumbhash(asset_id) + return self.coordinator.telegram_asset_cache, asset_id, thumbhash + # Fallback to URL cache with URL as key (no thumbhash) + return self.coordinator.telegram_cache, url, None + + return None, None, None async def _send_telegram_photo( self, @@ -622,6 +763,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False, content_type: str | None = None, + cache_key: str | None = None, ) -> ServiceResponse: """Send a single photo to Telegram.""" import aiohttp @@ -634,14 +776,16 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se if not url: return {"success": False, "error": "Missing 'url' for photo"} - # Check cache for file_id - cache = self.coordinator.telegram_cache - cached = cache.get(url) if cache else None + # Determine which cache to use and the cache key + effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key) - if cached and cached.get("file_id"): + # Check cache for file_id + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None + + if cached and cached.get("file_id") and effective_cache_key: # Use cached file_id - no download needed file_id = cached["file_id"] - _LOGGER.debug("Using cached Telegram file_id for photo") + _LOGGER.debug("Using cached Telegram file_id for photo (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key) payload = { "chat_id": chat_id, @@ -702,7 +846,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se _LOGGER.info("Photo %s, sending as document", reason) return await self._send_telegram_document( session, token, chat_id, data, "photo.jpg", - caption, reply_to_message_id, parse_mode, url + caption, reply_to_message_id, parse_mode, url, None, cache_key ) else: # Skip oversized photo @@ -735,11 +879,11 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se if response.status == 200 and result.get("ok"): # Extract and cache file_id photos = result.get("result", {}).get("photo", []) - if photos and cache: + if photos and effective_cache and effective_cache_key: # Use the largest photo's file_id file_id = photos[-1].get("file_id") if file_id: - await cache.async_set(url, file_id, "photo") + await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash) return { "success": True, @@ -773,6 +917,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se parse_mode: str = "HTML", max_asset_data_size: int | None = None, content_type: str | None = None, + cache_key: str | None = None, ) -> ServiceResponse: """Send a single video to Telegram.""" import aiohttp @@ -785,14 +930,16 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se if not url: return {"success": False, "error": "Missing 'url' for video"} - # Check cache for file_id - cache = self.coordinator.telegram_cache - cached = cache.get(url) if cache else None + # Determine which cache to use and the cache key + effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key) - if cached and cached.get("file_id"): + # Check cache for file_id + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None + + if cached and cached.get("file_id") and effective_cache_key: # Use cached file_id - no download needed file_id = cached["file_id"] - _LOGGER.debug("Using cached Telegram file_id for video") + _LOGGER.debug("Using cached Telegram file_id for video (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key) payload = { "chat_id": chat_id, @@ -833,7 +980,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se data = await resp.read() _LOGGER.debug("Downloaded video: %d bytes", len(data)) - # Check if video exceeds max size limit + # Check if video exceeds max size limit (user-defined limit) if max_asset_data_size is not None and len(data) > max_asset_data_size: _LOGGER.warning( "Video size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", @@ -845,6 +992,19 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se "skipped": True, } + # Check if video exceeds Telegram's upload limit (50 MB) + if len(data) > TELEGRAM_MAX_VIDEO_SIZE: + _LOGGER.warning( + "Video size (%d bytes, %.1f MB) exceeds Telegram's %d bytes (%.0f MB) upload limit, skipping", + len(data), len(data) / (1024 * 1024), + TELEGRAM_MAX_VIDEO_SIZE, TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024), + ) + return { + "success": False, + "error": f"Video size ({len(data) / (1024 * 1024):.1f} MB) exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB upload limit", + "skipped": True, + } + # Build multipart form form = FormData() form.add_field("chat_id", chat_id) @@ -867,10 +1027,10 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se if response.status == 200 and result.get("ok"): # Extract and cache file_id video = result.get("result", {}).get("video", {}) - if video and cache: + if video and effective_cache and effective_cache_key: file_id = video.get("file_id") if file_id: - await cache.async_set(url, file_id, "video") + await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash) return { "success": True, @@ -905,6 +1065,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se parse_mode: str = "HTML", source_url: str | None = None, content_type: str | None = None, + cache_key: str | None = None, ) -> ServiceResponse: """Send a file as a document to Telegram.""" import aiohttp @@ -917,14 +1078,16 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se if not content_type: content_type = "application/octet-stream" - # Check cache for file_id if source_url is provided - cache = self.coordinator.telegram_cache - if source_url: - cached = cache.get(source_url) if cache else None + # Determine which cache and key to use + effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(source_url, cache_key) + + # Check cache for file_id + if effective_cache and effective_cache_key: + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if cached and cached.get("file_id") and cached.get("type") == "document": # Use cached file_id file_id = cached["file_id"] - _LOGGER.debug("Using cached Telegram file_id for document") + _LOGGER.debug("Using cached Telegram file_id for document (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key) payload = { "chat_id": chat_id, @@ -973,11 +1136,11 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) if response.status == 200 and result.get("ok"): # Extract and cache file_id - if source_url and cache: + if effective_cache_key and effective_cache: document = result.get("result", {}).get("document", {}) file_id = document.get("file_id") if file_id: - await cache.async_set(source_url, file_id, "document") + await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash) return { "success": True, @@ -1005,7 +1168,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se session: Any, token: str, chat_id: str, - urls: list[dict[str, str]], + assets: list[dict[str, str]], caption: str | None = None, reply_to_message_id: int | None = None, max_group_size: int = 10, @@ -1014,9 +1177,9 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False, ) -> ServiceResponse: - """Send media URLs to Telegram as media group(s). + """Send media assets to Telegram as media group(s). - If urls list exceeds max_group_size, splits into multiple media groups. + If assets list exceeds max_group_size, splits into multiple media groups. For chunks with single items, uses sendPhoto/sendVideo APIs. Applies chunk_delay (in milliseconds) between groups if specified. """ @@ -1025,12 +1188,12 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se import aiohttp from aiohttp import FormData - # Split URLs into chunks based on max_group_size - chunks = [urls[i:i + max_group_size] for i in range(0, len(urls), max_group_size)] + # Split assets into chunks based on max_group_size + chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)] all_message_ids = [] _LOGGER.debug("Sending %d media items in %d chunk(s) of max %d items (delay: %dms)", - len(urls), len(chunks), max_group_size, chunk_delay) + len(assets), len(chunks), max_group_size, chunk_delay) for chunk_idx, chunk in enumerate(chunks): # Add delay before sending subsequent chunks @@ -1046,6 +1209,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se media_type = item.get("type", "document") url = item.get("url") item_content_type = item.get("content_type") + item_cache_key = item.get("cache_key") # Only apply caption and reply_to to the first chunk chunk_caption = caption if chunk_idx == 0 else None @@ -1055,16 +1219,18 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se _LOGGER.debug("Sending chunk %d/%d as single photo", chunk_idx + 1, len(chunks)) result = await self._send_telegram_photo( session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode, - max_asset_data_size, send_large_photos_as_documents, item_content_type + max_asset_data_size, send_large_photos_as_documents, item_content_type, item_cache_key ) elif media_type == "video": _LOGGER.debug("Sending chunk %d/%d as single video", chunk_idx + 1, len(chunks)) result = await self._send_telegram_video( session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode, - max_asset_data_size, item_content_type + max_asset_data_size, item_content_type, item_cache_key ) else: # document _LOGGER.debug("Sending chunk %d/%d as single document", chunk_idx + 1, len(chunks)) + if not url: + return {"success": False, "error": "Missing 'url' for document", "failed_at_chunk": chunk_idx + 1} try: download_url = self.coordinator.get_internal_download_url(url) async with session.get(download_url) as resp: @@ -1076,7 +1242,8 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se continue filename = url.split("/")[-1].split("?")[0] or "file" result = await self._send_telegram_document( - session, token, chat_id, data, filename, chunk_caption, chunk_reply_to, parse_mode, url, item_content_type + session, token, chat_id, data, filename, chunk_caption, chunk_reply_to, parse_mode, + url, item_content_type, item_cache_key ) except aiohttp.ClientError as err: return {"success": False, "error": f"Failed to download media: {err}", "failed_at_chunk": chunk_idx + 1} @@ -1090,28 +1257,35 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se # Multi-item chunk: use sendMediaGroup _LOGGER.debug("Sending chunk %d/%d as media group (%d items)", chunk_idx + 1, len(chunks), len(chunk)) - # Get cache reference - cache = self.coordinator.telegram_cache + # Helper to get the appropriate cache for a cache key + def get_cache_for_key(key: str) -> TelegramFileCache | None: + """Return asset cache if key is a UUID, otherwise URL cache.""" + is_asset_id = _is_asset_id(key) + return self.coordinator.telegram_asset_cache if is_asset_id else self.coordinator.telegram_cache # Collect media items - either from cache (file_id) or by downloading - # Each item: (type, media_ref, filename, url, is_cached, content_type) + # Each item: (type, media_ref, filename, cache_key, is_cached, content_type) # media_ref is either file_id (str) or data (bytes) media_items: list[tuple[str, str | bytes, str, str, bool, str | None]] = [] - oversized_photos: list[tuple[bytes, str | None, str]] = [] # (data, caption, url) - documents_to_send: list[tuple[bytes, str | None, str, str, str | None]] = [] # (data, caption, url, filename, content_type) + oversized_photos: list[tuple[bytes, str | None, str, str | None]] = [] # (data, caption, url, cache_key) + documents_to_send: list[tuple[bytes, str | None, str, str | None, str, str | None]] = [] # (data, caption, url, cache_key, filename, content_type) skipped_count = 0 for i, item in enumerate(chunk): url = item.get("url") - media_type = item.get("type", "document") - item_content_type = item.get("content_type") - if not url: return { "success": False, "error": f"Missing 'url' in item {chunk_idx * max_group_size + i}", } + media_type = item.get("type", "document") + item_content_type = item.get("content_type") + # Determine cache key: custom cache_key -> extracted asset ID -> URL + custom_cache_key = item.get("cache_key") + extracted_asset_id = _extract_asset_id_from_url(url) if not custom_cache_key else None + item_cache_key = custom_cache_key or extracted_asset_id or url + if media_type not in ("photo", "video", "document"): return { "success": False, @@ -1139,7 +1313,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se # Caption only on first item of first chunk if no media items yet doc_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 and len(documents_to_send) == 0 else None filename = url.split("/")[-1].split("?")[0] or f"file_{i}" - documents_to_send.append((data, doc_caption, url, filename, item_content_type)) + documents_to_send.append((data, doc_caption, url, custom_cache_key, filename, item_content_type)) except aiohttp.ClientError as err: return { "success": False, @@ -1148,12 +1322,14 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se continue # Check cache first for photos/videos - cached = cache.get(url) if cache else None + item_cache = get_cache_for_key(item_cache_key) + item_thumbhash = self.coordinator.get_asset_thumbhash(item_cache_key) if _is_asset_id(item_cache_key) else None + cached = item_cache.get(item_cache_key, thumbhash=item_thumbhash) if item_cache else None if cached and cached.get("file_id"): # Use cached file_id ext = "jpg" if media_type == "photo" else "mp4" filename = f"media_{chunk_idx * max_group_size + i}.{ext}" - media_items.append((media_type, cached["file_id"], filename, url, True, item_content_type)) + media_items.append((media_type, cached["file_id"], filename, item_cache_key, True, item_content_type)) _LOGGER.debug("Using cached file_id for media %d", chunk_idx * max_group_size + i) continue @@ -1179,6 +1355,17 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se skipped_count += 1 continue + # For videos, check Telegram upload limit (50 MB) + if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: + _LOGGER.warning( + "Video %d size (%d bytes, %.1f MB) exceeds Telegram's %.0f MB upload limit, skipping", + chunk_idx * max_group_size + i, len(data), + len(data) / (1024 * 1024), + TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024), + ) + skipped_count += 1 + continue + # For photos, check Telegram limits if media_type == "photo": exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data) @@ -1187,7 +1374,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se # Separate this photo to send as document later # Caption only on first item of first chunk photo_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 else None - oversized_photos.append((data, photo_caption, url)) + oversized_photos.append((data, photo_caption, url, custom_cache_key)) _LOGGER.info("Photo %d %s, will send as document", i, reason) continue else: @@ -1198,7 +1385,7 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se ext = "jpg" if media_type == "photo" else "mp4" filename = f"media_{chunk_idx * max_group_size + i}.{ext}" - media_items.append((media_type, data, filename, url, False, item_content_type)) + media_items.append((media_type, data, filename, item_cache_key, False, item_content_type)) except aiohttp.ClientError as err: return { "success": False, @@ -1211,161 +1398,263 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se chunk_idx + 1, len(chunks), len(chunk)) continue - # Send media group if we have normal-sized files + # Split media items into sub-groups respecting Telegram's upload size limit + # This ensures the total upload data per sendMediaGroup call stays under 50 MB if media_items: - # Check if all items are cached (can use simple JSON payload) - all_cached = all(is_cached for _, _, _, _, is_cached, _ in media_items) + media_sub_groups = _split_media_by_upload_size(media_items, TELEGRAM_MAX_VIDEO_SIZE) + if len(media_sub_groups) > 1: + _LOGGER.debug( + "Chunk %d/%d: split %d media items into %d sub-groups by upload size", + chunk_idx + 1, len(chunks), len(media_items), len(media_sub_groups), + ) - if all_cached: - # All items cached - use simple JSON payload with file_ids - _LOGGER.debug("All %d items cached, using file_ids", len(media_items)) - media_json = [] - for i, (media_type, file_id, _, _, _, _) in enumerate(media_items): - media_item_json: dict[str, Any] = { - "type": media_type, - "media": file_id, - } - if chunk_idx == 0 and i == 0 and caption and not oversized_photos: - media_item_json["caption"] = caption - media_item_json["parse_mode"] = parse_mode - media_json.append(media_item_json) + first_caption_used = False + for sub_idx, sub_group_items in enumerate(media_sub_groups): + is_first = chunk_idx == 0 and sub_idx == 0 + sub_caption = caption if is_first and not first_caption_used and not oversized_photos else None + sub_reply_to = reply_to_message_id if is_first else None - payload = { - "chat_id": chat_id, - "media": media_json, - } - if chunk_idx == 0 and reply_to_message_id: - payload["reply_to_message_id"] = reply_to_message_id + # Add delay between sub-groups (not before the first one) + if sub_idx > 0 and chunk_delay > 0: + await asyncio.sleep(chunk_delay / 1000) - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup" - try: - async with session.post(telegram_url, json=payload) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - chunk_message_ids = [ - msg.get("message_id") for msg in result.get("result", []) - ] - all_message_ids.extend(chunk_message_ids) - else: - # Cache might be stale - fall through to upload path - _LOGGER.debug("Cached file_ids failed, will re-upload: %s", result.get("description")) - all_cached = False # Force re-upload - except aiohttp.ClientError as err: - _LOGGER.debug("Cached file_ids request failed: %s", err) - all_cached = False - - if not all_cached: - # Build multipart form with mix of cached file_ids and uploaded data - form = FormData() - form.add_field("chat_id", chat_id) - - # Only use reply_to_message_id for the first chunk - if chunk_idx == 0 and reply_to_message_id: - form.add_field("reply_to_message_id", str(reply_to_message_id)) - - # Build media JSON - use file_id for cached, attach:// for uploaded - media_json = [] - upload_idx = 0 - urls_to_cache: list[tuple[str, int, str]] = [] # (url, result_idx, type) - - for i, (media_type, media_ref, filename, url, is_cached, item_content_type) in enumerate(media_items): - if is_cached: - # Use file_id directly - media_item_json: dict[str, Any] = { - "type": media_type, - "media": media_ref, # file_id - } + # Single item - use sendPhoto/sendVideo (sendMediaGroup requires 2+ items) + if len(sub_group_items) == 1: + sg_type, sg_ref, sg_fname, sg_ck, sg_cached, sg_ct = sub_group_items[0] + if sg_type == "photo": + api_method = "sendPhoto" + media_field = "photo" else: - # Upload this file - attach_name = f"file{upload_idx}" - media_item_json = { - "type": media_type, - "media": f"attach://{attach_name}", - } - # Use provided content_type or default based on media type - content_type = item_content_type or ("image/jpeg" if media_type == "photo" else "video/mp4") - form.add_field(attach_name, media_ref, filename=filename, content_type=content_type) - urls_to_cache.append((url, i, media_type)) - upload_idx += 1 + api_method = "sendVideo" + media_field = "video" - if chunk_idx == 0 and i == 0 and caption and not oversized_photos: - media_item_json["caption"] = caption - media_item_json["parse_mode"] = parse_mode - media_json.append(media_item_json) - - form.add_field("media", json.dumps(media_json)) - - # Send to Telegram - telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup" - - try: - _LOGGER.debug("Uploading media group chunk %d/%d (%d files, %d cached) to Telegram", - chunk_idx + 1, len(chunks), len(media_items), len(media_items) - upload_idx) - async with session.post(telegram_url, data=form) as response: - result = await response.json() - _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) - if response.status == 200 and result.get("ok"): - chunk_message_ids = [ - msg.get("message_id") for msg in result.get("result", []) - ] - all_message_ids.extend(chunk_message_ids) - - # Cache the newly uploaded file_ids - if cache and urls_to_cache: - result_messages = result.get("result", []) - for url, result_idx, m_type in urls_to_cache: - if result_idx < len(result_messages): - msg = result_messages[result_idx] - if m_type == "photo": - photos = msg.get("photo", []) + try: + if sg_cached: + sg_payload: dict[str, Any] = { + "chat_id": chat_id, + media_field: sg_ref, + "parse_mode": parse_mode, + } + if sub_caption: + sg_payload["caption"] = sub_caption + if sub_reply_to: + sg_payload["reply_to_message_id"] = sub_reply_to + telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}" + async with session.post(telegram_url, json=sg_payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.append(result["result"].get("message_id")) + if sub_caption: + first_caption_used = True + else: + _LOGGER.debug("Cached file_id failed in sub-group, will re-upload: %s", result.get("description")) + sg_cached = False # Fall through to upload + if not sg_cached: + sg_form = FormData() + sg_form.add_field("chat_id", chat_id) + sg_content_type = sg_ct or ("image/jpeg" if sg_type == "photo" else "video/mp4") + sg_form.add_field(media_field, sg_ref, filename=sg_fname, content_type=sg_content_type) + sg_form.add_field("parse_mode", parse_mode) + if sub_caption: + sg_form.add_field("caption", sub_caption) + if sub_reply_to: + sg_form.add_field("reply_to_message_id", str(sub_reply_to)) + telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}" + async with session.post(telegram_url, data=sg_form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.append(result["result"].get("message_id")) + if sub_caption: + first_caption_used = True + # Cache the uploaded file_id + sg_cache = get_cache_for_key(sg_ck) + if sg_cache: + sg_thumbhash = self.coordinator.get_asset_thumbhash(sg_ck) if _is_asset_id(sg_ck) else None + result_data = result.get("result", {}) + if sg_type == "photo": + photos = result_data.get("photo", []) if photos: - await cache.async_set(url, photos[-1].get("file_id"), "photo") - elif m_type == "video": - video = msg.get("video", {}) + await sg_cache.async_set(sg_ck, photos[-1].get("file_id"), "photo", thumbhash=sg_thumbhash) + elif sg_type == "video": + video = result_data.get("video", {}) if video.get("file_id"): - await cache.async_set(url, video["file_id"], "video") - else: - # Log detailed error for media group with total size info - uploaded_data = [m for m in media_items if not m[4]] - total_size = sum(len(d) for _, d, _, _, _, _ in uploaded_data if isinstance(d, bytes)) - _LOGGER.error( - "Telegram API error for chunk %d/%d: %s | Media count: %d | Uploaded size: %d bytes (%.2f MB)", - chunk_idx + 1, len(chunks), - result.get("description", "Unknown Telegram error"), - len(media_items), - total_size, - total_size / (1024 * 1024) if total_size else 0 - ) - # Log detailed diagnostics for the first photo in the group - for media_type, media_ref, _, _, is_cached, _ in media_items: - if media_type == "photo" and not is_cached and isinstance(media_ref, bytes): + await sg_cache.async_set(sg_ck, video["file_id"], "video", thumbhash=sg_thumbhash) + else: self._log_telegram_error( error_code=result.get("error_code"), description=result.get("description", "Unknown Telegram error"), - data=media_ref, - media_type="photo", + data=sg_ref if isinstance(sg_ref, bytes) else None, + media_type=sg_type, ) - break # Only log details for first photo - return { - "success": False, - "error": result.get("description", "Unknown Telegram error"), - "error_code": result.get("error_code"), - "failed_at_chunk": chunk_idx + 1, - } - except aiohttp.ClientError as err: - _LOGGER.error("Telegram upload failed for chunk %d: %s", chunk_idx + 1, err) - return { - "success": False, - "error": str(err), - "failed_at_chunk": chunk_idx + 1, + return { + "success": False, + "error": result.get("description", "Unknown Telegram error"), + "error_code": result.get("error_code"), + "failed_at_chunk": chunk_idx + 1, + } + except aiohttp.ClientError as err: + _LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err) + return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} + continue + + # Multiple items - use sendMediaGroup + all_cached = all(is_cached for _, _, _, _, is_cached, _ in sub_group_items) + + if all_cached: + _LOGGER.debug("Sub-group %d/%d: all %d items cached, using file_ids", + sub_idx + 1, len(media_sub_groups), len(sub_group_items)) + media_json = [] + for i, (media_type, file_id, _, _, _, _) in enumerate(sub_group_items): + media_item_json: dict[str, Any] = { + "type": media_type, + "media": file_id, + } + if i == 0 and sub_caption and not first_caption_used: + media_item_json["caption"] = sub_caption + media_item_json["parse_mode"] = parse_mode + media_json.append(media_item_json) + + payload: dict[str, Any] = { + "chat_id": chat_id, + "media": media_json, } + if sub_reply_to: + payload["reply_to_message_id"] = sub_reply_to + + telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup" + try: + async with session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + chunk_message_ids = [ + msg.get("message_id") for msg in result.get("result", []) + ] + all_message_ids.extend(chunk_message_ids) + if sub_caption: + first_caption_used = True + else: + # Cache might be stale - fall through to upload path + _LOGGER.debug("Cached file_ids failed, will re-upload: %s", result.get("description")) + all_cached = False # Force re-upload + except aiohttp.ClientError as err: + _LOGGER.debug("Cached file_ids request failed: %s", err) + all_cached = False + + if not all_cached: + # Build multipart form with mix of cached file_ids and uploaded data + form = FormData() + form.add_field("chat_id", chat_id) + + if sub_reply_to: + form.add_field("reply_to_message_id", str(sub_reply_to)) + + # Build media JSON - use file_id for cached, attach:// for uploaded + media_json = [] + upload_idx = 0 + keys_to_cache: list[tuple[str, int, str]] = [] # (cache_key, result_idx, type) + + for i, (media_type, media_ref, filename, item_cache_key, is_cached, item_content_type) in enumerate(sub_group_items): + if is_cached: + # Use file_id directly + media_item_json: dict[str, Any] = { + "type": media_type, + "media": media_ref, # file_id + } + else: + # Upload this file + attach_name = f"file{upload_idx}" + media_item_json = { + "type": media_type, + "media": f"attach://{attach_name}", + } + # Use provided content_type or default based on media type + content_type = item_content_type or ("image/jpeg" if media_type == "photo" else "video/mp4") + form.add_field(attach_name, media_ref, filename=filename, content_type=content_type) + keys_to_cache.append((item_cache_key, i, media_type)) + upload_idx += 1 + + if i == 0 and sub_caption and not first_caption_used: + media_item_json["caption"] = sub_caption + media_item_json["parse_mode"] = parse_mode + media_json.append(media_item_json) + + form.add_field("media", json.dumps(media_json)) + + # Send to Telegram + telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup" + + try: + _LOGGER.debug("Uploading media group sub-group %d/%d (%d files, %d cached) to Telegram", + sub_idx + 1, len(media_sub_groups), len(sub_group_items), len(sub_group_items) - upload_idx) + async with session.post(telegram_url, data=form) as response: + result = await response.json() + _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok")) + if response.status == 200 and result.get("ok"): + chunk_message_ids = [ + msg.get("message_id") for msg in result.get("result", []) + ] + all_message_ids.extend(chunk_message_ids) + if sub_caption: + first_caption_used = True + + # Cache the newly uploaded file_ids + if keys_to_cache: + result_messages = result.get("result", []) + for ck, result_idx, m_type in keys_to_cache: + ck_cache = get_cache_for_key(ck) + ck_thumbhash = self.coordinator.get_asset_thumbhash(ck) if _is_asset_id(ck) else None + if result_idx < len(result_messages) and ck_cache: + msg = result_messages[result_idx] + if m_type == "photo": + photos = msg.get("photo", []) + if photos: + await ck_cache.async_set(ck, photos[-1].get("file_id"), "photo", thumbhash=ck_thumbhash) + elif m_type == "video": + video = msg.get("video", {}) + if video.get("file_id"): + await ck_cache.async_set(ck, video["file_id"], "video", thumbhash=ck_thumbhash) + else: + # Log detailed error for media group with total size info + uploaded_data = [m for m in sub_group_items if not m[4]] + total_size = sum(len(d) for _, d, _, _, _, _ in uploaded_data if isinstance(d, bytes)) + _LOGGER.error( + "Telegram API error for sub-group %d/%d: %s | Media count: %d | Uploaded size: %d bytes (%.2f MB)", + sub_idx + 1, len(media_sub_groups), + result.get("description", "Unknown Telegram error"), + len(sub_group_items), + total_size, + total_size / (1024 * 1024) if total_size else 0 + ) + # Log detailed diagnostics for the first photo in the group + for media_type, media_ref, _, _, is_cached, _ in sub_group_items: + if media_type == "photo" and not is_cached and isinstance(media_ref, bytes): + self._log_telegram_error( + error_code=result.get("error_code"), + description=result.get("description", "Unknown Telegram error"), + data=media_ref, + media_type="photo", + ) + break # Only log details for first photo + return { + "success": False, + "error": result.get("description", "Unknown Telegram error"), + "error_code": result.get("error_code"), + "failed_at_chunk": chunk_idx + 1, + } + except aiohttp.ClientError as err: + _LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err) + return { + "success": False, + "error": str(err), + "failed_at_chunk": chunk_idx + 1, + } # Send oversized photos as documents - for i, (data, photo_caption, photo_url) in enumerate(oversized_photos): + for i, (data, photo_caption, photo_url, photo_cache_key) in enumerate(oversized_photos): _LOGGER.debug("Sending oversized photo %d/%d as document", i + 1, len(oversized_photos)) result = await self._send_telegram_document( session, token, chat_id, data, f"photo_{i}.jpg", - photo_caption, None, parse_mode, photo_url + photo_caption, None, parse_mode, photo_url, None, photo_cache_key ) if result.get("success"): all_message_ids.append(result.get("message_id")) @@ -1374,11 +1663,11 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se # Continue with other photos even if one fails # Send documents (can't be in media groups) - for i, (data, doc_caption, doc_url, filename, doc_content_type) in enumerate(documents_to_send): + for i, (data, doc_caption, doc_url, doc_cache_key, filename, doc_content_type) in enumerate(documents_to_send): _LOGGER.debug("Sending document %d/%d", i + 1, len(documents_to_send)) result = await self._send_telegram_document( session, token, chat_id, data, filename, - doc_caption, None, parse_mode, doc_url, doc_content_type + doc_caption, None, parse_mode, doc_url, doc_content_type, doc_cache_key ) if result.get("success"): all_message_ids.append(result.get("message_id")) diff --git a/custom_components/immich_album_watcher/services.yaml b/custom_components/immich_album_watcher/services.yaml index 69f563a..2d69800 100644 --- a/custom_components/immich_album_watcher/services.yaml +++ b/custom_components/immich_album_watcher/services.yaml @@ -149,9 +149,9 @@ send_telegram_notification: required: true selector: text: - urls: - name: URLs - description: "List of media URLs to send. Each item should have 'url', optional 'type' (document/photo/video, default: document), and optional 'content_type' (MIME type, e.g., 'image/jpeg'). If empty, sends a text message. Photos and videos can be grouped; documents are sent separately." + assets: + name: Assets + description: "List of media assets to send. Each item should have 'url', optional 'type' (document/photo/video, default: document), optional 'content_type' (MIME type, e.g., 'image/jpeg'), and optional 'cache_key' (custom key for caching instead of URL). If empty, sends a text message. Photos and videos can be grouped; documents are sent separately." required: false selector: object: diff --git a/custom_components/immich_album_watcher/storage.py b/custom_components/immich_album_watcher/storage.py index f46d00c..e0b2272 100644 --- a/custom_components/immich_album_watcher/storage.py +++ b/custom_components/immich_album_watcher/storage.py @@ -73,40 +73,53 @@ class TelegramFileCache: When a file is uploaded to Telegram, it returns a file_id that can be reused to send the same file without re-uploading. This cache stores these file_ids - keyed by the source URL. + keyed by the source URL or asset ID. + + Supports two validation modes: + - TTL mode (default): entries expire after a configured time-to-live + - Thumbhash mode: entries are validated by comparing stored thumbhash with + the current asset thumbhash from Immich """ def __init__( self, hass: HomeAssistant, - album_id: str, + entry_id: str, ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL, + use_thumbhash: bool = False, ) -> None: """Initialize the Telegram file cache. Args: hass: Home Assistant instance - album_id: Album ID for scoping the cache - ttl_seconds: Time-to-live for cache entries in seconds (default: 48 hours) + entry_id: Config entry ID for scoping the cache (per hub) + ttl_seconds: Time-to-live for cache entries in seconds (TTL mode only) + use_thumbhash: Use thumbhash-based validation instead of TTL """ self._store: Store[dict[str, Any]] = Store( - hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{album_id}" + hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}" ) self._data: dict[str, Any] | None = None self._ttl_seconds = ttl_seconds + self._use_thumbhash = use_thumbhash async def async_load(self) -> None: """Load cache data from storage.""" self._data = await self._store.async_load() or {"files": {}} - # Clean up expired entries on load + # Clean up expired entries on load (TTL mode only) await self._cleanup_expired() + mode = "thumbhash" if self._use_thumbhash else "TTL" _LOGGER.debug( - "Loaded Telegram file cache with %d entries", + "Loaded Telegram file cache with %d entries (mode: %s)", len(self._data.get("files", {})), + mode, ) async def _cleanup_expired(self) -> None: - """Remove expired cache entries.""" + """Remove expired cache entries (TTL mode only).""" + if self._use_thumbhash: + return + if not self._data or "files" not in self._data: return @@ -127,53 +140,75 @@ class TelegramFileCache: await self._store.async_save(self._data) _LOGGER.debug("Cleaned up %d expired Telegram cache entries", len(expired_keys)) - def get(self, url: str) -> dict[str, Any] | None: - """Get cached file_id for a URL. + def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None: + """Get cached file_id for a key. Args: - url: The source URL of the media + key: The cache key (URL or asset ID) + thumbhash: Current thumbhash for validation (thumbhash mode only). + If provided, compares with stored thumbhash. Mismatch = cache miss. Returns: - Dict with 'file_id' and 'type' if cached and not expired, None otherwise + Dict with 'file_id' and 'type' if cached and valid, None otherwise """ if not self._data or "files" not in self._data: return None - entry = self._data["files"].get(url) + entry = self._data["files"].get(key) if not entry: return None - # Check if expired - cached_at_str = entry.get("cached_at") - if cached_at_str: - cached_at = datetime.fromisoformat(cached_at_str) - age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds() - if age_seconds > self._ttl_seconds: - return None + if self._use_thumbhash: + # Thumbhash-based validation + if thumbhash is not None: + stored_thumbhash = entry.get("thumbhash") + if stored_thumbhash and stored_thumbhash != thumbhash: + _LOGGER.debug( + "Cache miss for %s: thumbhash changed", + key[:36], + ) + return None + # If no thumbhash provided (asset not in monitored album), + # return cached entry anyway — self-heals on Telegram rejection + else: + # TTL-based validation + cached_at_str = entry.get("cached_at") + if cached_at_str: + cached_at = datetime.fromisoformat(cached_at_str) + age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds() + if age_seconds > self._ttl_seconds: + return None return { "file_id": entry.get("file_id"), "type": entry.get("type"), } - async def async_set(self, url: str, file_id: str, media_type: str) -> None: - """Store a file_id for a URL. + async def async_set( + self, key: str, file_id: str, media_type: str, thumbhash: str | None = None + ) -> None: + """Store a file_id for a key. Args: - url: The source URL of the media + key: The cache key (URL or asset ID) file_id: The Telegram file_id media_type: The type of media ('photo', 'video', 'document') + thumbhash: Current thumbhash to store alongside file_id (thumbhash mode only) """ if self._data is None: self._data = {"files": {}} - self._data["files"][url] = { + entry_data: dict[str, Any] = { "file_id": file_id, "type": media_type, "cached_at": datetime.now(timezone.utc).isoformat(), } + if thumbhash is not None: + entry_data["thumbhash"] = thumbhash + + self._data["files"][key] = entry_data await self._store.async_save(self._data) - _LOGGER.debug("Cached Telegram file_id for URL (type: %s)", media_type) + _LOGGER.debug("Cached Telegram file_id for key (type: %s)", media_type) async def async_remove(self) -> None: """Remove all cache data.""" diff --git a/custom_components/immich_album_watcher/translations/en.json b/custom_components/immich_album_watcher/translations/en.json index 5965db4..80668cf 100644 --- a/custom_components/immich_album_watcher/translations/en.json +++ b/custom_components/immich_album_watcher/translations/en.json @@ -205,9 +205,9 @@ "name": "Chat ID", "description": "Telegram chat ID to send to." }, - "urls": { - "name": "URLs", - "description": "List of media URLs with optional type (document/photo/video, default: document) and optional content_type (MIME type). If empty, sends a text message. Photos and videos can be grouped; documents are sent separately." + "assets": { + "name": "Assets", + "description": "List of media assets with 'url', optional 'type' (document/photo/video, default: document), optional 'content_type' (MIME type), and optional 'cache_key' (custom key for caching instead of URL). If empty, sends a text message. Photos and videos can be grouped; documents are sent separately." }, "caption": { "name": "Caption", diff --git a/custom_components/immich_album_watcher/translations/ru.json b/custom_components/immich_album_watcher/translations/ru.json index d258bda..1231f7f 100644 --- a/custom_components/immich_album_watcher/translations/ru.json +++ b/custom_components/immich_album_watcher/translations/ru.json @@ -205,9 +205,9 @@ "name": "ID чата", "description": "ID чата Telegram для отправки." }, - "urls": { - "name": "URL-адреса", - "description": "Список URL медиа-файлов с типом (document/photo/video, по умолчанию document) и опциональным content_type (MIME-тип). Если пусто, отправляет текстовое сообщение. Фото и видео группируются; документы отправляются отдельно." + "assets": { + "name": "Ресурсы", + "description": "Список медиа-ресурсов с 'url', опциональным 'type' (document/photo/video, по умолчанию document), опциональным 'content_type' (MIME-тип) и опциональным 'cache_key' (свой ключ кэширования вместо URL). Если пусто, отправляет текстовое сообщение. Фото и видео группируются; документы отправляются отдельно." }, "caption": { "name": "Подпись",