Add Picture Streams architecture with postprocessing templates and stream test UI

Introduce Picture Stream abstraction that separates the capture pipeline into
composable layers: raw streams (display + capture engine + FPS) and processed
streams (source stream + postprocessing template). Devices reference a picture
stream instead of managing individual capture settings.

- Add PictureStream and PostprocessingTemplate data models and stores
- Add CRUD API endpoints for picture streams and postprocessing templates
- Add stream chain resolution in ProcessorManager for start_processing
- Add picture stream test endpoint with postprocessing preview support
- Add Stream Settings modal with border_width and interpolation_mode controls
- Add stream test modal with capture preview and performance metrics
- Add full frontend: Picture Streams tab, Processing Templates tab, stream
  selector on device cards, test buttons on stream cards
- Add localization keys for all new features (en, ru)
- Migrate existing devices to picture streams on startup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-11 00:00:30 +03:00
parent 3db7ba4b0e
commit 493f14fba9
23 changed files with 2773 additions and 200 deletions

View File

@@ -40,6 +40,15 @@ from wled_controller.api.schemas import (
CaptureImage,
BorderExtraction,
PerformanceMetrics,
PostprocessingTemplateCreate,
PostprocessingTemplateUpdate,
PostprocessingTemplateResponse,
PostprocessingTemplateListResponse,
PictureStreamCreate,
PictureStreamUpdate,
PictureStreamResponse,
PictureStreamListResponse,
PictureStreamTestRequest,
)
from wled_controller.config import get_config
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
@@ -49,6 +58,8 @@ from wled_controller.core.calibration import (
)
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.postprocessing_template_store import PostprocessingTemplateStore
from wled_controller.storage.picture_stream_store import PictureStreamStore
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.utils import get_logger
from wled_controller.core.screen_capture import get_available_displays
@@ -60,6 +71,8 @@ router = APIRouter()
# Global instances (initialized in main.py)
_device_store: DeviceStore | None = None
_template_store: TemplateStore | None = None
_pp_template_store: PostprocessingTemplateStore | None = None
_picture_stream_store: PictureStreamStore | None = None
_processor_manager: ProcessorManager | None = None
@@ -77,6 +90,20 @@ def get_template_store() -> TemplateStore:
return _template_store
def get_pp_template_store() -> PostprocessingTemplateStore:
"""Get postprocessing template store dependency."""
if _pp_template_store is None:
raise RuntimeError("Postprocessing template store not initialized")
return _pp_template_store
def get_picture_stream_store() -> PictureStreamStore:
"""Get picture stream store dependency."""
if _picture_stream_store is None:
raise RuntimeError("Picture stream store not initialized")
return _picture_stream_store
def get_processor_manager() -> ProcessorManager:
"""Get processor manager dependency."""
if _processor_manager is None:
@@ -88,12 +115,16 @@ def init_dependencies(
device_store: DeviceStore,
template_store: TemplateStore,
processor_manager: ProcessorManager,
pp_template_store: PostprocessingTemplateStore | None = None,
picture_stream_store: PictureStreamStore | None = None,
):
"""Initialize global dependencies."""
global _device_store, _template_store, _processor_manager
global _device_store, _template_store, _processor_manager, _pp_template_store, _picture_stream_store
_device_store = device_store
_template_store = template_store
_processor_manager = processor_manager
_pp_template_store = pp_template_store
_picture_stream_store = picture_stream_store
@router.get("/health", response_model=HealthResponse, tags=["Health"])
@@ -230,7 +261,10 @@ async def create_device(
if all_templates:
capture_template_id = all_templates[0].id
else:
capture_template_id = "tpl_mss_default"
raise HTTPException(
status_code=500,
detail="No capture templates available. Please create one first."
)
# Create device in storage (LED count auto-detected from WLED)
device = store.create_device(
@@ -260,11 +294,13 @@ async def create_device(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
picture_stream_id=device.picture_stream_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -300,6 +336,7 @@ async def list_devices(
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
picture_stream_id=device.picture_stream_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -359,8 +396,12 @@ async def update_device(
):
"""Update device information."""
try:
# Check if template changed and device is processing (for hot-swap)
# Check if stream or template changed and device is processing (for hot-swap)
old_device = store.get_device(device_id)
stream_changed = (
update_data.picture_stream_id is not None
and update_data.picture_stream_id != old_device.picture_stream_id
)
template_changed = (
update_data.capture_template_id is not None
and update_data.capture_template_id != old_device.capture_template_id
@@ -374,16 +415,17 @@ async def update_device(
url=update_data.url,
enabled=update_data.enabled,
capture_template_id=update_data.capture_template_id,
picture_stream_id=update_data.picture_stream_id,
)
# Hot-swap: If template changed and device was processing, restart it
if template_changed and was_processing:
logger.info(f"Hot-swapping template for device {device_id}")
# Hot-swap: If stream/template changed and device was processing, restart it
if (stream_changed or template_changed) and was_processing:
logger.info(f"Hot-swapping stream/template for device {device_id}")
try:
# Stop current processing
await manager.stop_processing(device_id)
# Update processor with new template
# Update processor with new settings
manager.remove_device(device_id)
manager.add_device(
device_id=device.id,
@@ -392,11 +434,12 @@ async def update_device(
settings=device.settings,
calibration=device.calibration,
capture_template_id=device.capture_template_id,
picture_stream_id=device.picture_stream_id,
)
# Restart processing
await manager.start_processing(device_id)
logger.info(f"Successfully hot-swapped template for device {device_id}")
logger.info(f"Successfully hot-swapped stream/template for device {device_id}")
except Exception as e:
logger.error(f"Error during template hot-swap: {e}")
@@ -413,11 +456,13 @@ async def update_device(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
picture_stream_id=device.picture_stream_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -540,6 +585,7 @@ async def get_settings(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
state_check_interval=device.settings.state_check_interval,
)
@@ -553,16 +599,28 @@ async def update_settings(
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update processing settings for a device."""
"""Update processing settings for a device.
Merges with existing settings so callers can send partial updates.
"""
try:
# Create ProcessingSettings from schema
# Get existing device to merge settings
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
existing = device.settings
# Merge: use new values where provided, keep existing otherwise
new_settings = ProcessingSettings(
display_index=settings.display_index,
fps=settings.fps,
border_width=settings.border_width,
brightness=settings.color_correction.brightness if settings.color_correction else 1.0,
gamma=settings.color_correction.gamma if settings.color_correction else 2.2,
saturation=settings.color_correction.saturation if settings.color_correction else 1.0,
interpolation_mode=settings.interpolation_mode,
brightness=settings.color_correction.brightness if settings.color_correction else existing.brightness,
gamma=settings.color_correction.gamma if settings.color_correction else existing.gamma,
saturation=settings.color_correction.saturation if settings.color_correction else existing.saturation,
smoothing=existing.smoothing,
state_check_interval=settings.state_check_interval,
)
@@ -580,6 +638,7 @@ async def update_settings(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
interpolation_mode=device.settings.interpolation_mode,
brightness=device.settings.brightness,
state_check_interval=device.settings.state_check_interval,
)
@@ -736,7 +795,7 @@ async def list_templates(
name=t.name,
engine_type=t.engine_type,
engine_config=t.engine_config,
is_default=t.is_default,
created_at=t.created_at,
updated_at=t.updated_at,
description=t.description,
@@ -774,7 +833,7 @@ async def create_template(
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
is_default=template.is_default,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
@@ -804,7 +863,6 @@ async def get_template(
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
is_default=template.is_default,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
@@ -833,7 +891,7 @@ async def update_template(
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
is_default=template.is_default,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
@@ -1038,3 +1096,441 @@ async def test_template(
engine.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test engine: {e}")
# ===== POSTPROCESSING TEMPLATE ENDPOINTS =====
def _pp_template_to_response(t) -> PostprocessingTemplateResponse:
"""Convert a PostprocessingTemplate to its API response."""
return PostprocessingTemplateResponse(
id=t.id,
name=t.name,
gamma=t.gamma,
saturation=t.saturation,
brightness=t.brightness,
smoothing=t.smoothing,
created_at=t.created_at,
updated_at=t.updated_at,
description=t.description,
)
@router.get("/api/v1/postprocessing-templates", response_model=PostprocessingTemplateListResponse, tags=["Postprocessing Templates"])
async def list_pp_templates(
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""List all postprocessing templates."""
try:
templates = store.get_all_templates()
responses = [_pp_template_to_response(t) for t in templates]
return PostprocessingTemplateListResponse(templates=responses, count=len(responses))
except Exception as e:
logger.error(f"Failed to list postprocessing templates: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/postprocessing-templates", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"], status_code=201)
async def create_pp_template(
data: PostprocessingTemplateCreate,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Create a new postprocessing template."""
try:
template = store.create_template(
name=data.name,
gamma=data.gamma,
saturation=data.saturation,
brightness=data.brightness,
smoothing=data.smoothing,
description=data.description,
)
return _pp_template_to_response(template)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create postprocessing template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/postprocessing-templates/{template_id}", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"])
async def get_pp_template(
template_id: str,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Get postprocessing template by ID."""
try:
template = store.get_template(template_id)
return _pp_template_to_response(template)
except ValueError:
raise HTTPException(status_code=404, detail=f"Postprocessing template {template_id} not found")
@router.put("/api/v1/postprocessing-templates/{template_id}", response_model=PostprocessingTemplateResponse, tags=["Postprocessing Templates"])
async def update_pp_template(
template_id: str,
data: PostprocessingTemplateUpdate,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Update a postprocessing template."""
try:
template = store.update_template(
template_id=template_id,
name=data.name,
gamma=data.gamma,
saturation=data.saturation,
brightness=data.brightness,
smoothing=data.smoothing,
description=data.description,
)
return _pp_template_to_response(template)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update postprocessing template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/postprocessing-templates/{template_id}", status_code=204, tags=["Postprocessing Templates"])
async def delete_pp_template(
template_id: str,
_auth: AuthRequired,
store: PostprocessingTemplateStore = Depends(get_pp_template_store),
stream_store: PictureStreamStore = Depends(get_picture_stream_store),
):
"""Delete a postprocessing template."""
try:
# Check if any picture stream references this template
if store.is_referenced_by(template_id, stream_store):
raise HTTPException(
status_code=409,
detail="Cannot delete postprocessing template: it is referenced by one or more picture streams. "
"Please reassign those streams before deleting.",
)
store.delete_template(template_id)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete postprocessing template: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== PICTURE STREAM ENDPOINTS =====
def _stream_to_response(s) -> PictureStreamResponse:
"""Convert a PictureStream to its API response."""
return PictureStreamResponse(
id=s.id,
name=s.name,
stream_type=s.stream_type,
display_index=s.display_index,
capture_template_id=s.capture_template_id,
target_fps=s.target_fps,
source_stream_id=s.source_stream_id,
postprocessing_template_id=s.postprocessing_template_id,
created_at=s.created_at,
updated_at=s.updated_at,
description=s.description,
)
@router.get("/api/v1/picture-streams", response_model=PictureStreamListResponse, tags=["Picture Streams"])
async def list_picture_streams(
_auth: AuthRequired,
store: PictureStreamStore = Depends(get_picture_stream_store),
):
"""List all picture streams."""
try:
streams = store.get_all_streams()
responses = [_stream_to_response(s) for s in streams]
return PictureStreamListResponse(streams=responses, count=len(responses))
except Exception as e:
logger.error(f"Failed to list picture streams: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/picture-streams", response_model=PictureStreamResponse, tags=["Picture Streams"], status_code=201)
async def create_picture_stream(
data: PictureStreamCreate,
_auth: AuthRequired,
store: PictureStreamStore = Depends(get_picture_stream_store),
template_store: TemplateStore = Depends(get_template_store),
pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Create a new picture stream."""
try:
# Validate referenced entities
if data.stream_type == "raw" and data.capture_template_id:
try:
template_store.get_template(data.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {data.capture_template_id}",
)
if data.stream_type == "processed" and data.postprocessing_template_id:
try:
pp_store.get_template(data.postprocessing_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Postprocessing template not found: {data.postprocessing_template_id}",
)
stream = store.create_stream(
name=data.name,
stream_type=data.stream_type,
display_index=data.display_index,
capture_template_id=data.capture_template_id,
target_fps=data.target_fps,
source_stream_id=data.source_stream_id,
postprocessing_template_id=data.postprocessing_template_id,
description=data.description,
)
return _stream_to_response(stream)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create picture stream: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/picture-streams/{stream_id}", response_model=PictureStreamResponse, tags=["Picture Streams"])
async def get_picture_stream(
stream_id: str,
_auth: AuthRequired,
store: PictureStreamStore = Depends(get_picture_stream_store),
):
"""Get picture stream by ID."""
try:
stream = store.get_stream(stream_id)
return _stream_to_response(stream)
except ValueError:
raise HTTPException(status_code=404, detail=f"Picture stream {stream_id} not found")
@router.put("/api/v1/picture-streams/{stream_id}", response_model=PictureStreamResponse, tags=["Picture Streams"])
async def update_picture_stream(
stream_id: str,
data: PictureStreamUpdate,
_auth: AuthRequired,
store: PictureStreamStore = Depends(get_picture_stream_store),
):
"""Update a picture stream."""
try:
stream = store.update_stream(
stream_id=stream_id,
name=data.name,
display_index=data.display_index,
capture_template_id=data.capture_template_id,
target_fps=data.target_fps,
source_stream_id=data.source_stream_id,
postprocessing_template_id=data.postprocessing_template_id,
description=data.description,
)
return _stream_to_response(stream)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update picture stream: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/picture-streams/{stream_id}", status_code=204, tags=["Picture Streams"])
async def delete_picture_stream(
stream_id: str,
_auth: AuthRequired,
store: PictureStreamStore = Depends(get_picture_stream_store),
device_store: DeviceStore = Depends(get_device_store),
):
"""Delete a picture stream."""
try:
# Check if any device references this stream
if store.is_referenced_by_device(stream_id, device_store):
raise HTTPException(
status_code=409,
detail="Cannot delete picture stream: it is assigned to one or more devices. "
"Please reassign those devices before deleting.",
)
store.delete_stream(stream_id)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete picture stream: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/picture-streams/{stream_id}/test", response_model=TemplateTestResponse, tags=["Picture Streams"])
async def test_picture_stream(
stream_id: str,
test_request: PictureStreamTestRequest,
_auth: AuthRequired,
store: PictureStreamStore = Depends(get_picture_stream_store),
template_store: TemplateStore = Depends(get_template_store),
processor_manager: ProcessorManager = Depends(get_processor_manager),
device_store: DeviceStore = Depends(get_device_store),
pp_store: PostprocessingTemplateStore = Depends(get_pp_template_store),
):
"""Test a picture stream by resolving its chain and running a capture test.
Resolves the stream chain to the raw stream, captures frames,
and returns preview image + performance metrics.
For processed streams, applies postprocessing (gamma, saturation, brightness)
to the preview image.
"""
engine = None
try:
# Resolve stream chain
try:
chain = store.resolve_stream_chain(stream_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
raw_stream = chain["raw_stream"]
# Get capture template from raw stream
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
display_index = raw_stream.display_index
# Validate engine
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
# Check display lock
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
# Create engine and run test
engine = EngineRegistry.create_engine(capture_template.engine_type, capture_template.engine_config)
logger.info(f"Starting {test_request.capture_duration}s stream test for {stream_id}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = engine.capture_display(display_index)
capture_elapsed = time.perf_counter() - capture_start
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
# Apply postprocessing to preview if this is a processed stream
pp_template_ids = chain["postprocessing_template_ids"]
if pp_template_ids:
try:
pp = pp_store.get_template(pp_template_ids[0])
img_array = np.array(thumbnail, dtype=np.float32) / 255.0
if pp.brightness != 1.0:
img_array *= pp.brightness
if pp.saturation != 1.0:
luminance = np.dot(img_array[..., :3], [0.299, 0.587, 0.114])
luminance = luminance[..., np.newaxis]
img_array[..., :3] = luminance + (img_array[..., :3] - luminance) * pp.saturation
if pp.gamma != 1.0:
img_array = np.power(np.clip(img_array, 0, 1), 1.0 / pp.gamma)
img_array = np.clip(img_array * 255.0, 0, 255).astype(np.uint8)
thumbnail = Image.fromarray(img_array)
except ValueError:
logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview")
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
full_capture_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
full_capture_data_uri = f"data:image/jpeg;base64,{full_capture_b64}"
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
return TemplateTestResponse(
full_capture=CaptureImage(
image=full_capture_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
),
border_extraction=None,
performance=PerformanceMetrics(
capture_duration_s=actual_duration,
frame_count=frame_count,
actual_fps=actual_fps,
avg_capture_time_ms=avg_capture_time_ms,
),
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
except Exception as e:
logger.error(f"Failed to test picture stream: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
finally:
if engine:
try:
engine.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test engine: {e}")