Files
ledgrab/server/src/wled_controller/api/routes.py
T
alexei.dolgolyov 472acd700a Add full-image lightbox and restore WLED state on stop
- Add GET /picture-streams/full-image endpoint to serve full-res images
- Click static image preview thumbnail to open full-res lightbox
- Snapshot WLED state (on/off, lor, AudioReactive) before streaming
- Restore saved WLED state when streaming stops

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 03:06:59 +03:00

1959 lines
73 KiB
Python

"""API routes and endpoints."""
import base64
import io
import sys
import time
from datetime import datetime
from typing import List, Dict, Any
import httpx
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends, Query
from fastapi.responses import Response
from wled_controller import __version__
from wled_controller.api.auth import AuthRequired
from wled_controller.api.schemas import (
HealthResponse,
VersionResponse,
DisplayListResponse,
DisplayInfo,
DeviceCreate,
DeviceUpdate,
DeviceResponse,
DeviceListResponse,
ProcessingSettings as ProcessingSettingsSchema,
Calibration as CalibrationSchema,
CalibrationTestModeRequest,
CalibrationTestModeResponse,
ProcessingState,
MetricsResponse,
TemplateCreate,
TemplateUpdate,
TemplateResponse,
TemplateListResponse,
EngineInfo,
EngineListResponse,
TemplateTestRequest,
TemplateTestResponse,
CaptureImage,
BorderExtraction,
PerformanceMetrics,
FilterInstanceSchema,
FilterOptionDefSchema,
FilterTypeResponse,
FilterTypeListResponse,
PostprocessingTemplateCreate,
PostprocessingTemplateUpdate,
PostprocessingTemplateResponse,
PostprocessingTemplateListResponse,
PictureStreamCreate,
PictureStreamUpdate,
PictureStreamResponse,
PictureStreamListResponse,
PictureStreamTestRequest,
PPTemplateTestRequest,
ImageValidateRequest,
ImageValidateResponse,
)
from wled_controller.config import get_config
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
from wled_controller.core.calibration import (
calibration_from_dict,
calibration_to_dict,
)
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore
from wled_controller.storage.picture_stream_store import PictureStreamStore
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.filters import FilterRegistry, FilterInstance, ImagePool
from wled_controller.utils import get_logger
from wled_controller.core.screen_capture import get_available_displays
# Logger obtained for this module's name.
logger = get_logger(__name__)
# Router on which every endpoint defined in this module is registered.
router = APIRouter()
# Global instances (initialized in main.py)
# init_dependencies() overwrites all five values; the get_* accessors defined
# below raise RuntimeError for any value that is still None when requested.
_device_store: DeviceStore | None = None
_template_store: TemplateStore | None = None
_pp_template_store: PostprocessingTemplateStore | None = None
_picture_stream_store: PictureStreamStore | None = None
_processor_manager: ProcessorManager | None = None
def get_device_store() -> DeviceStore:
    """Return the module-wide device store.

    Raises:
        RuntimeError: If the store has not been set (it is still ``None``).
    """
    store = _device_store
    if store is not None:
        return store
    raise RuntimeError("Device store not initialized")
def get_template_store() -> TemplateStore:
    """Return the module-wide capture template store.

    Raises:
        RuntimeError: If the store has not been set (it is still ``None``).
    """
    store = _template_store
    if store is not None:
        return store
    raise RuntimeError("Template store not initialized")
def get_pp_template_store() -> PostprocessingTemplateStore:
    """Return the module-wide postprocessing template store.

    Raises:
        RuntimeError: If the store has not been set (it is still ``None``).
    """
    store = _pp_template_store
    if store is not None:
        return store
    raise RuntimeError("Postprocessing template store not initialized")
def get_picture_stream_store() -> PictureStreamStore:
    """Return the module-wide picture stream store.

    Raises:
        RuntimeError: If the store has not been set (it is still ``None``).
    """
    store = _picture_stream_store
    if store is not None:
        return store
    raise RuntimeError("Picture stream store not initialized")
def get_processor_manager() -> ProcessorManager:
    """Return the module-wide processor manager.

    Raises:
        RuntimeError: If the manager has not been set (it is still ``None``).
    """
    manager = _processor_manager
    if manager is not None:
        return manager
    raise RuntimeError("Processor manager not initialized")
def init_dependencies(
    device_store: DeviceStore,
    template_store: TemplateStore,
    processor_manager: ProcessorManager,
    pp_template_store: PostprocessingTemplateStore | None = None,
    picture_stream_store: PictureStreamStore | None = None,
):
    """Install the shared instances that the ``get_*`` accessors hand out.

    All five module-level slots are overwritten on every call, so omitting
    one of the optional stores resets that slot to ``None``.
    """
    global _device_store, _template_store, _processor_manager
    global _pp_template_store, _picture_stream_store

    (
        _device_store,
        _template_store,
        _processor_manager,
        _pp_template_store,
        _picture_stream_store,
    ) = (
        device_store,
        template_store,
        processor_manager,
        pp_template_store,
        picture_stream_store,
    )
@router.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
    """Report that the service is alive.

    The response carries a fixed "healthy" status, the current UTC time and
    the package version.
    """
    logger.info("Health check requested")
    now = datetime.utcnow()
    return HealthResponse(status="healthy", timestamp=now, version=__version__)
@router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"])
async def get_version():
    """Report the application version, the running Python version and the API version."""
    logger.info("Version info requested")
    python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    return VersionResponse(
        version=__version__,
        python_version=python_version,
        api_version="v1",
    )
@router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"])
async def get_displays(_: AuthRequired):
    """Enumerate the displays that can be captured.

    Each entry returned by ``get_available_displays()`` is copied field by
    field into the API's ``DisplayInfo`` schema.

    Raises:
        HTTPException: 500 if enumeration or conversion fails.
    """
    logger.info("Listing available displays")
    try:
        displays = []
        for monitor in get_available_displays():
            # Translate the capture-layer record into the response schema.
            displays.append(
                DisplayInfo(
                    index=monitor.index,
                    name=monitor.name,
                    width=monitor.width,
                    height=monitor.height,
                    x=monitor.x,
                    y=monitor.y,
                    is_primary=monitor.is_primary,
                    refresh_rate=monitor.refresh_rate,
                )
            )
        logger.info(f"Found {len(displays)} displays")
        return DisplayListResponse(displays=displays, count=len(displays))
    except Exception as e:
        logger.error(f"Failed to get displays: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve display information: {str(e)}"
        )
