Pre-allocate PixelMapper buffers to eliminate GC-induced map_leds spikes
Reduces map_leds_ms timing spikes from 4ms to ~1.5ms by eliminating ~540KB/frame of numpy temporary allocations:

- Pre-allocate _led_buf (reused instead of np.zeros per call)
- Pre-compute offset-adjusted segment indices (eliminates np.roll copy)
- Lazy-cache per-edge cumsum and mean buffers with np.mean/cumsum out=
- Pre-compute Phase 3 skip resampling arrays in __init__

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -195,8 +195,42 @@ class PixelMapper:
|
|||||||
else:
|
else:
|
||||||
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
|
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
|
||||||
|
|
||||||
|
# Pre-allocate LED output buffer (reused every call)
|
||||||
|
total_leds = calibration.get_total_leds()
|
||||||
|
self._total_leds = total_leds
|
||||||
|
self._led_buf = np.zeros((total_leds, 3), dtype=np.uint8)
|
||||||
|
self._use_fast_avg = interpolation_mode == "average"
|
||||||
|
|
||||||
|
# Pre-compute offset-adjusted index arrays per segment (avoids np.roll)
|
||||||
|
offset = calibration.offset % total_leds if total_leds > 0 else 0
|
||||||
|
self._segment_indices: List[np.ndarray] = []
|
||||||
|
for segment in calibration.segments:
|
||||||
|
indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
|
||||||
|
if segment.reverse:
|
||||||
|
indices = indices[::-1]
|
||||||
|
if offset > 0:
|
||||||
|
indices = (indices + offset) % total_leds
|
||||||
|
self._segment_indices.append(indices)
|
||||||
|
|
||||||
|
# Pre-compute Phase 3 skip arrays (static geometry)
|
||||||
|
skip_start = calibration.skip_leds_start
|
||||||
|
skip_end = calibration.skip_leds_end
|
||||||
|
self._skip_start = skip_start
|
||||||
|
self._skip_end = skip_end
|
||||||
|
self._active_count = max(0, total_leds - skip_start - skip_end)
|
||||||
|
if 0 < self._active_count < total_leds:
|
||||||
|
self._skip_src = np.linspace(0, total_leds - 1, self._active_count)
|
||||||
|
self._skip_x = np.arange(total_leds, dtype=np.float64)
|
||||||
|
self._skip_float = np.empty((total_leds, 3), dtype=np.float64)
|
||||||
|
self._skip_resampled = np.empty((self._active_count, 3), dtype=np.uint8)
|
||||||
|
else:
|
||||||
|
self._skip_src = self._skip_x = self._skip_float = self._skip_resampled = None
|
||||||
|
|
||||||
|
# Per-edge average computation cache (lazy-initialized on first frame)
|
||||||
|
self._edge_cache: Dict[str, tuple] = {}
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Initialized pixel mapper with {self.calibration.get_total_leds()} LEDs "
|
f"Initialized pixel mapper with {total_leds} LEDs "
|
||||||
f"using {interpolation_mode} interpolation"
|
f"using {interpolation_mode} interpolation"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -253,31 +287,43 @@ class PixelMapper:
|
|||||||
def _map_edge_average(
|
def _map_edge_average(
|
||||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8."""
|
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8.
|
||||||
# Reduce border dimension → 1D array of shape (edge_length, 3)
|
|
||||||
|
Uses pre-allocated cumsum/mean buffers (lazy-initialized per edge) to
|
||||||
|
avoid per-frame allocations that cause GC-induced timing spikes.
|
||||||
|
"""
|
||||||
if edge_name in ("top", "bottom"):
|
if edge_name in ("top", "bottom"):
|
||||||
edge_1d = edge_pixels.mean(axis=0) # mean across border_width
|
axis = 0
|
||||||
|
edge_len = edge_pixels.shape[1]
|
||||||
else:
|
else:
|
||||||
edge_1d = edge_pixels.mean(axis=1) # mean across border_width
|
axis = 1
|
||||||
|
edge_len = edge_pixels.shape[0]
|
||||||
|
|
||||||
edge_len = edge_1d.shape[0]
|
# Lazy-init / resize per-edge scratch buffers
|
||||||
|
cache = self._edge_cache.get(edge_name)
|
||||||
|
if cache is None or cache[0] != edge_len:
|
||||||
|
step = edge_len / led_count
|
||||||
|
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
||||||
|
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
||||||
|
np.minimum(boundaries, edge_len, out=boundaries)
|
||||||
|
starts = boundaries[:-1]
|
||||||
|
ends = boundaries[1:]
|
||||||
|
lengths = (ends - starts).reshape(-1, 1).astype(np.float64)
|
||||||
|
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float64)
|
||||||
|
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float64)
|
||||||
|
cache = (edge_len, starts, ends, lengths, cumsum_buf, edge_1d_buf)
|
||||||
|
self._edge_cache[edge_name] = cache
|
||||||
|
|
||||||
# Compute segment boundaries (matching get_edge_segments float stepping)
|
_, starts, ends, lengths, cumsum_buf, edge_1d_buf = cache
|
||||||
step = edge_len / led_count
|
|
||||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
|
||||||
# Ensure each segment has at least 1 pixel
|
|
||||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
|
||||||
# Clamp all boundaries to edge_len (not just the last one)
|
|
||||||
np.minimum(boundaries, edge_len, out=boundaries)
|
|
||||||
|
|
||||||
# Cumulative sum for O(1) range means — no per-LED Python numpy calls
|
# Mean into pre-allocated buffer (no intermediate float64 array)
|
||||||
cumsum = np.zeros((edge_len + 1, 3), dtype=np.float64)
|
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
||||||
cumsum[1:] = np.cumsum(edge_1d.astype(np.float64), axis=0)
|
|
||||||
|
|
||||||
starts = boundaries[:-1]
|
# Cumsum into pre-allocated buffer
|
||||||
ends = boundaries[1:]
|
cumsum_buf[0] = 0
|
||||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float64)
|
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
||||||
segment_sums = cumsum[ends] - cumsum[starts]
|
|
||||||
|
segment_sums = cumsum_buf[ends] - cumsum_buf[starts]
|
||||||
return np.clip(segment_sums / lengths, 0, 255).astype(np.uint8)
|
return np.clip(segment_sums / lengths, 0, 255).astype(np.uint8)
|
||||||
|
|
||||||
def map_border_to_leds(
|
def map_border_to_leds(
|
||||||
@@ -286,6 +332,9 @@ class PixelMapper:
|
|||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
"""Map screen border pixels to LED colors.
|
"""Map screen border pixels to LED colors.
|
||||||
|
|
||||||
|
Uses pre-allocated buffers and pre-computed index arrays to avoid
|
||||||
|
per-frame allocations (np.zeros, np.roll, np.arange, np.linspace).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
border_pixels: Extracted border pixels from screen
|
border_pixels: Extracted border pixels from screen
|
||||||
|
|
||||||
@@ -295,19 +344,14 @@ class PixelMapper:
|
|||||||
Raises:
|
Raises:
|
||||||
ValueError: If border pixels don't match calibration
|
ValueError: If border pixels don't match calibration
|
||||||
"""
|
"""
|
||||||
total_leds = self.calibration.get_total_leds()
|
led_array = self._led_buf
|
||||||
skip_start = self.calibration.skip_leds_start
|
led_array[:] = 0
|
||||||
skip_end = self.calibration.skip_leds_end
|
|
||||||
active_count = max(0, total_leds - skip_start - skip_end)
|
|
||||||
use_fast_avg = self.interpolation_mode == "average"
|
|
||||||
|
|
||||||
# Phase 1: Map full perimeter to total_leds positions (numpy for all modes)
|
# Phase 1+2: Map edges and place at offset-adjusted positions (no np.roll)
|
||||||
led_array = np.zeros((total_leds, 3), dtype=np.uint8)
|
for i, segment in enumerate(self.calibration.segments):
|
||||||
|
|
||||||
for segment in self.calibration.segments:
|
|
||||||
edge_pixels = self._get_edge_pixels(border_pixels, segment.edge)
|
edge_pixels = self._get_edge_pixels(border_pixels, segment.edge)
|
||||||
|
|
||||||
if use_fast_avg:
|
if self._use_fast_avg:
|
||||||
colors = self._map_edge_average(
|
colors = self._map_edge_average(
|
||||||
edge_pixels, segment.edge, segment.led_count
|
edge_pixels, segment.edge, segment.led_count
|
||||||
)
|
)
|
||||||
@@ -316,30 +360,19 @@ class PixelMapper:
|
|||||||
edge_pixels, segment.edge, segment.led_count
|
edge_pixels, segment.edge, segment.led_count
|
||||||
)
|
)
|
||||||
|
|
||||||
led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
|
led_array[self._segment_indices[i]] = colors
|
||||||
if segment.reverse:
|
|
||||||
led_indices = led_indices[::-1]
|
|
||||||
led_array[led_indices] = colors
|
|
||||||
|
|
||||||
# Phase 2: Offset rotation
|
|
||||||
offset = self.calibration.offset % total_leds if total_leds > 0 else 0
|
|
||||||
if offset > 0:
|
|
||||||
led_array = np.roll(led_array, offset, axis=0)
|
|
||||||
|
|
||||||
# Phase 3: Physical skip — resample full perimeter to active LEDs
|
# Phase 3: Physical skip — resample full perimeter to active LEDs
|
||||||
if active_count > 0 and active_count < total_leds:
|
if self._skip_src is not None:
|
||||||
src = np.linspace(0, total_leds - 1, active_count)
|
np.copyto(self._skip_float, led_array, casting='unsafe')
|
||||||
full_f = led_array.astype(np.float64)
|
|
||||||
x = np.arange(total_leds, dtype=np.float64)
|
|
||||||
resampled = np.empty((active_count, 3), dtype=np.uint8)
|
|
||||||
for ch in range(3):
|
for ch in range(3):
|
||||||
resampled[:, ch] = np.round(
|
self._skip_resampled[:, ch] = np.round(
|
||||||
np.interp(src, x, full_f[:, ch])
|
np.interp(self._skip_src, self._skip_x, self._skip_float[:, ch])
|
||||||
).astype(np.uint8)
|
).astype(np.uint8)
|
||||||
led_array[:] = 0
|
led_array[:] = 0
|
||||||
end_idx = total_leds - skip_end
|
end_idx = self._total_leds - self._skip_end
|
||||||
led_array[skip_start:end_idx] = resampled
|
led_array[self._skip_start:end_idx] = self._skip_resampled
|
||||||
elif active_count <= 0:
|
elif self._active_count <= 0:
|
||||||
led_array[:] = 0
|
led_array[:] = 0
|
||||||
|
|
||||||
return led_array
|
return led_array
|
||||||
|
|||||||
Reference in New Issue
Block a user