Files
haos-hacs-immich-album-watcher/custom_components/immich_album_watcher/config_flow.py
alexei.dolgolyov ab1c7ac0db
Some checks failed
Validate / Hassfest (push) Has been cancelled
Add HAOS-Server sync for optional centralized management (Phase 5)
Enable the HAOS integration to optionally connect to the standalone
Immich Watcher server for config sync and event reporting.

Server-side:
- New /api/sync/* endpoints: GET trackers, POST template render,
  POST event report
- API key auth via X-API-Key header (accepts JWT access tokens)

Integration-side:
- New sync.py: ServerSyncClient with graceful error handling
  (all methods return defaults on connection failure)
- Options flow: optional server_url and server_api_key fields
  with connection validation
- Coordinator: fire-and-forget event reporting to server when
  album changes are detected
- Translations: en.json and ru.json updated with new fields

The connection is fully additive -- the integration works identically
without a server URL configured. Server failures never break HA.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 14:10:29 +03:00

335 lines
11 KiB
Python

"""Config flow for Immich Album Watcher integration."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentryFlow,
OptionsFlow,
SubentryFlowResult,
)
from homeassistant.core import callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import (
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_API_KEY,
CONF_HUB_NAME,
CONF_IMMICH_URL,
CONF_SCAN_INTERVAL,
CONF_SERVER_API_KEY,
CONF_SERVER_URL,
CONF_TELEGRAM_BOT_TOKEN,
CONF_TELEGRAM_CACHE_TTL,
DEFAULT_SCAN_INTERVAL,
DEFAULT_TELEGRAM_CACHE_TTL,
DOMAIN,
SUBENTRY_TYPE_ALBUM,
)
_LOGGER = logging.getLogger(__name__)
async def validate_connection(
session: aiohttp.ClientSession, url: str, api_key: str
) -> dict[str, Any]:
"""Validate the Immich connection and return server info."""
headers = {"x-api-key": api_key}
async with session.get(
f"{url.rstrip('/')}/api/server/ping", headers=headers
) as response:
if response.status == 401:
raise InvalidAuth
if response.status != 200:
raise CannotConnect
return await response.json()
async def fetch_albums(
session: aiohttp.ClientSession, url: str, api_key: str
) -> list[dict[str, Any]]:
"""Fetch all albums from Immich."""
headers = {"x-api-key": api_key}
async with session.get(
f"{url.rstrip('/')}/api/albums", headers=headers
) as response:
if response.status != 200:
raise CannotConnect
return await response.json()
class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Immich Album Watcher."""
VERSION = 2
def __init__(self) -> None:
"""Initialize the config flow."""
self._url: str | None = None
self._api_key: str | None = None
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
"""Get the options flow for this handler."""
return ImmichAlbumWatcherOptionsFlow(config_entry)
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return supported subentry types."""
return {SUBENTRY_TYPE_ALBUM: ImmichAlbumSubentryFlowHandler}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step - connection details."""
errors: dict[str, str] = {}
if user_input is not None:
hub_name = user_input[CONF_HUB_NAME].strip()
self._url = user_input[CONF_IMMICH_URL].rstrip("/")
self._api_key = user_input[CONF_API_KEY]
session = async_get_clientsession(self.hass)
try:
await validate_connection(session, self._url, self._api_key)
# Set unique ID based on URL
await self.async_set_unique_id(self._url)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=hub_name,
data={
CONF_HUB_NAME: hub_name,
CONF_IMMICH_URL: self._url,
CONF_API_KEY: self._api_key,
},
options={
CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL,
},
)
except InvalidAuth:
errors["base"] = "invalid_auth"
except CannotConnect:
errors["base"] = "cannot_connect"
except aiohttp.ClientError:
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_HUB_NAME, default="Immich"): str,
vol.Required(CONF_IMMICH_URL): str,
vol.Required(CONF_API_KEY): str,
}
),
errors=errors,
description_placeholders={
"docs_url": "https://immich.app/docs/features/command-line-interface#obtain-the-api-key"
},
)
class ImmichAlbumSubentryFlowHandler(ConfigSubentryFlow):
"""Handle subentry flow for adding albums."""
def __init__(self) -> None:
"""Initialize the subentry flow."""
super().__init__()
self._albums: list[dict[str, Any]] = []
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Handle album selection."""
errors: dict[str, str] = {}
# Get parent config entry data
config_entry = self._get_entry()
url = config_entry.data[CONF_IMMICH_URL]
api_key = config_entry.data[CONF_API_KEY]
# Fetch available albums
session = async_get_clientsession(self.hass)
try:
self._albums = await fetch_albums(session, url, api_key)
except Exception:
_LOGGER.exception("Failed to fetch albums")
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({}),
errors=errors,
)
if not self._albums:
return self.async_abort(reason="no_albums")
if user_input is not None:
album_id = user_input[CONF_ALBUM_ID]
# Check if album is already configured
for subentry in config_entry.subentries.values():
if subentry.data.get(CONF_ALBUM_ID) == album_id:
return self.async_abort(reason="album_already_configured")
# Find album name
album_name = "Unknown Album"
for album in self._albums:
if album["id"] == album_id:
album_name = album.get("albumName", "Unnamed")
break
return self.async_create_entry(
title=album_name,
data={
CONF_ALBUM_ID: album_id,
CONF_ALBUM_NAME: album_name,
},
)
# Get already configured album IDs
configured_albums = set()
for subentry in config_entry.subentries.values():
if aid := subentry.data.get(CONF_ALBUM_ID):
configured_albums.add(aid)
# Build album selection list (excluding already configured)
album_options = {
album["id"]: f"{album.get('albumName', 'Unnamed')} ({album.get('assetCount', 0)} assets)"
for album in self._albums
if album["id"] not in configured_albums
}
if not album_options:
return self.async_abort(reason="all_albums_configured")
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ALBUM_ID): vol.In(album_options),
}
),
errors=errors,
)
class ImmichAlbumWatcherOptionsFlow(OptionsFlow):
"""Handle options flow for Immich Album Watcher."""
def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize options flow."""
self._config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Manage the options."""
errors: dict[str, str] = {}
if user_input is not None:
# Validate server connection if URL is provided
server_url = user_input.get(CONF_SERVER_URL, "").strip()
server_api_key = user_input.get(CONF_SERVER_API_KEY, "").strip()
if server_url and server_api_key:
try:
session = async_get_clientsession(self.hass)
async with session.get(
f"{server_url.rstrip('/')}/api/health"
) as response:
if response.status != 200:
errors["base"] = "server_connect_failed"
except Exception:
errors["base"] = "server_connect_failed"
if not errors:
return self.async_create_entry(
title="",
data={
CONF_SCAN_INTERVAL: user_input.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
CONF_TELEGRAM_BOT_TOKEN: user_input.get(
CONF_TELEGRAM_BOT_TOKEN, ""
),
CONF_TELEGRAM_CACHE_TTL: user_input.get(
CONF_TELEGRAM_CACHE_TTL, DEFAULT_TELEGRAM_CACHE_TTL
),
CONF_SERVER_URL: server_url,
CONF_SERVER_API_KEY: server_api_key,
},
)
return self.async_show_form(
step_id="init",
data_schema=self._build_options_schema(),
)
def _build_options_schema(self) -> vol.Schema:
"""Build the options form schema."""
current_interval = self._config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
current_bot_token = self._config_entry.options.get(
CONF_TELEGRAM_BOT_TOKEN, ""
)
current_cache_ttl = self._config_entry.options.get(
CONF_TELEGRAM_CACHE_TTL, DEFAULT_TELEGRAM_CACHE_TTL
)
current_server_url = self._config_entry.options.get(
CONF_SERVER_URL, ""
)
current_server_api_key = self._config_entry.options.get(
CONF_SERVER_API_KEY, ""
)
return vol.Schema(
{
vol.Required(
CONF_SCAN_INTERVAL, default=current_interval
): vol.All(vol.Coerce(int), vol.Range(min=10, max=3600)),
vol.Optional(
CONF_TELEGRAM_BOT_TOKEN, default=current_bot_token
): str,
vol.Optional(
CONF_TELEGRAM_CACHE_TTL, default=current_cache_ttl
): vol.All(vol.Coerce(int), vol.Range(min=1, max=168)),
vol.Optional(
CONF_SERVER_URL, default=current_server_url,
description={"suggested_value": current_server_url},
): str,
vol.Optional(
CONF_SERVER_API_KEY, default=current_server_api_key,
): str,
}
)
class CannotConnect(Exception):
"""Error to indicate we cannot connect."""
class InvalidAuth(Exception):
"""Error to indicate there is invalid auth."""