Extract SerialDeviceProvider base class and power off serial devices on shutdown

Create SerialDeviceProvider as the common base for Adalight and AmbiLED
providers, replacing the misleading Adalight→AmbiLED inheritance chain.
Subclasses now only override device_type and create_client(). Also send
explicit black frames to all serial LED devices during server shutdown.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 03:04:27 +03:00
parent 45634836b6
commit f83cd81937
5 changed files with 153 additions and 224 deletions

View File

@@ -11,12 +11,14 @@ from wled_controller.core.devices.led_client import (
get_device_capabilities,
get_provider,
)
from wled_controller.core.devices.serial_provider import SerialDeviceProvider
__all__ = [
"DeviceHealth",
"DiscoveredDevice",
"LEDClient",
"LEDDeviceProvider",
"SerialDeviceProvider",
"check_device_health",
"create_led_client",
"get_all_providers",

View File

@@ -1,34 +1,16 @@
"""Adalight device provider — serial LED controller support."""
"""Adalight device provider — serial LED controller using Adalight protocol."""
from typing import List, Tuple
import numpy as np
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
from wled_controller.core.devices.led_client import LEDClient
from wled_controller.core.devices.serial_provider import SerialDeviceProvider
class AdalightDeviceProvider(LEDDeviceProvider):
class AdalightDeviceProvider(SerialDeviceProvider):
"""Provider for Adalight serial LED controllers."""
@property
def device_type(self) -> str:
return "adalight"
@property
def capabilities(self) -> set:
# manual_led_count: user must specify LED count (can't auto-detect)
# power_control: can blank LEDs by sending all-black pixels
# brightness_control: software brightness (multiplies pixel values before sending)
return {"manual_led_count", "power_control", "brightness_control", "static_color"}
def create_client(self, url: str, **kwargs) -> LEDClient:
from wled_controller.core.devices.adalight_client import AdalightClient
@@ -36,111 +18,3 @@ class AdalightDeviceProvider(LEDDeviceProvider):
baud_rate = kwargs.pop("baud_rate", None)
kwargs.pop("use_ddp", None) # Not applicable for serial
return AdalightClient(url, led_count=led_count, baud_rate=baud_rate, **kwargs)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
from wled_controller.core.devices.adalight_client import AdalightClient
return await AdalightClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate that the serial port exists.
Returns:
Empty dict — Adalight devices don't report LED count,
so it must be provided by the user.
"""
from wled_controller.core.devices.adalight_client import parse_adalight_url
port, _baud = parse_adalight_url(url)
try:
import serial.tools.list_ports
available_ports = [p.device for p in serial.tools.list_ports.comports()]
port_upper = port.upper()
if not any(p.upper() == port_upper for p in available_ports):
raise ValueError(
f"Serial port {port} not found. "
f"Available ports: {', '.join(available_ports) or 'none'}"
)
except ValueError:
raise
except Exception as e:
raise ValueError(f"Failed to enumerate serial ports: {e}")
logger.info(f"Adalight device validated: port {port}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover serial ports that could be Adalight devices."""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
results = []
for port_info in ports:
results.append(
DiscoveredDevice(
name=port_info.description or port_info.device,
url=port_info.device,
device_type="adalight",
ip=port_info.device,
mac="",
led_count=None,
version=None,
)
)
logger.info(f"Serial port scan found {len(results)} port(s)")
return results
except Exception as e:
logger.error(f"Serial port discovery failed: {e}")
return []
async def get_power(self, url: str, **kwargs) -> bool:
# Adalight has no hardware power query; assume on
return True
async def set_power(self, url: str, on: bool, **kwargs) -> None:
"""Turn Adalight device on/off by sending an all-black frame (off) or no-op (on).
Requires kwargs: led_count (int), baud_rate (int | None).
"""
if on:
return # "on" is a no-op — next processing frame lights LEDs up
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError("led_count is required to send black frame to Adalight device")
from wled_controller.core.devices.adalight_client import AdalightClient
client = AdalightClient(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
black = np.zeros((led_count, 3), dtype=np.uint8)
await client.send_pixels(black, brightness=255)
logger.info(f"Adalight power off: sent black frame to {url}")
finally:
await client.close()
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
"""Send a solid color frame to the Adalight device.
Requires kwargs: led_count (int), baud_rate (int | None).
"""
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError("led_count is required to send color frame to Adalight device")
from wled_controller.core.devices.adalight_client import AdalightClient
client = AdalightClient(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
frame = np.full((led_count, 3), color, dtype=np.uint8)
await client.send_pixels(frame, brightness=255)
logger.info(f"Adalight set_color: sent solid {color} to {url}")
finally:
await client.close()

View File

@@ -1,17 +1,10 @@
"""AmbiLED device provider — serial LED controller using AmbiLED protocol."""
from typing import List, Tuple
import numpy as np
from wled_controller.core.devices.adalight_provider import AdalightDeviceProvider
from wled_controller.core.devices.led_client import DiscoveredDevice, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
from wled_controller.core.devices.led_client import LEDClient
from wled_controller.core.devices.serial_provider import SerialDeviceProvider
class AmbiLEDDeviceProvider(AdalightDeviceProvider):
class AmbiLEDDeviceProvider(SerialDeviceProvider):
"""Provider for AmbiLED serial LED controllers."""
@property
@@ -25,87 +18,3 @@ class AmbiLEDDeviceProvider(AdalightDeviceProvider):
baud_rate = kwargs.pop("baud_rate", None)
kwargs.pop("use_ddp", None)
return AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate, **kwargs)
async def validate_device(self, url: str) -> dict:
from wled_controller.core.devices.adalight_client import parse_adalight_url
port, _baud = parse_adalight_url(url)
try:
import serial.tools.list_ports
available_ports = [p.device for p in serial.tools.list_ports.comports()]
port_upper = port.upper()
if not any(p.upper() == port_upper for p in available_ports):
raise ValueError(
f"Serial port {port} not found. "
f"Available ports: {', '.join(available_ports) or 'none'}"
)
except ValueError:
raise
except Exception as e:
raise ValueError(f"Failed to enumerate serial ports: {e}")
logger.info(f"AmbiLED device validated: port {port}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
results = []
for port_info in ports:
results.append(
DiscoveredDevice(
name=port_info.description or port_info.device,
url=port_info.device,
device_type="ambiled",
ip=port_info.device,
mac="",
led_count=None,
version=None,
)
)
logger.info(f"AmbiLED serial port scan found {len(results)} port(s)")
return results
except Exception as e:
logger.error(f"AmbiLED serial port discovery failed: {e}")
return []
async def set_power(self, url: str, on: bool, **kwargs) -> None:
if on:
return
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError("led_count is required to send black frame to AmbiLED device")
from wled_controller.core.devices.ambiled_client import AmbiLEDClient
client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
black = np.zeros((led_count, 3), dtype=np.uint8)
await client.send_pixels(black, brightness=255)
logger.info(f"AmbiLED power off: sent black frame to {url}")
finally:
await client.close()
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError("led_count is required to send color frame to AmbiLED device")
from wled_controller.core.devices.ambiled_client import AmbiLEDClient
client = AmbiLEDClient(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
frame = np.full((led_count, 3), color, dtype=np.uint8)
await client.send_pixels(frame, brightness=255)
logger.info(f"AmbiLED set_color: sent solid {color} to {url}")
finally:
await client.close()

View File

@@ -0,0 +1,136 @@
"""Base provider for serial LED controllers (Adalight, AmbiLED, etc.).
Subclasses only need to override ``device_type`` and ``create_client()``.
All common serial-device logic (COM port validation, discovery, health
checks, power control via black frames, static colour) lives here.
"""
from typing import List, Tuple
import numpy as np
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class SerialDeviceProvider(LEDDeviceProvider):
"""Base provider for serial LED controllers."""
@property
def capabilities(self) -> set:
# manual_led_count: user must specify LED count (can't auto-detect)
# power_control: can blank LEDs by sending all-black pixels
# brightness_control: software brightness (multiplies pixel values before sending)
# static_color: can send a solid colour frame
return {"manual_led_count", "power_control", "brightness_control", "static_color"}
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
# Generic serial port health check — enumerate COM ports
from wled_controller.core.devices.adalight_client import AdalightClient
return await AdalightClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate that the serial port exists.
Returns empty dict — serial devices don't report LED count,
so it must be provided by the user.
"""
from wled_controller.core.devices.adalight_client import parse_adalight_url
port, _baud = parse_adalight_url(url)
try:
import serial.tools.list_ports
available_ports = [p.device for p in serial.tools.list_ports.comports()]
port_upper = port.upper()
if not any(p.upper() == port_upper for p in available_ports):
raise ValueError(
f"Serial port {port} not found. "
f"Available ports: {', '.join(available_ports) or 'none'}"
)
except ValueError:
raise
except Exception as e:
raise ValueError(f"Failed to enumerate serial ports: {e}")
logger.info(f"{self.device_type} device validated: port {port}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover serial ports that could be LED devices."""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
results = []
for port_info in ports:
results.append(
DiscoveredDevice(
name=port_info.description or port_info.device,
url=port_info.device,
device_type=self.device_type,
ip=port_info.device,
mac="",
led_count=None,
version=None,
)
)
logger.info(f"{self.device_type} serial port scan found {len(results)} port(s)")
return results
except Exception as e:
logger.error(f"{self.device_type} serial port discovery failed: {e}")
return []
async def get_power(self, url: str, **kwargs) -> bool:
# Serial devices have no hardware power query; assume on
return True
async def set_power(self, url: str, on: bool, **kwargs) -> None:
"""Turn device on/off by sending an all-black frame (off) or no-op (on).
Requires kwargs: led_count (int), baud_rate (int | None).
"""
if on:
return # "on" is a no-op — next processing frame lights LEDs up
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError(f"led_count is required to send black frame to {self.device_type} device")
client = self.create_client(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
black = np.zeros((led_count, 3), dtype=np.uint8)
await client.send_pixels(black, brightness=255)
logger.info(f"{self.device_type} power off: sent black frame to {url}")
finally:
await client.close()
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
"""Send a solid color frame to the device.
Requires kwargs: led_count (int), baud_rate (int | None).
"""
led_count = kwargs.get("led_count", 0)
baud_rate = kwargs.get("baud_rate")
if led_count <= 0:
raise ValueError(f"led_count is required to send color frame to {self.device_type} device")
client = self.create_client(url, led_count=led_count, baud_rate=baud_rate)
try:
await client.connect()
frame = np.full((led_count, 3), color, dtype=np.uint8)
await client.send_pixels(frame, brightness=255)
logger.info(f"{self.device_type} set_color: sent solid {color} to {url}")
finally:
await client.close()

View File

@@ -658,6 +658,14 @@ class ProcessorManager:
for device_id in self._devices:
await self._restore_device_idle_state(device_id)
# Power off serial LED devices before closing connections
for device_id, ds in self._devices.items():
if ds.device_type != "wled":
try:
await self._send_clear_pixels(device_id)
except Exception as e:
logger.error(f"Failed to power off {device_id} on shutdown: {e}")
# Close any cached idle LED clients
for did in list(self._idle_clients):
await self._close_idle_client(did)