Add LED device abstraction layer for multi-controller support

Introduce abstract LEDClient base class with factory pattern so new
LED controller types can plug in alongside WLED. ProcessorManager is
now fully type-agnostic — all device-specific logic (health checks,
state snapshot/restore, fast send) lives behind the LEDClient interface.

- New led_client.py: LEDClient ABC, DeviceHealth, factory functions
- WLEDClient inherits LEDClient, encapsulates WLED health checks and state management
- device_type field on Device storage model (defaults to "wled")
- Rename target_type "wled" → "led" with backward-compat migration
- Frontend: "WLED" tab → "LED", device type badge, type selector in
  add-device modal, device type shown in target device dropdown
- All wled_* API fields renamed to device_* for generic naming

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-16 12:41:02 +03:00
parent afce183f79
commit b5a6885126
18 changed files with 667 additions and 346 deletions

View File

@@ -79,6 +79,8 @@ class CalibrationConfig:
# Skip LEDs: black out N LEDs at the start/end of the strip
skip_leds_start: int = 0
skip_leds_end: int = 0
# Border width: how many pixels from the screen edge to sample
border_width: int = 10
def build_segments(self) -> List[CalibrationSegment]:
"""Derive segment list from core parameters."""
@@ -463,6 +465,7 @@ def calibration_from_dict(data: dict) -> CalibrationConfig:
span_left_end=data.get("span_left_end", 1.0),
skip_leds_start=data.get("skip_leds_start", 0),
skip_leds_end=data.get("skip_leds_end", 0),
border_width=data.get("border_width", 10),
)
config.validate()
@@ -506,4 +509,6 @@ def calibration_to_dict(config: CalibrationConfig) -> dict:
result["skip_leds_start"] = config.skip_leds_start
if config.skip_leds_end > 0:
result["skip_leds_end"] = config.skip_leds_end
if config.border_width != 10:
result["border_width"] = config.border_width
return result

View File

@@ -0,0 +1,180 @@
"""Abstract base class for LED device communication clients."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Tuple
@dataclass
class DeviceHealth:
"""Health check result for an LED device."""
online: bool = False
latency_ms: Optional[float] = None
last_checked: Optional[datetime] = None
# Device-reported metadata (populated by type-specific health check)
device_name: Optional[str] = None
device_version: Optional[str] = None
device_led_count: Optional[int] = None
device_rgbw: Optional[bool] = None
device_led_type: Optional[str] = None
error: Optional[str] = None
class LEDClient(ABC):
"""Abstract base for LED device communication.
Lifecycle:
client = SomeLEDClient(url, ...)
await client.connect()
state = await client.snapshot_device_state() # save before streaming
client.send_pixels_fast(pixels, brightness) # if supports_fast_send
await client.send_pixels(pixels, brightness)
await client.restore_device_state(state) # restore after streaming
await client.close()
Or as async context manager:
async with SomeLEDClient(url, ...) as client:
...
"""
@abstractmethod
async def connect(self) -> bool:
"""Establish connection. Returns True on success, raises on failure."""
...
@abstractmethod
async def close(self) -> None:
"""Close the connection and release resources."""
...
@property
@abstractmethod
def is_connected(self) -> bool:
"""Check if connected."""
...
@abstractmethod
async def send_pixels(
self,
pixels: List[Tuple[int, int, int]],
brightness: int = 255,
) -> bool:
"""Send pixel colors to the LED device (async).
Args:
pixels: List of (R, G, B) tuples
brightness: Global brightness (0-255)
"""
...
@property
def supports_fast_send(self) -> bool:
"""Whether send_pixels_fast() is available (e.g. DDP UDP)."""
return False
def send_pixels_fast(
self,
pixels: List[Tuple[int, int, int]],
brightness: int = 255,
) -> None:
"""Synchronous fire-and-forget send for the hot loop.
Override in subclasses that support a fast protocol (e.g. DDP).
"""
raise NotImplementedError("send_pixels_fast not supported for this device type")
async def snapshot_device_state(self) -> Optional[dict]:
"""Snapshot device state before streaming starts.
Override in subclasses that need to save/restore state around streaming.
Returns a state dict to pass to restore_device_state(), or None.
"""
return None
async def restore_device_state(self, state: Optional[dict]) -> None:
"""Restore device state after streaming stops.
Args:
state: State dict returned by snapshot_device_state(), or None.
"""
pass
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check device health without a full client connection.
Override in subclasses for type-specific health probes.
Default: mark as online with no metadata.
Args:
url: Device URL
http_client: Shared httpx.AsyncClient for HTTP requests
prev_health: Previous health result (for preserving cached metadata)
"""
return DeviceHealth(online=True, last_checked=datetime.utcnow())
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
# Per-device-type capability sets.
# Used by API routes to gate type-specific features (e.g. brightness control).
DEVICE_TYPE_CAPABILITIES = {
"wled": {"brightness_control"},
}
def get_device_capabilities(device_type: str) -> set:
"""Return the capability set for a device type."""
return DEVICE_TYPE_CAPABILITIES.get(device_type, set())
def create_led_client(device_type: str, url: str, **kwargs) -> LEDClient:
"""Factory: create the right LEDClient subclass for a device type.
Args:
device_type: Device type identifier (e.g. "wled")
url: Device URL
**kwargs: Passed to the client constructor
Returns:
LEDClient instance
Raises:
ValueError: If device_type is unknown
"""
if device_type == "wled":
from wled_controller.core.wled_client import WLEDClient
return WLEDClient(url, **kwargs)
raise ValueError(f"Unknown LED device type: {device_type}")
async def check_device_health(
device_type: str,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Factory: dispatch health check to the right LEDClient subclass.
Args:
device_type: Device type identifier
url: Device URL
http_client: Shared httpx.AsyncClient
prev_health: Previous health result
"""
if device_type == "wled":
from wled_controller.core.wled_client import WLEDClient
return await WLEDClient.check_health(url, http_client, prev_health)
return DeviceHealth(online=True, last_checked=datetime.utcnow())

View File

@@ -26,7 +26,12 @@ from wled_controller.core.screen_capture import (
calculate_median_color,
extract_border_pixels,
)
from wled_controller.core.wled_client import WLEDClient
from wled_controller.core.led_client import (
DeviceHealth,
LEDClient,
check_device_health,
create_led_client,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -87,27 +92,12 @@ def _process_kc_frame(capture, rectangles, calc_fn, previous_colors, smoothing):
)
return colors
# WLED LED bus type codes from const.h → human-readable names
WLED_LED_TYPES: Dict[int, str] = {
18: "WS2812 1ch", 19: "WS2812 1ch x3", 20: "WS2812 CCT", 21: "WS2812 WWA",
22: "WS2812B", 23: "GS8608", 24: "WS2811 400kHz", 25: "TM1829",
26: "UCS8903", 27: "APA106", 28: "FW1906", 29: "UCS8904",
30: "SK6812 RGBW", 31: "TM1814", 32: "WS2805", 33: "TM1914", 34: "SM16825",
40: "On/Off", 41: "PWM 1ch", 42: "PWM 2ch", 43: "PWM 3ch",
44: "PWM 4ch", 45: "PWM 5ch", 46: "PWM 6ch",
50: "WS2801", 51: "APA102", 52: "LPD8806", 53: "P9813", 54: "LPD6803",
65: "HUB75 HS", 66: "HUB75 QS",
80: "DDP RGB", 81: "E1.31", 82: "Art-Net", 88: "DDP RGBW", 89: "Art-Net RGBW",
}
@dataclass
class ProcessingSettings:
"""Settings for screen processing."""
display_index: int = 0
fps: int = 30
border_width: int = 10
brightness: float = 1.0
gamma: float = 2.2
saturation: float = 1.0
@@ -117,21 +107,6 @@ class ProcessingSettings:
state_check_interval: int = DEFAULT_STATE_CHECK_INTERVAL
@dataclass
class DeviceHealth:
"""Health check result for a WLED device (GET /json/info)."""
online: bool = False
latency_ms: Optional[float] = None
last_checked: Optional[datetime] = None
wled_name: Optional[str] = None
wled_version: Optional[str] = None
wled_led_count: Optional[int] = None
wled_rgbw: Optional[bool] = None
wled_led_type: Optional[str] = None
error: Optional[str] = None
@dataclass
class ProcessingMetrics:
"""Metrics for processing performance."""
@@ -150,12 +125,13 @@ class ProcessingMetrics:
@dataclass
class DeviceState:
"""State for a registered WLED device (health monitoring + calibration)."""
"""State for a registered LED device (health monitoring + calibration)."""
device_id: str
device_url: str
led_count: int
calibration: CalibrationConfig
device_type: str = "wled"
health: DeviceHealth = field(default_factory=DeviceHealth)
health_task: Optional[asyncio.Task] = None
# Calibration test mode (works independently of target processing)
@@ -174,7 +150,7 @@ class TargetState:
settings: ProcessingSettings
calibration: CalibrationConfig
picture_source_id: str = ""
wled_client: Optional[WLEDClient] = None
led_client: Optional[LEDClient] = None
pixel_mapper: Optional[PixelMapper] = None
is_running: bool = False
task: Optional[asyncio.Task] = None
@@ -187,8 +163,8 @@ class TargetState:
resolved_engine_config: Optional[dict] = None
# LiveStream: runtime frame source (shared via LiveStreamManager)
live_stream: Optional[LiveStream] = None
# WLED state snapshot taken before streaming starts (to restore on stop)
wled_state_before: Optional[dict] = None
# Device state snapshot taken before streaming starts (to restore on stop)
device_state_before: Optional[dict] = None
@dataclass
@@ -245,14 +221,16 @@ class ProcessorManager:
device_url: str,
led_count: int,
calibration: Optional[CalibrationConfig] = None,
device_type: str = "wled",
):
"""Register a device for health monitoring.
Args:
device_id: Unique device identifier
device_url: WLED device URL
device_url: Device URL
led_count: Number of LEDs
calibration: Calibration config (creates default if None)
device_type: LED device type (e.g. "wled")
"""
if device_id in self._devices:
raise ValueError(f"Device {device_id} already registered")
@@ -265,6 +243,7 @@ class ProcessorManager:
device_url=device_url,
led_count=led_count,
calibration=calibration,
device_type=device_type,
)
self._devices[device_id] = state
@@ -356,11 +335,11 @@ class ProcessorManager:
"online": h.online,
"latency_ms": h.latency_ms,
"last_checked": h.last_checked,
"wled_name": h.wled_name,
"wled_version": h.wled_version,
"wled_led_count": h.wled_led_count,
"wled_rgbw": h.wled_rgbw,
"wled_led_type": h.wled_led_type,
"device_name": h.device_name,
"device_version": h.device_version,
"device_led_count": h.device_led_count,
"device_rgbw": h.device_rgbw,
"device_led_type": h.device_led_type,
"error": h.error,
}
@@ -373,15 +352,15 @@ class ProcessorManager:
h = ds.health
return {
"device_id": device_id,
"wled_online": h.online,
"wled_latency_ms": h.latency_ms,
"wled_name": h.wled_name,
"wled_version": h.wled_version,
"wled_led_count": h.wled_led_count,
"wled_rgbw": h.wled_rgbw,
"wled_led_type": h.wled_led_type,
"wled_last_checked": h.last_checked,
"wled_error": h.error,
"device_online": h.online,
"device_latency_ms": h.latency_ms,
"device_name": h.device_name,
"device_version": h.device_version,
"device_led_count": h.device_led_count,
"device_rgbw": h.device_rgbw,
"device_led_type": h.device_led_type,
"device_last_checked": h.last_checked,
"device_error": h.error,
"test_mode": ds.test_mode_active,
"test_mode_edges": list(ds.test_mode_edges.keys()),
}
@@ -548,31 +527,22 @@ class ProcessorManager:
# Resolve stream settings
self._resolve_stream_settings(state)
# Snapshot WLED state before streaming changes it
try:
async with httpx.AsyncClient(timeout=5) as http:
resp = await http.get(f"{state.device_url}/json/state")
resp.raise_for_status()
wled_state = resp.json()
state.wled_state_before = {
"on": wled_state.get("on", True),
"lor": wled_state.get("lor", 0),
}
if "AudioReactive" in wled_state:
state.wled_state_before["AudioReactive"] = wled_state["AudioReactive"]
logger.info(f"Saved WLED state before streaming: {state.wled_state_before}")
except Exception as e:
logger.warning(f"Could not snapshot WLED state: {e}")
state.wled_state_before = None
# Determine device type from device state
device_type = "wled"
if state.device_id in self._devices:
device_type = self._devices[state.device_id].device_type
# Connect to WLED device (always use DDP for low-latency UDP streaming)
# Connect to LED device via factory
try:
state.wled_client = WLEDClient(state.device_url, use_ddp=True)
await state.wled_client.connect()
logger.info(f"Target {target_id} using DDP protocol ({state.led_count} LEDs)")
state.led_client = create_led_client(device_type, state.device_url, use_ddp=True)
await state.led_client.connect()
logger.info(f"Target {target_id} connected to {device_type} device ({state.led_count} LEDs)")
# Snapshot device state before streaming (type-specific, e.g. WLED saves on/lor/AudioReactive)
state.device_state_before = await state.led_client.snapshot_device_state()
except Exception as e:
logger.error(f"Failed to connect to WLED device for target {target_id}: {e}")
raise RuntimeError(f"Failed to connect to WLED device: {e}")
logger.error(f"Failed to connect to LED device for target {target_id}: {e}")
raise RuntimeError(f"Failed to connect to LED device: {e}")
# Acquire live stream via LiveStreamManager (shared across targets)
try:
@@ -589,8 +559,8 @@ class ProcessorManager:
)
except Exception as e:
logger.error(f"Failed to initialize live stream for target {target_id}: {e}")
if state.wled_client:
await state.wled_client.disconnect()
if state.led_client:
await state.led_client.close()
raise RuntimeError(f"Failed to initialize live stream: {e}")
# Initialize pixel mapper
@@ -632,23 +602,15 @@ class ProcessorManager:
pass
state.task = None
# Restore WLED state
if state.wled_state_before:
try:
async with httpx.AsyncClient(timeout=5) as http:
await http.post(
f"{state.device_url}/json/state",
json=state.wled_state_before,
)
logger.info(f"Restored WLED state: {state.wled_state_before}")
except Exception as e:
logger.warning(f"Could not restore WLED state: {e}")
state.wled_state_before = None
# Restore device state (type-specific, e.g. WLED restores on/lor/AudioReactive)
if state.led_client and state.device_state_before:
await state.led_client.restore_device_state(state.device_state_before)
state.device_state_before = None
# Close WLED connection
if state.wled_client:
await state.wled_client.close()
state.wled_client = None
# Close LED connection
if state.led_client:
await state.led_client.close()
state.led_client = None
# Release live stream
if state.live_stream:
@@ -667,8 +629,8 @@ class ProcessorManager:
target_fps = settings.fps
smoothing = settings.smoothing
border_width = settings.border_width
wled_brightness = settings.brightness
border_width = state.calibration.border_width
led_brightness = settings.brightness
logger.info(
f"Processing loop started for target {target_id} "
@@ -708,15 +670,15 @@ class ProcessorManager:
# Skip processing + send if the frame hasn't changed
if capture is prev_capture:
# Keepalive: resend last colors to prevent WLED exiting live mode
# Keepalive: resend last colors to prevent device exiting live mode
if state.previous_colors and (loop_start - last_send_time) >= standby_interval:
if not state.is_running or state.wled_client is None:
if not state.is_running or state.led_client is None:
break
brightness_value = int(wled_brightness * 255)
if state.wled_client.use_ddp:
state.wled_client.send_pixels_fast(state.previous_colors, brightness=brightness_value)
brightness_value = int(led_brightness * 255)
if state.led_client.supports_fast_send:
state.led_client.send_pixels_fast(state.previous_colors, brightness=brightness_value)
else:
await state.wled_client.send_pixels(state.previous_colors, brightness=brightness_value)
await state.led_client.send_pixels(state.previous_colors, brightness=brightness_value)
last_send_time = time.time()
send_timestamps.append(last_send_time)
state.metrics.frames_keepalive += 1
@@ -737,14 +699,14 @@ class ProcessorManager:
state.pixel_mapper, state.previous_colors, smoothing,
)
# Send to WLED with device brightness
if not state.is_running or state.wled_client is None:
# Send to LED device with brightness
if not state.is_running or state.led_client is None:
break
brightness_value = int(wled_brightness * 255)
if state.wled_client.use_ddp:
state.wled_client.send_pixels_fast(led_colors, brightness=brightness_value)
brightness_value = int(led_brightness * 255)
if state.led_client.supports_fast_send:
state.led_client.send_pixels_fast(led_colors, brightness=brightness_value)
else:
await state.wled_client.send_pixels(led_colors, brightness=brightness_value)
await state.led_client.send_pixels(led_colors, brightness=brightness_value)
last_send_time = time.time()
send_timestamps.append(last_send_time)
@@ -802,20 +764,20 @@ class ProcessorManager:
state = self._targets[target_id]
metrics = state.metrics
# Include WLED health info from the device
# Include device health info
health_info = {}
if state.device_id in self._devices:
h = self._devices[state.device_id].health
health_info = {
"wled_online": h.online,
"wled_latency_ms": h.latency_ms,
"wled_name": h.wled_name,
"wled_version": h.wled_version,
"wled_led_count": h.wled_led_count,
"wled_rgbw": h.wled_rgbw,
"wled_led_type": h.wled_led_type,
"wled_last_checked": h.last_checked,
"wled_error": h.error,
"device_online": h.online,
"device_latency_ms": h.latency_ms,
"device_name": h.device_name,
"device_version": h.device_version,
"device_led_count": h.device_led_count,
"device_rgbw": h.device_rgbw,
"device_led_type": h.device_led_type,
"device_last_checked": h.last_checked,
"device_error": h.error,
}
return {
@@ -913,38 +875,38 @@ class ProcessorManager:
break
try:
# Check if a target is running for this device (use its WLED client)
# Check if a target is running for this device (use its LED client)
active_client = None
for ts in self._targets.values():
if ts.device_id == device_id and ts.is_running and ts.wled_client:
active_client = ts.wled_client
if ts.device_id == device_id and ts.is_running and ts.led_client:
active_client = ts.led_client
break
if active_client:
await active_client.send_pixels(pixels)
else:
async with WLEDClient(ds.device_url, use_ddp=True) as wled:
await wled.send_pixels(pixels)
async with create_led_client(ds.device_type, ds.device_url, use_ddp=True) as client:
await client.send_pixels(pixels)
except Exception as e:
logger.error(f"Failed to send test pixels for {device_id}: {e}")
async def _send_clear_pixels(self, device_id: str) -> None:
"""Send all-black pixels to clear WLED output."""
"""Send all-black pixels to clear LED output."""
ds = self._devices[device_id]
pixels = [(0, 0, 0)] * ds.led_count
try:
active_client = None
for ts in self._targets.values():
if ts.device_id == device_id and ts.is_running and ts.wled_client:
active_client = ts.wled_client
if ts.device_id == device_id and ts.is_running and ts.led_client:
active_client = ts.led_client
break
if active_client:
await active_client.send_pixels(pixels)
else:
async with WLEDClient(ds.device_url, use_ddp=True) as wled:
await wled.send_pixels(pixels)
async with create_led_client(ds.device_type, ds.device_url, use_ddp=True) as client:
await client.send_pixels(pixels)
except Exception as e:
logger.error(f"Failed to clear pixels for {device_id}: {e}")
@@ -1050,58 +1012,14 @@ class ProcessorManager:
logger.error(f"Fatal error in health check loop for {device_id}: {e}")
async def _check_device_health(self, device_id: str):
"""Check device health via GET /json/info."""
"""Check device health via the LED client abstraction."""
state = self._devices.get(device_id)
if not state:
return
url = state.device_url.rstrip("/")
start = time.time()
try:
client = await self._get_http_client()
response = await client.get(f"{url}/json/info")
response.raise_for_status()
data = response.json()
latency = (time.time() - start) * 1000
leds_info = data.get("leds", {})
wled_led_count = leds_info.get("count")
# Fetch LED type from /json/cfg once (it's static config)
wled_led_type = state.health.wled_led_type
if wled_led_type is None:
try:
cfg = await client.get(f"{url}/json/cfg")
cfg.raise_for_status()
cfg_data = cfg.json()
ins = cfg_data.get("hw", {}).get("led", {}).get("ins", [])
if ins:
type_code = ins[0].get("type", 0)
wled_led_type = WLED_LED_TYPES.get(type_code, f"Type {type_code}")
except Exception as cfg_err:
logger.debug(f"Could not fetch LED type for {device_id}: {cfg_err}")
state.health = DeviceHealth(
online=True,
latency_ms=round(latency, 1),
last_checked=datetime.utcnow(),
wled_name=data.get("name"),
wled_version=data.get("ver"),
wled_led_count=wled_led_count,
wled_rgbw=leds_info.get("rgbw", False),
wled_led_type=wled_led_type,
error=None,
)
except Exception as e:
state.health = DeviceHealth(
online=False,
latency_ms=None,
last_checked=datetime.utcnow(),
wled_name=state.health.wled_name,
wled_version=state.health.wled_version,
wled_led_count=state.health.wled_led_count,
wled_rgbw=state.health.wled_rgbw,
wled_led_type=state.health.wled_led_type,
error=str(e),
)
client = await self._get_http_client()
state.health = await check_device_health(
state.device_type, state.device_url, client, state.health,
)
# ===== KEY COLORS TARGET MANAGEMENT =====

View File

@@ -1,7 +1,9 @@
"""WLED client for controlling LED devices via HTTP or DDP."""
import asyncio
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from urllib.parse import urlparse
@@ -10,9 +12,23 @@ import numpy as np
from wled_controller.utils import get_logger
from wled_controller.core.ddp_client import BusConfig, DDPClient
from wled_controller.core.led_client import DeviceHealth, LEDClient
logger = get_logger(__name__)
# WLED LED bus type codes from const.h → human-readable names
WLED_LED_TYPES: Dict[int, str] = {
18: "WS2812 1ch", 19: "WS2812 1ch x3", 20: "WS2812 CCT", 21: "WS2812 WWA",
22: "WS2812B", 23: "GS8608", 24: "WS2811 400kHz", 25: "TM1829",
26: "UCS8903", 27: "APA106", 28: "FW1906", 29: "UCS8904",
30: "SK6812 RGBW", 31: "TM1814", 32: "WS2805", 33: "TM1914", 34: "SM16825",
40: "On/Off", 41: "PWM 1ch", 42: "PWM 2ch", 43: "PWM 3ch",
44: "PWM 4ch", 45: "PWM 5ch", 46: "PWM 6ch",
50: "WS2801", 51: "APA102", 52: "LPD8806", 53: "P9813", 54: "LPD6803",
65: "HUB75 HS", 66: "HUB75 QS",
80: "DDP RGB", 81: "E1.31", 82: "Art-Net", 88: "DDP RGBW", 89: "Art-Net RGBW",
}
@dataclass
class WLEDInfo:
@@ -30,7 +46,7 @@ class WLEDInfo:
buses: List[BusConfig] = field(default_factory=list) # Per-bus/GPIO config
class WLEDClient:
class WLEDClient(LEDClient):
"""Client for WLED devices supporting both HTTP and DDP protocols."""
# HTTP JSON API has ~10KB limit, ~500 LEDs max
@@ -67,15 +83,6 @@ class WLEDClient:
self._ddp_client: Optional[DDPClient] = None
self._connected = False
async def __aenter__(self):
"""Async context manager entry."""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def connect(self) -> bool:
"""Establish connection to WLED device.
@@ -161,6 +168,11 @@ class WLEDClient:
"""Check if connected to WLED device."""
return self._connected and self._client is not None
@property
def supports_fast_send(self) -> bool:
"""True when DDP is active and ready for fire-and-forget sends."""
return self.use_ddp and self._ddp_client is not None
async def _request(
self,
method: str,
@@ -445,6 +457,96 @@ class WLEDClient:
self._ddp_client.send_pixels_numpy(pixel_array)
# ===== LEDClient abstraction methods =====
async def snapshot_device_state(self) -> Optional[dict]:
"""Snapshot WLED state before streaming (on, lor, AudioReactive)."""
if not self._client:
return None
try:
resp = await self._client.get(f"{self.url}/json/state")
resp.raise_for_status()
wled_state = resp.json()
state = {
"on": wled_state.get("on", True),
"lor": wled_state.get("lor", 0),
}
if "AudioReactive" in wled_state:
state["AudioReactive"] = wled_state["AudioReactive"]
logger.info(f"Saved WLED state before streaming: {state}")
return state
except Exception as e:
logger.warning(f"Could not snapshot WLED state: {e}")
return None
async def restore_device_state(self, state: Optional[dict]) -> None:
"""Restore WLED state after streaming."""
if not state:
return
try:
async with httpx.AsyncClient(timeout=5) as http:
await http.post(f"{self.url}/json/state", json=state)
logger.info(f"Restored WLED state: {state}")
except Exception as e:
logger.warning(f"Could not restore WLED state: {e}")
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""WLED health check via GET /json/info (+ /json/cfg for LED type)."""
url = url.rstrip("/")
start = time.time()
try:
response = await http_client.get(f"{url}/json/info")
response.raise_for_status()
data = response.json()
latency = (time.time() - start) * 1000
leds_info = data.get("leds", {})
# Fetch LED type from /json/cfg once (it's static config)
device_led_type = prev_health.device_led_type if prev_health else None
if device_led_type is None:
try:
cfg = await http_client.get(f"{url}/json/cfg")
cfg.raise_for_status()
cfg_data = cfg.json()
ins = cfg_data.get("hw", {}).get("led", {}).get("ins", [])
if ins:
type_code = ins[0].get("type", 0)
device_led_type = WLED_LED_TYPES.get(type_code, f"Type {type_code}")
except Exception as cfg_err:
logger.debug(f"Could not fetch LED type: {cfg_err}")
return DeviceHealth(
online=True,
latency_ms=round(latency, 1),
last_checked=datetime.utcnow(),
device_name=data.get("name"),
device_version=data.get("ver"),
device_led_count=leds_info.get("count"),
device_rgbw=leds_info.get("rgbw", False),
device_led_type=device_led_type,
error=None,
)
except Exception as e:
return DeviceHealth(
online=False,
latency_ms=None,
last_checked=datetime.utcnow(),
device_name=prev_health.device_name if prev_health else None,
device_version=prev_health.device_version if prev_health else None,
device_led_count=prev_health.device_led_count if prev_health else None,
device_rgbw=prev_health.device_rgbw if prev_health else None,
device_led_type=prev_health.device_led_type if prev_health else None,
error=str(e),
)
# ===== WLED-specific methods =====
async def set_power(self, on: bool) -> bool:
"""Turn WLED device on or off.