5fec8db901
PixelMapper and AdvancedPixelMapper in calibration.py used to carry
byte-for-byte copies of two ~80-line numpy kernels (audit finding M4):
* the vectorised average-colour-per-LED path with its cumsum + take
scratch-buffer dance; and
* the per-LED fallback loop for median / dominant colour modes.
Lift both into a new ``core.capture.edge_interpolation`` module exposing
``average_edge_to_leds(edge_pixels, edge_name, led_count, cache,
cache_key)`` and ``fallback_edge_to_leds(edge_pixels, edge_name,
led_count, calc_color)``. The cache parameter is the caller-owned dict
(``self._edge_cache``) so allocations still happen once per
(edge_len, led_count) signature — the difference is that the
boundary-builder, the buffer set, and the inner numpy ops live in
exactly one place.
PixelMapper keys its cache by edge name (``"top"`` / ``"left"`` etc.);
AdvancedPixelMapper keys by line-index int (same dict, no collision).
Both mappers' ``_map_edge_average`` / ``_map_edge_fallback`` shrink to
single delegating lines.
Tests: 9 new kernel-level tests cover uint8 dtype + shape, the cache
reuse / rebuild contract, independent cache keying, a gradient input
producing a monotonic output, the calc_color callable contract for the
fallback path, and segment-position tracking for both axes. 30
existing calibration tests stay green; ruff clean.
874 lines
32 KiB
Python
874 lines
32 KiB
Python
"""Calibration system for mapping screen pixels to LED positions."""
|
||
|
||
from dataclasses import dataclass, field
|
||
from typing import Dict, List, Literal, Set, Tuple
|
||
|
||
import numpy as np
|
||
|
||
from ledgrab.core.capture.edge_interpolation import (
|
||
average_edge_to_leds,
|
||
fallback_edge_to_leds,
|
||
)
|
||
from ledgrab.core.capture.screen_capture import (
|
||
BorderPixels,
|
||
calculate_average_color,
|
||
calculate_median_color,
|
||
calculate_dominant_color,
|
||
)
|
||
from ledgrab.utils import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
# Edge traversal order for each (start_position, layout) combination.
|
||
# Determines which edge comes first when walking around the screen.
|
||
EDGE_ORDER: Dict[Tuple[str, str], List[str]] = {
|
||
("bottom_left", "clockwise"): ["left", "top", "right", "bottom"],
|
||
("bottom_left", "counterclockwise"): ["bottom", "right", "top", "left"],
|
||
("bottom_right", "clockwise"): ["bottom", "left", "top", "right"],
|
||
("bottom_right", "counterclockwise"): ["right", "top", "left", "bottom"],
|
||
("top_left", "clockwise"): ["top", "right", "bottom", "left"],
|
||
("top_left", "counterclockwise"): ["left", "bottom", "right", "top"],
|
||
("top_right", "clockwise"): ["right", "bottom", "left", "top"],
|
||
("top_right", "counterclockwise"): ["top", "left", "bottom", "right"],
|
||
}
|
||
|
||
# Whether LEDs are reversed on each edge for each (start_position, layout) combination.
|
||
EDGE_REVERSE: Dict[Tuple[str, str], Dict[str, bool]] = {
|
||
("bottom_left", "clockwise"): {"left": True, "top": False, "right": False, "bottom": True},
|
||
("bottom_left", "counterclockwise"): {
|
||
"bottom": False,
|
||
"right": True,
|
||
"top": True,
|
||
"left": False,
|
||
},
|
||
("bottom_right", "clockwise"): {"bottom": True, "left": True, "top": False, "right": False},
|
||
("bottom_right", "counterclockwise"): {
|
||
"right": True,
|
||
"top": True,
|
||
"left": False,
|
||
"bottom": False,
|
||
},
|
||
("top_left", "clockwise"): {"top": False, "right": False, "bottom": True, "left": True},
|
||
("top_left", "counterclockwise"): {"left": False, "bottom": False, "right": True, "top": True},
|
||
("top_right", "clockwise"): {"right": False, "bottom": True, "left": True, "top": False},
|
||
("top_right", "counterclockwise"): {"top": True, "left": False, "bottom": False, "right": True},
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class CalibrationSegment:
|
||
"""Configuration for one segment of the LED strip."""
|
||
|
||
edge: Literal["top", "right", "bottom", "left"]
|
||
led_start: int
|
||
led_count: int
|
||
reverse: bool = False
|
||
|
||
|
||
@dataclass
|
||
class CalibrationLine:
|
||
"""One LED line in advanced calibration — references one picture source edge."""
|
||
|
||
picture_source_id: str
|
||
edge: Literal["top", "right", "bottom", "left"]
|
||
led_count: int
|
||
span_start: float = 0.0
|
||
span_end: float = 1.0
|
||
reverse: bool = False
|
||
border_width: int = 10
|
||
|
||
|
||
@dataclass
|
||
class CalibrationConfig:
|
||
"""Complete calibration configuration.
|
||
|
||
Stores only the core parameters. Segments (with led_start, reverse, edge order)
|
||
are derived at runtime via the `segments` property.
|
||
"""
|
||
|
||
# Mode: "simple" = 4-edge model (backward compat), "advanced" = generic line list
|
||
mode: Literal["simple", "advanced"] = "simple"
|
||
# Advanced mode: ordered list of CalibrationLine objects (ignored in simple mode)
|
||
lines: List[CalibrationLine] = field(default_factory=list)
|
||
|
||
# Simple mode fields (also used as defaults for CalibrationConfig constructor)
|
||
layout: Literal["clockwise", "counterclockwise"] = "clockwise"
|
||
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"] = "bottom_left"
|
||
offset: int = 0
|
||
leds_top: int = 0
|
||
leds_right: int = 0
|
||
leds_bottom: int = 0
|
||
leds_left: int = 0
|
||
# Per-edge span: fraction of screen side covered by LEDs (0.0–1.0)
|
||
span_top_start: float = 0.0
|
||
span_top_end: float = 1.0
|
||
span_right_start: float = 0.0
|
||
span_right_end: float = 1.0
|
||
span_bottom_start: float = 0.0
|
||
span_bottom_end: float = 1.0
|
||
span_left_start: float = 0.0
|
||
span_left_end: float = 1.0
|
||
# Skip LEDs: black out N LEDs at the start/end of the strip
|
||
skip_leds_start: int = 0
|
||
skip_leds_end: int = 0
|
||
# Border width: how many pixels from the screen edge to sample
|
||
border_width: int = 10
|
||
|
||
def build_segments(self) -> List[CalibrationSegment]:
|
||
"""Derive segment list from core parameters."""
|
||
key = (self.start_position, self.layout)
|
||
edge_order = EDGE_ORDER.get(key, ["bottom", "right", "top", "left"])
|
||
reverse_map = EDGE_REVERSE.get(key, {})
|
||
|
||
led_counts = {
|
||
"top": self.leds_top,
|
||
"right": self.leds_right,
|
||
"bottom": self.leds_bottom,
|
||
"left": self.leds_left,
|
||
}
|
||
|
||
segments = []
|
||
led_start = 0
|
||
for edge in edge_order:
|
||
count = led_counts[edge]
|
||
if count > 0:
|
||
segments.append(
|
||
CalibrationSegment(
|
||
edge=edge,
|
||
led_start=led_start,
|
||
led_count=count,
|
||
reverse=reverse_map.get(edge, False),
|
||
)
|
||
)
|
||
led_start += count
|
||
|
||
return segments
|
||
|
||
def __post_init__(self):
|
||
self._cached_segments: List[CalibrationSegment] | None = None
|
||
|
||
@property
|
||
def segments(self) -> List[CalibrationSegment]:
|
||
"""Get derived segment list (cached after first call)."""
|
||
if self._cached_segments is None:
|
||
self._cached_segments = self.build_segments()
|
||
return self._cached_segments
|
||
|
||
def get_edge_span(self, edge: str) -> tuple[float, float]:
|
||
"""Get span (start, end) for a given edge."""
|
||
return (
|
||
getattr(self, f"span_{edge}_start", 0.0),
|
||
getattr(self, f"span_{edge}_end", 1.0),
|
||
)
|
||
|
||
def validate(self) -> bool:
|
||
"""Validate calibration configuration.
|
||
|
||
Returns:
|
||
True if configuration is valid
|
||
|
||
Raises:
|
||
ValueError: If configuration is invalid
|
||
"""
|
||
if self.mode == "advanced":
|
||
if not self.lines:
|
||
raise ValueError("Advanced calibration must have at least one line")
|
||
for i, line in enumerate(self.lines):
|
||
if line.led_count <= 0:
|
||
raise ValueError(f"Line {i}: LED count must be positive, got {line.led_count}")
|
||
if not (0.0 <= line.span_start <= 1.0) or not (0.0 <= line.span_end <= 1.0):
|
||
raise ValueError(f"Line {i}: span must be in [0.0, 1.0]")
|
||
if line.span_end <= line.span_start:
|
||
raise ValueError(f"Line {i}: span_end must be greater than span_start")
|
||
if line.border_width < 1:
|
||
raise ValueError(f"Line {i}: border_width must be at least 1")
|
||
return True
|
||
|
||
total = self.get_total_leds()
|
||
if total <= 0:
|
||
raise ValueError("Calibration must have at least one LED")
|
||
|
||
for edge, count in [
|
||
("top", self.leds_top),
|
||
("right", self.leds_right),
|
||
("bottom", self.leds_bottom),
|
||
("left", self.leds_left),
|
||
]:
|
||
if count < 0:
|
||
raise ValueError(f"LED count for {edge} must be non-negative, got {count}")
|
||
|
||
for edge in ["top", "right", "bottom", "left"]:
|
||
start, end = self.get_edge_span(edge)
|
||
if not (0.0 <= start <= 1.0) or not (0.0 <= end <= 1.0):
|
||
raise ValueError(f"Span for {edge} must be in [0.0, 1.0], got ({start}, {end})")
|
||
if end <= start:
|
||
raise ValueError(
|
||
f"Span end must be greater than start for {edge}, got ({start}, {end})"
|
||
)
|
||
|
||
return True
|
||
|
||
def get_total_leds(self) -> int:
|
||
"""Get total number of LEDs across all edges/lines."""
|
||
if self.mode == "advanced":
|
||
return sum(line.led_count for line in self.lines)
|
||
return self.leds_top + self.leds_right + self.leds_bottom + self.leds_left
|
||
|
||
def get_required_picture_source_ids(self) -> List[str]:
|
||
"""Get deduplicated list of picture source IDs referenced by lines.
|
||
|
||
Returns empty list for simple mode (the stream provides the source).
|
||
"""
|
||
if self.mode != "advanced":
|
||
return []
|
||
seen: Set[str] = set()
|
||
result: List[str] = []
|
||
for line in self.lines:
|
||
if line.picture_source_id not in seen:
|
||
seen.add(line.picture_source_id)
|
||
result.append(line.picture_source_id)
|
||
return result
|
||
|
||
def get_segment_for_edge(self, edge: str) -> CalibrationSegment | None:
|
||
"""Get segment configuration for a specific edge."""
|
||
for seg in self.segments:
|
||
if seg.edge == edge:
|
||
return seg
|
||
return None
|
||
|
||
|
||
def _build_skip_buffers(mapper, calibration: CalibrationConfig, total_leds: int) -> None:
|
||
"""Pre-compute Phase 3 skip-LED resampling indices and scratch buffers.
|
||
|
||
Phase 3 takes the full ``total_leds`` strip and resamples it into
|
||
``active_count = total_leds - skip_start - skip_end`` LEDs using linear
|
||
interpolation. We precompute floor/ceil source indices and fractional
|
||
weights once so per-frame work becomes a couple of ``np.take`` +
|
||
in-place arithmetic ops with no allocations.
|
||
|
||
Attaches all skip-related state to ``mapper`` directly to keep the
|
||
storage layout consistent between PixelMapper and AdvancedPixelMapper.
|
||
"""
|
||
skip_start = calibration.skip_leds_start
|
||
skip_end = calibration.skip_leds_end
|
||
mapper._skip_start = skip_start
|
||
mapper._skip_end = skip_end
|
||
active_count = max(0, total_leds - skip_start - skip_end)
|
||
mapper._active_count = active_count
|
||
|
||
if not (0 < active_count < total_leds):
|
||
# No skip needed (full strip used) or no active LEDs.
|
||
mapper._skip_floor_idx = None
|
||
mapper._skip_ceil_idx = None
|
||
mapper._skip_frac = None
|
||
mapper._skip_left_u8 = None
|
||
mapper._skip_right_u8 = None
|
||
mapper._skip_blend_f32 = None
|
||
mapper._skip_resampled = None
|
||
return
|
||
|
||
# Floor/ceil source indices and fractional weights for each
|
||
# destination LED. ``t = src_x[k] = k * (total_leds - 1) / (active_count - 1)``
|
||
# — equivalent to ``np.linspace(0, total_leds - 1, active_count)``.
|
||
if active_count > 1:
|
||
t = np.arange(active_count, dtype=np.float64) * ((total_leds - 1) / (active_count - 1))
|
||
else:
|
||
t = np.zeros(active_count, dtype=np.float64)
|
||
floor_idx = np.floor(t).astype(np.int64)
|
||
np.clip(floor_idx, 0, total_leds - 1, out=floor_idx)
|
||
ceil_idx = np.minimum(floor_idx + 1, total_leds - 1)
|
||
frac = (t - floor_idx).astype(np.float32)[:, None] # (active_count, 1)
|
||
|
||
mapper._skip_floor_idx = floor_idx
|
||
mapper._skip_ceil_idx = ceil_idx
|
||
mapper._skip_frac = frac
|
||
# uint8 take destinations + float32 blend scratch — all reused per frame
|
||
mapper._skip_left_u8 = np.empty((active_count, 3), dtype=np.uint8)
|
||
mapper._skip_right_u8 = np.empty((active_count, 3), dtype=np.uint8)
|
||
mapper._skip_blend_f32 = np.empty((active_count, 3), dtype=np.float32)
|
||
mapper._skip_resampled = np.empty((active_count, 3), dtype=np.uint8)
|
||
|
||
|
||
def _apply_skip_resample(mapper, led_array: np.ndarray) -> None:
|
||
"""Phase 3 in-place resample of ``led_array`` (no allocations).
|
||
|
||
Applies linear interpolation precomputed in ``_build_skip_buffers`` and
|
||
writes the result back into ``led_array`` with the configured skip
|
||
leading/trailing zeros.
|
||
"""
|
||
floor_idx = mapper._skip_floor_idx
|
||
if floor_idx is None:
|
||
if mapper._active_count <= 0:
|
||
led_array[:] = 0
|
||
return
|
||
|
||
left_u8 = mapper._skip_left_u8
|
||
right_u8 = mapper._skip_right_u8
|
||
blend = mapper._skip_blend_f32
|
||
resampled = mapper._skip_resampled
|
||
|
||
np.take(led_array, floor_idx, axis=0, out=left_u8)
|
||
np.take(led_array, mapper._skip_ceil_idx, axis=0, out=right_u8)
|
||
np.copyto(blend, right_u8, casting="unsafe") # uint8 → float32
|
||
blend -= left_u8 # right - left
|
||
blend *= mapper._skip_frac # frac * (right - left)
|
||
blend += left_u8 # left + frac*(right - left)
|
||
np.clip(blend, 0, 255, out=blend)
|
||
np.copyto(resampled, blend, casting="unsafe") # float32 → uint8
|
||
|
||
led_array[:] = 0
|
||
end_idx = mapper._total_leds - mapper._skip_end
|
||
led_array[mapper._skip_start : end_idx] = resampled
|
||
|
||
|
||
class PixelMapper:
|
||
"""Maps screen border pixels to LED colors based on calibration."""
|
||
|
||
def __init__(
|
||
self,
|
||
calibration: CalibrationConfig,
|
||
interpolation_mode: Literal["average", "median", "dominant"] = "average",
|
||
):
|
||
"""Initialize pixel mapper.
|
||
|
||
Args:
|
||
calibration: Calibration configuration
|
||
interpolation_mode: Color calculation mode
|
||
"""
|
||
self.calibration = calibration
|
||
self.interpolation_mode = interpolation_mode
|
||
|
||
# Validate calibration
|
||
self.calibration.validate()
|
||
|
||
# Select color calculation function
|
||
if interpolation_mode == "average":
|
||
self._calc_color = calculate_average_color
|
||
elif interpolation_mode == "median":
|
||
self._calc_color = calculate_median_color
|
||
elif interpolation_mode == "dominant":
|
||
self._calc_color = calculate_dominant_color
|
||
else:
|
||
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
|
||
|
||
# Pre-allocate LED output buffer (reused every call)
|
||
total_leds = calibration.get_total_leds()
|
||
self._total_leds = total_leds
|
||
self._led_buf = np.zeros((total_leds, 3), dtype=np.uint8)
|
||
self._use_fast_avg = interpolation_mode == "average"
|
||
|
||
# Pre-compute offset-adjusted index arrays per segment (avoids np.roll)
|
||
offset = calibration.offset % total_leds if total_leds > 0 else 0
|
||
self._segment_indices: List[np.ndarray] = []
|
||
for segment in calibration.segments:
|
||
indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
|
||
if segment.reverse:
|
||
indices = indices[::-1]
|
||
if offset > 0:
|
||
indices = (indices + offset) % total_leds
|
||
self._segment_indices.append(indices)
|
||
|
||
# Pre-compute Phase 3 skip — linear interpolation by precomputed
|
||
# floor/ceil indices and fractional weights. Per-frame work is
|
||
# entirely write-in-place into pre-allocated scratch buffers.
|
||
_build_skip_buffers(self, calibration, total_leds)
|
||
|
||
# Per-edge average computation cache (lazy-initialized on first frame)
|
||
self._edge_cache: Dict[str, tuple] = {}
|
||
|
||
logger.info(
|
||
f"Initialized pixel mapper with {total_leds} LEDs "
|
||
f"using {interpolation_mode} interpolation"
|
||
)
|
||
|
||
def _get_edge_pixels(self, border_pixels: BorderPixels, edge_name: str) -> np.ndarray:
|
||
"""Get edge pixel array with span slicing applied."""
|
||
if edge_name == "top":
|
||
edge_pixels = border_pixels.top
|
||
elif edge_name == "right":
|
||
edge_pixels = border_pixels.right
|
||
elif edge_name == "bottom":
|
||
edge_pixels = border_pixels.bottom
|
||
else:
|
||
edge_pixels = border_pixels.left
|
||
|
||
span_start, span_end = self.calibration.get_edge_span(edge_name)
|
||
if span_start > 0.0 or span_end < 1.0:
|
||
if edge_name in ("top", "bottom"):
|
||
total_w = edge_pixels.shape[1]
|
||
s, e = int(span_start * total_w), int(span_end * total_w)
|
||
edge_pixels = edge_pixels[:, s:e, :]
|
||
else:
|
||
total_h = edge_pixels.shape[0]
|
||
s, e = int(span_start * total_h), int(span_end * total_h)
|
||
edge_pixels = edge_pixels[s:e, :, :]
|
||
return edge_pixels
|
||
|
||
def _map_edge_fallback(
|
||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||
) -> np.ndarray:
|
||
"""Per-LED color mapping for median/dominant modes. Returns (led_count, 3) uint8."""
|
||
return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color)
|
||
|
||
def _map_edge_average(
|
||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
|
||
) -> np.ndarray:
|
||
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8.
|
||
|
||
Scratch buffers are cached on ``self._edge_cache`` keyed by edge name;
|
||
the shared kernel handles all allocations on first use.
|
||
"""
|
||
return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, edge_name)
|
||
|
||
def map_border_to_leds(self, border_pixels: BorderPixels) -> np.ndarray:
|
||
"""Map screen border pixels to LED colors.
|
||
|
||
Uses pre-allocated buffers and pre-computed index arrays to avoid
|
||
per-frame allocations (np.zeros, np.roll, np.arange, np.linspace).
|
||
|
||
Args:
|
||
border_pixels: Extracted border pixels from screen
|
||
|
||
Returns:
|
||
numpy array of shape (total_leds, 3), dtype uint8
|
||
|
||
Raises:
|
||
ValueError: If border pixels don't match calibration
|
||
"""
|
||
led_array = self._led_buf
|
||
led_array[:] = 0
|
||
|
||
# Phase 1+2: Map edges and place at offset-adjusted positions (no np.roll)
|
||
for i, segment in enumerate(self.calibration.segments):
|
||
edge_pixels = self._get_edge_pixels(border_pixels, segment.edge)
|
||
|
||
if self._use_fast_avg:
|
||
colors = self._map_edge_average(edge_pixels, segment.edge, segment.led_count)
|
||
else:
|
||
colors = self._map_edge_fallback(edge_pixels, segment.edge, segment.led_count)
|
||
|
||
led_array[self._segment_indices[i]] = colors
|
||
|
||
# Phase 3: physical skip — resample full perimeter into active LEDs
|
||
# using precomputed weights, all in-place.
|
||
_apply_skip_resample(self, led_array)
|
||
|
||
return led_array
|
||
|
||
def test_calibration(
|
||
self, edge: str, color: Tuple[int, int, int]
|
||
) -> List[Tuple[int, int, int]]:
|
||
"""Generate test pattern to light up specific edge.
|
||
|
||
Useful for verifying calibration configuration.
|
||
|
||
Args:
|
||
edge: Edge to light up (top, right, bottom, left)
|
||
color: RGB color to use
|
||
|
||
Returns:
|
||
List of LED colors with only the specified edge lit
|
||
|
||
Raises:
|
||
ValueError: If edge is not in calibration
|
||
"""
|
||
segment = self.calibration.get_segment_for_edge(edge)
|
||
if not segment:
|
||
raise ValueError(f"Edge '{edge}' not found in calibration")
|
||
|
||
total_leds = self.calibration.get_total_leds()
|
||
led_colors = [(0, 0, 0)] * total_leds
|
||
|
||
# Light up the specified edge
|
||
led_indices = range(segment.led_start, segment.led_start + segment.led_count)
|
||
for led_idx in led_indices:
|
||
led_colors[led_idx] = color
|
||
|
||
logger.info(f"Generated test pattern for {edge} edge with color {color}")
|
||
return led_colors
|
||
|
||
|
||
class AdvancedPixelMapper:
|
||
"""Maps multi-source screen pixels to LED colors for advanced calibration.
|
||
|
||
Each CalibrationLine references a picture source and an edge, with its own
|
||
span and border_width. Frames from multiple sources are passed in as a dict.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
calibration: CalibrationConfig,
|
||
interpolation_mode: Literal["average", "median", "dominant"] = "average",
|
||
):
|
||
self.calibration = calibration
|
||
self.interpolation_mode = interpolation_mode
|
||
calibration.validate()
|
||
|
||
if interpolation_mode == "average":
|
||
self._calc_color = calculate_average_color
|
||
elif interpolation_mode == "median":
|
||
self._calc_color = calculate_median_color
|
||
elif interpolation_mode == "dominant":
|
||
self._calc_color = calculate_dominant_color
|
||
else:
|
||
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
|
||
|
||
total_leds = calibration.get_total_leds()
|
||
self._total_leds = total_leds
|
||
self._led_buf = np.zeros((total_leds, 3), dtype=np.uint8)
|
||
self._use_fast_avg = interpolation_mode == "average"
|
||
|
||
# Build segment-like metadata from lines (led_start for each line)
|
||
offset = calibration.offset % total_leds if total_leds > 0 else 0
|
||
self._line_indices: List[np.ndarray] = []
|
||
led_start = 0
|
||
for line in calibration.lines:
|
||
indices = np.arange(led_start, led_start + line.led_count)
|
||
if line.reverse:
|
||
indices = indices[::-1]
|
||
if offset > 0:
|
||
indices = (indices + offset) % total_leds
|
||
self._line_indices.append(indices)
|
||
led_start += line.led_count
|
||
|
||
# Skip arrays — share the same buffer layout as PixelMapper
|
||
_build_skip_buffers(self, calibration, total_leds)
|
||
|
||
# Per-line edge cache (keyed by line index to avoid collision)
|
||
self._edge_cache: Dict[int, tuple] = {}
|
||
|
||
logger.info(
|
||
f"Initialized advanced pixel mapper with {total_leds} LEDs, "
|
||
f"{len(calibration.lines)} lines, {interpolation_mode} interpolation"
|
||
)
|
||
|
||
@staticmethod
|
||
def _extract_edge_strip(
|
||
frame: np.ndarray,
|
||
edge: str,
|
||
border_width: int,
|
||
span_start: float,
|
||
span_end: float,
|
||
) -> np.ndarray:
|
||
"""Extract a border strip from a frame for the given edge and span."""
|
||
h, w = frame.shape[:2]
|
||
bw = min(border_width, h // 4, w // 4)
|
||
|
||
if edge == "top":
|
||
strip = frame[:bw, :, :]
|
||
elif edge == "bottom":
|
||
strip = frame[-bw:, :, :]
|
||
elif edge == "right":
|
||
strip = frame[:, -bw:, :]
|
||
else: # left
|
||
strip = frame[:, :bw, :]
|
||
|
||
# Apply span
|
||
if span_start > 0.0 or span_end < 1.0:
|
||
if edge in ("top", "bottom"):
|
||
total_w = strip.shape[1]
|
||
s, e = int(span_start * total_w), int(span_end * total_w)
|
||
strip = strip[:, s:e, :]
|
||
else:
|
||
total_h = strip.shape[0]
|
||
s, e = int(span_start * total_h), int(span_end * total_h)
|
||
strip = strip[s:e, :, :]
|
||
|
||
return strip
|
||
|
||
def _map_edge_average(
|
||
self,
|
||
edge_pixels: np.ndarray,
|
||
edge_name: str,
|
||
led_count: int,
|
||
cache_key: int,
|
||
) -> np.ndarray:
|
||
"""Vectorized average-color mapping; delegates to the shared kernel.
|
||
|
||
``cache_key`` is an integer (e.g. line index) so multiple per-line
|
||
edges can share the same ``self._edge_cache`` dict without colliding.
|
||
"""
|
||
return average_edge_to_leds(edge_pixels, edge_name, led_count, self._edge_cache, cache_key)
|
||
|
||
def _map_edge_fallback(
|
||
self,
|
||
edge_pixels: np.ndarray,
|
||
edge_name: str,
|
||
led_count: int,
|
||
) -> np.ndarray:
|
||
"""Per-LED color mapping for median/dominant modes; delegates to shared kernel."""
|
||
return fallback_edge_to_leds(edge_pixels, edge_name, led_count, self._calc_color)
|
||
|
||
def map_lines_to_leds(self, frames: Dict[str, np.ndarray]) -> np.ndarray:
|
||
"""Map multi-source frames to LED colors using calibration lines.
|
||
|
||
Args:
|
||
frames: Dict mapping picture_source_id to captured frame (H, W, 3) uint8
|
||
|
||
Returns:
|
||
numpy array of shape (total_leds, 3), dtype uint8
|
||
"""
|
||
led_array = self._led_buf
|
||
led_array[:] = 0
|
||
|
||
for i, line in enumerate(self.calibration.lines):
|
||
frame = frames.get(line.picture_source_id)
|
||
if frame is None:
|
||
continue
|
||
|
||
edge_pixels = self._extract_edge_strip(
|
||
frame,
|
||
line.edge,
|
||
line.border_width,
|
||
line.span_start,
|
||
line.span_end,
|
||
)
|
||
|
||
if self._use_fast_avg:
|
||
colors = self._map_edge_average(
|
||
edge_pixels,
|
||
line.edge,
|
||
line.led_count,
|
||
cache_key=i,
|
||
)
|
||
else:
|
||
colors = self._map_edge_fallback(
|
||
edge_pixels,
|
||
line.edge,
|
||
line.led_count,
|
||
)
|
||
|
||
led_array[self._line_indices[i]] = colors
|
||
|
||
# Phase 3: physical skip — same precomputed-weight resample as PixelMapper
|
||
_apply_skip_resample(self, led_array)
|
||
|
||
return led_array
|
||
|
||
|
||
def create_pixel_mapper(
|
||
calibration: CalibrationConfig,
|
||
interpolation_mode: str = "average",
|
||
):
|
||
"""Factory: create the right mapper for the calibration mode."""
|
||
if calibration.mode == "advanced":
|
||
return AdvancedPixelMapper(calibration, interpolation_mode)
|
||
return PixelMapper(calibration, interpolation_mode)
|
||
|
||
|
||
def create_default_calibration(
|
||
led_count: int,
|
||
aspect_width: int = 16,
|
||
aspect_height: int = 9,
|
||
) -> CalibrationConfig:
|
||
"""Create a default calibration for a rectangular screen.
|
||
|
||
Distributes LEDs proportionally to the screen aspect ratio so that
|
||
horizontal and vertical edges have equal LED density.
|
||
|
||
Args:
|
||
led_count: Total number of LEDs
|
||
aspect_width: Screen width component of the aspect ratio (default 16)
|
||
aspect_height: Screen height component of the aspect ratio (default 9)
|
||
|
||
Returns:
|
||
Default calibration configuration
|
||
"""
|
||
if led_count < 4:
|
||
raise ValueError("Need at least 4 LEDs for default calibration")
|
||
|
||
# Distribute LEDs proportionally to aspect ratio (same density per edge)
|
||
perimeter = 2 * (aspect_width + aspect_height)
|
||
h_frac = aspect_width / perimeter # fraction for each horizontal edge
|
||
v_frac = aspect_height / perimeter # fraction for each vertical edge
|
||
|
||
# Float counts, then round so total == led_count
|
||
raw_h = led_count * h_frac
|
||
raw_v = led_count * v_frac
|
||
bottom_count = round(raw_h)
|
||
top_count = round(raw_h)
|
||
right_count = round(raw_v)
|
||
left_count = round(raw_v)
|
||
|
||
# Fix rounding error
|
||
diff = led_count - (bottom_count + top_count + right_count + left_count)
|
||
# Distribute remainder to horizontal edges first (longer edges)
|
||
if diff > 0:
|
||
bottom_count += 1
|
||
diff -= 1
|
||
if diff > 0:
|
||
top_count += 1
|
||
diff -= 1
|
||
if diff > 0:
|
||
right_count += 1
|
||
diff -= 1
|
||
if diff > 0:
|
||
left_count += 1
|
||
diff -= 1
|
||
# If we over-counted, remove from shorter edges first
|
||
if diff < 0:
|
||
left_count += diff # diff is negative
|
||
diff = 0
|
||
if left_count < 0:
|
||
diff = left_count
|
||
left_count = 0
|
||
right_count += diff
|
||
|
||
# Ensure each edge has at least 1 LED
|
||
bottom_count = max(1, bottom_count)
|
||
top_count = max(1, top_count)
|
||
right_count = max(1, right_count)
|
||
left_count = max(1, left_count)
|
||
|
||
config = CalibrationConfig(
|
||
layout="clockwise",
|
||
start_position="bottom_left",
|
||
leds_bottom=bottom_count,
|
||
leds_right=right_count,
|
||
leds_top=top_count,
|
||
leds_left=left_count,
|
||
)
|
||
|
||
logger.info(
|
||
f"Created default calibration for {led_count} LEDs "
|
||
f"(aspect {aspect_width}:{aspect_height}): "
|
||
f"bottom={bottom_count}, right={right_count}, "
|
||
f"top={top_count}, left={left_count}"
|
||
)
|
||
|
||
return config
|
||
|
||
|
||
def calibration_from_dict(data: dict) -> CalibrationConfig:
|
||
"""Create calibration configuration from dictionary.
|
||
|
||
Args:
|
||
data: Dictionary with calibration data
|
||
|
||
Returns:
|
||
CalibrationConfig instance
|
||
|
||
Raises:
|
||
ValueError: If data is invalid
|
||
"""
|
||
try:
|
||
mode = data.get("mode", "simple")
|
||
|
||
if mode == "advanced":
|
||
lines_data = data.get("lines", [])
|
||
lines = [
|
||
CalibrationLine(
|
||
picture_source_id=ld["picture_source_id"],
|
||
edge=ld["edge"],
|
||
led_count=ld["led_count"],
|
||
span_start=ld.get("span_start", 0.0),
|
||
span_end=ld.get("span_end", 1.0),
|
||
reverse=ld.get("reverse", False),
|
||
border_width=ld.get("border_width", 10),
|
||
)
|
||
for ld in lines_data
|
||
]
|
||
config = CalibrationConfig(
|
||
mode="advanced",
|
||
lines=lines,
|
||
offset=data.get("offset", 0),
|
||
skip_leds_start=data.get("skip_leds_start", 0),
|
||
skip_leds_end=data.get("skip_leds_end", 0),
|
||
)
|
||
config.validate()
|
||
return config
|
||
|
||
# Simple mode (backward compat — missing "mode" key defaults here)
|
||
config = CalibrationConfig(
|
||
mode="simple",
|
||
layout=data["layout"],
|
||
start_position=data["start_position"],
|
||
offset=data.get("offset", 0),
|
||
leds_top=data.get("leds_top", 0),
|
||
leds_right=data.get("leds_right", 0),
|
||
leds_bottom=data.get("leds_bottom", 0),
|
||
leds_left=data.get("leds_left", 0),
|
||
span_top_start=data.get("span_top_start", 0.0),
|
||
span_top_end=data.get("span_top_end", 1.0),
|
||
span_right_start=data.get("span_right_start", 0.0),
|
||
span_right_end=data.get("span_right_end", 1.0),
|
||
span_bottom_start=data.get("span_bottom_start", 0.0),
|
||
span_bottom_end=data.get("span_bottom_end", 1.0),
|
||
span_left_start=data.get("span_left_start", 0.0),
|
||
span_left_end=data.get("span_left_end", 1.0),
|
||
skip_leds_start=data.get("skip_leds_start", 0),
|
||
skip_leds_end=data.get("skip_leds_end", 0),
|
||
border_width=data.get("border_width", 10),
|
||
)
|
||
|
||
config.validate()
|
||
return config
|
||
|
||
except KeyError as e:
|
||
raise ValueError(f"Missing required calibration field: {e}")
|
||
except Exception as e:
|
||
if isinstance(e, ValueError):
|
||
raise
|
||
raise ValueError(f"Invalid calibration data: {e}")
|
||
|
||
|
||
def calibration_to_dict(config: CalibrationConfig) -> dict:
|
||
"""Convert calibration configuration to dictionary.
|
||
|
||
Args:
|
||
config: Calibration configuration
|
||
|
||
Returns:
|
||
Dictionary representation
|
||
"""
|
||
if config.mode == "advanced":
|
||
result: dict = {
|
||
"mode": "advanced",
|
||
"lines": [
|
||
{
|
||
"picture_source_id": line.picture_source_id,
|
||
"edge": line.edge,
|
||
"led_count": line.led_count,
|
||
"span_start": line.span_start,
|
||
"span_end": line.span_end,
|
||
"reverse": line.reverse,
|
||
"border_width": line.border_width,
|
||
}
|
||
for line in config.lines
|
||
],
|
||
}
|
||
if config.offset != 0:
|
||
result["offset"] = config.offset
|
||
if config.skip_leds_start > 0:
|
||
result["skip_leds_start"] = config.skip_leds_start
|
||
if config.skip_leds_end > 0:
|
||
result["skip_leds_end"] = config.skip_leds_end
|
||
return result
|
||
|
||
# Simple mode
|
||
result = {
|
||
"mode": "simple",
|
||
"layout": config.layout,
|
||
"start_position": config.start_position,
|
||
"offset": config.offset,
|
||
"leds_top": config.leds_top,
|
||
"leds_right": config.leds_right,
|
||
"leds_bottom": config.leds_bottom,
|
||
"leds_left": config.leds_left,
|
||
}
|
||
# Include span fields only when not default (full coverage)
|
||
for edge in ["top", "right", "bottom", "left"]:
|
||
start = getattr(config, f"span_{edge}_start")
|
||
end = getattr(config, f"span_{edge}_end")
|
||
if start != 0.0 or end != 1.0:
|
||
result[f"span_{edge}_start"] = start
|
||
result[f"span_{edge}_end"] = end
|
||
# Include skip fields only when non-default
|
||
if config.skip_leds_start > 0:
|
||
result["skip_leds_start"] = config.skip_leds_start
|
||
if config.skip_leds_end > 0:
|
||
result["skip_leds_end"] = config.skip_leds_end
|
||
if config.border_width != 10:
|
||
result["border_width"] = config.border_width
|
||
return result
|