# ===== DEVICE MANAGEMENT ENDPOINTS =====
@router.post("/api/v1/devices", response_model=DeviceResponse, tags=["Devices"], status_code=201)
async def create_device(
    device_data: DeviceCreate,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
    template_store: TemplateStore = Depends(get_template_store),
):
    """Create and attach a new WLED device.

    The device is first probed at ``<url>/json/info`` and its reported LED
    count is used for the stored record. A capture template is then resolved
    (the requested one if it exists, otherwise the first available), the
    device is persisted and finally registered with the processor manager.

    Raises:
        HTTPException: 422 when the device cannot be reached or reports an
            invalid LED count; 500 when no capture template exists or any
            other step fails.
    """
    try:
        logger.info(f"Creating device: {device_data.name}")

        # Validate WLED device is reachable before adding
        device_url = device_data.url.rstrip("/")
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                response = await client.get(f"{device_url}/json/info")
                response.raise_for_status()
                wled_info = response.json()
            wled_led_count = wled_info.get("leds", {}).get("count")
            if not wled_led_count or wled_led_count < 1:
                raise HTTPException(
                    status_code=422,
                    detail=f"WLED device at {device_url} reported invalid LED count: {wled_led_count}"
                )
            logger.info(
                f"WLED device reachable: {wled_info.get('name', 'Unknown')} "
                f"v{wled_info.get('ver', '?')} ({wled_led_count} LEDs)"
            )
        except HTTPException:
            # Keep the specific 422 raised above; the generic handler below
            # would otherwise re-wrap it as a connection failure.
            raise
        except httpx.ConnectError:
            raise HTTPException(
                status_code=422,
                detail=f"Cannot reach WLED device at {device_url}. Check the URL and ensure the device is powered on."
            )
        except httpx.TimeoutException:
            raise HTTPException(
                status_code=422,
                detail=f"Connection to {device_url} timed out. Check network connectivity."
            )
        except Exception as e:
            raise HTTPException(
                status_code=422,
                detail=f"Failed to connect to WLED device at {device_url}: {e}"
            )

        # Resolve capture template: use requested ID if valid, else first available
        capture_template_id = None
        if device_data.capture_template_id:
            try:
                template_store.get_template(device_data.capture_template_id)
                capture_template_id = device_data.capture_template_id
            except ValueError:
                logger.warning(
                    f"Requested template '{device_data.capture_template_id}' not found, using first available"
                )
        if not capture_template_id:
            all_templates = template_store.get_all_templates()
            if all_templates:
                capture_template_id = all_templates[0].id
            else:
                raise HTTPException(
                    status_code=500,
                    detail="No capture templates available. Please create one first."
                )

        # Create device in storage (LED count auto-detected from WLED)
        device = store.create_device(
            name=device_data.name,
            url=device_data.url,
            led_count=wled_led_count,
            capture_template_id=capture_template_id,
        )

        # Add to processor manager
        manager.add_device(
            device_id=device.id,
            device_url=device.url,
            led_count=device.led_count,
            settings=device.settings,
            calibration=device.calibration,
        )

        return DeviceResponse(
            id=device.id,
            name=device.name,
            url=device.url,
            led_count=device.led_count,
            enabled=device.enabled,
            status="disconnected",
            settings=ProcessingSettingsSchema(
                display_index=device.settings.display_index,
                fps=device.settings.fps,
                border_width=device.settings.border_width,
                interpolation_mode=device.settings.interpolation_mode,
                brightness=device.settings.brightness,
                smoothing=device.settings.smoothing,
                state_check_interval=device.settings.state_check_interval,
            ),
            calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
            capture_template_id=device.capture_template_id,
            picture_stream_id=device.picture_stream_id,
            created_at=device.created_at,
            updated_at=device.updated_at,
        )
    except HTTPException:
        # Deliberate HTTP errors (the 422s above, missing templates) must
        # reach the client with their own status code and detail.
        raise
    except Exception as e:
        logger.error(f"Failed to create device: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices", response_model=DeviceListResponse, tags=["Devices"])
