Files
ledgrab/server/src/wled_controller/api/routes/picture_targets.py
T
alexei.dolgolyov 3100b0d979 Add frame-change detection, keepalive, current FPS, and compact metrics UI
Skip redundant processing/DDP sends when screen is static using object
identity comparison. Add configurable standby interval to periodically
resend last frame keeping WLED in live mode. Track frames skipped,
keepalive count, and current FPS (rolling 1-second send count). Always
use DDP regardless of LED count. Compact metrics grid with label-value
rows and remove Skipped from UI display.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 15:17:14 +03:00

770 lines
29 KiB
Python

"""Picture target routes: CRUD, processing control, settings, state, metrics."""
import base64
import io
import secrets
import time
import numpy as np
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
from PIL import Image
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_device_store,
get_pattern_template_store,
get_picture_source_store,
get_picture_target_store,
get_processor_manager,
get_template_store,
)
from wled_controller.api.schemas.picture_targets import (
ExtractedColorResponse,
KCTestRectangleResponse,
KCTestResponse,
KeyColorsResponse,
KeyColorsSettingsSchema,
PictureTargetCreate,
PictureTargetListResponse,
PictureTargetResponse,
PictureTargetUpdate,
ProcessingSettings as ProcessingSettingsSchema,
TargetMetricsResponse,
TargetProcessingState,
)
from wled_controller.config import config
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
from wled_controller.core.screen_capture import (
calculate_average_color,
calculate_dominant_color,
calculate_median_color,
)
from wled_controller.storage import DeviceStore
from wled_controller.storage.pattern_template_store import PatternTemplateStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.storage.wled_picture_target import WledPictureTarget
from wled_controller.storage.key_colors_picture_target import (
KeyColorsSettings,
KeyColorsPictureTarget,
)
from wled_controller.storage.picture_target_store import PictureTargetStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
def _settings_to_core(schema: ProcessingSettingsSchema) -> ProcessingSettings:
"""Convert schema ProcessingSettings to core ProcessingSettings."""
settings = ProcessingSettings(
display_index=schema.display_index,
fps=schema.fps,
border_width=schema.border_width,
interpolation_mode=schema.interpolation_mode,
brightness=schema.brightness,
smoothing=schema.smoothing,
standby_interval=schema.standby_interval,
state_check_interval=schema.state_check_interval,
)
if schema.color_correction:
settings.gamma = schema.color_correction.gamma
settings.saturation = schema.color_correction.saturation
# color_correction.brightness maps to settings.brightness
settings.brightness = schema.color_correction.brightness
return settings
def _settings_to_schema(settings: ProcessingSettings) -> ProcessingSettingsSchema:
"""Convert core ProcessingSettings to schema ProcessingSettings."""
from wled_controller.api.schemas.picture_targets import ColorCorrection
return ProcessingSettingsSchema(
display_index=settings.display_index,
fps=settings.fps,
border_width=settings.border_width,
interpolation_mode=settings.interpolation_mode,
brightness=settings.brightness,
smoothing=settings.smoothing,
standby_interval=settings.standby_interval,
state_check_interval=settings.state_check_interval,
color_correction=ColorCorrection(
gamma=settings.gamma,
saturation=settings.saturation,
brightness=settings.brightness,
),
)
def _kc_settings_to_schema(settings: KeyColorsSettings) -> KeyColorsSettingsSchema:
"""Convert core KeyColorsSettings to schema."""
return KeyColorsSettingsSchema(
fps=settings.fps,
interpolation_mode=settings.interpolation_mode,
smoothing=settings.smoothing,
pattern_template_id=settings.pattern_template_id,
)
def _kc_schema_to_settings(schema: KeyColorsSettingsSchema) -> KeyColorsSettings:
"""Convert schema KeyColorsSettings to core."""
return KeyColorsSettings(
fps=schema.fps,
interpolation_mode=schema.interpolation_mode,
smoothing=schema.smoothing,
pattern_template_id=schema.pattern_template_id,
)
def _target_to_response(target) -> PictureTargetResponse:
"""Convert a PictureTarget to PictureTargetResponse."""
if isinstance(target, WledPictureTarget):
return PictureTargetResponse(
id=target.id,
name=target.name,
target_type=target.target_type,
device_id=target.device_id,
picture_source_id=target.picture_source_id,
settings=_settings_to_schema(target.settings),
description=target.description,
created_at=target.created_at,
updated_at=target.updated_at,
)
elif isinstance(target, KeyColorsPictureTarget):
return PictureTargetResponse(
id=target.id,
name=target.name,
target_type=target.target_type,
picture_source_id=target.picture_source_id,
key_colors_settings=_kc_settings_to_schema(target.settings),
description=target.description,
created_at=target.created_at,
updated_at=target.updated_at,
)
else:
return PictureTargetResponse(
id=target.id,
name=target.name,
target_type=target.target_type,
description=target.description,
created_at=target.created_at,
updated_at=target.updated_at,
)
# ===== CRUD ENDPOINTS =====
@router.post("/api/v1/picture-targets", response_model=PictureTargetResponse, tags=["Targets"], status_code=201)
async def create_target(
data: PictureTargetCreate,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
device_store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Create a new picture target."""
try:
# Validate device exists if provided
if data.device_id:
device = device_store.get_device(data.device_id)
if not device:
raise HTTPException(status_code=422, detail=f"Device {data.device_id} not found")
# Convert settings
core_settings = _settings_to_core(data.settings) if data.settings else None
kc_settings = _kc_schema_to_settings(data.key_colors_settings) if data.key_colors_settings else None
# Create in store
target = target_store.create_target(
name=data.name,
target_type=data.target_type,
device_id=data.device_id,
picture_source_id=data.picture_source_id,
settings=core_settings,
key_colors_settings=kc_settings,
description=data.description,
)
# Register in processor manager
if isinstance(target, WledPictureTarget) and target.device_id:
try:
manager.add_target(
target_id=target.id,
device_id=target.device_id,
settings=target.settings,
picture_source_id=target.picture_source_id,
)
except ValueError as e:
logger.warning(f"Could not register target {target.id} in processor manager: {e}")
elif isinstance(target, KeyColorsPictureTarget):
try:
manager.add_kc_target(
target_id=target.id,
picture_source_id=target.picture_source_id,
settings=target.settings,
)
except ValueError as e:
logger.warning(f"Could not register KC target {target.id}: {e}")
return _target_to_response(target)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create target: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/picture-targets", response_model=PictureTargetListResponse, tags=["Targets"])
async def list_targets(
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
):
"""List all picture targets."""
targets = target_store.get_all_targets()
responses = [_target_to_response(t) for t in targets]
return PictureTargetListResponse(targets=responses, count=len(responses))
@router.get("/api/v1/picture-targets/{target_id}", response_model=PictureTargetResponse, tags=["Targets"])
async def get_target(
target_id: str,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
):
"""Get a picture target by ID."""
try:
target = target_store.get_target(target_id)
return _target_to_response(target)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.put("/api/v1/picture-targets/{target_id}", response_model=PictureTargetResponse, tags=["Targets"])
async def update_target(
target_id: str,
data: PictureTargetUpdate,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
device_store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update a picture target."""
try:
# Validate device exists if changing
if data.device_id is not None and data.device_id:
device = device_store.get_device(data.device_id)
if not device:
raise HTTPException(status_code=422, detail=f"Device {data.device_id} not found")
# Convert settings
core_settings = _settings_to_core(data.settings) if data.settings else None
kc_settings = _kc_schema_to_settings(data.key_colors_settings) if data.key_colors_settings else None
# Update in store
target = target_store.update_target(
target_id=target_id,
name=data.name,
device_id=data.device_id,
picture_source_id=data.picture_source_id,
settings=core_settings,
key_colors_settings=kc_settings,
description=data.description,
)
# Sync processor manager
if isinstance(target, WledPictureTarget):
try:
if data.settings is not None:
manager.update_target_settings(target_id, target.settings)
if data.picture_source_id is not None:
manager.update_target_source(target_id, target.picture_source_id)
if data.device_id is not None:
manager.update_target_device(target_id, target.device_id)
except ValueError:
pass
elif isinstance(target, KeyColorsPictureTarget):
try:
if data.key_colors_settings is not None:
manager.update_kc_target_settings(target_id, target.settings)
if data.picture_source_id is not None:
manager.update_kc_target_source(target_id, target.picture_source_id)
except ValueError:
pass
return _target_to_response(target)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to update target: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/picture-targets/{target_id}", status_code=204, tags=["Targets"])
async def delete_target(
target_id: str,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Delete a picture target. Stops processing first if active."""
try:
# Stop processing if running (WLED or KC)
try:
if manager.is_kc_target(target_id):
await manager.stop_kc_processing(target_id)
elif manager.is_target_processing(target_id):
await manager.stop_processing(target_id)
except ValueError:
pass
# Remove from manager (WLED or KC)
try:
if manager.is_kc_target(target_id):
manager.remove_kc_target(target_id)
else:
manager.remove_target(target_id)
except (ValueError, RuntimeError):
pass
# Delete from store
target_store.delete_target(target_id)
logger.info(f"Deleted target {target_id}")
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete target: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== PROCESSING CONTROL ENDPOINTS =====
@router.post("/api/v1/picture-targets/{target_id}/start", tags=["Processing"])
async def start_processing(
target_id: str,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Start processing for a picture target."""
try:
# Verify target exists and dispatch by type
target = target_store.get_target(target_id)
if isinstance(target, KeyColorsPictureTarget):
await manager.start_kc_processing(target_id)
else:
await manager.start_processing(target_id)
logger.info(f"Started processing for target {target_id}")
return {"status": "started", "target_id": target_id}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=409, detail=str(e))
except Exception as e:
logger.error(f"Failed to start processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/picture-targets/{target_id}/stop", tags=["Processing"])
async def stop_processing(
target_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Stop processing for a picture target."""
try:
if manager.is_kc_target(target_id):
await manager.stop_kc_processing(target_id)
else:
await manager.stop_processing(target_id)
logger.info(f"Stopped processing for target {target_id}")
return {"status": "stopped", "target_id": target_id}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to stop processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== STATE & METRICS ENDPOINTS =====
@router.get("/api/v1/picture-targets/{target_id}/state", response_model=TargetProcessingState, tags=["Processing"])
async def get_target_state(
target_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get current processing state for a target."""
try:
if manager.is_kc_target(target_id):
state = manager.get_kc_target_state(target_id)
else:
state = manager.get_target_state(target_id)
return TargetProcessingState(**state)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to get target state: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/picture-targets/{target_id}/settings", tags=["Settings"])
async def get_target_settings(
target_id: str,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
):
"""Get processing settings for a target."""
try:
target = target_store.get_target(target_id)
if isinstance(target, KeyColorsPictureTarget):
return _kc_settings_to_schema(target.settings)
if isinstance(target, WledPictureTarget):
return _settings_to_schema(target.settings)
return ProcessingSettingsSchema()
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.put("/api/v1/picture-targets/{target_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def update_target_settings(
target_id: str,
settings: ProcessingSettingsSchema,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update processing settings for a target.
Merges with existing settings so callers can send partial updates.
"""
try:
target = target_store.get_target(target_id)
if not isinstance(target, WledPictureTarget):
raise HTTPException(status_code=400, detail="Target does not support processing settings")
existing = target.settings
sent = settings.model_fields_set
# Merge: only override fields the client explicitly provided
new_settings = ProcessingSettings(
display_index=settings.display_index if 'display_index' in sent else existing.display_index,
fps=settings.fps if 'fps' in sent else existing.fps,
border_width=settings.border_width if 'border_width' in sent else existing.border_width,
interpolation_mode=settings.interpolation_mode if 'interpolation_mode' in sent else existing.interpolation_mode,
brightness=settings.brightness if 'brightness' in sent else existing.brightness,
gamma=existing.gamma,
saturation=existing.saturation,
smoothing=settings.smoothing if 'smoothing' in sent else existing.smoothing,
standby_interval=settings.standby_interval if 'standby_interval' in sent else existing.standby_interval,
state_check_interval=settings.state_check_interval if 'state_check_interval' in sent else existing.state_check_interval,
)
# Apply color_correction fields if explicitly sent
if 'color_correction' in sent and settings.color_correction:
cc_sent = settings.color_correction.model_fields_set
if 'brightness' in cc_sent:
new_settings.brightness = settings.color_correction.brightness
if 'gamma' in cc_sent:
new_settings.gamma = settings.color_correction.gamma
if 'saturation' in cc_sent:
new_settings.saturation = settings.color_correction.saturation
# Update in store
target_store.update_target(target_id, settings=new_settings)
# Update in manager
try:
manager.update_target_settings(target_id, new_settings)
except ValueError:
pass
return _settings_to_schema(new_settings)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to update target settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/picture-targets/{target_id}/metrics", response_model=TargetMetricsResponse, tags=["Metrics"])
async def get_target_metrics(
target_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get processing metrics for a target."""
try:
if manager.is_kc_target(target_id):
metrics = manager.get_kc_target_metrics(target_id)
else:
metrics = manager.get_target_metrics(target_id)
return TargetMetricsResponse(**metrics)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to get target metrics: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== KEY COLORS ENDPOINTS =====
@router.get("/api/v1/picture-targets/{target_id}/colors", response_model=KeyColorsResponse, tags=["Key Colors"])
async def get_target_colors(
target_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get latest extracted colors for a key-colors target (polling)."""
try:
raw_colors = manager.get_kc_latest_colors(target_id)
colors = {}
for name, (r, g, b) in raw_colors.items():
colors[name] = ExtractedColorResponse(
r=r, g=g, b=b,
hex=f"#{r:02x}{g:02x}{b:02x}",
)
from datetime import datetime
return KeyColorsResponse(
target_id=target_id,
colors=colors,
timestamp=datetime.utcnow(),
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.post("/api/v1/picture-targets/{target_id}/test", response_model=KCTestResponse, tags=["Key Colors"])
async def test_kc_target(
target_id: str,
_auth: AuthRequired,
target_store: PictureTargetStore = Depends(get_picture_target_store),
source_store: PictureSourceStore = Depends(get_picture_source_store),
template_store: TemplateStore = Depends(get_template_store),
pattern_store: PatternTemplateStore = Depends(get_pattern_template_store),
processor_manager: ProcessorManager = Depends(get_processor_manager),
device_store: DeviceStore = Depends(get_device_store),
):
"""Test a key-colors target: capture a frame, extract colors from each rectangle."""
import httpx
stream = None
try:
# 1. Load and validate KC target
try:
target = target_store.get_target(target_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
if not isinstance(target, KeyColorsPictureTarget):
raise HTTPException(status_code=400, detail="Target is not a key_colors target")
settings = target.settings
# 2. Resolve pattern template
if not settings.pattern_template_id:
raise HTTPException(status_code=400, detail="No pattern template configured")
try:
pattern_tmpl = pattern_store.get_template(settings.pattern_template_id)
except ValueError:
raise HTTPException(status_code=400, detail=f"Pattern template not found: {settings.pattern_template_id}")
rectangles = pattern_tmpl.rectangles
if not rectangles:
raise HTTPException(status_code=400, detail="Pattern template has no rectangles")
# 3. Resolve picture source and capture a frame
if not target.picture_source_id:
raise HTTPException(status_code=400, detail="No picture source configured")
try:
chain = source_store.resolve_stream_chain(target.picture_source_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
raw_stream = chain["raw_stream"]
if isinstance(raw_stream, StaticImagePictureSource):
source = raw_stream.image_source
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
else:
from pathlib import Path
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
elif isinstance(raw_stream, ScreenCapturePictureSource):
try:
capture_template = template_store.get_template(raw_stream.capture_template_id)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Capture template not found: {raw_stream.capture_template_id}",
)
display_index = raw_stream.display_index
if capture_template.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{capture_template.engine_type}' is not available on this system",
)
locked_device_id = processor_manager.get_display_lock_info(display_index)
if locked_device_id:
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=f"Display {display_index} is currently being captured by device '{device_name}'. "
f"Please stop the device processing before testing.",
)
stream = EngineRegistry.create_stream(
capture_template.engine_type, display_index, capture_template.engine_config
)
stream.initialize()
screen_capture = stream.capture_frame()
if screen_capture is None:
raise RuntimeError("No frame captured")
if isinstance(screen_capture.image, np.ndarray):
pil_image = Image.fromarray(screen_capture.image)
else:
raise ValueError("Unexpected image format from engine")
else:
raise HTTPException(status_code=400, detail="Unsupported picture source type")
# 4. Extract colors from each rectangle
img_array = np.array(pil_image)
h, w = img_array.shape[:2]
calc_fns = {
"average": calculate_average_color,
"median": calculate_median_color,
"dominant": calculate_dominant_color,
}
calc_fn = calc_fns.get(settings.interpolation_mode, calculate_average_color)
result_rects = []
for rect in rectangles:
px_x = max(0, int(rect.x * w))
px_y = max(0, int(rect.y * h))
px_w = max(1, int(rect.width * w))
px_h = max(1, int(rect.height * h))
px_x = min(px_x, w - 1)
px_y = min(px_y, h - 1)
px_w = min(px_w, w - px_x)
px_h = min(px_h, h - px_y)
sub_img = img_array[px_y:px_y + px_h, px_x:px_x + px_w]
r, g, b = calc_fn(sub_img)
result_rects.append(KCTestRectangleResponse(
name=rect.name,
x=rect.x,
y=rect.y,
width=rect.width,
height=rect.height,
color=ExtractedColorResponse(r=r, g=g, b=b, hex=f"#{r:02x}{g:02x}{b:02x}"),
))
# 5. Encode frame as base64 JPEG
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
image_data_uri = f"data:image/jpeg;base64,{full_b64}"
return KCTestResponse(
image=image_data_uri,
rectangles=result_rects,
interpolation_mode=settings.interpolation_mode,
pattern_template_name=pattern_tmpl.name,
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=f"Capture error: {str(e)}")
except Exception as e:
logger.error(f"Failed to test KC target: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
finally:
if stream:
try:
stream.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test stream: {e}")
@router.websocket("/api/v1/picture-targets/{target_id}/ws")
async def target_colors_ws(
websocket: WebSocket,
target_id: str,
token: str = Query(""),
):
"""WebSocket for real-time key color updates. Auth via ?token=<api_key>."""
# Authenticate
authenticated = False
if token and config.auth.api_keys:
for _label, api_key in config.auth.api_keys.items():
if secrets.compare_digest(token, api_key):
authenticated = True
break
if not authenticated:
await websocket.close(code=4001, reason="Unauthorized")
return
await websocket.accept()
manager = get_processor_manager()
try:
manager.add_kc_ws_client(target_id, websocket)
except ValueError:
await websocket.close(code=4004, reason="Target not found")
return
try:
while True:
# Keep alive — wait for client messages (or disconnect)
await websocket.receive_text()
except WebSocketDisconnect:
pass
finally:
manager.remove_kc_ws_client(target_id, websocket)