Optimize KC processing and add reactive HAOS state updates

- Optimize KC frame processing: downsample to 160x90 with cv2.resize
  before rectangle extraction, pre-compute pixel coords, vectorize
  smoothing with numpy arrays
- Add WebSocket event stream for server state changes: processor manager
  fires events on start/stop, new /api/v1/events/ws endpoint streams
  them to connected clients
- Add HAOS EventStreamListener that triggers coordinator refresh on
  state changes for near-instant switch updates
- Reduce HAOS polling interval from 10s to 3s for fresher FPS metrics
- Fix overlay button tooltips: flatten nested JSON keys in locale files
  to match flat dot-notation lookup used by t() function

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-17 14:21:47 +03:00
parent 67da014684
commit 3ee17ed083
7 changed files with 232 additions and 61 deletions

View File

@@ -18,8 +18,10 @@ from .const import (
TARGET_TYPE_KEY_COLORS, TARGET_TYPE_KEY_COLORS,
DATA_COORDINATOR, DATA_COORDINATOR,
DATA_WS_MANAGER, DATA_WS_MANAGER,
DATA_EVENT_LISTENER,
) )
from .coordinator import WLEDScreenControllerCoordinator from .coordinator import WLEDScreenControllerCoordinator
from .event_listener import EventStreamListener
from .ws_manager import KeyColorsWebSocketManager from .ws_manager import KeyColorsWebSocketManager
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -48,6 +50,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
ws_manager = KeyColorsWebSocketManager(hass, server_url, api_key) ws_manager = KeyColorsWebSocketManager(hass, server_url, api_key)
event_listener = EventStreamListener(hass, server_url, api_key, coordinator)
await event_listener.start()
# Create device entries for each target # Create device entries for each target
device_registry = dr.async_get(hass) device_registry = dr.async_get(hass)
if coordinator.data and "targets" in coordinator.data: if coordinator.data and "targets" in coordinator.data:
@@ -73,6 +78,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN][entry.entry_id] = { hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator, DATA_COORDINATOR: coordinator,
DATA_WS_MANAGER: ws_manager, DATA_WS_MANAGER: ws_manager,
DATA_EVENT_LISTENER: event_listener,
} }
# Track target IDs to detect changes # Track target IDs to detect changes
@@ -114,10 +120,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry.""" """Unload a config entry."""
ws_manager: KeyColorsWebSocketManager = hass.data[DOMAIN][entry.entry_id][ entry_data = hass.data[DOMAIN][entry.entry_id]
DATA_WS_MANAGER await entry_data[DATA_WS_MANAGER].shutdown()
] await entry_data[DATA_EVENT_LISTENER].shutdown()
await ws_manager.shutdown()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -7,7 +7,7 @@ CONF_SERVER_URL = "server_url"
CONF_API_KEY = "api_key" CONF_API_KEY = "api_key"
# Default values # Default values
DEFAULT_SCAN_INTERVAL = 10 # seconds DEFAULT_SCAN_INTERVAL = 3 # seconds
DEFAULT_TIMEOUT = 10 # seconds DEFAULT_TIMEOUT = 10 # seconds
WS_RECONNECT_DELAY = 5 # seconds WS_RECONNECT_DELAY = 5 # seconds
WS_MAX_RECONNECT_DELAY = 60 # seconds WS_MAX_RECONNECT_DELAY = 60 # seconds
@@ -19,3 +19,4 @@ TARGET_TYPE_KEY_COLORS = "key_colors"
# Data keys stored in hass.data[DOMAIN][entry_id] # Data keys stored in hass.data[DOMAIN][entry_id]
DATA_COORDINATOR = "coordinator" DATA_COORDINATOR = "coordinator"
DATA_WS_MANAGER = "ws_manager" DATA_WS_MANAGER = "ws_manager"
DATA_EVENT_LISTENER = "event_listener"

View File

@@ -0,0 +1,95 @@
"""WebSocket event listener for server state change notifications."""
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import WS_RECONNECT_DELAY, WS_MAX_RECONNECT_DELAY
_LOGGER = logging.getLogger(__name__)
class EventStreamListener:
"""Listens to server WS endpoint for state change events.
Triggers a coordinator refresh whenever a target starts or stops processing,
so HAOS entities react near-instantly to external state changes.
"""
def __init__(
self,
hass: HomeAssistant,
server_url: str,
api_key: str,
coordinator: DataUpdateCoordinator,
) -> None:
self._hass = hass
self._server_url = server_url
self._api_key = api_key
self._coordinator = coordinator
self._task: asyncio.Task | None = None
self._shutting_down = False
async def start(self) -> None:
"""Start listening to the event stream."""
self._task = self._hass.async_create_background_task(
self._ws_loop(),
"wled_screen_controller_events",
)
async def _ws_loop(self) -> None:
"""WebSocket connection loop with reconnection."""
delay = WS_RECONNECT_DELAY
session = async_get_clientsession(self._hass)
ws_base = self._server_url.replace("http://", "ws://").replace(
"https://", "wss://"
)
url = f"{ws_base}/api/v1/events/ws?token={self._api_key}"
while not self._shutting_down:
try:
async with session.ws_connect(url) as ws:
delay = WS_RECONNECT_DELAY # reset on successful connect
_LOGGER.debug("Event stream connected")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
data = json.loads(msg.data)
except json.JSONDecodeError:
continue
if data.get("type") == "state_change":
await self._coordinator.async_request_refresh()
elif msg.type in (
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR,
):
break
except asyncio.CancelledError:
raise
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as err:
_LOGGER.debug("Event stream connection error: %s", err)
except Exception as err:
_LOGGER.error("Unexpected event stream error: %s", err)
if self._shutting_down:
break
await asyncio.sleep(delay)
delay = min(delay * 2, WS_MAX_RECONNECT_DELAY)
async def shutdown(self) -> None:
"""Stop listening."""
self._shutting_down = True
if self._task:
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
self._task = None

View File

@@ -791,6 +791,44 @@ async def target_colors_ws(
manager.remove_kc_ws_client(target_id, websocket) manager.remove_kc_ws_client(target_id, websocket)
# ===== STATE CHANGE EVENT STREAM =====
@router.websocket("/api/v1/events/ws")
async def events_ws(
websocket: WebSocket,
token: str = Query(""),
):
"""WebSocket for real-time state change events. Auth via ?token=<api_key>."""
authenticated = False
cfg = get_config()
if token and cfg.auth.api_keys:
for _label, api_key in cfg.auth.api_keys.items():
if secrets.compare_digest(token, api_key):
authenticated = True
break
if not authenticated:
await websocket.close(code=4001, reason="Unauthorized")
return
await websocket.accept()
manager = get_processor_manager()
queue = manager.subscribe_events()
try:
while True:
event = await queue.get()
await websocket.send_json(event)
except WebSocketDisconnect:
pass
except Exception:
pass
finally:
manager.unsubscribe_events(queue)
# ===== OVERLAY VISUALIZATION ===== # ===== OVERLAY VISUALIZATION =====
@router.post("/api/v1/picture-targets/{target_id}/overlay/start", tags=["Visualization"]) @router.post("/api/v1/picture-targets/{target_id}/overlay/start", tags=["Visualization"])

View File

@@ -8,6 +8,7 @@ from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
import cv2
import httpx import httpx
import numpy as np import numpy as np
@@ -71,45 +72,46 @@ def _process_frame(capture, border_width, pixel_mapper, previous_colors, smoothi
return led_colors, timing_ms return led_colors, timing_ms
def _process_kc_frame(capture, rectangles, calc_fn, previous_colors, smoothing): KC_WORK_SIZE = (160, 90) # (width, height) — small enough for fast color calc
def _process_kc_frame(capture, rect_names, rect_bounds, calc_fn, prev_colors_arr, smoothing):
"""All CPU-bound work for one KC frame (runs in thread pool). """All CPU-bound work for one KC frame (runs in thread pool).
Returns (colors, timing_ms) where colors is a dict {name: (r, g, b)} Returns (colors, colors_arr, timing_ms) where:
and timing_ms is a dict with per-stage timing in milliseconds. - colors is a dict {name: (r, g, b)}
- colors_arr is a (N, 3) float64 array for smoothing continuity
- timing_ms is a dict with per-stage timing in milliseconds.
""" """
t0 = time.perf_counter() t0 = time.perf_counter()
img = capture.image
h, w = img.shape[:2] # Downsample to working resolution — 144x fewer pixels at 1080p
colors = {} small = cv2.resize(capture.image, KC_WORK_SIZE, interpolation=cv2.INTER_AREA)
for rect in rectangles:
px_x = max(0, int(rect.x * w)) # Extract colors for each rectangle from the small image
px_y = max(0, int(rect.y * h)) n = len(rect_names)
px_w = max(1, int(rect.width * w)) colors_arr = np.empty((n, 3), dtype=np.float64)
px_h = max(1, int(rect.height * h)) for i, (y1, y2, x1, x2) in enumerate(rect_bounds):
px_x = min(px_x, w - 1) colors_arr[i] = calc_fn(small[y1:y2, x1:x2])
px_y = min(px_y, h - 1)
px_w = min(px_w, w - px_x)
px_h = min(px_h, h - px_y)
sub_img = img[px_y:px_y + px_h, px_x:px_x + px_w]
colors[rect.name] = calc_fn(sub_img)
t1 = time.perf_counter() t1 = time.perf_counter()
if previous_colors and smoothing > 0:
for name, color in colors.items(): # Vectorized smoothing on (N, 3) array
if name in previous_colors: if prev_colors_arr is not None and smoothing > 0:
prev = previous_colors[name] colors_arr = colors_arr * (1 - smoothing) + prev_colors_arr * smoothing
alpha = smoothing
colors[name] = ( colors_u8 = np.clip(colors_arr, 0, 255).astype(np.uint8)
int(color[0] * (1 - alpha) + prev[0] * alpha),
int(color[1] * (1 - alpha) + prev[1] * alpha),
int(color[2] * (1 - alpha) + prev[2] * alpha),
)
t2 = time.perf_counter() t2 = time.perf_counter()
# Build output dict
colors = {rect_names[i]: tuple(int(c) for c in colors_u8[i]) for i in range(n)}
timing_ms = { timing_ms = {
"calc_colors": (t1 - t0) * 1000, "calc_colors": (t1 - t0) * 1000,
"smooth": (t2 - t1) * 1000, "smooth": (t2 - t1) * 1000,
"total": (t2 - t0) * 1000, "total": (t2 - t0) * 1000,
} }
return colors, timing_ms return colors, colors_arr, timing_ms
@dataclass @dataclass
class ProcessingSettings: class ProcessingSettings:
@@ -239,8 +241,30 @@ class ProcessorManager:
picture_source_store, capture_template_store, pp_template_store picture_source_store, capture_template_store, pp_template_store
) )
self._overlay_manager = OverlayManager() self._overlay_manager = OverlayManager()
self._event_queues: List[asyncio.Queue] = []
logger.info("Processor manager initialized") logger.info("Processor manager initialized")
# ===== EVENT SYSTEM (state change notifications) =====
def subscribe_events(self) -> asyncio.Queue:
"""Subscribe to state change events. Returns queue to read from."""
queue: asyncio.Queue = asyncio.Queue(maxsize=64)
self._event_queues.append(queue)
return queue
def unsubscribe_events(self, queue: asyncio.Queue) -> None:
"""Unsubscribe from events."""
if queue in self._event_queues:
self._event_queues.remove(queue)
def _fire_event(self, event: dict) -> None:
"""Push event to all subscribers (non-blocking)."""
for q in self._event_queues:
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
async def _get_http_client(self) -> httpx.AsyncClient: async def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create a shared HTTP client for health checks.""" """Get or create a shared HTTP client for health checks."""
if self._http_client is None or self._http_client.is_closed: if self._http_client is None or self._http_client.is_closed:
@@ -622,6 +646,7 @@ class ProcessorManager:
state.is_running = True state.is_running = True
logger.info(f"Started processing for target {target_id}") logger.info(f"Started processing for target {target_id}")
self._fire_event({"type": "state_change", "target_id": target_id, "processing": True})
async def stop_processing(self, target_id: str): async def stop_processing(self, target_id: str):
"""Stop screen processing for a target.""" """Stop screen processing for a target."""
@@ -665,6 +690,7 @@ class ProcessorManager:
state.live_stream = None state.live_stream = None
logger.info(f"Stopped processing for target {target_id}") logger.info(f"Stopped processing for target {target_id}")
self._fire_event({"type": "state_change", "target_id": target_id, "processing": False})
async def _processing_loop(self, target_id: str): async def _processing_loop(self, target_id: str):
"""Main processing loop for a target.""" """Main processing loop for a target."""
@@ -1284,6 +1310,7 @@ class ProcessorManager:
state.is_running = True state.is_running = True
logger.info(f"Started KC processing for target {target_id}") logger.info(f"Started KC processing for target {target_id}")
self._fire_event({"type": "state_change", "target_id": target_id, "processing": True})
async def stop_kc_processing(self, target_id: str) -> None: async def stop_kc_processing(self, target_id: str) -> None:
"""Stop key-colors extraction for a target.""" """Stop key-colors extraction for a target."""
@@ -1315,6 +1342,7 @@ class ProcessorManager:
state.live_stream = None state.live_stream = None
logger.info(f"Stopped KC processing for target {target_id}") logger.info(f"Stopped KC processing for target {target_id}")
self._fire_event({"type": "state_change", "target_id": target_id, "processing": False})
async def _kc_processing_loop(self, target_id: str) -> None: async def _kc_processing_loop(self, target_id: str) -> None:
"""Main processing loop for a key-colors target.""" """Main processing loop for a key-colors target."""
@@ -1342,6 +1370,22 @@ class ProcessorManager:
rectangles = state._resolved_rectangles rectangles = state._resolved_rectangles
# Pre-compute pixel bounds at working resolution (160x90)
kc_w, kc_h = KC_WORK_SIZE
rect_names = [r.name for r in rectangles]
rect_bounds = []
for rect in rectangles:
px_x = max(0, int(rect.x * kc_w))
px_y = max(0, int(rect.y * kc_h))
px_w = max(1, int(rect.width * kc_w))
px_h = max(1, int(rect.height * kc_h))
px_x = min(px_x, kc_w - 1)
px_y = min(px_y, kc_h - 1)
px_w = min(px_w, kc_w - px_x)
px_h = min(px_h, kc_h - px_y)
rect_bounds.append((px_y, px_y + px_h, px_x, px_x + px_w))
prev_colors_arr = None # (N, 3) float64 for vectorized smoothing
logger.info( logger.info(
f"KC processing loop started for target {target_id} " f"KC processing loop started for target {target_id} "
f"(fps={target_fps}, rects={len(rectangles)})" f"(fps={target_fps}, rects={len(rectangles)})"
@@ -1378,13 +1422,13 @@ class ProcessorManager:
prev_capture = capture prev_capture = capture
# CPU-bound work in thread pool # CPU-bound work in thread pool
colors, frame_timing = await asyncio.to_thread( colors, colors_arr, frame_timing = await asyncio.to_thread(
_process_kc_frame, _process_kc_frame,
capture, rectangles, calc_fn, capture, rect_names, rect_bounds, calc_fn,
state.previous_colors, smoothing, prev_colors_arr, smoothing,
) )
state.previous_colors = dict(colors) prev_colors_arr = colors_arr
state.latest_colors = dict(colors) state.latest_colors = dict(colors)
# Broadcast to WebSocket clients # Broadcast to WebSocket clients

View File

@@ -440,16 +440,10 @@
"pattern.description.hint": "Optional notes about where or how this pattern is used", "pattern.description.hint": "Optional notes about where or how this pattern is used",
"pattern.visual_editor.hint": "Click + buttons to add rectangles. Drag edges to resize, drag inside to move.", "pattern.visual_editor.hint": "Click + buttons to add rectangles. Drag edges to resize, drag inside to move.",
"pattern.rectangles.hint": "Fine-tune rectangle positions and sizes with exact coordinates (0.0 to 1.0)", "pattern.rectangles.hint": "Fine-tune rectangle positions and sizes with exact coordinates (0.0 to 1.0)",
"overlay": { "overlay.button.show": "Show overlay visualization",
"button": { "overlay.button.hide": "Hide overlay visualization",
"show": "Show overlay visualization", "overlay.started": "Overlay visualization started",
"hide": "Hide overlay visualization" "overlay.stopped": "Overlay visualization stopped",
}, "overlay.error.start": "Failed to start overlay",
"started": "Overlay visualization started", "overlay.error.stop": "Failed to stop overlay"
"stopped": "Overlay visualization stopped",
"error": {
"start": "Failed to start overlay",
"stop": "Failed to stop overlay"
}
}
} }

