Files
haos-hacs-immich-album-watcher/custom_components/immich_album_watcher/__init__.py
alexei.dolgolyov 381de98c40
Some checks failed
Validate / Hassfest (push) Has been cancelled
Comprehensive review fixes: security, performance, code quality, and UI polish
Backend: Fix CORS wildcard+credentials, add secret key warning, remove raw
API keys from sync endpoint, fix N+1 queries in watcher/sync, fix
AttributeError on event_types, delete dead scheduled.py/templates.py,
add limit cap on history, re-validate server on URL/key update, apply
tracking/template config IDs in update_target.

HA Integration: Replace datetime.now() with dt_util.now(), fix notification
queue to only remove successfully sent items, use album UUID for entity
unique IDs, add shared links dirty flag and users cache hourly refresh,
deduplicate _is_quiet_hours, add HTTP timeouts, cache albums in config
flow, change iot_class to local_polling.

Frontend: Make i18n reactive via $state (remove window.location.reload),
add Modal transitions/a11y/Escape key, create ConfirmModal replacing all
confirm() calls, add error handling to all pages, replace Unicode nav
icons with MDI SVGs, add card hover effects, dashboard stat icons, global
focus-visible styles, form slide transitions, mobile responsive bottom
nav, fix password error color, add ~20 i18n keys (EN/RU).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 18:34:31 +03:00

446 lines
15 KiB
Python

