Add Art-Net / sACN (E1.31) DMX device support

Full-stack implementation of DMX output for stage lighting and LED controllers:
- DMXClient with Art-Net and sACN packet builders, multi-universe splitting
- DMXDeviceProvider with manual_led_count capability and URL parsing
- Device store, API schemas, routes wired with dmx_protocol/start_universe/start_channel
- Frontend: add/settings modals with DMX fields, IconSelect protocol picker
- Fix add device modal dirty check on type change (re-snapshot after switch)
- i18n keys for DMX in en/ru/zh locales

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-13 16:46:40 +03:00
parent 18c886cbc5
commit ff24ec95e6
18 changed files with 607 additions and 7 deletions

View File

@@ -0,0 +1,245 @@
"""Art-Net / sACN (E1.31) DMX client for stage lighting and LED controllers."""
import asyncio
import struct
import uuid
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import LEDClient, DeviceHealth
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Art-Net constants
ARTNET_PORT = 6454
ARTNET_HEADER = b"Art-Net\x00"
ARTNET_OPCODE_DMX = 0x5000
ARTNET_PROTOCOL_VERSION = 14
# sACN / E1.31 constants
SACN_PORT = 5568
ACN_PACKET_IDENTIFIER = b"\x00\x10\x00\x00\x41\x53\x43\x2d\x45\x31\x2e\x31\x37\x00\x00\x00"
SACN_VECTOR_ROOT = 0x00000004
SACN_VECTOR_FRAMING = 0x00000002
SACN_VECTOR_DMP = 0x02
# DMX512 limits
DMX_CHANNELS_PER_UNIVERSE = 512
DMX_PIXELS_PER_UNIVERSE = 170 # floor(512 / 3)
class DMXClient(LEDClient):
"""UDP client for Art-Net and sACN (E1.31) DMX protocols.
Supports sending RGB pixel data across multiple DMX universes.
Both protocols are UDP fire-and-forget, similar to DDP.
"""
def __init__(
self,
host: str,
port: Optional[int] = None,
led_count: int = 1,
protocol: str = "artnet",
start_universe: int = 0,
start_channel: int = 1,
**kwargs,
):
self.host = host
self.protocol = protocol.lower()
self.port = port or (ARTNET_PORT if self.protocol == "artnet" else SACN_PORT)
self.led_count = led_count
self.start_universe = start_universe
self.start_channel = max(1, min(512, start_channel)) # clamp 1-512
self._transport = None
self._protocol_obj = None
self._sequence = 0
# sACN requires a stable 16-byte CID (Component Identifier)
self._sacn_cid = uuid.uuid4().bytes
self._sacn_source_name = b"WLED Controller\x00" + b"\x00" * 48 # 64 bytes padded
# Pre-compute universe mapping
self._universe_map = self._compute_universe_map()
def _compute_universe_map(self) -> List[Tuple[int, int, int]]:
"""Pre-compute which channels go to which universe.
Returns list of (universe, channel_offset, num_channels) tuples.
channel_offset is 0-based index into the flat RGB byte array.
"""
total_channels = self.led_count * 3
start_ch_0 = self.start_channel - 1 # convert to 0-based
mapping = []
byte_offset = 0
universe = self.start_universe
ch_in_universe = start_ch_0
while byte_offset < total_channels:
available = DMX_CHANNELS_PER_UNIVERSE - ch_in_universe
needed = total_channels - byte_offset
count = min(available, needed)
mapping.append((universe, ch_in_universe, count, byte_offset))
byte_offset += count
universe += 1
ch_in_universe = 0 # subsequent universes start at channel 0
return mapping
@property
def is_connected(self) -> bool:
return self._transport is not None
@property
def supports_fast_send(self) -> bool:
return True
async def connect(self) -> bool:
try:
loop = asyncio.get_event_loop()
self._transport, self._protocol_obj = await loop.create_datagram_endpoint(
asyncio.DatagramProtocol,
remote_addr=(self.host, self.port),
)
num_universes = len(self._universe_map)
logger.info(
f"DMX/{self.protocol} client connected to {self.host}:{self.port} "
f"({self.led_count} LEDs across {num_universes} universe(s), "
f"starting at universe {self.start_universe} channel {self.start_channel})"
)
return True
except Exception as e:
logger.error(f"Failed to connect DMX client: {e}")
raise
async def close(self) -> None:
if self._transport:
self._transport.close()
self._transport = None
self._protocol_obj = None
logger.debug(f"Closed DMX/{self.protocol} connection to {self.host}:{self.port}")
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self._transport:
raise RuntimeError("DMX client not connected")
self.send_pixels_fast(pixels, brightness)
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
if not self._transport:
raise RuntimeError("DMX client not connected")
if isinstance(pixels, np.ndarray):
pixel_bytes = pixels.tobytes()
else:
pixel_bytes = np.array(pixels, dtype=np.uint8).tobytes()
self._sequence = (self._sequence + 1) % 256
for universe, ch_offset, num_channels, byte_offset in self._universe_map:
# Build a full 512-channel DMX frame (zero-padded)
dmx_data = bytearray(DMX_CHANNELS_PER_UNIVERSE)
chunk = pixel_bytes[byte_offset:byte_offset + num_channels]
dmx_data[ch_offset:ch_offset + len(chunk)] = chunk
if self.protocol == "sacn":
packet = self._build_sacn_packet(universe, bytes(dmx_data), self._sequence)
else:
packet = self._build_artnet_packet(universe, bytes(dmx_data), self._sequence)
self._transport.sendto(packet)
def _build_artnet_packet(self, universe: int, dmx_data: bytes, sequence: int) -> bytes:
"""Build an Art-Net DMX (OpDmx / 0x5000) packet.
Art-Net packet structure:
- 8 bytes: "Art-Net\\0" header
- 2 bytes: OpCode (0x5000 little-endian)
- 2 bytes: Protocol version (14, big-endian)
- 1 byte: Sequence number
- 1 byte: Physical port (0)
- 2 bytes: Universe (little-endian, 15-bit: subnet+universe)
- 2 bytes: Data length (big-endian, must be even, 2-512)
- N bytes: DMX channel data
"""
data_len = len(dmx_data)
# Art-Net requires even data length
if data_len % 2 != 0:
dmx_data = dmx_data + b"\x00"
data_len += 1
packet = bytearray()
packet.extend(ARTNET_HEADER) # 8 bytes: "Art-Net\0"
packet.extend(struct.pack("<H", ARTNET_OPCODE_DMX)) # 2 bytes: OpCode LE
packet.extend(struct.pack(">H", ARTNET_PROTOCOL_VERSION)) # 2 bytes: version BE
packet.append(sequence & 0xFF) # 1 byte: sequence
packet.append(0) # 1 byte: physical
packet.extend(struct.pack("<H", universe & 0x7FFF)) # 2 bytes: universe LE
packet.extend(struct.pack(">H", data_len)) # 2 bytes: length BE
packet.extend(dmx_data) # N bytes: data
return bytes(packet)
def _build_sacn_packet(
self, universe: int, dmx_data: bytes, sequence: int, priority: int = 100,
) -> bytes:
"""Build an sACN / E1.31 data packet.
Structure:
- Root layer (38 bytes)
- Framing layer (77 bytes)
- DMP layer (10 bytes + 1 start code + DMX data)
"""
slot_count = len(dmx_data) + 1 # +1 for DMX start code (0x00)
dmp_len = 10 + slot_count # DMP layer
framing_len = 77 + dmp_len # framing layer
root_len = 22 + framing_len # root layer (after preamble)
packet = bytearray()
# ── Root Layer ──
packet.extend(struct.pack(">H", 0x0010)) # preamble size
packet.extend(struct.pack(">H", 0x0000)) # post-amble size
packet.extend(ACN_PACKET_IDENTIFIER) # 12 bytes ACN packet ID
# Flags + length (high 4 bits = 0x7, low 12 = root_len)
packet.extend(struct.pack(">H", 0x7000 | (root_len & 0x0FFF)))
packet.extend(struct.pack(">I", SACN_VECTOR_ROOT)) # vector
packet.extend(self._sacn_cid) # 16 bytes CID
# ── Framing Layer ──
packet.extend(struct.pack(">H", 0x7000 | (framing_len & 0x0FFF)))
packet.extend(struct.pack(">I", SACN_VECTOR_FRAMING)) # vector
packet.extend(self._sacn_source_name[:64]) # 64 bytes source name
packet.append(priority & 0xFF) # priority
packet.extend(struct.pack(">H", 0)) # sync address (0 = none)
packet.append(sequence & 0xFF) # sequence
packet.append(0) # options (0 = normal)
packet.extend(struct.pack(">H", universe)) # universe
# ── DMP Layer ──
packet.extend(struct.pack(">H", 0x7000 | (dmp_len & 0x0FFF)))
packet.append(SACN_VECTOR_DMP) # vector
packet.append(0xA1) # address type & data type
packet.extend(struct.pack(">H", 0)) # first property address
packet.extend(struct.pack(">H", 1)) # address increment
packet.extend(struct.pack(">H", slot_count)) # property value count
packet.append(0x00) # DMX start code
packet.extend(dmx_data) # DMX channel data
return bytes(packet)
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

