Rewrite HAOS integration: target-centric architecture with KC color sensors

- Rewrite integration to target-centric model: each picture target becomes
  a HA device under a server hub with switch, FPS, and status sensors
- Replace KC light entities with color sensors (hex state + RGB attributes)
  for better automation support via WebSocket real-time updates
- Add WebSocket manager for Key Colors color streaming
- Add KC per-stage timing metrics (calc_colors, broadcast) with rolling avg
- Fix KC timing fields missing from API by adding them to Pydantic schema
- Make start/stop processing idempotent to prevent intermittent 404 errors
- Add HAOS localization support (en, ru) using translation_key system
- Rename integration from "WLED Screen Controller" to "LED Screen Controller"
- Remove obsolete select.py (display select) and README.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-17 13:01:40 +03:00
parent e92fe4eb0a
commit 67da014684
19 changed files with 772 additions and 746 deletions

View File

@@ -1,214 +0,0 @@
# WLED Screen Controller - Home Assistant Integration
Native Home Assistant integration for WLED Screen Controller with full HACS support.
## Overview
This integration connects Home Assistant to the WLED Screen Controller server, providing:
- 🎛️ **Switch Entities** - Turn processing on/off per device
- 📊 **Sensor Entities** - Monitor FPS, status, and frame count
- 🖥️ **Select Entities** - Choose which display to capture
- 🔄 **Auto-Discovery** - Devices appear automatically
- 📦 **HACS Compatible** - Install directly from HACS
- ⚙️ **Config Flow** - Easy setup through UI
## Installation
### Method 1: HACS (Recommended)
1. **Install HACS** if you haven't already:
- Visit https://hacs.xyz/docs/setup/download
2. **Add Custom Repository:**
- Open HACS in Home Assistant
- Click the menu (⋮) → Custom repositories
- Add URL: `https://github.com/yourusername/wled-screen-controller`
- Category: **Integration**
- Click **Add**
3. **Install Integration:**
- In HACS, search for "WLED Screen Controller"
- Click **Download**
- Restart Home Assistant
4. **Configure:**
- Go to Settings → Devices & Services
- Click **+ Add Integration**
- Search for "WLED Screen Controller"
- Enter your server URL (e.g., `http://192.168.1.100:8080`)
- Click **Submit**
### Method 2: Manual Installation
1. **Download:**
```bash
cd /config # Your Home Assistant config directory
mkdir -p custom_components
```
2. **Copy Files:**
Copy the entire `custom_components/wled_screen_controller` folder to your Home Assistant `custom_components/` directory.
3. **Restart Home Assistant**
4. **Configure:**
- Settings → Devices & Services → Add Integration
- Search for "WLED Screen Controller"
## Configuration
### Initial Setup
When adding the integration, you'll be prompted for:
- **Name**: Friendly name for the integration (default: "WLED Screen Controller")
- **Server URL**: URL of your WLED Screen Controller server (e.g., `http://192.168.1.100:8080`)
The integration will automatically:
- Verify connection to the server
- Discover all configured WLED devices
- Create entities for each device
### Entities Created
For each WLED device, the following entities are created:
#### Switch Entities
**`switch.{device_name}_processing`**
- Controls processing on/off for the device
- Attributes:
- `device_id`: Internal device ID
- `fps_target`: Target FPS
- `fps_actual`: Current FPS
- `display_index`: Active display
- `frames_processed`: Total frames
- `errors_count`: Error count
- `uptime_seconds`: Processing uptime
#### Sensor Entities
**`sensor.{device_name}_fps`**
- Current FPS value
- Unit: FPS
- Attributes:
- `target_fps`: Target FPS setting
**`sensor.{device_name}_status`**
- Processing status
- States: `processing`, `idle`, `unavailable`, `unknown`
**`sensor.{device_name}_frames_processed`**
- Total frames processed counter
- Continuously increasing while processing
#### Select Entities
**`select.{device_name}_display`**
- Select which display to capture
- Options: `Display 0`, `Display 1`, etc.
- Changes take effect immediately
## Usage Examples
### Basic Automation
Turn on processing when TV turns on:
```yaml
automation:
- alias: "Auto Start WLED with TV"
trigger:
- platform: state
entity_id: media_player.living_room_tv
to: "on"
action:
- service: switch.turn_on
target:
entity_id: switch.living_room_wled_processing
- alias: "Auto Stop WLED with TV"
trigger:
- platform: state
entity_id: media_player.living_room_tv
to: "off"
action:
- service: switch.turn_off
target:
entity_id: switch.living_room_wled_processing
```
### Lovelace UI Examples
#### Simple Card
```yaml
type: entities
title: WLED Screen Controller
entities:
- entity: switch.living_room_wled_processing
- entity: sensor.living_room_wled_fps
- entity: sensor.living_room_wled_status
- entity: select.living_room_wled_display
```
#### Advanced Card
```yaml
type: vertical-stack
cards:
- type: entity
entity: switch.living_room_wled_processing
name: Ambient Lighting
icon: mdi:television-ambient-light
- type: conditional
conditions:
- entity: switch.living_room_wled_processing
state: "on"
card:
type: entities
entities:
- entity: sensor.living_room_wled_fps
name: Current FPS
- entity: sensor.living_room_wled_frames_processed
name: Frames Processed
- entity: select.living_room_wled_display
name: Display Selection
```
## Troubleshooting
### Integration Not Appearing
1. Check HACS installation
2. Clear browser cache
3. Restart Home Assistant
4. Check logs: Settings → System → Logs
### Connection Errors
1. Verify server is running:
```bash
curl http://YOUR_SERVER_IP:8080/health
```
2. Check firewall settings
3. Ensure Home Assistant can reach server
4. Try http:// not https://
### Entities Not Updating
1. Check coordinator logs
2. Verify server has devices
3. Restart integration
## Support
- 📖 [Full Documentation](../../INSTALLATION.md)
- 🐛 [Report Issues](https://github.com/yourusername/wled-screen-controller/issues)
## License
MIT License - see [../../LICENSE](../../LICENSE)

View File

@@ -1,84 +1,112 @@
"""The WLED Screen Controller integration."""
"""The LED Screen Controller integration."""
from __future__ import annotations
import logging
from datetime import timedelta
from urllib.parse import urlparse
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform, CONF_NAME
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN, CONF_SERVER_URL, DEFAULT_SCAN_INTERVAL
from .const import (
DOMAIN,
CONF_SERVER_URL,
CONF_API_KEY,
DEFAULT_SCAN_INTERVAL,
TARGET_TYPE_KEY_COLORS,
DATA_COORDINATOR,
DATA_WS_MANAGER,
)
from .coordinator import WLEDScreenControllerCoordinator
from .ws_manager import KeyColorsWebSocketManager
_LOGGER = logging.getLogger(__name__)
PLATFORMS: list[Platform] = [
Platform.SWITCH,
Platform.SENSOR,
Platform.SELECT,
]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up WLED Screen Controller from a config entry."""
"""Set up LED Screen Controller from a config entry."""
server_url = entry.data[CONF_SERVER_URL]
server_name = entry.data.get(CONF_NAME, "WLED Screen Controller")
api_key = entry.data[CONF_API_KEY]
session = async_get_clientsession(hass)
coordinator = WLEDScreenControllerCoordinator(
hass,
session,
server_url,
api_key,
update_interval=timedelta(seconds=DEFAULT_SCAN_INTERVAL),
)
# Fetch initial data
await coordinator.async_config_entry_first_refresh()
# Create hub device (the server PC)
ws_manager = KeyColorsWebSocketManager(hass, server_url, api_key)
# Create device entries for each target
device_registry = dr.async_get(hass)
# Parse URL for hub identifier
parsed_url = urlparse(server_url)
hub_identifier = f"{parsed_url.hostname}:{parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)}"
hub_device = device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, hub_identifier)},
name=server_name,
manufacturer="WLED Screen Controller",
model="Server",
sw_version=coordinator.server_version,
configuration_url=server_url,
)
# Create device entries for each WLED device
if coordinator.data and "devices" in coordinator.data:
for device_id, device_data in coordinator.data["devices"].items():
device_info = device_data["info"]
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
info = target_data["info"]
target_type = info.get("target_type", "led")
model = (
"Key Colors Target"
if target_type == TARGET_TYPE_KEY_COLORS
else "LED Target"
)
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, device_id)},
name=device_info["name"],
manufacturer="WLED",
model="Screen Ambient Lighting",
sw_version=f"{device_info.get('led_count', 0)} LEDs",
via_device=(DOMAIN, hub_identifier), # Link to hub
configuration_url=device_info.get("url"),
identifiers={(DOMAIN, target_id)},
name=info.get("name", target_id),
manufacturer="LED Screen Controller",
model=model,
configuration_url=server_url,
)
# Store coordinator and hub info
# Store data
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = {
"coordinator": coordinator,
"hub_device_id": hub_device.id,
DATA_COORDINATOR: coordinator,
DATA_WS_MANAGER: ws_manager,
}
# Setup platforms
# Track target IDs to detect changes
initial_target_ids = set(
coordinator.data.get("targets", {}).keys() if coordinator.data else []
)
def _on_coordinator_update() -> None:
"""Manage WS connections and detect target list changes."""
if not coordinator.data:
return
targets = coordinator.data.get("targets", {})
# Start/stop WS connections for KC targets based on processing state
for target_id, target_data in targets.items():
info = target_data.get("info", {})
state = target_data.get("state") or {}
if info.get("target_type") == TARGET_TYPE_KEY_COLORS:
if state.get("processing"):
hass.async_create_task(ws_manager.start_listening(target_id))
else:
hass.async_create_task(ws_manager.stop_listening(target_id))
# Reload if target list changed
current_ids = set(targets.keys())
if current_ids != initial_target_ids:
_LOGGER.info("Target list changed, reloading integration")
hass.async_create_task(
hass.config_entries.async_reload(entry.entry_id)
)
coordinator.async_add_listener(_on_coordinator_update)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
@@ -86,15 +114,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
ws_manager: KeyColorsWebSocketManager = hass.data[DOMAIN][entry.entry_id][
DATA_WS_MANAGER
]
await ws_manager.shutdown()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload config entry."""
await async_unload_entry(hass, entry)
await async_setup_entry(hass, entry)

View File

@@ -1,4 +1,4 @@
"""Config flow for WLED Screen Controller integration."""
"""Config flow for LED Screen Controller integration."""
from __future__ import annotations
import logging
@@ -9,19 +9,18 @@ import aiohttp
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN, CONF_SERVER_URL, DEFAULT_TIMEOUT
from .const import DOMAIN, CONF_SERVER_URL, CONF_API_KEY, DEFAULT_TIMEOUT
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME, default="WLED Screen Controller"): str,
vol.Required(CONF_SERVER_URL, default="http://localhost:8080"): str,
vol.Required(CONF_API_KEY): str,
}
)
@@ -30,48 +29,56 @@ def normalize_url(url: str) -> str:
"""Normalize URL to ensure port is an integer."""
parsed = urlparse(url)
# If port is specified, ensure it's an integer
if parsed.port is not None:
# Reconstruct URL with integer port
netloc = parsed.hostname or "localhost"
port = int(parsed.port) # Cast to int to avoid float
port = int(parsed.port)
if port != (443 if parsed.scheme == "https" else 80):
netloc = f"{netloc}:{port}"
parsed = parsed._replace(netloc=netloc)
return urlunparse(parsed)
async def validate_server_connection(
hass: HomeAssistant, server_url: str
async def validate_server(
hass: HomeAssistant, server_url: str, api_key: str
) -> dict[str, Any]:
"""Validate the server URL by checking the health endpoint."""
"""Validate server connectivity and API key."""
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)
# Step 1: Check connectivity via health endpoint (no auth needed)
try:
async with session.get(f"{server_url}/health", timeout=timeout) as resp:
if resp.status != 200:
raise ConnectionError(f"Server returned status {resp.status}")
data = await resp.json()
version = data.get("version", "unknown")
except aiohttp.ClientError as err:
raise ConnectionError(f"Cannot connect to server: {err}") from err
# Step 2: Validate API key via authenticated endpoint
headers = {"Authorization": f"Bearer {api_key}"}
try:
async with session.get(
f"{server_url}/health",
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
if response.status == 200:
data = await response.json()
return {
"version": data.get("version", "unknown"),
"status": data.get("status", "unknown"),
}
raise ConnectionError(f"Server returned status {response.status}")
f"{server_url}/api/v1/picture-targets",
headers=headers,
timeout=timeout,
) as resp:
if resp.status == 401:
raise PermissionError("Invalid API key")
resp.raise_for_status()
except PermissionError:
raise
except aiohttp.ClientError as err:
raise ConnectionError(f"Cannot connect to server: {err}")
except Exception as err:
raise ConnectionError(f"Unexpected error: {err}")
raise ConnectionError(f"API request failed: {err}") from err
return {"version": version}
class WLEDScreenControllerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for WLED Screen Controller."""
"""Handle a config flow for LED Screen Controller."""
VERSION = 1
VERSION = 2
async def async_step_user(
self, user_input: dict[str, Any] | None = None
@@ -81,26 +88,28 @@ class WLEDScreenControllerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
server_url = normalize_url(user_input[CONF_SERVER_URL].rstrip("/"))
api_key = user_input[CONF_API_KEY]
try:
info = await validate_server_connection(self.hass, server_url)
await validate_server(self.hass, server_url, api_key)
# Set unique ID based on server URL
await self.async_set_unique_id(server_url)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_input[CONF_NAME],
title="LED Screen Controller",
data={
CONF_SERVER_URL: server_url,
"version": info["version"],
CONF_API_KEY: api_key,
},
)
except ConnectionError as err:
_LOGGER.error("Connection error: %s", err)
errors["base"] = "cannot_connect"
except Exception as err: # pylint: disable=broad-except
except PermissionError:
errors["base"] = "invalid_api_key"
except Exception as err:
_LOGGER.exception("Unexpected exception: %s", err)
errors["base"] = "unknown"

View File

@@ -1,23 +1,21 @@
"""Constants for the WLED Screen Controller integration."""
"""Constants for the LED Screen Controller integration."""
DOMAIN = "wled_screen_controller"
# Configuration
CONF_SERVER_URL = "server_url"
CONF_API_KEY = "api_key"
# Default values
DEFAULT_SCAN_INTERVAL = 10 # seconds
DEFAULT_TIMEOUT = 10 # seconds
WS_RECONNECT_DELAY = 5 # seconds
WS_MAX_RECONNECT_DELAY = 60 # seconds
# Attributes
ATTR_DEVICE_ID = "device_id"
ATTR_FPS_ACTUAL = "fps_actual"
ATTR_FPS_TARGET = "fps_target"
ATTR_DISPLAY_INDEX = "display_index"
ATTR_FRAMES_PROCESSED = "frames_processed"
ATTR_ERRORS_COUNT = "errors_count"
ATTR_UPTIME = "uptime_seconds"
# Target types
TARGET_TYPE_LED = "led"
TARGET_TYPE_KEY_COLORS = "key_colors"
# Services
SERVICE_START_PROCESSING = "start_processing"
SERVICE_STOP_PROCESSING = "stop_processing"
# Data keys stored in hass.data[DOMAIN][entry_id]
DATA_COORDINATOR = "coordinator"
DATA_WS_MANAGER = "ws_manager"

View File

@@ -1,4 +1,4 @@
"""Data update coordinator for WLED Screen Controller."""
"""Data update coordinator for LED Screen Controller."""
from __future__ import annotations
import asyncio
@@ -11,25 +11,33 @@ import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, DEFAULT_TIMEOUT
from .const import (
DOMAIN,
DEFAULT_TIMEOUT,
TARGET_TYPE_KEY_COLORS,
)
_LOGGER = logging.getLogger(__name__)
class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
"""Class to manage fetching WLED Screen Controller data."""
"""Class to manage fetching LED Screen Controller data."""
def __init__(
self,
hass: HomeAssistant,
session: aiohttp.ClientSession,
server_url: str,
api_key: str,
update_interval: timedelta,
) -> None:
"""Initialize the coordinator."""
self.server_url = server_url
self.session = session
self.api_key = api_key
self.server_version = "unknown"
self._auth_headers = {"Authorization": f"Bearer {api_key}"}
self._pattern_cache: dict[str, list[dict]] = {}
super().__init__(
hass,
@@ -41,44 +49,67 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
async def _async_update_data(self) -> dict[str, Any]:
"""Fetch data from API."""
try:
async with asyncio.timeout(DEFAULT_TIMEOUT):
# Fetch server version on first update
async with asyncio.timeout(DEFAULT_TIMEOUT * 3):
if self.server_version == "unknown":
await self._fetch_server_version()
# Fetch devices list
devices = await self._fetch_devices()
targets_list = await self._fetch_targets()
# Fetch state for each device
devices_data = {}
for device in devices:
device_id = device["id"]
# Fetch state and metrics for all targets in parallel
targets_data: dict[str, dict[str, Any]] = {}
async def fetch_target_data(target: dict) -> tuple[str, dict]:
target_id = target["id"]
try:
state = await self._fetch_device_state(device_id)
metrics = await self._fetch_device_metrics(device_id)
devices_data[device_id] = {
"info": device,
"state": state,
"metrics": metrics,
}
state, metrics = await asyncio.gather(
self._fetch_target_state(target_id),
self._fetch_target_metrics(target_id),
)
except Exception as err:
_LOGGER.warning(
"Failed to fetch data for device %s: %s", device_id, err
"Failed to fetch data for target %s: %s",
target_id,
err,
)
# Include device info even if state fetch fails
devices_data[device_id] = {
"info": device,
"state": None,
"metrics": None,
}
state = None
metrics = None
# Fetch available displays
displays = await self._fetch_displays()
result: dict[str, Any] = {
"info": target,
"state": state,
"metrics": metrics,
}
# Fetch rectangles for key_colors targets
if target.get("target_type") == TARGET_TYPE_KEY_COLORS:
kc_settings = target.get("key_colors_settings") or {}
template_id = kc_settings.get("pattern_template_id", "")
if template_id:
result["rectangles"] = await self._get_rectangles(
template_id
)
else:
result["rectangles"] = []
else:
result["rectangles"] = []
return target_id, result
results = await asyncio.gather(
*(fetch_target_data(t) for t in targets_list),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
_LOGGER.warning("Target fetch failed: %s", r)
continue
target_id, data = r
targets_data[target_id] = data
return {
"devices": devices_data,
"displays": displays,
"targets": targets_data,
"server_version": self.server_version,
}
except asyncio.TimeoutError as err:
@@ -92,88 +123,99 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
async with self.session.get(
f"{self.server_url}/health",
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
data = await response.json()
) as resp:
resp.raise_for_status()
data = await resp.json()
self.server_version = data.get("version", "unknown")
except Exception as err:
_LOGGER.warning("Failed to fetch server version: %s", err)
self.server_version = "unknown"
async def _fetch_devices(self) -> list[dict[str, Any]]:
"""Fetch devices list."""
async def _fetch_targets(self) -> list[dict[str, Any]]:
"""Fetch all picture targets."""
async with self.session.get(
f"{self.server_url}/api/v1/devices",
f"{self.server_url}/api/v1/picture-targets",
headers=self._auth_headers,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
data = await response.json()
return data.get("devices", [])
) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("targets", [])
async def _fetch_device_state(self, device_id: str) -> dict[str, Any]:
"""Fetch device processing state."""
async def _fetch_target_state(self, target_id: str) -> dict[str, Any]:
"""Fetch target processing state."""
async with self.session.get(
f"{self.server_url}/api/v1/devices/{device_id}/state",
f"{self.server_url}/api/v1/picture-targets/{target_id}/state",
headers=self._auth_headers,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
return await response.json()
) as resp:
resp.raise_for_status()
return await resp.json()
async def _fetch_device_metrics(self, device_id: str) -> dict[str, Any]:
"""Fetch device metrics."""
async def _fetch_target_metrics(self, target_id: str) -> dict[str, Any]:
"""Fetch target metrics."""
async with self.session.get(
f"{self.server_url}/api/v1/devices/{device_id}/metrics",
f"{self.server_url}/api/v1/picture-targets/{target_id}/metrics",
headers=self._auth_headers,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
return await response.json()
) as resp:
resp.raise_for_status()
return await resp.json()
async def _get_rectangles(self, template_id: str) -> list[dict]:
"""Get rectangles for a pattern template, using cache."""
if template_id in self._pattern_cache:
return self._pattern_cache[template_id]
async def _fetch_displays(self) -> list[dict[str, Any]]:
"""Fetch available displays."""
try:
async with self.session.get(
f"{self.server_url}/api/v1/config/displays",
f"{self.server_url}/api/v1/pattern-templates/{template_id}",
headers=self._auth_headers,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
data = await response.json()
return data.get("displays", [])
) as resp:
resp.raise_for_status()
data = await resp.json()
rectangles = data.get("rectangles", [])
self._pattern_cache[template_id] = rectangles
return rectangles
except Exception as err:
_LOGGER.warning("Failed to fetch displays: %s", err)
_LOGGER.warning(
"Failed to fetch pattern template %s: %s", template_id, err
)
return []
async def start_processing(self, device_id: str) -> None:
"""Start processing for a device."""
async def start_processing(self, target_id: str) -> None:
"""Start processing for a target."""
async with self.session.post(
f"{self.server_url}/api/v1/devices/{device_id}/start",
f"{self.server_url}/api/v1/picture-targets/{target_id}/start",
headers=self._auth_headers,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
# Refresh data immediately
) as resp:
if resp.status == 409:
_LOGGER.debug("Target %s already processing", target_id)
elif resp.status != 200:
body = await resp.text()
_LOGGER.error(
"Failed to start target %s: %s %s",
target_id, resp.status, body,
)
resp.raise_for_status()
await self.async_request_refresh()
async def stop_processing(self, device_id: str) -> None:
"""Stop processing for a device."""
async def stop_processing(self, target_id: str) -> None:
"""Stop processing for a target."""
async with self.session.post(
f"{self.server_url}/api/v1/devices/{device_id}/stop",
f"{self.server_url}/api/v1/picture-targets/{target_id}/stop",
headers=self._auth_headers,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
# Refresh data immediately
await self.async_request_refresh()
async def update_settings(
self, device_id: str, settings: dict[str, Any]
) -> None:
"""Update device settings."""
async with self.session.put(
f"{self.server_url}/api/v1/devices/{device_id}/settings",
json=settings,
timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT),
) as response:
response.raise_for_status()
# Refresh data immediately
) as resp:
if resp.status == 409:
_LOGGER.debug("Target %s already stopped", target_id)
elif resp.status != 200:
body = await resp.text()
_LOGGER.error(
"Failed to stop target %s: %s %s",
target_id, resp.status, body,
)
resp.raise_for_status()
await self.async_request_refresh()

View File

@@ -1,12 +1,12 @@
{
"domain": "wled_screen_controller",
"name": "WLED Screen Controller",
"name": "LED Screen Controller",
"codeowners": ["@alexeidolgolyov"],
"config_flow": true,
"dependencies": [],
"documentation": "https://github.com/yourusername/wled-screen-controller",
"iot_class": "local_polling",
"iot_class": "local_push",
"issue_tracker": "https://github.com/yourusername/wled-screen-controller/issues",
"requirements": ["aiohttp>=3.9.0"],
"version": "0.1.0"
"version": "0.2.0"
}

View File

@@ -1,117 +0,0 @@
"""Select platform for WLED Screen Controller."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.select import SelectEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import WLEDScreenControllerCoordinator
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up WLED Screen Controller select entities."""
data = hass.data[DOMAIN][entry.entry_id]
coordinator: WLEDScreenControllerCoordinator = data["coordinator"]
entities = []
if coordinator.data and "devices" in coordinator.data:
for device_id, device_data in coordinator.data["devices"].items():
device_info = device_data["info"]
entities.append(
WLEDScreenControllerDisplaySelect(
coordinator, device_id, device_info, entry.entry_id
)
)
async_add_entities(entities)
class WLEDScreenControllerDisplaySelect(CoordinatorEntity, SelectEntity):
"""Display selection for WLED Screen Controller."""
_attr_has_entity_name = True
_attr_icon = "mdi:monitor-multiple"
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
device_id: str,
device_info: dict[str, Any],
entry_id: str,
) -> None:
"""Initialize the select."""
super().__init__(coordinator)
self._device_id = device_id
self._device_info = device_info
self._entry_id = entry_id
self._attr_unique_id = f"{device_id}_display"
self._attr_name = "Display"
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {(DOMAIN, self._device_id)},
}
@property
def options(self) -> list[str]:
"""Return available display options."""
if not self.coordinator.data or "displays" not in self.coordinator.data:
return ["Display 0"]
displays = self.coordinator.data["displays"]
return [f"Display {d['index']}" for d in displays]
@property
def current_option(self) -> str | None:
"""Return current display."""
if not self.coordinator.data:
return None
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data or not device_data.get("state"):
return None
display_index = device_data["state"].get("display_index", 0)
return f"Display {display_index}"
async def async_select_option(self, option: str) -> None:
"""Change the selected display."""
try:
# Extract display index from option (e.g., "Display 1" -> 1)
display_index = int(option.split()[-1])
# Get current settings
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data:
return
info = device_data["info"]
settings = info.get("settings", {})
# Update settings with new display index
updated_settings = {
"display_index": display_index,
"fps": settings.get("fps", 30),
"border_width": settings.get("border_width", 10),
}
await self.coordinator.update_settings(self._device_id, updated_settings)
except Exception as err:
_LOGGER.error("Failed to update display: %s", err)
raise

View File

@@ -1,7 +1,8 @@
"""Sensor platform for WLED Screen Controller."""
"""Sensor platform for LED Screen Controller."""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any
from homeassistant.components.sensor import (
@@ -10,13 +11,18 @@ from homeassistant.components.sensor import (
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfTime
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .const import (
DOMAIN,
TARGET_TYPE_KEY_COLORS,
DATA_COORDINATOR,
DATA_WS_MANAGER,
)
from .coordinator import WLEDScreenControllerCoordinator
from .ws_manager import KeyColorsWebSocketManager
_LOGGER = logging.getLogger(__name__)
@@ -26,180 +32,242 @@ async def async_setup_entry(
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up WLED Screen Controller sensors."""
"""Set up LED Screen Controller sensors."""
data = hass.data[DOMAIN][entry.entry_id]
coordinator: WLEDScreenControllerCoordinator = data["coordinator"]
coordinator: WLEDScreenControllerCoordinator = data[DATA_COORDINATOR]
ws_manager: KeyColorsWebSocketManager = data[DATA_WS_MANAGER]
entities = []
if coordinator.data and "devices" in coordinator.data:
for device_id, device_data in coordinator.data["devices"].items():
device_info = device_data["info"]
# FPS sensor
entities: list[SensorEntity] = []
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
entities.append(
WLEDScreenControllerFPSSensor(
coordinator, device_id, device_info, entry.entry_id
)
WLEDScreenControllerFPSSensor(coordinator, target_id, entry.entry_id)
)
# Status sensor
entities.append(
WLEDScreenControllerStatusSensor(
coordinator, device_id, device_info, entry.entry_id
coordinator, target_id, entry.entry_id
)
)
# Frames processed sensor
entities.append(
WLEDScreenControllerFramesSensor(
coordinator, device_id, device_info, entry.entry_id
)
)
# Add color sensors for Key Colors targets
info = target_data["info"]
if info.get("target_type") == TARGET_TYPE_KEY_COLORS:
rectangles = target_data.get("rectangles", [])
for rect in rectangles:
entities.append(
WLEDScreenControllerColorSensor(
coordinator=coordinator,
ws_manager=ws_manager,
target_id=target_id,
rectangle_name=rect["name"],
entry_id=entry.entry_id,
)
)
async_add_entities(entities)
class WLEDScreenControllerFPSSensor(CoordinatorEntity, SensorEntity):
"""FPS sensor for WLED Screen Controller."""
"""FPS sensor for a LED Screen Controller target."""
_attr_has_entity_name = True
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_native_unit_of_measurement = "FPS"
_attr_icon = "mdi:speedometer"
_attr_suggested_display_precision = 1
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
device_id: str,
device_info: dict[str, Any],
target_id: str,
entry_id: str,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._device_id = device_id
self._device_info = device_info
self._target_id = target_id
self._entry_id = entry_id
self._attr_unique_id = f"{device_id}_fps"
self._attr_name = "FPS"
self._attr_unique_id = f"{target_id}_fps"
self._attr_translation_key = "fps"
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {(DOMAIN, self._device_id)},
}
return {"identifiers": {(DOMAIN, self._target_id)}}
@property
def native_value(self) -> float | None:
"""Return the FPS value."""
if not self.coordinator.data:
target_data = self._get_target_data()
if not target_data or not target_data.get("state"):
return None
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data or not device_data.get("state"):
state = target_data["state"]
if not state.get("processing"):
return None
return device_data["state"].get("fps_actual")
return state.get("fps_actual")
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional attributes."""
target_data = self._get_target_data()
if not target_data or not target_data.get("state"):
return {}
return {"fps_target": target_data["state"].get("fps_target")}
@property
def available(self) -> bool:
"""Return if entity is available."""
return self._get_target_data() is not None
def _get_target_data(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return {}
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data or not device_data.get("state"):
return {}
return {
"target_fps": device_data["state"].get("fps_target"),
}
return None
return self.coordinator.data.get("targets", {}).get(self._target_id)
class WLEDScreenControllerStatusSensor(CoordinatorEntity, SensorEntity):
"""Status sensor for WLED Screen Controller."""
"""Status sensor for a LED Screen Controller target."""
_attr_has_entity_name = True
_attr_icon = "mdi:information-outline"
_attr_device_class = SensorDeviceClass.ENUM
_attr_options = ["processing", "idle", "error", "unavailable"]
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
device_id: str,
device_info: dict[str, Any],
target_id: str,
entry_id: str,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._device_id = device_id
self._device_info = device_info
self._target_id = target_id
self._entry_id = entry_id
self._attr_unique_id = f"{device_id}_status"
self._attr_name = "Status"
self._attr_unique_id = f"{target_id}_status"
self._attr_translation_key = "status"
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {(DOMAIN, self._device_id)},
}
return {"identifiers": {(DOMAIN, self._target_id)}}
@property
def native_value(self) -> str:
"""Return the status."""
if not self.coordinator.data:
return "unknown"
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data:
target_data = self._get_target_data()
if not target_data:
return "unavailable"
if device_data.get("state") and device_data["state"].get("processing"):
state = target_data.get("state")
if not state:
return "unavailable"
if state.get("processing"):
errors = state.get("errors", [])
if errors:
return "error"
return "processing"
return "idle"
@property
def available(self) -> bool:
"""Return if entity is available."""
return self._get_target_data() is not None
class WLEDScreenControllerFramesSensor(CoordinatorEntity, SensorEntity):
"""Frames processed sensor."""
def _get_target_data(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
return self.coordinator.data.get("targets", {}).get(self._target_id)
class WLEDScreenControllerColorSensor(CoordinatorEntity, SensorEntity):
"""Color sensor reporting the extracted screen color for a Key Colors rectangle."""
_attr_has_entity_name = True
_attr_state_class = SensorStateClass.TOTAL_INCREASING
_attr_icon = "mdi:counter"
_attr_icon = "mdi:palette"
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
device_id: str,
device_info: dict[str, Any],
ws_manager: KeyColorsWebSocketManager,
target_id: str,
rectangle_name: str,
entry_id: str,
) -> None:
"""Initialize the sensor."""
"""Initialize the color sensor."""
super().__init__(coordinator)
self._device_id = device_id
self._device_info = device_info
self._target_id = target_id
self._rectangle_name = rectangle_name
self._ws_manager = ws_manager
self._entry_id = entry_id
self._unregister_ws: Callable[[], None] | None = None
self._attr_unique_id = f"{device_id}_frames"
self._attr_name = "Frames Processed"
sanitized = rectangle_name.lower().replace(" ", "_").replace("-", "_")
self._attr_unique_id = f"{target_id}_{sanitized}_color"
self._attr_translation_key = "rectangle_color"
self._attr_translation_placeholders = {"rectangle_name": rectangle_name}
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {"identifiers": {(DOMAIN, self._target_id)}}
async def async_added_to_hass(self) -> None:
"""Register WS callback when entity is added."""
await super().async_added_to_hass()
self._unregister_ws = self._ws_manager.register_callback(
self._target_id, self._handle_color_update
)
async def async_will_remove_from_hass(self) -> None:
"""Unregister WS callback when entity is removed."""
if self._unregister_ws:
self._unregister_ws()
self._unregister_ws = None
await super().async_will_remove_from_hass()
def _handle_color_update(self, colors: dict) -> None:
"""Handle incoming color update from WebSocket."""
if self._rectangle_name in colors:
self.async_write_ha_state()
@property
def native_value(self) -> str | None:
"""Return the hex color string (e.g. #FF8800)."""
color = self._get_color()
if color is None:
return None
return f"#{color['r']:02X}{color['g']:02X}{color['b']:02X}"
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return r, g, b, brightness as attributes."""
color = self._get_color()
if color is None:
return {}
r, g, b = color["r"], color["g"], color["b"]
brightness = int(0.299 * r + 0.587 * g + 0.114 * b)
return {
"identifiers": {(DOMAIN, self._device_id)},
"r": r,
"g": g,
"b": b,
"brightness": brightness,
"rgb_color": [r, g, b],
}
@property
def native_value(self) -> int | None:
"""Return frames processed."""
def available(self) -> bool:
"""Return if entity is available."""
return self._get_target_data() is not None
def _get_color(self) -> dict[str, int] | None:
"""Get the current color for this rectangle from WS manager."""
target_data = self._get_target_data()
if not target_data or not target_data.get("state"):
return None
if not target_data["state"].get("processing"):
return None
colors = self._ws_manager.get_latest_colors(self._target_id)
return colors.get(self._rectangle_name)
def _get_target_data(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data or not device_data.get("metrics"):
return None
return device_data["metrics"].get("frames_processed", 0)
return self.coordinator.data.get("targets", {}).get(self._target_id)

View File

@@ -2,20 +2,49 @@
"config": {
"step": {
"user": {
"title": "Set up WLED Screen Controller",
"description": "Enter the URL of your WLED Screen Controller server",
"title": "Set up LED Screen Controller",
"description": "Enter the URL and API key for your LED Screen Controller server.",
"data": {
"name": "Name",
"server_url": "Server URL"
"server_url": "Server URL",
"api_key": "API Key"
},
"data_description": {
"server_url": "URL of your LED Screen Controller server (e.g., http://192.168.1.100:8080)",
"api_key": "API key from your server's configuration file"
}
}
},
"error": {
"cannot_connect": "Failed to connect to server. Check the URL and ensure the server is running.",
"unknown": "Unexpected error occurred"
"cannot_connect": "Failed to connect to server.",
"invalid_api_key": "Invalid API key.",
"unknown": "Unexpected error occurred."
},
"abort": {
"already_configured": "This server is already configured"
"already_configured": "This server is already configured."
}
},
"entity": {
"switch": {
"processing": {
"name": "Processing"
}
},
"sensor": {
"fps": {
"name": "FPS"
},
"status": {
"name": "Status",
"state": {
"processing": "Processing",
"idle": "Idle",
"error": "Error",
"unavailable": "Unavailable"
}
},
"rectangle_color": {
"name": "{rectangle_name} Color"
}
}
}
}

View File

@@ -1,4 +1,4 @@
"""Switch platform for WLED Screen Controller."""
"""Switch platform for LED Screen Controller."""
from __future__ import annotations
import logging
@@ -10,7 +10,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, ATTR_DEVICE_ID
from .const import DOMAIN, DATA_COORDINATOR
from .coordinator import WLEDScreenControllerCoordinator
_LOGGER = logging.getLogger(__name__)
@@ -21,93 +21,71 @@ async def async_setup_entry(
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up WLED Screen Controller switches."""
"""Set up LED Screen Controller switches."""
data = hass.data[DOMAIN][entry.entry_id]
coordinator: WLEDScreenControllerCoordinator = data["coordinator"]
coordinator: WLEDScreenControllerCoordinator = data[DATA_COORDINATOR]
entities = []
if coordinator.data and "devices" in coordinator.data:
for device_id, device_data in coordinator.data["devices"].items():
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
entities.append(
WLEDScreenControllerSwitch(
coordinator, device_id, device_data["info"], entry.entry_id
)
WLEDScreenControllerSwitch(coordinator, target_id, entry.entry_id)
)
async_add_entities(entities)
class WLEDScreenControllerSwitch(CoordinatorEntity, SwitchEntity):
"""Representation of a WLED Screen Controller processing switch."""
"""Representation of a LED Screen Controller target processing switch."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
device_id: str,
device_info: dict[str, Any],
target_id: str,
entry_id: str,
) -> None:
"""Initialize the switch."""
super().__init__(coordinator)
self._device_id = device_id
self._device_info = device_info
self._target_id = target_id
self._entry_id = entry_id
self._attr_unique_id = f"{device_id}_processing"
self._attr_name = "Processing"
self._attr_unique_id = f"{target_id}_processing"
self._attr_translation_key = "processing"
self._attr_icon = "mdi:television-ambient-light"
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {(DOMAIN, self._device_id)},
}
return {"identifiers": {(DOMAIN, self._target_id)}}
@property
def is_on(self) -> bool:
"""Return true if processing is active."""
if not self.coordinator.data:
target_data = self._get_target_data()
if not target_data or not target_data.get("state"):
return False
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data or not device_data.get("state"):
return False
return device_data["state"].get("processing", False)
return target_data["state"].get("processing", False)
@property
def available(self) -> bool:
"""Return if entity is available."""
if not self.coordinator.data:
return False
device_data = self.coordinator.data["devices"].get(self._device_id)
return device_data is not None
return self._get_target_data() is not None
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
if not self.coordinator.data:
target_data = self._get_target_data()
if not target_data:
return {}
device_data = self.coordinator.data["devices"].get(self._device_id)
if not device_data:
return {}
state = device_data.get("state", {})
metrics = device_data.get("metrics", {})
attrs = {
ATTR_DEVICE_ID: self._device_id,
}
attrs: dict[str, Any] = {"target_id": self._target_id}
state = target_data.get("state") or {}
metrics = target_data.get("metrics") or {}
if state:
attrs["fps_target"] = state.get("fps_target")
attrs["fps_actual"] = state.get("fps_actual")
attrs["display_index"] = state.get("display_index")
if metrics:
attrs["frames_processed"] = metrics.get("frames_processed")
@@ -117,17 +95,15 @@ class WLEDScreenControllerSwitch(CoordinatorEntity, SwitchEntity):
return attrs
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn on processing."""
try:
await self.coordinator.start_processing(self._device_id)
except Exception as err:
_LOGGER.error("Failed to start processing: %s", err)
raise
"""Start processing."""
await self.coordinator.start_processing(self._target_id)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn off processing."""
try:
await self.coordinator.stop_processing(self._device_id)
except Exception as err:
_LOGGER.error("Failed to stop processing: %s", err)
raise
"""Stop processing."""
await self.coordinator.stop_processing(self._target_id)
def _get_target_data(self) -> dict[str, Any] | None:
"""Get target data from coordinator."""
if not self.coordinator.data:
return None
return self.coordinator.data.get("targets", {}).get(self._target_id)

View File

@@ -2,20 +2,49 @@
"config": {
"step": {
"user": {
"title": "Set up WLED Screen Controller",
"description": "Enter the URL of your WLED Screen Controller server",
"title": "Set up LED Screen Controller",
"description": "Enter the URL and API key for your LED Screen Controller server.",
"data": {
"name": "Name",
"server_url": "Server URL"
"server_url": "Server URL",
"api_key": "API Key"
},
"data_description": {
"server_url": "URL of your LED Screen Controller server (e.g., http://192.168.1.100:8080)",
"api_key": "API key from your server's configuration file"
}
}
},
"error": {
"cannot_connect": "Failed to connect to server. Check the URL and ensure the server is running.",
"unknown": "Unexpected error occurred"
"cannot_connect": "Failed to connect to server.",
"invalid_api_key": "Invalid API key.",
"unknown": "Unexpected error occurred."
},
"abort": {
"already_configured": "This server is already configured"
"already_configured": "This server is already configured."
}
},
"entity": {
"switch": {
"processing": {
"name": "Processing"
}
},
"sensor": {
"fps": {
"name": "FPS"
},
"status": {
"name": "Status",
"state": {
"processing": "Processing",
"idle": "Idle",
"error": "Error",
"unavailable": "Unavailable"
}
},
"rectangle_color": {
"name": "{rectangle_name} Color"
}
}
}
}

View File

@@ -0,0 +1,50 @@
{
"config": {
"step": {
"user": {
"title": "Настройка LED Screen Controller",
"description": "Введите URL и API-ключ вашего сервера LED Screen Controller.",
"data": {
"server_url": "URL сервера",
"api_key": "API-ключ"
},
"data_description": {
"server_url": "URL сервера LED Screen Controller (например, http://192.168.1.100:8080)",
"api_key": "API-ключ из конфигурационного файла сервера"
}
}
},
"error": {
"cannot_connect": "Не удалось подключиться к серверу.",
"invalid_api_key": "Неверный API-ключ.",
"unknown": "Произошла непредвиденная ошибка."
},
"abort": {
"already_configured": "Этот сервер уже настроен."
}
},
"entity": {
"switch": {
"processing": {
"name": "Обработка"
}
},
"sensor": {
"fps": {
"name": "FPS"
},
"status": {
"name": "Статус",
"state": {
"processing": "Обработка",
"idle": "Ожидание",
"error": "Ошибка",
"unavailable": "Недоступен"
}
},
"rectangle_color": {
"name": "{rectangle_name} Цвет"
}
}
}
}

View File

@@ -0,0 +1,136 @@
"""WebSocket connection manager for Key Colors target color streams."""
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
from collections.abc import Callable
from typing import Any
import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import WS_RECONNECT_DELAY, WS_MAX_RECONNECT_DELAY
_LOGGER = logging.getLogger(__name__)
class KeyColorsWebSocketManager:
"""Manages WebSocket connections for Key Colors target color streams."""
def __init__(
self,
hass: HomeAssistant,
server_url: str,
api_key: str,
) -> None:
self._hass = hass
self._server_url = server_url
self._api_key = api_key
self._connections: dict[str, asyncio.Task] = {}
self._callbacks: dict[str, list[Callable]] = {}
self._latest_colors: dict[str, dict[str, dict[str, int]]] = {}
self._shutting_down = False
def _get_ws_url(self, target_id: str) -> str:
"""Build WebSocket URL for a target."""
ws_base = self._server_url.replace("http://", "ws://").replace(
"https://", "wss://"
)
return f"{ws_base}/api/v1/picture-targets/{target_id}/ws?token={self._api_key}"
async def start_listening(self, target_id: str) -> None:
"""Start WebSocket connection for a target."""
if target_id in self._connections:
return
task = self._hass.async_create_background_task(
self._ws_loop(target_id),
f"wled_screen_controller_ws_{target_id}",
)
self._connections[target_id] = task
async def stop_listening(self, target_id: str) -> None:
"""Stop WebSocket connection for a target."""
task = self._connections.pop(target_id, None)
if task:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
self._latest_colors.pop(target_id, None)
def register_callback(
self, target_id: str, callback: Callable
) -> Callable[[], None]:
"""Register a callback for color updates. Returns unregister function."""
self._callbacks.setdefault(target_id, []).append(callback)
def unregister() -> None:
cbs = self._callbacks.get(target_id)
if cbs and callback in cbs:
cbs.remove(callback)
return unregister
def get_latest_colors(self, target_id: str) -> dict[str, dict[str, int]]:
"""Get latest colors for a target."""
return self._latest_colors.get(target_id, {})
async def _ws_loop(self, target_id: str) -> None:
"""WebSocket connection loop with reconnection."""
delay = WS_RECONNECT_DELAY
session = async_get_clientsession(self._hass)
while not self._shutting_down:
try:
url = self._get_ws_url(target_id)
async with session.ws_connect(url) as ws:
delay = WS_RECONNECT_DELAY # reset on successful connect
_LOGGER.debug("WS connected for target %s", target_id)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
self._handle_message(target_id, msg.data)
elif msg.type in (
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR,
):
break
except asyncio.CancelledError:
raise
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as err:
_LOGGER.debug("WS connection error for %s: %s", target_id, err)
except Exception as err:
_LOGGER.error("Unexpected WS error for %s: %s", target_id, err)
if self._shutting_down:
break
await asyncio.sleep(delay)
delay = min(delay * 2, WS_MAX_RECONNECT_DELAY)
def _handle_message(self, target_id: str, raw: str) -> None:
"""Handle incoming WebSocket message."""
try:
data = json.loads(raw)
except json.JSONDecodeError:
return
if data.get("type") != "colors_update":
return
colors: dict[str, Any] = data.get("colors", {})
self._latest_colors[target_id] = colors
for cb in self._callbacks.get(target_id, []):
try:
cb(colors)
except Exception:
_LOGGER.exception("Error in WS color callback for %s", target_id)
async def shutdown(self) -> None:
"""Stop all WebSocket connections."""
self._shutting_down = True
for target_id in list(self._connections):
await self.stop_listening(target_id)

View File

@@ -12,25 +12,13 @@ auth:
# Generate secure keys: openssl rand -hex 32
dev: "development-key-change-in-production" # Development key - CHANGE THIS!
processing:
default_fps: 30
max_fps: 60
min_fps: 1
border_width: 10 # pixels to sample from screen edge
interpolation_mode: "average" # average, median, dominant
screen_capture:
buffer_size: 2 # Number of frames to buffer
wled:
timeout: 5 # seconds
retry_attempts: 3
retry_delay: 1 # seconds
protocol: "http" # http or https
max_brightness: 255
storage:
devices_file: "data/devices.json"
templates_file: "data/capture_templates.json"
postprocessing_templates_file: "data/postprocessing_templates.json"
picture_sources_file: "data/picture_sources.json"
picture_targets_file: "data/picture_targets.json"
pattern_templates_file: "data/pattern_templates.json"
logging:
format: "json" # json or text

View File

@@ -1,7 +1,7 @@
server:
host: "0.0.0.0" # Listen on all interfaces (accessible from local network)
host: "0.0.0.0"
port: 8080
log_level: "DEBUG" # Verbose logging for testing
log_level: "DEBUG"
cors_origins:
- "*"
@@ -10,28 +10,16 @@ auth:
api_keys:
test_client: "eb8a89cfd33ab067751fd0e38f74ddf7ac3d75ff012fbab35a616c45c12e0c8d"
processing:
default_fps: 30
max_fps: 60
min_fps: 1
border_width: 10
interpolation_mode: "average"
screen_capture:
buffer_size: 2
wled:
timeout: 5
retry_attempts: 3
retry_delay: 1
protocol: "http"
max_brightness: 255
storage:
devices_file: "data/test_devices.json"
templates_file: "data/capture_templates.json"
postprocessing_templates_file: "data/postprocessing_templates.json"
picture_sources_file: "data/picture_sources.json"
picture_targets_file: "data/picture_targets.json"
pattern_templates_file: "data/pattern_templates.json"
logging:
format: "text" # Easier to read during testing
format: "text"
file: "logs/wled_test.log"
max_size_mb: 10
backup_count: 2

View File

@@ -133,6 +133,8 @@ class TargetProcessingState(BaseModel):
timing_smooth_ms: Optional[float] = Field(None, description="Smoothing time (ms)")
timing_send_ms: Optional[float] = Field(None, description="DDP send time (ms)")
timing_total_ms: Optional[float] = Field(None, description="Total processing time (ms)")
timing_calc_colors_ms: Optional[float] = Field(None, description="Color calculation time (ms, KC targets)")
timing_broadcast_ms: Optional[float] = Field(None, description="WebSocket broadcast time (ms, KC targets)")
display_index: int = Field(default=0, description="Current display index")
overlay_active: bool = Field(default=False, description="Whether visualization overlay is active")
last_update: Optional[datetime] = Field(None, description="Last successful update")

View File

@@ -1,4 +1,4 @@
"""Configuration management for LED Grab."""
"""Configuration management for WLED Screen Controller."""
import os
from pathlib import Path
@@ -24,32 +24,6 @@ class AuthConfig(BaseSettings):
api_keys: dict[str, str] = {} # label: key mapping (required for security)
class ProcessingConfig(BaseSettings):
"""Processing configuration."""
default_fps: int = 30
max_fps: int = 90
min_fps: int = 10
border_width: int = 10
interpolation_mode: Literal["average", "median", "dominant"] = "average"
class ScreenCaptureConfig(BaseSettings):
"""Screen capture configuration."""
buffer_size: int = 2
class WLEDConfig(BaseSettings):
"""WLED client configuration."""
timeout: int = 5
retry_attempts: int = 3
retry_delay: int = 1
protocol: Literal["http", "https"] = "http"
max_brightness: int = 255
class StorageConfig(BaseSettings):
"""Storage configuration."""
@@ -81,9 +55,6 @@ class Config(BaseSettings):
server: ServerConfig = Field(default_factory=ServerConfig)
auth: AuthConfig = Field(default_factory=AuthConfig)
processing: ProcessingConfig = Field(default_factory=ProcessingConfig)
screen_capture: ScreenCaptureConfig = Field(default_factory=ScreenCaptureConfig)
wled: WLEDConfig = Field(default_factory=WLEDConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)

View File

@@ -74,13 +74,10 @@ def _process_frame(capture, border_width, pixel_mapper, previous_colors, smoothi
def _process_kc_frame(capture, rectangles, calc_fn, previous_colors, smoothing):
"""All CPU-bound work for one KC frame (runs in thread pool).
Args:
capture: ScreenCapture from live_stream.get_latest_frame()
rectangles: List of pattern rectangles to extract colors from
calc_fn: Color calculation function (average/median/dominant)
previous_colors: Previous frame colors for smoothing
smoothing: Smoothing factor (0-1)
Returns (colors, timing_ms) where colors is a dict {name: (r, g, b)}
and timing_ms is a dict with per-stage timing in milliseconds.
"""
t0 = time.perf_counter()
img = capture.image
h, w = img.shape[:2]
colors = {}
@@ -95,6 +92,7 @@ def _process_kc_frame(capture, rectangles, calc_fn, previous_colors, smoothing):
px_h = min(px_h, h - px_y)
sub_img = img[px_y:px_y + px_h, px_x:px_x + px_w]
colors[rect.name] = calc_fn(sub_img)
t1 = time.perf_counter()
if previous_colors and smoothing > 0:
for name, color in colors.items():
if name in previous_colors:
@@ -105,7 +103,13 @@ def _process_kc_frame(capture, rectangles, calc_fn, previous_colors, smoothing):
int(color[1] * (1 - alpha) + prev[1] * alpha),
int(color[2] * (1 - alpha) + prev[2] * alpha),
)
return colors
t2 = time.perf_counter()
timing_ms = {
"calc_colors": (t1 - t0) * 1000,
"smooth": (t2 - t1) * 1000,
"total": (t2 - t0) * 1000,
}
return colors, timing_ms
@dataclass
class ProcessingSettings:
@@ -137,11 +141,15 @@ class ProcessingMetrics:
fps_potential: float = 0.0
fps_current: int = 0
# Per-stage timing (ms), averaged over last 10 frames
# LED targets
timing_extract_ms: float = 0.0
timing_map_leds_ms: float = 0.0
timing_smooth_ms: float = 0.0
timing_send_ms: float = 0.0
timing_total_ms: float = 0.0
# KC targets
timing_calc_colors_ms: float = 0.0
timing_broadcast_ms: float = 0.0
@dataclass
@@ -541,7 +549,8 @@ class ProcessorManager:
state = self._targets[target_id]
if state.is_running:
raise RuntimeError(f"Processing already running for target {target_id}")
logger.debug(f"Processing already running for target {target_id}")
return
# Enforce one-target-per-device constraint
for other_id, other in self._targets.items():
@@ -1230,7 +1239,8 @@ class ProcessorManager:
state = self._kc_targets[target_id]
if state.is_running:
raise ValueError(f"KC target {target_id} is already running")
logger.debug(f"KC target {target_id} is already running")
return
if not state.picture_source_id:
raise ValueError(f"KC target {target_id} has no picture source assigned")
@@ -1324,6 +1334,7 @@ class ProcessorManager:
frame_time = 1.0 / target_fps
fps_samples: List[float] = []
timing_samples: collections.deque = collections.deque(maxlen=10)
prev_frame_time_stamp = time.time()
prev_capture = None # Track previous ScreenCapture for change detection
last_broadcast_time = 0.0 # Timestamp of last WS broadcast (for keepalive)
@@ -1367,7 +1378,7 @@ class ProcessorManager:
prev_capture = capture
# CPU-bound work in thread pool
colors = await asyncio.to_thread(
colors, frame_timing = await asyncio.to_thread(
_process_kc_frame,
capture, rectangles, calc_fn,
state.previous_colors, smoothing,
@@ -1377,10 +1388,21 @@ class ProcessorManager:
state.latest_colors = dict(colors)
# Broadcast to WebSocket clients
t_broadcast_start = time.perf_counter()
await self._broadcast_kc_colors(target_id, colors)
broadcast_ms = (time.perf_counter() - t_broadcast_start) * 1000
last_broadcast_time = time.time()
send_timestamps.append(last_broadcast_time)
# Per-stage timing (rolling average over last 10 frames)
frame_timing["broadcast"] = broadcast_ms
timing_samples.append(frame_timing)
n = len(timing_samples)
state.metrics.timing_calc_colors_ms = sum(s["calc_colors"] for s in timing_samples) / n
state.metrics.timing_smooth_ms = sum(s["smooth"] for s in timing_samples) / n
state.metrics.timing_broadcast_ms = sum(s["broadcast"] for s in timing_samples) / n
state.metrics.timing_total_ms = sum(s["total"] for s in timing_samples) / n + broadcast_ms
# Update metrics
state.metrics.frames_processed += 1
state.metrics.last_update = datetime.utcnow()
@@ -1475,6 +1497,10 @@ class ProcessorManager:
"frames_skipped": metrics.frames_skipped if state.is_running else None,
"frames_keepalive": metrics.frames_keepalive if state.is_running else None,
"fps_current": metrics.fps_current if state.is_running else None,
"timing_calc_colors_ms": round(metrics.timing_calc_colors_ms, 1) if state.is_running else None,
"timing_smooth_ms": round(metrics.timing_smooth_ms, 1) if state.is_running else None,
"timing_broadcast_ms": round(metrics.timing_broadcast_ms, 1) if state.is_running else None,
"timing_total_ms": round(metrics.timing_total_ms, 1) if state.is_running else None,
"last_update": metrics.last_update,
"errors": [metrics.last_error] if metrics.last_error else [],
}

View File

@@ -4894,6 +4894,24 @@ function createKCTargetCard(target, sourceMap, patternTemplateMap) {
<div class="metric-value">${metrics.errors_count || 0}</div>
</div>
</div>
${state.timing_total_ms != null ? `
<div class="timing-breakdown">
<div class="timing-header">
<div class="metric-label">${t('device.metrics.timing')}</div>
<div class="timing-total"><strong>${state.timing_total_ms}ms</strong></div>
</div>
<div class="timing-bar">
<span class="timing-seg timing-extract" style="flex:${state.timing_calc_colors_ms}" title="calc ${state.timing_calc_colors_ms}ms"></span>
<span class="timing-seg timing-smooth" style="flex:${state.timing_smooth_ms || 0.1}" title="smooth ${state.timing_smooth_ms}ms"></span>
<span class="timing-seg timing-send" style="flex:${state.timing_broadcast_ms}" title="broadcast ${state.timing_broadcast_ms}ms"></span>
</div>
<div class="timing-legend">
<span class="timing-legend-item"><span class="timing-dot timing-extract"></span>calc ${state.timing_calc_colors_ms}ms</span>
<span class="timing-legend-item"><span class="timing-dot timing-smooth"></span>smooth ${state.timing_smooth_ms}ms</span>
<span class="timing-legend-item"><span class="timing-dot timing-send"></span>broadcast ${state.timing_broadcast_ms}ms</span>
</div>
</div>
` : ''}
` : ''}
</div>
<div class="card-actions">