Refactor capture engine architecture, rename PictureStream to PictureSource, and split API modules

- Separate CaptureEngine into stateless factory + stateful CaptureStream session
- Add LiveStream/LiveStreamManager for shared capture with reference counting
- Rename PictureStream to PictureSource across storage, API, and UI
- Remove legacy migration logic and unused compatibility code
- Split monolithic routes.py (1935 lines) into 5 focused route modules
- Split schemas.py (480 lines) into 7 schema modules with re-exports
- Extract dependency injection into dedicated dependencies.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-12 14:27:00 +03:00
parent b8389f080a
commit c3828e10fa
42 changed files with 4047 additions and 3797 deletions

View File

@@ -1,5 +1,18 @@
"""API routes and schemas."""
from .routes import router
from fastapi import APIRouter
from .routes.system import router as system_router
from .routes.devices import router as devices_router
from .routes.templates import router as templates_router
from .routes.postprocessing import router as postprocessing_router
from .routes.picture_sources import router as picture_sources_router
router = APIRouter()
router.include_router(system_router)
router.include_router(devices_router)
router.include_router(templates_router)
router.include_router(postprocessing_router)
router.include_router(picture_sources_router)
__all__ = ["router"]

View File

@@ -0,0 +1,65 @@
"""Dependency injection for API routes."""
from wled_controller.core.processor_manager import ProcessorManager
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore
# Global instances (initialized in main.py)
_device_store: DeviceStore | None = None
_template_store: TemplateStore | None = None
_pp_template_store: PostprocessingTemplateStore | None = None
_picture_source_store: PictureSourceStore | None = None
_processor_manager: ProcessorManager | None = None
def get_device_store() -> DeviceStore:
"""Get device store dependency."""
if _device_store is None:
raise RuntimeError("Device store not initialized")
return _device_store
def get_template_store() -> TemplateStore:
"""Get template store dependency."""
if _template_store is None:
raise RuntimeError("Template store not initialized")
return _template_store
def get_pp_template_store() -> PostprocessingTemplateStore:
"""Get postprocessing template store dependency."""
if _pp_template_store is None:
raise RuntimeError("Postprocessing template store not initialized")
return _pp_template_store
def get_picture_source_store() -> PictureSourceStore:
"""Get picture source store dependency."""
if _picture_source_store is None:
raise RuntimeError("Picture source store not initialized")
return _picture_source_store
def get_processor_manager() -> ProcessorManager:
"""Get processor manager dependency."""
if _processor_manager is None:
raise RuntimeError("Processor manager not initialized")
return _processor_manager
def init_dependencies(
device_store: DeviceStore,
template_store: TemplateStore,
processor_manager: ProcessorManager,
pp_template_store: PostprocessingTemplateStore | None = None,
picture_source_store: PictureSourceStore | None = None,
):
"""Initialize global dependencies."""
global _device_store, _template_store, _processor_manager, _pp_template_store, _picture_source_store
_device_store = device_store
_template_store = template_store
_processor_manager = processor_manager
_pp_template_store = pp_template_store
_picture_source_store = picture_source_store

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
"""API route modules."""

View File

