Files
haos-hacs-immich-album-watcher/custom_components/immich_album_watcher/sensor.py
alexei.dolgolyov 71b79cd919
All checks were successful
Validate / Hassfest (push) Successful in 1m19s
Move quiet hours from hub config to per-call service params
Quiet hours are now specified per send_telegram_notification call via
quiet_hours_start/quiet_hours_end params instead of being a hub-wide
integration option. This allows different automations to use different
quiet hours windows (or none at all).

- Remove quiet_hours_start/end from config options UI and const.py
- Add quiet_hours_start/end as optional HH:MM params on the service
- Remove ignore_quiet_hours param (omit quiet hours params to send immediately)
- Queue stores quiet_hours_end per item; each unique end time gets its
  own async_track_time_change timer for replay
- On startup, items whose quiet hours have passed are sent immediately
- Add async_remove_indices() to NotificationQueue for selective removal
- Timers are cleaned up when no more items need them

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:04:20 +03:00

2079 lines
93 KiB
Python

"""Sensor platform for Immich Album Watcher."""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime
from typing import Any
import voluptuous as vol
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse, callback
from homeassistant.helpers import entity_platform
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import (
ATTR_ALBUM_ID,
ATTR_ALBUM_NAME,
ATTR_ALBUM_PROTECTED_URL,
ATTR_ALBUM_URLS,
ATTR_ASSET_COUNT,
ATTR_CREATED_AT,
ATTR_LAST_UPDATED,
ATTR_OWNER,
ATTR_PEOPLE,
ATTR_PHOTO_COUNT,
ATTR_SHARED,
ATTR_THUMBNAIL_URL,
ATTR_VIDEO_COUNT,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
CONF_TELEGRAM_BOT_TOKEN,
DOMAIN,
SERVICE_GET_ASSETS,
SERVICE_REFRESH,
SERVICE_SEND_TELEGRAM_NOTIFICATION,
)
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
from .storage import NotificationQueue, TelegramFileCache
_LOGGER = logging.getLogger(__name__)
# Telegram constants
TELEGRAM_API_BASE_URL = "https://api.telegram.org/bot"
TELEGRAM_MAX_PHOTO_SIZE = 10 * 1024 * 1024 # 10 MB - Telegram's max photo size
TELEGRAM_MAX_VIDEO_SIZE = 50 * 1024 * 1024 # 50 MB - Telegram's max video/document upload size
TELEGRAM_MAX_DIMENSION_SUM = 10000 # Maximum sum of width + height in pixels
# Regex pattern for Immich asset ID (UUID format)
_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$")
# Regex patterns to extract asset ID from Immich URLs
# Matches patterns like:
# - /api/assets/{asset_id}/original
# - /api/assets/{asset_id}/thumbnail
# - /api/assets/{asset_id}/video/playback
# - /share/{key}/photos/{asset_id}
_IMMICH_ASSET_ID_PATTERNS = [
re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"),
re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"),
]
def _is_asset_id(value: str) -> bool:
"""Check if a string is a valid Immich asset ID (UUID format).
Args:
value: The string to check
Returns:
True if the string matches the UUID format, False otherwise
"""
return bool(_ASSET_ID_PATTERN.match(value))
def _extract_asset_id_from_url(url: str) -> str | None:
"""Extract asset ID from Immich URL if possible.
Supports the following URL patterns:
- /api/assets/{asset_id}/original?...
- /api/assets/{asset_id}/thumbnail?...
- /api/assets/{asset_id}/video/playback?...
- /share/{key}/photos/{asset_id}
Args:
url: The URL to extract asset ID from
Returns:
The asset ID if found, None otherwise
"""
if not url:
return None
for pattern in _IMMICH_ASSET_ID_PATTERNS:
match = pattern.search(url)
if match:
return match.group(1)
return None
def _split_media_by_upload_size(
media_items: list[tuple], max_upload_size: int
) -> list[list[tuple]]:
"""Split media items into sub-groups respecting upload size limit.
Cached items (file_id references) don't count toward upload size since
they are sent as lightweight JSON references, not uploaded data.
Args:
media_items: List of (media_type, media_ref, filename, cache_key, is_cached, content_type)
max_upload_size: Maximum total upload bytes per sub-group
Returns:
List of sub-groups, each a list of media_items tuples
"""
if not media_items:
return []
groups: list[list[tuple]] = []
current_group: list[tuple] = []
current_upload_size = 0
for item in media_items:
_, media_ref, _, _, is_cached, _ = item
item_upload_size = 0 if is_cached else len(media_ref)
if current_group and current_upload_size + item_upload_size > max_upload_size:
groups.append(current_group)
current_group = [item]
current_upload_size = item_upload_size
else:
current_group.append(item)
current_upload_size += item_upload_size
if current_group:
groups.append(current_group)
return groups
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Immich Album Watcher sensors from a config entry."""
# Iterate through all album subentries
for subentry_id, subentry in entry.subentries.items():
subentry_data = hass.data[DOMAIN][entry.entry_id]["subentries"].get(subentry_id)
if not subentry_data:
_LOGGER.error("Subentry data not found for %s", subentry_id)
continue
coordinator = subentry_data.coordinator
entities: list[SensorEntity] = [
ImmichAlbumIdSensor(coordinator, entry, subentry),
ImmichAlbumAssetCountSensor(coordinator, entry, subentry),
ImmichAlbumPhotoCountSensor(coordinator, entry, subentry),
ImmichAlbumVideoCountSensor(coordinator, entry, subentry),
ImmichAlbumLastUpdatedSensor(coordinator, entry, subentry),
ImmichAlbumCreatedSensor(coordinator, entry, subentry),
ImmichAlbumPublicUrlSensor(coordinator, entry, subentry),
ImmichAlbumProtectedUrlSensor(coordinator, entry, subentry),
ImmichAlbumProtectedPasswordSensor(coordinator, entry, subentry),
]
async_add_entities(entities, config_subentry_id=subentry_id)
# Register entity services
platform = entity_platform.async_get_current_platform()
platform.async_register_entity_service(
SERVICE_REFRESH,
{},
"async_refresh_album",
)
platform.async_register_entity_service(
SERVICE_GET_ASSETS,
{
vol.Optional("limit", default=10): vol.All(
vol.Coerce(int), vol.Range(min=1, max=100)
),
vol.Optional("offset", default=0): vol.All(
vol.Coerce(int), vol.Range(min=0)
),
vol.Optional("favorite_only", default=False): bool,
vol.Optional("filter_min_rating", default=1): vol.All(
vol.Coerce(int), vol.Range(min=1, max=5)
),
vol.Optional("order_by", default="date"): vol.In(
["date", "rating", "name", "random"]
),
vol.Optional("order", default="descending"): vol.In(
["ascending", "descending"]
),
vol.Optional("asset_type", default="all"): vol.In(["all", "photo", "video"]),
vol.Optional("min_date"): str,
vol.Optional("max_date"): str,
vol.Optional("memory_date"): str,
vol.Optional("city"): str,
vol.Optional("state"): str,
vol.Optional("country"): str,
},
"async_get_assets",
supports_response=SupportsResponse.ONLY,
)
platform.async_register_entity_service(
SERVICE_SEND_TELEGRAM_NOTIFICATION,
{
vol.Optional("bot_token"): str,
vol.Required("chat_id"): vol.Coerce(str),
vol.Optional("assets"): list,
vol.Optional("caption"): str,
vol.Optional("reply_to_message_id"): vol.Coerce(int),
vol.Optional("disable_web_page_preview"): bool,
vol.Optional("parse_mode", default="HTML"): str,
vol.Optional("max_group_size", default=10): vol.All(
vol.Coerce(int), vol.Range(min=2, max=10)
),
vol.Optional("chunk_delay", default=0): vol.All(
vol.Coerce(int), vol.Range(min=0, max=60000)
),
vol.Optional("wait_for_response", default=True): bool,
vol.Optional("max_asset_data_size"): vol.All(
vol.Coerce(int), vol.Range(min=1, max=52428800)
),
vol.Optional("send_large_photos_as_documents", default=False): bool,
vol.Optional("chat_action", default="typing"): vol.Any(
None, vol.In(["", "typing", "upload_photo", "upload_video", "upload_document"])
),
vol.Optional("quiet_hours_start"): vol.Match(r"^\d{2}:\d{2}$"),
vol.Optional("quiet_hours_end"): vol.Match(r"^\d{2}:\d{2}$"),
},
"async_send_telegram_notification",
supports_response=SupportsResponse.OPTIONAL,
)
class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], SensorEntity):
"""Base sensor for Immich album."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
# Generate unique_id prefix: {hub_name}_album_{album_name}
self._unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self._album_data is not None
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()
async def async_refresh_album(self) -> None:
"""Refresh data for this album."""
await self.coordinator.async_refresh_now()
async def async_get_assets(
self,
limit: int = 10,
offset: int = 0,
favorite_only: bool = False,
filter_min_rating: int = 1,
order_by: str = "date",
order: str = "descending",
asset_type: str = "all",
min_date: str | None = None,
max_date: str | None = None,
memory_date: str | None = None,
city: str | None = None,
state: str | None = None,
country: str | None = None,
) -> ServiceResponse:
"""Get assets for this album with optional filtering and ordering."""
assets = await self.coordinator.async_get_assets(
limit=limit,
offset=offset,
favorite_only=favorite_only,
filter_min_rating=filter_min_rating,
order_by=order_by,
order=order,
asset_type=asset_type,
min_date=min_date,
max_date=max_date,
memory_date=memory_date,
city=city,
state=state,
country=country,
)
return {"assets": assets}
@staticmethod
def _is_quiet_hours(start_str: str | None, end_str: str | None) -> bool:
"""Check if current time is within quiet hours."""
from datetime import time as dt_time
from homeassistant.util import dt as dt_util
if not start_str or not end_str:
return False
try:
now = dt_util.now().time()
start_time = dt_time.fromisoformat(start_str)
end_time = dt_time.fromisoformat(end_str)
except ValueError:
return False
if start_time <= end_time:
return start_time <= now < end_time
else:
# Crosses midnight (e.g., 22:00 - 08:00)
return now >= start_time or now < end_time
async def async_send_telegram_notification(
self,
chat_id: str,
assets: list[dict[str, str]] | None = None,
bot_token: str | None = None,
caption: str | None = None,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
max_group_size: int = 10,
chunk_delay: int = 0,
wait_for_response: bool = True,
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
chat_action: str | None = "typing",
quiet_hours_start: str | None = None,
quiet_hours_end: str | None = None,
) -> ServiceResponse:
"""Send notification to Telegram.
Supports:
- Empty assets: sends a simple text message
- Single photo: uses sendPhoto API
- Single video: uses sendVideo API
- Multiple items: uses sendMediaGroup API (splits into multiple groups if needed)
Each item in assets should be a dict with 'url', optional 'type' (photo/video/document),
and optional 'cache_key' (custom key for caching instead of URL).
Downloads media and uploads to Telegram to bypass CORS restrictions.
If wait_for_response is False, the task will be executed in the background
and the service will return immediately.
"""
# Check quiet hours — queue notification if active
if self._is_quiet_hours(quiet_hours_start, quiet_hours_end):
from . import _register_queue_timers, ImmichConfigEntry
queue: NotificationQueue = self.hass.data[DOMAIN][self._entry.entry_id]["notification_queue"]
await queue.async_enqueue({
"entity_id": self.entity_id,
"chat_id": chat_id,
"assets": assets,
"bot_token": bot_token,
"caption": caption,
"reply_to_message_id": reply_to_message_id,
"disable_web_page_preview": disable_web_page_preview,
"parse_mode": parse_mode,
"max_group_size": max_group_size,
"chunk_delay": chunk_delay,
"max_asset_data_size": max_asset_data_size,
"send_large_photos_as_documents": send_large_photos_as_documents,
"chat_action": chat_action,
"quiet_hours_start": quiet_hours_start,
"quiet_hours_end": quiet_hours_end,
})
# Register timer for this end time if not already registered
_register_queue_timers(self.hass, self._entry)
return {"success": True, "status": "queued_quiet_hours"}
# If non-blocking mode, create a background task and return immediately
if not wait_for_response:
self.hass.async_create_task(
self._execute_telegram_notification(
chat_id=chat_id,
assets=assets,
bot_token=bot_token,
caption=caption,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
max_group_size=max_group_size,
chunk_delay=chunk_delay,
max_asset_data_size=max_asset_data_size,
send_large_photos_as_documents=send_large_photos_as_documents,
chat_action=chat_action,
)
)
return {"success": True, "status": "queued", "message": "Notification queued for background processing"}
# Blocking mode - execute and return result
return await self._execute_telegram_notification(
chat_id=chat_id,
assets=assets,
bot_token=bot_token,
caption=caption,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
max_group_size=max_group_size,
chunk_delay=chunk_delay,
max_asset_data_size=max_asset_data_size,
send_large_photos_as_documents=send_large_photos_as_documents,
chat_action=chat_action,
)
async def _execute_telegram_notification(
self,
chat_id: str,
assets: list[dict[str, str]] | None = None,
bot_token: str | None = None,
caption: str | None = None,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
max_group_size: int = 10,
chunk_delay: int = 0,
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
chat_action: str | None = "typing",
) -> ServiceResponse:
"""Execute the Telegram notification (internal method)."""
import json
import aiohttp
from aiohttp import FormData
from homeassistant.helpers.aiohttp_client import async_get_clientsession
# Get bot token from parameter or config
token = bot_token or self._entry.options.get(CONF_TELEGRAM_BOT_TOKEN)
if not token:
return {
"success": False,
"error": "No bot token provided. Set it in integration options or pass as parameter.",
}
session = async_get_clientsession(self.hass)
# Handle empty assets - send simple text message (no typing indicator needed)
if not assets:
return await self._send_telegram_message(
session, token, chat_id, caption or "", reply_to_message_id, disable_web_page_preview, parse_mode
)
# Start chat action indicator for media notifications (before downloading assets)
typing_task = None
if chat_action:
typing_task = self._start_typing_indicator(session, token, chat_id, chat_action)
try:
# Handle single photo
if len(assets) == 1 and assets[0].get("type") == "photo":
return await self._send_telegram_photo(
session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode,
max_asset_data_size, send_large_photos_as_documents, assets[0].get("content_type"),
assets[0].get("cache_key")
)
# Handle single video
if len(assets) == 1 and assets[0].get("type") == "video":
return await self._send_telegram_video(
session, token, chat_id, assets[0].get("url"), caption, reply_to_message_id, parse_mode,
max_asset_data_size, assets[0].get("content_type"), assets[0].get("cache_key")
)
# Handle single document (default type)
if len(assets) == 1 and assets[0].get("type", "document") == "document":
url = assets[0].get("url")
if not url:
return {"success": False, "error": "Missing 'url' for document"}
item_content_type = assets[0].get("content_type")
item_cache_key = assets[0].get("cache_key")
try:
download_url = self.coordinator.get_internal_download_url(url)
async with session.get(download_url) as resp:
if resp.status != 200:
return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"}
data = await resp.read()
if max_asset_data_size is not None and len(data) > max_asset_data_size:
return {"success": False, "error": f"Media size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)"}
# Detect filename from URL or use generic name
filename = url.split("/")[-1].split("?")[0] or "file"
return await self._send_telegram_document(
session, token, chat_id, data, filename, caption, reply_to_message_id, parse_mode,
url, item_content_type, item_cache_key
)
except aiohttp.ClientError as err:
return {"success": False, "error": f"Failed to download media: {err}"}
# Handle multiple items - send as media group(s)
return await self._send_telegram_media_group(
session, token, chat_id, assets, caption, reply_to_message_id, max_group_size, chunk_delay, parse_mode,
max_asset_data_size, send_large_photos_as_documents
)
finally:
# Stop chat action indicator when done (success or error)
if typing_task:
typing_task.cancel()
try:
await typing_task
except asyncio.CancelledError:
pass
async def _send_telegram_message(
self,
session: Any,
token: str,
chat_id: str,
text: str,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
) -> ServiceResponse:
"""Send a simple text message to Telegram."""
import aiohttp
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMessage"
payload: dict[str, Any] = {
"chat_id": chat_id,
"text": text or "Notification from Home Assistant",
"parse_mode": parse_mode,
}
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
if disable_web_page_preview is not None:
payload["disable_web_page_preview"] = disable_web_page_preview
try:
_LOGGER.debug("Sending text message to Telegram")
async with session.post(telegram_url, json=payload) as response:
result = await response.json()
_LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
if response.status == 200 and result.get("ok"):
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
}
else:
_LOGGER.error("Telegram API error: %s", result)
return {
"success": False,
"error": result.get("description", "Unknown Telegram error"),
"error_code": result.get("error_code"),
}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram message send failed: %s", err)
return {"success": False, "error": str(err)}
async def _send_telegram_chat_action(
self,
session: Any,
token: str,
chat_id: str,
action: str = "typing",
) -> bool:
"""Send a chat action to Telegram (e.g., typing indicator).
Args:
session: aiohttp client session
token: Telegram bot token
chat_id: Target chat ID
action: Chat action type (typing, upload_photo, upload_video, etc.)
Returns:
True if successful, False otherwise
"""
import aiohttp
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendChatAction"
payload = {"chat_id": chat_id, "action": action}
try:
async with session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
_LOGGER.debug("Sent chat action '%s' to chat %s", action, chat_id)
return True
else:
_LOGGER.debug("Failed to send chat action: %s", result.get("description"))
return False
except aiohttp.ClientError as err:
_LOGGER.debug("Chat action request failed: %s", err)
return False
def _start_typing_indicator(
self,
session: Any,
token: str,
chat_id: str,
action: str = "typing",
) -> asyncio.Task:
"""Start a background task that sends chat action indicator periodically.
The chat action indicator expires after ~5 seconds, so we refresh it every 4 seconds.
Args:
session: aiohttp client session
token: Telegram bot token
chat_id: Target chat ID
action: Chat action type (typing, upload_photo, upload_video, etc.)
Returns:
The background task (cancel it when done)
"""
async def action_loop() -> None:
"""Keep sending chat action until cancelled."""
try:
while True:
await self._send_telegram_chat_action(session, token, chat_id, action)
await asyncio.sleep(4)
except asyncio.CancelledError:
_LOGGER.debug("Chat action indicator stopped for action '%s'", action)
return asyncio.create_task(action_loop())
def _log_telegram_error(
self,
error_code: int | None,
description: str,
data: bytes | None = None,
media_type: str = "photo",
) -> None:
"""Log detailed Telegram API error with diagnostics.
Args:
error_code: Telegram error code
description: Error description from Telegram
data: Media data bytes (optional, for size diagnostics)
media_type: Type of media (photo/video)
"""
error_msg = f"Telegram API error ({error_code}): {description}"
# Add diagnostic information based on error type
if data:
error_msg += f" | Media size: {len(data)} bytes ({len(data) / (1024 * 1024):.2f} MB)"
# Check dimensions for photos
if media_type == "photo":
try:
from PIL import Image
import io
img = Image.open(io.BytesIO(data))
width, height = img.size
dimension_sum = width + height
error_msg += f" | Dimensions: {width}x{height} (sum={dimension_sum})"
# Highlight limit violations
if len(data) > TELEGRAM_MAX_PHOTO_SIZE:
error_msg += f" | EXCEEDS size limit ({TELEGRAM_MAX_PHOTO_SIZE / (1024 * 1024):.0f} MB)"
if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM:
error_msg += f" | EXCEEDS dimension limit ({TELEGRAM_MAX_DIMENSION_SUM})"
except Exception:
pass
# Check size limit for videos
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
error_msg += f" | EXCEEDS Telegram upload limit ({TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB)"
# Provide suggestions based on error description
suggestions = []
if "dimension" in description.lower() or "PHOTO_INVALID_DIMENSIONS" in description:
suggestions.append("Photo dimensions too large - consider setting send_large_photos_as_documents=true")
elif "too large" in description.lower() or error_code == 413:
suggestions.append("File size too large - consider setting send_large_photos_as_documents=true or max_asset_data_size to skip large files")
elif "entity too large" in description.lower():
suggestions.append("Request entity too large - reduce max_group_size or set max_asset_data_size")
if suggestions:
error_msg += f" | Suggestions: {'; '.join(suggestions)}"
_LOGGER.error(error_msg)
def _check_telegram_photo_limits(
self,
data: bytes,
) -> tuple[bool, str | None, int | None, int | None]:
"""Check if photo data exceeds Telegram photo limits.
Telegram limits for photos:
- Max file size: 10 MB
- Max dimension sum: ~10,000 pixels (width + height)
Returns:
Tuple of (exceeds_limits, reason, width, height)
- exceeds_limits: True if photo exceeds limits
- reason: Human-readable reason (None if within limits)
- width: Image width in pixels (None if PIL not available)
- height: Image height in pixels (None if PIL not available)
"""
# Check file size
if len(data) > TELEGRAM_MAX_PHOTO_SIZE:
return True, f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", None, None
# Try to check dimensions using PIL
try:
from PIL import Image
import io
img = Image.open(io.BytesIO(data))
width, height = img.size
dimension_sum = width + height
if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM:
return True, f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", width, height
return False, None, width, height
except ImportError:
# PIL not available, can't check dimensions
_LOGGER.debug("PIL not available, skipping dimension check")
return False, None, None, None
except Exception as e:
# Failed to check dimensions
_LOGGER.debug("Failed to check photo dimensions: %s", e)
return False, None, None, None
def _get_telegram_cache_and_key(
self,
url: str | None,
cache_key: str | None = None,
) -> tuple[TelegramFileCache | None, str | None, str | None]:
"""Determine which Telegram cache, key, and thumbhash to use.
Priority: custom cache_key -> direct asset ID -> extracted asset ID from URL -> URL
Args:
url: The URL of the media (or asset ID directly)
cache_key: Optional custom cache key provided by user
Returns:
Tuple of (cache instance, cache key, thumbhash) to use.
thumbhash is only populated when using the asset cache.
"""
if cache_key:
# Custom cache_key uses URL cache (no thumbhash)
return self.coordinator.telegram_cache, cache_key, None
if url:
# Check if url is already an asset ID (UUID format)
if _is_asset_id(url):
thumbhash = self.coordinator.get_asset_thumbhash(url)
return self.coordinator.telegram_asset_cache, url, thumbhash
# Try to extract asset ID from URL
asset_id = _extract_asset_id_from_url(url)
if asset_id:
# Extracted asset ID uses asset cache
thumbhash = self.coordinator.get_asset_thumbhash(asset_id)
return self.coordinator.telegram_asset_cache, asset_id, thumbhash
# Fallback to URL cache with URL as key (no thumbhash)
return self.coordinator.telegram_cache, url, None
return None, None, None
async def _send_telegram_photo(
self,
session: Any,
token: str,
chat_id: str,
url: str | None,
caption: str | None = None,
reply_to_message_id: int | None = None,
parse_mode: str = "HTML",
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
content_type: str | None = None,
cache_key: str | None = None,
) -> ServiceResponse:
"""Send a single photo to Telegram."""
import aiohttp
from aiohttp import FormData
# Use provided content type or default to image/jpeg
if not content_type:
content_type = "image/jpeg"
if not url:
return {"success": False, "error": "Missing 'url' for photo"}
# Determine which cache to use and the cache key
effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key)
# Check cache for file_id
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
if cached and cached.get("file_id") and effective_cache_key:
# Use cached file_id - no download needed
file_id = cached["file_id"]
_LOGGER.debug("Using cached Telegram file_id for photo (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key)
payload = {
"chat_id": chat_id,
"photo": file_id,
"parse_mode": parse_mode,
}
if caption:
payload["caption"] = caption
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendPhoto"
try:
async with session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
"cached": True,
}
else:
# Cache might be stale, fall through to upload
_LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description"))
except aiohttp.ClientError as err:
_LOGGER.debug("Cached file_id request failed: %s", err)
try:
# Download the photo using internal URL for faster local network transfer
download_url = self.coordinator.get_internal_download_url(url)
_LOGGER.debug("Downloading photo from %s", download_url[:80])
async with session.get(download_url) as resp:
if resp.status != 200:
return {
"success": False,
"error": f"Failed to download photo: HTTP {resp.status}",
}
data = await resp.read()
_LOGGER.debug("Downloaded photo: %d bytes", len(data))
# Check if photo exceeds max size limit (user-defined limit)
if max_asset_data_size is not None and len(data) > max_asset_data_size:
_LOGGER.warning(
"Photo size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
len(data), max_asset_data_size
)
return {
"success": False,
"error": f"Photo size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)",
"skipped": True,
}
# Check if photo exceeds Telegram's photo limits
exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data)
if exceeds_limits:
if send_large_photos_as_documents:
# Send as document instead
_LOGGER.info("Photo %s, sending as document", reason)
return await self._send_telegram_document(
session, token, chat_id, data, "photo.jpg",
caption, reply_to_message_id, parse_mode, url, None, cache_key
)
else:
# Skip oversized photo
_LOGGER.warning("Photo %s, skipping (set send_large_photos_as_documents=true to send as document)", reason)
return {
"success": False,
"error": f"Photo {reason}",
"skipped": True,
}
# Build multipart form
form = FormData()
form.add_field("chat_id", chat_id)
form.add_field("photo", data, filename="photo.jpg", content_type=content_type)
form.add_field("parse_mode", parse_mode)
if caption:
form.add_field("caption", caption)
if reply_to_message_id:
form.add_field("reply_to_message_id", str(reply_to_message_id))
# Send to Telegram
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendPhoto"
_LOGGER.debug("Uploading photo to Telegram")
async with session.post(telegram_url, data=form) as response:
result = await response.json()
_LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
if response.status == 200 and result.get("ok"):
# Extract and cache file_id
photos = result.get("result", {}).get("photo", [])
if photos and effective_cache and effective_cache_key:
# Use the largest photo's file_id
file_id = photos[-1].get("file_id")
if file_id:
await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash)
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
}
else:
# Log detailed error with diagnostics
self._log_telegram_error(
error_code=result.get("error_code"),
description=result.get("description", "Unknown Telegram error"),
data=data,
media_type="photo",
)
return {
"success": False,
"error": result.get("description", "Unknown Telegram error"),
"error_code": result.get("error_code"),
}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram photo upload failed: %s", err)
return {"success": False, "error": str(err)}
async def _send_telegram_video(
self,
session: Any,
token: str,
chat_id: str,
url: str | None,
caption: str | None = None,
reply_to_message_id: int | None = None,
parse_mode: str = "HTML",
max_asset_data_size: int | None = None,
content_type: str | None = None,
cache_key: str | None = None,
) -> ServiceResponse:
"""Send a single video to Telegram."""
import aiohttp
from aiohttp import FormData
# Use provided content type or default to video/mp4
if not content_type:
content_type = "video/mp4"
if not url:
return {"success": False, "error": "Missing 'url' for video"}
# Determine which cache to use and the cache key
effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(url, cache_key)
# Check cache for file_id
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
if cached and cached.get("file_id") and effective_cache_key:
# Use cached file_id - no download needed
file_id = cached["file_id"]
_LOGGER.debug("Using cached Telegram file_id for video (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key)
payload = {
"chat_id": chat_id,
"video": file_id,
"parse_mode": parse_mode,
}
if caption:
payload["caption"] = caption
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendVideo"
try:
async with session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
"cached": True,
}
else:
# Cache might be stale, fall through to upload
_LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description"))
except aiohttp.ClientError as err:
_LOGGER.debug("Cached file_id request failed: %s", err)
try:
# Download the video using internal URL for faster local network transfer
download_url = self.coordinator.get_internal_download_url(url)
_LOGGER.debug("Downloading video from %s", download_url[:80])
async with session.get(download_url) as resp:
if resp.status != 200:
return {
"success": False,
"error": f"Failed to download video: HTTP {resp.status}",
}
data = await resp.read()
_LOGGER.debug("Downloaded video: %d bytes", len(data))
# Check if video exceeds max size limit (user-defined limit)
if max_asset_data_size is not None and len(data) > max_asset_data_size:
_LOGGER.warning(
"Video size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
len(data), max_asset_data_size
)
return {
"success": False,
"error": f"Video size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)",
"skipped": True,
}
# Check if video exceeds Telegram's upload limit (50 MB)
if len(data) > TELEGRAM_MAX_VIDEO_SIZE:
_LOGGER.warning(
"Video size (%d bytes, %.1f MB) exceeds Telegram's %d bytes (%.0f MB) upload limit, skipping",
len(data), len(data) / (1024 * 1024),
TELEGRAM_MAX_VIDEO_SIZE, TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024),
)
return {
"success": False,
"error": f"Video size ({len(data) / (1024 * 1024):.1f} MB) exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB upload limit",
"skipped": True,
}
# Build multipart form
form = FormData()
form.add_field("chat_id", chat_id)
form.add_field("video", data, filename="video.mp4", content_type=content_type)
form.add_field("parse_mode", parse_mode)
if caption:
form.add_field("caption", caption)
if reply_to_message_id:
form.add_field("reply_to_message_id", str(reply_to_message_id))
# Send to Telegram
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendVideo"
_LOGGER.debug("Uploading video to Telegram")
async with session.post(telegram_url, data=form) as response:
result = await response.json()
_LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
if response.status == 200 and result.get("ok"):
# Extract and cache file_id
video = result.get("result", {}).get("video", {})
if video and effective_cache and effective_cache_key:
file_id = video.get("file_id")
if file_id:
await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash)
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
}
else:
# Log detailed error with diagnostics
self._log_telegram_error(
error_code=result.get("error_code"),
description=result.get("description", "Unknown Telegram error"),
data=data,
media_type="video",
)
return {
"success": False,
"error": result.get("description", "Unknown Telegram error"),
"error_code": result.get("error_code"),
}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram video upload failed: %s", err)
return {"success": False, "error": str(err)}
async def _send_telegram_document(
self,
session: Any,
token: str,
chat_id: str,
data: bytes,
filename: str = "file",
caption: str | None = None,
reply_to_message_id: int | None = None,
parse_mode: str = "HTML",
source_url: str | None = None,
content_type: str | None = None,
cache_key: str | None = None,
) -> ServiceResponse:
"""Send a file as a document to Telegram."""
import aiohttp
import mimetypes
from aiohttp import FormData
# Use provided content type or detect from filename
if not content_type:
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = "application/octet-stream"
# Determine which cache and key to use
effective_cache, effective_cache_key, effective_thumbhash = self._get_telegram_cache_and_key(source_url, cache_key)
# Check cache for file_id
if effective_cache and effective_cache_key:
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash)
if cached and cached.get("file_id") and cached.get("type") == "document":
# Use cached file_id
file_id = cached["file_id"]
_LOGGER.debug("Using cached Telegram file_id for document (key: %s)", effective_cache_key[:36] if len(effective_cache_key) > 36 else effective_cache_key)
payload = {
"chat_id": chat_id,
"document": file_id,
"parse_mode": parse_mode,
}
if caption:
payload["caption"] = caption
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendDocument"
try:
async with session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
"cached": True,
}
else:
_LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description"))
except aiohttp.ClientError as err:
_LOGGER.debug("Cached file_id request failed: %s", err)
try:
# Build multipart form
form = FormData()
form.add_field("chat_id", chat_id)
form.add_field("document", data, filename=filename, content_type=content_type)
form.add_field("parse_mode", parse_mode)
if caption:
form.add_field("caption", caption)
if reply_to_message_id:
form.add_field("reply_to_message_id", str(reply_to_message_id))
# Send to Telegram
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendDocument"
_LOGGER.debug("Uploading document to Telegram (%d bytes, %s)", len(data), content_type)
async with session.post(telegram_url, data=form) as response:
result = await response.json()
_LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
if response.status == 200 and result.get("ok"):
# Extract and cache file_id
if effective_cache_key and effective_cache:
document = result.get("result", {}).get("document", {})
file_id = document.get("file_id")
if file_id:
await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash)
return {
"success": True,
"message_id": result.get("result", {}).get("message_id"),
}
else:
# Log detailed error with diagnostics
self._log_telegram_error(
error_code=result.get("error_code"),
description=result.get("description", "Unknown Telegram error"),
data=data,
media_type="document",
)
return {
"success": False,
"error": result.get("description", "Unknown Telegram error"),
"error_code": result.get("error_code"),
}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram document upload failed: %s", err)
return {"success": False, "error": str(err)}
async def _send_telegram_media_group(
self,
session: Any,
token: str,
chat_id: str,
assets: list[dict[str, str]],
caption: str | None = None,
reply_to_message_id: int | None = None,
max_group_size: int = 10,
chunk_delay: int = 0,
parse_mode: str = "HTML",
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
) -> ServiceResponse:
"""Send media assets to Telegram as media group(s).
If assets list exceeds max_group_size, splits into multiple media groups.
For chunks with single items, uses sendPhoto/sendVideo APIs.
Applies chunk_delay (in milliseconds) between groups if specified.
"""
import json
import asyncio
import aiohttp
from aiohttp import FormData
# Split assets into chunks based on max_group_size
chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)]
all_message_ids = []
_LOGGER.debug("Sending %d media items in %d chunk(s) of max %d items (delay: %dms)",
len(assets), len(chunks), max_group_size, chunk_delay)
for chunk_idx, chunk in enumerate(chunks):
# Add delay before sending subsequent chunks
if chunk_idx > 0 and chunk_delay > 0:
delay_seconds = chunk_delay / 1000
_LOGGER.debug("Waiting %dms (%ss) before sending chunk %d/%d",
chunk_delay, delay_seconds, chunk_idx + 1, len(chunks))
await asyncio.sleep(delay_seconds)
# Optimize: Use single-item APIs for chunks with 1 item
if len(chunk) == 1:
item = chunk[0]
media_type = item.get("type", "document")
url = item.get("url")
item_content_type = item.get("content_type")
item_cache_key = item.get("cache_key")
# Only apply caption and reply_to to the first chunk
chunk_caption = caption if chunk_idx == 0 else None
chunk_reply_to = reply_to_message_id if chunk_idx == 0 else None
result = None
if media_type == "photo":
_LOGGER.debug("Sending chunk %d/%d as single photo", chunk_idx + 1, len(chunks))
result = await self._send_telegram_photo(
session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode,
max_asset_data_size, send_large_photos_as_documents, item_content_type, item_cache_key
)
elif media_type == "video":
_LOGGER.debug("Sending chunk %d/%d as single video", chunk_idx + 1, len(chunks))
result = await self._send_telegram_video(
session, token, chat_id, url, chunk_caption, chunk_reply_to, parse_mode,
max_asset_data_size, item_content_type, item_cache_key
)
else: # document
_LOGGER.debug("Sending chunk %d/%d as single document", chunk_idx + 1, len(chunks))
if not url:
return {"success": False, "error": "Missing 'url' for document", "failed_at_chunk": chunk_idx + 1}
try:
download_url = self.coordinator.get_internal_download_url(url)
async with session.get(download_url) as resp:
if resp.status != 200:
return {"success": False, "error": f"Failed to download media: HTTP {resp.status}", "failed_at_chunk": chunk_idx + 1}
data = await resp.read()
if max_asset_data_size is not None and len(data) > max_asset_data_size:
_LOGGER.warning("Media size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping", len(data), max_asset_data_size)
continue
filename = url.split("/")[-1].split("?")[0] or "file"
result = await self._send_telegram_document(
session, token, chat_id, data, filename, chunk_caption, chunk_reply_to, parse_mode,
url, item_content_type, item_cache_key
)
except aiohttp.ClientError as err:
return {"success": False, "error": f"Failed to download media: {err}", "failed_at_chunk": chunk_idx + 1}
if result is None:
# Document was skipped (e.g., exceeded max_asset_data_size)
continue
if not result.get("success"):
result["failed_at_chunk"] = chunk_idx + 1
return result
all_message_ids.append(result.get("message_id"))
continue
# Multi-item chunk: use sendMediaGroup
_LOGGER.debug("Sending chunk %d/%d as media group (%d items)", chunk_idx + 1, len(chunks), len(chunk))
# Helper to get the appropriate cache for a cache key
def get_cache_for_key(key: str, is_asset: bool | None = None) -> TelegramFileCache | None:
"""Return asset cache if key is a UUID, otherwise URL cache."""
if is_asset is None:
is_asset = _is_asset_id(key)
return self.coordinator.telegram_asset_cache if is_asset else self.coordinator.telegram_cache
# Collect media items - either from cache (file_id) or by downloading
# Each item: (type, media_ref, filename, cache_key, is_cached, content_type)
# media_ref is either file_id (str) or data (bytes)
media_items: list[tuple[str, str | bytes, str, str, bool, str | None]] = []
oversized_photos: list[tuple[bytes, str | None, str, str | None]] = [] # (data, caption, url, cache_key)
documents_to_send: list[tuple[bytes, str | None, str, str | None, str, str | None]] = [] # (data, caption, url, cache_key, filename, content_type)
skipped_count = 0
for i, item in enumerate(chunk):
url = item.get("url")
if not url:
return {
"success": False,
"error": f"Missing 'url' in item {chunk_idx * max_group_size + i}",
}
media_type = item.get("type", "document")
item_content_type = item.get("content_type")
# Determine cache key: custom cache_key -> extracted asset ID -> URL
custom_cache_key = item.get("cache_key")
extracted_asset_id = _extract_asset_id_from_url(url) if not custom_cache_key else None
item_cache_key = custom_cache_key or extracted_asset_id or url
if media_type not in ("photo", "video", "document"):
return {
"success": False,
"error": f"Invalid type '{media_type}' in item {chunk_idx * max_group_size + i}. Must be 'photo', 'video', or 'document'.",
}
# Documents can't be in media groups - collect them for separate sending
if media_type == "document":
try:
download_url = self.coordinator.get_internal_download_url(url)
async with session.get(download_url) as resp:
if resp.status != 200:
return {
"success": False,
"error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}",
}
data = await resp.read()
if max_asset_data_size is not None and len(data) > max_asset_data_size:
_LOGGER.warning(
"Media %d size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
chunk_idx * max_group_size + i, len(data), max_asset_data_size
)
skipped_count += 1
continue
# Caption only on first item of first chunk if no media items yet
doc_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 and len(documents_to_send) == 0 else None
filename = url.split("/")[-1].split("?")[0] or f"file_{i}"
documents_to_send.append((data, doc_caption, url, custom_cache_key, filename, item_content_type))
except aiohttp.ClientError as err:
return {
"success": False,
"error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}",
}
continue
# Check cache first for photos/videos
is_asset = _is_asset_id(item_cache_key)
item_cache = get_cache_for_key(item_cache_key, is_asset)
item_thumbhash = self.coordinator.get_asset_thumbhash(item_cache_key) if is_asset else None
cached = item_cache.get(item_cache_key, thumbhash=item_thumbhash) if item_cache else None
if cached and cached.get("file_id"):
# Use cached file_id
ext = "jpg" if media_type == "photo" else "mp4"
filename = f"media_{chunk_idx * max_group_size + i}.{ext}"
media_items.append((media_type, cached["file_id"], filename, item_cache_key, True, item_content_type))
_LOGGER.debug("Using cached file_id for media %d", chunk_idx * max_group_size + i)
continue
try:
# Download using internal URL for faster local network transfer
download_url = self.coordinator.get_internal_download_url(url)
_LOGGER.debug("Downloading media %d from %s", chunk_idx * max_group_size + i, download_url[:80])
async with session.get(download_url) as resp:
if resp.status != 200:
return {
"success": False,
"error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}",
}
data = await resp.read()
_LOGGER.debug("Downloaded media %d: %d bytes", chunk_idx * max_group_size + i, len(data))
# Check if media exceeds max_asset_data_size limit (user-defined limit for skipping)
if max_asset_data_size is not None and len(data) > max_asset_data_size:
_LOGGER.warning(
"Media %d size (%d bytes) exceeds max_asset_data_size limit (%d bytes), skipping",
chunk_idx * max_group_size + i, len(data), max_asset_data_size
)
skipped_count += 1
continue
# For videos, check Telegram upload limit (50 MB)
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
_LOGGER.warning(
"Video %d size (%d bytes, %.1f MB) exceeds Telegram's %.0f MB upload limit, skipping",
chunk_idx * max_group_size + i, len(data),
len(data) / (1024 * 1024),
TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024),
)
skipped_count += 1
continue
# For photos, check Telegram limits
if media_type == "photo":
exceeds_limits, reason, width, height = self._check_telegram_photo_limits(data)
if exceeds_limits:
if send_large_photos_as_documents:
# Separate this photo to send as document later
# Caption only on first item of first chunk
photo_caption = caption if chunk_idx == 0 and i == 0 and len(media_items) == 0 else None
oversized_photos.append((data, photo_caption, url, custom_cache_key))
_LOGGER.info("Photo %d %s, will send as document", i, reason)
continue
else:
# Skip oversized photo
_LOGGER.warning("Photo %d %s, skipping (set send_large_photos_as_documents=true to send as document)", i, reason)
skipped_count += 1
continue
ext = "jpg" if media_type == "photo" else "mp4"
filename = f"media_{chunk_idx * max_group_size + i}.{ext}"
media_items.append((media_type, data, filename, item_cache_key, False, item_content_type))
except aiohttp.ClientError as err:
return {
"success": False,
"error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}",
}
# Skip this chunk if all files were filtered out
if not media_items and not oversized_photos and not documents_to_send:
_LOGGER.info("Chunk %d/%d: all %d media items skipped",
chunk_idx + 1, len(chunks), len(chunk))
continue
# Split media items into sub-groups respecting Telegram's upload size limit
# This ensures the total upload data per sendMediaGroup call stays under 50 MB
if media_items:
media_sub_groups = _split_media_by_upload_size(media_items, TELEGRAM_MAX_VIDEO_SIZE)
if len(media_sub_groups) > 1:
_LOGGER.debug(
"Chunk %d/%d: split %d media items into %d sub-groups by upload size",
chunk_idx + 1, len(chunks), len(media_items), len(media_sub_groups),
)
first_caption_used = False
for sub_idx, sub_group_items in enumerate(media_sub_groups):
is_first = chunk_idx == 0 and sub_idx == 0
sub_caption = caption if is_first and not first_caption_used and not oversized_photos else None
sub_reply_to = reply_to_message_id if is_first else None
# Add delay between sub-groups (not before the first one)
if sub_idx > 0 and chunk_delay > 0:
await asyncio.sleep(chunk_delay / 1000)
# Single item - use sendPhoto/sendVideo (sendMediaGroup requires 2+ items)
if len(sub_group_items) == 1:
sg_type, sg_ref, sg_fname, sg_ck, sg_cached, sg_ct = sub_group_items[0]
if sg_type == "photo":
api_method = "sendPhoto"
media_field = "photo"
else:
api_method = "sendVideo"
media_field = "video"
try:
if sg_cached:
sg_payload: dict[str, Any] = {
"chat_id": chat_id,
media_field: sg_ref,
"parse_mode": parse_mode,
}
if sub_caption:
sg_payload["caption"] = sub_caption
if sub_reply_to:
sg_payload["reply_to_message_id"] = sub_reply_to
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}"
async with session.post(telegram_url, json=sg_payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
all_message_ids.append(result["result"].get("message_id"))
if sub_caption:
first_caption_used = True
else:
_LOGGER.debug("Cached file_id failed in sub-group, will re-upload: %s", result.get("description"))
sg_cached = False # Fall through to upload
if not sg_cached:
sg_form = FormData()
sg_form.add_field("chat_id", chat_id)
sg_content_type = sg_ct or ("image/jpeg" if sg_type == "photo" else "video/mp4")
sg_form.add_field(media_field, sg_ref, filename=sg_fname, content_type=sg_content_type)
sg_form.add_field("parse_mode", parse_mode)
if sub_caption:
sg_form.add_field("caption", sub_caption)
if sub_reply_to:
sg_form.add_field("reply_to_message_id", str(sub_reply_to))
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/{api_method}"
async with session.post(telegram_url, data=sg_form) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
all_message_ids.append(result["result"].get("message_id"))
if sub_caption:
first_caption_used = True
# Cache the uploaded file_id
sg_is_asset = _is_asset_id(sg_ck)
sg_cache = get_cache_for_key(sg_ck, sg_is_asset)
if sg_cache:
sg_thumbhash = self.coordinator.get_asset_thumbhash(sg_ck) if sg_is_asset else None
result_data = result.get("result", {})
if sg_type == "photo":
photos = result_data.get("photo", [])
if photos:
await sg_cache.async_set(sg_ck, photos[-1].get("file_id"), "photo", thumbhash=sg_thumbhash)
elif sg_type == "video":
video = result_data.get("video", {})
if video.get("file_id"):
await sg_cache.async_set(sg_ck, video["file_id"], "video", thumbhash=sg_thumbhash)
else:
self._log_telegram_error(
error_code=result.get("error_code"),
description=result.get("description", "Unknown Telegram error"),
data=sg_ref if isinstance(sg_ref, bytes) else None,
media_type=sg_type,
)
return {
"success": False,
"error": result.get("description", "Unknown Telegram error"),
"error_code": result.get("error_code"),
"failed_at_chunk": chunk_idx + 1,
}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err)
return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
continue
# Multiple items - use sendMediaGroup
all_cached = all(is_cached for _, _, _, _, is_cached, _ in sub_group_items)
if all_cached:
_LOGGER.debug("Sub-group %d/%d: all %d items cached, using file_ids",
sub_idx + 1, len(media_sub_groups), len(sub_group_items))
media_json = []
for i, (media_type, file_id, _, _, _, _) in enumerate(sub_group_items):
media_item_json: dict[str, Any] = {
"type": media_type,
"media": file_id,
}
if i == 0 and sub_caption and not first_caption_used:
media_item_json["caption"] = sub_caption
media_item_json["parse_mode"] = parse_mode
media_json.append(media_item_json)
payload: dict[str, Any] = {
"chat_id": chat_id,
"media": media_json,
}
if sub_reply_to:
payload["reply_to_message_id"] = sub_reply_to
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup"
try:
async with session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
chunk_message_ids = [
msg.get("message_id") for msg in result.get("result", [])
]
all_message_ids.extend(chunk_message_ids)
if sub_caption:
first_caption_used = True
else:
# Cache might be stale - fall through to upload path
_LOGGER.debug("Cached file_ids failed, will re-upload: %s", result.get("description"))
all_cached = False # Force re-upload
except aiohttp.ClientError as err:
_LOGGER.debug("Cached file_ids request failed: %s", err)
all_cached = False
if not all_cached:
# Build multipart form with mix of cached file_ids and uploaded data
form = FormData()
form.add_field("chat_id", chat_id)
if sub_reply_to:
form.add_field("reply_to_message_id", str(sub_reply_to))
# Build media JSON - use file_id for cached, attach:// for uploaded
media_json = []
upload_idx = 0
# (cache_key, result_idx, type, is_asset, thumbhash)
keys_to_cache: list[tuple[str, int, str, bool, str | None]] = []
for i, (media_type, media_ref, filename, item_cache_key, is_cached, item_content_type) in enumerate(sub_group_items):
if is_cached:
# Use file_id directly
media_item_json: dict[str, Any] = {
"type": media_type,
"media": media_ref, # file_id
}
else:
# Upload this file
attach_name = f"file{upload_idx}"
media_item_json = {
"type": media_type,
"media": f"attach://{attach_name}",
}
# Use provided content_type or default based on media type
content_type = item_content_type or ("image/jpeg" if media_type == "photo" else "video/mp4")
form.add_field(attach_name, media_ref, filename=filename, content_type=content_type)
ck_is_asset = _is_asset_id(item_cache_key)
ck_thumbhash = self.coordinator.get_asset_thumbhash(item_cache_key) if ck_is_asset else None
keys_to_cache.append((item_cache_key, i, media_type, ck_is_asset, ck_thumbhash))
upload_idx += 1
if i == 0 and sub_caption and not first_caption_used:
media_item_json["caption"] = sub_caption
media_item_json["parse_mode"] = parse_mode
media_json.append(media_item_json)
form.add_field("media", json.dumps(media_json))
# Send to Telegram
telegram_url = f"{TELEGRAM_API_BASE_URL}{token}/sendMediaGroup"
try:
_LOGGER.debug("Uploading media group sub-group %d/%d (%d files, %d cached) to Telegram",
sub_idx + 1, len(media_sub_groups), len(sub_group_items), len(sub_group_items) - upload_idx)
async with session.post(telegram_url, data=form) as response:
result = await response.json()
_LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
if response.status == 200 and result.get("ok"):
chunk_message_ids = [
msg.get("message_id") for msg in result.get("result", [])
]
all_message_ids.extend(chunk_message_ids)
if sub_caption:
first_caption_used = True
# Cache the newly uploaded file_ids (batched per cache instance)
if keys_to_cache:
result_messages = result.get("result", [])
# Group entries by cache instance for batch writes
cache_batches: dict[int, tuple[TelegramFileCache, list[tuple[str, str, str, str | None]]]] = {}
for ck, result_idx, m_type, ck_is_asset, ck_thumbhash in keys_to_cache:
ck_cache = get_cache_for_key(ck, ck_is_asset)
if result_idx >= len(result_messages) or not ck_cache:
continue
msg = result_messages[result_idx]
file_id = None
if m_type == "photo":
photos = msg.get("photo", [])
if photos:
file_id = photos[-1].get("file_id")
elif m_type == "video":
video = msg.get("video", {})
file_id = video.get("file_id")
if file_id:
cache_id = id(ck_cache)
if cache_id not in cache_batches:
cache_batches[cache_id] = (ck_cache, [])
cache_batches[cache_id][1].append((ck, file_id, m_type, ck_thumbhash))
for ck_cache, batch_entries in cache_batches.values():
await ck_cache.async_set_many(batch_entries)
else:
# Log detailed error for media group with total size info
uploaded_data = [m for m in sub_group_items if not m[4]]
total_size = sum(len(d) for _, d, _, _, _, _ in uploaded_data if isinstance(d, bytes))
_LOGGER.error(
"Telegram API error for sub-group %d/%d: %s | Media count: %d | Uploaded size: %d bytes (%.2f MB)",
sub_idx + 1, len(media_sub_groups),
result.get("description", "Unknown Telegram error"),
len(sub_group_items),
total_size,
total_size / (1024 * 1024) if total_size else 0
)
# Log detailed diagnostics for the first photo in the group
for media_type, media_ref, _, _, is_cached, _ in sub_group_items:
if media_type == "photo" and not is_cached and isinstance(media_ref, bytes):
self._log_telegram_error(
error_code=result.get("error_code"),
description=result.get("description", "Unknown Telegram error"),
data=media_ref,
media_type="photo",
)
break # Only log details for first photo
return {
"success": False,
"error": result.get("description", "Unknown Telegram error"),
"error_code": result.get("error_code"),
"failed_at_chunk": chunk_idx + 1,
}
except aiohttp.ClientError as err:
_LOGGER.error("Telegram upload failed for sub-group %d: %s", sub_idx + 1, err)
return {
"success": False,
"error": str(err),
"failed_at_chunk": chunk_idx + 1,
}
# Send oversized photos as documents
for i, (data, photo_caption, photo_url, photo_cache_key) in enumerate(oversized_photos):
_LOGGER.debug("Sending oversized photo %d/%d as document", i + 1, len(oversized_photos))
result = await self._send_telegram_document(
session, token, chat_id, data, f"photo_{i}.jpg",
photo_caption, None, parse_mode, photo_url, None, photo_cache_key
)
if result.get("success"):
all_message_ids.append(result.get("message_id"))
else:
_LOGGER.error("Failed to send oversized photo as document: %s", result.get("error"))
# Continue with other photos even if one fails
# Send documents (can't be in media groups)
for i, (data, doc_caption, doc_url, doc_cache_key, filename, doc_content_type) in enumerate(documents_to_send):
_LOGGER.debug("Sending document %d/%d", i + 1, len(documents_to_send))
result = await self._send_telegram_document(
session, token, chat_id, data, filename,
doc_caption, None, parse_mode, doc_url, doc_content_type, doc_cache_key
)
if result.get("success"):
all_message_ids.append(result.get("message_id"))
else:
_LOGGER.error("Failed to send document: %s", result.get("error"))
# Continue with other documents even if one fails
return {
"success": True,
"message_ids": all_message_ids,
"chunks_sent": len(chunks),
}
class ImmichAlbumIdSensor(ImmichAlbumBaseSensor):
"""Sensor exposing the Immich album ID."""
_attr_icon = "mdi:identifier"
_attr_translation_key = "album_id"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_album_id"
@property
def native_value(self) -> str | None:
"""Return the album ID."""
if self._album_data:
return self._album_data.id
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs: dict[str, Any] = {
ATTR_ALBUM_NAME: self._album_data.name,
ATTR_ASSET_COUNT: self._album_data.asset_count,
ATTR_LAST_UPDATED: self._album_data.updated_at,
ATTR_CREATED_AT: self._album_data.created_at,
}
# Primary share URL (prefers public, falls back to protected)
share_url = self.coordinator.get_any_url()
if share_url:
attrs["share_url"] = share_url
return attrs
class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album asset count."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:image-album"
_attr_translation_key = "album_asset_count"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_asset_count"
@property
def native_value(self) -> int | None:
"""Return the state of the sensor (asset count)."""
if self._album_data:
return self._album_data.asset_count
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_ASSET_COUNT: self._album_data.asset_count,
ATTR_PHOTO_COUNT: self._album_data.photo_count,
ATTR_VIDEO_COUNT: self._album_data.video_count,
ATTR_LAST_UPDATED: self._album_data.updated_at,
ATTR_CREATED_AT: self._album_data.created_at,
ATTR_SHARED: self._album_data.shared,
ATTR_OWNER: self._album_data.owner,
ATTR_PEOPLE: list(self._album_data.people),
}
if self._album_data.thumbnail_asset_id:
attrs[ATTR_THUMBNAIL_URL] = (
f"{self.coordinator.immich_url}/api/assets/"
f"{self._album_data.thumbnail_asset_id}/thumbnail"
)
return attrs
class ImmichAlbumPhotoCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album photo count."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:image"
_attr_translation_key = "album_photo_count"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_photo_count"
@property
def native_value(self) -> int | None:
"""Return the state of the sensor (photo count)."""
if self._album_data:
return self._album_data.photo_count
return None
class ImmichAlbumVideoCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album video count."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:video"
_attr_translation_key = "album_video_count"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_video_count"
@property
def native_value(self) -> int | None:
"""Return the state of the sensor (video count)."""
if self._album_data:
return self._album_data.video_count
return None
class ImmichAlbumLastUpdatedSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album last updated time."""
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_icon = "mdi:clock-outline"
_attr_translation_key = "album_last_updated"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_last_updated"
@property
def native_value(self) -> datetime | None:
"""Return the state of the sensor (last updated datetime)."""
if self._album_data and self._album_data.updated_at:
try:
return datetime.fromisoformat(
self._album_data.updated_at.replace("Z", "+00:00")
)
except ValueError:
return None
return None
class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album creation date."""
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_icon = "mdi:calendar-plus"
_attr_translation_key = "album_created"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_created"
@property
def native_value(self) -> datetime | None:
"""Return the state of the sensor (creation datetime)."""
if self._album_data and self._album_data.created_at:
try:
return datetime.fromisoformat(
self._album_data.created_at.replace("Z", "+00:00")
)
except ValueError:
return None
return None
class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album public URL."""
_attr_icon = "mdi:link-variant"
_attr_translation_key = "album_public_url"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_public_url"
@property
def native_value(self) -> str | None:
"""Return the state of the sensor (public URL)."""
if self._album_data:
return self.coordinator.get_public_url()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_SHARED: self._album_data.shared,
}
all_urls = self.coordinator.get_public_urls()
if len(all_urls) > 1:
attrs[ATTR_ALBUM_URLS] = all_urls
links_info = self.coordinator.get_shared_links_info()
if links_info:
attrs["shared_links"] = links_info
return attrs
class ImmichAlbumProtectedUrlSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album password-protected URL."""
_attr_icon = "mdi:link-lock"
_attr_translation_key = "album_protected_url"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_protected_url"
@property
def native_value(self) -> str | None:
"""Return the state of the sensor (protected URL)."""
if self._album_data:
return self.coordinator.get_protected_url()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
}
all_urls = self.coordinator.get_protected_urls()
if len(all_urls) > 1:
attrs["protected_urls"] = all_urls
return attrs
class ImmichAlbumProtectedPasswordSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album protected link password."""
_attr_icon = "mdi:key"
_attr_translation_key = "album_protected_password"
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_protected_password"
@property
def native_value(self) -> str | None:
"""Return the state of the sensor (protected link password)."""
if self._album_data:
return self.coordinator.get_protected_password()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return extra state attributes."""
if not self._album_data:
return {}
return {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_ALBUM_PROTECTED_URL: self.coordinator.get_protected_url(),
}