Optimize streaming pipeline and capture hot paths

- Replace asyncio.to_thread with dedicated ThreadPoolExecutor (skip
  per-frame context copy overhead)
- Move brightness scaling into _process_frame thread (avoid extra
  numpy array copies on event loop)
- Remove PIL intermediate in MSS capture (direct bytes→numpy)
- Unify median/dominant pixel mapping to numpy arrays (eliminate
  Python list-of-tuples path and duplicate Phase 2/3 code)
- Cache CalibrationConfig.segments property (avoid ~240 rebuilds/sec)
- Make KC WebSocket broadcasts concurrent via asyncio.gather
- Fix fps_samples list.pop(0) → deque(maxlen=10) in both processors
- Cache time.time() calls to reduce redundant syscalls per frame
- Log event queue drops instead of silently discarding

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 22:55:21 +03:00
parent bfe6a7a2ab
commit fbf597dc29
6 changed files with 131 additions and 127 deletions
@@ -1,13 +1,12 @@
"""Calibration system for mapping screen pixels to LED positions."""
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Dict, List, Literal, Tuple
import numpy as np
from wled_controller.core.capture.screen_capture import (
BorderPixels,
get_edge_segments,
calculate_average_color,
calculate_median_color,
calculate_dominant_color,
@@ -110,10 +109,15 @@ class CalibrationConfig:
return segments
def __post_init__(self):
self._cached_segments: List[CalibrationSegment] | None = None
@property
def segments(self) -> List[CalibrationSegment]:
"""Get derived segment list."""
return self.build_segments()
"""Get derived segment list (cached after first call)."""
if self._cached_segments is None:
self._cached_segments = self.build_segments()
return self._cached_segments
def get_edge_span(self, edge: str) -> tuple[float, float]:
"""Get span (start, end) for a given edge."""
@@ -219,6 +223,33 @@ class PixelMapper:
edge_pixels = edge_pixels[s:e, :, :]
return edge_pixels
def _map_edge_fallback(
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
) -> np.ndarray:
"""Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8."""
if edge_name in ("top", "bottom"):
edge_len = edge_pixels.shape[1]
else:
edge_len = edge_pixels.shape[0]
step = edge_len / led_count
result = np.empty((led_count, 3), dtype=np.uint8)
for i in range(led_count):
start = int(i * step)
end = max(start + 1, int((i + 1) * step))
end = min(end, edge_len)
if edge_name in ("top", "bottom"):
segment = edge_pixels[:, start:end, :]
else:
segment = edge_pixels[start:end, :, :]
color = self._calc_color(segment)
result[i] = color
return result
def _map_edge_average(
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
) -> np.ndarray:
@@ -274,92 +305,48 @@ class PixelMapper:
active_count = max(0, total_leds - skip_start - skip_end)
use_fast_avg = self.interpolation_mode == "average"
# Phase 1: Map full perimeter to total_leds positions
if use_fast_avg:
led_array = np.zeros((total_leds, 3), dtype=np.uint8)
else:
led_colors = [(0, 0, 0)] * total_leds
# Phase 1: Map full perimeter to total_leds positions (numpy for all modes)
led_array = np.zeros((total_leds, 3), dtype=np.uint8)
for edge_name in ["top", "right", "bottom", "left"]:
segment = self.calibration.get_segment_for_edge(edge_name)
if not segment:
continue
edge_pixels = self._get_edge_pixels(border_pixels, edge_name)
for segment in self.calibration.segments:
edge_pixels = self._get_edge_pixels(border_pixels, segment.edge)
if use_fast_avg:
# Vectorized: compute all LED colors for this edge at once
colors = self._map_edge_average(
edge_pixels, edge_name, segment.led_count
edge_pixels, segment.edge, segment.led_count
)
led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
if segment.reverse:
led_indices = led_indices[::-1]
led_array[led_indices] = colors
else:
# Per-LED fallback for median/dominant modes
try:
pixel_segments = get_edge_segments(
edge_pixels, segment.led_count, edge_name
)
except ValueError as e:
logger.error(f"Failed to segment {edge_name} edge: {e}")
raise
colors = self._map_edge_fallback(
edge_pixels, segment.edge, segment.led_count
)
led_indices = list(range(segment.led_start, segment.led_start + segment.led_count))
if segment.reverse:
led_indices = list(reversed(led_indices))
for led_idx, pixel_segment in zip(led_indices, pixel_segments):
color = self._calc_color(pixel_segment)
led_colors[led_idx] = color
led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
if segment.reverse:
led_indices = led_indices[::-1]
led_array[led_indices] = colors
# Phase 2: Offset rotation
offset = self.calibration.offset % total_leds if total_leds > 0 else 0
if offset > 0:
led_array = np.roll(led_array, offset, axis=0)
if use_fast_avg:
if offset > 0:
led_array = np.roll(led_array, offset, axis=0)
# Phase 3: Physical skip — resample full perimeter to active LEDs
if active_count > 0 and active_count < total_leds:
src = np.linspace(0, total_leds - 1, active_count)
full_f = led_array.astype(np.float64)
x = np.arange(total_leds, dtype=np.float64)
resampled = np.empty((active_count, 3), dtype=np.uint8)
for ch in range(3):
resampled[:, ch] = np.round(
np.interp(src, x, full_f[:, ch])
).astype(np.uint8)
led_array[:] = 0
end_idx = total_leds - skip_end
led_array[skip_start:end_idx] = resampled
elif active_count <= 0:
led_array[:] = 0
# Phase 3: Physical skip — resample full perimeter to active LEDs
# Maps the entire screen to active_count positions so each active LED
# covers a proportionally larger slice of the perimeter.
if active_count > 0 and active_count < total_leds:
src = np.linspace(0, total_leds - 1, active_count)
full_f = led_array.astype(np.float64)
x = np.arange(total_leds, dtype=np.float64)
resampled = np.empty((active_count, 3), dtype=np.uint8)
for ch in range(3):
resampled[:, ch] = np.round(
np.interp(src, x, full_f[:, ch])
).astype(np.uint8)
led_array[:] = 0
end_idx = total_leds - skip_end
led_array[skip_start:end_idx] = resampled
elif active_count <= 0:
led_array[:] = 0
return led_array
else:
if offset > 0:
led_colors = led_colors[total_leds - offset:] + led_colors[:total_leds - offset]
# Phase 3: Physical skip — resample full perimeter to active LEDs
if active_count > 0 and active_count < total_leds:
arr = np.array(led_colors, dtype=np.float64)
src = np.linspace(0, total_leds - 1, active_count)
x = np.arange(total_leds, dtype=np.float64)
resampled = np.empty((active_count, 3), dtype=np.float64)
for ch in range(3):
resampled[:, ch] = np.interp(src, x, arr[:, ch])
led_colors = [(0, 0, 0)] * total_leds
for i in range(active_count):
r, g, b = resampled[i]
led_colors[skip_start + i] = (int(round(r)), int(round(g)), int(round(b)))
elif active_count <= 0:
led_colors = [(0, 0, 0)] * total_leds
return np.array(led_colors, dtype=np.uint8)
return led_array
def test_calibration(self, edge: str, color: Tuple[int, int, int]) -> List[Tuple[int, int, int]]:
"""Generate test pattern to light up specific edge.
@@ -5,7 +5,6 @@ from typing import Dict, List
import mss
import numpy as np
from PIL import Image
from wled_controller.utils import get_logger, get_monitor_names, get_monitor_refresh_rates
@@ -122,9 +121,10 @@ def capture_display(display_index: int = 0) -> ScreenCapture:
# Capture screenshot
screenshot = sct.grab(monitor)
# Convert to numpy array (RGB)
img = Image.frombytes("RGB", screenshot.size, screenshot.rgb)
img_array = np.array(img)
# Direct bytes→numpy (skips PIL intermediate object)
img_array = np.frombuffer(
screenshot.rgb, dtype=np.uint8,
).reshape(screenshot.height, screenshot.width, 3)
logger.debug(
f"Captured display {display_index}: {monitor['width']}x{monitor['height']}"