feat(calibration): auto edge-calibration backend core (phase 1)

Backend engine for guided LED-chase calibration, driven by the upcoming
auto-calibration UI (phase 3) and first-run wizard (phase 4).

- solve_calibration(): pure function mapping start corner + direction + 4
  corner-tap indices to per-edge LED counts, consistent with EDGE_ORDER/
  EDGE_REVERSE so it round-trips through build_segments().
- CalibrationChaseMixin.set_calibration_pixel(): light a specific LED index
  (+ optional window) on a device, reusing the device_test_mode idle-client
  send path.
- CalibrationSession: single-active session with start/position/stop/cancel,
  a 60s idle-timeout watchdog, and a concurrency lock so interleaved calls
  can't corrupt the stop/restore bookkeeping — start() stops + remembers any
  running target on the device and stop/cancel/timeout always restore it
  (never leaves the device dark or stuck in chase).
- Routes /api/v1/calibration/{session,session/position,session/stop,
  session/cancel,session/state,solve} (all AuthRequired, bounds-validated);
  calibration is persisted by reusing the existing PUT /color-strip-sources/
  {id} (hot-reloads running streams) rather than a duplicate endpoint.
- Tests: 19 solver pure-logic + 19 route/bounds. docs/API.md updated.

Part of the edge-calibration + first-run-wizard feature (Big Bang; intermediate
phase — full build/suite gated at the final phase).
This commit is contained in:
2026-06-08 14:59:58 +03:00
parent 6180569b10
commit 0409cd8b66
9 changed files with 1584 additions and 2 deletions
@@ -0,0 +1,354 @@
"""Happy-path and bounds-validation tests for calibration API routes.
Runs with the full app test-client stack but mocks the ProcessorManager
so no real LED devices are required.
Note: Deep adversarial coverage is deferred to the Phase 4 test-writer
(Big Bang strategy).
"""
from __future__ import annotations
import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, MagicMock
from fastapi import FastAPI
from httpx import AsyncClient, ASGITransport
from ledgrab.api.routes.calibration import router
from ledgrab.core.capture.calibration_session import get_calibration_session
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def mock_manager() -> MagicMock:
"""A minimal fake ProcessorManager."""
mgr = MagicMock()
# Simulate a registered device with 100 LEDs
ds = MagicMock()
ds.led_count = 100
mgr._devices = {"dev1": ds}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
return mgr
@pytest_asyncio.fixture(autouse=True)
async def reset_session():
"""Reset the module-level CalibrationSession singleton before each test."""
import asyncio
def _clear(session) -> None:
session._active = False
session._device_id = None
session._led_count = 0
session._prior_target_id = None
session._last_activity = None
session._manager = None
# Reset lock so a test that aborted mid-await doesn't leave it locked
session._lock = asyncio.Lock()
session = get_calibration_session()
# Cancel any leftover watchdog task before clearing
if session._timeout_task and not session._timeout_task.done():
session._timeout_task.cancel()
try:
await session._timeout_task
except Exception:
pass
session._timeout_task = None
_clear(session)
yield
# Cleanup after test
if session._timeout_task and not session._timeout_task.done():
session._timeout_task.cancel()
try:
await session._timeout_task
except Exception:
pass
session._timeout_task = None
_clear(session)
@pytest.fixture()
def app(mock_manager: MagicMock) -> FastAPI:
"""Tiny FastAPI app with only the calibration router and auth disabled."""
from fastapi import FastAPI
from ledgrab.api.auth import verify_api_key
from ledgrab.api import dependencies as deps_mod
_app = FastAPI()
_app.include_router(router)
# Override the underlying dependency that AuthRequired resolves to
_app.dependency_overrides[verify_api_key] = lambda: "test-token"
_app.dependency_overrides[deps_mod.get_processor_manager] = lambda: mock_manager
return _app
@pytest_asyncio.fixture()
async def client(app: FastAPI):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c
# ---------------------------------------------------------------------------
# Session start
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_start_session_success(client: AsyncClient, mock_manager: MagicMock):
resp = await client.post("/api/v1/calibration/session", json={"device_id": "dev1"})
assert resp.status_code == 201
data = resp.json()
assert data["active"] is True
assert data["device_id"] == "dev1"
assert data["led_count"] == 100
mock_manager.send_clear_pixels.assert_awaited_once_with("dev1")
@pytest.mark.asyncio
async def test_start_session_unknown_device(client: AsyncClient):
resp = await client.post("/api/v1/calibration/session", json={"device_id": "does_not_exist"})
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Session position
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_position_success(client: AsyncClient, mock_manager: MagicMock):
# Start session first
await client.post("/api/v1/calibration/session", json={"device_id": "dev1"})
resp = await client.post(
"/api/v1/calibration/session/position", json={"index": 42, "window": 2}
)
assert resp.status_code == 200
data = resp.json()
assert data["active"] is True
mock_manager.set_calibration_pixel.assert_awaited_with("dev1", 42, window=2)
@pytest.mark.asyncio
async def test_position_out_of_range(client: AsyncClient, mock_manager: MagicMock):
"""index >= led_count → 400."""
await client.post("/api/v1/calibration/session", json={"device_id": "dev1"})
resp = await client.post(
"/api/v1/calibration/session/position", json={"index": 100, "window": 1}
)
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_position_negative_index_422(client: AsyncClient):
"""index < 0 → Pydantic 422."""
resp = await client.post(
"/api/v1/calibration/session/position", json={"index": -1, "window": 1}
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_position_no_active_session(client: AsyncClient):
"""Calling position without starting a session → 400."""
resp = await client.post("/api/v1/calibration/session/position", json={"index": 5, "window": 1})
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# Session stop
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_stop_session_clears_device(client: AsyncClient, mock_manager: MagicMock):
await client.post("/api/v1/calibration/session", json={"device_id": "dev1"})
resp = await client.post("/api/v1/calibration/session/stop")
assert resp.status_code == 200
data = resp.json()
assert data["active"] is False
# send_clear_pixels called at start AND at stop
assert mock_manager.send_clear_pixels.await_count == 2
@pytest.mark.asyncio
async def test_stop_restores_prior_target(client: AsyncClient, mock_manager: MagicMock):
"""When a target was running, stop should restart it."""
mock_manager.get_processing_target_for_device = MagicMock(return_value="tgt1")
await client.post("/api/v1/calibration/session", json={"device_id": "dev1"})
await client.post("/api/v1/calibration/session/stop")
mock_manager.start_processing.assert_awaited_once_with("tgt1")
@pytest.mark.asyncio
async def test_stop_no_active_session_is_ok(client: AsyncClient):
"""stop when inactive → 200 with active=False."""
resp = await client.post("/api/v1/calibration/session/stop")
assert resp.status_code == 200
assert resp.json()["active"] is False
# ---------------------------------------------------------------------------
# Session state
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_state_inactive(client: AsyncClient):
resp = await client.get("/api/v1/calibration/session/state")
assert resp.status_code == 200
assert resp.json()["active"] is False
@pytest.mark.asyncio
async def test_get_state_active(client: AsyncClient, mock_manager: MagicMock):
await client.post("/api/v1/calibration/session", json={"device_id": "dev1"})
resp = await client.get("/api/v1/calibration/session/state")
assert resp.status_code == 200
assert resp.json()["active"] is True
# ---------------------------------------------------------------------------
# Solve endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_solve_with_device_id(client: AsyncClient):
resp = await client.post(
"/api/v1/calibration/solve",
json={
"device_id": "dev1",
"start_position": "bottom_left",
"layout": "clockwise",
"corner_indices": [0, 30, 60, 80],
},
)
assert resp.status_code == 200
data = resp.json()
assert data["mode"] == "simple"
# bottom_left/clockwise EDGE_ORDER: left, top, right, bottom
# left=30, top=30, right=20, bottom=20 → total=100
assert data["leds_left"] == 30
assert data["leds_top"] == 30
assert data["leds_right"] == 20
assert data["leds_bottom"] == 20
assert data["layout"] == "clockwise"
assert data["start_position"] == "bottom_left"
@pytest.mark.asyncio
async def test_solve_with_led_count(client: AsyncClient):
resp = await client.post(
"/api/v1/calibration/solve",
json={
"led_count": 80,
"start_position": "top_left",
"layout": "clockwise",
"corner_indices": [0, 20, 40, 60],
},
)
assert resp.status_code == 200
data = resp.json()
assert sum([data["leds_top"], data["leds_right"], data["leds_bottom"], data["leds_left"]]) == 80
@pytest.mark.asyncio
async def test_solve_missing_device_and_led_count(client: AsyncClient):
"""Omitting both device_id and led_count → 422 (model validator)."""
resp = await client.post(
"/api/v1/calibration/solve",
json={
"start_position": "bottom_left",
"layout": "clockwise",
"corner_indices": [0, 25, 50, 75],
},
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_solve_unknown_device(client: AsyncClient):
resp = await client.post(
"/api/v1/calibration/solve",
json={
"device_id": "no_such_device",
"start_position": "bottom_left",
"layout": "clockwise",
"corner_indices": [0, 25, 50, 75],
},
)
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_solve_invalid_start_position_422(client: AsyncClient):
resp = await client.post(
"/api/v1/calibration/solve",
json={
"led_count": 100,
"start_position": "invalid_corner",
"layout": "clockwise",
"corner_indices": [0, 25, 50, 75],
},
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_solve_invalid_layout_422(client: AsyncClient):
resp = await client.post(
"/api/v1/calibration/solve",
json={
"led_count": 100,
"start_position": "bottom_left",
"layout": "diagonal",
"corner_indices": [0, 25, 50, 75],
},
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_solve_wrong_corner_count_422(client: AsyncClient):
"""Only 3 corner indices → 422 (min_length=4)."""
resp = await client.post(
"/api/v1/calibration/solve",
json={
"led_count": 100,
"start_position": "bottom_left",
"layout": "clockwise",
"corner_indices": [0, 25, 50],
},
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_solve_with_offset(client: AsyncClient):
resp = await client.post(
"/api/v1/calibration/solve",
json={
"led_count": 100,
"start_position": "bottom_left",
"layout": "clockwise",
"corner_indices": [0, 25, 50, 75],
"offset": 7,
},
)
assert resp.status_code == 200
assert resp.json()["offset"] == 7
@@ -0,0 +1,315 @@
"""Unit tests for solve_calibration() — pure logic, runs in isolation.
Tests cover:
- All 8 (start_position × layout) combinations
- 0-LED edge (two corners tapped adjacent)
- offset pass-through
- Round-trip through build_segments()
- Wrap-around (corner_indices straddle the 0/led_count boundary)
"""
import pytest
from ledgrab.core.capture.calibration import (
EDGE_ORDER,
CalibrationConfig,
solve_calibration,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _assert_roundtrip(cfg: CalibrationConfig) -> None:
"""build_segments() must not crash and must cover the expected LED count."""
segs = cfg.build_segments()
total_from_segs = sum(s.led_count for s in segs)
expected = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert total_from_segs == expected, (
f"Segment total {total_from_segs} != field total {expected} " f"for cfg={cfg!r}"
)
def _edge_counts(cfg: CalibrationConfig) -> dict[str, int]:
return {
"top": cfg.leds_top,
"right": cfg.leds_right,
"bottom": cfg.leds_bottom,
"left": cfg.leds_left,
}
# ---------------------------------------------------------------------------
# Basic: bottom_left / clockwise (canonical case)
# ---------------------------------------------------------------------------
class TestBottomLeftClockwise:
"""start_position=bottom_left, layout=clockwise.
EDGE_ORDER: ["left", "top", "right", "bottom"]
Strip walk: LED 0 is at bottom-left corner, goes UP the left edge,
across the top, DOWN the right, and back along the bottom.
Corner indices for a 100-LED, 20/30/20/30 (L/T/R/B) layout:
bottom_left -> 0
top_left -> 20 (after left edge)
top_right -> 50 (after top edge)
bottom_right -> 70 (after right edge)
"""
START = "bottom_left"
LAYOUT = "clockwise"
LED_COUNT = 100
def _make_corner_indices(self) -> list[int]:
# left=20, top=30, right=20, bottom=30
return [0, 20, 50, 70] # BL, TL, TR, BR
def test_basic_counts(self):
cfg = solve_calibration(
led_count=self.LED_COUNT,
start_position=self.START,
layout=self.LAYOUT,
corner_indices=self._make_corner_indices(),
)
counts = _edge_counts(cfg)
assert counts["left"] == 20
assert counts["top"] == 30
assert counts["right"] == 20
assert counts["bottom"] == 30
def test_start_position_preserved(self):
cfg = solve_calibration(
led_count=self.LED_COUNT,
start_position=self.START,
layout=self.LAYOUT,
corner_indices=self._make_corner_indices(),
)
assert cfg.start_position == self.START
def test_layout_preserved(self):
cfg = solve_calibration(
led_count=self.LED_COUNT,
start_position=self.START,
layout=self.LAYOUT,
corner_indices=self._make_corner_indices(),
)
assert cfg.layout == self.LAYOUT
def test_roundtrip(self):
cfg = solve_calibration(
led_count=self.LED_COUNT,
start_position=self.START,
layout=self.LAYOUT,
corner_indices=self._make_corner_indices(),
)
_assert_roundtrip(cfg)
def test_offset_passthrough(self):
cfg = solve_calibration(
led_count=self.LED_COUNT,
start_position=self.START,
layout=self.LAYOUT,
corner_indices=self._make_corner_indices(),
offset=5,
)
assert cfg.offset == 5
_assert_roundtrip(cfg)
# ---------------------------------------------------------------------------
# All 8 combinations: smoke test (round-trip + total == led_count)
# ---------------------------------------------------------------------------
ALL_CORNERS: dict[str, list[str]] = {
# start_position: [BL, TL, TR, BR] corners in the order they appear on the strip
# for layout=clockwise. We use 100 LEDs with 25 per edge for simplicity.
"bottom_left": ["BL", "TL", "TR", "BR"],
"top_left": ["TL", "TR", "BR", "BL"],
"top_right": ["TR", "BR", "BL", "TL"],
"bottom_right": ["BR", "BL", "TL", "TR"],
}
# For each start_position × layout, what are the 4 corner indices
# when all edges have 25 LEDs (100 total)?
# EDGE_ORDER for (start, "clockwise") gives the edge walk sequence.
# We map corner names to indices by placing them at the boundaries.
def _corner_indices_25_each(start_position: str, layout: str) -> list[int]:
"""
Build corner indices assuming all 4 edges have exactly 25 LEDs.
Returns [start_corner, second_corner, third_corner, fourth_corner]
following the strip walk order defined by EDGE_ORDER.
The corners of the screen are:
top_left=TL, top_right=TR, bottom_left=BL, bottom_right=BR
Each edge start-corner is at the leading edge index; its end-corner
is at that index + led_count of that edge (mod 100).
"""
key = (start_position, layout)
order = EDGE_ORDER[key] # e.g. ["left","top","right","bottom"]
# Map edge names to their start and end screen corners
# Corner positions: start corner of each edge in strip order
result = []
led_pos = 0
for edge in order:
result.append(led_pos)
led_pos += 25
return result
@pytest.mark.parametrize("start_position", list(EDGE_ORDER))
def test_all_combinations_roundtrip_25_each(start_position):
"""All 8 (start, layout) combos with 25 LEDs/edge must round-trip."""
start_pos_str, layout = start_position # unpack tuple key
indices = _corner_indices_25_each(start_pos_str, layout)
cfg = solve_calibration(
led_count=100,
start_position=start_pos_str,
layout=layout,
corner_indices=indices,
)
counts = _edge_counts(cfg)
assert (
sum(counts.values()) == 100
), f"{start_pos_str}/{layout}: total LEDs {sum(counts.values())} != 100"
assert all(
v == 25 for v in counts.values()
), f"{start_pos_str}/{layout}: edge counts {counts} not all 25"
_assert_roundtrip(cfg)
# ---------------------------------------------------------------------------
# 0-LED edge: two corners tapped adjacent (one edge has 0 LEDs)
# ---------------------------------------------------------------------------
class TestZeroLedEdge:
"""When two consecutive corner taps are the same index, that edge has 0 LEDs."""
def test_zero_bottom_edge(self):
"""
bottom_left / clockwise, 100 LEDs.
EDGE_ORDER: left, top, right, bottom
Tap top-left and bottom-right at the same index → bottom edge = 0
We place BL=0, TL=40, TR=70, BR=70 (top=30, right=0 would be wrong;
let's use BL=0, TL=25, TR=65, BR=90 for bottom=10, then make left=right=40)
Actually: make right edge 0: BL=0, TL=40, TR=60, BR=60
"""
# EDGE_ORDER for bottom_left/clockwise: ["left","top","right","bottom"]
# Strip indices: left 0..39 (40 LEDs), top 40..59 (20 LEDs), right 60..59 (0 LEDs!), bottom 60..99 (40 LEDs)
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 40, 60, 60], # BL, TL, TR, BR — right=0
)
counts = _edge_counts(cfg)
assert counts["left"] == 40
assert counts["top"] == 20
assert counts["right"] == 0
assert counts["bottom"] == 40
assert sum(counts.values()) == 100
_assert_roundtrip(cfg)
def test_zero_first_edge(self):
"""First edge (left) can also be 0 if corners 0 and 1 are the same."""
# EDGE_ORDER bottom_left/clockwise: ["left","top","right","bottom"]
# If BL==TL, left edge has 0 LEDs
cfg = solve_calibration(
led_count=60,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 20, 40], # BL=TL, left=0
)
counts = _edge_counts(cfg)
assert counts["left"] == 0
assert counts["top"] == 20
assert counts["right"] == 20
assert counts["bottom"] == 20
assert sum(counts.values()) == 60
_assert_roundtrip(cfg)
# ---------------------------------------------------------------------------
# Wrap-around: last corner index < first (straddles the 0 boundary)
# ---------------------------------------------------------------------------
class TestWrapAround:
"""When the strip wraps: the last segment spans from some index to led_count,
then continues from 0 to the start corner. This can happen if the user
provides indices that wrap around the physical end of the strip.
"""
def test_wrap_around_bottom_edge(self):
"""
bottom_left / clockwise, 100 LEDs.
EDGE_ORDER: left, top, right, bottom.
If the user taps: BL=80, TL=10, TR=40, BR=60 (wraps)
-> left: 80..10 = (100-80)+10 = 30
-> top: 10..40 = 30
-> right:40..60 = 20
-> bottom:60..80 = 20
"""
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[80, 10, 40, 60],
)
counts = _edge_counts(cfg)
assert counts["left"] == 30
assert counts["top"] == 30
assert counts["right"] == 20
assert counts["bottom"] == 20
assert sum(counts.values()) == 100
_assert_roundtrip(cfg)
# ---------------------------------------------------------------------------
# Offset
# ---------------------------------------------------------------------------
class TestOffset:
def test_offset_stored_correctly(self):
cfg = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
offset=10,
)
assert cfg.offset == 10
_assert_roundtrip(cfg)
def test_offset_default_zero(self):
cfg = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
assert cfg.offset == 0
# ---------------------------------------------------------------------------
# Mode is always "simple"
# ---------------------------------------------------------------------------
def test_solve_returns_simple_mode():
cfg = solve_calibration(
led_count=80,
start_position="top_right",
layout="counterclockwise",
corner_indices=[0, 20, 40, 60],
)
assert cfg.mode == "simple"