0a16dc9058
- coordinator.update_source now auto-injects source_type from cached data so the server's discriminated-union body validates (was rejecting fallback_color updates with 422). - Add Fallback Timeout (number) and Interpolation (select) entities per api_input CSS source. - Light no longer zeros fallback_color on turn_off; state restored across HA restarts via RestoreEntity so the chosen default color persists. - Bump manifest to 0.2.2 and update en/ru translations.
440 lines
16 KiB
Python
440 lines
16 KiB
Python
"""Data update coordinator for LED Screen Controller."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import timedelta
|
|
import logging
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
DEFAULT_TIMEOUT,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class LedGrabCoordinator(DataUpdateCoordinator):
|
|
"""Class to manage fetching LED Screen Controller data."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
session: aiohttp.ClientSession,
|
|
server_url: str,
|
|
api_key: str,
|
|
update_interval: timedelta,
|
|
) -> None:
|
|
"""Initialize the coordinator."""
|
|
self.server_url = server_url
|
|
self.session = session
|
|
self.api_key = api_key
|
|
self.server_version = "unknown"
|
|
self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
|
|
self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)
|
|
|
|
super().__init__(
|
|
hass,
|
|
_LOGGER,
|
|
name=DOMAIN,
|
|
update_interval=update_interval,
|
|
)
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Fetch data from API."""
|
|
try:
|
|
async with asyncio.timeout(DEFAULT_TIMEOUT * 3):
|
|
if self.server_version == "unknown":
|
|
await self._fetch_server_version()
|
|
|
|
targets_list = await self._fetch_targets()
|
|
|
|
# Fetch state and metrics for all targets in parallel
|
|
targets_data: dict[str, dict[str, Any]] = {}
|
|
|
|
async def fetch_target_data(target: dict) -> tuple[str, dict]:
|
|
target_id = target["id"]
|
|
try:
|
|
state, metrics = await asyncio.gather(
|
|
self._fetch_target_state(target_id),
|
|
self._fetch_target_metrics(target_id),
|
|
)
|
|
except Exception as err:
|
|
_LOGGER.warning(
|
|
"Failed to fetch data for target %s: %s",
|
|
target_id,
|
|
err,
|
|
)
|
|
state = None
|
|
metrics = None
|
|
|
|
return target_id, {
|
|
"info": target,
|
|
"state": state,
|
|
"metrics": metrics,
|
|
}
|
|
|
|
results = await asyncio.gather(
|
|
*(fetch_target_data(t) for t in targets_list),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
for r in results:
|
|
if isinstance(r, Exception):
|
|
_LOGGER.warning("Target fetch failed: %s", r)
|
|
continue
|
|
target_id, data = r
|
|
targets_data[target_id] = data
|
|
|
|
# Fetch devices, CSS sources, value sources, and scene presets in parallel
|
|
devices_data, css_sources, value_sources, scene_presets = await asyncio.gather(
|
|
self._fetch_devices(),
|
|
self._fetch_css_sources(),
|
|
self._fetch_value_sources(),
|
|
self._fetch_scene_presets(),
|
|
)
|
|
|
|
return {
|
|
"targets": targets_data,
|
|
"devices": devices_data,
|
|
"css_sources": css_sources,
|
|
"value_sources": value_sources,
|
|
"scene_presets": scene_presets,
|
|
"server_version": self.server_version,
|
|
}
|
|
|
|
except asyncio.TimeoutError as err:
|
|
raise UpdateFailed(f"Timeout fetching data: {err}") from err
|
|
except aiohttp.ClientError as err:
|
|
raise UpdateFailed(f"Error communicating with API: {err}") from err
|
|
|
|
async def _fetch_server_version(self) -> None:
|
|
"""Fetch server version from health endpoint."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/health",
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
self.server_version = data.get("version", "unknown")
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch server version: %s", err)
|
|
self.server_version = "unknown"
|
|
|
|
async def _fetch_targets(self) -> list[dict[str, Any]]:
|
|
"""Fetch all output targets."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/output-targets",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("targets", [])
|
|
|
|
async def _fetch_target_state(self, target_id: str) -> dict[str, Any]:
|
|
"""Fetch target processing state."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/state",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
return await resp.json()
|
|
|
|
async def _fetch_target_metrics(self, target_id: str) -> dict[str, Any]:
|
|
"""Fetch target metrics."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/metrics",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
return await resp.json()
|
|
|
|
async def _fetch_devices(self) -> dict[str, dict[str, Any]]:
|
|
"""Fetch all devices with capabilities and brightness."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/devices",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
devices = data.get("devices", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch devices: %s", err)
|
|
return {}
|
|
|
|
# Fetch brightness for all capable devices in parallel
|
|
async def fetch_device_entry(device: dict) -> tuple[str, dict[str, Any]]:
|
|
device_id = device["id"]
|
|
entry: dict[str, Any] = {"info": device, "brightness": None}
|
|
if "brightness_control" in (device.get("capabilities") or []):
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/brightness",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 200:
|
|
bri_data = await resp.json()
|
|
entry["brightness"] = bri_data.get("brightness")
|
|
except Exception as err:
|
|
_LOGGER.warning(
|
|
"Failed to fetch brightness for device %s: %s",
|
|
device_id,
|
|
err,
|
|
)
|
|
return device_id, entry
|
|
|
|
results = await asyncio.gather(
|
|
*(fetch_device_entry(d) for d in devices),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
devices_data: dict[str, dict[str, Any]] = {}
|
|
for r in results:
|
|
if isinstance(r, Exception):
|
|
_LOGGER.warning("Device fetch failed: %s", r)
|
|
continue
|
|
device_id, entry = r
|
|
devices_data[device_id] = entry
|
|
|
|
return devices_data
|
|
|
|
async def set_brightness(self, device_id: str, brightness: int) -> None:
|
|
"""Set brightness for a device."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/brightness",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"brightness": brightness},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to set brightness for device %s: %s %s",
|
|
device_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def set_color(self, device_id: str, color: list[int] | None) -> None:
|
|
"""Set or clear the static color for a device."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/color",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"color": color},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to set color for device %s: %s %s",
|
|
device_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def _fetch_css_sources(self) -> list[dict[str, Any]]:
|
|
"""Fetch all color strip sources."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/color-strip-sources",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("sources", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch CSS sources: %s", err)
|
|
return []
|
|
|
|
async def _fetch_value_sources(self) -> list[dict[str, Any]]:
|
|
"""Fetch all value sources."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/value-sources",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("sources", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch value sources: %s", err)
|
|
return []
|
|
|
|
async def _fetch_scene_presets(self) -> list[dict[str, Any]]:
|
|
"""Fetch all scene presets."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/scene-presets",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("presets", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch scene presets: %s", err)
|
|
return []
|
|
|
|
async def push_colors(self, source_id: str, colors: list[list[int]]) -> None:
|
|
"""Push flat color array to an api_input CSS source."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"colors": colors},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 204):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to push colors to source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def push_segments(self, source_id: str, segments: list[dict]) -> None:
|
|
"""Push segment data to an api_input CSS source."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"segments": segments},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 204):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to push segments to source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def activate_scene(self, preset_id: str) -> None:
|
|
"""Activate a scene preset."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/scene-presets/{preset_id}/activate",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to activate scene %s: %s %s",
|
|
preset_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def update_source(self, source_id: str, **kwargs: Any) -> None:
|
|
"""Update a color strip source's fields.
|
|
|
|
The server uses a discriminated-union body keyed on ``source_type``;
|
|
if the caller didn't supply it, look it up from the cached sources so
|
|
the request validates server-side.
|
|
"""
|
|
if "source_type" not in kwargs and self.data:
|
|
for source in self.data.get("css_sources", []):
|
|
if source.get("id") == source_id:
|
|
src_type = source.get("source_type")
|
|
if src_type:
|
|
kwargs["source_type"] = src_type
|
|
break
|
|
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def update_target(self, target_id: str, **kwargs: Any) -> None:
|
|
"""Update an output target's fields."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def start_processing(self, target_id: str) -> None:
|
|
"""Start processing for a target."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/start",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 409:
|
|
_LOGGER.debug("Target %s already processing", target_id)
|
|
elif resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to start target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def stop_processing(self, target_id: str) -> None:
|
|
"""Stop processing for a target."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/stop",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 409:
|
|
_LOGGER.debug("Target %s already stopped", target_id)
|
|
elif resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to stop target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|