Optimize WLED processing pipeline and add FPS metrics

- Add numpy-based DDP pixel packing (send_pixels_numpy) and fast send
  path (send_pixels_fast) eliminating per-pixel Python loops
- Move ProcessedLiveStream filter processing to background thread so
  get_latest_frame() returns pre-computed cached result instantly
- Vectorize map_border_to_leds for average interpolation using cumulative
  sums instead of 934 individual np.mean calls (~16ms -> <1ms)
- Batch all CPU work into single asyncio.to_thread call per frame
- Fix FPS calculation to measure frame-to-frame interval (was measuring
  processing time only, reporting inflated values)
- Add Potential FPS metric showing theoretical max without throttling
- Add FPS label to WLED target card properties
- Add fps_potential field to TargetProcessingState API schema

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-13 14:43:19 +03:00
parent 7e729c1e4b
commit 8d5ebc92ee
9 changed files with 304 additions and 141 deletions
+97 -53
View File
@@ -3,6 +3,8 @@
from dataclasses import dataclass, field
from typing import Dict, List, Literal, Tuple
import numpy as np
from wled_controller.core.screen_capture import (
BorderPixels,
get_edge_segments,
@@ -189,6 +191,62 @@ class PixelMapper:
f"using {interpolation_mode} interpolation"
)
def _get_edge_pixels(self, border_pixels: BorderPixels, edge_name: str) -> np.ndarray:
"""Get edge pixel array with span slicing applied."""
if edge_name == "top":
edge_pixels = border_pixels.top
elif edge_name == "right":
edge_pixels = border_pixels.right
elif edge_name == "bottom":
edge_pixels = border_pixels.bottom
else:
edge_pixels = border_pixels.left
span_start, span_end = self.calibration.get_edge_span(edge_name)
if span_start > 0.0 or span_end < 1.0:
if edge_name in ("top", "bottom"):
total_w = edge_pixels.shape[1]
s, e = int(span_start * total_w), int(span_end * total_w)
edge_pixels = edge_pixels[:, s:e, :]
else:
total_h = edge_pixels.shape[0]
s, e = int(span_start * total_h), int(span_end * total_h)
edge_pixels = edge_pixels[s:e, :, :]
return edge_pixels
def _map_edge_average(
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
) -> np.ndarray:
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8."""
# Reduce border dimension → 1D array of shape (edge_length, 3)
if edge_name in ("top", "bottom"):
edge_1d = edge_pixels.mean(axis=0) # mean across border_width
else:
edge_1d = edge_pixels.mean(axis=1) # mean across border_width
edge_len = edge_1d.shape[0]
# Compute segment boundaries (matching get_edge_segments float stepping)
step = edge_len / led_count
boundaries = np.empty(led_count + 1, dtype=np.int64)
for i in range(led_count + 1):
boundaries[i] = int(i * step)
# Ensure each segment has at least 1 pixel
for i in range(led_count):
if boundaries[i + 1] <= boundaries[i]:
boundaries[i + 1] = boundaries[i] + 1
boundaries[-1] = min(boundaries[-1], edge_len)
# Cumulative sum for O(1) range means — no per-LED Python numpy calls
cumsum = np.zeros((edge_len + 1, 3), dtype=np.float64)
cumsum[1:] = np.cumsum(edge_1d.astype(np.float64), axis=0)
starts = boundaries[:-1]
ends = boundaries[1:]
lengths = (ends - starts).reshape(-1, 1).astype(np.float64)
segment_sums = cumsum[ends] - cumsum[starts]
return np.clip(segment_sums / lengths, 0, 255).astype(np.uint8)
def map_border_to_leds(
self,
border_pixels: BorderPixels
@@ -205,72 +263,58 @@ class PixelMapper:
ValueError: If border pixels don't match calibration
"""
total_leds = self.calibration.get_total_leds()
led_colors = [(0, 0, 0)] * total_leds
use_fast_avg = self.interpolation_mode == "average"
if use_fast_avg:
led_array = np.zeros((total_leds, 3), dtype=np.uint8)
else:
led_colors = [(0, 0, 0)] * total_leds
# Process each edge
for edge_name in ["top", "right", "bottom", "left"]:
segment = self.calibration.get_segment_for_edge(edge_name)
if not segment:
# This edge is not configured
continue
# Get pixels for this edge
if edge_name == "top":
edge_pixels = border_pixels.top
elif edge_name == "right":
edge_pixels = border_pixels.right
elif edge_name == "bottom":
edge_pixels = border_pixels.bottom
else: # left
edge_pixels = border_pixels.left
edge_pixels = self._get_edge_pixels(border_pixels, edge_name)
# Slice to span region if not full coverage
span_start, span_end = self.calibration.get_edge_span(edge_name)
if span_start > 0.0 or span_end < 1.0:
if edge_name in ("top", "bottom"):
total_w = edge_pixels.shape[1]
s = int(span_start * total_w)
e = int(span_end * total_w)
edge_pixels = edge_pixels[:, s:e, :]
else:
total_h = edge_pixels.shape[0]
s = int(span_start * total_h)
e = int(span_end * total_h)
edge_pixels = edge_pixels[s:e, :, :]
# Divide edge into segments matching LED count
try:
pixel_segments = get_edge_segments(
edge_pixels,
segment.led_count,
edge_name
if use_fast_avg:
# Vectorized: compute all LED colors for this edge at once
colors = self._map_edge_average(
edge_pixels, edge_name, segment.led_count
)
except ValueError as e:
logger.error(f"Failed to segment {edge_name} edge: {e}")
raise
led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
if segment.reverse:
led_indices = led_indices[::-1]
led_array[led_indices] = colors
else:
# Per-LED fallback for median/dominant modes
try:
pixel_segments = get_edge_segments(
edge_pixels, segment.led_count, edge_name
)
except ValueError as e:
logger.error(f"Failed to segment {edge_name} edge: {e}")
raise
# Calculate LED indices for this segment
led_indices = list(range(segment.led_start, segment.led_start + segment.led_count))
led_indices = list(range(segment.led_start, segment.led_start + segment.led_count))
if segment.reverse:
led_indices = list(reversed(led_indices))
# Reverse if needed
if segment.reverse:
led_indices = list(reversed(led_indices))
for led_idx, pixel_segment in zip(led_indices, pixel_segments):
color = self._calc_color(pixel_segment)
led_colors[led_idx] = color
# Map pixel segments to LEDs
for led_idx, pixel_segment in zip(led_indices, pixel_segments):
color = self._calc_color(pixel_segment)
led_colors[led_idx] = color
# Apply physical LED offset by rotating the array
# Offset = number of LEDs from LED 0 to the start corner
# Physical LED[i] should get calibration color[(i - offset) % total]
offset = self.calibration.offset % total_leds if total_leds > 0 else 0
if offset > 0:
led_colors = led_colors[total_leds - offset:] + led_colors[:total_leds - offset]
logger.debug(f"Mapped border pixels to {total_leds} LED colors (offset={offset})")
return led_colors
if use_fast_avg:
if offset > 0:
led_array = np.roll(led_array, offset, axis=0)
return [tuple(c) for c in led_array]
else:
if offset > 0:
led_colors = led_colors[total_leds - offset:] + led_colors[:total_leds - offset]
logger.debug(f"Mapped border pixels to {total_leds} LED colors (offset={offset})")
return led_colors
def test_calibration(self, edge: str, color: Tuple[int, int, int]) -> List[Tuple[int, int, int]]:
"""Generate test pattern to light up specific edge.