Files
haos-hacs-immich-album-watcher/custom_components/immich_album_watcher/sensor.py
alexei.dolgolyov dd7032b411
Some checks failed
Validate / Hassfest (push) Has been cancelled
Replace TTL with thumbhash-based cache validation and add Telegram video size limits
- Asset cache now validates entries by comparing stored thumbhash with current
  Immich thumbhash instead of using TTL expiration. This makes cache invalidation
  precise (only when content actually changes) and eliminates unnecessary re-uploads.
  URL-based cache retains TTL for non-Immich URLs.
- Add TELEGRAM_MAX_VIDEO_SIZE (50 MB) check to skip oversized videos in both
  single-video and media-group paths, preventing entire groups from failing.
- Split media groups into sub-groups by cumulative upload size to ensure each
  sendMediaGroup request stays under Telegram's 50 MB upload limit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:28:33 +03:00

2008 lines
89 KiB
Python

"""Sensor platform for Immich Album Watcher."""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime
from typing import Any
import voluptuous as vol
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse, callback
from homeassistant.helpers import entity_platform
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import (
ATTR_ALBUM_ID,
ATTR_ALBUM_NAME,
ATTR_ALBUM_PROTECTED_URL,
ATTR_ALBUM_URLS,
ATTR_ASSET_COUNT,
ATTR_CREATED_AT,
ATTR_LAST_UPDATED,
ATTR_OWNER,
ATTR_PEOPLE,
ATTR_PHOTO_COUNT,
ATTR_SHARED,
ATTR_THUMBNAIL_URL,
ATTR_VIDEO_COUNT,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
CONF_TELEGRAM_BOT_TOKEN,
DOMAIN,
SERVICE_GET_ASSETS,
SERVICE_REFRESH,
SERVICE_SEND_TELEGRAM_NOTIFICATION,
)
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
from .storage import TelegramFileCache
_LOGGER = logging.getLogger(__name__)
# Telegram constants
TELEGRAM_API_BASE_URL = "https://api.telegram.org/bot"
TELEGRAM_MAX_PHOTO_SIZE = 10 * 1024 * 1024  # 10 MB - Telegram's max photo size
TELEGRAM_MAX_VIDEO_SIZE = 50 * 1024 * 1024  # 50 MB - Telegram's max video/document upload size
TELEGRAM_MAX_DIMENSION_SUM = 10000  # Maximum sum of width + height in pixels
# Regex pattern for Immich asset ID (UUID format)
# NOTE(review): this is a loose check (any 36 hex/dash characters), not a strict
# 8-4-4-4-12 UUID shape; it only needs to distinguish bare asset IDs from URLs.
_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$")
# Regex patterns to extract asset ID from Immich URLs
# Matches patterns like:
# - /api/assets/{asset_id}/original
# - /api/assets/{asset_id}/thumbnail
# - /api/assets/{asset_id}/video/playback
# - /share/{key}/photos/{asset_id}
_IMMICH_ASSET_ID_PATTERNS = [
    re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"),
    re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"),
]
def _is_asset_id(value: str) -> bool:
    """Return True when *value* looks like an Immich asset ID (UUID form).

    Args:
        value: Candidate string to test.

    Returns:
        True if the string matches the 36-character UUID pattern, else False.
    """
    return _ASSET_ID_PATTERN.match(value) is not None
def _extract_asset_id_from_url(url: str) -> str | None:
    """Pull the Immich asset ID out of a recognized Immich URL shape.

    Recognized URL patterns:
    - /api/assets/{asset_id}/original?...
    - /api/assets/{asset_id}/thumbnail?...
    - /api/assets/{asset_id}/video/playback?...
    - /share/{key}/photos/{asset_id}

    Args:
        url: The URL to inspect.

    Returns:
        The extracted asset ID, or None when the URL is empty or matches
        none of the known patterns.
    """
    if not url:
        return None
    # First pattern that hits wins; group(1) is always the asset ID.
    return next(
        (
            m.group(1)
            for m in (p.search(url) for p in _IMMICH_ASSET_ID_PATTERNS)
            if m
        ),
        None,
    )
def _split_media_by_upload_size(
media_items: list[tuple], max_upload_size: int
) -> list[list[tuple]]:
"""Split media items into sub-groups respecting upload size limit.
Cached items (file_id references) don't count toward upload size since
they are sent as lightweight JSON references, not uploaded data.
Args:
media_items: List of (media_type, media_ref, filename, cache_key, is_cached, content_type)
max_upload_size: Maximum total upload bytes per sub-group
Returns:
List of sub-groups, each a list of media_items tuples
"""
if not media_items:
return []
groups: list[list[tuple]] = []
current_group: list[tuple] = []
current_upload_size = 0
for item in media_items:
_, media_ref, _, _, is_cached, _ = item
item_upload_size = 0 if is_cached else len(media_ref)
if current_group and current_upload_size + item_upload_size > max_upload_size:
groups.append(current_group)
current_group = [item]
current_upload_size = item_upload_size
else:
current_group.append(item)
current_upload_size += item_upload_size
if current_group:
groups.append(current_group)
return groups
async def async_setup_entry(
    hass: HomeAssistant,
    entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up Immich Album Watcher sensors from a config entry.

    Creates the per-album sensor entities for every album subentry, then
    registers this platform's entity services (refresh, get_assets,
    send_telegram_notification).
    """
    # Iterate through all album subentries
    for subentry_id, subentry in entry.subentries.items():
        subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
        if not subentry_data:
            # NOTE(review): runtime data is presumably populated during entry
            # setup — skip defensively rather than fail the whole platform.
            _LOGGER.error("Subentry data not found for %s", subentry_id)
            continue
        coordinator = subentry_data.coordinator
        # One device per album, each exposing the full set of sensors.
        entities: list[SensorEntity] = [
            ImmichAlbumIdSensor(coordinator, entry, subentry),
            ImmichAlbumAssetCountSensor(coordinator, entry, subentry),
            ImmichAlbumPhotoCountSensor(coordinator, entry, subentry),
            ImmichAlbumVideoCountSensor(coordinator, entry, subentry),
            ImmichAlbumLastUpdatedSensor(coordinator, entry, subentry),
            ImmichAlbumCreatedSensor(coordinator, entry, subentry),
            ImmichAlbumPublicUrlSensor(coordinator, entry, subentry),
            ImmichAlbumProtectedUrlSensor(coordinator, entry, subentry),
            ImmichAlbumProtectedPasswordSensor(coordinator, entry, subentry),
        ]
        async_add_entities(entities, config_subentry_id=subentry_id)
    # Register entity services
    platform = entity_platform.async_get_current_platform()
    platform.async_register_entity_service(
        SERVICE_REFRESH,
        {},
        "async_refresh_album",
    )
    platform.async_register_entity_service(
        SERVICE_GET_ASSETS,
        {
            vol.Optional("limit", default=10): vol.All(
                vol.Coerce(int), vol.Range(min=1, max=100)
            ),
            vol.Optional("offset", default=0): vol.All(
                vol.Coerce(int), vol.Range(min=0)
            ),
            vol.Optional("favorite_only", default=False): bool,
            vol.Optional("filter_min_rating", default=1): vol.All(
                vol.Coerce(int), vol.Range(min=1, max=5)
            ),
            vol.Optional("order_by", default="date"): vol.In(
                ["date", "rating", "name", "random"]
            ),
            vol.Optional("order", default="descending"): vol.In(
                ["ascending", "descending"]
            ),
            vol.Optional("asset_type", default="all"): vol.In(["all", "photo", "video"]),
            vol.Optional("min_date"): str,
            vol.Optional("max_date"): str,
            vol.Optional("memory_date"): str,
            vol.Optional("city"): str,
            vol.Optional("state"): str,
            vol.Optional("country"): str,
        },
        "async_get_assets",
        supports_response=SupportsResponse.ONLY,
    )
    platform.async_register_entity_service(
        SERVICE_SEND_TELEGRAM_NOTIFICATION,
        {
            vol.Optional("bot_token"): str,
            vol.Required("chat_id"): vol.Coerce(str),
            vol.Optional("assets"): list,
            vol.Optional("caption"): str,
            vol.Optional("reply_to_message_id"): vol.Coerce(int),
            vol.Optional("disable_web_page_preview"): bool,
            vol.Optional("parse_mode", default="HTML"): str,
            vol.Optional("max_group_size", default=10): vol.All(
                vol.Coerce(int), vol.Range(min=2, max=10)
            ),
            # Range 0-60000; presumably milliseconds — TODO confirm with docs.
            vol.Optional("chunk_delay", default=0): vol.All(
                vol.Coerce(int), vol.Range(min=0, max=60000)
            ),
            vol.Optional("wait_for_response", default=True): bool,
            # 52428800 bytes == 50 MB, matching TELEGRAM_MAX_VIDEO_SIZE.
            vol.Optional("max_asset_data_size"): vol.All(
                vol.Coerce(int), vol.Range(min=1, max=52428800)
            ),
            vol.Optional("send_large_photos_as_documents", default=False): bool,
            vol.Optional("chat_action", default="typing"): vol.Any(
                None, vol.In(["typing", "upload_photo", "upload_video", "upload_document"])
            ),
        },
        "async_send_telegram_notification",
        supports_response=SupportsResponse.OPTIONAL,
    )
class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], SensorEntity):
    """Base sensor for Immich album."""

    # Entity names are combined with the device (album) name by Home Assistant.
    _attr_has_entity_name = True

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the sensor.

        Args:
            coordinator: Update coordinator for this album.
            entry: The hub config entry this album belongs to.
            subentry: The album-specific config subentry.
        """
        super().__init__(coordinator)
        self._entry = entry
        self._subentry = subentry
        self._album_id = subentry.data[CONF_ALBUM_ID]
        # Fall back to a placeholder when the album name is missing.
        self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
        self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
        # Generate unique_id prefix: {hub_name}_album_{album_name}
        self._unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
