Introduce ColorStripSource as first-class entity

Extracts color processing and calibration out of WledPictureTarget into a
new PictureColorStripSource entity, enabling multiple LED targets to share
one capture/processing pipeline.

New entities & processing:
- storage/color_strip_source.py: ColorStripSource + PictureColorStripSource models
- storage/color_strip_store.py: JSON-backed CRUD store (prefix css_)
- core/processing/color_strip_stream.py: ColorStripStream ABC + PictureColorStripStream (runs border-extract → map → smooth → brightness/sat/gamma in background thread)
- core/processing/color_strip_stream_manager.py: ref-counted shared stream manager

Modified storage/processing:
- WledPictureTarget simplified to device_id + color_strip_source_id + standby_interval + state_check_interval
- Device model: calibration field removed
- WledTargetProcessor: acquires ColorStripStream from manager instead of running its own pipeline
- ProcessorManager: wires ColorStripStreamManager into TargetContext

API layer:
- New routes: GET/POST/PUT/DELETE /api/v1/color-strip-sources, PUT calibration/test
- Removed calibration endpoints from /devices
- Updated /picture-targets CRUD for new target structure

Frontend:
- New color-strips.js module with CSS editor modal and card rendering
- Calibration modal extended with CSS mode (css-id hidden field + device picker)
- targets.js: Color Strip Sources section added to LED tab; target editor/card updated
- app.js: imports and window globals for CSS + showCSSCalibration
- en.json / ru.json: color_strip.* and targets.section.color_strips keys added

Data migration runs at startup: existing WledPictureTargets are converted to
reference a new PictureColorStripSource created from their old settings.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-20 15:49:47 +03:00
parent c4e0257389
commit 7de3546b14
33 changed files with 2325 additions and 814 deletions

View File

