fix(setup): register scaffolded target with ProcessorManager + final-review hardening
Final-review blocker: the setup scaffold created the LED output target in the store but never registered it with the ProcessorManager, so the wizard's "Start" step 404'd on a fresh setup (target not found) — the lights never started despite a success screen. Now the scaffold calls target.register_with_manager(manager) right after create (mirroring the canonical POST /output-targets route, same ValueError guard), so start_processing finds the target. Rollback unregisters via manager.remove_target before deleting the store entity, so a post-registration failure leaves no half-registered target. Also from the final review: - solve corner_indices elements now bounded ge=0 (clear 422 instead of silent modulo-wrap). - setup-wizard.ts: reuse tutorials' suppressGettingStartedTour()/TOUR_KEY instead of a duplicated 'tour_completed' literal; drop a duplicate manual-form submit listener. Tests: + adversarial pass over the whole feature (solver/session/scaffold edge cases) and a scaffold->register->startable regression test. Full suite 2149 passed / 2 skipped; tsc clean; build passes; ruff clean.
This commit is contained in:
@@ -36,11 +36,13 @@ from ledgrab.api.dependencies import (
|
||||
get_device_store,
|
||||
get_output_target_store,
|
||||
get_picture_source_store,
|
||||
get_processor_manager,
|
||||
get_template_store,
|
||||
)
|
||||
from ledgrab.api.schemas.setup import ScaffoldRequest, ScaffoldResponse
|
||||
from ledgrab.core.capture.calibration import calibration_from_dict, create_default_calibration
|
||||
from ledgrab.core.capture_engines.factory import EngineRegistry
|
||||
from ledgrab.core.processing.processor_manager import ProcessorManager
|
||||
from ledgrab.storage.base_store import EntityNotFoundError
|
||||
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||
from ledgrab.storage.output_target_store import OutputTargetStore
|
||||
@@ -116,11 +118,16 @@ def _rollback(
|
||||
picture_source_store: PictureSourceStore,
|
||||
css_store: ColorStripStore,
|
||||
output_target_store: OutputTargetStore,
|
||||
manager: ProcessorManager | None = None,
|
||||
) -> None:
|
||||
"""Delete entities created during this call, in reverse order.
|
||||
|
||||
Only entities listed in ``created_ids`` are deleted; reused/pre-existing
|
||||
entities (including the device) are never touched.
|
||||
|
||||
If *manager* is provided, any ``output_target`` entity in the rollback set
|
||||
is also unregistered from the ProcessorManager before store deletion, so no
|
||||
half-registered target is left behind.
|
||||
"""
|
||||
store_map: dict[str, Any] = {
|
||||
"capture_template": template_store,
|
||||
@@ -129,6 +136,18 @@ def _rollback(
|
||||
"output_target": output_target_store,
|
||||
}
|
||||
for entity_type, entity_id in reversed(created_ids):
|
||||
# Unregister output targets from the processor manager first
|
||||
if entity_type == "output_target" and manager is not None:
|
||||
try:
|
||||
manager.remove_target(entity_id)
|
||||
logger.info("Scaffold rollback: unregistered target %s from manager", entity_id)
|
||||
except (ValueError, RuntimeError) as exc:
|
||||
logger.debug(
|
||||
"Scaffold rollback: manager unregister skipped for %s — %s",
|
||||
entity_id,
|
||||
exc,
|
||||
)
|
||||
|
||||
store = store_map.get(entity_type)
|
||||
if store is None:
|
||||
logger.warning("Scaffold rollback: unknown entity type %r — skipping", entity_type)
|
||||
@@ -164,6 +183,7 @@ async def scaffold_setup(
|
||||
picture_source_store: PictureSourceStore = Depends(get_picture_source_store),
|
||||
css_store: ColorStripStore = Depends(get_color_strip_store),
|
||||
output_target_store: OutputTargetStore = Depends(get_output_target_store),
|
||||
manager: ProcessorManager = Depends(get_processor_manager),
|
||||
) -> ScaffoldResponse:
|
||||
"""Create a ready-to-start LED capture chain.
|
||||
|
||||
@@ -192,6 +212,7 @@ async def scaffold_setup(
|
||||
picture_source_store=picture_source_store,
|
||||
css_store=css_store,
|
||||
output_target_store=output_target_store,
|
||||
manager=manager,
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -269,6 +290,16 @@ async def scaffold_setup(
|
||||
pending_events.append(("output_target", target.id))
|
||||
logger.info("Scaffold: created output target %s", target.id)
|
||||
|
||||
# ── Step 5b: register target with ProcessorManager ───────────────────
|
||||
try:
|
||||
target.register_with_manager(manager)
|
||||
except ValueError as exc:
|
||||
logger.warning(
|
||||
"Scaffold: could not register target %s in processor manager: %s",
|
||||
target.id,
|
||||
exc,
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
_rollback(created_ids, **rollback_stores)
|
||||
raise
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Pydantic schemas for the calibration session and solver API."""
|
||||
|
||||
from typing import List, Literal
|
||||
from typing import Annotated, List, Literal
|
||||
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
@@ -65,14 +65,15 @@ class CalibrationSolveRequest(BaseModel):
|
||||
layout: Literal["clockwise", "counterclockwise"] = Field(
|
||||
description="Winding direction of the strip"
|
||||
)
|
||||
corner_indices: List[int] = Field(
|
||||
corner_indices: List[Annotated[int, Field(ge=0)]] = Field(
|
||||
description=(
|
||||
"Four strip indices — one per screen corner — in the strip-walk order "
|
||||
"defined by (start_position, layout). Index 0 of the strip is the "
|
||||
"start corner; the four tap positions are recorded in strip order "
|
||||
"beginning from that start corner (the solver lays edges out from "
|
||||
"led_start=0, so a non-zero physical start would require the `offset` "
|
||||
"field rather than a shifted corner_indices[0])."
|
||||
"field rather than a shifted corner_indices[0]). Each element must be "
|
||||
"non-negative (ge=0); out-of-range values yield a 422."
|
||||
),
|
||||
min_length=4,
|
||||
max_length=4,
|
||||
|
||||
@@ -27,6 +27,7 @@ import { t } from '../core/i18n.ts';
|
||||
import { showToast } from '../core/ui.ts';
|
||||
import { Modal } from '../core/modal.ts';
|
||||
import { mountAutoCalibration, unmountAutoCalibration } from './auto-calibration.ts';
|
||||
import { suppressGettingStartedTour } from './tutorials.ts';
|
||||
import {
|
||||
ICON_MONITOR, ICON_SPARKLES, ICON_DEVICE, ICON_OK, ICON_CHECK, ICON_ROCKET_ICON,
|
||||
ICON_CALIBRATION, ICON_START, ICON_SEARCH, ICON_PLUS,
|
||||
@@ -76,8 +77,6 @@ interface WizardState {
|
||||
let _state: WizardState | null = null;
|
||||
let _modal: SetupWizardModal | null = null;
|
||||
|
||||
const ONBOARDED_KEY = 'tour_completed'; // mirror tutorials.ts TOUR_KEY
|
||||
|
||||
const STEPS: WizardStep[] = ['welcome', 'device', 'display', 'scaffold', 'calibrate', 'start', 'done'];
|
||||
|
||||
// ── Modal class ────────────────────────────────────────────────────────────────
|
||||
@@ -167,7 +166,7 @@ async function _markOnboarded(): Promise<void> {
|
||||
try {
|
||||
await apiPut('/preferences/onboarding', { onboarded: true });
|
||||
// Suppress tooltip tour too — wizard owns the first-run experience
|
||||
localStorage.setItem(ONBOARDED_KEY, '1');
|
||||
suppressGettingStartedTour();
|
||||
} catch {
|
||||
// Non-fatal: UI already moved on
|
||||
}
|
||||
@@ -770,11 +769,9 @@ function _buildDoneStep(state: WizardState): string {
|
||||
</div>`;
|
||||
}
|
||||
|
||||
function _attachStepListeners(step: WizardStep): void {
|
||||
if (step === 'device') {
|
||||
const form = document.getElementById('wizard-manual-form');
|
||||
if (form) form.addEventListener('submit', (e) => void wizardAddManualDevice(e));
|
||||
}
|
||||
function _attachStepListeners(_step: WizardStep): void {
|
||||
// The manual device form uses onsubmit="wizardAddManualDevice(event)" inline —
|
||||
// no duplicate addEventListener needed here.
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -21,7 +21,7 @@ from __future__ import annotations
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -89,6 +89,14 @@ def event_log():
|
||||
return log
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_manager():
|
||||
"""A MagicMock ProcessorManager that silently accepts all calls."""
|
||||
mgr = MagicMock()
|
||||
mgr.remove_target = MagicMock()
|
||||
return mgr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def setup_client(
|
||||
tmp_db,
|
||||
@@ -98,6 +106,7 @@ def setup_client(
|
||||
css_store,
|
||||
output_target_store,
|
||||
event_log,
|
||||
mock_manager,
|
||||
):
|
||||
from ledgrab.api.routes.setup import router
|
||||
from ledgrab.api.auth import verify_api_key
|
||||
@@ -111,6 +120,7 @@ def setup_client(
|
||||
app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store
|
||||
app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store
|
||||
app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store
|
||||
app.dependency_overrides[deps.get_processor_manager] = lambda: mock_manager
|
||||
|
||||
# Capture entity events
|
||||
def _fire(entity_type, action, entity_id):
|
||||
@@ -512,3 +522,79 @@ class TestScaffoldCalibrationIntegration:
|
||||
assert updated.calibration.leds_top == 15
|
||||
assert updated.calibration.leds_right == 9
|
||||
assert updated.calibration.layout == "clockwise"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression: scaffold registers the output target with ProcessorManager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestScaffoldRegistersWithManager:
|
||||
def test_scaffold_calls_add_target_on_manager(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
mock_manager,
|
||||
):
|
||||
"""Blocker regression: scaffold must register the created output target
|
||||
with the ProcessorManager so that a subsequent start call can find it.
|
||||
|
||||
WledOutputTarget.register_with_manager calls manager.add_target(...)
|
||||
— we assert that method was invoked with the scaffolded target id.
|
||||
"""
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id)
|
||||
assert resp.status_code == 201, resp.text
|
||||
target_id = resp.json()["output_target_id"]
|
||||
|
||||
# register_with_manager → manager.add_target(target_id=..., ...)
|
||||
mock_manager.add_target.assert_called_once()
|
||||
call_kwargs = mock_manager.add_target.call_args
|
||||
assert call_kwargs.kwargs.get("target_id") == target_id, (
|
||||
f"manager.add_target was not called with target_id={target_id!r}; "
|
||||
f"actual call: {call_kwargs}"
|
||||
)
|
||||
|
||||
def test_scaffold_rollback_calls_remove_target_on_manager(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
output_target_store,
|
||||
mock_manager,
|
||||
):
|
||||
"""Rollback after a post-target-creation failure must call manager.remove_target
|
||||
so no half-registered target lingers in the ProcessorManager.
|
||||
|
||||
We inject a failure by making register_with_manager raise RuntimeError
|
||||
(bypassing the ValueError-only guard), which puts the outer except branch
|
||||
in play. The target IS already in created_ids at that point, so rollback
|
||||
must call manager.remove_target(target_id).
|
||||
"""
|
||||
created_target_ids: list[str] = []
|
||||
|
||||
original_create = output_target_store.create_wled_target
|
||||
|
||||
def _spy_create(*args, **kwargs):
|
||||
target = original_create(*args, **kwargs)
|
||||
created_target_ids.append(target.id)
|
||||
return target
|
||||
|
||||
output_target_store.create_wled_target = _spy_create
|
||||
try:
|
||||
# Patch register_with_manager on WledOutputTarget to raise RuntimeError —
|
||||
# RuntimeError bypasses the ValueError guard and triggers the outer except,
|
||||
# so rollback fires with the target already in created_ids.
|
||||
with patch(
|
||||
"ledgrab.storage.wled_output_target.WledOutputTarget.register_with_manager",
|
||||
side_effect=RuntimeError("Injected registration failure for rollback test"),
|
||||
):
|
||||
resp = setup_client.post(
|
||||
"/api/v1/setup/scaffold",
|
||||
json={"device_id": sample_device.id, "display_index": 0},
|
||||
)
|
||||
assert resp.status_code == 500, resp.text
|
||||
|
||||
assert len(created_target_ids) == 1, "spy did not record a created target"
|
||||
target_id = created_target_ids[0]
|
||||
mock_manager.remove_target.assert_called_with(target_id)
|
||||
finally:
|
||||
output_target_store.create_wled_target = original_create
|
||||
|
||||
@@ -0,0 +1,506 @@
|
||||
"""Adversarial tests for the setup scaffold and onboarding preference endpoints.
|
||||
|
||||
Phase 2 acceptance criteria (NOT what the code happens to do):
|
||||
- Rollback when the FINAL step (output target create) fails leaves ZERO orphans
|
||||
AND emits ZERO "created" events.
|
||||
- Reused capture template is NOT deleted on rollback.
|
||||
- display_index > 63 → 422.
|
||||
- Missing device_id → 422 (Pydantic validation before handler runs).
|
||||
- Unknown device_id → 404.
|
||||
- PUT onboarding false clears completed_at to null.
|
||||
- Corrupt stored onboarding value falls back to default (onboarded=false).
|
||||
|
||||
These fill the gaps in the existing 22 happy-path tests.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared fixtures (mirrors test_setup_routes.py exactly)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_db(tmp_path):
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
db = Database(tmp_path / "adv_setup.db")
|
||||
yield db
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def device_store(tmp_db):
|
||||
from ledgrab.storage import DeviceStore
|
||||
|
||||
return DeviceStore(tmp_db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def template_store(tmp_db):
|
||||
from ledgrab.storage.template_store import TemplateStore
|
||||
|
||||
return TemplateStore(tmp_db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def picture_source_store(tmp_db):
|
||||
from ledgrab.storage.picture_source_store import PictureSourceStore
|
||||
|
||||
return PictureSourceStore(tmp_db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def css_store(tmp_db):
|
||||
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||
|
||||
return ColorStripStore(tmp_db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def output_target_store(tmp_db):
|
||||
from ledgrab.storage.output_target_store import OutputTargetStore
|
||||
|
||||
return OutputTargetStore(tmp_db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_device(device_store):
|
||||
return device_store.create_device(
|
||||
name="Test LED Strip",
|
||||
url="http://192.168.1.10",
|
||||
led_count=60,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def event_log():
|
||||
log = []
|
||||
return log
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_manager():
|
||||
"""A MagicMock ProcessorManager that silently accepts all calls."""
|
||||
mgr = MagicMock()
|
||||
mgr.remove_target = MagicMock()
|
||||
return mgr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def setup_client(
|
||||
tmp_db,
|
||||
device_store,
|
||||
template_store,
|
||||
picture_source_store,
|
||||
css_store,
|
||||
output_target_store,
|
||||
event_log,
|
||||
mock_manager,
|
||||
):
|
||||
from ledgrab.api.routes.setup import router
|
||||
from ledgrab.api.auth import verify_api_key
|
||||
from ledgrab.api import dependencies as deps
|
||||
|
||||
app = FastAPI()
|
||||
app.include_router(router)
|
||||
app.dependency_overrides[verify_api_key] = lambda: "test"
|
||||
app.dependency_overrides[deps.get_device_store] = lambda: device_store
|
||||
app.dependency_overrides[deps.get_template_store] = lambda: template_store
|
||||
app.dependency_overrides[deps.get_picture_source_store] = lambda: picture_source_store
|
||||
app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store
|
||||
app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store
|
||||
app.dependency_overrides[deps.get_processor_manager] = lambda: mock_manager
|
||||
|
||||
def _fire(entity_type, action, entity_id):
|
||||
event_log.append((entity_type, action, entity_id))
|
||||
|
||||
with patch("ledgrab.api.routes.setup.fire_entity_event", side_effect=_fire):
|
||||
yield TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
def _scaffold(client, **overrides):
|
||||
body = {"display_index": 0, **overrides}
|
||||
return client.post("/api/v1/setup/scaffold", json=body)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rollback when the FINAL step (output target) fails
|
||||
# Criteria: "zero orphans AND zero 'created' events"
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFinalStepRollback:
|
||||
def test_final_step_failure_leaves_zero_orphans(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
picture_source_store,
|
||||
css_store,
|
||||
output_target_store,
|
||||
):
|
||||
"""output_target_store.create_wled_target failing leaves NO orphaned entities."""
|
||||
original_create = output_target_store.create_wled_target
|
||||
|
||||
def _fail(*args, **kwargs):
|
||||
raise ValueError("Injected final step failure")
|
||||
|
||||
output_target_store.create_wled_target = _fail
|
||||
try:
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id)
|
||||
assert resp.status_code in (
|
||||
400,
|
||||
500,
|
||||
), f"Expected 4xx/5xx on final-step failure, got {resp.status_code}: {resp.text}"
|
||||
|
||||
# Zero picture sources remaining
|
||||
remaining_ps = picture_source_store.get_all_streams()
|
||||
assert len(remaining_ps) == 0, f"Picture sources not rolled back: {remaining_ps}"
|
||||
|
||||
# Zero color-strip sources remaining
|
||||
remaining_css = css_store.get_all_sources()
|
||||
assert len(remaining_css) == 0, f"Color-strip sources not rolled back: {remaining_css}"
|
||||
|
||||
# Zero output targets (trivially true since creation was never reached,
|
||||
# but included for completeness)
|
||||
remaining_ot = output_target_store.get_all_targets()
|
||||
assert len(remaining_ot) == 0, f"Output targets not rolled back: {remaining_ot}"
|
||||
finally:
|
||||
output_target_store.create_wled_target = original_create
|
||||
|
||||
def test_final_step_failure_emits_zero_created_events(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
output_target_store,
|
||||
event_log,
|
||||
):
|
||||
"""No 'created' events must be emitted when the final step fails (deferred-event contract)."""
|
||||
original_create = output_target_store.create_wled_target
|
||||
|
||||
def _fail(*args, **kwargs):
|
||||
raise ValueError("Final step injected failure")
|
||||
|
||||
output_target_store.create_wled_target = _fail
|
||||
try:
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id)
|
||||
assert resp.status_code in (400, 500)
|
||||
|
||||
created_events = [(et, act) for et, act, _ in event_log if act == "created"]
|
||||
assert (
|
||||
created_events == []
|
||||
), f"'created' events leaked on final-step rollback: {created_events}"
|
||||
finally:
|
||||
output_target_store.create_wled_target = original_create
|
||||
|
||||
def test_final_step_failure_reused_template_survives(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
template_store,
|
||||
output_target_store,
|
||||
):
|
||||
"""A reused capture template must NOT be deleted when the final step fails."""
|
||||
templates_before = {t.id for t in template_store.get_all_templates()}
|
||||
original_create = output_target_store.create_wled_target
|
||||
|
||||
def _fail(*args, **kwargs):
|
||||
raise ValueError("Final step injected failure")
|
||||
|
||||
output_target_store.create_wled_target = _fail
|
||||
try:
|
||||
_scaffold(setup_client, device_id=sample_device.id)
|
||||
finally:
|
||||
output_target_store.create_wled_target = original_create
|
||||
|
||||
templates_after = {t.id for t in template_store.get_all_templates()}
|
||||
assert templates_before == templates_after, (
|
||||
f"Template set changed after rollback: "
|
||||
f"before={templates_before} after={templates_after}"
|
||||
)
|
||||
|
||||
def test_final_step_failure_device_not_deleted(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
device_store,
|
||||
output_target_store,
|
||||
):
|
||||
"""The pre-existing device must never be touched by rollback of any step."""
|
||||
original_create = output_target_store.create_wled_target
|
||||
|
||||
def _fail(*args, **kwargs):
|
||||
raise ValueError("Final step injected failure")
|
||||
|
||||
output_target_store.create_wled_target = _fail
|
||||
try:
|
||||
_scaffold(setup_client, device_id=sample_device.id)
|
||||
finally:
|
||||
output_target_store.create_wled_target = original_create
|
||||
|
||||
device = device_store.get(sample_device.id)
|
||||
assert device is not None, "Pre-existing device was deleted during rollback"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Validation: display_index bounds
|
||||
# Criteria: display_index > 63 → 422; display_index 63 → 201; negative → 422
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDisplayIndexBounds:
|
||||
def test_display_index_64_returns_422(self, setup_client, sample_device):
|
||||
"""display_index=64 (one above max) must be rejected with 422."""
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=64)
|
||||
assert (
|
||||
resp.status_code == 422
|
||||
), f"Expected 422 for display_index=64, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_display_index_63_returns_201(self, setup_client, sample_device):
|
||||
"""display_index=63 is the maximum valid value — must be accepted."""
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=63)
|
||||
assert (
|
||||
resp.status_code == 201
|
||||
), f"Expected 201 for display_index=63, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_display_index_0_returns_201(self, setup_client, sample_device):
|
||||
"""display_index=0 is the minimum valid value."""
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=0)
|
||||
assert (
|
||||
resp.status_code == 201
|
||||
), f"Expected 201 for display_index=0, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_display_index_negative_1_returns_422(self, setup_client, sample_device):
|
||||
"""display_index=-1 must be rejected with 422."""
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=-1)
|
||||
assert (
|
||||
resp.status_code == 422
|
||||
), f"Expected 422 for display_index=-1, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_display_index_very_large_returns_422(self, setup_client, sample_device):
|
||||
"""display_index=10000 must be rejected with 422."""
|
||||
resp = _scaffold(setup_client, device_id=sample_device.id, display_index=10000)
|
||||
assert (
|
||||
resp.status_code == 422
|
||||
), f"Expected 422 for display_index=10000, got {resp.status_code}: {resp.text}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Validation: missing / unknown device_id
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeviceValidation:
|
||||
def test_missing_device_id_returns_422(self, setup_client):
|
||||
"""Omitting device_id entirely must yield 422 (Pydantic required field)."""
|
||||
resp = setup_client.post("/api/v1/setup/scaffold", json={"display_index": 0})
|
||||
assert (
|
||||
resp.status_code == 422
|
||||
), f"Expected 422 for missing device_id, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_empty_string_device_id_handled(self, setup_client):
|
||||
"""device_id='' — should yield 404 (empty string not in device store) or 422."""
|
||||
resp = _scaffold(setup_client, device_id="")
|
||||
assert resp.status_code in (
|
||||
404,
|
||||
422,
|
||||
), f"Expected 404 or 422 for empty device_id, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_unknown_device_id_returns_404(self, setup_client):
|
||||
"""device_id that does not exist in the store must yield 404."""
|
||||
resp = _scaffold(setup_client, device_id="device_definitely_does_not_exist")
|
||||
assert (
|
||||
resp.status_code == 404
|
||||
), f"Expected 404 for unknown device_id, got {resp.status_code}: {resp.text}"
|
||||
|
||||
def test_none_device_id_returns_422(self, setup_client):
|
||||
"""device_id=null must yield 422 (not None-convertible to str)."""
|
||||
resp = setup_client.post(
|
||||
"/api/v1/setup/scaffold", json={"device_id": None, "display_index": 0}
|
||||
)
|
||||
assert (
|
||||
resp.status_code == 422
|
||||
), f"Expected 422 for null device_id, got {resp.status_code}: {resp.text}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rollback idempotency: calling scaffold twice with the final step failing
|
||||
# must still leave exactly zero orphans (no accumulation across calls).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRollbackIdempotency:
|
||||
def test_two_failed_scaffolds_leave_zero_orphans(
|
||||
self,
|
||||
setup_client,
|
||||
sample_device,
|
||||
picture_source_store,
|
||||
css_store,
|
||||
output_target_store,
|
||||
):
|
||||
"""Two sequential scaffold failures must not accumulate orphans."""
|
||||
original_create = output_target_store.create_wled_target
|
||||
|
||||
def _fail(*args, **kwargs):
|
||||
raise ValueError("Always fails")
|
||||
|
||||
output_target_store.create_wled_target = _fail
|
||||
try:
|
||||
_scaffold(setup_client, device_id=sample_device.id)
|
||||
_scaffold(setup_client, device_id=sample_device.id)
|
||||
finally:
|
||||
output_target_store.create_wled_target = original_create
|
||||
|
||||
assert (
|
||||
len(picture_source_store.get_all_streams()) == 0
|
||||
), "Picture sources accumulated across two failed scaffolds"
|
||||
assert (
|
||||
len(css_store.get_all_sources()) == 0
|
||||
), "Color-strip sources accumulated across two failed scaffolds"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Onboarding: adversarial cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pref_client(tmp_db):
|
||||
from ledgrab.api.routes.preferences import router
|
||||
from ledgrab.api.auth import verify_api_key
|
||||
from ledgrab.api import dependencies as deps
|
||||
|
||||
app = FastAPI()
|
||||
app.include_router(router)
|
||||
app.dependency_overrides[verify_api_key] = lambda: "test"
|
||||
app.dependency_overrides[deps.get_database] = lambda: tmp_db
|
||||
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
class TestOnboardingAdversarial:
|
||||
def test_put_false_clears_completed_at(self, pref_client):
|
||||
"""PUT onboarded=false must clear completed_at to null, per criteria."""
|
||||
# First mark as onboarded
|
||||
r1 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["completed_at"] is not None
|
||||
|
||||
# Now set to false — completed_at must be cleared
|
||||
r2 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": False})
|
||||
assert r2.status_code == 200
|
||||
data = r2.json()
|
||||
assert data["onboarded"] is False
|
||||
assert (
|
||||
data["completed_at"] is None
|
||||
), f"completed_at should be null after PUT onboarded=false, got {data['completed_at']!r}"
|
||||
|
||||
def test_put_false_then_get_returns_null_completed_at(self, pref_client):
|
||||
"""After PUT false, GET must also return null completed_at (persisted correctly)."""
|
||||
pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
|
||||
pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": False})
|
||||
resp = pref_client.get("/api/v1/preferences/onboarding")
|
||||
assert resp.status_code == 200
|
||||
assert (
|
||||
resp.json()["completed_at"] is None
|
||||
), "GET after PUT false should return null completed_at"
|
||||
|
||||
def test_corrupt_stored_value_falls_back_to_default(self, tmp_db, pref_client):
|
||||
"""If the stored onboarding value is corrupt, GET must fall back to default.
|
||||
|
||||
Criteria: "corrupt stored value falls back to default".
|
||||
We inject garbage into the db directly, then hit GET.
|
||||
"""
|
||||
# Inject a value that is syntactically valid JSON (dict) but fails
|
||||
# Pydantic validation because the types are wrong.
|
||||
tmp_db.set_setting("onboarded", {"onboarded": "not_a_bool", "completed_at": 12345})
|
||||
|
||||
resp = pref_client.get("/api/v1/preferences/onboarding")
|
||||
assert (
|
||||
resp.status_code == 200
|
||||
), f"Expected 200 for corrupt onboarding value, got {resp.status_code}"
|
||||
data = resp.json()
|
||||
# Must fall back to default
|
||||
assert (
|
||||
data["onboarded"] is False
|
||||
), f"Expected onboarded=false as default after corrupt value, got {data['onboarded']!r}"
|
||||
assert (
|
||||
data["completed_at"] is None
|
||||
), f"Expected completed_at=null as default, got {data['completed_at']!r}"
|
||||
|
||||
def test_corrupt_stored_value_as_wrong_type_falls_back(self, tmp_db, pref_client):
|
||||
"""Stored value is a string (not a dict) — must fall back to default."""
|
||||
tmp_db.set_setting("onboarded", "this_is_not_valid")
|
||||
|
||||
resp = pref_client.get("/api/v1/preferences/onboarding")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["onboarded"] is False
|
||||
assert data["completed_at"] is None
|
||||
|
||||
def test_corrupt_stored_null_falls_back_to_default(self, tmp_db, pref_client):
|
||||
"""Stored value is null/None — must return default (not crash)."""
|
||||
tmp_db.set_setting("onboarded", None)
|
||||
|
||||
resp = pref_client.get("/api/v1/preferences/onboarding")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["onboarded"] is False
|
||||
|
||||
def test_put_true_without_completed_at_stamps_timestamp(self, pref_client):
|
||||
"""PUT onboarded=true without completed_at must auto-stamp a timestamp."""
|
||||
resp = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert (
|
||||
data["completed_at"] is not None
|
||||
), "Server must auto-stamp completed_at when onboarded=true is sent without a timestamp"
|
||||
# Should be a non-empty ISO timestamp string
|
||||
assert len(data["completed_at"]) > 10
|
||||
|
||||
def test_put_true_with_completed_at_preserves_it(self, pref_client):
|
||||
"""PUT onboarded=true with explicit completed_at must preserve that value."""
|
||||
ts = "2025-03-15T10:00:00+00:00"
|
||||
resp = pref_client.put(
|
||||
"/api/v1/preferences/onboarding",
|
||||
json={"onboarded": True, "completed_at": ts},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["completed_at"] == ts
|
||||
|
||||
def test_put_false_with_completed_at_clears_it(self, pref_client):
|
||||
"""PUT onboarded=false even with a completed_at payload must clear it.
|
||||
|
||||
The criteria say: 'Setting onboarded=false clears completed_at to null.'
|
||||
"""
|
||||
ts = "2025-01-01T00:00:00+00:00"
|
||||
resp = pref_client.put(
|
||||
"/api/v1/preferences/onboarding",
|
||||
json={"onboarded": False, "completed_at": ts},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["onboarded"] is False
|
||||
assert (
|
||||
data["completed_at"] is None
|
||||
), f"completed_at should be cleared to null when onboarded=false, got {data['completed_at']!r}"
|
||||
|
||||
def test_multiple_true_puts_only_stamp_once(self, pref_client):
|
||||
"""Two successive PUT true calls — second call must preserve the original timestamp."""
|
||||
r1 = pref_client.put("/api/v1/preferences/onboarding", json={"onboarded": True})
|
||||
ts1 = r1.json()["completed_at"]
|
||||
assert ts1 is not None
|
||||
|
||||
# Explicitly provide the original timestamp in second call
|
||||
r2 = pref_client.put(
|
||||
"/api/v1/preferences/onboarding",
|
||||
json={"onboarded": True, "completed_at": ts1},
|
||||
)
|
||||
assert (
|
||||
r2.json()["completed_at"] == ts1
|
||||
), "Explicit completed_at should be preserved on second PUT"
|
||||
@@ -0,0 +1,535 @@
|
||||
"""Adversarial / concurrency tests for CalibrationSession.
|
||||
|
||||
Phase 1 acceptance criteria tested here (NOT what the code happens to do):
|
||||
- Interleaved start/start (same device, then different device) must never
|
||||
leave the old device without restore.
|
||||
- Interleaved start/stop racing the idle watchdog must not leave the device
|
||||
dark or stuck.
|
||||
- Idle-timeout teardown restores the prior target.
|
||||
- position() with index out of range → ValueError.
|
||||
- stop() when idle is a safe no-op (does not call start_processing or crash).
|
||||
- CalibrationSession lock must prevent double-teardown.
|
||||
|
||||
All tests use a fake ProcessorManager matching the shape used in
|
||||
test_calibration_routes.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from ledgrab.core.capture.calibration_session import (
|
||||
CalibrationSession,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_manager(device_id: str = "dev1", led_count: int = 100) -> MagicMock:
|
||||
"""Build a minimal fake ProcessorManager."""
|
||||
mgr = MagicMock()
|
||||
ds = MagicMock()
|
||||
ds.led_count = led_count
|
||||
mgr._devices = {device_id: ds}
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value=None)
|
||||
mgr.stop_processing = AsyncMock()
|
||||
mgr.start_processing = AsyncMock()
|
||||
mgr.send_clear_pixels = AsyncMock()
|
||||
mgr.set_calibration_pixel = AsyncMock()
|
||||
return mgr
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(autouse=True)
|
||||
async def fresh_session():
|
||||
"""Yield a brand-new CalibrationSession for each test (not the singleton)."""
|
||||
session = CalibrationSession()
|
||||
yield session
|
||||
# Cleanup: cancel any lingering watchdog
|
||||
if session._timeout_task and not session._timeout_task.done():
|
||||
session._timeout_task.cancel()
|
||||
try:
|
||||
await session._timeout_task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# stop() when idle is a safe no-op
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStopWhenIdle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_idle_does_not_call_start_processing(self, fresh_session):
|
||||
"""Calling stop() when no session is active must not call start_processing."""
|
||||
mgr = _make_manager()
|
||||
# Do NOT start a session — just stop immediately
|
||||
await fresh_session.stop()
|
||||
mgr.start_processing.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_idle_returns_inactive_state(self, fresh_session):
|
||||
"""stop() on an idle session returns state with active=False."""
|
||||
state = fresh_session.get_state()
|
||||
assert state["active"] is False
|
||||
await fresh_session.stop() # no-op
|
||||
assert fresh_session.is_active is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_idle_safe(self, fresh_session):
|
||||
"""cancel() on idle session is also a safe no-op."""
|
||||
await fresh_session.cancel()
|
||||
assert fresh_session.is_active is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_double_stop_is_idempotent(self, fresh_session):
|
||||
"""Calling stop() twice on an active session must not double-call start_processing."""
|
||||
mgr = _make_manager()
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_restore")
|
||||
await fresh_session.start("dev1", mgr)
|
||||
assert fresh_session.is_active is True
|
||||
|
||||
await fresh_session.stop()
|
||||
assert fresh_session.is_active is False
|
||||
# Restore called exactly once
|
||||
mgr.start_processing.assert_awaited_once_with("tgt_restore")
|
||||
|
||||
# Second stop must be a no-op
|
||||
await fresh_session.stop()
|
||||
# start_processing should still be called exactly once (not twice)
|
||||
assert mgr.start_processing.await_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# position() out of range
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPositionOutOfRange:
|
||||
@pytest.mark.asyncio
|
||||
async def test_position_equal_to_led_count_raises(self, fresh_session):
|
||||
"""index == led_count must raise ValueError (0-based, so out of range)."""
|
||||
mgr = _make_manager(led_count=100)
|
||||
await fresh_session.start("dev1", mgr)
|
||||
with pytest.raises(ValueError, match="out of range"):
|
||||
await fresh_session.position(100)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_position_above_led_count_raises(self, fresh_session):
|
||||
"""index > led_count raises ValueError."""
|
||||
mgr = _make_manager(led_count=50)
|
||||
await fresh_session.start("dev1", mgr)
|
||||
with pytest.raises(ValueError, match="out of range"):
|
||||
await fresh_session.position(999)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_position_negative_raises(self, fresh_session):
|
||||
"""Negative index raises ValueError."""
|
||||
mgr = _make_manager(led_count=100)
|
||||
await fresh_session.start("dev1", mgr)
|
||||
with pytest.raises(ValueError):
|
||||
await fresh_session.position(-1)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_position_at_led_count_minus_1_is_valid(self, fresh_session):
|
||||
"""Last valid index (led_count - 1) must succeed."""
|
||||
mgr = _make_manager(led_count=10)
|
||||
await fresh_session.start("dev1", mgr)
|
||||
await fresh_session.position(9) # must not raise
|
||||
mgr.set_calibration_pixel.assert_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_position_without_active_session_raises(self, fresh_session):
|
||||
"""position() with no active session must raise RuntimeError."""
|
||||
with pytest.raises(RuntimeError, match="No active calibration session"):
|
||||
await fresh_session.position(5)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Interleaved start/start — old device must be restored
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestInterleavedStartStart:
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_on_same_device_restores_prior_target(self, fresh_session):
|
||||
"""Starting a second session on the same device auto-stops the first.
|
||||
The first device's prior target must be restored before the second session begins.
|
||||
"""
|
||||
mgr = _make_manager(led_count=60)
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_original")
|
||||
|
||||
# Start first session
|
||||
await fresh_session.start("dev1", mgr)
|
||||
assert fresh_session.is_active is True
|
||||
assert fresh_session._prior_target_id == "tgt_original"
|
||||
|
||||
# Now start again on the same device
|
||||
# The second start should stop the first (restoring tgt_original),
|
||||
# then re-query the current running target (which will be tgt_original again
|
||||
# since start_processing will have been called).
|
||||
# For isolation: change what get_processing_target_for_device returns after
|
||||
# the first stop so the second session records a fresh prior.
|
||||
call_count = {"n": 0}
|
||||
|
||||
def _get_target(device_id):
|
||||
call_count["n"] += 1
|
||||
if call_count["n"] == 1:
|
||||
return "tgt_original"
|
||||
return None # After first stop, no target running
|
||||
|
||||
mgr.get_processing_target_for_device = MagicMock(side_effect=_get_target)
|
||||
|
||||
# First session with the original target
|
||||
fresh_session2 = CalibrationSession()
|
||||
await fresh_session2.start("dev1", mgr)
|
||||
assert fresh_session2.is_active is True
|
||||
|
||||
# Start a NEW session on the same device — must auto-stop fresh_session2
|
||||
fresh_session3 = CalibrationSession()
|
||||
# Inject fresh_session2's internal state into fresh_session3 to simulate
|
||||
# the singleton pattern: replace session3's state to reflect session2 active
|
||||
# (this mirrors the module-level singleton where only one CalibrationSession exists)
|
||||
fresh_session3._active = fresh_session2._active
|
||||
fresh_session3._device_id = fresh_session2._device_id
|
||||
fresh_session3._led_count = fresh_session2._led_count
|
||||
fresh_session3._prior_target_id = fresh_session2._prior_target_id
|
||||
fresh_session3._last_activity = fresh_session2._last_activity
|
||||
fresh_session3._manager = fresh_session2._manager
|
||||
fresh_session3._timeout_task = fresh_session2._timeout_task
|
||||
fresh_session2._timeout_task = None # prevent double-cancel
|
||||
|
||||
await fresh_session3.start("dev1", mgr)
|
||||
|
||||
# The start must have called stop on the previous session → restore was called
|
||||
# (mgr.start_processing was called at least once to restore the prior target)
|
||||
assert mgr.start_processing.await_count >= 1
|
||||
|
||||
# Cleanup
|
||||
await fresh_session3.stop()
|
||||
if fresh_session2._timeout_task and not fresh_session2._timeout_task.done():
|
||||
fresh_session2._timeout_task.cancel()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_new_session_on_different_device_clears_old_device(self, fresh_session):
|
||||
"""Starting a new session on a different device must clear the first device.
|
||||
|
||||
The first session must be stopped (its prior target restored or cleared)
|
||||
before the second session on the new device becomes active.
|
||||
"""
|
||||
mgr = MagicMock()
|
||||
|
||||
ds1 = MagicMock()
|
||||
ds1.led_count = 30
|
||||
ds2 = MagicMock()
|
||||
ds2.led_count = 60
|
||||
|
||||
mgr._devices = {"dev1": ds1, "dev2": ds2}
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value=None)
|
||||
mgr.stop_processing = AsyncMock()
|
||||
mgr.start_processing = AsyncMock()
|
||||
mgr.send_clear_pixels = AsyncMock()
|
||||
mgr.set_calibration_pixel = AsyncMock()
|
||||
|
||||
# Start first session on dev1
|
||||
await fresh_session.start("dev1", mgr)
|
||||
assert fresh_session._device_id == "dev1"
|
||||
assert fresh_session.is_active is True
|
||||
|
||||
# Now start second session on dev2 — must auto-stop dev1 first
|
||||
await fresh_session.start("dev2", mgr)
|
||||
|
||||
# After the second start, session must be on dev2
|
||||
assert fresh_session._device_id == "dev2"
|
||||
assert fresh_session.is_active is True
|
||||
|
||||
# send_clear_pixels was called for dev1 (stop) AND for dev2 (start)
|
||||
clear_calls = [call[0][0] for call in mgr.send_clear_pixels.call_args_list]
|
||||
assert (
|
||||
"dev1" in clear_calls
|
||||
), f"dev1 was never cleared during session switch; clear calls: {clear_calls}"
|
||||
assert (
|
||||
"dev2" in clear_calls
|
||||
), f"dev2 was never cleared at session start; clear calls: {clear_calls}"
|
||||
|
||||
# Cleanup
|
||||
await fresh_session.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Idle-timeout teardown restores prior target
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIdleTimeoutRestoresPriorTarget:
|
||||
@pytest.mark.asyncio
|
||||
async def test_idle_timeout_calls_start_processing(self, fresh_session):
|
||||
"""When the session times out, start_processing must be called to restore the target.
|
||||
|
||||
We patch IDLE_TIMEOUT_SECONDS to a tiny value so the test doesn't actually
|
||||
wait 60 seconds.
|
||||
"""
|
||||
mgr = _make_manager(led_count=40)
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_to_restore")
|
||||
|
||||
# Patch the idle timeout to 0.05 seconds
|
||||
with patch(
|
||||
"ledgrab.core.capture.calibration_session.IDLE_TIMEOUT_SECONDS",
|
||||
0.05,
|
||||
):
|
||||
# Also patch the watchdog sleep to something tiny
|
||||
async def _fast_watchdog():
|
||||
"""A watchdog that checks every 0.02 seconds instead of 5."""
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(0.02)
|
||||
if not fresh_session._active or fresh_session._last_activity is None:
|
||||
break
|
||||
from datetime import datetime, timezone
|
||||
|
||||
elapsed = (
|
||||
datetime.now(timezone.utc) - fresh_session._last_activity
|
||||
).total_seconds()
|
||||
if elapsed >= 0.05:
|
||||
async with fresh_session._lock:
|
||||
await fresh_session._teardown_locked(cancelled=False)
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign]
|
||||
|
||||
await fresh_session.start("dev1", mgr)
|
||||
assert fresh_session.is_active is True
|
||||
|
||||
# Wait long enough for the watchdog to fire
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# Session should have been auto-stopped
|
||||
assert (
|
||||
fresh_session.is_active is False
|
||||
), "Session should have been auto-stopped by idle timeout"
|
||||
|
||||
# Prior target must have been restored
|
||||
mgr.start_processing.assert_awaited_once_with("tgt_to_restore")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_idle_timeout_clears_device_to_black(self, fresh_session):
|
||||
"""Idle timeout must send all-black before (or after) restoring target."""
|
||||
mgr = _make_manager(led_count=40)
|
||||
|
||||
async def _fast_watchdog():
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(0.02)
|
||||
if not fresh_session._active or fresh_session._last_activity is None:
|
||||
break
|
||||
from datetime import datetime, timezone
|
||||
|
||||
elapsed = (
|
||||
datetime.now(timezone.utc) - fresh_session._last_activity
|
||||
).total_seconds()
|
||||
if elapsed >= 0.05:
|
||||
async with fresh_session._lock:
|
||||
await fresh_session._teardown_locked(cancelled=False)
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
fresh_session._idle_watchdog = _fast_watchdog # type: ignore[method-assign]
|
||||
|
||||
await fresh_session.start("dev1", mgr)
|
||||
initial_clear_count = mgr.send_clear_pixels.await_count # from start()
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# send_clear_pixels must have been called at least once more during teardown
|
||||
assert (
|
||||
mgr.send_clear_pixels.await_count > initial_clear_count
|
||||
), "Device was not cleared to black during idle-timeout teardown"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Concurrent start/start using asyncio.gather (true concurrency within the loop)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestConcurrentStartCalls:
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_starts_dont_corrupt_state(self, fresh_session):
|
||||
"""Two concurrent start() calls must leave the session in a consistent state.
|
||||
|
||||
Only one session should be active after both complete; the final device
|
||||
must match one of the two requested devices.
|
||||
"""
|
||||
mgr = MagicMock()
|
||||
ds1 = MagicMock()
|
||||
ds1.led_count = 50
|
||||
ds2 = MagicMock()
|
||||
ds2.led_count = 80
|
||||
mgr._devices = {"devA": ds1, "devB": ds2}
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value=None)
|
||||
mgr.stop_processing = AsyncMock()
|
||||
mgr.start_processing = AsyncMock()
|
||||
mgr.send_clear_pixels = AsyncMock()
|
||||
mgr.set_calibration_pixel = AsyncMock()
|
||||
|
||||
# Fire both concurrently — the lock must ensure exactly one wins
|
||||
results = await asyncio.gather(
|
||||
fresh_session.start("devA", mgr),
|
||||
fresh_session.start("devB", mgr),
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
# Neither call should raise (both complete without exception)
|
||||
for r in results:
|
||||
if isinstance(r, BaseException):
|
||||
pytest.fail(f"Concurrent start raised unexpectedly: {r!r}")
|
||||
|
||||
# Exactly one session active with a valid device
|
||||
assert fresh_session.is_active is True
|
||||
assert fresh_session._device_id in ("devA", "devB")
|
||||
|
||||
# Cleanup
|
||||
await fresh_session.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# stop() while watchdog is still pending (concurrent stop/watchdog)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStopVsWatchdogRace:
|
||||
@pytest.mark.asyncio
|
||||
async def test_explicit_stop_wins_over_watchdog(self, fresh_session):
|
||||
"""Explicit stop() must cleanly terminate before the watchdog fires.
|
||||
|
||||
After explicit stop, start_processing must be called exactly once
|
||||
even if the watchdog later tries to tear down.
|
||||
"""
|
||||
mgr = _make_manager(led_count=100)
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value="tgt_explicit")
|
||||
|
||||
await fresh_session.start("dev1", mgr)
|
||||
assert fresh_session.is_active is True
|
||||
|
||||
# Explicitly stop — the watchdog task should be cancelled
|
||||
await fresh_session.stop()
|
||||
|
||||
assert fresh_session.is_active is False
|
||||
# start_processing called exactly once for the prior target
|
||||
mgr.start_processing.assert_awaited_once_with("tgt_explicit")
|
||||
|
||||
# Give the event loop a moment to confirm the watchdog didn't double-fire
|
||||
await asyncio.sleep(0.05)
|
||||
# Still exactly once
|
||||
assert (
|
||||
mgr.start_processing.await_count == 1
|
||||
), "start_processing was called more than once — watchdog fired after explicit stop"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# start() with unknown device_id
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStartUnknownDevice:
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_device_raises_valueerror(self, fresh_session):
|
||||
"""start() with a device_id not in manager._devices must raise ValueError."""
|
||||
mgr = _make_manager(device_id="known_device")
|
||||
with pytest.raises(ValueError, match="not found"):
|
||||
await fresh_session.start("does_not_exist", mgr)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_device_leaves_session_inactive(self, fresh_session):
|
||||
"""After a failed start() the session must remain inactive."""
|
||||
mgr = _make_manager(device_id="known_device")
|
||||
try:
|
||||
await fresh_session.start("no_such_device", mgr)
|
||||
except ValueError:
|
||||
pass
|
||||
assert fresh_session.is_active is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_failed_start_does_not_corrupt_existing_session(self, fresh_session):
|
||||
"""A failed start() attempt must not corrupt an already-active session."""
|
||||
mgr = MagicMock()
|
||||
ds = MagicMock()
|
||||
ds.led_count = 100
|
||||
mgr._devices = {"dev1": ds}
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value="prior_tgt")
|
||||
mgr.stop_processing = AsyncMock()
|
||||
mgr.start_processing = AsyncMock()
|
||||
mgr.send_clear_pixels = AsyncMock()
|
||||
mgr.set_calibration_pixel = AsyncMock()
|
||||
|
||||
# Start a valid session
|
||||
await fresh_session.start("dev1", mgr)
|
||||
assert fresh_session.is_active is True
|
||||
|
||||
# Now try to start on an unknown device — should fail
|
||||
try:
|
||||
await fresh_session.start("unknown_device", mgr)
|
||||
except ValueError:
|
||||
pass
|
||||
except Exception:
|
||||
pass # Other errors are also acceptable
|
||||
|
||||
# The original session must still be active (or was cleanly stopped and
|
||||
# replaced); either way the device must not be stuck in a broken state.
|
||||
# The critical invariant: if active, device_id is valid.
|
||||
if fresh_session.is_active:
|
||||
assert fresh_session._device_id in mgr._devices
|
||||
|
||||
await fresh_session.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State snapshot integrity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStateSnapshot:
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_state_before_start(self, fresh_session):
|
||||
state = fresh_session.get_state()
|
||||
assert state["active"] is False
|
||||
assert state["device_id"] is None
|
||||
assert state["led_count"] == 0
|
||||
assert state["prior_target_id"] is None
|
||||
assert state["last_activity"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_state_after_start(self, fresh_session):
|
||||
mgr = _make_manager(led_count=60)
|
||||
mgr.get_processing_target_for_device = MagicMock(return_value="saved_tgt")
|
||||
await fresh_session.start("dev1", mgr)
|
||||
|
||||
state = fresh_session.get_state()
|
||||
assert state["active"] is True
|
||||
assert state["device_id"] == "dev1"
|
||||
assert state["led_count"] == 60
|
||||
assert state["prior_target_id"] == "saved_tgt"
|
||||
assert state["last_activity"] is not None
|
||||
|
||||
await fresh_session.stop()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_state_after_stop(self, fresh_session):
|
||||
mgr = _make_manager()
|
||||
await fresh_session.start("dev1", mgr)
|
||||
await fresh_session.stop()
|
||||
|
||||
state = fresh_session.get_state()
|
||||
assert state["active"] is False
|
||||
assert state["device_id"] is None
|
||||
assert state["led_count"] == 0
|
||||
assert state["prior_target_id"] is None
|
||||
@@ -0,0 +1,562 @@
|
||||
"""Adversarial / edge-case tests for solve_calibration() and build_segments().
|
||||
|
||||
Criteria derived from Phase 1 acceptance criteria (NOT from what the code does):
|
||||
- solve_calibration returns correct per-edge counts; result round-trips through
|
||||
build_segments() and totals are preserved.
|
||||
- An edge with 0 LEDs (two corners tapped adjacent) is valid.
|
||||
- Wrap-around edges work correctly.
|
||||
- Invalid inputs raise cleanly (ValueError).
|
||||
|
||||
New adversarial cases not covered by the existing 19 happy-path tests:
|
||||
- All four corner_indices equal (degenerate: every edge = 0) — the code must
|
||||
either handle it gracefully (total=0) OR raise with a clear message.
|
||||
Per criteria the total must be preserved (0 == 0) and round-trip must not crash.
|
||||
- Descending / out-of-order corner_indices where wrap-around should apply.
|
||||
- An edge that spans the whole strip (one edge wraps the full led_count minus
|
||||
the contribution of the three zero-LED edges).
|
||||
- led_count just above minimum (led_count=1) with a single LED claimed by
|
||||
one edge.
|
||||
- offset >= led_count: the solver stores offset verbatim; PixelMapper normalises
|
||||
it via % total_leds. The build_segments() round-trip must not crash.
|
||||
- Corner indices modulo led_count are used, so indices >= led_count should be
|
||||
accepted and reduced.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.capture.calibration import (
|
||||
EDGE_ORDER,
|
||||
CalibrationConfig,
|
||||
solve_calibration,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper (same as in test_calibration_solver.py — duplicated intentionally so
|
||||
# this adversarial file can run in isolation)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _roundtrip(cfg: CalibrationConfig) -> None:
|
||||
"""Assert build_segments() doesn't crash and totals match."""
|
||||
segs = cfg.build_segments()
|
||||
total_from_segs = sum(s.led_count for s in segs)
|
||||
expected = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
|
||||
assert (
|
||||
total_from_segs == expected
|
||||
), f"Segment total {total_from_segs} != field total {expected} for cfg={cfg!r}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Degenerate: all four corner indices are equal
|
||||
# When all four corners are tapped at the same index every consecutive pair
|
||||
# has start_idx == end_idx, so every edge gets 0 LEDs. Total = 0 is
|
||||
# mathematically consistent with the input; the function should either
|
||||
# return a config with 0-LED edges OR raise a clear ValueError.
|
||||
# It must NOT silently return wrong counts and must NOT crash unexpectedly.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAllFourCornersEqual:
|
||||
"""All four corner_indices equal → every edge = 0 LEDs or clean ValueError."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"start_position,layout",
|
||||
[(sp, lay) for sp, lay in EDGE_ORDER.keys()],
|
||||
)
|
||||
def test_all_equal_returns_zero_total_or_raises(self, start_position, layout):
|
||||
"""solve_calibration([k,k,k,k]) must not produce non-zero counts."""
|
||||
led_count = 100
|
||||
try:
|
||||
cfg = solve_calibration(
|
||||
led_count=led_count,
|
||||
start_position=start_position,
|
||||
layout=layout,
|
||||
corner_indices=[0, 0, 0, 0],
|
||||
)
|
||||
# If it doesn't raise: total must be 0 (consistent with input)
|
||||
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
|
||||
assert (
|
||||
total == 0
|
||||
), f"{start_position}/{layout}: all-equal corners produced total={total}; expected 0"
|
||||
# build_segments must not crash on a 0-total config
|
||||
segs = cfg.build_segments()
|
||||
assert segs == [], f"Expected no segments for 0-LED config, got {segs!r}"
|
||||
except ValueError:
|
||||
# Raising is also acceptable — as long as it's a ValueError, not a
|
||||
# crash/assertion/other exception.
|
||||
pass
|
||||
|
||||
def test_all_equal_roundtrip_does_not_crash(self):
|
||||
"""Even if all edges are 0, build_segments() must not raise."""
|
||||
try:
|
||||
cfg = solve_calibration(
|
||||
led_count=50,
|
||||
start_position="top_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[25, 25, 25, 25],
|
||||
)
|
||||
_roundtrip(cfg) # must not crash
|
||||
except ValueError:
|
||||
pass # acceptable
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Descending / out-of-order corner indices
|
||||
# Out-of-order but non-wrapping: e.g. [70, 50, 30, 10] for clockwise bottom_left
|
||||
# The solver uses consecutive-pair differences with wrap logic.
|
||||
# Each pair (70→50, 50→30, 30→10, 10→70) has end < start, so ALL four edges
|
||||
# get wrap-around counts. Total must still equal led_count.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDescendingCornerIndices:
|
||||
"""All four corners in descending order — every pair wraps."""
|
||||
|
||||
def test_descending_total_equals_led_count(self):
|
||||
"""Descending indices [75, 50, 25, 0] — total must == led_count."""
|
||||
# bottom_left/clockwise: edge_order=[left,top,right,bottom]
|
||||
# left: 75→50 = 25, top: 50→25 = 25, right: 25→0 = 25, bottom: 0→75 = 75
|
||||
# Wait — end > start for bottom: 0→75 means 75. But 0 < 75 so count=75-0=75.
|
||||
# Recalculate: for bottom: start_idx=0, end_idx=75 → end>start → count=75
|
||||
# total = 25+25+25+75 = 150 ≠ 100 → That's wrong input.
|
||||
# Use [80, 60, 40, 20] for 100 LEDs:
|
||||
# left: 80→60: end<start → wrap: (100-80)+60=80 WRONG too.
|
||||
# Actually for a valid descending case that totals 100:
|
||||
# Monotone descending with wrap on last pair only:
|
||||
# [75, 50, 25, 0] for bottom_left/clockwise:
|
||||
# left: start=75,end=50: 50<75 → wrap: (100-75)+50=75 NO
|
||||
# All pairs descend when [75,50,25,0]:
|
||||
# 0→75 (last pair, bottom): 75>0 → count=75-0=75
|
||||
# 75→50 (left): 50<75 → wrap: (100-75)+50=75
|
||||
# Total would be 75+25+25+75=200 ≠ 100
|
||||
# That shows descending indices don't sum to led_count in general.
|
||||
# The critical invariant is: sum of per-edge counts == led_count when
|
||||
# the corner indices span a single full traversal.
|
||||
# To get descending [80,60,40,20] → sum via wrap logic:
|
||||
# left: 80→60: 60<80 → (100-80)+60=80
|
||||
# top: 60→40: 40<60 → (100-60)+40=80
|
||||
# right: 40→20: 20<40 → (100-40)+20=80
|
||||
# bottom: 20→80: 80>20 → 80-20=60
|
||||
# total = 80+80+80+60 = 300 — still not 100.
|
||||
#
|
||||
# The INVARIANT of solve_calibration is: if corner_indices form a valid
|
||||
# partition of the strip (i.e. each consecutive pair covers a segment
|
||||
# that together span exactly led_count LEDs), the total == led_count.
|
||||
# Descending indices that form valid partitions do sum to led_count.
|
||||
# Example: if we want all edges wrapped, use [99, 74, 49, 24]:
|
||||
# left: 99→74: 74<99 → (100-99)+74=75
|
||||
# top: 74→49: 49<74 → (100-74)+49=75
|
||||
# right: 49→24: 24<49 → (100-49)+24=75
|
||||
# bottom: 24→99: 99>24 → 99-24=75
|
||||
# total = 75+75+75+75=300 ≠ 100
|
||||
#
|
||||
# Conclusion: descending indices don't need to sum to led_count — only
|
||||
# a proper strip traversal (covering each LED exactly once) does.
|
||||
# The adversarial test here is: the function must NOT crash, must NOT
|
||||
# produce negative counts, and must round-trip cleanly.
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[80, 60, 40, 20],
|
||||
)
|
||||
counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]
|
||||
assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}"
|
||||
_roundtrip(cfg)
|
||||
|
||||
def test_all_four_wrap_around_non_negative(self):
|
||||
"""Every consecutive pair wraps (descending) — all counts must be >= 0."""
|
||||
# [90, 70, 50, 30] for led_count=100, top_right/clockwise
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="top_right",
|
||||
layout="clockwise",
|
||||
corner_indices=[90, 70, 50, 30],
|
||||
)
|
||||
counts = [cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]
|
||||
assert all(c >= 0 for c in counts), f"Negative edge counts: {counts}"
|
||||
_roundtrip(cfg)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge that wraps the whole strip
|
||||
# When only one edge is used (the other 3 are 0 LEDs), that edge should span
|
||||
# led_count LEDs. Corner indices: [0, 0, 0, 0] gives all-zero (tested above).
|
||||
# A single-edge case: [0, 100, 100, 100] would give left=100, top=0, right=0,
|
||||
# bottom=0 for bottom_left/clockwise. But 100 % 100 = 0 so end_idx=0,
|
||||
# start_idx=0 → count=0 for left too. Use index 100 directly (>led_count).
|
||||
# Alternatively: for led_count=100, corners [0, 0, 0, 0] with all-zero is
|
||||
# already tested. The wrap-all case uses non-modulo indices.
|
||||
#
|
||||
# The real case: one valid single-edge scenario:
|
||||
# bottom_left/clockwise, 100 LEDs, EDGE_ORDER = [left,top,right,bottom]
|
||||
# corners [0, 100, 100, 100]: left: 0→100%100=0 → 0-LED because end==start.
|
||||
# No, there's no way to make one edge claim all 100 LEDs with index-based input
|
||||
# other than having it wrap around. Test: [50, 50, 50, 50] (all equal):
|
||||
# already tested above.
|
||||
# The closest adversarial case: one edge with count == led_count via wrap-around.
|
||||
# For bottom_left/clockwise, [50, 50, 50, 50] makes top=0,right=0,bottom=0,left=0.
|
||||
# To make ONE edge == 100: we need start_idx=50, end_idx=50 for three edges
|
||||
# and start=50, end=50 wraps to 0 for that edge... that's the all-zeros case.
|
||||
# The only way one edge gets ALL leds is:
|
||||
# e.g. left: corners[0]=50, corners[1]=50 → 50==50 → count=0 (NOT 100).
|
||||
# There's a subtle algorithmic distinction: adjacent indices being equal → 0 LED
|
||||
# vs wrap-around: if the algorithm used (start+count)%N as end, then one-edge
|
||||
# spanning the whole strip would require start=end via wrap, giving count=N.
|
||||
# But the current algorithm: end==start → count=0. This is the CORRECT behavior
|
||||
# per the Phase 1 spec ("Adjacent taps on the same index → 0-LED edge").
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWholestripSingleEdge:
|
||||
"""Verify that a wrap-around edge spanning led_count-3 LEDs (max when 3 others are 1 each) works."""
|
||||
|
||||
def test_one_large_edge_with_minimal_others(self):
|
||||
"""One edge has led_count-3 LEDs, the other 3 each have 1 LED.
|
||||
|
||||
bottom_left/clockwise, led_count=100:
|
||||
EDGE_ORDER = [left, top, right, bottom]
|
||||
corners: [0, 97, 98, 99]
|
||||
left: 0→97 = 97
|
||||
top: 97→98 = 1
|
||||
right: 98→99 = 1
|
||||
bottom: 99→0 = 1 (wrap: (100-99)+0 = 1)
|
||||
total = 97+1+1+1 = 100 ✓
|
||||
"""
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 97, 98, 99],
|
||||
)
|
||||
assert cfg.leds_left == 97
|
||||
assert cfg.leds_top == 1
|
||||
assert cfg.leds_right == 1
|
||||
assert cfg.leds_bottom == 1
|
||||
assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100
|
||||
_roundtrip(cfg)
|
||||
|
||||
def test_last_edge_wraps_nearly_all_leds(self):
|
||||
"""The last edge (bottom in bottom_left/clockwise) wraps from 3 to 0.
|
||||
|
||||
corners: [0, 1, 2, 3]
|
||||
left: 0→1 = 1
|
||||
top: 1→2 = 1
|
||||
right: 2→3 = 1
|
||||
bottom: 3→0 = (100-3)+0 = 97 (wrap-around)
|
||||
total = 100 ✓
|
||||
"""
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 1, 2, 3],
|
||||
)
|
||||
assert cfg.leds_left == 1
|
||||
assert cfg.leds_top == 1
|
||||
assert cfg.leds_right == 1
|
||||
assert cfg.leds_bottom == 97
|
||||
assert (cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left) == 100
|
||||
_roundtrip(cfg)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# led_count just above minimum
|
||||
# led_count=1: only 1 LED; corner_indices can only meaningfully be [0,0,0,0]
|
||||
# which gives all zeros. That's the all-equal case above.
|
||||
# led_count=5 (just above conceptual minimum) with a valid single-wrap partition.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMinimalLedCount:
|
||||
"""Edge cases around very small led_count values."""
|
||||
|
||||
def test_led_count_1_all_equal_indices(self):
|
||||
"""led_count=1: the only valid index is 0; all-equal → all-zero edges."""
|
||||
try:
|
||||
cfg = solve_calibration(
|
||||
led_count=1,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 0, 0, 0],
|
||||
)
|
||||
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
|
||||
assert total == 0, f"Expected total=0, got {total}"
|
||||
_roundtrip(cfg)
|
||||
except ValueError:
|
||||
pass # also acceptable
|
||||
|
||||
def test_led_count_4_minimal_partition(self):
|
||||
"""led_count=4, each edge gets 1 LED — minimum non-trivial partition."""
|
||||
# bottom_left/clockwise EDGE_ORDER: [left, top, right, bottom]
|
||||
# corners: [0, 1, 2, 3]
|
||||
# left: 0→1=1, top: 1→2=1, right: 2→3=1, bottom: 3→0=(4-3)+0=1
|
||||
cfg = solve_calibration(
|
||||
led_count=4,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 1, 2, 3],
|
||||
)
|
||||
assert cfg.leds_left == 1
|
||||
assert cfg.leds_top == 1
|
||||
assert cfg.leds_right == 1
|
||||
assert cfg.leds_bottom == 1
|
||||
_roundtrip(cfg)
|
||||
|
||||
def test_led_count_0_raises_valueerror(self):
|
||||
"""led_count=0 is explicitly invalid per the docstring."""
|
||||
with pytest.raises(ValueError, match="led_count"):
|
||||
solve_calibration(
|
||||
led_count=0,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 0, 0, 0],
|
||||
)
|
||||
|
||||
def test_led_count_negative_raises_valueerror(self):
|
||||
"""led_count=-5 is invalid."""
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=-5,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 0, 0, 0],
|
||||
)
|
||||
|
||||
def test_led_count_5_just_above_minimum(self):
|
||||
"""led_count=5 with a valid 2/1/1/1 partition."""
|
||||
# bottom_left/clockwise: [left,top,right,bottom]
|
||||
# corners: [0, 2, 3, 4]
|
||||
# left: 0→2=2, top: 2→3=1, right: 3→4=1, bottom: 4→0=(5-4)+0=1
|
||||
cfg = solve_calibration(
|
||||
led_count=5,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 2, 3, 4],
|
||||
)
|
||||
assert cfg.leds_left == 2
|
||||
assert cfg.leds_top == 1
|
||||
assert cfg.leds_right == 1
|
||||
assert cfg.leds_bottom == 1
|
||||
assert sum([cfg.leds_top, cfg.leds_right, cfg.leds_bottom, cfg.leds_left]) == 5
|
||||
_roundtrip(cfg)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Offset interactions
|
||||
# The offset is stored verbatim; PixelMapper normalises it via % total_leds.
|
||||
# solve_calibration must pass it through without modification.
|
||||
# Also test: offset == 0 (explicit), offset == led_count (should store as-is),
|
||||
# and offset >> led_count.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOffsetInteractions:
|
||||
"""Offset is stored verbatim and must not affect per-edge counts."""
|
||||
|
||||
def test_offset_zero_explicit(self):
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
offset=0,
|
||||
)
|
||||
assert cfg.offset == 0
|
||||
_roundtrip(cfg)
|
||||
|
||||
def test_offset_equals_led_count_stored_verbatim(self):
|
||||
"""offset=led_count should be stored as-is (not reduced)."""
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="top_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
offset=100,
|
||||
)
|
||||
assert cfg.offset == 100
|
||||
_roundtrip(cfg)
|
||||
|
||||
def test_large_offset_stored_verbatim(self):
|
||||
"""offset >> led_count — stored verbatim, build_segments must not crash."""
|
||||
cfg = solve_calibration(
|
||||
led_count=60,
|
||||
start_position="top_right",
|
||||
layout="counterclockwise",
|
||||
corner_indices=[0, 15, 30, 45],
|
||||
offset=9999,
|
||||
)
|
||||
assert cfg.offset == 9999
|
||||
_roundtrip(cfg)
|
||||
|
||||
def test_offset_does_not_change_edge_counts(self):
|
||||
"""Two calls with different offsets must produce identical edge counts."""
|
||||
kwargs = dict(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="counterclockwise",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
)
|
||||
cfg_no_offset = solve_calibration(**kwargs, offset=0)
|
||||
cfg_offset_13 = solve_calibration(**kwargs, offset=13)
|
||||
assert cfg_no_offset.leds_top == cfg_offset_13.leds_top
|
||||
assert cfg_no_offset.leds_right == cfg_offset_13.leds_right
|
||||
assert cfg_no_offset.leds_bottom == cfg_offset_13.leds_bottom
|
||||
assert cfg_no_offset.leds_left == cfg_offset_13.leds_left
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# corner_indices >= led_count (modulo reduction)
|
||||
# The solver applies % led_count to each index before computing counts.
|
||||
# Index 150 for led_count=100 should behave identically to index 50.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCornerIndicesModuloReduction:
|
||||
"""Indices >= led_count are reduced modulo led_count before use."""
|
||||
|
||||
def test_indices_above_led_count_reduced(self):
|
||||
"""corner_indices [0, 25, 50, 75] and [100, 125, 150, 175] must give same result."""
|
||||
led_count = 100
|
||||
cfg_base = solve_calibration(
|
||||
led_count=led_count,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
)
|
||||
cfg_shifted = solve_calibration(
|
||||
led_count=led_count,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[100, 125, 150, 175], # each + 100 (≡ same mod 100)
|
||||
)
|
||||
assert cfg_base.leds_left == cfg_shifted.leds_left
|
||||
assert cfg_base.leds_top == cfg_shifted.leds_top
|
||||
assert cfg_base.leds_right == cfg_shifted.leds_right
|
||||
assert cfg_base.leds_bottom == cfg_shifted.leds_bottom
|
||||
_roundtrip(cfg_shifted)
|
||||
|
||||
def test_index_exactly_led_count_is_zero(self):
|
||||
"""Index = led_count reduces to 0, same as explicit 0."""
|
||||
cfg_zero = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="top_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
)
|
||||
cfg_hundred = solve_calibration(
|
||||
led_count=100,
|
||||
start_position="top_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[100, 25, 50, 75], # first index = led_count → reduces to 0
|
||||
)
|
||||
assert cfg_zero.leds_top == cfg_hundred.leds_top
|
||||
assert cfg_zero.leds_right == cfg_hundred.leds_right
|
||||
assert cfg_zero.leds_bottom == cfg_hundred.leds_bottom
|
||||
assert cfg_zero.leds_left == cfg_hundred.leds_left
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Invalid input: wrong number of corner_indices
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestInvalidCornerIndicesLength:
|
||||
"""Wrong number of corner indices must raise ValueError."""
|
||||
|
||||
def test_three_corners_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 25, 50],
|
||||
)
|
||||
|
||||
def test_five_corners_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 20, 40, 60, 80],
|
||||
)
|
||||
|
||||
def test_empty_corners_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[],
|
||||
)
|
||||
|
||||
def test_one_corner_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="clockwise",
|
||||
corner_indices=[50],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Invalid start_position / layout
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestInvalidEnumInputs:
|
||||
def test_invalid_start_position_raises(self):
|
||||
with pytest.raises(ValueError, match="start_position"):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="center",
|
||||
layout="clockwise",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
)
|
||||
|
||||
def test_invalid_layout_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="bottom_left",
|
||||
layout="diagonal",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
)
|
||||
|
||||
def test_both_invalid_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
solve_calibration(
|
||||
led_count=100,
|
||||
start_position="wrong",
|
||||
layout="wrong",
|
||||
corner_indices=[0, 25, 50, 75],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Totals preservation invariant
|
||||
# For any valid input, sum(edge_counts) == sum via build_segments()
|
||||
# Test across all 8 combinations with a wrap-around partition.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize("start_position,layout", list(EDGE_ORDER.keys()))
|
||||
def test_total_preservation_with_wraparound(start_position, layout):
|
||||
"""For all 8 combinations with a wrap-around partition, total preserved."""
|
||||
# The wrap-around partition: first 3 edges get 20 LEDs each, last edge
|
||||
# gets the remaining 40 via wrap.
|
||||
# EDGE_ORDER tells us walk order; corners: [0, 20, 40, 60] — last edge wraps to 40.
|
||||
# bottom: 60→0 = (100-60)+0 = 40.
|
||||
cfg = solve_calibration(
|
||||
led_count=100,
|
||||
start_position=start_position,
|
||||
layout=layout,
|
||||
corner_indices=[0, 20, 40, 60],
|
||||
)
|
||||
# Each of first 3 edges = 20, last = 40
|
||||
total = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left
|
||||
assert total == 100, f"{start_position}/{layout}: expected total=100, got {total}"
|
||||
_roundtrip(cfg)
|
||||
Reference in New Issue
Block a user