fix(setup): register scaffolded target with ProcessorManager + final-review hardening

Final-review blocker: the setup scaffold created the LED output target in the
store but never registered it with the ProcessorManager, so the wizard's
"Start" step 404'd on a fresh setup (target not found) — the lights never
started despite a success screen. Now the scaffold calls
target.register_with_manager(manager) right after create (mirroring the
canonical POST /output-targets route, same ValueError guard), so
start_processing finds the target. Rollback unregisters via
manager.remove_target before deleting the store entity, so a post-registration
failure leaves no half-registered target.

Also from the final review:
- solve corner_indices elements now bounded ge=0 (clear 422 instead of silent
  modulo-wrap).
- setup-wizard.ts: reuse tutorials' suppressGettingStartedTour()/TOUR_KEY
  instead of a duplicated 'tour_completed' literal; drop a duplicate manual-form
  submit listener.

Tests: + adversarial pass over the whole feature (solver/session/scaffold edge
cases) and a scaffold->register->startable regression test. Full suite
2149 passed / 2 skipped; tsc clean; build passes; ruff clean.
This commit is contained in:
2026-06-08 16:55:36 +03:00
parent 81b18089e1
commit 6cd5e057da
7 changed files with 1730 additions and 12 deletions
+31
View File
@@ -36,11 +36,13 @@ from ledgrab.api.dependencies import (
get_device_store, get_device_store,
get_output_target_store, get_output_target_store,
get_picture_source_store, get_picture_source_store,
get_processor_manager,
get_template_store, get_template_store,
) )
from ledgrab.api.schemas.setup import ScaffoldRequest, ScaffoldResponse from ledgrab.api.schemas.setup import ScaffoldRequest, ScaffoldResponse
from ledgrab.core.capture.calibration import calibration_from_dict, create_default_calibration from ledgrab.core.capture.calibration import calibration_from_dict, create_default_calibration
from ledgrab.core.capture_engines.factory import EngineRegistry from ledgrab.core.capture_engines.factory import EngineRegistry
from ledgrab.core.processing.processor_manager import ProcessorManager
from ledgrab.storage.base_store import EntityNotFoundError from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.storage.color_strip_store import ColorStripStore from ledgrab.storage.color_strip_store import ColorStripStore
from ledgrab.storage.output_target_store import OutputTargetStore from ledgrab.storage.output_target_store import OutputTargetStore
@@ -116,11 +118,16 @@ def _rollback(
picture_source_store: PictureSourceStore, picture_source_store: PictureSourceStore,
css_store: ColorStripStore, css_store: ColorStripStore,
output_target_store: OutputTargetStore, output_target_store: OutputTargetStore,
manager: ProcessorManager | None = None,
) -> None: ) -> None:
"""Delete entities created during this call, in reverse order. """Delete entities created during this call, in reverse order.
Only entities listed in ``created_ids`` are deleted; reused/pre-existing Only entities listed in ``created_ids`` are deleted; reused/pre-existing
entities (including the device) are never touched. entities (including the device) are never touched.
If *manager* is provided, any ``output_target`` entity in the rollback set
is also unregistered from the ProcessorManager before store deletion, so no
half-registered target is left behind.
""" """
store_map: dict[str, Any] = { store_map: dict[str, Any] = {
"capture_template": template_store, "capture_template": template_store,
@@ -129,6 +136,18 @@ def _rollback(
"output_target": output_target_store, "output_target": output_target_store,
} }
for entity_type, entity_id in reversed(created_ids): for entity_type, entity_id in reversed(created_ids):
# Unregister output targets from the processor manager first
if entity_type == "output_target" and manager is not None:
try:
manager.remove_target(entity_id)
logger.info("Scaffold rollback: unregistered target %s from manager", entity_id)
except (ValueError, RuntimeError) as exc:
logger.debug(
"Scaffold rollback: manager unregister skipped for %s%s",
entity_id,
exc,
)
store = store_map.get(entity_type) store = store_map.get(entity_type)
if store is None: if store is None:
logger.warning("Scaffold rollback: unknown entity type %r — skipping", entity_type) logger.warning("Scaffold rollback: unknown entity type %r — skipping", entity_type)
@@ -164,6 +183,7 @@ async def scaffold_setup(
picture_source_store: PictureSourceStore = Depends(get_picture_source_store), picture_source_store: PictureSourceStore = Depends(get_picture_source_store),
css_store: ColorStripStore = Depends(get_color_strip_store), css_store: ColorStripStore = Depends(get_color_strip_store),
output_target_store: OutputTargetStore = Depends(get_output_target_store), output_target_store: OutputTargetStore = Depends(get_output_target_store),
manager: ProcessorManager = Depends(get_processor_manager),
) -> ScaffoldResponse: ) -> ScaffoldResponse:
"""Create a ready-to-start LED capture chain. """Create a ready-to-start LED capture chain.
@@ -192,6 +212,7 @@ async def scaffold_setup(
picture_source_store=picture_source_store, picture_source_store=picture_source_store,
css_store=css_store, css_store=css_store,
output_target_store=output_target_store, output_target_store=output_target_store,
manager=manager,
) )
try: try:
@@ -269,6 +290,16 @@ async def scaffold_setup(
pending_events.append(("output_target", target.id)) pending_events.append(("output_target", target.id))
logger.info("Scaffold: created output target %s", target.id) logger.info("Scaffold: created output target %s", target.id)
# ── Step 5b: register target with ProcessorManager ───────────────────
try:
target.register_with_manager(manager)
except ValueError as exc:
logger.warning(
"Scaffold: could not register target %s in processor manager: %s",
target.id,
exc,
)
except HTTPException: except HTTPException:
_rollback(created_ids, **rollback_stores) _rollback(created_ids, **rollback_stores)
raise raise
@@ -1,6 +1,6 @@
"""Pydantic schemas for the calibration session and solver API.""" """Pydantic schemas for the calibration session and solver API."""
from typing import List, Literal from typing import Annotated, List, Literal
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, model_validator
@@ -65,14 +65,15 @@ class CalibrationSolveRequest(BaseModel):
layout: Literal["clockwise", "counterclockwise"] = Field( layout: Literal["clockwise", "counterclockwise"] = Field(
description="Winding direction of the strip" description="Winding direction of the strip"
) )
corner_indices: List[int] = Field( corner_indices: List[Annotated[int, Field(ge=0)]] = Field(
description=( description=(
"Four strip indices — one per screen corner — in the strip-walk order " "Four strip indices — one per screen corner — in the strip-walk order "
"defined by (start_position, layout). Index 0 of the strip is the " "defined by (start_position, layout). Index 0 of the strip is the "
"start corner; the four tap positions are recorded in strip order " "start corner; the four tap positions are recorded in strip order "
"beginning from that start corner (the solver lays edges out from " "beginning from that start corner (the solver lays edges out from "
"led_start=0, so a non-zero physical start would require the `offset` " "led_start=0, so a non-zero physical start would require the `offset` "
"field rather than a shifted corner_indices[0])." "field rather than a shifted corner_indices[0]). Each element must be "
"non-negative (ge=0); out-of-range values yield a 422."
), ),
min_length=4, min_length=4,
max_length=4, max_length=4,
@@ -27,6 +27,7 @@ import { t } from '../core/i18n.ts';
import { showToast } from '../core/ui.ts'; import { showToast } from '../core/ui.ts';
import { Modal } from '../core/modal.ts'; import { Modal } from '../core/modal.ts';
import { mountAutoCalibration, unmountAutoCalibration } from './auto-calibration.ts'; import { mountAutoCalibration, unmountAutoCalibration } from './auto-calibration.ts';
import { suppressGettingStartedTour } from './tutorials.ts';
import { import {
ICON_MONITOR, ICON_SPARKLES, ICON_DEVICE, ICON_OK, ICON_CHECK, ICON_ROCKET_ICON, ICON_MONITOR, ICON_SPARKLES, ICON_DEVICE, ICON_OK, ICON_CHECK, ICON_ROCKET_ICON,
ICON_CALIBRATION, ICON_START, ICON_SEARCH, ICON_PLUS, ICON_CALIBRATION, ICON_START, ICON_SEARCH, ICON_PLUS,
@@ -76,8 +77,6 @@ interface WizardState {
let _state: WizardState | null = null; let _state: WizardState | null = null;
let _modal: SetupWizardModal | null = null; let _modal: SetupWizardModal | null = null;
const ONBOARDED_KEY = 'tour_completed'; // mirror tutorials.ts TOUR_KEY
const STEPS: WizardStep[] = ['welcome', 'device', 'display', 'scaffold', 'calibrate', 'start', 'done']; const STEPS: WizardStep[] = ['welcome', 'device', 'display', 'scaffold', 'calibrate', 'start', 'done'];
// ── Modal class ──────────────────────────────────────────────────────────────── // ── Modal class ────────────────────────────────────────────────────────────────
@@ -167,7 +166,7 @@ async function _markOnboarded(): Promise<void> {
try { try {
await apiPut('/preferences/onboarding', { onboarded: true }); await apiPut('/preferences/onboarding', { onboarded: true });
// Suppress tooltip tour too — wizard owns the first-run experience // Suppress tooltip tour too — wizard owns the first-run experience
localStorage.setItem(ONBOARDED_KEY, '1'); suppressGettingStartedTour();
} catch { } catch {
// Non-fatal: UI already moved on // Non-fatal: UI already moved on
} }
@@ -770,11 +769,9 @@ function _buildDoneStep(state: WizardState): string {
</div>`; </div>`;
} }
function _attachStepListeners(step: WizardStep): void { function _attachStepListeners(_step: WizardStep): void {
if (step === 'device') { // The manual device form uses onsubmit="wizardAddManualDevice(event)" inline —
const form = document.getElementById('wizard-manual-form'); // no duplicate addEventListener needed here.
if (form) form.addEventListener('submit', (e) => void wizardAddManualDevice(e));
}
} }
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
+87 -1
View File
@@ -21,7 +21,7 @@ from __future__ import annotations
import pytest import pytest
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from unittest.mock import patch from unittest.mock import MagicMock, patch
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -89,6 +89,14 @@ def event_log():
return log return log
@pytest.fixture
def mock_manager():
"""A MagicMock ProcessorManager that silently accepts all calls."""
mgr = MagicMock()
mgr.remove_target = MagicMock()
return mgr
@pytest.fixture @pytest.fixture
def setup_client( def setup_client(
tmp_db, tmp_db,
@@ -98,6 +106,7 @@ def setup_client(
css_store, css_store,
output_target_store, output_target_store,
event_log, event_log,
mock_manager,
): ):
from ledgrab.api.routes.setup import router from ledgrab.api.routes.setup import router
from ledgrab.api.auth import verify_api_key from ledgrab.api.auth import verify_api_key
@@ -111,6 +120,7 @@ def setup_client(
app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store
app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store
app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store
app.dependency_overrides[deps.get_processor_manager] = lambda: mock_manager
# Capture entity events # Capture entity events
def _fire(entity_type, action, entity_id): def _fire(entity_type, action, entity_id):
@@ -512,3 +522,79 @@ class TestScaffoldCalibrationIntegration:
assert updated.calibration.leds_top == 15 assert updated.calibration.leds_top == 15
assert updated.calibration.leds_right == 9 assert updated.calibration.leds_right == 9
assert updated.calibration.layout == "clockwise" assert updated.calibration.layout == "clockwise"
# ---------------------------------------------------------------------------
# Regression: scaffold registers the output target with ProcessorManager
# ---------------------------------------------------------------------------
class TestScaffoldRegistersWithManager:
def test_scaffold_calls_add_target_on_manager(
self,
setup_client,
sample_device,
mock_manager,
):
"""Blocker regression: scaffold must register the created output target
with the ProcessorManager so that a subsequent start call can find it.
WledOutputTarget.register_with_manager calls manager.add_target(...)
— we assert that method was invoked with the scaffolded target id.
"""
resp = _scaffold(setup_client, device_id=sample_device.id)
assert resp.status_code == 201, resp.text
target_id = resp.json()["output_target_id"]
# register_with_manager → manager.add_target(target_id=..., ...)
mock_manager.add_target.assert_called_once()
call_kwargs = mock_manager.add_target.call_args
assert call_kwargs.kwargs.get("target_id") == target_id, (
f"manager.add_target was not called with target_id={target_id!r}; "
f"actual call: {call_kwargs}"
)
def test_scaffold_rollback_calls_remove_target_on_manager(
self,
setup_client,
sample_device,
output_target_store,
mock_manager,
):
"""Rollback after a post-target-creation failure must call manager.remove_target
so no half-registered target lingers in the ProcessorManager.
We inject a failure by making register_with_manager raise RuntimeError
(bypassing the ValueError-only guard), which puts the outer except branch
in play. The target IS already in created_ids at that point, so rollback
must call manager.remove_target(target_id).
"""
created_target_ids: list[str] = []
original_create = output_target_store.create_wled_target
def _spy_create(*args, **kwargs):
target = original_create(*args, **kwargs)
created_target_ids.append(target.id)
return target
output_target_store.create_wled_target = _spy_create
try:
# Patch register_with_manager on WledOutputTarget to raise RuntimeError —
# RuntimeError bypasses the ValueError guard and triggers the outer except,
# so rollback fires with the target already in created_ids.
with patch(
"ledgrab.storage.wled_output_target.WledOutputTarget.register_with_manager",
side_effect=RuntimeError("Injected registration failure for rollback test"),
):
resp = setup_client.post(
"/api/v1/setup/scaffold",
json={"device_id": sample_device.id, "display_index": 0},
)
assert resp.status_code == 500, resp.text
assert len(created_target_ids) == 1, "spy did not record a created target"
target_id = created_target_ids[0]
mock_manager.remove_target.assert_called_with(target_id)
finally:
output_target_store.create_wled_target = original_create
@@ -0,0 +1,506 @@
"""Adversarial tests for the setup scaffold and onboarding preference endpoints.
Phase 2 acceptance criteria (NOT what the code happens to do):
- Rollback when the FINAL step (output target create) fails leaves ZERO orphans
AND emits ZERO "created" events.
- Reused capture template is NOT deleted on rollback.
- display_index > 63 → 422.
- Missing device_id → 422 (Pydantic validation before handler runs).
- Unknown device_id → 404.
- PUT onboarding false clears completed_at to null.
- Corrupt stored onboarding value falls back to default (onboarded=false).
These fill the gaps in the existing 22 happy-path tests.
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch
# ---------------------------------------------------------------------------
# Shared fixtures (mirrors test_setup_routes.py exactly)
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_db(tmp_path):
from ledgrab.storage.database import Database
db = Database(tmp_path / "adv_setup.db")
yield db
db.close()
@pytest.fixture
def device_store(tmp_db):
from ledgrab.storage import DeviceStore
return DeviceStore(tmp_db)
@pytest.fixture
def template_store(tmp_db):
from ledgrab.storage.template_store import TemplateStore
return TemplateStore(tmp_db)
@pytest.fixture
def picture_source_store(tmp_db):
from ledgrab.storage.picture_source_store import PictureSourceStore
return PictureSourceStore(tmp_db)
@pytest.fixture
def css_store(tmp_db):
from ledgrab.storage.color_strip_store import ColorStripStore
return ColorStripStore(tmp_db)
@pytest.fixture
def output_target_store(tmp_db):
from ledgrab.storage.output_target_store import OutputTargetStore
return OutputTargetStore(tmp_db)
@pytest.fixture
def sample_device(device_store):
return device_store.create_device(
name="Test LED Strip",
url="http://192.168.1.10",
led_count=60,
)
@pytest.fixture
def event_log():
log = []
return log
@pytest.fixture
def mock_manager():
"""A MagicMock ProcessorManager that silently accepts all calls."""
mgr = MagicMock()
mgr.remove_target = MagicMock()
return mgr
@pytest.fixture
def setup_client(
tmp_db,
device_store,
template_store,
picture_source_store,
css_store,
output_target_store,
event_log,
mock_manager,
):
from ledgrab.api.routes.setup import router
from ledgrab.api.auth import verify_api_key
from ledgrab.api import dependencies as deps
app = FastAPI()
app.include_router(router)
app.dependency_overrides[verify_api_key] = lambda: "test"
app.dependency_overrides[deps.get_device_store] = lambda: device_store
app.dependency_overrides[deps.get_template_store] = lambda: template_store
app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store
app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store
app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store
app.dependency_overrides[deps.get_processor_manager] = lambda: mock_manager
def _fire(entity_type, action, entity_id):
event_log.append((entity_type, action, entity_id))
with patch("ledgrab.api.routes.setup.fire_entity_event", side_effect=_fire):
yield TestClient(app, raise_server_exceptions=False)
def _scaffold(client, **overrides):
body = {"display_index": 0, **overrides}
return client.post("/api/v1/setup/scaffold", json=body)
# ---------------------------------------------------------------------------
# Rollback when the FINAL step (output target) fails
# Criteria: "zero orphans AND zero 'created' events"
# ---------------------------------------------------------------------------
class TestFinalStepRollback:
def test_final_step_failure_leaves_zero_orphans(
self,
setup_client,
sample_device,
picture_source_store,
css_store,
output_target_store,
):
"""output_target_store.create_wled_target failing leaves NO orphaned entities."""
original_create = output_target_store.create_wled_target
def _fail(*args, **kwargs):
raise ValueError("Injected final step failure")
output_target_store.create_wled_target = _fail
try:
resp = _scaffold(setup_client, device_id=sample_device.id)
assert resp.status_code in (
400,
500,
), f"Expected 4xx/5xx on final-step failure, got {resp.status_code}: {resp.text}"
# Zero picture sources remaining
remaining_ps = picture_source_store.get_all_streams()
assert len(remaining_ps) == 0, f"Picture sources not rolled back: {remaining_ps}"
# Zero color-strip sources remaining
remaining_css = css_store.get_all_sources()
assert len(remaining_css) == 0, f"Color-strip sources not rolled back: {remaining_css}"
# Zero output targets (trivially true since creation was never reached,
# but included for completeness)
remaining_ot = output_target_store.get_all_targets()
assert len(remaining_ot) == 0, f"Output targets not rolled back: {remaining_ot}"
finally:
output_target_store.create_wled_target = original_create
def test_final_step_failure_emits_zero_created_events(
self,
setup_client,
sample_device,
output_target_store,
event_log,
):
"""No 'created' events must be emitted when the final step fails (deferred-event contract)."""
original_create = output_target_store.create_wled_target
def _fail(*args, **kwargs):
raise ValueError("Final step injected failure")
output_target_store.create_wled_target = _fail
try:
resp = _scaffold(setup_client, device_id=sample_device.id)
assert resp.status_code in (400, 500)
created_events = [(et, act) for et, act, _ in event_log if act == "created"]
assert (
created_events == []
), f"'created' events leaked on final-step rollback: {created_events}"
finally:
output_target_store.create_wled_target = original_create
def test_final_step_failure_reused_template_survives(
self,
setup_client,
sample_device,
template_store,
output_target_store,
):
"""A reused capture template must NOT be deleted when the final step fails."""
templates_before = {t.id for t in template_store.get_all_templates()}
original_create = output_target_store.create_wled_target
def _fail(*args, **kwargs):
raise ValueError("Final step injected failure")
output_target_store.create_wled_target = _fail
try:
_scaffold(setup_client, device_id=sample_device.id)
finally:
output_target_store.create_wled_target = original_create
templates_after = {t.id for t in template_store.get_all_templates()}
assert templates_before == templates_after, (
f"Template set changed after rollback: "
f"before={templates_before} after={templates_after}"
)
def test_final_step_failure_device_not_deleted(
self,
setup_client,
sample_device,
device_store,
output_target_store,
):
"""The pre-existing device must never be touched by rollback of any step."""
original_create = output_target_store.create_wled_target
def _fail(*args, **kwargs):
raise ValueError("Final step injected failure")
output_target_store.create_wled_target = _fail
try:
_scaffold(setup_client, device_id=sample_device.id)
finally:
output_target_store.create_wled_target = original_create
device = device_store.get(sample_device.id)
assert device is not None, "Pre-existing device was deleted during rollback"
# ---------------------------------------------------------------------------
# Validation: display_index bounds
# Criteria: display_index > 63 → 422; display_index 63 → 201; negative → 422
# ---------------------------------------------------------------------------
class TestDisplayIndexBounds:
def test_display_index_64_returns_422(self, setup_client, sample_device):
"""display_index=64 (one above max) must be rejected with 422."""
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=64)
assert (
resp.status_code == 422
), f"Expected 422 for display_index=64, got {resp.status_code}: {resp.text}"
def test_display_index_63_returns_201(self, setup_client, sample_device):
"""display_index=63 is the maximum valid value — must be accepted."""
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=63)
assert (
resp.status_code == 201
), f"Expected 201 for display_index=63, got {resp.status_code}: {resp.text}"
def test_display_index_0_returns_201(self, setup_client, sample_device):
"""display_index=0 is the minimum valid value."""
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=0)
assert (
resp.status_code == 201
), f"Expected 201 for display_index=0, got {resp.status_code}: {resp.text}"
def test_display_index_negative_1_returns_422(self, setup_client, sample_device):
"""display_index=-1 must be rejected with 422."""
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=-1)
assert (
resp.status_code == 422
), f"Expected 422 for display_index=-1, got {resp.status_code}: {resp.text}"
def test_display_index_very_large_returns_422(self, setup_client, sample_device):
"""display_index=10000 must be rejected with 422."""
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=10000)
assert (
resp.status_code == 422
), f"Expected 422 for display_index=10000, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# Validation: missing / unknown device_id
# ---------------------------------------------------------------------------
class TestDeviceValidation:
def test_missing_device_id_returns_422(self, setup_client):
"""Omitting device_id entirely must yield 422 (Pydantic required field)."""
resp = setup_client.post("/api/v1/setup/scaffold", json={"display_index": 0})
assert (
resp.status_code == 422
), f"Expected 422 for missing device_id, got {resp.status_code}: {resp.text}"
def test_empty_string_device_id_handled(self, setup_client):
"""device_id='' — should yield 404 (empty string not in device store) or 422."""
resp = _scaffold(setup_client, device_id="")
assert resp.status_code in (
404,
422,
), f"Expected 404 or 422 for empty device_id, got {resp.status_code}: {resp.text}"
def test_unknown_device_id_returns_404(self, setup_client):
"""device_id that does not exist in the store must yield 404."""
resp = _scaffold(setup_client, device_id="device_definitely_does_not_exist")
assert (
resp.status_code == 404
), f"Expected 404 for unknown device_id, got {resp.status_code}: {resp.text}"
def test_none_device_id_returns_422(self, setup_client):
"""device_id=null must yield 422 (not None-convertible to str)."""
resp = setup_client.post(
"/api/v1/setup/scaffold", json={"device_id": None, "display_index": 0}
)
assert (
resp.status_code == 422
), f"Expected 422 for null device_id, got {resp.status_code}: {resp.text}"
# ---------------------------------------------------------------------------
# Rollback idempotency: calling scaffold twice with the final step failing
# must still leave exactly zero orphans (no accumulation across calls).
# ---------------------------------------------------------------------------
class TestRollbackIdempotency:
def test_two_failed_scaffolds_leave_zero_orphans(
self,
setup_client,
sample_device,
picture_source_store,
css_store,
output_target_store,
):
"""Two sequential scaffold failures must not accumulate orphans."""
original_create = output_target_store.create_wled_target
def _fail(*args, **kwargs):
raise ValueError("Always fails")
output_target_store.create_wled_target = _fail
try:
_scaffold(setup_client, device_id=sample_device.id)
_scaffold(setup_client, device_id=sample_device.id)
finally:
output_target_store.create_wled_target = original_create
assert (
len(picture_source_store.get_all_streams()) == 0
), "Picture sources accumulated across two failed scaffolds"
assert (
len(css_store.get_all_sources()) == 0
), "Color-strip sources accumulated across two failed scaffolds"
# ---------------------------------------------------------------------------
# Onboarding: adversarial cases
# ---------------------------------------------------------------------------
@pytest.fixture
def pref_client(tmp_db):
from ledgrab.api.routes.preferences import router
from ledgrab.api.auth import verify_api_key
from ledgrab.api import dependencies as deps
app = FastAPI()
app.include_router(router)
app.dependency_overrides[verify_api_key] = lambda: "test"
app.dependency_overrides[deps.get_database] = lambda: tmp_db
return TestClient(app)
class TestOnboardingAdversarial:
def test_put_false_clears_completed_at(self, pref_client):
"""PUT onboarded=false must clear completed_at to null, per criteria."""
# First mark as onboarded
r1 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
assert r1.status_code == 200
assert r1.json()["completed_at"] is not None
# Now set to false — completed_at must be cleared
r2 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": False})
assert r2.status_code == 200
data = r2.json()
assert data["onboarded"] is False
assert (
data["completed_at"] is None
), f"completed_at should be null after PUT onboarded=false, got {data['completed_at']!r}"
def test_put_false_then_get_returns_null_completed_at(self, pref_client):
"""After PUT false, GET must also return null completed_at (persisted correctly)."""
pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": False})
resp = pref_client.get("/api/v1/preferences/onboarding")
assert resp.status_code == 200
assert (
resp.json()["completed_at"] is None
), "GET after PUT false should return null completed_at"
def test_corrupt_stored_value_falls_back_to_default(self, tmp_db, pref_client):
"""If the stored onboarding value is corrupt, GET must fall back to default.
Criteria: "corrupt stored value falls back to default".
We inject garbage into the db directly, then hit GET.
"""
# Inject a value that is syntactically valid JSON (dict) but fails
# Pydantic validation because the types are wrong.
tmp_db.set_setting("onboarded", {"onboarded": "not_a_bool", "completed_at": 12345})
resp = pref_client.get("/api/v1/preferences/onboarding")
assert (
resp.status_code == 200
), f"Expected 200 for corrupt onboarding value, got {resp.status_code}"
data = resp.json()
# Must fall back to default
assert (
data["onboarded"] is False
), f"Expected onboarded=false as default after corrupt value, got {data['onboarded']!r}"
assert (
data["completed_at"] is None
), f"Expected completed_at=null as default, got {data['completed_at']!r}"
def test_corrupt_stored_value_as_wrong_type_falls_back(self, tmp_db, pref_client):
"""Stored value is a string (not a dict) — must fall back to default."""
tmp_db.set_setting("onboarded", "this_is_not_valid")
resp = pref_client.get("/api/v1/preferences/onboarding")
assert resp.status_code == 200
data = resp.json()
assert data["onboarded"] is False
assert data["completed_at"] is None
def test_corrupt_stored_null_falls_back_to_default(self, tmp_db, pref_client):
"""Stored value is null/None — must return default (not crash)."""
tmp_db.set_setting("onboarded", None)
resp = pref_client.get("/api/v1/preferences/onboarding")
assert resp.status_code == 200
assert resp.json()["onboarded"] is False
def test_put_true_without_completed_at_stamps_timestamp(self, pref_client):
"""PUT onboarded=true without completed_at must auto-stamp a timestamp."""
resp = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
assert resp.status_code == 200
data = resp.json()
assert (
data["completed_at"] is not None
), "Server must auto-stamp completed_at when onboarded=true is sent without a timestamp"
# Should be a non-empty ISO timestamp string
assert len(data["completed_at"]) > 10
def test_put_true_with_completed_at_preserves_it(self, pref_client):
"""PUT onboarded=true with explicit completed_at must preserve that value."""
ts = "2025-03-15T10:00:00+00:00"
resp = pref_client.put(
"/api/v1/preferences/onboarding",
json={"onboarded": True, "completed_at": ts},
)
assert resp.status_code == 200
assert resp.json()["completed_at"] == ts
def test_put_false_with_completed_at_clears_it(self, pref_client):
"""PUT onboarded=false even with a completed_at payload must clear it.
The criteria say: 'Setting onboarded=false clears completed_at to null.'
"""
ts = "2025-01-01T00:00:00+00:00"
resp = pref_client.put(
"/api/v1/preferences/onboarding",
json={"onboarded": False, "completed_at": ts},
)
assert resp.status_code == 200
data = resp.json()
assert data["onboarded"] is False
assert (
data["completed_at"] is None
), f"completed_at should be cleared to null when onboarded=false, got {data['completed_at']!r}"
def test_multiple_true_puts_only_stamp_once(self, pref_client):
"""Two successive PUT true calls — second call must preserve the original timestamp."""
r1 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
ts1 = r1.json()["completed_at"]
assert ts1 is not None
# Explicitly provide the original timestamp in second call
r2 = pref_client.put(
"/api/v1/preferences/onboarding",
json={"onboarded": True, "completed_at": ts1},
)
assert (
r2.json()["completed_at"] == ts1
), "Explicit completed_at should be preserved on second PUT"
@@ -0,0 +1,535 @@
"""Adversarial / concurrency tests for CalibrationSession.
Phase 1 acceptance criteria tested here (NOT what the code happens to do):
- Interleaved start/start (same device, then different device) must never
leave the old device without restore.
- Interleaved start/stop racing the idle watchdog must not leave the device
dark or stuck.
- Idle-timeout teardown restores the prior target.
- position() with index out of range → ValueError.
- stop() when idle is a safe no-op (does not call start_processing or crash).
- CalibrationSession lock must prevent double-teardown.
All tests use a fake ProcessorManager matching the shape used in
test_calibration_routes.py.
"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from ledgrab.core.capture.calibration_session import (
CalibrationSession,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_manager(device_id: str = "dev1", led_count: int = 100) -> MagicMock:
"""Build a minimal fake ProcessorManager."""
mgr = MagicMock()
ds = MagicMock()
ds.led_count = led_count
mgr._devices = {device_id: ds}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
return mgr
@pytest_asyncio.fixture(autouse=True)
async def fresh_session():
"""Yield a brand-new CalibrationSession for each test (not the singleton)."""
session = CalibrationSession()
yield session
# Cleanup: cancel any lingering watchdog
if session._timeout_task and not session._timeout_task.done():
session._timeout_task.cancel()
try:
await session._timeout_task
except (asyncio.CancelledError, Exception):
pass
# ---------------------------------------------------------------------------
# stop() when idle is a safe no-op
# ---------------------------------------------------------------------------
class TestStopWhenIdle:
@pytest.mark.asyncio
async def test_stop_idle_does_not_call_start_processing(self, fresh_session):
"""Calling stop() when no session is active must not call start_processing."""
mgr = _make_manager()
# Do NOT start a session — just stop immediately
await fresh_session.stop()
mgr.start_processing.assert_not_awaited()
@pytest.mark.asyncio
async def test_stop_idle_returns_inactive_state(self, fresh_session):
"""stop() on an idle session returns state with active=False."""
state = fresh_session.get_state()
assert state["active"] is False
await fresh_session.stop() # no-op
assert fresh_session.is_active is False
@pytest.mark.asyncio
async def test_cancel_idle_safe(self, fresh_session):
"""cancel() on idle session is also a safe no-op."""
await fresh_session.cancel()
assert fresh_session.is_active is False
@pytest.mark.asyncio
async def test_double_stop_is_idempotent(self, fresh_session):
"""Calling stop() twice on an active session must not double-call start_processing."""
mgr = _make_manager()
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_restore")
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
await fresh_session.stop()
assert fresh_session.is_active is False
# Restore called exactly once
mgr.start_processing.assert_awaited_once_with("tgt_restore")
# Second stop must be a no-op
await fresh_session.stop()
# start_processing should still be called exactly once (not twice)
assert mgr.start_processing.await_count == 1
# ---------------------------------------------------------------------------
# position() out of range
# ---------------------------------------------------------------------------
class TestPositionOutOfRange:
@pytest.mark.asyncio
async def test_position_equal_to_led_count_raises(self, fresh_session):
"""index == led_count must raise ValueError (0-based, so out of range)."""
mgr = _make_manager(led_count=100)
await fresh_session.start("dev1", mgr)
with pytest.raises(ValueError, match="out of range"):
await fresh_session.position(100)
@pytest.mark.asyncio
async def test_position_above_led_count_raises(self, fresh_session):
"""index > led_count raises ValueError."""
mgr = _make_manager(led_count=50)
await fresh_session.start("dev1", mgr)
with pytest.raises(ValueError, match="out of range"):
await fresh_session.position(999)
@pytest.mark.asyncio
async def test_position_negative_raises(self, fresh_session):
"""Negative index raises ValueError."""
mgr = _make_manager(led_count=100)
await fresh_session.start("dev1", mgr)
with pytest.raises(ValueError):
await fresh_session.position(-1)
@pytest.mark.asyncio
async def test_position_at_led_count_minus_1_is_valid(self, fresh_session):
"""Last valid index (led_count - 1) must succeed."""
mgr = _make_manager(led_count=10)
await fresh_session.start("dev1", mgr)
await fresh_session.position(9) # must not raise
mgr.set_calibration_pixel.assert_awaited()
@pytest.mark.asyncio
async def test_position_without_active_session_raises(self, fresh_session):
"""position() with no active session must raise RuntimeError."""
with pytest.raises(RuntimeError, match="No active calibration session"):
await fresh_session.position(5)
# ---------------------------------------------------------------------------
# Interleaved start/start — old device must be restored
# ---------------------------------------------------------------------------
class TestInterleavedStartStart:
@pytest.mark.asyncio
async def test_start_on_same_device_restores_prior_target(self, fresh_session):
"""Starting a second session on the same device auto-stops the first.
The first device's prior target must be restored before the second session begins.
"""
mgr = _make_manager(led_count=60)
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_original")
# Start first session
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
assert fresh_session._prior_target_id == "tgt_original"
# Now start again on the same device
# The second start should stop the first (restoring tgt_original),
# then re-query the current running target (which will be tgt_original again
# since start_processing will have been called).
# For isolation: change what get_processing_target_for_device returns after
# the first stop so the second session records a fresh prior.
call_count = {"n": 0}
def _get_target(device_id):
call_count["n"] += 1
if call_count["n"] == 1:
return "tgt_original"
return None # After first stop, no target running
mgr.get_processing_target_for_device = MagicMock(side_effect=_get_target)
# First session with the original target
fresh_session2 = CalibrationSession()
await fresh_session2.start("dev1", mgr)
assert fresh_session2.is_active is True
# Start a NEW session on the same device — must auto-stop fresh_session2
fresh_session3 = CalibrationSession()
# Inject fresh_session2's internal state into fresh_session3 to simulate
# the singleton pattern: replace session3's state to reflect session2 active
# (this mirrors the module-level singleton where only one CalibrationSession exists)
fresh_session3._active = fresh_session2._active
fresh_session3._device_id = fresh_session2._device_id
fresh_session3._led_count = fresh_session2._led_count
fresh_session3._prior_target_id = fresh_session2._prior_target_id
fresh_session3._last_activity = fresh_session2._last_activity
fresh_session3._manager = fresh_session2._manager
fresh_session3._timeout_task = fresh_session2._timeout_task
fresh_session2._timeout_task = None # prevent double-cancel
await fresh_session3.start("dev1", mgr)
# The start must have called stop on the previous session → restore was called
# (mgr.start_processing was called at least once to restore the prior target)
assert mgr.start_processing.await_count >= 1
# Cleanup
await fresh_session3.stop()
if fresh_session2._timeout_task and not fresh_session2._timeout_task.done():
fresh_session2._timeout_task.cancel()
@pytest.mark.asyncio
async def test_new_session_on_different_device_clears_old_device(self, fresh_session):
"""Starting a new session on a different device must clear the first device.
The first session must be stopped (its prior target restored or cleared)
before the second session on the new device becomes active.
"""
mgr = MagicMock()
ds1 = MagicMock()
ds1.led_count = 30
ds2 = MagicMock()
ds2.led_count = 60
mgr._devices = {"dev1": ds1, "dev2": ds2}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
# Start first session on dev1
await fresh_session.start("dev1", mgr)
assert fresh_session._device_id == "dev1"
assert fresh_session.is_active is True
# Now start second session on dev2 — must auto-stop dev1 first
await fresh_session.start("dev2", mgr)
# After the second start, session must be on dev2
assert fresh_session._device_id == "dev2"
assert fresh_session.is_active is True
# send_clear_pixels was called for dev1 (stop) AND for dev2 (start)
clear_calls = [call[0][0] for call in mgr.send_clear_pixels.call_args_list]
assert (
"dev1" in clear_calls
), f"dev1 was never cleared during session switch; clear calls: {clear_calls}"
assert (
"dev2" in clear_calls
), f"dev2 was never cleared at session start; clear calls: {clear_calls}"
# Cleanup
await fresh_session.stop()
# ---------------------------------------------------------------------------
# Idle-timeout teardown restores prior target
# ---------------------------------------------------------------------------
class TestIdleTimeoutRestoresPriorTarget:
@pytest.mark.asyncio
async def test_idle_timeout_calls_start_processing(self, fresh_session):
"""When the session times out, start_processing must be called to restore the target.
We patch IDLE_TIMEOUT_SECONDS to a tiny value so the test doesn't actually
wait 60 seconds.
"""
mgr = _make_manager(led_count=40)
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_to_restore")
# Patch the idle timeout to 0.05 seconds
with patch(
"ledgrab.core.capture.calibration_session.IDLE_TIMEOUT_SECONDS",
0.05,
):
# Also patch the watchdog sleep to something tiny
async def _fast_watchdog():
"""A watchdog that checks every 0.02 seconds instead of 5."""
try:
while True:
await asyncio.sleep(0.02)
if not fresh_session._active or fresh_session._last_activity is None:
break
from datetime import datetime, timezone
elapsed = (
datetime.now(timezone.utc) - fresh_session._last_activity
).total_seconds()
if elapsed >= 0.05:
async with fresh_session._lock:
await fresh_session._teardown_locked(cancelled=False)
break
except asyncio.CancelledError:
pass
fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign]
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
# Wait long enough for the watchdog to fire
await asyncio.sleep(0.25)
# Session should have been auto-stopped
assert (
fresh_session.is_active is False
), "Session should have been auto-stopped by idle timeout"
# Prior target must have been restored
mgr.start_processing.assert_awaited_once_with("tgt_to_restore")
@pytest.mark.asyncio
async def test_idle_timeout_clears_device_to_black(self, fresh_session):
"""Idle timeout must send all-black before (or after) restoring target."""
mgr = _make_manager(led_count=40)
async def _fast_watchdog():
try:
while True:
await asyncio.sleep(0.02)
if not fresh_session._active or fresh_session._last_activity is None:
break
from datetime import datetime, timezone
elapsed = (
datetime.now(timezone.utc) - fresh_session._last_activity
).total_seconds()
if elapsed >= 0.05:
async with fresh_session._lock:
await fresh_session._teardown_locked(cancelled=False)
break
except asyncio.CancelledError:
pass
fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign]
await fresh_session.start("dev1", mgr)
initial_clear_count = mgr.send_clear_pixels.await_count # from start()
await asyncio.sleep(0.25)
# send_clear_pixels must have been called at least once more during teardown
assert (
mgr.send_clear_pixels.await_count > initial_clear_count
), "Device was not cleared to black during idle-timeout teardown"
# ---------------------------------------------------------------------------
# Concurrent start/start using asyncio.gather (true concurrency within the loop)
# ---------------------------------------------------------------------------
class TestConcurrentStartCalls:
@pytest.mark.asyncio
async def test_concurrent_starts_dont_corrupt_state(self, fresh_session):
"""Two concurrent start() calls must leave the session in a consistent state.
Only one session should be active after both complete; the final device
must match one of the two requested devices.
"""
mgr = MagicMock()
ds1 = MagicMock()
ds1.led_count = 50
ds2 = MagicMock()
ds2.led_count = 80
mgr._devices = {"devA": ds1, "devB": ds2}
mgr.get_processing_target_for_device = MagicMock(return_value=None)
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
# Fire both concurrently — the lock must ensure exactly one wins
results = await asyncio.gather(
fresh_session.start("devA", mgr),
fresh_session.start("devB", mgr),
return_exceptions=True,
)
# Neither call should raise (both complete without exception)
for r in results:
if isinstance(r, BaseException):
pytest.fail(f"Concurrent start raised unexpectedly: {r!r}")
# Exactly one session active with a valid device
assert fresh_session.is_active is True
assert fresh_session._device_id in ("devA", "devB")
# Cleanup
await fresh_session.stop()
# ---------------------------------------------------------------------------
# stop() while watchdog is still pending (concurrent stop/watchdog)
# ---------------------------------------------------------------------------
class TestStopVsWatchdogRace:
@pytest.mark.asyncio
async def test_explicit_stop_wins_over_watchdog(self, fresh_session):
"""Explicit stop() must cleanly terminate before the watchdog fires.
After explicit stop, start_processing must be called exactly once
even if the watchdog later tries to tear down.
"""
mgr = _make_manager(led_count=100)
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_explicit")
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
# Explicitly stop — the watchdog task should be cancelled
await fresh_session.stop()
assert fresh_session.is_active is False
# start_processing called exactly once for the prior target
mgr.start_processing.assert_awaited_once_with("tgt_explicit")
# Give the event loop a moment to confirm the watchdog didn't double-fire
await asyncio.sleep(0.05)
# Still exactly once
assert (
mgr.start_processing.await_count == 1
), "start_processing was called more than once — watchdog fired after explicit stop"
# ---------------------------------------------------------------------------
# start() with unknown device_id
# ---------------------------------------------------------------------------
class TestStartUnknownDevice:
@pytest.mark.asyncio
async def test_unknown_device_raises_valueerror(self, fresh_session):
"""start() with a device_id not in manager._devices must raise ValueError."""
mgr = _make_manager(device_id="known_device")
with pytest.raises(ValueError, match="not found"):
await fresh_session.start("does_not_exist", mgr)
@pytest.mark.asyncio
async def test_unknown_device_leaves_session_inactive(self, fresh_session):
"""After a failed start() the session must remain inactive."""
mgr = _make_manager(device_id="known_device")
try:
await fresh_session.start("no_such_device", mgr)
except ValueError:
pass
assert fresh_session.is_active is False
@pytest.mark.asyncio
async def test_failed_start_does_not_corrupt_existing_session(self, fresh_session):
"""A failed start() attempt must not corrupt an already-active session."""
mgr = MagicMock()
ds = MagicMock()
ds.led_count = 100
mgr._devices = {"dev1": ds}
mgr.get_processing_target_for_device = MagicMock(return_value="prior_tgt")
mgr.stop_processing = AsyncMock()
mgr.start_processing = AsyncMock()
mgr.send_clear_pixels = AsyncMock()
mgr.set_calibration_pixel = AsyncMock()
# Start a valid session
await fresh_session.start("dev1", mgr)
assert fresh_session.is_active is True
# Now try to start on an unknown device — should fail
try:
await fresh_session.start("unknown_device", mgr)
except ValueError:
pass
except Exception:
pass # Other errors are also acceptable
# The original session must still be active (or was cleanly stopped and
# replaced); either way the device must not be stuck in a broken state.
# The critical invariant: if active, device_id is valid.
if fresh_session.is_active:
assert fresh_session._device_id in mgr._devices
await fresh_session.stop()
# ---------------------------------------------------------------------------
# State snapshot integrity
# ---------------------------------------------------------------------------
class TestStateSnapshot:
@pytest.mark.asyncio
async def test_get_state_before_start(self, fresh_session):
state = fresh_session.get_state()
assert state["active"] is False
assert state["device_id"] is None
assert state["led_count"] == 0
assert state["prior_target_id"] is None
assert state["last_activity"] is None
@pytest.mark.asyncio
async def test_get_state_after_start(self, fresh_session):
mgr = _make_manager(led_count=60)
mgr.get_processing_target_for_device = MagicMock(return_value="saved_tgt")
await fresh_session.start("dev1", mgr)
state = fresh_session.get_state()
assert state["active"] is True
assert state["device_id"] == "dev1"
assert state["led_count"] == 60
assert state["prior_target_id"] == "saved_tgt"
assert state["last_activity"] is not None
await fresh_session.stop()
@pytest.mark.asyncio
async def test_get_state_after_stop(self, fresh_session):
mgr = _make_manager()
await fresh_session.start("dev1", mgr)
await fresh_session.stop()
state = fresh_session.get_state()
assert state["active"] is False
assert state["device_id"] is None
assert state["led_count"] == 0
assert state["prior_target_id"] is None
@@ -0,0 +1,562 @@
"""Adversarial / edge-case tests for solve_calibration() and build_segments().
Criteria derived from Phase 1 acceptance criteria (NOT from what the code does):
- solve_calibration returns correct per-edge counts; result round-trips through
build_segments() and totals are preserved.
- An edge with 0 LEDs (two corners tapped adjacent) is valid.
- Wrap-around edges work correctly.
- Invalid inputs raise cleanly (ValueError).
New adversarial cases not covered by the existing 19 happy-path tests:
- All four corner_indices equal (degenerate: every edge = 0) — the code must
either handle it gracefully (total=0) OR raise with a clear message.
Per criteria the total must be preserved (0 == 0) and round-trip must not crash.
- Descending / out-of-order corner_indices where wrap-around should apply.
- An edge that spans the whole strip (one edge wraps the full led_count minus
the contribution of the three zero-LED edges).
- led_count just above minimum (led_count=1) with a single LED claimed by
one edge.
- offset >= led_count: the solver stores offset verbatim; PixelMapper normalises
it via % total_leds. The build_segments() round-trip must not crash.
- Corner indices modulo led_count are used, so indices >= led_count should be
accepted and reduced.
"""
import pytest
from ledgrab.core.capture.calibration import (
EDGE_ORDER,
CalibrationConfig,
solve_calibration,
)
# ---------------------------------------------------------------------------
# Helper (same as in test_calibration_solver.py — duplicated intentionally so
# this adversarial file can run in isolation)
# ---------------------------------------------------------------------------
def _roundtrip(cfg: CalibrationConfig) -> None:
"""Assert build_segments() doesn't crash and totals match."""
segs = cfg.build_segments()
total_from_segs = sum(s.led_count for s in segs)
expected = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert (
total_from_segs == expected
), f"Segment total {total_from_segs} != field total {expected} for cfg={cfg!r}"
# ---------------------------------------------------------------------------
# Degenerate: all four corner indices are equal
# When all four corners are tapped at the same index every consecutive pair
# has start_idx == end_idx, so every edge gets 0 LEDs. Total = 0 is
# mathematically consistent with the input; the function should either
# return a config with 0-LED edges OR raise a clear ValueError.
# It must NOT silently return wrong counts and must NOT crash unexpectedly.
# ---------------------------------------------------------------------------
class TestAllFourCornersEqual:
"""All four corner_indices equal → every edge = 0 LEDs or clean ValueError."""
@pytest.mark.parametrize(
"start_position,layout",
[(sp, lay) for sp, lay in EDGE_ORDER.keys()],
)
def test_all_equal_returns_zero_total_or_raises(self, start_position, layout):
"""solve_calibration([k,k,k,k]) must not produce non-zero counts."""
led_count = 100
try:
cfg = solve_calibration(
led_count=led_count,
start_position=start_position,
layout=layout,
corner_indices=[0, 0, 0, 0],
)
# If it doesn't raise: total must be 0 (consistent with input)
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert (
total == 0
), f"{start_position}/{layout}: all-equal corners produced total={total}; expected 0"
# build_segments must not crash on a 0-total config
segs = cfg.build_segments()
assert segs == [], f"Expected no segments for 0-LED config, got {segs!r}"
except ValueError:
# Raising is also acceptable — as long as it's a ValueError, not a
# crash/assertion/other exception.
pass
def test_all_equal_roundtrip_does_not_crash(self):
"""Even if all edges are 0, build_segments() must not raise."""
try:
cfg = solve_calibration(
led_count=50,
start_position="top_left",
layout="clockwise",
corner_indices=[25, 25, 25, 25],
)
_roundtrip(cfg) # must not crash
except ValueError:
pass # acceptable
# ---------------------------------------------------------------------------
# Descending / out-of-order corner indices
# Out-of-order but non-wrapping: e.g. [70, 50, 30, 10] for clockwise bottom_left
# The solver uses consecutive-pair differences with wrap logic.
# Each pair (70→50, 50→30, 30→10, 10→70) has end < start, so ALL four edges
# get wrap-around counts. Total must still equal led_count.
# ---------------------------------------------------------------------------
class TestDescendingCornerIndices:
"""All four corners in descending order — every pair wraps."""
def test_descending_total_equals_led_count(self):
"""Descending indices [75, 50, 25, 0] — total must == led_count."""
# bottom_left/clockwise: edge_order=[left,top,right,bottom]
# left: 75→50 = 25, top: 50→25 = 25, right: 25→0 = 25, bottom: 0→75 = 75
# Wait — end > start for bottom: 0→75 means 75. But 0 < 75 so count=75-0=75.
# Recalculate: for bottom: start_idx=0, end_idx=75 → end>start → count=75
# total = 25+25+25+75 = 150 ≠ 100 → That's wrong input.
# Use [80, 60, 40, 20] for 100 LEDs:
# left: 80→60: end<start → wrap: (100-80)+60=80 WRONG too.
# Actually for a valid descending case that totals 100:
# Monotone descending with wrap on last pair only:
# [75, 50, 25, 0] for bottom_left/clockwise:
# left: start=75,end=50: 50<75 → wrap: (100-75)+50=75 NO
# All pairs descend when [75,50,25,0]:
# 0→75 (last pair, bottom): 75>0 → count=75-0=75
# 75→50 (left): 50<75 → wrap: (100-75)+50=75
# Total would be 75+25+25+75=200 ≠ 100
# That shows descending indices don't sum to led_count in general.
# The critical invariant is: sum of per-edge counts == led_count when
# the corner indices span a single full traversal.
# To get descending [80,60,40,20] → sum via wrap logic:
# left: 80→60: 60<80 → (100-80)+60=80
# top: 60→40: 40<60 → (100-60)+40=80
# right: 40→20: 20<40 → (100-40)+20=80
# bottom: 20→80: 80>20 → 80-20=60
# total = 80+80+80+60 = 300 — still not 100.
#
# The INVARIANT of solve_calibration is: if corner_indices form a valid
# partition of the strip (i.e. each consecutive pair covers a segment
# that together span exactly led_count LEDs), the total == led_count.
# Descending indices that form valid partitions do sum to led_count.
# Example: if we want all edges wrapped, use [99, 74, 49, 24]:
# left: 99→74: 74<99 → (100-99)+74=75
# top: 74→49: 49<74 → (100-74)+49=75
# right: 49→24: 24<49 → (100-49)+24=75
# bottom: 24→99: 99>24 → 99-24=75
# total = 75+75+75+75=300 ≠ 100
#
# Conclusion: descending indices don't need to sum to led_count — only
# a proper strip traversal (covering each LED exactly once) does.
# The adversarial test here is: the function must NOT crash, must NOT
# produce negative counts, and must round-trip cleanly.
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[80, 60, 40, 20],
)
counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]
assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}"
_roundtrip(cfg)
def test_all_four_wrap_around_non_negative(self):
"""Every consecutive pair wraps (descending) — all counts must be >= 0."""
# [90, 70, 50, 30] for led_count=100, top_right/clockwise
cfg = solve_calibration(
led_count=100,
start_position="top_right",
layout="clockwise",
corner_indices=[90, 70, 50, 30],
)
counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]
assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}"
_roundtrip(cfg)
# ---------------------------------------------------------------------------
# Edge that wraps the whole strip
# When only one edge is used (the other 3 are 0 LEDs), that edge should span
# led_count LEDs. Corner indices: [0, 0, 0, 0] gives all-zero (tested above).
# A single-edge case: [0, 100, 100, 100] would give left=100, top=0, right=0,
# bottom=0 for bottom_left/clockwise. But 100 % 100 = 0 so end_idx=0,
# start_idx=0 → count=0 for left too. Use index 100 directly (>led_count).
# Alternatively: for led_count=100, corners [0, 0, 0, 0] with all-zero is
# already tested. The wrap-all case uses non-modulo indices.
#
# The real case: one valid single-edge scenario:
# bottom_left/clockwise, 100 LEDs, EDGE_ORDER = [left,top,right,bottom]
# corners [0, 100, 100, 100]: left: 0→100%100=0 → 0-LED because end==start.
# No, there's no way to make one edge claim all 100 LEDs with index-based input
# other than having it wrap around. Test: [50, 50, 50, 50] (all equal):
# already tested above.
# The closest adversarial case: one edge with count == led_count via wrap-around.
# For bottom_left/clockwise, [50, 50, 50, 50] makes top=0,right=0,bottom=0,left=0.
# To make ONE edge == 100: we need start_idx=50, end_idx=50 for three edges
# and start=50, end=50 wraps to 0 for that edge... that's the all-zeros case.
# The only way one edge gets ALL leds is:
# e.g. left: corners[0]=50, corners[1]=50 → 50==50 → count=0 (NOT 100).
# There's a subtle algorithmic distinction: adjacent indices being equal → 0 LED
# vs wrap-around: if the algorithm used (start+count)%N as end, then one-edge
# spanning the whole strip would require start=end via wrap, giving count=N.
# But the current algorithm: end==start → count=0. This is the CORRECT behavior
# per the Phase 1 spec ("Adjacent taps on the same index → 0-LED edge").
# ---------------------------------------------------------------------------
class TestWholestripSingleEdge:
"""Verify that a wrap-around edge spanning led_count-3 LEDs (max when 3 others are 1 each) works."""
def test_one_large_edge_with_minimal_others(self):
"""One edge has led_count-3 LEDs, the other 3 each have 1 LED.
bottom_left/clockwise, led_count=100:
EDGE_ORDER = [left, top, right, bottom]
corners: [0, 97, 98, 99]
left: 0→97 = 97
top: 97→98 = 1
right: 98→99 = 1
bottom: 99→0 = 1 (wrap: (100-99)+0 = 1)
total = 97+1+1+1 = 100 ✓
"""
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 97, 98, 99],
)
assert cfg.leds_left == 97
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 1
assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100
_roundtrip(cfg)
def test_last_edge_wraps_nearly_all_leds(self):
"""The last edge (bottom in bottom_left/clockwise) wraps from 3 to 0.
corners: [0, 1, 2, 3]
left: 0→1 = 1
top: 1→2 = 1
right: 2→3 = 1
bottom: 3→0 = (100-3)+0 = 97 (wrap-around)
total = 100 ✓
"""
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 1, 2, 3],
)
assert cfg.leds_left == 1
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 97
assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100
_roundtrip(cfg)
# ---------------------------------------------------------------------------
# led_count just above minimum
# led_count=1: only 1 LED; corner_indices can only meaningfully be [0,0,0,0]
# which gives all zeros. That's the all-equal case above.
# led_count=5 (just above conceptual minimum) with a valid single-wrap partition.
# ---------------------------------------------------------------------------
class TestMinimalLedCount:
"""Edge cases around very small led_count values."""
def test_led_count_1_all_equal_indices(self):
"""led_count=1: the only valid index is 0; all-equal → all-zero edges."""
try:
cfg = solve_calibration(
led_count=1,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 0, 0],
)
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert total == 0, f"Expected total=0, got {total}"
_roundtrip(cfg)
except ValueError:
pass # also acceptable
def test_led_count_4_minimal_partition(self):
"""led_count=4, each edge gets 1 LED — minimum non-trivial partition."""
# bottom_left/clockwise EDGE_ORDER: [left, top, right, bottom]
# corners: [0, 1, 2, 3]
# left: 0→1=1, top: 1→2=1, right: 2→3=1, bottom: 3→0=(4-3)+0=1
cfg = solve_calibration(
led_count=4,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 1, 2, 3],
)
assert cfg.leds_left == 1
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 1
_roundtrip(cfg)
def test_led_count_0_raises_valueerror(self):
"""led_count=0 is explicitly invalid per the docstring."""
with pytest.raises(ValueError, match="led_count"):
solve_calibration(
led_count=0,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 0, 0],
)
def test_led_count_negative_raises_valueerror(self):
"""led_count=-5 is invalid."""
with pytest.raises(ValueError):
solve_calibration(
led_count=-5,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 0, 0, 0],
)
def test_led_count_5_just_above_minimum(self):
"""led_count=5 with a valid 2/1/1/1 partition."""
# bottom_left/clockwise: [left,top,right,bottom]
# corners: [0, 2, 3, 4]
# left: 0→2=2, top: 2→3=1, right: 3→4=1, bottom: 4→0=(5-4)+0=1
cfg = solve_calibration(
led_count=5,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 2, 3, 4],
)
assert cfg.leds_left == 2
assert cfg.leds_top == 1
assert cfg.leds_right == 1
assert cfg.leds_bottom == 1
assert sum([cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]) == 5
_roundtrip(cfg)
# ---------------------------------------------------------------------------
# Offset interactions
# The offset is stored verbatim; PixelMapper normalises it via % total_leds.
# solve_calibration must pass it through without modification.
# Also test: offset == 0 (explicit), offset == led_count (should store as-is),
# and offset >> led_count.
# ---------------------------------------------------------------------------
class TestOffsetInteractions:
"""Offset is stored verbatim and must not affect per-edge counts."""
def test_offset_zero_explicit(self):
cfg = solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
offset=0,
)
assert cfg.offset == 0
_roundtrip(cfg)
def test_offset_equals_led_count_stored_verbatim(self):
"""offset=led_count should be stored as-is (not reduced)."""
cfg = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
offset=100,
)
assert cfg.offset == 100
_roundtrip(cfg)
def test_large_offset_stored_verbatim(self):
"""offset >> led_count — stored verbatim, build_segments must not crash."""
cfg = solve_calibration(
led_count=60,
start_position="top_right",
layout="counterclockwise",
corner_indices=[0, 15, 30, 45],
offset=9999,
)
assert cfg.offset == 9999
_roundtrip(cfg)
def test_offset_does_not_change_edge_counts(self):
"""Two calls with different offsets must produce identical edge counts."""
kwargs = dict(
led_count=100,
start_position="bottom_left",
layout="counterclockwise",
corner_indices=[0, 25, 50, 75],
)
cfg_no_offset = solve_calibration(**kwargs, offset=0)
cfg_offset_13 = solve_calibration(**kwargs, offset=13)
assert cfg_no_offset.leds_top == cfg_offset_13.leds_top
assert cfg_no_offset.leds_right == cfg_offset_13.leds_right
assert cfg_no_offset.leds_bottom == cfg_offset_13.leds_bottom
assert cfg_no_offset.leds_left == cfg_offset_13.leds_left
# ---------------------------------------------------------------------------
# corner_indices >= led_count (modulo reduction)
# The solver applies % led_count to each index before computing counts.
# Index 150 for led_count=100 should behave identically to index 50.
# ---------------------------------------------------------------------------
class TestCornerIndicesModuloReduction:
"""Indices >= led_count are reduced modulo led_count before use."""
def test_indices_above_led_count_reduced(self):
"""corner_indices [0, 25, 50, 75] and [100, 125, 150, 175] must give same result."""
led_count = 100
cfg_base = solve_calibration(
led_count=led_count,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
cfg_shifted = solve_calibration(
led_count=led_count,
start_position="bottom_left",
layout="clockwise",
corner_indices=[100, 125, 150, 175], # each + 100 (≡ same mod 100)
)
assert cfg_base.leds_left == cfg_shifted.leds_left
assert cfg_base.leds_top == cfg_shifted.leds_top
assert cfg_base.leds_right == cfg_shifted.leds_right
assert cfg_base.leds_bottom == cfg_shifted.leds_bottom
_roundtrip(cfg_shifted)
def test_index_exactly_led_count_is_zero(self):
"""Index = led_count reduces to 0, same as explicit 0."""
cfg_zero = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
cfg_hundred = solve_calibration(
led_count=100,
start_position="top_left",
layout="clockwise",
corner_indices=[100, 25, 50, 75], # first index = led_count → reduces to 0
)
assert cfg_zero.leds_top == cfg_hundred.leds_top
assert cfg_zero.leds_right == cfg_hundred.leds_right
assert cfg_zero.leds_bottom == cfg_hundred.leds_bottom
assert cfg_zero.leds_left == cfg_hundred.leds_left
# ---------------------------------------------------------------------------
# Invalid input: wrong number of corner_indices
# ---------------------------------------------------------------------------
class TestInvalidCornerIndicesLength:
"""Wrong number of corner indices must raise ValueError."""
def test_three_corners_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 25, 50],
)
def test_five_corners_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[0, 20, 40, 60, 80],
)
def test_empty_corners_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[],
)
def test_one_corner_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="clockwise",
corner_indices=[50],
)
# ---------------------------------------------------------------------------
# Invalid start_position / layout
# ---------------------------------------------------------------------------
class TestInvalidEnumInputs:
def test_invalid_start_position_raises(self):
with pytest.raises(ValueError, match="start_position"):
solve_calibration(
led_count=100,
start_position="center",
layout="clockwise",
corner_indices=[0, 25, 50, 75],
)
def test_invalid_layout_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="bottom_left",
layout="diagonal",
corner_indices=[0, 25, 50, 75],
)
def test_both_invalid_raises(self):
with pytest.raises(ValueError):
solve_calibration(
led_count=100,
start_position="wrong",
layout="wrong",
corner_indices=[0, 25, 50, 75],
)
# ---------------------------------------------------------------------------
# Totals preservation invariant
# For any valid input, sum(edge_counts) == sum via build_segments()
# Test across all 8 combinations with a wrap-around partition.
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("start_position,layout", list(EDGE_ORDER.keys()))
def test_total_preservation_with_wraparound(start_position, layout):
"""For all 8 combinations with a wrap-around partition, total preserved."""
# The wrap-around partition: first 3 edges get 20 LEDs each, last edge
# gets the remaining 40 via wrap.
# EDGE_ORDER tells us walk order; corners: [0, 20, 40, 60] — last edge wraps to 40.
# bottom: 60→0 = (100-60)+0 = 40.
cfg = solve_calibration(
led_count=100,
start_position=start_position,
layout=layout,
corner_indices=[0, 20, 40, 60],
)
# Each of first 3 edges = 20, last = 40
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
assert total == 100, f"{start_position}/{layout}: expected total=100, got {total}"
_roundtrip(cfg)