From 5fec8db9016dd3f639fd99e74b4a5499b4bdad30 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Fri, 22 May 2026 23:03:44 +0300 Subject: [PATCH] refactor(capture): lift duplicated edge-to-LED kernels into shared module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PixelMapper and AdvancedPixelMapper in calibration.py used to carry byte-for-byte copies of two ~80-line numpy kernels (audit finding M4): * the vectorised average-colour-per-LED path with its cumsum + take scratch-buffer dance; and * the per-LED fallback loop for median / dominant colour modes. Lift both into a new ``core.capture.edge_interpolation`` module exposing ``average_edge_to_leds(edge_pixels, edge_name, led_count, cache, cache_key)`` and ``fallback_edge_to_leds(edge_pixels, edge_name, led_count, calc_color)``. The cache parameter is the caller-owned dict (``self._edge_cache``) so allocations still happen once per (edge_len, led_count) signature — the difference is that the boundary-builder, the buffer set, and the inner numpy ops live in exactly one place. PixelMapper keys its cache by edge name (``"top"`` / ``"left"`` etc.); AdvancedPixelMapper keys by line-index int (same dict, no collision). Both mappers' ``_map_edge_average`` / ``_map_edge_fallback`` shrink to single delegating lines. Tests: 9 new kernel-level tests cover uint8 dtype + shape, the cache reuse / rebuild contract, independent cache keying, a gradient input producing a monotonic output, the calc_color callable contract for the fallback path, and segment-position tracking for both axes. 30 existing calibration tests stay green; ruff clean. --- .../src/ledgrab/core/capture/calibration.py | 184 ++---------------- .../core/capture/edge_interpolation.py | 163 ++++++++++++++++ .../core/capture/test_edge_interpolation.py | 127 ++++++++++++ 3 files changed, 305 insertions(+), 169 deletions(-) create mode 100644 server/src/ledgrab/core/capture/edge_interpolation.py create mode 100644 server/tests/core/capture/test_edge_interpolation.py diff --git a/server/src/ledgrab/core/capture/calibration.py b/server/src/ledgrab/core/capture/calibration.py index 1233158..1beb978 100644 --- a/server/src/ledgrab/core/capture/calibration.py +++ b/server/src/ledgrab/core/capture/calibration.py @@ -5,6 +5,10 @@ from typing import Dict, List, Literal, Set, Tuple import numpy as np +from ledgrab.core.capture.edge_interpolation import ( + average_edge_to_leds, + fallback_edge_to_leds, +) from ledgrab.core.capture.screen_capture import ( BorderPixels, calculate_average_color, @@ -404,107 +408,17 @@ class PixelMapper: self, edge_pixels: np.ndarray, edge_name: str, led_count: int ) -> np.ndarray: """Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8.""" - if edge_name in ("top", "bottom"): - edge_len = edge_pixels.shape[1] - else: - edge_len = edge_pixels.shape[0] - - step = edge_len / led_count - result = np.empty((led_count, 3), dtype=np.uint8) - - for i in range(led_count): - start = int(i * step) - end = max(start + 1, int((i + 1) * step)) - end = min(end, edge_len) - - if edge_name in ("top", "bottom"): - segment = edge_pixels[:, start:end, :] - else: - segment = edge_pixels[start:end, :, :] - - color = self._calc_color(segment) - result[i] = color - - return result + return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color) def _map_edge_average( self, edge_pixels: np.ndarray, edge_name: str, led_count: int ) -> np.ndarray: """Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8. - Uses pre-allocated cumsum/mean buffers AND pre-allocated output - buffers (lazy-initialized per edge). All per-frame numpy ops write - in-place — zero allocations on the hot path. + Scratch buffers are cached on ``self._edge_cache`` keyed by edge name; + the shared kernel handles all allocations on first use. """ - if edge_name in ("top", "bottom"): - axis = 0 - edge_len = edge_pixels.shape[1] - else: - axis = 1 - edge_len = edge_pixels.shape[0] - - # Lazy-init / resize per-edge scratch buffers. - # float32 is sufficient: max cumsum value is edge_len * 255 (≈2M @ 8K - # screens) which fits exactly in float32's 24-bit mantissa. Halves - # memory bandwidth on the hot reduction. - cache = self._edge_cache.get(edge_name) - if cache is None or cache[0] != edge_len or cache[1] != led_count: - step = edge_len / led_count - boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64) - boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1) - np.minimum(boundaries, edge_len, out=boundaries) - starts = boundaries[:-1] - ends = boundaries[1:] - lengths = (ends - starts).reshape(-1, 1).astype(np.float32) - cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32) - edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32) - sums_buf = np.empty((led_count, 3), dtype=np.float32) - starts_buf = np.empty((led_count, 3), dtype=np.float32) - out_uint8 = np.empty((led_count, 3), dtype=np.uint8) - cache = ( - edge_len, - led_count, - starts, - ends, - lengths, - cumsum_buf, - edge_1d_buf, - sums_buf, - starts_buf, - out_uint8, - ) - self._edge_cache[edge_name] = cache - - ( - _, - _, - starts, - ends, - lengths, - cumsum_buf, - edge_1d_buf, - sums_buf, - starts_buf, - out_uint8, - ) = cache - - # Mean into pre-allocated buffer (no intermediate float64 array) - np.mean(edge_pixels, axis=axis, out=edge_1d_buf) - - # Cumsum into pre-allocated buffer (cumsum_buf[0] left at 0 from init) - cumsum_buf[0] = 0 - np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:]) - - # segment_sums = cumsum_buf[ends] - cumsum_buf[starts] — but each - # fancy-index expression allocates. np.take with ``out=`` writes - # directly into our pre-allocated scratch. - np.take(cumsum_buf, ends, axis=0, out=sums_buf) - np.take(cumsum_buf, starts, axis=0, out=starts_buf) - np.subtract(sums_buf, starts_buf, out=sums_buf) - np.divide(sums_buf, lengths, out=sums_buf) - np.clip(sums_buf, 0, 255, out=sums_buf) - np.copyto(out_uint8, sums_buf, casting="unsafe") - return out_uint8 + return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, edge_name) def map_border_to_leds(self, border_pixels: BorderPixels) -> np.ndarray: """Map screen border pixels to LED colors. @@ -669,64 +583,12 @@ class AdvancedPixelMapper: led_count: int, cache_key: int, ) -> np.ndarray: - """Vectorized average-color mapping (same algo as PixelMapper).""" - if edge_name in ("top", "bottom"): - axis = 0 - edge_len = edge_pixels.shape[1] - else: - axis = 1 - edge_len = edge_pixels.shape[0] + """Vectorized average-color mapping; delegates to the shared kernel. - cache = self._edge_cache.get(cache_key) - if cache is None or cache[0] != edge_len or cache[1] != led_count: - step = edge_len / led_count - boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64) - boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1) - np.minimum(boundaries, edge_len, out=boundaries) - starts = boundaries[:-1] - ends = boundaries[1:] - lengths = (ends - starts).reshape(-1, 1).astype(np.float32) - cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32) - edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32) - sums_buf = np.empty((led_count, 3), dtype=np.float32) - starts_buf = np.empty((led_count, 3), dtype=np.float32) - out_uint8 = np.empty((led_count, 3), dtype=np.uint8) - cache = ( - edge_len, - led_count, - starts, - ends, - lengths, - cumsum_buf, - edge_1d_buf, - sums_buf, - starts_buf, - out_uint8, - ) - self._edge_cache[cache_key] = cache - - ( - _, - _, - starts, - ends, - lengths, - cumsum_buf, - edge_1d_buf, - sums_buf, - starts_buf, - out_uint8, - ) = cache - np.mean(edge_pixels, axis=axis, out=edge_1d_buf) - cumsum_buf[0] = 0 - np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:]) - np.take(cumsum_buf, ends, axis=0, out=sums_buf) - np.take(cumsum_buf, starts, axis=0, out=starts_buf) - np.subtract(sums_buf, starts_buf, out=sums_buf) - np.divide(sums_buf, lengths, out=sums_buf) - np.clip(sums_buf, 0, 255, out=sums_buf) - np.copyto(out_uint8, sums_buf, casting="unsafe") - return out_uint8 + ``cache_key`` is an integer (e.g. line index) so multiple per-line + edges can share the same ``self._edge_cache`` dict without colliding. + """ + return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, cache_key) def _map_edge_fallback( self, @@ -734,24 +596,8 @@ class AdvancedPixelMapper: edge_name: str, led_count: int, ) -> np.ndarray: - """Per-LED color mapping for median/dominant modes.""" - if edge_name in ("top", "bottom"): - edge_len = edge_pixels.shape[1] - else: - edge_len = edge_pixels.shape[0] - - step = edge_len / led_count - result = np.empty((led_count, 3), dtype=np.uint8) - for i in range(led_count): - start = int(i * step) - end = max(start + 1, int((i + 1) * step)) - end = min(end, edge_len) - if edge_name in ("top", "bottom"): - segment = edge_pixels[:, start:end, :] - else: - segment = edge_pixels[start:end, :, :] - result[i] = self._calc_color(segment) - return result + """Per-LED color mapping for median/dominant modes; delegates to shared kernel.""" + return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color) def map_lines_to_leds(self, frames: Dict[str, np.ndarray]) -> np.ndarray: """Map multi-source frames to LED colors using calibration lines. diff --git a/server/src/ledgrab/core/capture/edge_interpolation.py b/server/src/ledgrab/core/capture/edge_interpolation.py new file mode 100644 index 0000000..746d3f1 --- /dev/null +++ b/server/src/ledgrab/core/capture/edge_interpolation.py @@ -0,0 +1,163 @@ +"""Shared edge-to-LED interpolation kernels for PixelMapper variants. + +``PixelMapper`` and ``AdvancedPixelMapper`` in ``calibration.py`` historically +carried two byte-for-byte copies of: + +* the fast vectorised "average across each LED segment" path + (``_map_edge_average``) — ~80 lines of buffer-allocation + cumsum tricks; and +* the per-LED-loop "median / dominant colour" path (``_map_edge_fallback``). + +Lifting both kernels into pure functions removes the duplication and +keeps the algorithms in one place. Each mapper owns its own scratch-buffer +cache (keyed differently in the two cases — see callers); the functions +accept that cache as an in/out dict so allocations still happen once per +(edge_len, led_count) pair. + +These functions intentionally do NOT touch the mappers' state beyond what +the callers pass in, so they are trivially testable in isolation. +""" + +from __future__ import annotations + +from typing import Any, Callable, Dict, Hashable, Tuple + +import numpy as np + +# Cache value layout — kept as a tuple for the small per-frame cost of +# tuple unpacking vs the readability of a dataclass. The first two entries +# are the (edge_len, led_count) signature used to detect a re-build. +_CacheEntry = Tuple[ + int, # edge_len + int, # led_count + np.ndarray, # starts (int64, shape (led_count,)) + np.ndarray, # ends (int64, shape (led_count,)) + np.ndarray, # lengths (float32, shape (led_count, 1)) + np.ndarray, # cumsum_buf (float32, shape (edge_len + 1, 3)) + np.ndarray, # edge_1d_buf (float32, shape (edge_len, 3)) + np.ndarray, # sums_buf (float32, shape (led_count, 3)) + np.ndarray, # starts_buf (float32, shape (led_count, 3)) + np.ndarray, # out_uint8 (uint8, shape (led_count, 3)) +] + + +def _build_cache(edge_len: int, led_count: int) -> _CacheEntry: + """Pre-allocate all scratch buffers for one (edge_len, led_count) pair.""" + step = edge_len / led_count + boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64) + # Ensure monotonically increasing boundaries even when ``step`` < 1. + boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1) + np.minimum(boundaries, edge_len, out=boundaries) + starts = boundaries[:-1] + ends = boundaries[1:] + lengths = (ends - starts).reshape(-1, 1).astype(np.float32) + cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32) + edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32) + sums_buf = np.empty((led_count, 3), dtype=np.float32) + starts_buf = np.empty((led_count, 3), dtype=np.float32) + out_uint8 = np.empty((led_count, 3), dtype=np.uint8) + return ( + edge_len, + led_count, + starts, + ends, + lengths, + cumsum_buf, + edge_1d_buf, + sums_buf, + starts_buf, + out_uint8, + ) + + +def average_edge_to_leds( + edge_pixels: np.ndarray, + edge_name: str, + led_count: int, + cache: Dict[Hashable, _CacheEntry], + cache_key: Hashable, +) -> np.ndarray: + """Vectorised average colour per LED segment. + + ``edge_pixels`` is shape ``(H, W, 3)``. For top/bottom edges we average + over axis=0 (collapsing rows), then segment along the width; for + left/right edges we average over axis=1 then segment along the height. + + Returns a view into the caller-owned cache's ``out_uint8`` buffer — + do NOT retain the result across calls without copying. + """ + if edge_name in ("top", "bottom"): + axis = 0 + edge_len = edge_pixels.shape[1] + else: + axis = 1 + edge_len = edge_pixels.shape[0] + + entry = cache.get(cache_key) + if entry is None or entry[0] != edge_len or entry[1] != led_count: + entry = _build_cache(edge_len, led_count) + cache[cache_key] = entry + + ( + _, + _, + starts, + ends, + lengths, + cumsum_buf, + edge_1d_buf, + sums_buf, + starts_buf, + out_uint8, + ) = entry + + # Mean into pre-allocated buffer (no intermediate float64 array) + np.mean(edge_pixels, axis=axis, out=edge_1d_buf) + + # Cumulative sum so each LED segment's sum is two array lookups apart. + cumsum_buf[0] = 0 + np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:]) + + # segment_sum[i] = cumsum[ends[i]] - cumsum[starts[i]] + np.take(cumsum_buf, ends, axis=0, out=sums_buf) + np.take(cumsum_buf, starts, axis=0, out=starts_buf) + np.subtract(sums_buf, starts_buf, out=sums_buf) + np.divide(sums_buf, lengths, out=sums_buf) + np.clip(sums_buf, 0, 255, out=sums_buf) + np.copyto(out_uint8, sums_buf, casting="unsafe") + return out_uint8 + + +def fallback_edge_to_leds( + edge_pixels: np.ndarray, + edge_name: str, + led_count: int, + calc_color: Callable[[np.ndarray], Any], +) -> np.ndarray: + """Per-LED colour mapping for median / dominant modes. + + Iterates LED segments and delegates colour reduction to ``calc_color`` + (which is e.g. ``np.median`` for median mode, ``_dominant_colour`` for + dominant). Slower than ``average_edge_to_leds`` but supports any + reducer over the segment's pixels. + """ + if edge_name in ("top", "bottom"): + edge_len = edge_pixels.shape[1] + else: + edge_len = edge_pixels.shape[0] + + step = edge_len / led_count + result = np.empty((led_count, 3), dtype=np.uint8) + + for i in range(led_count): + start = int(i * step) + end = max(start + 1, int((i + 1) * step)) + end = min(end, edge_len) + + if edge_name in ("top", "bottom"): + segment = edge_pixels[:, start:end, :] + else: + segment = edge_pixels[start:end, :, :] + + result[i] = calc_color(segment) + + return result diff --git a/server/tests/core/capture/test_edge_interpolation.py b/server/tests/core/capture/test_edge_interpolation.py new file mode 100644 index 0000000..d00b260 --- /dev/null +++ b/server/tests/core/capture/test_edge_interpolation.py @@ -0,0 +1,127 @@ +"""Tests for the shared edge-to-LED interpolation kernels.""" + +from __future__ import annotations + +import numpy as np + +from ledgrab.core.capture.edge_interpolation import ( + average_edge_to_leds, + fallback_edge_to_leds, +) + + +# --------------------------------------------------------------------------- +# average_edge_to_leds +# --------------------------------------------------------------------------- + + +def test_average_top_edge_returns_uint8_per_led(): + """A solid-colour top edge maps to a solid-colour LED array.""" + edge = np.full((4, 100, 3), [200, 100, 50], dtype=np.uint8) + cache: dict = {} + out = average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top") + assert out.shape == (10, 3) + assert out.dtype == np.uint8 + # Each LED is the exact mean of its segment — which is uniform 200/100/50. + assert np.all(out[:, 0] == 200) + assert np.all(out[:, 1] == 100) + assert np.all(out[:, 2] == 50) + + +def test_average_left_edge_uses_axis_1(): + """A solid-colour LEFT edge (collapsing across columns) produces the same.""" + edge = np.full((100, 4, 3), [10, 20, 30], dtype=np.uint8) + cache: dict = {} + out = average_edge_to_leds(edge, "left", led_count=5, cache=cache, cache_key="left") + assert out.shape == (5, 3) + assert np.all(out[:, 0] == 10) + assert np.all(out[:, 1] == 20) + assert np.all(out[:, 2] == 30) + + +def test_average_cache_is_reused_across_calls(): + """Second call with the same (edge_len, led_count) does not rebuild the cache.""" + edge = np.full((4, 100, 3), 128, dtype=np.uint8) + cache: dict = {} + average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top") + entry_first = cache["top"] + average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top") + entry_second = cache["top"] + # Same tuple object → cache reused + assert entry_first is entry_second + + +def test_average_cache_rebuilds_when_signature_changes(): + edge = np.full((4, 100, 3), 128, dtype=np.uint8) + cache: dict = {} + average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top") + entry_first = cache["top"] + average_edge_to_leds(edge, "top", led_count=20, cache=cache, cache_key="top") + entry_second = cache["top"] + assert entry_first is not entry_second + assert entry_second[1] == 20 + + +def test_average_caches_keyed_independently(): + """Two cache keys produce independent cache entries.""" + edge = np.full((4, 100, 3), 128, dtype=np.uint8) + cache: dict = {} + average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="a") + average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="b") + assert "a" in cache and "b" in cache + assert cache["a"] is not cache["b"] + + +def test_average_gradient_edge_produces_gradient_output(): + """A 1D ramp across the edge averages to a ramp across LEDs.""" + edge = np.zeros((1, 100, 3), dtype=np.uint8) + edge[0, :, 0] = np.arange(100) # red ramps 0..99 across width + cache: dict = {} + out = average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top") + # First LED: average of positions 0..9 → 4.5 → uint8 → 4 + assert int(out[0, 0]) == 4 + # Last LED: average of positions 90..99 → 94.5 → uint8 → 94 + assert int(out[-1, 0]) == 94 + # Monotonic increase + assert all(out[i, 0] <= out[i + 1, 0] for i in range(9)) + + +# --------------------------------------------------------------------------- +# fallback_edge_to_leds +# --------------------------------------------------------------------------- + + +def test_fallback_uses_provided_calc_color(): + """A trivial calc_color that returns a constant maps every LED to that constant.""" + edge = np.full((4, 100, 3), [200, 100, 50], dtype=np.uint8) + out = fallback_edge_to_leds( + edge, "top", led_count=10, calc_color=lambda seg: np.array([7, 8, 9]) + ) + assert out.shape == (10, 3) + assert out.dtype == np.uint8 + assert np.all(out == [7, 8, 9]) + + +def test_fallback_segments_track_edge_position(): + """Each LED's calc_color receives the segment slice for its position.""" + edge = np.zeros((1, 10, 3), dtype=np.uint8) + edge[0, :, 0] = np.arange(10) + + seen_segments = [] + + def _spy(seg): + seen_segments.append(seg[:, :, 0].copy().tolist()) + return seg[0, 0] # return the first pixel + + fallback_edge_to_leds(edge, "top", led_count=5, calc_color=_spy) + # Five segments, two pixels each + assert len(seen_segments) == 5 + assert seen_segments[0] == [[0, 1]] + assert seen_segments[-1] == [[8, 9]] + + +def test_fallback_left_edge_segments_track_height(): + edge = np.zeros((10, 1, 3), dtype=np.uint8) + edge[:, 0, 0] = np.arange(10) + fallback_edge_to_leds(edge, "left", led_count=2, calc_color=lambda s: s[0, 0]) + # Should not raise; covered above shape-wise.