@@ -0,0 +1,663 @@
"""Device routes: CRUD, processing control, settings, brightness, calibration, metrics."""
from datetime import datetime
import httpx
from fastapi import APIRouter, HTTPException, Depends
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_device_store,
get_processor_manager,
)
from wled_controller.api.schemas.devices import (
Calibration as CalibrationSchema,
CalibrationTestModeRequest,
CalibrationTestModeResponse,
DeviceCreate,
DeviceListResponse,
DeviceResponse,
DeviceUpdate,
MetricsResponse,
ProcessingSettings as ProcessingSettingsSchema,
ProcessingState,
)
from wled_controller.core.calibration import (
calibration_from_dict,
calibration_to_dict,
)
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
from wled_controller.storage import DeviceStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
# ===== DEVICE MANAGEMENT ENDPOINTS =====
@router.post("/api/v1/devices", response_model=DeviceResponse, tags=["Devices"], status_code=201)
async def create_device(
device_data: DeviceCreate,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Create and attach a new WLED device."""
try:
logger.info(f"Creating device: {device_data.name}")
# Validate WLED device is reachable before adding
device_url = device_data.url.rstrip("/")
try:
async with httpx.AsyncClient(timeout=5) as client:
response = await client.get(f"{device_url}/json/info")
response.raise_for_status()
wled_info = response.json()
wled_led_count = wled_info.get("leds", {}).get("count")
if not wled_led_count or wled_led_count < 1:
raise HTTPException(
status_code=422,
detail=f"WLED device at {device_url} reported invalid LED count: {wled_led_count}"
)
logger.info(
f"WLED device reachable: {wled_info.get('name', 'Unknown')} "
f"v{wled_info.get('ver', '?')} ({wled_led_count} LEDs)"
)
except httpx.ConnectError:
raise HTTPException(
status_code=422,
detail=f"Cannot reach WLED device at {device_url}. Check the URL and ensure the device is powered on."
)
except httpx.TimeoutException:
raise HTTPException(
status_code=422,
detail=f"Connection to {device_url} timed out. Check network connectivity."
)
except Exception as e:
raise HTTPException(
status_code=422,
detail=f"Failed to connect to WLED device at {device_url}: {e}"
)
# Create device in storage (LED count auto-detected from WLED)
device = store.create_device(
name=device_data.name,
url=device_data.url,
led_count=wled_led_count,
)
# Add to processor manager
manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
)
return DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status="disconnected",
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
smoothing=device.settings.smoothing,
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
picture_source_id=device.picture_source_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
except Exception as e:
logger.error(f"Failed to create device: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices", response_model=DeviceListResponse, tags=["Devices"])
async def list_devices(
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""List all attached WLED devices."""
try:
devices = store.get_all_devices()
device_responses = [
DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status="disconnected",
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
smoothing=device.settings.smoothing,
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
picture_source_id=device.picture_source_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
for device in devices
]
return DeviceListResponse(devices=device_responses, count=len(device_responses))
except Exception as e:
logger.error(f"Failed to list devices: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def get_device(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get device details by ID."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
# Determine status
status = "connected" if manager.is_processing(device_id) else "disconnected"
return DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status=status,
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
smoothing=device.settings.smoothing,
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
picture_source_id=device.picture_source_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@router.put("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def update_device(
device_id: str,
update_data: DeviceUpdate,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update device information."""
try:
# Check if stream changed and device is processing (for hot-swap)
old_device = store.get_device(device_id)
stream_changed = (
update_data.picture_source_id is not None
and update_data.picture_source_id != old_device.picture_source_id
)
was_processing = manager.is_processing(device_id)
# Update device
device = store.update_device(
device_id=device_id,
name=update_data.name,
url=update_data.url,
enabled=update_data.enabled,
picture_source_id=update_data.picture_source_id,
)
# Sync processor state when stream changed
if stream_changed:
if was_processing:
# Hot-swap: restart with new settings
logger.info(f"Hot-swapping stream for device {device_id}")
try:
await manager.stop_processing(device_id)
manager.remove_device(device_id)
manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
picture_source_id=device.picture_source_id,
)
await manager.start_processing(device_id)
logger.info(f"Successfully hot-swapped stream for device {device_id}")
except Exception as e:
logger.error(f"Error during stream hot-swap: {e}")
else:
# Not processing -- update processor state so next start uses new values
manager.remove_device(device_id)
manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
picture_source_id=device.picture_source_id,
)
return DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status="disconnected",
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
smoothing=device.settings.smoothing,
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
picture_source_id=device.picture_source_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to update device: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/devices/{device_id}", status_code=204, tags=["Devices"])
async def delete_device(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Delete/detach a device."""
try:
# Stop processing if running
if manager.is_processing(device_id):
await manager.stop_processing(device_id)
# Remove from manager
manager.remove_device(device_id)
# Delete from storage
store.delete_device(device_id)
logger.info(f"Deleted device {device_id}")
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete device: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== PROCESSING CONTROL ENDPOINTS =====
@router.post("/api/v1/devices/{device_id}/start", tags=["Processing"])
async def start_processing(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Start screen processing for a device."""
try:
# Verify device exists
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
await manager.start_processing(device_id)
logger.info(f"Started processing for device {device_id}")
return {"status": "started", "device_id": device_id}
except RuntimeError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to start processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/devices/{device_id}/stop", tags=["Processing"])
async def stop_processing(
device_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Stop screen processing for a device."""
try:
await manager.stop_processing(device_id)
logger.info(f"Stopped processing for device {device_id}")
return {"status": "stopped", "device_id": device_id}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to stop processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices/{device_id}/state", response_model=ProcessingState, tags=["Processing"])
async def get_processing_state(
device_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get current processing state for a device."""
try:
state = manager.get_state(device_id)
return ProcessingState(**state)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to get state: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== SETTINGS ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def get_settings(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Get processing settings for a device."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
return ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
smoothing=device.settings.smoothing,
state_check_interval=device.settings.state_check_interval,
)
@router.put("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def update_settings(
device_id: str,
settings: ProcessingSettingsSchema,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update processing settings for a device.
Merges with existing settings so callers can send partial updates.
Only fields explicitly included in the request body are applied.
"""
try:
# Get existing device to merge settings
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
existing = device.settings
sent = settings.model_fields_set # fields the client actually sent
# Merge: only override fields the client explicitly provided
new_settings = ProcessingSettings(
display_index=settings.display_index if 'display_index' in sent else existing.display_index,
fps=settings.fps if 'fps' in sent else existing.fps,
border_width=settings.border_width if 'border_width' in sent else existing.border_width,
interpolation_mode=settings.interpolation_mode if 'interpolation_mode' in sent else existing.interpolation_mode,
brightness=settings.brightness if 'brightness' in sent else existing.brightness,
gamma=existing.gamma,
saturation=existing.saturation,
smoothing=settings.smoothing if 'smoothing' in sent else existing.smoothing,
state_check_interval=settings.state_check_interval if 'state_check_interval' in sent else existing.state_check_interval,
)
# Apply color_correction fields if explicitly sent
if 'color_correction' in sent and settings.color_correction:
cc_sent = settings.color_correction.model_fields_set
if 'brightness' in cc_sent:
new_settings.brightness = settings.color_correction.brightness
if 'gamma' in cc_sent:
new_settings.gamma = settings.color_correction.gamma
if 'saturation' in cc_sent:
new_settings.saturation = settings.color_correction.saturation
# Update in storage
device = store.update_device(device_id, settings=new_settings)
# Update in manager if device exists
try:
manager.update_settings(device_id, new_settings)
except ValueError:
# Device not in manager yet, that's ok
pass
return ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
smoothing=device.settings.smoothing,
state_check_interval=device.settings.state_check_interval,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to update settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== WLED BRIGHTNESS ENDPOINT =====
@router.get("/api/v1/devices/{device_id}/brightness", tags=["Settings"])
async def get_device_brightness(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Get current brightness from the WLED device."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
try:
async with httpx.AsyncClient(timeout=5.0) as http_client:
resp = await http_client.get(f"{device.url}/json/state")
resp.raise_for_status()
state = resp.json()
bri = state.get("bri", 255)
return {"brightness": bri}
except Exception as e:
logger.error(f"Failed to get WLED brightness for {device_id}: {e}")
raise HTTPException(status_code=502, detail=f"Failed to reach WLED device: {e}")
@router.put("/api/v1/devices/{device_id}/brightness", tags=["Settings"])
async def set_device_brightness(
device_id: str,
body: dict,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Set brightness on the WLED device directly."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
bri = body.get("brightness")
if bri is None or not isinstance(bri, int) or not 0 <= bri <= 255:
raise HTTPException(status_code=400, detail="brightness must be an integer 0-255")
try:
async with httpx.AsyncClient(timeout=5.0) as http_client:
resp = await http_client.post(
f"{device.url}/json/state",
json={"bri": bri},
)
resp.raise_for_status()
return {"brightness": bri}
except Exception as e:
logger.error(f"Failed to set WLED brightness for {device_id}: {e}")
raise HTTPException(status_code=502, detail=f"Failed to reach WLED device: {e}")
# ===== CALIBRATION ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def get_calibration(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Get calibration configuration for a device."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
return CalibrationSchema(**calibration_to_dict(device.calibration))
@router.put("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def update_calibration(
device_id: str,
calibration_data: CalibrationSchema,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update calibration configuration for a device."""
try:
# Convert schema to CalibrationConfig
calibration_dict = calibration_data.model_dump()
calibration = calibration_from_dict(calibration_dict)
# Update in storage
device = store.update_device(device_id, calibration=calibration)
# Update in manager if device exists
try:
manager.update_calibration(device_id, calibration)
except ValueError:
# Device not in manager yet, that's ok
pass
return CalibrationSchema(**calibration_to_dict(device.calibration))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update calibration: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.put(
"/api/v1/devices/{device_id}/calibration/test",
response_model=CalibrationTestModeResponse,
tags=["Calibration"],
)
async def set_calibration_test_mode(
device_id: str,
body: CalibrationTestModeRequest,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Toggle calibration test mode for specific edges.
Send edges with colors to light them up, or empty edges dict to exit test mode.
While test mode is active, screen capture processing is paused.
"""
try:
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
# Validate edge names and colors
valid_edges = {"top", "right", "bottom", "left"}
for edge_name, color in body.edges.items():
if edge_name not in valid_edges:
raise HTTPException(
status_code=400,
detail=f"Invalid edge '{edge_name}'. Must be one of: {', '.join(valid_edges)}"
)
if len(color) != 3 or not all(0 <= c <= 255 for c in color):
raise HTTPException(
status_code=400,
detail=f"Invalid color for edge '{edge_name}'. Must be [R, G, B] with values 0-255."
)
await manager.set_test_mode(device_id, body.edges)
active_edges = list(body.edges.keys())
logger.info(
f"Test mode {'activated' if active_edges else 'deactivated'} "
f"for device {device_id}: {active_edges}"
)
return CalibrationTestModeResponse(
test_mode=len(active_edges) > 0,
active_edges=active_edges,
device_id=device_id,
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to set test mode: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== METRICS ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/metrics", response_model=MetricsResponse, tags=["Metrics"])
async def get_metrics(
device_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get processing metrics for a device."""
try:
metrics = manager.get_metrics(device_id)
return MetricsResponse(**metrics)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to get metrics: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,475 @@
"""Picture source routes."""
import base64
import io
import time
import httpx
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends, Query
from fastapi.responses import Response
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_device_store,
get_picture_source_store,
get_pp_template_store,
get_processor_manager,
get_template_store,
)
from wled_controller.api.schemas.common import (
CaptureImage,
PerformanceMetrics,
TemplateTestResponse,
)
from wled_controller.api.schemas.picture_sources import (
ImageValidateRequest,
ImageValidateResponse,
PictureSourceCreate,
PictureSourceListResponse,
PictureSourceResponse,
PictureSourceTestRequest,
PictureSourceUpdate,
)
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.filters import FilterRegistry, ImagePool
from wled_controller.core.processor_manager import ProcessorManager
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
def _stream_to_response(s) -> PictureSourceResponse:
"""Convert a PictureSource to its API response."""
return PictureSourceResponse(
id=s.id,
name=s.name,
stream_type=s.stream_type,
display_index=getattr(s, "display_index", None),
capture_template_id=getattr(s, "capture_template_id", None),
target_fps=getattr(s, "target_fps", None),
source_stream_id=getattr(s, "source_stream_id", None),
postprocessing_template_id=getattr(s, "postprocessing_template_id", None),
image_source=getattr(s, "image_source", None),
created_at=s.created_at,
updated_at=s.updated_at,
description=s.description,
)
@router.get("/api/v1/picture-sources", response_model=PictureSourceListResponse, tags=["Picture Sources"])
async def list_picture_sources(
_auth: AuthRequired,
store: PictureSourceStore = Depends(get_picture_source_store),
):
"""List all picture sources."""
try:
streams = store.get_all_streams()
responses = [_stream_to_response(s) for s in streams]
return PictureSourceListResponse(streams=responses, count=len(responses))
except Exception as e:
logger.error(f"Failed to list picture sources: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/picture-sources/validate-image", response_model=ImageValidateResponse, tags=["Picture Sources"])
async def validate_image(
data: ImageValidateRequest,
_auth: AuthRequired,
):
"""Validate an image source (URL or file path) and return a preview thumbnail."""
try:
from pathlib import Path
source = data.image_source.strip()
if not source:
return ImageValidateResponse(valid=False, error="Image source is empty")
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
response = await client.get(source)
response.raise_for_status()
pil_image = Image.open(io.BytesIO(response.content))
else:
path = Path(source)
if not path.exists():
return ImageValidateResponse(valid=False, error=f"File not found: {source}")
pil_image = Image.open(path)
pil_image = pil_image.convert("RGB")
width, height = pil_image.size
# Create thumbnail preview (max 320px wide)
thumb = pil_image.copy()
thumb.thumbnail((320, 320), Image.Resampling.LANCZOS)
buf = io.BytesIO()
thumb.save(buf, format="JPEG", quality=80)
buf.seek(0)
preview = f"data:image/jpeg;base64,{base64.b64encode(buf.getvalue()).decode()}"
return ImageValidateResponse(
valid=True, width=width, height=height, preview=preview
)
except httpx.HTTPStatusError as e:
return ImageValidateResponse(valid=False, error=f"HTTP {e.response.status_code}: {e.response.reason_phrase}")
except httpx.RequestError as e:
return ImageValidateResponse(valid=False, error=f"Request failed: {e}")
except Exception as e:
return ImageValidateResponse(valid=False, error=str(e))
@router.get("/api/v1/picture-sources/full-image", tags=["Picture Sources"])
async def get_full_image(
_auth: AuthRequired,
source: str = Query(..., description="Image URL or local file path"),
):
"""Serve the full-resolution image for lightbox preview."""
from pathlib import Path
try:
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
response = await client.get(source)
response.raise_for_status()
pil_image = Image.open(io.BytesIO(response.content))
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=404, detail="File not found")
pil_image = Image.open(path)
pil_image = pil_image.convert("RGB")
buf = io.BytesIO()
pil_image.save(buf, format="JPEG", quality=90)
buf.seek(0)
return Response(content=buf.getvalue(), media_type="image/jpeg")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/api/v1/picture-sources", response_model=PictureSourceResponse, tags=["Picture Sources"], status_code=201)
async def create_picture_source(
data: PictureSourceCreate,
_auth: AuthRequired,
store: PictureSourceStore = Depends(get_picture_source_store),
template_store: TemplateStore = Depends(get_template_store),
pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Create a new picture source."""
try:
# Validate referenced entities
if data.stream_type == "raw" and data.capture_template_id:
try:
template_store.get_template(data.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {data.capture_template_id}",
)
if data.stream_type == "processed" and data.postprocessing_template_id:
try:
pp_store.get_template(data.postprocessing_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Postprocessing template not found: {data.postprocessing_template_id}",
)
stream = store.create_stream(
name=data.name,
stream_type=data.stream_type,
display_index=data.display_index,
capture_template_id=data.capture_template_id,
target_fps=data.target_fps,
source_stream_id=data.source_stream_id,
postprocessing_template_id=data.postprocessing_template_id,
image_source=data.image_source,
description=data.description,
)
return _stream_to_response(stream)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create picture source: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/picture-sources/{stream_id}", response_model=PictureSourceResponse, tags=["Picture Sources"])
async def get_picture_source(
stream_id: str,
_auth: AuthRequired,
store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Get picture source by ID."""
try:
stream = store.get_stream(stream_id)
return _stream_to_response(stream)
except ValueError:
raise HTTPException(status_code=404, detail=f"Picture source {stream_id} not found")
@router.put("/api/v1/picture-sources/{stream_id}", response_model=PictureSourceResponse, tags=["Picture Sources"])
async def update_picture_source(
stream_id: str,
data: PictureSourceUpdate,
_auth: AuthRequired,
store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Update a picture source."""
try:
stream = store.update_stream(
stream_id=stream_id,
name=data.name,
display_index=data.display_index,
capture_template_id=data.capture_template_id,
target_fps=data.target_fps,
source_stream_id=data.source_stream_id,
postprocessing_template_id=data.postprocessing_template_id,
image_source=data.image_source,
description=data.description,
)
return _stream_to_response(stream)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update picture source: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/picture-sources/{stream_id}", status_code=204, tags=["Picture Sources"])
async def delete_picture_source(
stream_id: str,
_auth: AuthRequired,
store: PictureSourceStore = Depends(get_picture_source_store),
device_store: DeviceStore = Depends(get_device_store),
):
"""Delete a picture source."""
try:
# Check if any device references this stream
if store.is_referenced_by_device(stream_id, device_store):
raise HTTPException(
status_code=409,
detail="Cannot delete picture source: it is assigned to one or more devices. "
"Please reassign those devices before deleting.",
)
store.delete_stream(stream_id)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete picture source: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/picture-sources/{stream_id}/test", response_model=TemplateTestResponse, tags=["Picture Sources"])
async def test_picture_source(
stream_id: str,
test_request: PictureSourceTestRequest,
_auth: AuthRequired,
store: PictureSourceStore = Depends(get_picture_source_store),
template_store: TemplateStore = Depends(get_template_store),
processor_manager: ProcessorManager = Depends(get_processor_manager),
device_store: DeviceStore = Depends(get_device_store),
pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Test a picture source by resolving its chain and running a capture test.
Resolves the stream chain to the raw stream, captures frames,
and returns preview image + performance metrics.
For processed streams, applies postprocessing (gamma, saturation, brightness)
to the preview image.
"""
stream = None
try:
# Resolve stream chain
try:
chain = store.resolve_stream_chain(stream_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
raw_stream = chain["raw_stream"]
if isinstance(raw_stream, StaticImagePictureSource):
# Static image stream: load image directly, no engine needed
from pathlib import Path
source = raw_stream.image_source
start_time = time.perf_counter()
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
actual_duration = time.perf_counter() - start_time
frame_count = 1
total_capture_time = actual_duration
elif isinstance(raw_stream, ScreenCapturePictureSource):
# Screen capture stream: use engine
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
display_index = raw_stream.display_index
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
stream = EngineRegistry.create_stream(
capture_template.engine_type, display_index, capture_template.engine_config
)
stream.initialize()
logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = stream.capture_frame()
capture_elapsed = time.perf_counter() - capture_start
if screen_capture is None:
continue
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
# Apply postprocessing filters if this is a processed stream
pp_template_ids = chain["postprocessing_template_ids"]
if pp_template_ids:
try:
pp_template = pp_store.get_template(pp_template_ids[0])
pool = ImagePool()
def apply_filters(img):
arr = np.array(img)
for fi in pp_template.filters:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(arr, pool)
if result is not None:
arr = result
return Image.fromarray(arr)
thumbnail = apply_filters(thumbnail)
pil_image = apply_filters(pil_image)
except ValueError:
logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview")
# Encode thumbnail
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
# Encode full-resolution image
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
return TemplateTestResponse(
full_capture=CaptureImage(
image=thumbnail_data_uri,
full_image=full_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
),
border_extraction=None,
performance=PerformanceMetrics(
capture_duration_s=actual_duration,
frame_count=frame_count,
actual_fps=actual_fps,
avg_capture_time_ms=avg_capture_time_ms,
),
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
except Exception as e:
logger.error(f"Failed to test picture source: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
finally:
if stream:
try:
stream.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test stream: {e}")

View File

@@ -0,0 +1,348 @@
"""Postprocessing template routes."""
import base64
import io
import time
import httpx
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_device_store,
get_picture_source_store,
get_pp_template_store,
get_processor_manager,
get_template_store,
)
from wled_controller.api.schemas.common import (
CaptureImage,
PerformanceMetrics,
TemplateTestResponse,
)
from wled_controller.api.schemas.filters import FilterInstanceSchema
from wled_controller.api.schemas.postprocessing import (
PostprocessingTemplateCreate,
PostprocessingTemplateListResponse,
PostprocessingTemplateResponse,
PostprocessingTemplateUpdate,
PPTemplateTestRequest,
)
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.filters import FilterRegistry, FilterInstance, ImagePool
from wled_controller.core.processor_manager import ProcessorManager
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
def _pp_template_to_response(t) -> PostprocessingTemplateResponse:
"""Convert a PostprocessingTemplate to its API response."""
return PostprocessingTemplateResponse(
id=t.id,
name=t.name,
filters=[FilterInstanceSchema(filter_id=f.filter_id, options=f.options) for f in t.filters],
created_at=t.created_at,
updated_at=t.updated_at,
description=t.description,
)
@router.get("/api/v1/postprocessing-templates", response_model=PostprocessingTemplateListResponse, tags=["Postprocessing Templates"])
async def list_pp_templates(
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""List all postprocessing templates."""
try:
templates = store.get_all_templates()
responses = [_pp_template_to_response(t) for t in templates]
return PostprocessingTemplateListResponse(templates=responses, count=len(responses))
except Exception as e:
logger.error(f"Failed to list postprocessing templates: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/postprocessing-templates", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"], status_code=201)
async def create_pp_template(
data: PostprocessingTemplateCreate,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Create a new postprocessing template."""
try:
filters = [FilterInstance(f.filter_id, f.options) for f in data.filters]
template = store.create_template(
name=data.name,
filters=filters,
description=data.description,
)
return _pp_template_to_response(template)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create postprocessing template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/postprocessing-templates/{template_id}", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"])
async def get_pp_template(
template_id: str,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Get postprocessing template by ID."""
try:
template = store.get_template(template_id)
return _pp_template_to_response(template)
except ValueError:
raise HTTPException(status_code=404, detail=f"Postprocessing template {template_id} not found")
@router.put("/api/v1/postprocessing-templates/{template_id}", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"])
async def update_pp_template(
template_id: str,
data: PostprocessingTemplateUpdate,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Update a postprocessing template."""
try:
filters = [FilterInstance(f.filter_id, f.options) for f in data.filters] if data.filters is not None else None
template = store.update_template(
template_id=template_id,
name=data.name,
filters=filters,
description=data.description,
)
return _pp_template_to_response(template)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update postprocessing template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/postprocessing-templates/{template_id}", status_code=204, tags=["Postprocessing Templates"])
async def delete_pp_template(
template_id: str,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
stream_store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Delete a postprocessing template."""
try:
# Check if any picture source references this template
if store.is_referenced_by(template_id, stream_store):
raise HTTPException(
status_code=409,
detail="Cannot delete postprocessing template: it is referenced by one or more picture sources. "
"Please reassign those streams before deleting.",
)
store.delete_template(template_id)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete postprocessing template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/postprocessing-templates/{template_id}/test", response_model=TemplateTestResponse, tags=["Postprocessing Templates"])
async def test_pp_template(
template_id: str,
test_request: PPTemplateTestRequest,
_auth: AuthRequired,
pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
stream_store: PictureSourceStore = Depends(get_picture_source_store),
template_store: TemplateStore = Depends(get_template_store),
processor_manager: ProcessorManager = Depends(get_processor_manager),
device_store: DeviceStore = Depends(get_device_store),
):
"""Test a postprocessing template by capturing from a source stream and applying filters."""
stream = None
try:
# Get the PP template
try:
pp_template = pp_store.get_template(template_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# Resolve source stream chain to get the raw stream
try:
chain = stream_store.resolve_stream_chain(test_request.source_stream_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
raw_stream = chain["raw_stream"]
if isinstance(raw_stream, StaticImagePictureSource):
# Static image: load directly
from pathlib import Path
source = raw_stream.image_source
start_time = time.perf_counter()
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
actual_duration = time.perf_counter() - start_time
frame_count = 1
total_capture_time = actual_duration
elif isinstance(raw_stream, ScreenCapturePictureSource):
# Screen capture stream: use engine
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
display_index = raw_stream.display_index
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
stream = EngineRegistry.create_stream(
capture_template.engine_type, display_index, capture_template.engine_config
)
stream.initialize()
logger.info(f"Starting {test_request.capture_duration}s PP template test for {template_id} using stream {test_request.source_stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = stream.capture_frame()
capture_elapsed = time.perf_counter() - capture_start
if screen_capture is None:
continue
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
# Apply postprocessing filters
if pp_template.filters:
pool = ImagePool()
def apply_filters(img):
arr = np.array(img)
for fi in pp_template.filters:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(arr, pool)
if result is not None:
arr = result
return Image.fromarray(arr)
thumbnail = apply_filters(thumbnail)
pil_image = apply_filters(pil_image)
# Encode thumbnail
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
# Encode full-resolution image
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
return TemplateTestResponse(
full_capture=CaptureImage(
image=thumbnail_data_uri,
full_image=full_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
),
border_extraction=None,
performance=PerformanceMetrics(
capture_duration_s=actual_duration,
frame_count=frame_count,
actual_fps=actual_fps,
avg_capture_time_ms=avg_capture_time_ms,
),
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Postprocessing template test failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
if stream:
try:
stream.cleanup()
except Exception:
pass

View File

@@ -0,0 +1,93 @@
"""System routes: health, version, displays."""
import sys
from datetime import datetime
from fastapi import APIRouter, HTTPException
from wled_controller import __version__
from wled_controller.api.auth import AuthRequired
from wled_controller.api.schemas.system import (
DisplayInfo,
DisplayListResponse,
HealthResponse,
VersionResponse,
)
from wled_controller.core.screen_capture import get_available_displays
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
@router.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
"""Check service health status.
Returns basic health information including status, version, and timestamp.
"""
logger.info("Health check requested")
return HealthResponse(
status="healthy",
timestamp=datetime.utcnow(),
version=__version__,
)
@router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"])
async def get_version():
"""Get version information.
Returns application version, Python version, and API version.
"""
logger.info("Version info requested")
return VersionResponse(
version=__version__,
python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
api_version="v1",
)
@router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"])
async def get_displays(_: AuthRequired):
"""Get list of available displays.
Returns information about all available monitors/displays that can be captured.
"""
logger.info("Listing available displays")
try:
# Get available displays with all metadata (name, refresh rate, etc.)
display_dataclasses = get_available_displays()
# Convert dataclass DisplayInfo to Pydantic DisplayInfo
displays = [
DisplayInfo(
index=d.index,
name=d.name,
width=d.width,
height=d.height,
x=d.x,
y=d.y,
is_primary=d.is_primary,
refresh_rate=d.refresh_rate,
)
for d in display_dataclasses
]
logger.info(f"Found {len(displays)} displays")
return DisplayListResponse(
displays=displays,
count=len(displays),
)
except Exception as e:
logger.error(f"Failed to get displays: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve display information: {str(e)}"
)

View File

@@ -0,0 +1,410 @@
"""Capture template, engine, and filter routes."""
import base64
import io
import time
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_device_store,
get_picture_source_store,
get_processor_manager,
get_template_store,
)
from wled_controller.api.schemas.common import (
CaptureImage,
PerformanceMetrics,
TemplateTestResponse,
)
from wled_controller.api.schemas.templates import (
EngineInfo,
EngineListResponse,
TemplateCreate,
TemplateListResponse,
TemplateResponse,
TemplateTestRequest,
TemplateUpdate,
)
from wled_controller.api.schemas.filters import (
FilterOptionDefSchema,
FilterTypeListResponse,
FilterTypeResponse,
)
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.filters import FilterRegistry
from wled_controller.core.processor_manager import ProcessorManager
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
# ===== CAPTURE TEMPLATE ENDPOINTS =====
@router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"])
async def list_templates(
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""List all capture templates."""
try:
templates = template_store.get_all_templates()
template_responses = [
TemplateResponse(
id=t.id,
name=t.name,
engine_type=t.engine_type,
engine_config=t.engine_config,
created_at=t.created_at,
updated_at=t.updated_at,
description=t.description,
)
for t in templates
]
return TemplateListResponse(
templates=template_responses,
count=len(template_responses),
)
except Exception as e:
logger.error(f"Failed to list templates: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201)
async def create_template(
template_data: TemplateCreate,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Create a new capture template."""
try:
template = template_store.create_template(
name=template_data.name,
engine_type=template_data.engine_type,
engine_config=template_data.engine_config,
description=template_data.description,
)
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def get_template(
template_id: str,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Get template by ID."""
try:
template = template_store.get_template(template_id)
except ValueError:
raise HTTPException(status_code=404, detail=f"Template {template_id} not found")
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
@router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def update_template(
template_id: str,
update_data: TemplateUpdate,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Update a template."""
try:
template = template_store.update_template(
template_id=template_id,
name=update_data.name,
engine_type=update_data.engine_type,
engine_config=update_data.engine_config,
description=update_data.description,
)
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"])
async def delete_template(
template_id: str,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
stream_store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Delete a template.
Validates that no streams are currently using this template before deletion.
"""
try:
# Check if any streams are using this template
streams_using_template = []
for stream in stream_store.get_all_streams():
if isinstance(stream, ScreenCapturePictureSource) and stream.capture_template_id == template_id:
streams_using_template.append(stream.name)
if streams_using_template:
stream_list = ", ".join(streams_using_template)
raise HTTPException(
status_code=409,
detail=f"Cannot delete template: it is used by the following stream(s): {stream_list}. "
f"Please reassign these streams to a different template before deleting."
)
# Proceed with deletion
template_store.delete_template(template_id)
except HTTPException:
raise # Re-raise HTTP exceptions as-is
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"])
async def list_engines(_auth: AuthRequired):
"""List available capture engines on this system.
Returns all registered engines that are available on the current platform.
"""
try:
available_engine_types = EngineRegistry.get_available_engines()
engines = []
for engine_type in available_engine_types:
engine_class = EngineRegistry.get_engine(engine_type)
engines.append(
EngineInfo(
type=engine_type,
name=engine_type.upper(),
default_config=engine_class.get_default_config(),
available=True,
)
)
return EngineListResponse(engines=engines, count=len(engines))
except Exception as e:
logger.error(f"Failed to list engines: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"])
async def test_template(
test_request: TemplateTestRequest,
_auth: AuthRequired,
processor_manager: ProcessorManager = Depends(get_processor_manager),
device_store: DeviceStore = Depends(get_device_store),
):
"""Test a capture template configuration.
Temporarily instantiates an engine with the provided configuration,
captures frames for the specified duration, and returns actual FPS metrics.
"""
stream = None
try:
# Validate engine type
if test_request.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{test_request.engine_type}' is not available on this system"
)
# Check if display is already being captured
locked_device_id = processor_manager.get_display_lock_info(test_request.display_index)
if locked_device_id:
# Get device info for better error message
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=(
f"Display {test_request.display_index} is currently being captured by device "
f"'{device_name}'. Please stop the device processing before testing this template."
)
)
# Create and initialize capture stream
stream = EngineRegistry.create_stream(
test_request.engine_type, test_request.display_index, test_request.engine_config
)
stream.initialize()
# Run sustained capture test
logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = stream.capture_frame()
capture_elapsed = time.perf_counter() - capture_start
# Skip if no new frame (screen unchanged)
if screen_capture is None:
continue
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s")
# Use the last captured frame for preview
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert numpy array to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail (640px wide, maintain aspect ratio)
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
# Encode thumbnail as JPEG
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
# Encode full-resolution image as JPEG
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
# Calculate metrics
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
return TemplateTestResponse(
full_capture=CaptureImage(
image=thumbnail_data_uri,
full_image=full_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
),
border_extraction=None,
performance=PerformanceMetrics(
capture_duration_s=actual_duration,
frame_count=frame_count,
actual_fps=actual_fps,
avg_capture_time_ms=avg_capture_time_ms,
),
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
except Exception as e:
logger.error(f"Failed to test template: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
finally:
if stream:
try:
stream.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test stream: {e}")
# ===== FILTER TYPE ENDPOINTS =====
@router.get("/api/v1/filters", response_model=FilterTypeListResponse, tags=["Filters"])
async def list_filter_types(_auth: AuthRequired):
"""List all available postprocessing filter types and their options schemas."""
all_filters = FilterRegistry.get_all()
responses = []
for filter_id, filter_cls in all_filters.items():
schema = filter_cls.get_options_schema()
responses.append(FilterTypeResponse(
filter_id=filter_cls.filter_id,
filter_name=filter_cls.filter_name,
options_schema=[
FilterOptionDefSchema(
key=opt.key,
label=opt.label,
type=opt.option_type,
default=opt.default,
min_value=opt.min_value,
max_value=opt.max_value,
step=opt.step,
)
for opt in schema
],
))
return FilterTypeListResponse(filters=responses, count=len(responses))

View File

@@ -1,482 +0,0 @@
"""Pydantic schemas for API request and response models."""
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field, HttpUrl
from wled_controller.core.processor_manager import DEFAULT_STATE_CHECK_INTERVAL
# Health and Version Schemas
class HealthResponse(BaseModel):
"""Health check response."""
status: Literal["healthy", "unhealthy"] = Field(description="Service health status")
timestamp: datetime = Field(description="Current server time")
version: str = Field(description="Application version")
class VersionResponse(BaseModel):
"""Version information response."""
version: str = Field(description="Application version")
python_version: str = Field(description="Python version")
api_version: str = Field(description="API version")
# Display Schemas
class DisplayInfo(BaseModel):
"""Display/monitor information."""
index: int = Field(description="Display index")
name: str = Field(description="Display name")
width: int = Field(description="Display width in pixels")
height: int = Field(description="Display height in pixels")
x: int = Field(description="Display X position")
y: int = Field(description="Display Y position")
is_primary: bool = Field(default=False, description="Whether this is the primary display")
refresh_rate: int = Field(description="Display refresh rate in Hz")
class DisplayListResponse(BaseModel):
"""List of available displays."""
displays: List[DisplayInfo] = Field(description="Available displays")
count: int = Field(description="Number of displays")
# Device Schemas
class DeviceCreate(BaseModel):
"""Request to create/attach a WLED device."""
name: str = Field(description="Device name", min_length=1, max_length=100)
url: str = Field(description="WLED device URL (e.g., http://192.168.1.100)")
capture_template_id: Optional[str] = Field(None, description="Capture template ID (uses first available if not set or invalid)")
class DeviceUpdate(BaseModel):
"""Request to update device information."""
name: Optional[str] = Field(None, description="Device name", min_length=1, max_length=100)
url: Optional[str] = Field(None, description="WLED device URL")
enabled: Optional[bool] = Field(None, description="Whether device is enabled")
capture_template_id: Optional[str] = Field(None, description="Capture template ID (legacy)")
picture_stream_id: Optional[str] = Field(None, description="Picture stream ID")
class ColorCorrection(BaseModel):
"""Color correction settings."""
gamma: float = Field(default=2.2, description="Gamma correction", ge=0.1, le=5.0)
saturation: float = Field(default=1.0, description="Saturation multiplier", ge=0.0, le=2.0)
brightness: float = Field(default=1.0, description="Brightness multiplier", ge=0.0, le=1.0)
class ProcessingSettings(BaseModel):
"""Processing settings for a device."""
display_index: int = Field(default=0, description="Display to capture", ge=0)
fps: int = Field(default=30, description="Target frames per second", ge=10, le=90)
border_width: int = Field(default=10, description="Border width in pixels", ge=1, le=100)
interpolation_mode: str = Field(default="average", description="LED color interpolation mode (average, median, dominant)")
brightness: float = Field(default=1.0, description="Global brightness (0.0-1.0)", ge=0.0, le=1.0)
smoothing: float = Field(default=0.3, description="Temporal smoothing factor (0.0=none, 1.0=full)", ge=0.0, le=1.0)
state_check_interval: int = Field(
default=DEFAULT_STATE_CHECK_INTERVAL, ge=5, le=600,
description="Seconds between WLED health checks"
)
color_correction: Optional[ColorCorrection] = Field(
default_factory=ColorCorrection,
description="Color correction settings"
)
class Calibration(BaseModel):
"""Calibration configuration for pixel-to-LED mapping."""
layout: Literal["clockwise", "counterclockwise"] = Field(
default="clockwise",
description="LED strip layout direction"
)
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"] = Field(
default="bottom_left",
description="Starting corner of the LED strip"
)
offset: int = Field(
default=0,
ge=0,
description="Number of LEDs from physical LED 0 to start corner (along strip direction)"
)
leds_top: int = Field(default=0, ge=0, description="Number of LEDs on the top edge")
leds_right: int = Field(default=0, ge=0, description="Number of LEDs on the right edge")
leds_bottom: int = Field(default=0, ge=0, description="Number of LEDs on the bottom edge")
leds_left: int = Field(default=0, ge=0, description="Number of LEDs on the left edge")
# Per-edge span: fraction of screen side covered by LEDs (0.01.0)
span_top_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of top edge coverage")
span_top_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of top edge coverage")
span_right_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of right edge coverage")
span_right_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of right edge coverage")
span_bottom_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of bottom edge coverage")
span_bottom_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of bottom edge coverage")
span_left_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of left edge coverage")
span_left_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of left edge coverage")
class CalibrationTestModeRequest(BaseModel):
"""Request to set calibration test mode with multiple edges."""
edges: Dict[str, List[int]] = Field(
default_factory=dict,
description="Map of active edge names to RGB colors. "
"E.g. {'top': [255, 0, 0], 'left': [255, 255, 0]}. "
"Empty dict = exit test mode."
)
class CalibrationTestModeResponse(BaseModel):
"""Response for calibration test mode."""
test_mode: bool = Field(description="Whether test mode is active")
active_edges: List[str] = Field(default_factory=list, description="Currently lit edges")
device_id: str = Field(description="Device ID")
class DeviceResponse(BaseModel):
"""Device information response."""
id: str = Field(description="Device ID")
name: str = Field(description="Device name")
url: str = Field(description="WLED device URL")
led_count: int = Field(description="Total number of LEDs")
enabled: bool = Field(description="Whether device is enabled")
status: Literal["connected", "disconnected", "error"] = Field(
description="Connection status"
)
settings: ProcessingSettings = Field(description="Processing settings")
calibration: Optional[Calibration] = Field(None, description="Calibration configuration")
capture_template_id: str = Field(description="ID of assigned capture template (legacy)")
picture_stream_id: str = Field(default="", description="ID of assigned picture stream")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
class DeviceListResponse(BaseModel):
"""List of devices response."""
devices: List[DeviceResponse] = Field(description="List of devices")
count: int = Field(description="Number of devices")
# Processing State Schemas
class ProcessingState(BaseModel):
"""Processing state for a device."""
device_id: str = Field(description="Device ID")
processing: bool = Field(description="Whether processing is active")
fps_actual: Optional[float] = Field(None, description="Actual FPS achieved")
fps_target: int = Field(description="Target FPS")
display_index: int = Field(description="Current display index")
last_update: Optional[datetime] = Field(None, description="Last successful update")
errors: List[str] = Field(default_factory=list, description="Recent errors")
wled_online: bool = Field(default=False, description="Whether WLED device is reachable")
wled_latency_ms: Optional[float] = Field(None, description="WLED health check latency in ms")
wled_name: Optional[str] = Field(None, description="WLED device name")
wled_version: Optional[str] = Field(None, description="WLED firmware version")
wled_led_count: Optional[int] = Field(None, description="LED count reported by WLED device")
wled_rgbw: Optional[bool] = Field(None, description="Whether WLED device uses RGBW LEDs")
wled_led_type: Optional[str] = Field(None, description="LED chip type (e.g. WS2812B, SK6812 RGBW)")
wled_last_checked: Optional[datetime] = Field(None, description="Last health check time")
wled_error: Optional[str] = Field(None, description="Last health check error")
class MetricsResponse(BaseModel):
"""Device metrics response."""
device_id: str = Field(description="Device ID")
processing: bool = Field(description="Whether processing is active")
fps_actual: Optional[float] = Field(None, description="Actual FPS")
fps_target: int = Field(description="Target FPS")
uptime_seconds: float = Field(description="Processing uptime in seconds")
frames_processed: int = Field(description="Total frames processed")
errors_count: int = Field(description="Total error count")
last_error: Optional[str] = Field(None, description="Last error message")
last_update: Optional[datetime] = Field(None, description="Last update timestamp")
# Error Schemas
class ErrorResponse(BaseModel):
"""Error response."""
error: str = Field(description="Error type")
message: str = Field(description="Error message")
detail: Optional[Dict] = Field(None, description="Additional error details")
timestamp: datetime = Field(default_factory=datetime.utcnow, description="Error timestamp")
# Capture Template Schemas
class TemplateCreate(BaseModel):
"""Request to create a capture template."""
name: str = Field(description="Template name", min_length=1, max_length=100)
engine_type: str = Field(description="Engine type (e.g., 'mss', 'dxcam', 'wgc')", min_length=1)
engine_config: Dict = Field(default_factory=dict, description="Engine-specific configuration")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class TemplateUpdate(BaseModel):
"""Request to update a template."""
name: Optional[str] = Field(None, description="Template name", min_length=1, max_length=100)
engine_type: Optional[str] = Field(None, description="Capture engine type (mss, dxcam, wgc)")
engine_config: Optional[Dict] = Field(None, description="Engine-specific configuration")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class TemplateResponse(BaseModel):
"""Template information response."""
id: str = Field(description="Template ID")
name: str = Field(description="Template name")
engine_type: str = Field(description="Engine type identifier")
engine_config: Dict = Field(description="Engine-specific configuration")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Template description")
class TemplateListResponse(BaseModel):
"""List of templates response."""
templates: List[TemplateResponse] = Field(description="List of templates")
count: int = Field(description="Number of templates")
class EngineInfo(BaseModel):
"""Capture engine information."""
type: str = Field(description="Engine type identifier (e.g., 'mss', 'dxcam')")
name: str = Field(description="Human-readable engine name")
default_config: Dict = Field(description="Default configuration for this engine")
available: bool = Field(description="Whether engine is available on this system")
class EngineListResponse(BaseModel):
"""List of available engines response."""
engines: List[EngineInfo] = Field(description="Available capture engines")
count: int = Field(description="Number of engines")
class TemplateAssignment(BaseModel):
"""Request to assign template to device."""
template_id: str = Field(description="Template ID to assign")
class TemplateTestRequest(BaseModel):
"""Request to test a capture template."""
engine_type: str = Field(description="Capture engine type to test")
engine_config: Dict = Field(default={}, description="Engine configuration")
display_index: int = Field(description="Display index to capture")
border_width: int = Field(default=10, ge=1, le=100, description="Border width in pixels")
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")
class CaptureImage(BaseModel):
"""Captured image with metadata."""
image: str = Field(description="Base64-encoded thumbnail image data")
full_image: Optional[str] = Field(None, description="Base64-encoded full-resolution image data")
width: int = Field(description="Original image width in pixels")
height: int = Field(description="Original image height in pixels")
thumbnail_width: Optional[int] = Field(None, description="Thumbnail width (if resized)")
thumbnail_height: Optional[int] = Field(None, description="Thumbnail height (if resized)")
class BorderExtraction(BaseModel):
"""Extracted border images."""
top: str = Field(description="Base64-encoded top border image")
right: str = Field(description="Base64-encoded right border image")
bottom: str = Field(description="Base64-encoded bottom border image")
left: str = Field(description="Base64-encoded left border image")
class PerformanceMetrics(BaseModel):
"""Performance metrics for template test."""
capture_duration_s: float = Field(description="Total capture duration in seconds")
frame_count: int = Field(description="Number of frames captured")
actual_fps: float = Field(description="Actual FPS (frame_count / duration)")
avg_capture_time_ms: float = Field(description="Average time per frame capture in milliseconds")
class TemplateTestResponse(BaseModel):
"""Response from template test."""
full_capture: CaptureImage = Field(description="Full screen capture with thumbnail")
border_extraction: Optional[BorderExtraction] = Field(None, description="Extracted border images (deprecated)")
performance: PerformanceMetrics = Field(description="Performance metrics")
# Filter Schemas
class FilterInstanceSchema(BaseModel):
"""A single filter instance with its configuration."""
filter_id: str = Field(description="Filter type identifier")
options: Dict[str, Any] = Field(default_factory=dict, description="Filter-specific options")
class FilterOptionDefSchema(BaseModel):
"""Describes a configurable option for a filter type."""
key: str = Field(description="Option key")
label: str = Field(description="Display label")
type: str = Field(description="Option type (float or int)")
default: Any = Field(description="Default value")
min_value: Any = Field(description="Minimum value")
max_value: Any = Field(description="Maximum value")
step: Any = Field(description="Step increment")
class FilterTypeResponse(BaseModel):
"""Available filter type with its options schema."""
filter_id: str = Field(description="Filter type identifier")
filter_name: str = Field(description="Display name")
options_schema: List[FilterOptionDefSchema] = Field(description="Configurable options")
class FilterTypeListResponse(BaseModel):
"""List of available filter types."""
filters: List[FilterTypeResponse] = Field(description="Available filter types")
count: int = Field(description="Number of filter types")
# Postprocessing Template Schemas
class PostprocessingTemplateCreate(BaseModel):
"""Request to create a postprocessing template."""
name: str = Field(description="Template name", min_length=1, max_length=100)
filters: List[FilterInstanceSchema] = Field(default_factory=list, description="Ordered list of filter instances")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class PostprocessingTemplateUpdate(BaseModel):
"""Request to update a postprocessing template."""
name: Optional[str] = Field(None, description="Template name", min_length=1, max_length=100)
filters: Optional[List[FilterInstanceSchema]] = Field(None, description="Ordered list of filter instances")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class PostprocessingTemplateResponse(BaseModel):
"""Postprocessing template information response."""
id: str = Field(description="Template ID")
name: str = Field(description="Template name")
filters: List[FilterInstanceSchema] = Field(description="Ordered list of filter instances")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Template description")
class PostprocessingTemplateListResponse(BaseModel):
"""List of postprocessing templates response."""
templates: List[PostprocessingTemplateResponse] = Field(description="List of postprocessing templates")
count: int = Field(description="Number of templates")
# Picture Stream Schemas
class PictureStreamCreate(BaseModel):
"""Request to create a picture stream."""
name: str = Field(description="Stream name", min_length=1, max_length=100)
stream_type: Literal["raw", "processed", "static_image"] = Field(description="Stream type")
display_index: Optional[int] = Field(None, description="Display index (raw streams)", ge=0)
capture_template_id: Optional[str] = Field(None, description="Capture template ID (raw streams)")
target_fps: Optional[int] = Field(None, description="Target FPS (raw streams)", ge=10, le=90)
source_stream_id: Optional[str] = Field(None, description="Source stream ID (processed streams)")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID (processed streams)")
image_source: Optional[str] = Field(None, description="Image URL or file path (static_image streams)")
description: Optional[str] = Field(None, description="Stream description", max_length=500)
class PictureStreamUpdate(BaseModel):
"""Request to update a picture stream."""
name: Optional[str] = Field(None, description="Stream name", min_length=1, max_length=100)
display_index: Optional[int] = Field(None, description="Display index (raw streams)", ge=0)
capture_template_id: Optional[str] = Field(None, description="Capture template ID (raw streams)")
target_fps: Optional[int] = Field(None, description="Target FPS (raw streams)", ge=10, le=90)
source_stream_id: Optional[str] = Field(None, description="Source stream ID (processed streams)")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID (processed streams)")
image_source: Optional[str] = Field(None, description="Image URL or file path (static_image streams)")
description: Optional[str] = Field(None, description="Stream description", max_length=500)
class PictureStreamResponse(BaseModel):
"""Picture stream information response."""
id: str = Field(description="Stream ID")
name: str = Field(description="Stream name")
stream_type: str = Field(description="Stream type (raw, processed, or static_image)")
display_index: Optional[int] = Field(None, description="Display index")
capture_template_id: Optional[str] = Field(None, description="Capture template ID")
target_fps: Optional[int] = Field(None, description="Target FPS")
source_stream_id: Optional[str] = Field(None, description="Source stream ID")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID")
image_source: Optional[str] = Field(None, description="Image URL or file path")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Stream description")
class PictureStreamListResponse(BaseModel):
"""List of picture streams response."""
streams: List[PictureStreamResponse] = Field(description="List of picture streams")
count: int = Field(description="Number of streams")
class PictureStreamTestRequest(BaseModel):
"""Request to test a picture stream."""
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")
border_width: int = Field(default=10, ge=1, le=100, description="Border width in pixels for preview")
class PPTemplateTestRequest(BaseModel):
"""Request to test a postprocessing template against a source stream."""
source_stream_id: str = Field(description="ID of the source picture stream to capture from")
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")
class ImageValidateRequest(BaseModel):
"""Request to validate an image source (URL or file path)."""
image_source: str = Field(description="Image URL or local file path")
class ImageValidateResponse(BaseModel):
"""Response from image validation."""
valid: bool = Field(description="Whether the image source is accessible and valid")
width: Optional[int] = Field(None, description="Image width in pixels")
height: Optional[int] = Field(None, description="Image height in pixels")
preview: Optional[str] = Field(None, description="Base64-encoded JPEG thumbnail")
error: Optional[str] = Field(None, description="Error message if invalid")

View File

@@ -0,0 +1,107 @@
"""Pydantic schemas for API request and response models."""
from .common import (
CaptureImage,
BorderExtraction,
ErrorResponse,
PerformanceMetrics,
TemplateTestResponse,
)
from .system import (
DisplayInfo,
DisplayListResponse,
HealthResponse,
VersionResponse,
)
from .devices import (
Calibration,
CalibrationTestModeRequest,
CalibrationTestModeResponse,
ColorCorrection,
DeviceCreate,
DeviceListResponse,
DeviceResponse,
DeviceUpdate,
MetricsResponse,
ProcessingSettings,
ProcessingState,
)
from .templates import (
EngineInfo,
EngineListResponse,
TemplateAssignment,
TemplateCreate,
TemplateListResponse,
TemplateResponse,
TemplateTestRequest,
TemplateUpdate,
)
from .filters import (
FilterInstanceSchema,
FilterOptionDefSchema,
FilterTypeListResponse,
FilterTypeResponse,
)
from .postprocessing import (
PostprocessingTemplateCreate,
PostprocessingTemplateListResponse,
PostprocessingTemplateResponse,
PostprocessingTemplateUpdate,
PPTemplateTestRequest,
)
from .picture_sources import (
ImageValidateRequest,
ImageValidateResponse,
PictureSourceCreate,
PictureSourceListResponse,
PictureSourceResponse,
PictureSourceTestRequest,
PictureSourceUpdate,
)
__all__ = [
"CaptureImage",
"BorderExtraction",
"ErrorResponse",
"PerformanceMetrics",
"TemplateTestResponse",
"DisplayInfo",
"DisplayListResponse",
"HealthResponse",
"VersionResponse",
"Calibration",
"CalibrationTestModeRequest",
"CalibrationTestModeResponse",
"ColorCorrection",
"DeviceCreate",
"DeviceListResponse",
"DeviceResponse",
"DeviceUpdate",
"MetricsResponse",
"ProcessingSettings",
"ProcessingState",
"EngineInfo",
"EngineListResponse",
"TemplateAssignment",
"TemplateCreate",
"TemplateListResponse",
"TemplateResponse",
"TemplateTestRequest",
"TemplateUpdate",
"FilterInstanceSchema",
"FilterOptionDefSchema",
"FilterTypeListResponse",
"FilterTypeResponse",
"PostprocessingTemplateCreate",
"PostprocessingTemplateListResponse",
"PostprocessingTemplateResponse",
"PostprocessingTemplateUpdate",
"PPTemplateTestRequest",
"ImageValidateRequest",
"ImageValidateResponse",
"PictureSourceCreate",
"PictureSourceListResponse",
"PictureSourceResponse",
"PictureSourceTestRequest",
"PictureSourceUpdate",
]

View File

@@ -0,0 +1,52 @@
"""Shared schemas used across multiple route modules."""
from datetime import datetime
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
class ErrorResponse(BaseModel):
"""Error response."""
error: str = Field(description="Error type")
message: str = Field(description="Error message")
detail: Optional[Dict] = Field(None, description="Additional error details")
timestamp: datetime = Field(default_factory=datetime.utcnow, description="Error timestamp")
class CaptureImage(BaseModel):
"""Captured image with metadata."""
image: str = Field(description="Base64-encoded thumbnail image data")
full_image: Optional[str] = Field(None, description="Base64-encoded full-resolution image data")
width: int = Field(description="Original image width in pixels")
height: int = Field(description="Original image height in pixels")
thumbnail_width: Optional[int] = Field(None, description="Thumbnail width (if resized)")
thumbnail_height: Optional[int] = Field(None, description="Thumbnail height (if resized)")
class BorderExtraction(BaseModel):
"""Extracted border images."""
top: str = Field(description="Base64-encoded top border image")
right: str = Field(description="Base64-encoded right border image")
bottom: str = Field(description="Base64-encoded bottom border image")
left: str = Field(description="Base64-encoded left border image")
class PerformanceMetrics(BaseModel):
"""Performance metrics for template test."""
capture_duration_s: float = Field(description="Total capture duration in seconds")
frame_count: int = Field(description="Number of frames captured")
actual_fps: float = Field(description="Actual FPS (frame_count / duration)")
avg_capture_time_ms: float = Field(description="Average time per frame capture in milliseconds")
class TemplateTestResponse(BaseModel):
"""Response from template test."""
full_capture: CaptureImage = Field(description="Full screen capture with thumbnail")
border_extraction: Optional[BorderExtraction] = Field(None, description="Extracted border images (deprecated)")
performance: PerformanceMetrics = Field(description="Performance metrics")

View File

@@ -0,0 +1,161 @@
"""Device-related schemas (CRUD, settings, calibration, processing state, metrics)."""
from datetime import datetime
from typing import Dict, List, Literal, Optional
from pydantic import BaseModel, Field
from wled_controller.core.processor_manager import DEFAULT_STATE_CHECK_INTERVAL
class DeviceCreate(BaseModel):
"""Request to create/attach a WLED device."""
name: str = Field(description="Device name", min_length=1, max_length=100)
url: str = Field(description="WLED device URL (e.g., http://192.168.1.100)")
class DeviceUpdate(BaseModel):
"""Request to update device information."""
name: Optional[str] = Field(None, description="Device name", min_length=1, max_length=100)
url: Optional[str] = Field(None, description="WLED device URL")
enabled: Optional[bool] = Field(None, description="Whether device is enabled")
picture_source_id: Optional[str] = Field(None, description="Picture source ID")
class ColorCorrection(BaseModel):
"""Color correction settings."""
gamma: float = Field(default=2.2, description="Gamma correction", ge=0.1, le=5.0)
saturation: float = Field(default=1.0, description="Saturation multiplier", ge=0.0, le=2.0)
brightness: float = Field(default=1.0, description="Brightness multiplier", ge=0.0, le=1.0)
class ProcessingSettings(BaseModel):
"""Processing settings for a device."""
display_index: int = Field(default=0, description="Display to capture", ge=0)
fps: int = Field(default=30, description="Target frames per second", ge=10, le=90)
border_width: int = Field(default=10, description="Border width in pixels", ge=1, le=100)
interpolation_mode: str = Field(default="average", description="LED color interpolation mode (average, median, dominant)")
brightness: float = Field(default=1.0, description="Global brightness (0.0-1.0)", ge=0.0, le=1.0)
smoothing: float = Field(default=0.3, description="Temporal smoothing factor (0.0=none, 1.0=full)", ge=0.0, le=1.0)
state_check_interval: int = Field(
default=DEFAULT_STATE_CHECK_INTERVAL, ge=5, le=600,
description="Seconds between WLED health checks"
)
color_correction: Optional[ColorCorrection] = Field(
default_factory=ColorCorrection,
description="Color correction settings"
)
class Calibration(BaseModel):
"""Calibration configuration for pixel-to-LED mapping."""
layout: Literal["clockwise", "counterclockwise"] = Field(
default="clockwise",
description="LED strip layout direction"
)
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"] = Field(
default="bottom_left",
description="Starting corner of the LED strip"
)
offset: int = Field(
default=0,
ge=0,
description="Number of LEDs from physical LED 0 to start corner (along strip direction)"
)
leds_top: int = Field(default=0, ge=0, description="Number of LEDs on the top edge")
leds_right: int = Field(default=0, ge=0, description="Number of LEDs on the right edge")
leds_bottom: int = Field(default=0, ge=0, description="Number of LEDs on the bottom edge")
leds_left: int = Field(default=0, ge=0, description="Number of LEDs on the left edge")
# Per-edge span: fraction of screen side covered by LEDs (0.0-1.0)
span_top_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of top edge coverage")
span_top_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of top edge coverage")
span_right_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of right edge coverage")
span_right_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of right edge coverage")
span_bottom_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of bottom edge coverage")
span_bottom_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of bottom edge coverage")
span_left_start: float = Field(default=0.0, ge=0.0, le=1.0, description="Start of left edge coverage")
span_left_end: float = Field(default=1.0, ge=0.0, le=1.0, description="End of left edge coverage")
class CalibrationTestModeRequest(BaseModel):
"""Request to set calibration test mode with multiple edges."""
edges: Dict[str, List[int]] = Field(
default_factory=dict,
description="Map of active edge names to RGB colors. "
"E.g. {'top': [255, 0, 0], 'left': [255, 255, 0]}. "
"Empty dict = exit test mode."
)
class CalibrationTestModeResponse(BaseModel):
"""Response for calibration test mode."""
test_mode: bool = Field(description="Whether test mode is active")
active_edges: List[str] = Field(default_factory=list, description="Currently lit edges")
device_id: str = Field(description="Device ID")
class DeviceResponse(BaseModel):
"""Device information response."""
id: str = Field(description="Device ID")
name: str = Field(description="Device name")
url: str = Field(description="WLED device URL")
led_count: int = Field(description="Total number of LEDs")
enabled: bool = Field(description="Whether device is enabled")
status: Literal["connected", "disconnected", "error"] = Field(
description="Connection status"
)
settings: ProcessingSettings = Field(description="Processing settings")
calibration: Optional[Calibration] = Field(None, description="Calibration configuration")
picture_source_id: str = Field(default="", description="ID of assigned picture source")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
class DeviceListResponse(BaseModel):
"""List of devices response."""
devices: List[DeviceResponse] = Field(description="List of devices")
count: int = Field(description="Number of devices")
class ProcessingState(BaseModel):
"""Processing state for a device."""
device_id: str = Field(description="Device ID")
processing: bool = Field(description="Whether processing is active")
fps_actual: Optional[float] = Field(None, description="Actual FPS achieved")
fps_target: int = Field(description="Target FPS")
display_index: int = Field(description="Current display index")
last_update: Optional[datetime] = Field(None, description="Last successful update")
errors: List[str] = Field(default_factory=list, description="Recent errors")
wled_online: bool = Field(default=False, description="Whether WLED device is reachable")
wled_latency_ms: Optional[float] = Field(None, description="WLED health check latency in ms")
wled_name: Optional[str] = Field(None, description="WLED device name")
wled_version: Optional[str] = Field(None, description="WLED firmware version")
wled_led_count: Optional[int] = Field(None, description="LED count reported by WLED device")
wled_rgbw: Optional[bool] = Field(None, description="Whether WLED device uses RGBW LEDs")
wled_led_type: Optional[str] = Field(None, description="LED chip type (e.g. WS2812B, SK6812 RGBW)")
wled_last_checked: Optional[datetime] = Field(None, description="Last health check time")
wled_error: Optional[str] = Field(None, description="Last health check error")
class MetricsResponse(BaseModel):
"""Device metrics response."""
device_id: str = Field(description="Device ID")
processing: bool = Field(description="Whether processing is active")
fps_actual: Optional[float] = Field(None, description="Actual FPS")
fps_target: int = Field(description="Target FPS")
uptime_seconds: float = Field(description="Processing uptime in seconds")
frames_processed: int = Field(description="Total frames processed")
errors_count: int = Field(description="Total error count")
last_error: Optional[str] = Field(None, description="Last error message")
last_update: Optional[datetime] = Field(None, description="Last update timestamp")

View File

@@ -0,0 +1,39 @@
"""Filter-related schemas."""
from typing import Any, Dict, List
from pydantic import BaseModel, Field
class FilterInstanceSchema(BaseModel):
"""A single filter instance with its configuration."""
filter_id: str = Field(description="Filter type identifier")
options: Dict[str, Any] = Field(default_factory=dict, description="Filter-specific options")
class FilterOptionDefSchema(BaseModel):
"""Describes a configurable option for a filter type."""
key: str = Field(description="Option key")
label: str = Field(description="Display label")
type: str = Field(description="Option type (float or int)")
default: Any = Field(description="Default value")
min_value: Any = Field(description="Minimum value")
max_value: Any = Field(description="Maximum value")
step: Any = Field(description="Step increment")
class FilterTypeResponse(BaseModel):
"""Available filter type with its options schema."""
filter_id: str = Field(description="Filter type identifier")
filter_name: str = Field(description="Display name")
options_schema: List[FilterOptionDefSchema] = Field(description="Configurable options")
class FilterTypeListResponse(BaseModel):
"""List of available filter types."""
filters: List[FilterTypeResponse] = Field(description="Available filter types")
count: int = Field(description="Number of filter types")

View File

@@ -0,0 +1,80 @@
"""Picture source schemas."""
from datetime import datetime
from typing import List, Literal, Optional
from pydantic import BaseModel, Field
class PictureSourceCreate(BaseModel):
"""Request to create a picture source."""
name: str = Field(description="Stream name", min_length=1, max_length=100)
stream_type: Literal["raw", "processed", "static_image"] = Field(description="Stream type")
display_index: Optional[int] = Field(None, description="Display index (raw streams)", ge=0)
capture_template_id: Optional[str] = Field(None, description="Capture template ID (raw streams)")
target_fps: Optional[int] = Field(None, description="Target FPS (raw streams)", ge=10, le=90)
source_stream_id: Optional[str] = Field(None, description="Source stream ID (processed streams)")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID (processed streams)")
image_source: Optional[str] = Field(None, description="Image URL or file path (static_image streams)")
description: Optional[str] = Field(None, description="Stream description", max_length=500)
class PictureSourceUpdate(BaseModel):
"""Request to update a picture source."""
name: Optional[str] = Field(None, description="Stream name", min_length=1, max_length=100)
display_index: Optional[int] = Field(None, description="Display index (raw streams)", ge=0)
capture_template_id: Optional[str] = Field(None, description="Capture template ID (raw streams)")
target_fps: Optional[int] = Field(None, description="Target FPS (raw streams)", ge=10, le=90)
source_stream_id: Optional[str] = Field(None, description="Source stream ID (processed streams)")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID (processed streams)")
image_source: Optional[str] = Field(None, description="Image URL or file path (static_image streams)")
description: Optional[str] = Field(None, description="Stream description", max_length=500)
class PictureSourceResponse(BaseModel):
"""Picture source information response."""
id: str = Field(description="Stream ID")
name: str = Field(description="Stream name")
stream_type: str = Field(description="Stream type (raw, processed, or static_image)")
display_index: Optional[int] = Field(None, description="Display index")
capture_template_id: Optional[str] = Field(None, description="Capture template ID")
target_fps: Optional[int] = Field(None, description="Target FPS")
source_stream_id: Optional[str] = Field(None, description="Source stream ID")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID")
image_source: Optional[str] = Field(None, description="Image URL or file path")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Stream description")
class PictureSourceListResponse(BaseModel):
"""List of picture sources response."""
streams: List[PictureSourceResponse] = Field(description="List of picture sources")
count: int = Field(description="Number of streams")
class PictureSourceTestRequest(BaseModel):
"""Request to test a picture source."""
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")
border_width: int = Field(default=10, ge=1, le=100, description="Border width in pixels for preview")
class ImageValidateRequest(BaseModel):
"""Request to validate an image source (URL or file path)."""
image_source: str = Field(description="Image URL or local file path")
class ImageValidateResponse(BaseModel):
"""Response from image validation."""
valid: bool = Field(description="Whether the image source is accessible and valid")
width: Optional[int] = Field(None, description="Image width in pixels")
height: Optional[int] = Field(None, description="Image height in pixels")
preview: Optional[str] = Field(None, description="Base64-encoded JPEG thumbnail")
error: Optional[str] = Field(None, description="Error message if invalid")

View File

@@ -0,0 +1,49 @@
"""Postprocessing template schemas."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from .filters import FilterInstanceSchema
class PostprocessingTemplateCreate(BaseModel):
"""Request to create a postprocessing template."""
name: str = Field(description="Template name", min_length=1, max_length=100)
filters: List[FilterInstanceSchema] = Field(default_factory=list, description="Ordered list of filter instances")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class PostprocessingTemplateUpdate(BaseModel):
"""Request to update a postprocessing template."""
name: Optional[str] = Field(None, description="Template name", min_length=1, max_length=100)
filters: Optional[List[FilterInstanceSchema]] = Field(None, description="Ordered list of filter instances")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class PostprocessingTemplateResponse(BaseModel):
"""Postprocessing template information response."""
id: str = Field(description="Template ID")
name: str = Field(description="Template name")
filters: List[FilterInstanceSchema] = Field(description="Ordered list of filter instances")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Template description")
class PostprocessingTemplateListResponse(BaseModel):
"""List of postprocessing templates response."""
templates: List[PostprocessingTemplateResponse] = Field(description="List of postprocessing templates")
count: int = Field(description="Number of templates")
class PPTemplateTestRequest(BaseModel):
"""Request to test a postprocessing template against a source stream."""
source_stream_id: str = Field(description="ID of the source picture source to capture from")
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")

View File

@@ -0,0 +1,42 @@
"""System-related schemas (health, version, displays)."""
from datetime import datetime
from typing import List, Literal
from pydantic import BaseModel, Field
class HealthResponse(BaseModel):
"""Health check response."""
status: Literal["healthy", "unhealthy"] = Field(description="Service health status")
timestamp: datetime = Field(description="Current server time")
version: str = Field(description="Application version")
class VersionResponse(BaseModel):
"""Version information response."""
version: str = Field(description="Application version")
python_version: str = Field(description="Python version")
api_version: str = Field(description="API version")
class DisplayInfo(BaseModel):
"""Display/monitor information."""
index: int = Field(description="Display index")
name: str = Field(description="Display name")
width: int = Field(description="Display width in pixels")
height: int = Field(description="Display height in pixels")
x: int = Field(description="Display X position")
y: int = Field(description="Display Y position")
is_primary: bool = Field(default=False, description="Whether this is the primary display")
refresh_rate: int = Field(description="Display refresh rate in Hz")
class DisplayListResponse(BaseModel):
"""List of available displays."""
displays: List[DisplayInfo] = Field(description="Available displays")
count: int = Field(description="Number of displays")

View File

@@ -0,0 +1,75 @@
"""Capture template and engine schemas."""
from datetime import datetime
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
class TemplateCreate(BaseModel):
"""Request to create a capture template."""
name: str = Field(description="Template name", min_length=1, max_length=100)
engine_type: str = Field(description="Engine type (e.g., 'mss', 'dxcam', 'wgc')", min_length=1)
engine_config: Dict = Field(default_factory=dict, description="Engine-specific configuration")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class TemplateUpdate(BaseModel):
"""Request to update a template."""
name: Optional[str] = Field(None, description="Template name", min_length=1, max_length=100)
engine_type: Optional[str] = Field(None, description="Capture engine type (mss, dxcam, wgc)")
engine_config: Optional[Dict] = Field(None, description="Engine-specific configuration")
description: Optional[str] = Field(None, description="Template description", max_length=500)
class TemplateResponse(BaseModel):
"""Template information response."""
id: str = Field(description="Template ID")
name: str = Field(description="Template name")
engine_type: str = Field(description="Engine type identifier")
engine_config: Dict = Field(description="Engine-specific configuration")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Template description")
class TemplateListResponse(BaseModel):
"""List of templates response."""
templates: List[TemplateResponse] = Field(description="List of templates")
count: int = Field(description="Number of templates")
class EngineInfo(BaseModel):
"""Capture engine information."""
type: str = Field(description="Engine type identifier (e.g., 'mss', 'dxcam')")
name: str = Field(description="Human-readable engine name")
default_config: Dict = Field(description="Default configuration for this engine")
available: bool = Field(description="Whether engine is available on this system")
class EngineListResponse(BaseModel):
"""List of available engines response."""
engines: List[EngineInfo] = Field(description="Available capture engines")
count: int = Field(description="Number of engines")
class TemplateAssignment(BaseModel):
"""Request to assign template to device."""
template_id: str = Field(description="Template ID to assign")
class TemplateTestRequest(BaseModel):
"""Request to test a capture template."""
engine_type: str = Field(description="Capture engine type to test")
engine_config: Dict = Field(default={}, description="Engine configuration")
display_index: int = Field(description="Display index to capture")
border_width: int = Field(default=10, ge=1, le=100, description="Border width in pixels")
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")