fix(setup): register scaffolded target with ProcessorManager + final-review hardening

Final-review blocker: the setup scaffold created the LED output target in the
store but never registered it with the ProcessorManager, so the wizard's
"Start" step 404'd on a fresh setup (target not found) — the lights never
started despite a success screen. Now the scaffold calls
target.register_with_manager(manager) right after create (mirroring the
canonical POST /output-targets route, same ValueError guard), so
start_processing finds the target. Rollback unregisters via
manager.remove_target before deleting the store entity, so a post-registration
failure leaves no half-registered target.

Also from the final review:
- solve corner_indices elements now bounded ge=0 (clear 422 instead of silent
  modulo-wrap).
- setup-wizard.ts: reuse tutorials' suppressGettingStartedTour()/TOUR_KEY
  instead of a duplicated 'tour_completed' literal; drop a duplicate manual-form
  submit listener.

Tests: + adversarial pass over the whole feature (solver/session/scaffold edge
cases) and a scaffold->register->startable regression test. Full suite
2149 passed / 2 skipped; tsc clean; build passes; ruff clean.
This commit is contained in:
2026-06-08 16:55:36 +03:00
parent 81b18089e1
commit 6cd5e057da
7 changed files with 1730 additions and 12 deletions
@@ -0,0 +1,535 @@
"""Adversarial / concurrency tests for CalibrationSession.
Phase 1 acceptance criteria tested here (NOT what the code happens to do):
- Interleaved start/start (same device, then different device) must never
leave the old device without restore.
- Interleaved start/stop racing the idle watchdog must not leave the device
dark or stuck.
- Idle-timeout teardown restores the prior target.
- position() with index out of range → ValueError.
- stop() when idle is a safe no-op (does not call start_processing or crash).
- CalibrationSession lock must prevent double-teardown.
All tests use a fake ProcessorManager matching the shape used in
test_calibration_routes.py.
"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from ledgrab.core.capture.calibration_session import (
CalibrationSession,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_manager(device_id: str = "dev1", led_count: int = 100) -> MagicMock:
"""Build a minimal fake ProcessorManager."""
mgr = MagicMock()
ds = MagicMock()
ds.led_count = led_count
mgr._devices = {device_id: ds}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
return mgr
@pytest_asyncio.fixture(autouse=True)
async def fresh_session():
"""Yield a brand-new CalibrationSession for each test (not the singleton)."""
session = CalibrationSession()
yield session
# Cleanup: cancel any lingering watchdog
if session._timeout_task and not session._timeout_task.done():
session._timeout_task.cancel()
try:
await session._timeout_task
except (asyncio.CancelledError, Exception):
pass
# ---------------------------------------------------------------------------
# stop() when idle is a safe no-op
# ---------------------------------------------------------------------------
class TestStopWhenIdle:
@pytest.mark.asyncio
async def test_stop_idle_does_not_call_start_processing(self, fresh_session):
"""Calling stop() when no session is active must not call start_processing."""
mgr = _make_manager()
# Do NOT start a session — just stop immediately
await fresh_session.stop()
mgr.start_processing.assert_not_awaited()
@pytest.mark.asyncio
async def test_stop_idle_returns_inactive_state(self, fresh_session):
"""stop() on an idle session returns state with active=False."""
state = fresh_session.get_state()
assert state["active"] is False
await fresh_session.stop() # no-op
assert fresh_session.is_active is False
@pytest.mark.asyncio
async def test_cancel_idle_safe(self, fresh_session):
"""cancel() on idle session is also a safe no-op."""
await fresh_session.cancel()
assert fresh_session.is_active is False
@pytest.mark.asyncio
async def test_double_stop_is_idempotent(self, fresh_session):
"""Calling stop() twice on an active session must not double-call start_processing."""
mgr = _make_manager()
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_restore")
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
await fresh_session.stop()
assert fresh_session.is_active is False
# Restore called exactly once
mgr.start_processing.assert_awaited_once_with("tgt_restore")
# Second stop must be a no-op
await fresh_session.stop()
# start_processing should still be called exactly once (not twice)
assert mgr.start_processing.await_count == 1
# ---------------------------------------------------------------------------
# position() out of range
# ---------------------------------------------------------------------------
class TestPositionOutOfRange:
@pytest.mark.asyncio
async def test_position_equal_to_led_count_raises(self, fresh_session):
"""index == led_count must raise ValueError (0-based, so out of range)."""
mgr = _make_manager(led_count=100)
await fresh_session.start("dev1", mgr)
with pytest.raises(ValueError, match="out of range"):
await fresh_session.position(100)
@pytest.mark.asyncio
async def test_position_above_led_count_raises(self, fresh_session):
"""index > led_count raises ValueError."""
mgr = _make_manager(led_count=50)
await fresh_session.start("dev1", mgr)
with pytest.raises(ValueError, match="out of range"):
await fresh_session.position(999)
@pytest.mark.asyncio
async def test_position_negative_raises(self, fresh_session):
"""Negative index raises ValueError."""
mgr = _make_manager(led_count=100)
await fresh_session.start("dev1", mgr)
with pytest.raises(ValueError):
await fresh_session.position(-1)
@pytest.mark.asyncio
async def test_position_at_led_count_minus_1_is_valid(self, fresh_session):
"""Last valid index (led_count - 1) must succeed."""
mgr = _make_manager(led_count=10)
await fresh_session.start("dev1", mgr)
await fresh_session.position(9) # must not raise
mgr.set_calibration_pixel.assert_awaited()
@pytest.mark.asyncio
async def test_position_without_active_session_raises(self, fresh_session):
"""position() with no active session must raise RuntimeError."""
with pytest.raises(RuntimeError, match="No active calibration session"):
await fresh_session.position(5)
# ---------------------------------------------------------------------------
# Interleaved start/start — old device must be restored
# ---------------------------------------------------------------------------
class TestInterleavedStartStart:
@pytest.mark.asyncio
async def test_start_on_same_device_restores_prior_target(self, fresh_session):
"""Starting a second session on the same device auto-stops the first.
The first device's prior target must be restored before the second session begins.
"""
mgr = _make_manager(led_count=60)
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_original")
# Start first session
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
assert fresh_session._prior_target_id == "tgt_original"
# Now start again on the same device
# The second start should stop the first (restoring tgt_original),
# then re-query the current running target (which will be tgt_original again
# since start_processing will have been called).
# For isolation: change what get_processing_target_for_device returns after
# the first stop so the second session records a fresh prior.
call_count = {"n": 0}
def _get_target(device_id):
call_count["n"] += 1
if call_count["n"] == 1:
return "tgt_original"
return None # After first stop, no target running
mgr.get_processing_target_for_device = MagicMock(side_effect=_get_target)
# First session with the original target
fresh_session2 = CalibrationSession()
await fresh_session2.start("dev1", mgr)
assert fresh_session2.is_active is True
# Start a NEW session on the same device — must auto-stop fresh_session2
fresh_session3 = CalibrationSession()
# Inject fresh_session2's internal state into fresh_session3 to simulate
# the singleton pattern: replace session3's state to reflect session2 active
# (this mirrors the module-level singleton where only one CalibrationSession exists)
fresh_session3._active = fresh_session2._active
fresh_session3._device_id = fresh_session2._device_id
fresh_session3._led_count = fresh_session2._led_count
fresh_session3._prior_target_id = fresh_session2._prior_target_id
fresh_session3._last_activity = fresh_session2._last_activity
fresh_session3._manager = fresh_session2._manager
fresh_session3._timeout_task = fresh_session2._timeout_task
fresh_session2._timeout_task = None # prevent double-cancel
await fresh_session3.start("dev1", mgr)
# The start must have called stop on the previous session → restore was called
# (mgr.start_processing was called at least once to restore the prior target)
assert mgr.start_processing.await_count >= 1
# Cleanup
await fresh_session3.stop()
if fresh_session2._timeout_task and not fresh_session2._timeout_task.done():
fresh_session2._timeout_task.cancel()
@pytest.mark.asyncio
async def test_new_session_on_different_device_clears_old_device(self, fresh_session):
"""Starting a new session on a different device must clear the first device.
The first session must be stopped (its prior target restored or cleared)
before the second session on the new device becomes active.
"""
mgr = MagicMock()
ds1 = MagicMock()
ds1.led_count = 30
ds2 = MagicMock()
ds2.led_count = 60
mgr._devices = {"dev1": ds1, "dev2": ds2}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
# Start first session on dev1
await fresh_session.start("dev1", mgr)
assert fresh_session._device_id == "dev1"
assert fresh_session.is_active is True
# Now start second session on dev2 — must auto-stop dev1 first
await fresh_session.start("dev2", mgr)
# After the second start, session must be on dev2
assert fresh_session._device_id == "dev2"
assert fresh_session.is_active is True
# send_clear_pixels was called for dev1 (stop) AND for dev2 (start)
clear_calls = [call[0][0] for call in mgr.send_clear_pixels.call_args_list]
assert (
"dev1" in clear_calls
), f"dev1 was never cleared during session switch; clear calls: {clear_calls}"
assert (
"dev2" in clear_calls
), f"dev2 was never cleared at session start; clear calls: {clear_calls}"
# Cleanup
await fresh_session.stop()
# ---------------------------------------------------------------------------
# Idle-timeout teardown restores prior target
# ---------------------------------------------------------------------------
class TestIdleTimeoutRestoresPriorTarget:
@pytest.mark.asyncio
async def test_idle_timeout_calls_start_processing(self, fresh_session):
"""When the session times out, start_processing must be called to restore the target.
We patch IDLE_TIMEOUT_SECONDS to a tiny value so the test doesn't actually
wait 60 seconds.
"""
mgr = _make_manager(led_count=40)
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_to_restore")
# Patch the idle timeout to 0.05 seconds
with patch(
"ledgrab.core.capture.calibration_session.IDLE_TIMEOUT_SECONDS",
0.05,
):
# Also patch the watchdog sleep to something tiny
async def _fast_watchdog():
"""A watchdog that checks every 0.02 seconds instead of 5."""
try:
while True:
await asyncio.sleep(0.02)
if not fresh_session._active or fresh_session._last_activity is None:
break
from datetime import datetime, timezone
elapsed = (
datetime.now(timezone.utc) - fresh_session._last_activity
).total_seconds()
if elapsed >= 0.05:
async with fresh_session._lock:
await fresh_session._teardown_locked(cancelled=False)
break
except asyncio.CancelledError:
pass
fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign]
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
# Wait long enough for the watchdog to fire
await asyncio.sleep(0.25)
# Session should have been auto-stopped
assert (
fresh_session.is_active is False
), "Session should have been auto-stopped by idle timeout"
# Prior target must have been restored
mgr.start_processing.assert_awaited_once_with("tgt_to_restore")
@pytest.mark.asyncio
async def test_idle_timeout_clears_device_to_black(self, fresh_session):
"""Idle timeout must send all-black before (or after) restoring target."""
mgr = _make_manager(led_count=40)
async def _fast_watchdog():
try:
while True:
await asyncio.sleep(0.02)
if not fresh_session._active or fresh_session._last_activity is None:
break
from datetime import datetime, timezone
elapsed = (
datetime.now(timezone.utc) - fresh_session._last_activity
).total_seconds()
if elapsed >= 0.05:
async with fresh_session._lock:
await fresh_session._teardown_locked(cancelled=False)
break
except asyncio.CancelledError:
pass
fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign]
await fresh_session.start("dev1", mgr)
initial_clear_count = mgr.send_clear_pixels.await_count # from start()
await asyncio.sleep(0.25)
# send_clear_pixels must have been called at least once more during teardown
assert (
mgr.send_clear_pixels.await_count > initial_clear_count
), "Device was not cleared to black during idle-timeout teardown"
# ---------------------------------------------------------------------------
# Concurrent start/start using asyncio.gather (true concurrency within the loop)
# ---------------------------------------------------------------------------
class TestConcurrentStartCalls:
@pytest.mark.asyncio
async def test_concurrent_starts_dont_corrupt_state(self, fresh_session):
"""Two concurrent start() calls must leave the session in a consistent state.
Only one session should be active after both complete; the final device
must match one of the two requested devices.
"""
mgr = MagicMock()
ds1 = MagicMock()
ds1.led_count = 50
ds2 = MagicMock()
ds2.led_count = 80
mgr._devices = {"devA": ds1, "devB": ds2}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
# Fire both concurrently — the lock must ensure exactly one wins
results = await asyncio.gather(
fresh_session.start("devA", mgr),
fresh_session.start("devB", mgr),
return_exceptions=True,
)
# Neither call should raise (both complete without exception)
for r in results:
if isinstance(r, BaseException):
pytest.fail(f"Concurrent start raised unexpectedly: {r!r}")
# Exactly one session active with a valid device
assert fresh_session.is_active is True
assert fresh_session._device_id in ("devA", "devB")
# Cleanup
await fresh_session.stop()
# ---------------------------------------------------------------------------
# stop() while watchdog is still pending (concurrent stop/watchdog)
# ---------------------------------------------------------------------------
class TestStopVsWatchdogRace:
@pytest.mark.asyncio
async def test_explicit_stop_wins_over_watchdog(self, fresh_session):
"""Explicit stop() must cleanly terminate before the watchdog fires.
After explicit stop, start_processing must be called exactly once
even if the watchdog later tries to tear down.
"""
mgr = _make_manager(led_count=100)
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_explicit")
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
# Explicitly stop — the watchdog task should be cancelled
await fresh_session.stop()
assert fresh_session.is_active is False
# start_processing called exactly once for the prior target
mgr.start_processing.assert_awaited_once_with("tgt_explicit")
# Give the event loop a moment to confirm the watchdog didn't double-fire
await asyncio.sleep(0.05)
# Still exactly once
assert (
mgr.start_processing.await_count == 1
), "start_processing was called more than once — watchdog fired after explicit stop"
# ---------------------------------------------------------------------------
# start() with unknown device_id
# ---------------------------------------------------------------------------
class TestStartUnknownDevice:
@pytest.mark.asyncio
async def test_unknown_device_raises_valueerror(self, fresh_session):
"""start() with a device_id not in manager._devices must raise ValueError."""
mgr = _make_manager(device_id="known_device")
with pytest.raises(ValueError, match="not found"):
await fresh_session.start("does_not_exist", mgr)
@pytest.mark.asyncio
async def test_unknown_device_leaves_session_inactive(self, fresh_session):
"""After a failed start() the session must remain inactive."""
mgr = _make_manager(device_id="known_device")
try:
await fresh_session.start("no_such_device", mgr)
except ValueError:
pass
assert fresh_session.is_active is False
@pytest.mark.asyncio
async def test_failed_start_does_not_corrupt_existing_session(self, fresh_session):
"""A failed start() attempt must not corrupt an already-active session."""
mgr = MagicMock()
ds = MagicMock()
ds.led_count = 100
mgr._devices = {"dev1": ds}
mgr.get_processing_target_for_device = MagicMock(return_value="prior_tgt")
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
# Start a valid session
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
# Now try to start on an unknown device — should fail
try:
await fresh_session.start("unknown_device", mgr)
except ValueError:
pass
except Exception:
pass # Other errors are also acceptable
# The original session must still be active (or was cleanly stopped and
# replaced); either way the device must not be stuck in a broken state.
# The critical invariant: if active, device_id is valid.
if fresh_session.is_active:
assert fresh_session._device_id in mgr._devices
await fresh_session.stop()
# ---------------------------------------------------------------------------
# State snapshot integrity
# ---------------------------------------------------------------------------
class TestStateSnapshot:
@pytest.mark.asyncio
async def test_get_state_before_start(self, fresh_session):
state = fresh_session.get_state()
assert state["active"] is False
assert state["device_id"] is None
assert state["led_count"] == 0
assert state["prior_target_id"] is None
assert state["last_activity"] is None
@pytest.mark.asyncio
async def test_get_state_after_start(self, fresh_session):
mgr = _make_manager(led_count=60)
mgr.get_processing_target_for_device = MagicMock(return_value="saved_tgt")
await fresh_session.start("dev1", mgr)
state = fresh_session.get_state()
assert state["active"] is True
assert state["device_id"] == "dev1"
assert state["led_count"] == 60
assert state["prior_target_id"] == "saved_tgt"
assert state["last_activity"] is not None
await fresh_session.stop()
@pytest.mark.asyncio
async def test_get_state_after_stop(self, fresh_session):
mgr = _make_manager()
await fresh_session.start("dev1", mgr)
await fresh_session.stop()
state = fresh_session.get_state()
assert state["active"] is False
assert state["device_id"] is None
assert state["led_count"] == 0
assert state["prior_target_id"] is None
@@ -0,0 +1,562 @@
"""Adversarial / edge-case tests for solve_calibration() and build_segments().
Criteria derived from Phase 1 acceptance criteria (NOT from what the code does):
- solve_calibration returns correct per-edge counts; result round-trips through
build_segments() and totals are preserved.
- An edge with 0 LEDs (two corners tapped adjacent) is valid.
- Wrap-around edges work correctly.
- Invalid inputs raise cleanly (ValueError).
New adversarial cases not covered by the existing 19 happy-path tests:
- All four corner_indices equal (degenerate: every edge = 0) — the code must
either handle it gracefully (total=0) OR raise with a clear message.
Per criteria the total must be preserved (0 == 0) and round-trip must not crash.
- Descending / out-of-order corner_indices where wrap-around should apply.
- An edge that spans the whole strip (one edge wraps the full led_count minus
the contribution of the three zero-LED edges).
- led_count just above minimum (led_count=1) with a single LED claimed by
one edge.
- offset >= led_count: the solver stores offset verbatim; PixelMapper normalises
it via % total_leds. The build_segments() round-trip must not crash.
- Corner indices modulo led_count are used, so indices >= led_count should be
accepted and reduced.
"""
import pytest
from ledgrab.core.capture.calibration import (
EDGE_ORDER,
CalibrationConfig,
solve_calibration,
)
# ---------------------------------------------------------------------------
# Helper (same as in test_calibration_solver.py — duplicated intentionally so
# this adversarial file can run in isolation)
# ---------------------------------------------------------------------------
def _roundtrip(cfg: CalibrationConfig) -> None:
"""Assert build_segments() doesn't crash and totals match."""
segs = cfg.build_segments()
total_from_segs = sum(s.led_count for s in segs)
expected = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert (
total_from_segs == expected
), f"Segment total {total_from_segs} != field total {expected} for cfg={cfg!r}"
# ---------------------------------------------------------------------------
# Degenerate: all four corner indices are equal
# When all four corners are tapped at the same index every consecutive pair
# has start_idx == end_idx, so every edge gets 0 LEDs. Total = 0 is
# mathematically consistent with the input; the function should either
# return a config with 0-LED edges OR raise a clear ValueError.
# It must NOT silently return wrong counts and must NOT crash unexpectedly.
# ---------------------------------------------------------------------------
class TestAllFourCornersEqual:
"""All four corner_indices equal → every edge = 0 LEDs or clean ValueError."""
@pytest.mark.parametrize(
"start_position,layout",
[(sp, lay) for sp, lay in EDGE_ORDER.keys()],
)
def test_all_equal_returns_zero_total_or_raises(self, start_position, layout):
"""solve_calibration([k,k,k,k]) must not produce non-zero counts."""
led_count = 100
try:
cfg = solve_calibration(
led_count=led_count,
start_position=start_position,
layout=layout,
corner_indices=[0, 0, 0, 0],
)
# If it doesn't raise: total must be 0 (consistent with input)
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert (
total == 0
), f"{start_position}/{layout}: all-equal corners produced total={total}; expected 0"
# build_segments must not crash on a 0-total config
segs = cfg.build_segments()
assert segs == [], f"Expected no segments for 0-LED config, got {segs!r}"
except ValueError:
# Raising is also acceptable — as long as it's a ValueError, not a
# crash/assertion/other exception.
pass
def test_all_equal_roundtrip_does_not_crash(self):
"""Even if all edges are 0, build_segments() must not raise."""
try:
cfg = solve_calibration(
led_count=50,
start_position="top_left",
layout="clockwise",
corner_indices=[25, 25, 25, 25],
)
_roundtrip(cfg) # must not crash
except ValueError:
pass # acceptable
# ---------------------------------------------------------------------------
# Descending / out-of-order corner indices
# Out-of-order but non-wrapping: e.g. [70, 50, 30, 10] for clockwise bottom_left
# The solver uses consecutive-pair differences with wrap logic.
# Each pair (70→50, 50→30, 30→10, 10→70) has end < start, so ALL four edges
# get wrap-around counts. Total must still equal led_count.
# ---------------------------------------------------------------------------
class TestDescendingCornerIndices:
"""All four corners in descending order — every pair wraps."""
def test_descending_total_equals_led_count(self):
"""Descending indices [75, 50, 25, 0] — total must == led_count."""
# bottom_left/clockwise: edge_order=[left,top,right,bottom]
# left: 75→50 = 25, top: 50→25 = 25, right: 25→0 = 25, bottom: 0→75 = 75
# Wait — end > start for bottom: 0→75 means 75. But 0 < 75 so count=75-0=75.
# Recalculate: for bottom: start_idx=0, end_idx=75 → end>start → count=75
# total = 25+25+25+75 = 150 ≠ 100 → That's wrong input.
# Use [80, 60, 40, 20] for 100 LEDs:
# left: 80→60: end<start → wrap: (100-80)+60=80 WRONG too.
# Actually for a valid descending case that totals 100:
# Monotone descending with wrap on last pair only:
# [75, 50, 25, 0] for bottom_left/clockwise:
# left: start=75,end=50: 50<75 → wrap: (100-75)+50=75 NO
# All pairs descend when [75,50,25,0]:
# 0→75 (last pair, bottom): 75>0 → count=75-0=75
# 75→50 (left): 50<75 → wrap: (100-75)+50=75
# Total would be 75+25+25+75=200 ≠ 100
# That shows descending indices don't sum to led_count in general.
# The critical invariant is: sum of per-edge counts == led_count when
# the corner indices span a single full traversal.
# To get descending [80,60,40,20] → sum via wrap logic:
# left: 80→60: 60<80 → (100-80)+60=80
# top: 60→40: 40<60 → (100-60)+40=80
# right: 40→20: 20<40 → (100-40)+20=80
# bottom: 20→80: 80>20 → 80-20=60
# total = 80+80+80+60 = 300 — still not 100.
#
# The INVARIANT of solve_calibration is: if corner_indices form a valid
# partition of the strip (i.e. each consecutive pair covers a segment
# that together span exactly led_count LEDs), the total == led_count.
# Descending indices that form valid partitions do sum to led_count.
# Example: if we want all edges wrapped, use [99, 74, 49, 24]:
# left: 99→74: 74<99 → (100-99)+74=75
# top: 74→49: 49<74 → (100-74)+49=75
# right: 49→24: 24<49 → (100-49)+24=75
# bottom: 24→99: 99>24 → 99-24=75
# total = 75+75+75+75=300 ≠ 100
#
# Conclusion: descending indices don't need to sum to led_count — only
# a proper strip traversal (covering each LED exactly once) does.
# The adversarial test here is: the function must NOT crash, must NOT
# produce negative counts, and must round-trip cleanly.
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[80, 60, 40, 20],
)
counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]
assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}"
_roundtrip(cfg)
def test_all_four_wrap_around_non_negative(self):
"""Every consecutive pair wraps (descending) — all counts must be >= 0."""
# [90, 70, 50, 30] for led_count=100, top_right/clockwise
cfg = solve_calibration(
led_count=100,
start_position="top_right",
layout="clockwise",
corner_indices=[90, 70, 50, 30],
)
counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]
assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}"
_roundtrip(cfg)
# ---------------------------------------------------------------------------
# Edge that wraps the whole strip
# When only one edge is used (the other 3 are 0 LEDs), that edge should span
# led_count LEDs. Corner indices: [0, 0, 0, 0] gives all-zero (tested above).
# A single-edge case: [0, 100, 100, 100] would give left=100, top=0, right=0,
# bottom=0 for bottom_left/clockwise. But 100 % 100 = 0 so end_idx=0,
# start_idx=0 → count=0 for left too. Use index 100 directly (>led_count).
# Alternatively: for led_count=100, corners [0, 0, 0, 0] with all-zero is
# already tested. The wrap-all case uses non-modulo indices.
#
# The real case: one valid single-edge scenario:
# bottom_left/clockwise, 100 LEDs, EDGE_ORDER = [left,top,right,bottom]
# corners [0, 100, 100, 100]: left: 0→100%100=0 → 0-LED because end==start.
# No, there's no way to make one edge claim all 100 LEDs with index-based input
# other than having it wrap around. Test: [50, 50, 50, 50] (all equal):
# already tested above.
# The closest adversarial case: one edge with count == led_count via wrap-around.
# For bottom_left/clockwise, [50, 50, 50, 50] makes top=0,right=0,bottom=0,left=0.
# To make ONE edge == 100: we need start_idx=50, end_idx=50 for three edges
# and start=50, end=50 wraps to 0 for that edge... that's the all-zeros case.
# The only way one edge gets ALL leds is:
# e.g. left: corners[0]=50, corners[1]=50 → 50==50 → count=0 (NOT 100).
# There's a subtle algorithmic distinction: adjacent indices being equal → 0 LED
# vs wrap-around: if the algorithm used (start+count)%N as end, then one-edge
# spanning the whole strip would require start=end via wrap, giving count=N.
# But the current algorithm: end==start → count=0. This is the CORRECT behavior
# per the Phase 1 spec ("Adjacent taps on the same index → 0-LED edge").
# ---------------------------------------------------------------------------
class TestWholestripSingleEdge:
"""Verify that a wrap-around edge spanning led_count-3 LEDs (max when 3 others are 1 each) works."""
def test_one_large_edge_with_minimal_others(self):
"""One edge has led_count-3 LEDs, the other 3 each have 1 LED.
bottom_left/clockwise, led_count=100:
EDGE_ORDER = [left, top, right, bottom]
corners: [0, 97, 98, 99]
left: 0→97 = 97
top: 97→98 = 1
right: 98→99 = 1
bottom: 99→0 = 1 (wrap: (100-99)+0 = 1)
total = 97+1+1+1 = 100 ✓
"""
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 97, 98, 99],
)
assert cfg.leds_left == 97
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 1
assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100
_roundtrip(cfg)
def test_last_edge_wraps_nearly_all_leds(self):
"""The last edge (bottom in bottom_left/clockwise) wraps from 3 to 0.
corners: [0, 1, 2, 3]
left: 0→1 = 1
top: 1→2 = 1
right: 2→3 = 1
bottom: 3→0 = (100-3)+0 = 97 (wrap-around)
total = 100 ✓
"""
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 1, 2, 3],
)
assert cfg.leds_left == 1
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 97
assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100
_roundtrip(cfg)
# ---------------------------------------------------------------------------
# led_count just above minimum
# led_count=1: only 1 LED; corner_indices can only meaningfully be [0,0,0,0]
# which gives all zeros. That's the all-equal case above.
# led_count=5 (just above conceptual minimum) with a valid single-wrap partition.
# ---------------------------------------------------------------------------
class TestMinimalLedCount:
"""Edge cases around very small led_count values."""
def test_led_count_1_all_equal_indices(self):
"""led_count=1: the only valid index is 0; all-equal → all-zero edges."""
try:
cfg = solve_calibration(
led_count=1,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 0, 0],
)
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert total == 0, f"Expected total=0, got {total}"
_roundtrip(cfg)
except ValueError:
pass # also acceptable
def test_led_count_4_minimal_partition(self):
"""led_count=4, each edge gets 1 LED — minimum non-trivial partition."""
# bottom_left/clockwise EDGE_ORDER: [left, top, right, bottom]
# corners: [0, 1, 2, 3]
# left: 0→1=1, top: 1→2=1, right: 2→3=1, bottom: 3→0=(4-3)+0=1
cfg = solve_calibration(
led_count=4,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 1, 2, 3],
)
assert cfg.leds_left == 1
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 1
_roundtrip(cfg)
def test_led_count_0_raises_valueerror(self):
"""led_count=0 is explicitly invalid per the docstring."""
with pytest.raises(ValueError, match="led_count"):
solve_calibration(
led_count=0,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 0, 0],
)
def test_led_count_negative_raises_valueerror(self):
"""led_count=-5 is invalid."""
with pytest.raises(ValueError):
solve_calibration(
led_count=-5,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 0, 0],
)
def test_led_count_5_just_above_minimum(self):
"""led_count=5 with a valid 2/1/1/1 partition."""
# bottom_left/clockwise: [left,top,right,bottom]
# corners: [0, 2, 3, 4]
# left: 0→2=2, top: 2→3=1, right: 3→4=1, bottom: 4→0=(5-4)+0=1
cfg = solve_calibration(
led_count=5,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 2, 3, 4],
)
assert cfg.leds_left == 2
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 1
assert sum([cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]) == 5
_roundtrip(cfg)
# ---------------------------------------------------------------------------
# Offset interactions
# The offset is stored verbatim; PixelMapper normalises it via % total_leds.
# solve_calibration must pass it through without modification.
# Also test: offset == 0 (explicit), offset == led_count (should store as-is),
# and offset >> led_count.
# ---------------------------------------------------------------------------
class TestOffsetInteractions:
"""Offset is stored verbatim and must not affect per-edge counts."""
def test_offset_zero_explicit(self):
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
offset=0,
)
assert cfg.offset == 0
_roundtrip(cfg)
def test_offset_equals_led_count_stored_verbatim(self):
"""offset=led_count should be stored as-is (not reduced)."""
cfg = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
offset=100,
)
assert cfg.offset == 100
_roundtrip(cfg)
def test_large_offset_stored_verbatim(self):
"""offset >> led_count — stored verbatim, build_segments must not crash."""
cfg = solve_calibration(
led_count=60,
start_position="top_right",
layout="counterclockwise",
corner_indices=[0, 15, 30, 45],
offset=9999,
)
assert cfg.offset == 9999
_roundtrip(cfg)
def test_offset_does_not_change_edge_counts(self):
"""Two calls with different offsets must produce identical edge counts."""
kwargs = dict(
led_count=100,
start_position="bottom_left",
layout="counterclockwise",
corner_indices=[0, 25, 50, 75],
)
cfg_no_offset = solve_calibration(**kwargs, offset=0)
cfg_offset_13 = solve_calibration(**kwargs, offset=13)
assert cfg_no_offset.leds_top == cfg_offset_13.leds_top
assert cfg_no_offset.leds_right == cfg_offset_13.leds_right
assert cfg_no_offset.leds_bottom == cfg_offset_13.leds_bottom
assert cfg_no_offset.leds_left == cfg_offset_13.leds_left
# ---------------------------------------------------------------------------
# corner_indices >= led_count (modulo reduction)
# The solver applies % led_count to each index before computing counts.
# Index 150 for led_count=100 should behave identically to index 50.
# ---------------------------------------------------------------------------
class TestCornerIndicesModuloReduction:
"""Indices >= led_count are reduced modulo led_count before use."""
def test_indices_above_led_count_reduced(self):
"""corner_indices [0, 25, 50, 75] and [100, 125, 150, 175] must give same result."""
led_count = 100
cfg_base = solve_calibration(
led_count=led_count,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
cfg_shifted = solve_calibration(
led_count=led_count,
start_position="bottom_left",
layout="clockwise",
corner_indices=[100, 125, 150, 175], # each + 100 (≡ same mod 100)
)
assert cfg_base.leds_left == cfg_shifted.leds_left
assert cfg_base.leds_top == cfg_shifted.leds_top
assert cfg_base.leds_right == cfg_shifted.leds_right
assert cfg_base.leds_bottom == cfg_shifted.leds_bottom
_roundtrip(cfg_shifted)
def test_index_exactly_led_count_is_zero(self):
"""Index = led_count reduces to 0, same as explicit 0."""
cfg_zero = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
cfg_hundred = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[100, 25, 50, 75], # first index = led_count → reduces to 0
)
assert cfg_zero.leds_top == cfg_hundred.leds_top
assert cfg_zero.leds_right == cfg_hundred.leds_right
assert cfg_zero.leds_bottom == cfg_hundred.leds_bottom
assert cfg_zero.leds_left == cfg_hundred.leds_left
# ---------------------------------------------------------------------------
# Invalid input: wrong number of corner_indices
# ---------------------------------------------------------------------------
class TestInvalidCornerIndicesLength:
"""Wrong number of corner indices must raise ValueError."""
def test_three_corners_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 25, 50],
)
def test_five_corners_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 20, 40, 60, 80],
)
def test_empty_corners_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[],
)
def test_one_corner_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[50],
)
# ---------------------------------------------------------------------------
# Invalid start_position / layout
# ---------------------------------------------------------------------------
class TestInvalidEnumInputs:
def test_invalid_start_position_raises(self):
with pytest.raises(ValueError, match="start_position"):
solve_calibration(
led_count=100,
start_position="center",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
def test_invalid_layout_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="diagonal",
corner_indices=[0, 25, 50, 75],
)
def test_both_invalid_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="wrong",
layout="wrong",
corner_indices=[0, 25, 50, 75],
)
# ---------------------------------------------------------------------------
# Totals preservation invariant
# For any valid input, sum(edge_counts) == sum via build_segments()
# Test across all 8 combinations with a wrap-around partition.
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("start_position,layout", list(EDGE_ORDER.keys()))
def test_total_preservation_with_wraparound(start_position, layout):
"""For all 8 combinations with a wrap-around partition, total preserved."""
# The wrap-around partition: first 3 edges get 20 LEDs each, last edge
# gets the remaining 40 via wrap.
# EDGE_ORDER tells us walk order; corners: [0, 20, 40, 60] — last edge wraps to 40.
# bottom: 60→0 = (100-60)+0 = 40.
cfg = solve_calibration(
led_count=100,
start_position=start_position,
layout=layout,
corner_indices=[0, 20, 40, 60],
)
# Each of first 3 edges = 20, last = 40
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert total == 100, f"{start_position}/{layout}: expected total=100, got {total}"
_roundtrip(cfg)