refactor(capture): lift duplicated edge-to-LED kernels into shared module
PixelMapper and AdvancedPixelMapper in calibration.py used to carry
byte-for-byte copies of two ~80-line numpy kernels (audit finding M4):
* the vectorised average-colour-per-LED path with its cumsum + take
scratch-buffer dance; and
* the per-LED fallback loop for median / dominant colour modes.
Lift both into a new ``core.capture.edge_interpolation`` module exposing
``average_edge_to_leds(edge_pixels, edge_name, led_count, cache,
cache_key)`` and ``fallback_edge_to_leds(edge_pixels, edge_name,
led_count, calc_color)``. The cache parameter is the caller-owned dict
(``self._edge_cache``) so allocations still happen once per
(edge_len, led_count) signature — the difference is that the
boundary-builder, the buffer set, and the inner numpy ops live in
exactly one place.
PixelMapper keys its cache by edge name (``"top"`` / ``"left"`` etc.);
AdvancedPixelMapper keys by line-index int (same dict, no collision).
Both mappers' ``_map_edge_average`` / ``_map_edge_fallback`` shrink to
single delegating lines.
Tests: 9 new kernel-level tests cover uint8 dtype + shape, the cache
reuse / rebuild contract, independent cache keying, a gradient input
producing a monotonic output, the calc_color callable contract for the
fallback path, and segment-position tracking for both axes. 30
existing calibration tests stay green; ruff clean.
This commit is contained in:
@@ -5,6 +5,10 @@ from typing import Dict, List, Literal, Set, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.capture.edge_interpolation import (
|
||||
average_edge_to_leds,
|
||||
fallback_edge_to_leds,
|
||||
)
|
||||
from ledgrab.core.capture.screen_capture import (
|
||||
BorderPixels,
|
||||
calculate_average_color,
|
||||
@@ -404,107 +408,17 @@ class PixelMapper:
|
||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||||
) -> np.ndarray:
|
||||
"""Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8."""
|
||||
if edge_name in ("top", "bottom"):
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
step = edge_len / led_count
|
||||
result = np.empty((led_count, 3), dtype=np.uint8)
|
||||
|
||||
for i in range(led_count):
|
||||
start = int(i * step)
|
||||
end = max(start + 1, int((i + 1) * step))
|
||||
end = min(end, edge_len)
|
||||
|
||||
if edge_name in ("top", "bottom"):
|
||||
segment = edge_pixels[:, start:end, :]
|
||||
else:
|
||||
segment = edge_pixels[start:end, :, :]
|
||||
|
||||
color = self._calc_color(segment)
|
||||
result[i] = color
|
||||
|
||||
return result
|
||||
return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color)
|
||||
|
||||
def _map_edge_average(
|
||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||||
) -> np.ndarray:
|
||||
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8.
|
||||
|
||||
Uses pre-allocated cumsum/mean buffers AND pre-allocated output
|
||||
buffers (lazy-initialized per edge). All per-frame numpy ops write
|
||||
in-place — zero allocations on the hot path.
|
||||
Scratch buffers are cached on ``self._edge_cache`` keyed by edge name;
|
||||
the shared kernel handles all allocations on first use.
|
||||
"""
|
||||
if edge_name in ("top", "bottom"):
|
||||
axis = 0
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
axis = 1
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
# Lazy-init / resize per-edge scratch buffers.
|
||||
# float32 is sufficient: max cumsum value is edge_len * 255 (≈2M @ 8K
|
||||
# screens) which fits exactly in float32's 24-bit mantissa. Halves
|
||||
# memory bandwidth on the hot reduction.
|
||||
cache = self._edge_cache.get(edge_name)
|
||||
if cache is None or cache[0] != edge_len or cache[1] != led_count:
|
||||
step = edge_len / led_count
|
||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
||||
np.minimum(boundaries, edge_len, out=boundaries)
|
||||
starts = boundaries[:-1]
|
||||
ends = boundaries[1:]
|
||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
|
||||
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
|
||||
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
|
||||
sums_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||
starts_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||
out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
|
||||
cache = (
|
||||
edge_len,
|
||||
led_count,
|
||||
starts,
|
||||
ends,
|
||||
lengths,
|
||||
cumsum_buf,
|
||||
edge_1d_buf,
|
||||
sums_buf,
|
||||
starts_buf,
|
||||
out_uint8,
|
||||
)
|
||||
self._edge_cache[edge_name] = cache
|
||||
|
||||
(
|
||||
_,
|
||||
_,
|
||||
starts,
|
||||
ends,
|
||||
lengths,
|
||||
cumsum_buf,
|
||||
edge_1d_buf,
|
||||
sums_buf,
|
||||
starts_buf,
|
||||
out_uint8,
|
||||
) = cache
|
||||
|
||||
# Mean into pre-allocated buffer (no intermediate float64 array)
|
||||
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
||||
|
||||
# Cumsum into pre-allocated buffer (cumsum_buf[0] left at 0 from init)
|
||||
cumsum_buf[0] = 0
|
||||
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
||||
|
||||
# segment_sums = cumsum_buf[ends] - cumsum_buf[starts] — but each
|
||||
# fancy-index expression allocates. np.take with ``out=`` writes
|
||||
# directly into our pre-allocated scratch.
|
||||
np.take(cumsum_buf, ends, axis=0, out=sums_buf)
|
||||
np.take(cumsum_buf, starts, axis=0, out=starts_buf)
|
||||
np.subtract(sums_buf, starts_buf, out=sums_buf)
|
||||
np.divide(sums_buf, lengths, out=sums_buf)
|
||||
np.clip(sums_buf, 0, 255, out=sums_buf)
|
||||
np.copyto(out_uint8, sums_buf, casting="unsafe")
|
||||
return out_uint8
|
||||
return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, edge_name)
|
||||
|
||||
def map_border_to_leds(self, border_pixels: BorderPixels) -> np.ndarray:
|
||||
"""Map screen border pixels to LED colors.
|
||||
@@ -669,64 +583,12 @@ class AdvancedPixelMapper:
|
||||
led_count: int,
|
||||
cache_key: int,
|
||||
) -> np.ndarray:
|
||||
"""Vectorized average-color mapping (same algo as PixelMapper)."""
|
||||
if edge_name in ("top", "bottom"):
|
||||
axis = 0
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
axis = 1
|
||||
edge_len = edge_pixels.shape[0]
|
||||
"""Vectorized average-color mapping; delegates to the shared kernel.
|
||||
|
||||
cache = self._edge_cache.get(cache_key)
|
||||
if cache is None or cache[0] != edge_len or cache[1] != led_count:
|
||||
step = edge_len / led_count
|
||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
||||
np.minimum(boundaries, edge_len, out=boundaries)
|
||||
starts = boundaries[:-1]
|
||||
ends = boundaries[1:]
|
||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
|
||||
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
|
||||
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
|
||||
sums_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||
starts_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||
out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
|
||||
cache = (
|
||||
edge_len,
|
||||
led_count,
|
||||
starts,
|
||||
ends,
|
||||
lengths,
|
||||
cumsum_buf,
|
||||
edge_1d_buf,
|
||||
sums_buf,
|
||||
starts_buf,
|
||||
out_uint8,
|
||||
)
|
||||
self._edge_cache[cache_key] = cache
|
||||
|
||||
(
|
||||
_,
|
||||
_,
|
||||
starts,
|
||||
ends,
|
||||
lengths,
|
||||
cumsum_buf,
|
||||
edge_1d_buf,
|
||||
sums_buf,
|
||||
starts_buf,
|
||||
out_uint8,
|
||||
) = cache
|
||||
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
||||
cumsum_buf[0] = 0
|
||||
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
||||
np.take(cumsum_buf, ends, axis=0, out=sums_buf)
|
||||
np.take(cumsum_buf, starts, axis=0, out=starts_buf)
|
||||
np.subtract(sums_buf, starts_buf, out=sums_buf)
|
||||
np.divide(sums_buf, lengths, out=sums_buf)
|
||||
np.clip(sums_buf, 0, 255, out=sums_buf)
|
||||
np.copyto(out_uint8, sums_buf, casting="unsafe")
|
||||
return out_uint8
|
||||
``cache_key`` is an integer (e.g. line index) so multiple per-line
|
||||
edges can share the same ``self._edge_cache`` dict without colliding.
|
||||
"""
|
||||
return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, cache_key)
|
||||
|
||||
def _map_edge_fallback(
|
||||
self,
|
||||
@@ -734,24 +596,8 @@ class AdvancedPixelMapper:
|
||||
edge_name: str,
|
||||
led_count: int,
|
||||
) -> np.ndarray:
|
||||
"""Per-LED color mapping for median/dominant modes."""
|
||||
if edge_name in ("top", "bottom"):
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
step = edge_len / led_count
|
||||
result = np.empty((led_count, 3), dtype=np.uint8)
|
||||
for i in range(led_count):
|
||||
start = int(i * step)
|
||||
end = max(start + 1, int((i + 1) * step))
|
||||
end = min(end, edge_len)
|
||||
if edge_name in ("top", "bottom"):
|
||||
segment = edge_pixels[:, start:end, :]
|
||||
else:
|
||||
segment = edge_pixels[start:end, :, :]
|
||||
result[i] = self._calc_color(segment)
|
||||
return result
|
||||
"""Per-LED color mapping for median/dominant modes; delegates to shared kernel."""
|
||||
return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color)
|
||||
|
||||
def map_lines_to_leds(self, frames: Dict[str, np.ndarray]) -> np.ndarray:
|
||||
"""Map multi-source frames to LED colors using calibration lines.
|
||||
|
||||
@@ -0,0 +1,163 @@
|
||||
"""Shared edge-to-LED interpolation kernels for PixelMapper variants.
|
||||
|
||||
``PixelMapper`` and ``AdvancedPixelMapper`` in ``calibration.py`` historically
|
||||
carried two byte-for-byte copies of:
|
||||
|
||||
* the fast vectorised "average across each LED segment" path
|
||||
(``_map_edge_average``) — ~80 lines of buffer-allocation + cumsum tricks; and
|
||||
* the per-LED-loop "median / dominant colour" path (``_map_edge_fallback``).
|
||||
|
||||
Lifting both kernels into pure functions removes the duplication and
|
||||
keeps the algorithms in one place. Each mapper owns its own scratch-buffer
|
||||
cache (keyed differently in the two cases — see callers); the functions
|
||||
accept that cache as an in/out dict so allocations still happen once per
|
||||
(edge_len, led_count) pair.
|
||||
|
||||
These functions intentionally do NOT touch the mappers' state beyond what
|
||||
the callers pass in, so they are trivially testable in isolation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Callable, Dict, Hashable, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
# Cache value layout — kept as a tuple for the small per-frame cost of
|
||||
# tuple unpacking vs the readability of a dataclass. The first two entries
|
||||
# are the (edge_len, led_count) signature used to detect a re-build.
|
||||
_CacheEntry = Tuple[
|
||||
int, # edge_len
|
||||
int, # led_count
|
||||
np.ndarray, # starts (int64, shape (led_count,))
|
||||
np.ndarray, # ends (int64, shape (led_count,))
|
||||
np.ndarray, # lengths (float32, shape (led_count, 1))
|
||||
np.ndarray, # cumsum_buf (float32, shape (edge_len + 1, 3))
|
||||
np.ndarray, # edge_1d_buf (float32, shape (edge_len, 3))
|
||||
np.ndarray, # sums_buf (float32, shape (led_count, 3))
|
||||
np.ndarray, # starts_buf (float32, shape (led_count, 3))
|
||||
np.ndarray, # out_uint8 (uint8, shape (led_count, 3))
|
||||
]
|
||||
|
||||
|
||||
def _build_cache(edge_len: int, led_count: int) -> _CacheEntry:
|
||||
"""Pre-allocate all scratch buffers for one (edge_len, led_count) pair."""
|
||||
step = edge_len / led_count
|
||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
||||
# Ensure monotonically increasing boundaries even when ``step`` < 1.
|
||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
||||
np.minimum(boundaries, edge_len, out=boundaries)
|
||||
starts = boundaries[:-1]
|
||||
ends = boundaries[1:]
|
||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
|
||||
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
|
||||
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
|
||||
sums_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||
starts_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||
out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
|
||||
return (
|
||||
edge_len,
|
||||
led_count,
|
||||
starts,
|
||||
ends,
|
||||
lengths,
|
||||
cumsum_buf,
|
||||
edge_1d_buf,
|
||||
sums_buf,
|
||||
starts_buf,
|
||||
out_uint8,
|
||||
)
|
||||
|
||||
|
||||
def average_edge_to_leds(
|
||||
edge_pixels: np.ndarray,
|
||||
edge_name: str,
|
||||
led_count: int,
|
||||
cache: Dict[Hashable, _CacheEntry],
|
||||
cache_key: Hashable,
|
||||
) -> np.ndarray:
|
||||
"""Vectorised average colour per LED segment.
|
||||
|
||||
``edge_pixels`` is shape ``(H, W, 3)``. For top/bottom edges we average
|
||||
over axis=0 (collapsing rows), then segment along the width; for
|
||||
left/right edges we average over axis=1 then segment along the height.
|
||||
|
||||
Returns a view into the caller-owned cache's ``out_uint8`` buffer —
|
||||
do NOT retain the result across calls without copying.
|
||||
"""
|
||||
if edge_name in ("top", "bottom"):
|
||||
axis = 0
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
axis = 1
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
entry = cache.get(cache_key)
|
||||
if entry is None or entry[0] != edge_len or entry[1] != led_count:
|
||||
entry = _build_cache(edge_len, led_count)
|
||||
cache[cache_key] = entry
|
||||
|
||||
(
|
||||
_,
|
||||
_,
|
||||
starts,
|
||||
ends,
|
||||
lengths,
|
||||
cumsum_buf,
|
||||
edge_1d_buf,
|
||||
sums_buf,
|
||||
starts_buf,
|
||||
out_uint8,
|
||||
) = entry
|
||||
|
||||
# Mean into pre-allocated buffer (no intermediate float64 array)
|
||||
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
||||
|
||||
# Cumulative sum so each LED segment's sum is two array lookups apart.
|
||||
cumsum_buf[0] = 0
|
||||
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
||||
|
||||
# segment_sum[i] = cumsum[ends[i]] - cumsum[starts[i]]
|
||||
np.take(cumsum_buf, ends, axis=0, out=sums_buf)
|
||||
np.take(cumsum_buf, starts, axis=0, out=starts_buf)
|
||||
np.subtract(sums_buf, starts_buf, out=sums_buf)
|
||||
np.divide(sums_buf, lengths, out=sums_buf)
|
||||
np.clip(sums_buf, 0, 255, out=sums_buf)
|
||||
np.copyto(out_uint8, sums_buf, casting="unsafe")
|
||||
return out_uint8
|
||||
|
||||
|
||||
def fallback_edge_to_leds(
|
||||
edge_pixels: np.ndarray,
|
||||
edge_name: str,
|
||||
led_count: int,
|
||||
calc_color: Callable[[np.ndarray], Any],
|
||||
) -> np.ndarray:
|
||||
"""Per-LED colour mapping for median / dominant modes.
|
||||
|
||||
Iterates LED segments and delegates colour reduction to ``calc_color``
|
||||
(which is e.g. ``np.median`` for median mode, ``_dominant_colour`` for
|
||||
dominant). Slower than ``average_edge_to_leds`` but supports any
|
||||
reducer over the segment's pixels.
|
||||
"""
|
||||
if edge_name in ("top", "bottom"):
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
step = edge_len / led_count
|
||||
result = np.empty((led_count, 3), dtype=np.uint8)
|
||||
|
||||
for i in range(led_count):
|
||||
start = int(i * step)
|
||||
end = max(start + 1, int((i + 1) * step))
|
||||
end = min(end, edge_len)
|
||||
|
||||
if edge_name in ("top", "bottom"):
|
||||
segment = edge_pixels[:, start:end, :]
|
||||
else:
|
||||
segment = edge_pixels[start:end, :, :]
|
||||
|
||||
result[i] = calc_color(segment)
|
||||
|
||||
return result
|
||||
Reference in New Issue
Block a user