@property
def _album_data(self) -> AlbumData | None:
    """Get the album data from coordinator.

    The coordinator's latest snapshot; may be None (per annotation) when no
    data has been fetched yet.
    """
    return self.coordinator.data
@property
def available(self) -> bool:
    """Return if entity is available.

    Requires both a successful last refresh and actual album data.
    """
    return self.coordinator.last_update_success and self._album_data is not None
@property
def device_info(self) -> DeviceInfo:
    """Return device info - one device per album.

    The config subentry_id is the device identifier, so every sensor for
    the same album groups under one device.
    """
    return DeviceInfo(
        identifiers={(DOMAIN, self._subentry.subentry_id)},
        name=self._album_name,
        manufacturer="Immich",
        entry_type=DeviceEntryType.SERVICE,
    )
@callback
def _handle_coordinator_update(self) -> None:
    """Handle updated data from the coordinator by writing the new HA state."""
    self.async_write_ha_state()
async def async_refresh_album(self) -> None:
    """Refresh data for this album (handler for the SERVICE_REFRESH entity service)."""
    await self.coordinator.async_refresh_now()
async def async_get_assets(
    self,
    limit: int = 10,
    offset: int = 0,
    favorite_only: bool = False,
    filter_min_rating: int = 1,
    order_by: str = "date",
    order: str = "descending",
    asset_type: str = "all",
    min_date: str | None = None,
    max_date: str | None = None,
    memory_date: str | None = None,
    city: str | None = None,
    state: str | None = None,
    country: str | None = None,
) -> ServiceResponse:
    """Get assets for this album with optional filtering and ordering."""
    # Every service parameter is forwarded verbatim to the coordinator.
    query = {
        "limit": limit,
        "offset": offset,
        "favorite_only": favorite_only,
        "filter_min_rating": filter_min_rating,
        "order_by": order_by,
        "order": order,
        "asset_type": asset_type,
        "min_date": min_date,
        "max_date": max_date,
        "memory_date": memory_date,
        "city": city,
        "state": state,
        "country": country,
    }
    return {"assets": await self.coordinator.async_get_assets(**query)}
async def async_send_telegram_notification(
    self,
    chat_id: str,
    assets: list[dict[str, str]] | None = None,
    bot_token: str | None = None,
    caption: str | None = None,
    reply_to_message_id: int | None = None,
    disable_web_page_preview: bool | None = None,
    parse_mode: str = "HTML",
    max_group_size: int = 10,
    chunk_delay: int = 0,
    wait_for_response: bool = True,
    max_asset_data_size: int | None = None,
    send_large_photos_as_documents: bool = False,
    chat_action: str | None = "typing",
) -> ServiceResponse:
    """Send notification to Telegram.

    Supports:
    - Empty assets: sends a simple text message
    - Single photo: uses sendPhoto API
    - Single video: uses sendVideo API
    - Multiple items: uses sendMediaGroup API (splits into multiple groups if needed)

    Each item in assets should be a dict with 'url', optional 'type'
    (photo/video/document), and optional 'cache_key' (custom key for caching
    instead of URL). Downloads media and uploads to Telegram to bypass CORS
    restrictions.

    If wait_for_response is False, the task will be executed in the background
    and the service will return immediately.
    """
    # Both execution paths take identical arguments; assemble them once.
    call_kwargs = {
        "chat_id": chat_id,
        "assets": assets,
        "bot_token": bot_token,
        "caption": caption,
        "reply_to_message_id": reply_to_message_id,
        "disable_web_page_preview": disable_web_page_preview,
        "parse_mode": parse_mode,
        "max_group_size": max_group_size,
        "chunk_delay": chunk_delay,
        "max_asset_data_size": max_asset_data_size,
        "send_large_photos_as_documents": send_large_photos_as_documents,
        "chat_action": chat_action,
    }
    if wait_for_response:
        # Blocking mode - execute and return the real result.
        return await self._execute_telegram_notification(**call_kwargs)
    # Non-blocking mode: fire a background task and report "queued" right away.
    self.hass.async_create_task(self._execute_telegram_notification(**call_kwargs))
    return {"success": True, "status": "queued", "message": "Notification queued for background processing"}
async def _execute_telegram_notification(
    self,
    chat_id: str,
    assets: list[dict[str, str]] | None = None,
    bot_token: str | None = None,
    caption: str | None = None,
    reply_to_message_id: int | None = None,
    disable_web_page_preview: bool | None = None,
    parse_mode: str = "HTML",
    max_group_size: int = 10,
    chunk_delay: int = 0,
    max_asset_data_size: int | None = None,
    send_large_photos_as_documents: bool = False,
    chat_action: str | None = "typing",
) -> ServiceResponse:
    """Execute the Telegram notification (internal method).

    Dispatch rules:
    - No assets: plain text message (sendMessage), no chat-action indicator.
    - Exactly one asset typed photo/video/document: the matching single-item
      sender (document is the default when 'type' is omitted).
    - Anything else: media group(s) via _send_telegram_media_group.

    A chat-action indicator runs for the duration of any media send and is
    always cancelled in the finally block, whether the send succeeds or not.

    Returns:
        ServiceResponse dict with at least a "success" key.
    """
    # Fix: removed unused local imports (`json`, `FormData`) — neither name
    # was referenced in this method; the sub-senders import what they need.
    import aiohttp
    from homeassistant.helpers.aiohttp_client import async_get_clientsession

    # Get bot token from parameter or config
    token = bot_token or self._entry.options.get(CONF_TELEGRAM_BOT_TOKEN)
    if not token:
        return {
            "success": False,
            "error": "No bot token provided. Set it in integration options or pass as parameter.",
        }
    session = async_get_clientsession(self.hass)
    # Handle empty assets - send simple text message (no typing indicator needed)
    if not assets:
        return await self._send_telegram_message(
            session, token, chat_id, caption or "", reply_to_message_id, disable_web_page_preview, parse_mode
        )
    # Start chat action indicator for media notifications (before downloading assets)
    typing_task = None
    if chat_action:
        typing_task = self._start_typing_indicator(session, token, chat_id, chat_action)
    try:
        # Handle single photo
        if len(assets) == 1 and assets[0].get("type") == "photo":
            return await self._send_telegram_photo(
                session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode,
                max_asset_data_size, send_large_photos_as_documents, assets[0].get("content_type"),
                assets[0].get("cache_key")
            )
        # Handle single video
        if len(assets) == 1 and assets[0].get("type") == "video":
            return await self._send_telegram_video(
                session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode,
                max_asset_data_size, assets[0].get("content_type"), assets[0].get("cache_key")
            )
        # Handle single document (default type)
        if len(assets) == 1 and assets[0].get("type", "document") == "document":
            url = assets[0].get("url")
            if not url:
                return {"success": False, "error": "Missing 'url' for document"}
            item_content_type = assets[0].get("content_type")
            item_cache_key = assets[0].get("cache_key")
            try:
                # Internal URL keeps the download on the local network.
                download_url = self.coordinator.get_internal_download_url(url)
                async with session.get(download_url) as resp:
                    if resp.status != 200:
                        return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"}
                    data = await resp.read()
                if max_asset_data_size is not None and len(data) > max_asset_data_size:
                    return {"success": False, "error": f"Media size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)"}
                # Detect filename from URL or use generic name
                filename = url.split("/")[-1].split("?")[0] or "file"
                return await self._send_telegram_document(
                    session, token, chat_id, data, filename, caption, reply_to_message_id, parse_mode,
                    url, item_content_type, item_cache_key
                )
            except aiohttp.ClientError as err:
                return {"success": False, "error": f"Failed to download media: {err}"}
        # Handle multiple items - send as media group(s)
        return await self._send_telegram_media_group(
            session, token, chat_id, assets, caption, reply_to_message_id, max_group_size, chunk_delay, parse_mode,
            max_asset_data_size, send_large_photos_as_documents
        )
    finally:
        # Stop chat action indicator when done (success or error)
        if typing_task:
            typing_task.cancel()
            try:
                await typing_task
            except asyncio.CancelledError:
                pass
