Files
wled-screen-controller-mixed/server/src/wled_controller/api/routes/templates.py
alexei.dolgolyov f2871319cb
Some checks failed
Lint & Test / test (push) Failing after 48s
refactor: comprehensive code quality, security, and release readiness improvements
Security: tighten CORS defaults, add webhook rate limiting, fix XSS in
automations, guard WebSocket JSON.parse, validate ADB address input,
seal debug exception leak, URL-encode WS tokens, CSS.escape in selectors.

Code quality: add Pydantic models for brightness/power endpoints, fix
thread safety and name uniqueness in DeviceStore, immutable update
pattern, split 6 oversized files into 16 focused modules, enable
TypeScript strictNullChecks (741→102 errors), type state variables,
add dom-utils helper, migrate 3 modules from inline onclick to event
delegation, ProcessorDependencies dataclass.

Performance: async store saves, health endpoint log level, command
palette debounce, optimized entity-events comparison, fix service
worker precache list.

Testing: expand from 45 to 293 passing tests — add store tests (141),
route tests (25), core logic tests (42), E2E flow tests (33), organize
into tests/api/, tests/storage/, tests/core/, tests/e2e/.

DevOps: CI test pipeline, pre-commit config, Dockerfile multi-stage
build with non-root user and health check, docker-compose improvements,
version bump to 0.2.0.

Docs: rewrite CLAUDE.md (202→56 lines), server/CLAUDE.md (212→76),
create contexts/server-operations.md, fix .js→.ts references, fix env
var prefix in README, rewrite INSTALLATION.md, add CONTRIBUTING.md and
.env.example.
2026-03-22 00:38:28 +03:00

544 lines
19 KiB
Python

