Add Key Colors target type for extracting colors from screen regions

Introduce a new "key_colors" target type alongside WLED targets, enabling
real-time color extraction from configurable screen rectangles with
average/median/dominant modes, temporal smoothing, and WebSocket streaming.

- Split WledPictureTarget into its own module, add KeyColorsPictureTarget
- Add KC target lifecycle to ProcessorManager (register, start/stop, processing loop)
- Extend API routes and schemas for KC targets (CRUD, settings, state, metrics, colors)
- Add WebSocket endpoint for real-time color updates with auth
- Add KC sub-tab in Targets UI with editor modal and live color swatches
- Add EN and RU translations for all key colors strings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-12 16:43:09 +03:00
parent 3d2393e474
commit 5f9bc9a37e
13 changed files with 1525 additions and 111 deletions

View File

@@ -1,6 +1,7 @@
"""Processing manager for coordinating screen capture and WLED updates."""
import asyncio
import json
import time
from dataclasses import dataclass, field
from datetime import datetime
@@ -18,7 +19,12 @@ from wled_controller.core.capture_engines.base import ScreenCapture
from wled_controller.core.live_stream import LiveStream
from wled_controller.core.live_stream_manager import LiveStreamManager
from wled_controller.core.pixel_processor import smooth_colors
from wled_controller.core.screen_capture import extract_border_pixels
from wled_controller.core.screen_capture import (
calculate_average_color,
calculate_dominant_color,
calculate_median_color,
extract_border_pixels,
)
from wled_controller.core.wled_client import WLEDClient
from wled_controller.utils import get_logger
@@ -125,6 +131,23 @@ class TargetState:
wled_state_before: Optional[dict] = None
@dataclass
class KeyColorsTargetState:
"""State of a running key-colors extractor processor."""
target_id: str
picture_source_id: str
settings: "KeyColorsSettings" # forward ref, resolved at runtime
is_running: bool = False
task: Optional[asyncio.Task] = None
metrics: ProcessingMetrics = field(default_factory=ProcessingMetrics)
live_stream: Optional[LiveStream] = None
previous_colors: Optional[Dict[str, Tuple[int, int, int]]] = None
latest_colors: Optional[Dict[str, Tuple[int, int, int]]] = None
ws_clients: list = field(default_factory=list) # List[WebSocket]
resolved_target_fps: Optional[int] = None
class ProcessorManager:
"""Manages screen processing for multiple WLED devices.
@@ -136,6 +159,7 @@ class ProcessorManager:
"""Initialize processor manager."""
self._devices: Dict[str, DeviceState] = {}
self._targets: Dict[str, TargetState] = {}
self._kc_targets: Dict[str, KeyColorsTargetState] = {}
self._health_monitoring_active = False
self._http_client: Optional[httpx.AsyncClient] = None
self._picture_source_store = picture_source_store
@@ -849,7 +873,7 @@ class ProcessorManager:
# Stop health monitoring
await self.stop_health_monitoring()
# Stop all targets
# Stop all WLED targets
target_ids = list(self._targets.keys())
for target_id in target_ids:
if self._targets[target_id].is_running:
@@ -858,6 +882,15 @@ class ProcessorManager:
except Exception as e:
logger.error(f"Error stopping target {target_id}: {e}")
# Stop all key-colors targets
kc_ids = list(self._kc_targets.keys())
for target_id in kc_ids:
if self._kc_targets[target_id].is_running:
try:
await self.stop_kc_processing(target_id)
except Exception as e:
logger.error(f"Error stopping KC target {target_id}: {e}")
# Safety net: release any remaining managed live streams
self._live_stream_manager.release_all()
@@ -971,3 +1004,302 @@ class ProcessorManager:
wled_led_type=state.health.wled_led_type,
error=str(e),
)
# ===== KEY COLORS TARGET MANAGEMENT =====
def add_kc_target(self, target_id: str, picture_source_id: str, settings) -> None:
"""Register a key-colors target for processing."""
if target_id in self._kc_targets:
raise ValueError(f"KC target {target_id} already registered")
self._kc_targets[target_id] = KeyColorsTargetState(
target_id=target_id,
picture_source_id=picture_source_id,
settings=settings,
)
logger.info(f"Registered KC target: {target_id}")
def remove_kc_target(self, target_id: str) -> None:
"""Unregister a key-colors target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
state = self._kc_targets[target_id]
if state.is_running:
raise ValueError(f"Cannot remove KC target {target_id}: still running")
del self._kc_targets[target_id]
logger.info(f"Removed KC target: {target_id}")
def update_kc_target_settings(self, target_id: str, settings) -> None:
"""Update settings for a key-colors target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
self._kc_targets[target_id].settings = settings
logger.info(f"Updated KC target settings: {target_id}")
def update_kc_target_source(self, target_id: str, picture_source_id: str) -> None:
"""Update picture source for a key-colors target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
self._kc_targets[target_id].picture_source_id = picture_source_id
logger.info(f"Updated KC target source: {target_id}")
async def start_kc_processing(self, target_id: str) -> None:
"""Start key-colors extraction for a target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
state = self._kc_targets[target_id]
if state.is_running:
raise ValueError(f"KC target {target_id} is already running")
if not state.picture_source_id:
raise ValueError(f"KC target {target_id} has no picture source assigned")
if not state.settings.rectangles:
raise ValueError(f"KC target {target_id} has no rectangles defined")
# Acquire live stream
try:
live_stream = await asyncio.to_thread(
self._live_stream_manager.acquire, state.picture_source_id
)
state.live_stream = live_stream
state.resolved_target_fps = live_stream.target_fps
logger.info(
f"Acquired live stream for KC target {target_id} "
f"(picture_source={state.picture_source_id})"
)
except Exception as e:
logger.error(f"Failed to initialize live stream for KC target {target_id}: {e}")
raise RuntimeError(f"Failed to initialize live stream: {e}")
# Reset metrics
state.metrics = ProcessingMetrics(start_time=datetime.utcnow())
state.previous_colors = None
state.latest_colors = None
# Start processing task
state.task = asyncio.create_task(self._kc_processing_loop(target_id))
state.is_running = True
logger.info(f"Started KC processing for target {target_id}")
async def stop_kc_processing(self, target_id: str) -> None:
"""Stop key-colors extraction for a target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
state = self._kc_targets[target_id]
if not state.is_running:
logger.warning(f"KC processing not running for target {target_id}")
return
state.is_running = False
# Cancel task
if state.task:
state.task.cancel()
try:
await state.task
except asyncio.CancelledError:
pass
state.task = None
# Release live stream
if state.live_stream:
try:
self._live_stream_manager.release(state.picture_source_id)
except Exception as e:
logger.warning(f"Error releasing live stream for KC target: {e}")
state.live_stream = None
logger.info(f"Stopped KC processing for target {target_id}")
async def _kc_processing_loop(self, target_id: str) -> None:
"""Main processing loop for a key-colors target."""
state = self._kc_targets[target_id]
settings = state.settings
target_fps = state.resolved_target_fps or settings.fps
smoothing = settings.smoothing
# Select color calculation function
calc_fns = {
"average": calculate_average_color,
"median": calculate_median_color,
"dominant": calculate_dominant_color,
}
calc_fn = calc_fns.get(settings.interpolation_mode, calculate_average_color)
frame_time = 1.0 / target_fps
fps_samples: List[float] = []
logger.info(
f"KC processing loop started for target {target_id} "
f"(fps={target_fps}, rects={len(settings.rectangles)})"
)
try:
while state.is_running:
loop_start = time.time()
try:
capture = await asyncio.to_thread(state.live_stream.get_latest_frame)
if capture is None:
await asyncio.sleep(frame_time)
continue
img = capture.image
h, w = img.shape[:2]
colors: Dict[str, Tuple[int, int, int]] = {}
for rect in settings.rectangles:
# Convert relative coords to pixel coords
px_x = max(0, int(rect.x * w))
px_y = max(0, int(rect.y * h))
px_w = max(1, int(rect.width * w))
px_h = max(1, int(rect.height * h))
# Clamp to image bounds
px_x = min(px_x, w - 1)
px_y = min(px_y, h - 1)
px_w = min(px_w, w - px_x)
px_h = min(px_h, h - px_y)
# Extract sub-image and compute color
sub_img = img[px_y:px_y + px_h, px_x:px_x + px_w]
color = calc_fn(sub_img)
colors[rect.name] = color
# Apply per-rectangle temporal smoothing
if state.previous_colors and smoothing > 0:
for name, color in colors.items():
if name in state.previous_colors:
prev = state.previous_colors[name]
alpha = smoothing
colors[name] = (
int(color[0] * (1 - alpha) + prev[0] * alpha),
int(color[1] * (1 - alpha) + prev[1] * alpha),
int(color[2] * (1 - alpha) + prev[2] * alpha),
)
state.previous_colors = dict(colors)
state.latest_colors = dict(colors)
# Broadcast to WebSocket clients
await self._broadcast_kc_colors(target_id, colors)
# Update metrics
state.metrics.frames_processed += 1
state.metrics.last_update = datetime.utcnow()
loop_time = time.time() - loop_start
fps_samples.append(1.0 / loop_time if loop_time > 0 else 0)
if len(fps_samples) > 10:
fps_samples.pop(0)
state.metrics.fps_actual = sum(fps_samples) / len(fps_samples)
except Exception as e:
state.metrics.errors_count += 1
state.metrics.last_error = str(e)
logger.error(f"KC processing error for {target_id}: {e}", exc_info=True)
# FPS control
elapsed = time.time() - loop_start
sleep_time = max(0, frame_time - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
except asyncio.CancelledError:
logger.info(f"KC processing loop cancelled for target {target_id}")
raise
except Exception as e:
logger.error(f"Fatal error in KC processing loop for target {target_id}: {e}")
state.is_running = False
raise
finally:
logger.info(f"KC processing loop ended for target {target_id}")
async def _broadcast_kc_colors(self, target_id: str, colors: Dict[str, Tuple[int, int, int]]) -> None:
"""Broadcast extracted colors to WebSocket clients."""
state = self._kc_targets.get(target_id)
if not state or not state.ws_clients:
return
message = json.dumps({
"type": "colors_update",
"target_id": target_id,
"colors": {
name: {"r": c[0], "g": c[1], "b": c[2]}
for name, c in colors.items()
},
"timestamp": datetime.utcnow().isoformat(),
})
disconnected = []
for ws in state.ws_clients:
try:
await ws.send_text(message)
except Exception:
disconnected.append(ws)
for ws in disconnected:
state.ws_clients.remove(ws)
def add_kc_ws_client(self, target_id: str, ws) -> None:
"""Add a WebSocket client for KC color updates."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
self._kc_targets[target_id].ws_clients.append(ws)
def remove_kc_ws_client(self, target_id: str, ws) -> None:
"""Remove a WebSocket client."""
state = self._kc_targets.get(target_id)
if state and ws in state.ws_clients:
state.ws_clients.remove(ws)
def get_kc_target_state(self, target_id: str) -> dict:
"""Get current state for a KC target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
state = self._kc_targets[target_id]
return {
"target_id": target_id,
"processing": state.is_running,
"fps_actual": round(state.metrics.fps_actual, 1) if state.is_running else None,
"fps_target": state.resolved_target_fps or state.settings.fps,
"last_update": state.metrics.last_update.isoformat() if state.metrics.last_update else None,
"errors": [state.metrics.last_error] if state.metrics.last_error else [],
}
def get_kc_target_metrics(self, target_id: str) -> dict:
"""Get metrics for a KC target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
state = self._kc_targets[target_id]
uptime = 0.0
if state.metrics.start_time and state.is_running:
uptime = (datetime.utcnow() - state.metrics.start_time).total_seconds()
return {
"target_id": target_id,
"processing": state.is_running,
"fps_actual": round(state.metrics.fps_actual, 1),
"fps_target": state.resolved_target_fps or state.settings.fps,
"uptime_seconds": round(uptime, 1),
"frames_processed": state.metrics.frames_processed,
"errors_count": state.metrics.errors_count,
"last_error": state.metrics.last_error,
"last_update": state.metrics.last_update.isoformat() if state.metrics.last_update else None,
}
def get_kc_latest_colors(self, target_id: str) -> Dict[str, Tuple[int, int, int]]:
"""Get latest extracted colors for a KC target."""
if target_id not in self._kc_targets:
raise ValueError(f"KC target {target_id} not found")
return self._kc_targets[target_id].latest_colors or {}
def is_kc_target(self, target_id: str) -> bool:
"""Check if a target ID belongs to a KC target."""
return target_id in self._kc_targets