Initial commit: WLED Screen Controller with FastAPI server and Home Assistant integration
Some checks failed
Validate / validate (push) Failing after 1m6s

This is a complete WLED ambient lighting controller that captures screen border pixels
and sends them to WLED devices for immersive ambient lighting effects.

## Server Features:
- FastAPI-based REST API with 17+ endpoints
- Real-time screen capture with multi-monitor support
- Advanced LED calibration system with visual GUI
- API key authentication with labeled tokens
- Per-device brightness control (0-100%)
- Configurable FPS (1-60), border width, and color correction
- Persistent device storage (JSON-based)
- Comprehensive Web UI with dark/light themes
- Docker support with docker-compose
- Windows monitor name detection via WMI (shows "LG ULTRAWIDE" etc.)

## Web UI Features:
- Device management (add, configure, remove WLED devices)
- Real-time status monitoring with FPS metrics
- Settings modal for device configuration
- Visual calibration GUI with edge testing
- Brightness slider per device
- Display selection with friendly monitor names
- Token-based authentication with login/logout
- Responsive button layout

## Calibration System:
- Support for any LED strip layout (clockwise/counterclockwise)
- 4 starting position options (corners)
- Per-edge LED count configuration
- Visual preview with starting position indicator
- Test buttons to light up individual edges
- Smart LED ordering based on start position and direction

## Home Assistant Integration:
- Custom HACS integration
- Switch entities for processing control
- Sensor entities for status and FPS
- Select entities for display selection
- Config flow for easy setup
- Auto-discovery of devices from server

## Technical Stack:
- Python 3.11+
- FastAPI + uvicorn
- mss (screen capture)
- httpx (async WLED client)
- Pydantic (validation)
- WMI (Windows monitor detection)
- Structlog (logging)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-06 16:38:27 +03:00
commit d471a40234
57 changed files with 9726 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""WLED Screen Controller - Ambient lighting based on screen content."""
__version__ = "0.1.0"
__author__ = "Alexei Dolgolyov"
__email__ = "dolgolyov.alexei@gmail.com"

View File

@@ -0,0 +1,5 @@
"""API routes and schemas."""
from .routes import router
__all__ = ["router"]

View File

@@ -0,0 +1,77 @@
"""Authentication module for API key validation."""
import secrets
from typing import Annotated
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from wled_controller.config import get_config
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Security scheme for Bearer token
security = HTTPBearer(auto_error=False)
def verify_api_key(
credentials: Annotated[HTTPAuthorizationCredentials | None, Security(security)]
) -> str:
"""Verify API key from Authorization header.
Args:
credentials: HTTP authorization credentials
Returns:
Label/identifier of the authenticated client
Raises:
HTTPException: If authentication is required but invalid
"""
config = get_config()
# Check if credentials are provided
if not credentials:
logger.warning("Request missing Authorization header")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing API key - authentication is required",
headers={"WWW-Authenticate": "Bearer"},
)
# Extract token
token = credentials.credentials
# Verify against configured API keys
if not config.auth.api_keys:
logger.error("No API keys configured - server misconfiguration")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Server authentication not configured properly",
)
# Find matching key and return its label using constant-time comparison
authenticated_as = None
for label, api_key in config.auth.api_keys.items():
if secrets.compare_digest(token, api_key):
authenticated_as = label
break
if not authenticated_as:
logger.warning(f"Invalid API key attempt: {token[:8]}...")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
headers={"WWW-Authenticate": "Bearer"},
)
# Log successful authentication
logger.debug(f"Authenticated as: {authenticated_as}")
return authenticated_as
# Dependency for protected routes
# Returns the label/identifier of the authenticated client
AuthRequired = Annotated[str, Depends(verify_api_key)]

View File

@@ -0,0 +1,598 @@
"""API routes and endpoints."""
import sys
from datetime import datetime
from typing import List
from fastapi import APIRouter, HTTPException, Depends
from wled_controller import __version__
from wled_controller.api.auth import AuthRequired
from wled_controller.api.schemas import (
HealthResponse,
VersionResponse,
DisplayListResponse,
DisplayInfo,
DeviceCreate,
DeviceUpdate,
DeviceResponse,
DeviceListResponse,
ProcessingSettings as ProcessingSettingsSchema,
Calibration as CalibrationSchema,
ProcessingState,
MetricsResponse,
)
from wled_controller.config import get_config
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
from wled_controller.core.calibration import (
calibration_from_dict,
calibration_to_dict,
)
from wled_controller.storage import DeviceStore
from wled_controller.utils import get_logger, get_monitor_names
logger = get_logger(__name__)
router = APIRouter()
# Global instances (initialized in main.py)
_device_store: DeviceStore | None = None
_processor_manager: ProcessorManager | None = None
def get_device_store() -> DeviceStore:
"""Get device store dependency."""
if _device_store is None:
raise RuntimeError("Device store not initialized")
return _device_store
def get_processor_manager() -> ProcessorManager:
"""Get processor manager dependency."""
if _processor_manager is None:
raise RuntimeError("Processor manager not initialized")
return _processor_manager
def init_dependencies(device_store: DeviceStore, processor_manager: ProcessorManager):
"""Initialize global dependencies."""
global _device_store, _processor_manager
_device_store = device_store
_processor_manager = processor_manager
@router.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
"""Check service health status.
Returns basic health information including status, version, and timestamp.
"""
logger.info("Health check requested")
return HealthResponse(
status="healthy",
timestamp=datetime.utcnow(),
version=__version__,
)
@router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"])
async def get_version():
"""Get version information.
Returns application version, Python version, and API version.
"""
logger.info("Version info requested")
return VersionResponse(
version=__version__,
python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
api_version="v1",
)
@router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"])
async def get_displays(_: AuthRequired):
"""Get list of available displays.
Returns information about all available monitors/displays that can be captured.
"""
logger.info("Listing available displays")
try:
# Import here to avoid issues if mss is not installed yet
import mss
# Get friendly monitor names (Windows only, falls back to generic names)
monitor_names = get_monitor_names()
with mss.mss() as sct:
displays = []
# Skip the first monitor (it's the combined virtual screen on multi-monitor setups)
for idx, monitor in enumerate(sct.monitors[1:], start=0):
# Use friendly name from WMI if available, otherwise generic name
friendly_name = monitor_names.get(idx, f"Display {idx}")
display_info = DisplayInfo(
index=idx,
name=friendly_name,
width=monitor["width"],
height=monitor["height"],
x=monitor["left"],
y=monitor["top"],
is_primary=(idx == 0),
)
displays.append(display_info)
logger.info(f"Found {len(displays)} displays")
return DisplayListResponse(
displays=displays,
count=len(displays),
)
except Exception as e:
logger.error(f"Failed to get displays: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve display information: {str(e)}"
)
# ===== DEVICE MANAGEMENT ENDPOINTS =====
@router.post("/api/v1/devices", response_model=DeviceResponse, tags=["Devices"], status_code=201)
async def create_device(
device_data: DeviceCreate,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Create and attach a new WLED device."""
try:
logger.info(f"Creating device: {device_data.name}")
# Create device in storage
device = store.create_device(
name=device_data.name,
url=device_data.url,
led_count=device_data.led_count,
)
# Add to processor manager
manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
)
return DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status="disconnected",
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
brightness=device.settings.brightness,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
created_at=device.created_at,
updated_at=device.updated_at,
)
except Exception as e:
logger.error(f"Failed to create device: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices", response_model=DeviceListResponse, tags=["Devices"])
async def list_devices(
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""List all attached WLED devices."""
try:
devices = store.get_all_devices()
device_responses = [
DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status="disconnected",
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
created_at=device.created_at,
updated_at=device.updated_at,
)
for device in devices
]
return DeviceListResponse(devices=device_responses, count=len(device_responses))
except Exception as e:
logger.error(f"Failed to list devices: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def get_device(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get device details by ID."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
# Determine status
status = "connected" if manager.is_processing(device_id) else "disconnected"
return DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status=status,
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
created_at=device.created_at,
updated_at=device.updated_at,
)
@router.put("/api/v1/devices/{device_id}", response_model=DeviceResponse, tags=["Devices"])
async def update_device(
device_id: str,
update_data: DeviceUpdate,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Update device information."""
try:
device = store.update_device(
device_id=device_id,
name=update_data.name,
url=update_data.url,
led_count=update_data.led_count,
enabled=update_data.enabled,
)
return DeviceResponse(
id=device.id,
name=device.name,
url=device.url,
led_count=device.led_count,
enabled=device.enabled,
status="disconnected",
settings=ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
brightness=device.settings.brightness,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
created_at=device.created_at,
updated_at=device.updated_at,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to update device: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/devices/{device_id}", status_code=204, tags=["Devices"])
async def delete_device(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Delete/detach a device."""
try:
# Stop processing if running
if manager.is_processing(device_id):
await manager.stop_processing(device_id)
# Remove from manager
manager.remove_device(device_id)
# Delete from storage
store.delete_device(device_id)
logger.info(f"Deleted device {device_id}")
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete device: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== PROCESSING CONTROL ENDPOINTS =====
@router.post("/api/v1/devices/{device_id}/start", tags=["Processing"])
async def start_processing(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Start screen processing for a device."""
try:
# Verify device exists
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
await manager.start_processing(device_id)
logger.info(f"Started processing for device {device_id}")
return {"status": "started", "device_id": device_id}
except RuntimeError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to start processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/devices/{device_id}/stop", tags=["Processing"])
async def stop_processing(
device_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Stop screen processing for a device."""
try:
await manager.stop_processing(device_id)
logger.info(f"Stopped processing for device {device_id}")
return {"status": "stopped", "device_id": device_id}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to stop processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/devices/{device_id}/state", response_model=ProcessingState, tags=["Processing"])
async def get_processing_state(
device_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get current processing state for a device."""
try:
state = manager.get_state(device_id)
return ProcessingState(**state)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to get state: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== SETTINGS ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def get_settings(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Get processing settings for a device."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
return ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
brightness=device.settings.brightness,
)
@router.put("/api/v1/devices/{device_id}/settings", response_model=ProcessingSettingsSchema, tags=["Settings"])
async def update_settings(
device_id: str,
settings: ProcessingSettingsSchema,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update processing settings for a device."""
try:
# Create ProcessingSettings from schema
new_settings = ProcessingSettings(
display_index=settings.display_index,
fps=settings.fps,
border_width=settings.border_width,
brightness=settings.color_correction.brightness if settings.color_correction else 1.0,
gamma=settings.color_correction.gamma if settings.color_correction else 2.2,
saturation=settings.color_correction.saturation if settings.color_correction else 1.0,
)
# Update in storage
device = store.update_device(device_id, settings=new_settings)
# Update in manager if device exists
try:
manager.update_settings(device_id, new_settings)
except ValueError:
# Device not in manager yet, that's ok
pass
return ProcessingSettingsSchema(
display_index=device.settings.display_index,
fps=device.settings.fps,
border_width=device.settings.border_width,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to update settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== CALIBRATION ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def get_calibration(
device_id: str,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
):
"""Get calibration configuration for a device."""
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
return CalibrationSchema(**calibration_to_dict(device.calibration))
@router.put("/api/v1/devices/{device_id}/calibration", response_model=CalibrationSchema, tags=["Calibration"])
async def update_calibration(
device_id: str,
calibration_data: CalibrationSchema,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update calibration configuration for a device."""
try:
# Convert schema to CalibrationConfig
calibration_dict = calibration_data.model_dump()
calibration = calibration_from_dict(calibration_dict)
# Update in storage
device = store.update_device(device_id, calibration=calibration)
# Update in manager if device exists
try:
manager.update_calibration(device_id, calibration)
except ValueError:
# Device not in manager yet, that's ok
pass
return CalibrationSchema(**calibration_to_dict(device.calibration))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update calibration: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/devices/{device_id}/calibration/test", tags=["Calibration"])
async def test_calibration(
device_id: str,
_auth: AuthRequired,
edge: str = "top",
color: List[int] = [255, 0, 0],
store: DeviceStore = Depends(get_device_store),
):
"""Test calibration by lighting up specific edge.
Useful for verifying LED positions match screen edges.
"""
try:
# Get device
device = store.get_device(device_id)
if not device:
raise HTTPException(status_code=404, detail=f"Device {device_id} not found")
# Find the segment for this edge
segment = None
for seg in device.calibration.segments:
if seg.edge == edge:
segment = seg
break
if not segment:
raise HTTPException(status_code=400, detail=f"No LEDs configured for {edge} edge")
# Create pixel array - all black except for the test edge
pixels = [(0, 0, 0)] * device.led_count
# Light up the test edge
r, g, b = color if len(color) == 3 else [255, 0, 0]
for i in range(segment.led_start, segment.led_start + segment.led_count):
if i < device.led_count:
pixels[i] = (r, g, b)
# Send to WLED
from wled_controller.core.wled_client import WLEDClient
import asyncio
async with WLEDClient(device.url) as wled:
# Light up the edge
await wled.send_pixels(pixels)
# Wait 2 seconds
await asyncio.sleep(2)
# Turn off
pixels_off = [(0, 0, 0)] * device.led_count
await wled.send_pixels(pixels_off)
logger.info(f"Calibration test completed for edge '{edge}' on device {device_id}")
return {
"status": "test_completed",
"device_id": device_id,
"edge": edge,
"led_count": segment.led_count,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to test calibration: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== METRICS ENDPOINTS =====
@router.get("/api/v1/devices/{device_id}/metrics", response_model=MetricsResponse, tags=["Metrics"])
async def get_metrics(
device_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get processing metrics for a device."""
try:
metrics = manager.get_metrics(device_id)
return MetricsResponse(**metrics)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to get metrics: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,175 @@
"""Pydantic schemas for API request and response models."""
from datetime import datetime
from typing import Dict, List, Literal, Optional
from pydantic import BaseModel, Field, HttpUrl
# Health and Version Schemas
class HealthResponse(BaseModel):
"""Health check response."""
status: Literal["healthy", "unhealthy"] = Field(description="Service health status")
timestamp: datetime = Field(description="Current server time")
version: str = Field(description="Application version")
class VersionResponse(BaseModel):
"""Version information response."""
version: str = Field(description="Application version")
python_version: str = Field(description="Python version")
api_version: str = Field(description="API version")
# Display Schemas
class DisplayInfo(BaseModel):
"""Display/monitor information."""
index: int = Field(description="Display index")
name: str = Field(description="Display name")
width: int = Field(description="Display width in pixels")
height: int = Field(description="Display height in pixels")
x: int = Field(description="Display X position")
y: int = Field(description="Display Y position")
is_primary: bool = Field(default=False, description="Whether this is the primary display")
class DisplayListResponse(BaseModel):
"""List of available displays."""
displays: List[DisplayInfo] = Field(description="Available displays")
count: int = Field(description="Number of displays")
# Device Schemas
class DeviceCreate(BaseModel):
"""Request to create/attach a WLED device."""
name: str = Field(description="Device name", min_length=1, max_length=100)
url: str = Field(description="WLED device URL (e.g., http://192.168.1.100)")
led_count: int = Field(description="Total number of LEDs", gt=0, le=10000)
class DeviceUpdate(BaseModel):
"""Request to update device information."""
name: Optional[str] = Field(None, description="Device name", min_length=1, max_length=100)
url: Optional[str] = Field(None, description="WLED device URL")
led_count: Optional[int] = Field(None, description="Total number of LEDs", gt=0, le=10000)
enabled: Optional[bool] = Field(None, description="Whether device is enabled")
class ColorCorrection(BaseModel):
"""Color correction settings."""
gamma: float = Field(default=2.2, description="Gamma correction", ge=0.1, le=5.0)
saturation: float = Field(default=1.0, description="Saturation multiplier", ge=0.0, le=2.0)
brightness: float = Field(default=1.0, description="Brightness multiplier", ge=0.0, le=1.0)
class ProcessingSettings(BaseModel):
"""Processing settings for a device."""
display_index: int = Field(default=0, description="Display to capture", ge=0)
fps: int = Field(default=30, description="Target frames per second", ge=1, le=60)
border_width: int = Field(default=10, description="Border width in pixels", ge=1, le=100)
brightness: float = Field(default=1.0, description="Global brightness (0.0-1.0)", ge=0.0, le=1.0)
color_correction: Optional[ColorCorrection] = Field(
default_factory=ColorCorrection,
description="Color correction settings"
)
class CalibrationSegment(BaseModel):
"""Calibration segment for LED mapping."""
edge: Literal["top", "right", "bottom", "left"] = Field(description="Screen edge")
led_start: int = Field(description="Starting LED index", ge=0)
led_count: int = Field(description="Number of LEDs on this edge", gt=0)
reverse: bool = Field(default=False, description="Reverse LED order on this edge")
class Calibration(BaseModel):
"""Calibration configuration for pixel-to-LED mapping."""
layout: Literal["clockwise", "counterclockwise"] = Field(
default="clockwise",
description="LED strip layout direction"
)
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"] = Field(
default="bottom_left",
description="Position of LED index 0"
)
segments: List[CalibrationSegment] = Field(
description="LED segments for each screen edge",
min_length=1,
max_length=4
)
class DeviceResponse(BaseModel):
"""Device information response."""
id: str = Field(description="Device ID")
name: str = Field(description="Device name")
url: str = Field(description="WLED device URL")
led_count: int = Field(description="Total number of LEDs")
enabled: bool = Field(description="Whether device is enabled")
status: Literal["connected", "disconnected", "error"] = Field(
description="Connection status"
)
settings: ProcessingSettings = Field(description="Processing settings")
calibration: Optional[Calibration] = Field(None, description="Calibration configuration")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
class DeviceListResponse(BaseModel):
"""List of devices response."""
devices: List[DeviceResponse] = Field(description="List of devices")
count: int = Field(description="Number of devices")
# Processing State Schemas
class ProcessingState(BaseModel):
"""Processing state for a device."""
device_id: str = Field(description="Device ID")
processing: bool = Field(description="Whether processing is active")
fps_actual: Optional[float] = Field(None, description="Actual FPS achieved")
fps_target: int = Field(description="Target FPS")
display_index: int = Field(description="Current display index")
last_update: Optional[datetime] = Field(None, description="Last successful update")
errors: List[str] = Field(default_factory=list, description="Recent errors")
class MetricsResponse(BaseModel):
"""Device metrics response."""
device_id: str = Field(description="Device ID")
processing: bool = Field(description="Whether processing is active")
fps_actual: Optional[float] = Field(None, description="Actual FPS")
fps_target: int = Field(description="Target FPS")
uptime_seconds: float = Field(description="Processing uptime in seconds")
frames_processed: int = Field(description="Total frames processed")
errors_count: int = Field(description="Total error count")
last_error: Optional[str] = Field(None, description="Last error message")
last_update: Optional[datetime] = Field(None, description="Last update timestamp")
# Error Schemas
class ErrorResponse(BaseModel):
"""Error response."""
error: str = Field(description="Error type")
message: str = Field(description="Error message")
detail: Optional[Dict] = Field(None, description="Additional error details")
timestamp: datetime = Field(default_factory=datetime.utcnow, description="Error timestamp")

View File

@@ -0,0 +1,154 @@
"""Configuration management for WLED Screen Controller."""
import os
from pathlib import Path
from typing import List, Literal
import yaml
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class ServerConfig(BaseSettings):
"""Server configuration."""
host: str = "0.0.0.0"
port: int = 8080
log_level: str = "INFO"
cors_origins: List[str] = ["*"]
class AuthConfig(BaseSettings):
"""Authentication configuration."""
api_keys: dict[str, str] = {} # label: key mapping (required for security)
class ProcessingConfig(BaseSettings):
"""Processing configuration."""
default_fps: int = 30
max_fps: int = 60
min_fps: int = 1
border_width: int = 10
interpolation_mode: Literal["average", "median", "dominant"] = "average"
class ScreenCaptureConfig(BaseSettings):
"""Screen capture configuration."""
buffer_size: int = 2
class WLEDConfig(BaseSettings):
"""WLED client configuration."""
timeout: int = 5
retry_attempts: int = 3
retry_delay: int = 1
protocol: Literal["http", "https"] = "http"
max_brightness: int = 255
class StorageConfig(BaseSettings):
"""Storage configuration."""
devices_file: str = "data/devices.json"
class LoggingConfig(BaseSettings):
"""Logging configuration."""
format: Literal["json", "text"] = "json"
file: str = "logs/wled_controller.log"
max_size_mb: int = 100
backup_count: int = 5
class Config(BaseSettings):
"""Main application configuration."""
model_config = SettingsConfigDict(
env_prefix="WLED_",
env_nested_delimiter="__",
case_sensitive=False,
)
server: ServerConfig = Field(default_factory=ServerConfig)
auth: AuthConfig = Field(default_factory=AuthConfig)
processing: ProcessingConfig = Field(default_factory=ProcessingConfig)
screen_capture: ScreenCaptureConfig = Field(default_factory=ScreenCaptureConfig)
wled: WLEDConfig = Field(default_factory=WLEDConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
@classmethod
def from_yaml(cls, config_path: str | Path) -> "Config":
"""Load configuration from YAML file.
Args:
config_path: Path to YAML configuration file
Returns:
Config instance
"""
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Configuration file not found: {config_path}")
with open(config_path, "r") as f:
config_data = yaml.safe_load(f)
return cls(**config_data)
@classmethod
def load(cls) -> "Config":
"""Load configuration from default locations.
Tries to load from:
1. Environment variable WLED_CONFIG_PATH
2. ./config/default_config.yaml
3. Default values
Returns:
Config instance
"""
config_path = os.getenv("WLED_CONFIG_PATH")
if config_path:
return cls.from_yaml(config_path)
# Try default location
default_path = Path("config/default_config.yaml")
if default_path.exists():
return cls.from_yaml(default_path)
# Use defaults
return cls()
# Global configuration instance
config: Config | None = None
def get_config() -> Config:
"""Get global configuration instance.
Returns:
Config instance
"""
global config
if config is None:
config = Config.load()
return config
def reload_config() -> Config:
"""Reload configuration from file.
Returns:
New Config instance
"""
global config
config = Config.load()
return config

View File

@@ -0,0 +1,17 @@
"""Core functionality for screen capture and WLED control."""
from .screen_capture import (
get_available_displays,
capture_display,
extract_border_pixels,
ScreenCapture,
BorderPixels,
)
__all__ = [
"get_available_displays",
"capture_display",
"extract_border_pixels",
"ScreenCapture",
"BorderPixels",
]

View File

@@ -0,0 +1,344 @@
"""Calibration system for mapping screen pixels to LED positions."""
from dataclasses import dataclass
from typing import List, Literal, Tuple
from wled_controller.core.screen_capture import (
BorderPixels,
get_edge_segments,
calculate_average_color,
calculate_median_color,
calculate_dominant_color,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@dataclass
class CalibrationSegment:
"""Configuration for one segment of the LED strip."""
edge: Literal["top", "right", "bottom", "left"]
led_start: int
led_count: int
reverse: bool = False
@dataclass
class CalibrationConfig:
"""Complete calibration configuration."""
layout: Literal["clockwise", "counterclockwise"]
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"]
segments: List[CalibrationSegment]
def validate(self) -> bool:
"""Validate calibration configuration.
Returns:
True if configuration is valid
Raises:
ValueError: If configuration is invalid
"""
if not self.segments:
raise ValueError("Calibration must have at least one segment")
# Check for duplicate edges
edges = [seg.edge for seg in self.segments]
if len(edges) != len(set(edges)):
raise ValueError("Duplicate edges in calibration segments")
# Validate LED indices don't overlap
led_ranges = []
for seg in self.segments:
led_range = range(seg.led_start, seg.led_start + seg.led_count)
led_ranges.append(led_range)
# Check for overlaps
for i, range1 in enumerate(led_ranges):
for j, range2 in enumerate(led_ranges):
if i != j:
overlap = set(range1) & set(range2)
if overlap:
raise ValueError(
f"LED indices overlap between segments {i} and {j}: {overlap}"
)
# Validate LED counts are positive
for seg in self.segments:
if seg.led_count <= 0:
raise ValueError(f"LED count must be positive, got {seg.led_count}")
if seg.led_start < 0:
raise ValueError(f"LED start must be non-negative, got {seg.led_start}")
return True
def get_total_leds(self) -> int:
"""Get total number of LEDs across all segments."""
return sum(seg.led_count for seg in self.segments)
def get_segment_for_edge(self, edge: str) -> CalibrationSegment | None:
"""Get segment configuration for a specific edge."""
for seg in self.segments:
if seg.edge == edge:
return seg
return None
class PixelMapper:
"""Maps screen border pixels to LED colors based on calibration."""
def __init__(
self,
calibration: CalibrationConfig,
interpolation_mode: Literal["average", "median", "dominant"] = "average",
):
"""Initialize pixel mapper.
Args:
calibration: Calibration configuration
interpolation_mode: Color calculation mode
"""
self.calibration = calibration
self.interpolation_mode = interpolation_mode
# Validate calibration
self.calibration.validate()
# Select color calculation function
if interpolation_mode == "average":
self._calc_color = calculate_average_color
elif interpolation_mode == "median":
self._calc_color = calculate_median_color
elif interpolation_mode == "dominant":
self._calc_color = calculate_dominant_color
else:
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
logger.info(
f"Initialized pixel mapper with {self.calibration.get_total_leds()} LEDs "
f"using {interpolation_mode} interpolation"
)
def map_border_to_leds(
self,
border_pixels: BorderPixels
) -> List[Tuple[int, int, int]]:
"""Map screen border pixels to LED colors.
Args:
border_pixels: Extracted border pixels from screen
Returns:
List of (R, G, B) tuples for each LED
Raises:
ValueError: If border pixels don't match calibration
"""
total_leds = self.calibration.get_total_leds()
led_colors = [(0, 0, 0)] * total_leds
# Process each edge
for edge_name in ["top", "right", "bottom", "left"]:
segment = self.calibration.get_segment_for_edge(edge_name)
if not segment:
# This edge is not configured
continue
# Get pixels for this edge
if edge_name == "top":
edge_pixels = border_pixels.top
elif edge_name == "right":
edge_pixels = border_pixels.right
elif edge_name == "bottom":
edge_pixels = border_pixels.bottom
else: # left
edge_pixels = border_pixels.left
# Divide edge into segments matching LED count
try:
pixel_segments = get_edge_segments(
edge_pixels,
segment.led_count,
edge_name
)
except ValueError as e:
logger.error(f"Failed to segment {edge_name} edge: {e}")
raise
# Calculate LED indices for this segment
led_indices = list(range(segment.led_start, segment.led_start + segment.led_count))
# Reverse if needed
if segment.reverse:
led_indices = list(reversed(led_indices))
# Map pixel segments to LEDs
for led_idx, pixel_segment in zip(led_indices, pixel_segments):
color = self._calc_color(pixel_segment)
led_colors[led_idx] = color
logger.debug(f"Mapped border pixels to {total_leds} LED colors")
return led_colors
def test_calibration(self, edge: str, color: Tuple[int, int, int]) -> List[Tuple[int, int, int]]:
"""Generate test pattern to light up specific edge.
Useful for verifying calibration configuration.
Args:
edge: Edge to light up (top, right, bottom, left)
color: RGB color to use
Returns:
List of LED colors with only the specified edge lit
Raises:
ValueError: If edge is not in calibration
"""
segment = self.calibration.get_segment_for_edge(edge)
if not segment:
raise ValueError(f"Edge '{edge}' not found in calibration")
total_leds = self.calibration.get_total_leds()
led_colors = [(0, 0, 0)] * total_leds
# Light up the specified edge
led_indices = range(segment.led_start, segment.led_start + segment.led_count)
for led_idx in led_indices:
led_colors[led_idx] = color
logger.info(f"Generated test pattern for {edge} edge with color {color}")
return led_colors
def create_default_calibration(led_count: int) -> CalibrationConfig:
"""Create a default calibration for a rectangular screen.
Assumes LEDs are evenly distributed around the screen edges in clockwise order
starting from bottom-left.
Args:
led_count: Total number of LEDs
Returns:
Default calibration configuration
"""
if led_count < 4:
raise ValueError("Need at least 4 LEDs for default calibration")
# Distribute LEDs evenly across 4 edges
leds_per_edge = led_count // 4
remainder = led_count % 4
# Distribute remainder to longer edges (bottom and top)
bottom_count = leds_per_edge + (1 if remainder > 0 else 0)
right_count = leds_per_edge
top_count = leds_per_edge + (1 if remainder > 1 else 0)
left_count = leds_per_edge + (1 if remainder > 2 else 0)
segments = [
CalibrationSegment(
edge="bottom",
led_start=0,
led_count=bottom_count,
reverse=False,
),
CalibrationSegment(
edge="right",
led_start=bottom_count,
led_count=right_count,
reverse=False,
),
CalibrationSegment(
edge="top",
led_start=bottom_count + right_count,
led_count=top_count,
reverse=True,
),
CalibrationSegment(
edge="left",
led_start=bottom_count + right_count + top_count,
led_count=left_count,
reverse=True,
),
]
config = CalibrationConfig(
layout="clockwise",
start_position="bottom_left",
segments=segments,
)
logger.info(
f"Created default calibration for {led_count} LEDs: "
f"bottom={bottom_count}, right={right_count}, "
f"top={top_count}, left={left_count}"
)
return config
def calibration_from_dict(data: dict) -> CalibrationConfig:
"""Create calibration configuration from dictionary.
Args:
data: Dictionary with calibration data
Returns:
CalibrationConfig instance
Raises:
ValueError: If data is invalid
"""
try:
segments = [
CalibrationSegment(
edge=seg["edge"],
led_start=seg["led_start"],
led_count=seg["led_count"],
reverse=seg.get("reverse", False),
)
for seg in data["segments"]
]
config = CalibrationConfig(
layout=data["layout"],
start_position=data["start_position"],
segments=segments,
)
config.validate()
return config
except KeyError as e:
raise ValueError(f"Missing required calibration field: {e}")
except Exception as e:
raise ValueError(f"Invalid calibration data: {e}")
def calibration_to_dict(config: CalibrationConfig) -> dict:
"""Convert calibration configuration to dictionary.
Args:
config: Calibration configuration
Returns:
Dictionary representation
"""
return {
"layout": config.layout,
"start_position": config.start_position,
"segments": [
{
"edge": seg.edge,
"led_start": seg.led_start,
"led_count": seg.led_count,
"reverse": seg.reverse,
}
for seg in config.segments
],
}

View File

@@ -0,0 +1,166 @@
"""Pixel processing utilities for color correction and manipulation."""
from typing import List, Tuple
import numpy as np
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def apply_color_correction(
colors: List[Tuple[int, int, int]],
gamma: float = 2.2,
saturation: float = 1.0,
brightness: float = 1.0,
) -> List[Tuple[int, int, int]]:
"""Apply color correction to LED colors.
Args:
colors: List of (R, G, B) tuples
gamma: Gamma correction factor (default 2.2)
saturation: Saturation multiplier (0.0-2.0)
brightness: Brightness multiplier (0.0-1.0)
Returns:
Corrected list of (R, G, B) tuples
"""
if not colors:
return colors
# Convert to numpy array for efficient processing
colors_array = np.array(colors, dtype=np.float32) / 255.0
# Apply brightness
if brightness != 1.0:
colors_array *= brightness
# Apply saturation
if saturation != 1.0:
# Convert RGB to HSV-like saturation adjustment
# Calculate luminance (grayscale)
luminance = np.dot(colors_array, [0.299, 0.587, 0.114])
luminance = luminance[:, np.newaxis] # Reshape for broadcasting
# Blend between grayscale and color based on saturation
colors_array = luminance + (colors_array - luminance) * saturation
# Apply gamma correction
if gamma != 1.0:
colors_array = np.power(colors_array, 1.0 / gamma)
# Clamp to valid range and convert back to integers
colors_array = np.clip(colors_array * 255.0, 0, 255).astype(np.uint8)
# Convert back to list of tuples
corrected_colors = [tuple(color) for color in colors_array]
return corrected_colors
def smooth_colors(
current_colors: List[Tuple[int, int, int]],
previous_colors: List[Tuple[int, int, int]],
smoothing_factor: float = 0.5,
) -> List[Tuple[int, int, int]]:
"""Smooth color transitions between frames.
Args:
current_colors: Current frame colors
previous_colors: Previous frame colors
smoothing_factor: Smoothing amount (0.0-1.0, where 0=no smoothing, 1=full smoothing)
Returns:
Smoothed colors
"""
if not current_colors or not previous_colors:
return current_colors
if len(current_colors) != len(previous_colors):
logger.warning(
f"Color count mismatch: current={len(current_colors)}, "
f"previous={len(previous_colors)}. Skipping smoothing."
)
return current_colors
if smoothing_factor <= 0:
return current_colors
if smoothing_factor >= 1:
return previous_colors
# Convert to numpy arrays
current = np.array(current_colors, dtype=np.float32)
previous = np.array(previous_colors, dtype=np.float32)
# Blend between current and previous
smoothed = current * (1 - smoothing_factor) + previous * smoothing_factor
# Convert back to integers
smoothed = np.clip(smoothed, 0, 255).astype(np.uint8)
return [tuple(color) for color in smoothed]
def adjust_brightness_global(
colors: List[Tuple[int, int, int]],
target_brightness: int,
) -> List[Tuple[int, int, int]]:
"""Adjust colors to achieve target global brightness.
Args:
colors: List of (R, G, B) tuples
target_brightness: Target brightness (0-255)
Returns:
Adjusted colors
"""
if not colors or target_brightness == 255:
return colors
# Calculate scaling factor
scale = target_brightness / 255.0
# Scale all colors
scaled = [
(
int(r * scale),
int(g * scale),
int(b * scale),
)
for r, g, b in colors
]
return scaled
def limit_brightness(
colors: List[Tuple[int, int, int]],
max_brightness: int = 255,
) -> List[Tuple[int, int, int]]:
"""Limit maximum brightness of any color channel.
Args:
colors: List of (R, G, B) tuples
max_brightness: Maximum allowed brightness (0-255)
Returns:
Limited colors
"""
if not colors or max_brightness == 255:
return colors
limited = []
for r, g, b in colors:
# Find max channel value
max_val = max(r, g, b)
if max_val > max_brightness:
# Scale down proportionally
scale = max_brightness / max_val
r = int(r * scale)
g = int(g * scale)
b = int(b * scale)
limited.append((r, g, b))
return limited

View File

@@ -0,0 +1,452 @@
"""Processing manager for coordinating screen capture and WLED updates."""
import asyncio
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional
from wled_controller.core.calibration import (
CalibrationConfig,
PixelMapper,
create_default_calibration,
)
from wled_controller.core.pixel_processor import apply_color_correction, smooth_colors
from wled_controller.core.screen_capture import capture_display, extract_border_pixels
from wled_controller.core.wled_client import WLEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@dataclass
class ProcessingSettings:
"""Settings for screen processing."""
display_index: int = 0
fps: int = 30
border_width: int = 10
brightness: float = 1.0
gamma: float = 2.2
saturation: float = 1.0
smoothing: float = 0.3
interpolation_mode: str = "average"
@dataclass
class ProcessingMetrics:
"""Metrics for processing performance."""
frames_processed: int = 0
errors_count: int = 0
last_error: Optional[str] = None
last_update: Optional[datetime] = None
start_time: Optional[datetime] = None
fps_actual: float = 0.0
@dataclass
class ProcessorState:
"""State of a running processor."""
device_id: str
device_url: str
led_count: int
settings: ProcessingSettings
calibration: CalibrationConfig
wled_client: Optional[WLEDClient] = None
pixel_mapper: Optional[PixelMapper] = None
is_running: bool = False
task: Optional[asyncio.Task] = None
metrics: ProcessingMetrics = field(default_factory=ProcessingMetrics)
previous_colors: Optional[list] = None
class ProcessorManager:
"""Manages screen processing for multiple WLED devices."""
def __init__(self):
"""Initialize processor manager."""
self._processors: Dict[str, ProcessorState] = {}
logger.info("Processor manager initialized")
def add_device(
self,
device_id: str,
device_url: str,
led_count: int,
settings: Optional[ProcessingSettings] = None,
calibration: Optional[CalibrationConfig] = None,
):
"""Add a device for processing.
Args:
device_id: Unique device identifier
device_url: WLED device URL
led_count: Number of LEDs
settings: Processing settings (uses defaults if None)
calibration: Calibration config (creates default if None)
"""
if device_id in self._processors:
raise ValueError(f"Device {device_id} already exists")
if settings is None:
settings = ProcessingSettings()
if calibration is None:
calibration = create_default_calibration(led_count)
state = ProcessorState(
device_id=device_id,
device_url=device_url,
led_count=led_count,
settings=settings,
calibration=calibration,
)
self._processors[device_id] = state
logger.info(f"Added device {device_id} with {led_count} LEDs")
def remove_device(self, device_id: str):
"""Remove a device.
Args:
device_id: Device identifier
Raises:
ValueError: If device not found
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
# Stop processing if running
if self._processors[device_id].is_running:
raise RuntimeError(f"Cannot remove device {device_id} while processing")
del self._processors[device_id]
logger.info(f"Removed device {device_id}")
def update_settings(self, device_id: str, settings: ProcessingSettings):
"""Update processing settings for a device.
Args:
device_id: Device identifier
settings: New settings
Raises:
ValueError: If device not found
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
self._processors[device_id].settings = settings
# Recreate pixel mapper if interpolation mode changed
state = self._processors[device_id]
if state.pixel_mapper:
state.pixel_mapper = PixelMapper(
state.calibration,
interpolation_mode=settings.interpolation_mode,
)
logger.info(f"Updated settings for device {device_id}")
def update_calibration(self, device_id: str, calibration: CalibrationConfig):
"""Update calibration for a device.
Args:
device_id: Device identifier
calibration: New calibration config
Raises:
ValueError: If device not found or calibration invalid
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
# Validate calibration
calibration.validate()
# Check LED count matches
state = self._processors[device_id]
if calibration.get_total_leds() != state.led_count:
raise ValueError(
f"Calibration LED count ({calibration.get_total_leds()}) "
f"does not match device LED count ({state.led_count})"
)
state.calibration = calibration
# Recreate pixel mapper if running
if state.pixel_mapper:
state.pixel_mapper = PixelMapper(
calibration,
interpolation_mode=state.settings.interpolation_mode,
)
logger.info(f"Updated calibration for device {device_id}")
async def start_processing(self, device_id: str):
"""Start screen processing for a device.
Args:
device_id: Device identifier
Raises:
ValueError: If device not found
RuntimeError: If processing already running
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
state = self._processors[device_id]
if state.is_running:
raise RuntimeError(f"Processing already running for device {device_id}")
# Connect to WLED device
try:
state.wled_client = WLEDClient(state.device_url)
await state.wled_client.connect()
except Exception as e:
logger.error(f"Failed to connect to WLED device {device_id}: {e}")
raise RuntimeError(f"Failed to connect to WLED device: {e}")
# Initialize pixel mapper
state.pixel_mapper = PixelMapper(
state.calibration,
interpolation_mode=state.settings.interpolation_mode,
)
# Reset metrics
state.metrics = ProcessingMetrics(start_time=datetime.utcnow())
state.previous_colors = None
# Start processing task
state.task = asyncio.create_task(self._processing_loop(device_id))
state.is_running = True
logger.info(f"Started processing for device {device_id}")
async def stop_processing(self, device_id: str):
"""Stop screen processing for a device.
Args:
device_id: Device identifier
Raises:
ValueError: If device not found
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
state = self._processors[device_id]
if not state.is_running:
logger.warning(f"Processing not running for device {device_id}")
return
# Stop processing
state.is_running = False
# Cancel task
if state.task:
state.task.cancel()
try:
await state.task
except asyncio.CancelledError:
pass
state.task = None
# Close WLED connection
if state.wled_client:
await state.wled_client.close()
state.wled_client = None
logger.info(f"Stopped processing for device {device_id}")
async def _processing_loop(self, device_id: str):
"""Main processing loop for a device.
Args:
device_id: Device identifier
"""
state = self._processors[device_id]
settings = state.settings
logger.info(
f"Processing loop started for {device_id} "
f"(display={settings.display_index}, fps={settings.fps})"
)
frame_time = 1.0 / settings.fps
fps_samples = []
try:
while state.is_running:
loop_start = time.time()
try:
# Capture screen
capture = capture_display(settings.display_index)
# Extract border pixels
border_pixels = extract_border_pixels(capture, settings.border_width)
# Map to LED colors
led_colors = state.pixel_mapper.map_border_to_leds(border_pixels)
# Apply color correction
led_colors = apply_color_correction(
led_colors,
gamma=settings.gamma,
saturation=settings.saturation,
brightness=settings.brightness,
)
# Apply smoothing
if state.previous_colors and settings.smoothing > 0:
led_colors = smooth_colors(
led_colors,
state.previous_colors,
settings.smoothing,
)
# Send to WLED with brightness
brightness_value = int(settings.brightness * 255)
await state.wled_client.send_pixels(led_colors, brightness=brightness_value)
# Update metrics
state.metrics.frames_processed += 1
state.metrics.last_update = datetime.utcnow()
state.previous_colors = led_colors
# Calculate actual FPS
loop_time = time.time() - loop_start
fps_samples.append(1.0 / loop_time if loop_time > 0 else 0)
if len(fps_samples) > 10:
fps_samples.pop(0)
state.metrics.fps_actual = sum(fps_samples) / len(fps_samples)
except Exception as e:
state.metrics.errors_count += 1
state.metrics.last_error = str(e)
logger.error(f"Processing error for device {device_id}: {e}")
# FPS control
elapsed = time.time() - loop_start
sleep_time = max(0, frame_time - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
except asyncio.CancelledError:
logger.info(f"Processing loop cancelled for device {device_id}")
raise
except Exception as e:
logger.error(f"Fatal error in processing loop for {device_id}: {e}")
state.is_running = False
raise
finally:
logger.info(f"Processing loop ended for device {device_id}")
def get_state(self, device_id: str) -> dict:
"""Get current processing state for a device.
Args:
device_id: Device identifier
Returns:
State dictionary
Raises:
ValueError: If device not found
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
state = self._processors[device_id]
metrics = state.metrics
return {
"device_id": device_id,
"processing": state.is_running,
"fps_actual": metrics.fps_actual if state.is_running else None,
"fps_target": state.settings.fps,
"display_index": state.settings.display_index,
"last_update": metrics.last_update,
"errors": [metrics.last_error] if metrics.last_error else [],
}
def get_metrics(self, device_id: str) -> dict:
"""Get detailed metrics for a device.
Args:
device_id: Device identifier
Returns:
Metrics dictionary
Raises:
ValueError: If device not found
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
state = self._processors[device_id]
metrics = state.metrics
# Calculate uptime
uptime_seconds = 0.0
if metrics.start_time and state.is_running:
uptime_seconds = (datetime.utcnow() - metrics.start_time).total_seconds()
return {
"device_id": device_id,
"processing": state.is_running,
"fps_actual": metrics.fps_actual if state.is_running else None,
"fps_target": state.settings.fps,
"uptime_seconds": uptime_seconds,
"frames_processed": metrics.frames_processed,
"errors_count": metrics.errors_count,
"last_error": metrics.last_error,
"last_update": metrics.last_update,
}
def is_processing(self, device_id: str) -> bool:
"""Check if device is currently processing.
Args:
device_id: Device identifier
Returns:
True if processing
Raises:
ValueError: If device not found
"""
if device_id not in self._processors:
raise ValueError(f"Device {device_id} not found")
return self._processors[device_id].is_running
def get_all_devices(self) -> list[str]:
"""Get list of all device IDs.
Returns:
List of device IDs
"""
return list(self._processors.keys())
async def stop_all(self):
"""Stop processing for all devices."""
device_ids = list(self._processors.keys())
for device_id in device_ids:
if self._processors[device_id].is_running:
try:
await self.stop_processing(device_id)
except Exception as e:
logger.error(f"Error stopping device {device_id}: {e}")
logger.info("Stopped all processors")

View File

@@ -0,0 +1,329 @@
"""Screen capture functionality using mss library."""
from dataclasses import dataclass
from typing import Dict, List
import mss
import numpy as np
from PIL import Image
from wled_controller.utils import get_logger, get_monitor_names
logger = get_logger(__name__)
@dataclass
class DisplayInfo:
"""Information about a display/monitor."""
index: int
name: str
width: int
height: int
x: int
y: int
is_primary: bool
@dataclass
class ScreenCapture:
"""Captured screen image data."""
image: np.ndarray
width: int
height: int
display_index: int
@dataclass
class BorderPixels:
"""Border pixels extracted from screen edges."""
top: np.ndarray
right: np.ndarray
bottom: np.ndarray
left: np.ndarray
def get_available_displays() -> List[DisplayInfo]:
"""Get list of available displays/monitors.
Returns:
List of DisplayInfo objects for each available monitor
Raises:
RuntimeError: If unable to detect displays
"""
try:
# Get friendly monitor names (Windows only, falls back to generic names)
monitor_names = get_monitor_names()
with mss.mss() as sct:
displays = []
# Skip the first monitor (combined virtual screen on multi-monitor setups)
for idx, monitor in enumerate(sct.monitors[1:], start=0):
# Use friendly name from WMI if available, otherwise generic name
friendly_name = monitor_names.get(idx, f"Display {idx}")
display_info = DisplayInfo(
index=idx,
name=friendly_name,
width=monitor["width"],
height=monitor["height"],
x=monitor["left"],
y=monitor["top"],
is_primary=(idx == 0),
)
displays.append(display_info)
logger.info(f"Detected {len(displays)} display(s)")
return displays
except Exception as e:
logger.error(f"Failed to detect displays: {e}")
raise RuntimeError(f"Failed to detect displays: {e}")
def capture_display(display_index: int = 0) -> ScreenCapture:
"""Capture the specified display.
Args:
display_index: Index of the display to capture (0-based)
Returns:
ScreenCapture object containing the captured image
Raises:
ValueError: If display_index is invalid
RuntimeError: If screen capture fails
"""
try:
with mss.mss() as sct:
# mss monitors[0] is the combined screen, monitors[1+] are individual displays
monitor_index = display_index + 1
if monitor_index >= len(sct.monitors):
raise ValueError(
f"Invalid display index {display_index}. "
f"Available displays: 0-{len(sct.monitors) - 2}"
)
monitor = sct.monitors[monitor_index]
# Capture screenshot
screenshot = sct.grab(monitor)
# Convert to numpy array (RGB)
img = Image.frombytes("RGB", screenshot.size, screenshot.rgb)
img_array = np.array(img)
logger.debug(
f"Captured display {display_index}: {monitor['width']}x{monitor['height']}"
)
return ScreenCapture(
image=img_array,
width=monitor["width"],
height=monitor["height"],
display_index=display_index,
)
except ValueError:
raise
except Exception as e:
logger.error(f"Failed to capture display {display_index}: {e}")
raise RuntimeError(f"Screen capture failed: {e}")
def extract_border_pixels(
screen_capture: ScreenCapture,
border_width: int = 10
) -> BorderPixels:
"""Extract border pixels from screen capture.
Args:
screen_capture: Captured screen image
border_width: Width of the border in pixels to extract
Returns:
BorderPixels object containing pixels from each edge
Raises:
ValueError: If border_width is invalid
"""
if border_width < 1:
raise ValueError("border_width must be at least 1")
if border_width > min(screen_capture.width, screen_capture.height) // 4:
raise ValueError(
f"border_width {border_width} is too large for screen size "
f"{screen_capture.width}x{screen_capture.height}"
)
img = screen_capture.image
height, width = img.shape[:2]
# Extract border regions
# Top edge: top border_width rows, full width
top = img[:border_width, :, :]
# Bottom edge: bottom border_width rows, full width
bottom = img[-border_width:, :, :]
# Right edge: right border_width columns, full height
right = img[:, -border_width:, :]
# Left edge: left border_width columns, full height
left = img[:, :border_width, :]
logger.debug(
f"Extracted borders: top={top.shape}, right={right.shape}, "
f"bottom={bottom.shape}, left={left.shape}"
)
return BorderPixels(
top=top,
right=right,
bottom=bottom,
left=left,
)
def get_edge_segments(
edge_pixels: np.ndarray,
segment_count: int,
edge_name: str
) -> List[np.ndarray]:
"""Divide edge pixels into segments.
Args:
edge_pixels: Pixel array for one edge
segment_count: Number of segments to divide into
edge_name: Name of the edge (for orientation)
Returns:
List of pixel arrays, one per segment
Raises:
ValueError: If segment_count is invalid
"""
if segment_count < 1:
raise ValueError("segment_count must be at least 1")
# Determine the dimension to divide
# For top/bottom edges: divide along width (axis 1)
# For left/right edges: divide along height (axis 0)
if edge_name in ["top", "bottom"]:
divide_axis = 1 # Width
edge_length = edge_pixels.shape[1]
else: # left, right
divide_axis = 0 # Height
edge_length = edge_pixels.shape[0]
if segment_count > edge_length:
raise ValueError(
f"segment_count {segment_count} is larger than edge length {edge_length}"
)
# Calculate segment size
segment_size = edge_length // segment_count
segments = []
for i in range(segment_count):
start = i * segment_size
end = start + segment_size if i < segment_count - 1 else edge_length
if divide_axis == 1:
segment = edge_pixels[:, start:end, :]
else:
segment = edge_pixels[start:end, :, :]
segments.append(segment)
return segments
def calculate_average_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate average color of a pixel region.
Args:
pixels: Pixel array (height, width, 3)
Returns:
Tuple of (R, G, B) average values
"""
if pixels.size == 0:
return (0, 0, 0)
# Calculate mean across height and width dimensions
mean_color = np.mean(pixels, axis=(0, 1))
# Convert to integers and clamp to valid range
r = int(np.clip(mean_color[0], 0, 255))
g = int(np.clip(mean_color[1], 0, 255))
b = int(np.clip(mean_color[2], 0, 255))
return (r, g, b)
def calculate_median_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate median color of a pixel region.
Args:
pixels: Pixel array (height, width, 3)
Returns:
Tuple of (R, G, B) median values
"""
if pixels.size == 0:
return (0, 0, 0)
# Calculate median across height and width dimensions
median_color = np.median(pixels, axis=(0, 1))
# Convert to integers and clamp to valid range
r = int(np.clip(median_color[0], 0, 255))
g = int(np.clip(median_color[1], 0, 255))
b = int(np.clip(median_color[2], 0, 255))
return (r, g, b)
def calculate_dominant_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate dominant color of a pixel region using simple clustering.
Args:
pixels: Pixel array (height, width, 3)
Returns:
Tuple of (R, G, B) dominant color values
"""
if pixels.size == 0:
return (0, 0, 0)
# Reshape to (n_pixels, 3)
pixels_reshaped = pixels.reshape(-1, 3)
# For performance, sample pixels if there are too many
max_samples = 1000
if len(pixels_reshaped) > max_samples:
indices = np.random.choice(len(pixels_reshaped), max_samples, replace=False)
pixels_reshaped = pixels_reshaped[indices]
# Simple dominant color: quantize colors and find most common
# Reduce color space to 32 levels per channel for binning
quantized = (pixels_reshaped // 8) * 8
# Find unique colors and their counts
unique_colors, counts = np.unique(quantized, axis=0, return_counts=True)
# Get the most common color
dominant_idx = np.argmax(counts)
dominant_color = unique_colors[dominant_idx]
r = int(np.clip(dominant_color[0], 0, 255))
g = int(np.clip(dominant_color[1], 0, 255))
b = int(np.clip(dominant_color[2], 0, 255))
return (r, g, b)

View File

@@ -0,0 +1,368 @@
"""WLED HTTP client for controlling LED devices."""
import asyncio
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any
import httpx
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@dataclass
class WLEDInfo:
"""WLED device information."""
name: str
version: str
led_count: int
brand: str
product: str
mac: str
ip: str
class WLEDClient:
"""HTTP client for WLED devices."""
def __init__(
self,
url: str,
timeout: int = 5,
retry_attempts: int = 3,
retry_delay: int = 1,
):
"""Initialize WLED client.
Args:
url: WLED device URL (e.g., http://192.168.1.100)
timeout: Request timeout in seconds
retry_attempts: Number of retry attempts on failure
retry_delay: Delay between retries in seconds
"""
self.url = url.rstrip("/")
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self._client: Optional[httpx.AsyncClient] = None
self._connected = False
async def __aenter__(self):
"""Async context manager entry."""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def connect(self) -> bool:
"""Establish connection to WLED device.
Returns:
True if connection successful
Raises:
RuntimeError: If connection fails
"""
try:
self._client = httpx.AsyncClient(timeout=self.timeout)
# Test connection by getting device info
info = await self.get_info()
self._connected = True
logger.info(
f"Connected to WLED device: {info.name} ({info.version}) "
f"with {info.led_count} LEDs"
)
return True
except Exception as e:
logger.error(f"Failed to connect to WLED device at {self.url}: {e}")
self._connected = False
if self._client:
await self._client.aclose()
self._client = None
raise RuntimeError(f"Failed to connect to WLED device: {e}")
async def close(self):
"""Close the connection to WLED device."""
if self._client:
await self._client.aclose()
self._client = None
self._connected = False
logger.debug(f"Closed connection to {self.url}")
@property
def is_connected(self) -> bool:
"""Check if connected to WLED device."""
return self._connected and self._client is not None
async def _request(
self,
method: str,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
retry: bool = True,
) -> Dict[str, Any]:
"""Make HTTP request to WLED device with retry logic.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint
json_data: JSON data for request body
retry: Whether to retry on failure
Returns:
Response JSON data
Raises:
RuntimeError: If request fails after retries
"""
if not self._client:
raise RuntimeError("Client not connected. Call connect() first.")
url = f"{self.url}{endpoint}"
attempts = self.retry_attempts if retry else 1
for attempt in range(attempts):
try:
if method == "GET":
response = await self._client.get(url)
elif method == "POST":
response = await self._client.post(url, json=json_data)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error {e.response.status_code} on attempt {attempt + 1}: {e}")
if attempt < attempts - 1:
await asyncio.sleep(self.retry_delay)
else:
raise RuntimeError(f"HTTP request failed: {e}")
except httpx.RequestError as e:
logger.error(f"Request error on attempt {attempt + 1}: {e}")
if attempt < attempts - 1:
await asyncio.sleep(self.retry_delay)
else:
self._connected = False
raise RuntimeError(f"Request to WLED device failed: {e}")
except Exception as e:
logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
if attempt < attempts - 1:
await asyncio.sleep(self.retry_delay)
else:
raise RuntimeError(f"WLED request failed: {e}")
raise RuntimeError("Request failed after all retry attempts")
async def get_info(self) -> WLEDInfo:
"""Get WLED device information.
Returns:
WLEDInfo object with device details
Raises:
RuntimeError: If request fails
"""
try:
data = await self._request("GET", "/json/info")
return WLEDInfo(
name=data.get("name", "Unknown"),
version=data.get("ver", "Unknown"),
led_count=data.get("leds", {}).get("count", 0),
brand=data.get("brand", "WLED"),
product=data.get("product", "FOSS"),
mac=data.get("mac", ""),
ip=data.get("ip", ""),
)
except Exception as e:
logger.error(f"Failed to get device info: {e}")
raise
async def get_state(self) -> Dict[str, Any]:
"""Get current WLED device state.
Returns:
State dictionary
Raises:
RuntimeError: If request fails
"""
try:
return await self._request("GET", "/json/state")
except Exception as e:
logger.error(f"Failed to get device state: {e}")
raise
async def send_pixels(
self,
pixels: List[Tuple[int, int, int]],
brightness: int = 255,
segment_id: int = 0,
) -> bool:
"""Send pixel colors to WLED device.
Args:
pixels: List of (R, G, B) tuples for each LED
brightness: Global brightness (0-255)
segment_id: Segment ID to update
Returns:
True if successful
Raises:
ValueError: If pixel values are invalid
RuntimeError: If request fails
"""
# Validate inputs
if not pixels:
raise ValueError("Pixels list cannot be empty")
if not 0 <= brightness <= 255:
raise ValueError(f"Brightness must be 0-255, got {brightness}")
# Validate pixel values
for i, (r, g, b) in enumerate(pixels):
if not (0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255):
raise ValueError(f"Invalid RGB values at index {i}: ({r}, {g}, {b})")
# Build WLED JSON state
payload = {
"on": True,
"bri": brightness,
"seg": [
{
"id": segment_id,
"i": pixels, # Individual LED colors
}
],
}
try:
await self._request("POST", "/json/state", json_data=payload)
logger.debug(f"Sent {len(pixels)} pixel colors to WLED device")
return True
except Exception as e:
logger.error(f"Failed to send pixels: {e}")
raise
async def set_power(self, on: bool) -> bool:
"""Turn WLED device on or off.
Args:
on: True to turn on, False to turn off
Returns:
True if successful
Raises:
RuntimeError: If request fails
"""
payload = {"on": on}
try:
await self._request("POST", "/json/state", json_data=payload)
logger.info(f"Set WLED power: {'ON' if on else 'OFF'}")
return True
except Exception as e:
logger.error(f"Failed to set power: {e}")
raise
async def set_brightness(self, brightness: int) -> bool:
"""Set global brightness.
Args:
brightness: Brightness value (0-255)
Returns:
True if successful
Raises:
ValueError: If brightness is out of range
RuntimeError: If request fails
"""
if not 0 <= brightness <= 255:
raise ValueError(f"Brightness must be 0-255, got {brightness}")
payload = {"bri": brightness}
try:
await self._request("POST", "/json/state", json_data=payload)
logger.debug(f"Set brightness to {brightness}")
return True
except Exception as e:
logger.error(f"Failed to set brightness: {e}")
raise
async def test_connection(self) -> bool:
"""Test connection to WLED device.
Returns:
True if device is reachable
Raises:
RuntimeError: If connection test fails
"""
try:
await self.get_info()
return True
except Exception as e:
logger.error(f"Connection test failed: {e}")
raise
async def send_test_pattern(self, led_count: int, duration: float = 2.0):
"""Send a test pattern to verify LED configuration.
Cycles through red, green, blue on all LEDs.
Args:
led_count: Number of LEDs
duration: Duration for each color in seconds
Raises:
RuntimeError: If test pattern fails
"""
logger.info(f"Sending test pattern to {led_count} LEDs")
try:
# Red
pixels = [(255, 0, 0)] * led_count
await self.send_pixels(pixels)
await asyncio.sleep(duration)
# Green
pixels = [(0, 255, 0)] * led_count
await self.send_pixels(pixels)
await asyncio.sleep(duration)
# Blue
pixels = [(0, 0, 255)] * led_count
await self.send_pixels(pixels)
await asyncio.sleep(duration)
# Off
pixels = [(0, 0, 0)] * led_count
await self.send_pixels(pixels)
logger.info("Test pattern complete")
except Exception as e:
logger.error(f"Test pattern failed: {e}")
raise

View File

@@ -0,0 +1,166 @@
"""FastAPI application entry point."""
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from wled_controller import __version__
from wled_controller.api import router
from wled_controller.api.routes import init_dependencies
from wled_controller.config import get_config
from wled_controller.core.processor_manager import ProcessorManager
from wled_controller.storage import DeviceStore
from wled_controller.utils import setup_logging, get_logger
# Initialize logging
setup_logging()
logger = get_logger(__name__)
# Get configuration
config = get_config()
# Initialize storage and processing
device_store = DeviceStore(config.storage.devices_file)
processor_manager = ProcessorManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager.
Handles startup and shutdown events.
"""
# Startup
logger.info(f"Starting WLED Screen Controller v{__version__}")
logger.info(f"Python version: {sys.version}")
logger.info(f"Server listening on {config.server.host}:{config.server.port}")
# Validate authentication configuration
if not config.auth.api_keys:
logger.error("=" * 70)
logger.error("CRITICAL: No API keys configured!")
logger.error("Authentication is REQUIRED for all API requests.")
logger.error("Please add API keys to your configuration:")
logger.error(" 1. Generate keys: openssl rand -hex 32")
logger.error(" 2. Add to config/default_config.yaml under auth.api_keys")
logger.error(" 3. Format: label: \"your-generated-key\"")
logger.error("=" * 70)
raise RuntimeError("No API keys configured - server cannot start without authentication")
# Log authentication status
logger.info(f"API Authentication: ENFORCED ({len(config.auth.api_keys)} clients configured)")
client_labels = ", ".join(config.auth.api_keys.keys())
logger.info(f"Authorized clients: {client_labels}")
logger.info("All API requests require valid Bearer token authentication")
# Initialize API dependencies
init_dependencies(device_store, processor_manager)
# Load existing devices into processor manager
devices = device_store.get_all_devices()
for device in devices:
try:
processor_manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
)
logger.info(f"Loaded device: {device.name} ({device.id})")
except Exception as e:
logger.error(f"Failed to load device {device.id}: {e}")
logger.info(f"Loaded {len(devices)} devices from storage")
yield
# Shutdown
logger.info("Shutting down WLED Screen Controller")
# Stop all processing
try:
await processor_manager.stop_all()
logger.info("Stopped all processors")
except Exception as e:
logger.error(f"Error stopping processors: {e}")
# Create FastAPI application
app = FastAPI(
title="WLED Screen Controller",
description="Control WLED devices based on screen content for ambient lighting",
version=__version__,
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=config.server.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API routes
app.include_router(router)
# Mount static files
static_path = Path(__file__).parent / "static"
if static_path.exists():
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
logger.info(f"Mounted static files from {static_path}")
else:
logger.warning(f"Static files directory not found: {static_path}")
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
"""Global exception handler for unhandled errors."""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "InternalServerError",
"message": "An unexpected error occurred",
"detail": str(exc) if config.server.log_level == "DEBUG" else None,
},
)
@app.get("/")
async def root():
"""Serve the web UI dashboard."""
static_path = Path(__file__).parent / "static" / "index.html"
if static_path.exists():
return FileResponse(static_path)
# Fallback to JSON if static files not found
return {
"name": "WLED Screen Controller",
"version": __version__,
"docs": "/docs",
"health": "/health",
"api": "/api/v1",
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"wled_controller.main:app",
host=config.server.host,
port=config.server.port,
log_level=config.server.log_level.lower(),
reload=True,
)

View File

@@ -0,0 +1,780 @@
const API_BASE = '/api/v1';
let refreshInterval = null;
let apiKey = null;
// Initialize app
document.addEventListener('DOMContentLoaded', () => {
// Load API key from localStorage
apiKey = localStorage.getItem('wled_api_key');
// Setup form handler
document.getElementById('add-device-form').addEventListener('submit', handleAddDevice);
// Show modal if no API key is stored
if (!apiKey) {
// Wait for modal functions to be defined
setTimeout(() => {
if (typeof showApiKeyModal === 'function') {
showApiKeyModal('Welcome! Please login with your API key to get started.', true);
}
}, 100);
return; // Don't load data yet
}
// User is logged in, load data
loadServerInfo();
loadDisplays();
loadDevices();
// Start auto-refresh
startAutoRefresh();
});
// Helper function to add auth header if needed
function getHeaders() {
const headers = {
'Content-Type': 'application/json'
};
if (apiKey) {
headers['Authorization'] = `Bearer ${apiKey}`;
}
return headers;
}
// Handle 401 errors by showing login modal
function handle401Error() {
// Clear invalid API key
localStorage.removeItem('wled_api_key');
apiKey = null;
if (typeof updateAuthUI === 'function') {
updateAuthUI();
}
if (typeof showApiKeyModal === 'function') {
showApiKeyModal('Your session has expired or the API key is invalid. Please login again.', true);
} else {
showToast('Authentication failed. Please reload the page and login.', 'error');
}
}
// Configure API key
function configureApiKey() {
const currentKey = localStorage.getItem('wled_api_key');
const message = currentKey
? 'Current API key is set. Enter new key to update or leave blank to remove:'
: 'Enter your API key:';
const key = prompt(message);
if (key === null) {
return; // Cancelled
}
if (key === '') {
localStorage.removeItem('wled_api_key');
apiKey = null;
document.getElementById('api-key-btn').style.display = 'none';
showToast('API key removed', 'info');
} else {
localStorage.setItem('wled_api_key', key);
apiKey = key;
document.getElementById('api-key-btn').style.display = 'inline-block';
showToast('API key updated', 'success');
}
// Reload data with new key
loadServerInfo();
loadDisplays();
loadDevices();
}
// Server info
async function loadServerInfo() {
try {
const response = await fetch('/health');
const data = await response.json();
document.getElementById('server-version').textContent = `Version: ${data.version}`;
document.getElementById('server-status').textContent = '●';
document.getElementById('server-status').className = 'status-badge online';
} catch (error) {
console.error('Failed to load server info:', error);
document.getElementById('server-status').className = 'status-badge offline';
showToast('Server offline', 'error');
}
}
// Load displays
async function loadDisplays() {
try {
const response = await fetch(`${API_BASE}/config/displays`, {
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
const data = await response.json();
const container = document.getElementById('displays-list');
if (!data.displays || data.displays.length === 0) {
container.innerHTML = '<div class="loading">No displays available</div>';
return;
}
container.innerHTML = data.displays.map(display => `
<div class="display-card">
<div class="display-index">${display.name}</div>
<div class="info-row">
<span class="info-label">Resolution:</span>
<span class="info-value">${display.width} × ${display.height}</span>
</div>
<div class="info-row">
<span class="info-label">Position:</span>
<span class="info-value">${display.x}, ${display.y}</span>
</div>
</div>
`).join('');
} catch (error) {
console.error('Failed to load displays:', error);
document.getElementById('displays-list').innerHTML =
'<div class="loading">Failed to load displays</div>';
}
}
// Load devices
async function loadDevices() {
try {
const response = await fetch(`${API_BASE}/devices`, {
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
const data = await response.json();
const devices = data.devices || [];
const container = document.getElementById('devices-list');
if (!devices || devices.length === 0) {
container.innerHTML = '<div class="loading">No devices attached</div>';
return;
}
// Fetch state for each device
const devicesWithState = await Promise.all(
devices.map(async (device) => {
try {
const stateResponse = await fetch(`${API_BASE}/devices/${device.id}/state`, {
headers: getHeaders()
});
const state = await stateResponse.json();
const metricsResponse = await fetch(`${API_BASE}/devices/${device.id}/metrics`, {
headers: getHeaders()
});
const metrics = await metricsResponse.json();
return { ...device, state, metrics };
} catch (error) {
console.error(`Failed to load state for device ${device.id}:`, error);
return device;
}
})
);
container.innerHTML = devicesWithState.map(device => createDeviceCard(device)).join('');
// Attach event listeners
devicesWithState.forEach(device => {
attachDeviceListeners(device.id);
});
} catch (error) {
console.error('Failed to load devices:', error);
document.getElementById('devices-list').innerHTML =
'<div class="loading">Failed to load devices</div>';
}
}
function createDeviceCard(device) {
const state = device.state || {};
const metrics = device.metrics || {};
const settings = device.settings || {};
const isProcessing = state.processing || false;
const status = isProcessing ? 'processing' : 'idle';
return `
<div class="card" data-device-id="${device.id}">
<div class="card-header">
<div class="card-title">${device.name || device.id}</div>
<span class="badge ${status}">${status.toUpperCase()}</span>
</div>
<div class="card-content">
<div class="info-row">
<span class="info-label">URL:</span>
<span class="info-value">${device.url || 'N/A'}</span>
</div>
<div class="info-row">
<span class="info-label">LED Count:</span>
<span class="info-value">${device.led_count || 0}</span>
</div>
<div class="info-row">
<span class="info-label">Display:</span>
<span class="info-value">Display ${settings.display_index !== undefined ? settings.display_index : 0}</span>
</div>
${isProcessing ? `
<div class="metrics-grid">
<div class="metric">
<div class="metric-value">${state.fps_actual?.toFixed(1) || '0.0'}</div>
<div class="metric-label">Actual FPS</div>
</div>
<div class="metric">
<div class="metric-value">${state.fps_target || 0}</div>
<div class="metric-label">Target FPS</div>
</div>
<div class="metric">
<div class="metric-value">${metrics.frames_processed || 0}</div>
<div class="metric-label">Frames</div>
</div>
<div class="metric">
<div class="metric-value">${metrics.errors_count || 0}</div>
<div class="metric-label">Errors</div>
</div>
</div>
` : ''}
</div>
<div class="card-actions">
${isProcessing ? `
<button class="btn btn-danger" onclick="stopProcessing('${device.id}')">
Stop Processing
</button>
` : `
<button class="btn btn-primary" onclick="startProcessing('${device.id}')">
Start Processing
</button>
`}
<button class="btn btn-secondary" onclick="showSettings('${device.id}')">
Settings
</button>
<button class="btn btn-secondary" onclick="showCalibration('${device.id}')">
Calibrate
</button>
<button class="btn btn-danger" onclick="removeDevice('${device.id}')">
Remove
</button>
</div>
</div>
`;
}
function attachDeviceListeners(deviceId) {
// Add any specific event listeners here if needed
}
// Device actions
async function startProcessing(deviceId) {
try {
const response = await fetch(`${API_BASE}/devices/${deviceId}/start`, {
method: 'POST',
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
if (response.ok) {
showToast('Processing started', 'success');
loadDevices();
} else {
const error = await response.json();
showToast(`Failed to start: ${error.detail}`, 'error');
}
} catch (error) {
console.error('Failed to start processing:', error);
showToast('Failed to start processing', 'error');
}
}
async function stopProcessing(deviceId) {
try {
const response = await fetch(`${API_BASE}/devices/${deviceId}/stop`, {
method: 'POST',
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
if (response.ok) {
showToast('Processing stopped', 'success');
loadDevices();
} else {
const error = await response.json();
showToast(`Failed to stop: ${error.detail}`, 'error');
}
} catch (error) {
console.error('Failed to stop processing:', error);
showToast('Failed to stop processing', 'error');
}
}
async function removeDevice(deviceId) {
if (!confirm('Are you sure you want to remove this device?')) {
return;
}
try {
const response = await fetch(`${API_BASE}/devices/${deviceId}`, {
method: 'DELETE',
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
if (response.ok) {
showToast('Device removed', 'success');
loadDevices();
} else {
const error = await response.json();
showToast(`Failed to remove: ${error.detail}`, 'error');
}
} catch (error) {
console.error('Failed to remove device:', error);
showToast('Failed to remove device', 'error');
}
}
async function showSettings(deviceId) {
try {
// Fetch current device data
const response = await fetch(`${API_BASE}/devices/${deviceId}`, {
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
if (!response.ok) {
showToast('Failed to load device settings', 'error');
return;
}
const device = await response.json();
// Populate modal
document.getElementById('settings-device-id').value = device.id;
document.getElementById('settings-device-name').value = device.name;
document.getElementById('settings-device-url').value = device.url;
document.getElementById('settings-device-led-count').value = device.led_count;
// Set brightness (convert from 0.0-1.0 to 0-100)
const brightnessPercent = Math.round((device.settings.brightness || 1.0) * 100);
document.getElementById('settings-device-brightness').value = brightnessPercent;
document.getElementById('brightness-value').textContent = brightnessPercent + '%';
// Show modal
const modal = document.getElementById('device-settings-modal');
modal.style.display = 'flex';
// Focus first input
setTimeout(() => {
document.getElementById('settings-device-name').focus();
}, 100);
} catch (error) {
console.error('Failed to load device settings:', error);
showToast('Failed to load device settings', 'error');
}
}
function closeDeviceSettingsModal() {
const modal = document.getElementById('device-settings-modal');
const error = document.getElementById('settings-error');
modal.style.display = 'none';
error.style.display = 'none';
}
async function saveDeviceSettings() {
const deviceId = document.getElementById('settings-device-id').value;
const name = document.getElementById('settings-device-name').value.trim();
const url = document.getElementById('settings-device-url').value.trim();
const led_count = parseInt(document.getElementById('settings-device-led-count').value);
const brightnessPercent = parseInt(document.getElementById('settings-device-brightness').value);
const brightness = brightnessPercent / 100.0; // Convert to 0.0-1.0
const error = document.getElementById('settings-error');
// Validation
if (!name || !url || !led_count || led_count < 1) {
error.textContent = 'Please fill in all fields correctly';
error.style.display = 'block';
return;
}
try {
// Update device info (name, url, led_count)
const deviceResponse = await fetch(`${API_BASE}/devices/${deviceId}`, {
method: 'PUT',
headers: getHeaders(),
body: JSON.stringify({ name, url, led_count })
});
if (deviceResponse.status === 401) {
handle401Error();
return;
}
if (!deviceResponse.ok) {
const errorData = await deviceResponse.json();
error.textContent = `Failed to update device: ${errorData.detail}`;
error.style.display = 'block';
return;
}
// Update settings (brightness)
const settingsResponse = await fetch(`${API_BASE}/devices/${deviceId}/settings`, {
method: 'PUT',
headers: getHeaders(),
body: JSON.stringify({ brightness })
});
if (settingsResponse.status === 401) {
handle401Error();
return;
}
if (settingsResponse.ok) {
showToast('Device settings updated', 'success');
closeDeviceSettingsModal();
loadDevices();
} else {
const errorData = await settingsResponse.json();
error.textContent = `Failed to update settings: ${errorData.detail}`;
error.style.display = 'block';
}
} catch (err) {
console.error('Failed to save device settings:', err);
error.textContent = 'Failed to save settings';
error.style.display = 'block';
}
}
// Add device form handler
async function handleAddDevice(event) {
event.preventDefault();
const name = document.getElementById('device-name').value;
const url = document.getElementById('device-url').value;
const led_count = parseInt(document.getElementById('device-led-count').value);
try {
const response = await fetch(`${API_BASE}/devices`, {
method: 'POST',
headers: getHeaders(),
body: JSON.stringify({ name, url, led_count })
});
if (response.status === 401) {
handle401Error();
return;
}
if (response.ok) {
showToast('Device added successfully', 'success');
event.target.reset();
loadDevices();
} else {
const error = await response.json();
showToast(`Failed to add device: ${error.detail}`, 'error');
}
} catch (error) {
console.error('Failed to add device:', error);
showToast('Failed to add device', 'error');
}
}
// Auto-refresh
function startAutoRefresh() {
if (refreshInterval) {
clearInterval(refreshInterval);
}
refreshInterval = setInterval(() => {
loadDevices();
}, 2000); // Refresh every 2 seconds
}
// Toast notifications
function showToast(message, type = 'info') {
const toast = document.getElementById('toast');
toast.textContent = message;
toast.className = `toast ${type} show`;
setTimeout(() => {
toast.className = 'toast';
}, 3000);
}
// Calibration functions
async function showCalibration(deviceId) {
try {
// Fetch current device data
const response = await fetch(`${API_BASE}/devices/${deviceId}`, {
headers: getHeaders()
});
if (response.status === 401) {
handle401Error();
return;
}
if (!response.ok) {
showToast('Failed to load calibration', 'error');
return;
}
const device = await response.json();
const calibration = device.calibration;
// Store device ID and LED count
document.getElementById('calibration-device-id').value = device.id;
document.getElementById('cal-device-led-count').textContent = device.led_count;
// Set layout
document.getElementById('cal-start-position').value = calibration.start_position;
document.getElementById('cal-layout').value = calibration.layout;
// Set LED counts per edge
const edgeCounts = { top: 0, right: 0, bottom: 0, left: 0 };
calibration.segments.forEach(seg => {
edgeCounts[seg.edge] = seg.led_count;
});
document.getElementById('cal-top-leds').value = edgeCounts.top;
document.getElementById('cal-right-leds').value = edgeCounts.right;
document.getElementById('cal-bottom-leds').value = edgeCounts.bottom;
document.getElementById('cal-left-leds').value = edgeCounts.left;
// Update preview
updateCalibrationPreview();
// Show modal
const modal = document.getElementById('calibration-modal');
modal.style.display = 'flex';
} catch (error) {
console.error('Failed to load calibration:', error);
showToast('Failed to load calibration', 'error');
}
}
function closeCalibrationModal() {
const modal = document.getElementById('calibration-modal');
const error = document.getElementById('calibration-error');
modal.style.display = 'none';
error.style.display = 'none';
}
function updateCalibrationPreview() {
// Update edge counts in preview
document.getElementById('preview-top-count').textContent = document.getElementById('cal-top-leds').value;
document.getElementById('preview-right-count').textContent = document.getElementById('cal-right-leds').value;
document.getElementById('preview-bottom-count').textContent = document.getElementById('cal-bottom-leds').value;
document.getElementById('preview-left-count').textContent = document.getElementById('cal-left-leds').value;
// Calculate total
const total = parseInt(document.getElementById('cal-top-leds').value || 0) +
parseInt(document.getElementById('cal-right-leds').value || 0) +
parseInt(document.getElementById('cal-bottom-leds').value || 0) +
parseInt(document.getElementById('cal-left-leds').value || 0);
document.getElementById('cal-total-leds').textContent = total;
// Update starting position indicator
const startPos = document.getElementById('cal-start-position').value;
const indicator = document.getElementById('start-indicator');
const positions = {
'bottom_left': { bottom: '10px', left: '10px', top: 'auto', right: 'auto' },
'bottom_right': { bottom: '10px', right: '10px', top: 'auto', left: 'auto' },
'top_left': { top: '10px', left: '10px', bottom: 'auto', right: 'auto' },
'top_right': { top: '10px', right: '10px', bottom: 'auto', left: 'auto' }
};
const pos = positions[startPos];
indicator.style.top = pos.top;
indicator.style.right = pos.right;
indicator.style.bottom = pos.bottom;
indicator.style.left = pos.left;
}
async function testCalibrationEdge(edge) {
const deviceId = document.getElementById('calibration-device-id').value;
const error = document.getElementById('calibration-error');
try {
const response = await fetch(`${API_BASE}/devices/${deviceId}/calibration/test`, {
method: 'POST',
headers: getHeaders(),
body: JSON.stringify({ edge, color: [255, 0, 0] }) // Red color
});
if (response.status === 401) {
handle401Error();
return;
}
if (response.ok) {
showToast(`Testing ${edge} edge (2 seconds)`, 'info');
} else {
const errorData = await response.json();
error.textContent = `Test failed: ${errorData.detail}`;
error.style.display = 'block';
}
} catch (err) {
console.error('Failed to test edge:', err);
error.textContent = 'Failed to test edge';
error.style.display = 'block';
}
}
async function saveCalibration() {
const deviceId = document.getElementById('calibration-device-id').value;
const deviceLedCount = parseInt(document.getElementById('cal-device-led-count').textContent);
const error = document.getElementById('calibration-error');
const topLeds = parseInt(document.getElementById('cal-top-leds').value || 0);
const rightLeds = parseInt(document.getElementById('cal-right-leds').value || 0);
const bottomLeds = parseInt(document.getElementById('cal-bottom-leds').value || 0);
const leftLeds = parseInt(document.getElementById('cal-left-leds').value || 0);
const total = topLeds + rightLeds + bottomLeds + leftLeds;
// Validation
if (total !== deviceLedCount) {
error.textContent = `Total LEDs (${total}) must equal device LED count (${deviceLedCount})`;
error.style.display = 'block';
return;
}
// Build calibration config
const startPosition = document.getElementById('cal-start-position').value;
const layout = document.getElementById('cal-layout').value;
// Build segments based on start position and direction
const segments = [];
let ledStart = 0;
const edgeOrder = getEdgeOrder(startPosition, layout);
const edgeCounts = {
top: topLeds,
right: rightLeds,
bottom: bottomLeds,
left: leftLeds
};
edgeOrder.forEach(edge => {
const count = edgeCounts[edge];
if (count > 0) {
segments.push({
edge: edge,
led_start: ledStart,
led_count: count,
reverse: shouldReverse(edge, startPosition, layout)
});
ledStart += count;
}
});
const calibration = {
layout: layout,
start_position: startPosition,
segments: segments
};
try {
const response = await fetch(`${API_BASE}/devices/${deviceId}/calibration`, {
method: 'PUT',
headers: getHeaders(),
body: JSON.stringify(calibration)
});
if (response.status === 401) {
handle401Error();
return;
}
if (response.ok) {
showToast('Calibration saved', 'success');
closeCalibrationModal();
loadDevices();
} else {
const errorData = await response.json();
error.textContent = `Failed to save: ${errorData.detail}`;
error.style.display = 'block';
}
} catch (err) {
console.error('Failed to save calibration:', err);
error.textContent = 'Failed to save calibration';
error.style.display = 'block';
}
}
function getEdgeOrder(startPosition, layout) {
const clockwise = ['bottom', 'right', 'top', 'left'];
const counterclockwise = ['bottom', 'left', 'top', 'right'];
const orders = {
'bottom_left_clockwise': clockwise,
'bottom_left_counterclockwise': counterclockwise,
'bottom_right_clockwise': ['bottom', 'left', 'top', 'right'],
'bottom_right_counterclockwise': ['bottom', 'right', 'top', 'left'],
'top_left_clockwise': ['top', 'right', 'bottom', 'left'],
'top_left_counterclockwise': ['top', 'left', 'bottom', 'right'],
'top_right_clockwise': ['top', 'left', 'bottom', 'right'],
'top_right_counterclockwise': ['top', 'right', 'bottom', 'left']
};
return orders[`${startPosition}_${layout}`] || clockwise;
}
function shouldReverse(edge, startPosition, layout) {
// Determine if this edge should be reversed based on LED strip direction
const reverseRules = {
'bottom_left_clockwise': { bottom: false, right: false, top: true, left: true },
'bottom_left_counterclockwise': { bottom: false, right: true, top: true, left: false },
'bottom_right_clockwise': { bottom: true, right: false, top: false, left: true },
'bottom_right_counterclockwise': { bottom: true, right: true, top: false, left: false },
'top_left_clockwise': { top: false, right: false, bottom: true, left: true },
'top_left_counterclockwise': { top: false, right: true, bottom: true, left: false },
'top_right_clockwise': { top: true, right: false, bottom: false, left: true },
'top_right_counterclockwise': { top: true, right: true, bottom: false, left: false }
};
const rules = reverseRules[`${startPosition}_${layout}`];
return rules ? rules[edge] : false;
}
// Cleanup on page unload
window.addEventListener('beforeunload', () => {
if (refreshInterval) {
clearInterval(refreshInterval);
}
});

View File

@@ -0,0 +1,416 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WLED Screen Controller</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<div class="container">
<header>
<h1>WLED Screen Controller</h1>
<div class="server-info">
<span id="server-version">Version: Loading...</span>
<span id="server-status" class="status-badge"></span>
<button class="theme-toggle" onclick="toggleTheme()" title="Toggle theme">
<span id="theme-icon">🌙</span>
</button>
<span id="auth-status" style="margin-left: 10px; display: none;">
<span id="logged-in-user" style="color: #4CAF50; margin-right: 8px;"></span>
</span>
<button id="login-btn" class="btn btn-primary" onclick="showLogin()" style="display: none; padding: 6px 16px; font-size: 0.85rem; margin-left: 10px;">
🔑 Login
</button>
<button id="logout-btn" class="btn btn-danger" onclick="logout()" style="display: none; padding: 6px 16px; font-size: 0.85rem; margin-left: 10px;">
🚪 Logout
</button>
</div>
</header>
<section class="displays-section">
<h2>Available Displays</h2>
<div id="displays-list" class="displays-grid">
<div class="loading">Loading displays...</div>
</div>
</section>
<section class="devices-section">
<h2>WLED Devices</h2>
<div id="devices-list" class="devices-grid">
<div class="loading">Loading devices...</div>
</div>
</section>
<section class="add-device-section">
<h2>Add New Device</h2>
<div class="info-banner" style="margin-bottom: 20px; padding: 12px; background: rgba(33, 150, 243, 0.1); border-left: 4px solid #2196F3; border-radius: 4px;">
<strong>📱 WLED Configuration:</strong> Configure your WLED device (effects, segments, color order, power limits, etc.) using the
<a href="https://kno.wled.ge/" target="_blank" rel="noopener" style="color: #2196F3; text-decoration: underline;">official WLED app</a>.
This controller sends pixel color data and controls brightness per device.
</div>
<form id="add-device-form">
<div class="form-group">
<label for="device-name">Device Name:</label>
<input type="text" id="device-name" placeholder="Living Room TV" required>
</div>
<div class="form-group">
<label for="device-url">WLED URL:</label>
<input type="url" id="device-url" placeholder="http://192.168.1.100" required>
</div>
<div class="form-group">
<label for="device-led-count">LED Count:</label>
<input type="number" id="device-led-count" value="150" min="1" required>
<small class="input-hint">Number of LEDs configured in your WLED device</small>
</div>
<button type="submit" class="btn btn-primary">Add Device</button>
</form>
</section>
</div>
<div id="toast" class="toast"></div>
<!-- Calibration Modal -->
<div id="calibration-modal" class="modal">
<div class="modal-content" style="max-width: 700px;">
<div class="modal-header">
<h2>📐 LED Calibration</h2>
</div>
<div class="modal-body">
<input type="hidden" id="calibration-device-id">
<p style="margin-bottom: 20px; color: var(--text-secondary);">
Configure how your LED strip is mapped to screen edges. Use test buttons to verify each edge lights up correctly.
</p>
<!-- Visual Preview -->
<div style="margin-bottom: 25px;">
<div style="position: relative; width: 400px; height: 250px; margin: 0 auto; background: var(--card-bg); border: 2px solid var(--border-color); border-radius: 8px;">
<!-- Screen representation -->
<div style="position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); width: 300px; height: 180px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 4px; display: flex; align-items: center; justify-content: center; color: white; font-size: 14px;">
Screen
</div>
<!-- Edge labels -->
<div style="position: absolute; top: 5px; left: 50%; transform: translateX(-50%); font-size: 12px; color: var(--text-secondary);">
Top: <span id="preview-top-count">0</span> LEDs
</div>
<div style="position: absolute; right: 5px; top: 50%; transform: translateY(-50%) rotate(90deg); font-size: 12px; color: var(--text-secondary); white-space: nowrap;">
Right: <span id="preview-right-count">0</span> LEDs
</div>
<div style="position: absolute; bottom: 5px; left: 50%; transform: translateX(-50%); font-size: 12px; color: var(--text-secondary);">
Bottom: <span id="preview-bottom-count">0</span> LEDs
</div>
<div style="position: absolute; left: 5px; top: 50%; transform: translateY(-50%) rotate(-90deg); font-size: 12px; color: var(--text-secondary); white-space: nowrap;">
Left: <span id="preview-left-count">0</span> LEDs
</div>
<!-- Starting position indicator -->
<div id="start-indicator" style="position: absolute; bottom: 10px; left: 10px; width: 12px; height: 12px; background: #4CAF50; border-radius: 50%; border: 2px solid white;"></div>
</div>
</div>
<!-- Layout Configuration -->
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 15px; margin-bottom: 20px;">
<div class="form-group">
<label for="cal-start-position">Starting Position:</label>
<select id="cal-start-position" onchange="updateCalibrationPreview()">
<option value="bottom_left">Bottom Left</option>
<option value="bottom_right">Bottom Right</option>
<option value="top_left">Top Left</option>
<option value="top_right">Top Right</option>
</select>
</div>
<div class="form-group">
<label for="cal-layout">Direction:</label>
<select id="cal-layout" onchange="updateCalibrationPreview()">
<option value="clockwise">Clockwise</option>
<option value="counterclockwise">Counterclockwise</option>
</select>
</div>
</div>
<!-- LED Counts per Edge -->
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 15px; margin-bottom: 20px;">
<div class="form-group">
<label for="cal-top-leds">Top LEDs:</label>
<input type="number" id="cal-top-leds" min="0" value="0" oninput="updateCalibrationPreview()">
</div>
<div class="form-group">
<label for="cal-right-leds">Right LEDs:</label>
<input type="number" id="cal-right-leds" min="0" value="0" oninput="updateCalibrationPreview()">
</div>
<div class="form-group">
<label for="cal-bottom-leds">Bottom LEDs:</label>
<input type="number" id="cal-bottom-leds" min="0" value="0" oninput="updateCalibrationPreview()">
</div>
<div class="form-group">
<label for="cal-left-leds">Left LEDs:</label>
<input type="number" id="cal-left-leds" min="0" value="0" oninput="updateCalibrationPreview()">
</div>
</div>
<div style="padding: 10px; background: rgba(255, 193, 7, 0.1); border-left: 4px solid #FFC107; border-radius: 4px; margin-bottom: 20px;">
<strong>Total LEDs:</strong> <span id="cal-total-leds">0</span> / <span id="cal-device-led-count">0</span>
</div>
<!-- Test Buttons -->
<div style="margin-bottom: 15px;">
<p style="font-weight: 600; margin-bottom: 10px;">Test Edges (lights up each edge):</p>
<div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 10px;">
<button class="btn btn-secondary" onclick="testCalibrationEdge('top')" style="font-size: 0.9rem; padding: 8px;">
⬆️ Top
</button>
<button class="btn btn-secondary" onclick="testCalibrationEdge('right')" style="font-size: 0.9rem; padding: 8px;">
➡️ Right
</button>
<button class="btn btn-secondary" onclick="testCalibrationEdge('bottom')" style="font-size: 0.9rem; padding: 8px;">
⬇️ Bottom
</button>
<button class="btn btn-secondary" onclick="testCalibrationEdge('left')" style="font-size: 0.9rem; padding: 8px;">
⬅️ Left
</button>
</div>
</div>
<div id="calibration-error" class="error-message" style="display: none;"></div>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" onclick="closeCalibrationModal()">Cancel</button>
<button class="btn btn-primary" onclick="saveCalibration()">Save Calibration</button>
</div>
</div>
</div>
<!-- Device Settings Modal -->
<div id="device-settings-modal" class="modal">
<div class="modal-content">
<div class="modal-header">
<h2>⚙️ Device Settings</h2>
</div>
<div class="modal-body">
<form id="device-settings-form">
<input type="hidden" id="settings-device-id">
<div class="form-group">
<label for="settings-device-name">Device Name:</label>
<input type="text" id="settings-device-name" placeholder="Living Room TV" required>
</div>
<div class="form-group">
<label for="settings-device-url">WLED URL:</label>
<input type="url" id="settings-device-url" placeholder="http://192.168.1.100" required>
<small class="input-hint">IP address or hostname of your WLED device</small>
</div>
<div class="form-group">
<label for="settings-device-led-count">LED Count:</label>
<input type="number" id="settings-device-led-count" min="1" required>
<small class="input-hint">Number of LEDs configured in your WLED device</small>
</div>
<div class="form-group">
<label for="settings-device-brightness">Brightness: <span id="brightness-value">100%</span></label>
<input type="range" id="settings-device-brightness" min="0" max="100" value="100"
oninput="document.getElementById('brightness-value').textContent = this.value + '%'"
style="width: 100%;">
<small class="input-hint">Global brightness for this WLED device (0-100%)</small>
</div>
<div id="settings-error" class="error-message" style="display: none;"></div>
</form>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" onclick="closeDeviceSettingsModal()">Cancel</button>
<button class="btn btn-primary" onclick="saveDeviceSettings()">Save Changes</button>
</div>
</div>
</div>
<!-- Login Modal -->
<div id="api-key-modal" class="modal">
<div class="modal-content">
<div class="modal-header">
<h2>🔑 Login to WLED Controller</h2>
</div>
<div class="modal-body">
<p class="modal-description">
Please enter your API key to authenticate and access the WLED Screen Controller.
</p>
<div class="form-group">
<label for="api-key-input">API Key:</label>
<div class="password-input-wrapper">
<input
type="password"
id="api-key-input"
placeholder="Enter your API key..."
autocomplete="off"
>
<button type="button" class="password-toggle" onclick="togglePasswordVisibility()">
👁️
</button>
</div>
<small class="input-hint">Your API key will be stored securely in your browser's local storage.</small>
</div>
<div id="api-key-error" class="error-message" style="display: none;"></div>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" onclick="closeApiKeyModal()" id="modal-cancel-btn">Cancel</button>
<button class="btn btn-primary" onclick="submitApiKey()">Login</button>
</div>
</div>
</div>
<script src="/static/app.js"></script>
<script>
// Initialize theme
const savedTheme = localStorage.getItem('theme') || 'dark';
document.documentElement.setAttribute('data-theme', savedTheme);
updateThemeIcon(savedTheme);
function updateThemeIcon(theme) {
const icon = document.getElementById('theme-icon');
icon.textContent = theme === 'dark' ? '☀️' : '🌙';
}
function toggleTheme() {
const currentTheme = document.documentElement.getAttribute('data-theme');
const newTheme = currentTheme === 'dark' ? 'light' : 'dark';
document.documentElement.setAttribute('data-theme', newTheme);
localStorage.setItem('theme', newTheme);
updateThemeIcon(newTheme);
showToast(`Switched to ${newTheme} theme`, 'info');
}
// Initialize auth state
function updateAuthUI() {
const apiKey = localStorage.getItem('wled_api_key');
const loginBtn = document.getElementById('login-btn');
const logoutBtn = document.getElementById('logout-btn');
const authStatus = document.getElementById('auth-status');
const loggedInUser = document.getElementById('logged-in-user');
if (apiKey) {
// Logged in
loginBtn.style.display = 'none';
logoutBtn.style.display = 'inline-block';
authStatus.style.display = 'inline';
// Show masked key
const masked = apiKey.substring(0, 8) + '...';
loggedInUser.textContent = `● Authenticated`;
loggedInUser.title = `API Key: ${masked}`;
} else {
// Logged out
loginBtn.style.display = 'inline-block';
logoutBtn.style.display = 'none';
authStatus.style.display = 'none';
}
}
function showLogin() {
showApiKeyModal('Enter your API key to login and access the controller.');
document.getElementById('modal-cancel-btn').style.display = 'inline-block';
}
function logout() {
if (confirm('Are you sure you want to logout?')) {
localStorage.removeItem('wled_api_key');
apiKey = null;
updateAuthUI();
showToast('Logged out successfully', 'info');
// Clear the UI
document.getElementById('devices-list').innerHTML = '<div class="loading">Please login to view devices</div>';
document.getElementById('displays-list').innerHTML = '<div class="loading">Please login to view displays</div>';
}
}
// Initialize on load
updateAuthUI();
// Modal functions
function togglePasswordVisibility() {
const input = document.getElementById('api-key-input');
const button = document.querySelector('.password-toggle');
if (input.type === 'password') {
input.type = 'text';
button.textContent = '🙈';
} else {
input.type = 'password';
button.textContent = '👁️';
}
}
function showApiKeyModal(message, hideCancel = false) {
const modal = document.getElementById('api-key-modal');
const description = document.querySelector('.modal-description');
const input = document.getElementById('api-key-input');
const error = document.getElementById('api-key-error');
const cancelBtn = document.getElementById('modal-cancel-btn');
if (message) {
description.textContent = message;
}
input.value = '';
input.placeholder = 'Enter your API key...';
error.style.display = 'none';
modal.style.display = 'flex';
// Hide cancel button if this is required login (no existing session)
cancelBtn.style.display = hideCancel ? 'none' : 'inline-block';
setTimeout(() => input.focus(), 100);
}
function closeApiKeyModal() {
const modal = document.getElementById('api-key-modal');
modal.style.display = 'none';
}
function submitApiKey() {
const input = document.getElementById('api-key-input');
const error = document.getElementById('api-key-error');
const key = input.value.trim();
if (!key) {
error.textContent = 'Please enter an API key';
error.style.display = 'block';
return;
}
// Store the key
localStorage.setItem('wled_api_key', key);
apiKey = key;
updateAuthUI();
closeApiKeyModal();
showToast('Logged in successfully!', 'success');
// Reload data
loadServerInfo();
loadDisplays();
loadDevices();
// Start auto-refresh if not already running
if (!refreshInterval) {
startAutoRefresh();
}
}
// Handle Enter key in modal
document.addEventListener('DOMContentLoaded', () => {
document.getElementById('api-key-input').addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
submitApiKey();
}
});
});
</script>
</body>
</html>

View File

@@ -0,0 +1,510 @@
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
:root {
--primary-color: #4CAF50;
--danger-color: #f44336;
--warning-color: #ff9800;
--info-color: #2196F3;
}
/* Dark theme (default) */
[data-theme="dark"] {
--bg-color: #1a1a1a;
--card-bg: #2d2d2d;
--text-color: #e0e0e0;
--border-color: #404040;
}
/* Light theme */
[data-theme="light"] {
--bg-color: #f5f5f5;
--card-bg: #ffffff;
--text-color: #333333;
--border-color: #e0e0e0;
}
/* Default to dark theme */
body {
background: var(--bg-color);
color: var(--text-color);
}
html {
background: var(--bg-color);
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
background: var(--bg-color);
color: var(--text-color);
line-height: 1.6;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 20px 0;
border-bottom: 2px solid var(--border-color);
margin-bottom: 30px;
}
h1 {
font-size: 2rem;
color: var(--primary-color);
}
h2 {
margin-bottom: 20px;
color: var(--text-color);
font-size: 1.5rem;
}
.server-info {
display: flex;
align-items: center;
gap: 15px;
}
.status-badge {
font-size: 1.5rem;
animation: pulse 2s infinite;
}
.status-badge.online {
color: var(--primary-color);
}
.status-badge.offline {
color: var(--danger-color);
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
section {
margin-bottom: 40px;
}
.displays-grid,
.devices-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
gap: 20px;
}
.card {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 20px;
transition: transform 0.2s, box-shadow 0.2s;
}
.card:hover {
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3);
}
.card-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 15px;
}
.card-title {
font-size: 1.2rem;
font-weight: 600;
}
.badge {
padding: 4px 12px;
border-radius: 12px;
font-size: 0.85rem;
font-weight: 600;
}
.badge.processing {
background: var(--primary-color);
color: white;
}
.badge.idle {
background: var(--warning-color);
color: white;
}
.badge.error {
background: var(--danger-color);
color: white;
}
.card-content {
margin-bottom: 15px;
}
.info-row {
display: flex;
justify-content: space-between;
padding: 8px 0;
border-bottom: 1px solid var(--border-color);
}
.info-row:last-child {
border-bottom: none;
}
.info-label {
color: #999;
}
.info-value {
font-weight: 600;
}
.card-actions {
display: flex;
flex-wrap: wrap;
gap: 10px;
justify-content: center;
}
.btn {
padding: 8px 16px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 0.9rem;
font-weight: 600;
transition: opacity 0.2s;
flex: 1 1 auto;
min-width: 100px;
}
.btn:hover {
opacity: 0.9;
}
.btn:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.btn-primary {
background: var(--primary-color);
color: white;
}
.btn-danger {
background: var(--danger-color);
color: white;
}
.btn-secondary {
background: var(--border-color);
color: var(--text-color);
}
.display-card {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 15px;
text-align: center;
}
.display-index {
font-size: 2rem;
font-weight: 700;
color: var(--info-color);
margin-bottom: 10px;
}
.add-device-section {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 20px;
}
.form-group {
margin-bottom: 15px;
}
label {
display: block;
margin-bottom: 5px;
color: #999;
font-weight: 500;
}
input[type="text"],
input[type="url"],
input[type="number"],
input[type="password"],
select {
width: 100%;
padding: 10px;
border: 1px solid var(--border-color);
border-radius: 4px;
background: var(--bg-color);
color: var(--text-color);
font-size: 1rem;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
transition: border-color 0.2s, box-shadow 0.2s;
}
/* Better password field appearance */
input[type="password"] {
letter-spacing: 0.15em;
}
input:focus,
select:focus {
outline: none;
border-color: var(--primary-color);
box-shadow: 0 0 0 2px rgba(76, 175, 80, 0.2);
}
/* Remove browser autofill styling */
input:-webkit-autofill,
input:-webkit-autofill:hover,
input:-webkit-autofill:focus {
-webkit-box-shadow: 0 0 0 1000px var(--bg-color) inset;
-webkit-text-fill-color: var(--text-color);
transition: background-color 5000s ease-in-out 0s;
}
.loading {
text-align: center;
padding: 40px;
color: #999;
}
.toast {
position: fixed;
bottom: 20px;
right: 20px;
padding: 15px 20px;
border-radius: 4px;
color: white;
font-weight: 600;
opacity: 0;
transition: opacity 0.3s;
z-index: 1000;
}
.toast.show {
opacity: 1;
}
.toast.success {
background: var(--primary-color);
}
.toast.error {
background: var(--danger-color);
}
.toast.info {
background: var(--info-color);
}
.metrics-grid {
display: grid;
grid-template-columns: repeat(2, 1fr);
gap: 10px;
margin-top: 10px;
}
.metric {
text-align: center;
padding: 10px;
background: var(--bg-color);
border-radius: 4px;
}
.metric-value {
font-size: 1.5rem;
font-weight: 700;
color: var(--primary-color);
}
.metric-label {
font-size: 0.85rem;
color: #999;
margin-top: 5px;
}
/* Modal Styles */
.modal {
display: none;
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.8);
z-index: 2000;
align-items: center;
justify-content: center;
animation: fadeIn 0.2s ease-out;
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
.modal-content {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 12px;
max-width: 500px;
width: 90%;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.5);
animation: slideUp 0.3s ease-out;
}
@keyframes slideUp {
from {
transform: translateY(20px);
opacity: 0;
}
to {
transform: translateY(0);
opacity: 1;
}
}
.modal-header {
padding: 24px 24px 16px 24px;
border-bottom: 1px solid var(--border-color);
}
.modal-header h2 {
margin: 0;
font-size: 1.5rem;
color: var(--text-color);
}
.modal-body {
padding: 24px;
}
.modal-description {
color: #999;
margin-bottom: 20px;
line-height: 1.6;
}
.password-input-wrapper {
position: relative;
display: flex;
align-items: center;
}
.password-input-wrapper input {
flex: 1;
padding-right: 45px;
}
.password-toggle {
position: absolute;
right: 8px;
background: none;
border: none;
cursor: pointer;
font-size: 1.2rem;
padding: 8px;
color: var(--text-color);
opacity: 0.6;
transition: opacity 0.2s;
}
.password-toggle:hover {
opacity: 1;
}
.input-hint {
display: block;
margin-top: 8px;
color: #666;
font-size: 0.85rem;
}
.error-message {
background: rgba(244, 67, 54, 0.1);
border: 1px solid var(--danger-color);
color: var(--danger-color);
padding: 12px;
border-radius: 4px;
margin-top: 15px;
font-size: 0.9rem;
}
.modal-footer {
padding: 16px 24px 24px 24px;
display: flex;
justify-content: flex-end;
gap: 12px;
}
.modal-footer .btn {
min-width: 100px;
}
/* Theme Toggle */
.theme-toggle {
background: var(--card-bg);
border: 1px solid var(--border-color);
padding: 6px 12px;
border-radius: 4px;
cursor: pointer;
font-size: 1.2rem;
transition: transform 0.2s;
margin-left: 10px;
}
.theme-toggle:hover {
transform: scale(1.1);
}
@media (max-width: 768px) {
.displays-grid,
.devices-grid {
grid-template-columns: 1fr;
}
header {
flex-direction: column;
gap: 15px;
text-align: center;
}
.modal-content {
width: 95%;
margin: 20px;
}
.modal-footer {
flex-direction: column-reverse;
}
.modal-footer .btn {
width: 100%;
}
}

View File

@@ -0,0 +1,5 @@
"""Storage layer for device and configuration persistence."""
from .device_store import DeviceStore
__all__ = ["DeviceStore"]

View File

@@ -0,0 +1,360 @@
"""Device storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from wled_controller.core.calibration import (
CalibrationConfig,
calibration_from_dict,
calibration_to_dict,
create_default_calibration,
)
from wled_controller.core.processor_manager import ProcessingSettings
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class Device:
"""Represents a WLED device configuration."""
def __init__(
self,
device_id: str,
name: str,
url: str,
led_count: int,
enabled: bool = True,
settings: Optional[ProcessingSettings] = None,
calibration: Optional[CalibrationConfig] = None,
created_at: Optional[datetime] = None,
updated_at: Optional[datetime] = None,
):
"""Initialize device.
Args:
device_id: Unique device identifier
name: Device name
url: WLED device URL
led_count: Number of LEDs
enabled: Whether device is enabled
settings: Processing settings
calibration: Calibration configuration
created_at: Creation timestamp
updated_at: Last update timestamp
"""
self.id = device_id
self.name = name
self.url = url
self.led_count = led_count
self.enabled = enabled
self.settings = settings or ProcessingSettings()
self.calibration = calibration or create_default_calibration(led_count)
self.created_at = created_at or datetime.utcnow()
self.updated_at = updated_at or datetime.utcnow()
def to_dict(self) -> dict:
"""Convert device to dictionary.
Returns:
Dictionary representation
"""
return {
"id": self.id,
"name": self.name,
"url": self.url,
"led_count": self.led_count,
"enabled": self.enabled,
"settings": {
"display_index": self.settings.display_index,
"fps": self.settings.fps,
"border_width": self.settings.border_width,
"brightness": self.settings.brightness,
"gamma": self.settings.gamma,
"saturation": self.settings.saturation,
"smoothing": self.settings.smoothing,
"interpolation_mode": self.settings.interpolation_mode,
},
"calibration": calibration_to_dict(self.calibration),
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict) -> "Device":
"""Create device from dictionary.
Args:
data: Dictionary with device data
Returns:
Device instance
"""
settings_data = data.get("settings", {})
settings = ProcessingSettings(
display_index=settings_data.get("display_index", 0),
fps=settings_data.get("fps", 30),
border_width=settings_data.get("border_width", 10),
brightness=settings_data.get("brightness", 1.0),
gamma=settings_data.get("gamma", 2.2),
saturation=settings_data.get("saturation", 1.0),
smoothing=settings_data.get("smoothing", 0.3),
interpolation_mode=settings_data.get("interpolation_mode", "average"),
)
calibration_data = data.get("calibration")
calibration = (
calibration_from_dict(calibration_data)
if calibration_data
else create_default_calibration(data["led_count"])
)
return cls(
device_id=data["id"],
name=data["name"],
url=data["url"],
led_count=data["led_count"],
enabled=data.get("enabled", True),
settings=settings,
calibration=calibration,
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
)
class DeviceStore:
"""Persistent storage for WLED devices."""
def __init__(self, storage_file: str | Path):
"""Initialize device store.
Args:
storage_file: Path to JSON storage file
"""
self.storage_file = Path(storage_file)
self._devices: Dict[str, Device] = {}
# Ensure directory exists
self.storage_file.parent.mkdir(parents=True, exist_ok=True)
# Load existing devices
self.load()
logger.info(f"Device store initialized with {len(self._devices)} devices")
def load(self):
"""Load devices from storage file."""
if not self.storage_file.exists():
logger.info(f"Storage file does not exist, starting with empty store")
return
try:
with open(self.storage_file, "r") as f:
data = json.load(f)
devices_data = data.get("devices", {})
self._devices = {
device_id: Device.from_dict(device_data)
for device_id, device_data in devices_data.items()
}
logger.info(f"Loaded {len(self._devices)} devices from storage")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse storage file: {e}")
raise
except Exception as e:
logger.error(f"Failed to load devices: {e}")
raise
def save(self):
"""Save devices to storage file."""
try:
data = {
"devices": {
device_id: device.to_dict()
for device_id, device in self._devices.items()
}
}
# Write to temporary file first
temp_file = self.storage_file.with_suffix(".tmp")
with open(temp_file, "w") as f:
json.dump(data, f, indent=2)
# Atomic rename
temp_file.replace(self.storage_file)
logger.debug(f"Saved {len(self._devices)} devices to storage")
except Exception as e:
logger.error(f"Failed to save devices: {e}")
raise
def create_device(
self,
name: str,
url: str,
led_count: int,
settings: Optional[ProcessingSettings] = None,
calibration: Optional[CalibrationConfig] = None,
) -> Device:
"""Create a new device.
Args:
name: Device name
url: WLED device URL
led_count: Number of LEDs
settings: Processing settings
calibration: Calibration configuration
Returns:
Created device
Raises:
ValueError: If validation fails
"""
# Generate unique ID
device_id = f"device_{uuid.uuid4().hex[:8]}"
# Create device
device = Device(
device_id=device_id,
name=name,
url=url,
led_count=led_count,
settings=settings,
calibration=calibration,
)
# Store
self._devices[device_id] = device
self.save()
logger.info(f"Created device {device_id}: {name}")
return device
def get_device(self, device_id: str) -> Optional[Device]:
"""Get device by ID.
Args:
device_id: Device identifier
Returns:
Device or None if not found
"""
return self._devices.get(device_id)
def get_all_devices(self) -> List[Device]:
"""Get all devices.
Returns:
List of all devices
"""
return list(self._devices.values())
def update_device(
self,
device_id: str,
name: Optional[str] = None,
url: Optional[str] = None,
led_count: Optional[int] = None,
enabled: Optional[bool] = None,
settings: Optional[ProcessingSettings] = None,
calibration: Optional[CalibrationConfig] = None,
) -> Device:
"""Update device.
Args:
device_id: Device identifier
name: New name (optional)
url: New URL (optional)
led_count: New LED count (optional)
enabled: New enabled state (optional)
settings: New settings (optional)
calibration: New calibration (optional)
Returns:
Updated device
Raises:
ValueError: If device not found or validation fails
"""
device = self._devices.get(device_id)
if not device:
raise ValueError(f"Device {device_id} not found")
# Update fields
if name is not None:
device.name = name
if url is not None:
device.url = url
if led_count is not None:
device.led_count = led_count
# Reset calibration if LED count changed
device.calibration = create_default_calibration(led_count)
if enabled is not None:
device.enabled = enabled
if settings is not None:
device.settings = settings
if calibration is not None:
# Validate LED count matches
if calibration.get_total_leds() != device.led_count:
raise ValueError(
f"Calibration LED count ({calibration.get_total_leds()}) "
f"does not match device LED count ({device.led_count})"
)
device.calibration = calibration
device.updated_at = datetime.utcnow()
# Save
self.save()
logger.info(f"Updated device {device_id}")
return device
def delete_device(self, device_id: str):
"""Delete device.
Args:
device_id: Device identifier
Raises:
ValueError: If device not found
"""
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not found")
del self._devices[device_id]
self.save()
logger.info(f"Deleted device {device_id}")
def device_exists(self, device_id: str) -> bool:
"""Check if device exists.
Args:
device_id: Device identifier
Returns:
True if device exists
"""
return device_id in self._devices
def count(self) -> int:
"""Get number of devices.
Returns:
Device count
"""
return len(self._devices)
def clear(self):
"""Clear all devices (for testing)."""
self._devices.clear()
self.save()
logger.warning("Cleared all devices from storage")

View File

@@ -0,0 +1,6 @@
"""Utility functions and helpers."""
from .logger import setup_logging, get_logger
from .monitor_names import get_monitor_names, get_monitor_name
__all__ = ["setup_logging", "get_logger", "get_monitor_names", "get_monitor_name"]

View File

@@ -0,0 +1,86 @@
"""Logging configuration and setup."""
import logging
import sys
from pathlib import Path
from logging.handlers import RotatingFileHandler
import structlog
from pythonjsonlogger import jsonlogger
from wled_controller.config import get_config
def setup_logging() -> None:
"""Configure structured logging for the application."""
config = get_config()
# Ensure log directory exists
log_path = Path(config.logging.file)
log_path.parent.mkdir(parents=True, exist_ok=True)
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(config.server.log_level)
# Remove existing handlers
root_logger.handlers.clear()
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(config.server.log_level)
# File handler with rotation
file_handler = RotatingFileHandler(
filename=str(log_path),
maxBytes=config.logging.max_size_mb * 1024 * 1024,
backupCount=config.logging.backup_count,
)
file_handler.setLevel(config.server.log_level)
# Configure formatter based on format setting
if config.logging.format == "json":
formatter = jsonlogger.JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
else:
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
# Configure structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
if config.logging.format == "json"
else structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(logging.NOTSET),
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
def get_logger(name: str) -> structlog.stdlib.BoundLogger:
"""Get a configured logger instance.
Args:
name: Logger name (typically __name__)
Returns:
Configured structlog logger
"""
return structlog.get_logger(name)

View File

@@ -0,0 +1,79 @@
"""Utility functions for retrieving friendly monitor/display names."""
import sys
from typing import Dict, Optional
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def get_monitor_names() -> Dict[int, str]:
"""Get friendly names for connected monitors.
On Windows, attempts to retrieve monitor names from WMI.
On other platforms, returns empty dict (will fall back to generic names).
Returns:
Dictionary mapping display indices to friendly names
"""
if sys.platform != "win32":
logger.debug("Monitor name detection only supported on Windows")
return {}
try:
import wmi
w = wmi.WMI(namespace="wmi")
monitors = w.WmiMonitorID()
monitor_names = {}
for idx, monitor in enumerate(monitors):
try:
# Extract manufacturer name
manufacturer = ""
if monitor.ManufacturerName:
manufacturer = "".join(chr(c) for c in monitor.ManufacturerName if c != 0)
# Extract user-friendly name
user_name = ""
if monitor.UserFriendlyName:
user_name = "".join(chr(c) for c in monitor.UserFriendlyName if c != 0)
# Build friendly name
if user_name:
friendly_name = user_name.strip()
elif manufacturer:
friendly_name = f"{manufacturer.strip()} Monitor"
else:
friendly_name = f"Display {idx}"
monitor_names[idx] = friendly_name
logger.debug(f"Monitor {idx}: {friendly_name}")
except Exception as e:
logger.debug(f"Failed to parse monitor {idx} name: {e}")
monitor_names[idx] = f"Display {idx}"
return monitor_names
except ImportError:
logger.debug("WMI library not available - install with: pip install wmi")
return {}
except Exception as e:
logger.debug(f"Failed to retrieve monitor names via WMI: {e}")
return {}
def get_monitor_name(index: int) -> str:
"""Get friendly name for a specific monitor.
Args:
index: Monitor index (0-based)
Returns:
Friendly monitor name or generic fallback
"""
monitor_names = get_monitor_names()
return monitor_names.get(index, f"Display {index}")