Files
haos-hacs-immich-album-watcher/custom_components/immich_album_watcher/sensor.py
alexei.dolgolyov b107cfe67f
Some checks failed
Validate / Hassfest (push) Has been cancelled
Refactor HAOS integration to use shared core library (Phase 2)
Wire the integration to delegate all HA-independent logic to
immich-watcher-core, eliminating ~2300 lines of duplicated code.

Changes:
- const.py: Import shared constants from core, keep HA-specific ones
- storage.py: Create HAStorageBackend adapter wrapping HA's Store,
  use core TelegramFileCache and NotificationQueue via adapter
- coordinator.py: Delegate to core ImmichClient for API calls,
  detect_album_changes() for change detection, and asset_utils
  for filtering/sorting/URL building. Keep HA-specific event firing.
- sensor.py: Replace ~1300 lines of Telegram code with 15-line
  delegation to core TelegramClient. Keep entity classes unchanged.
- __init__.py: Use factory functions for creating core instances
  with HA storage backends
- manifest.json: Add immich-watcher-core dependency

Integration line count: 3600 -> 1295 lines (-64%)
Zero behavior changes for end users.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:47:18 +03:00

630 lines
22 KiB
Python

"""Sensor platform for Immich Album Watcher."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
import voluptuous as vol
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse, callback
from homeassistant.helpers import entity_platform
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from immich_watcher_core.models import AlbumData
from immich_watcher_core.telegram.client import TelegramClient
from .const import (
ATTR_ALBUM_ID,
ATTR_ALBUM_NAME,
ATTR_ALBUM_PROTECTED_URL,
ATTR_ALBUM_URLS,
ATTR_ASSET_COUNT,
ATTR_CREATED_AT,
ATTR_LAST_UPDATED,
ATTR_OWNER,
ATTR_PEOPLE,
ATTR_PHOTO_COUNT,
ATTR_SHARED,
ATTR_THUMBNAIL_URL,
ATTR_VIDEO_COUNT,
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_HUB_NAME,
CONF_TELEGRAM_BOT_TOKEN,
DOMAIN,
SERVICE_GET_ASSETS,
SERVICE_REFRESH,
SERVICE_SEND_TELEGRAM_NOTIFICATION,
)
from .coordinator import ImmichAlbumWatcherCoordinator
from .storage import NotificationQueue
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up sensor entities for all album subentries."""
entry_data = hass.data[DOMAIN][entry.entry_id]
for subentry_id, subentry in entry.subentries.items():
subentry_data = entry_data["subentries"].get(subentry_id)
if not subentry_data:
continue
coordinator = subentry_data.coordinator
entities = [
ImmichAlbumIdSensor(coordinator, entry, subentry),
ImmichAlbumAssetCountSensor(coordinator, entry, subentry),
ImmichAlbumPhotoCountSensor(coordinator, entry, subentry),
ImmichAlbumVideoCountSensor(coordinator, entry, subentry),
ImmichAlbumLastUpdatedSensor(coordinator, entry, subentry),
ImmichAlbumCreatedSensor(coordinator, entry, subentry),
ImmichAlbumPublicUrlSensor(coordinator, entry, subentry),
ImmichAlbumProtectedUrlSensor(coordinator, entry, subentry),
ImmichAlbumProtectedPasswordSensor(coordinator, entry, subentry),
]
async_add_entities(entities, config_subentry_id=subentry_id)
# Register entity services
platform = entity_platform.async_get_current_platform()
platform.async_register_entity_service(
SERVICE_REFRESH,
{},
"async_refresh_album",
)
platform.async_register_entity_service(
SERVICE_GET_ASSETS,
{
vol.Optional("limit", default=10): vol.All(
vol.Coerce(int), vol.Range(min=1, max=100)
),
vol.Optional("offset", default=0): vol.All(
vol.Coerce(int), vol.Range(min=0)
),
vol.Optional("favorite_only", default=False): bool,
vol.Optional("filter_min_rating", default=1): vol.All(
vol.Coerce(int), vol.Range(min=1, max=5)
),
vol.Optional("order_by", default="date"): vol.In(
["date", "rating", "name", "random"]
),
vol.Optional("order", default="descending"): vol.In(
["ascending", "descending"]
),
vol.Optional("asset_type", default="all"): vol.In(["all", "photo", "video"]),
vol.Optional("min_date"): str,
vol.Optional("max_date"): str,
vol.Optional("memory_date"): str,
vol.Optional("city"): str,
vol.Optional("state"): str,
vol.Optional("country"): str,
},
"async_get_assets",
supports_response=SupportsResponse.ONLY,
)
platform.async_register_entity_service(
SERVICE_SEND_TELEGRAM_NOTIFICATION,
{
vol.Optional("bot_token"): str,
vol.Required("chat_id"): vol.Coerce(str),
vol.Optional("assets"): list,
vol.Optional("caption"): str,
vol.Optional("reply_to_message_id"): vol.Coerce(int),
vol.Optional("disable_web_page_preview"): bool,
vol.Optional("parse_mode", default="HTML"): str,
vol.Optional("max_group_size", default=10): vol.All(
vol.Coerce(int), vol.Range(min=2, max=10)
),
vol.Optional("chunk_delay", default=0): vol.All(
vol.Coerce(int), vol.Range(min=0, max=60000)
),
vol.Optional("wait_for_response", default=True): bool,
vol.Optional("max_asset_data_size"): vol.All(
vol.Coerce(int), vol.Range(min=1, max=52428800)
),
vol.Optional("send_large_photos_as_documents", default=False): bool,
vol.Optional("chat_action", default="typing"): vol.Any(
None, vol.In(["", "typing", "upload_photo", "upload_video", "upload_document"])
),
vol.Optional("quiet_hours_start"): vol.Match(r"^\d{2}:\d{2}$"),
vol.Optional("quiet_hours_end"): vol.Match(r"^\d{2}:\d{2}$"),
},
"async_send_telegram_notification",
supports_response=SupportsResponse.OPTIONAL,
)
class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], SensorEntity):
"""Base sensor for Immich album."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: ImmichAlbumWatcherCoordinator,
entry: ConfigEntry,
subentry: ConfigSubentry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._entry = entry
self._subentry = subentry
self._album_id = subentry.data[CONF_ALBUM_ID]
self._album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
self._hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
self._unique_id_prefix = slugify(f"{self._hub_name}_album_{self._album_name}")
@property
def _album_data(self) -> AlbumData | None:
"""Get the album data from coordinator."""
return self.coordinator.data
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self._album_data is not None
@property
def device_info(self) -> DeviceInfo:
"""Return device info - one device per album."""
return DeviceInfo(
identifiers={(DOMAIN, self._subentry.subentry_id)},
name=self._album_name,
manufacturer="Immich",
entry_type=DeviceEntryType.SERVICE,
)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()
async def async_refresh_album(self) -> None:
"""Refresh data for this album."""
await self.coordinator.async_refresh_now()
async def async_get_assets(
self,
limit: int = 10,
offset: int = 0,
favorite_only: bool = False,
filter_min_rating: int = 1,
order_by: str = "date",
order: str = "descending",
asset_type: str = "all",
min_date: str | None = None,
max_date: str | None = None,
memory_date: str | None = None,
city: str | None = None,
state: str | None = None,
country: str | None = None,
) -> ServiceResponse:
"""Get assets for this album with optional filtering and ordering."""
assets = await self.coordinator.async_get_assets(
limit=limit,
offset=offset,
favorite_only=favorite_only,
filter_min_rating=filter_min_rating,
order_by=order_by,
order=order,
asset_type=asset_type,
min_date=min_date,
max_date=max_date,
memory_date=memory_date,
city=city,
state=state,
country=country,
)
return {"assets": assets}
@staticmethod
def _is_quiet_hours(start_str: str | None, end_str: str | None) -> bool:
"""Check if current time is within quiet hours."""
from datetime import time as dt_time
from homeassistant.util import dt as dt_util
if not start_str or not end_str:
return False
try:
now = dt_util.now().time()
start_time = dt_time.fromisoformat(start_str)
end_time = dt_time.fromisoformat(end_str)
except ValueError:
return False
if start_time <= end_time:
return start_time <= now < end_time
else:
return now >= start_time or now < end_time
async def async_send_telegram_notification(
self,
chat_id: str,
assets: list[dict[str, str]] | None = None,
bot_token: str | None = None,
caption: str | None = None,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
max_group_size: int = 10,
chunk_delay: int = 0,
wait_for_response: bool = True,
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
chat_action: str | None = "typing",
quiet_hours_start: str | None = None,
quiet_hours_end: str | None = None,
) -> ServiceResponse:
"""Send notification to Telegram."""
# Check quiet hours — queue notification if active
if self._is_quiet_hours(quiet_hours_start, quiet_hours_end):
from . import _register_queue_timers
queue: NotificationQueue = self.hass.data[DOMAIN][self._entry.entry_id]["notification_queue"]
await queue.async_enqueue({
"entity_id": self.entity_id,
"chat_id": chat_id,
"assets": assets,
"bot_token": bot_token,
"caption": caption,
"reply_to_message_id": reply_to_message_id,
"disable_web_page_preview": disable_web_page_preview,
"parse_mode": parse_mode,
"max_group_size": max_group_size,
"chunk_delay": chunk_delay,
"max_asset_data_size": max_asset_data_size,
"send_large_photos_as_documents": send_large_photos_as_documents,
"chat_action": chat_action,
"quiet_hours_start": quiet_hours_start,
"quiet_hours_end": quiet_hours_end,
})
_register_queue_timers(self.hass, self._entry)
return {"success": True, "status": "queued_quiet_hours"}
# If non-blocking mode, create a background task and return immediately
if not wait_for_response:
self.hass.async_create_task(
self._execute_telegram_notification(
chat_id=chat_id,
assets=assets,
bot_token=bot_token,
caption=caption,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
max_group_size=max_group_size,
chunk_delay=chunk_delay,
max_asset_data_size=max_asset_data_size,
send_large_photos_as_documents=send_large_photos_as_documents,
chat_action=chat_action,
)
)
return {"success": True, "status": "queued", "message": "Notification queued for background processing"}
return await self._execute_telegram_notification(
chat_id=chat_id,
assets=assets,
bot_token=bot_token,
caption=caption,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
max_group_size=max_group_size,
chunk_delay=chunk_delay,
max_asset_data_size=max_asset_data_size,
send_large_photos_as_documents=send_large_photos_as_documents,
chat_action=chat_action,
)
async def _execute_telegram_notification(
self,
chat_id: str,
assets: list[dict[str, str]] | None = None,
bot_token: str | None = None,
caption: str | None = None,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
max_group_size: int = 10,
chunk_delay: int = 0,
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
chat_action: str | None = "typing",
) -> ServiceResponse:
"""Execute the Telegram notification using core TelegramClient."""
# Get bot token from parameter or config
token = bot_token or self._entry.options.get(CONF_TELEGRAM_BOT_TOKEN)
if not token:
return {
"success": False,
"error": "No bot token provided. Set it in integration options or pass as parameter.",
}
session = async_get_clientsession(self.hass)
# Create core TelegramClient with HA-managed session and coordinator caches
telegram = TelegramClient(
session,
token,
url_cache=self.coordinator.telegram_cache,
asset_cache=self.coordinator.telegram_asset_cache,
url_resolver=self.coordinator.get_internal_download_url,
thumbhash_resolver=self.coordinator.get_asset_thumbhash,
)
return await telegram.send_notification(
chat_id=chat_id,
assets=assets,
caption=caption,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
max_group_size=max_group_size,
chunk_delay=chunk_delay,
max_asset_data_size=max_asset_data_size,
send_large_photos_as_documents=send_large_photos_as_documents,
chat_action=chat_action,
)
class ImmichAlbumIdSensor(ImmichAlbumBaseSensor):
"""Sensor exposing the Immich album ID."""
_attr_icon = "mdi:identifier"
_attr_translation_key = "album_id"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_album_id"
@property
def native_value(self) -> str | None:
if self._album_data:
return self._album_data.id
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
if not self._album_data:
return {}
attrs: dict[str, Any] = {
ATTR_ALBUM_NAME: self._album_data.name,
ATTR_ASSET_COUNT: self._album_data.asset_count,
ATTR_LAST_UPDATED: self._album_data.updated_at,
ATTR_CREATED_AT: self._album_data.created_at,
}
share_url = self.coordinator.get_any_url()
if share_url:
attrs["share_url"] = share_url
return attrs
class ImmichAlbumAssetCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album asset count."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:image-album"
_attr_translation_key = "album_asset_count"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_asset_count"
@property
def native_value(self) -> int | None:
if self._album_data:
return self._album_data.asset_count
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_ASSET_COUNT: self._album_data.asset_count,
ATTR_PHOTO_COUNT: self._album_data.photo_count,
ATTR_VIDEO_COUNT: self._album_data.video_count,
ATTR_LAST_UPDATED: self._album_data.updated_at,
ATTR_CREATED_AT: self._album_data.created_at,
ATTR_SHARED: self._album_data.shared,
ATTR_OWNER: self._album_data.owner,
ATTR_PEOPLE: list(self._album_data.people),
}
if self._album_data.thumbnail_asset_id:
attrs[ATTR_THUMBNAIL_URL] = (
f"{self.coordinator.immich_url}/api/assets/"
f"{self._album_data.thumbnail_asset_id}/thumbnail"
)
return attrs
class ImmichAlbumPhotoCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album photo count."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:image"
_attr_translation_key = "album_photo_count"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_photo_count"
@property
def native_value(self) -> int | None:
if self._album_data:
return self._album_data.photo_count
return None
class ImmichAlbumVideoCountSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album video count."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_icon = "mdi:video"
_attr_translation_key = "album_video_count"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_video_count"
@property
def native_value(self) -> int | None:
if self._album_data:
return self._album_data.video_count
return None
class ImmichAlbumLastUpdatedSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album last updated time."""
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_icon = "mdi:clock-outline"
_attr_translation_key = "album_last_updated"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_last_updated"
@property
def native_value(self) -> datetime | None:
if self._album_data and self._album_data.updated_at:
try:
return datetime.fromisoformat(
self._album_data.updated_at.replace("Z", "+00:00")
)
except ValueError:
return None
return None
class ImmichAlbumCreatedSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album creation date."""
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_icon = "mdi:calendar-plus"
_attr_translation_key = "album_created"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_created"
@property
def native_value(self) -> datetime | None:
if self._album_data and self._album_data.created_at:
try:
return datetime.fromisoformat(
self._album_data.created_at.replace("Z", "+00:00")
)
except ValueError:
return None
return None
class ImmichAlbumPublicUrlSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album public URL."""
_attr_icon = "mdi:link-variant"
_attr_translation_key = "album_public_url"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_public_url"
@property
def native_value(self) -> str | None:
if self._album_data:
return self.coordinator.get_public_url()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
if not self._album_data:
return {}
attrs = {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_SHARED: self._album_data.shared,
}
all_urls = self.coordinator.get_public_urls()
if len(all_urls) > 1:
attrs[ATTR_ALBUM_URLS] = all_urls
links_info = self.coordinator.get_shared_links_info()
if links_info:
attrs["shared_links"] = links_info
return attrs
class ImmichAlbumProtectedUrlSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album password-protected URL."""
_attr_icon = "mdi:link-lock"
_attr_translation_key = "album_protected_url"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_protected_url"
@property
def native_value(self) -> str | None:
if self._album_data:
return self.coordinator.get_protected_url()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
if not self._album_data:
return {}
attrs = {ATTR_ALBUM_ID: self._album_data.id}
all_urls = self.coordinator.get_protected_urls()
if len(all_urls) > 1:
attrs["protected_urls"] = all_urls
return attrs
class ImmichAlbumProtectedPasswordSensor(ImmichAlbumBaseSensor):
"""Sensor representing an Immich album protected link password."""
_attr_icon = "mdi:key"
_attr_translation_key = "album_protected_password"
def __init__(self, coordinator, entry, subentry):
super().__init__(coordinator, entry, subentry)
self._attr_unique_id = f"{self._unique_id_prefix}_protected_password"
@property
def native_value(self) -> str | None:
if self._album_data:
return self.coordinator.get_protected_password()
return None
@property
def extra_state_attributes(self) -> dict[str, Any]:
if not self._album_data:
return {}
return {
ATTR_ALBUM_ID: self._album_data.id,
ATTR_ALBUM_PROTECTED_URL: self.coordinator.get_protected_url(),
}