"""Capture template, engine, and filter routes.""" import base64 import io import time import numpy as np from PIL import Image from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import ( get_picture_source_store, get_pp_template_store, get_template_store, ) from wled_controller.api.schemas.common import ( CaptureImage, PerformanceMetrics, TemplateTestResponse, ) from wled_controller.api.schemas.templates import ( EngineInfo, EngineListResponse, TemplateCreate, TemplateListResponse, TemplateResponse, TemplateTestRequest, TemplateUpdate, ) from wled_controller.api.schemas.filters import ( FilterOptionDefSchema, FilterTypeListResponse, FilterTypeResponse, ) from wled_controller.core.capture_engines import EngineRegistry from wled_controller.core.filters import FilterRegistry from wled_controller.storage.template_store import TemplateStore from wled_controller.storage.picture_source_store import PictureSourceStore from wled_controller.storage.picture_source import ScreenCapturePictureSource from wled_controller.utils import get_logger logger = get_logger(__name__) router = APIRouter() # ===== CAPTURE TEMPLATE ENDPOINTS ===== @router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"]) async def list_templates( _auth: AuthRequired, template_store: TemplateStore = Depends(get_template_store), ): """List all capture templates.""" try: templates = template_store.get_all_templates() template_responses = [ TemplateResponse( id=t.id, name=t.name, engine_type=t.engine_type, engine_config=t.engine_config, created_at=t.created_at, updated_at=t.updated_at, description=t.description, ) for t in templates ] return TemplateListResponse( templates=template_responses, count=len(template_responses), ) except Exception as e: logger.error(f"Failed to list templates: {e}") raise HTTPException(status_code=500, detail=str(e)) 
def _template_response(template) -> TemplateResponse:
    """Build the API response model from a stored capture template object."""
    return TemplateResponse(
        id=template.id,
        name=template.name,
        engine_type=template.engine_type,
        engine_config=template.engine_config,
        created_at=template.created_at,
        updated_at=template.updated_at,
        description=template.description,
    )


