"""Data update coordinator for LED Screen Controller.""" from __future__ import annotations import asyncio from datetime import timedelta import logging from typing import Any import aiohttp from homeassistant.core import HomeAssistant from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .const import ( DOMAIN, DEFAULT_TIMEOUT, ) _LOGGER = logging.getLogger(__name__) class LedGrabCoordinator(DataUpdateCoordinator): """Class to manage fetching LED Screen Controller data.""" def __init__( self, hass: HomeAssistant, session: aiohttp.ClientSession, server_url: str, api_key: str, update_interval: timedelta, ) -> None: """Initialize the coordinator.""" self.server_url = server_url self.session = session self.api_key = api_key self.server_version = "unknown" self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT) super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=update_interval, ) async def _async_update_data(self) -> dict[str, Any]: """Fetch data from API.""" try: async with asyncio.timeout(DEFAULT_TIMEOUT * 3): if self.server_version == "unknown": await self._fetch_server_version() targets_list = await self._fetch_targets() # Fetch state and metrics for all targets in parallel targets_data: dict[str, dict[str, Any]] = {} async def fetch_target_data(target: dict) -> tuple[str, dict]: target_id = target["id"] try: state, metrics = await asyncio.gather( self._fetch_target_state(target_id), self._fetch_target_metrics(target_id), ) except Exception as err: _LOGGER.warning( "Failed to fetch data for target %s: %s", target_id, err, ) state = None metrics = None return target_id, { "info": target, "state": state, "metrics": metrics, } results = await asyncio.gather( *(fetch_target_data(t) for t in targets_list), return_exceptions=True, ) for r in results: if isinstance(r, Exception): _LOGGER.warning("Target fetch failed: %s", r) continue target_id, data = r targets_data[target_id] = data # Fetch devices, CSS sources, value sources, and scene presets in parallel devices_data, css_sources, value_sources, scene_presets = await asyncio.gather( self._fetch_devices(), self._fetch_css_sources(), self._fetch_value_sources(), self._fetch_scene_presets(), ) return { "targets": targets_data, "devices": devices_data, "css_sources": css_sources, "value_sources": value_sources, "scene_presets": scene_presets, "server_version": self.server_version, } except asyncio.TimeoutError as err: raise UpdateFailed(f"Timeout fetching data: {err}") from err except aiohttp.ClientError as err: raise UpdateFailed(f"Error communicating with API: {err}") from err async def _fetch_server_version(self) -> None: """Fetch server version from health endpoint.""" try: async with self.session.get( f"{self.server_url}/health", timeout=self._timeout, ) as resp: resp.raise_for_status() data = await resp.json() self.server_version = data.get("version", "unknown") except Exception as err: _LOGGER.warning("Failed to fetch server version: %s", err) self.server_version = "unknown" async def _fetch_targets(self) -> list[dict[str, Any]]: """Fetch all output targets.""" async with self.session.get( f"{self.server_url}/api/v1/output-targets", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() data = await resp.json() return data.get("targets", []) async def _fetch_target_state(self, target_id: str) -> dict[str, Any]: """Fetch target processing state.""" async with self.session.get( f"{self.server_url}/api/v1/output-targets/{target_id}/state", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() return await resp.json() async def _fetch_target_metrics(self, target_id: str) -> dict[str, Any]: """Fetch target metrics.""" async with self.session.get( f"{self.server_url}/api/v1/output-targets/{target_id}/metrics", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() return await resp.json() async def _fetch_devices(self) -> dict[str, dict[str, Any]]: """Fetch all devices with capabilities and brightness.""" try: async with self.session.get( f"{self.server_url}/api/v1/devices", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() data = await resp.json() devices = data.get("devices", []) except Exception as err: _LOGGER.warning("Failed to fetch devices: %s", err) return {} # Fetch brightness for all capable devices in parallel async def fetch_device_entry(device: dict) -> tuple[str, dict[str, Any]]: device_id = device["id"] entry: dict[str, Any] = {"info": device, "brightness": None} if "brightness_control" in (device.get("capabilities") or []): try: async with self.session.get( f"{self.server_url}/api/v1/devices/{device_id}/brightness", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status == 200: bri_data = await resp.json() entry["brightness"] = bri_data.get("brightness") except Exception as err: _LOGGER.warning( "Failed to fetch brightness for device %s: %s", device_id, err, ) return device_id, entry results = await asyncio.gather( *(fetch_device_entry(d) for d in devices), return_exceptions=True, ) devices_data: dict[str, dict[str, Any]] = {} for r in results: if isinstance(r, Exception): _LOGGER.warning("Device fetch failed: %s", r) continue device_id, entry = r devices_data[device_id] = entry return devices_data async def set_brightness(self, device_id: str, brightness: int) -> None: """Set brightness for a device.""" async with self.session.put( f"{self.server_url}/api/v1/devices/{device_id}/brightness", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"brightness": brightness}, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to set brightness for device %s: %s %s", device_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def set_color(self, device_id: str, color: list[int] | None) -> None: """Set or clear the static color for a device.""" async with self.session.put( f"{self.server_url}/api/v1/devices/{device_id}/color", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"color": color}, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to set color for device %s: %s %s", device_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def _fetch_css_sources(self) -> list[dict[str, Any]]: """Fetch all color strip sources.""" try: async with self.session.get( f"{self.server_url}/api/v1/color-strip-sources", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() data = await resp.json() return data.get("sources", []) except Exception as err: _LOGGER.warning("Failed to fetch CSS sources: %s", err) return [] async def _fetch_value_sources(self) -> list[dict[str, Any]]: """Fetch all value sources.""" try: async with self.session.get( f"{self.server_url}/api/v1/value-sources", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() data = await resp.json() return data.get("sources", []) except Exception as err: _LOGGER.warning("Failed to fetch value sources: %s", err) return [] async def _fetch_scene_presets(self) -> list[dict[str, Any]]: """Fetch all scene presets.""" try: async with self.session.get( f"{self.server_url}/api/v1/scene-presets", headers=self._auth_headers, timeout=self._timeout, ) as resp: resp.raise_for_status() data = await resp.json() return data.get("presets", []) except Exception as err: _LOGGER.warning("Failed to fetch scene presets: %s", err) return [] async def push_colors(self, source_id: str, colors: list[list[int]]) -> None: """Push flat color array to an api_input CSS source.""" async with self.session.post( f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"colors": colors}, timeout=self._timeout, ) as resp: if resp.status not in (200, 204): body = await resp.text() _LOGGER.error( "Failed to push colors to source %s: %s %s", source_id, resp.status, body, ) resp.raise_for_status() async def push_segments(self, source_id: str, segments: list[dict]) -> None: """Push segment data to an api_input CSS source.""" async with self.session.post( f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"segments": segments}, timeout=self._timeout, ) as resp: if resp.status not in (200, 204): body = await resp.text() _LOGGER.error( "Failed to push segments to source %s: %s %s", source_id, resp.status, body, ) resp.raise_for_status() async def activate_scene(self, preset_id: str) -> None: """Activate a scene preset.""" async with self.session.post( f"{self.server_url}/api/v1/scene-presets/{preset_id}/activate", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to activate scene %s: %s %s", preset_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def update_source(self, source_id: str, **kwargs: Any) -> None: """Update a color strip source's fields.""" async with self.session.put( f"{self.server_url}/api/v1/color-strip-sources/{source_id}", headers={**self._auth_headers, "Content-Type": "application/json"}, json=kwargs, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to update source %s: %s %s", source_id, resp.status, body, ) resp.raise_for_status() async def update_target(self, target_id: str, **kwargs: Any) -> None: """Update an output target's fields.""" async with self.session.put( f"{self.server_url}/api/v1/output-targets/{target_id}", headers={**self._auth_headers, "Content-Type": "application/json"}, json=kwargs, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to update target %s: %s %s", target_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def start_processing(self, target_id: str) -> None: """Start processing for a target.""" async with self.session.post( f"{self.server_url}/api/v1/output-targets/{target_id}/start", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status == 409: _LOGGER.debug("Target %s already processing", target_id) elif resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to start target %s: %s %s", target_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def stop_processing(self, target_id: str) -> None: """Stop processing for a target.""" async with self.session.post( f"{self.server_url}/api/v1/output-targets/{target_id}/stop", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status == 409: _LOGGER.debug("Target %s already stopped", target_id) elif resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to stop target %s: %s %s", target_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh()