7f613df362
Validate / validate (push) Failing after 9s
- Replace segment-based calibration with core parameters (leds_top/right/bottom/left); segments are now derived at runtime via lookup tables - Fix clockwise/counterclockwise edge traversal order for all 8 start_position/layout combinations (e.g. bottom_left+clockwise now correctly goes up-left first) - Add pixel layout preview overlay with color-coded edges, LED index labels, direction arrows, and start position marker - Move "Add New Device" form into a modal dialog triggered by "+" button - Add display index selector to device settings modal - Migrate from requirements.txt to pyproject.toml for dependency management - Update Dockerfile and docs to use `pip install .` Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
629 lines
21 KiB
Python
629 lines
21 KiB
Python
"""API routes and endpoints."""
|
|
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import List
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
|
|
from wled_controller import __version__
|
|
from wled_controller.api.auth import AuthRequired
|
|
from wled_controller.api.schemas import (
|
|
HealthResponse,
|
|
VersionResponse,
|
|
DisplayListResponse,
|
|
DisplayInfo,
|
|
DeviceCreate,
|
|
DeviceUpdate,
|
|
DeviceResponse,
|
|
DeviceListResponse,
|
|
ProcessingSettings as ProcessingSettingsSchema,
|
|
Calibration as CalibrationSchema,
|
|
CalibrationTestModeRequest,
|
|
CalibrationTestModeResponse,
|
|
ProcessingState,
|
|
MetricsResponse,
|
|
)
|
|
from wled_controller.config import get_config
|
|
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
|
|
from wled_controller.core.calibration import (
|
|
calibration_from_dict,
|
|
calibration_to_dict,
|
|
)
|
|
from wled_controller.storage import DeviceStore
|
|
from wled_controller.utils import get_logger
|
|
from wled_controller.core.screen_capture import get_available_displays
|
|
|
|
# Router that collects every endpoint declared in this module.
router = APIRouter()

# Module-level logger shared by all handlers below.
logger = get_logger(__name__)

# Global instances (initialized in main.py); they stay None until
# init_dependencies() is called.
_processor_manager: ProcessorManager | None = None
_device_store: DeviceStore | None = None
|
|
|
|
|
|
def get_device_store() -> DeviceStore:
    """Return the shared device store (used as a FastAPI dependency).

    Raises:
        RuntimeError: If the module-level store is still ``None``.
    """
    store = _device_store
    if store is not None:
        return store
    raise RuntimeError("Device store not initialized")
|
|
|
|
|
|
def get_processor_manager() -> ProcessorManager:
    """Return the shared processor manager (used as a FastAPI dependency).

    Raises:
        RuntimeError: If the module-level manager is still ``None``.
    """
    manager = _processor_manager
    if manager is not None:
        return manager
    raise RuntimeError("Processor manager not initialized")
|
|
|
|
|
|
def init_dependencies(device_store: DeviceStore, processor_manager: ProcessorManager):
    """Store the given instances in the module globals read by the dependency getters."""
    global _device_store, _processor_manager
    _device_store, _processor_manager = device_store, processor_manager
|
|
|
|
|
|
@router.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
    """Check service health status.

    Returns a fixed "healthy" status together with the package version and
    the current time as produced by ``datetime.utcnow()``.
    """
    logger.info("Health check requested")

    now = datetime.utcnow()
    return HealthResponse(status="healthy", timestamp=now, version=__version__)
|
|
|
|
|
|
@router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"])
async def get_version():
    """Get version information.

    Returns the application version, the running Python version formatted
    as ``major.minor.micro``, and the API version ("v1").
    """
    logger.info("Version info requested")

    major, minor, micro = sys.version_info[:3]
    return VersionResponse(
        version=__version__,
        python_version=f"{major}.{minor}.{micro}",
        api_version="v1",
    )