View File

@@ -0,0 +1,82 @@
"""DMX device provider — Art-Net / sACN (E1.31) factory, validation, health."""
from datetime import datetime, timezone
from typing import List
from urllib.parse import urlparse
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.dmx_client import DMXClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def parse_dmx_url(url: str) -> dict:
"""Parse a DMX URL like 'artnet://192.168.1.50' or 'sacn://192.168.1.50'.
Returns dict with 'host', 'port', 'protocol'.
Also accepts plain IP addresses (defaults to artnet).
"""
url = url.strip()
if "://" not in url:
url = f"artnet://{url}"
parsed = urlparse(url)
protocol = parsed.scheme.lower()
if protocol not in ("artnet", "sacn"):
protocol = "artnet"
host = parsed.hostname or "127.0.0.1"
port = parsed.port # None = use protocol default
return {"host": host, "port": port, "protocol": protocol}
class DMXDeviceProvider(LEDDeviceProvider):
"""Provider for Art-Net and sACN (E1.31) DMX devices."""
@property
def device_type(self) -> str:
return "dmx"
@property
def capabilities(self) -> set:
return {"manual_led_count"}
def create_client(self, url: str, **kwargs) -> LEDClient:
parsed = parse_dmx_url(url)
return DMXClient(
host=parsed["host"],
port=parsed["port"],
led_count=kwargs.get("led_count", 1),
protocol=kwargs.get("dmx_protocol", parsed["protocol"]),
start_universe=kwargs.get("dmx_start_universe", 0),
start_channel=kwargs.get("dmx_start_channel", 1),
)
async def check_health(
self, url: str, http_client, prev_health=None,
) -> DeviceHealth:
# DMX is UDP fire-and-forget — no reliable health probe.
# Report as always online (same pattern as WS/Mock providers).
return DeviceHealth(
online=True,
latency_ms=0.0,
last_checked=datetime.now(timezone.utc),
)
async def validate_device(self, url: str) -> dict:
"""Validate DMX device URL."""
parsed = parse_dmx_url(url)
if not parsed["host"]:
raise ValueError("DMX device requires a valid IP address or hostname")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
# No auto-discovery for DMX devices
return []

