diff --git a/server/src/ledgrab/api/routes/setup.py b/server/src/ledgrab/api/routes/setup.py index 0e3a947..2cd6983 100644 --- a/server/src/ledgrab/api/routes/setup.py +++ b/server/src/ledgrab/api/routes/setup.py @@ -36,11 +36,13 @@ from ledgrab.api.dependencies import ( get_device_store, get_output_target_store, get_picture_source_store, + get_processor_manager, get_template_store, ) from ledgrab.api.schemas.setup import ScaffoldRequest, ScaffoldResponse from ledgrab.core.capture.calibration import calibration_from_dict, create_default_calibration from ledgrab.core.capture_engines.factory import EngineRegistry +from ledgrab.core.processing.processor_manager import ProcessorManager from ledgrab.storage.base_store import EntityNotFoundError from ledgrab.storage.color_strip_store import ColorStripStore from ledgrab.storage.output_target_store import OutputTargetStore @@ -116,11 +118,16 @@ def _rollback( picture_source_store: PictureSourceStore, css_store: ColorStripStore, output_target_store: OutputTargetStore, + manager: ProcessorManager | None = None, ) -> None: """Delete entities created during this call, in reverse order. Only entities listed in ``created_ids`` are deleted; reused/pre-existing entities (including the device) are never touched. + + If *manager* is provided, any ``output_target`` entity in the rollback set + is also unregistered from the ProcessorManager before store deletion, so no + half-registered target is left behind. """ store_map: dict[str, Any] = { "capture_template": template_store, @@ -129,6 +136,18 @@ def _rollback( "output_target": output_target_store, } for entity_type, entity_id in reversed(created_ids): + # Unregister output targets from the processor manager first + if entity_type == "output_target" and manager is not None: + try: + manager.remove_target(entity_id) + logger.info("Scaffold rollback: unregistered target %s from manager", entity_id) + except (ValueError, RuntimeError) as exc: + logger.debug( + "Scaffold rollback: manager unregister skipped for %s — %s", + entity_id, + exc, + ) + store = store_map.get(entity_type) if store is None: logger.warning("Scaffold rollback: unknown entity type %r — skipping", entity_type) @@ -164,6 +183,7 @@ async def scaffold_setup( picture_source_store: PictureSourceStore = Depends(get_picture_source_store), css_store: ColorStripStore = Depends(get_color_strip_store), output_target_store: OutputTargetStore = Depends(get_output_target_store), + manager: ProcessorManager = Depends(get_processor_manager), ) -> ScaffoldResponse: """Create a ready-to-start LED capture chain. @@ -192,6 +212,7 @@ async def scaffold_setup( picture_source_store=picture_source_store, css_store=css_store, output_target_store=output_target_store, + manager=manager, ) try: @@ -269,6 +290,16 @@ async def scaffold_setup( pending_events.append(("output_target", target.id)) logger.info("Scaffold: created output target %s", target.id) + # ── Step 5b: register target with ProcessorManager ─────────────────── + try: + target.register_with_manager(manager) + except ValueError as exc: + logger.warning( + "Scaffold: could not register target %s in processor manager: %s", + target.id, + exc, + ) + except HTTPException: _rollback(created_ids, **rollback_stores) raise diff --git a/server/src/ledgrab/api/schemas/calibration.py b/server/src/ledgrab/api/schemas/calibration.py index c13561c..ece3713 100644 --- a/server/src/ledgrab/api/schemas/calibration.py +++ b/server/src/ledgrab/api/schemas/calibration.py @@ -1,6 +1,6 @@ """Pydantic schemas for the calibration session and solver API.""" -from typing import List, Literal +from typing import Annotated, List, Literal from pydantic import BaseModel, Field, model_validator @@ -65,14 +65,15 @@ class CalibrationSolveRequest(BaseModel): layout: Literal["clockwise", "counterclockwise"] = Field( description="Winding direction of the strip" ) - corner_indices: List[int] = Field( + corner_indices: List[Annotated[int, Field(ge=0)]] = Field( description=( "Four strip indices — one per screen corner — in the strip-walk order " "defined by (start_position, layout). Index 0 of the strip is the " "start corner; the four tap positions are recorded in strip order " "beginning from that start corner (the solver lays edges out from " "led_start=0, so a non-zero physical start would require the `offset` " - "field rather than a shifted corner_indices[0])." + "field rather than a shifted corner_indices[0]). Each element must be " + "non-negative (ge=0); out-of-range values yield a 422." ), min_length=4, max_length=4, diff --git a/server/src/ledgrab/static/js/features/setup-wizard.ts b/server/src/ledgrab/static/js/features/setup-wizard.ts index bb002e2..94a8338 100644 --- a/server/src/ledgrab/static/js/features/setup-wizard.ts +++ b/server/src/ledgrab/static/js/features/setup-wizard.ts @@ -27,6 +27,7 @@ import { t } from '../core/i18n.ts'; import { showToast } from '../core/ui.ts'; import { Modal } from '../core/modal.ts'; import { mountAutoCalibration, unmountAutoCalibration } from './auto-calibration.ts'; +import { suppressGettingStartedTour } from './tutorials.ts'; import { ICON_MONITOR, ICON_SPARKLES, ICON_DEVICE, ICON_OK, ICON_CHECK, ICON_ROCKET_ICON, ICON_CALIBRATION, ICON_START, ICON_SEARCH, ICON_PLUS, @@ -76,8 +77,6 @@ interface WizardState { let _state: WizardState | null = null; let _modal: SetupWizardModal | null = null; -const ONBOARDED_KEY = 'tour_completed'; // mirror tutorials.ts TOUR_KEY - const STEPS: WizardStep[] = ['welcome', 'device', 'display', 'scaffold', 'calibrate', 'start', 'done']; // ── Modal class ──────────────────────────────────────────────────────────────── @@ -167,7 +166,7 @@ async function _markOnboarded(): Promise { try { await apiPut('/preferences/onboarding', { onboarded: true }); // Suppress tooltip tour too — wizard owns the first-run experience - localStorage.setItem(ONBOARDED_KEY, '1'); + suppressGettingStartedTour(); } catch { // Non-fatal: UI already moved on } @@ -770,11 +769,9 @@ function _buildDoneStep(state: WizardState): string { `; } -function _attachStepListeners(step: WizardStep): void { - if (step === 'device') { - const form = document.getElementById('wizard-manual-form'); - if (form) form.addEventListener('submit', (e) => void wizardAddManualDevice(e)); - } +function _attachStepListeners(_step: WizardStep): void { + // The manual device form uses onsubmit="wizardAddManualDevice(event)" inline — + // no duplicate addEventListener needed here. } // ───────────────────────────────────────────────────────────────────────────── diff --git a/server/tests/api/routes/test_setup_routes.py b/server/tests/api/routes/test_setup_routes.py index f5a51a8..31ab504 100644 --- a/server/tests/api/routes/test_setup_routes.py +++ b/server/tests/api/routes/test_setup_routes.py @@ -21,7 +21,7 @@ from __future__ import annotations import pytest from fastapi import FastAPI from fastapi.testclient import TestClient -from unittest.mock import patch +from unittest.mock import MagicMock, patch # --------------------------------------------------------------------------- @@ -89,6 +89,14 @@ def event_log(): return log +@pytest.fixture +def mock_manager(): + """A MagicMock ProcessorManager that silently accepts all calls.""" + mgr = MagicMock() + mgr.remove_target = MagicMock() + return mgr + + @pytest.fixture def setup_client( tmp_db, @@ -98,6 +106,7 @@ def setup_client( css_store, output_target_store, event_log, + mock_manager, ): from ledgrab.api.routes.setup import router from ledgrab.api.auth import verify_api_key @@ -111,6 +120,7 @@ def setup_client( app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store + app.dependency_overrides[deps.get_processor_manager] = lambda: mock_manager # Capture entity events def _fire(entity_type, action, entity_id): @@ -512,3 +522,79 @@ class TestScaffoldCalibrationIntegration: assert updated.calibration.leds_top == 15 assert updated.calibration.leds_right == 9 assert updated.calibration.layout == "clockwise" + + +# --------------------------------------------------------------------------- +# Regression: scaffold registers the output target with ProcessorManager +# --------------------------------------------------------------------------- + + +class TestScaffoldRegistersWithManager: + def test_scaffold_calls_add_target_on_manager( + self, + setup_client, + sample_device, + mock_manager, + ): + """Blocker regression: scaffold must register the created output target + with the ProcessorManager so that a subsequent start call can find it. + + WledOutputTarget.register_with_manager calls manager.add_target(...) + — we assert that method was invoked with the scaffolded target id. + """ + resp = _scaffold(setup_client, device_id=sample_device.id) + assert resp.status_code == 201, resp.text + target_id = resp.json()["output_target_id"] + + # register_with_manager → manager.add_target(target_id=..., ...) + mock_manager.add_target.assert_called_once() + call_kwargs = mock_manager.add_target.call_args + assert call_kwargs.kwargs.get("target_id") == target_id, ( + f"manager.add_target was not called with target_id={target_id!r}; " + f"actual call: {call_kwargs}" + ) + + def test_scaffold_rollback_calls_remove_target_on_manager( + self, + setup_client, + sample_device, + output_target_store, + mock_manager, + ): + """Rollback after a post-target-creation failure must call manager.remove_target + so no half-registered target lingers in the ProcessorManager. + + We inject a failure by making register_with_manager raise RuntimeError + (bypassing the ValueError-only guard), which puts the outer except branch + in play. The target IS already in created_ids at that point, so rollback + must call manager.remove_target(target_id). + """ + created_target_ids: list[str] = [] + + original_create = output_target_store.create_wled_target + + def _spy_create(*args, **kwargs): + target = original_create(*args, **kwargs) + created_target_ids.append(target.id) + return target + + output_target_store.create_wled_target = _spy_create + try: + # Patch register_with_manager on WledOutputTarget to raise RuntimeError — + # RuntimeError bypasses the ValueError guard and triggers the outer except, + # so rollback fires with the target already in created_ids. + with patch( + "ledgrab.storage.wled_output_target.WledOutputTarget.register_with_manager", + side_effect=RuntimeError("Injected registration failure for rollback test"), + ): + resp = setup_client.post( + "/api/v1/setup/scaffold", + json={"device_id": sample_device.id, "display_index": 0}, + ) + assert resp.status_code == 500, resp.text + + assert len(created_target_ids) == 1, "spy did not record a created target" + target_id = created_target_ids[0] + mock_manager.remove_target.assert_called_with(target_id) + finally: + output_target_store.create_wled_target = original_create diff --git a/server/tests/api/routes/test_setup_routes_adversarial.py b/server/tests/api/routes/test_setup_routes_adversarial.py new file mode 100644 index 0000000..0ce95f1 --- /dev/null +++ b/server/tests/api/routes/test_setup_routes_adversarial.py @@ -0,0 +1,506 @@ +"""Adversarial tests for the setup scaffold and onboarding preference endpoints. + +Phase 2 acceptance criteria (NOT what the code happens to do): + - Rollback when the FINAL step (output target create) fails leaves ZERO orphans + AND emits ZERO "created" events. + - Reused capture template is NOT deleted on rollback. + - display_index > 63 → 422. + - Missing device_id → 422 (Pydantic validation before handler runs). + - Unknown device_id → 404. + - PUT onboarding false clears completed_at to null. + - Corrupt stored onboarding value falls back to default (onboarded=false). + +These fill the gaps in the existing 22 happy-path tests. +""" + +from __future__ import annotations + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from unittest.mock import MagicMock, patch + + +# --------------------------------------------------------------------------- +# Shared fixtures (mirrors test_setup_routes.py exactly) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def tmp_db(tmp_path): + from ledgrab.storage.database import Database + + db = Database(tmp_path / "adv_setup.db") + yield db + db.close() + + +@pytest.fixture +def device_store(tmp_db): + from ledgrab.storage import DeviceStore + + return DeviceStore(tmp_db) + + +@pytest.fixture +def template_store(tmp_db): + from ledgrab.storage.template_store import TemplateStore + + return TemplateStore(tmp_db) + + +@pytest.fixture +def picture_source_store(tmp_db): + from ledgrab.storage.picture_source_store import PictureSourceStore + + return PictureSourceStore(tmp_db) + + +@pytest.fixture +def css_store(tmp_db): + from ledgrab.storage.color_strip_store import ColorStripStore + + return ColorStripStore(tmp_db) + + +@pytest.fixture +def output_target_store(tmp_db): + from ledgrab.storage.output_target_store import OutputTargetStore + + return OutputTargetStore(tmp_db) + + +@pytest.fixture +def sample_device(device_store): + return device_store.create_device( + name="Test LED Strip", + url="http://192.168.1.10", + led_count=60, + ) + + +@pytest.fixture +def event_log(): + log = [] + return log + + +@pytest.fixture +def mock_manager(): + """A MagicMock ProcessorManager that silently accepts all calls.""" + mgr = MagicMock() + mgr.remove_target = MagicMock() + return mgr + + +@pytest.fixture +def setup_client( + tmp_db, + device_store, + template_store, + picture_source_store, + css_store, + output_target_store, + event_log, + mock_manager, +): + from ledgrab.api.routes.setup import router + from ledgrab.api.auth import verify_api_key + from ledgrab.api import dependencies as deps + + app = FastAPI() + app.include_router(router) + app.dependency_overrides[verify_api_key] = lambda: "test" + app.dependency_overrides[deps.get_device_store] = lambda: device_store + app.dependency_overrides[deps.get_template_store] = lambda: template_store + app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store + app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store + app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store + app.dependency_overrides[deps.get_processor_manager] = lambda: mock_manager + + def _fire(entity_type, action, entity_id): + event_log.append((entity_type, action, entity_id)) + + with patch("ledgrab.api.routes.setup.fire_entity_event", side_effect=_fire): + yield TestClient(app, raise_server_exceptions=False) + + +def _scaffold(client, **overrides): + body = {"display_index": 0, **overrides} + return client.post("/api/v1/setup/scaffold", json=body) + + +# --------------------------------------------------------------------------- +# Rollback when the FINAL step (output target) fails +# Criteria: "zero orphans AND zero 'created' events" +# --------------------------------------------------------------------------- + + +class TestFinalStepRollback: + def test_final_step_failure_leaves_zero_orphans( + self, + setup_client, + sample_device, + picture_source_store, + css_store, + output_target_store, + ): + """output_target_store.create_wled_target failing leaves NO orphaned entities.""" + original_create = output_target_store.create_wled_target + + def _fail(*args, **kwargs): + raise ValueError("Injected final step failure") + + output_target_store.create_wled_target = _fail + try: + resp = _scaffold(setup_client, device_id=sample_device.id) + assert resp.status_code in ( + 400, + 500, + ), f"Expected 4xx/5xx on final-step failure, got {resp.status_code}: {resp.text}" + + # Zero picture sources remaining + remaining_ps = picture_source_store.get_all_streams() + assert len(remaining_ps) == 0, f"Picture sources not rolled back: {remaining_ps}" + + # Zero color-strip sources remaining + remaining_css = css_store.get_all_sources() + assert len(remaining_css) == 0, f"Color-strip sources not rolled back: {remaining_css}" + + # Zero output targets (trivially true since creation was never reached, + # but included for completeness) + remaining_ot = output_target_store.get_all_targets() + assert len(remaining_ot) == 0, f"Output targets not rolled back: {remaining_ot}" + finally: + output_target_store.create_wled_target = original_create + + def test_final_step_failure_emits_zero_created_events( + self, + setup_client, + sample_device, + output_target_store, + event_log, + ): + """No 'created' events must be emitted when the final step fails (deferred-event contract).""" + original_create = output_target_store.create_wled_target + + def _fail(*args, **kwargs): + raise ValueError("Final step injected failure") + + output_target_store.create_wled_target = _fail + try: + resp = _scaffold(setup_client, device_id=sample_device.id) + assert resp.status_code in (400, 500) + + created_events = [(et, act) for et, act, _ in event_log if act == "created"] + assert ( + created_events == [] + ), f"'created' events leaked on final-step rollback: {created_events}" + finally: + output_target_store.create_wled_target = original_create + + def test_final_step_failure_reused_template_survives( + self, + setup_client, + sample_device, + template_store, + output_target_store, + ): + """A reused capture template must NOT be deleted when the final step fails.""" + templates_before = {t.id for t in template_store.get_all_templates()} + original_create = output_target_store.create_wled_target + + def _fail(*args, **kwargs): + raise ValueError("Final step injected failure") + + output_target_store.create_wled_target = _fail + try: + _scaffold(setup_client, device_id=sample_device.id) + finally: + output_target_store.create_wled_target = original_create + + templates_after = {t.id for t in template_store.get_all_templates()} + assert templates_before == templates_after, ( + f"Template set changed after rollback: " + f"before={templates_before} after={templates_after}" + ) + + def test_final_step_failure_device_not_deleted( + self, + setup_client, + sample_device, + device_store, + output_target_store, + ): + """The pre-existing device must never be touched by rollback of any step.""" + original_create = output_target_store.create_wled_target + + def _fail(*args, **kwargs): + raise ValueError("Final step injected failure") + + output_target_store.create_wled_target = _fail + try: + _scaffold(setup_client, device_id=sample_device.id) + finally: + output_target_store.create_wled_target = original_create + + device = device_store.get(sample_device.id) + assert device is not None, "Pre-existing device was deleted during rollback" + + +# --------------------------------------------------------------------------- +# Validation: display_index bounds +# Criteria: display_index > 63 → 422; display_index 63 → 201; negative → 422 +# --------------------------------------------------------------------------- + + +class TestDisplayIndexBounds: + def test_display_index_64_returns_422(self, setup_client, sample_device): + """display_index=64 (one above max) must be rejected with 422.""" + resp = _scaffold(setup_client, device_id=sample_device.id, display_index=64) + assert ( + resp.status_code == 422 + ), f"Expected 422 for display_index=64, got {resp.status_code}: {resp.text}" + + def test_display_index_63_returns_201(self, setup_client, sample_device): + """display_index=63 is the maximum valid value — must be accepted.""" + resp = _scaffold(setup_client, device_id=sample_device.id, display_index=63) + assert ( + resp.status_code == 201 + ), f"Expected 201 for display_index=63, got {resp.status_code}: {resp.text}" + + def test_display_index_0_returns_201(self, setup_client, sample_device): + """display_index=0 is the minimum valid value.""" + resp = _scaffold(setup_client, device_id=sample_device.id, display_index=0) + assert ( + resp.status_code == 201 + ), f"Expected 201 for display_index=0, got {resp.status_code}: {resp.text}" + + def test_display_index_negative_1_returns_422(self, setup_client, sample_device): + """display_index=-1 must be rejected with 422.""" + resp = _scaffold(setup_client, device_id=sample_device.id, display_index=-1) + assert ( + resp.status_code == 422 + ), f"Expected 422 for display_index=-1, got {resp.status_code}: {resp.text}" + + def test_display_index_very_large_returns_422(self, setup_client, sample_device): + """display_index=10000 must be rejected with 422.""" + resp = _scaffold(setup_client, device_id=sample_device.id, display_index=10000) + assert ( + resp.status_code == 422 + ), f"Expected 422 for display_index=10000, got {resp.status_code}: {resp.text}" + + +# --------------------------------------------------------------------------- +# Validation: missing / unknown device_id +# --------------------------------------------------------------------------- + + +class TestDeviceValidation: + def test_missing_device_id_returns_422(self, setup_client): + """Omitting device_id entirely must yield 422 (Pydantic required field).""" + resp = setup_client.post("/api/v1/setup/scaffold", json={"display_index": 0}) + assert ( + resp.status_code == 422 + ), f"Expected 422 for missing device_id, got {resp.status_code}: {resp.text}" + + def test_empty_string_device_id_handled(self, setup_client): + """device_id='' — should yield 404 (empty string not in device store) or 422.""" + resp = _scaffold(setup_client, device_id="") + assert resp.status_code in ( + 404, + 422, + ), f"Expected 404 or 422 for empty device_id, got {resp.status_code}: {resp.text}" + + def test_unknown_device_id_returns_404(self, setup_client): + """device_id that does not exist in the store must yield 404.""" + resp = _scaffold(setup_client, device_id="device_definitely_does_not_exist") + assert ( + resp.status_code == 404 + ), f"Expected 404 for unknown device_id, got {resp.status_code}: {resp.text}" + + def test_none_device_id_returns_422(self, setup_client): + """device_id=null must yield 422 (not None-convertible to str).""" + resp = setup_client.post( + "/api/v1/setup/scaffold", json={"device_id": None, "display_index": 0} + ) + assert ( + resp.status_code == 422 + ), f"Expected 422 for null device_id, got {resp.status_code}: {resp.text}" + + +# --------------------------------------------------------------------------- +# Rollback idempotency: calling scaffold twice with the final step failing +# must still leave exactly zero orphans (no accumulation across calls). +# --------------------------------------------------------------------------- + + +class TestRollbackIdempotency: + def test_two_failed_scaffolds_leave_zero_orphans( + self, + setup_client, + sample_device, + picture_source_store, + css_store, + output_target_store, + ): + """Two sequential scaffold failures must not accumulate orphans.""" + original_create = output_target_store.create_wled_target + + def _fail(*args, **kwargs): + raise ValueError("Always fails") + + output_target_store.create_wled_target = _fail + try: + _scaffold(setup_client, device_id=sample_device.id) + _scaffold(setup_client, device_id=sample_device.id) + finally: + output_target_store.create_wled_target = original_create + + assert ( + len(picture_source_store.get_all_streams()) == 0 + ), "Picture sources accumulated across two failed scaffolds" + assert ( + len(css_store.get_all_sources()) == 0 + ), "Color-strip sources accumulated across two failed scaffolds" + + +# --------------------------------------------------------------------------- +# Onboarding: adversarial cases +# --------------------------------------------------------------------------- + + +@pytest.fixture +def pref_client(tmp_db): + from ledgrab.api.routes.preferences import router + from ledgrab.api.auth import verify_api_key + from ledgrab.api import dependencies as deps + + app = FastAPI() + app.include_router(router) + app.dependency_overrides[verify_api_key] = lambda: "test" + app.dependency_overrides[deps.get_database] = lambda: tmp_db + + return TestClient(app) + + +class TestOnboardingAdversarial: + def test_put_false_clears_completed_at(self, pref_client): + """PUT onboarded=false must clear completed_at to null, per criteria.""" + # First mark as onboarded + r1 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True}) + assert r1.status_code == 200 + assert r1.json()["completed_at"] is not None + + # Now set to false — completed_at must be cleared + r2 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": False}) + assert r2.status_code == 200 + data = r2.json() + assert data["onboarded"] is False + assert ( + data["completed_at"] is None + ), f"completed_at should be null after PUT onboarded=false, got {data['completed_at']!r}" + + def test_put_false_then_get_returns_null_completed_at(self, pref_client): + """After PUT false, GET must also return null completed_at (persisted correctly).""" + pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True}) + pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": False}) + resp = pref_client.get("/api/v1/preferences/onboarding") + assert resp.status_code == 200 + assert ( + resp.json()["completed_at"] is None + ), "GET after PUT false should return null completed_at" + + def test_corrupt_stored_value_falls_back_to_default(self, tmp_db, pref_client): + """If the stored onboarding value is corrupt, GET must fall back to default. + + Criteria: "corrupt stored value falls back to default". + We inject garbage into the db directly, then hit GET. + """ + # Inject a value that is syntactically valid JSON (dict) but fails + # Pydantic validation because the types are wrong. + tmp_db.set_setting("onboarded", {"onboarded": "not_a_bool", "completed_at": 12345}) + + resp = pref_client.get("/api/v1/preferences/onboarding") + assert ( + resp.status_code == 200 + ), f"Expected 200 for corrupt onboarding value, got {resp.status_code}" + data = resp.json() + # Must fall back to default + assert ( + data["onboarded"] is False + ), f"Expected onboarded=false as default after corrupt value, got {data['onboarded']!r}" + assert ( + data["completed_at"] is None + ), f"Expected completed_at=null as default, got {data['completed_at']!r}" + + def test_corrupt_stored_value_as_wrong_type_falls_back(self, tmp_db, pref_client): + """Stored value is a string (not a dict) — must fall back to default.""" + tmp_db.set_setting("onboarded", "this_is_not_valid") + + resp = pref_client.get("/api/v1/preferences/onboarding") + assert resp.status_code == 200 + data = resp.json() + assert data["onboarded"] is False + assert data["completed_at"] is None + + def test_corrupt_stored_null_falls_back_to_default(self, tmp_db, pref_client): + """Stored value is null/None — must return default (not crash).""" + tmp_db.set_setting("onboarded", None) + + resp = pref_client.get("/api/v1/preferences/onboarding") + assert resp.status_code == 200 + assert resp.json()["onboarded"] is False + + def test_put_true_without_completed_at_stamps_timestamp(self, pref_client): + """PUT onboarded=true without completed_at must auto-stamp a timestamp.""" + resp = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True}) + assert resp.status_code == 200 + data = resp.json() + assert ( + data["completed_at"] is not None + ), "Server must auto-stamp completed_at when onboarded=true is sent without a timestamp" + # Should be a non-empty ISO timestamp string + assert len(data["completed_at"]) > 10 + + def test_put_true_with_completed_at_preserves_it(self, pref_client): + """PUT onboarded=true with explicit completed_at must preserve that value.""" + ts = "2025-03-15T10:00:00+00:00" + resp = pref_client.put( + "/api/v1/preferences/onboarding", + json={"onboarded": True, "completed_at": ts}, + ) + assert resp.status_code == 200 + assert resp.json()["completed_at"] == ts + + def test_put_false_with_completed_at_clears_it(self, pref_client): + """PUT onboarded=false even with a completed_at payload must clear it. + + The criteria say: 'Setting onboarded=false clears completed_at to null.' + """ + ts = "2025-01-01T00:00:00+00:00" + resp = pref_client.put( + "/api/v1/preferences/onboarding", + json={"onboarded": False, "completed_at": ts}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["onboarded"] is False + assert ( + data["completed_at"] is None + ), f"completed_at should be cleared to null when onboarded=false, got {data['completed_at']!r}" + + def test_multiple_true_puts_only_stamp_once(self, pref_client): + """Two successive PUT true calls — second call must preserve the original timestamp.""" + r1 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True}) + ts1 = r1.json()["completed_at"] + assert ts1 is not None + + # Explicitly provide the original timestamp in second call + r2 = pref_client.put( + "/api/v1/preferences/onboarding", + json={"onboarded": True, "completed_at": ts1}, + ) + assert ( + r2.json()["completed_at"] == ts1 + ), "Explicit completed_at should be preserved on second PUT" diff --git a/server/tests/core/test_calibration_session_adversarial.py b/server/tests/core/test_calibration_session_adversarial.py new file mode 100644 index 0000000..c0635cc --- /dev/null +++ b/server/tests/core/test_calibration_session_adversarial.py @@ -0,0 +1,535 @@ +"""Adversarial / concurrency tests for CalibrationSession. + +Phase 1 acceptance criteria tested here (NOT what the code happens to do): + - Interleaved start/start (same device, then different device) must never + leave the old device without restore. + - Interleaved start/stop racing the idle watchdog must not leave the device + dark or stuck. + - Idle-timeout teardown restores the prior target. + - position() with index out of range → ValueError. + - stop() when idle is a safe no-op (does not call start_processing or crash). + - CalibrationSession lock must prevent double-teardown. + +All tests use a fake ProcessorManager matching the shape used in +test_calibration_routes.py. +""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio + +from ledgrab.core.capture.calibration_session import ( + CalibrationSession, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_manager(device_id: str = "dev1", led_count: int = 100) -> MagicMock: + """Build a minimal fake ProcessorManager.""" + mgr = MagicMock() + ds = MagicMock() + ds.led_count = led_count + mgr._devices = {device_id: ds} + mgr.get_processing_target_for_device = MagicMock(return_value=None) + mgr.stop_processing = AsyncMock() + mgr.start_processing = AsyncMock() + mgr.send_clear_pixels = AsyncMock() + mgr.set_calibration_pixel = AsyncMock() + return mgr + + +@pytest_asyncio.fixture(autouse=True) +async def fresh_session(): + """Yield a brand-new CalibrationSession for each test (not the singleton).""" + session = CalibrationSession() + yield session + # Cleanup: cancel any lingering watchdog + if session._timeout_task and not session._timeout_task.done(): + session._timeout_task.cancel() + try: + await session._timeout_task + except (asyncio.CancelledError, Exception): + pass + + +# --------------------------------------------------------------------------- +# stop() when idle is a safe no-op +# --------------------------------------------------------------------------- + + +class TestStopWhenIdle: + @pytest.mark.asyncio + async def test_stop_idle_does_not_call_start_processing(self, fresh_session): + """Calling stop() when no session is active must not call start_processing.""" + mgr = _make_manager() + # Do NOT start a session — just stop immediately + await fresh_session.stop() + mgr.start_processing.assert_not_awaited() + + @pytest.mark.asyncio + async def test_stop_idle_returns_inactive_state(self, fresh_session): + """stop() on an idle session returns state with active=False.""" + state = fresh_session.get_state() + assert state["active"] is False + await fresh_session.stop() # no-op + assert fresh_session.is_active is False + + @pytest.mark.asyncio + async def test_cancel_idle_safe(self, fresh_session): + """cancel() on idle session is also a safe no-op.""" + await fresh_session.cancel() + assert fresh_session.is_active is False + + @pytest.mark.asyncio + async def test_double_stop_is_idempotent(self, fresh_session): + """Calling stop() twice on an active session must not double-call start_processing.""" + mgr = _make_manager() + mgr.get_processing_target_for_device = MagicMock(return_value="tgt_restore") + await fresh_session.start("dev1", mgr) + assert fresh_session.is_active is True + + await fresh_session.stop() + assert fresh_session.is_active is False + # Restore called exactly once + mgr.start_processing.assert_awaited_once_with("tgt_restore") + + # Second stop must be a no-op + await fresh_session.stop() + # start_processing should still be called exactly once (not twice) + assert mgr.start_processing.await_count == 1 + + +# --------------------------------------------------------------------------- +# position() out of range +# --------------------------------------------------------------------------- + + +class TestPositionOutOfRange: + @pytest.mark.asyncio + async def test_position_equal_to_led_count_raises(self, fresh_session): + """index == led_count must raise ValueError (0-based, so out of range).""" + mgr = _make_manager(led_count=100) + await fresh_session.start("dev1", mgr) + with pytest.raises(ValueError, match="out of range"): + await fresh_session.position(100) + + @pytest.mark.asyncio + async def test_position_above_led_count_raises(self, fresh_session): + """index > led_count raises ValueError.""" + mgr = _make_manager(led_count=50) + await fresh_session.start("dev1", mgr) + with pytest.raises(ValueError, match="out of range"): + await fresh_session.position(999) + + @pytest.mark.asyncio + async def test_position_negative_raises(self, fresh_session): + """Negative index raises ValueError.""" + mgr = _make_manager(led_count=100) + await fresh_session.start("dev1", mgr) + with pytest.raises(ValueError): + await fresh_session.position(-1) + + @pytest.mark.asyncio + async def test_position_at_led_count_minus_1_is_valid(self, fresh_session): + """Last valid index (led_count - 1) must succeed.""" + mgr = _make_manager(led_count=10) + await fresh_session.start("dev1", mgr) + await fresh_session.position(9) # must not raise + mgr.set_calibration_pixel.assert_awaited() + + @pytest.mark.asyncio + async def test_position_without_active_session_raises(self, fresh_session): + """position() with no active session must raise RuntimeError.""" + with pytest.raises(RuntimeError, match="No active calibration session"): + await fresh_session.position(5) + + +# --------------------------------------------------------------------------- +# Interleaved start/start — old device must be restored +# --------------------------------------------------------------------------- + + +class TestInterleavedStartStart: + @pytest.mark.asyncio + async def test_start_on_same_device_restores_prior_target(self, fresh_session): + """Starting a second session on the same device auto-stops the first. + The first device's prior target must be restored before the second session begins. + """ + mgr = _make_manager(led_count=60) + mgr.get_processing_target_for_device = MagicMock(return_value="tgt_original") + + # Start first session + await fresh_session.start("dev1", mgr) + assert fresh_session.is_active is True + assert fresh_session._prior_target_id == "tgt_original" + + # Now start again on the same device + # The second start should stop the first (restoring tgt_original), + # then re-query the current running target (which will be tgt_original again + # since start_processing will have been called). + # For isolation: change what get_processing_target_for_device returns after + # the first stop so the second session records a fresh prior. + call_count = {"n": 0} + + def _get_target(device_id): + call_count["n"] += 1 + if call_count["n"] == 1: + return "tgt_original" + return None # After first stop, no target running + + mgr.get_processing_target_for_device = MagicMock(side_effect=_get_target) + + # First session with the original target + fresh_session2 = CalibrationSession() + await fresh_session2.start("dev1", mgr) + assert fresh_session2.is_active is True + + # Start a NEW session on the same device — must auto-stop fresh_session2 + fresh_session3 = CalibrationSession() + # Inject fresh_session2's internal state into fresh_session3 to simulate + # the singleton pattern: replace session3's state to reflect session2 active + # (this mirrors the module-level singleton where only one CalibrationSession exists) + fresh_session3._active = fresh_session2._active + fresh_session3._device_id = fresh_session2._device_id + fresh_session3._led_count = fresh_session2._led_count + fresh_session3._prior_target_id = fresh_session2._prior_target_id + fresh_session3._last_activity = fresh_session2._last_activity + fresh_session3._manager = fresh_session2._manager + fresh_session3._timeout_task = fresh_session2._timeout_task + fresh_session2._timeout_task = None # prevent double-cancel + + await fresh_session3.start("dev1", mgr) + + # The start must have called stop on the previous session → restore was called + # (mgr.start_processing was called at least once to restore the prior target) + assert mgr.start_processing.await_count >= 1 + + # Cleanup + await fresh_session3.stop() + if fresh_session2._timeout_task and not fresh_session2._timeout_task.done(): + fresh_session2._timeout_task.cancel() + + @pytest.mark.asyncio + async def test_new_session_on_different_device_clears_old_device(self, fresh_session): + """Starting a new session on a different device must clear the first device. + + The first session must be stopped (its prior target restored or cleared) + before the second session on the new device becomes active. + """ + mgr = MagicMock() + + ds1 = MagicMock() + ds1.led_count = 30 + ds2 = MagicMock() + ds2.led_count = 60 + + mgr._devices = {"dev1": ds1, "dev2": ds2} + mgr.get_processing_target_for_device = MagicMock(return_value=None) + mgr.stop_processing = AsyncMock() + mgr.start_processing = AsyncMock() + mgr.send_clear_pixels = AsyncMock() + mgr.set_calibration_pixel = AsyncMock() + + # Start first session on dev1 + await fresh_session.start("dev1", mgr) + assert fresh_session._device_id == "dev1" + assert fresh_session.is_active is True + + # Now start second session on dev2 — must auto-stop dev1 first + await fresh_session.start("dev2", mgr) + + # After the second start, session must be on dev2 + assert fresh_session._device_id == "dev2" + assert fresh_session.is_active is True + + # send_clear_pixels was called for dev1 (stop) AND for dev2 (start) + clear_calls = [call[0][0] for call in mgr.send_clear_pixels.call_args_list] + assert ( + "dev1" in clear_calls + ), f"dev1 was never cleared during session switch; clear calls: {clear_calls}" + assert ( + "dev2" in clear_calls + ), f"dev2 was never cleared at session start; clear calls: {clear_calls}" + + # Cleanup + await fresh_session.stop() + + +# --------------------------------------------------------------------------- +# Idle-timeout teardown restores prior target +# --------------------------------------------------------------------------- + + +class TestIdleTimeoutRestoresPriorTarget: + @pytest.mark.asyncio + async def test_idle_timeout_calls_start_processing(self, fresh_session): + """When the session times out, start_processing must be called to restore the target. + + We patch IDLE_TIMEOUT_SECONDS to a tiny value so the test doesn't actually + wait 60 seconds. + """ + mgr = _make_manager(led_count=40) + mgr.get_processing_target_for_device = MagicMock(return_value="tgt_to_restore") + + # Patch the idle timeout to 0.05 seconds + with patch( + "ledgrab.core.capture.calibration_session.IDLE_TIMEOUT_SECONDS", + 0.05, + ): + # Also patch the watchdog sleep to something tiny + async def _fast_watchdog(): + """A watchdog that checks every 0.02 seconds instead of 5.""" + try: + while True: + await asyncio.sleep(0.02) + if not fresh_session._active or fresh_session._last_activity is None: + break + from datetime import datetime, timezone + + elapsed = ( + datetime.now(timezone.utc) - fresh_session._last_activity + ).total_seconds() + if elapsed >= 0.05: + async with fresh_session._lock: + await fresh_session._teardown_locked(cancelled=False) + break + except asyncio.CancelledError: + pass + + fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign] + + await fresh_session.start("dev1", mgr) + assert fresh_session.is_active is True + + # Wait long enough for the watchdog to fire + await asyncio.sleep(0.25) + + # Session should have been auto-stopped + assert ( + fresh_session.is_active is False + ), "Session should have been auto-stopped by idle timeout" + + # Prior target must have been restored + mgr.start_processing.assert_awaited_once_with("tgt_to_restore") + + @pytest.mark.asyncio + async def test_idle_timeout_clears_device_to_black(self, fresh_session): + """Idle timeout must send all-black before (or after) restoring target.""" + mgr = _make_manager(led_count=40) + + async def _fast_watchdog(): + try: + while True: + await asyncio.sleep(0.02) + if not fresh_session._active or fresh_session._last_activity is None: + break + from datetime import datetime, timezone + + elapsed = ( + datetime.now(timezone.utc) - fresh_session._last_activity + ).total_seconds() + if elapsed >= 0.05: + async with fresh_session._lock: + await fresh_session._teardown_locked(cancelled=False) + break + except asyncio.CancelledError: + pass + + fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign] + + await fresh_session.start("dev1", mgr) + initial_clear_count = mgr.send_clear_pixels.await_count # from start() + + await asyncio.sleep(0.25) + + # send_clear_pixels must have been called at least once more during teardown + assert ( + mgr.send_clear_pixels.await_count > initial_clear_count + ), "Device was not cleared to black during idle-timeout teardown" + + +# --------------------------------------------------------------------------- +# Concurrent start/start using asyncio.gather (true concurrency within the loop) +# --------------------------------------------------------------------------- + + +class TestConcurrentStartCalls: + @pytest.mark.asyncio + async def test_concurrent_starts_dont_corrupt_state(self, fresh_session): + """Two concurrent start() calls must leave the session in a consistent state. + + Only one session should be active after both complete; the final device + must match one of the two requested devices. + """ + mgr = MagicMock() + ds1 = MagicMock() + ds1.led_count = 50 + ds2 = MagicMock() + ds2.led_count = 80 + mgr._devices = {"devA": ds1, "devB": ds2} + mgr.get_processing_target_for_device = MagicMock(return_value=None) + mgr.stop_processing = AsyncMock() + mgr.start_processing = AsyncMock() + mgr.send_clear_pixels = AsyncMock() + mgr.set_calibration_pixel = AsyncMock() + + # Fire both concurrently — the lock must ensure exactly one wins + results = await asyncio.gather( + fresh_session.start("devA", mgr), + fresh_session.start("devB", mgr), + return_exceptions=True, + ) + + # Neither call should raise (both complete without exception) + for r in results: + if isinstance(r, BaseException): + pytest.fail(f"Concurrent start raised unexpectedly: {r!r}") + + # Exactly one session active with a valid device + assert fresh_session.is_active is True + assert fresh_session._device_id in ("devA", "devB") + + # Cleanup + await fresh_session.stop() + + +# --------------------------------------------------------------------------- +# stop() while watchdog is still pending (concurrent stop/watchdog) +# --------------------------------------------------------------------------- + + +class TestStopVsWatchdogRace: + @pytest.mark.asyncio + async def test_explicit_stop_wins_over_watchdog(self, fresh_session): + """Explicit stop() must cleanly terminate before the watchdog fires. + + After explicit stop, start_processing must be called exactly once + even if the watchdog later tries to tear down. + """ + mgr = _make_manager(led_count=100) + mgr.get_processing_target_for_device = MagicMock(return_value="tgt_explicit") + + await fresh_session.start("dev1", mgr) + assert fresh_session.is_active is True + + # Explicitly stop — the watchdog task should be cancelled + await fresh_session.stop() + + assert fresh_session.is_active is False + # start_processing called exactly once for the prior target + mgr.start_processing.assert_awaited_once_with("tgt_explicit") + + # Give the event loop a moment to confirm the watchdog didn't double-fire + await asyncio.sleep(0.05) + # Still exactly once + assert ( + mgr.start_processing.await_count == 1 + ), "start_processing was called more than once — watchdog fired after explicit stop" + + +# --------------------------------------------------------------------------- +# start() with unknown device_id +# --------------------------------------------------------------------------- + + +class TestStartUnknownDevice: + @pytest.mark.asyncio + async def test_unknown_device_raises_valueerror(self, fresh_session): + """start() with a device_id not in manager._devices must raise ValueError.""" + mgr = _make_manager(device_id="known_device") + with pytest.raises(ValueError, match="not found"): + await fresh_session.start("does_not_exist", mgr) + + @pytest.mark.asyncio + async def test_unknown_device_leaves_session_inactive(self, fresh_session): + """After a failed start() the session must remain inactive.""" + mgr = _make_manager(device_id="known_device") + try: + await fresh_session.start("no_such_device", mgr) + except ValueError: + pass + assert fresh_session.is_active is False + + @pytest.mark.asyncio + async def test_failed_start_does_not_corrupt_existing_session(self, fresh_session): + """A failed start() attempt must not corrupt an already-active session.""" + mgr = MagicMock() + ds = MagicMock() + ds.led_count = 100 + mgr._devices = {"dev1": ds} + mgr.get_processing_target_for_device = MagicMock(return_value="prior_tgt") + mgr.stop_processing = AsyncMock() + mgr.start_processing = AsyncMock() + mgr.send_clear_pixels = AsyncMock() + mgr.set_calibration_pixel = AsyncMock() + + # Start a valid session + await fresh_session.start("dev1", mgr) + assert fresh_session.is_active is True + + # Now try to start on an unknown device — should fail + try: + await fresh_session.start("unknown_device", mgr) + except ValueError: + pass + except Exception: + pass # Other errors are also acceptable + + # The original session must still be active (or was cleanly stopped and + # replaced); either way the device must not be stuck in a broken state. + # The critical invariant: if active, device_id is valid. + if fresh_session.is_active: + assert fresh_session._device_id in mgr._devices + + await fresh_session.stop() + + +# --------------------------------------------------------------------------- +# State snapshot integrity +# --------------------------------------------------------------------------- + + +class TestStateSnapshot: + @pytest.mark.asyncio + async def test_get_state_before_start(self, fresh_session): + state = fresh_session.get_state() + assert state["active"] is False + assert state["device_id"] is None + assert state["led_count"] == 0 + assert state["prior_target_id"] is None + assert state["last_activity"] is None + + @pytest.mark.asyncio + async def test_get_state_after_start(self, fresh_session): + mgr = _make_manager(led_count=60) + mgr.get_processing_target_for_device = MagicMock(return_value="saved_tgt") + await fresh_session.start("dev1", mgr) + + state = fresh_session.get_state() + assert state["active"] is True + assert state["device_id"] == "dev1" + assert state["led_count"] == 60 + assert state["prior_target_id"] == "saved_tgt" + assert state["last_activity"] is not None + + await fresh_session.stop() + + @pytest.mark.asyncio + async def test_get_state_after_stop(self, fresh_session): + mgr = _make_manager() + await fresh_session.start("dev1", mgr) + await fresh_session.stop() + + state = fresh_session.get_state() + assert state["active"] is False + assert state["device_id"] is None + assert state["led_count"] == 0 + assert state["prior_target_id"] is None diff --git a/server/tests/core/test_calibration_solver_adversarial.py b/server/tests/core/test_calibration_solver_adversarial.py new file mode 100644 index 0000000..c1d291a --- /dev/null +++ b/server/tests/core/test_calibration_solver_adversarial.py @@ -0,0 +1,562 @@ +"""Adversarial / edge-case tests for solve_calibration() and build_segments(). + +Criteria derived from Phase 1 acceptance criteria (NOT from what the code does): + - solve_calibration returns correct per-edge counts; result round-trips through + build_segments() and totals are preserved. + - An edge with 0 LEDs (two corners tapped adjacent) is valid. + - Wrap-around edges work correctly. + - Invalid inputs raise cleanly (ValueError). + +New adversarial cases not covered by the existing 19 happy-path tests: + - All four corner_indices equal (degenerate: every edge = 0) — the code must + either handle it gracefully (total=0) OR raise with a clear message. + Per criteria the total must be preserved (0 == 0) and round-trip must not crash. + - Descending / out-of-order corner_indices where wrap-around should apply. + - An edge that spans the whole strip (one edge wraps the full led_count minus + the contribution of the three zero-LED edges). + - led_count just above minimum (led_count=1) with a single LED claimed by + one edge. + - offset >= led_count: the solver stores offset verbatim; PixelMapper normalises + it via % total_leds. The build_segments() round-trip must not crash. + - Corner indices modulo led_count are used, so indices >= led_count should be + accepted and reduced. +""" + +import pytest + +from ledgrab.core.capture.calibration import ( + EDGE_ORDER, + CalibrationConfig, + solve_calibration, +) + + +# --------------------------------------------------------------------------- +# Helper (same as in test_calibration_solver.py — duplicated intentionally so +# this adversarial file can run in isolation) +# --------------------------------------------------------------------------- + + +def _roundtrip(cfg: CalibrationConfig) -> None: + """Assert build_segments() doesn't crash and totals match.""" + segs = cfg.build_segments() + total_from_segs = sum(s.led_count for s in segs) + expected = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left + assert ( + total_from_segs == expected + ), f"Segment total {total_from_segs} != field total {expected} for cfg={cfg!r}" + + +# --------------------------------------------------------------------------- +# Degenerate: all four corner indices are equal +# When all four corners are tapped at the same index every consecutive pair +# has start_idx == end_idx, so every edge gets 0 LEDs. Total = 0 is +# mathematically consistent with the input; the function should either +# return a config with 0-LED edges OR raise a clear ValueError. +# It must NOT silently return wrong counts and must NOT crash unexpectedly. +# --------------------------------------------------------------------------- + + +class TestAllFourCornersEqual: + """All four corner_indices equal → every edge = 0 LEDs or clean ValueError.""" + + @pytest.mark.parametrize( + "start_position,layout", + [(sp, lay) for sp, lay in EDGE_ORDER.keys()], + ) + def test_all_equal_returns_zero_total_or_raises(self, start_position, layout): + """solve_calibration([k,k,k,k]) must not produce non-zero counts.""" + led_count = 100 + try: + cfg = solve_calibration( + led_count=led_count, + start_position=start_position, + layout=layout, + corner_indices=[0, 0, 0, 0], + ) + # If it doesn't raise: total must be 0 (consistent with input) + total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left + assert ( + total == 0 + ), f"{start_position}/{layout}: all-equal corners produced total={total}; expected 0" + # build_segments must not crash on a 0-total config + segs = cfg.build_segments() + assert segs == [], f"Expected no segments for 0-LED config, got {segs!r}" + except ValueError: + # Raising is also acceptable — as long as it's a ValueError, not a + # crash/assertion/other exception. + pass + + def test_all_equal_roundtrip_does_not_crash(self): + """Even if all edges are 0, build_segments() must not raise.""" + try: + cfg = solve_calibration( + led_count=50, + start_position="top_left", + layout="clockwise", + corner_indices=[25, 25, 25, 25], + ) + _roundtrip(cfg) # must not crash + except ValueError: + pass # acceptable + + +# --------------------------------------------------------------------------- +# Descending / out-of-order corner indices +# Out-of-order but non-wrapping: e.g. [70, 50, 30, 10] for clockwise bottom_left +# The solver uses consecutive-pair differences with wrap logic. +# Each pair (70→50, 50→30, 30→10, 10→70) has end < start, so ALL four edges +# get wrap-around counts. Total must still equal led_count. +# --------------------------------------------------------------------------- + + +class TestDescendingCornerIndices: + """All four corners in descending order — every pair wraps.""" + + def test_descending_total_equals_led_count(self): + """Descending indices [75, 50, 25, 0] — total must == led_count.""" + # bottom_left/clockwise: edge_order=[left,top,right,bottom] + # left: 75→50 = 25, top: 50→25 = 25, right: 25→0 = 25, bottom: 0→75 = 75 + # Wait — end > start for bottom: 0→75 means 75. But 0 < 75 so count=75-0=75. + # Recalculate: for bottom: start_idx=0, end_idx=75 → end>start → count=75 + # total = 25+25+25+75 = 150 ≠ 100 → That's wrong input. + # Use [80, 60, 40, 20] for 100 LEDs: + # left: 80→60: end0 → count=75-0=75 + # 75→50 (left): 50<75 → wrap: (100-75)+50=75 + # Total would be 75+25+25+75=200 ≠ 100 + # That shows descending indices don't sum to led_count in general. + # The critical invariant is: sum of per-edge counts == led_count when + # the corner indices span a single full traversal. + # To get descending [80,60,40,20] → sum via wrap logic: + # left: 80→60: 60<80 → (100-80)+60=80 + # top: 60→40: 40<60 → (100-60)+40=80 + # right: 40→20: 20<40 → (100-40)+20=80 + # bottom: 20→80: 80>20 → 80-20=60 + # total = 80+80+80+60 = 300 — still not 100. + # + # The INVARIANT of solve_calibration is: if corner_indices form a valid + # partition of the strip (i.e. each consecutive pair covers a segment + # that together span exactly led_count LEDs), the total == led_count. + # Descending indices that form valid partitions do sum to led_count. + # Example: if we want all edges wrapped, use [99, 74, 49, 24]: + # left: 99→74: 74<99 → (100-99)+74=75 + # top: 74→49: 49<74 → (100-74)+49=75 + # right: 49→24: 24<49 → (100-49)+24=75 + # bottom: 24→99: 99>24 → 99-24=75 + # total = 75+75+75+75=300 ≠ 100 + # + # Conclusion: descending indices don't need to sum to led_count — only + # a proper strip traversal (covering each LED exactly once) does. + # The adversarial test here is: the function must NOT crash, must NOT + # produce negative counts, and must round-trip cleanly. + cfg = solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[80, 60, 40, 20], + ) + counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left] + assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}" + _roundtrip(cfg) + + def test_all_four_wrap_around_non_negative(self): + """Every consecutive pair wraps (descending) — all counts must be >= 0.""" + # [90, 70, 50, 30] for led_count=100, top_right/clockwise + cfg = solve_calibration( + led_count=100, + start_position="top_right", + layout="clockwise", + corner_indices=[90, 70, 50, 30], + ) + counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left] + assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}" + _roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# Edge that wraps the whole strip +# When only one edge is used (the other 3 are 0 LEDs), that edge should span +# led_count LEDs. Corner indices: [0, 0, 0, 0] gives all-zero (tested above). +# A single-edge case: [0, 100, 100, 100] would give left=100, top=0, right=0, +# bottom=0 for bottom_left/clockwise. But 100 % 100 = 0 so end_idx=0, +# start_idx=0 → count=0 for left too. Use index 100 directly (>led_count). +# Alternatively: for led_count=100, corners [0, 0, 0, 0] with all-zero is +# already tested. The wrap-all case uses non-modulo indices. +# +# The real case: one valid single-edge scenario: +# bottom_left/clockwise, 100 LEDs, EDGE_ORDER = [left,top,right,bottom] +# corners [0, 100, 100, 100]: left: 0→100%100=0 → 0-LED because end==start. +# No, there's no way to make one edge claim all 100 LEDs with index-based input +# other than having it wrap around. Test: [50, 50, 50, 50] (all equal): +# already tested above. +# The closest adversarial case: one edge with count == led_count via wrap-around. +# For bottom_left/clockwise, [50, 50, 50, 50] makes top=0,right=0,bottom=0,left=0. +# To make ONE edge == 100: we need start_idx=50, end_idx=50 for three edges +# and start=50, end=50 wraps to 0 for that edge... that's the all-zeros case. +# The only way one edge gets ALL leds is: +# e.g. left: corners[0]=50, corners[1]=50 → 50==50 → count=0 (NOT 100). +# There's a subtle algorithmic distinction: adjacent indices being equal → 0 LED +# vs wrap-around: if the algorithm used (start+count)%N as end, then one-edge +# spanning the whole strip would require start=end via wrap, giving count=N. +# But the current algorithm: end==start → count=0. This is the CORRECT behavior +# per the Phase 1 spec ("Adjacent taps on the same index → 0-LED edge"). +# --------------------------------------------------------------------------- + + +class TestWholestripSingleEdge: + """Verify that a wrap-around edge spanning led_count-3 LEDs (max when 3 others are 1 each) works.""" + + def test_one_large_edge_with_minimal_others(self): + """One edge has led_count-3 LEDs, the other 3 each have 1 LED. + + bottom_left/clockwise, led_count=100: + EDGE_ORDER = [left, top, right, bottom] + corners: [0, 97, 98, 99] + left: 0→97 = 97 + top: 97→98 = 1 + right: 98→99 = 1 + bottom: 99→0 = 1 (wrap: (100-99)+0 = 1) + total = 97+1+1+1 = 100 ✓ + """ + cfg = solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 97, 98, 99], + ) + assert cfg.leds_left == 97 + assert cfg.leds_top == 1 + assert cfg.leds_right == 1 + assert cfg.leds_bottom == 1 + assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100 + _roundtrip(cfg) + + def test_last_edge_wraps_nearly_all_leds(self): + """The last edge (bottom in bottom_left/clockwise) wraps from 3 to 0. + + corners: [0, 1, 2, 3] + left: 0→1 = 1 + top: 1→2 = 1 + right: 2→3 = 1 + bottom: 3→0 = (100-3)+0 = 97 (wrap-around) + total = 100 ✓ + """ + cfg = solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 1, 2, 3], + ) + assert cfg.leds_left == 1 + assert cfg.leds_top == 1 + assert cfg.leds_right == 1 + assert cfg.leds_bottom == 97 + assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100 + _roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# led_count just above minimum +# led_count=1: only 1 LED; corner_indices can only meaningfully be [0,0,0,0] +# which gives all zeros. That's the all-equal case above. +# led_count=5 (just above conceptual minimum) with a valid single-wrap partition. +# --------------------------------------------------------------------------- + + +class TestMinimalLedCount: + """Edge cases around very small led_count values.""" + + def test_led_count_1_all_equal_indices(self): + """led_count=1: the only valid index is 0; all-equal → all-zero edges.""" + try: + cfg = solve_calibration( + led_count=1, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 0, 0, 0], + ) + total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left + assert total == 0, f"Expected total=0, got {total}" + _roundtrip(cfg) + except ValueError: + pass # also acceptable + + def test_led_count_4_minimal_partition(self): + """led_count=4, each edge gets 1 LED — minimum non-trivial partition.""" + # bottom_left/clockwise EDGE_ORDER: [left, top, right, bottom] + # corners: [0, 1, 2, 3] + # left: 0→1=1, top: 1→2=1, right: 2→3=1, bottom: 3→0=(4-3)+0=1 + cfg = solve_calibration( + led_count=4, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 1, 2, 3], + ) + assert cfg.leds_left == 1 + assert cfg.leds_top == 1 + assert cfg.leds_right == 1 + assert cfg.leds_bottom == 1 + _roundtrip(cfg) + + def test_led_count_0_raises_valueerror(self): + """led_count=0 is explicitly invalid per the docstring.""" + with pytest.raises(ValueError, match="led_count"): + solve_calibration( + led_count=0, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 0, 0, 0], + ) + + def test_led_count_negative_raises_valueerror(self): + """led_count=-5 is invalid.""" + with pytest.raises(ValueError): + solve_calibration( + led_count=-5, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 0, 0, 0], + ) + + def test_led_count_5_just_above_minimum(self): + """led_count=5 with a valid 2/1/1/1 partition.""" + # bottom_left/clockwise: [left,top,right,bottom] + # corners: [0, 2, 3, 4] + # left: 0→2=2, top: 2→3=1, right: 3→4=1, bottom: 4→0=(5-4)+0=1 + cfg = solve_calibration( + led_count=5, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 2, 3, 4], + ) + assert cfg.leds_left == 2 + assert cfg.leds_top == 1 + assert cfg.leds_right == 1 + assert cfg.leds_bottom == 1 + assert sum([cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]) == 5 + _roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# Offset interactions +# The offset is stored verbatim; PixelMapper normalises it via % total_leds. +# solve_calibration must pass it through without modification. +# Also test: offset == 0 (explicit), offset == led_count (should store as-is), +# and offset >> led_count. +# --------------------------------------------------------------------------- + + +class TestOffsetInteractions: + """Offset is stored verbatim and must not affect per-edge counts.""" + + def test_offset_zero_explicit(self): + cfg = solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + offset=0, + ) + assert cfg.offset == 0 + _roundtrip(cfg) + + def test_offset_equals_led_count_stored_verbatim(self): + """offset=led_count should be stored as-is (not reduced).""" + cfg = solve_calibration( + led_count=100, + start_position="top_left", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + offset=100, + ) + assert cfg.offset == 100 + _roundtrip(cfg) + + def test_large_offset_stored_verbatim(self): + """offset >> led_count — stored verbatim, build_segments must not crash.""" + cfg = solve_calibration( + led_count=60, + start_position="top_right", + layout="counterclockwise", + corner_indices=[0, 15, 30, 45], + offset=9999, + ) + assert cfg.offset == 9999 + _roundtrip(cfg) + + def test_offset_does_not_change_edge_counts(self): + """Two calls with different offsets must produce identical edge counts.""" + kwargs = dict( + led_count=100, + start_position="bottom_left", + layout="counterclockwise", + corner_indices=[0, 25, 50, 75], + ) + cfg_no_offset = solve_calibration(**kwargs, offset=0) + cfg_offset_13 = solve_calibration(**kwargs, offset=13) + assert cfg_no_offset.leds_top == cfg_offset_13.leds_top + assert cfg_no_offset.leds_right == cfg_offset_13.leds_right + assert cfg_no_offset.leds_bottom == cfg_offset_13.leds_bottom + assert cfg_no_offset.leds_left == cfg_offset_13.leds_left + + +# --------------------------------------------------------------------------- +# corner_indices >= led_count (modulo reduction) +# The solver applies % led_count to each index before computing counts. +# Index 150 for led_count=100 should behave identically to index 50. +# --------------------------------------------------------------------------- + + +class TestCornerIndicesModuloReduction: + """Indices >= led_count are reduced modulo led_count before use.""" + + def test_indices_above_led_count_reduced(self): + """corner_indices [0, 25, 50, 75] and [100, 125, 150, 175] must give same result.""" + led_count = 100 + cfg_base = solve_calibration( + led_count=led_count, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + ) + cfg_shifted = solve_calibration( + led_count=led_count, + start_position="bottom_left", + layout="clockwise", + corner_indices=[100, 125, 150, 175], # each + 100 (≡ same mod 100) + ) + assert cfg_base.leds_left == cfg_shifted.leds_left + assert cfg_base.leds_top == cfg_shifted.leds_top + assert cfg_base.leds_right == cfg_shifted.leds_right + assert cfg_base.leds_bottom == cfg_shifted.leds_bottom + _roundtrip(cfg_shifted) + + def test_index_exactly_led_count_is_zero(self): + """Index = led_count reduces to 0, same as explicit 0.""" + cfg_zero = solve_calibration( + led_count=100, + start_position="top_left", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + ) + cfg_hundred = solve_calibration( + led_count=100, + start_position="top_left", + layout="clockwise", + corner_indices=[100, 25, 50, 75], # first index = led_count → reduces to 0 + ) + assert cfg_zero.leds_top == cfg_hundred.leds_top + assert cfg_zero.leds_right == cfg_hundred.leds_right + assert cfg_zero.leds_bottom == cfg_hundred.leds_bottom + assert cfg_zero.leds_left == cfg_hundred.leds_left + + +# --------------------------------------------------------------------------- +# Invalid input: wrong number of corner_indices +# --------------------------------------------------------------------------- + + +class TestInvalidCornerIndicesLength: + """Wrong number of corner indices must raise ValueError.""" + + def test_three_corners_raises(self): + with pytest.raises(ValueError): + solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 25, 50], + ) + + def test_five_corners_raises(self): + with pytest.raises(ValueError): + solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 20, 40, 60, 80], + ) + + def test_empty_corners_raises(self): + with pytest.raises(ValueError): + solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[], + ) + + def test_one_corner_raises(self): + with pytest.raises(ValueError): + solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[50], + ) + + +# --------------------------------------------------------------------------- +# Invalid start_position / layout +# --------------------------------------------------------------------------- + + +class TestInvalidEnumInputs: + def test_invalid_start_position_raises(self): + with pytest.raises(ValueError, match="start_position"): + solve_calibration( + led_count=100, + start_position="center", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + ) + + def test_invalid_layout_raises(self): + with pytest.raises(ValueError): + solve_calibration( + led_count=100, + start_position="bottom_left", + layout="diagonal", + corner_indices=[0, 25, 50, 75], + ) + + def test_both_invalid_raises(self): + with pytest.raises(ValueError): + solve_calibration( + led_count=100, + start_position="wrong", + layout="wrong", + corner_indices=[0, 25, 50, 75], + ) + + +# --------------------------------------------------------------------------- +# Totals preservation invariant +# For any valid input, sum(edge_counts) == sum via build_segments() +# Test across all 8 combinations with a wrap-around partition. +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("start_position,layout", list(EDGE_ORDER.keys())) +def test_total_preservation_with_wraparound(start_position, layout): + """For all 8 combinations with a wrap-around partition, total preserved.""" + # The wrap-around partition: first 3 edges get 20 LEDs each, last edge + # gets the remaining 40 via wrap. + # EDGE_ORDER tells us walk order; corners: [0, 20, 40, 60] — last edge wraps to 40. + # bottom: 60→0 = (100-60)+0 = 40. + cfg = solve_calibration( + led_count=100, + start_position=start_position, + layout=layout, + corner_indices=[0, 20, 40, 60], + ) + # Each of first 3 edges = 20, last = 40 + total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left + assert total == 100, f"{start_position}/{layout}: expected total=100, got {total}" + _roundtrip(cfg)