async def list_devices(
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Return every stored WLED device.

    Each device is reported with status "disconnected"; this endpoint does
    not consult the processor manager.

    Raises:
        HTTPException: 500 if reading or converting the devices fails.
    """
    try:
        device_responses = []
        for device in store.get_all_devices():
            cfg = device.settings
            settings_schema = ProcessingSettingsSchema(
                display_index=cfg.display_index,
                fps=cfg.fps,
                border_width=cfg.border_width,
                interpolation_mode=cfg.interpolation_mode,
                brightness=cfg.brightness,
                smoothing=cfg.smoothing,
                state_check_interval=cfg.state_check_interval,
            )
            calibration_schema = CalibrationSchema(**calibration_to_dict(device.calibration))
            device_responses.append(
                DeviceResponse(
                    id=device.id,
                    name=device.name,
                    url=device.url,
                    led_count=device.led_count,
                    enabled=device.enabled,
                    status="disconnected",
                    settings=settings_schema,
                    calibration=calibration_schema,
                    capture_template_id=device.capture_template_id,
                    picture_stream_id=device.picture_stream_id,
                    created_at=device.created_at,
                    updated_at=device.updated_at,
                )
            )
        return DeviceListResponse(devices=device_responses, count=len(device_responses))
    except Exception as e:
        logger.error(f"Failed to list devices: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def get_device(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Return one device, including whether it is currently processing.

    Raises:
        HTTPException: 404 if the store has no device with this ID.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    # "connected" here means the manager reports active processing.
    if manager.is_processing(device_id):
        status = "connected"
    else:
        status = "disconnected"

    cfg = device.settings
    settings_schema = ProcessingSettingsSchema(
        display_index=cfg.display_index,
        fps=cfg.fps,
        border_width=cfg.border_width,
        interpolation_mode=cfg.interpolation_mode,
        brightness=cfg.brightness,
        smoothing=cfg.smoothing,
        state_check_interval=cfg.state_check_interval,
    )
    return DeviceResponse(
        id=device.id,
        name=device.name,
        url=device.url,
        led_count=device.led_count,
        enabled=device.enabled,
        status=status,
        settings=settings_schema,
        calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
        capture_template_id=device.capture_template_id,
        picture_stream_id=device.picture_stream_id,
        created_at=device.created_at,
        updated_at=device.updated_at,
    )
def _reattach_device_to_manager(manager, device):
    """Replace the manager's entry for ``device`` so it reflects the stored values."""
    manager.remove_device(device.id)
    manager.add_device(
        device_id=device.id,
        device_url=device.url,
        led_count=device.led_count,
        settings=device.settings,
        calibration=device.calibration,
        capture_template_id=device.capture_template_id,
        picture_stream_id=device.picture_stream_id,
    )


@router.put("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def update_device(
    device_id: str,
    update_data: DeviceUpdate,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Update device information.

    When the picture stream or capture template changes, the processor
    manager's entry is rebuilt. If the device was processing at the time it
    is stopped, rebuilt and started again; a failure during that hot-swap is
    logged and does not fail the request.

    Raises:
        HTTPException: 404 if the device does not exist (or the store raises
            ``ValueError``); 500 for any other failure.
    """
    try:
        # Check if stream or template changed and device is processing (for hot-swap)
        old_device = store.get_device(device_id)
        if not old_device:
            # Without this guard the comparisons below would hit None and
            # surface as a 500 instead of a 404.
            raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

        stream_changed = (
            update_data.picture_stream_id is not None
            and update_data.picture_stream_id != old_device.picture_stream_id
        )
        template_changed = (
            update_data.capture_template_id is not None
            and update_data.capture_template_id != old_device.capture_template_id
        )
        was_processing = manager.is_processing(device_id)

        # Update device
        device = store.update_device(
            device_id=device_id,
            name=update_data.name,
            url=update_data.url,
            enabled=update_data.enabled,
            capture_template_id=update_data.capture_template_id,
            picture_stream_id=update_data.picture_stream_id,
        )

        # Sync processor state when stream/template changed
        if stream_changed or template_changed:
            if was_processing:
                # Hot-swap: restart with new settings
                logger.info(f"Hot-swapping stream/template for device {device_id}")
                try:
                    await manager.stop_processing(device_id)
                    _reattach_device_to_manager(manager, device)
                    await manager.start_processing(device_id)
                    logger.info(f"Successfully hot-swapped stream/template for device {device_id}")
                except Exception as e:
                    # Best effort: the stored update already succeeded.
                    logger.error(f"Error during template hot-swap: {e}")
            else:
                # Not processing — update processor state so next start uses new values
                _reattach_device_to_manager(manager, device)

        return DeviceResponse(
            id=device.id,
            name=device.name,
            url=device.url,
            led_count=device.led_count,
            enabled=device.enabled,
            status="disconnected",
            settings=ProcessingSettingsSchema(
                display_index=device.settings.display_index,
                fps=device.settings.fps,
                border_width=device.settings.border_width,
                interpolation_mode=device.settings.interpolation_mode,
                brightness=device.settings.brightness,
                smoothing=device.settings.smoothing,
                state_check_interval=device.settings.state_check_interval,
            ),
            calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
            capture_template_id=device.capture_template_id,
            picture_stream_id=device.picture_stream_id,
            created_at=device.created_at,
            updated_at=device.updated_at,
        )
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to update device: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/devices/{device_id}", status_code=204, tags=["Devices"])
async def delete_device(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Detach a device: halt processing, drop it from the manager, then delete it from storage.

    Raises:
        HTTPException: 404 when a step raises ``ValueError``; 500 otherwise.
    """
    try:
        processing_active = manager.is_processing(device_id)
        if processing_active:
            # Processing must be stopped before the manager entry goes away.
            await manager.stop_processing(device_id)

        manager.remove_device(device_id)
        store.delete_device(device_id)
        logger.info(f"Deleted device {device_id}")
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to delete device: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# ===== PROCESSING CONTROL ENDPOINTS =====
@router.post("/api/v1/devices/{device_id}/start", tags=["Processing"])
async def start_processing(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Start screen processing for a device.

    Returns:
        A dict with ``status`` "started" and the ``device_id``.

    Raises:
        HTTPException: 404 if the device is not in the store; 400 if the
            manager raises ``RuntimeError``; 500 for any other failure.
    """
    try:
        # Verify device exists
        device = store.get_device(device_id)
        if not device:
            raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

        await manager.start_processing(device_id)
        logger.info(f"Started processing for device {device_id}")
        return {"status": "started", "device_id": device_id}
    except HTTPException:
        # Without this the 404 above is swallowed by the generic handler
        # and reported as a 500.
        raise
    except RuntimeError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to start processing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/devices/{device_id}/stop", tags=["Processing"])
async def stop_processing(
    device_id: str,
    _auth: AuthRequired,
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Ask the processor manager to halt screen processing for one device.

    Raises:
        HTTPException: 404 when the manager raises ``ValueError``; 500 otherwise.
    """
    try:
        await manager.stop_processing(device_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to stop processing: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    try:
        logger.info(f"Stopped processing for device {device_id}")
        return {"status": "stopped", "device_id": device_id}
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to stop processing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices/{device_id}/state", response_model=ProcessingState, tags=["Processing"])
async def get_processing_state(
    device_id: str,
    _auth: AuthRequired,
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Return the manager's current processing state for one device.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised; 500 otherwise.
    """
    try:
        state_fields = manager.get_state(device_id)
        response = ProcessingState(**state_fields)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to get state: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return response
# ===== SETTINGS ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def get_settings(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Return the stored processing settings of one device.

    Raises:
        HTTPException: 404 if the store has no device with this ID.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    cfg = device.settings
    return ProcessingSettingsSchema(
        display_index=cfg.display_index,
        fps=cfg.fps,
        border_width=cfg.border_width,
        interpolation_mode=cfg.interpolation_mode,
        brightness=cfg.brightness,
        smoothing=cfg.smoothing,
        state_check_interval=cfg.state_check_interval,
    )
@router.put("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def update_settings(
    device_id: str,
    settings: ProcessingSettingsSchema,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Update processing settings for a device.

    Merges with existing settings so callers can send partial updates.
    Only fields explicitly included in the request body are applied.
    ``gamma`` and ``saturation`` are only changed through an explicitly sent
    ``color_correction`` object, which may also override ``brightness``.

    Raises:
        HTTPException: 404 if the device does not exist (or a ``ValueError``
            is raised while storing); 500 for any other failure.
    """
    try:
        # Get existing device to merge settings
        device = store.get_device(device_id)
        if not device:
            raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

        existing = device.settings
        sent = settings.model_fields_set  # fields the client actually sent

        # Merge: only override fields the client explicitly provided
        overridable = (
            "display_index",
            "fps",
            "border_width",
            "interpolation_mode",
            "brightness",
            "smoothing",
            "state_check_interval",
        )
        merged = {
            field: getattr(settings if field in sent else existing, field)
            for field in overridable
        }
        new_settings = ProcessingSettings(
            gamma=existing.gamma,
            saturation=existing.saturation,
            **merged,
        )

        # Apply color_correction fields if explicitly sent
        if 'color_correction' in sent and settings.color_correction:
            cc_sent = settings.color_correction.model_fields_set
            if 'brightness' in cc_sent:
                new_settings.brightness = settings.color_correction.brightness
            if 'gamma' in cc_sent:
                new_settings.gamma = settings.color_correction.gamma
            if 'saturation' in cc_sent:
                new_settings.saturation = settings.color_correction.saturation

        # Update in storage
        device = store.update_device(device_id, settings=new_settings)

        # Update in manager if device exists
        try:
            manager.update_settings(device_id, new_settings)
        except ValueError:
            # Device not in manager yet, that's ok
            pass

        return ProcessingSettingsSchema(
            display_index=device.settings.display_index,
            fps=device.settings.fps,
            border_width=device.settings.border_width,
            interpolation_mode=device.settings.interpolation_mode,
            brightness=device.settings.brightness,
            smoothing=device.settings.smoothing,
            state_check_interval=device.settings.state_check_interval,
        )
    except HTTPException:
        # Without this the 404 above is swallowed by the generic handler
        # and reported as a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to update settings: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# ===== WLED BRIGHTNESS ENDPOINT =====
@router.get("/api/v1/devices/{device_id}/brightness", tags=["Settings"])
async def get_device_brightness(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Get current brightness from the WLED device.

    Reads ``bri`` from the device's ``/json/state`` response, falling back to
    255 when the key is absent.

    Raises:
        HTTPException: 404 if the device is unknown; 502 if the WLED device
            cannot be queried.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    # The stored URL may carry a trailing slash; strip it so the request
    # path does not become "//json/state".
    base_url = device.url.rstrip("/")
    try:
        async with httpx.AsyncClient(timeout=5.0) as http_client:
            resp = await http_client.get(f"{base_url}/json/state")
            resp.raise_for_status()
            state = resp.json()
        bri = state.get("bri", 255)
        return {"brightness": bri}
    except Exception as e:
        logger.error(f"Failed to get WLED brightness for {device_id}: {e}")
        raise HTTPException(status_code=502, detail=f"Failed to reach WLED device: {e}")
@router.put("/api/v1/devices/{device_id}/brightness", tags=["Settings"])
async def set_device_brightness(
    device_id: str,
    body: dict,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Set brightness on the WLED device directly.

    Expects a JSON body with an integer ``brightness`` in 0-255 and posts it
    as ``bri`` to the device's ``/json/state`` endpoint.

    Raises:
        HTTPException: 404 if the device is unknown; 400 if ``brightness`` is
            missing, not an integer or out of range; 502 if the WLED device
            cannot be reached.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    bri = body.get("brightness")
    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    is_valid = (
        isinstance(bri, int)
        and not isinstance(bri, bool)
        and 0 <= bri <= 255
    )
    if not is_valid:
        raise HTTPException(status_code=400, detail="brightness must be an integer 0-255")

    # The stored URL may carry a trailing slash; strip it so the request
    # path does not become "//json/state".
    base_url = device.url.rstrip("/")
    try:
        async with httpx.AsyncClient(timeout=5.0) as http_client:
            resp = await http_client.post(
                f"{base_url}/json/state",
                json={"bri": bri},
            )
            resp.raise_for_status()
        return {"brightness": bri}
    except Exception as e:
        logger.error(f"Failed to set WLED brightness for {device_id}: {e}")
        raise HTTPException(status_code=502, detail=f"Failed to reach WLED device: {e}")
# ===== CALIBRATION ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def get_calibration(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Return the stored calibration of one device.

    Raises:
        HTTPException: 404 if the store has no device with this ID.
    """
    device = store.get_device(device_id)
    if device:
        calibration_fields = calibration_to_dict(device.calibration)
        return CalibrationSchema(**calibration_fields)
    raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
@router.put("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def update_calibration(
    device_id: str,
    calibration_data: CalibrationSchema,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Replace a device's calibration in storage and, when possible, in the manager.

    Raises:
        HTTPException: 400 when conversion or storage raises ``ValueError``;
            500 for any other failure.
    """
    try:
        # Schema -> plain dict -> calibration object
        calibration = calibration_from_dict(calibration_data.model_dump())

        # Persist first, then mirror into the running manager.
        device = store.update_device(device_id, calibration=calibration)
        try:
            manager.update_calibration(device_id, calibration)
        except ValueError:
            # The manager does not know this device yet; storage is enough.
            pass

        stored_fields = calibration_to_dict(device.calibration)
        return CalibrationSchema(**stored_fields)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to update calibration: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.put(
    "/api/v1/devices/{device_id}/calibration/test",
    response_model=CalibrationTestModeResponse,
    tags=["Calibration"],
)
async def set_calibration_test_mode(
    device_id: str,
    body: CalibrationTestModeRequest,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Switch calibration test mode on or off for selected edges.

    A non-empty ``edges`` mapping (edge name -> [R, G, B]) lights those edges;
    an empty mapping leaves test mode. Per the original endpoint notes,
    screen capture processing is paused while test mode is active.

    Raises:
        HTTPException: 404 if the device is unknown or the manager raises
            ``ValueError``; 400 for an invalid edge name or color; 500 for
            any other failure.
    """
    try:
        if not store.get_device(device_id):
            raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

        # Reject unknown edges and malformed colors before touching the manager.
        valid_edges = {"top", "right", "bottom", "left"}
        for edge_name, color in body.edges.items():
            if edge_name not in valid_edges:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid edge '{edge_name}'. Must be one of: {', '.join(valid_edges)}"
                )
            color_ok = len(color) == 3 and all(0 <= c <= 255 for c in color)
            if not color_ok:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid color for edge '{edge_name}'. Must be [R, G, B] with values 0-255."
                )

        await manager.set_test_mode(device_id, body.edges)

        active_edges = list(body.edges.keys())
        logger.info(
            f"Test mode {'activated' if active_edges else 'deactivated'} "
            f"for device {device_id}: {active_edges}"
        )
        return CalibrationTestModeResponse(
            test_mode=bool(active_edges),
            active_edges=active_edges,
            device_id=device_id,
        )
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to set test mode: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# ===== METRICS ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/metrics", response_model=MetricsResponse, tags=["Metrics"])
async def get_metrics(
    device_id: str,
    _auth: AuthRequired,
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Return the manager's processing metrics for one device.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised; 500 otherwise.
    """
    try:
        metric_fields = manager.get_metrics(device_id)
        response = MetricsResponse(**metric_fields)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to get metrics: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return response
# ===== CAPTURE TEMPLATE ENDPOINTS =====
@router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"])
async def list_templates(
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Return every stored capture template together with a count.

    Raises:
        HTTPException: 500 if reading or converting the templates fails.
    """
    try:
        template_responses = []
        for template in template_store.get_all_templates():
            template_responses.append(
                TemplateResponse(
                    id=template.id,
                    name=template.name,
                    engine_type=template.engine_type,
                    engine_config=template.engine_config,
                    created_at=template.created_at,
                    updated_at=template.updated_at,
                    description=template.description,
                )
            )
        total = len(template_responses)
        return TemplateListResponse(templates=template_responses, count=total)
    except Exception as e:
        logger.error(f"Failed to list templates: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201)
async def create_template(
    template_data: TemplateCreate,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Store a new capture template and return it.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised; 500 otherwise.
    """
    try:
        created = template_store.create_template(
            name=template_data.name,
            engine_type=template_data.engine_type,
            engine_config=template_data.engine_config,
            description=template_data.description,
        )
        response = TemplateResponse(
            id=created.id,
            name=created.name,
            engine_type=created.engine_type,
            engine_config=created.engine_config,
            created_at=created.created_at,
            updated_at=created.updated_at,
            description=created.description,
        )
        return response
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to create template: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def get_template(
    template_id: str,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Return a single capture template.

    Raises:
        HTTPException: 404 when the store raises ``ValueError`` for this ID.
    """
    try:
        found = template_store.get_template(template_id)
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Template {template_id} not found")

    response = TemplateResponse(
        id=found.id,
        name=found.name,
        engine_type=found.engine_type,
        engine_config=found.engine_config,
        created_at=found.created_at,
        updated_at=found.updated_at,
        description=found.description,
    )
    return response
@router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def update_template(
    template_id: str,
    update_data: TemplateUpdate,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Apply changes to a stored capture template and return the result.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised; 500 otherwise.
    """
    try:
        updated = template_store.update_template(
            template_id=template_id,
            name=update_data.name,
            engine_type=update_data.engine_type,
            engine_config=update_data.engine_config,
            description=update_data.description,
        )
        response = TemplateResponse(
            id=updated.id,
            name=updated.name,
            engine_type=updated.engine_type,
            engine_config=updated.engine_config,
            created_at=updated.created_at,
            updated_at=updated.updated_at,
            description=updated.description,
        )
        return response
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to update template: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"])
async def delete_template(
    template_id: str,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
    stream_store: PictureStreamStore = Depends(get_picture_stream_store),
):
    """Remove a capture template unless a picture stream still references it.

    Raises:
        HTTPException: 409 listing the referencing streams; 400 when the
            store raises ``ValueError``; 500 for any other failure.
    """
    try:
        # Names of all streams whose capture template is the one being deleted.
        streams_using_template = [
            stream.name
            for stream in stream_store.get_all_streams()
            if stream.capture_template_id == template_id
        ]
        if streams_using_template:
            stream_list = ", ".join(streams_using_template)
            raise HTTPException(
                status_code=409,
                detail=f"Cannot delete template: it is used by the following stream(s): {stream_list}. "
                       f"Please reassign these streams to a different template before deleting."
            )

        # Nothing references it any more.
        template_store.delete_template(template_id)
    except HTTPException:
        raise  # the 409 above must reach the client unchanged
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to delete template: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"])
async def list_engines(_auth: AuthRequired):
    """Describe the capture engines the registry reports as available.

    Each entry carries the engine type, its upper-cased type as display name
    and the engine class's default configuration.

    Raises:
        HTTPException: 500 if the registry lookup fails.
    """
    try:
        # Resolve each engine class before building its entry.
        resolved = (
            (engine_type, EngineRegistry.get_engine(engine_type))
            for engine_type in EngineRegistry.get_available_engines()
        )
        engines = [
            EngineInfo(
                type=engine_type,
                name=engine_type.upper(),
                default_config=engine_class.get_default_config(),
                available=True,
            )
            for engine_type, engine_class in resolved
        ]
        return EngineListResponse(engines=engines, count=len(engines))
    except Exception as e:
        logger.error(f"Failed to list engines: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"])
async def test_template(
    test_request: TemplateTestRequest,
    _auth: AuthRequired,
    processor_manager: ProcessorManager = Depends(get_processor_manager),
    device_store: DeviceStore = Depends(get_device_store),
):
    """Test a capture template configuration.

    Temporarily instantiates an engine with the provided configuration,
    captures frames for the specified duration, and returns actual FPS metrics
    together with a thumbnail and a full-resolution JPEG of the last frame.

    Raises:
        HTTPException: 400 when the engine type is unavailable or a
            ``ValueError`` occurs, 409 when the display is locked by a
            device, 500 for ``RuntimeError`` (prefixed "Engine error:") and
            any other failure.
    """
    engine = None
    try:
        # Validate engine type
        if test_request.engine_type not in EngineRegistry.get_available_engines():
            raise HTTPException(
                status_code=400,
                detail=f"Engine '{test_request.engine_type}' is not available on this system"
            )

        # Check if display is already being captured
        locked_device_id = processor_manager.get_display_lock_info(test_request.display_index)
        if locked_device_id:
            # Get device info for better error message; fall back to the raw
            # id if the device cannot be looked up.
            try:
                device = device_store.get_device(locked_device_id)
                device_name = device.name
            except Exception:
                device_name = locked_device_id
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Display {test_request.display_index} is currently being captured by device "
                    f"'{device_name}'. Please stop the device processing before testing this template."
                )
            )

        # Create engine (auto-initializes on first capture)
        engine = EngineRegistry.create_engine(test_request.engine_type, test_request.engine_config)

        # Run sustained capture test
        logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}")
        frame_count = 0
        total_capture_time = 0.0
        last_frame = None
        start_time = time.perf_counter()
        end_time = start_time + test_request.capture_duration
        # NOTE(review): this loop contains no await, so the handler does not
        # yield to the event loop until capture_duration has elapsed.
        while time.perf_counter() < end_time:
            capture_start = time.perf_counter()
            screen_capture = engine.capture_display(test_request.display_index)
            capture_elapsed = time.perf_counter() - capture_start
            # Skip if no new frame (screen unchanged)
            if screen_capture is None:
                continue
            total_capture_time += capture_elapsed
            frame_count += 1
            last_frame = screen_capture
        actual_duration = time.perf_counter() - start_time
        logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s")

        # Use the last captured frame for preview
        if last_frame is None:
            raise RuntimeError("No frames captured during test")

        # Convert numpy array to PIL Image
        if isinstance(last_frame.image, np.ndarray):
            pil_image = Image.fromarray(last_frame.image)
        else:
            raise ValueError("Unexpected image format from engine")

        # Create thumbnail (at most 640px wide, maintain aspect ratio)
        thumbnail_width = 640
        aspect_ratio = pil_image.height / pil_image.width
        thumbnail_height = int(thumbnail_width * aspect_ratio)
        thumbnail = pil_image.copy()
        thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
        # Image.thumbnail never enlarges and applies its own rounding, so
        # report the size that was actually produced, not the requested box.
        thumbnail_width, thumbnail_height = thumbnail.size

        # Encode thumbnail as JPEG
        img_buffer = io.BytesIO()
        thumbnail.save(img_buffer, format='JPEG', quality=85)
        img_buffer.seek(0)
        thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
        thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"

        # Encode full-resolution image as JPEG
        full_buffer = io.BytesIO()
        pil_image.save(full_buffer, format='JPEG', quality=90)
        full_buffer.seek(0)
        full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
        full_data_uri = f"data:image/jpeg;base64,{full_b64}"

        # Calculate metrics
        actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
        avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
        width, height = pil_image.size

        return TemplateTestResponse(
            full_capture=CaptureImage(
                image=thumbnail_data_uri,
                full_image=full_data_uri,
                width=width,
                height=height,
                thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height,
            ),
            border_extraction=None,
            performance=PerformanceMetrics(
                capture_duration_s=actual_duration,
                frame_count=frame_count,
                actual_fps=actual_fps,
                avg_capture_time_ms=avg_capture_time_ms,
            ),
        )
    except HTTPException:
        # Without this clause the deliberate 400/409 responses raised above
        # would fall into ``except Exception`` and be reported as 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
    except Exception as e:
        logger.error(f"Failed to test template: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always cleanup engine
        if engine:
            try:
                engine.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up test engine: {e}")
# ===== FILTER TYPE ENDPOINTS =====
@router.get(
    "/api/v1/filters",
    response_model=FilterTypeListResponse,
    tags=["Filters"],
)
async def list_filter_types(_auth: AuthRequired):
    """Describe every registered postprocessing filter and its option schema."""

    def option_to_schema(option):
        # One-to-one field mapping from a filter option definition.
        return FilterOptionDefSchema(
            key=option.key,
            label=option.label,
            type=option.option_type,
            default=option.default,
            min_value=option.min_value,
            max_value=option.max_value,
            step=option.step,
        )

    filter_types = []
    for _registry_key, filter_cls in FilterRegistry.get_all().items():
        option_defs = [option_to_schema(option) for option in filter_cls.get_options_schema()]
        filter_types.append(
            FilterTypeResponse(
                filter_id=filter_cls.filter_id,
                filter_name=filter_cls.filter_name,
                options_schema=option_defs,
            )
        )
    return FilterTypeListResponse(filters=filter_types, count=len(filter_types))
# ===== POSTPROCESSING TEMPLATE ENDPOINTS =====
def _pp_template_to_response(t) -> PostprocessingTemplateResponse:
    """Build the API response model for a stored postprocessing template."""
    filter_schemas = []
    for instance in t.filters:
        filter_schemas.append(
            FilterInstanceSchema(filter_id=instance.filter_id, options=instance.options)
        )
    return PostprocessingTemplateResponse(
        id=t.id,
        name=t.name,
        filters=filter_schemas,
        created_at=t.created_at,
        updated_at=t.updated_at,
        description=t.description,
    )
@router.get(
    "/api/v1/postprocessing-templates",
    response_model=PostprocessingTemplateListResponse,
    tags=["Postprocessing Templates"],
)
async def list_pp_templates(
    _auth: AuthRequired,
    store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
    """Return every stored postprocessing template with a total count.

    Any failure is logged and reported as a 500 response.
    """
    try:
        items = list(map(_pp_template_to_response, store.get_all_templates()))
        return PostprocessingTemplateListResponse(templates=items, count=len(items))
    except Exception as err:
        logger.error(f"Failed to list postprocessing templates: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.post(
    "/api/v1/postprocessing-templates",
    response_model=PostprocessingTemplateResponse,
    tags=["Postprocessing Templates"],
    status_code=201,
)
async def create_pp_template(
    data: PostprocessingTemplateCreate,
    _auth: AuthRequired,
    store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
    """Store a new postprocessing template built from the request payload.

    A ``ValueError`` is reported as 400; any other failure is logged and
    reported as 500.
    """
    try:
        # Convert the request's filter schemas into FilterInstance objects.
        filter_instances = []
        for spec in data.filters:
            filter_instances.append(FilterInstance(spec.filter_id, spec.options))

        created = store.create_template(
            name=data.name,
            filters=filter_instances,
            description=data.description,
        )
        return _pp_template_to_response(created)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Failed to create postprocessing template: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.get(
    "/api/v1/postprocessing-templates/{template_id}",
    response_model=PostprocessingTemplateResponse,
    tags=["Postprocessing Templates"],
)
async def get_pp_template(
    template_id: str,
    _auth: AuthRequired,
    store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
    """Fetch a single postprocessing template; a ``ValueError`` becomes a 404."""
    try:
        return _pp_template_to_response(store.get_template(template_id))
    except ValueError:
        not_found = f"Postprocessing template {template_id} not found"
        raise HTTPException(status_code=404, detail=not_found)
@router.put(
    "/api/v1/postprocessing-templates/{template_id}",
    response_model=PostprocessingTemplateResponse,
    tags=["Postprocessing Templates"],
)
async def update_pp_template(
    template_id: str,
    data: PostprocessingTemplateUpdate,
    _auth: AuthRequired,
    store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
    """Apply the supplied changes to an existing postprocessing template.

    ``filters`` is forwarded as ``None`` when the request omits it. A
    ``ValueError`` is reported as 400; any other failure is logged and
    reported as 500.
    """
    try:
        if data.filters is None:
            replacement_filters = None
        else:
            replacement_filters = [
                FilterInstance(spec.filter_id, spec.options) for spec in data.filters
            ]

        updated = store.update_template(
            template_id=template_id,
            name=data.name,
            filters=replacement_filters,
            description=data.description,
        )
        return _pp_template_to_response(updated)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Failed to update postprocessing template: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.delete(
    "/api/v1/postprocessing-templates/{template_id}",
    status_code=204,
    tags=["Postprocessing Templates"],
)
async def delete_pp_template(
    template_id: str,
    _auth: AuthRequired,
    store: PostprocessingTemplateStore = Depends(get_pp_template_store),
    stream_store: PictureStreamStore = Depends(get_picture_stream_store),
):
    """Remove a postprocessing template that no picture stream references.

    Responds with 409 while the template is still referenced, 400 on
    ``ValueError``, and 500 for any other failure.
    """
    try:
        still_referenced = store.is_referenced_by(template_id, stream_store)
        if still_referenced:
            conflict_detail = (
                "Cannot delete postprocessing template: it is referenced by one or more picture streams. "
                "Please reassign those streams before deleting."
            )
            raise HTTPException(status_code=409, detail=conflict_detail)

        store.delete_template(template_id)
    except HTTPException:
        # Pass the conflict response through unchanged.
        raise
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Failed to delete postprocessing template: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.post("/api/v1/postprocessing-templates/{template_id}/test", response_model=TemplateTestResponse, tags=["Postprocessing Templates"])
async def test_pp_template(
    template_id: str,
    test_request: PPTemplateTestRequest,
    _auth: AuthRequired,
    pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
    stream_store: PictureStreamStore = Depends(get_picture_stream_store),
    template_store: TemplateStore = Depends(get_template_store),
    processor_manager: ProcessorManager = Depends(get_processor_manager),
    device_store: DeviceStore = Depends(get_device_store),
):
    """Test a postprocessing template by capturing from a source stream and applying filters.

    The source stream chain is resolved to its raw stream. A ``static_image``
    raw stream is loaded from its URL or file path; any other raw stream is
    captured with a temporary engine for ``test_request.capture_duration``
    seconds. The template's filters are applied to both the thumbnail and the
    full-resolution preview.

    Raises:
        HTTPException: 404 when the postprocessing template is unknown, 400
            for an unresolvable stream chain, missing image file, missing
            capture template, unavailable engine or other ``ValueError``,
            409 when the display is locked by a device, 500 otherwise.
    """
    engine = None
    try:
        # Get the PP template
        try:
            pp_template = pp_store.get_template(template_id)
        except ValueError as e:
            raise HTTPException(status_code=404, detail=str(e))

        # Resolve source stream chain to get the raw stream
        try:
            chain = stream_store.resolve_stream_chain(test_request.source_stream_id)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        raw_stream = chain["raw_stream"]

        if raw_stream.stream_type == "static_image":
            # Static image: load directly
            from pathlib import Path

            source = raw_stream.image_source
            start_time = time.perf_counter()
            if source.startswith(("http://", "https://")):
                async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
                    resp = await client.get(source)
                    resp.raise_for_status()
                    pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
            else:
                path = Path(source)
                if not path.exists():
                    raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
                # The context manager closes the file handle deterministically;
                # convert() returns an independent in-memory copy.
                with Image.open(path) as opened:
                    pil_image = opened.convert("RGB")
            actual_duration = time.perf_counter() - start_time
            frame_count = 1
            total_capture_time = actual_duration
        else:
            # Raw capture stream: use engine
            try:
                capture_template = template_store.get_template(raw_stream.capture_template_id)
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Capture template not found: {raw_stream.capture_template_id}",
                )
            display_index = raw_stream.display_index

            if capture_template.engine_type not in EngineRegistry.get_available_engines():
                raise HTTPException(
                    status_code=400,
                    detail=f"Engine '{capture_template.engine_type}' is not available on this system",
                )

            locked_device_id = processor_manager.get_display_lock_info(display_index)
            if locked_device_id:
                # Prefer the device name in the message; fall back to its id.
                try:
                    device = device_store.get_device(locked_device_id)
                    device_name = device.name
                except Exception:
                    device_name = locked_device_id
                raise HTTPException(
                    status_code=409,
                    detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
                           f"Please stop the device processing before testing.",
                )

            engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
            logger.info(f"Starting {test_request.capture_duration}s PP template test for {template_id} using stream {test_request.source_stream_id}")
            frame_count = 0
            total_capture_time = 0.0
            last_frame = None
            start_time = time.perf_counter()
            end_time = start_time + test_request.capture_duration
            # NOTE(review): this loop contains no await, so the handler does
            # not yield to the event loop until capture_duration has elapsed.
            while time.perf_counter() < end_time:
                capture_start = time.perf_counter()
                screen_capture = engine.capture_display(display_index)
                capture_elapsed = time.perf_counter() - capture_start
                # A None result is not counted as a frame.
                if screen_capture is None:
                    continue
                total_capture_time += capture_elapsed
                frame_count += 1
                last_frame = screen_capture
            actual_duration = time.perf_counter() - start_time

            if last_frame is None:
                raise RuntimeError("No frames captured during test")
            if isinstance(last_frame.image, np.ndarray):
                pil_image = Image.fromarray(last_frame.image)
            else:
                raise ValueError("Unexpected image format from engine")

        # Create thumbnail (at most 640px wide, maintain aspect ratio)
        thumbnail_width = 640
        aspect_ratio = pil_image.height / pil_image.width
        thumbnail_height = int(thumbnail_width * aspect_ratio)
        thumbnail = pil_image.copy()
        thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)

        # Apply postprocessing filters
        if pp_template.filters:
            pool = ImagePool()

            def apply_filters(img):
                arr = np.array(img)
                for fi in pp_template.filters:
                    f = FilterRegistry.create_instance(fi.filter_id, fi.options)
                    result = f.process_image(arr, pool)
                    # A None result leaves the current array in place.
                    if result is not None:
                        arr = result
                return Image.fromarray(arr)

            thumbnail = apply_filters(thumbnail)
            pil_image = apply_filters(pil_image)

        # Image.thumbnail never enlarges and applies its own rounding, so
        # report the size of the image that is actually sent.
        thumbnail_width, thumbnail_height = thumbnail.size

        # Encode thumbnail
        img_buffer = io.BytesIO()
        thumbnail.save(img_buffer, format='JPEG', quality=85)
        img_buffer.seek(0)
        thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
        thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"

        # Encode full-resolution image
        full_buffer = io.BytesIO()
        pil_image.save(full_buffer, format='JPEG', quality=90)
        full_buffer.seek(0)
        full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
        full_data_uri = f"data:image/jpeg;base64,{full_b64}"

        actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
        avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
        width, height = pil_image.size

        return TemplateTestResponse(
            full_capture=CaptureImage(
                image=thumbnail_data_uri,
                full_image=full_data_uri,
                width=width,
                height=height,
                thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height,
            ),
            border_extraction=None,
            performance=PerformanceMetrics(
                capture_duration_s=actual_duration,
                frame_count=frame_count,
                actual_fps=actual_fps,
                avg_capture_time_ms=avg_capture_time_ms,
            ),
        )
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Postprocessing template test failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if engine:
            # Use cleanup(), the same teardown call the capture-template and
            # picture-stream test endpoints make, and log failures instead of
            # silently discarding them.
            try:
                engine.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up test engine: {e}")
# ===== PICTURE STREAM ENDPOINTS =====
def _stream_to_response(s) -> PictureStreamResponse:
    """Build the API response model by copying fields from a picture stream."""
    copied_fields = (
        "id",
        "name",
        "stream_type",
        "display_index",
        "capture_template_id",
        "target_fps",
        "source_stream_id",
        "postprocessing_template_id",
        "image_source",
        "created_at",
        "updated_at",
        "description",
    )
    # Every response field is read from the same-named attribute on the stream.
    return PictureStreamResponse(**{field: getattr(s, field) for field in copied_fields})
@router.get(
    "/api/v1/picture-streams",
    response_model=PictureStreamListResponse,
    tags=["Picture Streams"],
)
async def list_picture_streams(
    _auth: AuthRequired,
    store: PictureStreamStore = Depends(get_picture_stream_store),
):
    """Return every stored picture stream with a total count.

    Any failure is logged and reported as a 500 response.
    """
    try:
        items = list(map(_stream_to_response, store.get_all_streams()))
        return PictureStreamListResponse(streams=items, count=len(items))
    except Exception as err:
        logger.error(f"Failed to list picture streams: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.post("/api/v1/picture-streams/validate-image", response_model=ImageValidateResponse, tags=["Picture Streams"])
async def validate_image(
    data: ImageValidateRequest,
    _auth: AuthRequired,
):
    """Validate an image source (URL or file path) and return a preview thumbnail.

    Sources starting with ``http://`` or ``https://`` are downloaded; anything
    else is treated as a local file path. Failures are never raised: every
    error is reported as ``ImageValidateResponse(valid=False, error=...)``.

    Returns:
        ``ImageValidateResponse`` with the image dimensions and a base64 JPEG
        preview (at most 320x320) when the source can be decoded.
    """
    try:
        from pathlib import Path

        source = data.image_source.strip()
        if not source:
            return ImageValidateResponse(valid=False, error="Image source is empty")

        if source.startswith(("http://", "https://")):
            async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
                response = await client.get(source)
                response.raise_for_status()
            image_input = io.BytesIO(response.content)
        else:
            path = Path(source)
            if not path.exists():
                return ImageValidateResponse(valid=False, error=f"File not found: {source}")
            image_input = path

        # The context manager closes the underlying file deterministically
        # (multi-frame files otherwise stay open until garbage collection);
        # convert() returns an independent in-memory copy.
        with Image.open(image_input) as opened:
            pil_image = opened.convert("RGB")
        width, height = pil_image.size

        # Create thumbnail preview (fits within 320x320)
        thumb = pil_image.copy()
        thumb.thumbnail((320, 320), Image.Resampling.LANCZOS)
        buf = io.BytesIO()
        thumb.save(buf, format="JPEG", quality=80)
        buf.seek(0)
        preview = f"data:image/jpeg;base64,{base64.b64encode(buf.getvalue()).decode()}"

        return ImageValidateResponse(
            valid=True, width=width, height=height, preview=preview
        )
    except httpx.HTTPStatusError as e:
        return ImageValidateResponse(valid=False, error=f"HTTP {e.response.status_code}: {e.response.reason_phrase}")
    except httpx.RequestError as e:
        return ImageValidateResponse(valid=False, error=f"Request failed: {e}")
    except Exception as e:
        return ImageValidateResponse(valid=False, error=str(e))
@router.get("/api/v1/picture-streams/full-image", tags=["Picture Streams"])
async def get_full_image(
    _auth: AuthRequired,
    source: str = Query(..., description="Image URL or local file path"),
):
    """Serve the full-resolution image for lightbox preview.

    Sources starting with ``http://`` or ``https://`` are downloaded; anything
    else is treated as a local file path. The image is converted to RGB and
    returned as a JPEG (quality 90).

    Raises:
        HTTPException: 404 when a local path does not exist, 400 for any
            other failure (download error, undecodable image, ...).
    """
    from pathlib import Path

    try:
        if source.startswith(("http://", "https://")):
            async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
                response = await client.get(source)
                response.raise_for_status()
            image_input = io.BytesIO(response.content)
        else:
            path = Path(source)
            if not path.exists():
                raise HTTPException(status_code=404, detail="File not found")
            image_input = path

        # The context manager closes the underlying file deterministically
        # (multi-frame files otherwise stay open until garbage collection);
        # convert() returns an independent in-memory copy.
        with Image.open(image_input) as opened:
            pil_image = opened.convert("RGB")

        buf = io.BytesIO()
        pil_image.save(buf, format="JPEG", quality=90)
        buf.seek(0)
        return Response(content=buf.getvalue(), media_type="image/jpeg")
    except HTTPException:
        # Keep the 404 above from being rewritten as a 400.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.post(
    "/api/v1/picture-streams",
    response_model=PictureStreamResponse,
    tags=["Picture Streams"],
    status_code=201,
)
async def create_picture_stream(
    data: PictureStreamCreate,
    _auth: AuthRequired,
    store: PictureStreamStore = Depends(get_picture_stream_store),
    template_store: TemplateStore = Depends(get_template_store),
    pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
    """Create a picture stream after checking the templates it refers to.

    A "raw" stream with a capture template id and a "processed" stream with a
    postprocessing template id must point at templates that exist; otherwise
    a 400 is returned. A ``ValueError`` is also reported as 400, and any
    other failure is logged and reported as 500.
    """
    try:
        is_raw_with_template = data.stream_type == "raw" and data.capture_template_id
        if is_raw_with_template:
            try:
                template_store.get_template(data.capture_template_id)
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Capture template not found: {data.capture_template_id}",
                )

        is_processed_with_template = (
            data.stream_type == "processed" and data.postprocessing_template_id
        )
        if is_processed_with_template:
            try:
                pp_store.get_template(data.postprocessing_template_id)
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Postprocessing template not found: {data.postprocessing_template_id}",
                )

        creation_args = {
            "name": data.name,
            "stream_type": data.stream_type,
            "display_index": data.display_index,
            "capture_template_id": data.capture_template_id,
            "target_fps": data.target_fps,
            "source_stream_id": data.source_stream_id,
            "postprocessing_template_id": data.postprocessing_template_id,
            "image_source": data.image_source,
            "description": data.description,
        }
        created = store.create_stream(**creation_args)
        return _stream_to_response(created)
    except HTTPException:
        # Validation responses raised above pass through unchanged.
        raise
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Failed to create picture stream: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.get(
    "/api/v1/picture-streams/{stream_id}",
    response_model=PictureStreamResponse,
    tags=["Picture Streams"],
)
async def get_picture_stream(
    stream_id: str,
    _auth: AuthRequired,
    store: PictureStreamStore = Depends(get_picture_stream_store),
):
    """Fetch a single picture stream; a ``ValueError`` becomes a 404."""
    try:
        return _stream_to_response(store.get_stream(stream_id))
    except ValueError:
        not_found = f"Picture stream {stream_id} not found"
        raise HTTPException(status_code=404, detail=not_found)
@router.put(
    "/api/v1/picture-streams/{stream_id}",
    response_model=PictureStreamResponse,
    tags=["Picture Streams"],
)
async def update_picture_stream(
    stream_id: str,
    data: PictureStreamUpdate,
    _auth: AuthRequired,
    store: PictureStreamStore = Depends(get_picture_stream_store),
):
    """Forward the request's fields to the store and return the updated stream.

    A ``ValueError`` is reported as 400; any other failure is logged and
    reported as 500.
    """
    try:
        changes = {
            "name": data.name,
            "display_index": data.display_index,
            "capture_template_id": data.capture_template_id,
            "target_fps": data.target_fps,
            "source_stream_id": data.source_stream_id,
            "postprocessing_template_id": data.postprocessing_template_id,
            "image_source": data.image_source,
            "description": data.description,
        }
        updated = store.update_stream(stream_id=stream_id, **changes)
        return _stream_to_response(updated)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Failed to update picture stream: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.delete(
    "/api/v1/picture-streams/{stream_id}",
    status_code=204,
    tags=["Picture Streams"],
)
async def delete_picture_stream(
    stream_id: str,
    _auth: AuthRequired,
    store: PictureStreamStore = Depends(get_picture_stream_store),
    device_store: DeviceStore = Depends(get_device_store),
):
    """Remove a picture stream that no device is assigned to.

    Responds with 409 while a device still references the stream, 400 on
    ``ValueError``, and 500 for any other failure.
    """
    try:
        assigned_to_device = store.is_referenced_by_device(stream_id, device_store)
        if assigned_to_device:
            conflict_detail = (
                "Cannot delete picture stream: it is assigned to one or more devices. "
                "Please reassign those devices before deleting."
            )
            raise HTTPException(status_code=409, detail=conflict_detail)

        store.delete_stream(stream_id)
    except HTTPException:
        # Pass the conflict response through unchanged.
        raise
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Failed to delete picture stream: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@router.post("/api/v1/picture-streams/{stream_id}/test", response_model=TemplateTestResponse, tags=["Picture Streams"])
async def test_picture_stream(
    stream_id: str,
    test_request: PictureStreamTestRequest,
    _auth: AuthRequired,
    store: PictureStreamStore = Depends(get_picture_stream_store),
    template_store: TemplateStore = Depends(get_template_store),
    processor_manager: ProcessorManager = Depends(get_processor_manager),
    device_store: DeviceStore = Depends(get_device_store),
    pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
    """Test a picture stream by resolving its chain and running a capture test.

    Resolves the stream chain to the raw stream, then either loads the static
    image or captures frames with a temporary engine for
    ``test_request.capture_duration`` seconds, and returns a preview image
    plus performance metrics.

    When the chain lists postprocessing templates, the filters of the first
    listed template are applied to the preview images; a ``ValueError`` in
    that step is logged as a warning and does not fail the request.

    Raises:
        HTTPException: 400 for an unresolvable stream chain, missing image
            file, missing capture template, unavailable engine or other
            ``ValueError``, 409 when the display is locked by a device, 500
            for ``RuntimeError`` (prefixed "Engine error:") and anything else.
    """
    engine = None
    try:
        # Resolve stream chain
        try:
            chain = store.resolve_stream_chain(stream_id)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        raw_stream = chain["raw_stream"]

        if raw_stream.stream_type == "static_image":
            # Static image stream: load image directly, no engine needed
            from pathlib import Path

            source = raw_stream.image_source
            start_time = time.perf_counter()
            if source.startswith(("http://", "https://")):
                async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
                    resp = await client.get(source)
                    resp.raise_for_status()
                    pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
            else:
                path = Path(source)
                if not path.exists():
                    raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
                # The context manager closes the file handle deterministically;
                # convert() returns an independent in-memory copy.
                with Image.open(path) as opened:
                    pil_image = opened.convert("RGB")
            actual_duration = time.perf_counter() - start_time
            frame_count = 1
            total_capture_time = actual_duration
        else:
            # Raw capture stream: use engine
            try:
                capture_template = template_store.get_template(raw_stream.capture_template_id)
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Capture template not found: {raw_stream.capture_template_id}",
                )
            display_index = raw_stream.display_index

            if capture_template.engine_type not in EngineRegistry.get_available_engines():
                raise HTTPException(
                    status_code=400,
                    detail=f"Engine '{capture_template.engine_type}' is not available on this system",
                )

            locked_device_id = processor_manager.get_display_lock_info(display_index)
            if locked_device_id:
                # Prefer the device name in the message; fall back to its id.
                try:
                    device = device_store.get_device(locked_device_id)
                    device_name = device.name
                except Exception:
                    device_name = locked_device_id
                raise HTTPException(
                    status_code=409,
                    detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
                           f"Please stop the device processing before testing.",
                )

            engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
            logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}")
            frame_count = 0
            total_capture_time = 0.0
            last_frame = None
            start_time = time.perf_counter()
            end_time = start_time + test_request.capture_duration
            # NOTE(review): this loop contains no await, so the handler does
            # not yield to the event loop until capture_duration has elapsed.
            while time.perf_counter() < end_time:
                capture_start = time.perf_counter()
                screen_capture = engine.capture_display(display_index)
                capture_elapsed = time.perf_counter() - capture_start
                # A None result is not counted as a frame.
                if screen_capture is None:
                    continue
                total_capture_time += capture_elapsed
                frame_count += 1
                last_frame = screen_capture
            actual_duration = time.perf_counter() - start_time

            if last_frame is None:
                raise RuntimeError("No frames captured during test")
            if isinstance(last_frame.image, np.ndarray):
                pil_image = Image.fromarray(last_frame.image)
            else:
                raise ValueError("Unexpected image format from engine")

        # Create thumbnail (at most 640px wide, maintain aspect ratio)
        thumbnail_width = 640
        aspect_ratio = pil_image.height / pil_image.width
        thumbnail_height = int(thumbnail_width * aspect_ratio)
        thumbnail = pil_image.copy()
        thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)

        # Apply postprocessing filters if this is a processed stream
        pp_template_ids = chain["postprocessing_template_ids"]
        if pp_template_ids:
            try:
                pp_template = pp_store.get_template(pp_template_ids[0])
                pool = ImagePool()

                def apply_filters(img):
                    arr = np.array(img)
                    for fi in pp_template.filters:
                        f = FilterRegistry.create_instance(fi.filter_id, fi.options)
                        result = f.process_image(arr, pool)
                        # A None result leaves the current array in place.
                        if result is not None:
                            arr = result
                    return Image.fromarray(arr)

                thumbnail = apply_filters(thumbnail)
                pil_image = apply_filters(pil_image)
            except ValueError:
                logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview")

        # Image.thumbnail never enlarges and applies its own rounding, so
        # report the size of the image that is actually sent.
        thumbnail_width, thumbnail_height = thumbnail.size

        # Encode thumbnail
        img_buffer = io.BytesIO()
        thumbnail.save(img_buffer, format='JPEG', quality=85)
        img_buffer.seek(0)
        thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
        thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"

        # Encode full-resolution image
        full_buffer = io.BytesIO()
        pil_image.save(full_buffer, format='JPEG', quality=90)
        full_buffer.seek(0)
        full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
        full_data_uri = f"data:image/jpeg;base64,{full_b64}"

        actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
        avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
        width, height = pil_image.size

        return TemplateTestResponse(
            full_capture=CaptureImage(
                image=thumbnail_data_uri,
                full_image=full_data_uri,
                width=width,
                height=height,
                thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height,
            ),
            border_extraction=None,
            performance=PerformanceMetrics(
                capture_duration_s=actual_duration,
                frame_count=frame_count,
                actual_fps=actual_fps,
                avg_capture_time_ms=avg_capture_time_ms,
            ),
        )
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
    except Exception as e:
        logger.error(f"Failed to test picture stream: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always release the temporary engine, even when the test failed.
        if engine:
            try:
                engine.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up test engine: {e}")