async def _send_telegram_message(
    self,
    session: Any,
    token: str,
    chat_id: str,
    text: str,
    reply_to_message_id: int | None = None,
    disable_web_page_preview: bool | None = None,
    parse_mode: str = "HTML",
) -> ServiceResponse:
    """Send a plain text message to Telegram via the sendMessage endpoint."""
    import aiohttp

    telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMessage"
    # Substitute a generic body when the caller supplied no text.
    payload: dict[str, Any] = {
        "chat_id": chat_id,
        "text": text or "Notification from Home Assistant",
        "parse_mode": parse_mode,
    }
    if reply_to_message_id:
        payload["reply_to_message_id"] = reply_to_message_id
    if disable_web_page_preview is not None:
        payload["disable_web_page_preview"] = disable_web_page_preview
    try:
        _LOGGER.debug("Sending text message to Telegram")
        async with session.post(telegram_url, json=payload) as response:
            result = await response.json()
            _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
            # Error path first; success falls through below.
            if response.status != 200 or not result.get("ok"):
                _LOGGER.error("Telegram API error: %s", result)
                return {
                    "success": False,
                    "error": result.get("description", "Unknown Telegram error"),
                    "error_code": result.get("error_code"),
                }
            return {
                "success": True,
                "message_id": result.get("result", {}).get("message_id"),
            }
    except aiohttp.ClientError as err:
        _LOGGER.error("Telegram message send failed: %s", err)
        return {"success": False, "error": str(err)}
async def _send_telegram_chat_action(
    self,
    session: Any,
    token: str,
    chat_id: str,
    action: str = "typing",
) -> bool:
    """Send a single chat action (e.g. typing indicator) to Telegram.

    Args:
        session: aiohttp client session
        token: Telegram bot token
        chat_id: Target chat ID
        action: Chat action type (typing, upload_photo, upload_video, etc.)

    Returns:
        True if successful, False otherwise
    """
    import aiohttp

    endpoint = f"{TELEGRAM_API_BASE_URL}{token}/sendChatAction"
    body = {"chat_id": chat_id, "action": action}
    try:
        async with session.post(endpoint, json=body) as response:
            result = await response.json()
            if response.status != 200 or not result.get("ok"):
                _LOGGER.debug("Failed to send chat action: %s", result.get("description"))
                return False
            _LOGGER.debug("Sent chat action '%s' to chat %s", action, chat_id)
            return True
    except aiohttp.ClientError as err:
        _LOGGER.debug("Chat action request failed: %s", err)
        return False
def _start_typing_indicator(
    self,
    session: Any,
    token: str,
    chat_id: str,
    action: str = "typing",
) -> asyncio.Task:
    """Start a background task that refreshes the chat action indicator.

    Telegram expires a chat action after roughly 5 seconds, so the task
    re-sends it every 4 seconds until cancelled.

    Args:
        session: aiohttp client session
        token: Telegram bot token
        chat_id: Target chat ID
        action: Chat action type (typing, upload_photo, upload_video, etc.)

    Returns:
        The background task; the caller must cancel it when the work is done.
    """

    async def _refresh_forever() -> None:
        """Re-send the chat action every 4 s until cancelled."""
        try:
            while True:
                await self._send_telegram_chat_action(session, token, chat_id, action)
                await asyncio.sleep(4)
        except asyncio.CancelledError:
            _LOGGER.debug("Chat action indicator stopped for action '%s'", action)

    return asyncio.create_task(_refresh_forever())
def _log_telegram_error(
    self,
    error_code: int | None,
    description: str,
    data: bytes | None = None,
    media_type: str = "photo",
) -> None:
    """Log detailed Telegram API error with diagnostics.

    Builds a single consolidated error line: the raw Telegram error, media
    size/dimension diagnostics (when data is provided), explicit limit
    violation flags, and actionable suggestions keyed off the description.

    Args:
        error_code: Telegram error code
        description: Error description from Telegram
        data: Media data bytes (optional, for size diagnostics)
        media_type: Type of media (photo/video)
    """
    error_msg = f"Telegram API error ({error_code}): {description}"
    # Add diagnostic information based on error type
    if data:
        error_msg += f" | Media size: {len(data)} bytes ({len(data) / (1024 * 1024):.2f} MB)"
        # Check dimensions for photos
        if media_type == "photo":
            try:
                from PIL import Image
                import io
                img = Image.open(io.BytesIO(data))
                width, height = img.size
                dimension_sum = width + height
                error_msg += f" | Dimensions: {width}x{height} (sum={dimension_sum})"
                # Highlight limit violations
                if len(data) > TELEGRAM_MAX_PHOTO_SIZE:
                    error_msg += f" | EXCEEDS size limit ({TELEGRAM_MAX_PHOTO_SIZE / (1024 * 1024):.0f} MB)"
                if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM:
                    error_msg += f" | EXCEEDS dimension limit ({TELEGRAM_MAX_DIMENSION_SUM})"
            except Exception:
                # Best-effort diagnostics only (PIL missing or undecodable
                # image); never let error logging itself raise.
                pass
        # Check size limit for videos
        if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
            error_msg += f" | EXCEEDS Telegram upload limit ({TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB)"
    # Provide suggestions based on error description
    suggestions = []
    if "dimension" in description.lower() or "PHOTO_INVALID_DIMENSIONS" in description:
        suggestions.append("Photo dimensions too large - consider setting send_large_photos_as_documents=true")
    elif "too large" in description.lower() or error_code == 413:
        suggestions.append("File size too large - consider setting send_large_photos_as_documents=true or max_asset_data_size to skip large files")
    elif "entity too large" in description.lower():
        suggestions.append("Request entity too large - reduce max_group_size or set max_asset_data_size")
    if suggestions:
        error_msg += f" | Suggestions: {'; '.join(suggestions)}"
    _LOGGER.error(error_msg)
def _check_telegram_photo_limits(
    self,
    data: bytes,
) -> tuple[bool, str | None, int | None, int | None]:
    """Check if photo data exceeds Telegram photo limits.

    Telegram limits for photos:
    - Max file size: 10 MB
    - Max dimension sum: ~10,000 pixels (width + height)

    Returns:
        Tuple of (exceeds_limits, reason, width, height)
        - exceeds_limits: True if photo exceeds limits
        - reason: Human-readable reason (None if within limits)
        - width: Image width in pixels (None if PIL not available)
        - height: Image height in pixels (None if PIL not available)
    """
    # The file-size check needs no image decoding, so it comes first.
    if len(data) > TELEGRAM_MAX_PHOTO_SIZE:
        return True, f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", None, None
    # Dimension check requires PIL; degrade gracefully without it.
    try:
        import io

        from PIL import Image

        width, height = Image.open(io.BytesIO(data)).size
    except ImportError:
        # PIL not available, can't check dimensions
        _LOGGER.debug("PIL not available, skipping dimension check")
        return False, None, None, None
    except Exception as e:
        # Failed to check dimensions
        _LOGGER.debug("Failed to check photo dimensions: %s", e)
        return False, None, None, None
    dimension_sum = width + height
    if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM:
        return True, f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", width, height
    return False, None, width, height
def _get_telegram_cache_and_key(
    self,
    url: str | None,
    cache_key: str | None = None,
) -> tuple[TelegramFileCache | None, str | None, str | None]:
    """Determine which Telegram cache, key, and thumbhash to use.

    Priority: custom cache_key -> direct asset ID -> extracted asset ID from URL -> URL

    Args:
        url: The URL of the media (or asset ID directly)
        cache_key: Optional custom cache key provided by user

    Returns:
        Tuple of (cache instance, cache key, thumbhash) to use.
        thumbhash is only populated when using the asset cache.
    """
    # A user-supplied cache_key always wins and lives in the URL cache
    # (no thumbhash validation is possible for arbitrary keys).
    if cache_key:
        return self.coordinator.telegram_cache, cache_key, None
    if not url:
        return None, None, None
    # The value is either already a bare asset ID, or one may be embedded
    # in a recognized Immich URL shape.
    asset_id = url if _is_asset_id(url) else _extract_asset_id_from_url(url)
    if asset_id:
        # Asset-backed entries use the asset cache with thumbhash invalidation.
        return (
            self.coordinator.telegram_asset_cache,
            asset_id,
            self.coordinator.get_asset_thumbhash(asset_id),
        )
    # Fallback: plain URL cache keyed by the URL itself (no thumbhash).
    return self.coordinator.telegram_cache, url, None
