"""Data update coordinator for LED Screen Controller.""" from __future__ import annotations import asyncio from datetime import datetime, timedelta import logging from typing import Any import aiohttp from homeassistant.core import HomeAssistant from homeassistant.exceptions import ConfigEntryAuthFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.util import dt as dt_util from .const import ( DOMAIN, DEFAULT_TIMEOUT, ) # Boot-time jitter threshold: candidate boot_time is recomputed each poll from # `now - uptime_seconds`, so network latency and clock skew make it wobble by # sub-second amounts. Only update the cached value if the candidate differs by # more than this many seconds — anything larger almost certainly means the # server actually restarted. _BOOT_TIME_JITTER_S = 5.0 _LOGGER = logging.getLogger(__name__) class LedGrabCoordinator(DataUpdateCoordinator): """Class to manage fetching LED Screen Controller data.""" def __init__( self, hass: HomeAssistant, session: aiohttp.ClientSession, server_url: str, api_key: str, update_interval: timedelta, ) -> None: """Initialize the coordinator.""" self.server_url = server_url self.session = session self.api_key = api_key self.server_version = "unknown" self.boot_time: datetime | None = None self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT) # The aggregated snapshot does more server-side work than any single # endpoint (system metrics + cold-cache device brightness), so give it # a more generous ceiling than the per-request default. self._snapshot_timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT * 3) super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=update_interval, ) async def _async_update_data(self) -> dict[str, Any]: """Fetch the full server snapshot in one request and reshape it. The server's ``/api/v1/snapshot`` endpoint aggregates everything a poll needs (targets + state + metrics, devices + brightness, the source / preset / clock lists, and the system block), so the coordinator no longer fans out per-target ``/state`` + ``/metrics`` or per-device ``/brightness`` requests. The reshaped result keeps the same structure the entity platforms consume. """ try: snapshot = await self._fetch_snapshot() except asyncio.TimeoutError as err: raise UpdateFailed(f"Timeout fetching data: {err}") from err except aiohttp.ClientResponseError as err: # A rotated/invalid API key fails every poll identically now that # it's a single request — surface it as an auth failure so Home # Assistant starts the reauth flow instead of retrying forever. if err.status in (401, 403): raise ConfigEntryAuthFailed("Invalid API key") from err raise UpdateFailed(f"Error communicating with API: {err}") from err except aiohttp.ClientError as err: raise UpdateFailed(f"Error communicating with API: {err}") from err system = snapshot.get("system") or {} health = system.get("health") or {} version = health.get("version") if version: self.server_version = version self._update_boot_time(health.get("uptime_seconds")) states = snapshot.get("target_states") or {} metrics = snapshot.get("target_metrics") or {} targets_data: dict[str, dict[str, Any]] = {} for target in snapshot.get("targets") or []: target_id = target.get("id") if not target_id: continue targets_data[target_id] = { "info": target, "state": states.get(target_id), "metrics": metrics.get(target_id), } brightness = snapshot.get("device_brightness") or {} devices_data: dict[str, dict[str, Any]] = {} for device in snapshot.get("devices") or []: device_id = device.get("id") if not device_id: continue devices_data[device_id] = { "info": device, "brightness": brightness.get(device_id), } return { "targets": targets_data, "devices": devices_data, "css_sources": snapshot.get("css_sources") or [], "value_sources": snapshot.get("value_sources") or [], "scene_presets": snapshot.get("scene_presets") or [], "scene_playlists": snapshot.get("scene_playlists") or [], "playlist_state": snapshot.get("playlist_state") or {}, "sync_clocks": snapshot.get("sync_clocks") or [], "server_version": self.server_version, "system": { "performance": system.get("performance"), "health": health or None, "boot_time": self.boot_time, "update": system.get("update"), }, } async def _fetch_snapshot(self) -> dict[str, Any]: """GET the aggregated server snapshot (single round trip).""" async with self.session.get( f"{self.server_url}/api/v1/snapshot", headers=self._auth_headers, timeout=self._snapshot_timeout, ) as resp: resp.raise_for_status() return await resp.json() async def apply_update(self) -> None: """Trigger the server's apply-update flow. The server downloads the available release if needed and shuts down once the new binaries are in place. We refresh straight away so the update entity reflects ``applying=True`` until the connection drops. """ async with self.session.post( f"{self.server_url}/api/v1/system/update/apply", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status not in (200, 202): body = await resp.text() _LOGGER.error( "Failed to apply update: %s %s", resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() def _update_boot_time(self, uptime_seconds: float | None) -> None: """Recompute cached server boot time from /health uptime. Updates only when the candidate moves more than ``_BOOT_TIME_JITTER_S`` seconds, so the timestamp sensor stays stable across polls and the recorder doesn't see spurious state changes. """ if uptime_seconds is None: return candidate = dt_util.utcnow() - timedelta(seconds=float(uptime_seconds)) if ( self.boot_time is None or abs((self.boot_time - candidate).total_seconds()) > _BOOT_TIME_JITTER_S ): self.boot_time = candidate async def set_brightness(self, device_id: str, brightness: int) -> None: """Set brightness for a device.""" async with self.session.put( f"{self.server_url}/api/v1/devices/{device_id}/brightness", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"brightness": brightness}, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to set brightness for device %s: %s %s", device_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def set_color(self, device_id: str, color: list[int] | None) -> None: """Set or clear the static color for a device.""" async with self.session.put( f"{self.server_url}/api/v1/devices/{device_id}/color", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"color": color}, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to set color for device %s: %s %s", device_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def push_colors(self, source_id: str, colors: list[list[int]]) -> None: """Push flat color array to an api_input CSS source.""" async with self.session.post( f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"colors": colors}, timeout=self._timeout, ) as resp: if resp.status not in (200, 204): body = await resp.text() _LOGGER.error( "Failed to push colors to source %s: %s %s", source_id, resp.status, body, ) resp.raise_for_status() async def push_segments(self, source_id: str, segments: list[dict]) -> None: """Push segment data to an api_input CSS source.""" async with self.session.post( f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors", headers={**self._auth_headers, "Content-Type": "application/json"}, json={"segments": segments}, timeout=self._timeout, ) as resp: if resp.status not in (200, 204): body = await resp.text() _LOGGER.error( "Failed to push segments to source %s: %s %s", source_id, resp.status, body, ) resp.raise_for_status() async def activate_scene(self, preset_id: str) -> None: """Activate a scene preset.""" async with self.session.post( f"{self.server_url}/api/v1/scene-presets/{preset_id}/activate", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to activate scene %s: %s %s", preset_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def start_playlist(self, playlist_id: str) -> None: """Start cycling a scene playlist. The server engine runs at most one playlist at a time, so starting this one transparently stops whichever playlist was running before. """ async with self.session.post( f"{self.server_url}/api/v1/scene-playlists/{playlist_id}/start", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to start playlist %s: %s %s", playlist_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def stop_playlist(self) -> None: """Stop the active playlist (the last applied scene stays in place). The stop endpoint is global — it stops whichever playlist is cycling, matching the server's single-active-playlist model. """ async with self.session.post( f"{self.server_url}/api/v1/scene-playlists/stop", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to stop playlist: %s %s", resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def update_source(self, source_id: str, **kwargs: Any) -> None: """Update a color strip source's fields. The server uses a discriminated-union body keyed on ``source_type``; if the caller didn't supply it, look it up from the cached sources so the request validates server-side. """ if "source_type" not in kwargs and self.data: for source in self.data.get("css_sources", []): if source.get("id") == source_id: src_type = source.get("source_type") if src_type: kwargs["source_type"] = src_type break async with self.session.put( f"{self.server_url}/api/v1/color-strip-sources/{source_id}", headers={**self._auth_headers, "Content-Type": "application/json"}, json=kwargs, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to update source %s: %s %s", source_id, resp.status, body, ) resp.raise_for_status() async def update_target(self, target_id: str, **kwargs: Any) -> None: """Update an output target's fields.""" async with self.session.put( f"{self.server_url}/api/v1/output-targets/{target_id}", headers={**self._auth_headers, "Content-Type": "application/json"}, json=kwargs, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to update target %s: %s %s", target_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def start_processing(self, target_id: str) -> None: """Start processing for a target.""" async with self.session.post( f"{self.server_url}/api/v1/output-targets/{target_id}/start", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status == 409: _LOGGER.debug("Target %s already processing", target_id) elif resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to start target %s: %s %s", target_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def _sync_clock_action(self, clock_id: str, action: str) -> None: """POST a sync-clock control action (pause/resume/reset).""" async with self.session.post( f"{self.server_url}/api/v1/sync-clocks/{clock_id}/{action}", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to %s sync clock %s: %s %s", action, clock_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def pause_sync_clock(self, clock_id: str) -> None: """Pause a sync clock (linked animations freeze).""" await self._sync_clock_action(clock_id, "pause") async def resume_sync_clock(self, clock_id: str) -> None: """Resume a paused sync clock.""" await self._sync_clock_action(clock_id, "resume") async def reset_sync_clock(self, clock_id: str) -> None: """Reset a sync clock to t=0 (linked animations restart).""" await self._sync_clock_action(clock_id, "reset") async def update_sync_clock(self, clock_id: str, **kwargs: Any) -> None: """Update a sync clock's persistent fields (name/speed/...).""" async with self.session.put( f"{self.server_url}/api/v1/sync-clocks/{clock_id}", headers={**self._auth_headers, "Content-Type": "application/json"}, json=kwargs, timeout=self._timeout, ) as resp: if resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to update sync clock %s: %s %s", clock_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh() async def stop_processing(self, target_id: str) -> None: """Stop processing for a target.""" async with self.session.post( f"{self.server_url}/api/v1/output-targets/{target_id}/stop", headers=self._auth_headers, timeout=self._timeout, ) as resp: if resp.status == 409: _LOGGER.debug("Target %s already stopped", target_id) elif resp.status != 200: body = await resp.text() _LOGGER.error( "Failed to stop target %s: %s %s", target_id, resp.status, body, ) resp.raise_for_status() await self.async_request_refresh()