|
|
|
|
|
|
@router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"])
async def get_displays(_: AuthRequired):
    """Get list of available displays.

    Each display reported by ``get_available_displays()`` is copied
    field-by-field into the API ``DisplayInfo`` schema. Any failure is
    logged and returned as HTTP 500.
    """
    logger.info("Listing available displays")

    try:
        # Map the capture-layer display records onto the Pydantic schema.
        displays = []
        for monitor in get_available_displays():
            displays.append(
                DisplayInfo(
                    index=monitor.index,
                    name=monitor.name,
                    width=monitor.width,
                    height=monitor.height,
                    x=monitor.x,
                    y=monitor.y,
                    is_primary=monitor.is_primary,
                    refresh_rate=monitor.refresh_rate,
                )
            )

        logger.info(f"Found {len(displays)} displays")
        return DisplayListResponse(displays=displays, count=len(displays))

    except Exception as e:
        logger.error(f"Failed to get displays: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve display information: {str(e)}"
        )
|
|
|
|
|
|
# ===== DEVICE MANAGEMENT ENDPOINTS =====
|
|
|
|
@router.post("/api/v1/devices", response_model=DeviceResponse, tags=["Devices"], status_code=201)
async def create_device(
    device_data: DeviceCreate,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Create and attach a new WLED device.

    The device is first probed at ``<url>/json/info``; the LED count is taken
    from that response. On success the device is written to the store and
    registered with the processor manager.

    Raises:
        HTTPException: 422 when the device cannot be reached, times out,
            fails the probe, or reports a missing/non-positive LED count;
            500 for any other failure.
    """
    try:
        logger.info(f"Creating device: {device_data.name}")

        # Validate WLED device is reachable before adding
        device_url = device_data.url.rstrip("/")
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                response = await client.get(f"{device_url}/json/info")
                response.raise_for_status()
                wled_info = response.json()

            wled_led_count = wled_info.get("leds", {}).get("count")
            if not wled_led_count or wled_led_count < 1:
                raise HTTPException(
                    status_code=422,
                    detail=f"WLED device at {device_url} reported invalid LED count: {wled_led_count}"
                )
            logger.info(
                f"WLED device reachable: {wled_info.get('name', 'Unknown')} "
                f"v{wled_info.get('ver', '?')} ({wled_led_count} LEDs)"
            )
        except HTTPException:
            # Keep the invalid-LED-count response as raised; the generic
            # handler below would otherwise relabel it as a connection failure.
            raise
        except httpx.ConnectError:
            raise HTTPException(
                status_code=422,
                detail=f"Cannot reach WLED device at {device_url}. Check the URL and ensure the device is powered on."
            )
        except httpx.TimeoutException:
            raise HTTPException(
                status_code=422,
                detail=f"Connection to {device_url} timed out. Check network connectivity."
            )
        except Exception as e:
            raise HTTPException(
                status_code=422,
                detail=f"Failed to connect to WLED device at {device_url}: {e}"
            )

        # Create device in storage (LED count auto-detected from WLED)
        device = store.create_device(
            name=device_data.name,
            url=device_data.url,
            led_count=wled_led_count,
        )

        # Add to processor manager
        manager.add_device(
            device_id=device.id,
            device_url=device.url,
            led_count=device.led_count,
            settings=device.settings,
            calibration=device.calibration,
        )

        return DeviceResponse(
            id=device.id,
            name=device.name,
            url=device.url,
            led_count=device.led_count,
            enabled=device.enabled,
            status="disconnected",
            settings=ProcessingSettingsSchema(
                display_index=device.settings.display_index,
                fps=device.settings.fps,
                border_width=device.settings.border_width,
                brightness=device.settings.brightness,
                state_check_interval=device.settings.state_check_interval,
            ),
            calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
            created_at=device.created_at,
            updated_at=device.updated_at,
        )

    except HTTPException:
        # Let the 422 probe errors through unchanged; HTTPException is an
        # Exception subclass, so the handler below would turn them into 500s.
        raise
    except Exception as e:
        logger.error(f"Failed to create device: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/api/v1/devices", response_model=DeviceListResponse, tags=["Devices"])
async def list_devices(
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """List all attached WLED devices.

    Every entry is reported with status "disconnected"; this handler does
    not query the processor manager. Any failure is returned as HTTP 500.
    """
    try:
        payload = []
        for entry in store.get_all_devices():
            cfg = entry.settings
            settings_schema = ProcessingSettingsSchema(
                display_index=cfg.display_index,
                fps=cfg.fps,
                border_width=cfg.border_width,
                brightness=cfg.brightness,
                state_check_interval=cfg.state_check_interval,
            )
            calibration_schema = CalibrationSchema(**calibration_to_dict(entry.calibration))
            payload.append(
                DeviceResponse(
                    id=entry.id,
                    name=entry.name,
                    url=entry.url,
                    led_count=entry.led_count,
                    enabled=entry.enabled,
                    status="disconnected",
                    settings=settings_schema,
                    calibration=calibration_schema,
                    created_at=entry.created_at,
                    updated_at=entry.updated_at,
                )
            )

        return DeviceListResponse(devices=payload, count=len(payload))

    except Exception as e:
        logger.error(f"Failed to list devices: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def get_device(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Get device details by ID.

    Status is "connected" while the processor manager reports the device as
    processing, otherwise "disconnected". Responds with 404 when the store
    has no such device.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    # Status reflects whether the manager is currently processing this device.
    if manager.is_processing(device_id):
        status = "connected"
    else:
        status = "disconnected"

    cfg = device.settings
    settings_schema = ProcessingSettingsSchema(
        display_index=cfg.display_index,
        fps=cfg.fps,
        border_width=cfg.border_width,
        brightness=cfg.brightness,
        state_check_interval=cfg.state_check_interval,
    )
    calibration_schema = CalibrationSchema(**calibration_to_dict(device.calibration))

    return DeviceResponse(
        id=device.id,
        name=device.name,
        url=device.url,
        led_count=device.led_count,
        enabled=device.enabled,
        status=status,
        settings=settings_schema,
        calibration=calibration_schema,
        created_at=device.created_at,
        updated_at=device.updated_at,
    )
|
|
|
|
|
|
@router.put("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def update_device(
    device_id: str,
    update_data: DeviceUpdate,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Update device information.

    Passes the name, url and enabled fields of the request to the store and
    returns the resulting device with status "disconnected". A ``ValueError``
    is reported as 404; anything else as 500.
    """
    try:
        updated = store.update_device(
            device_id=device_id,
            name=update_data.name,
            url=update_data.url,
            enabled=update_data.enabled,
        )

        cfg = updated.settings
        settings_schema = ProcessingSettingsSchema(
            display_index=cfg.display_index,
            fps=cfg.fps,
            border_width=cfg.border_width,
            brightness=cfg.brightness,
            state_check_interval=cfg.state_check_interval,
        )
        calibration_schema = CalibrationSchema(**calibration_to_dict(updated.calibration))

        return DeviceResponse(
            id=updated.id,
            name=updated.name,
            url=updated.url,
            led_count=updated.led_count,
            enabled=updated.enabled,
            status="disconnected",
            settings=settings_schema,
            calibration=calibration_schema,
            created_at=updated.created_at,
            updated_at=updated.updated_at,
        )

    except ValueError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except Exception as err:
        logger.error(f"Failed to update device: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|
|
|
|
|
|
@router.delete("/api/v1/devices/{device_id}", status_code=204, tags=["Devices"])
async def delete_device(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Delete/detach a device.

    Stops processing when it is running, then removes the device from the
    processor manager and from storage, in that order. A ``ValueError`` is
    reported as 404; anything else as 500.
    """
    try:
        # Stop processing first if it is currently running.
        running = manager.is_processing(device_id)
        if running:
            await manager.stop_processing(device_id)

        # Detach from the manager, then drop the stored record.
        manager.remove_device(device_id)
        store.delete_device(device_id)

        logger.info(f"Deleted device {device_id}")

    except ValueError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except Exception as err:
        logger.error(f"Failed to delete device: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|
|
|
|
|
|
# ===== PROCESSING CONTROL ENDPOINTS =====
|
|
|
|
@router.post("/api/v1/devices/{device_id}/start", tags=["Processing"])
async def start_processing(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Start screen processing for a device.

    Returns ``{"status": "started", "device_id": ...}`` on success.

    Raises:
        HTTPException: 404 when the device is not in the store, 400 when the
            manager raises ``RuntimeError``, 500 for any other failure.
    """
    try:
        # Verify device exists
        device = store.get_device(device_id)
        if not device:
            raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

        await manager.start_processing(device_id)

        logger.info(f"Started processing for device {device_id}")
        return {"status": "started", "device_id": device_id}

    except HTTPException:
        # Let the 404 above through unchanged; HTTPException is an Exception
        # subclass, so the generic handler below would turn it into a 500.
        raise
    except RuntimeError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to start processing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/api/v1/devices/{device_id}/stop", tags=["Processing"])
async def stop_processing(
    device_id: str,
    _auth: AuthRequired,
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Stop screen processing for a device.

    Returns ``{"status": "stopped", "device_id": ...}`` on success. A
    ``ValueError`` from the manager is reported as 404; anything else as 500.
    """
    try:
        await manager.stop_processing(device_id)
        logger.info(f"Stopped processing for device {device_id}")

        result = {"status": "stopped", "device_id": device_id}
        return result

    except ValueError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except Exception as err:
        logger.error(f"Failed to stop processing: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|
|
|
|
|
|
@router.get("/api/v1/devices/{device_id}/state", response_model=ProcessingState, tags=["Processing"])
async def get_processing_state(
    device_id: str,
    _auth: AuthRequired,
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Get current processing state for a device.

    The mapping returned by the manager is unpacked into ``ProcessingState``.
    A ``ValueError`` is reported as 404; anything else as 500.
    """
    try:
        # Schema construction stays inside the try so its errors are mapped too.
        return ProcessingState(**manager.get_state(device_id))

    except ValueError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except Exception as err:
        logger.error(f"Failed to get state: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|
|
|
|
|
|
# ===== SETTINGS ENDPOINTS =====
|
|
|
|
@router.get("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def get_settings(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Get processing settings for a device.

    Responds with 404 when the store has no such device.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    cfg = device.settings
    return ProcessingSettingsSchema(
        display_index=cfg.display_index,
        fps=cfg.fps,
        border_width=cfg.border_width,
        brightness=cfg.brightness,
        state_check_interval=cfg.state_check_interval,
    )
|
|
|
|
|
|
@router.put("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def update_settings(
    device_id: str,
    settings: ProcessingSettingsSchema,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Update processing settings for a device.

    The new settings are saved to the store and then pushed to the processor
    manager. A ``ValueError`` from the store is reported as 404; anything
    else as 500.
    """
    try:
        # NOTE(review): brightness is read from settings.color_correction here,
        # while the response below is built from a top-level brightness field.
        # Confirm against ProcessingSettingsSchema.
        correction = settings.color_correction
        if correction:
            brightness = correction.brightness
            gamma = correction.gamma
            saturation = correction.saturation
        else:
            # Fallbacks used when no colour correction is supplied.
            brightness, gamma, saturation = 1.0, 2.2, 1.0

        new_settings = ProcessingSettings(
            display_index=settings.display_index,
            fps=settings.fps,
            border_width=settings.border_width,
            brightness=brightness,
            gamma=gamma,
            saturation=saturation,
            state_check_interval=settings.state_check_interval,
        )

        # Persist first, then sync the processor manager.
        device = store.update_device(device_id, settings=new_settings)

        try:
            manager.update_settings(device_id, new_settings)
        except ValueError:
            # Device not in manager yet, that's ok
            pass

        saved = device.settings
        return ProcessingSettingsSchema(
            display_index=saved.display_index,
            fps=saved.fps,
            border_width=saved.border_width,
            brightness=saved.brightness,
            state_check_interval=saved.state_check_interval,
        )

    except ValueError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except Exception as err:
        logger.error(f"Failed to update settings: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|
|
|
|
|
|
# ===== CALIBRATION ENDPOINTS =====
|
|
|
|
@router.get("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def get_calibration(
    device_id: str,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
):
    """Get calibration configuration for a device.

    Responds with 404 when the store has no such device.
    """
    device = store.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

    calibration_fields = calibration_to_dict(device.calibration)
    return CalibrationSchema(**calibration_fields)
|
|
|
|
|
|
@router.put("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def update_calibration(
    device_id: str,
    calibration_data: CalibrationSchema,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Update calibration configuration for a device.

    The request body is converted with ``calibration_from_dict``, saved to
    the store and then pushed to the processor manager. A ``ValueError`` is
    reported as 400; anything else as 500.
    """
    try:
        # Schema -> plain dict -> calibration object.
        calibration = calibration_from_dict(calibration_data.model_dump())

        # Persist first, then sync the processor manager.
        device = store.update_device(device_id, calibration=calibration)

        try:
            manager.update_calibration(device_id, calibration)
        except ValueError:
            # Device not in manager yet, that's ok
            pass

        stored_fields = calibration_to_dict(device.calibration)
        return CalibrationSchema(**stored_fields)

    except ValueError as invalid:
        # NOTE(review): every ValueError in the block above is reported as 400,
        # including any raised by store.update_device. Confirm that is intended.
        raise HTTPException(status_code=400, detail=str(invalid))
    except Exception as err:
        logger.error(f"Failed to update calibration: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|
|
|
|
|
|
@router.put(
    "/api/v1/devices/{device_id}/calibration/test",
    response_model=CalibrationTestModeResponse,
    tags=["Calibration"],
)
async def set_calibration_test_mode(
    device_id: str,
    body: CalibrationTestModeRequest,
    _auth: AuthRequired,
    store: DeviceStore = Depends(get_device_store),
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Toggle calibration test mode for specific edges.

    Send edges with colors to light them up, or empty edges dict to exit test mode.
    While test mode is active, screen capture processing is paused.

    Raises:
        HTTPException: 404 when the device is unknown (or the manager raises
            ``ValueError``), 400 for an invalid edge name or color, 500 for
            any other failure.
    """
    try:
        device = store.get_device(device_id)
        if not device:
            raise HTTPException(status_code=404, detail=f"Device {device_id} not found")

        # Validate edge names and colors. An ordered tuple (not a set) keeps
        # the list in the error message in the same order on every run.
        valid_edges = ("top", "right", "bottom", "left")
        for edge_name, color in body.edges.items():
            if edge_name not in valid_edges:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid edge '{edge_name}'. Must be one of: {', '.join(valid_edges)}"
                )
            if len(color) != 3 or not all(0 <= c <= 255 for c in color):
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid color for edge '{edge_name}'. Must be [R, G, B] with values 0-255."
                )

        await manager.set_test_mode(device_id, body.edges)

        # An empty edge list means test mode was switched off.
        active_edges = list(body.edges.keys())
        logger.info(
            f"Test mode {'activated' if active_edges else 'deactivated'} "
            f"for device {device_id}: {active_edges}"
        )

        return CalibrationTestModeResponse(
            test_mode=len(active_edges) > 0,
            active_edges=active_edges,
            device_id=device_id,
        )

    except HTTPException:
        # Pass the 400/404 responses raised above through unchanged.
        raise
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to set test mode: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ===== METRICS ENDPOINTS =====
|
|
|
|
@router.get("/api/v1/devices/{device_id}/metrics", response_model=MetricsResponse, tags=["Metrics"])
async def get_metrics(
    device_id: str,
    _auth: AuthRequired,
    manager: ProcessorManager = Depends(get_processor_manager),
):
    """Get processing metrics for a device.

    The mapping returned by the manager is unpacked into ``MetricsResponse``.
    A ``ValueError`` is reported as 404; anything else as 500.
    """
    try:
        # Schema construction stays inside the try so its errors are mapped too.
        return MetricsResponse(**manager.get_metrics(device_id))

    except ValueError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except Exception as err:
        logger.error(f"Failed to get metrics: {err}")
        raise HTTPException(status_code=500, detail=str(err))
|