async def _send_telegram_photo(
    self,
    session: Any,
    token: str,
    chat_id: str,
    url: str | None,
    caption: str | None = None,
    reply_to_message_id: int | None = None,
    parse_mode: str = "HTML",
    max_asset_data_size: int | None = None,
    send_large_photos_as_documents: bool = False,
    content_type: str | None = None,
    cache_key: str | None = None,
) -> ServiceResponse:
    """Send a single photo to Telegram.

    First tries a cached Telegram file_id (no download needed); if the cache
    misses or the cached file_id is rejected, downloads the photo from Immich
    and uploads it via sendPhoto, caching the new file_id on success.

    Args:
        session: aiohttp client session.
        token: Telegram bot token.
        chat_id: Target chat ID.
        url: Immich URL or bare asset ID of the photo.
        caption: Optional caption text.
        reply_to_message_id: Optional message ID to reply to.
        parse_mode: Telegram parse mode for the caption.
        max_asset_data_size: Optional user-defined size cap; larger photos are skipped.
        send_large_photos_as_documents: Send photos over Telegram's photo
            limits as documents instead of skipping them.
        content_type: Optional MIME type (defaults to image/jpeg).
        cache_key: Optional custom cache key overriding URL/asset-based keys.

    Returns:
        ServiceResponse dict with "success" plus "message_id" or "error".
    """
    import aiohttp
    from aiohttp import FormData

    # Use provided content type or default to image/jpeg
    if not content_type:
        content_type = "image/jpeg"
    if not url:
        return {"success": False, "error": "Missing 'url' for photo"}
    # Determine which cache to use and the cache key
    effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key)
    # Check cache for file_id
    cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
    if cached and cached.get("file_id") and effective_cache_key:
        # Use cached file_id - no download needed
        file_id = cached["file_id"]
        _LOGGER.debug("Using cached Telegram file_id for photo (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key)
        payload = {
            "chat_id": chat_id,
            "photo": file_id,
            "parse_mode": parse_mode,
        }
        if caption:
            payload["caption"] = caption
        if reply_to_message_id:
            payload["reply_to_message_id"] = reply_to_message_id
        telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendPhoto"
        try:
            async with session.post(telegram_url, json=payload) as response:
                result = await response.json()
                if response.status == 200 and result.get("ok"):
                    return {
                        "success": True,
                        "message_id": result.get("result", {}).get("message_id"),
                        "cached": True,
                    }
                else:
                    # Cache might be stale, fall through to upload
                    _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description"))
        except aiohttp.ClientError as err:
            # Network failure on the cached path also falls through to upload.
            _LOGGER.debug("Cached file_id request failed: %s", err)
    try:
        # Download the photo using internal URL for faster local network transfer
        download_url = self.coordinator.get_internal_download_url(url)
        _LOGGER.debug("Downloading photo from %s", download_url[:80])
        async with session.get(download_url) as resp:
            if resp.status != 200:
                return {
                    "success": False,
                    "error": f"Failed to download photo: HTTP {resp.status}",
                }
            data = await resp.read()
        _LOGGER.debug("Downloaded photo: %d bytes", len(data))
        # Check if photo exceeds max size limit (user-defined limit)
        if max_asset_data_size is not None and len(data) > max_asset_data_size:
            _LOGGER.warning(
                "Photo size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
                len(data), max_asset_data_size
            )
            return {
                "success": False,
                "error": f"Photo size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)",
                "skipped": True,
            }
        # Check if photo exceeds Telegram's photo limits
        exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data)
        if exceeds_limits:
            if send_large_photos_as_documents:
                # Send as document instead
                _LOGGER.info("Photo %s, sending as document", reason)
                return await self._send_telegram_document(
                    session, token, chat_id, data, "photo.jpg",
                    caption, reply_to_message_id, parse_mode, url, None, cache_key
                )
            else:
                # Skip oversized photo
                _LOGGER.warning("Photo %s, skipping (set send_large_photos_as_documents=true to send as document)", reason)
                return {
                    "success": False,
                    "error": f"Photo {reason}",
                    "skipped": True,
                }
        # Build multipart form
        form = FormData()
        form.add_field("chat_id", chat_id)
        form.add_field("photo", data, filename="photo.jpg", content_type=content_type)
        form.add_field("parse_mode", parse_mode)
        if caption:
            form.add_field("caption", caption)
        if reply_to_message_id:
            form.add_field("reply_to_message_id", str(reply_to_message_id))
        # Send to Telegram
        telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendPhoto"
        _LOGGER.debug("Uploading photo to Telegram")
        async with session.post(telegram_url, data=form) as response:
            result = await response.json()
            _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
            if response.status == 200 and result.get("ok"):
                # Extract and cache file_id
                photos = result.get("result", {}).get("photo", [])
                if photos and effective_cache and effective_cache_key:
                    # Use the largest photo's file_id
                    file_id = photos[-1].get("file_id")
                    if file_id:
                        await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash)
                return {
                    "success": True,
                    "message_id": result.get("result", {}).get("message_id"),
                }
            else:
                # Log detailed error with diagnostics
                self._log_telegram_error(
                    error_code=result.get("error_code"),
                    description=result.get("description", "Unknown Telegram error"),
                    data=data,
                    media_type="photo",
                )
                return {
                    "success": False,
                    "error": result.get("description", "Unknown Telegram error"),
                    "error_code": result.get("error_code"),
                }
    except aiohttp.ClientError as err:
        _LOGGER.error("Telegram photo upload failed: %s", err)
        return {"success": False, "error": str(err)}
async def _send_telegram_video(
    self,
    session: Any,
    token: str,
    chat_id: str,
    url: str | None,
    caption: str | None = None,
    reply_to_message_id: int | None = None,
    parse_mode: str = "HTML",
    max_asset_data_size: int | None = None,
    content_type: str | None = None,
    cache_key: str | None = None,
) -> ServiceResponse:
    """Send a single video to Telegram.

    First tries a cached Telegram file_id (no download needed); if the cache
    misses or the cached file_id is rejected, downloads the video from Immich
    and uploads it via sendVideo, caching the new file_id on success. Videos
    over TELEGRAM_MAX_VIDEO_SIZE (50 MB) are skipped.

    Args:
        session: aiohttp client session.
        token: Telegram bot token.
        chat_id: Target chat ID.
        url: Immich URL or bare asset ID of the video.
        caption: Optional caption text.
        reply_to_message_id: Optional message ID to reply to.
        parse_mode: Telegram parse mode for the caption.
        max_asset_data_size: Optional user-defined size cap; larger videos are skipped.
        content_type: Optional MIME type (defaults to video/mp4).
        cache_key: Optional custom cache key overriding URL/asset-based keys.

    Returns:
        ServiceResponse dict with "success" plus "message_id" or "error".
    """
    import aiohttp
    from aiohttp import FormData

    # Use provided content type or default to video/mp4
    if not content_type:
        content_type = "video/mp4"
    if not url:
        return {"success": False, "error": "Missing 'url' for video"}
    # Determine which cache to use and the cache key
    effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key)
    # Check cache for file_id
    cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
    if cached and cached.get("file_id") and effective_cache_key:
        # Use cached file_id - no download needed
        file_id = cached["file_id"]
        _LOGGER.debug("Using cached Telegram file_id for video (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key)
        payload = {
            "chat_id": chat_id,
            "video": file_id,
            "parse_mode": parse_mode,
        }
        if caption:
            payload["caption"] = caption
        if reply_to_message_id:
            payload["reply_to_message_id"] = reply_to_message_id
        telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendVideo"
        try:
            async with session.post(telegram_url, json=payload) as response:
                result = await response.json()
                if response.status == 200 and result.get("ok"):
                    return {
                        "success": True,
                        "message_id": result.get("result", {}).get("message_id"),
                        "cached": True,
                    }
                else:
                    # Cache might be stale, fall through to upload
                    _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description"))
        except aiohttp.ClientError as err:
            # Network failure on the cached path also falls through to upload.
            _LOGGER.debug("Cached file_id request failed: %s", err)
    try:
        # Download the video using internal URL for faster local network transfer
        download_url = self.coordinator.get_internal_download_url(url)
        _LOGGER.debug("Downloading video from %s", download_url[:80])
        async with session.get(download_url) as resp:
            if resp.status != 200:
                return {
                    "success": False,
                    "error": f"Failed to download video: HTTP {resp.status}",
                }
            data = await resp.read()
        _LOGGER.debug("Downloaded video: %d bytes", len(data))
        # Check if video exceeds max size limit (user-defined limit)
        if max_asset_data_size is not None and len(data) > max_asset_data_size:
            _LOGGER.warning(
                "Video size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
                len(data), max_asset_data_size
            )
            return {
                "success": False,
                "error": f"Video size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)",
                "skipped": True,
            }
        # Check if video exceeds Telegram's upload limit (50 MB)
        if len(data) > TELEGRAM_MAX_VIDEO_SIZE:
            _LOGGER.warning(
                "Video size (%d bytes, %.1f MB) exceeds Telegram's %d bytes (%.0f MB) upload limit, skipping",
                len(data), len(data) / (1024 * 1024),
                TELEGRAM_MAX_VIDEO_SIZE, TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024),
            )
            return {
                "success": False,
                "error": f"Video size ({len(data) / (1024 * 1024):.1f} MB) exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB upload limit",
                "skipped": True,
            }
        # Build multipart form
        form = FormData()
        form.add_field("chat_id", chat_id)
        form.add_field("video", data, filename="video.mp4", content_type=content_type)
        form.add_field("parse_mode", parse_mode)
        if caption:
            form.add_field("caption", caption)
        if reply_to_message_id:
            form.add_field("reply_to_message_id", str(reply_to_message_id))
        # Send to Telegram
        telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendVideo"
        _LOGGER.debug("Uploading video to Telegram")
        async with session.post(telegram_url, data=form) as response:
            result = await response.json()
            _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
            if response.status == 200 and result.get("ok"):
                # Extract and cache file_id
                video = result.get("result", {}).get("video", {})
                if video and effective_cache and effective_cache_key:
                    file_id = video.get("file_id")
                    if file_id:
                        await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash)
                return {
                    "success": True,
                    "message_id": result.get("result", {}).get("message_id"),
                }
            else:
                # Log detailed error with diagnostics
                self._log_telegram_error(
                    error_code=result.get("error_code"),
                    description=result.get("description", "Unknown Telegram error"),
                    data=data,
                    media_type="video",
                )
                return {
                    "success": False,
                    "error": result.get("description", "Unknown Telegram error"),
                    "error_code": result.get("error_code"),
                }
    except aiohttp.ClientError as err:
        _LOGGER.error("Telegram video upload failed: %s", err)
        return {"success": False, "error": str(err)}