@router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201)
async def create_template(
    template_data: TemplateCreate,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Create a new capture template.

    A ``ValueError`` from the store is reported as HTTP 400; any other
    failure is logged and reported as HTTP 500.
    """
    try:
        template = template_store.create_template(
            name=template_data.name,
            engine_type=template_data.engine_type,
            engine_config=template_data.engine_config,
            description=template_data.description,
        )
        return _template_response(template)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to create template: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def get_template(
    template_id: str,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Get template by ID.

    A ``ValueError`` from the store lookup is reported as HTTP 404.
    """
    try:
        template = template_store.get_template(template_id)
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Template {template_id} not found")

    return _template_response(template)


@router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def update_template(
    template_id: str,
    update_data: TemplateUpdate,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Update a template.

    A ``ValueError`` from the store is reported as HTTP 400; any other
    failure is logged and reported as HTTP 500.
    """
    try:
        template = template_store.update_template(
            template_id=template_id,
            name=update_data.name,
            engine_type=update_data.engine_type,
            engine_config=update_data.engine_config,
            description=update_data.description,
        )
        return _template_response(template)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to update template: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"])
async def delete_template(
    template_id: str,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
    stream_store: PictureSourceStore = Depends(get_picture_source_store),
):
    """Delete a template.

    Validates that no streams are currently using this template before
    deletion. Responds with HTTP 409 when a screen-capture stream still
    references it, HTTP 400 when the store raises ``ValueError``, and
    HTTP 500 for any other failure.
    """
    try:
        # Collect the names of screen-capture streams that reference this template
        streams_using_template = [
            stream.name
            for stream in stream_store.get_all_streams()
            if isinstance(stream, ScreenCapturePictureSource)
            and stream.capture_template_id == template_id
        ]

        if streams_using_template:
            stream_list = ", ".join(streams_using_template)
            raise HTTPException(
                status_code=409,
                detail=f"Cannot delete template: it is used by the following stream(s): {stream_list}. "
                       f"Please reassign these streams to a different template before deleting."
            )

        # Proceed with deletion
        template_store.delete_template(template_id)

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to delete template: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"])
async def list_engines(_auth: AuthRequired):
    """List all registered capture engines.

    Returns every registered engine with an ``available`` flag showing
    whether it can be used on the current system.
    """
    try:
        # Build the set once so the per-engine membership test is O(1)
        available_set = set(EngineRegistry.get_available_engines())

        engines = [
            EngineInfo(
                type=engine_type,
                name=engine_type.upper(),
                default_config=engine_class.get_default_config(),
                available=(engine_type in available_set),
            )
            for engine_type, engine_class in EngineRegistry.get_all_engines().items()
        ]

        return EngineListResponse(engines=engines, count=len(engines))
    except Exception as e:
        logger.error(f"Failed to list engines: {e}")
        raise HTTPException(status_code=500, detail=str(e))


def _run_capture_loop(stream, duration):
    """Capture frames from *stream* for *duration* seconds.

    Returns ``(frame_count, total_capture_time, last_frame, actual_duration)``.
    ``last_frame`` is ``None`` when the stream never produced a frame.
    """
    frame_count = 0
    total_capture_time = 0.0
    last_frame = None

    start_time = time.perf_counter()
    end_time = start_time + duration

    while time.perf_counter() < end_time:
        capture_start = time.perf_counter()
        screen_capture = stream.capture_frame()
        capture_elapsed = time.perf_counter() - capture_start

        # Skip if no new frame (screen unchanged); yield CPU
        if screen_capture is None:
            time.sleep(0.005)
            continue

        total_capture_time += capture_elapsed
        frame_count += 1
        last_frame = screen_capture

    actual_duration = time.perf_counter() - start_time
    return frame_count, total_capture_time, last_frame, actual_duration


def _jpeg_data_uri(image, quality: int) -> str:
    """Encode a PIL image as JPEG and return it as a base64 ``data:`` URI."""
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG', quality=quality)
    encoded = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return f"data:image/jpeg;base64,{encoded}"


@router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"])
def test_template(
    test_request: TemplateTestRequest,
    _auth: AuthRequired,
):
    """Test a capture template configuration.

    Uses sync ``def`` so FastAPI runs it in a thread pool — the engine
    initialisation and capture loop are blocking and would stall the
    event loop if run in an ``async def`` handler.

    Temporarily instantiates an engine with the provided configuration,
    captures frames for the specified duration, and returns actual FPS
    metrics together with a thumbnail and a full-resolution JPEG of the
    last captured frame.

    Responds with HTTP 400 when the engine is unavailable or a
    ``ValueError`` occurs, and HTTP 500 for ``RuntimeError`` (including
    "no frames captured") and any other failure. The stream is always
    cleaned up, even on error.
    """
    stream = None
    try:
        # Validate engine type
        if test_request.engine_type not in EngineRegistry.get_available_engines():
            raise HTTPException(
                status_code=400,
                detail=f"Engine '{test_request.engine_type}' is not available on this system"
            )

        # Create and initialize capture stream
        stream = EngineRegistry.create_stream(
            test_request.engine_type, test_request.display_index, test_request.engine_config
        )
        stream.initialize()

        # Run sustained capture test
        logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}")
        frame_count, total_capture_time, last_frame, actual_duration = _run_capture_loop(
            stream, test_request.capture_duration
        )
        logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s")

        # Use the last captured frame for preview
        if last_frame is None:
            raise RuntimeError("No frames captured during test")

        # Convert numpy array to PIL Image
        if not isinstance(last_frame.image, np.ndarray):
            raise ValueError("Unexpected image format from engine")
        pil_image = Image.fromarray(last_frame.image)

        # JPEG cannot store alpha or palette modes; normalise so saving does not fail
        if pil_image.mode not in ("L", "RGB", "CMYK"):
            pil_image = pil_image.convert("RGB")

        # Create thumbnail (at most 640px wide, maintain aspect ratio)
        max_thumbnail_width = 640
        aspect_ratio = pil_image.height / pil_image.width
        max_thumbnail_height = max(1, int(max_thumbnail_width * aspect_ratio))
        thumbnail = pil_image.copy()
        thumbnail.thumbnail((max_thumbnail_width, max_thumbnail_height), Image.Resampling.LANCZOS)
        # Image.thumbnail never enlarges and rounds on its own, so report
        # the size of the image that was actually produced.
        thumbnail_width, thumbnail_height = thumbnail.size

        thumbnail_data_uri = _jpeg_data_uri(thumbnail, quality=85)
        full_data_uri = _jpeg_data_uri(pil_image, quality=90)

        # Calculate metrics
        actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
        avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0

        width, height = pil_image.size

        return TemplateTestResponse(
            full_capture=CaptureImage(
                image=thumbnail_data_uri,
                full_image=full_data_uri,
                width=width,
                height=height,
                thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height,
            ),
            border_extraction=None,
            performance=PerformanceMetrics(
                capture_duration_s=actual_duration,
                frame_count=frame_count,
                actual_fps=actual_fps,
                avg_capture_time_ms=avg_capture_time_ms,
            ),
        )

    except HTTPException:
        # Keep the intended status code (e.g. the 400 above) instead of
        # letting the generic handler below turn it into a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
    except Exception as e:
        logger.error(f"Failed to test template: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if stream:
            try:
                stream.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up test stream: {e}")


# ===== REAL-TIME CAPTURE TEMPLATE TEST WEBSOCKET =====

@router.websocket("/api/v1/capture-templates/test/ws")
async def test_template_ws(
    websocket: WebSocket,
    token: str = Query(""),
):
    """WebSocket for capture template test with intermediate frame previews.

    Config is sent as the first client message (JSON with engine_type,
    engine_config, display_index, capture_duration, and optionally
    preview_width).

    The socket is closed with code 4001 when the token is rejected and
    with code 4003 (after an ``{"type": "error", ...}`` message) when the
    config is missing, malformed, or names an unavailable engine.
    """
    from wled_controller.api.routes._test_helpers import (
        authenticate_ws_token,
        stream_capture_test,
    )

    if not authenticate_ws_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await websocket.accept()

    # Read config from first client message
    try:
        config = await websocket.receive_json()
    except WebSocketDisconnect:
        # Client went away before sending a config; nothing to report to
        return
    except Exception as e:
        await websocket.send_json({"type": "error", "detail": f"Expected JSON config: {e}"})
        await websocket.close(code=4003)
        return

    # Validate client-supplied values so bad input yields an error message
    # instead of an unhandled exception on an already-accepted socket.
    try:
        if not isinstance(config, dict):
            raise TypeError("config must be a JSON object")
        engine_type = config.get("engine_type", "")
        engine_config = config.get("engine_config", {})
        display_index = config.get("display_index", 0)
        duration = float(config.get("capture_duration", 5.0))
        # Missing, null, or 0 preview_width all mean "no preview width override"
        pw = int(config.get("preview_width") or 0) or None
    except (TypeError, ValueError) as e:
        await websocket.send_json({"type": "error", "detail": f"Invalid config: {e}"})
        await websocket.close(code=4003)
        return

    if engine_type not in EngineRegistry.get_available_engines():
        await websocket.send_json({"type": "error", "detail": f"Engine '{engine_type}' not available"})
        await websocket.close(code=4003)
        return

    # Engine factory — creates + initializes engine inside the capture thread
    # to avoid thread-affinity issues (e.g. MSS uses thread-local state)
    def engine_factory():
        s = EngineRegistry.create_stream(engine_type, display_index, engine_config)
        s.initialize()
        return s

    logger.info(f"Capture template test WS connected ({engine_type}, display {display_index}, {duration}s)")

    try:
        await stream_capture_test(websocket, engine_factory, duration, preview_width=pw)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"Capture template test WS error: {e}")
    finally:
        logger.info("Capture template test WS disconnected")


# ===== FILTER TYPE ENDPOINTS =====

@router.get("/api/v1/filters", response_model=FilterTypeListResponse, tags=["Filters"])
async def list_filter_types(
    _auth: AuthRequired,
    pp_store=Depends(get_pp_template_store),
):
    """List all available postprocessing filter types and their options schemas.

    For the ``filter_template`` filter, the ``template_id`` option's choices
    are replaced with the templates currently held by ``pp_store`` (an empty
    list if the store cannot be read).
    """
    all_filters = FilterRegistry.get_all()

    # Pre-build template choices for the filter_template filter
    template_choices = None
    if pp_store:
        try:
            templates = pp_store.get_all_templates()
            template_choices = [{"value": t.id, "label": t.name} for t in templates]
        except Exception as e:
            # Best effort: still serve the filter list, but leave a trace of the failure
            logger.warning(f"Failed to load postprocessing templates for filter choices: {e}")
            template_choices = []

    responses = []
    for filter_id, filter_cls in all_filters.items():
        opt_schemas = []
        for opt in filter_cls.get_options_schema():
            choices = opt.choices
            # Enrich filter_template choices with current template list
            if filter_id == "filter_template" and opt.key == "template_id" and template_choices is not None:
                choices = template_choices
            opt_schemas.append(FilterOptionDefSchema(
                key=opt.key,
                label=opt.label,
                type=opt.option_type,
                default=opt.default,
                min_value=opt.min_value,
                max_value=opt.max_value,
                step=opt.step,
                choices=choices,
            ))
        responses.append(FilterTypeResponse(
            filter_id=filter_cls.filter_id,
            filter_name=filter_cls.filter_name,
            options_schema=opt_schemas,
        ))
    return FilterTypeListResponse(filters=responses, count=len(responses))