"""Capture template, engine, and filter routes."""

import base64
import io
import time

import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends

from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
    get_picture_source_store,
    get_pp_template_store,
    get_template_store,
)
from wled_controller.api.schemas.common import (
    CaptureImage,
    PerformanceMetrics,
    TemplateTestResponse,
)
from wled_controller.api.schemas.templates import (
    EngineInfo,
    EngineListResponse,
    TemplateCreate,
    TemplateListResponse,
    TemplateResponse,
    TemplateTestRequest,
    TemplateUpdate,
)
from wled_controller.api.schemas.filters import (
    FilterOptionDefSchema,
    FilterTypeListResponse,
    FilterTypeResponse,
)
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.filters import FilterRegistry
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource
from wled_controller.utils import get_logger

logger = get_logger(__name__)

router = APIRouter()


# ===== CAPTURE TEMPLATE ENDPOINTS =====

@router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"])
async def list_templates(
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Return every stored capture template together with the total count.

    Any exception raised while reading or converting the templates is
    logged and reported to the client as HTTP 500 with the exception text
    as the detail.
    """
    try:
        items = []
        for tpl in template_store.get_all_templates():
            # Copy the stored fields one-to-one into the API schema.
            items.append(
                TemplateResponse(
                    id=tpl.id,
                    name=tpl.name,
                    engine_type=tpl.engine_type,
                    engine_config=tpl.engine_config,
                    created_at=tpl.created_at,
                    updated_at=tpl.updated_at,
                    description=tpl.description,
                )
            )
        return TemplateListResponse(templates=items, count=len(items))
    except Exception as e:
        logger.error(f"Failed to list templates: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def _to_template_response(template) -> TemplateResponse:
    """Convert a stored template object into its API response schema."""
    return TemplateResponse(
        id=template.id,
        name=template.name,
        engine_type=template.engine_type,
        engine_config=template.engine_config,
        created_at=template.created_at,
        updated_at=template.updated_at,
        description=template.description,
    )


@router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201)
async def create_template(
    template_data: TemplateCreate,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Create a new capture template.

    Raises:
        HTTPException: 400 when the store raises ``ValueError``;
            500 for any other failure.
    """
    try:
        template = template_store.create_template(
            name=template_data.name,
            engine_type=template_data.engine_type,
            engine_config=template_data.engine_config,
            description=template_data.description,
        )
        return _to_template_response(template)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Failed to create template: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def get_template(
    template_id: str,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Get template by ID.

    Raises:
        HTTPException: 404 when the store raises ``ValueError`` for the
            given ``template_id``.
    """
    try:
        template = template_store.get_template(template_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=f"Template {template_id} not found") from e

    return _to_template_response(template)


@router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def update_template(
    template_id: str,
    update_data: TemplateUpdate,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
):
    """Update a template.

    Raises:
        HTTPException: 400 when the store raises ``ValueError``;
            500 for any other failure.
    """
    try:
        template = template_store.update_template(
            template_id=template_id,
            name=update_data.name,
            engine_type=update_data.engine_type,
            engine_config=update_data.engine_config,
            description=update_data.description,
        )
        return _to_template_response(template)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Failed to update template: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"])
async def delete_template(
    template_id: str,
    _auth: AuthRequired,
    template_store: TemplateStore = Depends(get_template_store),
    stream_store: PictureSourceStore = Depends(get_picture_source_store),
):
    """Delete a template.

    Validates that no streams are currently using this template before deletion.

    Raises:
        HTTPException: 409 when at least one screen-capture stream still
            references the template; 400 when the store raises
            ``ValueError``; 500 for any other failure.
    """
    try:
        # Check if any streams are using this template
        streams_using_template = [
            stream.name
            for stream in stream_store.get_all_streams()
            if isinstance(stream, ScreenCapturePictureSource)
            and stream.capture_template_id == template_id
        ]

        if streams_using_template:
            stream_list = ", ".join(streams_using_template)
            raise HTTPException(
                status_code=409,
                detail=f"Cannot delete template: it is used by the following stream(s): {stream_list}. "
                       f"Please reassign these streams to a different template before deleting."
            )

        # Proceed with deletion
        template_store.delete_template(template_id)

    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Failed to delete template: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"])
async def list_engines(_auth: AuthRequired):
    """List all registered capture engines.

    Returns every registered engine with an ``available`` flag showing
    whether it can be used on the current system.
    """
    try:
        # Build the set once so each membership test below is O(1).
        available_set = set(EngineRegistry.get_available_engines())

        engines = [
            EngineInfo(
                type=engine_type,
                name=engine_type.upper(),
                default_config=engine_class.get_default_config(),
                available=(engine_type in available_set),
            )
            for engine_type, engine_class in EngineRegistry.get_all_engines().items()
        ]

        return EngineListResponse(engines=engines, count=len(engines))
    except Exception as e:
        logger.error(f"Failed to list engines: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"])
def test_template(
    test_request: TemplateTestRequest,
    _auth: AuthRequired,
):
    """Test a capture template configuration.

    Uses sync ``def`` so FastAPI runs it in a thread pool — the engine
    initialisation and capture loop are blocking and would stall the
    event loop if run in an ``async def`` handler.

    Temporarily instantiates an engine with the provided configuration,
    captures frames for the specified duration, and returns actual FPS metrics
    together with a thumbnail and a full-resolution JPEG of the last frame.

    Raises:
        HTTPException: 400 when the engine is not available or a
            ``ValueError`` occurs; 500 for ``RuntimeError`` (including
            "no frames captured") and any other failure.
    """
    stream = None
    try:
        # Validate engine type
        if test_request.engine_type not in EngineRegistry.get_available_engines():
            raise HTTPException(
                status_code=400,
                detail=f"Engine '{test_request.engine_type}' is not available on this system"
            )

        # Create and initialize capture stream
        stream = EngineRegistry.create_stream(
            test_request.engine_type, test_request.display_index, test_request.engine_config
        )
        stream.initialize()

        # Run sustained capture test
        logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}")

        frame_count = 0
        total_capture_time = 0.0
        last_frame = None

        start_time = time.perf_counter()
        end_time = start_time + test_request.capture_duration

        while time.perf_counter() < end_time:
            capture_start = time.perf_counter()
            screen_capture = stream.capture_frame()
            capture_elapsed = time.perf_counter() - capture_start

            # Skip if no new frame (screen unchanged); yield CPU
            if screen_capture is None:
                time.sleep(0.005)
                continue

            total_capture_time += capture_elapsed
            frame_count += 1
            last_frame = screen_capture

        actual_duration = time.perf_counter() - start_time

        logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s")

        # Use the last captured frame for preview
        if last_frame is None:
            raise RuntimeError("No frames captured during test")

        # Convert numpy array to PIL Image
        if isinstance(last_frame.image, np.ndarray):
            pil_image = Image.fromarray(last_frame.image)
        else:
            raise ValueError("Unexpected image format from engine")

        # Create thumbnail (at most 640px wide, maintain aspect ratio).
        # The height is clamped to 1 so extremely wide frames do not
        # produce a zero-height bounding box.
        target_width = 640
        aspect_ratio = pil_image.height / pil_image.width
        target_height = max(1, int(target_width * aspect_ratio))
        thumbnail = pil_image.copy()
        thumbnail.thumbnail((target_width, target_height), Image.Resampling.LANCZOS)
        # Image.thumbnail never enlarges and applies its own rounding, so
        # report the size it actually produced rather than the requested box.
        thumbnail_width, thumbnail_height = thumbnail.size

        # Encode thumbnail as JPEG
        img_buffer = io.BytesIO()
        thumbnail.save(img_buffer, format='JPEG', quality=85)
        thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
        thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"

        # Encode full-resolution image as JPEG
        full_buffer = io.BytesIO()
        pil_image.save(full_buffer, format='JPEG', quality=90)
        full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
        full_data_uri = f"data:image/jpeg;base64,{full_b64}"

        # Calculate metrics
        actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
        avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0

        width, height = pil_image.size

        return TemplateTestResponse(
            full_capture=CaptureImage(
                image=thumbnail_data_uri,
                full_image=full_data_uri,
                width=width,
                height=height,
                thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height,
            ),
            border_extraction=None,
            performance=PerformanceMetrics(
                capture_duration_s=actual_duration,
                frame_count=frame_count,
                actual_fps=actual_fps,
                avg_capture_time_ms=avg_capture_time_ms,
            ),
        )

    except HTTPException:
        # Without this clause the 400 raised above would be caught by the
        # generic handler below and turned into a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}") from e
    except Exception as e:
        logger.error(f"Failed to test template: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Always release the engine, even when the test failed part-way.
        if stream:
            try:
                stream.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up test stream: {e}")


# ===== FILTER TYPE ENDPOINTS =====

@router.get("/api/v1/filters", response_model=FilterTypeListResponse, tags=["Filters"])
async def list_filter_types(
    _auth: AuthRequired,
    pp_store=Depends(get_pp_template_store),
):
    """List all available postprocessing filter types and their options schemas.

    For the ``filter_template`` filter, the ``template_id`` option's choices
    are replaced with the templates currently held by ``pp_store`` (when a
    store is provided). If the store lookup fails, the choices fall back to
    an empty list and the failure is logged.
    """
    all_filters = FilterRegistry.get_all()

    # Pre-build template choices for the filter_template filter
    template_choices = None
    if pp_store:
        try:
            templates = pp_store.get_all_templates()
            template_choices = [{"value": t.id, "label": t.name} for t in templates]
        except Exception as e:
            # Best-effort enrichment: keep serving the filter list, but do
            # not hide the failure from the logs.
            logger.warning(f"Failed to load postprocessing templates for filter choices: {e}")
            template_choices = []

    responses = []
    for filter_id, filter_cls in all_filters.items():
        schema = filter_cls.get_options_schema()
        opt_schemas = []
        for opt in schema:
            choices = opt.choices
            # Enrich filter_template choices with current template list
            if filter_id == "filter_template" and opt.key == "template_id" and template_choices is not None:
                choices = template_choices
            opt_schemas.append(FilterOptionDefSchema(
                key=opt.key,
                label=opt.label,
                type=opt.option_type,
                default=opt.default,
                min_value=opt.min_value,
                max_value=opt.max_value,
                step=opt.step,
                choices=choices,
            ))
        responses.append(FilterTypeResponse(
            filter_id=filter_cls.filter_id,
            filter_name=filter_cls.filter_name,
            options_schema=opt_schemas,
        ))
    return FilterTypeListResponse(filters=responses, count=len(responses))