@@ -1,10 +1,6 @@
"""Target processing pipeline."""
from wled_controller.core.processing.processor_manager import ProcessorManager
from wled_controller.core.processing.processing_settings import (
DEFAULT_STATE_CHECK_INTERVAL,
ProcessingSettings,
)
from wled_controller.core.processing.target_processor import (
DeviceInfo,
ProcessingMetrics,
@@ -13,10 +9,8 @@ from wled_controller.core.processing.target_processor import (
)
__all__ = [
"DEFAULT_STATE_CHECK_INTERVAL",
"DeviceInfo",
"ProcessingMetrics",
"ProcessingSettings",
"ProcessorManager",
"TargetContext",
"TargetProcessor",

View File

@@ -0,0 +1,321 @@
"""Color strip stream — produces LED color arrays from a source.
A ColorStripStream is the runtime counterpart of ColorStripSource.
It continuously computes and caches a fresh np.ndarray (led_count, 3) uint8
by processing frames from a LiveStream.
Multiple WledTargetProcessors may read from the same ColorStripStream instance
(shared via ColorStripStreamManager reference counting), meaning the CPU-bound
processing — border extraction, pixel mapping, color correction — runs only once
even when multiple devices share the same source configuration.
"""
import threading
import time
from abc import ABC, abstractmethod
from typing import Optional
import numpy as np
from wled_controller.core.capture.calibration import CalibrationConfig, PixelMapper
from wled_controller.core.capture.screen_capture import extract_border_pixels
from wled_controller.core.processing.live_stream import LiveStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def _apply_saturation(colors: np.ndarray, saturation: float) -> np.ndarray:
"""Adjust saturation via luminance mixing (Rec.601 weights).
saturation=1.0: no change
saturation=0.0: grayscale
saturation=2.0: double saturation (clipped to 0-255)
"""
gray = (
colors[:, 0].astype(np.int32) * 299
+ colors[:, 1].astype(np.int32) * 587
+ colors[:, 2].astype(np.int32) * 114
) // 1000
gray = gray[:, np.newaxis] # (N, 1) for broadcast
result = gray + saturation * (colors.astype(np.int32) - gray)
return np.clip(result, 0, 255).astype(np.uint8)
def _build_gamma_lut(gamma: float) -> np.ndarray:
"""Build a 256-entry uint8 LUT for gamma correction.
gamma=1.0: identity (no correction)
gamma<1.0: brighter midtones
gamma>1.0: darker midtones
"""
if gamma == 1.0:
return np.arange(256, dtype=np.uint8)
lut = np.array(
[min(255, int(((i / 255.0) ** (1.0 / gamma)) * 255 + 0.5)) for i in range(256)],
dtype=np.uint8,
)
return lut
class ColorStripStream(ABC):
"""Abstract base: a runtime source of LED color arrays.
Produces a continuous stream of np.ndarray (led_count, 3) uint8 values.
Consumers call get_latest_colors() (non-blocking) to read the most recent
computed frame.
"""
@property
@abstractmethod
def target_fps(self) -> int:
"""Target processing rate."""
@property
@abstractmethod
def led_count(self) -> int:
"""Number of LEDs this stream produces colors for."""
@property
def display_index(self) -> Optional[int]:
"""Display index of the underlying capture, or None."""
return None
@property
def calibration(self) -> Optional[CalibrationConfig]:
"""Calibration config, or None if not applicable."""
return None
@abstractmethod
def start(self) -> None:
"""Start producing colors."""
@abstractmethod
def stop(self) -> None:
"""Stop producing colors and release resources."""
@abstractmethod
def get_latest_colors(self) -> Optional[np.ndarray]:
"""Get the most recent LED color array (led_count, 3) uint8, or None."""
def get_last_timing(self) -> dict:
"""Return per-stage timing from the last processed frame (ms)."""
return {}
def update_source(self, source) -> None:
"""Hot-update processing parameters. No-op by default."""
class PictureColorStripStream(ColorStripStream):
"""Color strip stream backed by a LiveStream (picture source).
Runs a background thread that:
1. Reads the latest frame from the LiveStream
2. Extracts border pixels using the calibration's border_width
3. Maps border pixels to LED colors via PixelMapper
4. Applies temporal smoothing
5. Applies saturation correction
6. Applies gamma correction (LUT-based, O(1) per pixel)
7. Applies brightness scaling
8. Caches the result for lock-free consumer reads
Processing parameters can be hot-updated via update_source() without
restarting the thread (except when the underlying LiveStream changes).
"""
def __init__(self, live_stream: LiveStream, source):
"""
Args:
live_stream: Acquired LiveStream (lifecycle managed by ColorStripStreamManager)
source: PictureColorStripSource config
"""
from wled_controller.storage.color_strip_source import PictureColorStripSource
self._live_stream = live_stream
self._fps: int = source.fps
self._smoothing: float = source.smoothing
self._brightness: float = source.brightness
self._saturation: float = source.saturation
self._gamma: float = source.gamma
self._interpolation_mode: str = source.interpolation_mode
self._calibration: CalibrationConfig = source.calibration
self._pixel_mapper = PixelMapper(
self._calibration, interpolation_mode=self._interpolation_mode
)
self._led_count: int = self._calibration.get_total_leds()
self._gamma_lut: np.ndarray = _build_gamma_lut(self._gamma)
# Thread-safe color cache
self._latest_colors: Optional[np.ndarray] = None
self._colors_lock = threading.Lock()
self._previous_colors: Optional[np.ndarray] = None
self._running = False
self._thread: Optional[threading.Thread] = None
self._last_timing: dict = {}
@property
def target_fps(self) -> int:
return self._fps
@property
def led_count(self) -> int:
return self._led_count
@property
def display_index(self) -> Optional[int]:
return self._live_stream.display_index
@property
def calibration(self) -> Optional[CalibrationConfig]:
return self._calibration
def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._processing_loop,
name="css-picture-stream",
daemon=True,
)
self._thread.start()
logger.info(
f"PictureColorStripStream started (fps={self._fps}, leds={self._led_count})"
)
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
if self._thread.is_alive():
logger.warning("PictureColorStripStream thread did not terminate within 5s")
self._thread = None
self._latest_colors = None
self._previous_colors = None
logger.info("PictureColorStripStream stopped")
def get_latest_colors(self) -> Optional[np.ndarray]:
with self._colors_lock:
return self._latest_colors
def get_last_timing(self) -> dict:
return dict(self._last_timing)
def update_source(self, source) -> None:
"""Hot-update processing parameters. Thread-safe for scalar params.
PixelMapper is rebuilt atomically if calibration or interpolation_mode changed.
"""
from wled_controller.storage.color_strip_source import PictureColorStripSource
if not isinstance(source, PictureColorStripSource):
return
self._fps = source.fps
self._smoothing = source.smoothing
self._brightness = source.brightness
self._saturation = source.saturation
if source.gamma != self._gamma:
self._gamma = source.gamma
self._gamma_lut = _build_gamma_lut(source.gamma)
if (
source.interpolation_mode != self._interpolation_mode
or source.calibration != self._calibration
):
self._interpolation_mode = source.interpolation_mode
self._calibration = source.calibration
self._led_count = source.calibration.get_total_leds()
self._pixel_mapper = PixelMapper(
source.calibration, interpolation_mode=source.interpolation_mode
)
self._previous_colors = None # Reset smoothing history on calibration change
logger.info("PictureColorStripStream params updated in-place")
def _processing_loop(self) -> None:
"""Background thread: poll source, process, cache colors."""
cached_frame = None
while self._running:
loop_start = time.perf_counter()
fps = self._fps
frame_time = 1.0 / fps if fps > 0 else 1.0
try:
frame = self._live_stream.get_latest_frame()
if frame is None or frame is cached_frame:
elapsed = time.perf_counter() - loop_start
time.sleep(max(frame_time - elapsed, 0.001))
continue
cached_frame = frame
t0 = time.perf_counter()
calibration = self._calibration
border_pixels = extract_border_pixels(frame, calibration.border_width)
t1 = time.perf_counter()
led_colors = self._pixel_mapper.map_border_to_leds(border_pixels)
t2 = time.perf_counter()
# Temporal smoothing
smoothing = self._smoothing
if (
self._previous_colors is not None
and smoothing > 0
and len(self._previous_colors) == len(led_colors)
):
alpha = int(smoothing * 256)
led_colors = (
(256 - alpha) * led_colors.astype(np.uint16)
+ alpha * self._previous_colors.astype(np.uint16)
) >> 8
led_colors = led_colors.astype(np.uint8)
t3 = time.perf_counter()
# Saturation
saturation = self._saturation
if saturation != 1.0:
led_colors = _apply_saturation(led_colors, saturation)
t4 = time.perf_counter()
# Gamma (LUT lookup — O(1) per pixel)
if self._gamma != 1.0:
led_colors = self._gamma_lut[led_colors]
t5 = time.perf_counter()
# Brightness
brightness = self._brightness
if brightness != 1.0:
led_colors = np.clip(
led_colors.astype(np.float32) * brightness, 0, 255
).astype(np.uint8)
t6 = time.perf_counter()
self._previous_colors = led_colors
with self._colors_lock:
self._latest_colors = led_colors
self._last_timing = {
"extract_ms": (t1 - t0) * 1000,
"map_leds_ms": (t2 - t1) * 1000,
"smooth_ms": (t3 - t2) * 1000,
"saturation_ms": (t4 - t3) * 1000,
"gamma_ms": (t5 - t4) * 1000,
"brightness_ms": (t6 - t5) * 1000,
"total_ms": (t6 - t0) * 1000,
}
except Exception as e:
logger.error(f"PictureColorStripStream processing error: {e}", exc_info=True)
elapsed = time.perf_counter() - loop_start
remaining = frame_time - elapsed
if remaining > 0:
time.sleep(remaining)

View File

@@ -0,0 +1,192 @@
"""Shared color strip stream management with reference counting.
ColorStripStreamManager creates PictureColorStripStream instances from
ColorStripSource configs and shares them across multiple consumers (LED targets).
When multiple targets reference the same ColorStripSource, they share a single
stream instance — processing runs once, not once per target.
Reference counting ensures streams are cleaned up when the last consumer
releases them.
"""
from dataclasses import dataclass
from typing import Dict, Optional
from wled_controller.core.processing.color_strip_stream import (
ColorStripStream,
PictureColorStripStream,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@dataclass
class _ColorStripEntry:
"""Internal tracking entry for a managed color strip stream."""
stream: ColorStripStream
ref_count: int
# ID of the picture source whose LiveStream we acquired (for release)
picture_source_id: str
class ColorStripStreamManager:
"""Manages shared ColorStripStream instances with reference counting.
Multiple LED targets using the same ColorStripSource share a single
ColorStripStream. Streams are created on first acquire and cleaned up
when the last consumer releases.
"""
def __init__(self, color_strip_store, live_stream_manager):
"""
Args:
color_strip_store: ColorStripStore for resolving source configs
live_stream_manager: LiveStreamManager for acquiring picture streams
"""
self._color_strip_store = color_strip_store
self._live_stream_manager = live_stream_manager
self._streams: Dict[str, _ColorStripEntry] = {}
def acquire(self, css_id: str) -> ColorStripStream:
"""Get or create a ColorStripStream for the given ColorStripSource.
If a stream already exists for this css_id, increments the reference
count and returns the existing instance.
Otherwise, loads the ColorStripSource config, acquires the underlying
LiveStream, creates a new ColorStripStream, starts it, and stores it
with ref_count=1.
Args:
css_id: ID of the ColorStripSource config
Returns:
ColorStripStream instance (shared if already running)
Raises:
ValueError: If ColorStripSource not found or type unsupported
RuntimeError: If stream creation/start fails
"""
if css_id in self._streams:
entry = self._streams[css_id]
entry.ref_count += 1
logger.info(
f"Reusing color strip stream for source {css_id} "
f"(ref_count={entry.ref_count})"
)
return entry.stream
from wled_controller.storage.color_strip_source import PictureColorStripSource
source = self._color_strip_store.get_source(css_id)
if not isinstance(source, PictureColorStripSource):
raise ValueError(
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
)
if not source.picture_source_id:
raise ValueError(
f"Color strip source {css_id} has no picture_source_id assigned"
)
# Acquire the underlying live stream (ref-counted)
live_stream = self._live_stream_manager.acquire(source.picture_source_id)
try:
css_stream = PictureColorStripStream(live_stream, source)
css_stream.start()
except Exception as e:
self._live_stream_manager.release(source.picture_source_id)
raise RuntimeError(
f"Failed to start color strip stream for source {css_id}: {e}"
) from e
self._streams[css_id] = _ColorStripEntry(
stream=css_stream,
ref_count=1,
picture_source_id=source.picture_source_id,
)
logger.info(f"Created color strip stream for source {css_id}")
return css_stream
def release(self, css_id: str) -> None:
"""Release a reference to a ColorStripStream.
Decrements the reference count. When it reaches 0, stops the stream,
releases the underlying LiveStream, and removes from registry.
Args:
css_id: ID of the ColorStripSource to release
"""
entry = self._streams.get(css_id)
if not entry:
logger.warning(f"Attempted to release unknown color strip stream: {css_id}")
return
entry.ref_count -= 1
logger.debug(f"Released color strip stream {css_id} (ref_count={entry.ref_count})")
if entry.ref_count <= 0:
try:
entry.stream.stop()
except Exception as e:
logger.error(f"Error stopping color strip stream {css_id}: {e}")
picture_source_id = entry.picture_source_id
del self._streams[css_id]
logger.info(f"Removed color strip stream for source {css_id}")
# Release the underlying live stream
self._live_stream_manager.release(picture_source_id)
def update_source(self, css_id: str, new_source) -> None:
"""Hot-update processing params on a running stream.
If the picture_source_id changed, only updates in-place params;
the underlying LiveStream is not swapped (requires target restart
to take full effect).
Args:
css_id: ID of the ColorStripSource
new_source: Updated ColorStripSource config
"""
entry = self._streams.get(css_id)
if not entry:
return # Stream not running; config will be used on next acquire
entry.stream.update_source(new_source)
# Track picture_source_id change for future reference counting
from wled_controller.storage.color_strip_source import PictureColorStripSource
if isinstance(new_source, PictureColorStripSource):
if new_source.picture_source_id != entry.picture_source_id:
logger.info(
f"CSS {css_id}: picture_source_id changed — "
f"restart target to use new source"
)
logger.info(f"Updated running color strip stream {css_id}")
def release_all(self) -> None:
"""Stop and remove all managed color strip streams. Called on shutdown."""
css_ids = list(self._streams.keys())
for css_id in css_ids:
entry = self._streams.get(css_id)
if entry:
try:
entry.stream.stop()
except Exception as e:
logger.error(f"Error stopping color strip stream {css_id}: {e}")
self._streams.clear()
logger.info("Released all managed color strip streams")
def get_active_stream_ids(self) -> list:
"""Get list of active stream IDs with ref counts (for diagnostics)."""
return [
{"id": sid, "ref_count": entry.ref_count}
for sid, entry in self._streams.items()
]

View File

@@ -93,7 +93,7 @@ class KCTargetProcessor(TargetProcessor):
settings, # KeyColorsSettings
ctx: TargetContext,
):
super().__init__(target_id, picture_source_id, ctx)
super().__init__(target_id, ctx, picture_source_id)
self._settings = settings
# Runtime state

View File

@@ -6,10 +6,7 @@ from typing import Dict, List, Optional, Tuple
import httpx
from wled_controller.core.capture.calibration import (
CalibrationConfig,
create_default_calibration,
)
from wled_controller.core.capture.calibration import CalibrationConfig
from wled_controller.core.devices.led_client import (
DeviceHealth,
check_device_health,
@@ -17,11 +14,8 @@ from wled_controller.core.devices.led_client import (
get_provider,
)
from wled_controller.core.processing.live_stream_manager import LiveStreamManager
from wled_controller.core.processing.color_strip_stream_manager import ColorStripStreamManager
from wled_controller.core.capture.screen_overlay import OverlayManager
from wled_controller.core.processing.processing_settings import (
DEFAULT_STATE_CHECK_INTERVAL,
ProcessingSettings,
)
from wled_controller.core.processing.target_processor import (
DeviceInfo,
TargetContext,
@@ -33,15 +27,16 @@ from wled_controller.utils import get_logger
logger = get_logger(__name__)
DEFAULT_STATE_CHECK_INTERVAL = 30 # seconds between health checks
@dataclass
class DeviceState:
"""State for a registered LED device (health monitoring + calibration)."""
"""State for a registered LED device (health monitoring)."""
device_id: str
device_url: str
led_count: int
calibration: CalibrationConfig
device_type: str = "wled"
baud_rate: Optional[int] = None
health: DeviceHealth = field(default_factory=DeviceHealth)
@@ -55,6 +50,8 @@ class DeviceState:
# Calibration test mode (works independently of target processing)
test_mode_active: bool = False
test_mode_edges: Dict[str, Tuple[int, int, int]] = field(default_factory=dict)
# Calibration used for the current test (from the CSS being tested)
test_calibration: Optional[CalibrationConfig] = None
# Tracked power state for serial devices (no hardware query)
power_on: bool = True
@@ -62,11 +59,11 @@ class DeviceState:
class ProcessorManager:
"""Manages devices and delegates target processing to TargetProcessor instances.
Devices are registered for health monitoring and calibration.
Devices are registered for health monitoring.
Targets are registered for processing via polymorphic TargetProcessor subclasses.
"""
def __init__(self, picture_source_store=None, capture_template_store=None, pp_template_store=None, pattern_template_store=None, device_store=None):
def __init__(self, picture_source_store=None, capture_template_store=None, pp_template_store=None, pattern_template_store=None, device_store=None, color_strip_store=None):
"""Initialize processor manager."""
self._devices: Dict[str, DeviceState] = {}
self._processors: Dict[str, TargetProcessor] = {}
@@ -78,9 +75,14 @@ class ProcessorManager:
self._pp_template_store = pp_template_store
self._pattern_template_store = pattern_template_store
self._device_store = device_store
self._color_strip_store = color_strip_store
self._live_stream_manager = LiveStreamManager(
picture_source_store, capture_template_store, pp_template_store
)
self._color_strip_stream_manager = ColorStripStreamManager(
color_strip_store=color_strip_store,
live_stream_manager=self._live_stream_manager,
)
self._overlay_manager = OverlayManager()
self._event_queues: List[asyncio.Queue] = []
logger.info("Processor manager initialized")
@@ -97,6 +99,7 @@ class ProcessorManager:
pp_template_store=self._pp_template_store,
pattern_template_store=self._pattern_template_store,
device_store=self._device_store,
color_strip_stream_manager=self._color_strip_stream_manager,
fire_event=self._fire_event,
get_device_info=self._get_device_info,
)
@@ -110,7 +113,6 @@ class ProcessorManager:
device_id=ds.device_id,
device_url=ds.device_url,
led_count=ds.led_count,
calibration=ds.calibration,
device_type=ds.device_type,
baud_rate=ds.baud_rate,
software_brightness=ds.software_brightness,
@@ -144,14 +146,13 @@ class ProcessorManager:
self._http_client = httpx.AsyncClient(timeout=5)
return self._http_client
# ===== DEVICE MANAGEMENT (health monitoring + calibration) =====
# ===== DEVICE MANAGEMENT (health monitoring) =====
def add_device(
self,
device_id: str,
device_url: str,
led_count: int,
calibration: Optional[CalibrationConfig] = None,
device_type: str = "wled",
baud_rate: Optional[int] = None,
software_brightness: int = 255,
@@ -162,14 +163,10 @@ class ProcessorManager:
if device_id in self._devices:
raise ValueError(f"Device {device_id} already registered")
if calibration is None:
calibration = create_default_calibration(led_count)
state = DeviceState(
device_id=device_id,
device_url=device_url,
led_count=led_count,
calibration=calibration,
device_type=device_type,
baud_rate=baud_rate,
software_brightness=software_brightness,
@@ -214,34 +211,8 @@ class ProcessorManager:
if baud_rate is not None:
ds.baud_rate = baud_rate
def update_calibration(self, device_id: str, calibration: CalibrationConfig):
"""Update calibration for a device.
Also propagates to any target processor using this device.
"""
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not found")
calibration.validate()
ds = self._devices[device_id]
if calibration.get_total_leds() != ds.led_count:
raise ValueError(
f"Calibration LED count ({calibration.get_total_leds()}) "
f"does not match device LED count ({ds.led_count})"
)
ds.calibration = calibration
# Propagate to active processors using this device
for proc in self._processors.values():
if proc.device_id == device_id:
proc.update_calibration(calibration)
logger.info(f"Updated calibration for device {device_id}")
def get_device_state(self, device_id: str) -> DeviceState:
"""Get device state (for health/calibration info)."""
"""Get device state (for health info)."""
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not found")
return self._devices[device_id]
@@ -298,8 +269,9 @@ class ProcessorManager:
self,
target_id: str,
device_id: str,
settings: Optional[ProcessingSettings] = None,
picture_source_id: str = "",
color_strip_source_id: str = "",
standby_interval: float = 1.0,
state_check_interval: int = DEFAULT_STATE_CHECK_INTERVAL,
):
"""Register a WLED target processor."""
if target_id in self._processors:
@@ -310,8 +282,9 @@ class ProcessorManager:
proc = WledTargetProcessor(
target_id=target_id,
device_id=device_id,
settings=settings or ProcessingSettings(),
picture_source_id=picture_source_id,
color_strip_source_id=color_strip_source_id,
standby_interval=standby_interval,
state_check_interval=state_check_interval,
ctx=self._build_context(),
)
self._processors[target_id] = proc
@@ -357,6 +330,11 @@ class ProcessorManager:
proc = self._get_processor(target_id)
proc.update_source(picture_source_id)
def update_target_color_strip_source(self, target_id: str, color_strip_source_id: str):
"""Update the color strip source for a WLED target."""
proc = self._get_processor(target_id)
proc.update_color_strip_source(color_strip_source_id)
def update_target_device(self, target_id: str, device_id: str):
"""Update the device for a target."""
proc = self._get_processor(target_id)
@@ -499,10 +477,19 @@ class ProcessorManager:
proc = self._get_processor(target_id)
return proc.get_latest_colors()
# ===== CALIBRATION TEST MODE (on device) =====
# ===== CALIBRATION TEST MODE (on device, driven by CSS calibration) =====
async def set_test_mode(self, device_id: str, edges: Dict[str, List[int]]) -> None:
"""Set or clear calibration test mode for a device."""
async def set_test_mode(
self,
device_id: str,
edges: Dict[str, List[int]],
calibration: Optional[CalibrationConfig] = None,
) -> None:
"""Set or clear calibration test mode for a device.
When setting test mode, pass the calibration from the CSS being tested.
When clearing (edges={}), calibration is not needed.
"""
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not found")
@@ -513,10 +500,13 @@ class ProcessorManager:
ds.test_mode_edges = {
edge: tuple(color) for edge, color in edges.items()
}
if calibration is not None:
ds.test_calibration = calibration
await self._send_test_pixels(device_id)
else:
ds.test_mode_active = False
ds.test_mode_edges = {}
ds.test_calibration = None
await self._send_clear_pixels(device_id)
await self._close_idle_client(device_id)
@@ -558,10 +548,16 @@ class ProcessorManager:
async def _send_test_pixels(self, device_id: str) -> None:
"""Build and send test pixel array for active test edges."""
ds = self._devices[device_id]
# Require calibration to know which LEDs map to which edges
if ds.test_calibration is None:
logger.debug(f"No calibration for test mode on {device_id}, skipping LED test")
return
pixels = [(0, 0, 0)] * ds.led_count
for edge_name, color in ds.test_mode_edges.items():
for seg in ds.calibration.segments:
for seg in ds.test_calibration.segments:
if seg.edge == edge_name:
for i in range(seg.led_start, seg.led_start + seg.led_count):
if i < ds.led_count:
@@ -569,8 +565,8 @@ class ProcessorManager:
break
# Apply offset rotation (same as Phase 2 in PixelMapper.map_border_to_leds)
total_leds = ds.calibration.get_total_leds()
offset = ds.calibration.offset % total_leds if total_leds > 0 else 0
total_leds = ds.test_calibration.get_total_leds()
offset = ds.test_calibration.offset % total_leds if total_leds > 0 else 0
if offset > 0:
pixels = pixels[-offset:] + pixels[:-offset]
@@ -701,6 +697,9 @@ class ProcessorManager:
for did in list(self._idle_clients):
await self._close_idle_client(did)
# Safety net: release all color strip streams
self._color_strip_stream_manager.release_all()
# Safety net: release any remaining managed live streams
self._live_stream_manager.release_all()
@@ -780,7 +779,7 @@ class ProcessorManager:
state.device_type, state.device_url, client, state.health,
)
# Auto-sync LED count (preserve existing calibration)
# Auto-sync LED count
reported = state.health.device_led_count
if reported and reported != state.led_count and self._device_store:
old_count = state.led_count

View File

@@ -17,8 +17,7 @@ from datetime import datetime
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple
if TYPE_CHECKING:
import httpx
from wled_controller.core.capture.calibration import CalibrationConfig
from wled_controller.core.processing.color_strip_stream_manager import ColorStripStreamManager
from wled_controller.core.processing.live_stream_manager import LiveStreamManager
from wled_controller.core.capture.screen_overlay import OverlayManager
from wled_controller.storage import DeviceStore
@@ -65,7 +64,6 @@ class DeviceInfo:
device_id: str
device_url: str
led_count: int
calibration: "CalibrationConfig"
device_type: str = "wled"
baud_rate: Optional[int] = None
software_brightness: int = 255
@@ -86,6 +84,7 @@ class TargetContext:
pp_template_store: Optional["PostprocessingTemplateStore"] = None
pattern_template_store: Optional["PatternTemplateStore"] = None
device_store: Optional["DeviceStore"] = None
color_strip_stream_manager: Optional["ColorStripStreamManager"] = None
fire_event: Callable[[dict], None] = lambda e: None
get_device_info: Callable[[str], Optional[DeviceInfo]] = lambda _: None
@@ -100,7 +99,7 @@ class TargetProcessor(ABC):
Lifecycle: register → start → (running loop) → stop → unregister
"""
def __init__(self, target_id: str, picture_source_id: str, ctx: TargetContext):
def __init__(self, target_id: str, ctx: TargetContext, picture_source_id: str = ""):
self._target_id = target_id
self._picture_source_id = picture_source_id
self._ctx = ctx
@@ -161,8 +160,8 @@ class TargetProcessor(ABC):
"""Update device association. Raises for targets without devices."""
raise ValueError(f"Target {self._target_id} does not support device assignment")
def update_calibration(self, calibration) -> None:
"""Update calibration. No-op for targets without devices."""
def update_color_strip_source(self, color_strip_source_id: str) -> None:
"""Update color strip source. No-op for targets that don't use CSS."""
pass
# ----- Device / display info (overridden by device-aware subclasses) -----

View File

@@ -1,24 +1,17 @@
"""WLED/LED target processor — captures screen, maps to LEDs, sends via DDP."""
"""WLED/LED target processor — gets colors from a ColorStripStream, sends via DDP."""
from __future__ import annotations
import asyncio
import collections
import concurrent.futures
import time
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from typing import Optional
import numpy as np
from wled_controller.core.capture.calibration import CalibrationConfig, PixelMapper
from wled_controller.core.devices.led_client import LEDClient, create_led_client
from wled_controller.core.processing.live_stream import LiveStream
from wled_controller.core.processing.processing_settings import ProcessingSettings
from wled_controller.core.capture.screen_capture import (
extract_border_pixels,
get_available_displays,
)
from wled_controller.core.capture.screen_capture import get_available_displays
from wled_controller.core.processing.target_processor import (
DeviceInfo,
ProcessingMetrics,
@@ -27,92 +20,44 @@ from wled_controller.core.processing.target_processor import (
)
from wled_controller.utils import get_logger
if TYPE_CHECKING:
from wled_controller.core.capture_engines.base import ScreenCapture
logger = get_logger(__name__)
_frame_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=4, thread_name_prefix="frame-proc",
)
# ---------------------------------------------------------------------------
# CPU-bound frame processing (runs in dedicated thread-pool executor)
# ---------------------------------------------------------------------------
def _process_frame(capture, border_width, pixel_mapper, previous_colors, smoothing, brightness):
"""All CPU-bound work for one WLED frame.
Returns (raw_colors, send_colors, timing_ms).
raw_colors: unscaled array for smoothing history.
send_colors: brightness-scaled array ready for DDP send.
timing_ms: dict with per-stage timing in milliseconds.
"""
t0 = time.perf_counter()
border_pixels = extract_border_pixels(capture, border_width)
t1 = time.perf_counter()
led_colors = pixel_mapper.map_border_to_leds(border_pixels)
t2 = time.perf_counter()
# Inline numpy smoothing — avoids list↔numpy round-trip
if previous_colors is not None and smoothing > 0 and len(previous_colors) == len(led_colors):
alpha = int(smoothing * 256)
led_colors = (
(256 - alpha) * led_colors.astype(np.uint16)
+ alpha * previous_colors.astype(np.uint16)
) >> 8
led_colors = led_colors.astype(np.uint8)
t3 = time.perf_counter()
# Apply brightness scaling in thread pool (avoids extra array copies on event loop)
if brightness < 255:
send_colors = (led_colors.astype(np.uint16) * brightness >> 8).astype(np.uint8)
else:
send_colors = led_colors
t4 = time.perf_counter()
timing_ms = {
"extract": (t1 - t0) * 1000,
"map_leds": (t2 - t1) * 1000,
"smooth": (t3 - t2) * 1000,
"total": (t4 - t0) * 1000,
}
return led_colors, send_colors, timing_ms
# ---------------------------------------------------------------------------
# WledTargetProcessor
# ---------------------------------------------------------------------------
class WledTargetProcessor(TargetProcessor):
"""Processes screen capture frames and streams LED colors to a WLED/LED device."""
"""Streams LED colors from a ColorStripStream to a WLED/LED device.
The ColorStripStream handles all capture and color processing.
This processor only applies device software_brightness and sends pixels.
"""
def __init__(
self,
target_id: str,
device_id: str,
settings: ProcessingSettings,
picture_source_id: str,
color_strip_source_id: str,
standby_interval: float,
state_check_interval: int,
ctx: TargetContext,
):
super().__init__(target_id, picture_source_id, ctx)
super().__init__(target_id, ctx)
self._device_id = device_id
self._settings = settings
self._color_strip_source_id = color_strip_source_id
self._standby_interval = standby_interval
self._state_check_interval = state_check_interval
# Runtime state (populated on start)
self._led_client: Optional[LEDClient] = None
self._pixel_mapper: Optional[PixelMapper] = None
self._live_stream: Optional[LiveStream] = None
self._previous_colors: Optional[np.ndarray] = None
self._color_strip_stream = None
self._device_state_before: Optional[dict] = None
self._overlay_active = False
# Resolved stream metadata
# Resolved stream metadata (set once stream is acquired)
self._resolved_display_index: Optional[int] = None
self._resolved_target_fps: Optional[int] = None
self._resolved_engine_type: Optional[str] = None
self._resolved_engine_config: Optional[dict] = None
# ----- Properties -----
@@ -120,10 +65,6 @@ class WledTargetProcessor(TargetProcessor):
def device_id(self) -> str:
return self._device_id
@property
def settings(self) -> ProcessingSettings:
return self._settings
@property
def led_client(self) -> Optional[LEDClient]:
return self._led_client
@@ -139,9 +80,6 @@ class WledTargetProcessor(TargetProcessor):
if device_info is None:
raise ValueError(f"Device {self._device_id} not registered")
# Resolve stream settings
self._resolve_stream_settings()
# Connect to LED device
try:
self._led_client = create_led_client(
@@ -154,44 +92,42 @@ class WledTargetProcessor(TargetProcessor):
f"Target {self._target_id} connected to {device_info.device_type} "
f"device ({device_info.led_count} LEDs)"
)
# Snapshot device state before streaming
self._device_state_before = await self._led_client.snapshot_device_state()
except Exception as e:
logger.error(f"Failed to connect to LED device for target {self._target_id}: {e}")
raise RuntimeError(f"Failed to connect to LED device: {e}")
# Acquire live stream
# Acquire color strip stream
css_manager = self._ctx.color_strip_stream_manager
if css_manager is None:
await self._led_client.close()
self._led_client = None
raise RuntimeError("Color strip stream manager not available in context")
if not self._color_strip_source_id:
await self._led_client.close()
self._led_client = None
raise RuntimeError(f"Target {self._target_id} has no color strip source assigned")
try:
live_stream = await asyncio.to_thread(
self._ctx.live_stream_manager.acquire, self._picture_source_id
)
self._live_stream = live_stream
if live_stream.display_index is not None:
self._resolved_display_index = live_stream.display_index
self._resolved_target_fps = live_stream.target_fps
stream = await asyncio.to_thread(css_manager.acquire, self._color_strip_source_id)
self._color_strip_stream = stream
self._resolved_display_index = stream.display_index
self._resolved_target_fps = stream.target_fps
logger.info(
f"Acquired live stream for target {self._target_id} "
f"(picture_source={self._picture_source_id})"
f"Acquired color strip stream for target {self._target_id} "
f"(css={self._color_strip_source_id}, display={self._resolved_display_index}, "
f"fps={self._resolved_target_fps})"
)
except Exception as e:
logger.error(f"Failed to initialize live stream for target {self._target_id}: {e}")
logger.error(f"Failed to acquire color strip stream for target {self._target_id}: {e}")
if self._led_client:
await self._led_client.close()
raise RuntimeError(f"Failed to initialize live stream: {e}")
self._led_client = None
raise RuntimeError(f"Failed to acquire color strip stream: {e}")
# Initialize pixel mapper from current device calibration
calibration = device_info.calibration
self._pixel_mapper = PixelMapper(
calibration,
interpolation_mode=self._settings.interpolation_mode,
)
# Reset metrics
# Reset metrics and start loop
self._metrics = ProcessingMetrics(start_time=datetime.utcnow())
self._previous_colors = None
# Start processing task
self._task = asyncio.create_task(self._processing_loop())
self._is_running = True
@@ -224,80 +160,84 @@ class WledTargetProcessor(TargetProcessor):
await self._led_client.close()
self._led_client = None
# Release live stream
if self._live_stream:
try:
self._ctx.live_stream_manager.release(self._picture_source_id)
except Exception as e:
logger.warning(f"Error releasing live stream: {e}")
self._live_stream = None
# Release color strip stream
if self._color_strip_stream is not None:
css_manager = self._ctx.color_strip_stream_manager
if css_manager and self._color_strip_source_id:
try:
await asyncio.to_thread(css_manager.release, self._color_strip_source_id)
except Exception as e:
logger.warning(f"Error releasing color strip stream for {self._target_id}: {e}")
self._color_strip_stream = None
logger.info(f"Stopped processing for target {self._target_id}")
self._ctx.fire_event({"type": "state_change", "target_id": self._target_id, "processing": False})
# ----- Settings -----
def update_settings(self, settings: ProcessingSettings) -> None:
self._settings = settings
# Recreate pixel mapper if interpolation mode changed
if self._pixel_mapper:
device_info = self._ctx.get_device_info(self._device_id)
if device_info:
self._pixel_mapper = PixelMapper(
device_info.calibration,
interpolation_mode=settings.interpolation_mode,
)
def update_settings(self, settings: dict) -> None:
"""Update target-specific timing settings."""
if isinstance(settings, dict):
if "standby_interval" in settings:
self._standby_interval = settings["standby_interval"]
if "state_check_interval" in settings:
self._state_check_interval = settings["state_check_interval"]
logger.info(f"Updated settings for target {self._target_id}")
def update_device(self, device_id: str) -> None:
"""Update the device this target streams to."""
self._device_id = device_id
def update_calibration(self, calibration: CalibrationConfig) -> None:
"""Update the cached calibration + rebuild pixel mapper."""
if self._pixel_mapper:
self._pixel_mapper = PixelMapper(
calibration,
interpolation_mode=self._settings.interpolation_mode,
)
def update_color_strip_source(self, color_strip_source_id: str) -> None:
"""Hot-swap the color strip source for a running target."""
if not self._is_running or self._color_strip_source_id == color_strip_source_id:
self._color_strip_source_id = color_strip_source_id
return
css_manager = self._ctx.color_strip_stream_manager
if css_manager is None:
self._color_strip_source_id = color_strip_source_id
return
old_id = self._color_strip_source_id
try:
new_stream = css_manager.acquire(color_strip_source_id)
css_manager.release(old_id)
self._color_strip_stream = new_stream
self._resolved_display_index = new_stream.display_index
self._resolved_target_fps = new_stream.target_fps
self._color_strip_source_id = color_strip_source_id
logger.info(f"Swapped color strip source for {self._target_id}: {old_id}{color_strip_source_id}")
except Exception as e:
logger.error(f"Failed to swap color strip source for {self._target_id}: {e}")
def get_display_index(self) -> Optional[int]:
"""Display index being captured."""
"""Display index being captured, from the active stream."""
if self._resolved_display_index is not None:
return self._resolved_display_index
return self._settings.display_index
if self._color_strip_stream is not None:
return self._color_strip_stream.display_index
return None
# ----- State / Metrics -----
def get_state(self) -> dict:
metrics = self._metrics
device_info = self._ctx.get_device_info(self._device_id)
# Include device health info
health_info = {}
if device_info:
# Get full health from the device state (delegate via manager callback)
from wled_controller.core.devices.led_client import DeviceHealth
# We access health through the manager's get_device_health_dict
# For now, return empty — will be populated by manager wrapper
pass
fps_target = self._color_strip_stream.target_fps if self._color_strip_stream else None
return {
"target_id": self._target_id,
"device_id": self._device_id,
"color_strip_source_id": self._color_strip_source_id,
"processing": self._is_running,
"fps_actual": metrics.fps_actual if self._is_running else None,
"fps_potential": metrics.fps_potential if self._is_running else None,
"fps_target": self._settings.fps,
"fps_target": fps_target,
"frames_skipped": metrics.frames_skipped if self._is_running else None,
"frames_keepalive": metrics.frames_keepalive if self._is_running else None,
"fps_current": metrics.fps_current if self._is_running else None,
"timing_extract_ms": round(metrics.timing_extract_ms, 1) if self._is_running else None,
"timing_map_leds_ms": round(metrics.timing_map_leds_ms, 1) if self._is_running else None,
"timing_smooth_ms": round(metrics.timing_smooth_ms, 1) if self._is_running else None,
"timing_send_ms": round(metrics.timing_send_ms, 1) if self._is_running else None,
"timing_total_ms": round(metrics.timing_total_ms, 1) if self._is_running else None,
"display_index": self._resolved_display_index if self._resolved_display_index is not None else self._settings.display_index,
"display_index": self._resolved_display_index,
"overlay_active": self._overlay_active,
"last_update": metrics.last_update,
"errors": [metrics.last_error] if metrics.last_error else [],
@@ -305,6 +245,7 @@ class WledTargetProcessor(TargetProcessor):
def get_metrics(self) -> dict:
metrics = self._metrics
fps_target = self._color_strip_stream.target_fps if self._color_strip_stream else None
uptime_seconds = 0.0
if metrics.start_time and self._is_running:
uptime_seconds = (datetime.utcnow() - metrics.start_time).total_seconds()
@@ -314,7 +255,7 @@ class WledTargetProcessor(TargetProcessor):
"device_id": self._device_id,
"processing": self._is_running,
"fps_actual": metrics.fps_actual if self._is_running else None,
"fps_target": self._settings.fps,
"fps_target": fps_target,
"uptime_seconds": uptime_seconds,
"frames_processed": metrics.frames_processed,
"errors_count": metrics.errors_count,
@@ -331,11 +272,21 @@ class WledTargetProcessor(TargetProcessor):
if self._overlay_active:
raise RuntimeError(f"Overlay already active for {self._target_id}")
device_info = self._ctx.get_device_info(self._device_id)
if device_info is None:
raise ValueError(f"Device {self._device_id} not found")
# Calibration comes from the active color strip stream
if self._color_strip_stream is None:
raise ValueError(
f"Cannot start overlay for {self._target_id}: no color strip stream active. "
f"Start processing first."
)
calibration = self._color_strip_stream.calibration
display_index = self._resolved_display_index
if display_index is None:
display_index = self._color_strip_stream.display_index
if display_index is None or display_index < 0:
raise ValueError(f"Invalid display index {display_index} for overlay")
display_index = self._resolved_display_index or self._settings.display_index
displays = get_available_displays()
if display_index >= len(displays):
raise ValueError(f"Invalid display index {display_index}")
@@ -344,7 +295,7 @@ class WledTargetProcessor(TargetProcessor):
await asyncio.to_thread(
self._ctx.overlay_manager.start_overlay,
self._target_id, display_info, device_info.calibration, target_name,
self._target_id, display_info, calibration, target_name,
)
self._overlay_active = True
@@ -366,106 +317,65 @@ class WledTargetProcessor(TargetProcessor):
def is_overlay_active(self) -> bool:
return self._overlay_active
# ----- Private: stream settings resolution -----
def _resolve_stream_settings(self) -> None:
"""Resolve picture source chain to populate resolved_* metadata fields."""
if not self._picture_source_id or not self._ctx.picture_source_store:
raise ValueError(f"Target {self._target_id} has no picture source assigned")
from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource
chain = self._ctx.picture_source_store.resolve_stream_chain(self._picture_source_id)
raw_stream = chain["raw_stream"]
if isinstance(raw_stream, StaticImagePictureSource):
self._resolved_display_index = -1
self._resolved_target_fps = 1
self._resolved_engine_type = None
self._resolved_engine_config = None
elif isinstance(raw_stream, ScreenCapturePictureSource):
self._resolved_display_index = raw_stream.display_index
self._resolved_target_fps = raw_stream.target_fps
if raw_stream.capture_template_id and self._ctx.capture_template_store:
try:
tpl = self._ctx.capture_template_store.get_template(raw_stream.capture_template_id)
self._resolved_engine_type = tpl.engine_type
self._resolved_engine_config = tpl.engine_config
except ValueError:
logger.warning(
f"Capture template {raw_stream.capture_template_id} not found, "
f"using MSS fallback"
)
self._resolved_engine_type = "mss"
self._resolved_engine_config = {}
logger.info(
f"Resolved stream metadata for target {self._target_id}: "
f"display={self._resolved_display_index}, fps={self._resolved_target_fps}, "
f"engine={self._resolved_engine_type}"
)
# ----- Private: processing loop -----
async def _processing_loop(self) -> None:
"""Main processing loop — capture → extract → map → smooth → send."""
settings = self._settings
device_info = self._ctx.get_device_info(self._device_id)
@staticmethod
def _apply_brightness(colors: np.ndarray, device_info: Optional[DeviceInfo]) -> np.ndarray:
"""Apply device software_brightness if < 255."""
if device_info and device_info.software_brightness < 255:
return (colors.astype(np.uint16) * device_info.software_brightness >> 8).astype(np.uint8)
return colors
target_fps = settings.fps
smoothing = settings.smoothing
border_width = device_info.calibration.border_width if device_info else 10
led_brightness = settings.brightness
async def _processing_loop(self) -> None:
"""Main processing loop — poll ColorStripStream → apply brightness → send."""
stream = self._color_strip_stream
target_fps = self._resolved_target_fps or 30
frame_time = 1.0 / target_fps
standby_interval = self._standby_interval
fps_samples: collections.deque = collections.deque(maxlen=10)
send_timestamps: collections.deque = collections.deque()
prev_colors = None
last_send_time = 0.0
prev_frame_time_stamp = time.time()
loop = asyncio.get_running_loop()
logger.info(
f"Processing loop started for target {self._target_id} "
f"(display={self._resolved_display_index}, fps={target_fps})"
)
frame_time = 1.0 / target_fps
standby_interval = settings.standby_interval
fps_samples: collections.deque = collections.deque(maxlen=10)
timing_samples: collections.deque = collections.deque(maxlen=10)
prev_frame_time_stamp = time.time()
prev_capture = None
last_send_time = 0.0
send_timestamps: collections.deque = collections.deque()
loop = asyncio.get_running_loop()
try:
while self._is_running:
now = loop_start = time.time()
loop_start = now = time.time()
# Re-fetch device info for runtime changes (test mode, brightness)
device_info = self._ctx.get_device_info(self._device_id)
# Skip capture/send while in calibration test mode
# Skip send while in calibration test mode
if device_info and device_info.test_mode_active:
await asyncio.sleep(frame_time)
continue
try:
capture = self._live_stream.get_latest_frame()
colors = stream.get_latest_colors()
if capture is None:
if colors is None:
if self._metrics.frames_processed == 0:
logger.info(f"Capture returned None for target {self._target_id} (no new frame yet)")
logger.info(f"Stream returned None for target {self._target_id} (no data yet)")
await asyncio.sleep(frame_time)
continue
# Skip processing + send if the frame hasn't changed
if capture is prev_capture:
if self._previous_colors is not None and (loop_start - last_send_time) >= standby_interval:
if colors is prev_colors:
# Same frame — send keepalive if interval elapsed
if prev_colors is not None and (loop_start - last_send_time) >= standby_interval:
if not self._is_running or self._led_client is None:
break
brightness_value = int(led_brightness * 255)
if device_info and device_info.software_brightness < 255:
brightness_value = brightness_value * device_info.software_brightness // 255
send_colors = self._apply_brightness(prev_colors, device_info)
if self._led_client.supports_fast_send:
self._led_client.send_pixels_fast(self._previous_colors, brightness=brightness_value)
self._led_client.send_pixels_fast(send_colors)
else:
await self._led_client.send_pixels(self._previous_colors, brightness=brightness_value)
await self._led_client.send_pixels(send_colors)
now = time.time()
last_send_time = now
send_timestamps.append(now)
@@ -476,22 +386,13 @@ class WledTargetProcessor(TargetProcessor):
self._metrics.fps_current = len(send_timestamps)
await asyncio.sleep(frame_time)
continue
prev_capture = capture
# Compute brightness before thread dispatch
brightness_value = int(led_brightness * 255)
if device_info and device_info.software_brightness < 255:
brightness_value = brightness_value * device_info.software_brightness // 255
prev_colors = colors
# CPU-bound work in dedicated thread-pool executor
raw_colors, send_colors, frame_timing = await loop.run_in_executor(
_frame_executor,
_process_frame, capture, border_width,
self._pixel_mapper, self._previous_colors, smoothing,
brightness_value,
)
# Apply device software brightness
send_colors = self._apply_brightness(colors, device_info)
# Send to LED device (brightness already applied in thread)
# Send to LED device
if not self._is_running or self._led_client is None:
break
t_send_start = time.perf_counter()
@@ -500,45 +401,30 @@ class WledTargetProcessor(TargetProcessor):
else:
await self._led_client.send_pixels(send_colors)
send_ms = (time.perf_counter() - t_send_start) * 1000
now = time.time()
last_send_time = now
send_timestamps.append(now)
# Per-stage timing (rolling average over last 10 frames)
frame_timing["send"] = send_ms
timing_samples.append(frame_timing)
n = len(timing_samples)
self._metrics.timing_extract_ms = sum(s["extract"] for s in timing_samples) / n
self._metrics.timing_map_leds_ms = sum(s["map_leds"] for s in timing_samples) / n
self._metrics.timing_smooth_ms = sum(s["smooth"] for s in timing_samples) / n
self._metrics.timing_send_ms = sum(s["send"] for s in timing_samples) / n
self._metrics.timing_total_ms = sum(s["total"] for s in timing_samples) / n + send_ms
# Update metrics
self._metrics.timing_send_ms = send_ms
self._metrics.frames_processed += 1
self._metrics.last_update = datetime.utcnow()
if self._metrics.frames_processed <= 3 or self._metrics.frames_processed % 100 == 0:
logger.info(
f"Frame {self._metrics.frames_processed} for {self._target_id} "
f"({len(send_colors)} LEDs, bri={brightness_value}) — "
f"extract={frame_timing['extract']:.1f}ms "
f"map={frame_timing['map_leds']:.1f}ms "
f"smooth={frame_timing['smooth']:.1f}ms "
f"send={send_ms:.1f}ms"
f"({len(send_colors)} LEDs) — send={send_ms:.1f}ms"
)
self._metrics.last_update = datetime.utcnow()
self._previous_colors = raw_colors
# Calculate actual FPS (reuse cached 'now' from send timestamp)
# FPS tracking
interval = now - prev_frame_time_stamp
prev_frame_time_stamp = now
fps_samples.append(1.0 / interval if interval > 0 else 0)
self._metrics.fps_actual = sum(fps_samples) / len(fps_samples)
# Potential FPS
processing_time = now - loop_start
self._metrics.fps_potential = 1.0 / processing_time if processing_time > 0 else 0
# fps_current: count sends in last 1 second
while send_timestamps and send_timestamps[0] < now - 1.0:
send_timestamps.popleft()
self._metrics.fps_current = len(send_timestamps)