"""Picture source routes.""" import asyncio import time import httpx import numpy as np from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect from fastapi.responses import Response from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import ( fire_entity_event, get_picture_source_store, get_output_target_store, get_pp_template_store, get_template_store, ) from wled_controller.api.schemas.common import ( CaptureImage, PerformanceMetrics, TemplateTestResponse, ) from wled_controller.api.schemas.picture_sources import ( ImageValidateRequest, ImageValidateResponse, PictureSourceCreate, PictureSourceListResponse, PictureSourceResponse, PictureSourceTestRequest, PictureSourceUpdate, ) from wled_controller.core.capture_engines import EngineRegistry from wled_controller.core.filters import FilterRegistry, ImagePool from wled_controller.storage.output_target_store import OutputTargetStore from wled_controller.storage.template_store import TemplateStore from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore from wled_controller.storage.picture_source_store import PictureSourceStore from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource, VideoCaptureSource from wled_controller.utils import get_logger from wled_controller.storage.base_store import EntityNotFoundError logger = get_logger(__name__) router = APIRouter() def _stream_to_response(s) -> PictureSourceResponse: """Convert a PictureSource to its API response.""" return PictureSourceResponse( id=s.id, name=s.name, stream_type=s.stream_type, display_index=getattr(s, "display_index", None), capture_template_id=getattr(s, "capture_template_id", None), target_fps=getattr(s, "target_fps", None), source_stream_id=getattr(s, "source_stream_id", None), postprocessing_template_id=getattr(s, "postprocessing_template_id", None), image_source=getattr(s, "image_source", None), created_at=s.created_at, updated_at=s.updated_at, description=s.description, tags=s.tags, # Video fields url=getattr(s, "url", None), loop=getattr(s, "loop", None), playback_speed=getattr(s, "playback_speed", None), start_time=getattr(s, "start_time", None), end_time=getattr(s, "end_time", None), resolution_limit=getattr(s, "resolution_limit", None), clock_id=getattr(s, "clock_id", None), ) @router.get("/api/v1/picture-sources", response_model=PictureSourceListResponse, tags=["Picture Sources"]) async def list_picture_sources( _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), ): """List all picture sources.""" try: streams = store.get_all_streams() responses = [_stream_to_response(s) for s in streams] return PictureSourceListResponse(streams=responses, count=len(responses)) except Exception as e: logger.error(f"Failed to list picture sources: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/picture-sources/validate-image", response_model=ImageValidateResponse, tags=["Picture Sources"]) async def validate_image( data: ImageValidateRequest, _auth: AuthRequired, ): """Validate an image source (URL or file path) and return a preview thumbnail.""" try: from pathlib import Path source = data.image_source.strip() if not source: return ImageValidateResponse(valid=False, error="Image source is empty") if source.startswith(("http://", "https://")): async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: response = await client.get(source) response.raise_for_status() img_bytes = response.content else: path = Path(source) if not path.exists(): return ImageValidateResponse(valid=False, error=f"File not found: {source}") img_bytes = path def _process_image(src): from wled_controller.utils.image_codec import ( encode_jpeg_data_uri, load_image_bytes, load_image_file, thumbnail as make_thumbnail, ) if isinstance(src, bytes): image = load_image_bytes(src) else: image = load_image_file(src) h, w = image.shape[:2] thumb = make_thumbnail(image, 320) preview = encode_jpeg_data_uri(thumb, quality=80) return w, h, preview width, height, preview = await asyncio.to_thread(_process_image, img_bytes) return ImageValidateResponse( valid=True, width=width, height=height, preview=preview ) except httpx.HTTPStatusError as e: return ImageValidateResponse(valid=False, error=f"HTTP {e.response.status_code}: {e.response.reason_phrase}") except httpx.RequestError as e: return ImageValidateResponse(valid=False, error=f"Request failed: {e}") except Exception as e: return ImageValidateResponse(valid=False, error=str(e)) @router.get("/api/v1/picture-sources/full-image", tags=["Picture Sources"]) async def get_full_image( _auth: AuthRequired, source: str = Query(..., description="Image URL or local file path"), ): """Serve the full-resolution image for lightbox preview.""" from pathlib import Path try: if source.startswith(("http://", "https://")): async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: response = await client.get(source) response.raise_for_status() img_bytes = response.content else: path = Path(source) if not path.exists(): raise HTTPException(status_code=404, detail="File not found") img_bytes = path def _encode_full(src): from wled_controller.utils.image_codec import encode_jpeg, load_image_bytes, load_image_file if isinstance(src, bytes): image = load_image_bytes(src) else: image = load_image_file(src) return encode_jpeg(image, quality=90) jpeg_bytes = await asyncio.to_thread(_encode_full, img_bytes) return Response(content=jpeg_bytes, media_type="image/jpeg") except HTTPException: raise except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @router.post("/api/v1/picture-sources", response_model=PictureSourceResponse, tags=["Picture Sources"], status_code=201) async def create_picture_source( data: PictureSourceCreate, _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), template_store: TemplateStore = Depends(get_template_store), pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Create a new picture source.""" try: # Validate referenced entities if data.stream_type == "raw" and data.capture_template_id: try: template_store.get_template(data.capture_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Capture template not found: {data.capture_template_id}", ) if data.stream_type == "processed" and data.postprocessing_template_id: try: pp_store.get_template(data.postprocessing_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Postprocessing template not found: {data.postprocessing_template_id}", ) stream = store.create_stream( name=data.name, stream_type=data.stream_type, display_index=data.display_index, capture_template_id=data.capture_template_id, target_fps=data.target_fps, source_stream_id=data.source_stream_id, postprocessing_template_id=data.postprocessing_template_id, image_source=data.image_source, description=data.description, tags=data.tags, # Video fields url=data.url, loop=data.loop, playback_speed=data.playback_speed, start_time=data.start_time, end_time=data.end_time, resolution_limit=data.resolution_limit, clock_id=data.clock_id, ) fire_entity_event("picture_source", "created", stream.id) return _stream_to_response(stream) except HTTPException: raise except EntityNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to create picture source: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/picture-sources/{stream_id}", response_model=PictureSourceResponse, tags=["Picture Sources"]) async def get_picture_source( stream_id: str, _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), ): """Get picture source by ID.""" try: stream = store.get_stream(stream_id) return _stream_to_response(stream) except ValueError: raise HTTPException(status_code=404, detail=f"Picture source {stream_id} not found") @router.put("/api/v1/picture-sources/{stream_id}", response_model=PictureSourceResponse, tags=["Picture Sources"]) async def update_picture_source( stream_id: str, data: PictureSourceUpdate, _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), ): """Update a picture source.""" try: stream = store.update_stream( stream_id=stream_id, name=data.name, display_index=data.display_index, capture_template_id=data.capture_template_id, target_fps=data.target_fps, source_stream_id=data.source_stream_id, postprocessing_template_id=data.postprocessing_template_id, image_source=data.image_source, description=data.description, tags=data.tags, # Video fields url=data.url, loop=data.loop, playback_speed=data.playback_speed, start_time=data.start_time, end_time=data.end_time, resolution_limit=data.resolution_limit, clock_id=data.clock_id, ) fire_entity_event("picture_source", "updated", stream_id) return _stream_to_response(stream) except EntityNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to update picture source: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/api/v1/picture-sources/{stream_id}", status_code=204, tags=["Picture Sources"]) async def delete_picture_source( stream_id: str, _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), target_store: OutputTargetStore = Depends(get_output_target_store), ): """Delete a picture source.""" try: # Check if any target references this stream target_names = store.get_targets_referencing(stream_id, target_store) if target_names: names = ", ".join(target_names) raise HTTPException( status_code=409, detail=f"Cannot delete picture source: it is assigned to target(s): {names}. " "Please reassign those targets before deleting.", ) store.delete_stream(stream_id) fire_entity_event("picture_source", "deleted", stream_id) except HTTPException: raise except EntityNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to delete picture source: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/v1/picture-sources/{stream_id}/thumbnail", tags=["Picture Sources"]) async def get_video_thumbnail( stream_id: str, _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), ): """Get a thumbnail for a video picture source (first frame).""" from wled_controller.core.processing.video_stream import extract_thumbnail from wled_controller.storage.picture_source import VideoCaptureSource from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down try: source = store.get_stream(stream_id) if not isinstance(source, VideoCaptureSource): raise HTTPException(status_code=400, detail="Not a video source") frame = await asyncio.get_event_loop().run_in_executor( None, extract_thumbnail, source.url, source.resolution_limit ) if frame is None: raise HTTPException(status_code=404, detail="Could not extract thumbnail") # Resize to max 320px wide for thumbnail frame = resize_down(frame, 320) h, w = frame.shape[:2] data_uri = encode_jpeg_data_uri(frame, quality=80) return {"thumbnail": data_uri, "width": w, "height": h} except HTTPException: raise except Exception as e: logger.error(f"Failed to extract video thumbnail: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/v1/picture-sources/{stream_id}/test", response_model=TemplateTestResponse, tags=["Picture Sources"]) async def test_picture_source( stream_id: str, test_request: PictureSourceTestRequest, _auth: AuthRequired, store: PictureSourceStore = Depends(get_picture_source_store), template_store: TemplateStore = Depends(get_template_store), pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store), ): """Test a picture source by resolving its chain and running a capture test. Resolves the stream chain to the raw stream, captures frames, and returns preview image + performance metrics. For processed streams, applies postprocessing (gamma, saturation, brightness) to the preview image. """ stream = None try: # Resolve stream chain try: chain = store.resolve_stream_chain(stream_id) except EntityNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) raw_stream = chain["raw_stream"] if isinstance(raw_stream, StaticImagePictureSource): # Static image stream: load image directly, no engine needed from pathlib import Path source = raw_stream.image_source start_time = time.perf_counter() from wled_controller.utils.image_codec import load_image_bytes, load_image_file if source.startswith(("http://", "https://")): async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: resp = await client.get(source) resp.raise_for_status() image = load_image_bytes(resp.content) else: path = Path(source) if not path.exists(): raise HTTPException(status_code=400, detail=f"Image file not found: {source}") image = await asyncio.to_thread(load_image_file, path) actual_duration = time.perf_counter() - start_time frame_count = 1 total_capture_time = actual_duration elif isinstance(raw_stream, ScreenCapturePictureSource): # Screen capture stream: use engine try: capture_template = template_store.get_template(raw_stream.capture_template_id) except ValueError: raise HTTPException( status_code=400, detail=f"Capture template not found: {raw_stream.capture_template_id}", ) display_index = raw_stream.display_index if capture_template.engine_type not in EngineRegistry.get_available_engines(): raise HTTPException( status_code=400, detail=f"Engine '{capture_template.engine_type}' is not available on this system", ) stream = EngineRegistry.create_stream( capture_template.engine_type, display_index, capture_template.engine_config ) stream.initialize() frame_count = 0 total_capture_time = 0.0 last_frame = None start_time = time.perf_counter() if test_request.capture_duration == 0: # Single frame capture logger.info(f"Capturing single frame for {stream_id}") capture_start = time.perf_counter() screen_capture = stream.capture_frame() capture_elapsed = time.perf_counter() - capture_start if screen_capture is not None: total_capture_time = capture_elapsed frame_count = 1 last_frame = screen_capture else: logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}") end_time = start_time + test_request.capture_duration while time.perf_counter() < end_time: capture_start = time.perf_counter() screen_capture = stream.capture_frame() capture_elapsed = time.perf_counter() - capture_start if screen_capture is None: continue total_capture_time += capture_elapsed frame_count += 1 last_frame = screen_capture actual_duration = time.perf_counter() - start_time if last_frame is None: raise RuntimeError("No frames captured during test") if not isinstance(last_frame.image, np.ndarray): raise ValueError("Unexpected image format from engine") image = last_frame.image # Create thumbnail + encode (CPU-bound — run in thread) from wled_controller.utils.image_codec import encode_jpeg_data_uri, thumbnail as make_thumbnail pp_template_ids = chain["postprocessing_template_ids"] flat_filters = None if pp_template_ids: try: pp_template = pp_store.get_template(pp_template_ids[0]) flat_filters = pp_store.resolve_filter_instances(pp_template.filters) or None except ValueError: logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview") def _create_thumbnails_and_encode(img, filters): thumb = make_thumbnail(img, 640) if filters: pool = ImagePool() def apply_filters(arr): for fi in filters: f = FilterRegistry.create_instance(fi.filter_id, fi.options) result = f.process_image(arr, pool) if result is not None: arr = result return arr thumb = apply_filters(thumb) img = apply_filters(img) thumb_uri = encode_jpeg_data_uri(thumb, quality=85) full_uri = encode_jpeg_data_uri(img, quality=90) th, tw = thumb.shape[:2] return tw, th, thumb_uri, full_uri thumbnail_width, thumbnail_height, thumbnail_data_uri, full_data_uri = await asyncio.to_thread( _create_thumbnails_and_encode, image, flat_filters ) actual_fps = frame_count / actual_duration if actual_duration > 0 else 0 avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0 height, width = image.shape[:2] return TemplateTestResponse( full_capture=CaptureImage( image=thumbnail_data_uri, full_image=full_data_uri, width=width, height=height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, ), border_extraction=None, performance=PerformanceMetrics( capture_duration_s=actual_duration, frame_count=frame_count, actual_fps=actual_fps, avg_capture_time_ms=avg_capture_time_ms, ), ) except HTTPException: raise except EntityNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except RuntimeError as e: raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}") except Exception as e: logger.error(f"Failed to test picture source: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) finally: if stream: try: stream.cleanup() except Exception as e: logger.error(f"Error cleaning up test stream: {e}") # ===== REAL-TIME PICTURE SOURCE TEST WEBSOCKET ===== @router.websocket("/api/v1/picture-sources/{stream_id}/test/ws") async def test_picture_source_ws( websocket: WebSocket, stream_id: str, token: str = Query(""), duration: float = Query(5.0), preview_width: int = Query(0), ): """WebSocket for picture source test with intermediate frame previews.""" from wled_controller.api.routes._preview_helpers import ( authenticate_ws_token, stream_capture_test, ) from wled_controller.api.dependencies import ( get_picture_source_store as _get_ps_store, get_template_store as _get_t_store, get_pp_template_store as _get_pp_store, ) if not authenticate_ws_token(token): await websocket.close(code=4001, reason="Unauthorized") return store = _get_ps_store() template_store = _get_t_store() pp_store = _get_pp_store() # Resolve stream chain try: chain = store.resolve_stream_chain(stream_id) except ValueError as e: await websocket.close(code=4004, reason=str(e)) return raw_stream = chain["raw_stream"] # Static images don't benefit from streaming — reject gracefully if isinstance(raw_stream, StaticImagePictureSource): await websocket.close(code=4003, reason="Static image streams don't support live test") return # Video sources: use VideoCaptureLiveStream for test preview if isinstance(raw_stream, VideoCaptureSource): from wled_controller.core.processing.video_stream import VideoCaptureLiveStream await websocket.accept() logger.info(f"Video source test WS connected for {stream_id} ({duration}s)") video_stream = VideoCaptureLiveStream( url=raw_stream.url, loop=raw_stream.loop, playback_speed=raw_stream.playback_speed, start_time=raw_stream.start_time, end_time=raw_stream.end_time, resolution_limit=raw_stream.resolution_limit, target_fps=raw_stream.target_fps, ) def _encode_video_frame(image, pw): """Encode numpy RGB image as JPEG base64 data URI.""" from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down if pw: image = resize_down(image, pw) h, w = image.shape[:2] return encode_jpeg_data_uri(image, quality=80), w, h try: await asyncio.get_event_loop().run_in_executor(None, video_stream.start) import time as _time fps = min(raw_stream.target_fps or 30, 30) frame_time = 1.0 / fps end_at = _time.monotonic() + duration frame_count = 0 last_frame = None while _time.monotonic() < end_at: frame = video_stream.get_latest_frame() if frame is not None and frame.image is not None and frame is not last_frame: last_frame = frame frame_count += 1 thumb, w, h = await asyncio.get_event_loop().run_in_executor( None, _encode_video_frame, frame.image, preview_width or None, ) elapsed = duration - (end_at - _time.monotonic()) await websocket.send_json({ "type": "frame", "thumbnail": thumb, "width": w, "height": h, "elapsed": round(elapsed, 1), "frame_count": frame_count, }) await asyncio.sleep(frame_time) # Send final result if last_frame is not None: full_img, fw, fh = await asyncio.get_event_loop().run_in_executor( None, _encode_video_frame, last_frame.image, None, ) await websocket.send_json({ "type": "result", "full_image": full_img, "width": fw, "height": fh, "total_frames": frame_count, "duration": duration, "avg_fps": round(frame_count / max(duration, 0.001), 1), }) except WebSocketDisconnect: pass except Exception as e: logger.error(f"Video source test WS error for {stream_id}: {e}") try: await websocket.send_json({"type": "error", "detail": str(e)}) except Exception: pass finally: video_stream.stop() logger.info(f"Video source test WS disconnected for {stream_id}") return if not isinstance(raw_stream, ScreenCapturePictureSource): await websocket.close(code=4003, reason="Unsupported stream type for live test") return # Create capture engine try: capture_template = template_store.get_template(raw_stream.capture_template_id) except ValueError as e: await websocket.close(code=4004, reason=str(e)) return if capture_template.engine_type not in EngineRegistry.get_available_engines(): await websocket.close(code=4003, reason=f"Engine '{capture_template.engine_type}' not available") return # Resolve postprocessing filters (if any) pp_filters = None pp_template_ids = chain.get("postprocessing_template_ids", []) if pp_template_ids: try: pp_template = pp_store.get_template(pp_template_ids[0]) pp_filters = pp_store.resolve_filter_instances(pp_template.filters) or None except ValueError: pass # Engine factory — creates + initializes engine inside the capture thread # to avoid thread-affinity issues (e.g. MSS uses thread-local state) _engine_type = capture_template.engine_type _display_index = raw_stream.display_index _engine_config = capture_template.engine_config def engine_factory(): s = EngineRegistry.create_stream(_engine_type, _display_index, _engine_config) s.initialize() return s await websocket.accept() logger.info(f"Picture source test WS connected for {stream_id} ({duration}s)") try: await stream_capture_test( websocket, engine_factory, duration, pp_filters=pp_filters, preview_width=preview_width or None, ) except WebSocketDisconnect: pass except Exception as e: logger.error(f"Picture source test WS error for {stream_id}: {e}") finally: logger.info(f"Picture source test WS disconnected for {stream_id}")