async def _send_telegram_document(
    self,
    session: Any,
    token: str,
    chat_id: str,
    data: bytes,
    filename: str = "file",
    caption: str | None = None,
    reply_to_message_id: int | None = None,
    parse_mode: str = "HTML",
    source_url: str | None = None,
    content_type: str | None = None,
    cache_key: str | None = None,
) -> ServiceResponse:
    """Send a file as a document to Telegram.

    Flow: resolve the cache and key via ``_get_telegram_cache_and_key``, then
    try to reuse a previously stored Telegram file_id (sendDocument with the
    file_id, no upload). If there is no usable cache entry, or Telegram
    rejects the cached file_id, upload ``data`` as a multipart form and cache
    the returned file_id for future sends.

    Args:
        session: aiohttp client session used for all HTTP requests.
        token: Telegram bot token.
        chat_id: Target chat identifier.
        data: Raw document bytes to upload on a cache miss.
        filename: Name reported to Telegram; also used to guess the MIME type.
        caption: Optional caption attached to the message.
        reply_to_message_id: Optional message ID this document replies to.
        parse_mode: Parse mode for the caption (default "HTML").
        source_url: Original media URL, used to derive a cache key.
        content_type: Explicit MIME type; guessed from ``filename`` if None.
        cache_key: Explicit cache key overriding URL-based derivation.

    Returns:
        Dict with ``success``; on success also ``message_id`` (plus
        ``cached: True`` when a cached file_id was reused); on failure an
        ``error`` description and, for API-level errors, an ``error_code``.
    """
    # Function-scope imports, matching the other Telegram helpers in this file.
    import aiohttp
    import mimetypes
    from aiohttp import FormData
    # Use provided content type or detect from filename
    if not content_type:
        content_type, _ = mimetypes.guess_type(filename)
        if not content_type:
            # Generic binary fallback when the extension is unknown.
            content_type = "application/octet-stream"
    # Determine which cache and key to use (asset-ID cache vs URL cache).
    effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(source_url, cache_key)
    # Check cache for file_id; the thumbhash comparison invalidates entries
    # whose Immich content changed, and only same-type ("document") entries
    # are reusable.
    if effective_cache and effective_cache_key:
        cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash)
        if cached and cached.get("file_id") and cached.get("type") == "document":
            # Use cached file_id instead of re-uploading the bytes.
            file_id = cached["file_id"]
            _LOGGER.debug("Using cached Telegram file_id for document (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key)
            payload = {
                "chat_id": chat_id,
                "document": file_id,
                "parse_mode": parse_mode,
            }
            if caption:
                payload["caption"] = caption
            if reply_to_message_id:
                payload["reply_to_message_id"] = reply_to_message_id
            telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendDocument"
            try:
                async with session.post(telegram_url, json=payload) as response:
                    result = await response.json()
                    if response.status == 200 and result.get("ok"):
                        return {
                            "success": True,
                            "message_id": result.get("result", {}).get("message_id"),
                            "cached": True,
                        }
                    else:
                        # Stale/invalid file_id: fall through to a fresh upload.
                        _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description"))
            except aiohttp.ClientError as err:
                # Network failure on the cached path is non-fatal; retry as upload.
                _LOGGER.debug("Cached file_id request failed: %s", err)
    try:
        # Build multipart form
        form = FormData()
        form.add_field("chat_id", chat_id)
        form.add_field("document", data, filename=filename, content_type=content_type)
        form.add_field("parse_mode", parse_mode)
        if caption:
            form.add_field("caption", caption)
        if reply_to_message_id:
            form.add_field("reply_to_message_id", str(reply_to_message_id))
        # Send to Telegram
        telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendDocument"
        _LOGGER.debug("Uploading document to Telegram (%d bytes, %s)", len(data), content_type)
        async with session.post(telegram_url, data=form) as response:
            result = await response.json()
            _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
            if response.status == 200 and result.get("ok"):
                # Extract and cache the new file_id so the next send can skip the upload.
                if effective_cache_key and effective_cache:
                    document = result.get("result", {}).get("document", {})
                    file_id = document.get("file_id")
                    if file_id:
                        await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash)
                return {
                    "success": True,
                    "message_id": result.get("result", {}).get("message_id"),
                }
            else:
                # Log detailed error with diagnostics
                self._log_telegram_error(
                    error_code=result.get("error_code"),
                    description=result.get("description", "Unknown Telegram error"),
                    data=data,
                    media_type="document",
                )
                return {
                    "success": False,
                    "error": result.get("description", "Unknown Telegram error"),
                    "error_code": result.get("error_code"),
                }
    except aiohttp.ClientError as err:
        _LOGGER.error("Telegram document upload failed: %s", err)
        return {"success": False, "error": str(err)}
