From f83cd81937a2704a61ceff2489e39271cd88e648 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Thu, 19 Feb 2026 03:04:27 +0300 Subject: [PATCH] Extract SerialDeviceProvider base class and power off serial devices on shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create SerialDeviceProvider as the common base for Adalight and AmbiLED providers, replacing the misleading Adalight→AmbiLED inheritance chain. Subclasses now only override device_type and create_client(). Also send explicit black frames to all serial LED devices during server shutdown. Co-Authored-By: Claude Opus 4.6 --- .../wled_controller/core/devices/__init__.py | 2 + .../core/devices/adalight_provider.py | 134 +---------------- .../core/devices/ambiled_provider.py | 97 +------------ .../core/devices/serial_provider.py | 136 ++++++++++++++++++ .../core/processing/processor_manager.py | 8 ++ 5 files changed, 153 insertions(+), 224 deletions(-) create mode 100644 server/src/wled_controller/core/devices/serial_provider.py diff --git a/server/src/wled_controller/core/devices/__init__.py b/server/src/wled_controller/core/devices/__init__.py index 333abd8..4b2c005 100644 --- a/server/src/wled_controller/core/devices/__init__.py +++ b/server/src/wled_controller/core/devices/__init__.py @@ -11,12 +11,14 @@ from wled_controller.core.devices.led_client import ( get_device_capabilities, get_provider, ) +from wled_controller.core.devices.serial_provider import SerialDeviceProvider __all__ = [ "DeviceHealth", "DiscoveredDevice", "LEDClient", "LEDDeviceProvider", + "SerialDeviceProvider", "check_device_health", "create_led_client", "get_all_providers", diff --git a/server/src/wled_controller/core/devices/adalight_provider.py b/server/src/wled_controller/core/devices/adalight_provider.py index 9964515..dbc831b 100644 --- a/server/src/wled_controller/core/devices/adalight_provider.py +++ b/server/src/wled_controller/core/devices/adalight_provider.py @@ -1,34 +1,16 @@ -"""Adalight device provider — serial LED controller support.""" +"""Adalight device provider — serial LED controller using Adalight protocol.""" -from typing import List, Tuple - -import numpy as np - -from wled_controller.core.devices.led_client import ( - DeviceHealth, - DiscoveredDevice, - LEDClient, - LEDDeviceProvider, -) -from wled_controller.utils import get_logger - -logger = get_logger(__name__) +from wled_controller.core.devices.led_client import LEDClient +from wled_controller.core.devices.serial_provider import SerialDeviceProvider -class AdalightDeviceProvider(LEDDeviceProvider): +class AdalightDeviceProvider(SerialDeviceProvider): """Provider for Adalight serial LED controllers.""" @property def device_type(self) -> str: return "adalight" - @property - def capabilities(self) -> set: - # manual_led_count: user must specify LED count (can't auto-detect) - # power_control: can blank LEDs by sending all-black pixels - # brightness_control: software brightness (multiplies pixel values before sending) - return {"manual_led_count", "power_control", "brightness_control", "static_color"} - def create_client(self, url: str, **kwargs) -> LEDClient: from wled_controller.core.devices.adalight_client import AdalightClient @@ -36,111 +18,3 @@ class AdalightDeviceProvider(LEDDeviceProvider): baud_rate = kwargs.pop("baud_rate", None) kwargs.pop("use_ddp", None) # Not applicable for serial return AdalightClient(url, led_count=led_count, baud_rate=baud_rate, **kwargs) - - async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth: - from wled_controller.core.devices.adalight_client import AdalightClient - - return await AdalightClient.check_health(url, http_client, prev_health) - - async def validate_device(self, url: str) -> dict: - """Validate that the serial port exists. - - Returns: - Empty dict — Adalight devices don't report LED count, - so it must be provided by the user. - """ - from wled_controller.core.devices.adalight_client import parse_adalight_url - - port, _baud = parse_adalight_url(url) - - try: - import serial.tools.list_ports - - available_ports = [p.device for p in serial.tools.list_ports.comports()] - port_upper = port.upper() - if not any(p.upper() == port_upper for p in available_ports): - raise ValueError( - f"Serial port {port} not found. " - f"Available ports: {', '.join(available_ports) or 'none'}" - ) - except ValueError: - raise - except Exception as e: - raise ValueError(f"Failed to enumerate serial ports: {e}") - - logger.info(f"Adalight device validated: port {port}") - return {} - - async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]: - """Discover serial ports that could be Adalight devices.""" - try: - import serial.tools.list_ports - - ports = serial.tools.list_ports.comports() - results = [] - for port_info in ports: - results.append( - DiscoveredDevice( - name=port_info.description or port_info.device, - url=port_info.device, - device_type="adalight", - ip=port_info.device, - mac="", - led_count=None, - version=None, - ) - ) - logger.info(f"Serial port scan found {len(results)} port(s)") - return results - except Exception as e: - logger.error(f"Serial port discovery failed: {e}") - return [] - - async def get_power(self, url: str, **kwargs) -> bool: - # Adalight has no hardware power query; assume on - return True - - async def set_power(self, url: str, on: bool, **kwargs) -> None: - """Turn Adalight device on/off by sending an all-black frame (off) or no-op (on). - - Requires kwargs: led_count (int), baud_rate (int | None). - """ - if on: - return # "on" is a no-op — next processing frame lights LEDs up - - led_count = kwargs.get("led_count", 0) - baud_rate = kwargs.get("baud_rate") - if led_count <= 0: - raise ValueError("led_count is required to send black frame to Adalight device") - - from wled_controller.core.devices.adalight_client import AdalightClient - - client = AdalightClient(url, led_count=led_count, baud_rate=baud_rate) - try: - await client.connect() - black = np.zeros((led_count, 3), dtype=np.uint8) - await client.send_pixels(black, brightness=255) - logger.info(f"Adalight power off: sent black frame to {url}") - finally: - await client.close() - - async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None: - """Send a solid color frame to the Adalight device. - - Requires kwargs: led_count (int), baud_rate (int | None). - """ - led_count = kwargs.get("led_count", 0) - baud_rate = kwargs.get("baud_rate") - if led_count <= 0: - raise ValueError("led_count is required to send color frame to Adalight device") - - from wled_controller.core.devices.adalight_client import AdalightClient - - client = AdalightClient(url, led_count=led_count, baud_rate=baud_rate) - try: - await client.connect() - frame = np.full((led_count, 3), color, dtype=np.uint8) - await client.send_pixels(frame, brightness=255) - logger.info(f"Adalight set_color: sent solid {color} to {url}") - finally: - await client.close() diff --git a/server/src/wled_controller/core/devices/ambiled_provider.py b/server/src/wled_controller/core/devices/ambiled_provider.py index 0e21bd9..7ac6b3d 100644 --- a/server/src/wled_controller/core/devices/ambiled_provider.py +++ b/server/src/wled_controller/core/devices/ambiled_provider.py @@ -1,17 +1,10 @@ """AmbiLED device provider — serial LED controller using AmbiLED protocol.""" -from typing import List, Tuple - -import numpy as np - -from wled_controller.core.devices.adalight_provider import AdalightDeviceProvider -from wled_controller.core.devices.led_client import DiscoveredDevice, LEDClient -from wled_controller.utils import get_logger - -logger = get_logger(__name__) +from wled_controller.core.devices.led_client import LEDClient +from wled_controller.core.devices.serial_provider import SerialDeviceProvider -class AmbiLEDDeviceProvider(AdalightDeviceProvider): +class AmbiLEDDeviceProvider(SerialDeviceProvider): """Provider for AmbiLED serial LED controllers.""" @property @@ -25,87 +18,3 @@ class AmbiLEDDeviceProvider(AdalightDeviceProvider): baud_rate = kwargs.pop("baud_rate", None) kwargs.pop("use_ddp", None) return AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate, **kwargs) - - async def validate_device(self, url: str) -> dict: - from wled_controller.core.devices.adalight_client import parse_adalight_url - - port, _baud = parse_adalight_url(url) - - try: - import serial.tools.list_ports - - available_ports = [p.device for p in serial.tools.list_ports.comports()] - port_upper = port.upper() - if not any(p.upper() == port_upper for p in available_ports): - raise ValueError( - f"Serial port {port} not found. " - f"Available ports: {', '.join(available_ports) or 'none'}" - ) - except ValueError: - raise - except Exception as e: - raise ValueError(f"Failed to enumerate serial ports: {e}") - - logger.info(f"AmbiLED device validated: port {port}") - return {} - - async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]: - try: - import serial.tools.list_ports - - ports = serial.tools.list_ports.comports() - results = [] - for port_info in ports: - results.append( - DiscoveredDevice( - name=port_info.description or port_info.device, - url=port_info.device, - device_type="ambiled", - ip=port_info.device, - mac="", - led_count=None, - version=None, - ) - ) - logger.info(f"AmbiLED serial port scan found {len(results)} port(s)") - return results - except Exception as e: - logger.error(f"AmbiLED serial port discovery failed: {e}") - return [] - - async def set_power(self, url: str, on: bool, **kwargs) -> None: - if on: - return - - led_count = kwargs.get("led_count", 0) - baud_rate = kwargs.get("baud_rate") - if led_count <= 0: - raise ValueError("led_count is required to send black frame to AmbiLED device") - - from wled_controller.core.devices.ambiled_client import AmbiLEDClient - - client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate) - try: - await client.connect() - black = np.zeros((led_count, 3), dtype=np.uint8) - await client.send_pixels(black, brightness=255) - logger.info(f"AmbiLED power off: sent black frame to {url}") - finally: - await client.close() - - async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None: - led_count = kwargs.get("led_count", 0) - baud_rate = kwargs.get("baud_rate") - if led_count <= 0: - raise ValueError("led_count is required to send color frame to AmbiLED device") - - from wled_controller.core.devices.ambiled_client import AmbiLEDClient - - client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate) - try: - await client.connect() - frame = np.full((led_count, 3), color, dtype=np.uint8) - await client.send_pixels(frame, brightness=255) - logger.info(f"AmbiLED set_color: sent solid {color} to {url}") - finally: - await client.close() diff --git a/server/src/wled_controller/core/devices/serial_provider.py b/server/src/wled_controller/core/devices/serial_provider.py new file mode 100644 index 0000000..b473fb0 --- /dev/null +++ b/server/src/wled_controller/core/devices/serial_provider.py @@ -0,0 +1,136 @@ +"""Base provider for serial LED controllers (Adalight, AmbiLED, etc.). + +Subclasses only need to override ``device_type`` and ``create_client()``. +All common serial-device logic (COM port validation, discovery, health +checks, power control via black frames, static colour) lives here. +""" + +from typing import List, Tuple + +import numpy as np + +from wled_controller.core.devices.led_client import ( + DeviceHealth, + DiscoveredDevice, + LEDClient, + LEDDeviceProvider, +) +from wled_controller.utils import get_logger + +logger = get_logger(__name__) + + +class SerialDeviceProvider(LEDDeviceProvider): + """Base provider for serial LED controllers.""" + + @property + def capabilities(self) -> set: + # manual_led_count: user must specify LED count (can't auto-detect) + # power_control: can blank LEDs by sending all-black pixels + # brightness_control: software brightness (multiplies pixel values before sending) + # static_color: can send a solid colour frame + return {"manual_led_count", "power_control", "brightness_control", "static_color"} + + async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth: + # Generic serial port health check — enumerate COM ports + from wled_controller.core.devices.adalight_client import AdalightClient + + return await AdalightClient.check_health(url, http_client, prev_health) + + async def validate_device(self, url: str) -> dict: + """Validate that the serial port exists. + + Returns empty dict — serial devices don't report LED count, + so it must be provided by the user. + """ + from wled_controller.core.devices.adalight_client import parse_adalight_url + + port, _baud = parse_adalight_url(url) + + try: + import serial.tools.list_ports + + available_ports = [p.device for p in serial.tools.list_ports.comports()] + port_upper = port.upper() + if not any(p.upper() == port_upper for p in available_ports): + raise ValueError( + f"Serial port {port} not found. " + f"Available ports: {', '.join(available_ports) or 'none'}" + ) + except ValueError: + raise + except Exception as e: + raise ValueError(f"Failed to enumerate serial ports: {e}") + + logger.info(f"{self.device_type} device validated: port {port}") + return {} + + async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]: + """Discover serial ports that could be LED devices.""" + try: + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + results = [] + for port_info in ports: + results.append( + DiscoveredDevice( + name=port_info.description or port_info.device, + url=port_info.device, + device_type=self.device_type, + ip=port_info.device, + mac="", + led_count=None, + version=None, + ) + ) + logger.info(f"{self.device_type} serial port scan found {len(results)} port(s)") + return results + except Exception as e: + logger.error(f"{self.device_type} serial port discovery failed: {e}") + return [] + + async def get_power(self, url: str, **kwargs) -> bool: + # Serial devices have no hardware power query; assume on + return True + + async def set_power(self, url: str, on: bool, **kwargs) -> None: + """Turn device on/off by sending an all-black frame (off) or no-op (on). + + Requires kwargs: led_count (int), baud_rate (int | None). + """ + if on: + return # "on" is a no-op — next processing frame lights LEDs up + + led_count = kwargs.get("led_count", 0) + baud_rate = kwargs.get("baud_rate") + if led_count <= 0: + raise ValueError(f"led_count is required to send black frame to {self.device_type} device") + + client = self.create_client(url, led_count=led_count, baud_rate=baud_rate) + try: + await client.connect() + black = np.zeros((led_count, 3), dtype=np.uint8) + await client.send_pixels(black, brightness=255) + logger.info(f"{self.device_type} power off: sent black frame to {url}") + finally: + await client.close() + + async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None: + """Send a solid color frame to the device. + + Requires kwargs: led_count (int), baud_rate (int | None). + """ + led_count = kwargs.get("led_count", 0) + baud_rate = kwargs.get("baud_rate") + if led_count <= 0: + raise ValueError(f"led_count is required to send color frame to {self.device_type} device") + + client = self.create_client(url, led_count=led_count, baud_rate=baud_rate) + try: + await client.connect() + frame = np.full((led_count, 3), color, dtype=np.uint8) + await client.send_pixels(frame, brightness=255) + logger.info(f"{self.device_type} set_color: sent solid {color} to {url}") + finally: + await client.close() diff --git a/server/src/wled_controller/core/processing/processor_manager.py b/server/src/wled_controller/core/processing/processor_manager.py index 8a44983..8108e4a 100644 --- a/server/src/wled_controller/core/processing/processor_manager.py +++ b/server/src/wled_controller/core/processing/processor_manager.py @@ -658,6 +658,14 @@ class ProcessorManager: for device_id in self._devices: await self._restore_device_idle_state(device_id) + # Power off serial LED devices before closing connections + for device_id, ds in self._devices.items(): + if ds.device_type != "wled": + try: + await self._send_clear_pixels(device_id) + except Exception as e: + logger.error(f"Failed to power off {device_id} on shutdown: {e}") + # Close any cached idle LED clients for did in list(self._idle_clients): await self._close_idle_client(did)