"""Sensor platform for Immich Album Watcher.""" from __future__ import annotations import logging from datetime import datetime from typing import Any import voluptuous as vol from homeassistant.components.sensor import ( SensorDeviceClass, SensorEntity, SensorStateClass, ) from homeassistant.config_entries import ConfigEntry, ConfigSubentry from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse, callback from homeassistant.helpers import entity_platform from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.device_registry import DeviceEntryType from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.util import slugify from immich_watcher_core.models import AlbumData from immich_watcher_core.telegram.client import TelegramClient from .const import ( ATTR_ALBUM_ID, ATTR_ALBUM_NAME, ATTR_ALBUM_PROTECTED_URL, ATTR_ALBUM_URLS, ATTR_ASSET_COUNT, ATTR_CREATED_AT, ATTR_LAST_UPDATED, ATTR_OWNER, ATTR_PEOPLE, ATTR_PHOTO_COUNT, ATTR_SHARED, ATTR_THUMBNAIL_URL, ATTR_VIDEO_COUNT, CONF_ALBUM_ID, CONF_ALBUM_NAME, CONF_HUB_NAME, CONF_TELEGRAM_BOT_TOKEN, DOMAIN, SERVICE_GET_ASSETS, SERVICE_REFRESH, SERVICE_SEND_TELEGRAM_NOTIFICATION, ) from .coordinator import ImmichAlbumWatcherCoordinator from .storage import NotificationQueue _LOGGER = logging.getLogger(__name__) async def async_setup_entry( hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: """Set up sensor entities for all album subentries.""" entry_data = hass.data[DOMAIN][entry.entry_id] for subentry_id, subentry in entry.subentries.items(): subentry_data = entry_data["subentries"].get(subentry_id) if not subentry_data: continue coordinator = subentry_data.coordinator entities = [ ImmichAlbumIdSensor(coordinator, entry, subentry), ImmichAlbumAssetCountSensor(coordinator, entry, subentry), ImmichAlbumPhotoCountSensor(coordinator, entry, subentry), ImmichAlbumVideoCountSensor(coordinator, entry, subentry), ImmichAlbumLastUpdatedSensor(coordinator, entry, subentry), ImmichAlbumCreatedSensor(coordinator, entry, subentry), ImmichAlbumPublicUrlSensor(coordinator, entry, subentry), ImmichAlbumProtectedUrlSensor(coordinator, entry, subentry), ImmichAlbumProtectedPasswordSensor(coordinator, entry, subentry), ] async_add_entities(entities, config_subentry_id=subentry_id) # Register entity services platform = entity_platform.async_get_current_platform() platform.async_register_entity_service( SERVICE_REFRESH, {}, "async_refresh_album", ) platform.async_register_entity_service( SERVICE_GET_ASSETS, { vol.Optional("limit", default=10): vol.All( vol.Coerce(int), vol.Range(min=1, max=100) ), vol.Optional("offset", default=0): vol.All( vol.Coerce(int), vol.Range(min=0) ), vol.Optional("favorite_only", default=False): bool, vol.Optional("filter_min_rating", default=1): vol.All( vol.Coerce(int), vol.Range(min=1, max=5) ), vol.Optional("order_by", default="date"): vol.In( ["date", "rating", "name", "random"] ), vol.Optional("order", default="descending"): vol.In( ["ascending", "descending"] ), vol.Optional("asset_type", default="all"): vol.In(["all", "photo", "video"]), vol.Optional("min_date"): str, vol.Optional("max_date"): str, vol.Optional("memory_date"): str, vol.Optional("city"): str, vol.Optional("state"): str, vol.Optional("country"): str, }, "async_get_assets", supports_response=SupportsResponse.ONLY, ) platform.async_register_entity_service( SERVICE_SEND_TELEGRAM_NOTIFICATION, { vol.Optional("bot_token"): str, vol.Required("chat_id"): vol.Coerce(str), vol.Optional("assets"): list, vol.Optional("caption"): str, vol.Optional("reply_to_message_id"): vol.Coerce(int), vol.Optional("disable_web_page_preview"): bool, vol.Optional("parse_mode", default="HTML"): str, vol.Optional("max_group_size", default=10): vol.All( vol.Coerce(int), vol.Range(min=2, max=10) ), vol.Optional("chunk_delay", default=0): vol.All( vol.Coerce(int), vol.Range(min=0, max=60000) ), vol.Optional("wait_for_response", default=True): bool, vol.Optional("max_asset_data_size"): vol.All( vol.Coerce(int), vol.Range(min=1, max=52428800) ), vol.Optional("send_large_photos_as_documents", default=False): bool, vol.Optional("chat_action", default="typing"): vol.Any( None, vol.In(["", "typing", "upload_photo", "upload_video", "upload_document"]) ), vol.Optional("quiet_hours_start"): vol.Match(r"^\d{2}:\d{2}$"), vol.Optional("quiet_hours_end"): vol.Match(r"^\d{2}:\d{2}$"), }, "async_send_telegram_notification", supports_response=SupportsResponse.OPTIONAL, ) class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], SensorEntity): """Base sensor for Immich album.""" _attr_has_entity_name = True def __init__( self, coordinator: ImmichAlbumWatcherCoordinator, entry: ConfigEntry, subentry: ConfigSubentry, ) -> None: """Initialize the sensor.""" super().__init__(coordinator) self._entry = entry self._subentry = subentry self._album_id = subentry.data[CONF_ALBUM_ID] self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album") self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich") self._unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}") @property def _album_data(self) -> AlbumData | None: """Get the album data from coordinator.""" return self.coordinator.data @property def available(self) -> bool: """Return if entity is available.""" return self.coordinator.last_update_success and self._album_data is not None @property def device_info(self) -> DeviceInfo: """Return device info - one device per album.""" return DeviceInfo( identifiers={(DOMAIN, self._subentry.subentry_id)}, name=self._album_name, manufacturer="Immich", entry_type=DeviceEntryType.SERVICE, ) @callback def _handle_coordinator_update(self) -> None: """Handle updated data from the coordinator.""" self.async_write_ha_state() async def async_refresh_album(self) -> None: """Refresh data for this album.""" await self.coordinator.async_refresh_now() async def async_get_assets( self, limit: int = 10, offset: int = 0, favorite_only: bool = False, filter_min_rating: int = 1, order_by: str = "date", order: str = "descending", asset_type: str = "all", min_date: str | None = None, max_date: str | None = None, memory_date: str | None = None, city: str | None = None, state: str | None = None, country: str | None = None, ) -> ServiceResponse: """Get assets for this album with optional filtering and ordering.""" assets = await self.coordinator.async_get_assets( limit=limit, offset=offset, favorite_only=favorite_only, filter_min_rating=filter_min_rating, order_by=order_by, order=order, asset_type=asset_type, min_date=min_date, max_date=max_date, memory_date=memory_date, city=city, state=state, country=country, ) return {"assets": assets} @staticmethod def _is_quiet_hours(start_str: str | None, end_str: str | None) -> bool: """Check if current time is within quiet hours.""" from datetime import time as dt_time from homeassistant.util import dt as dt_util if not start_str or not end_str: return False try: now = dt_util.now().time() start_time = dt_time.fromisoformat(start_str) end_time = dt_time.fromisoformat(end_str) except ValueError: return False if start_time <= end_time: return start_time <= now < end_time else: return now >= start_time or now < end_time async def async_send_telegram_notification( self, chat_id: str, assets: list[dict[str, str]] | None = None, bot_token: str | None = None, caption: str | None = None, reply_to_message_id: int | None = None, disable_web_page_preview: bool | None = None, parse_mode: str = "HTML", max_group_size: int = 10, chunk_delay: int = 0, wait_for_response: bool = True, max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False, chat_action: str | None = "typing", quiet_hours_start: str | None = None, quiet_hours_end: str | None = None, ) -> ServiceResponse: """Send notification to Telegram.""" # Check quiet hours — queue notification if active if self._is_quiet_hours(quiet_hours_start, quiet_hours_end): from . import _register_queue_timers queue: NotificationQueue = self.hass.data[DOMAIN][self._entry.entry_id]["notification_queue"] await queue.async_enqueue({ "entity_id": self.entity_id, "chat_id": chat_id, "assets": assets, "bot_token": bot_token, "caption": caption, "reply_to_message_id": reply_to_message_id, "disable_web_page_preview": disable_web_page_preview, "parse_mode": parse_mode, "max_group_size": max_group_size, "chunk_delay": chunk_delay, "max_asset_data_size": max_asset_data_size, "send_large_photos_as_documents": send_large_photos_as_documents, "chat_action": chat_action, "quiet_hours_start": quiet_hours_start, "quiet_hours_end": quiet_hours_end, }) _register_queue_timers(self.hass, self._entry) return {"success": True, "status": "queued_quiet_hours"} # If non-blocking mode, create a background task and return immediately if not wait_for_response: self.hass.async_create_task( self._execute_telegram_notification( chat_id=chat_id, assets=assets, bot_token=bot_token, caption=caption, reply_to_message_id=reply_to_message_id, disable_web_page_preview=disable_web_page_preview, parse_mode=parse_mode, max_group_size=max_group_size, chunk_delay=chunk_delay, max_asset_data_size=max_asset_data_size, send_large_photos_as_documents=send_large_photos_as_documents, chat_action=chat_action, ) ) return {"success": True, "status": "queued", "message": "Notification queued for background processing"} return await self._execute_telegram_notification( chat_id=chat_id, assets=assets, bot_token=bot_token, caption=caption, reply_to_message_id=reply_to_message_id, disable_web_page_preview=disable_web_page_preview, parse_mode=parse_mode, max_group_size=max_group_size, chunk_delay=chunk_delay, max_asset_data_size=max_asset_data_size, send_large_photos_as_documents=send_large_photos_as_documents, chat_action=chat_action, ) async def _execute_telegram_notification( self, chat_id: str, assets: list[dict[str, str]] | None = None, bot_token: str | None = None, caption: str | None = None, reply_to_message_id: int | None = None, disable_web_page_preview: bool | None = None, parse_mode: str = "HTML", max_group_size: int = 10, chunk_delay: int = 0, max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False, chat_action: str | None = "typing", ) -> ServiceResponse: """Execute the Telegram notification using core TelegramClient.""" # Get bot token from parameter or config token = bot_token or self._entry.options.get(CONF_TELEGRAM_BOT_TOKEN) if not token: return { "success": False, "error": "No bot token provided. Set it in integration options or pass as parameter.", } session = async_get_clientsession(self.hass) # Create core TelegramClient with HA-managed session and coordinator caches telegram = TelegramClient( session, token, url_cache=self.coordinator.telegram_cache, asset_cache=self.coordinator.telegram_asset_cache, url_resolver=self.coordinator.get_internal_download_url, thumbhash_resolver=self.coordinator.get_asset_thumbhash, ) return await telegram.send_notification( chat_id=chat_id, assets=assets, caption=caption, reply_to_message_id=reply_to_message_id, disable_web_page_preview=disable_web_page_preview, parse_mode=parse_mode, max_group_size=max_group_size, chunk_delay=chunk_delay, max_asset_data_size=max_asset_data_size, send_large_photos_as_documents=send_large_photos_as_documents, chat_action=chat_action, ) class ImmichAlbumIdSensor(ImmichAlbumBaseSensor): """Sensor exposing the Immich album ID.""" _attr_icon = "mdi:identifier" _attr_translation_key = "album_id" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_album_id" @property def native_value(self) -> str | None: if self._album_data: return self._album_data.id return None @property def extra_state_attributes(self) -> dict[str, Any]: if not self._album_data: return {} attrs: dict[str, Any] = { ATTR_ALBUM_NAME: self._album_data.name, ATTR_ASSET_COUNT: self._album_data.asset_count, ATTR_LAST_UPDATED: self._album_data.updated_at, ATTR_CREATED_AT: self._album_data.created_at, } share_url = self.coordinator.get_any_url() if share_url: attrs["share_url"] = share_url return attrs class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album asset count.""" _attr_state_class = SensorStateClass.MEASUREMENT _attr_icon = "mdi:image-album" _attr_translation_key = "album_asset_count" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_asset_count" @property def native_value(self) -> int | None: if self._album_data: return self._album_data.asset_count return None @property def extra_state_attributes(self) -> dict[str, Any]: if not self._album_data: return {} attrs = { ATTR_ALBUM_ID: self._album_data.id, ATTR_ASSET_COUNT: self._album_data.asset_count, ATTR_PHOTO_COUNT: self._album_data.photo_count, ATTR_VIDEO_COUNT: self._album_data.video_count, ATTR_LAST_UPDATED: self._album_data.updated_at, ATTR_CREATED_AT: self._album_data.created_at, ATTR_SHARED: self._album_data.shared, ATTR_OWNER: self._album_data.owner, ATTR_PEOPLE: list(self._album_data.people), } if self._album_data.thumbnail_asset_id: attrs[ATTR_THUMBNAIL_URL] = ( f"{self.coordinator.immich_url}/api/assets/" f"{self._album_data.thumbnail_asset_id}/thumbnail" ) return attrs class ImmichAlbumPhotoCountSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album photo count.""" _attr_state_class = SensorStateClass.MEASUREMENT _attr_icon = "mdi:image" _attr_translation_key = "album_photo_count" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_photo_count" @property def native_value(self) -> int | None: if self._album_data: return self._album_data.photo_count return None class ImmichAlbumVideoCountSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album video count.""" _attr_state_class = SensorStateClass.MEASUREMENT _attr_icon = "mdi:video" _attr_translation_key = "album_video_count" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_video_count" @property def native_value(self) -> int | None: if self._album_data: return self._album_data.video_count return None class ImmichAlbumLastUpdatedSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album last updated time.""" _attr_device_class = SensorDeviceClass.TIMESTAMP _attr_icon = "mdi:clock-outline" _attr_translation_key = "album_last_updated" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_last_updated" @property def native_value(self) -> datetime | None: if self._album_data and self._album_data.updated_at: try: return datetime.fromisoformat( self._album_data.updated_at.replace("Z", "+00:00") ) except ValueError: return None return None class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album creation date.""" _attr_device_class = SensorDeviceClass.TIMESTAMP _attr_icon = "mdi:calendar-plus" _attr_translation_key = "album_created" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_created" @property def native_value(self) -> datetime | None: if self._album_data and self._album_data.created_at: try: return datetime.fromisoformat( self._album_data.created_at.replace("Z", "+00:00") ) except ValueError: return None return None class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album public URL.""" _attr_icon = "mdi:link-variant" _attr_translation_key = "album_public_url" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_public_url" @property def native_value(self) -> str | None: if self._album_data: return self.coordinator.get_public_url() return None @property def extra_state_attributes(self) -> dict[str, Any]: if not self._album_data: return {} attrs = { ATTR_ALBUM_ID: self._album_data.id, ATTR_SHARED: self._album_data.shared, } all_urls = self.coordinator.get_public_urls() if len(all_urls) > 1: attrs[ATTR_ALBUM_URLS] = all_urls links_info = self.coordinator.get_shared_links_info() if links_info: attrs["shared_links"] = links_info return attrs class ImmichAlbumProtectedUrlSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album password-protected URL.""" _attr_icon = "mdi:link-lock" _attr_translation_key = "album_protected_url" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_protected_url" @property def native_value(self) -> str | None: if self._album_data: return self.coordinator.get_protected_url() return None @property def extra_state_attributes(self) -> dict[str, Any]: if not self._album_data: return {} attrs = {ATTR_ALBUM_ID: self._album_data.id} all_urls = self.coordinator.get_protected_urls() if len(all_urls) > 1: attrs["protected_urls"] = all_urls return attrs class ImmichAlbumProtectedPasswordSensor(ImmichAlbumBaseSensor): """Sensor representing an Immich album protected link password.""" _attr_icon = "mdi:key" _attr_translation_key = "album_protected_password" def __init__(self, coordinator, entry, subentry): super().__init__(coordinator, entry, subentry) self._attr_unique_id = f"{self._unique_id_prefix}_protected_password" @property def native_value(self) -> str | None: if self._album_data: return self.coordinator.get_protected_password() return None @property def extra_state_attributes(self) -> dict[str, Any]: if not self._album_data: return {} return { ATTR_ALBUM_ID: self._album_data.id, ATTR_ALBUM_PROTECTED_URL: self.coordinator.get_protected_url(), }