From 3bac9c4ed9ef4e2e99afc506708e9555675151fb Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Wed, 18 Feb 2026 15:12:55 +0300 Subject: [PATCH] Add AmbiLED device backend (client + provider) AmbiLED protocol: raw RGB bytes (clamped 0-250) + 0xFF show command. Subclasses Adalight infrastructure, shares serial transport and discovery. Registered as built-in provider. Co-Authored-By: Claude Opus 4.6 --- .../core/devices/ambiled_client.py | 49 ++++++++ .../core/devices/ambiled_provider.py | 111 ++++++++++++++++++ .../core/devices/led_client.py | 3 + 3 files changed, 163 insertions(+) create mode 100644 server/src/wled_controller/core/devices/ambiled_client.py create mode 100644 server/src/wled_controller/core/devices/ambiled_provider.py diff --git a/server/src/wled_controller/core/devices/ambiled_client.py b/server/src/wled_controller/core/devices/ambiled_client.py new file mode 100644 index 0000000..343dd10 --- /dev/null +++ b/server/src/wled_controller/core/devices/ambiled_client.py @@ -0,0 +1,49 @@ +"""AmbiLED serial LED client — sends pixel data using the AmbiLED protocol. + +Protocol: raw RGB bytes (values clamped to 0–250) followed by 0xFF show command. +No header or checksum — simpler than Adalight. +Reference: https://github.com/flytron/ambiled-hd +""" + +import numpy as np + +from wled_controller.core.devices.adalight_client import AdalightClient +from wled_controller.utils import get_logger + +logger = get_logger(__name__) + +# AmbiLED command byte — triggers display update +AMBILED_SHOW_CMD = b"\xff" + + +class AmbiLEDClient(AdalightClient): + """LED client for AmbiLED serial devices.""" + + def __init__(self, url: str, led_count: int = 0, baud_rate=None, **kwargs): + super().__init__(url, led_count=led_count, baud_rate=baud_rate, **kwargs) + # AmbiLED has no header — clear the Adalight header + self._header = b"" + + async def connect(self) -> bool: + result = await super().connect() + if result: + logger.info( + f"AmbiLED connected: {self._port} @ {self._baud_rate} baud " + f"({self._led_count} LEDs)" + ) + return result + + def _build_frame(self, pixels, brightness: int) -> bytes: + """Build an AmbiLED frame: brightness-scaled RGB data + 0xFF show command.""" + if isinstance(pixels, np.ndarray): + arr = pixels.astype(np.uint16) + else: + arr = np.array(pixels, dtype=np.uint16) + + if brightness < 255: + arr = arr * brightness // 255 + + # Clamp to 0–250: values >250 are command bytes in AmbiLED protocol + np.clip(arr, 0, 250, out=arr) + rgb_bytes = arr.astype(np.uint8).tobytes() + return rgb_bytes + AMBILED_SHOW_CMD diff --git a/server/src/wled_controller/core/devices/ambiled_provider.py b/server/src/wled_controller/core/devices/ambiled_provider.py new file mode 100644 index 0000000..0e21bd9 --- /dev/null +++ b/server/src/wled_controller/core/devices/ambiled_provider.py @@ -0,0 +1,111 @@ +"""AmbiLED device provider — serial LED controller using AmbiLED protocol.""" + +from typing import List, Tuple + +import numpy as np + +from wled_controller.core.devices.adalight_provider import AdalightDeviceProvider +from wled_controller.core.devices.led_client import DiscoveredDevice, LEDClient +from wled_controller.utils import get_logger + +logger = get_logger(__name__) + + +class AmbiLEDDeviceProvider(AdalightDeviceProvider): + """Provider for AmbiLED serial LED controllers.""" + + @property + def device_type(self) -> str: + return "ambiled" + + def create_client(self, url: str, **kwargs) -> LEDClient: + from wled_controller.core.devices.ambiled_client import AmbiLEDClient + + led_count = kwargs.pop("led_count", 0) + baud_rate = kwargs.pop("baud_rate", None) + kwargs.pop("use_ddp", None) + return AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate, **kwargs) + + async def validate_device(self, url: str) -> dict: + from wled_controller.core.devices.adalight_client import parse_adalight_url + + port, _baud = parse_adalight_url(url) + + try: + import serial.tools.list_ports + + available_ports = [p.device for p in serial.tools.list_ports.comports()] + port_upper = port.upper() + if not any(p.upper() == port_upper for p in available_ports): + raise ValueError( + f"Serial port {port} not found. " + f"Available ports: {', '.join(available_ports) or 'none'}" + ) + except ValueError: + raise + except Exception as e: + raise ValueError(f"Failed to enumerate serial ports: {e}") + + logger.info(f"AmbiLED device validated: port {port}") + return {} + + async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]: + try: + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + results = [] + for port_info in ports: + results.append( + DiscoveredDevice( + name=port_info.description or port_info.device, + url=port_info.device, + device_type="ambiled", + ip=port_info.device, + mac="", + led_count=None, + version=None, + ) + ) + logger.info(f"AmbiLED serial port scan found {len(results)} port(s)") + return results + except Exception as e: + logger.error(f"AmbiLED serial port discovery failed: {e}") + return [] + + async def set_power(self, url: str, on: bool, **kwargs) -> None: + if on: + return + + led_count = kwargs.get("led_count", 0) + baud_rate = kwargs.get("baud_rate") + if led_count <= 0: + raise ValueError("led_count is required to send black frame to AmbiLED device") + + from wled_controller.core.devices.ambiled_client import AmbiLEDClient + + client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate) + try: + await client.connect() + black = np.zeros((led_count, 3), dtype=np.uint8) + await client.send_pixels(black, brightness=255) + logger.info(f"AmbiLED power off: sent black frame to {url}") + finally: + await client.close() + + async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None: + led_count = kwargs.get("led_count", 0) + baud_rate = kwargs.get("baud_rate") + if led_count <= 0: + raise ValueError("led_count is required to send color frame to AmbiLED device") + + from wled_controller.core.devices.ambiled_client import AmbiLEDClient + + client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate) + try: + await client.connect() + frame = np.full((led_count, 3), color, dtype=np.uint8) + await client.send_pixels(frame, brightness=255) + logger.info(f"AmbiLED set_color: sent solid {color} to {url}") + finally: + await client.close() diff --git a/server/src/wled_controller/core/devices/led_client.py b/server/src/wled_controller/core/devices/led_client.py index 33695fa..8267cf9 100644 --- a/server/src/wled_controller/core/devices/led_client.py +++ b/server/src/wled_controller/core/devices/led_client.py @@ -276,5 +276,8 @@ def _register_builtin_providers(): from wled_controller.core.devices.adalight_provider import AdalightDeviceProvider register_provider(AdalightDeviceProvider()) + from wled_controller.core.devices.ambiled_provider import AmbiLEDDeviceProvider + register_provider(AmbiLEDDeviceProvider()) + _register_builtin_providers()