From fbf597dc29ab0f7dbcaa729f0b2fed4a1b884385 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Thu, 19 Feb 2026 22:55:21 +0300 Subject: [PATCH] Optimize streaming pipeline and capture hot paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace asyncio.to_thread with dedicated ThreadPoolExecutor (skip per-frame context copy overhead) - Move brightness scaling into _process_frame thread (avoid extra numpy array copies on event loop) - Remove PIL intermediate in MSS capture (direct bytes→numpy) - Unify median/dominant pixel mapping to numpy arrays (eliminate Python list-of-tuples path and duplicate Phase 2/3 code) - Cache CalibrationConfig.segments property (avoid ~240 rebuilds/sec) - Make KC WebSocket broadcasts concurrent via asyncio.gather - Fix fps_samples list.pop(0) → deque(maxlen=10) in both processors - Cache time.time() calls to reduce redundant syscalls per frame - Log event queue drops instead of silently discarding Co-Authored-By: Claude Opus 4.6 --- .../core/capture/calibration.py | 143 ++++++++---------- .../core/capture/screen_capture.py | 8 +- .../core/capture_engines/mss_engine.py | 8 +- .../core/processing/kc_target_processor.py | 15 +- .../core/processing/processor_manager.py | 2 +- .../core/processing/wled_target_processor.py | 82 ++++++---- 6 files changed, 131 insertions(+), 127 deletions(-) diff --git a/server/src/wled_controller/core/capture/calibration.py b/server/src/wled_controller/core/capture/calibration.py index df05a25..2dd943e 100644 --- a/server/src/wled_controller/core/capture/calibration.py +++ b/server/src/wled_controller/core/capture/calibration.py @@ -1,13 +1,12 @@ """Calibration system for mapping screen pixels to LED positions.""" -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import Dict, List, Literal, Tuple import numpy as np from wled_controller.core.capture.screen_capture import ( BorderPixels, - get_edge_segments, calculate_average_color, calculate_median_color, calculate_dominant_color, @@ -110,10 +109,15 @@ class CalibrationConfig: return segments + def __post_init__(self): + self._cached_segments: List[CalibrationSegment] | None = None + @property def segments(self) -> List[CalibrationSegment]: - """Get derived segment list.""" - return self.build_segments() + """Get derived segment list (cached after first call).""" + if self._cached_segments is None: + self._cached_segments = self.build_segments() + return self._cached_segments def get_edge_span(self, edge: str) -> tuple[float, float]: """Get span (start, end) for a given edge.""" @@ -219,6 +223,33 @@ class PixelMapper: edge_pixels = edge_pixels[s:e, :, :] return edge_pixels + def _map_edge_fallback( + self, edge_pixels: np.ndarray, edge_name: str, led_count: int + ) -> np.ndarray: + """Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8.""" + if edge_name in ("top", "bottom"): + edge_len = edge_pixels.shape[1] + else: + edge_len = edge_pixels.shape[0] + + step = edge_len / led_count + result = np.empty((led_count, 3), dtype=np.uint8) + + for i in range(led_count): + start = int(i * step) + end = max(start + 1, int((i + 1) * step)) + end = min(end, edge_len) + + if edge_name in ("top", "bottom"): + segment = edge_pixels[:, start:end, :] + else: + segment = edge_pixels[start:end, :, :] + + color = self._calc_color(segment) + result[i] = color + + return result + def _map_edge_average( self, edge_pixels: np.ndarray, edge_name: str, led_count: int ) -> np.ndarray: @@ -274,92 +305,48 @@ class PixelMapper: active_count = max(0, total_leds - skip_start - skip_end) use_fast_avg = self.interpolation_mode == "average" - # Phase 1: Map full perimeter to total_leds positions - if use_fast_avg: - led_array = np.zeros((total_leds, 3), dtype=np.uint8) - else: - led_colors = [(0, 0, 0)] * total_leds + # Phase 1: Map full perimeter to total_leds positions (numpy for all modes) + led_array = np.zeros((total_leds, 3), dtype=np.uint8) - for edge_name in ["top", "right", "bottom", "left"]: - segment = self.calibration.get_segment_for_edge(edge_name) - if not segment: - continue - - edge_pixels = self._get_edge_pixels(border_pixels, edge_name) + for segment in self.calibration.segments: + edge_pixels = self._get_edge_pixels(border_pixels, segment.edge) if use_fast_avg: - # Vectorized: compute all LED colors for this edge at once colors = self._map_edge_average( - edge_pixels, edge_name, segment.led_count + edge_pixels, segment.edge, segment.led_count ) - led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count) - if segment.reverse: - led_indices = led_indices[::-1] - led_array[led_indices] = colors else: - # Per-LED fallback for median/dominant modes - try: - pixel_segments = get_edge_segments( - edge_pixels, segment.led_count, edge_name - ) - except ValueError as e: - logger.error(f"Failed to segment {edge_name} edge: {e}") - raise + colors = self._map_edge_fallback( + edge_pixels, segment.edge, segment.led_count + ) - led_indices = list(range(segment.led_start, segment.led_start + segment.led_count)) - if segment.reverse: - led_indices = list(reversed(led_indices)) - - for led_idx, pixel_segment in zip(led_indices, pixel_segments): - color = self._calc_color(pixel_segment) - led_colors[led_idx] = color + led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count) + if segment.reverse: + led_indices = led_indices[::-1] + led_array[led_indices] = colors # Phase 2: Offset rotation offset = self.calibration.offset % total_leds if total_leds > 0 else 0 + if offset > 0: + led_array = np.roll(led_array, offset, axis=0) - if use_fast_avg: - if offset > 0: - led_array = np.roll(led_array, offset, axis=0) + # Phase 3: Physical skip — resample full perimeter to active LEDs + if active_count > 0 and active_count < total_leds: + src = np.linspace(0, total_leds - 1, active_count) + full_f = led_array.astype(np.float64) + x = np.arange(total_leds, dtype=np.float64) + resampled = np.empty((active_count, 3), dtype=np.uint8) + for ch in range(3): + resampled[:, ch] = np.round( + np.interp(src, x, full_f[:, ch]) + ).astype(np.uint8) + led_array[:] = 0 + end_idx = total_leds - skip_end + led_array[skip_start:end_idx] = resampled + elif active_count <= 0: + led_array[:] = 0 - # Phase 3: Physical skip — resample full perimeter to active LEDs - # Maps the entire screen to active_count positions so each active LED - # covers a proportionally larger slice of the perimeter. - if active_count > 0 and active_count < total_leds: - src = np.linspace(0, total_leds - 1, active_count) - full_f = led_array.astype(np.float64) - x = np.arange(total_leds, dtype=np.float64) - resampled = np.empty((active_count, 3), dtype=np.uint8) - for ch in range(3): - resampled[:, ch] = np.round( - np.interp(src, x, full_f[:, ch]) - ).astype(np.uint8) - led_array[:] = 0 - end_idx = total_leds - skip_end - led_array[skip_start:end_idx] = resampled - elif active_count <= 0: - led_array[:] = 0 - - return led_array - else: - if offset > 0: - led_colors = led_colors[total_leds - offset:] + led_colors[:total_leds - offset] - - # Phase 3: Physical skip — resample full perimeter to active LEDs - if active_count > 0 and active_count < total_leds: - arr = np.array(led_colors, dtype=np.float64) - src = np.linspace(0, total_leds - 1, active_count) - x = np.arange(total_leds, dtype=np.float64) - resampled = np.empty((active_count, 3), dtype=np.float64) - for ch in range(3): - resampled[:, ch] = np.interp(src, x, arr[:, ch]) - led_colors = [(0, 0, 0)] * total_leds - for i in range(active_count): - r, g, b = resampled[i] - led_colors[skip_start + i] = (int(round(r)), int(round(g)), int(round(b))) - elif active_count <= 0: - led_colors = [(0, 0, 0)] * total_leds - - return np.array(led_colors, dtype=np.uint8) + return led_array def test_calibration(self, edge: str, color: Tuple[int, int, int]) -> List[Tuple[int, int, int]]: """Generate test pattern to light up specific edge. diff --git a/server/src/wled_controller/core/capture/screen_capture.py b/server/src/wled_controller/core/capture/screen_capture.py index 8ff8046..16b66e3 100644 --- a/server/src/wled_controller/core/capture/screen_capture.py +++ b/server/src/wled_controller/core/capture/screen_capture.py @@ -5,7 +5,6 @@ from typing import Dict, List import mss import numpy as np -from PIL import Image from wled_controller.utils import get_logger, get_monitor_names, get_monitor_refresh_rates @@ -122,9 +121,10 @@ def capture_display(display_index: int = 0) -> ScreenCapture: # Capture screenshot screenshot = sct.grab(monitor) - # Convert to numpy array (RGB) - img = Image.frombytes("RGB", screenshot.size, screenshot.rgb) - img_array = np.array(img) + # Direct bytes→numpy (skips PIL intermediate object) + img_array = np.frombuffer( + screenshot.rgb, dtype=np.uint8, + ).reshape(screenshot.height, screenshot.width, 3) logger.debug( f"Captured display {display_index}: {monitor['width']}x{monitor['height']}" diff --git a/server/src/wled_controller/core/capture_engines/mss_engine.py b/server/src/wled_controller/core/capture_engines/mss_engine.py index 8b94416..65ab53a 100644 --- a/server/src/wled_controller/core/capture_engines/mss_engine.py +++ b/server/src/wled_controller/core/capture_engines/mss_engine.py @@ -4,7 +4,6 @@ from typing import Any, Dict, List, Optional import mss import numpy as np -from PIL import Image from wled_controller.core.capture_engines.base import ( CaptureEngine, @@ -56,9 +55,10 @@ class MSSCaptureStream(CaptureStream): monitor = self._sct.monitors[monitor_index] screenshot = self._sct.grab(monitor) - # Convert to numpy array (RGB) - img = Image.frombytes("RGB", screenshot.size, screenshot.rgb) - img_array = np.array(img) + # Direct bytes→numpy (skips PIL intermediate object) + img_array = np.frombuffer( + screenshot.rgb, dtype=np.uint8, + ).reshape(screenshot.height, screenshot.width, 3) logger.debug( f"MSS captured display {self.display_index}: {monitor['width']}x{monitor['height']}" diff --git a/server/src/wled_controller/core/processing/kc_target_processor.py b/server/src/wled_controller/core/processing/kc_target_processor.py index 682a122..2232e0f 100644 --- a/server/src/wled_controller/core/processing/kc_target_processor.py +++ b/server/src/wled_controller/core/processing/kc_target_processor.py @@ -273,7 +273,7 @@ class KCTargetProcessor(TargetProcessor): calc_fn = calc_fns.get(settings.interpolation_mode, calculate_average_color) frame_time = 1.0 / target_fps - fps_samples: List[float] = [] + fps_samples: collections.deque = collections.deque(maxlen=10) timing_samples: collections.deque = collections.deque(maxlen=10) prev_frame_time_stamp = time.time() prev_capture = None @@ -366,8 +366,6 @@ class KCTargetProcessor(TargetProcessor): interval = now - prev_frame_time_stamp prev_frame_time_stamp = now fps_samples.append(1.0 / interval if interval > 0 else 0) - if len(fps_samples) > 10: - fps_samples.pop(0) self._metrics.fps_actual = sum(fps_samples) / len(fps_samples) # Potential FPS @@ -401,7 +399,7 @@ class KCTargetProcessor(TargetProcessor): logger.info(f"KC processing loop ended for target {self._target_id}") async def _broadcast_colors(self, colors: Dict[str, Tuple[int, int, int]]) -> None: - """Broadcast extracted colors to WebSocket clients.""" + """Broadcast extracted colors to WebSocket clients (concurrent sends).""" if not self._ws_clients: return @@ -415,12 +413,15 @@ class KCTargetProcessor(TargetProcessor): "timestamp": datetime.utcnow().isoformat(), }) - disconnected = [] - for ws in self._ws_clients: + async def _send_safe(ws): try: await ws.send_text(message) + return True except Exception: - disconnected.append(ws) + return False + results = await asyncio.gather(*[_send_safe(ws) for ws in self._ws_clients]) + + disconnected = [ws for ws, ok in zip(self._ws_clients, results) if not ok] for ws in disconnected: self._ws_clients.remove(ws) diff --git a/server/src/wled_controller/core/processing/processor_manager.py b/server/src/wled_controller/core/processing/processor_manager.py index b6c7b8e..f499a25 100644 --- a/server/src/wled_controller/core/processing/processor_manager.py +++ b/server/src/wled_controller/core/processing/processor_manager.py @@ -136,7 +136,7 @@ class ProcessorManager: try: q.put_nowait(event) except asyncio.QueueFull: - pass + logger.warning(f"Event queue full, dropping: {event.get('type', '?')}") async def _get_http_client(self) -> httpx.AsyncClient: """Get or create a shared HTTP client for health checks.""" diff --git a/server/src/wled_controller/core/processing/wled_target_processor.py b/server/src/wled_controller/core/processing/wled_target_processor.py index ec928b3..f94a108 100644 --- a/server/src/wled_controller/core/processing/wled_target_processor.py +++ b/server/src/wled_controller/core/processing/wled_target_processor.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import collections +import concurrent.futures import time from datetime import datetime from typing import TYPE_CHECKING, Optional @@ -31,16 +32,22 @@ if TYPE_CHECKING: logger = get_logger(__name__) +_frame_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=4, thread_name_prefix="frame-proc", +) + # --------------------------------------------------------------------------- -# CPU-bound frame processing (runs in thread pool via asyncio.to_thread) +# CPU-bound frame processing (runs in dedicated thread-pool executor) # --------------------------------------------------------------------------- -def _process_frame(capture, border_width, pixel_mapper, previous_colors, smoothing): +def _process_frame(capture, border_width, pixel_mapper, previous_colors, smoothing, brightness): """All CPU-bound work for one WLED frame. - Returns (led_colors, timing_ms) where led_colors is numpy array (N, 3) uint8 - and timing_ms is a dict with per-stage timing in milliseconds. + Returns (raw_colors, send_colors, timing_ms). + raw_colors: unscaled array for smoothing history. + send_colors: brightness-scaled array ready for DDP send. + timing_ms: dict with per-stage timing in milliseconds. """ t0 = time.perf_counter() border_pixels = extract_border_pixels(capture, border_width) @@ -58,13 +65,20 @@ def _process_frame(capture, border_width, pixel_mapper, previous_colors, smoothi led_colors = led_colors.astype(np.uint8) t3 = time.perf_counter() + # Apply brightness scaling in thread pool (avoids extra array copies on event loop) + if brightness < 255: + send_colors = (led_colors.astype(np.uint16) * brightness >> 8).astype(np.uint8) + else: + send_colors = led_colors + t4 = time.perf_counter() + timing_ms = { "extract": (t1 - t0) * 1000, "map_leds": (t2 - t1) * 1000, "smooth": (t3 - t2) * 1000, - "total": (t3 - t0) * 1000, + "total": (t4 - t0) * 1000, } - return led_colors, timing_ms + return led_colors, send_colors, timing_ms # --------------------------------------------------------------------------- @@ -405,16 +419,17 @@ class WledTargetProcessor(TargetProcessor): frame_time = 1.0 / target_fps standby_interval = settings.standby_interval - fps_samples = [] + fps_samples: collections.deque = collections.deque(maxlen=10) timing_samples: collections.deque = collections.deque(maxlen=10) prev_frame_time_stamp = time.time() prev_capture = None last_send_time = 0.0 send_timestamps: collections.deque = collections.deque() + loop = asyncio.get_running_loop() try: while self._is_running: - loop_start = time.time() + now = loop_start = time.time() # Re-fetch device info for runtime changes (test mode, brightness) device_info = self._ctx.get_device_info(self._device_id) @@ -445,39 +460,43 @@ class WledTargetProcessor(TargetProcessor): self._led_client.send_pixels_fast(self._previous_colors, brightness=brightness_value) else: await self._led_client.send_pixels(self._previous_colors, brightness=brightness_value) - last_send_time = time.time() - send_timestamps.append(last_send_time) + now = time.time() + last_send_time = now + send_timestamps.append(now) self._metrics.frames_keepalive += 1 self._metrics.frames_skipped += 1 - now_ts = time.time() - while send_timestamps and send_timestamps[0] < now_ts - 1.0: + while send_timestamps and send_timestamps[0] < now - 1.0: send_timestamps.popleft() self._metrics.fps_current = len(send_timestamps) await asyncio.sleep(frame_time) continue prev_capture = capture - # CPU-bound work in thread pool - led_colors, frame_timing = await asyncio.to_thread( - _process_frame, - capture, border_width, - self._pixel_mapper, self._previous_colors, smoothing, - ) - - # Send to LED device with brightness - if not self._is_running or self._led_client is None: - break + # Compute brightness before thread dispatch brightness_value = int(led_brightness * 255) if device_info and device_info.software_brightness < 255: brightness_value = brightness_value * device_info.software_brightness // 255 + + # CPU-bound work in dedicated thread-pool executor + raw_colors, send_colors, frame_timing = await loop.run_in_executor( + _frame_executor, + _process_frame, capture, border_width, + self._pixel_mapper, self._previous_colors, smoothing, + brightness_value, + ) + + # Send to LED device (brightness already applied in thread) + if not self._is_running or self._led_client is None: + break t_send_start = time.perf_counter() if self._led_client.supports_fast_send: - self._led_client.send_pixels_fast(led_colors, brightness=brightness_value) + self._led_client.send_pixels_fast(send_colors) else: - await self._led_client.send_pixels(led_colors, brightness=brightness_value) + await self._led_client.send_pixels(send_colors) send_ms = (time.perf_counter() - t_send_start) * 1000 - last_send_time = time.time() - send_timestamps.append(last_send_time) + now = time.time() + last_send_time = now + send_timestamps.append(now) # Per-stage timing (rolling average over last 10 frames) frame_timing["send"] = send_ms @@ -494,22 +513,19 @@ class WledTargetProcessor(TargetProcessor): if self._metrics.frames_processed <= 3 or self._metrics.frames_processed % 100 == 0: logger.info( f"Frame {self._metrics.frames_processed} for {self._target_id} " - f"({len(led_colors)} LEDs, bri={brightness_value}) — " + f"({len(send_colors)} LEDs, bri={brightness_value}) — " f"extract={frame_timing['extract']:.1f}ms " f"map={frame_timing['map_leds']:.1f}ms " f"smooth={frame_timing['smooth']:.1f}ms " f"send={send_ms:.1f}ms" ) self._metrics.last_update = datetime.utcnow() - self._previous_colors = led_colors + self._previous_colors = raw_colors - # Calculate actual FPS - now = time.time() + # Calculate actual FPS (reuse cached 'now' from send timestamp) interval = now - prev_frame_time_stamp prev_frame_time_stamp = now fps_samples.append(1.0 / interval if interval > 0 else 0) - if len(fps_samples) > 10: - fps_samples.pop(0) self._metrics.fps_actual = sum(fps_samples) / len(fps_samples) # Potential FPS @@ -527,7 +543,7 @@ class WledTargetProcessor(TargetProcessor): logger.error(f"Processing error for target {self._target_id}: {e}", exc_info=True) # Throttle to target FPS - elapsed = time.time() - loop_start + elapsed = now - loop_start remaining = frame_time - elapsed if remaining > 0: await asyncio.sleep(remaining)