Add test preview for color strip sources with LED strip and rectangle views

New WebSocket endpoint streams real-time RGB frames from any CSS source.
Generic sources show a horizontal LED strip canvas. Picture sources show
a rectangle with per-edge canvases matching the calibration layout.

Server computes exact output indices per edge (offset + reverse + CW/CCW)
so the frontend renders edges in correct visual orientation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-12 19:18:36 +03:00
parent 0e270685e8
commit bebdfcf319
9 changed files with 333 additions and 3 deletions

View File

@@ -1,5 +1,7 @@
"""Color strip source routes: CRUD, calibration test, and API input push."""
"""Color strip source routes: CRUD, calibration test, preview, and API input push."""
import asyncio
import json as _json
import secrets
import numpy as np
@@ -667,3 +669,94 @@ async def css_api_input_ws(
logger.error(f"API input WebSocket error for source {source_id}: {e}")
finally:
logger.info(f"API input WebSocket disconnected for source {source_id}")
# ── Test / Preview WebSocket ──────────────────────────────────────────
@router.websocket("/api/v1/color-strip-sources/{source_id}/test/ws")
async def test_color_strip_ws(
websocket: WebSocket,
source_id: str,
token: str = Query(""),
led_count: int = Query(100),
):
"""WebSocket for real-time CSS source preview. Auth via ``?token=<api_key>``.
First message is JSON metadata (source_type, led_count, calibration segments).
Subsequent messages are binary RGB frames (``led_count * 3`` bytes).
"""
# Authenticate
authenticated = False
cfg = get_config()
if token and cfg.auth.api_keys:
for _label, api_key in cfg.auth.api_keys.items():
if secrets.compare_digest(token, api_key):
authenticated = True
break
if not authenticated:
await websocket.close(code=4001, reason="Unauthorized")
return
# Validate source exists
store: ColorStripStore = get_color_strip_store()
try:
source = store.get_source(source_id)
except ValueError as e:
await websocket.close(code=4004, reason=str(e))
return
# Acquire stream
manager: ProcessorManager = get_processor_manager()
csm = manager.color_strip_stream_manager
consumer_id = "__test__"
try:
stream = csm.acquire(source_id, consumer_id)
except Exception as e:
logger.error(f"CSS test: failed to acquire stream for {source_id}: {e}")
await websocket.close(code=4003, reason=str(e))
return
# Configure LED count for auto-sizing streams
if hasattr(stream, "configure"):
stream.configure(max(1, led_count))
await websocket.accept()
logger.info(f"CSS test WebSocket connected for {source_id}")
try:
# Send metadata as first message
is_picture = isinstance(source, (PictureColorStripSource, AdvancedPictureColorStripSource))
meta: dict = {
"type": "meta",
"source_type": source.source_type,
"led_count": stream.led_count,
}
if is_picture and stream.calibration:
cal = stream.calibration
total = cal.get_total_leds()
offset = cal.offset % total if total > 0 else 0
edges = []
for seg in cal.segments:
# Compute output indices matching PixelMapper logic
indices = list(range(seg.led_start, seg.led_start + seg.led_count))
if seg.reverse:
indices = indices[::-1]
if offset > 0:
indices = [(idx + offset) % total for idx in indices]
edges.append({"edge": seg.edge, "indices": indices})
meta["edges"] = edges
await websocket.send_text(_json.dumps(meta))
# Stream binary RGB frames at ~20 Hz
while True:
colors = stream.get_latest_colors()
if colors is not None:
await websocket.send_bytes(colors.tobytes())
await asyncio.sleep(0.05)
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"CSS test WebSocket error for {source_id}: {e}")
finally:
csm.release(source_id, consumer_id)
logger.info(f"CSS test WebSocket disconnected for {source_id}")