"""Capture template, engine, and filter routes."""
import base64
import io
import time
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
fire_entity_event,
get_cspt_store,
get_picture_source_store,
get_pp_template_store,
get_template_store,
)
from wled_controller.api.schemas.common import (
CaptureImage,
PerformanceMetrics,
TemplateTestResponse,
)
from wled_controller.api.schemas.templates import (
EngineInfo,
EngineListResponse,
TemplateCreate,
TemplateListResponse,
TemplateResponse,
TemplateTestRequest,
TemplateUpdate,
)
from wled_controller.api.schemas.filters import (
FilterOptionDefSchema,
FilterTypeListResponse,
FilterTypeResponse,
)
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.filters import FilterRegistry
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource
from wled_controller.utils import get_logger
from wled_controller.storage.base_store import EntityNotFoundError
logger = get_logger(__name__)
router = APIRouter()
# ===== CAPTURE TEMPLATE ENDPOINTS =====
@router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"])
async def list_templates(
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""List all capture templates."""
try:
templates = template_store.get_all_templates()
template_responses = [
TemplateResponse(
id=t.id,
name=t.name,
engine_type=t.engine_type,
engine_config=t.engine_config,
tags=t.tags,
created_at=t.created_at,
updated_at=t.updated_at,
description=t.description,
)
for t in templates
]
return TemplateListResponse(
templates=template_responses,
count=len(template_responses),
)
except Exception as e:
logger.error(f"Failed to list templates: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201)
async def create_template(
template_data: TemplateCreate,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Create a new capture template."""
try:
template = template_store.create_template(
name=template_data.name,
engine_type=template_data.engine_type,
engine_config=template_data.engine_config,
description=template_data.description,
tags=template_data.tags,
)
fire_entity_event("capture_template", "created", template.id)
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
tags=template.tags,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def get_template(
template_id: str,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Get template by ID."""
try:
template = template_store.get_template(template_id)
except ValueError:
raise HTTPException(status_code=404, detail=f"Template {template_id} not found")
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
tags=template.tags,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
@router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def update_template(
template_id: str,
update_data: TemplateUpdate,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Update a template."""
try:
template = template_store.update_template(
template_id=template_id,
name=update_data.name,
engine_type=update_data.engine_type,
engine_config=update_data.engine_config,
description=update_data.description,
tags=update_data.tags,
)
fire_entity_event("capture_template", "updated", template_id)
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
tags=template.tags,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"])
async def delete_template(
template_id: str,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
stream_store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Delete a template.
Validates that no streams are currently using this template before deletion.
"""
try:
# Check if any streams are using this template
streams_using_template = []
for stream in stream_store.get_all_streams():
if isinstance(stream, ScreenCapturePictureSource) and stream.capture_template_id == template_id:
streams_using_template.append(stream.name)
if streams_using_template:
stream_list = ", ".join(streams_using_template)
raise HTTPException(
status_code=409,
detail=f"Cannot delete template: it is used by the following stream(s): {stream_list}. "
f"Please reassign these streams to a different template before deleting."
)
# Proceed with deletion
template_store.delete_template(template_id)
fire_entity_event("capture_template", "deleted", template_id)
except HTTPException:
raise # Re-raise HTTP exceptions as-is
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"])
async def list_engines(_auth: AuthRequired):
"""List all registered capture engines.
Returns every registered engine with an ``available`` flag showing
whether it can be used on the current system.
"""
try:
available_set = set(EngineRegistry.get_available_engines())
all_engines = EngineRegistry.get_all_engines()
engines = []
for engine_type, engine_class in all_engines.items():
engines.append(
EngineInfo(
type=engine_type,
name=engine_type.upper(),
default_config=engine_class.get_default_config(),
available=(engine_type in available_set),
has_own_displays=getattr(engine_class, 'HAS_OWN_DISPLAYS', False),
)
)
return EngineListResponse(engines=engines, count=len(engines))
except Exception as e:
logger.error(f"Failed to list engines: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"])
def test_template(
test_request: TemplateTestRequest,
_auth: AuthRequired,
):
"""Test a capture template configuration.
Uses sync ``def`` so FastAPI runs it in a thread pool — the engine
initialisation and capture loop are blocking and would stall the
event loop if run in an ``async def`` handler.
Temporarily instantiates an engine with the provided configuration,
captures frames for the specified duration, and returns actual FPS metrics.
"""
stream = None
try:
# Validate engine type
if test_request.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{test_request.engine_type}' is not available on this system"
)
# Create and initialize capture stream
stream = EngineRegistry.create_stream(
test_request.engine_type, test_request.display_index, test_request.engine_config
)
stream.initialize()
# Run sustained capture test
logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = stream.capture_frame()
capture_elapsed = time.perf_counter() - capture_start
# Skip if no new frame (screen unchanged); yield CPU
if screen_capture is None:
time.sleep(0.005)
continue
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s")
# Use the last captured frame for preview
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert numpy array to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail (640px wide, maintain aspect ratio)
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
# Encode thumbnail as JPEG
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
# Encode full-resolution image as JPEG
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
# Calculate metrics
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
return TemplateTestResponse(
full_capture=CaptureImage(
image=thumbnail_data_uri,
full_image=full_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
),
border_extraction=None,
performance=PerformanceMetrics(
capture_duration_s=actual_duration,
frame_count=frame_count,
actual_fps=actual_fps,
avg_capture_time_ms=avg_capture_time_ms,
),
)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
except Exception as e:
logger.error(f"Failed to test template: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
finally:
if stream:
try:
stream.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test stream: {e}")
# ===== REAL-TIME CAPTURE TEMPLATE TEST WEBSOCKET =====
@router.websocket("/api/v1/capture-templates/test/ws")
async def test_template_ws(
websocket: WebSocket,
token: str = Query(""),
):
"""WebSocket for capture template test with intermediate frame previews.
Config is sent as the first client message (JSON with engine_type,
engine_config, display_index, capture_duration).
"""
from wled_controller.api.routes._preview_helpers import (
authenticate_ws_token,
stream_capture_test,
)
if not authenticate_ws_token(token):
await websocket.close(code=4001, reason="Unauthorized")
return
await websocket.accept()
# Read config from first client message
try:
config = await websocket.receive_json()
except Exception as e:
await websocket.send_json({"type": "error", "detail": f"Expected JSON config: {e}"})
await websocket.close(code=4003)
return
engine_type = config.get("engine_type", "")
engine_config = config.get("engine_config", {})
display_index = config.get("display_index", 0)
duration = float(config.get("capture_duration", 5.0))
pw = int(config.get("preview_width", 0)) or None
if engine_type not in EngineRegistry.get_available_engines():
await websocket.send_json({"type": "error", "detail": f"Engine '{engine_type}' not available"})
await websocket.close(code=4003)
return
# Engine factory — creates + initializes engine inside the capture thread
# to avoid thread-affinity issues (e.g. MSS uses thread-local state)
def engine_factory():
s = EngineRegistry.create_stream(engine_type, display_index, engine_config)
s.initialize()
return s
logger.info(f"Capture template test WS connected ({engine_type}, display {display_index}, {duration}s)")
try:
await stream_capture_test(websocket, engine_factory, duration, preview_width=pw)
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"Capture template test WS error: {e}")
finally:
logger.info("Capture template test WS disconnected")
# ===== FILTER TYPE ENDPOINTS =====
@router.get("/api/v1/filters", response_model=FilterTypeListResponse, tags=["Filters"])
async def list_filter_types(
_auth: AuthRequired,
pp_store=Depends(get_pp_template_store),
):
"""List all available postprocessing filter types and their options schemas."""
all_filters = FilterRegistry.get_all()
# Pre-build template choices for the filter_template filter
template_choices = None
if pp_store:
try:
templates = pp_store.get_all_templates()
template_choices = [{"value": t.id, "label": t.name} for t in templates]
except Exception:
template_choices = []
responses = []
for filter_id, filter_cls in all_filters.items():
schema = filter_cls.get_options_schema()
opt_schemas = []
for opt in schema:
choices = opt.choices
# Enrich filter_template choices with current template list
if filter_id == "filter_template" and opt.key == "template_id" and template_choices is not None:
choices = template_choices
opt_schemas.append(FilterOptionDefSchema(
key=opt.key,
label=opt.label,
type=opt.option_type,
default=opt.default,
min_value=opt.min_value,
max_value=opt.max_value,
step=opt.step,
choices=choices,
))
responses.append(FilterTypeResponse(
filter_id=filter_cls.filter_id,
filter_name=filter_cls.filter_name,
options_schema=opt_schemas,
))
return FilterTypeListResponse(filters=responses, count=len(responses))
@router.get("/api/v1/strip-filters", response_model=FilterTypeListResponse, tags=["Filters"])
async def list_strip_filter_types(
_auth: AuthRequired,
cspt_store=Depends(get_cspt_store),
):
"""List filter types that support 1D LED strip processing."""
all_filters = FilterRegistry.get_all()
# Pre-build template choices for the css_filter_template filter
cspt_choices = None
if cspt_store:
try:
templates = cspt_store.get_all_templates()
cspt_choices = [{"value": t.id, "label": t.name} for t in templates]
except Exception:
cspt_choices = []
responses = []
for filter_id, filter_cls in all_filters.items():
if not getattr(filter_cls, "supports_strip", True):
continue
schema = filter_cls.get_options_schema()
opt_schemas = []
for opt in schema:
choices = opt.choices
if filter_id == "css_filter_template" and opt.key == "template_id" and cspt_choices is not None:
choices = cspt_choices
opt_schemas.append(FilterOptionDefSchema(
key=opt.key,
label=opt.label,
type=opt.option_type,
default=opt.default,
min_value=opt.min_value,
max_value=opt.max_value,
step=opt.step,
choices=choices,
))
responses.append(FilterTypeResponse(
filter_id=filter_cls.filter_id,
filter_name=filter_cls.filter_name,
options_schema=opt_schemas,
))
return FilterTypeListResponse(filters=responses, count=len(responses))