Add AmbiLED device backend (client + provider)

AmbiLED protocol: raw RGB bytes (clamped 0-250) + 0xFF show command.
Subclasses Adalight infrastructure, shares serial transport and
discovery. Registered as built-in provider.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-18 15:12:55 +03:00
parent aa105f3958
commit 3bac9c4ed9
3 changed files with 163 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
"""AmbiLED serial LED client — sends pixel data using the AmbiLED protocol.
Protocol: raw RGB bytes (values clamped to 0250) followed by 0xFF show command.
No header or checksum — simpler than Adalight.
Reference: https://github.com/flytron/ambiled-hd
"""
import numpy as np
from wled_controller.core.devices.adalight_client import AdalightClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# AmbiLED command byte — triggers display update
AMBILED_SHOW_CMD = b"\xff"
class AmbiLEDClient(AdalightClient):
"""LED client for AmbiLED serial devices."""
def __init__(self, url: str, led_count: int = 0, baud_rate=None, **kwargs):
super().__init__(url, led_count=led_count, baud_rate=baud_rate, **kwargs)
# AmbiLED has no header — clear the Adalight header
self._header = b""
async def connect(self) -> bool:
result = await super().connect()
if result:
logger.info(
f"AmbiLED connected: {self._port} @ {self._baud_rate} baud "
f"({self._led_count} LEDs)"
)
return result
def _build_frame(self, pixels, brightness: int) -> bytes:
"""Build an AmbiLED frame: brightness-scaled RGB data + 0xFF show command."""
if isinstance(pixels, np.ndarray):
arr = pixels.astype(np.uint16)
else:
arr = np.array(pixels, dtype=np.uint16)
if brightness < 255:
arr = arr * brightness // 255
# Clamp to 0250: values >250 are command bytes in AmbiLED protocol
np.clip(arr, 0, 250, out=arr)
rgb_bytes = arr.astype(np.uint8).tobytes()
return rgb_bytes + AMBILED_SHOW_CMD

View File

@@ -0,0 +1,111 @@
"""AmbiLED device provider — serial LED controller using AmbiLED protocol."""
from typing import List, Tuple
import numpy as np
from wled_controller.core.devices.adalight_provider import AdalightDeviceProvider
from wled_controller.core.devices.led_client import DiscoveredDevice, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AmbiLEDDeviceProvider(AdalightDeviceProvider):
"""Provider for AmbiLED serial LED controllers."""
@property
def device_type(self) -> str:
return "ambiled"
def create_client(self, url: str, **kwargs) -> LEDClient:
from wled_controller.core.devices.ambiled_client import AmbiLEDClient
led_count = kwargs.pop("led_count", 0)
baud_rate = kwargs.pop("baud_rate", None)
kwargs.pop("use_ddp", None)
return AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate, **kwargs)
async def validate_device(self, url: str) -> dict:
from wled_controller.core.devices.adalight_client import parse_adalight_url
port, _baud = parse_adalight_url(url)
try:
import serial.tools.list_ports
available_ports = [p.device for p in serial.tools.list_ports.comports()]
port_upper = port.upper()
if not any(p.upper() == port_upper for p in available_ports):
raise ValueError(
f"Serial port {port} not found. "
f"Available ports: {', '.join(available_ports) or 'none'}"
)
except ValueError:
raise
except Exception as e:
raise ValueError(f"Failed to enumerate serial ports: {e}")
logger.info(f"AmbiLED device validated: port {port}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
results = []
for port_info in ports:
results.append(
DiscoveredDevice(
name=port_info.description or port_info.device,
url=port_info.device,
device_type="ambiled",
ip=port_info.device,
mac="",
led_count=None,
version=None,
)
)
logger.info(f"AmbiLED serial port scan found {len(results)} port(s)")
return results
except Exception as e:
logger.error(f"AmbiLED serial port discovery failed: {e}")
return []
async def set_power(self, url: str, on: bool, **kwargs) -> None:
if on:
return
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError("led_count is required to send black frame to AmbiLED device")
from wled_controller.core.devices.ambiled_client import AmbiLEDClient
client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
black = np.zeros((led_count, 3), dtype=np.uint8)
await client.send_pixels(black, brightness=255)
logger.info(f"AmbiLED power off: sent black frame to {url}")
finally:
await client.close()
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError("led_count is required to send color frame to AmbiLED device")
from wled_controller.core.devices.ambiled_client import AmbiLEDClient
client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
frame = np.full((led_count, 3), color, dtype=np.uint8)
await client.send_pixels(frame, brightness=255)
logger.info(f"AmbiLED set_color: sent solid {color} to {url}")
finally:
await client.close()

View File

@@ -276,5 +276,8 @@ def _register_builtin_providers():
from wled_controller.core.devices.adalight_provider import AdalightDeviceProvider
register_provider(AdalightDeviceProvider())
from wled_controller.core.devices.ambiled_provider import AmbiLEDDeviceProvider
register_provider(AmbiLEDDeviceProvider())
_register_builtin_providers()