feat(playlists): expose scene playlists as Home Assistant entities
One device per scene playlist (model "Scene Playlist"), each with: - a switch (start/stop; the server cycles one playlist at a time, so starting one stops any other) - a "Current Scene" sensor that resolves the active preset name from scene_presets while this playlist is cycling - an "Items" diagnostic sensor (configured scene count; no state_class) The coordinator reads scene_playlists plus the flat playlist_state from the /api/v1/snapshot payload and gains start_playlist()/stop_playlist(); __init__ registers and prunes per-playlist devices and reloads on playlist-id changes; the event listener also refreshes on the playlist_state_changed WS event. Shared device/lookup/running-state plumbing lives in a new LedGrabPlaylistEntity base (entity.py) used by both the switch and the sensors. Adds en/ru translation keys. Note: this also lands the in-progress coordinator migration from the per-request fan-out to the single /api/v1/snapshot poll that was already present in the working tree. Requires the led-grab server /api/v1/snapshot to emit scene_playlists + playlist_state (companion server change, tracked separately).
This commit is contained in:
@@ -10,6 +10,7 @@ from typing import Any
|
||||
import aiohttp
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
@@ -47,6 +48,10 @@ class LedGrabCoordinator(DataUpdateCoordinator):
|
||||
self.boot_time: datetime | None = None
|
||||
self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
|
||||
self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)
|
||||
# The aggregated snapshot does more server-side work than any single
|
||||
# endpoint (system metrics + cold-cache device brightness), so give it
|
||||
# a more generous ceiling than the per-request default.
|
||||
self._snapshot_timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT * 3)
|
||||
|
||||
super().__init__(
|
||||
hass,
|
||||
@@ -56,150 +61,87 @@ class LedGrabCoordinator(DataUpdateCoordinator):
|
||||
)
|
||||
|
||||
async def _async_update_data(self) -> dict[str, Any]:
|
||||
"""Fetch data from API."""
|
||||
"""Fetch the full server snapshot in one request and reshape it.
|
||||
|
||||
The server's ``/api/v1/snapshot`` endpoint aggregates everything a poll
|
||||
needs (targets + state + metrics, devices + brightness, the source /
|
||||
preset / clock lists, and the system block), so the coordinator no
|
||||
longer fans out per-target ``/state`` + ``/metrics`` or per-device
|
||||
``/brightness`` requests. The reshaped result keeps the same structure
|
||||
the entity platforms consume.
|
||||
"""
|
||||
try:
|
||||
async with asyncio.timeout(DEFAULT_TIMEOUT * 3):
|
||||
targets_list = await self._fetch_targets()
|
||||
|
||||
# Fetch state and metrics for all targets in parallel
|
||||
targets_data: dict[str, dict[str, Any]] = {}
|
||||
|
||||
async def fetch_target_data(target: dict) -> tuple[str, dict]:
|
||||
target_id = target["id"]
|
||||
try:
|
||||
state, metrics = await asyncio.gather(
|
||||
self._fetch_target_state(target_id),
|
||||
self._fetch_target_metrics(target_id),
|
||||
)
|
||||
except Exception as err:
|
||||
_LOGGER.warning(
|
||||
"Failed to fetch data for target %s: %s",
|
||||
target_id,
|
||||
err,
|
||||
)
|
||||
state = None
|
||||
metrics = None
|
||||
|
||||
return target_id, {
|
||||
"info": target,
|
||||
"state": state,
|
||||
"metrics": metrics,
|
||||
}
|
||||
|
||||
results = await asyncio.gather(
|
||||
*(fetch_target_data(t) for t in targets_list),
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
for r in results:
|
||||
if isinstance(r, Exception):
|
||||
_LOGGER.warning("Target fetch failed: %s", r)
|
||||
continue
|
||||
target_id, data = r
|
||||
targets_data[target_id] = data
|
||||
|
||||
# Fetch devices, CSS sources, value sources, scene presets,
|
||||
# sync clocks, system performance, health, and update status
|
||||
# in parallel
|
||||
(
|
||||
devices_data,
|
||||
css_sources,
|
||||
value_sources,
|
||||
scene_presets,
|
||||
sync_clocks,
|
||||
performance,
|
||||
health,
|
||||
update_status,
|
||||
) = await asyncio.gather(
|
||||
self._fetch_devices(),
|
||||
self._fetch_css_sources(),
|
||||
self._fetch_value_sources(),
|
||||
self._fetch_scene_presets(),
|
||||
self._fetch_sync_clocks(),
|
||||
self._fetch_system_performance(),
|
||||
self._fetch_health(),
|
||||
self._fetch_update_status(),
|
||||
)
|
||||
|
||||
if health:
|
||||
version = health.get("version")
|
||||
if version:
|
||||
self.server_version = version
|
||||
self._update_boot_time(health.get("uptime_seconds"))
|
||||
|
||||
return {
|
||||
"targets": targets_data,
|
||||
"devices": devices_data,
|
||||
"css_sources": css_sources,
|
||||
"value_sources": value_sources,
|
||||
"scene_presets": scene_presets,
|
||||
"sync_clocks": sync_clocks,
|
||||
"server_version": self.server_version,
|
||||
"system": {
|
||||
"performance": performance,
|
||||
"health": health,
|
||||
"boot_time": self.boot_time,
|
||||
"update": update_status,
|
||||
},
|
||||
}
|
||||
|
||||
snapshot = await self._fetch_snapshot()
|
||||
except asyncio.TimeoutError as err:
|
||||
raise UpdateFailed(f"Timeout fetching data: {err}") from err
|
||||
except aiohttp.ClientResponseError as err:
|
||||
# A rotated/invalid API key fails every poll identically now that
|
||||
# it's a single request — surface it as an auth failure so Home
|
||||
# Assistant starts the reauth flow instead of retrying forever.
|
||||
if err.status in (401, 403):
|
||||
raise ConfigEntryAuthFailed("Invalid API key") from err
|
||||
raise UpdateFailed(f"Error communicating with API: {err}") from err
|
||||
except aiohttp.ClientError as err:
|
||||
raise UpdateFailed(f"Error communicating with API: {err}") from err
|
||||
|
||||
async def _fetch_health(self) -> dict[str, Any] | None:
|
||||
"""Fetch /health (unauthenticated, cheap; gives version + uptime)."""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/health",
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Failed to fetch health: %s", err)
|
||||
return None
|
||||
system = snapshot.get("system") or {}
|
||||
health = system.get("health") or {}
|
||||
version = health.get("version")
|
||||
if version:
|
||||
self.server_version = version
|
||||
self._update_boot_time(health.get("uptime_seconds"))
|
||||
|
||||
async def _fetch_system_performance(self) -> dict[str, Any] | None:
|
||||
"""Fetch CPU/RAM/GPU/temperature metrics from the server.
|
||||
states = snapshot.get("target_states") or {}
|
||||
metrics = snapshot.get("target_metrics") or {}
|
||||
targets_data: dict[str, dict[str, Any]] = {}
|
||||
for target in snapshot.get("targets") or []:
|
||||
target_id = target.get("id")
|
||||
if not target_id:
|
||||
continue
|
||||
targets_data[target_id] = {
|
||||
"info": target,
|
||||
"state": states.get(target_id),
|
||||
"metrics": metrics.get(target_id),
|
||||
}
|
||||
|
||||
Returns ``None`` on older servers without the endpoint or on transient
|
||||
failures — callers must treat the absence as "no telemetry this poll".
|
||||
"""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/system/performance",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
if resp.status == 404:
|
||||
return None
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Failed to fetch system performance: %s", err)
|
||||
return None
|
||||
brightness = snapshot.get("device_brightness") or {}
|
||||
devices_data: dict[str, dict[str, Any]] = {}
|
||||
for device in snapshot.get("devices") or []:
|
||||
device_id = device.get("id")
|
||||
if not device_id:
|
||||
continue
|
||||
devices_data[device_id] = {
|
||||
"info": device,
|
||||
"brightness": brightness.get(device_id),
|
||||
}
|
||||
|
||||
async def _fetch_update_status(self) -> dict[str, Any] | None:
|
||||
"""Fetch auto-update status (current version, available release, …).
|
||||
return {
|
||||
"targets": targets_data,
|
||||
"devices": devices_data,
|
||||
"css_sources": snapshot.get("css_sources") or [],
|
||||
"value_sources": snapshot.get("value_sources") or [],
|
||||
"scene_presets": snapshot.get("scene_presets") or [],
|
||||
"scene_playlists": snapshot.get("scene_playlists") or [],
|
||||
"playlist_state": snapshot.get("playlist_state") or {},
|
||||
"sync_clocks": snapshot.get("sync_clocks") or [],
|
||||
"server_version": self.server_version,
|
||||
"system": {
|
||||
"performance": system.get("performance"),
|
||||
"health": health or None,
|
||||
"boot_time": self.boot_time,
|
||||
"update": system.get("update"),
|
||||
},
|
||||
}
|
||||
|
||||
Older servers without the update service return 404 — treat that as
|
||||
"no update entity should be registered" by returning ``None``.
|
||||
"""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/system/update/status",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
if resp.status == 404:
|
||||
return None
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Failed to fetch update status: %s", err)
|
||||
return None
|
||||
async def _fetch_snapshot(self) -> dict[str, Any]:
|
||||
"""GET the aggregated server snapshot (single round trip)."""
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/snapshot",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._snapshot_timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
|
||||
async def apply_update(self) -> None:
|
||||
"""Trigger the server's apply-update flow.
|
||||
@@ -239,89 +181,6 @@ class LedGrabCoordinator(DataUpdateCoordinator):
|
||||
):
|
||||
self.boot_time = candidate
|
||||
|
||||
async def _fetch_targets(self) -> list[dict[str, Any]]:
|
||||
"""Fetch all output targets."""
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/output-targets",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
return data.get("targets", [])
|
||||
|
||||
async def _fetch_target_state(self, target_id: str) -> dict[str, Any]:
|
||||
"""Fetch target processing state."""
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/output-targets/{target_id}/state",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
|
||||
async def _fetch_target_metrics(self, target_id: str) -> dict[str, Any]:
|
||||
"""Fetch target metrics."""
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/output-targets/{target_id}/metrics",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
|
||||
async def _fetch_devices(self) -> dict[str, dict[str, Any]]:
|
||||
"""Fetch all devices with capabilities and brightness."""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/devices",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
devices = data.get("devices", [])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to fetch devices: %s", err)
|
||||
return {}
|
||||
|
||||
# Fetch brightness for all capable devices in parallel
|
||||
async def fetch_device_entry(device: dict) -> tuple[str, dict[str, Any]]:
|
||||
device_id = device["id"]
|
||||
entry: dict[str, Any] = {"info": device, "brightness": None}
|
||||
if "brightness_control" in (device.get("capabilities") or []):
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/devices/{device_id}/brightness",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
bri_data = await resp.json()
|
||||
entry["brightness"] = bri_data.get("brightness")
|
||||
except Exception as err:
|
||||
_LOGGER.warning(
|
||||
"Failed to fetch brightness for device %s: %s",
|
||||
device_id,
|
||||
err,
|
||||
)
|
||||
return device_id, entry
|
||||
|
||||
results = await asyncio.gather(
|
||||
*(fetch_device_entry(d) for d in devices),
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
devices_data: dict[str, dict[str, Any]] = {}
|
||||
for r in results:
|
||||
if isinstance(r, Exception):
|
||||
_LOGGER.warning("Device fetch failed: %s", r)
|
||||
continue
|
||||
device_id, entry = r
|
||||
devices_data[device_id] = entry
|
||||
|
||||
return devices_data
|
||||
|
||||
async def set_brightness(self, device_id: str, brightness: int) -> None:
|
||||
"""Set brightness for a device."""
|
||||
async with self.session.put(
|
||||
@@ -360,72 +219,6 @@ class LedGrabCoordinator(DataUpdateCoordinator):
|
||||
resp.raise_for_status()
|
||||
await self.async_request_refresh()
|
||||
|
||||
async def _fetch_css_sources(self) -> list[dict[str, Any]]:
|
||||
"""Fetch all color strip sources."""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/color-strip-sources",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
return data.get("sources", [])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to fetch CSS sources: %s", err)
|
||||
return []
|
||||
|
||||
async def _fetch_value_sources(self) -> list[dict[str, Any]]:
|
||||
"""Fetch all value sources."""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/value-sources",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
return data.get("sources", [])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to fetch value sources: %s", err)
|
||||
return []
|
||||
|
||||
async def _fetch_scene_presets(self) -> list[dict[str, Any]]:
|
||||
"""Fetch all scene presets."""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/scene-presets",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
return data.get("presets", [])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to fetch scene presets: %s", err)
|
||||
return []
|
||||
|
||||
async def _fetch_sync_clocks(self) -> list[dict[str, Any]]:
|
||||
"""Fetch all synchronization clocks (with runtime state).
|
||||
|
||||
Older servers without the sync-clock endpoint return 404 — treat
|
||||
that as "no clocks" rather than failing the whole refresh.
|
||||
"""
|
||||
try:
|
||||
async with self.session.get(
|
||||
f"{self.server_url}/api/v1/sync-clocks",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
if resp.status == 404:
|
||||
return []
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
return data.get("clocks", [])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to fetch sync clocks: %s", err)
|
||||
return []
|
||||
|
||||
async def push_colors(self, source_id: str, colors: list[list[int]]) -> None:
|
||||
"""Push flat color array to an api_input CSS source."""
|
||||
async with self.session.post(
|
||||
@@ -480,6 +273,49 @@ class LedGrabCoordinator(DataUpdateCoordinator):
|
||||
resp.raise_for_status()
|
||||
await self.async_request_refresh()
|
||||
|
||||
async def start_playlist(self, playlist_id: str) -> None:
|
||||
"""Start cycling a scene playlist.
|
||||
|
||||
The server engine runs at most one playlist at a time, so starting this
|
||||
one transparently stops whichever playlist was running before.
|
||||
"""
|
||||
async with self.session.post(
|
||||
f"{self.server_url}/api/v1/scene-playlists/{playlist_id}/start",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
body = await resp.text()
|
||||
_LOGGER.error(
|
||||
"Failed to start playlist %s: %s %s",
|
||||
playlist_id,
|
||||
resp.status,
|
||||
body,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
await self.async_request_refresh()
|
||||
|
||||
async def stop_playlist(self) -> None:
|
||||
"""Stop the active playlist (the last applied scene stays in place).
|
||||
|
||||
The stop endpoint is global — it stops whichever playlist is cycling,
|
||||
matching the server's single-active-playlist model.
|
||||
"""
|
||||
async with self.session.post(
|
||||
f"{self.server_url}/api/v1/scene-playlists/stop",
|
||||
headers=self._auth_headers,
|
||||
timeout=self._timeout,
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
body = await resp.text()
|
||||
_LOGGER.error(
|
||||
"Failed to stop playlist: %s %s",
|
||||
resp.status,
|
||||
body,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
await self.async_request_refresh()
|
||||
|
||||
async def update_source(self, source_id: str, **kwargs: Any) -> None:
|
||||
"""Update a color strip source's fields.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user