"""API routes and endpoints.""" import base64 import io import sys import time from datetime import datetime from typing import List, Dict, Any import httpx import numpy as np from PIL import Image from fastapi import APIRouter, HTTPException, Depends from wled_controller import __version__ from wled_controller.api.auth import AuthRequired from wled_controller.api.schemas import ( HealthResponse, VersionResponse, DisplayListResponse, DisplayInfo, DeviceCreate, DeviceUpdate, DeviceResponse, DeviceListResponse, ProcessingSettings as ProcessingSettingsSchema, Calibration as CalibrationSchema, CalibrationTestModeRequest, CalibrationTestModeResponse, ProcessingState, MetricsResponse, TemplateCreate, TemplateUpdate, TemplateResponse, TemplateListResponse, EngineInfo, EngineListResponse, TemplateTestRequest, TemplateTestResponse, CaptureImage, BorderExtraction, PerformanceMetrics, FilterInstanceSchema, FilterOptionDefSchema, FilterTypeResponse, FilterTypeListResponse, PostprocessingTemplateCreate, PostprocessingTemplateUpdate, PostprocessingTemplateResponse, PostprocessingTemplateListResponse, PictureStreamCreate, PictureStreamUpdate, PictureStreamResponse, PictureStreamListResponse, PictureStreamTestRequest, PPTemplateTestRequest, ) from wled_controller.config import get_config from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings from wled_controller.core.calibration import ( calibration_from_dict, calibration_to_dict, ) from wled_controller.storage import DeviceStore from wled_controller.storage.template_store import TemplateStore from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore from wled_controller.storage.picture_stream_store import PictureStreamStore from wled_controller.core.capture_engines import EngineRegistry from wled_controller.core.filters import FilterRegistry, FilterInstance, ImagePool from wled_controller.utils import get_logger from wled_controller.core.screen_capture import get_available_displays logger = get_logger(__name__) router = APIRouter() # Global instances (initialized in main.py) _device_store: DeviceStore | None = None _template_store: TemplateStore | None = None _pp_template_store: PostprocessingTemplateStore | None = None _picture_stream_store: PictureStreamStore | None = None _processor_manager: ProcessorManager | None = None def get_device_store() -> DeviceStore: """Get device store dependency.""" if _device_store is None: raise RuntimeError("Device store not initialized") return _device_store def get_template_store() -> TemplateStore: """Get template store dependency.""" if _template_store is None: raise RuntimeError("Template store not initialized") return _template_store def get_pp_template_store() -> PostprocessingTemplateStore: """Get postprocessing template store dependency.""" if _pp_template_store is None: raise RuntimeError("Postprocessing template store not initialized") return _pp_template_store def get_picture_stream_store() -> PictureStreamStore: """Get picture stream store dependency.""" if _picture_stream_store is None: raise RuntimeError("Picture stream store not initialized") return _picture_stream_store def get_processor_manager() -> ProcessorManager: """Get processor manager dependency.""" if _processor_manager is None: raise RuntimeError("Processor manager not initialized") return _processor_manager def init_dependencies( device_store: DeviceStore, template_store: TemplateStore, processor_manager: ProcessorManager, pp_template_store: PostprocessingTemplateStore | None = None, picture_stream_store: PictureStreamStore | None = None, ): """Initialize global dependencies.""" global _device_store, _template_store, _processor_manager, _pp_template_store, _picture_stream_store _device_store = device_store _template_store = template_store _processor_manager = processor_manager _pp_template_store = pp_template_store _picture_stream_store = picture_stream_store @router.get("/health", response_model=HealthResponse, tags=["Health"]) async def health_check(): """Check service health status. Returns basic health information including status, version, and timestamp. """ logger.info("Health check requested") return HealthResponse( status="healthy", timestamp=datetime.utcnow(), version=__version__, ) @router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"]) async def get_version(): """Get version information. Returns application version, Python version, and API version. """ logger.info("Version info requested") return VersionResponse( version=__version__, python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", api_version="v1", ) @router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"]) async def get_displays(_: AuthRequired): """Get list of available displays. Returns information about all available monitors/displays that can be captured. """ logger.info("Listing available displays") try: # Get available displays with all metadata (name, refresh rate, etc.) display_dataclasses = get_available_displays() # Convert dataclass DisplayInfo to Pydantic DisplayInfo displays = [ DisplayInfo( index=d.index, name=d.name, width=d.width, height=d.height, x=d.x, y=d.y, is_primary=d.is_primary, refresh_rate=d.refresh_rate, ) for d in display_dataclasses ] logger.info(f"Found {len(displays)} displays") return DisplayListResponse( displays=displays, count=len(displays), ) except Exception as e: logger.error(f"Failed to get displays: {e}") raise HTTPException( status_code=500, detail=f"Failed to retrieve display information: {str(e)}" ) # ===== DEVICE MANAGEMENT ENDPOINTS ===== @router.post("/api/v1/devices", response_model=DeviceResponse, tags=["Devices"], status_code=201) async def create_device( device_data: DeviceCreate, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), template_store: TemplateStore = Depends(get_template_store), ): """Create and attach a new WLED device.""" try: logger.info(f"Creating device: {device_data.name}") # Validate WLED device is reachable before adding device_url = device_data.url.rstrip("/") try: async with httpx.AsyncClient(timeout=5) as client: response = await client.get(f"{device_url}/json/info") response.raise_for_status() wled_info = response.json() wled_led_count = wled_info.get("leds", {}).get("count") if not wled_led_count or wled_led_count < 1: raise HTTPException( status_code=422, detail=f"WLED device at {device_url} reported invalid LED count: {wled_led_count}" ) logger.info( f"WLED device reachable: {wled_info.get('name', 'Unknown')} " f"v{wled_info.get('ver', '?')} ({wled_led_count} LEDs)" ) except httpx.ConnectError: raise HTTPException( status_code=422, detail=f"Cannot reach WLED device at {device_url}. Check the URL and ensure the device is powered on." ) except httpx.TimeoutException: raise HTTPException( status_code=422, detail=f"Connection to {device_url} timed out. Check network connectivity." ) except Exception as e: raise HTTPException( status_code=422, detail=f"Failed to connect to WLED device at {device_url}: {e}" ) # Resolve capture template: use requested ID if valid, else first available capture_template_id = None if device_data.capture_template_id: try: template_store.get_template(device_data.capture_template_id) capture_template_id = device_data.capture_template_id except ValueError: logger.warning( f"Requested template '{device_data.capture_template_id}' not found, using first available" ) if not capture_template_id: all_templates = template_store.get_all_templates() if all_templates: capture_template_id = all_templates[0].id else: raise HTTPException( status_code=500, detail="No capture templates available. Please create one first." ) # Create device in storage (LED count auto-detected from WLED) device = store.create_device( name=device_data.name, url=device_data.url, led_count=wled_led_count, capture_template_id=capture_template_id, ) # Add to processor manager manager.add_device( device_id=device.id, device_url=device.url, led_count=device.led_count, settings=device.settings, calibration=device.calibration, ) return DeviceResponse( id=device.id, name=device.name, url=device.url, led_count=device.led_count, enabled=device.enabled, status="disconnected", settings=ProcessingSettingsSchema( display_index=device.settings.display_index, fps=device.settings.fps, border_width=device.settings.border_width, interpolation_mode=device.settings.interpolation_mode, brightness=device.settings.brightness, state_check_interval=device.settings.state_check_interval, ), calibration=CalibrationSchema(**calibration_to_dict(device.calibration)), capture_template_id=device.capture_template_id, picture_stream_id=device.picture_stream_id, created_at=device.created_at, updated_at=device.updated_at, ) except Exception as e: logger.error(f"Failed to create device: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/devices", response_model=DeviceListResponse, tags=["Devices"]) async def list_devices( _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), ): """List all attached WLED devices.""" try: devices = store.get_all_devices() device_responses = [ DeviceResponse( id=device.id, name=device.name, url=device.url, led_count=device.led_count, enabled=device.enabled, status="disconnected", settings=ProcessingSettingsSchema( display_index=device.settings.display_index, fps=device.settings.fps, border_width=device.settings.border_width, brightness=device.settings.brightness, state_check_interval=device.settings.state_check_interval, ), calibration=CalibrationSchema(**calibration_to_dict(device.calibration)), capture_template_id=device.capture_template_id, picture_stream_id=device.picture_stream_id, created_at=device.created_at, updated_at=device.updated_at, ) for device in devices ] return DeviceListResponse(devices=device_responses, count=len(device_responses)) except Exception as e: logger.error(f"Failed to list devices: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"]) async def get_device( device_id: str, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Get device details by ID.""" device = store.get_device(device_id) if not device: raise HTTPException(status_code=404, detail=f"Device {device_id} not found") # Determine status status = "connected" if manager.is_processing(device_id) else "disconnected" return DeviceResponse( id=device.id, name=device.name, url=device.url, led_count=device.led_count, enabled=device.enabled, status=status, settings=ProcessingSettingsSchema( display_index=device.settings.display_index, fps=device.settings.fps, border_width=device.settings.border_width, brightness=device.settings.brightness, state_check_interval=device.settings.state_check_interval, ), calibration=CalibrationSchema(**calibration_to_dict(device.calibration)), capture_template_id=device.capture_template_id, created_at=device.created_at, updated_at=device.updated_at, ) @router.put("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"]) async def update_device( device_id: str, update_data: DeviceUpdate, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Update device information.""" try: # Check if stream or template changed and device is processing (for hot-swap) old_device = store.get_device(device_id) stream_changed = ( update_data.picture_stream_id is not None and update_data.picture_stream_id != old_device.picture_stream_id ) template_changed = ( update_data.capture_template_id is not None and update_data.capture_template_id != old_device.capture_template_id ) was_processing = manager.is_processing(device_id) # Update device device = store.update_device( device_id=device_id, name=update_data.name, url=update_data.url, enabled=update_data.enabled, capture_template_id=update_data.capture_template_id, picture_stream_id=update_data.picture_stream_id, ) # Hot-swap: If stream/template changed and device was processing, restart it if (stream_changed or template_changed) and was_processing: logger.info(f"Hot-swapping stream/template for device {device_id}") try: # Stop current processing await manager.stop_processing(device_id) # Update processor with new settings manager.remove_device(device_id) manager.add_device( device_id=device.id, device_url=device.url, led_count=device.led_count, settings=device.settings, calibration=device.calibration, capture_template_id=device.capture_template_id, picture_stream_id=device.picture_stream_id, ) # Restart processing await manager.start_processing(device_id) logger.info(f"Successfully hot-swapped stream/template for device {device_id}") except Exception as e: logger.error(f"Error during template hot-swap: {e}") # Device is stopped but updated - user can manually restart return DeviceResponse( id=device.id, name=device.name, url=device.url, led_count=device.led_count, enabled=device.enabled, status="disconnected", settings=ProcessingSettingsSchema( display_index=device.settings.display_index, fps=device.settings.fps, border_width=device.settings.border_width, interpolation_mode=device.settings.interpolation_mode, brightness=device.settings.brightness, state_check_interval=device.settings.state_check_interval, ), calibration=CalibrationSchema(**calibration_to_dict(device.calibration)), capture_template_id=device.capture_template_id, picture_stream_id=device.picture_stream_id, created_at=device.created_at, updated_at=device.updated_at, ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to update device: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/api/v1/devices/{device_id}", status_code=204, tags=["Devices"]) async def delete_device( device_id: str, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Delete/detach a device.""" try: # Stop processing if running if manager.is_processing(device_id): await manager.stop_processing(device_id) # Remove from manager manager.remove_device(device_id) # Delete from storage store.delete_device(device_id) logger.info(f"Deleted device {device_id}") except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to delete device: {e}") raise HTTPException(status_code=500, detail=str(e)) # ===== PROCESSING CONTROL ENDPOINTS ===== @router.post("/api/v1/devices/{device_id}/start", tags=["Processing"]) async def start_processing( device_id: str, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Start screen processing for a device.""" try: # Verify device exists device = store.get_device(device_id) if not device: raise HTTPException(status_code=404, detail=f"Device {device_id} not found") await manager.start_processing(device_id) logger.info(f"Started processing for device {device_id}") return {"status": "started", "device_id": device_id} except RuntimeError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to start processing: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/devices/{device_id}/stop", tags=["Processing"]) async def stop_processing( device_id: str, _auth: AuthRequired, manager: ProcessorManager = Depends(get_processor_manager), ): """Stop screen processing for a device.""" try: await manager.stop_processing(device_id) logger.info(f"Stopped processing for device {device_id}") return {"status": "stopped", "device_id": device_id} except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to stop processing: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/devices/{device_id}/state", response_model=ProcessingState, tags=["Processing"]) async def get_processing_state( device_id: str, _auth: AuthRequired, manager: ProcessorManager = Depends(get_processor_manager), ): """Get current processing state for a device.""" try: state = manager.get_state(device_id) return ProcessingState(**state) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to get state: {e}") raise HTTPException(status_code=500, detail=str(e)) # ===== SETTINGS ENDPOINTS ===== @router.get("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"]) async def get_settings( device_id: str, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), ): """Get processing settings for a device.""" device = store.get_device(device_id) if not device: raise HTTPException(status_code=404, detail=f"Device {device_id} not found") return ProcessingSettingsSchema( display_index=device.settings.display_index, fps=device.settings.fps, border_width=device.settings.border_width, interpolation_mode=device.settings.interpolation_mode, brightness=device.settings.brightness, smoothing=device.settings.smoothing, state_check_interval=device.settings.state_check_interval, ) @router.put("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"]) async def update_settings( device_id: str, settings: ProcessingSettingsSchema, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Update processing settings for a device. Merges with existing settings so callers can send partial updates. """ try: # Get existing device to merge settings device = store.get_device(device_id) if not device: raise HTTPException(status_code=404, detail=f"Device {device_id} not found") existing = device.settings # Merge: use new values where provided, keep existing otherwise new_settings = ProcessingSettings( display_index=settings.display_index, fps=settings.fps, border_width=settings.border_width, interpolation_mode=settings.interpolation_mode, brightness=settings.color_correction.brightness if settings.color_correction else existing.brightness, gamma=settings.color_correction.gamma if settings.color_correction else existing.gamma, saturation=settings.color_correction.saturation if settings.color_correction else existing.saturation, smoothing=settings.smoothing, state_check_interval=settings.state_check_interval, ) # Update in storage device = store.update_device(device_id, settings=new_settings) # Update in manager if device exists try: manager.update_settings(device_id, new_settings) except ValueError: # Device not in manager yet, that's ok pass return ProcessingSettingsSchema( display_index=device.settings.display_index, fps=device.settings.fps, border_width=device.settings.border_width, interpolation_mode=device.settings.interpolation_mode, brightness=device.settings.brightness, state_check_interval=device.settings.state_check_interval, ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to update settings: {e}") raise HTTPException(status_code=500, detail=str(e)) # ===== CALIBRATION ENDPOINTS ===== @router.get("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"]) async def get_calibration( device_id: str, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), ): """Get calibration configuration for a device.""" device = store.get_device(device_id) if not device: raise HTTPException(status_code=404, detail=f"Device {device_id} not found") return CalibrationSchema(**calibration_to_dict(device.calibration)) @router.put("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"]) async def update_calibration( device_id: str, calibration_data: CalibrationSchema, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Update calibration configuration for a device.""" try: # Convert schema to CalibrationConfig calibration_dict = calibration_data.model_dump() calibration = calibration_from_dict(calibration_dict) # Update in storage device = store.update_device(device_id, calibration=calibration) # Update in manager if device exists try: manager.update_calibration(device_id, calibration) except ValueError: # Device not in manager yet, that's ok pass return CalibrationSchema(**calibration_to_dict(device.calibration)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to update calibration: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.put( "/api/v1/devices/{device_id}/calibration/test", response_model=CalibrationTestModeResponse, tags=["Calibration"], ) async def set_calibration_test_mode( device_id: str, body: CalibrationTestModeRequest, _auth: AuthRequired, store: DeviceStore = Depends(get_device_store), manager: ProcessorManager = Depends(get_processor_manager), ): """Toggle calibration test mode for specific edges. Send edges with colors to light them up, or empty edges dict to exit test mode. While test mode is active, screen capture processing is paused. """ try: device = store.get_device(device_id) if not device: raise HTTPException(status_code=404, detail=f"Device {device_id} not found") # Validate edge names and colors valid_edges = {"top", "right", "bottom", "left"} for edge_name, color in body.edges.items(): if edge_name not in valid_edges: raise HTTPException( status_code=400, detail=f"Invalid edge '{edge_name}'. Must be one of: {', '.join(valid_edges)}" ) if len(color) != 3 or not all(0 <= c <= 255 for c in color): raise HTTPException( status_code=400, detail=f"Invalid color for edge '{edge_name}'. Must be [R, G, B] with values 0-255." ) await manager.set_test_mode(device_id, body.edges) active_edges = list(body.edges.keys()) logger.info( f"Test mode {'activated' if active_edges else 'deactivated'} " f"for device {device_id}: {active_edges}" ) return CalibrationTestModeResponse( test_mode=len(active_edges) > 0, active_edges=active_edges, device_id=device_id, ) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to set test mode: {e}") raise HTTPException(status_code=500, detail=str(e)) # ===== METRICS ENDPOINTS ===== @router.get("/api/v1/devices/{device_id}/metrics", response_model=MetricsResponse, tags=["Metrics"]) async def get_metrics( device_id: str, _auth: AuthRequired, manager: ProcessorManager = Depends(get_processor_manager), ): """Get processing metrics for a device.""" try: metrics = manager.get_metrics(device_id) return MetricsResponse(**metrics) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to get metrics: {e}") raise HTTPException(status_code=500, detail=str(e)) # ===== CAPTURE TEMPLATE ENDPOINTS ===== @router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"]) async def list_templates( _auth: AuthRequired, template_store: TemplateStore = Depends(get_template_store), ): """List all capture templates.""" try: templates = template_store.get_all_templates() template_responses = [ TemplateResponse( id=t.id, name=t.name, engine_type=t.engine_type, engine_config=t.engine_config, created_at=t.created_at, updated_at=t.updated_at, description=t.description, ) for t in templates ] return TemplateListResponse( templates=template_responses, count=len(template_responses), ) except Exception as e: logger.error(f"Failed to list templates: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201) async def create_template( template_data: TemplateCreate, _auth: AuthRequired, template_store: TemplateStore = Depends(get_template_store), ): """Create a new capture template.""" try: template = template_store.create_template( name=template_data.name, engine_type=template_data.engine_type, engine_config=template_data.engine_config, description=template_data.description, ) return TemplateResponse( id=template.id, name=template.name, engine_type=template.engine_type, engine_config=template.engine_config, created_at=template.created_at, updated_at=template.updated_at, description=template.description, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to create template: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"]) async def get_template( template_id: str, _auth: AuthRequired, template_store: TemplateStore = Depends(get_template_store), ): """Get template by ID.""" try: template = template_store.get_template(template_id) except ValueError: raise HTTPException(status_code=404, detail=f"Template {template_id} not found") return TemplateResponse( id=template.id, name=template.name, engine_type=template.engine_type, engine_config=template.engine_config, created_at=template.created_at, updated_at=template.updated_at, description=template.description, ) @router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"]) async def update_template( template_id: str, update_data: TemplateUpdate, _auth: AuthRequired, template_store: TemplateStore = Depends(get_template_store), ): """Update a template.""" try: template = template_store.update_template( template_id=template_id, name=update_data.name, engine_type=update_data.engine_type, engine_config=update_data.engine_config, description=update_data.description, ) return TemplateResponse( id=template.id, name=template.name, engine_type=template.engine_type, engine_config=template.engine_config, created_at=template.created_at, updated_at=template.updated_at, description=template.description, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to update template: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"]) async def delete_template( template_id: str, _auth: AuthRequired, template_store: TemplateStore = Depends(get_template_store), device_store: DeviceStore = Depends(get_device_store), ): """Delete a template. Validates that no devices are currently using this template before deletion. """ try: # Check if any devices are using this template devices_using_template = [] for device in device_store.get_all_devices(): if device.capture_template_id == template_id: devices_using_template.append(device.name) if devices_using_template: device_list = ", ".join(devices_using_template) raise HTTPException( status_code=409, detail=f"Cannot delete template: it is currently assigned to the following device(s): {device_list}. " f"Please reassign these devices to a different template before deleting." ) # Proceed with deletion template_store.delete_template(template_id) except HTTPException: raise # Re-raise HTTP exceptions as-is except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to delete template: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"]) async def list_engines(_auth: AuthRequired): """List available capture engines on this system. Returns all registered engines that are available on the current platform. """ try: available_engine_types = EngineRegistry.get_available_engines() engines = [] for engine_type in available_engine_types: engine_class = EngineRegistry.get_engine(engine_type) engines.append( EngineInfo( type=engine_type, name=engine_type.upper(), default_config=engine_class.get_default_config(), available=True, ) ) return EngineListResponse(engines=engines, count=len(engines)) except Exception as e: logger.error(f"Failed to list engines: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"]) async def test_template( test_request: TemplateTestRequest, _auth: AuthRequired, processor_manager: ProcessorManager = Depends(get_processor_manager), device_store: DeviceStore = Depends(get_device_store), ): """Test a capture template configuration. Temporarily instantiates an engine with the provided configuration, captures frames for the specified duration, and returns actual FPS metrics. """ engine = None try: # Validate engine type if test_request.engine_type not in EngineRegistry.get_available_engines(): raise HTTPException( status_code=400, detail=f"Engine '{test_request.engine_type}' is not available on this system" ) # Check if display is already being captured locked_device_id = processor_manager.get_display_lock_info(test_request.display_index) if locked_device_id: # Get device info for better error message try: device = device_store.get_device(locked_device_id) device_name = device.name except Exception: device_name = locked_device_id raise HTTPException( status_code=409, detail=( f"Display {test_request.display_index} is currently being captured by device " f"'{device_name}'. Please stop the device processing before testing this template." ) ) # Create engine (auto-initializes on first capture) engine = EngineRegistry.create_engine(test_request.engine_type, test_request.engine_config) # Run sustained capture test logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}") frame_count = 0 total_capture_time = 0.0 last_frame = None start_time = time.perf_counter() end_time = start_time + test_request.capture_duration while time.perf_counter() < end_time: capture_start = time.perf_counter() screen_capture = engine.capture_display(test_request.display_index) capture_elapsed = time.perf_counter() - capture_start total_capture_time += capture_elapsed frame_count += 1 last_frame = screen_capture actual_duration = time.perf_counter() - start_time logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s") # Use the last captured frame for preview if last_frame is None: raise RuntimeError("No frames captured during test") # Convert numpy array to PIL Image if isinstance(last_frame.image, np.ndarray): pil_image = Image.fromarray(last_frame.image) else: raise ValueError("Unexpected image format from engine") # Create thumbnail (640px wide, maintain aspect ratio) thumbnail_width = 640 aspect_ratio = pil_image.height / pil_image.width thumbnail_height = int(thumbnail_width * aspect_ratio) thumbnail = pil_image.copy() thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS) # Encode thumbnail as JPEG img_buffer = io.BytesIO() thumbnail.save(img_buffer, format='JPEG', quality=85) img_buffer.seek(0) thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8') thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}" # Encode full-resolution image as JPEG full_buffer = io.BytesIO() pil_image.save(full_buffer, format='JPEG', quality=90) full_buffer.seek(0) full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8') full_data_uri = f"data:image/jpeg;base64,{full_b64}" # Calculate metrics actual_fps = frame_count / actual_duration if actual_duration > 0 else 0 avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0 width, height = pil_image.size return TemplateTestResponse( full_capture=CaptureImage( image=thumbnail_data_uri, full_image=full_data_uri, width=width, height=height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, ), border_extraction=None, performance=PerformanceMetrics( capture_duration_s=actual_duration, frame_count=frame_count, actual_fps=actual_fps, avg_capture_time_ms=avg_capture_time_ms, ), ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except RuntimeError as e: raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}") except Exception as e: logger.error(f"Failed to test template: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) finally: # Always cleanup engine if engine: try: engine.cleanup() except Exception as e: logger.error(f"Error cleaning up test engine: {e}") # ===== FILTER TYPE ENDPOINTS ===== @router.get("/api/v1/filters", response_model=FilterTypeListResponse, tags=["Filters"]) async def list_filter_types(_auth: AuthRequired): """List all available postprocessing filter types and their options schemas.""" all_filters = FilterRegistry.get_all() responses = [] for filter_id, filter_cls in all_filters.items(): schema = filter_cls.get_options_schema() responses.append(FilterTypeResponse( filter_id=filter_cls.filter_id, filter_name=filter_cls.filter_name, options_schema=[ FilterOptionDefSchema( key=opt.key, label=opt.label, type=opt.option_type, default=opt.default, min_value=opt.min_value, max_value=opt.max_value, step=opt.step, ) for opt in schema ], )) return FilterTypeListResponse(filters=responses, count=len(responses)) # ===== POSTPROCESSING TEMPLATE ENDPOINTS ===== def _pp_template_to_response(t) -> PostprocessingTemplateResponse: """Convert a PostprocessingTemplate to its API response.""" return PostprocessingTemplateResponse( id=t.id, name=t.name, filters=[FilterInstanceSchema(filter_id=f.filter_id, options=f.options) for f in t.filters], created_at=t.created_at, updated_at=t.updated_at, description=t.description, ) @router.get("/api/v1/postprocessing-templates", response_model=PostprocessingTemplateListResponse, tags=["Postprocessing Templates"]) async def list_pp_templates( _auth: AuthRequired, store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """List all postprocessing templates.""" try: templates = store.get_all_templates() responses = [_pp_template_to_response(t) for t in templates] return PostprocessingTemplateListResponse(templates=responses, count=len(responses)) except Exception as e: logger.error(f"Failed to list postprocessing templates: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/postprocessing-templates", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"], status_code=201) async def create_pp_template( data: PostprocessingTemplateCreate, _auth: AuthRequired, store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Create a new postprocessing template.""" try: filters = [FilterInstance(f.filter_id, f.options) for f in data.filters] template = store.create_template( name=data.name, filters=filters, description=data.description, ) return _pp_template_to_response(template) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to create postprocessing template: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/postprocessing-templates/{template_id}", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"]) async def get_pp_template( template_id: str, _auth: AuthRequired, store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Get postprocessing template by ID.""" try: template = store.get_template(template_id) return _pp_template_to_response(template) except ValueError: raise HTTPException(status_code=404, detail=f"Postprocessing template {template_id} not found") @router.put("/api/v1/postprocessing-templates/{template_id}", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"]) async def update_pp_template( template_id: str, data: PostprocessingTemplateUpdate, _auth: AuthRequired, store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Update a postprocessing template.""" try: filters = [FilterInstance(f.filter_id, f.options) for f in data.filters] if data.filters is not None else None template = store.update_template( template_id=template_id, name=data.name, filters=filters, description=data.description, ) return _pp_template_to_response(template) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to update postprocessing template: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/api/v1/postprocessing-templates/{template_id}", status_code=204, tags=["Postprocessing Templates"]) async def delete_pp_template( template_id: str, _auth: AuthRequired, store: PostprocessingTemplateStore = Depends(get_pp_template_store), stream_store: PictureStreamStore = Depends(get_picture_stream_store), ): """Delete a postprocessing template.""" try: # Check if any picture stream references this template if store.is_referenced_by(template_id, stream_store): raise HTTPException( status_code=409, detail="Cannot delete postprocessing template: it is referenced by one or more picture streams. " "Please reassign those streams before deleting.", ) store.delete_template(template_id) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to delete postprocessing template: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/postprocessing-templates/{template_id}/test", response_model=TemplateTestResponse, tags=["Postprocessing Templates"]) async def test_pp_template( template_id: str, test_request: PPTemplateTestRequest, _auth: AuthRequired, pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store), stream_store: PictureStreamStore = Depends(get_picture_stream_store), template_store: TemplateStore = Depends(get_template_store), processor_manager: ProcessorManager = Depends(get_processor_manager), device_store: DeviceStore = Depends(get_device_store), ): """Test a postprocessing template by capturing from a source stream and applying filters.""" engine = None try: # Get the PP template try: pp_template = pp_store.get_template(template_id) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) # Resolve source stream chain to get the raw stream try: chain = stream_store.resolve_stream_chain(test_request.source_stream_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) raw_stream = chain["raw_stream"] # Get capture template from raw stream try: capture_template = template_store.get_template(raw_stream.capture_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Capture template not found: {raw_stream.capture_template_id}", ) display_index = raw_stream.display_index # Validate engine if capture_template.engine_type not in EngineRegistry.get_available_engines(): raise HTTPException( status_code=400, detail=f"Engine '{capture_template.engine_type}' is not available on this system", ) # Check display lock locked_device_id = processor_manager.get_display_lock_info(display_index) if locked_device_id: try: device = device_store.get_device(locked_device_id) device_name = device.name except Exception: device_name = locked_device_id raise HTTPException( status_code=409, detail=f"Display {display_index} is currently being captured by device '{device_name}'. " f"Please stop the device processing before testing.", ) # Create engine and run test engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config) logger.info(f"Starting {test_request.capture_duration}s PP template test for {template_id} using stream {test_request.source_stream_id}") frame_count = 0 total_capture_time = 0.0 last_frame = None start_time = time.perf_counter() end_time = start_time + test_request.capture_duration while time.perf_counter() < end_time: capture_start = time.perf_counter() screen_capture = engine.capture_display(display_index) capture_elapsed = time.perf_counter() - capture_start total_capture_time += capture_elapsed frame_count += 1 last_frame = screen_capture actual_duration = time.perf_counter() - start_time if last_frame is None: raise RuntimeError("No frames captured during test") # Convert to PIL Image if isinstance(last_frame.image, np.ndarray): pil_image = Image.fromarray(last_frame.image) else: raise ValueError("Unexpected image format from engine") # Create thumbnail thumbnail_width = 640 aspect_ratio = pil_image.height / pil_image.width thumbnail_height = int(thumbnail_width * aspect_ratio) thumbnail = pil_image.copy() thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS) # Apply postprocessing filters if pp_template.filters: pool = ImagePool() def apply_filters(img): arr = np.array(img) for fi in pp_template.filters: f = FilterRegistry.create_instance(fi.filter_id, fi.options) result = f.process_image(arr, pool) if result is not None: arr = result return Image.fromarray(arr) thumbnail = apply_filters(thumbnail) pil_image = apply_filters(pil_image) # Encode thumbnail img_buffer = io.BytesIO() thumbnail.save(img_buffer, format='JPEG', quality=85) img_buffer.seek(0) thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8') thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}" # Encode full-resolution image full_buffer = io.BytesIO() pil_image.save(full_buffer, format='JPEG', quality=90) full_buffer.seek(0) full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8') full_data_uri = f"data:image/jpeg;base64,{full_b64}" actual_fps = frame_count / actual_duration if actual_duration > 0 else 0 avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0 width, height = pil_image.size return TemplateTestResponse( full_capture=CaptureImage( image=thumbnail_data_uri, full_image=full_data_uri, width=width, height=height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, ), border_extraction=None, performance=PerformanceMetrics( capture_duration_s=actual_duration, frame_count=frame_count, actual_fps=actual_fps, avg_capture_time_ms=avg_capture_time_ms, ), ) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Postprocessing template test failed: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: if engine: try: engine.release() except Exception: pass # ===== PICTURE STREAM ENDPOINTS ===== def _stream_to_response(s) -> PictureStreamResponse: """Convert a PictureStream to its API response.""" return PictureStreamResponse( id=s.id, name=s.name, stream_type=s.stream_type, display_index=s.display_index, capture_template_id=s.capture_template_id, target_fps=s.target_fps, source_stream_id=s.source_stream_id, postprocessing_template_id=s.postprocessing_template_id, created_at=s.created_at, updated_at=s.updated_at, description=s.description, ) @router.get("/api/v1/picture-streams", response_model=PictureStreamListResponse, tags=["Picture Streams"]) async def list_picture_streams( _auth: AuthRequired, store: PictureStreamStore = Depends(get_picture_stream_store), ): """List all picture streams.""" try: streams = store.get_all_streams() responses = [_stream_to_response(s) for s in streams] return PictureStreamListResponse(streams=responses, count=len(responses)) except Exception as e: logger.error(f"Failed to list picture streams: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/picture-streams", response_model=PictureStreamResponse, tags=["Picture Streams"], status_code=201) async def create_picture_stream( data: PictureStreamCreate, _auth: AuthRequired, store: PictureStreamStore = Depends(get_picture_stream_store), template_store: TemplateStore = Depends(get_template_store), pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Create a new picture stream.""" try: # Validate referenced entities if data.stream_type == "raw" and data.capture_template_id: try: template_store.get_template(data.capture_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Capture template not found: {data.capture_template_id}", ) if data.stream_type == "processed" and data.postprocessing_template_id: try: pp_store.get_template(data.postprocessing_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Postprocessing template not found: {data.postprocessing_template_id}", ) stream = store.create_stream( name=data.name, stream_type=data.stream_type, display_index=data.display_index, capture_template_id=data.capture_template_id, target_fps=data.target_fps, source_stream_id=data.source_stream_id, postprocessing_template_id=data.postprocessing_template_id, description=data.description, ) return _stream_to_response(stream) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to create picture stream: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/picture-streams/{stream_id}", response_model=PictureStreamResponse, tags=["Picture Streams"]) async def get_picture_stream( stream_id: str, _auth: AuthRequired, store: PictureStreamStore = Depends(get_picture_stream_store), ): """Get picture stream by ID.""" try: stream = store.get_stream(stream_id) return _stream_to_response(stream) except ValueError: raise HTTPException(status_code=404, detail=f"Picture stream {stream_id} not found") @router.put("/api/v1/picture-streams/{stream_id}", response_model=PictureStreamResponse, tags=["Picture Streams"]) async def update_picture_stream( stream_id: str, data: PictureStreamUpdate, _auth: AuthRequired, store: PictureStreamStore = Depends(get_picture_stream_store), ): """Update a picture stream.""" try: stream = store.update_stream( stream_id=stream_id, name=data.name, display_index=data.display_index, capture_template_id=data.capture_template_id, target_fps=data.target_fps, source_stream_id=data.source_stream_id, postprocessing_template_id=data.postprocessing_template_id, description=data.description, ) return _stream_to_response(stream) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to update picture stream: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/api/v1/picture-streams/{stream_id}", status_code=204, tags=["Picture Streams"]) async def delete_picture_stream( stream_id: str, _auth: AuthRequired, store: PictureStreamStore = Depends(get_picture_stream_store), device_store: DeviceStore = Depends(get_device_store), ): """Delete a picture stream.""" try: # Check if any device references this stream if store.is_referenced_by_device(stream_id, device_store): raise HTTPException( status_code=409, detail="Cannot delete picture stream: it is assigned to one or more devices. " "Please reassign those devices before deleting.", ) store.delete_stream(stream_id) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to delete picture stream: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/picture-streams/{stream_id}/test", response_model=TemplateTestResponse, tags=["Picture Streams"]) async def test_picture_stream( stream_id: str, test_request: PictureStreamTestRequest, _auth: AuthRequired, store: PictureStreamStore = Depends(get_picture_stream_store), template_store: TemplateStore = Depends(get_template_store), processor_manager: ProcessorManager = Depends(get_processor_manager), device_store: DeviceStore = Depends(get_device_store), pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Test a picture stream by resolving its chain and running a capture test. Resolves the stream chain to the raw stream, captures frames, and returns preview image + performance metrics. For processed streams, applies postprocessing (gamma, saturation, brightness) to the preview image. """ engine = None try: # Resolve stream chain try: chain = store.resolve_stream_chain(stream_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) raw_stream = chain["raw_stream"] # Get capture template from raw stream try: capture_template = template_store.get_template(raw_stream.capture_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Capture template not found: {raw_stream.capture_template_id}", ) display_index = raw_stream.display_index # Validate engine if capture_template.engine_type not in EngineRegistry.get_available_engines(): raise HTTPException( status_code=400, detail=f"Engine '{capture_template.engine_type}' is not available on this system", ) # Check display lock locked_device_id = processor_manager.get_display_lock_info(display_index) if locked_device_id: try: device = device_store.get_device(locked_device_id) device_name = device.name except Exception: device_name = locked_device_id raise HTTPException( status_code=409, detail=f"Display {display_index} is currently being captured by device '{device_name}'. " f"Please stop the device processing before testing.", ) # Create engine and run test engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config) logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}") frame_count = 0 total_capture_time = 0.0 last_frame = None start_time = time.perf_counter() end_time = start_time + test_request.capture_duration while time.perf_counter() < end_time: capture_start = time.perf_counter() screen_capture = engine.capture_display(display_index) capture_elapsed = time.perf_counter() - capture_start total_capture_time += capture_elapsed frame_count += 1 last_frame = screen_capture actual_duration = time.perf_counter() - start_time if last_frame is None: raise RuntimeError("No frames captured during test") # Convert to PIL Image if isinstance(last_frame.image, np.ndarray): pil_image = Image.fromarray(last_frame.image) else: raise ValueError("Unexpected image format from engine") # Create thumbnail thumbnail_width = 640 aspect_ratio = pil_image.height / pil_image.width thumbnail_height = int(thumbnail_width * aspect_ratio) thumbnail = pil_image.copy() thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS) # Apply postprocessing filters if this is a processed stream pp_template_ids = chain["postprocessing_template_ids"] if pp_template_ids: try: pp_template = pp_store.get_template(pp_template_ids[0]) pool = ImagePool() def apply_filters(img): arr = np.array(img) for fi in pp_template.filters: f = FilterRegistry.create_instance(fi.filter_id, fi.options) result = f.process_image(arr, pool) if result is not None: arr = result return Image.fromarray(arr) thumbnail = apply_filters(thumbnail) pil_image = apply_filters(pil_image) except ValueError: logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview") # Encode thumbnail img_buffer = io.BytesIO() thumbnail.save(img_buffer, format='JPEG', quality=85) img_buffer.seek(0) thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8') thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}" # Encode full-resolution image full_buffer = io.BytesIO() pil_image.save(full_buffer, format='JPEG', quality=90) full_buffer.seek(0) full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8') full_data_uri = f"data:image/jpeg;base64,{full_b64}" actual_fps = frame_count / actual_duration if actual_duration > 0 else 0 avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0 width, height = pil_image.size return TemplateTestResponse( full_capture=CaptureImage( image=thumbnail_data_uri, full_image=full_data_uri, width=width, height=height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, ), border_extraction=None, performance=PerformanceMetrics( capture_duration_s=actual_duration, frame_count=frame_count, actual_fps=actual_fps, avg_capture_time_ms=avg_capture_time_ms, ), ) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except RuntimeError as e: raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}") except Exception as e: logger.error(f"Failed to test picture stream: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) finally: if engine: try: engine.cleanup() except Exception as e: logger.error(f"Error cleaning up test engine: {e}")