Add Adalight serial LED device support with per-type discovery and capability-based UI

Implements the second device provider (Adalight) for Arduino-based serial LED controllers,
validating the LEDDeviceProvider abstraction. Adds serial port auto-discovery, per-type
discovery caching with lazy-load, capability-driven UI (brightness control, manual LED count,
standby), and serial port combobox in both Add Device and General Settings modals.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-16 15:55:42 +03:00
parent 242718a9a9
commit 1612c04c90
13 changed files with 678 additions and 76 deletions

View File

@@ -0,0 +1,204 @@
"""Adalight serial LED client — sends pixel data over serial using the Adalight protocol."""
import time
from datetime import datetime
from typing import List, Optional, Tuple
import numpy as np
from wled_controller.core.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
DEFAULT_BAUD_RATE = 115200
ARDUINO_RESET_DELAY = 2.0 # seconds to wait after opening serial for Arduino bootloader
def parse_adalight_url(url: str) -> Tuple[str, int]:
"""Parse an Adalight URL into (port, baud_rate).
Formats:
"COM3" -> ("COM3", 115200)
"COM3:230400" -> ("COM3", 230400)
"/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200)
"""
url = url.strip()
if ":" in url and not url.startswith("/"):
# Windows COM port with baud: "COM3:230400"
parts = url.rsplit(":", 1)
try:
baud = int(parts[1])
return parts[0], baud
except ValueError:
pass
elif ":" in url and url.startswith("/"):
# Unix path with baud: "/dev/ttyUSB0:230400"
parts = url.rsplit(":", 1)
try:
baud = int(parts[1])
return parts[0], baud
except ValueError:
pass
return url, DEFAULT_BAUD_RATE
def _build_adalight_header(led_count: int) -> bytes:
"""Build the 6-byte Adalight protocol header.
Format: 'A' 'd' 'a' <count_hi> <count_lo> <checksum>
where count = led_count - 1 (zero-indexed).
"""
count = led_count - 1
hi = (count >> 8) & 0xFF
lo = count & 0xFF
checksum = hi ^ lo ^ 0x55
return bytes([ord("A"), ord("d"), ord("a"), hi, lo, checksum])
class AdalightClient(LEDClient):
"""LED client for Arduino Adalight serial devices."""
def __init__(self, url: str, led_count: int = 0, **kwargs):
"""Initialize Adalight client.
Args:
url: Serial port string, e.g. "COM3" or "COM3:230400"
led_count: Number of LEDs on the strip (required for Adalight header)
"""
self._port, self._baud_rate = parse_adalight_url(url)
self._led_count = led_count
self._serial = None
self._connected = False
# Pre-compute Adalight header if led_count is known
self._header = _build_adalight_header(led_count) if led_count > 0 else b""
# Pre-allocate numpy buffer for brightness scaling
self._pixel_buf = None
async def connect(self) -> bool:
"""Open serial port and wait for Arduino reset."""
import serial
try:
self._serial = serial.Serial(
port=self._port,
baudrate=self._baud_rate,
timeout=1,
)
# Wait for Arduino to finish bootloader reset
time.sleep(ARDUINO_RESET_DELAY)
self._connected = True
logger.info(
f"Adalight connected: {self._port} @ {self._baud_rate} baud "
f"({self._led_count} LEDs)"
)
return True
except Exception as e:
logger.error(f"Failed to open serial port {self._port}: {e}")
raise RuntimeError(f"Failed to open serial port {self._port}: {e}")
async def close(self) -> None:
"""Close the serial port."""
self._connected = False
if self._serial and self._serial.is_open:
try:
self._serial.close()
except Exception as e:
logger.warning(f"Error closing serial port: {e}")
self._serial = None
logger.info(f"Adalight disconnected: {self._port}")
@property
def is_connected(self) -> bool:
return self._connected and self._serial is not None and self._serial.is_open
async def send_pixels(
self,
pixels: List[Tuple[int, int, int]],
brightness: int = 255,
) -> bool:
"""Send pixel data over serial using Adalight protocol."""
if not self.is_connected:
return False
try:
frame = self._build_frame(pixels, brightness)
self._serial.write(frame)
return True
except Exception as e:
logger.error(f"Adalight send_pixels error: {e}")
return False
@property
def supports_fast_send(self) -> bool:
return True
def send_pixels_fast(
self,
pixels: List[Tuple[int, int, int]],
brightness: int = 255,
) -> None:
"""Synchronous fire-and-forget serial send."""
if not self.is_connected:
return
try:
frame = self._build_frame(pixels, brightness)
self._serial.write(frame)
except Exception as e:
logger.error(f"Adalight send_pixels_fast error: {e}")
def _build_frame(self, pixels: List[Tuple[int, int, int]], brightness: int) -> bytes:
"""Build a complete Adalight frame: header + brightness-scaled RGB data."""
arr = np.array(pixels, dtype=np.uint16)
if brightness < 255:
arr = arr * brightness // 255
np.clip(arr, 0, 255, out=arr)
rgb_bytes = arr.astype(np.uint8).tobytes()
return self._header + rgb_bytes
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the serial port exists without opening it.
Enumerates COM ports to avoid exclusive-access conflicts on Windows.
"""
port, _baud = parse_adalight_url(url)
try:
import serial.tools.list_ports
available_ports = [p.device for p in serial.tools.list_ports.comports()]
port_upper = port.upper()
found = any(p.upper() == port_upper for p in available_ports)
if found:
return DeviceHealth(
online=True,
latency_ms=0.0,
last_checked=datetime.utcnow(),
device_name=prev_health.device_name if prev_health else None,
device_version=None,
device_led_count=prev_health.device_led_count if prev_health else None,
)
else:
return DeviceHealth(
online=False,
last_checked=datetime.utcnow(),
error=f"Serial port {port} not found",
)
except Exception as e:
return DeviceHealth(
online=False,
last_checked=datetime.utcnow(),
error=str(e),
)

View File

@@ -0,0 +1,93 @@
"""Adalight device provider — serial LED controller support."""
from typing import List
from wled_controller.core.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AdalightDeviceProvider(LEDDeviceProvider):
"""Provider for Adalight serial LED controllers."""
@property
def device_type(self) -> str:
return "adalight"
@property
def capabilities(self) -> set:
# No hardware brightness control, no standby required
# manual_led_count: user must specify LED count (can't auto-detect)
return {"manual_led_count"}
def create_client(self, url: str, **kwargs) -> LEDClient:
from wled_controller.core.adalight_client import AdalightClient
led_count = kwargs.pop("led_count", 0)
kwargs.pop("use_ddp", None) # Not applicable for serial
return AdalightClient(url, led_count=led_count, **kwargs)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
from wled_controller.core.adalight_client import AdalightClient
return await AdalightClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate that the serial port exists.
Returns:
Empty dict — Adalight devices don't report LED count,
so it must be provided by the user.
"""
from wled_controller.core.adalight_client import parse_adalight_url
port, _baud = parse_adalight_url(url)
try:
import serial.tools.list_ports
available_ports = [p.device for p in serial.tools.list_ports.comports()]
port_upper = port.upper()
if not any(p.upper() == port_upper for p in available_ports):
raise ValueError(
f"Serial port {port} not found. "
f"Available ports: {', '.join(available_ports) or 'none'}"
)
except ValueError:
raise
except Exception as e:
raise ValueError(f"Failed to enumerate serial ports: {e}")
logger.info(f"Adalight device validated: port {port}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover serial ports that could be Adalight devices."""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
results = []
for port_info in ports:
results.append(
DiscoveredDevice(
name=port_info.description or port_info.device,
url=port_info.device,
device_type="adalight",
ip=port_info.device,
mac="",
led_count=None,
version=None,
)
)
logger.info(f"Serial port scan found {len(results)} port(s)")
return results
except Exception as e:
logger.error(f"Serial port discovery failed: {e}")
return []

View File

@@ -258,5 +258,8 @@ def _register_builtin_providers():
from wled_controller.core.wled_provider import WLEDDeviceProvider
register_provider(WLEDDeviceProvider())
from wled_controller.core.adalight_provider import AdalightDeviceProvider
register_provider(AdalightDeviceProvider())
_register_builtin_providers()

View File

@@ -534,7 +534,7 @@ class ProcessorManager:
# Connect to LED device via factory
try:
state.led_client = create_led_client(device_type, state.device_url, use_ddp=True)
state.led_client = create_led_client(device_type, state.device_url, use_ddp=True, led_count=state.led_count)
await state.led_client.connect()
logger.info(f"Target {target_id} connected to {device_type} device ({state.led_count} LEDs)")
@@ -885,7 +885,7 @@ class ProcessorManager:
if active_client:
await active_client.send_pixels(pixels)
else:
async with create_led_client(ds.device_type, ds.device_url, use_ddp=True) as client:
async with create_led_client(ds.device_type, ds.device_url, use_ddp=True, led_count=ds.led_count) as client:
await client.send_pixels(pixels)
except Exception as e:
logger.error(f"Failed to send test pixels for {device_id}: {e}")
@@ -905,7 +905,7 @@ class ProcessorManager:
if active_client:
await active_client.send_pixels(pixels)
else:
async with create_led_client(ds.device_type, ds.device_url, use_ddp=True) as client:
async with create_led_client(ds.device_type, ds.device_url, use_ddp=True, led_count=ds.led_count) as client:
await client.send_pixels(pixels)
except Exception as e:
logger.error(f"Failed to clear pixels for {device_id}: {e}")

View File

@@ -34,6 +34,7 @@ class WLEDDeviceProvider(LEDDeviceProvider):
def create_client(self, url: str, **kwargs) -> LEDClient:
from wled_controller.core.wled_client import WLEDClient
kwargs.pop("led_count", None)
return WLEDClient(url, **kwargs)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth: