refactor(capture): lift duplicated edge-to-LED kernels into shared module
PixelMapper and AdvancedPixelMapper in calibration.py used to carry
byte-for-byte copies of two ~80-line numpy kernels (audit finding M4):
* the vectorised average-colour-per-LED path with its cumsum + take
scratch-buffer dance; and
* the per-LED fallback loop for median / dominant colour modes.
Lift both into a new ``core.capture.edge_interpolation`` module exposing
``average_edge_to_leds(edge_pixels, edge_name, led_count, cache,
cache_key)`` and ``fallback_edge_to_leds(edge_pixels, edge_name,
led_count, calc_color)``. The cache parameter is the caller-owned dict
(``self._edge_cache``) so allocations still happen once per
(edge_len, led_count) signature — the difference is that the
boundary-builder, the buffer set, and the inner numpy ops live in
exactly one place.
PixelMapper keys its cache by edge name (``"top"`` / ``"left"`` etc.);
AdvancedPixelMapper keys by line-index int (same dict, no collision).
Both mappers' ``_map_edge_average`` / ``_map_edge_fallback`` shrink to
single delegating lines.
Tests: 9 new kernel-level tests cover uint8 dtype + shape, the cache
reuse / rebuild contract, independent cache keying, a gradient input
producing a monotonic output, the calc_color callable contract for the
fallback path, and segment-position tracking for both axes. 30
existing calibration tests stay green; ruff clean.
This commit is contained in:
@@ -5,6 +5,10 @@ from typing import Dict, List, Literal, Set, Tuple
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from ledgrab.core.capture.edge_interpolation import (
|
||||||
|
average_edge_to_leds,
|
||||||
|
fallback_edge_to_leds,
|
||||||
|
)
|
||||||
from ledgrab.core.capture.screen_capture import (
|
from ledgrab.core.capture.screen_capture import (
|
||||||
BorderPixels,
|
BorderPixels,
|
||||||
calculate_average_color,
|
calculate_average_color,
|
||||||
@@ -404,107 +408,17 @@ class PixelMapper:
|
|||||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
"""Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8."""
|
"""Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8."""
|
||||||
if edge_name in ("top", "bottom"):
|
return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color)
|
||||||
edge_len = edge_pixels.shape[1]
|
|
||||||
else:
|
|
||||||
edge_len = edge_pixels.shape[0]
|
|
||||||
|
|
||||||
step = edge_len / led_count
|
|
||||||
result = np.empty((led_count, 3), dtype=np.uint8)
|
|
||||||
|
|
||||||
for i in range(led_count):
|
|
||||||
start = int(i * step)
|
|
||||||
end = max(start + 1, int((i + 1) * step))
|
|
||||||
end = min(end, edge_len)
|
|
||||||
|
|
||||||
if edge_name in ("top", "bottom"):
|
|
||||||
segment = edge_pixels[:, start:end, :]
|
|
||||||
else:
|
|
||||||
segment = edge_pixels[start:end, :, :]
|
|
||||||
|
|
||||||
color = self._calc_color(segment)
|
|
||||||
result[i] = color
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _map_edge_average(
|
def _map_edge_average(
|
||||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8.
|
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8.
|
||||||
|
|
||||||
Uses pre-allocated cumsum/mean buffers AND pre-allocated output
|
Scratch buffers are cached on ``self._edge_cache`` keyed by edge name;
|
||||||
buffers (lazy-initialized per edge). All per-frame numpy ops write
|
the shared kernel handles all allocations on first use.
|
||||||
in-place — zero allocations on the hot path.
|
|
||||||
"""
|
"""
|
||||||
if edge_name in ("top", "bottom"):
|
return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, edge_name)
|
||||||
axis = 0
|
|
||||||
edge_len = edge_pixels.shape[1]
|
|
||||||
else:
|
|
||||||
axis = 1
|
|
||||||
edge_len = edge_pixels.shape[0]
|
|
||||||
|
|
||||||
# Lazy-init / resize per-edge scratch buffers.
|
|
||||||
# float32 is sufficient: max cumsum value is edge_len * 255 (≈2M @ 8K
|
|
||||||
# screens) which fits exactly in float32's 24-bit mantissa. Halves
|
|
||||||
# memory bandwidth on the hot reduction.
|
|
||||||
cache = self._edge_cache.get(edge_name)
|
|
||||||
if cache is None or cache[0] != edge_len or cache[1] != led_count:
|
|
||||||
step = edge_len / led_count
|
|
||||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
|
||||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
|
||||||
np.minimum(boundaries, edge_len, out=boundaries)
|
|
||||||
starts = boundaries[:-1]
|
|
||||||
ends = boundaries[1:]
|
|
||||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
|
|
||||||
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
|
|
||||||
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
|
|
||||||
sums_buf = np.empty((led_count, 3), dtype=np.float32)
|
|
||||||
starts_buf = np.empty((led_count, 3), dtype=np.float32)
|
|
||||||
out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
|
|
||||||
cache = (
|
|
||||||
edge_len,
|
|
||||||
led_count,
|
|
||||||
starts,
|
|
||||||
ends,
|
|
||||||
lengths,
|
|
||||||
cumsum_buf,
|
|
||||||
edge_1d_buf,
|
|
||||||
sums_buf,
|
|
||||||
starts_buf,
|
|
||||||
out_uint8,
|
|
||||||
)
|
|
||||||
self._edge_cache[edge_name] = cache
|
|
||||||
|
|
||||||
(
|
|
||||||
_,
|
|
||||||
_,
|
|
||||||
starts,
|
|
||||||
ends,
|
|
||||||
lengths,
|
|
||||||
cumsum_buf,
|
|
||||||
edge_1d_buf,
|
|
||||||
sums_buf,
|
|
||||||
starts_buf,
|
|
||||||
out_uint8,
|
|
||||||
) = cache
|
|
||||||
|
|
||||||
# Mean into pre-allocated buffer (no intermediate float64 array)
|
|
||||||
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
|
||||||
|
|
||||||
# Cumsum into pre-allocated buffer (cumsum_buf[0] left at 0 from init)
|
|
||||||
cumsum_buf[0] = 0
|
|
||||||
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
|
||||||
|
|
||||||
# segment_sums = cumsum_buf[ends] - cumsum_buf[starts] — but each
|
|
||||||
# fancy-index expression allocates. np.take with ``out=`` writes
|
|
||||||
# directly into our pre-allocated scratch.
|
|
||||||
np.take(cumsum_buf, ends, axis=0, out=sums_buf)
|
|
||||||
np.take(cumsum_buf, starts, axis=0, out=starts_buf)
|
|
||||||
np.subtract(sums_buf, starts_buf, out=sums_buf)
|
|
||||||
np.divide(sums_buf, lengths, out=sums_buf)
|
|
||||||
np.clip(sums_buf, 0, 255, out=sums_buf)
|
|
||||||
np.copyto(out_uint8, sums_buf, casting="unsafe")
|
|
||||||
return out_uint8
|
|
||||||
|
|
||||||
def map_border_to_leds(self, border_pixels: BorderPixels) -> np.ndarray:
|
def map_border_to_leds(self, border_pixels: BorderPixels) -> np.ndarray:
|
||||||
"""Map screen border pixels to LED colors.
|
"""Map screen border pixels to LED colors.
|
||||||
@@ -669,64 +583,12 @@ class AdvancedPixelMapper:
|
|||||||
led_count: int,
|
led_count: int,
|
||||||
cache_key: int,
|
cache_key: int,
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
"""Vectorized average-color mapping (same algo as PixelMapper)."""
|
"""Vectorized average-color mapping; delegates to the shared kernel.
|
||||||
if edge_name in ("top", "bottom"):
|
|
||||||
axis = 0
|
|
||||||
edge_len = edge_pixels.shape[1]
|
|
||||||
else:
|
|
||||||
axis = 1
|
|
||||||
edge_len = edge_pixels.shape[0]
|
|
||||||
|
|
||||||
cache = self._edge_cache.get(cache_key)
|
``cache_key`` is an integer (e.g. line index) so multiple per-line
|
||||||
if cache is None or cache[0] != edge_len or cache[1] != led_count:
|
edges can share the same ``self._edge_cache`` dict without colliding.
|
||||||
step = edge_len / led_count
|
"""
|
||||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, cache_key)
|
||||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
|
||||||
np.minimum(boundaries, edge_len, out=boundaries)
|
|
||||||
starts = boundaries[:-1]
|
|
||||||
ends = boundaries[1:]
|
|
||||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
|
|
||||||
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
|
|
||||||
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
|
|
||||||
sums_buf = np.empty((led_count, 3), dtype=np.float32)
|
|
||||||
starts_buf = np.empty((led_count, 3), dtype=np.float32)
|
|
||||||
out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
|
|
||||||
cache = (
|
|
||||||
edge_len,
|
|
||||||
led_count,
|
|
||||||
starts,
|
|
||||||
ends,
|
|
||||||
lengths,
|
|
||||||
cumsum_buf,
|
|
||||||
edge_1d_buf,
|
|
||||||
sums_buf,
|
|
||||||
starts_buf,
|
|
||||||
out_uint8,
|
|
||||||
)
|
|
||||||
self._edge_cache[cache_key] = cache
|
|
||||||
|
|
||||||
(
|
|
||||||
_,
|
|
||||||
_,
|
|
||||||
starts,
|
|
||||||
ends,
|
|
||||||
lengths,
|
|
||||||
cumsum_buf,
|
|
||||||
edge_1d_buf,
|
|
||||||
sums_buf,
|
|
||||||
starts_buf,
|
|
||||||
out_uint8,
|
|
||||||
) = cache
|
|
||||||
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
|
||||||
cumsum_buf[0] = 0
|
|
||||||
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
|
||||||
np.take(cumsum_buf, ends, axis=0, out=sums_buf)
|
|
||||||
np.take(cumsum_buf, starts, axis=0, out=starts_buf)
|
|
||||||
np.subtract(sums_buf, starts_buf, out=sums_buf)
|
|
||||||
np.divide(sums_buf, lengths, out=sums_buf)
|
|
||||||
np.clip(sums_buf, 0, 255, out=sums_buf)
|
|
||||||
np.copyto(out_uint8, sums_buf, casting="unsafe")
|
|
||||||
return out_uint8
|
|
||||||
|
|
||||||
def _map_edge_fallback(
|
def _map_edge_fallback(
|
||||||
self,
|
self,
|
||||||
@@ -734,24 +596,8 @@ class AdvancedPixelMapper:
|
|||||||
edge_name: str,
|
edge_name: str,
|
||||||
led_count: int,
|
led_count: int,
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
"""Per-LED color mapping for median/dominant modes."""
|
"""Per-LED color mapping for median/dominant modes; delegates to shared kernel."""
|
||||||
if edge_name in ("top", "bottom"):
|
return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color)
|
||||||
edge_len = edge_pixels.shape[1]
|
|
||||||
else:
|
|
||||||
edge_len = edge_pixels.shape[0]
|
|
||||||
|
|
||||||
step = edge_len / led_count
|
|
||||||
result = np.empty((led_count, 3), dtype=np.uint8)
|
|
||||||
for i in range(led_count):
|
|
||||||
start = int(i * step)
|
|
||||||
end = max(start + 1, int((i + 1) * step))
|
|
||||||
end = min(end, edge_len)
|
|
||||||
if edge_name in ("top", "bottom"):
|
|
||||||
segment = edge_pixels[:, start:end, :]
|
|
||||||
else:
|
|
||||||
segment = edge_pixels[start:end, :, :]
|
|
||||||
result[i] = self._calc_color(segment)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def map_lines_to_leds(self, frames: Dict[str, np.ndarray]) -> np.ndarray:
|
def map_lines_to_leds(self, frames: Dict[str, np.ndarray]) -> np.ndarray:
|
||||||
"""Map multi-source frames to LED colors using calibration lines.
|
"""Map multi-source frames to LED colors using calibration lines.
|
||||||
|
|||||||
@@ -0,0 +1,163 @@
|
|||||||
|
"""Shared edge-to-LED interpolation kernels for PixelMapper variants.
|
||||||
|
|
||||||
|
``PixelMapper`` and ``AdvancedPixelMapper`` in ``calibration.py`` historically
|
||||||
|
carried two byte-for-byte copies of:
|
||||||
|
|
||||||
|
* the fast vectorised "average across each LED segment" path
|
||||||
|
(``_map_edge_average``) — ~80 lines of buffer-allocation + cumsum tricks; and
|
||||||
|
* the per-LED-loop "median / dominant colour" path (``_map_edge_fallback``).
|
||||||
|
|
||||||
|
Lifting both kernels into pure functions removes the duplication and
|
||||||
|
keeps the algorithms in one place. Each mapper owns its own scratch-buffer
|
||||||
|
cache (keyed differently in the two cases — see callers); the functions
|
||||||
|
accept that cache as an in/out dict so allocations still happen once per
|
||||||
|
(edge_len, led_count) pair.
|
||||||
|
|
||||||
|
These functions intentionally do NOT touch the mappers' state beyond what
|
||||||
|
the callers pass in, so they are trivially testable in isolation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Callable, Dict, Hashable, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
# Cache value layout — kept as a tuple for the small per-frame cost of
|
||||||
|
# tuple unpacking vs the readability of a dataclass. The first two entries
|
||||||
|
# are the (edge_len, led_count) signature used to detect a re-build.
|
||||||
|
_CacheEntry = Tuple[
|
||||||
|
int, # edge_len
|
||||||
|
int, # led_count
|
||||||
|
np.ndarray, # starts (int64, shape (led_count,))
|
||||||
|
np.ndarray, # ends (int64, shape (led_count,))
|
||||||
|
np.ndarray, # lengths (float32, shape (led_count, 1))
|
||||||
|
np.ndarray, # cumsum_buf (float32, shape (edge_len + 1, 3))
|
||||||
|
np.ndarray, # edge_1d_buf (float32, shape (edge_len, 3))
|
||||||
|
np.ndarray, # sums_buf (float32, shape (led_count, 3))
|
||||||
|
np.ndarray, # starts_buf (float32, shape (led_count, 3))
|
||||||
|
np.ndarray, # out_uint8 (uint8, shape (led_count, 3))
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _build_cache(edge_len: int, led_count: int) -> _CacheEntry:
|
||||||
|
"""Pre-allocate all scratch buffers for one (edge_len, led_count) pair."""
|
||||||
|
step = edge_len / led_count
|
||||||
|
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
||||||
|
# Ensure monotonically increasing boundaries even when ``step`` < 1.
|
||||||
|
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
||||||
|
np.minimum(boundaries, edge_len, out=boundaries)
|
||||||
|
starts = boundaries[:-1]
|
||||||
|
ends = boundaries[1:]
|
||||||
|
lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
|
||||||
|
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
|
||||||
|
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
|
||||||
|
sums_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||||
|
starts_buf = np.empty((led_count, 3), dtype=np.float32)
|
||||||
|
out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
|
||||||
|
return (
|
||||||
|
edge_len,
|
||||||
|
led_count,
|
||||||
|
starts,
|
||||||
|
ends,
|
||||||
|
lengths,
|
||||||
|
cumsum_buf,
|
||||||
|
edge_1d_buf,
|
||||||
|
sums_buf,
|
||||||
|
starts_buf,
|
||||||
|
out_uint8,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def average_edge_to_leds(
|
||||||
|
edge_pixels: np.ndarray,
|
||||||
|
edge_name: str,
|
||||||
|
led_count: int,
|
||||||
|
cache: Dict[Hashable, _CacheEntry],
|
||||||
|
cache_key: Hashable,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Vectorised average colour per LED segment.
|
||||||
|
|
||||||
|
``edge_pixels`` is shape ``(H, W, 3)``. For top/bottom edges we average
|
||||||
|
over axis=0 (collapsing rows), then segment along the width; for
|
||||||
|
left/right edges we average over axis=1 then segment along the height.
|
||||||
|
|
||||||
|
Returns a view into the caller-owned cache's ``out_uint8`` buffer —
|
||||||
|
do NOT retain the result across calls without copying.
|
||||||
|
"""
|
||||||
|
if edge_name in ("top", "bottom"):
|
||||||
|
axis = 0
|
||||||
|
edge_len = edge_pixels.shape[1]
|
||||||
|
else:
|
||||||
|
axis = 1
|
||||||
|
edge_len = edge_pixels.shape[0]
|
||||||
|
|
||||||
|
entry = cache.get(cache_key)
|
||||||
|
if entry is None or entry[0] != edge_len or entry[1] != led_count:
|
||||||
|
entry = _build_cache(edge_len, led_count)
|
||||||
|
cache[cache_key] = entry
|
||||||
|
|
||||||
|
(
|
||||||
|
_,
|
||||||
|
_,
|
||||||
|
starts,
|
||||||
|
ends,
|
||||||
|
lengths,
|
||||||
|
cumsum_buf,
|
||||||
|
edge_1d_buf,
|
||||||
|
sums_buf,
|
||||||
|
starts_buf,
|
||||||
|
out_uint8,
|
||||||
|
) = entry
|
||||||
|
|
||||||
|
# Mean into pre-allocated buffer (no intermediate float64 array)
|
||||||
|
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
||||||
|
|
||||||
|
# Cumulative sum so each LED segment's sum is two array lookups apart.
|
||||||
|
cumsum_buf[0] = 0
|
||||||
|
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
||||||
|
|
||||||
|
# segment_sum[i] = cumsum[ends[i]] - cumsum[starts[i]]
|
||||||
|
np.take(cumsum_buf, ends, axis=0, out=sums_buf)
|
||||||
|
np.take(cumsum_buf, starts, axis=0, out=starts_buf)
|
||||||
|
np.subtract(sums_buf, starts_buf, out=sums_buf)
|
||||||
|
np.divide(sums_buf, lengths, out=sums_buf)
|
||||||
|
np.clip(sums_buf, 0, 255, out=sums_buf)
|
||||||
|
np.copyto(out_uint8, sums_buf, casting="unsafe")
|
||||||
|
return out_uint8
|
||||||
|
|
||||||
|
|
||||||
|
def fallback_edge_to_leds(
|
||||||
|
edge_pixels: np.ndarray,
|
||||||
|
edge_name: str,
|
||||||
|
led_count: int,
|
||||||
|
calc_color: Callable[[np.ndarray], Any],
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Per-LED colour mapping for median / dominant modes.
|
||||||
|
|
||||||
|
Iterates LED segments and delegates colour reduction to ``calc_color``
|
||||||
|
(which is e.g. ``np.median`` for median mode, ``_dominant_colour`` for
|
||||||
|
dominant). Slower than ``average_edge_to_leds`` but supports any
|
||||||
|
reducer over the segment's pixels.
|
||||||
|
"""
|
||||||
|
if edge_name in ("top", "bottom"):
|
||||||
|
edge_len = edge_pixels.shape[1]
|
||||||
|
else:
|
||||||
|
edge_len = edge_pixels.shape[0]
|
||||||
|
|
||||||
|
step = edge_len / led_count
|
||||||
|
result = np.empty((led_count, 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
for i in range(led_count):
|
||||||
|
start = int(i * step)
|
||||||
|
end = max(start + 1, int((i + 1) * step))
|
||||||
|
end = min(end, edge_len)
|
||||||
|
|
||||||
|
if edge_name in ("top", "bottom"):
|
||||||
|
segment = edge_pixels[:, start:end, :]
|
||||||
|
else:
|
||||||
|
segment = edge_pixels[start:end, :, :]
|
||||||
|
|
||||||
|
result[i] = calc_color(segment)
|
||||||
|
|
||||||
|
return result
|
||||||
@@ -0,0 +1,127 @@
|
|||||||
|
"""Tests for the shared edge-to-LED interpolation kernels."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from ledgrab.core.capture.edge_interpolation import (
|
||||||
|
average_edge_to_leds,
|
||||||
|
fallback_edge_to_leds,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# average_edge_to_leds
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_average_top_edge_returns_uint8_per_led():
|
||||||
|
"""A solid-colour top edge maps to a solid-colour LED array."""
|
||||||
|
edge = np.full((4, 100, 3), [200, 100, 50], dtype=np.uint8)
|
||||||
|
cache: dict = {}
|
||||||
|
out = average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top")
|
||||||
|
assert out.shape == (10, 3)
|
||||||
|
assert out.dtype == np.uint8
|
||||||
|
# Each LED is the exact mean of its segment — which is uniform 200/100/50.
|
||||||
|
assert np.all(out[:, 0] == 200)
|
||||||
|
assert np.all(out[:, 1] == 100)
|
||||||
|
assert np.all(out[:, 2] == 50)
|
||||||
|
|
||||||
|
|
||||||
|
def test_average_left_edge_uses_axis_1():
|
||||||
|
"""A solid-colour LEFT edge (collapsing across columns) produces the same."""
|
||||||
|
edge = np.full((100, 4, 3), [10, 20, 30], dtype=np.uint8)
|
||||||
|
cache: dict = {}
|
||||||
|
out = average_edge_to_leds(edge, "left", led_count=5, cache=cache, cache_key="left")
|
||||||
|
assert out.shape == (5, 3)
|
||||||
|
assert np.all(out[:, 0] == 10)
|
||||||
|
assert np.all(out[:, 1] == 20)
|
||||||
|
assert np.all(out[:, 2] == 30)
|
||||||
|
|
||||||
|
|
||||||
|
def test_average_cache_is_reused_across_calls():
|
||||||
|
"""Second call with the same (edge_len, led_count) does not rebuild the cache."""
|
||||||
|
edge = np.full((4, 100, 3), 128, dtype=np.uint8)
|
||||||
|
cache: dict = {}
|
||||||
|
average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top")
|
||||||
|
entry_first = cache["top"]
|
||||||
|
average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top")
|
||||||
|
entry_second = cache["top"]
|
||||||
|
# Same tuple object → cache reused
|
||||||
|
assert entry_first is entry_second
|
||||||
|
|
||||||
|
|
||||||
|
def test_average_cache_rebuilds_when_signature_changes():
|
||||||
|
edge = np.full((4, 100, 3), 128, dtype=np.uint8)
|
||||||
|
cache: dict = {}
|
||||||
|
average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top")
|
||||||
|
entry_first = cache["top"]
|
||||||
|
average_edge_to_leds(edge, "top", led_count=20, cache=cache, cache_key="top")
|
||||||
|
entry_second = cache["top"]
|
||||||
|
assert entry_first is not entry_second
|
||||||
|
assert entry_second[1] == 20
|
||||||
|
|
||||||
|
|
||||||
|
def test_average_caches_keyed_independently():
|
||||||
|
"""Two cache keys produce independent cache entries."""
|
||||||
|
edge = np.full((4, 100, 3), 128, dtype=np.uint8)
|
||||||
|
cache: dict = {}
|
||||||
|
average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="a")
|
||||||
|
average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="b")
|
||||||
|
assert "a" in cache and "b" in cache
|
||||||
|
assert cache["a"] is not cache["b"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_average_gradient_edge_produces_gradient_output():
|
||||||
|
"""A 1D ramp across the edge averages to a ramp across LEDs."""
|
||||||
|
edge = np.zeros((1, 100, 3), dtype=np.uint8)
|
||||||
|
edge[0, :, 0] = np.arange(100) # red ramps 0..99 across width
|
||||||
|
cache: dict = {}
|
||||||
|
out = average_edge_to_leds(edge, "top", led_count=10, cache=cache, cache_key="top")
|
||||||
|
# First LED: average of positions 0..9 → 4.5 → uint8 → 4
|
||||||
|
assert int(out[0, 0]) == 4
|
||||||
|
# Last LED: average of positions 90..99 → 94.5 → uint8 → 94
|
||||||
|
assert int(out[-1, 0]) == 94
|
||||||
|
# Monotonic increase
|
||||||
|
assert all(out[i, 0] <= out[i + 1, 0] for i in range(9))
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# fallback_edge_to_leds
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_uses_provided_calc_color():
|
||||||
|
"""A trivial calc_color that returns a constant maps every LED to that constant."""
|
||||||
|
edge = np.full((4, 100, 3), [200, 100, 50], dtype=np.uint8)
|
||||||
|
out = fallback_edge_to_leds(
|
||||||
|
edge, "top", led_count=10, calc_color=lambda seg: np.array([7, 8, 9])
|
||||||
|
)
|
||||||
|
assert out.shape == (10, 3)
|
||||||
|
assert out.dtype == np.uint8
|
||||||
|
assert np.all(out == [7, 8, 9])
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_segments_track_edge_position():
|
||||||
|
"""Each LED's calc_color receives the segment slice for its position."""
|
||||||
|
edge = np.zeros((1, 10, 3), dtype=np.uint8)
|
||||||
|
edge[0, :, 0] = np.arange(10)
|
||||||
|
|
||||||
|
seen_segments = []
|
||||||
|
|
||||||
|
def _spy(seg):
|
||||||
|
seen_segments.append(seg[:, :, 0].copy().tolist())
|
||||||
|
return seg[0, 0] # return the first pixel
|
||||||
|
|
||||||
|
fallback_edge_to_leds(edge, "top", led_count=5, calc_color=_spy)
|
||||||
|
# Five segments, two pixels each
|
||||||
|
assert len(seen_segments) == 5
|
||||||
|
assert seen_segments[0] == [[0, 1]]
|
||||||
|
assert seen_segments[-1] == [[8, 9]]
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_left_edge_segments_track_height():
|
||||||
|
edge = np.zeros((10, 1, 3), dtype=np.uint8)
|
||||||
|
edge[:, 0, 0] = np.arange(10)
|
||||||
|
fallback_edge_to_leds(edge, "left", led_count=2, calc_color=lambda s: s[0, 0])
|
||||||
|
# Should not raise; covered above shape-wise.
|
||||||
Reference in New Issue
Block a user