diff --git a/server/src/wled_controller/core/capture/calibration.py b/server/src/wled_controller/core/capture/calibration.py index 3d44e66..d0f24d5 100644 --- a/server/src/wled_controller/core/capture/calibration.py +++ b/server/src/wled_controller/core/capture/calibration.py @@ -195,8 +195,42 @@ class PixelMapper: else: raise ValueError(f"Invalid interpolation mode: {interpolation_mode}") + # Pre-allocate LED output buffer (reused every call) + total_leds = calibration.get_total_leds() + self._total_leds = total_leds + self._led_buf = np.zeros((total_leds, 3), dtype=np.uint8) + self._use_fast_avg = interpolation_mode == "average" + + # Pre-compute offset-adjusted index arrays per segment (avoids np.roll) + offset = calibration.offset % total_leds if total_leds > 0 else 0 + self._segment_indices: List[np.ndarray] = [] + for segment in calibration.segments: + indices = np.arange(segment.led_start, segment.led_start + segment.led_count) + if segment.reverse: + indices = indices[::-1] + if offset > 0: + indices = (indices + offset) % total_leds + self._segment_indices.append(indices) + + # Pre-compute Phase 3 skip arrays (static geometry) + skip_start = calibration.skip_leds_start + skip_end = calibration.skip_leds_end + self._skip_start = skip_start + self._skip_end = skip_end + self._active_count = max(0, total_leds - skip_start - skip_end) + if 0 < self._active_count < total_leds: + self._skip_src = np.linspace(0, total_leds - 1, self._active_count) + self._skip_x = np.arange(total_leds, dtype=np.float64) + self._skip_float = np.empty((total_leds, 3), dtype=np.float64) + self._skip_resampled = np.empty((self._active_count, 3), dtype=np.uint8) + else: + self._skip_src = self._skip_x = self._skip_float = self._skip_resampled = None + + # Per-edge average computation cache (lazy-initialized on first frame) + self._edge_cache: Dict[str, tuple] = {} + logger.info( - f"Initialized pixel mapper with {self.calibration.get_total_leds()} LEDs " + f"Initialized pixel mapper with {total_leds} LEDs " f"using {interpolation_mode} interpolation" ) @@ -253,31 +287,43 @@ class PixelMapper: def _map_edge_average( self, edge_pixels: np.ndarray, edge_name: str, led_count: int ) -> np.ndarray: - """Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8.""" - # Reduce border dimension → 1D array of shape (edge_length, 3) + """Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8. + + Uses pre-allocated cumsum/mean buffers (lazy-initialized per edge) to + avoid per-frame allocations that cause GC-induced timing spikes. + """ if edge_name in ("top", "bottom"): - edge_1d = edge_pixels.mean(axis=0) # mean across border_width + axis = 0 + edge_len = edge_pixels.shape[1] else: - edge_1d = edge_pixels.mean(axis=1) # mean across border_width + axis = 1 + edge_len = edge_pixels.shape[0] - edge_len = edge_1d.shape[0] + # Lazy-init / resize per-edge scratch buffers + cache = self._edge_cache.get(edge_name) + if cache is None or cache[0] != edge_len: + step = edge_len / led_count + boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64) + boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1) + np.minimum(boundaries, edge_len, out=boundaries) + starts = boundaries[:-1] + ends = boundaries[1:] + lengths = (ends - starts).reshape(-1, 1).astype(np.float64) + cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float64) + edge_1d_buf = np.empty((edge_len, 3), dtype=np.float64) + cache = (edge_len, starts, ends, lengths, cumsum_buf, edge_1d_buf) + self._edge_cache[edge_name] = cache - # Compute segment boundaries (matching get_edge_segments float stepping) - step = edge_len / led_count - boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64) - # Ensure each segment has at least 1 pixel - boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1) - # Clamp all boundaries to edge_len (not just the last one) - np.minimum(boundaries, edge_len, out=boundaries) + _, starts, ends, lengths, cumsum_buf, edge_1d_buf = cache - # Cumulative sum for O(1) range means — no per-LED Python numpy calls - cumsum = np.zeros((edge_len + 1, 3), dtype=np.float64) - cumsum[1:] = np.cumsum(edge_1d.astype(np.float64), axis=0) + # Mean into pre-allocated buffer (no intermediate float64 array) + np.mean(edge_pixels, axis=axis, out=edge_1d_buf) - starts = boundaries[:-1] - ends = boundaries[1:] - lengths = (ends - starts).reshape(-1, 1).astype(np.float64) - segment_sums = cumsum[ends] - cumsum[starts] + # Cumsum into pre-allocated buffer + cumsum_buf[0] = 0 + np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:]) + + segment_sums = cumsum_buf[ends] - cumsum_buf[starts] return np.clip(segment_sums / lengths, 0, 255).astype(np.uint8) def map_border_to_leds( @@ -286,6 +332,9 @@ class PixelMapper: ) -> np.ndarray: """Map screen border pixels to LED colors. + Uses pre-allocated buffers and pre-computed index arrays to avoid + per-frame allocations (np.zeros, np.roll, np.arange, np.linspace). + Args: border_pixels: Extracted border pixels from screen @@ -295,19 +344,14 @@ class PixelMapper: Raises: ValueError: If border pixels don't match calibration """ - total_leds = self.calibration.get_total_leds() - skip_start = self.calibration.skip_leds_start - skip_end = self.calibration.skip_leds_end - active_count = max(0, total_leds - skip_start - skip_end) - use_fast_avg = self.interpolation_mode == "average" + led_array = self._led_buf + led_array[:] = 0 - # Phase 1: Map full perimeter to total_leds positions (numpy for all modes) - led_array = np.zeros((total_leds, 3), dtype=np.uint8) - - for segment in self.calibration.segments: + # Phase 1+2: Map edges and place at offset-adjusted positions (no np.roll) + for i, segment in enumerate(self.calibration.segments): edge_pixels = self._get_edge_pixels(border_pixels, segment.edge) - if use_fast_avg: + if self._use_fast_avg: colors = self._map_edge_average( edge_pixels, segment.edge, segment.led_count ) @@ -316,30 +360,19 @@ class PixelMapper: edge_pixels, segment.edge, segment.led_count ) - led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count) - if segment.reverse: - led_indices = led_indices[::-1] - led_array[led_indices] = colors - - # Phase 2: Offset rotation - offset = self.calibration.offset % total_leds if total_leds > 0 else 0 - if offset > 0: - led_array = np.roll(led_array, offset, axis=0) + led_array[self._segment_indices[i]] = colors # Phase 3: Physical skip — resample full perimeter to active LEDs - if active_count > 0 and active_count < total_leds: - src = np.linspace(0, total_leds - 1, active_count) - full_f = led_array.astype(np.float64) - x = np.arange(total_leds, dtype=np.float64) - resampled = np.empty((active_count, 3), dtype=np.uint8) + if self._skip_src is not None: + np.copyto(self._skip_float, led_array, casting='unsafe') for ch in range(3): - resampled[:, ch] = np.round( - np.interp(src, x, full_f[:, ch]) + self._skip_resampled[:, ch] = np.round( + np.interp(self._skip_src, self._skip_x, self._skip_float[:, ch]) ).astype(np.uint8) led_array[:] = 0 - end_idx = total_leds - skip_end - led_array[skip_start:end_idx] = resampled - elif active_count <= 0: + end_idx = self._total_leds - self._skip_end + led_array[self._skip_start:end_idx] = self._skip_resampled + elif self._active_count <= 0: led_array[:] = 0 return led_array