diff --git a/docs/API.md b/docs/API.md index d89c378..ef09f19 100644 --- a/docs/API.md +++ b/docs/API.md @@ -238,7 +238,7 @@ A single aggregated poll endpoint for low-overhead clients. | Method | Path | Description | | ------ | ---- | ----------- | -| GET | `/api/v1/snapshot` | Full poll payload (targets, states, metrics, devices, brightness, color/value sources, scene presets, sync clocks, system) in one response. Use `?include=` to request a subset; per-section fault isolation. | +| GET | `/api/v1/snapshot` | Full poll payload (targets, states, metrics, devices, brightness, color/value sources, scene presets, scene playlists + cycling state, sync clocks, system) in one response. Use `?include=` to request a subset; per-section fault isolation. | ## Devices @@ -649,6 +649,73 @@ The wiring-graph: schema registry, topology, dependents, validation, and subgrap | POST | `/api/v1/graph/validate-connection` | Validate a proposed wiring edit (existence, kind, no cycle). | | POST | `/api/v1/graph/duplicate` | Deep-clone selected value/color-strip sources with remapped wiring. | +## Calibration + +Guided LED chase and auto-solver for the `CalibrationConfig` stored on a +color-strip source. The flow is: + +1. **Start** a session (`POST /session`) — stops any running target on the + device and remembers it for restore on stop. +2. **Position** the chase pixel (`POST /session/position`) to walk through + each physical corner and record the LED index. +3. **Solve** (`POST /solve`) — the server computes per-edge LED counts. +4. **Persist** — call `PUT /api/v1/color-strip-sources/{id}` with the solved + `calibration` object to save and hot-reload. +5. **Stop** (`POST /session/stop`) — clears the device and restores the prior + target. + +| Method | Path | Description | +| ------ | ---- | ----------- | +| POST | `/api/v1/calibration/session` | Start a calibration session on a device (stops the running target, clears to black). | +| POST | `/api/v1/calibration/session/position` | Advance the chase pixel to LED `index` (± `window` dim neighbours). | +| POST | `/api/v1/calibration/session/stop` | End the session: clear to black and restore the prior target. | +| POST | `/api/v1/calibration/session/cancel` | Alias for stop — no calibration is applied. | +| GET | `/api/v1/calibration/session/state` | Current session state (active, device_id, led_count, last_activity). | +| POST | `/api/v1/calibration/solve` | Solve per-edge LED counts from 4 corner tap indices. Returns solved config dict (does NOT persist). | + +**Session state** response shape: + +```json +{ + "active": true, + "device_id": "dev_abc123", + "led_count": 100, + "prior_target_id": "ot_xyz456", + "last_activity": "2026-06-08T12:34:56.789Z" +} +``` + +**Solve request** (body): + +```json +{ + "device_id": "dev_abc123", + "start_position": "bottom_left", + "layout": "clockwise", + "corner_indices": [0, 30, 60, 80], + "offset": 0 +} +``` + +`corner_indices` must be exactly 4 integers, one per screen corner, in the +strip-walk order defined by `(start_position, layout)`. Provide either +`device_id` (preferred — server derives `led_count`) or `led_count` directly. + +**Important session behavior:** + +- **Stops the running output target** — starting a calibration session immediately + stops any output target currently running on that device. Other clients driving + that device will lose their output for the duration of the session. +- **Single session only** — only one calibration session runs at a time across the + whole server. Starting a new session automatically ends the previous one (clearing + and restoring its device first), regardless of which device each session is on. +- **Idle auto-end** — a session that receives no `position` calls for ~60 seconds is + automatically stopped and the prior target restored, so devices are never left dark + indefinitely. + +**Idle timeout:** a session that receives no `position` calls for 60 seconds +is automatically stopped and the prior target restored. + ## Web UI & PWA App-level routes served by FastAPI (not under `/api/v1`). diff --git a/server/src/ledgrab/api/__init__.py b/server/src/ledgrab/api/__init__.py index a1e710f..283e33c 100644 --- a/server/src/ledgrab/api/__init__.py +++ b/server/src/ledgrab/api/__init__.py @@ -36,6 +36,7 @@ from .routes.pattern_templates import router as pattern_templates_router from .routes.preferences import router as preferences_router from .routes.snapshot import router as snapshot_router from .routes.graph import router as graph_router +from .routes.calibration import router as calibration_router router = APIRouter() router.include_router(system_router) @@ -72,5 +73,6 @@ router.include_router(pattern_templates_router) router.include_router(preferences_router) router.include_router(snapshot_router) router.include_router(graph_router) +router.include_router(calibration_router) __all__ = ["router"] diff --git a/server/src/ledgrab/api/routes/calibration.py b/server/src/ledgrab/api/routes/calibration.py new file mode 100644 index 0000000..4080e5f --- /dev/null +++ b/server/src/ledgrab/api/routes/calibration.py @@ -0,0 +1,236 @@ +"""Calibration session and solver API routes. + +Endpoints +--------- +POST /api/v1/calibration/session + Start a calibration session on a device (stops any running target on that + device and remembers it for restore on stop). + +POST /api/v1/calibration/session/position + Advance the chase pixel to a specific LED index on the active device. + +POST /api/v1/calibration/session/stop + End the session: clear the device to black and restore the prior target. + +POST /api/v1/calibration/session/cancel + Alias for stop (does not apply any solved calibration). + +GET /api/v1/calibration/session/state + Return the current session state (active, device, last_activity, …). + +POST /api/v1/calibration/solve + Pure-logic: solve a CalibrationConfig from 4 corner tap indices. + Does NOT persist — the caller must follow up with + ``PUT /api/v1/color-strip-sources/{id}`` to persist. + +Persist path +------------ +The existing ``PUT /api/v1/color-strip-sources/{id}`` already accepts a +``calibration`` field on ``PictureCSSUpdate`` / ``PictureAdvancedCSSUpdate`` +and hot-reloads running streams automatically (see +``api/routes/color_strip_sources/crud.py``). There is NO duplicate endpoint +here. Phase 3 UI calls the existing PUT to persist. +""" + +from fastapi import APIRouter, Depends, HTTPException + +from ledgrab.api.auth import AuthRequired +from ledgrab.api.dependencies import get_processor_manager +from ledgrab.api.schemas.calibration import ( + CalibrationSessionPositionRequest, + CalibrationSessionStartRequest, + CalibrationSessionStateResponse, + CalibrationSolveRequest, + CalibrationSolvedResponse, +) +from ledgrab.core.capture.calibration import solve_calibration +from ledgrab.core.capture.calibration_session import get_calibration_session +from ledgrab.core.processing.processor_manager import ProcessorManager +from ledgrab.utils import get_logger + +logger = get_logger(__name__) +router = APIRouter() + + +# ── Session endpoints ───────────────────────────────────────────────────────── + + +@router.post( + "/api/v1/calibration/session", + response_model=CalibrationSessionStateResponse, + tags=["Calibration"], + status_code=201, +) +async def start_calibration_session( + body: CalibrationSessionStartRequest, + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), +) -> CalibrationSessionStateResponse: + """Start a calibration session on a device. + + Stops any target currently processing on that device (it will be restored + when the session ends). Only one session can be active at a time; starting + a new one terminates the previous one first. + """ + session = get_calibration_session() + try: + await session.start(body.device_id, manager) + except ValueError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + except Exception as exc: + logger.error("Failed to start calibration session: %s", exc, exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error") + + return CalibrationSessionStateResponse(**session.get_state()) + + +@router.post( + "/api/v1/calibration/session/position", + response_model=CalibrationSessionStateResponse, + tags=["Calibration"], +) +async def calibration_session_position( + body: CalibrationSessionPositionRequest, + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), # noqa: ARG001 +) -> CalibrationSessionStateResponse: + """Advance the chase pixel to a specific LED index on the active device. + + ``index`` must be 0-based and < ``led_count``. Returns 422 when out of + range (Pydantic ``ge=0``) or 400 if the session is not active / index + exceeds led_count. + """ + session = get_calibration_session() + try: + await session.position(body.index, body.window) + except RuntimeError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except Exception as exc: + logger.error("Failed to set calibration pixel index=%d: %s", body.index, exc, exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error") + + return CalibrationSessionStateResponse(**session.get_state()) + + +@router.post( + "/api/v1/calibration/session/stop", + response_model=CalibrationSessionStateResponse, + tags=["Calibration"], +) +async def stop_calibration_session( + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), # noqa: ARG001 +) -> CalibrationSessionStateResponse: + """End the calibration session. + + Clears the device to black and restores the previously-running target (if + any). Safe to call even when no session is active (returns inactive state). + """ + session = get_calibration_session() + try: + await session.stop() + except Exception as exc: + logger.error("Failed to stop calibration session: %s", exc, exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error") + + return CalibrationSessionStateResponse(**session.get_state()) + + +@router.post( + "/api/v1/calibration/session/cancel", + response_model=CalibrationSessionStateResponse, + tags=["Calibration"], +) +async def cancel_calibration_session( + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), # noqa: ARG001 +) -> CalibrationSessionStateResponse: + """Cancel the calibration session (alias for stop — no calibration is applied).""" + session = get_calibration_session() + try: + await session.cancel() + except Exception as exc: + logger.error("Failed to cancel calibration session: %s", exc, exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error") + + return CalibrationSessionStateResponse(**session.get_state()) + + +@router.get( + "/api/v1/calibration/session/state", + response_model=CalibrationSessionStateResponse, + tags=["Calibration"], +) +async def get_calibration_session_state( + _auth: AuthRequired, +) -> CalibrationSessionStateResponse: + """Return the current calibration session state.""" + return CalibrationSessionStateResponse(**get_calibration_session().get_state()) + + +# ── Solver endpoint ─────────────────────────────────────────────────────────── + + +@router.post( + "/api/v1/calibration/solve", + response_model=CalibrationSolvedResponse, + tags=["Calibration"], +) +async def solve_calibration_endpoint( + body: CalibrationSolveRequest, + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), +) -> CalibrationSolvedResponse: + """Solve a CalibrationConfig from 4 corner tap indices. + + Returns the computed per-edge LED counts. Does NOT persist — call + ``PUT /api/v1/color-strip-sources/{id}`` with ``calibration`` in the body + to save. + + Provide either *device_id* (preferred, server derives led_count) or + *led_count* directly. Returns 404 if *device_id* is not found, 422 on + invalid enum values, 400 on logical errors (e.g. corner_indices length). + """ + # Resolve led_count + led_count = body.led_count + if body.device_id is not None: + if body.device_id not in manager._devices: + raise HTTPException( + status_code=404, + detail=f"Device {body.device_id!r} not found", + ) + ds = manager._devices[body.device_id] + led_count = ds.led_count + + if led_count is None or led_count <= 0: + raise HTTPException( + status_code=400, + detail="led_count must be a positive integer", + ) + + try: + cfg = solve_calibration( + led_count=led_count, + start_position=body.start_position, + layout=body.layout, + corner_indices=body.corner_indices, + offset=body.offset, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except Exception as exc: + logger.error("Failed to solve calibration: %s", exc, exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error") + + return CalibrationSolvedResponse( + mode="simple", + layout=cfg.layout, + start_position=cfg.start_position, + leds_top=cfg.leds_top, + leds_right=cfg.leds_right, + leds_bottom=cfg.leds_bottom, + leds_left=cfg.leds_left, + offset=cfg.offset, + ) diff --git a/server/src/ledgrab/api/schemas/calibration.py b/server/src/ledgrab/api/schemas/calibration.py new file mode 100644 index 0000000..c13561c --- /dev/null +++ b/server/src/ledgrab/api/schemas/calibration.py @@ -0,0 +1,103 @@ +"""Pydantic schemas for the calibration session and solver API.""" + +from typing import List, Literal + +from pydantic import BaseModel, Field, model_validator + + +# ── Session lifecycle ───────────────────────────────────────────────────────── + + +class CalibrationSessionStartRequest(BaseModel): + """Request to start a calibration session on a device.""" + + device_id: str = Field(description="ID of the device to drive during calibration") + + +class CalibrationSessionPositionRequest(BaseModel): + """Request to advance the chase pixel to a specific LED index.""" + + index: int = Field(ge=0, description="LED index to illuminate (0-based)") + window: int = Field( + default=1, + ge=0, + le=10, + description="Number of dim neighbour LEDs to show on each side (0 = centre only)", + ) + + +class CalibrationSessionStateResponse(BaseModel): + """Current calibration session state.""" + + active: bool = Field(description="Whether a session is currently active") + device_id: str | None = Field(None, description="Device being driven (null if inactive)") + led_count: int = Field(0, description="LED count of the active device") + prior_target_id: str | None = Field( + None, description="Target that was running before the session (will be restored on stop)" + ) + last_activity: str | None = Field( + None, description="ISO timestamp of the last position call (null if inactive)" + ) + + +# ── Solver ──────────────────────────────────────────────────────────────────── + + +class CalibrationSolveRequest(BaseModel): + """Request to solve a CalibrationConfig from 4 corner tap indices. + + Provide either *device_id* (the server derives led_count from the device) + or *led_count* directly. *device_id* takes precedence. + """ + + device_id: str | None = Field( + None, + description=("Device ID to derive led_count from (preferred over led_count field)"), + ) + led_count: int | None = Field( + None, + ge=1, + description="Total LED count (used when device_id is not provided)", + ) + start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"] = Field( + description="Starting corner of the strip" + ) + layout: Literal["clockwise", "counterclockwise"] = Field( + description="Winding direction of the strip" + ) + corner_indices: List[int] = Field( + description=( + "Four strip indices — one per screen corner — in the strip-walk order " + "defined by (start_position, layout). Index 0 of the strip is the " + "start corner; the four tap positions are recorded in strip order " + "beginning from that start corner (the solver lays edges out from " + "led_start=0, so a non-zero physical start would require the `offset` " + "field rather than a shifted corner_indices[0])." + ), + min_length=4, + max_length=4, + ) + offset: int = Field( + default=0, + ge=0, + description="Physical LED offset (0 = no offset)", + ) + + @model_validator(mode="after") + def _require_device_or_led_count(self) -> "CalibrationSolveRequest": + if self.device_id is None and self.led_count is None: + raise ValueError("Either 'device_id' or 'led_count' must be provided") + return self + + +class CalibrationSolvedResponse(BaseModel): + """Solved calibration config in simple-mode dict form.""" + + mode: Literal["simple"] = "simple" + layout: str = Field(description="Winding direction") + start_position: str = Field(description="Starting corner") + leds_top: int = Field(ge=0, description="LEDs on the top edge") + leds_right: int = Field(ge=0, description="LEDs on the right edge") + leds_bottom: int = Field(ge=0, description="LEDs on the bottom edge") + leds_left: int = Field(ge=0, description="LEDs on the left edge") + offset: int = Field(ge=0, description="Physical LED offset") diff --git a/server/src/ledgrab/core/capture/calibration.py b/server/src/ledgrab/core/capture/calibration.py index 2b04dac..2d3af4c 100644 --- a/server/src/ledgrab/core/capture/calibration.py +++ b/server/src/ledgrab/core/capture/calibration.py @@ -668,6 +668,98 @@ def create_pixel_mapper( return PixelMapper(calibration, interpolation_mode) +def solve_calibration( + led_count: int, + start_position: str, + layout: str, + corner_indices: List[int], + offset: int = 0, +) -> "CalibrationConfig": + """Derive a CalibrationConfig from 4 corner tap indices. + + Given the LED-strip indices where the user tapped each physical corner of + the screen (in strip-walk order matching *start_position* and *layout*), + compute per-edge LED counts that are consistent with + ``EDGE_ORDER``/``EDGE_REVERSE`` and round-trip through + ``build_segments()``. + + Args: + led_count: Total number of LEDs on the strip. + start_position: Starting corner of the strip + (``"top_left"``, ``"top_right"``, ``"bottom_left"``, + ``"bottom_right"``). + layout: Winding direction (``"clockwise"`` or + ``"counterclockwise"``). + corner_indices: Four strip indices, one per screen corner, in the + same order as the strip walk defined by ``EDGE_ORDER`` for the + given *(start_position, layout)* pair. Index 0 is the start + corner, index 1 is the second corner reached while walking, + etc. Indices may wrap around (i.e. the last segment may + straddle the physical end of the strip). + offset: Physical LED offset stored directly on the config (0 = none). + + Returns: + ``CalibrationConfig`` in simple mode with per-edge counts filled in. + + Raises: + ValueError: If *start_position*, *layout*, or the number of + corner indices is invalid. + """ + key = (start_position, layout) + if key not in EDGE_ORDER: + raise ValueError( + f"Invalid start_position/layout combination: {start_position!r}/{layout!r}" + ) + if len(corner_indices) != 4: + raise ValueError(f"corner_indices must have exactly 4 entries, got {len(corner_indices)}") + if led_count <= 0: + raise ValueError(f"led_count must be positive, got {led_count}") + + edge_order = EDGE_ORDER[key] # 4 edges in strip-walk order + + # Compute per-edge LED counts from consecutive corner indices. + # The i-th edge spans from corner_indices[i] to corner_indices[(i+1) % 4], + # wrapping around led_count if necessary. + edge_counts: dict[str, int] = {} + for i, edge in enumerate(edge_order): + start_idx = corner_indices[i] % led_count + end_idx = corner_indices[(i + 1) % 4] % led_count + if end_idx > start_idx: + count = end_idx - start_idx + elif end_idx == start_idx: + # Adjacent taps on the same index → 0-LED edge + count = 0 + else: + # Wrap-around: strip crosses the physical end + count = (led_count - start_idx) + end_idx + edge_counts[edge] = count + + cfg = CalibrationConfig( + mode="simple", + layout=layout, + start_position=start_position, + leds_top=edge_counts.get("top", 0), + leds_right=edge_counts.get("right", 0), + leds_bottom=edge_counts.get("bottom", 0), + leds_left=edge_counts.get("left", 0), + offset=offset, + ) + + logger.info( + "solve_calibration: start=%s layout=%s corner_indices=%s " + "-> top=%d right=%d bottom=%d left=%d offset=%d", + start_position, + layout, + corner_indices, + cfg.leds_top, + cfg.leds_right, + cfg.leds_bottom, + cfg.leds_left, + offset, + ) + return cfg + + def create_default_calibration( led_count: int, aspect_width: int = 16, diff --git a/server/src/ledgrab/core/capture/calibration_session.py b/server/src/ledgrab/core/capture/calibration_session.py new file mode 100644 index 0000000..f80e0f5 --- /dev/null +++ b/server/src/ledgrab/core/capture/calibration_session.py @@ -0,0 +1,410 @@ +"""Calibration session lifecycle and per-LED chase driver. + +Provides two things: +1. ``set_calibration_pixel`` — direct per-index LED write for the chase + (added beside ``set_test_mode`` on ``ProcessorManager`` via the mixin, but + kept here to avoid growing device_test_mode.py further). +2. ``CalibrationSession`` — single-active-session guard with idle timeout and + guaranteed stop/restore contract. + +Stop / restore contract (required by Phase 3 UI) +------------------------------------------------- +- ``start(device_id)``: + * If a target is currently processing on *device_id*, stop it and record + its ``target_id`` as ``_prior_target_id``. + * Send the device to black (chase start state). + * Record session as active with a fresh ``last_activity`` timestamp. + * Only one active session is allowed at a time; starting a new one on any + device while another is active calls ``stop()`` on the old one first. +- ``position(index, window)``: + * Validates ``index < led_count``; raises ``ValueError`` on out-of-range. + * Sends a chase pixel (bright white centre ±window dim neighbours). + * Updates ``last_activity``. +- ``stop()`` / ``cancel()``: + * Sends all-black to clear the device. + * If ``_prior_target_id`` was recorded, calls ``start_processing`` to + restart it. + * Clears the session state. + * NEVER leaves the device dark or stuck in chase. +- Idle timeout (``IDLE_TIMEOUT_SECONDS``, default 60 s): + * A background asyncio task checks ``last_activity``; if the session has + been idle longer than the timeout, ``stop()`` is called automatically. + * The timeout task is cancelled when ``stop()`` is called explicitly. + +Notes +----- +- ``set_calibration_pixel`` reuses ``_get_idle_client`` / + ``_send_pixels_to_device`` from ``DeviceTestModeMixin``; no new connection + management is needed. +- The session holds a reference to the ``ProcessorManager`` so it can call + ``stop_processing`` / ``start_processing``. +- Thread-safety: all public methods are ``async``; the idle-timeout callback + schedules itself on the running event loop via ``asyncio.ensure_future``. +""" + +import asyncio +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from ledgrab.utils import get_logger + +if TYPE_CHECKING: + from ledgrab.core.processing.processor_manager import ProcessorManager + +logger = get_logger(__name__) + +# ── Constants ──────────────────────────────────────────────────────────────── + +IDLE_TIMEOUT_SECONDS: int = 60 +"""Auto-stop a calibration session after this many seconds of inactivity.""" + +_CHASE_CENTER_COLOR: tuple[int, int, int] = (255, 255, 255) +"""Bright white for the chase centre pixel.""" + +_CHASE_WING_COLOR: tuple[int, int, int] = (60, 60, 60) +"""Dim grey for ±window neighbour pixels.""" + + +# ── Mixin: per-index chase driver ──────────────────────────────────────────── + + +class CalibrationChaseMixin: + """Adds ``set_calibration_pixel`` to ``ProcessorManager``. + + Requires the same host-class attributes as ``DeviceTestModeMixin``: + ``_devices``, ``_processors``, ``_idle_clients``. + Inherits ``_send_pixels_to_device`` and ``_get_idle_client`` from + ``DeviceTestModeMixin`` (both already on ``ProcessorManager``). + """ + + async def set_calibration_pixel( + self, + device_id: str, + index: int, + color: tuple[int, int, int] = _CHASE_CENTER_COLOR, + window: int = 1, + ) -> None: + """Light a single LED index (plus optional ±window neighbours) on a device. + + Sends a full pixel array to avoid partial-frame artefacts. The centre + LED is set to *color*; the ``window`` neighbours on each side are set to + ``_CHASE_WING_COLOR`` (dim grey) so the user can see which direction the + strip is wound. + + Args: + device_id: Target device ID (must be registered). + index: LED index to light (0-based). Must be < ``led_count``. + color: RGB tuple for the centre LED (default bright white). + window: Number of neighbouring LEDs to dim on each side (default 1). + + Raises: + ValueError: If *device_id* is not registered or *index* is out of + range. + """ + if device_id not in self._devices: + raise ValueError(f"Device {device_id!r} not found") + ds = self._devices[device_id] + led_count = ds.led_count + if led_count <= 0: + raise ValueError(f"Device {device_id!r} has led_count={led_count}") + if not (0 <= index < led_count): + raise ValueError( + f"index {index} out of range for device {device_id!r} " f"(led_count={led_count})" + ) + + pixels: list[tuple[int, int, int]] = [(0, 0, 0)] * led_count + pixels[index] = color + for offset in range(1, window + 1): + left = (index - offset) % led_count + right = (index + offset) % led_count + pixels[left] = _CHASE_WING_COLOR + pixels[right] = _CHASE_WING_COLOR + # Re-assign center last so on tiny strips (window >= led_count) the + # center LED always shows the full color rather than a wrapped wing. + pixels[index] = color + + await self._send_pixels_to_device(device_id, pixels) + + logger.debug( + "set_calibration_pixel: device=%s index=%d window=%d", + device_id, + index, + window, + ) + + +# ── Session lifecycle ───────────────────────────────────────────────────────── + + +class CalibrationSession: + """Single-active calibration session with idle-timeout and stop/restore. + + One instance is shared per application (singleton held by the API layer). + Only one session can be active at a time; starting a new session + automatically terminates the previous one. + + All public methods that mutate session state acquire ``_lock`` so that + concurrent ``POST /session`` calls (or a ``stop`` racing with the idle + watchdog) cannot interleave and leave ``_prior_target_id`` stale. The + watchdog calls the internal ``_teardown_locked`` helper which must only be + invoked when the lock is already held; if the lock is already taken the + watchdog simply exits, letting the holder finish teardown. + """ + + def __init__(self) -> None: + self._manager: "ProcessorManager | None" = None + self._device_id: str | None = None + self._led_count: int = 0 + self._prior_target_id: str | None = None + self._last_activity: datetime | None = None + self._timeout_task: asyncio.Task | None = None + self._active: bool = False + self._lock: asyncio.Lock = asyncio.Lock() + + # ── Public API ─────────────────────────────────────────────────────────── + + @property + def is_active(self) -> bool: + return self._active + + @property + def device_id(self) -> str | None: + return self._device_id + + @property + def led_count(self) -> int: + return self._led_count + + @property + def last_activity(self) -> datetime | None: + return self._last_activity + + async def start(self, device_id: str, manager: "ProcessorManager") -> None: + """Begin a calibration session on *device_id*. + + If a session is already active (even on a different device), it is + stopped first. If a target is currently processing on *device_id*, it + is stopped and remembered so it can be restored when this session ends. + + Args: + device_id: The device to drive during calibration. + manager: Live ``ProcessorManager`` instance. + + Raises: + ValueError: If *device_id* is not registered. + """ + async with self._lock: + # Validate device before touching any state or awaiting + if device_id not in manager._devices: + raise ValueError(f"Device {device_id!r} not found") + + ds = manager._devices[device_id] + led_count = ds.led_count + + # Capture the prior running target NOW — before any await — so the + # value cannot be mutated by a concurrent call that sneaks in after + # the lock is released between awaits. + prior_target_id = manager.get_processing_target_for_device(device_id) + + # Terminate any existing session while we still hold the lock. + # Call _teardown_locked directly (we already hold the lock). + if self._active: + logger.info( + "CalibrationSession.start: stopping existing session on device=%s " + "to start new one on device=%s", + self._device_id, + device_id, + ) + await self._teardown_locked(cancelled=False) + + # Stop any running target on this device and remember it for restore + if prior_target_id is not None: + logger.info( + "CalibrationSession.start: stopping target %s on device %s for calibration", + prior_target_id, + device_id, + ) + await manager.stop_processing(prior_target_id) + + self._manager = manager + self._device_id = device_id + self._led_count = led_count + self._prior_target_id = prior_target_id + self._last_activity = datetime.now(timezone.utc) + self._active = True + + # Clear the device to black so the chase starts from a clean state + await manager.send_clear_pixels(device_id) + + # Start idle-timeout watchdog + self._timeout_task = asyncio.ensure_future(self._idle_watchdog()) + + logger.info( + "CalibrationSession.start: session started on device=%s led_count=%d " + "prior_target=%s", + device_id, + led_count, + prior_target_id, + ) + + async def position(self, index: int, window: int = 1) -> None: + """Drive the chase pixel to *index* on the active device. + + Args: + index: LED index to illuminate (0-based, must be < led_count). + window: Number of dim neighbours on each side (default 1). + + Raises: + RuntimeError: If no session is active. + ValueError: If *index* is out of range. + """ + async with self._lock: + if not self._active or self._manager is None or self._device_id is None: + raise RuntimeError("No active calibration session") + if not (0 <= index < self._led_count): + raise ValueError(f"index {index} out of range (led_count={self._led_count})") + + self._last_activity = datetime.now(timezone.utc) + await self._manager.set_calibration_pixel(self._device_id, index, window=window) + + logger.debug( + "CalibrationSession.position: device=%s index=%d window=%d", + self._device_id, + index, + window, + ) + + async def stop(self) -> None: + """End the session: clear the device and restore the prior target. + + Safe to call even if no session is active (no-op). + """ + async with self._lock: + await self._teardown_locked(cancelled=False) + + async def cancel(self) -> None: + """Alias for ``stop()`` — ends the session without applying calibration.""" + async with self._lock: + await self._teardown_locked(cancelled=True) + + def get_state(self) -> dict: + """Return a snapshot of the current session state for API responses.""" + return { + "active": self._active, + "device_id": self._device_id, + "led_count": self._led_count, + "prior_target_id": self._prior_target_id, + "last_activity": (self._last_activity.isoformat() if self._last_activity else None), + } + + # ── Internal ───────────────────────────────────────────────────────────── + + async def _teardown_locked(self, cancelled: bool) -> None: + """Clear the device, restore the prior target, and reset state. + + MUST be called with ``self._lock`` already held by the caller. + Safe to call when already inactive (no-op). + """ + if not self._active: + return + + device_id = self._device_id + manager = self._manager + prior_target_id = self._prior_target_id + + # Cancel the idle watchdog — but only if we are NOT running inside it. + # Awaiting the current task would deadlock. + if ( + self._timeout_task is not None + and self._timeout_task is not asyncio.current_task() + and not self._timeout_task.done() + ): + self._timeout_task.cancel() + try: + await self._timeout_task + except asyncio.CancelledError: + pass + self._timeout_task = None + + # Reset state before side-effects so re-entrant calls are no-ops + self._active = False + self._device_id = None + self._led_count = 0 + self._prior_target_id = None + self._last_activity = None + self._manager = None + + if manager is None or device_id is None: + return + + # 1. Clear the device to black + try: + await manager.send_clear_pixels(device_id) + except Exception as exc: + logger.warning( + "CalibrationSession._teardown: failed to clear pixels on %s: %s", + device_id, + exc, + ) + + # 2. Restore the prior target (if any) + if prior_target_id is not None: + try: + await manager.start_processing(prior_target_id) + logger.info( + "CalibrationSession._teardown: restored target %s on device %s", + prior_target_id, + device_id, + ) + except Exception as exc: + logger.error( + "CalibrationSession._teardown: failed to restore target %s on " "device %s: %s", + prior_target_id, + device_id, + exc, + ) + + action = "cancel" if cancelled else "stop" + logger.info( + "CalibrationSession.%s: session ended on device=%s prior_target=%s", + action, + device_id, + prior_target_id, + ) + + async def _idle_watchdog(self) -> None: + """Background task: auto-stop the session after IDLE_TIMEOUT_SECONDS. + + Tries to acquire ``_lock`` when the timeout fires. If the lock is + already held (e.g. a concurrent ``stop()`` is in progress) the + ``acquire`` will wait; once it gets the lock, ``_teardown_locked`` + is a no-op if the session was already ended by the other caller. + """ + try: + while True: + await asyncio.sleep(5) + if not self._active or self._last_activity is None: + break + elapsed = (datetime.now(timezone.utc) - self._last_activity).total_seconds() + if elapsed >= IDLE_TIMEOUT_SECONDS: + logger.warning( + "CalibrationSession._idle_watchdog: session on device=%s " + "idle for %.0fs — auto-stopping", + self._device_id, + elapsed, + ) + async with self._lock: + await self._teardown_locked(cancelled=False) + break + except asyncio.CancelledError: + pass + + +# ── Module-level singleton ──────────────────────────────────────────────────── + +_session: CalibrationSession = CalibrationSession() + + +def get_calibration_session() -> CalibrationSession: + """Return the module-level singleton ``CalibrationSession``.""" + return _session diff --git a/server/src/ledgrab/core/processing/processor_manager.py b/server/src/ledgrab/core/processing/processor_manager.py index 5ddeda3..81b859c 100644 --- a/server/src/ledgrab/core/processing/processor_manager.py +++ b/server/src/ledgrab/core/processing/processor_manager.py @@ -44,6 +44,7 @@ from ledgrab.core.processing.sync_clock_manager import SyncClockManager from ledgrab.core.weather.weather_manager import WeatherManager from ledgrab.core.processing.device_health import DeviceHealthMixin from ledgrab.core.processing.device_test_mode import DeviceTestModeMixin +from ledgrab.core.capture.calibration_session import CalibrationChaseMixin from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -106,7 +107,9 @@ class DeviceState: zone_mode: str = "combined" -class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin): +class ProcessorManager( + AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin, CalibrationChaseMixin +): """Manages devices and delegates target processing to TargetProcessor instances. Devices are registered for health monitoring. diff --git a/server/tests/api/routes/test_calibration_routes.py b/server/tests/api/routes/test_calibration_routes.py new file mode 100644 index 0000000..e62a970 --- /dev/null +++ b/server/tests/api/routes/test_calibration_routes.py @@ -0,0 +1,354 @@ +"""Happy-path and bounds-validation tests for calibration API routes. + +Runs with the full app test-client stack but mocks the ProcessorManager +so no real LED devices are required. + +Note: Deep adversarial coverage is deferred to the Phase 4 test-writer +(Big Bang strategy). +""" + +from __future__ import annotations + +import pytest +import pytest_asyncio + +from unittest.mock import AsyncMock, MagicMock + +from fastapi import FastAPI +from httpx import AsyncClient, ASGITransport + +from ledgrab.api.routes.calibration import router +from ledgrab.core.capture.calibration_session import get_calibration_session + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def mock_manager() -> MagicMock: + """A minimal fake ProcessorManager.""" + mgr = MagicMock() + # Simulate a registered device with 100 LEDs + ds = MagicMock() + ds.led_count = 100 + mgr._devices = {"dev1": ds} + mgr.get_processing_target_for_device = MagicMock(return_value=None) + mgr.stop_processing = AsyncMock() + mgr.start_processing = AsyncMock() + mgr.send_clear_pixels = AsyncMock() + mgr.set_calibration_pixel = AsyncMock() + return mgr + + +@pytest_asyncio.fixture(autouse=True) +async def reset_session(): + """Reset the module-level CalibrationSession singleton before each test.""" + + import asyncio + + def _clear(session) -> None: + session._active = False + session._device_id = None + session._led_count = 0 + session._prior_target_id = None + session._last_activity = None + session._manager = None + # Reset lock so a test that aborted mid-await doesn't leave it locked + session._lock = asyncio.Lock() + + session = get_calibration_session() + # Cancel any leftover watchdog task before clearing + if session._timeout_task and not session._timeout_task.done(): + session._timeout_task.cancel() + try: + await session._timeout_task + except Exception: + pass + session._timeout_task = None + _clear(session) + + yield + + # Cleanup after test + if session._timeout_task and not session._timeout_task.done(): + session._timeout_task.cancel() + try: + await session._timeout_task + except Exception: + pass + session._timeout_task = None + _clear(session) + + +@pytest.fixture() +def app(mock_manager: MagicMock) -> FastAPI: + """Tiny FastAPI app with only the calibration router and auth disabled.""" + from fastapi import FastAPI + from ledgrab.api.auth import verify_api_key + from ledgrab.api import dependencies as deps_mod + + _app = FastAPI() + _app.include_router(router) + # Override the underlying dependency that AuthRequired resolves to + _app.dependency_overrides[verify_api_key] = lambda: "test-token" + _app.dependency_overrides[deps_mod.get_processor_manager] = lambda: mock_manager + return _app + + +@pytest_asyncio.fixture() +async def client(app: FastAPI): + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + yield c + + +# --------------------------------------------------------------------------- +# Session start +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_start_session_success(client: AsyncClient, mock_manager: MagicMock): + resp = await client.post("/api/v1/calibration/session", json={"device_id": "dev1"}) + assert resp.status_code == 201 + data = resp.json() + assert data["active"] is True + assert data["device_id"] == "dev1" + assert data["led_count"] == 100 + mock_manager.send_clear_pixels.assert_awaited_once_with("dev1") + + +@pytest.mark.asyncio +async def test_start_session_unknown_device(client: AsyncClient): + resp = await client.post("/api/v1/calibration/session", json={"device_id": "does_not_exist"}) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# Session position +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_position_success(client: AsyncClient, mock_manager: MagicMock): + # Start session first + await client.post("/api/v1/calibration/session", json={"device_id": "dev1"}) + resp = await client.post( + "/api/v1/calibration/session/position", json={"index": 42, "window": 2} + ) + assert resp.status_code == 200 + data = resp.json() + assert data["active"] is True + mock_manager.set_calibration_pixel.assert_awaited_with("dev1", 42, window=2) + + +@pytest.mark.asyncio +async def test_position_out_of_range(client: AsyncClient, mock_manager: MagicMock): + """index >= led_count → 400.""" + await client.post("/api/v1/calibration/session", json={"device_id": "dev1"}) + resp = await client.post( + "/api/v1/calibration/session/position", json={"index": 100, "window": 1} + ) + assert resp.status_code == 400 + + +@pytest.mark.asyncio +async def test_position_negative_index_422(client: AsyncClient): + """index < 0 → Pydantic 422.""" + resp = await client.post( + "/api/v1/calibration/session/position", json={"index": -1, "window": 1} + ) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_position_no_active_session(client: AsyncClient): + """Calling position without starting a session → 400.""" + resp = await client.post("/api/v1/calibration/session/position", json={"index": 5, "window": 1}) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# Session stop +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_stop_session_clears_device(client: AsyncClient, mock_manager: MagicMock): + await client.post("/api/v1/calibration/session", json={"device_id": "dev1"}) + resp = await client.post("/api/v1/calibration/session/stop") + assert resp.status_code == 200 + data = resp.json() + assert data["active"] is False + # send_clear_pixels called at start AND at stop + assert mock_manager.send_clear_pixels.await_count == 2 + + +@pytest.mark.asyncio +async def test_stop_restores_prior_target(client: AsyncClient, mock_manager: MagicMock): + """When a target was running, stop should restart it.""" + mock_manager.get_processing_target_for_device = MagicMock(return_value="tgt1") + await client.post("/api/v1/calibration/session", json={"device_id": "dev1"}) + await client.post("/api/v1/calibration/session/stop") + mock_manager.start_processing.assert_awaited_once_with("tgt1") + + +@pytest.mark.asyncio +async def test_stop_no_active_session_is_ok(client: AsyncClient): + """stop when inactive → 200 with active=False.""" + resp = await client.post("/api/v1/calibration/session/stop") + assert resp.status_code == 200 + assert resp.json()["active"] is False + + +# --------------------------------------------------------------------------- +# Session state +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_state_inactive(client: AsyncClient): + resp = await client.get("/api/v1/calibration/session/state") + assert resp.status_code == 200 + assert resp.json()["active"] is False + + +@pytest.mark.asyncio +async def test_get_state_active(client: AsyncClient, mock_manager: MagicMock): + await client.post("/api/v1/calibration/session", json={"device_id": "dev1"}) + resp = await client.get("/api/v1/calibration/session/state") + assert resp.status_code == 200 + assert resp.json()["active"] is True + + +# --------------------------------------------------------------------------- +# Solve endpoint +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_solve_with_device_id(client: AsyncClient): + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "device_id": "dev1", + "start_position": "bottom_left", + "layout": "clockwise", + "corner_indices": [0, 30, 60, 80], + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["mode"] == "simple" + # bottom_left/clockwise EDGE_ORDER: left, top, right, bottom + # left=30, top=30, right=20, bottom=20 → total=100 + assert data["leds_left"] == 30 + assert data["leds_top"] == 30 + assert data["leds_right"] == 20 + assert data["leds_bottom"] == 20 + assert data["layout"] == "clockwise" + assert data["start_position"] == "bottom_left" + + +@pytest.mark.asyncio +async def test_solve_with_led_count(client: AsyncClient): + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "led_count": 80, + "start_position": "top_left", + "layout": "clockwise", + "corner_indices": [0, 20, 40, 60], + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert sum([data["leds_top"], data["leds_right"], data["leds_bottom"], data["leds_left"]]) == 80 + + +@pytest.mark.asyncio +async def test_solve_missing_device_and_led_count(client: AsyncClient): + """Omitting both device_id and led_count → 422 (model validator).""" + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "start_position": "bottom_left", + "layout": "clockwise", + "corner_indices": [0, 25, 50, 75], + }, + ) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_solve_unknown_device(client: AsyncClient): + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "device_id": "no_such_device", + "start_position": "bottom_left", + "layout": "clockwise", + "corner_indices": [0, 25, 50, 75], + }, + ) + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_solve_invalid_start_position_422(client: AsyncClient): + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "led_count": 100, + "start_position": "invalid_corner", + "layout": "clockwise", + "corner_indices": [0, 25, 50, 75], + }, + ) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_solve_invalid_layout_422(client: AsyncClient): + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "led_count": 100, + "start_position": "bottom_left", + "layout": "diagonal", + "corner_indices": [0, 25, 50, 75], + }, + ) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_solve_wrong_corner_count_422(client: AsyncClient): + """Only 3 corner indices → 422 (min_length=4).""" + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "led_count": 100, + "start_position": "bottom_left", + "layout": "clockwise", + "corner_indices": [0, 25, 50], + }, + ) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_solve_with_offset(client: AsyncClient): + resp = await client.post( + "/api/v1/calibration/solve", + json={ + "led_count": 100, + "start_position": "bottom_left", + "layout": "clockwise", + "corner_indices": [0, 25, 50, 75], + "offset": 7, + }, + ) + assert resp.status_code == 200 + assert resp.json()["offset"] == 7 diff --git a/server/tests/core/test_calibration_solver.py b/server/tests/core/test_calibration_solver.py new file mode 100644 index 0000000..9957ef5 --- /dev/null +++ b/server/tests/core/test_calibration_solver.py @@ -0,0 +1,315 @@ +"""Unit tests for solve_calibration() — pure logic, runs in isolation. + +Tests cover: +- All 8 (start_position × layout) combinations +- 0-LED edge (two corners tapped adjacent) +- offset pass-through +- Round-trip through build_segments() +- Wrap-around (corner_indices straddle the 0/led_count boundary) +""" + +import pytest + +from ledgrab.core.capture.calibration import ( + EDGE_ORDER, + CalibrationConfig, + solve_calibration, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _assert_roundtrip(cfg: CalibrationConfig) -> None: + """build_segments() must not crash and must cover the expected LED count.""" + segs = cfg.build_segments() + total_from_segs = sum(s.led_count for s in segs) + expected = cfg.leds_top + cfg.leds_right + cfg.leds_bottom + cfg.leds_left + assert total_from_segs == expected, ( + f"Segment total {total_from_segs} != field total {expected} " f"for cfg={cfg!r}" + ) + + +def _edge_counts(cfg: CalibrationConfig) -> dict[str, int]: + return { + "top": cfg.leds_top, + "right": cfg.leds_right, + "bottom": cfg.leds_bottom, + "left": cfg.leds_left, + } + + +# --------------------------------------------------------------------------- +# Basic: bottom_left / clockwise (canonical case) +# --------------------------------------------------------------------------- + + +class TestBottomLeftClockwise: + """start_position=bottom_left, layout=clockwise. + + EDGE_ORDER: ["left", "top", "right", "bottom"] + Strip walk: LED 0 is at bottom-left corner, goes UP the left edge, + across the top, DOWN the right, and back along the bottom. + + Corner indices for a 100-LED, 20/30/20/30 (L/T/R/B) layout: + bottom_left -> 0 + top_left -> 20 (after left edge) + top_right -> 50 (after top edge) + bottom_right -> 70 (after right edge) + """ + + START = "bottom_left" + LAYOUT = "clockwise" + LED_COUNT = 100 + + def _make_corner_indices(self) -> list[int]: + # left=20, top=30, right=20, bottom=30 + return [0, 20, 50, 70] # BL, TL, TR, BR + + def test_basic_counts(self): + cfg = solve_calibration( + led_count=self.LED_COUNT, + start_position=self.START, + layout=self.LAYOUT, + corner_indices=self._make_corner_indices(), + ) + counts = _edge_counts(cfg) + assert counts["left"] == 20 + assert counts["top"] == 30 + assert counts["right"] == 20 + assert counts["bottom"] == 30 + + def test_start_position_preserved(self): + cfg = solve_calibration( + led_count=self.LED_COUNT, + start_position=self.START, + layout=self.LAYOUT, + corner_indices=self._make_corner_indices(), + ) + assert cfg.start_position == self.START + + def test_layout_preserved(self): + cfg = solve_calibration( + led_count=self.LED_COUNT, + start_position=self.START, + layout=self.LAYOUT, + corner_indices=self._make_corner_indices(), + ) + assert cfg.layout == self.LAYOUT + + def test_roundtrip(self): + cfg = solve_calibration( + led_count=self.LED_COUNT, + start_position=self.START, + layout=self.LAYOUT, + corner_indices=self._make_corner_indices(), + ) + _assert_roundtrip(cfg) + + def test_offset_passthrough(self): + cfg = solve_calibration( + led_count=self.LED_COUNT, + start_position=self.START, + layout=self.LAYOUT, + corner_indices=self._make_corner_indices(), + offset=5, + ) + assert cfg.offset == 5 + _assert_roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# All 8 combinations: smoke test (round-trip + total == led_count) +# --------------------------------------------------------------------------- + + +ALL_CORNERS: dict[str, list[str]] = { + # start_position: [BL, TL, TR, BR] corners in the order they appear on the strip + # for layout=clockwise. We use 100 LEDs with 25 per edge for simplicity. + "bottom_left": ["BL", "TL", "TR", "BR"], + "top_left": ["TL", "TR", "BR", "BL"], + "top_right": ["TR", "BR", "BL", "TL"], + "bottom_right": ["BR", "BL", "TL", "TR"], +} + + +# For each start_position × layout, what are the 4 corner indices +# when all edges have 25 LEDs (100 total)? +# EDGE_ORDER for (start, "clockwise") gives the edge walk sequence. +# We map corner names to indices by placing them at the boundaries. +def _corner_indices_25_each(start_position: str, layout: str) -> list[int]: + """ + Build corner indices assuming all 4 edges have exactly 25 LEDs. + Returns [start_corner, second_corner, third_corner, fourth_corner] + following the strip walk order defined by EDGE_ORDER. + + The corners of the screen are: + top_left=TL, top_right=TR, bottom_left=BL, bottom_right=BR + + Each edge start-corner is at the leading edge index; its end-corner + is at that index + led_count of that edge (mod 100). + """ + key = (start_position, layout) + order = EDGE_ORDER[key] # e.g. ["left","top","right","bottom"] + + # Map edge names to their start and end screen corners + # Corner positions: start corner of each edge in strip order + result = [] + led_pos = 0 + for edge in order: + result.append(led_pos) + led_pos += 25 + return result + + +@pytest.mark.parametrize("start_position", list(EDGE_ORDER)) +def test_all_combinations_roundtrip_25_each(start_position): + """All 8 (start, layout) combos with 25 LEDs/edge must round-trip.""" + start_pos_str, layout = start_position # unpack tuple key + indices = _corner_indices_25_each(start_pos_str, layout) + cfg = solve_calibration( + led_count=100, + start_position=start_pos_str, + layout=layout, + corner_indices=indices, + ) + counts = _edge_counts(cfg) + assert ( + sum(counts.values()) == 100 + ), f"{start_pos_str}/{layout}: total LEDs {sum(counts.values())} != 100" + assert all( + v == 25 for v in counts.values() + ), f"{start_pos_str}/{layout}: edge counts {counts} not all 25" + _assert_roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# 0-LED edge: two corners tapped adjacent (one edge has 0 LEDs) +# --------------------------------------------------------------------------- + + +class TestZeroLedEdge: + """When two consecutive corner taps are the same index, that edge has 0 LEDs.""" + + def test_zero_bottom_edge(self): + """ + bottom_left / clockwise, 100 LEDs. + EDGE_ORDER: left, top, right, bottom + Tap top-left and bottom-right at the same index → bottom edge = 0 + We place BL=0, TL=40, TR=70, BR=70 (top=30, right=0 would be wrong; + let's use BL=0, TL=25, TR=65, BR=90 for bottom=10, then make left=right=40) + Actually: make right edge 0: BL=0, TL=40, TR=60, BR=60 + """ + # EDGE_ORDER for bottom_left/clockwise: ["left","top","right","bottom"] + # Strip indices: left 0..39 (40 LEDs), top 40..59 (20 LEDs), right 60..59 (0 LEDs!), bottom 60..99 (40 LEDs) + cfg = solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 40, 60, 60], # BL, TL, TR, BR — right=0 + ) + counts = _edge_counts(cfg) + assert counts["left"] == 40 + assert counts["top"] == 20 + assert counts["right"] == 0 + assert counts["bottom"] == 40 + assert sum(counts.values()) == 100 + _assert_roundtrip(cfg) + + def test_zero_first_edge(self): + """First edge (left) can also be 0 if corners 0 and 1 are the same.""" + # EDGE_ORDER bottom_left/clockwise: ["left","top","right","bottom"] + # If BL==TL, left edge has 0 LEDs + cfg = solve_calibration( + led_count=60, + start_position="bottom_left", + layout="clockwise", + corner_indices=[0, 0, 20, 40], # BL=TL, left=0 + ) + counts = _edge_counts(cfg) + assert counts["left"] == 0 + assert counts["top"] == 20 + assert counts["right"] == 20 + assert counts["bottom"] == 20 + assert sum(counts.values()) == 60 + _assert_roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# Wrap-around: last corner index < first (straddles the 0 boundary) +# --------------------------------------------------------------------------- + + +class TestWrapAround: + """When the strip wraps: the last segment spans from some index to led_count, + then continues from 0 to the start corner. This can happen if the user + provides indices that wrap around the physical end of the strip. + """ + + def test_wrap_around_bottom_edge(self): + """ + bottom_left / clockwise, 100 LEDs. + EDGE_ORDER: left, top, right, bottom. + If the user taps: BL=80, TL=10, TR=40, BR=60 (wraps) + -> left: 80..10 = (100-80)+10 = 30 + -> top: 10..40 = 30 + -> right:40..60 = 20 + -> bottom:60..80 = 20 + """ + cfg = solve_calibration( + led_count=100, + start_position="bottom_left", + layout="clockwise", + corner_indices=[80, 10, 40, 60], + ) + counts = _edge_counts(cfg) + assert counts["left"] == 30 + assert counts["top"] == 30 + assert counts["right"] == 20 + assert counts["bottom"] == 20 + assert sum(counts.values()) == 100 + _assert_roundtrip(cfg) + + +# --------------------------------------------------------------------------- +# Offset +# --------------------------------------------------------------------------- + + +class TestOffset: + def test_offset_stored_correctly(self): + cfg = solve_calibration( + led_count=100, + start_position="top_left", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + offset=10, + ) + assert cfg.offset == 10 + _assert_roundtrip(cfg) + + def test_offset_default_zero(self): + cfg = solve_calibration( + led_count=100, + start_position="top_left", + layout="clockwise", + corner_indices=[0, 25, 50, 75], + ) + assert cfg.offset == 0 + + +# --------------------------------------------------------------------------- +# Mode is always "simple" +# --------------------------------------------------------------------------- + + +def test_solve_returns_simple_mode(): + cfg = solve_calibration( + led_count=80, + start_position="top_right", + layout="counterclockwise", + corner_indices=[0, 20, 40, 60], + ) + assert cfg.mode == "simple"