feat: server telemetry, update entity, sync-clock controls

- Server device exposing CPU/RAM/GPU/temperature/battery sensors via
  /api/v1/system/performance, plus last-restart timestamp (cached with
  jitter threshold so the recorder doesn't see poll wobble) and version.
- Update entity backed by /api/v1/system/update — installs via
  /apply, hides the install button when the server reports
  can_auto_update=false.
- Sync-clock entities: reset button, speed number, running switch, and
  the event listener now refreshes on entity_changed events too.
- Bump manifest to 0.4.0.
This commit is contained in:
2026-04-27 01:35:42 +03:00
parent e8f2b5e528
commit a666d9eb9c
12 changed files with 1080 additions and 23 deletions
+49 -6
View File
@@ -35,6 +35,7 @@ PLATFORMS: list[Platform] = [
Platform.SENSOR,
Platform.NUMBER,
Platform.SELECT,
Platform.UPDATE,
]
@@ -61,6 +62,25 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# Create device entries for each target and remove stale ones
device_registry = dr.async_get(hass)
current_identifiers: set[tuple[str, str]] = set()
# Server device — owns the system telemetry sensors (CPU, RAM, GPU, etc.)
server_identifier = (DOMAIN, f"{entry.entry_id}_server")
sw_version = (
coordinator.server_version
if coordinator.server_version and coordinator.server_version != "unknown"
else None
)
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={server_identifier},
name=server_name,
manufacturer="LedGrab",
model="Server",
sw_version=sw_version,
configuration_url=server_url,
)
current_identifiers.add(server_identifier)
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
info = target_data["info"]
@@ -93,6 +113,20 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
)
current_identifiers.add(scenes_identifier)
# One device per sync clock — groups its switch/number/button/sensor.
sync_clocks = coordinator.data.get("sync_clocks", []) if coordinator.data else []
for clock in sync_clocks:
clock_identifier = (DOMAIN, clock["id"])
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={clock_identifier},
name=clock.get("name", clock["id"]),
manufacturer=server_name,
model="Sync Clock",
configuration_url=server_url,
)
current_identifiers.add(clock_identifier)
# Remove devices for targets that no longer exist
for device_entry in dr.async_entries_for_config_entry(device_registry, entry.entry_id):
if not device_entry.identifiers & current_identifiers:
@@ -106,28 +140,37 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
DATA_EVENT_LISTENER: event_listener,
}
# Track target and scene IDs to detect changes
# Track target, scene, and sync-clock IDs to detect changes
known_target_ids = set(coordinator.data.get("targets", {}).keys() if coordinator.data else [])
known_scene_ids = set(
p["id"] for p in (coordinator.data.get("scene_presets", []) if coordinator.data else [])
)
known_clock_ids = set(
c["id"] for c in (coordinator.data.get("sync_clocks", []) if coordinator.data else [])
)
def _on_coordinator_update() -> None:
"""Detect target/scene list changes and trigger reload."""
nonlocal known_target_ids, known_scene_ids
"""Detect target/scene/clock list changes and trigger reload."""
nonlocal known_target_ids, known_scene_ids, known_clock_ids
if not coordinator.data:
return
targets = coordinator.data.get("targets", {})
# Reload if target or scene list changed
# Reload if target, scene, or sync-clock list changed
current_ids = set(targets.keys())
current_scene_ids = set(p["id"] for p in coordinator.data.get("scene_presets", []))
if current_ids != known_target_ids or current_scene_ids != known_scene_ids:
current_clock_ids = set(c["id"] for c in coordinator.data.get("sync_clocks", []))
if (
current_ids != known_target_ids
or current_scene_ids != known_scene_ids
or current_clock_ids != known_clock_ids
):
known_target_ids = current_ids
known_scene_ids = current_scene_ids
_LOGGER.info("Target or scene list changed, reloading integration")
known_clock_ids = current_clock_ids
_LOGGER.info("Target, scene, or sync-clock list changed, reloading integration")
hass.async_create_task(hass.config_entries.async_reload(entry.entry_id))
coordinator.async_add_listener(_on_coordinator_update)
+40 -1
View File
@@ -25,12 +25,16 @@ async def async_setup_entry(
data = hass.data[DOMAIN][entry.entry_id]
coordinator: LedGrabCoordinator = data[DATA_COORDINATOR]
entities = []
entities: list[ButtonEntity] = []
if coordinator.data:
for preset in coordinator.data.get("scene_presets", []):
entities.append(
SceneActivateButton(coordinator, preset, entry.entry_id)
)
for clock in coordinator.data.get("sync_clocks", []):
entities.append(
SyncClockResetButton(coordinator, clock["id"], entry.entry_id)
)
async_add_entities(entities)
@@ -72,3 +76,38 @@ class SceneActivateButton(CoordinatorEntity, ButtonEntity):
async def async_press(self) -> None:
"""Activate the scene preset."""
await self.coordinator.activate_scene(self._preset_id)
class SyncClockResetButton(CoordinatorEntity, ButtonEntity):
"""Button that resets a sync clock to t=0 (linked animations restart)."""
_attr_has_entity_name = True
_attr_icon = "mdi:restart"
def __init__(
self,
coordinator: LedGrabCoordinator,
clock_id: str,
entry_id: str,
) -> None:
super().__init__(coordinator)
self._clock_id = clock_id
self._entry_id = entry_id
self._attr_unique_id = f"{clock_id}_reset"
self._attr_translation_key = "sync_clock_reset"
@property
def device_info(self) -> dict[str, Any]:
return {"identifiers": {(DOMAIN, self._clock_id)}}
@property
def available(self) -> bool:
if not self.coordinator.data:
return False
return any(
c.get("id") == self._clock_id
for c in self.coordinator.data.get("sync_clocks", [])
)
async def async_press(self) -> None:
await self.coordinator.reset_sync_clock(self._clock_id)
+194 -12
View File
@@ -3,7 +3,7 @@
from __future__ import annotations
import asyncio
from datetime import timedelta
from datetime import datetime, timedelta
import logging
from typing import Any
@@ -11,12 +11,20 @@ import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import (
DOMAIN,
DEFAULT_TIMEOUT,
)
# Boot-time jitter threshold: candidate boot_time is recomputed each poll from
# `now - uptime_seconds`, so network latency and clock skew make it wobble by
# sub-second amounts. Only update the cached value if the candidate differs by
# more than this many seconds — anything larger almost certainly means the
# server actually restarted.
_BOOT_TIME_JITTER_S = 5.0
_LOGGER = logging.getLogger(__name__)
@@ -36,6 +44,7 @@ class LedGrabCoordinator(DataUpdateCoordinator):
self.session = session
self.api_key = api_key
self.server_version = "unknown"
self.boot_time: datetime | None = None
self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)
@@ -50,9 +59,6 @@ class LedGrabCoordinator(DataUpdateCoordinator):
"""Fetch data from API."""
try:
async with asyncio.timeout(DEFAULT_TIMEOUT * 3):
if self.server_version == "unknown":
await self._fetch_server_version()
targets_list = await self._fetch_targets()
# Fetch state and metrics for all targets in parallel
@@ -92,21 +98,49 @@ class LedGrabCoordinator(DataUpdateCoordinator):
target_id, data = r
targets_data[target_id] = data
# Fetch devices, CSS sources, value sources, and scene presets in parallel
devices_data, css_sources, value_sources, scene_presets = await asyncio.gather(
# Fetch devices, CSS sources, value sources, scene presets,
# sync clocks, system performance, health, and update status
# in parallel
(
devices_data,
css_sources,
value_sources,
scene_presets,
sync_clocks,
performance,
health,
update_status,
) = await asyncio.gather(
self._fetch_devices(),
self._fetch_css_sources(),
self._fetch_value_sources(),
self._fetch_scene_presets(),
self._fetch_sync_clocks(),
self._fetch_system_performance(),
self._fetch_health(),
self._fetch_update_status(),
)
if health:
version = health.get("version")
if version:
self.server_version = version
self._update_boot_time(health.get("uptime_seconds"))
return {
"targets": targets_data,
"devices": devices_data,
"css_sources": css_sources,
"value_sources": value_sources,
"scene_presets": scene_presets,
"sync_clocks": sync_clocks,
"server_version": self.server_version,
"system": {
"performance": performance,
"health": health,
"boot_time": self.boot_time,
"update": update_status,
},
}
except asyncio.TimeoutError as err:
@@ -114,19 +148,96 @@ class LedGrabCoordinator(DataUpdateCoordinator):
except aiohttp.ClientError as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
async def _fetch_server_version(self) -> None:
"""Fetch server version from health endpoint."""
async def _fetch_health(self) -> dict[str, Any] | None:
"""Fetch /health (unauthenticated, cheap; gives version + uptime)."""
try:
async with self.session.get(
f"{self.server_url}/health",
timeout=self._timeout,
) as resp:
resp.raise_for_status()
data = await resp.json()
self.server_version = data.get("version", "unknown")
return await resp.json()
except Exception as err:
_LOGGER.warning("Failed to fetch server version: %s", err)
self.server_version = "unknown"
_LOGGER.debug("Failed to fetch health: %s", err)
return None
async def _fetch_system_performance(self) -> dict[str, Any] | None:
"""Fetch CPU/RAM/GPU/temperature metrics from the server.
Returns ``None`` on older servers without the endpoint or on transient
failures — callers must treat the absence as "no telemetry this poll".
"""
try:
async with self.session.get(
f"{self.server_url}/api/v1/system/performance",
headers=self._auth_headers,
timeout=self._timeout,
) as resp:
if resp.status == 404:
return None
resp.raise_for_status()
return await resp.json()
except Exception as err:
_LOGGER.debug("Failed to fetch system performance: %s", err)
return None
async def _fetch_update_status(self) -> dict[str, Any] | None:
"""Fetch auto-update status (current version, available release, …).
Older servers without the update service return 404 — treat that as
"no update entity should be registered" by returning ``None``.
"""
try:
async with self.session.get(
f"{self.server_url}/api/v1/system/update/status",
headers=self._auth_headers,
timeout=self._timeout,
) as resp:
if resp.status == 404:
return None
resp.raise_for_status()
return await resp.json()
except Exception as err:
_LOGGER.debug("Failed to fetch update status: %s", err)
return None
async def apply_update(self) -> None:
"""Trigger the server's apply-update flow.
The server downloads the available release if needed and shuts down
once the new binaries are in place. We refresh straight away so the
update entity reflects ``applying=True`` until the connection drops.
"""
async with self.session.post(
f"{self.server_url}/api/v1/system/update/apply",
headers=self._auth_headers,
timeout=self._timeout,
) as resp:
if resp.status not in (200, 202):
body = await resp.text()
_LOGGER.error(
"Failed to apply update: %s %s",
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
def _update_boot_time(self, uptime_seconds: float | None) -> None:
"""Recompute cached server boot time from /health uptime.
Updates only when the candidate moves more than ``_BOOT_TIME_JITTER_S``
seconds, so the timestamp sensor stays stable across polls and the
recorder doesn't see spurious state changes.
"""
if uptime_seconds is None:
return
candidate = dt_util.utcnow() - timedelta(seconds=float(uptime_seconds))
if (
self.boot_time is None
or abs((self.boot_time - candidate).total_seconds()) > _BOOT_TIME_JITTER_S
):
self.boot_time = candidate
async def _fetch_targets(self) -> list[dict[str, Any]]:
"""Fetch all output targets."""
@@ -294,6 +405,27 @@ class LedGrabCoordinator(DataUpdateCoordinator):
_LOGGER.warning("Failed to fetch scene presets: %s", err)
return []
async def _fetch_sync_clocks(self) -> list[dict[str, Any]]:
"""Fetch all synchronization clocks (with runtime state).
Older servers without the sync-clock endpoint return 404 — treat
that as "no clocks" rather than failing the whole refresh.
"""
try:
async with self.session.get(
f"{self.server_url}/api/v1/sync-clocks",
headers=self._auth_headers,
timeout=self._timeout,
) as resp:
if resp.status == 404:
return []
resp.raise_for_status()
data = await resp.json()
return data.get("clocks", [])
except Exception as err:
_LOGGER.warning("Failed to fetch sync clocks: %s", err)
return []
async def push_colors(self, source_id: str, colors: list[list[int]]) -> None:
"""Push flat color array to an api_input CSS source."""
async with self.session.post(
@@ -418,6 +550,56 @@ class LedGrabCoordinator(DataUpdateCoordinator):
resp.raise_for_status()
await self.async_request_refresh()
async def _sync_clock_action(self, clock_id: str, action: str) -> None:
"""POST a sync-clock control action (pause/resume/reset)."""
async with self.session.post(
f"{self.server_url}/api/v1/sync-clocks/{clock_id}/{action}",
headers=self._auth_headers,
timeout=self._timeout,
) as resp:
if resp.status != 200:
body = await resp.text()
_LOGGER.error(
"Failed to %s sync clock %s: %s %s",
action,
clock_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
async def pause_sync_clock(self, clock_id: str) -> None:
"""Pause a sync clock (linked animations freeze)."""
await self._sync_clock_action(clock_id, "pause")
async def resume_sync_clock(self, clock_id: str) -> None:
"""Resume a paused sync clock."""
await self._sync_clock_action(clock_id, "resume")
async def reset_sync_clock(self, clock_id: str) -> None:
"""Reset a sync clock to t=0 (linked animations restart)."""
await self._sync_clock_action(clock_id, "reset")
async def update_sync_clock(self, clock_id: str, **kwargs: Any) -> None:
"""Update a sync clock's persistent fields (name/speed/...)."""
async with self.session.put(
f"{self.server_url}/api/v1/sync-clocks/{clock_id}",
headers={**self._auth_headers, "Content-Type": "application/json"},
json=kwargs,
timeout=self._timeout,
) as resp:
if resp.status != 200:
body = await resp.text()
_LOGGER.error(
"Failed to update sync clock %s: %s %s",
clock_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
async def stop_processing(self, target_id: str) -> None:
"""Stop processing for a target."""
async with self.session.post(
+1 -1
View File
@@ -99,7 +99,7 @@ class EventStreamListener:
data = json.loads(msg.data)
except json.JSONDecodeError:
continue
if data.get("type") == "state_change":
if data.get("type") in ("state_change", "entity_changed"):
await self._coordinator.async_request_refresh()
elif msg.type in (
aiohttp.WSMsgType.CLOSED,
+1 -1
View File
@@ -8,5 +8,5 @@
"iot_class": "local_push",
"issue_tracker": "https://git.dolgolyov-family.by/alexei.dolgolyov/ledgrab-haos-integration/issues",
"requirements": ["aiohttp>=3.9.0"],
"version": "0.2.2"
"version": "0.4.0"
}
+60
View File
@@ -32,6 +32,9 @@ async def async_setup_entry(
if source.get("source_type") == "api_input":
entities.append(ApiInputTimeout(coordinator, source, entry.entry_id))
for clock in coordinator.data.get("sync_clocks") or []:
entities.append(SyncClockSpeed(coordinator, clock["id"], entry.entry_id))
if coordinator.data and "targets" in coordinator.data:
devices = coordinator.data.get("devices") or {}
@@ -300,3 +303,60 @@ class ApiInputTimeout(CoordinatorEntity, NumberEntity):
if source.get("id") == self._source_id:
return source
return None
# --- Sync clock number entities ---
class SyncClockSpeed(CoordinatorEntity, NumberEntity):
"""Speed multiplier for a sync clock (server range 0.110.0).
Hot-applied — running animations linked to the clock change pace
immediately without resetting their position.
"""
_attr_has_entity_name = True
_attr_native_min_value = 0.1
_attr_native_max_value = 10.0
_attr_native_step = 0.1
_attr_mode = NumberMode.SLIDER
_attr_icon = "mdi:speedometer"
def __init__(
self,
coordinator: LedGrabCoordinator,
clock_id: str,
entry_id: str,
) -> None:
super().__init__(coordinator)
self._clock_id = clock_id
self._entry_id = entry_id
self._attr_unique_id = f"{clock_id}_speed"
self._attr_translation_key = "sync_clock_speed"
@property
def device_info(self) -> dict[str, Any]:
return {"identifiers": {(DOMAIN, self._clock_id)}}
@property
def native_value(self) -> float | None:
clock = self._get_clock()
if not clock:
return None
speed = clock.get("speed")
return float(speed) if speed is not None else None
@property
def available(self) -> bool:
return self._get_clock() is not None
async def async_set_native_value(self, value: float) -> None:
await self.coordinator.update_sync_clock(self._clock_id, speed=round(value, 2))
def _get_clock(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
for clock in self.coordinator.data.get("sync_clocks", []):
if clock.get("id") == self._clock_id:
return clock
return None
+379
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
from datetime import datetime
import logging
from typing import Any
@@ -11,7 +12,14 @@ from homeassistant.components.sensor import (
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
PERCENTAGE,
UnitOfInformation,
UnitOfTemperature,
UnitOfTime,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
@@ -47,9 +55,55 @@ async def async_setup_entry(
if info.get("target_type") == TARGET_TYPE_HA_LIGHT:
entities.append(HALightMappedLightsSensor(coordinator, target_id, entry.entry_id))
if coordinator.data:
for clock in coordinator.data.get("sync_clocks", []):
entities.append(SyncClockElapsedSensor(coordinator, clock["id"], entry.entry_id))
entities.extend(_build_server_sensors(coordinator, entry.entry_id))
async_add_entities(entities)
def _build_server_sensors(
coordinator: LedGrabCoordinator,
entry_id: str,
) -> list[SensorEntity]:
"""Build the server-level telemetry sensors.
GPU/CPU-temp/battery sensors are only registered when the first refresh
actually carries those readings — we don't want phantom unavailable
entities on hardware that doesn't support a metric. A reload picks up
metrics that come online later (e.g. user installs LibreHardwareMonitor).
"""
system = (coordinator.data or {}).get("system") or {}
perf = system.get("performance") or {}
health = system.get("health")
sensors: list[SensorEntity] = [
ServerCpuPercentSensor(coordinator, entry_id),
ServerRamPercentSensor(coordinator, entry_id),
ServerAppCpuPercentSensor(coordinator, entry_id),
ServerAppRamSensor(coordinator, entry_id),
ServerVersionSensor(coordinator, entry_id),
]
if health is not None:
# Only meaningful when /health actually responded on first refresh.
sensors.append(ServerLastRestartSensor(coordinator, entry_id))
if perf.get("gpu") is not None:
sensors.append(ServerGpuUtilizationSensor(coordinator, entry_id))
sensors.append(ServerGpuTemperatureSensor(coordinator, entry_id))
if perf.get("cpu_temp_c") is not None:
sensors.append(ServerCpuTemperatureSensor(coordinator, entry_id))
if perf.get("battery_percent") is not None:
sensors.append(ServerBatterySensor(coordinator, entry_id))
return sensors
class LedGrabFPSSensor(CoordinatorEntity, SensorEntity):
"""FPS sensor for a LED Screen Controller target."""
@@ -160,6 +214,60 @@ class LedGrabStatusSensor(CoordinatorEntity, SensorEntity):
return self.coordinator.data.get("targets", {}).get(self._target_id)
class SyncClockElapsedSensor(CoordinatorEntity, SensorEntity):
"""Elapsed seconds since the sync clock last reset.
Disabled by default — the value advances on every coordinator poll
(~3s) while the clock is running, which would write a recorder row
each refresh. Power users can enable it explicitly.
"""
_attr_has_entity_name = True
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_device_class = SensorDeviceClass.DURATION
_attr_native_unit_of_measurement = UnitOfTime.SECONDS
_attr_icon = "mdi:timer-outline"
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_entity_registry_enabled_default = False
_attr_suggested_display_precision = 1
def __init__(
self,
coordinator: LedGrabCoordinator,
clock_id: str,
entry_id: str,
) -> None:
super().__init__(coordinator)
self._clock_id = clock_id
self._entry_id = entry_id
self._attr_unique_id = f"{clock_id}_elapsed"
self._attr_translation_key = "sync_clock_elapsed"
@property
def device_info(self) -> dict[str, Any]:
return {"identifiers": {(DOMAIN, self._clock_id)}}
@property
def native_value(self) -> float | None:
clock = self._get_clock()
if not clock:
return None
elapsed = clock.get("elapsed_time")
return float(elapsed) if elapsed is not None else None
@property
def available(self) -> bool:
return self._get_clock() is not None
def _get_clock(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
for clock in self.coordinator.data.get("sync_clocks", []):
if clock.get("id") == self._clock_id:
return clock
return None
class HALightMappedLightsSensor(CoordinatorEntity, SensorEntity):
"""Sensor showing the number of mapped HA lights for an HA Light target."""
@@ -223,3 +331,274 @@ class HALightMappedLightsSensor(CoordinatorEntity, SensorEntity):
if not self.coordinator.data:
return None
return self.coordinator.data.get("targets", {}).get(self._target_id)
# ─────────────────────────── Server-level sensors ───────────────────────────
class _ServerSensorBase(CoordinatorEntity, SensorEntity):
"""Shared plumbing for sensors attached to the synthetic Server device."""
_attr_has_entity_name = True
_attr_entity_category = EntityCategory.DIAGNOSTIC
def __init__(
self,
coordinator: LedGrabCoordinator,
entry_id: str,
unique_suffix: str,
translation_key: str,
) -> None:
super().__init__(coordinator)
self._entry_id = entry_id
self._attr_unique_id = f"{entry_id}_server_{unique_suffix}"
self._attr_translation_key = translation_key
@property
def device_info(self) -> dict[str, Any]:
return {"identifiers": {(DOMAIN, f"{self._entry_id}_server")}}
def _perf(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
system = self.coordinator.data.get("system") or {}
return system.get("performance")
@property
def available(self) -> bool:
# Performance metrics are missing whenever /api/v1/system/performance
# fails — surface that as unavailable instead of stale numbers.
return self._perf() is not None
class ServerCpuPercentSensor(_ServerSensorBase):
"""System-wide CPU usage on the led-grab server."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_native_unit_of_measurement = PERCENTAGE
_attr_icon = "mdi:cpu-64-bit"
_attr_suggested_display_precision = 1
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "cpu_percent", "server_cpu_percent")
@property
def native_value(self) -> float | None:
perf = self._perf()
return None if perf is None else perf.get("cpu_percent")
@property
def extra_state_attributes(self) -> dict[str, Any]:
perf = self._perf() or {}
return {"cpu_name": perf.get("cpu_name")}
class ServerRamPercentSensor(_ServerSensorBase):
"""System-wide RAM usage on the led-grab server."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_native_unit_of_measurement = PERCENTAGE
_attr_icon = "mdi:memory"
_attr_suggested_display_precision = 1
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "ram_percent", "server_ram_percent")
@property
def native_value(self) -> float | None:
perf = self._perf()
return None if perf is None else perf.get("ram_percent")
@property
def extra_state_attributes(self) -> dict[str, Any]:
perf = self._perf() or {}
return {
"used_mb": perf.get("ram_used_mb"),
"total_mb": perf.get("ram_total_mb"),
}
class ServerAppCpuPercentSensor(_ServerSensorBase):
"""CPU usage of the led-grab process itself."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_native_unit_of_measurement = PERCENTAGE
_attr_icon = "mdi:application-cog"
_attr_suggested_display_precision = 1
_attr_entity_registry_enabled_default = False
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "app_cpu_percent", "server_app_cpu_percent")
@property
def native_value(self) -> float | None:
perf = self._perf()
return None if perf is None else perf.get("app_cpu_percent")
class ServerAppRamSensor(_ServerSensorBase):
"""Resident memory of the led-grab process."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_device_class = SensorDeviceClass.DATA_SIZE
_attr_native_unit_of_measurement = UnitOfInformation.MEGABYTES
_attr_icon = "mdi:memory"
_attr_suggested_display_precision = 1
_attr_entity_registry_enabled_default = False
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "app_ram_mb", "server_app_ram")
@property
def native_value(self) -> float | None:
perf = self._perf()
return None if perf is None else perf.get("app_ram_mb")
class ServerGpuUtilizationSensor(_ServerSensorBase):
"""GPU utilization (NVML)."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_native_unit_of_measurement = PERCENTAGE
_attr_icon = "mdi:expansion-card"
_attr_suggested_display_precision = 1
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "gpu_utilization", "server_gpu_utilization")
@property
def native_value(self) -> float | None:
perf = self._perf()
if perf is None:
return None
gpu = perf.get("gpu") or {}
return gpu.get("utilization")
@property
def extra_state_attributes(self) -> dict[str, Any]:
perf = self._perf() or {}
gpu = perf.get("gpu") or {}
return {
"name": gpu.get("name"),
"memory_used_mb": gpu.get("memory_used_mb"),
"memory_total_mb": gpu.get("memory_total_mb"),
"app_memory_mb": gpu.get("app_memory_mb"),
}
@property
def available(self) -> bool:
perf = self._perf()
return perf is not None and perf.get("gpu") is not None
class ServerGpuTemperatureSensor(_ServerSensorBase):
"""GPU temperature (NVML)."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_device_class = SensorDeviceClass.TEMPERATURE
_attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
_attr_suggested_display_precision = 0
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "gpu_temp", "server_gpu_temp")
@property
def native_value(self) -> float | None:
perf = self._perf()
if perf is None:
return None
gpu = perf.get("gpu") or {}
return gpu.get("temperature_c")
@property
def available(self) -> bool:
perf = self._perf()
return perf is not None and (perf.get("gpu") or {}).get("temperature_c") is not None
class ServerCpuTemperatureSensor(_ServerSensorBase):
"""Hottest CPU/SoC thermal-zone reading.
Only registered when the first refresh actually reports a temperature —
Windows needs LibreHardwareMonitor / OpenHardwareMonitor publishing WMI
sensors, otherwise the field stays null.
"""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_device_class = SensorDeviceClass.TEMPERATURE
_attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
_attr_suggested_display_precision = 0
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "cpu_temp", "server_cpu_temp")
@property
def native_value(self) -> float | None:
perf = self._perf()
return None if perf is None else perf.get("cpu_temp_c")
class ServerBatterySensor(_ServerSensorBase):
"""Battery charge level (laptops / portable hosts)."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_device_class = SensorDeviceClass.BATTERY
_attr_native_unit_of_measurement = PERCENTAGE
_attr_suggested_display_precision = 0
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "battery", "server_battery")
@property
def native_value(self) -> float | None:
perf = self._perf()
return None if perf is None else perf.get("battery_percent")
@property
def extra_state_attributes(self) -> dict[str, Any]:
perf = self._perf() or {}
return {"temperature_c": perf.get("battery_temp_c")}
class ServerLastRestartSensor(_ServerSensorBase):
"""Timestamp of the last server start.
Derived from ``/health.uptime_seconds``; the coordinator caches the
computed boot time and only refreshes it when the candidate moves more
than a few seconds, so the recorder doesn't get poll-jitter writes.
"""
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_icon = "mdi:restart"
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "last_restart", "server_last_restart")
@property
def native_value(self) -> datetime | None:
if not self.coordinator.data:
return None
system = self.coordinator.data.get("system") or {}
return system.get("boot_time")
@property
def available(self) -> bool:
return self.native_value is not None
class ServerVersionSensor(_ServerSensorBase):
"""Reported led-grab server version."""
_attr_icon = "mdi:tag-outline"
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator, entry_id, "version", "server_version")
@property
def native_value(self) -> str | None:
version = self.coordinator.server_version
return version if version and version != "unknown" else None
@property
def available(self) -> bool:
return self.native_value is not None
+36 -1
View File
@@ -44,7 +44,7 @@
"name": "Processing"
},
"sync_clock_running": {
"name": "Running"
"name": "Active"
}
},
"sensor": {
@@ -65,6 +65,36 @@
},
"sync_clock_elapsed": {
"name": "Elapsed Time"
},
"server_cpu_percent": {
"name": "CPU Usage"
},
"server_ram_percent": {
"name": "RAM Usage"
},
"server_app_cpu_percent": {
"name": "App CPU Usage"
},
"server_app_ram": {
"name": "App Memory"
},
"server_gpu_utilization": {
"name": "GPU Usage"
},
"server_gpu_temp": {
"name": "GPU Temperature"
},
"server_cpu_temp": {
"name": "CPU Temperature"
},
"server_battery": {
"name": "Battery"
},
"server_last_restart": {
"name": "Last Restart"
},
"server_version": {
"name": "Server Version"
}
},
"number": {
@@ -105,6 +135,11 @@
"nearest": "Nearest"
}
}
},
"update": {
"server_update": {
"name": "Server Update"
}
}
},
"services": {
+58 -1
View File
@@ -25,13 +25,19 @@ async def async_setup_entry(
data = hass.data[DOMAIN][entry.entry_id]
coordinator: LedGrabCoordinator = data[DATA_COORDINATOR]
entities = []
entities: list[SwitchEntity] = []
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
entities.append(
LedGrabSwitch(coordinator, target_id, entry.entry_id)
)
if coordinator.data:
for clock in coordinator.data.get("sync_clocks", []):
entities.append(
LedGrabSyncClockSwitch(coordinator, clock["id"], entry.entry_id)
)
async_add_entities(entities)
@@ -107,3 +113,54 @@ class LedGrabSwitch(CoordinatorEntity, SwitchEntity):
if not self.coordinator.data:
return None
return self.coordinator.data.get("targets", {}).get(self._target_id)
class LedGrabSyncClockSwitch(CoordinatorEntity, SwitchEntity):
"""Running/paused control for a sync clock.
On = clock running (linked animations advance), off = paused (frozen).
"""
_attr_has_entity_name = True
_attr_icon = "mdi:clock-outline"
def __init__(
self,
coordinator: LedGrabCoordinator,
clock_id: str,
entry_id: str,
) -> None:
super().__init__(coordinator)
self._clock_id = clock_id
self._entry_id = entry_id
self._attr_unique_id = f"{clock_id}_running"
self._attr_translation_key = "sync_clock_running"
@property
def device_info(self) -> dict[str, Any]:
return {"identifiers": {(DOMAIN, self._clock_id)}}
@property
def is_on(self) -> bool:
clock = self._get_clock()
if not clock:
return False
return bool(clock.get("is_running", False))
@property
def available(self) -> bool:
return self._get_clock() is not None
async def async_turn_on(self, **kwargs: Any) -> None:
await self.coordinator.resume_sync_clock(self._clock_id)
async def async_turn_off(self, **kwargs: Any) -> None:
await self.coordinator.pause_sync_clock(self._clock_id)
def _get_clock(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
for clock in self.coordinator.data.get("sync_clocks", []):
if clock.get("id") == self._clock_id:
return clock
return None
@@ -29,6 +29,9 @@
"button": {
"activate_scene": {
"name": "{scene_name}"
},
"sync_clock_reset": {
"name": "Reset"
}
},
"light": {
@@ -39,6 +42,9 @@
"switch": {
"processing": {
"name": "Processing"
},
"sync_clock_running": {
"name": "Active"
}
},
"sensor": {
@@ -56,6 +62,39 @@
},
"mapped_lights": {
"name": "Mapped Lights"
},
"sync_clock_elapsed": {
"name": "Elapsed Time"
},
"server_cpu_percent": {
"name": "CPU Usage"
},
"server_ram_percent": {
"name": "RAM Usage"
},
"server_app_cpu_percent": {
"name": "App CPU Usage"
},
"server_app_ram": {
"name": "App Memory"
},
"server_gpu_utilization": {
"name": "GPU Usage"
},
"server_gpu_temp": {
"name": "GPU Temperature"
},
"server_cpu_temp": {
"name": "CPU Temperature"
},
"server_battery": {
"name": "Battery"
},
"server_last_restart": {
"name": "Last Restart"
},
"server_version": {
"name": "Server Version"
}
},
"number": {
@@ -76,6 +115,9 @@
},
"api_input_timeout": {
"name": "Fallback Timeout"
},
"sync_clock_speed": {
"name": "Speed"
}
},
"select": {
@@ -93,6 +135,11 @@
"nearest": "Nearest"
}
}
},
"update": {
"server_update": {
"name": "Server Update"
}
}
}
}
@@ -29,6 +29,9 @@
"button": {
"activate_scene": {
"name": "{scene_name}"
},
"sync_clock_reset": {
"name": "Сброс"
}
},
"light": {
@@ -39,6 +42,9 @@
"switch": {
"processing": {
"name": "Обработка"
},
"sync_clock_running": {
"name": "Активно"
}
},
"sensor": {
@@ -56,6 +62,39 @@
},
"mapped_lights": {
"name": "Привязанные светильники"
},
"sync_clock_elapsed": {
"name": "Прошло времени"
},
"server_cpu_percent": {
"name": "Загрузка CPU"
},
"server_ram_percent": {
"name": "Загрузка ОЗУ"
},
"server_app_cpu_percent": {
"name": "CPU приложения"
},
"server_app_ram": {
"name": "Память приложения"
},
"server_gpu_utilization": {
"name": "Загрузка GPU"
},
"server_gpu_temp": {
"name": "Температура GPU"
},
"server_cpu_temp": {
"name": "Температура CPU"
},
"server_battery": {
"name": "Батарея"
},
"server_last_restart": {
"name": "Последний запуск"
},
"server_version": {
"name": "Версия сервера"
}
},
"number": {
@@ -76,6 +115,9 @@
},
"api_input_timeout": {
"name": "Таймаут подмены"
},
"sync_clock_speed": {
"name": "Скорость"
}
},
"select": {
@@ -93,6 +135,11 @@
"nearest": "Ближайший"
}
}
},
"update": {
"server_update": {
"name": "Обновление сервера"
}
}
}
}
+168
View File
@@ -0,0 +1,168 @@
"""Update platform for LED Screen Controller.
Surfaces the led-grab server's auto-update service (``/api/v1/system/update``)
as a Home Assistant Update entity attached to the synthetic Server device.
The entity is only registered when the server actually exposes the endpoint —
older servers return 404 and the coordinator stores ``None`` in
``data["system"]["update"]``.
"""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.update import (
UpdateDeviceClass,
UpdateEntity,
UpdateEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DATA_COORDINATOR, DOMAIN
from .coordinator import LedGrabCoordinator
_LOGGER = logging.getLogger(__name__)
# release_summary is capped at 255 chars by HA's frontend; the full markdown
# body is exposed via async_release_notes() and rendered in the notes dialog.
_RELEASE_SUMMARY_MAX = 255
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the server update entity, when supported by the server."""
coordinator: LedGrabCoordinator = hass.data[DOMAIN][entry.entry_id][
DATA_COORDINATOR
]
update_data = (coordinator.data or {}).get("system", {}).get("update")
if update_data is None:
# Server doesn't expose /api/v1/system/update/status — skip silently.
return
async_add_entities([LedGrabServerUpdate(coordinator, entry.entry_id)])
class LedGrabServerUpdate(CoordinatorEntity, UpdateEntity):
"""Update entity for the led-grab server itself."""
_attr_has_entity_name = True
_attr_translation_key = "server_update"
_attr_device_class = UpdateDeviceClass.FIRMWARE
def __init__(self, coordinator: LedGrabCoordinator, entry_id: str) -> None:
super().__init__(coordinator)
self._entry_id = entry_id
self._attr_unique_id = f"{entry_id}_server_update"
@property
def device_info(self) -> dict[str, Any]:
return {"identifiers": {(DOMAIN, f"{self._entry_id}_server")}}
def _data(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
return (self.coordinator.data.get("system") or {}).get("update")
def _release(self) -> dict[str, Any]:
return (self._data() or {}).get("release") or {}
@property
def supported_features(self) -> UpdateEntityFeature:
# The server's apply endpoint refuses to run for install types that
# can't auto-update (e.g. docker-managed containers). Hide the install
# button in those cases instead of letting users click it and 400.
features = UpdateEntityFeature.PROGRESS | UpdateEntityFeature.RELEASE_NOTES
data = self._data() or {}
if data.get("can_auto_update"):
features |= UpdateEntityFeature.INSTALL
return features
@property
def installed_version(self) -> str | None:
data = self._data() or {}
return data.get("current_version")
@property
def latest_version(self) -> str | None:
data = self._data()
if not data:
return None
if data.get("has_update"):
return self._release().get("version") or data.get("current_version")
# No update — match installed so HA shows "Up to date".
return data.get("current_version")
@property
def release_url(self) -> str | None:
data = self._data() or {}
base = data.get("releases_url") or None
tag = self._release().get("tag")
if base and tag:
# Works for both Gitea (`<base>/<repo>/releases/tag/<tag>`)
# and GitHub — `releases_url` already ends in `/releases`.
return f"{base.rstrip('/')}/tag/{tag}"
return base or None
@property
def release_summary(self) -> str | None:
body = self._release().get("body")
if not body:
return None
return body[:_RELEASE_SUMMARY_MAX]
async def async_release_notes(self) -> str | None:
"""Return the full release body for the notes dialog."""
return self._release().get("body") or None
@property
def in_progress(self) -> bool | int:
data = self._data() or {}
if data.get("applying"):
# Apply phase has no granular progress reporting from the server.
return True
if data.get("downloading"):
# Server reports 0..1; HA wants 0..100.
progress = float(data.get("download_progress") or 0.0)
return max(0, min(100, int(round(progress * 100))))
return False
@property
def available(self) -> bool:
return self._data() is not None
@property
def extra_state_attributes(self) -> dict[str, Any]:
data = self._data() or {}
return {
"install_type": data.get("install_type"),
"can_auto_update": data.get("can_auto_update"),
"checking": data.get("checking"),
"last_check": data.get("last_check"),
"last_error": data.get("last_error"),
"dismissed_version": data.get("dismissed_version"),
"prerelease": self._release().get("prerelease"),
"published_at": self._release().get("published_at"),
}
async def async_install(
self,
version: str | None,
backup: bool,
**kwargs: Any,
) -> None:
"""Apply the available update.
The server picks up whichever release it has cached; the ``version``
and ``backup`` arguments from HA aren't separately addressable on the
server side, so we ignore them.
"""
await self.coordinator.apply_update()