View File

@@ -296,5 +296,8 @@ def _register_builtin_providers():
from wled_controller.core.devices.openrgb_provider import OpenRGBDeviceProvider
register_provider(OpenRGBDeviceProvider())
from wled_controller.core.devices.dmx_provider import DMXDeviceProvider
register_provider(DMXDeviceProvider())
_register_builtin_providers()

View File

@@ -169,14 +169,20 @@ class ProcessorManager:
ds = self._devices.get(device_id)
if ds is None:
return None
# Read mock-specific fields from persistent storage
# Read device-specific fields from persistent storage
send_latency_ms = 0
rgbw = False
dmx_protocol = "artnet"
dmx_start_universe = 0
dmx_start_channel = 1
if self._device_store:
dev = self._device_store.get_device(ds.device_id)
if dev:
send_latency_ms = getattr(dev, "send_latency_ms", 0)
rgbw = getattr(dev, "rgbw", False)
dmx_protocol = getattr(dev, "dmx_protocol", "artnet")
dmx_start_universe = getattr(dev, "dmx_start_universe", 0)
dmx_start_channel = getattr(dev, "dmx_start_channel", 1)
return DeviceInfo(
device_id=ds.device_id,
@@ -190,6 +196,9 @@ class ProcessorManager:
rgbw=rgbw,
zone_mode=ds.zone_mode,
auto_shutdown=ds.auto_shutdown,
dmx_protocol=dmx_protocol,
dmx_start_universe=dmx_start_universe,
dmx_start_channel=dmx_start_channel,
)
# ===== EVENT SYSTEM (state change notifications) =====

View File

@@ -76,6 +76,10 @@ class DeviceInfo:
rgbw: bool = False
zone_mode: str = "combined"
auto_shutdown: bool = False
# DMX (Art-Net / sACN) fields
dmx_protocol: str = "artnet"
dmx_start_universe: int = 0
dmx_start_channel: int = 1
@dataclass

View File

@@ -109,6 +109,9 @@ class WledTargetProcessor(TargetProcessor):
send_latency_ms=device_info.send_latency_ms,
rgbw=device_info.rgbw,
zone_mode=device_info.zone_mode,
dmx_protocol=device_info.dmx_protocol,
dmx_start_universe=device_info.dmx_start_universe,
dmx_start_channel=device_info.dmx_start_channel,
)
await self._led_client.connect()