Some checks failed
Validate / Hassfest (push) Has been cancelled
Fixes 5 issues identified by code-reviewer agent: 1. (Critical) EventLog.tracker_id now nullable - use None instead of 0 when tracker name doesn't match, avoiding FK constraint violations on PostgreSQL 2. (Critical) Replace jinja2.Environment with SandboxedEnvironment in all 3 server template rendering locations to prevent SSTI 3. (Important) Rebuild sync_client in _async_update_listener when server URL/key options change, propagate to all coordinators 4. (Important) Validate partial server config - require both URL and API key or neither, with clear error message 5. (Important) Name fire-and-forget sync task for debugging Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
443 lines
15 KiB
Python
443 lines
15 KiB
Python
"""Immich Album Watcher integration for Home Assistant."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, time as dt_time
|
|
|
|
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.event import async_track_time_change
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import (
|
|
CONF_ALBUM_ID,
|
|
CONF_ALBUM_NAME,
|
|
CONF_API_KEY,
|
|
CONF_HUB_NAME,
|
|
CONF_IMMICH_URL,
|
|
CONF_SCAN_INTERVAL,
|
|
CONF_SERVER_API_KEY,
|
|
CONF_SERVER_URL,
|
|
CONF_TELEGRAM_CACHE_TTL,
|
|
DEFAULT_SCAN_INTERVAL,
|
|
DEFAULT_TELEGRAM_CACHE_TTL,
|
|
DOMAIN,
|
|
PLATFORMS,
|
|
)
|
|
from .coordinator import ImmichAlbumWatcherCoordinator
|
|
from .storage import (
|
|
ImmichAlbumStorage,
|
|
NotificationQueue,
|
|
TelegramFileCache,
|
|
create_notification_queue,
|
|
create_telegram_cache,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ImmichHubData:
    """Data for the Immich hub.

    Stored as the config entry's ``runtime_data`` so coordinators and
    platforms can read the hub-wide connection settings.
    """

    # Friendly hub name (defaults to "Immich" when not configured).
    name: str
    # Base URL of the Immich server.
    url: str
    # API key used to authenticate against the Immich server.
    api_key: str
    # Polling interval for album coordinators (assumed seconds — TODO confirm
    # against ImmichAlbumWatcherCoordinator).
    scan_interval: int
    # Telegram file-cache TTL in hours (converted to seconds at setup).
    telegram_cache_ttl: int
|
|
|
|
|
|
@dataclass
class ImmichAlbumRuntimeData:
    """Runtime data for an album subentry."""

    # Coordinator polling this album for changes.
    coordinator: ImmichAlbumWatcherCoordinator
    # Immich album identifier from the subentry config.
    album_id: str
    # Human-readable album name ("Unknown Album" when missing).
    album_name: str
|
|
|
|
|
|
# Config entry specialized to carry ImmichHubData as its runtime_data.
type ImmichConfigEntry = ConfigEntry[ImmichHubData]
|
|
|
|
|
|
async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
    """Set up Immich Album Watcher hub from a config entry.

    Creates per-hub storage, the Telegram file caches and the notification
    queue, builds one coordinator per album subentry, forwards platform
    setup, and registers the options/subentry update listener.

    Returns True on success.
    """
    hass.data.setdefault(DOMAIN, {})

    hub_name = entry.data.get(CONF_HUB_NAME, "Immich")
    url = entry.data[CONF_IMMICH_URL]
    api_key = entry.data[CONF_API_KEY]
    scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
    telegram_cache_ttl = entry.options.get(CONF_TELEGRAM_CACHE_TTL, DEFAULT_TELEGRAM_CACHE_TTL)

    # Store hub data on the entry so coordinators/platforms can reach it.
    entry.runtime_data = ImmichHubData(
        name=hub_name,
        url=url,
        api_key=api_key,
        scan_interval=scan_interval,
        telegram_cache_ttl=telegram_cache_ttl,
    )

    # Create storage for persisting album state across restarts.
    storage = ImmichAlbumStorage(hass, entry.entry_id)
    await storage.async_load()

    # Create and load Telegram file caches once per hub (shared across all albums).
    # TTL is in hours from config, convert to seconds.
    cache_ttl_seconds = telegram_cache_ttl * 60 * 60
    # URL-based cache for non-Immich URLs or URLs without extractable asset IDs.
    telegram_cache = create_telegram_cache(hass, entry.entry_id, ttl_seconds=cache_ttl_seconds)
    await telegram_cache.async_load()
    # Asset ID-based cache for Immich URLs — uses thumbhash validation instead of TTL.
    telegram_asset_cache = create_telegram_cache(
        hass, f"{entry.entry_id}_assets", use_thumbhash=True
    )
    await telegram_asset_cache.async_load()

    # Create notification queue for quiet hours.
    notification_queue = create_notification_queue(hass, entry.entry_id)
    await notification_queue.async_load()

    # Create optional server sync client. Sync needs BOTH a URL and an API
    # key; a partial configuration previously disabled sync silently, so
    # surface it with a clear error instead.
    server_url = entry.options.get(CONF_SERVER_URL, "")
    server_api_key = entry.options.get(CONF_SERVER_API_KEY, "")
    sync_client = None
    if bool(server_url) != bool(server_api_key):
        _LOGGER.error(
            "Server sync requires both a server URL and an API key; "
            "only one was provided, so server sync is disabled"
        )
    elif server_url and server_api_key:
        from .sync import ServerSyncClient

        sync_client = ServerSyncClient(hass, server_url, server_api_key)
        _LOGGER.info("Server sync enabled: %s", server_url)

    # Store hub reference
    hass.data[DOMAIN][entry.entry_id] = {
        "hub": entry.runtime_data,
        "subentries": {},
        "storage": storage,
        "telegram_cache": telegram_cache,
        "telegram_asset_cache": telegram_asset_cache,
        "notification_queue": notification_queue,
        "sync_client": sync_client,
        "quiet_hours_unsubs": {},  # keyed by "HH:MM" end time
    }

    # Track loaded subentries so the update listener can detect changes.
    hass.data[DOMAIN][entry.entry_id]["loaded_subentries"] = set(entry.subentries.keys())

    # Set up coordinators for all subentries (albums).
    for subentry in entry.subentries.values():
        await _async_setup_subentry_coordinator(hass, entry, subentry)

    # Forward platform setup once - platforms will iterate through subentries
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    # Check if there are queued notifications from before restart
    if notification_queue.has_pending():
        _register_queue_timers(hass, entry)
        # Process any items whose quiet hours have already ended. Name the
        # fire-and-forget task so it is identifiable when debugging.
        hass.async_create_task(
            _process_ready_notifications(hass, entry),
            f"{DOMAIN}_process_ready_notifications_{entry.entry_id}",
        )

    # Register update listener for options and subentry changes
    entry.async_on_unload(entry.add_update_listener(_async_update_listener))

    _LOGGER.info(
        "Immich Album Watcher hub set up successfully with %d albums",
        len(entry.subentries),
    )

    return True
|
|
|
|
|
|
async def _async_setup_subentry_coordinator(
    hass: HomeAssistant, entry: ImmichConfigEntry, subentry: ConfigSubentry
) -> None:
    """Set up coordinator for an album subentry.

    Builds an ImmichAlbumWatcherCoordinator for the album described by
    *subentry*, restores its persisted state, runs the first refresh, and
    registers its runtime data under the hub's "subentries" mapping.
    """
    hub_data: ImmichHubData = entry.runtime_data
    album_id = subentry.data[CONF_ALBUM_ID]
    album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
    # Per-hub shared objects created in async_setup_entry.
    storage: ImmichAlbumStorage = hass.data[DOMAIN][entry.entry_id]["storage"]
    telegram_cache: TelegramFileCache = hass.data[DOMAIN][entry.entry_id]["telegram_cache"]
    telegram_asset_cache: TelegramFileCache = hass.data[DOMAIN][entry.entry_id]["telegram_asset_cache"]

    _LOGGER.debug("Setting up coordinator for album: %s (%s)", album_name, album_id)

    # Create coordinator for this album
    coordinator = ImmichAlbumWatcherCoordinator(
        hass,
        url=hub_data.url,
        api_key=hub_data.api_key,
        album_id=album_id,
        album_name=album_name,
        scan_interval=hub_data.scan_interval,
        hub_name=hub_data.name,
        storage=storage,
        telegram_cache=telegram_cache,
        telegram_asset_cache=telegram_asset_cache,
        # May be None when server sync is not configured.
        sync_client=hass.data[DOMAIN][entry.entry_id].get("sync_client"),
    )

    # Load persisted state before first refresh to detect changes during downtime
    await coordinator.async_load_persisted_state()

    # Fetch initial data. NOTE(review): presumably raises ConfigEntryNotReady
    # on failure per the HA helper's convention — confirm in the coordinator.
    await coordinator.async_config_entry_first_refresh()

    # Store subentry runtime data
    subentry_data = ImmichAlbumRuntimeData(
        coordinator=coordinator,
        album_id=album_id,
        album_name=album_name,
    )
    hass.data[DOMAIN][entry.entry_id]["subentries"][subentry.subentry_id] = subentry_data

    _LOGGER.info("Coordinator for album '%s' set up successfully", album_name)
|
|
|
|
|
|
def _is_quiet_hours(start_str: str, end_str: str) -> bool:
    """Return True when the current local time lies inside quiet hours.

    A window crossing midnight (e.g. "22:00"-"08:00") is treated as two
    half-open ranges. Empty or unparseable time strings disable quiet
    hours entirely (False is returned).
    """
    if not (start_str and end_str):
        return False

    try:
        window_start = dt_time.fromisoformat(start_str)
        window_end = dt_time.fromisoformat(end_str)
    except ValueError:
        return False
    current = dt_util.now().time()

    if window_start > window_end:
        # Window wraps past midnight: inside if after start OR before end.
        return current >= window_start or current < window_end
    return window_start <= current < window_end
|
|
|
|
|
|
def _register_queue_timers(hass: HomeAssistant, entry: ImmichConfigEntry) -> None:
    """Register timers for each unique quiet_hours_end in the queue.

    One async_track_time_change timer is created per distinct "HH:MM" end
    time found among queued notifications; when it fires, the matching
    queued items are replayed.
    """
    entry_data = hass.data[DOMAIN][entry.entry_id]
    queue: NotificationQueue = entry_data["notification_queue"]
    # Maps "HH:MM" end time -> unsubscribe callback from
    # async_track_time_change (values are callables despite the `list`
    # annotation inherited from earlier code).
    unsubs: dict[str, list] = entry_data["quiet_hours_unsubs"]

    # Collect unique end times from queued items
    end_times: set[str] = set()
    for item in queue.get_all():
        end_str = item.get("params", {}).get("quiet_hours_end", "")
        if end_str:
            end_times.add(end_str)

    for end_str in end_times:
        if end_str in unsubs:
            continue  # Timer already registered for this end time

        try:
            end_time = dt_time.fromisoformat(end_str)
        except ValueError:
            _LOGGER.warning("Invalid quiet hours end time in queue: %s", end_str)
            continue

        # Default argument binds the current end_str, avoiding the
        # late-binding closure pitfall inside this loop.
        async def _on_quiet_hours_end(_now: datetime, _end_str: str = end_str) -> None:
            """Handle quiet hours end — process matching queued notifications."""
            _LOGGER.info("Quiet hours ended (%s), processing queued notifications", _end_str)
            await _process_notifications_for_end_time(hass, entry, _end_str)

        unsub = async_track_time_change(
            hass, _on_quiet_hours_end, hour=end_time.hour, minute=end_time.minute, second=0
        )
        unsubs[end_str] = unsub
        # NOTE(review): the same unsub is also called explicitly in
        # async_unload_entry, so it may run twice on unload — confirm the
        # callback tolerates repeated invocation.
        entry.async_on_unload(unsub)

        _LOGGER.debug("Registered quiet hours timer for %s", end_str)
|
|
|
|
|
|
def _unregister_queue_timer(hass: HomeAssistant, entry: ImmichConfigEntry, end_str: str) -> None:
    """Cancel the quiet-hours timer for *end_str* once no queued item needs it."""
    data = hass.data[DOMAIN][entry.entry_id]
    queue: NotificationQueue = data["notification_queue"]
    timer_unsubs: dict[str, list] = data["quiet_hours_unsubs"]

    # Keep the timer while any remaining queued item targets this end time.
    still_needed = any(
        item.get("params", {}).get("quiet_hours_end", "") == end_str
        for item in queue.get_all()
    )
    if still_needed:
        return

    cancel = timer_unsubs.pop(end_str, None)
    if cancel:
        cancel()
        _LOGGER.debug("Unregistered quiet hours timer for %s (no more items)", end_str)
|
|
|
|
|
|
async def _process_ready_notifications(
    hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
    """Send queued notifications whose quiet-hours window has already ended."""
    entry_data = hass.data[DOMAIN].get(entry.entry_id)
    if not entry_data:
        return

    queue: NotificationQueue = entry_data["notification_queue"]
    queued = queue.get_all()
    if not queued:
        return

    # An item is ready when the current time is outside its quiet-hours window.
    ready = [
        idx
        for idx, item in enumerate(queued)
        if not _is_quiet_hours(
            item.get("params", {}).get("quiet_hours_start", ""),
            item.get("params", {}).get("quiet_hours_end", ""),
        )
    ]
    if not ready:
        return

    _LOGGER.info("Found %d queued notifications ready to send (quiet hours ended)", len(ready))
    await _send_queued_items(hass, entry, ready)
|
|
|
|
|
|
async def _process_notifications_for_end_time(
    hass: HomeAssistant, entry: ImmichConfigEntry, end_str: str
) -> None:
    """Send queued notifications tied to a specific quiet_hours_end time."""
    entry_data = hass.data[DOMAIN].get(entry.entry_id)
    if not entry_data:
        return

    queue: NotificationQueue = entry_data["notification_queue"]
    queued = queue.get_all()
    if not queued:
        return

    # Select items scheduled for this end time that are no longer inside
    # their quiet-hours window.
    matching = [
        idx
        for idx, item in enumerate(queued)
        if item.get("params", {}).get("quiet_hours_end", "") == end_str
        and not _is_quiet_hours(
            item.get("params", {}).get("quiet_hours_start", ""), end_str
        )
    ]
    if not matching:
        return

    _LOGGER.info("Processing %d queued notifications for quiet hours end %s", len(matching), end_str)
    await _send_queued_items(hass, entry, matching)

    # The timer for this end time may now be obsolete; drop it if so.
    _unregister_queue_timer(hass, entry, end_str)
|
|
|
|
|
|
async def _send_queued_items(
    hass: HomeAssistant, entry: ImmichConfigEntry, indices: list[int]
) -> None:
    """Send specific queued notifications by index and remove them from the queue.

    Each item is replayed through this integration's
    ``send_telegram_notification`` service, targeting the entity recorded
    with the item or, failing that, the hub's first sensor entity.
    """
    import asyncio
    from homeassistant.helpers import entity_registry as er

    entry_data = hass.data[DOMAIN].get(entry.entry_id)
    if not entry_data:
        return

    queue: NotificationQueue = entry_data["notification_queue"]

    # Find a fallback sensor entity
    ent_reg = er.async_get(hass)
    fallback_entity_id = None
    for ent in er.async_entries_for_config_entry(ent_reg, entry.entry_id):
        if ent.domain == "sensor":
            fallback_entity_id = ent.entity_id
            break

    if not fallback_entity_id:
        _LOGGER.warning("No sensor entity found to process notification queue")
        return

    items = queue.get_all()
    sent_count = 0
    for i in indices:
        # Guard against stale indices if the queue shrank since they were computed.
        if i >= len(items):
            continue
        params = dict(items[i].get("params", {}))
        try:
            target_entity_id = params.pop("entity_id", None) or fallback_entity_id
            # Remove quiet hours params so the replay doesn't re-queue
            params.pop("quiet_hours_start", None)
            params.pop("quiet_hours_end", None)
            await hass.services.async_call(
                DOMAIN,
                "send_telegram_notification",
                params,
                target={"entity_id": target_entity_id},
                blocking=True,
            )
            sent_count += 1
        except Exception:
            # Broad catch is deliberate: one failed replay must not abort
            # the remaining notifications.
            _LOGGER.exception("Failed to send queued notification %d", i + 1)

        # Small delay between notifications to avoid rate limiting
        await asyncio.sleep(1)

    # Remove sent items from queue (in reverse order to preserve indices)
    # NOTE(review): indices that FAILED to send are removed as well, so those
    # notifications are dropped rather than retried — confirm this is intended.
    await queue.async_remove_indices(sorted(indices, reverse=True))
    _LOGGER.info("Sent %d/%d queued notifications", sent_count, len(indices))
|
|
|
|
|
|
async def _async_update_listener(
    hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
    """Handle config entry updates (options or subentry changes).

    Subentry additions/removals trigger a full entry reload; options-only
    changes are applied in place (scan interval and server sync client) and
    propagated to every album coordinator.
    """
    entry_data = hass.data[DOMAIN][entry.entry_id]
    loaded_subentries = entry_data.get("loaded_subentries", set())
    current_subentries = set(entry.subentries.keys())

    # Check if subentries changed — a reload rebuilds coordinators/platforms.
    if loaded_subentries != current_subentries:
        _LOGGER.info(
            "Subentries changed (loaded: %d, current: %d), reloading entry",
            len(loaded_subentries),
            len(current_subentries),
        )
        await hass.config_entries.async_reload(entry.entry_id)
        return

    # Handle options-only update
    new_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)

    # Update hub data
    entry.runtime_data.scan_interval = new_interval

    # Rebuild sync client if server URL/key changed. Sync needs BOTH values;
    # a partial configuration previously disabled sync silently, so surface
    # it with a clear error instead.
    server_url = entry.options.get(CONF_SERVER_URL, "")
    server_api_key = entry.options.get(CONF_SERVER_API_KEY, "")
    sync_client = None
    if bool(server_url) != bool(server_api_key):
        _LOGGER.error(
            "Server sync requires both a server URL and an API key; "
            "only one was provided, so server sync is disabled"
        )
    elif server_url and server_api_key:
        from .sync import ServerSyncClient

        sync_client = ServerSyncClient(hass, server_url, server_api_key)
    entry_data["sync_client"] = sync_client

    # Propagate the new settings to all subentry coordinators.
    subentries_data = entry_data["subentries"]
    for subentry_data in subentries_data.values():
        subentry_data.coordinator.update_scan_interval(new_interval)
        # NOTE(review): reaches into a private coordinator attribute; a public
        # setter on ImmichAlbumWatcherCoordinator would be cleaner.
        subentry_data.coordinator._sync_client = sync_client

    _LOGGER.info("Updated hub options (scan_interval=%d)", new_interval)
|
|
|
|
|
|
async def async_unload_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
    """Unload a config entry."""
    # Cancel every registered quiet-hours timer before tearing down platforms.
    entry_data = hass.data[DOMAIN].get(entry.entry_id, {})
    for cancel_timer in entry_data.get("quiet_hours_unsubs", {}).values():
        cancel_timer()

    # Unload all platforms.
    unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    if not unload_ok:
        return False

    # Platforms unloaded cleanly — drop the per-hub data.
    hass.data[DOMAIN].pop(entry.entry_id, None)
    _LOGGER.info("Immich Album Watcher hub unloaded")
    return True
|