View File

@@ -440,16 +440,10 @@
"pattern.description.hint": "Необязательные заметки о назначении этого паттерна", "pattern.description.hint": "Необязательные заметки о назначении этого паттерна",
"pattern.visual_editor.hint": "Нажмите кнопки + чтобы добавить прямоугольники. Тяните края для изменения размера, тяните внутри для перемещения.", "pattern.visual_editor.hint": "Нажмите кнопки + чтобы добавить прямоугольники. Тяните края для изменения размера, тяните внутри для перемещения.",
"pattern.rectangles.hint": "Точная настройка позиций и размеров прямоугольников в координатах (0.0 до 1.0)", "pattern.rectangles.hint": "Точная настройка позиций и размеров прямоугольников в координатах (0.0 до 1.0)",
"overlay": { "overlay.button.show": "Показать визуализацию наложения",
"button": { "overlay.button.hide": "Скрыть визуализацию наложения",
"show": "Показать визуализацию наложения", "overlay.started": "Визуализация наложения запущена",
"hide": "Скрыть визуализацию наложения" "overlay.stopped": "Визуализация наложения остановлена",
}, "overlay.error.start": "Не удалось запустить наложение",
"started": "Визуализация наложения запущена", "overlay.error.stop": "Не удалось остановить наложение"
"stopped": "Визуализация наложения остановлена",
"error": {
"start": "Не удалось запустить наложение",
"stop": "Не удалось остановить наложение"
}
}
} }