579553a850
HACS-compatible custom component split out from the main LedGrab repo. Creates light, switch, sensor, number, and select entities for each configured LedGrab device.
427 lines
16 KiB
Python
427 lines
16 KiB
Python
"""Data update coordinator for LED Screen Controller."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import timedelta
|
|
import logging
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
DEFAULT_TIMEOUT,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class LedGrabCoordinator(DataUpdateCoordinator):
|
|
"""Class to manage fetching LED Screen Controller data."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
session: aiohttp.ClientSession,
|
|
server_url: str,
|
|
api_key: str,
|
|
update_interval: timedelta,
|
|
) -> None:
|
|
"""Initialize the coordinator."""
|
|
self.server_url = server_url
|
|
self.session = session
|
|
self.api_key = api_key
|
|
self.server_version = "unknown"
|
|
self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
|
|
self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)
|
|
|
|
super().__init__(
|
|
hass,
|
|
_LOGGER,
|
|
name=DOMAIN,
|
|
update_interval=update_interval,
|
|
)
|
|
|
|
async def _async_update_data(self) -> dict[str, Any]:
|
|
"""Fetch data from API."""
|
|
try:
|
|
async with asyncio.timeout(DEFAULT_TIMEOUT * 3):
|
|
if self.server_version == "unknown":
|
|
await self._fetch_server_version()
|
|
|
|
targets_list = await self._fetch_targets()
|
|
|
|
# Fetch state and metrics for all targets in parallel
|
|
targets_data: dict[str, dict[str, Any]] = {}
|
|
|
|
async def fetch_target_data(target: dict) -> tuple[str, dict]:
|
|
target_id = target["id"]
|
|
try:
|
|
state, metrics = await asyncio.gather(
|
|
self._fetch_target_state(target_id),
|
|
self._fetch_target_metrics(target_id),
|
|
)
|
|
except Exception as err:
|
|
_LOGGER.warning(
|
|
"Failed to fetch data for target %s: %s",
|
|
target_id,
|
|
err,
|
|
)
|
|
state = None
|
|
metrics = None
|
|
|
|
return target_id, {
|
|
"info": target,
|
|
"state": state,
|
|
"metrics": metrics,
|
|
}
|
|
|
|
results = await asyncio.gather(
|
|
*(fetch_target_data(t) for t in targets_list),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
for r in results:
|
|
if isinstance(r, Exception):
|
|
_LOGGER.warning("Target fetch failed: %s", r)
|
|
continue
|
|
target_id, data = r
|
|
targets_data[target_id] = data
|
|
|
|
# Fetch devices, CSS sources, value sources, and scene presets in parallel
|
|
devices_data, css_sources, value_sources, scene_presets = await asyncio.gather(
|
|
self._fetch_devices(),
|
|
self._fetch_css_sources(),
|
|
self._fetch_value_sources(),
|
|
self._fetch_scene_presets(),
|
|
)
|
|
|
|
return {
|
|
"targets": targets_data,
|
|
"devices": devices_data,
|
|
"css_sources": css_sources,
|
|
"value_sources": value_sources,
|
|
"scene_presets": scene_presets,
|
|
"server_version": self.server_version,
|
|
}
|
|
|
|
except asyncio.TimeoutError as err:
|
|
raise UpdateFailed(f"Timeout fetching data: {err}") from err
|
|
except aiohttp.ClientError as err:
|
|
raise UpdateFailed(f"Error communicating with API: {err}") from err
|
|
|
|
async def _fetch_server_version(self) -> None:
|
|
"""Fetch server version from health endpoint."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/health",
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
self.server_version = data.get("version", "unknown")
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch server version: %s", err)
|
|
self.server_version = "unknown"
|
|
|
|
async def _fetch_targets(self) -> list[dict[str, Any]]:
|
|
"""Fetch all output targets."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/output-targets",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("targets", [])
|
|
|
|
async def _fetch_target_state(self, target_id: str) -> dict[str, Any]:
|
|
"""Fetch target processing state."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/state",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
return await resp.json()
|
|
|
|
async def _fetch_target_metrics(self, target_id: str) -> dict[str, Any]:
|
|
"""Fetch target metrics."""
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/metrics",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
return await resp.json()
|
|
|
|
async def _fetch_devices(self) -> dict[str, dict[str, Any]]:
|
|
"""Fetch all devices with capabilities and brightness."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/devices",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
devices = data.get("devices", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch devices: %s", err)
|
|
return {}
|
|
|
|
# Fetch brightness for all capable devices in parallel
|
|
async def fetch_device_entry(device: dict) -> tuple[str, dict[str, Any]]:
|
|
device_id = device["id"]
|
|
entry: dict[str, Any] = {"info": device, "brightness": None}
|
|
if "brightness_control" in (device.get("capabilities") or []):
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/brightness",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 200:
|
|
bri_data = await resp.json()
|
|
entry["brightness"] = bri_data.get("brightness")
|
|
except Exception as err:
|
|
_LOGGER.warning(
|
|
"Failed to fetch brightness for device %s: %s",
|
|
device_id,
|
|
err,
|
|
)
|
|
return device_id, entry
|
|
|
|
results = await asyncio.gather(
|
|
*(fetch_device_entry(d) for d in devices),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
devices_data: dict[str, dict[str, Any]] = {}
|
|
for r in results:
|
|
if isinstance(r, Exception):
|
|
_LOGGER.warning("Device fetch failed: %s", r)
|
|
continue
|
|
device_id, entry = r
|
|
devices_data[device_id] = entry
|
|
|
|
return devices_data
|
|
|
|
async def set_brightness(self, device_id: str, brightness: int) -> None:
|
|
"""Set brightness for a device."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/brightness",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"brightness": brightness},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to set brightness for device %s: %s %s",
|
|
device_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def set_color(self, device_id: str, color: list[int] | None) -> None:
|
|
"""Set or clear the static color for a device."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/devices/{device_id}/color",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"color": color},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to set color for device %s: %s %s",
|
|
device_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def _fetch_css_sources(self) -> list[dict[str, Any]]:
|
|
"""Fetch all color strip sources."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/color-strip-sources",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("sources", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch CSS sources: %s", err)
|
|
return []
|
|
|
|
async def _fetch_value_sources(self) -> list[dict[str, Any]]:
|
|
"""Fetch all value sources."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/value-sources",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("sources", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch value sources: %s", err)
|
|
return []
|
|
|
|
async def _fetch_scene_presets(self) -> list[dict[str, Any]]:
|
|
"""Fetch all scene presets."""
|
|
try:
|
|
async with self.session.get(
|
|
f"{self.server_url}/api/v1/scene-presets",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
data = await resp.json()
|
|
return data.get("presets", [])
|
|
except Exception as err:
|
|
_LOGGER.warning("Failed to fetch scene presets: %s", err)
|
|
return []
|
|
|
|
async def push_colors(self, source_id: str, colors: list[list[int]]) -> None:
|
|
"""Push flat color array to an api_input CSS source."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"colors": colors},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 204):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to push colors to source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def push_segments(self, source_id: str, segments: list[dict]) -> None:
|
|
"""Push segment data to an api_input CSS source."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}/colors",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json={"segments": segments},
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status not in (200, 204):
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to push segments to source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def activate_scene(self, preset_id: str) -> None:
|
|
"""Activate a scene preset."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/scene-presets/{preset_id}/activate",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to activate scene %s: %s %s",
|
|
preset_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def update_source(self, source_id: str, **kwargs: Any) -> None:
|
|
"""Update a color strip source's fields."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/color-strip-sources/{source_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update source %s: %s %s",
|
|
source_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
async def update_target(self, target_id: str, **kwargs: Any) -> None:
|
|
"""Update an output target's fields."""
|
|
async with self.session.put(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}",
|
|
headers={**self._auth_headers, "Content-Type": "application/json"},
|
|
json=kwargs,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to update target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def start_processing(self, target_id: str) -> None:
|
|
"""Start processing for a target."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/start",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 409:
|
|
_LOGGER.debug("Target %s already processing", target_id)
|
|
elif resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to start target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|
|
|
|
async def stop_processing(self, target_id: str) -> None:
|
|
"""Stop processing for a target."""
|
|
async with self.session.post(
|
|
f"{self.server_url}/api/v1/output-targets/{target_id}/stop",
|
|
headers=self._auth_headers,
|
|
timeout=self._timeout,
|
|
) as resp:
|
|
if resp.status == 409:
|
|
_LOGGER.debug("Target %s already stopped", target_id)
|
|
elif resp.status != 200:
|
|
body = await resp.text()
|
|
_LOGGER.error(
|
|
"Failed to stop target %s: %s %s",
|
|
target_id,
|
|
resp.status,
|
|
body,
|
|
)
|
|
resp.raise_for_status()
|
|
await self.async_request_refresh()
|