async def _send_telegram_media_group(
    self,
    session: Any,
    token: str,
    chat_id: str,
    assets: list[dict[str, str]],
    caption: str | None = None,
    reply_to_message_id: int | None = None,
    max_group_size: int = 10,
    chunk_delay: int = 0,
    parse_mode: str = "HTML",
    max_asset_data_size: int | None = None,
    send_large_photos_as_documents: bool = False,
) -> ServiceResponse:
    """Send media assets to Telegram as media group(s).

    If assets list exceeds max_group_size, splits into multiple media groups.
    For chunks with single items, uses sendPhoto/sendVideo APIs.
    Applies chunk_delay (in milliseconds) between groups if specified.

    Each multi-item chunk is further split into sub-groups whose cumulative
    upload size stays under Telegram's 50 MB limit, and per-item cached
    Telegram file_ids are reused wherever the cache is still valid
    (thumbhash-based invalidation for Immich assets).

    Args:
        session: aiohttp client session for all Immich/Telegram HTTP calls.
        token: Telegram bot token.
        chat_id: Target chat identifier.
        assets: Dicts with "url" (required), "type" ("photo"/"video"/
            "document", default "document"), and optional "content_type"
            and "cache_key" entries.
        caption: Optional caption, applied once to the first sent item/group.
        reply_to_message_id: Optional reply target (first chunk only).
        max_group_size: Max items per media group (Telegram allows up to 10).
        chunk_delay: Delay in milliseconds between chunk/sub-group sends.
        parse_mode: Caption parse mode.
        max_asset_data_size: If set, assets larger than this many bytes are
            skipped rather than sent.
        send_large_photos_as_documents: When True, photos exceeding Telegram
            photo limits are sent as documents instead of being skipped.

    Returns:
        On success: {"success": True, "message_ids": [...], "chunks_sent": N}.
        On failure: {"success": False, "error": ...} plus optional
        "error_code" and "failed_at_chunk".
    """
    # Function-scope imports, matching the other Telegram helpers in this file.
    import json
    import asyncio
    import aiohttp
    from aiohttp import FormData
    # Split assets into chunks based on max_group_size
    chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)]
    all_message_ids = []
    _LOGGER.debug("Sending %d media items in %d chunk(s) of max %d items (delay: %dms)",
                  len(assets), len(chunks), max_group_size, chunk_delay)
    for chunk_idx, chunk in enumerate(chunks):
        # Add delay before sending subsequent chunks (rate-limit friendliness)
        if chunk_idx > 0 and chunk_delay > 0:
            delay_seconds = chunk_delay / 1000
            _LOGGER.debug("Waiting %dms (%ss) before sending chunk %d/%d",
                          chunk_delay, delay_seconds, chunk_idx + 1, len(chunks))
            await asyncio.sleep(delay_seconds)
        # Optimize: Use single-item APIs for chunks with 1 item
        # (sendMediaGroup requires at least 2 entries).
        if len(chunk) == 1:
            item = chunk[0]
            media_type = item.get("type", "document")
            url = item.get("url")
            item_content_type = item.get("content_type")
            item_cache_key = item.get("cache_key")
            # Only apply caption and reply_to to the first chunk
            chunk_caption = caption if chunk_idx == 0 else None
            chunk_reply_to = reply_to_message_id if chunk_idx == 0 else None
            if media_type == "photo":
                _LOGGER.debug("Sending chunk %d/%d as single photo", chunk_idx + 1, len(chunks))
                result = await self._send_telegram_photo(
                    session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode,
                    max_asset_data_size, send_large_photos_as_documents, item_content_type, item_cache_key
                )
            elif media_type == "video":
                _LOGGER.debug("Sending chunk %d/%d as single video", chunk_idx + 1, len(chunks))
                result = await self._send_telegram_video(
                    session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode,
                    max_asset_data_size, item_content_type, item_cache_key
                )
            else:  # document
                _LOGGER.debug("Sending chunk %d/%d as single document", chunk_idx + 1, len(chunks))
                if not url:
                    return {"success": False, "error": "Missing 'url' for document", "failed_at_chunk": chunk_idx + 1}
                try:
                    # Download via the internal URL (faster local-network transfer).
                    download_url = self.coordinator.get_internal_download_url(url)
                    async with session.get(download_url) as resp:
                        if resp.status != 200:
                            return {"success": False, "error": f"Failed to download media: HTTP {resp.status}", "failed_at_chunk": chunk_idx + 1}
                        data = await resp.read()
                    if max_asset_data_size is not None and len(data) > max_asset_data_size:
                        _LOGGER.warning("Media size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", len(data), max_asset_data_size)
                        continue
                    # Derive a filename from the URL path, dropping any query string.
                    filename = url.split("/")[-1].split("?")[0] or "file"
                    result = await self._send_telegram_document(
                        session, token, chat_id, data, filename, chunk_caption, chunk_reply_to, parse_mode,
                        url, item_content_type, item_cache_key
                    )
                except aiohttp.ClientError as err:
                    return {"success": False, "error": f"Failed to download media: {err}", "failed_at_chunk": chunk_idx + 1}
            if not result.get("success"):
                result["failed_at_chunk"] = chunk_idx + 1
                return result
            all_message_ids.append(result.get("message_id"))
            continue
        # Multi-item chunk: use sendMediaGroup
        _LOGGER.debug("Sending chunk %d/%d as media group (%d items)", chunk_idx + 1, len(chunks), len(chunk))
        # Helper to get the appropriate cache for a cache key
        def get_cache_for_key(key: str) -> TelegramFileCache | None:
            """Return asset cache if key is a UUID, otherwise URL cache."""
            is_asset_id = _is_asset_id(key)
            return self.coordinator.telegram_asset_cache if is_asset_id else self.coordinator.telegram_cache
        # Collect media items - either from cache (file_id) or by downloading
        # Each item: (type, media_ref, filename, cache_key, is_cached, content_type)
        # media_ref is either file_id (str) or data (bytes)
        media_items: list[tuple[str, str | bytes, str, str, bool, str | None]] = []
        oversized_photos: list[tuple[bytes, str | None, str, str | None]] = []  # (data, caption, url, cache_key)
        documents_to_send: list[tuple[bytes, str | None, str, str | None, str, str | None]] = []  # (data, caption, url, cache_key, filename, content_type)
        skipped_count = 0
        for i, item in enumerate(chunk):
            url = item.get("url")
            if not url:
                return {
                    "success": False,
                    "error": f"Missing 'url' in item {chunk_idx * max_group_size + i}",
                }
            media_type = item.get("type", "document")
            item_content_type = item.get("content_type")
            # Determine cache key: custom cache_key -> extracted asset ID -> URL
            custom_cache_key = item.get("cache_key")
            extracted_asset_id = _extract_asset_id_from_url(url) if not custom_cache_key else None
            item_cache_key = custom_cache_key or extracted_asset_id or url
            if media_type not in ("photo", "video", "document"):
                return {
                    "success": False,
                    "error": f"Invalid type '{media_type}' in item {chunk_idx * max_group_size + i}. Must be 'photo', 'video', or 'document'.",
                }
            # Documents can't be in media groups - collect them for separate sending
            if media_type == "document":
                try:
                    download_url = self.coordinator.get_internal_download_url(url)
                    async with session.get(download_url) as resp:
                        if resp.status != 200:
                            return {
                                "success": False,
                                "error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}",
                            }
                        data = await resp.read()
                    if max_asset_data_size is not None and len(data) > max_asset_data_size:
                        _LOGGER.warning(
                            "Media %d size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
                            chunk_idx * max_group_size + i, len(data), max_asset_data_size
                        )
                        skipped_count += 1
                        continue
                    # Caption only on first item of first chunk if no media items yet.
                    # NOTE(review): documents are sent after the media groups below and
                    # do not set first_caption_used, so when the first asset is a
                    # document the caption may also appear on the first media
                    # sub-group - confirm this is intended.
                    doc_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 and len(documents_to_send) == 0 else None
                    filename = url.split("/")[-1].split("?")[0] or f"file_{i}"
                    documents_to_send.append((data, doc_caption, url, custom_cache_key, filename, item_content_type))
                except aiohttp.ClientError as err:
                    return {
                        "success": False,
                        "error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}",
                    }
                continue
            # Check cache first for photos/videos; thumbhash comparison (Immich
            # assets only) invalidates entries whose content changed.
            item_cache = get_cache_for_key(item_cache_key)
            item_thumbhash = self.coordinator.get_asset_thumbhash(item_cache_key) if _is_asset_id(item_cache_key) else None
            cached = item_cache.get(item_cache_key, thumbhash=item_thumbhash) if item_cache else None
            if cached and cached.get("file_id"):
                # Use cached file_id - no download or upload needed for this item.
                ext = "jpg" if media_type == "photo" else "mp4"
                filename = f"media_{chunk_idx * max_group_size + i}.{ext}"
                media_items.append((media_type, cached["file_id"], filename, item_cache_key, True, item_content_type))
                _LOGGER.debug("Using cached file_id for media %d", chunk_idx * max_group_size + i)
                continue
            try:
                # Download using internal URL for faster local network transfer
                download_url = self.coordinator.get_internal_download_url(url)
                _LOGGER.debug("Downloading media %d from %s", chunk_idx * max_group_size + i, download_url[:80])
                async with session.get(download_url) as resp:
                    if resp.status != 200:
                        return {
                            "success": False,
                            "error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}",
                        }
                    data = await resp.read()
                _LOGGER.debug("Downloaded media %d: %d bytes", chunk_idx * max_group_size + i, len(data))
                # Check if media exceeds max_asset_data_size limit (user-defined limit for skipping)
                if max_asset_data_size is not None and len(data) > max_asset_data_size:
                    _LOGGER.warning(
                        "Media %d size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
                        chunk_idx * max_group_size + i, len(data), max_asset_data_size
                    )
                    skipped_count += 1
                    continue
                # For videos, check Telegram upload limit (50 MB)
                if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
                    _LOGGER.warning(
                        "Video %d size (%d bytes, %.1f MB) exceeds Telegram's %.0f MB upload limit, skipping",
                        chunk_idx * max_group_size + i, len(data),
                        len(data) / (1024 * 1024),
                        TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024),
                    )
                    skipped_count += 1
                    continue
                # For photos, check Telegram limits (dimensions/size)
                if media_type == "photo":
                    exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data)
                    if exceeds_limits:
                        if send_large_photos_as_documents:
                            # Separate this photo to send as document later
                            # Caption only on first item of first chunk
                            photo_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 else None
                            oversized_photos.append((data, photo_caption, url, custom_cache_key))
                            _LOGGER.info("Photo %d %s, will send as document", i, reason)
                            continue
                        else:
                            # Skip oversized photo
                            _LOGGER.warning("Photo %d %s, skipping (set send_large_photos_as_documents=true to send as document)", i, reason)
                            skipped_count += 1
                            continue
                ext = "jpg" if media_type == "photo" else "mp4"
                filename = f"media_{chunk_idx * max_group_size + i}.{ext}"
                media_items.append((media_type, data, filename, item_cache_key, False, item_content_type))
            except aiohttp.ClientError as err:
                return {
                    "success": False,
                    "error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}",
                }
        # Skip this chunk if all files were filtered out
        if not media_items and not oversized_photos:
            _LOGGER.info("Chunk %d/%d: all %d media items skipped",
                         chunk_idx + 1, len(chunks), len(chunk))
            continue
        # Split media items into sub-groups respecting Telegram's upload size limit
        # This ensures the total upload data per sendMediaGroup call stays under 50 MB
        if media_items:
            media_sub_groups = _split_media_by_upload_size(media_items, TELEGRAM_MAX_VIDEO_SIZE)
            if len(media_sub_groups) > 1:
                _LOGGER.debug(
                    "Chunk %d/%d: split %d media items into %d sub-groups by upload size",
                    chunk_idx + 1, len(chunks), len(media_items), len(media_sub_groups),
                )
            # Tracks whether the caption has actually been delivered, so it is
            # attached exactly once across sub-groups.
            first_caption_used = False
            for sub_idx, sub_group_items in enumerate(media_sub_groups):
                is_first = chunk_idx == 0 and sub_idx == 0
                sub_caption = caption if is_first and not first_caption_used and not oversized_photos else None
                sub_reply_to = reply_to_message_id if is_first else None
                # Add delay between sub-groups (not before the first one)
                if sub_idx > 0 and chunk_delay > 0:
                    await asyncio.sleep(chunk_delay / 1000)
                # Single item - use sendPhoto/sendVideo (sendMediaGroup requires 2+ items)
                if len(sub_group_items) == 1:
                    sg_type, sg_ref, sg_fname, sg_ck, sg_cached, sg_ct = sub_group_items[0]
                    if sg_type == "photo":
                        api_method = "sendPhoto"
                        media_field = "photo"
                    else:
                        api_method = "sendVideo"
                        media_field = "video"
                    try:
                        if sg_cached:
                            # Try the cached file_id first (no upload).
                            sg_payload: dict[str, Any] = {
                                "chat_id": chat_id,
                                media_field: sg_ref,
                                "parse_mode": parse_mode,
                            }
                            if sub_caption:
                                sg_payload["caption"] = sub_caption
                            if sub_reply_to:
                                sg_payload["reply_to_message_id"] = sub_reply_to
                            telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}"
                            async with session.post(telegram_url, json=sg_payload) as response:
                                result = await response.json()
                                if response.status == 200 and result.get("ok"):
                                    all_message_ids.append(result["result"].get("message_id"))
                                    if sub_caption:
                                        first_caption_used = True
                                else:
                                    _LOGGER.debug("Cached file_id failed in sub-group, will re-upload: %s", result.get("description"))
                                    sg_cached = False  # Fall through to upload
                        if not sg_cached:
                            # Fresh multipart upload of the raw bytes.
                            sg_form = FormData()
                            sg_form.add_field("chat_id", chat_id)
                            sg_content_type = sg_ct or ("image/jpeg" if sg_type == "photo" else "video/mp4")
                            sg_form.add_field(media_field, sg_ref, filename=sg_fname, content_type=sg_content_type)
                            sg_form.add_field("parse_mode", parse_mode)
                            if sub_caption:
                                sg_form.add_field("caption", sub_caption)
                            if sub_reply_to:
                                sg_form.add_field("reply_to_message_id", str(sub_reply_to))
                            telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}"
                            async with session.post(telegram_url, data=sg_form) as response:
                                result = await response.json()
                                if response.status == 200 and result.get("ok"):
                                    all_message_ids.append(result["result"].get("message_id"))
                                    if sub_caption:
                                        first_caption_used = True
                                    # Cache the uploaded file_id
                                    sg_cache = get_cache_for_key(sg_ck)
                                    if sg_cache:
                                        sg_thumbhash = self.coordinator.get_asset_thumbhash(sg_ck) if _is_asset_id(sg_ck) else None
                                        result_data = result.get("result", {})
                                        if sg_type == "photo":
                                            # Telegram returns multiple photo sizes; the last is the largest.
                                            photos = result_data.get("photo", [])
                                            if photos:
                                                await sg_cache.async_set(sg_ck, photos[-1].get("file_id"), "photo", thumbhash=sg_thumbhash)
                                        elif sg_type == "video":
                                            video = result_data.get("video", {})
                                            if video.get("file_id"):
                                                await sg_cache.async_set(sg_ck, video["file_id"], "video", thumbhash=sg_thumbhash)
                                else:
                                    self._log_telegram_error(
                                        error_code=result.get("error_code"),
                                        description=result.get("description", "Unknown Telegram error"),
                                        data=sg_ref if isinstance(sg_ref, bytes) else None,
                                        media_type=sg_type,
                                    )
                                    return {
                                        "success": False,
                                        "error": result.get("description", "Unknown Telegram error"),
                                        "error_code": result.get("error_code"),
                                        "failed_at_chunk": chunk_idx + 1,
                                    }
                    except aiohttp.ClientError as err:
                        _LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err)
                        return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
                    continue
                # Multiple items - use sendMediaGroup
                all_cached = all(is_cached for _, _, _, _, is_cached, _ in sub_group_items)
                if all_cached:
                    # Fast path: every item has a valid cached file_id -> pure JSON request.
                    _LOGGER.debug("Sub-group %d/%d: all %d items cached, using file_ids",
                                  sub_idx + 1, len(media_sub_groups), len(sub_group_items))
                    media_json = []
                    for i, (media_type, file_id, _, _, _, _) in enumerate(sub_group_items):
                        media_item_json: dict[str, Any] = {
                            "type": media_type,
                            "media": file_id,
                        }
                        if i == 0 and sub_caption and not first_caption_used:
                            media_item_json["caption"] = sub_caption
                            media_item_json["parse_mode"] = parse_mode
                        media_json.append(media_item_json)
                    payload: dict[str, Any] = {
                        "chat_id": chat_id,
                        "media": media_json,
                    }
                    if sub_reply_to:
                        payload["reply_to_message_id"] = sub_reply_to
                    telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup"
                    try:
                        async with session.post(telegram_url, json=payload) as response:
                            result = await response.json()
                            if response.status == 200 and result.get("ok"):
                                chunk_message_ids = [
                                    msg.get("message_id") for msg in result.get("result", [])
                                ]
                                all_message_ids.extend(chunk_message_ids)
                                if sub_caption:
                                    first_caption_used = True
                            else:
                                # Cache might be stale - fall through to upload path
                                _LOGGER.debug("Cached file_ids failed, will re-upload: %s", result.get("description"))
                                all_cached = False  # Force re-upload
                    except aiohttp.ClientError as err:
                        _LOGGER.debug("Cached file_ids request failed: %s", err)
                        all_cached = False
                if not all_cached:
                    # Build multipart form with mix of cached file_ids and uploaded data
                    form = FormData()
                    form.add_field("chat_id", chat_id)
                    if sub_reply_to:
                        form.add_field("reply_to_message_id", str(sub_reply_to))
                    # Build media JSON - use file_id for cached, attach:// for uploaded
                    media_json = []
                    upload_idx = 0
                    keys_to_cache: list[tuple[str, int, str]] = []  # (cache_key, result_idx, type)
                    for i, (media_type, media_ref, filename, item_cache_key, is_cached, item_content_type) in enumerate(sub_group_items):
                        if is_cached:
                            # Use file_id directly
                            media_item_json: dict[str, Any] = {
                                "type": media_type,
                                "media": media_ref,  # file_id
                            }
                        else:
                            # Upload this file via an attach:// reference into the form
                            attach_name = f"file{upload_idx}"
                            media_item_json = {
                                "type": media_type,
                                "media": f"attach://{attach_name}",
                            }
                            # Use provided content_type or default based on media type
                            content_type = item_content_type or ("image/jpeg" if media_type == "photo" else "video/mp4")
                            form.add_field(attach_name, media_ref, filename=filename, content_type=content_type)
                            keys_to_cache.append((item_cache_key, i, media_type))
                            upload_idx += 1
                        if i == 0 and sub_caption and not first_caption_used:
                            media_item_json["caption"] = sub_caption
                            media_item_json["parse_mode"] = parse_mode
                        media_json.append(media_item_json)
                    form.add_field("media", json.dumps(media_json))
                    # Send to Telegram
                    telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup"
                    try:
                        _LOGGER.debug("Uploading media group sub-group %d/%d (%d files, %d cached) to Telegram",
                                      sub_idx + 1, len(media_sub_groups), len(sub_group_items), len(sub_group_items) - upload_idx)
                        async with session.post(telegram_url, data=form) as response:
                            result = await response.json()
                            _LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
                            if response.status == 200 and result.get("ok"):
                                chunk_message_ids = [
                                    msg.get("message_id") for msg in result.get("result", [])
                                ]
                                all_message_ids.extend(chunk_message_ids)
                                if sub_caption:
                                    first_caption_used = True
                                # Cache the newly uploaded file_ids; result messages are
                                # positional, so result_idx maps item -> response message.
                                if keys_to_cache:
                                    result_messages = result.get("result", [])
                                    for ck, result_idx, m_type in keys_to_cache:
                                        ck_cache = get_cache_for_key(ck)
                                        ck_thumbhash = self.coordinator.get_asset_thumbhash(ck) if _is_asset_id(ck) else None
                                        if result_idx < len(result_messages) and ck_cache:
                                            msg = result_messages[result_idx]
                                            if m_type == "photo":
                                                photos = msg.get("photo", [])
                                                if photos:
                                                    await ck_cache.async_set(ck, photos[-1].get("file_id"), "photo", thumbhash=ck_thumbhash)
                                            elif m_type == "video":
                                                video = msg.get("video", {})
                                                if video.get("file_id"):
                                                    await ck_cache.async_set(ck, video["file_id"], "video", thumbhash=ck_thumbhash)
                            else:
                                # Log detailed error for media group with total size info
                                uploaded_data = [m for m in sub_group_items if not m[4]]
                                total_size = sum(len(d) for _, d, _, _, _, _ in uploaded_data if isinstance(d, bytes))
                                _LOGGER.error(
                                    "Telegram API error for sub-group %d/%d: %s | Media count: %d | Uploaded size: %d bytes (%.2f MB)",
                                    sub_idx + 1, len(media_sub_groups),
                                    result.get("description", "Unknown Telegram error"),
                                    len(sub_group_items),
                                    total_size,
                                    total_size / (1024 * 1024) if total_size else 0
                                )
                                # Log detailed diagnostics for the first photo in the group
                                for media_type, media_ref, _, _, is_cached, _ in sub_group_items:
                                    if media_type == "photo" and not is_cached and isinstance(media_ref, bytes):
                                        self._log_telegram_error(
                                            error_code=result.get("error_code"),
                                            description=result.get("description", "Unknown Telegram error"),
                                            data=media_ref,
                                            media_type="photo",
                                        )
                                        break  # Only log details for first photo
                                return {
                                    "success": False,
                                    "error": result.get("description", "Unknown Telegram error"),
                                    "error_code": result.get("error_code"),
                                    "failed_at_chunk": chunk_idx + 1,
                                }
                    except aiohttp.ClientError as err:
                        _LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err)
                        return {
                            "success": False,
                            "error": str(err),
                            "failed_at_chunk": chunk_idx + 1,
                        }
        # Send oversized photos as documents (best-effort: failures are logged, not fatal)
        for i, (data, photo_caption, photo_url, photo_cache_key) in enumerate(oversized_photos):
            _LOGGER.debug("Sending oversized photo %d/%d as document", i + 1, len(oversized_photos))
            result = await self._send_telegram_document(
                session, token, chat_id, data, f"photo_{i}.jpg",
                photo_caption, None, parse_mode, photo_url, None, photo_cache_key
            )
            if result.get("success"):
                all_message_ids.append(result.get("message_id"))
            else:
                _LOGGER.error("Failed to send oversized photo as document: %s", result.get("error"))
                # Continue with other photos even if one fails
        # Send documents (can't be in media groups); also best-effort
        for i, (data, doc_caption, doc_url, doc_cache_key, filename, doc_content_type) in enumerate(documents_to_send):
            _LOGGER.debug("Sending document %d/%d", i + 1, len(documents_to_send))
            result = await self._send_telegram_document(
                session, token, chat_id, data, filename,
                doc_caption, None, parse_mode, doc_url, doc_content_type, doc_cache_key
            )
            if result.get("success"):
                all_message_ids.append(result.get("message_id"))
            else:
                _LOGGER.error("Failed to send document: %s", result.get("error"))
                # Continue with other documents even if one fails
    return {
        "success": True,
        "message_ids": all_message_ids,
        "chunks_sent": len(chunks),
    }
