Add static image picture stream type with auto-validating UI

Introduces a new "static_image" stream type that loads a frame from a URL
or local file path, enabling LED testing with known images or displaying
static content. Includes validate-image API endpoint, auto-validation on
blur/enter/paste with caching, capture template names on stream cards,
and conditional test stats display for single-frame results.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-11 19:57:43 +03:00
parent 4f9c30ef06
commit e0877a9b16
10 changed files with 566 additions and 181 deletions

View File

@@ -54,6 +54,8 @@ from wled_controller.api.schemas import (
PictureStreamListResponse,
PictureStreamTestRequest,
PPTemplateTestRequest,
ImageValidateRequest,
ImageValidateResponse,
)
from wled_controller.config import get_config
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
@@ -1284,69 +1286,87 @@ async def test_pp_template(
raw_stream = chain["raw_stream"]
# Get capture template from raw stream
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
if raw_stream.stream_type == "static_image":
# Static image: load directly
from pathlib import Path
display_index = raw_stream.display_index
source = raw_stream.image_source
start_time = time.perf_counter()
# Validate engine
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
# Check display lock
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
# Create engine and run test
engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
logger.info(f"Starting {test_request.capture_duration}s PP template test for {template_id} using stream {test_request.source_stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = engine.capture_display(display_index)
capture_elapsed = time.perf_counter() - capture_start
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
actual_duration = time.perf_counter() - start_time
frame_count = 1
total_capture_time = actual_duration
else:
raise ValueError("Unexpected image format from engine")
# Raw capture stream: use engine
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
display_index = raw_stream.display_index
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
logger.info(f"Starting {test_request.capture_duration}s PP template test for {template_id} using stream {test_request.source_stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = engine.capture_display(display_index)
capture_elapsed = time.perf_counter() - capture_start
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail
thumbnail_width = 640
@@ -1435,6 +1455,7 @@ def _stream_to_response(s) -> PictureStreamResponse:
target_fps=s.target_fps,
source_stream_id=s.source_stream_id,
postprocessing_template_id=s.postprocessing_template_id,
image_source=s.image_source,
created_at=s.created_at,
updated_at=s.updated_at,
description=s.description,
@@ -1456,6 +1477,53 @@ async def list_picture_streams(
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/picture-streams/validate-image", response_model=ImageValidateResponse, tags=["Picture Streams"])
async def validate_image(
data: ImageValidateRequest,
_auth: AuthRequired,
):
"""Validate an image source (URL or file path) and return a preview thumbnail."""
try:
from pathlib import Path
source = data.image_source.strip()
if not source:
return ImageValidateResponse(valid=False, error="Image source is empty")
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
response = await client.get(source)
response.raise_for_status()
pil_image = Image.open(io.BytesIO(response.content))
else:
path = Path(source)
if not path.exists():
return ImageValidateResponse(valid=False, error=f"File not found: {source}")
pil_image = Image.open(path)
pil_image = pil_image.convert("RGB")
width, height = pil_image.size
# Create thumbnail preview (max 320px wide)
thumb = pil_image.copy()
thumb.thumbnail((320, 320), Image.Resampling.LANCZOS)
buf = io.BytesIO()
thumb.save(buf, format="JPEG", quality=80)
buf.seek(0)
preview = f"data:image/jpeg;base64,{base64.b64encode(buf.getvalue()).decode()}"
return ImageValidateResponse(
valid=True, width=width, height=height, preview=preview
)
except httpx.HTTPStatusError as e:
return ImageValidateResponse(valid=False, error=f"HTTP {e.response.status_code}: {e.response.reason_phrase}")
except httpx.RequestError as e:
return ImageValidateResponse(valid=False, error=f"Request failed: {e}")
except Exception as e:
return ImageValidateResponse(valid=False, error=str(e))
@router.post("/api/v1/picture-streams", response_model=PictureStreamResponse, tags=["Picture Streams"], status_code=201)
async def create_picture_stream(
data: PictureStreamCreate,
@@ -1493,6 +1561,7 @@ async def create_picture_stream(
target_fps=data.target_fps,
source_stream_id=data.source_stream_id,
postprocessing_template_id=data.postprocessing_template_id,
image_source=data.image_source,
description=data.description,
)
return _stream_to_response(stream)
@@ -1536,6 +1605,7 @@ async def update_picture_stream(
target_fps=data.target_fps,
source_stream_id=data.source_stream_id,
postprocessing_template_id=data.postprocessing_template_id,
image_source=data.image_source,
description=data.description,
)
return _stream_to_response(stream)
@@ -1600,69 +1670,88 @@ async def test_picture_stream(
raw_stream = chain["raw_stream"]
# Get capture template from raw stream
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
if raw_stream.stream_type == "static_image":
# Static image stream: load image directly, no engine needed
from pathlib import Path
display_index = raw_stream.display_index
source = raw_stream.image_source
start_time = time.perf_counter()
# Validate engine
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
# Check display lock
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
actual_duration = time.perf_counter() - start_time
frame_count = 1
total_capture_time = actual_duration
# Create engine and run test
engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = engine.capture_display(display_index)
capture_elapsed = time.perf_counter() - capture_start
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Raw capture stream: use engine
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
display_index = raw_stream.display_index
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = engine.capture_display(display_index)
capture_elapsed = time.perf_counter() - capture_start
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail
thumbnail_width = 640

View File

@@ -405,12 +405,13 @@ class PictureStreamCreate(BaseModel):
"""Request to create a picture stream."""
name: str = Field(description="Stream name", min_length=1, max_length=100)
stream_type: Literal["raw", "processed"] = Field(description="Stream type")
stream_type: Literal["raw", "processed", "static_image"] = Field(description="Stream type")
display_index: Optional[int] = Field(None, description="Display index (raw streams)", ge=0)
capture_template_id: Optional[str] = Field(None, description="Capture template ID (raw streams)")
target_fps: Optional[int] = Field(None, description="Target FPS (raw streams)", ge=10, le=90)
source_stream_id: Optional[str] = Field(None, description="Source stream ID (processed streams)")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID (processed streams)")
image_source: Optional[str] = Field(None, description="Image URL or file path (static_image streams)")
description: Optional[str] = Field(None, description="Stream description", max_length=500)
@@ -423,6 +424,7 @@ class PictureStreamUpdate(BaseModel):
target_fps: Optional[int] = Field(None, description="Target FPS (raw streams)", ge=10, le=90)
source_stream_id: Optional[str] = Field(None, description="Source stream ID (processed streams)")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID (processed streams)")
image_source: Optional[str] = Field(None, description="Image URL or file path (static_image streams)")
description: Optional[str] = Field(None, description="Stream description", max_length=500)
@@ -431,12 +433,13 @@ class PictureStreamResponse(BaseModel):
id: str = Field(description="Stream ID")
name: str = Field(description="Stream name")
stream_type: str = Field(description="Stream type (raw or processed)")
stream_type: str = Field(description="Stream type (raw, processed, or static_image)")
display_index: Optional[int] = Field(None, description="Display index")
capture_template_id: Optional[str] = Field(None, description="Capture template ID")
target_fps: Optional[int] = Field(None, description="Target FPS")
source_stream_id: Optional[str] = Field(None, description="Source stream ID")
postprocessing_template_id: Optional[str] = Field(None, description="Postprocessing template ID")
image_source: Optional[str] = Field(None, description="Image URL or file path")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
description: Optional[str] = Field(None, description="Stream description")
@@ -461,3 +464,19 @@ class PPTemplateTestRequest(BaseModel):
source_stream_id: str = Field(description="ID of the source picture stream to capture from")
capture_duration: float = Field(default=5.0, ge=1.0, le=30.0, description="Duration to capture in seconds")
class ImageValidateRequest(BaseModel):
"""Request to validate an image source (URL or file path)."""
image_source: str = Field(description="Image URL or local file path")
class ImageValidateResponse(BaseModel):
"""Response from image validation."""
valid: bool = Field(description="Whether the image source is accessible and valid")
width: Optional[int] = Field(None, description="Image width in pixels")
height: Optional[int] = Field(None, description="Image height in pixels")
preview: Optional[str] = Field(None, description="Base64-encoded JPEG thumbnail")
error: Optional[str] = Field(None, description="Error message if invalid")