705616f8b0
One device per scene playlist (model "Scene Playlist"), each with: - a switch (start/stop; the server cycles one playlist at a time, so starting one stops any other) - a "Current Scene" sensor that resolves the active preset name from scene_presets while this playlist is cycling - an "Items" diagnostic sensor (configured scene count; no state_class) The coordinator reads scene_playlists plus the flat playlist_state from the /api/v1/snapshot payload and gains start_playlist()/stop_playlist(); __init__ registers and prunes per-playlist devices and reloads on playlist-id changes; the event listener also refreshes on the playlist_state_changed WS event. Shared device/lookup/running-state plumbing lives in a new LedGrabPlaylistEntity base (entity.py) used by both the switch and the sensors. Adds en/ru translation keys. Note: this also lands the in-progress coordinator migration from the per-request fan-out to the single /api/v1/snapshot poll that was already present in the working tree. Requires the led-grab server /api/v1/snapshot to emit scene_playlists + playlist_state (companion server change, tracked separately).
458 lines
18 KiB
Python
458 lines
18 KiB
Python
"""Data update coordinator for LED Screen Controller."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
DEFAULT_TIMEOUT,
|
|
)
|
|
|
|
# Boot-time jitter threshold: candidate boot_time is recomputed each poll from
|
|
# `now - uptime_seconds`, so network latency and clock skew make it wobble by
|
|
# sub-second amounts. Only update the cached value if the candidate differs by
|
|
# more than this many seconds — anything larger almost certainly means the
|
|
# server actually restarted.
|
|
_BOOT_TIME_JITTER_S = 5.0
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class LedGrabCoordinator(DataUpdateCoordinator):
|
|
"""Class to manage fetching LED Screen Controller data."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
session: aiohttp.ClientSession,
|
|
server_url: str,
|
|
api_key: str,
|
|
update_interval: timedelta,
|
|
) -> None:
|
|
"""Initialize the coordinator."""
|
|
self.server_url = server_url
|
|
self.session = session
|
|
self.api_key = api_key
|
|
self.server_version = "unknown"
|
|
self.boot_time: datetime | None = None
|
|
self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
|
|
self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)
|
|
# The aggregated snapshot does more server-side work than any single
|
|
# endpoint (system metrics + cold-cache device brightness), so give it
|
|
# a more generous ceiling than the per-request default.
|
|
self._snapshot_timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT * 3)
|
|
|
|
super().__init__(
|
|
hass,
|
|
_LOGGER,
|
|
name=DOMAIN,
|
|
update_interval=update_interval,
|
|
)
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Fetch the full server snapshot in one request and reshape it.
|
|
|
|
The server's ``/api/v1/snapshot`` endpoint aggregates everything a poll
|
|
needs (targets + state + metrics, devices + brightness, the source /
|
|
preset / clock lists, and the system block), so the coordinator no
|
|
longer fans out per-target ``/state`` + ``/metrics`` or per-device
|
|
``/brightness`` requests. The reshaped result keeps the same structure
|
|
the entity platforms consume.
|
|
"""
|
|
try:
|
|
snapshot = await self._fetch_snapshot()
|
|
except asyncio.TimeoutError as err:
|
|
raise UpdateFailed(f"Timeout fetching data: {err}") from err
|
|
except aiohttp.ClientResponseError as err:
|
|
# A rotated/invalid API key fails every poll identically now that
|
|
# it's a single request — surface it as an auth failure so Home
|
|
# Assistant starts the reauth flow instead of retrying forever.
|
|
if err.status in (401, 403):
|
|
raise ConfigEntryAuthFailed("Invalid API key") from err
|
|
raise UpdateFailed(f"Error communicating with API: {err}") from err
|
|
except aiohttp.ClientError as err:
|
|
raise UpdateFailed(f"Error communicating with API: {err}") from err
|
|
|
|
system = snapshot.get("system") or {}
|
|
health = system.get("health") or {}
|
|
version = health.get("version")
|
|
if version:
|
|
self.server_version = version
|
|
self._update_boot_time(health.get("uptime_seconds"))
|
|
|
|
states = snapshot.get("target_states") or {}
|
|
metrics = snapshot.get("target_metrics") or {}
|
|
targets_data: dict[str, dict[str, Any]] = {}
|
|
for target in snapshot.get("targets") or []:
|
|
target_id = target.get("id")
|
|
if not target_id:
|
|
continue
|
|
targets_data[target_id] = {
|
|
"info": target,
|
|
"state": states.get(target_id),
|
|
"metrics": metrics.get(target_id),
|
|
}
|
|
|
|
brightness = snapshot.get("device_brightness") or {}
|
|
devices_data: dict[str, dict[str, Any]] = {}
|
|
for device in snapshot.get("devices") or []:
|
|
device_id = device.get("id")
|
|
if not device_id:
|
|
continue
|
|
devices_data[device_id] = {
|
|
"info": device,
|
|
"brightness": brightness.get(device_id),
|
|
}
|
|
|
|
return {
|
|
"targets": targets_data,
|
|
"devices": devices_data,
|
|
"css_sources": snapshot.get("css_sources") or [],
|
|
"value_sources": snapshot.get("value_sources") or [],
|
|
"scene_presets": snapshot.get("scene_presets") or [],
|
|
"scene_playlists": snapshot.get("scene_playlists") or [],
|
|
"playlist_state": snapshot.get("playlist_state") or {},
|
|
"sync_clocks": snapshot.get("sync_clocks") or [],
|
|
"server_version": self.server_version,
|
|
"system": {
|
|
"performance": system.get("performance"),
|
|
"health": health or None,
|
|
"boot_time": self.boot_time,
|
|
"update": system.get("update"),
|
|
},
|
|
}
|
|
|
|
async def _fetch_snapshot(self) -> dict[str, Any]:
|
|
"""GET the aggregated server snapshot (single round trip)."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/snapshot",
|
|
headers=self._auth_headers,
|
|
timeout=self._snapshot_timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
return await resp.json()
|
|
|
|
async def apply_update(self) -> None:
|
|
"""Trigger the server's apply-update flow.
|
|
|
|
The server downloads the available release if needed and shuts down
|
|
once the new binaries are in place. We refresh straight away so the
|
|
update entity reflects ``applying=True`` until the connection drops.
|
|
"""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/system/update/apply",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 202):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to apply update: %s %s",
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
def _update_boot_time(self, uptime_seconds: float | None) -> None:
|
|
"""Recompute cached server boot time from /health uptime.
|
|
|
|
Updates only when the candidate moves more than ``_BOOT_TIME_JITTER_S``
|
|
seconds, so the timestamp sensor stays stable across polls and the
|
|
recorder doesn't see spurious state changes.
|
|
"""
|
|
if uptime_seconds is None:
|
|
return
|
|
candidate = dt_util.utcnow() - timedelta(seconds=float(uptime_seconds))
|
|
if (
|
|
self.boot_time is None
|
|
or abs((self.boot_time - candidate).total_seconds()) > _BOOT_TIME_JITTER_S
|
|
):
|
|
self.boot_time = candidate
|
|
|
|
async def set_brightness(self, device_id: str, brightness: int) -> None:
|
|
"""Set brightness for a device."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/brightness",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"brightness": brightness},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to set brightness for device %s: %s %s",
|
|
device_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def set_color(self, device_id: str, color: list[int] | None) -> None:
|
|
"""Set or clear the static color for a device."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/color",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"color": color},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to set color for device %s: %s %s",
|
|
device_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def push_colors(self, source_id: str, colors: list[list[int]]) -> None:
|
|
"""Push flat color array to an api_input CSS source."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"colors": colors},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 204):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to push colors to source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def push_segments(self, source_id: str, segments: list[dict]) -> None:
|
|
"""Push segment data to an api_input CSS source."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"segments": segments},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 204):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to push segments to source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def activate_scene(self, preset_id: str) -> None:
|
|
"""Activate a scene preset."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/scene-presets/{preset_id}/activate",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to activate scene %s: %s %s",
|
|
preset_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def start_playlist(self, playlist_id: str) -> None:
|
|
"""Start cycling a scene playlist.
|
|
|
|
The server engine runs at most one playlist at a time, so starting this
|
|
one transparently stops whichever playlist was running before.
|
|
"""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/scene-playlists/{playlist_id}/start",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to start playlist %s: %s %s",
|
|
playlist_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def stop_playlist(self) -> None:
|
|
"""Stop the active playlist (the last applied scene stays in place).
|
|
|
|
The stop endpoint is global — it stops whichever playlist is cycling,
|
|
matching the server's single-active-playlist model.
|
|
"""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/scene-playlists/stop",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to stop playlist: %s %s",
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def update_source(self, source_id: str, **kwargs: Any) -> None:
|
|
"""Update a color strip source's fields.
|
|
|
|
The server uses a discriminated-union body keyed on ``source_type``;
|
|
if the caller didn't supply it, look it up from the cached sources so
|
|
the request validates server-side.
|
|
"""
|
|
if "source_type" not in kwargs and self.data:
|
|
for source in self.data.get("css_sources", []):
|
|
if source.get("id") == source_id:
|
|
src_type = source.get("source_type")
|
|
if src_type:
|
|
kwargs["source_type"] = src_type
|
|
break
|
|
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def update_target(self, target_id: str, **kwargs: Any) -> None:
|
|
"""Update an output target's fields."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def start_processing(self, target_id: str) -> None:
|
|
"""Start processing for a target."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/start",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 409:
|
|
_LOGGER.debug("Target %s already processing", target_id)
|
|
elif resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to start target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def _sync_clock_action(self, clock_id: str, action: str) -> None:
|
|
"""POST a sync-clock control action (pause/resume/reset)."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/sync-clocks/{clock_id}/{action}",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to %s sync clock %s: %s %s",
|
|
action,
|
|
clock_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def pause_sync_clock(self, clock_id: str) -> None:
|
|
"""Pause a sync clock (linked animations freeze)."""
|
|
await self._sync_clock_action(clock_id, "pause")
|
|
|
|
async def resume_sync_clock(self, clock_id: str) -> None:
|
|
"""Resume a paused sync clock."""
|
|
await self._sync_clock_action(clock_id, "resume")
|
|
|
|
async def reset_sync_clock(self, clock_id: str) -> None:
|
|
"""Reset a sync clock to t=0 (linked animations restart)."""
|
|
await self._sync_clock_action(clock_id, "reset")
|
|
|
|
async def update_sync_clock(self, clock_id: str, **kwargs: Any) -> None:
|
|
"""Update a sync clock's persistent fields (name/speed/...)."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/sync-clocks/{clock_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update sync clock %s: %s %s",
|
|
clock_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def stop_processing(self, target_id: str) -> None:
|
|
"""Stop processing for a target."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/stop",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 409:
|
|
_LOGGER.debug("Target %s already stopped", target_id)
|
|
elif resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to stop target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|