Files
haos-hacs-immich-album-watcher/custom_components/immich_album_watcher/config_flow.py

276 lines
8.5 KiB
Python

"""Config flow for Immich Album Watcher integration."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentryFlow,
OptionsFlow,
SubentryFlowResult,
)
from homeassistant.core import callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import (
CONF_ALBUM_ID,
CONF_ALBUM_NAME,
CONF_API_KEY,
CONF_HUB_NAME,
CONF_IMMICH_URL,
CONF_SCAN_INTERVAL,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
SUBENTRY_TYPE_ALBUM,
)
_LOGGER = logging.getLogger(__name__)
async def validate_connection(
session: aiohttp.ClientSession, url: str, api_key: str
) -> dict[str, Any]:
"""Validate the Immich connection and return server info."""
headers = {"x-api-key": api_key}
async with session.get(
f"{url.rstrip('/')}/api/server/ping", headers=headers
) as response:
if response.status == 401:
raise InvalidAuth
if response.status != 200:
raise CannotConnect
return await response.json()
async def fetch_albums(
session: aiohttp.ClientSession, url: str, api_key: str
) -> list[dict[str, Any]]:
"""Fetch all albums from Immich."""
headers = {"x-api-key": api_key}
async with session.get(
f"{url.rstrip('/')}/api/albums", headers=headers
) as response:
if response.status != 200:
raise CannotConnect
return await response.json()
class ImmichAlbumWatcherConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Immich Album Watcher."""
VERSION = 2
def __init__(self) -> None:
"""Initialize the config flow."""
self._url: str | None = None
self._api_key: str | None = None
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
"""Get the options flow for this handler."""
return ImmichAlbumWatcherOptionsFlow(config_entry)
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return supported subentry types."""
return {SUBENTRY_TYPE_ALBUM: ImmichAlbumSubentryFlowHandler}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step - connection details."""
errors: dict[str, str] = {}
if user_input is not None:
hub_name = user_input[CONF_HUB_NAME].strip()
self._url = user_input[CONF_IMMICH_URL].rstrip("/")
self._api_key = user_input[CONF_API_KEY]
session = async_get_clientsession(self.hass)
try:
await validate_connection(session, self._url, self._api_key)
# Set unique ID based on URL
await self.async_set_unique_id(self._url)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=hub_name,
data={
CONF_HUB_NAME: hub_name,
CONF_IMMICH_URL: self._url,
CONF_API_KEY: self._api_key,
},
options={
CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL,
},
)
except InvalidAuth:
errors["base"] = "invalid_auth"
except CannotConnect:
errors["base"] = "cannot_connect"
except aiohttp.ClientError:
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_HUB_NAME, default="Immich"): str,
vol.Required(CONF_IMMICH_URL): str,
vol.Required(CONF_API_KEY): str,
}
),
errors=errors,
description_placeholders={
"docs_url": "https://immich.app/docs/features/command-line-interface#obtain-the-api-key"
},
)
class ImmichAlbumSubentryFlowHandler(ConfigSubentryFlow):
"""Handle subentry flow for adding albums."""
def __init__(self) -> None:
"""Initialize the subentry flow."""
super().__init__()
self._albums: list[dict[str, Any]] = []
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Handle album selection."""
errors: dict[str, str] = {}
# Get parent config entry data
config_entry = self._get_entry()
url = config_entry.data[CONF_IMMICH_URL]
api_key = config_entry.data[CONF_API_KEY]
# Fetch available albums
session = async_get_clientsession(self.hass)
try:
self._albums = await fetch_albums(session, url, api_key)
except Exception:
_LOGGER.exception("Failed to fetch albums")
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({}),
errors=errors,
)
if not self._albums:
return self.async_abort(reason="no_albums")
if user_input is not None:
album_id = user_input[CONF_ALBUM_ID]
# Check if album is already configured
for subentry in config_entry.subentries.values():
if subentry.data.get(CONF_ALBUM_ID) == album_id:
return self.async_abort(reason="album_already_configured")
# Find album name
album_name = "Unknown Album"
for album in self._albums:
if album["id"] == album_id:
album_name = album.get("albumName", "Unnamed")
break
return self.async_create_entry(
title=album_name,
data={
CONF_ALBUM_ID: album_id,
CONF_ALBUM_NAME: album_name,
},
)
# Get already configured album IDs
configured_albums = set()
for subentry in config_entry.subentries.values():
if aid := subentry.data.get(CONF_ALBUM_ID):
configured_albums.add(aid)
# Build album selection list (excluding already configured)
album_options = {
album["id"]: f"{album.get('albumName', 'Unnamed')} ({album.get('assetCount', 0)} assets)"
for album in self._albums
if album["id"] not in configured_albums
}
if not album_options:
return self.async_abort(reason="all_albums_configured")
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ALBUM_ID): vol.In(album_options),
}
),
errors=errors,
)
class ImmichAlbumWatcherOptionsFlow(OptionsFlow):
"""Handle options flow for Immich Album Watcher."""
def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize options flow."""
self._config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(
title="",
data={
CONF_SCAN_INTERVAL: user_input.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
},
)
current_interval = self._config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
{
vol.Required(
CONF_SCAN_INTERVAL, default=current_interval
): vol.All(vol.Coerce(int), vol.Range(min=10, max=3600)),
}
),
)
class CannotConnect(Exception):
"""Error to indicate we cannot connect."""
class InvalidAuth(Exception):
"""Error to indicate there is invalid auth."""