diff --git a/custom_components/ledgrab/__init__.py b/custom_components/ledgrab/__init__.py index ac4cc97..08a65e9 100644 --- a/custom_components/ledgrab/__init__.py +++ b/custom_components/ledgrab/__init__.py @@ -113,6 +113,22 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: ) current_identifiers.add(scenes_identifier) + # One device per scene playlist — groups its switch + stats sensors. + scene_playlists = ( + coordinator.data.get("scene_playlists", []) if coordinator.data else [] + ) + for playlist in scene_playlists: + playlist_identifier = (DOMAIN, playlist["id"]) + device_registry.async_get_or_create( + config_entry_id=entry.entry_id, + identifiers={playlist_identifier}, + name=playlist.get("name", playlist["id"]), + manufacturer=server_name, + model="Scene Playlist", + configuration_url=server_url, + ) + current_identifiers.add(playlist_identifier) + # One device per sync clock — groups its switch/number/button/sensor. sync_clocks = coordinator.data.get("sync_clocks", []) if coordinator.data else [] for clock in sync_clocks: @@ -140,37 +156,45 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: DATA_EVENT_LISTENER: event_listener, } - # Track target, scene, and sync-clock IDs to detect changes + # Track target, scene, playlist, and sync-clock IDs to detect changes known_target_ids = set(coordinator.data.get("targets", {}).keys() if coordinator.data else []) known_scene_ids = set( p["id"] for p in (coordinator.data.get("scene_presets", []) if coordinator.data else []) ) + known_playlist_ids = set( + p["id"] for p in (coordinator.data.get("scene_playlists", []) if coordinator.data else []) + ) known_clock_ids = set( c["id"] for c in (coordinator.data.get("sync_clocks", []) if coordinator.data else []) ) def _on_coordinator_update() -> None: - """Detect target/scene/clock list changes and trigger reload.""" - nonlocal known_target_ids, known_scene_ids, known_clock_ids + """Detect target/scene/playlist/clock list changes and trigger reload.""" + nonlocal known_target_ids, known_scene_ids, known_playlist_ids, known_clock_ids if not coordinator.data: return targets = coordinator.data.get("targets", {}) - # Reload if target, scene, or sync-clock list changed + # Reload if target, scene, playlist, or sync-clock list changed current_ids = set(targets.keys()) current_scene_ids = set(p["id"] for p in coordinator.data.get("scene_presets", [])) + current_playlist_ids = set(p["id"] for p in coordinator.data.get("scene_playlists", [])) current_clock_ids = set(c["id"] for c in coordinator.data.get("sync_clocks", [])) if ( current_ids != known_target_ids or current_scene_ids != known_scene_ids + or current_playlist_ids != known_playlist_ids or current_clock_ids != known_clock_ids ): known_target_ids = current_ids known_scene_ids = current_scene_ids + known_playlist_ids = current_playlist_ids known_clock_ids = current_clock_ids - _LOGGER.info("Target, scene, or sync-clock list changed, reloading integration") + _LOGGER.info( + "Target, scene, playlist, or sync-clock list changed, reloading integration" + ) hass.async_create_task(hass.config_entries.async_reload(entry.entry_id)) coordinator.async_add_listener(_on_coordinator_update) diff --git a/custom_components/ledgrab/coordinator.py b/custom_components/ledgrab/coordinator.py index 4fb2948..a12c1c8 100644 --- a/custom_components/ledgrab/coordinator.py +++ b/custom_components/ledgrab/coordinator.py @@ -10,6 +10,7 @@ from typing import Any import aiohttp from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ConfigEntryAuthFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.util import dt as dt_util @@ -47,6 +48,10 @@ class LedGrabCoordinator(DataUpdateCoordinator): self.boot_time: datetime | None = None self._auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} self._timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT) + # The aggregated snapshot does more server-side work than any single + # endpoint (system metrics + cold-cache device brightness), so give it + # a more generous ceiling than the per-request default. + self._snapshot_timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT * 3) super().__init__( hass, @@ -56,150 +61,87 @@ class LedGrabCoordinator(DataUpdateCoordinator): ) async def _async_update_data(self) -> dict[str, Any]: - """Fetch data from API.""" + """Fetch the full server snapshot in one request and reshape it. + + The server's ``/api/v1/snapshot`` endpoint aggregates everything a poll + needs (targets + state + metrics, devices + brightness, the source / + preset / clock lists, and the system block), so the coordinator no + longer fans out per-target ``/state`` + ``/metrics`` or per-device + ``/brightness`` requests. The reshaped result keeps the same structure + the entity platforms consume. + """ try: - async with asyncio.timeout(DEFAULT_TIMEOUT * 3): - targets_list = await self._fetch_targets() - - # Fetch state and metrics for all targets in parallel - targets_data: dict[str, dict[str, Any]] = {} - - async def fetch_target_data(target: dict) -> tuple[str, dict]: - target_id = target["id"] - try: - state, metrics = await asyncio.gather( - self._fetch_target_state(target_id), - self._fetch_target_metrics(target_id), - ) - except Exception as err: - _LOGGER.warning( - "Failed to fetch data for target %s: %s", - target_id, - err, - ) - state = None - metrics = None - - return target_id, { - "info": target, - "state": state, - "metrics": metrics, - } - - results = await asyncio.gather( - *(fetch_target_data(t) for t in targets_list), - return_exceptions=True, - ) - - for r in results: - if isinstance(r, Exception): - _LOGGER.warning("Target fetch failed: %s", r) - continue - target_id, data = r - targets_data[target_id] = data - - # Fetch devices, CSS sources, value sources, scene presets, - # sync clocks, system performance, health, and update status - # in parallel - ( - devices_data, - css_sources, - value_sources, - scene_presets, - sync_clocks, - performance, - health, - update_status, - ) = await asyncio.gather( - self._fetch_devices(), - self._fetch_css_sources(), - self._fetch_value_sources(), - self._fetch_scene_presets(), - self._fetch_sync_clocks(), - self._fetch_system_performance(), - self._fetch_health(), - self._fetch_update_status(), - ) - - if health: - version = health.get("version") - if version: - self.server_version = version - self._update_boot_time(health.get("uptime_seconds")) - - return { - "targets": targets_data, - "devices": devices_data, - "css_sources": css_sources, - "value_sources": value_sources, - "scene_presets": scene_presets, - "sync_clocks": sync_clocks, - "server_version": self.server_version, - "system": { - "performance": performance, - "health": health, - "boot_time": self.boot_time, - "update": update_status, - }, - } - + snapshot = await self._fetch_snapshot() except asyncio.TimeoutError as err: raise UpdateFailed(f"Timeout fetching data: {err}") from err + except aiohttp.ClientResponseError as err: + # A rotated/invalid API key fails every poll identically now that + # it's a single request — surface it as an auth failure so Home + # Assistant starts the reauth flow instead of retrying forever. + if err.status in (401, 403): + raise ConfigEntryAuthFailed("Invalid API key") from err + raise UpdateFailed(f"Error communicating with API: {err}") from err except aiohttp.ClientError as err: raise UpdateFailed(f"Error communicating with API: {err}") from err - async def _fetch_health(self) -> dict[str, Any] | None: - """Fetch /health (unauthenticated, cheap; gives version + uptime).""" - try: - async with self.session.get( - f"{self.server_url}/health", - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - return await resp.json() - except Exception as err: - _LOGGER.debug("Failed to fetch health: %s", err) - return None + system = snapshot.get("system") or {} + health = system.get("health") or {} + version = health.get("version") + if version: + self.server_version = version + self._update_boot_time(health.get("uptime_seconds")) - async def _fetch_system_performance(self) -> dict[str, Any] | None: - """Fetch CPU/RAM/GPU/temperature metrics from the server. + states = snapshot.get("target_states") or {} + metrics = snapshot.get("target_metrics") or {} + targets_data: dict[str, dict[str, Any]] = {} + for target in snapshot.get("targets") or []: + target_id = target.get("id") + if not target_id: + continue + targets_data[target_id] = { + "info": target, + "state": states.get(target_id), + "metrics": metrics.get(target_id), + } - Returns ``None`` on older servers without the endpoint or on transient - failures — callers must treat the absence as "no telemetry this poll". - """ - try: - async with self.session.get( - f"{self.server_url}/api/v1/system/performance", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - if resp.status == 404: - return None - resp.raise_for_status() - return await resp.json() - except Exception as err: - _LOGGER.debug("Failed to fetch system performance: %s", err) - return None + brightness = snapshot.get("device_brightness") or {} + devices_data: dict[str, dict[str, Any]] = {} + for device in snapshot.get("devices") or []: + device_id = device.get("id") + if not device_id: + continue + devices_data[device_id] = { + "info": device, + "brightness": brightness.get(device_id), + } - async def _fetch_update_status(self) -> dict[str, Any] | None: - """Fetch auto-update status (current version, available release, …). + return { + "targets": targets_data, + "devices": devices_data, + "css_sources": snapshot.get("css_sources") or [], + "value_sources": snapshot.get("value_sources") or [], + "scene_presets": snapshot.get("scene_presets") or [], + "scene_playlists": snapshot.get("scene_playlists") or [], + "playlist_state": snapshot.get("playlist_state") or {}, + "sync_clocks": snapshot.get("sync_clocks") or [], + "server_version": self.server_version, + "system": { + "performance": system.get("performance"), + "health": health or None, + "boot_time": self.boot_time, + "update": system.get("update"), + }, + } - Older servers without the update service return 404 — treat that as - "no update entity should be registered" by returning ``None``. - """ - try: - async with self.session.get( - f"{self.server_url}/api/v1/system/update/status", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - if resp.status == 404: - return None - resp.raise_for_status() - return await resp.json() - except Exception as err: - _LOGGER.debug("Failed to fetch update status: %s", err) - return None + async def _fetch_snapshot(self) -> dict[str, Any]: + """GET the aggregated server snapshot (single round trip).""" + async with self.session.get( + f"{self.server_url}/api/v1/snapshot", + headers=self._auth_headers, + timeout=self._snapshot_timeout, + ) as resp: + resp.raise_for_status() + return await resp.json() async def apply_update(self) -> None: """Trigger the server's apply-update flow. @@ -239,89 +181,6 @@ class LedGrabCoordinator(DataUpdateCoordinator): ): self.boot_time = candidate - async def _fetch_targets(self) -> list[dict[str, Any]]: - """Fetch all output targets.""" - async with self.session.get( - f"{self.server_url}/api/v1/output-targets", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - data = await resp.json() - return data.get("targets", []) - - async def _fetch_target_state(self, target_id: str) -> dict[str, Any]: - """Fetch target processing state.""" - async with self.session.get( - f"{self.server_url}/api/v1/output-targets/{target_id}/state", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - return await resp.json() - - async def _fetch_target_metrics(self, target_id: str) -> dict[str, Any]: - """Fetch target metrics.""" - async with self.session.get( - f"{self.server_url}/api/v1/output-targets/{target_id}/metrics", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - return await resp.json() - - async def _fetch_devices(self) -> dict[str, dict[str, Any]]: - """Fetch all devices with capabilities and brightness.""" - try: - async with self.session.get( - f"{self.server_url}/api/v1/devices", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - data = await resp.json() - devices = data.get("devices", []) - except Exception as err: - _LOGGER.warning("Failed to fetch devices: %s", err) - return {} - - # Fetch brightness for all capable devices in parallel - async def fetch_device_entry(device: dict) -> tuple[str, dict[str, Any]]: - device_id = device["id"] - entry: dict[str, Any] = {"info": device, "brightness": None} - if "brightness_control" in (device.get("capabilities") or []): - try: - async with self.session.get( - f"{self.server_url}/api/v1/devices/{device_id}/brightness", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - if resp.status == 200: - bri_data = await resp.json() - entry["brightness"] = bri_data.get("brightness") - except Exception as err: - _LOGGER.warning( - "Failed to fetch brightness for device %s: %s", - device_id, - err, - ) - return device_id, entry - - results = await asyncio.gather( - *(fetch_device_entry(d) for d in devices), - return_exceptions=True, - ) - - devices_data: dict[str, dict[str, Any]] = {} - for r in results: - if isinstance(r, Exception): - _LOGGER.warning("Device fetch failed: %s", r) - continue - device_id, entry = r - devices_data[device_id] = entry - - return devices_data - async def set_brightness(self, device_id: str, brightness: int) -> None: """Set brightness for a device.""" async with self.session.put( @@ -360,72 +219,6 @@ class LedGrabCoordinator(DataUpdateCoordinator): resp.raise_for_status() await self.async_request_refresh() - async def _fetch_css_sources(self) -> list[dict[str, Any]]: - """Fetch all color strip sources.""" - try: - async with self.session.get( - f"{self.server_url}/api/v1/color-strip-sources", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - data = await resp.json() - return data.get("sources", []) - except Exception as err: - _LOGGER.warning("Failed to fetch CSS sources: %s", err) - return [] - - async def _fetch_value_sources(self) -> list[dict[str, Any]]: - """Fetch all value sources.""" - try: - async with self.session.get( - f"{self.server_url}/api/v1/value-sources", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - data = await resp.json() - return data.get("sources", []) - except Exception as err: - _LOGGER.warning("Failed to fetch value sources: %s", err) - return [] - - async def _fetch_scene_presets(self) -> list[dict[str, Any]]: - """Fetch all scene presets.""" - try: - async with self.session.get( - f"{self.server_url}/api/v1/scene-presets", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - resp.raise_for_status() - data = await resp.json() - return data.get("presets", []) - except Exception as err: - _LOGGER.warning("Failed to fetch scene presets: %s", err) - return [] - - async def _fetch_sync_clocks(self) -> list[dict[str, Any]]: - """Fetch all synchronization clocks (with runtime state). - - Older servers without the sync-clock endpoint return 404 — treat - that as "no clocks" rather than failing the whole refresh. - """ - try: - async with self.session.get( - f"{self.server_url}/api/v1/sync-clocks", - headers=self._auth_headers, - timeout=self._timeout, - ) as resp: - if resp.status == 404: - return [] - resp.raise_for_status() - data = await resp.json() - return data.get("clocks", []) - except Exception as err: - _LOGGER.warning("Failed to fetch sync clocks: %s", err) - return [] - async def push_colors(self, source_id: str, colors: list[list[int]]) -> None: """Push flat color array to an api_input CSS source.""" async with self.session.post( @@ -480,6 +273,49 @@ class LedGrabCoordinator(DataUpdateCoordinator): resp.raise_for_status() await self.async_request_refresh() + async def start_playlist(self, playlist_id: str) -> None: + """Start cycling a scene playlist. + + The server engine runs at most one playlist at a time, so starting this + one transparently stops whichever playlist was running before. + """ + async with self.session.post( + f"{self.server_url}/api/v1/scene-playlists/{playlist_id}/start", + headers=self._auth_headers, + timeout=self._timeout, + ) as resp: + if resp.status != 200: + body = await resp.text() + _LOGGER.error( + "Failed to start playlist %s: %s %s", + playlist_id, + resp.status, + body, + ) + resp.raise_for_status() + await self.async_request_refresh() + + async def stop_playlist(self) -> None: + """Stop the active playlist (the last applied scene stays in place). + + The stop endpoint is global — it stops whichever playlist is cycling, + matching the server's single-active-playlist model. + """ + async with self.session.post( + f"{self.server_url}/api/v1/scene-playlists/stop", + headers=self._auth_headers, + timeout=self._timeout, + ) as resp: + if resp.status != 200: + body = await resp.text() + _LOGGER.error( + "Failed to stop playlist: %s %s", + resp.status, + body, + ) + resp.raise_for_status() + await self.async_request_refresh() + async def update_source(self, source_id: str, **kwargs: Any) -> None: """Update a color strip source's fields. diff --git a/custom_components/ledgrab/entity.py b/custom_components/ledgrab/entity.py new file mode 100644 index 0000000..e0e8264 --- /dev/null +++ b/custom_components/ledgrab/entity.py @@ -0,0 +1,58 @@ +"""Shared base entity for scene-playlist platforms. + +The playlist switch and the playlist stats sensors live on the same device and +share the same lookup, availability, and running-state logic. Keeping that in +one place means the snapshot key names (``scene_playlists`` / ``playlist_state``) +and the "is this the active playlist?" match only ever live once, so the switch +and sensors can't silently drift apart. +""" + +from __future__ import annotations + +from typing import Any + +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import DOMAIN +from .coordinator import LedGrabCoordinator + + +class LedGrabPlaylistEntity(CoordinatorEntity): + """Common plumbing for entities attached to a scene-playlist device.""" + + _attr_has_entity_name = True + + def __init__( + self, + coordinator: LedGrabCoordinator, + playlist_id: str, + entry_id: str, + ) -> None: + super().__init__(coordinator) + self._playlist_id = playlist_id + self._entry_id = entry_id + + @property + def device_info(self) -> dict[str, Any]: + return {"identifiers": {(DOMAIN, self._playlist_id)}} + + @property + def available(self) -> bool: + return self._get_playlist() is not None + + def _get_playlist(self) -> dict[str, Any] | None: + if not self.coordinator.data: + return None + for playlist in self.coordinator.data.get("scene_playlists", []): + if playlist.get("id") == self._playlist_id: + return playlist + return None + + def _running_state(self) -> dict[str, Any] | None: + """Return the global cycling state if this playlist is the active one.""" + if not self.coordinator.data: + return None + state = self.coordinator.data.get("playlist_state") or {} + if state.get("is_running") and state.get("playlist_id") == self._playlist_id: + return state + return None diff --git a/custom_components/ledgrab/event_listener.py b/custom_components/ledgrab/event_listener.py index 53d6fa4..19a6158 100644 --- a/custom_components/ledgrab/event_listener.py +++ b/custom_components/ledgrab/event_listener.py @@ -99,7 +99,11 @@ class EventStreamListener: data = json.loads(msg.data) except json.JSONDecodeError: continue - if data.get("type") in ("state_change", "entity_changed"): + if data.get("type") in ( + "state_change", + "entity_changed", + "playlist_state_changed", + ): await self._coordinator.async_request_refresh() elif msg.type in ( aiohttp.WSMsgType.CLOSED, diff --git a/custom_components/ledgrab/sensor.py b/custom_components/ledgrab/sensor.py index 500ab92..3fdb5b5 100644 --- a/custom_components/ledgrab/sensor.py +++ b/custom_components/ledgrab/sensor.py @@ -29,6 +29,7 @@ from .const import ( DATA_COORDINATOR, ) from .coordinator import LedGrabCoordinator +from .entity import LedGrabPlaylistEntity _LOGGER = logging.getLogger(__name__) @@ -59,6 +60,14 @@ async def async_setup_entry( for clock in coordinator.data.get("sync_clocks", []): entities.append(SyncClockElapsedSensor(coordinator, clock["id"], entry.entry_id)) + for playlist in coordinator.data.get("scene_playlists", []): + entities.append( + PlaylistCurrentSceneSensor(coordinator, playlist["id"], entry.entry_id) + ) + entities.append( + PlaylistItemsSensor(coordinator, playlist["id"], entry.entry_id) + ) + entities.extend(_build_server_sensors(coordinator, entry.entry_id)) async_add_entities(entities) @@ -268,6 +277,93 @@ class SyncClockElapsedSensor(CoordinatorEntity, SensorEntity): return None +class _PlaylistSensorBase(LedGrabPlaylistEntity, SensorEntity): + """Sensor attached to a scene-playlist device. + + Device/availability/lookup plumbing lives in :class:`LedGrabPlaylistEntity` + so it stays in sync with the playlist switch. + """ + + +class PlaylistCurrentSceneSensor(_PlaylistSensorBase): + """Name of the scene preset this playlist is currently holding. + + Only this playlist's value populates while it is the active (cycling) one; + every other playlist's sensor reads ``None`` (idle). The value advances as + the engine steps to the next item. + """ + + _attr_icon = "mdi:playlist-star" + + def __init__( + self, + coordinator: LedGrabCoordinator, + playlist_id: str, + entry_id: str, + ) -> None: + super().__init__(coordinator, playlist_id, entry_id) + self._attr_unique_id = f"{playlist_id}_current_scene" + self._attr_translation_key = "playlist_current_scene" + + @property + def native_value(self) -> str | None: + state = self._running_state() + if not state: + return None + preset_id = state.get("current_preset_id") + if not preset_id: + return None + return self._resolve_preset_name(preset_id) + + @property + def extra_state_attributes(self) -> dict[str, Any]: + state = self._running_state() + if not state: + return {} + return { + "current_preset_id": state.get("current_preset_id"), + "current_index": state.get("current_index"), + "item_count": state.get("item_count"), + } + + def _resolve_preset_name(self, preset_id: str) -> str: + """Map a scene-preset id to its display name (fall back to the id).""" + if self.coordinator.data: + for preset in self.coordinator.data.get("scene_presets", []): + if preset.get("id") == preset_id: + return preset.get("name", preset_id) + return preset_id + + +class PlaylistItemsSensor(_PlaylistSensorBase): + """Number of scene presets queued in a playlist (static config stat). + + No ``state_class`` — this is a configuration cardinality that only changes + when the playlist is edited, so it shouldn't be enrolled in long-term + statistics. + """ + + _attr_icon = "mdi:playlist-music" + _attr_entity_category = EntityCategory.DIAGNOSTIC + + def __init__( + self, + coordinator: LedGrabCoordinator, + playlist_id: str, + entry_id: str, + ) -> None: + super().__init__(coordinator, playlist_id, entry_id) + self._attr_unique_id = f"{playlist_id}_items" + self._attr_translation_key = "playlist_items" + + @property + def native_value(self) -> int | None: + playlist = self._get_playlist() + if playlist is None: + return None + return len(playlist.get("items") or []) + + class HALightMappedLightsSensor(CoordinatorEntity, SensorEntity): """Sensor showing the number of mapped HA lights for an HA Light target.""" diff --git a/custom_components/ledgrab/strings.json b/custom_components/ledgrab/strings.json index 3f326a9..40f0c73 100644 --- a/custom_components/ledgrab/strings.json +++ b/custom_components/ledgrab/strings.json @@ -45,6 +45,9 @@ }, "sync_clock_running": { "name": "Active" + }, + "playlist_active": { + "name": "Active" } }, "sensor": { @@ -66,6 +69,12 @@ "sync_clock_elapsed": { "name": "Elapsed Time" }, + "playlist_current_scene": { + "name": "Current Scene" + }, + "playlist_items": { + "name": "Scenes" + }, "server_cpu_percent": { "name": "CPU Usage" }, diff --git a/custom_components/ledgrab/switch.py b/custom_components/ledgrab/switch.py index b865f6e..9eeeb49 100644 --- a/custom_components/ledgrab/switch.py +++ b/custom_components/ledgrab/switch.py @@ -12,6 +12,7 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity from .const import DOMAIN, DATA_COORDINATOR from .coordinator import LedGrabCoordinator +from .entity import LedGrabPlaylistEntity _LOGGER = logging.getLogger(__name__) @@ -38,6 +39,11 @@ async def async_setup_entry( LedGrabSyncClockSwitch(coordinator, clock["id"], entry.entry_id) ) + for playlist in coordinator.data.get("scene_playlists", []): + entities.append( + LedGrabPlaylistSwitch(coordinator, playlist["id"], entry.entry_id) + ) + async_add_entities(entities) @@ -164,3 +170,67 @@ class LedGrabSyncClockSwitch(CoordinatorEntity, SwitchEntity): if clock.get("id") == self._clock_id: return clock return None + + +class LedGrabPlaylistSwitch(LedGrabPlaylistEntity, SwitchEntity): + """Start/stop control for a scene playlist. + + On = this playlist is cycling, off = stopped. The server runs at most one + playlist at a time, so turning one on stops any other that was running (the + other switches flip off on the next refresh). + + Device/availability/lookup plumbing lives in :class:`LedGrabPlaylistEntity` + so it stays in sync with the playlist stats sensors. + """ + + _attr_icon = "mdi:playlist-play" + + def __init__( + self, + coordinator: LedGrabCoordinator, + playlist_id: str, + entry_id: str, + ) -> None: + super().__init__(coordinator, playlist_id, entry_id) + self._attr_unique_id = f"{playlist_id}_active" + self._attr_translation_key = "playlist_active" + + @property + def is_on(self) -> bool: + playlist = self._get_playlist() + if not playlist: + return False + return bool(playlist.get("is_running", False)) + + @property + def extra_state_attributes(self) -> dict[str, Any]: + playlist = self._get_playlist() + if not playlist: + return {} + + attrs: dict[str, Any] = { + "playlist_id": self._playlist_id, + "item_count": len(playlist.get("items") or []), + "loop": playlist.get("loop"), + "shuffle": playlist.get("shuffle"), + "tags": playlist.get("tags") or [], + } + + # Runtime cycling details only apply to whichever playlist is running. + state = self._running_state() + if state: + attrs["current_index"] = state.get("current_index") + attrs["current_preset_id"] = state.get("current_preset_id") + attrs["step_duration"] = state.get("step_duration") + attrs["started_at"] = state.get("started_at") + + return attrs + + async def async_turn_on(self, **kwargs: Any) -> None: + await self.coordinator.start_playlist(self._playlist_id) + + async def async_turn_off(self, **kwargs: Any) -> None: + # Stop is global; only act if this playlist is the one actually cycling + # so toggling an already-idle switch can't stop a different playlist. + if self.is_on: + await self.coordinator.stop_playlist() diff --git a/custom_components/ledgrab/translations/en.json b/custom_components/ledgrab/translations/en.json index a068aac..aef8e6c 100644 --- a/custom_components/ledgrab/translations/en.json +++ b/custom_components/ledgrab/translations/en.json @@ -45,6 +45,9 @@ }, "sync_clock_running": { "name": "Active" + }, + "playlist_active": { + "name": "Active" } }, "sensor": { @@ -66,6 +69,12 @@ "sync_clock_elapsed": { "name": "Elapsed Time" }, + "playlist_current_scene": { + "name": "Current Scene" + }, + "playlist_items": { + "name": "Scenes" + }, "server_cpu_percent": { "name": "CPU Usage" }, diff --git a/custom_components/ledgrab/translations/ru.json b/custom_components/ledgrab/translations/ru.json index faa322b..3ef06f6 100644 --- a/custom_components/ledgrab/translations/ru.json +++ b/custom_components/ledgrab/translations/ru.json @@ -45,6 +45,9 @@ }, "sync_clock_running": { "name": "Активно" + }, + "playlist_active": { + "name": "Активно" } }, "sensor": { @@ -66,6 +69,12 @@ "sync_clock_elapsed": { "name": "Прошло времени" }, + "playlist_current_scene": { + "name": "Текущая сцена" + }, + "playlist_items": { + "name": "Сцены" + }, "server_cpu_percent": { "name": "Загрузка CPU" },