"""Immich Album Watcher integration for Home Assistant."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, time as dt_time
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.event import async_track_time_change
from homeassistant.util import dt as dt_util
from .const import (
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_API_KEY,
CONF_HUB_NAME,
CONF_IMMICH_URL,
CONF_SCAN_INTERVAL,
CONF_SERVER_API_KEY,
CONF_SERVER_URL,
CONF_TELEGRAM_CACHE_TTL,
DEFAULT_SCAN_INTERVAL,
DEFAULT_TELEGRAM_CACHE_TTL,
DOMAIN,
PLATFORMS,
)
from .coordinator import ImmichAlbumWatcherCoordinator
from .storage import (
ImmichAlbumStorage,
NotificationQueue,
TelegramFileCache,
create_notification_queue,
create_telegram_cache,
)
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
@dataclass
class ImmichHubData:
    """Hub-level settings shared by every album of one config entry.

    An instance is built during entry setup and stored as the config entry's
    ``runtime_data``. It is intentionally mutable: the options update
    listener rewrites ``scan_interval`` in place.
    """

    # Display name of the hub (setup falls back to "Immich" when unset).
    name: str
    # Immich server URL taken from the config entry data.
    url: str
    # Immich API key taken from the config entry data.
    api_key: str
    # Polling interval handed to every album coordinator.
    scan_interval: int
    # Telegram file-cache TTL in hours (setup converts it to seconds).
    telegram_cache_ttl: int
@dataclass
class ImmichAlbumRuntimeData:
    """Per-album state kept while an album subentry is loaded.

    One instance is stored per subentry id in the hub's ``"subentries"``
    mapping inside ``hass.data``.
    """

    # Coordinator that polls this album.
    coordinator: ImmichAlbumWatcherCoordinator
    # Album identifier read from the subentry data.
    album_id: str
    # Album name read from the subentry data ("Unknown Album" when missing).
    album_name: str
# Config entry type whose ``runtime_data`` carries the hub-level ``ImmichHubData``.
type ImmichConfigEntry = ConfigEntry[ImmichHubData]
async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
    """Set up one Immich hub (config entry) together with its album subentries.

    The function:

    1. builds :class:`ImmichHubData` from the entry data/options and stores it
       as ``entry.runtime_data``;
    2. creates and loads the album storage, both Telegram file caches and the
       quiet-hours notification queue;
    3. optionally creates a server sync client when both a server URL and a
       server API key are configured;
    4. publishes all shared objects under ``hass.data[DOMAIN][entry_id]``;
    5. sets up a coordinator per album subentry and forwards platform setup;
    6. re-arms quiet-hours timers and flushes already-due notifications when
       the persisted queue is not empty;
    7. registers the update listener.

    Returns:
        Always ``True``; failures surface as exceptions from the steps above.
    """
    domain_data = hass.data.setdefault(DOMAIN, {})
    options = entry.options
    entry_id = entry.entry_id

    # Hub-wide settings, exposed to the rest of the integration as runtime data.
    hub = ImmichHubData(
        name=entry.data.get(CONF_HUB_NAME, "Immich"),
        url=entry.data[CONF_IMMICH_URL],
        api_key=entry.data[CONF_API_KEY],
        scan_interval=options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL),
        telegram_cache_ttl=options.get(
            CONF_TELEGRAM_CACHE_TTL, DEFAULT_TELEGRAM_CACHE_TTL
        ),
    )
    entry.runtime_data = hub

    # Persistent album state, so changes made while HA was down are detected.
    storage = ImmichAlbumStorage(hass, entry_id)
    await storage.async_load()

    # Telegram file caches are created once per hub and shared by all albums.
    # The URL-keyed cache expires by TTL; the configured TTL is in hours.
    telegram_cache = create_telegram_cache(
        hass, entry_id, ttl_seconds=hub.telegram_cache_ttl * 60 * 60
    )
    await telegram_cache.async_load()

    # The asset-id-keyed cache validates entries by thumbhash instead of TTL.
    telegram_asset_cache = create_telegram_cache(
        hass, f"{entry_id}_assets", use_thumbhash=True
    )
    await telegram_asset_cache.async_load()

    # Notifications deferred because of quiet hours live in this queue.
    notification_queue = create_notification_queue(hass, entry_id)
    await notification_queue.async_load()

    # Server sync is optional and needs both the URL and the API key.
    server_url = options.get(CONF_SERVER_URL, "")
    server_api_key = options.get(CONF_SERVER_API_KEY, "")
    if server_url and server_api_key:
        from .sync import ServerSyncClient

        sync_client = ServerSyncClient(hass, server_url, server_api_key)
        _LOGGER.info("Server sync enabled: %s", server_url)
    else:
        sync_client = None

    # Everything the platforms and helpers need, keyed by config entry id.
    domain_data[entry_id] = {
        "hub": hub,
        "subentries": {},
        "storage": storage,
        "telegram_cache": telegram_cache,
        "telegram_asset_cache": telegram_asset_cache,
        "notification_queue": notification_queue,
        "sync_client": sync_client,
        "quiet_hours_unsubs": {},  # keyed by "HH:MM" end time
        # Snapshot of subentry ids, used by the update listener to detect
        # added/removed albums.
        "loaded_subentries": set(entry.subentries.keys()),
    }

    # One coordinator per album subentry.
    for subentry in entry.subentries.values():
        await _async_setup_subentry_coordinator(hass, entry, subentry)

    # Platforms are forwarded once; they iterate over the subentries themselves.
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    # Notifications queued before a restart: re-arm their timers and send
    # whatever is already outside its quiet-hours window.
    if notification_queue.has_pending():
        _register_queue_timers(hass, entry)
        hass.async_create_task(_process_ready_notifications(hass, entry))

    # React to option changes and to albums being added or removed.
    entry.async_on_unload(entry.add_update_listener(_async_update_listener))

    _LOGGER.info(
        "Immich Album Watcher hub set up successfully with %d albums",
        len(entry.subentries),
    )
    return True
async def _async_setup_subentry_coordinator(
    hass: HomeAssistant, entry: ImmichConfigEntry, subentry: ConfigSubentry
) -> None:
    """Create, prime and register the coordinator for one album subentry.

    The coordinator is built from the hub settings in ``entry.runtime_data``
    and the shared objects in ``hass.data[DOMAIN][entry_id]``. Its persisted
    state is loaded before the first refresh, and the result is stored as
    :class:`ImmichAlbumRuntimeData` under the subentry id.
    """
    hub: ImmichHubData = entry.runtime_data
    album_id = subentry.data[CONF_ALBUM_ID]
    album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
    shared = hass.data[DOMAIN][entry.entry_id]

    _LOGGER.debug("Setting up coordinator for album: %s (%s)", album_name, album_id)

    coordinator = ImmichAlbumWatcherCoordinator(
        hass,
        url=hub.url,
        api_key=hub.api_key,
        album_id=album_id,
        album_name=album_name,
        scan_interval=hub.scan_interval,
        hub_name=hub.name,
        storage=shared["storage"],
        telegram_cache=shared["telegram_cache"],
        telegram_asset_cache=shared["telegram_asset_cache"],
        sync_client=shared.get("sync_client"),
    )

    # Persisted state must be in place before the first refresh so that
    # changes made while Home Assistant was down are detected.
    await coordinator.async_load_persisted_state()
    await coordinator.async_config_entry_first_refresh()

    shared["subentries"][subentry.subentry_id] = ImmichAlbumRuntimeData(
        coordinator=coordinator,
        album_id=album_id,
        album_name=album_name,
    )
    _LOGGER.info("Coordinator for album '%s' set up successfully", album_name)
def _is_quiet_hours(start_str: str, end_str: str) -> bool:
"""Check if current time is within quiet hours."""
if not start_str or not end_str:
return False
try:
now = dt_util.now().time()
start_time = dt_time.fromisoformat(start_str)
end_time = dt_time.fromisoformat(end_str)
except ValueError:
return False
if start_time <= end_time:
return start_time <= now < end_time
else:
# Crosses midnight (e.g., 22:00 - 08:00)
return now >= start_time or now < end_time
def _register_queue_timers(hass: HomeAssistant, entry: ImmichConfigEntry) -> None:
    """Ensure a daily timer exists for every quiet-hours end time in the queue.

    Each distinct, non-empty ``quiet_hours_end`` value found in the queued
    items gets one ``async_track_time_change`` listener that fires at that
    hour and minute. End times that already have a timer are skipped, and
    unparsable ones are logged and skipped. Every new unsubscribe callable is
    stored in ``quiet_hours_unsubs`` and also registered with the entry's
    unload hooks.
    """
    hub_store = hass.data[DOMAIN][entry.entry_id]
    queue: NotificationQueue = hub_store["notification_queue"]
    # Maps an end time string (e.g. "08:00") to the callable that cancels its timer.
    unsubs = hub_store["quiet_hours_unsubs"]

    # Distinct, non-empty end times referenced by the queued items.
    queued_end_values = (
        item.get("params", {}).get("quiet_hours_end", "") for item in queue.get_all()
    )
    pending_end_times: set[str] = {value for value in queued_end_values if value}

    for end_str in pending_end_times:
        if end_str in unsubs:
            # Timer already registered for this end time.
            continue

        try:
            fire_at = dt_time.fromisoformat(end_str)
        except ValueError:
            _LOGGER.warning("Invalid quiet hours end time in queue: %s", end_str)
            continue

        # ``_end_str`` is bound as a default argument so every callback keeps
        # its own end time instead of the loop variable's final value.
        async def _on_quiet_hours_end(_now: datetime, _end_str: str = end_str) -> None:
            """Handle quiet hours end — process matching queued notifications."""
            _LOGGER.info(
                "Quiet hours ended (%s), processing queued notifications", _end_str
            )
            await _process_notifications_for_end_time(hass, entry, _end_str)

        cancel_timer = async_track_time_change(
            hass,
            _on_quiet_hours_end,
            hour=fire_at.hour,
            minute=fire_at.minute,
            second=0,
        )
        unsubs[end_str] = cancel_timer
        entry.async_on_unload(cancel_timer)
        _LOGGER.debug("Registered quiet hours timer for %s", end_str)
def _unregister_queue_timer(
    hass: HomeAssistant, entry: ImmichConfigEntry, end_str: str
) -> None:
    """Cancel the timer for ``end_str`` once no queued item refers to it.

    Nothing happens while at least one queued item still has this
    ``quiet_hours_end`` value, or when no timer is registered for it.
    """
    hub_store = hass.data[DOMAIN][entry.entry_id]
    queue: NotificationQueue = hub_store["notification_queue"]
    # Maps an end time string to the callable that cancels its timer.
    unsubs = hub_store["quiet_hours_unsubs"]

    still_needed = any(
        item.get("params", {}).get("quiet_hours_end", "") == end_str
        for item in queue.get_all()
    )
    if still_needed:
        return

    cancel_timer = unsubs.pop(end_str, None)
    if not cancel_timer:
        return

    cancel_timer()
    _LOGGER.debug("Unregistered quiet hours timer for %s (no more items)", end_str)
async def _process_ready_notifications(
    hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
    """Send every queued notification whose quiet hours are already over.

    An item is "ready" when the current time is outside the window described
    by its ``quiet_hours_start`` / ``quiet_hours_end`` params. Items without
    a usable window are therefore ready as well. After sending, the timers
    for the end times of the processed items are released if no queued item
    needs them any more; ``_unregister_queue_timer`` makes that decision, so
    timers of items that failed to send are kept.

    Does nothing when the entry has no data in ``hass.data`` or the queue is
    empty.
    """
    entry_data = hass.data[DOMAIN].get(entry.entry_id)
    if not entry_data:
        return

    queue: NotificationQueue = entry_data["notification_queue"]
    items = queue.get_all()
    if not items:
        return

    # Indices of ready items, plus the end times whose timers may become idle.
    ready_indices: list[int] = []
    ready_end_times: set[str] = set()
    for index, item in enumerate(items):
        params = item.get("params", {})
        end_str = params.get("quiet_hours_end", "")
        if _is_quiet_hours(params.get("quiet_hours_start", ""), end_str):
            continue
        ready_indices.append(index)
        if end_str:
            ready_end_times.add(end_str)

    if not ready_indices:
        return

    _LOGGER.info(
        "Found %d queued notifications ready to send (quiet hours ended)",
        len(ready_indices),
    )
    await _send_queued_items(hass, entry, ready_indices)

    # Sending awaits service calls and sleeps between items; the entry may
    # have been unloaded meanwhile, in which case its data is gone.
    if entry.entry_id not in hass.data.get(DOMAIN, {}):
        return

    # Release timers that no remaining queued item depends on, so they do
    # not keep firing every day with nothing to do.
    for end_str in ready_end_times:
        _unregister_queue_timer(hass, entry, end_str)
async def _process_notifications_for_end_time(
    hass: HomeAssistant, entry: ImmichConfigEntry, end_str: str
) -> None:
    """Send queued notifications whose quiet hours end at ``end_str``.

    Only items whose ``quiet_hours_end`` equals ``end_str`` and that are no
    longer inside their quiet-hours window are sent. The timer for
    ``end_str`` is then released when no queued item refers to that end time
    any more. This also happens when nothing matched, so a timer whose items
    were already sent through another path does not linger.

    Does nothing when the entry has no data in ``hass.data``.
    """
    entry_data = hass.data[DOMAIN].get(entry.entry_id)
    if not entry_data:
        return

    queue: NotificationQueue = entry_data["notification_queue"]

    # Items for this end time that have actually left their quiet window.
    matching_indices: list[int] = []
    for index, item in enumerate(queue.get_all()):
        params = item.get("params", {})
        if params.get("quiet_hours_end", "") != end_str:
            continue
        if not _is_quiet_hours(params.get("quiet_hours_start", ""), end_str):
            matching_indices.append(index)

    if matching_indices:
        _LOGGER.info(
            "Processing %d queued notifications for quiet hours end %s",
            len(matching_indices),
            end_str,
        )
        await _send_queued_items(hass, entry, matching_indices)

        # Sending awaits service calls and sleeps between items; the entry
        # may have been unloaded meanwhile, in which case its data is gone.
        if entry.entry_id not in hass.data.get(DOMAIN, {}):
            return

    # Clean up the timer if no queued item needs it. The helper keeps it
    # while any item (e.g. one that failed to send) still uses this end time.
    _unregister_queue_timer(hass, entry, end_str)
async def _send_queued_items(
    hass: HomeAssistant, entry: ImmichConfigEntry, indices: list[int]
) -> None:
    """Replay the queued notifications at ``indices`` and drop the sent ones.

    Each selected item is replayed through the integration's
    ``send_telegram_notification`` service. The target is the item's own
    ``entity_id`` param or, when absent, the first sensor entity registered
    for this config entry. Items that raise while sending are logged and
    left in the queue; only successfully sent items are removed.

    Does nothing when the entry has no data in ``hass.data`` or when no
    sensor entity exists for the entry.
    """
    import asyncio

    from homeassistant.helpers import entity_registry as er

    entry_data = hass.data[DOMAIN].get(entry.entry_id)
    if not entry_data:
        return
    queue: NotificationQueue = entry_data["notification_queue"]

    # The first sensor of this config entry serves as the fallback target.
    registry = er.async_get(hass)
    fallback_entity_id = next(
        (
            registered.entity_id
            for registered in er.async_entries_for_config_entry(
                registry, entry.entry_id
            )
            if registered.domain == "sensor"
        ),
        None,
    )
    if not fallback_entity_id:
        _LOGGER.warning("No sensor entity found to process notification queue")
        return

    queued = queue.get_all()
    delivered: list[int] = []

    for position in indices:
        if position >= len(queued):
            # Index no longer refers to a queued item.
            continue

        # Work on a copy so the queued item itself is not mutated.
        params = dict(queued[position].get("params", {}))
        try:
            target_entity_id = params.pop("entity_id", None) or fallback_entity_id
            # Remove quiet hours params so the replay doesn't re-queue
            params.pop("quiet_hours_start", None)
            params.pop("quiet_hours_end", None)
            await hass.services.async_call(
                DOMAIN,
                "send_telegram_notification",
                params,
                target={"entity_id": target_entity_id},
                blocking=True,
            )
        except Exception:
            _LOGGER.exception("Failed to send queued notification %d", position + 1)
        else:
            delivered.append(position)

        # Small delay between notifications to avoid rate limiting
        await asyncio.sleep(1)

    # Remove only what was sent; highest index first so that earlier
    # removals do not shift the positions of later ones.
    if delivered:
        await queue.async_remove_indices(sorted(delivered, reverse=True))

    _LOGGER.info("Sent %d/%d queued notifications", len(delivered), len(indices))
async def _async_update_listener(
    hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
    """React to a config entry update (options or subentry changes).

    When the set of subentry ids differs from the one recorded at setup, the
    whole entry is reloaded. Otherwise the update is treated as options-only:
    the scan interval is re-read, the sync client is rebuilt from the current
    options (or cleared when URL or key is missing), and both are pushed to
    every album coordinator.
    """
    entry_data = hass.data[DOMAIN][entry.entry_id]
    known_ids = entry_data.get("loaded_subentries", set())
    present_ids = set(entry.subentries.keys())

    # Albums were added or removed: a full reload rebuilds everything.
    if known_ids != present_ids:
        _LOGGER.info(
            "Subentries changed (loaded: %d, current: %d), reloading entry",
            len(known_ids),
            len(present_ids),
        )
        await hass.config_entries.async_reload(entry.entry_id)
        return

    # Options-only update from here on.
    options = entry.options
    new_interval = options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
    entry.runtime_data.scan_interval = new_interval

    # The sync client is rebuilt on every options update.
    server_url = options.get(CONF_SERVER_URL, "")
    server_api_key = options.get(CONF_SERVER_API_KEY, "")
    if server_url and server_api_key:
        from .sync import ServerSyncClient

        sync_client = ServerSyncClient(hass, server_url, server_api_key)
    else:
        sync_client = None
    entry_data["sync_client"] = sync_client

    # Propagate the new settings to every album coordinator.
    for album_runtime in entry_data["subentries"].values():
        album_runtime.coordinator.update_scan_interval(new_interval)
        album_runtime.coordinator.update_sync_client(sync_client)

    _LOGGER.info("Updated hub options (scan_interval=%d)", new_interval)
async def async_unload_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
    """Unload a config entry.

    Platforms are unloaded first. Only when that succeeds are the quiet-hours
    timers cancelled and the hub data removed from ``hass.data``. If the
    platform unload fails the entry stays loaded, so its timers and the
    ``quiet_hours_unsubs`` bookkeeping are left untouched. Cancelling them up
    front would leave dead timers that are still recorded as registered, and
    those would never be re-armed.

    Returns:
        The result of unloading the platforms.
    """
    unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    if not unload_ok:
        return unload_ok

    # Clean up hub data.
    entry_data = hass.data[DOMAIN].pop(entry.entry_id, None) or {}

    # Cancel all quiet hours timers and forget them.
    unsubs = entry_data.get("quiet_hours_unsubs", {})
    for unsub in list(unsubs.values()):
        unsub()
    unsubs.clear()

    _LOGGER.info("Immich Album Watcher hub unloaded")
    return unload_ok