class ImmichAlbumIdSensor(ImmichAlbumBaseSensor):
    """Sensor exposing the Immich album ID."""

    _attr_icon = "mdi:identifier"
    _attr_translation_key = "album_id"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the album-ID sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_album_id"

    @property
    def native_value(self) -> str | None:
        """Return the album ID, or None while no album data is available."""
        album = self._album_data
        return album.id if album else None

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return basic album metadata plus the preferred share URL."""
        album = self._album_data
        if not album:
            return {}
        attrs: dict[str, Any] = {
            ATTR_ALBUM_NAME: album.name,
            ATTR_ASSET_COUNT: album.asset_count,
            ATTR_LAST_UPDATED: album.updated_at,
            ATTR_CREATED_AT: album.created_at,
        }
        # Primary share URL (prefers public, falls back to protected)
        if share_url := self.coordinator.get_any_url():
            attrs["share_url"] = share_url
        return attrs
class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album asset count."""

    _attr_state_class = SensorStateClass.MEASUREMENT
    _attr_icon = "mdi:image-album"
    _attr_translation_key = "album_asset_count"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the asset-count sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_asset_count"

    @property
    def native_value(self) -> int | None:
        """Return the total number of assets in the album."""
        album = self._album_data
        return album.asset_count if album else None

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return detailed album metadata as state attributes."""
        album = self._album_data
        if not album:
            return {}
        attributes: dict[str, Any] = {
            ATTR_ALBUM_ID: album.id,
            ATTR_ASSET_COUNT: album.asset_count,
            ATTR_PHOTO_COUNT: album.photo_count,
            ATTR_VIDEO_COUNT: album.video_count,
            ATTR_LAST_UPDATED: album.updated_at,
            ATTR_CREATED_AT: album.created_at,
            ATTR_SHARED: album.shared,
            ATTR_OWNER: album.owner,
            ATTR_PEOPLE: list(album.people),
        }
        # Expose a thumbnail link only when the album has a cover asset.
        if album.thumbnail_asset_id:
            attributes[ATTR_THUMBNAIL_URL] = (
                f"{self.coordinator.immich_url}/api/assets/"
                f"{album.thumbnail_asset_id}/thumbnail"
            )
        return attributes
class ImmichAlbumPhotoCountSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album photo count."""

    _attr_state_class = SensorStateClass.MEASUREMENT
    _attr_icon = "mdi:image"
    _attr_translation_key = "album_photo_count"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the photo-count sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_photo_count"

    @property
    def native_value(self) -> int | None:
        """Return how many photos the album currently holds."""
        album = self._album_data
        return album.photo_count if album else None
class ImmichAlbumVideoCountSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album video count."""

    _attr_state_class = SensorStateClass.MEASUREMENT
    _attr_icon = "mdi:video"
    _attr_translation_key = "album_video_count"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the video-count sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_video_count"

    @property
    def native_value(self) -> int | None:
        """Return how many videos the album currently holds."""
        album = self._album_data
        return album.video_count if album else None
class ImmichAlbumLastUpdatedSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album last updated time."""

    _attr_device_class = SensorDeviceClass.TIMESTAMP
    _attr_icon = "mdi:clock-outline"
    _attr_translation_key = "album_last_updated"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the last-updated sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_last_updated"

    @property
    def native_value(self) -> datetime | None:
        """Return the album's last-modified timestamp parsed from ISO 8601."""
        album = self._album_data
        raw = album.updated_at if album else None
        if not raw:
            return None
        try:
            # Normalize a trailing "Z" to an explicit UTC offset for fromisoformat.
            return datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            return None
class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album creation date."""

    _attr_device_class = SensorDeviceClass.TIMESTAMP
    _attr_icon = "mdi:calendar-plus"
    _attr_translation_key = "album_created"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the created-at sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_created"

    @property
    def native_value(self) -> datetime | None:
        """Return the album's creation timestamp parsed from ISO 8601."""
        album = self._album_data
        raw = album.created_at if album else None
        if not raw:
            return None
        try:
            # Normalize a trailing "Z" to an explicit UTC offset for fromisoformat.
            return datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            return None
class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album public URL."""

    _attr_icon = "mdi:link-variant"
    _attr_translation_key = "album_public_url"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the public-URL sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_public_url"

    @property
    def native_value(self) -> str | None:
        """Return the primary public share URL for the album."""
        return self.coordinator.get_public_url() if self._album_data else None

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return album ID, shared flag, and any additional share links."""
        album = self._album_data
        if not album:
            return {}
        attributes: dict[str, Any] = {
            ATTR_ALBUM_ID: album.id,
            ATTR_SHARED: album.shared,
        }
        # Only surface the full URL list when more than one link exists.
        all_urls = self.coordinator.get_public_urls()
        if len(all_urls) > 1:
            attributes[ATTR_ALBUM_URLS] = all_urls
        if links_info := self.coordinator.get_shared_links_info():
            attributes["shared_links"] = links_info
        return attributes
class ImmichAlbumProtectedUrlSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album password-protected URL."""

    _attr_icon = "mdi:link-lock"
    _attr_translation_key = "album_protected_url"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the protected-URL sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_protected_url"

    @property
    def native_value(self) -> str | None:
        """Return the primary password-protected share URL for the album."""
        return self.coordinator.get_protected_url() if self._album_data else None

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return the album ID and, when several exist, all protected URLs."""
        album = self._album_data
        if not album:
            return {}
        attributes: dict[str, Any] = {
            ATTR_ALBUM_ID: album.id,
        }
        # Only surface the full URL list when more than one link exists.
        all_urls = self.coordinator.get_protected_urls()
        if len(all_urls) > 1:
            attributes["protected_urls"] = all_urls
        return attributes
class ImmichAlbumProtectedPasswordSensor(ImmichAlbumBaseSensor):
    """Sensor representing an Immich album protected link password."""

    _attr_icon = "mdi:key"
    _attr_translation_key = "album_protected_password"

    def __init__(
        self,
        coordinator: ImmichAlbumWatcherCoordinator,
        entry: ConfigEntry,
        subentry: ConfigSubentry,
    ) -> None:
        """Initialize the protected-password sensor and derive its unique ID."""
        super().__init__(coordinator, entry, subentry)
        self._attr_unique_id = f"{self._unique_id_prefix}_protected_password"

    @property
    def native_value(self) -> str | None:
        """Return the password of the protected share link, if any."""
        return self.coordinator.get_protected_password() if self._album_data else None

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return the album ID and the matching protected URL."""
        album = self._album_data
        if not album:
            return {}
        return {
            ATTR_ALBUM_ID: album.id,
            ATTR_ALBUM_PROTECTED_URL: self.coordinator.get_protected_url(),
        }