feat(wled): native realtime UDP output (DRGB/DRGBW/DNRGB) with auto-revert

Add WLED's native realtime UDP protocol (port 21324) as a third output mode for
LED targets, alongside DDP and HTTP. For the device LedGrab drives most, this
brings three user-visible wins DDP lacks:

- Auto-revert: every packet carries a timeout byte, so if the stream stops
  (host hiccup/sleep/crash) WLED returns to its preset instead of freezing on
  the last frame.
- Correct RGBW whites: the DRGBW variant carries an explicit white channel.
- Lighter on weak Wi-Fi: raw RGB with a 2-byte header.

New WledRealtimeClient auto-selects DRGB (<=490), DRGBW (<=367), or chunked
DNRGB (>490). WLED applies its own per-bus colour order in realtime mode, so we
send plain RGB and the user's colour-order config just works. Protocol 'udp' is
threaded through WLEDConfig/provider/processor and the schema pattern; the
target editor gains a protocol option + badge + i18n (en/ru/zh).

8 unit tests for the packet builder; full suite green (1919 passed).
This commit is contained in:
2026-06-04 23:34:26 +03:00
parent e28ab5a956
commit 7728aecb4f
12 changed files with 337 additions and 18 deletions
@@ -91,7 +91,7 @@ class LedOutputTargetResponse(_OutputTargetResponseBase):
adaptive_fps: bool = Field( adaptive_fps: bool = Field(
default=False, description="Auto-reduce FPS when device is unresponsive" default=False, description="Auto-reduce FPS when device is unresponsive"
) )
protocol: str = Field(default="ddp", description="Send protocol (ddp or http)") protocol: str = Field(default="ddp", description="Send protocol (ddp, udp, or http)")
max_milliamps: int = Field( max_milliamps: int = Field(
default=0, description="ABL: PSU current budget in mA (0 = unlimited)" default=0, description="ABL: PSU current budget in mA (0 = unlimited)"
) )
@@ -237,8 +237,8 @@ class LedOutputTargetCreate(_OutputTargetCreateBase):
) )
protocol: str = Field( protocol: str = Field(
default="ddp", default="ddp",
pattern="^(ddp|http)$", pattern="^(ddp|http|udp)$",
description="Send protocol: ddp (UDP) or http (JSON API)", description="Send protocol: ddp (DDP/UDP), udp (WLED native realtime UDP), or http (JSON API)",
) )
max_milliamps: int = Field( max_milliamps: int = Field(
default=0, default=0,
@@ -386,7 +386,9 @@ class LedOutputTargetUpdate(_OutputTargetUpdateBase):
None, description="Auto-reduce FPS when device is unresponsive" None, description="Auto-reduce FPS when device is unresponsive"
) )
protocol: str | None = Field( protocol: str | None = Field(
None, pattern="^(ddp|http)$", description="Send protocol: ddp (UDP) or http (JSON API)" None,
pattern="^(ddp|http|udp)$",
description="Send protocol: ddp (DDP/UDP), udp (WLED native realtime UDP), or http (JSON API)",
) )
max_milliamps: int | None = Field( max_milliamps: int | None = Field(
None, ge=0, le=200000, description="ABL: PSU current budget in mA (0 = unlimited)" None, ge=0, le=200000, description="ABL: PSU current budget in mA (0 = unlimited)"
@@ -23,6 +23,11 @@ class BaseDeviceConfig:
class WLEDConfig(BaseDeviceConfig): class WLEDConfig(BaseDeviceConfig):
device_type: Literal["wled"] = "wled" device_type: Literal["wled"] = "wled"
use_ddp: bool = False use_ddp: bool = False
# WLED native realtime UDP (port 21324) — mutually exclusive with use_ddp.
# realtime_timeout = seconds WLED stays in realtime after the last packet
# before reverting to its normal effect/preset (graceful auto-revert).
use_realtime: bool = False
realtime_timeout: int = 2
@dataclass(frozen=True) @dataclass(frozen=True)
+52 -8
View File
@@ -86,6 +86,8 @@ class WLEDClient(LEDClient):
retry_attempts: int = 3, retry_attempts: int = 3,
retry_delay: int = 1, retry_delay: int = 1,
use_ddp: bool = False, use_ddp: bool = False,
use_realtime: bool = False,
realtime_timeout: int = 2,
): ):
"""Initialize WLED client. """Initialize WLED client.
@@ -95,12 +97,17 @@ class WLEDClient(LEDClient):
retry_attempts: Number of retry attempts on failure retry_attempts: Number of retry attempts on failure
retry_delay: Delay between retries in seconds retry_delay: Delay between retries in seconds
use_ddp: Force DDP protocol (auto-enabled for >500 LEDs) use_ddp: Force DDP protocol (auto-enabled for >500 LEDs)
use_realtime: Use WLED native realtime UDP (port 21324) instead of DDP
realtime_timeout: Seconds WLED stays in realtime after the last packet
before reverting to its normal effect/preset (1-255)
""" """
self.url = url.rstrip("/") self.url = url.rstrip("/")
self.timeout = timeout self.timeout = timeout
self.retry_attempts = retry_attempts self.retry_attempts = retry_attempts
self.retry_delay = retry_delay self.retry_delay = retry_delay
self.use_ddp = use_ddp self.use_ddp = use_ddp
self.use_realtime = use_realtime
self.realtime_timeout = realtime_timeout
# Extract hostname/IP from URL for DDP # Extract hostname/IP from URL for DDP
parsed = urlparse(self.url) parsed = urlparse(self.url)
@@ -108,6 +115,7 @@ class WLEDClient(LEDClient):
self._client: httpx.AsyncClient | None = None self._client: httpx.AsyncClient | None = None
self._ddp_client: DDPClient | None = None self._ddp_client: DDPClient | None = None
self._realtime_client = None # WledRealtimeClient when use_realtime
self._connected = False self._connected = False
self._pre_connect_state: dict | None = None self._pre_connect_state: dict | None = None
@@ -127,8 +135,9 @@ class WLEDClient(LEDClient):
# Test connection by getting device info # Test connection by getting device info
info = await self.get_info() info = await self.get_info()
# Auto-enable DDP for large LED counts # Auto-enable DDP for large LED counts (unless the user explicitly
if info.led_count > self.HTTP_MAX_LEDS and not self.use_ddp: # chose native realtime UDP, which handles any size via DNRGB).
if info.led_count > self.HTTP_MAX_LEDS and not self.use_ddp and not self.use_realtime:
logger.info( logger.info(
f"Device has {info.led_count} LEDs (>{self.HTTP_MAX_LEDS}), " f"Device has {info.led_count} LEDs (>{self.HTTP_MAX_LEDS}), "
"auto-enabling DDP protocol" "auto-enabling DDP protocol"
@@ -138,8 +147,30 @@ class WLEDClient(LEDClient):
# Snapshot device state BEFORE any mutations (for auto-restore) # Snapshot device state BEFORE any mutations (for auto-restore)
self._pre_connect_state = await self.snapshot_device_state() self._pre_connect_state = await self.snapshot_device_state()
# Create WLED native realtime UDP client if selected
if self.use_realtime:
from ledgrab.core.devices.wled_realtime_client import WledRealtimeClient
self._realtime_client = WledRealtimeClient(
self.host, rgbw=info.rgbw, timeout_secs=self.realtime_timeout
)
await self._realtime_client.connect()
try:
await self._request(
"POST",
"/json/state",
json_data={"on": True, "lor": 0, "AudioReactive": {"on": False}},
)
except Exception as e:
logger.warning(f"Could not configure device for realtime UDP: {e}")
logger.info(
"WLED native realtime UDP enabled (port 21324, %ds timeout, %s)",
self.realtime_timeout,
"RGBW" if info.rgbw else "RGB",
)
# Create DDP client if needed # Create DDP client if needed
if self.use_ddp: elif self.use_ddp:
self._ddp_client = DDPClient(self.host, rgbw=False) self._ddp_client = DDPClient(self.host, rgbw=False)
# Pass per-bus config so DDP client can apply per-bus color reordering # Pass per-bus config so DDP client can apply per-bus color reordering
if info.buses: if info.buses:
@@ -191,6 +222,9 @@ class WLEDClient(LEDClient):
if self._ddp_client: if self._ddp_client:
await self._ddp_client.close() await self._ddp_client.close()
self._ddp_client = None self._ddp_client = None
if self._realtime_client:
await self._realtime_client.close()
self._realtime_client = None
self._connected = False self._connected = False
logger.debug(f"Closed connection to {self.url}") logger.debug(f"Closed connection to {self.url}")
@@ -201,8 +235,10 @@ class WLEDClient(LEDClient):
@property @property
def supports_fast_send(self) -> bool: def supports_fast_send(self) -> bool:
"""True when DDP is active and ready for fire-and-forget sends.""" """True when DDP or native realtime UDP is active (fire-and-forget)."""
return self.use_ddp and self._ddp_client is not None return (self.use_ddp and self._ddp_client is not None) or (
self.use_realtime and self._realtime_client is not None
)
async def _request( async def _request(
self, self,
@@ -384,7 +420,10 @@ class WLEDClient(LEDClient):
raise ValueError(f"Invalid RGB values at index {idx}: {tuple(pixel_arr[idx])}") raise ValueError(f"Invalid RGB values at index {idx}: {tuple(pixel_arr[idx])}")
validated_pixels = pixel_arr.astype(np.uint8) if pixel_arr.dtype != np.uint8 else pixel_arr validated_pixels = pixel_arr.astype(np.uint8) if pixel_arr.dtype != np.uint8 else pixel_arr
# Use DDP protocol if enabled # Native realtime UDP takes precedence, then DDP, then HTTP
if self.use_realtime and self._realtime_client:
self._realtime_client.send_pixels_numpy(validated_pixels)
return True
if self.use_ddp and self._ddp_client: if self.use_ddp and self._ddp_client:
return await self._send_pixels_ddp(validated_pixels, brightness) return await self._send_pixels_ddp(validated_pixels, brightness)
else: else:
@@ -485,8 +524,10 @@ class WLEDClient(LEDClient):
pixels: numpy array (N, 3) uint8 or list of (R, G, B) tuples pixels: numpy array (N, 3) uint8 or list of (R, G, B) tuples
brightness: Global brightness (0-255) brightness: Global brightness (0-255)
""" """
if not self.use_ddp or not self._ddp_client: if not (self.use_ddp and self._ddp_client) and not (
raise RuntimeError("send_pixels_fast requires DDP; use send_pixels for HTTP") self.use_realtime and self._realtime_client
):
raise RuntimeError("send_pixels_fast requires DDP or realtime UDP; use send_pixels")
if isinstance(pixels, np.ndarray): if isinstance(pixels, np.ndarray):
pixel_array = pixels pixel_array = pixels
@@ -494,6 +535,9 @@ class WLEDClient(LEDClient):
pixel_array = np.array(pixels, dtype=np.uint8) pixel_array = np.array(pixels, dtype=np.uint8)
# Note: brightness already applied by processor loop (_cached_brightness) # Note: brightness already applied by processor loop (_cached_brightness)
if self.use_realtime and self._realtime_client:
self._realtime_client.send_pixels_numpy(pixel_array)
else:
self._ddp_client.send_pixels_numpy(pixel_array) self._ddp_client.send_pixels_numpy(pixel_array)
# ===== LEDClient abstraction methods ===== # ===== LEDClient abstraction methods =====
@@ -86,6 +86,8 @@ class WLEDDeviceProvider(LEDDeviceProvider):
return WLEDClient( return WLEDClient(
config.device_url, config.device_url,
use_ddp=config.use_ddp, use_ddp=config.use_ddp,
use_realtime=config.use_realtime,
realtime_timeout=config.realtime_timeout,
) )
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth: async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
@@ -0,0 +1,153 @@
"""WLED native realtime UDP client (port 21324).
WLED exposes a family of "realtime" UDP protocols separate from DDP. Compared to
the DDP path this gives three user-visible wins for the device LedGrab drives
most:
* **Auto-revert** — every packet carries a *timeout* byte. If LedGrab stops
streaming (host hiccup, sleep, crash), WLED returns to its normal effect /
preset after that many seconds instead of freezing on the last frame.
* **Correct RGBW whites** — the DRGBW variant carries an explicit white channel,
so RGBW strips are driven correctly instead of leaving W uncontrolled.
* **Lighter on weak Wi-Fi** — raw RGB with a 2-byte header, no DDP framing.
Unlike the DDP path, WLED applies the configured per-bus color order itself in
realtime mode, so this sender transmits plain RGB (no manual reordering) — the
user's WLED colour-order setting just works.
Packet layout (first byte selects the protocol)::
DRGB (2): [2][timeout] + R G B per LED (<= 490 LEDs)
DRGBW (3): [3][timeout] + R G B W per LED (<= 367 LEDs)
DNRGB (4): [4][timeout][start_hi][start_lo] + R G B per LED (chunked, 489/pkt)
The ``timeout`` byte is in **seconds** (1-255). DNRGB carries a 16-bit start
index so strips larger than one packet are sent as several chunks.
Ref: https://kno.wled.ge/interfaces/udp-realtime/
"""
from __future__ import annotations
import asyncio
import numpy as np
from ledgrab.utils import get_logger
logger = get_logger(__name__)
REALTIME_PORT = 21324
# Protocol selector (first byte).
_DRGB = 2
_DRGBW = 3
_DNRGB = 4
# Per-protocol LED capacity (bounded by the ~1500-byte UDP payload).
_MAX_DRGB = 490 # 2 + 490*3 = 1472
_MAX_DRGBW = 367 # 2 + 367*4 = 1470
_MAX_DNRGB_CHUNK = 489 # 4 + 489*3 = 1471
# Default seconds WLED stays in realtime after the last packet before reverting.
DEFAULT_REALTIME_TIMEOUT = 2
def _clamp_timeout(seconds: int) -> int:
"""Clamp the realtime timeout to the on-wire 1-255 range."""
return max(1, min(255, int(seconds)))
class WledRealtimeClient:
"""Fire-and-forget UDP sender for WLED native realtime protocols."""
def __init__(
self,
host: str,
port: int = REALTIME_PORT,
rgbw: bool = False,
timeout_secs: int = DEFAULT_REALTIME_TIMEOUT,
) -> None:
self.host = host
self.port = port
self.rgbw = rgbw
self.timeout_secs = _clamp_timeout(timeout_secs)
self._transport: asyncio.DatagramTransport | None = None
self._protocol: asyncio.DatagramProtocol | None = None
# Reusable RGBW scratch (resized on demand) so the hot path doesn't
# allocate a fresh (N, 4) array per frame.
self._rgbw_buf: np.ndarray | None = None
self._rgbw_buf_n: int = 0
async def connect(self) -> bool:
"""Open the UDP datagram endpoint to the device."""
loop = asyncio.get_running_loop()
self._transport, self._protocol = await loop.create_datagram_endpoint(
asyncio.DatagramProtocol, remote_addr=(self.host, self.port)
)
logger.info(
"WLED realtime client connected to %s:%d (timeout %ds, %s)",
self.host,
self.port,
self.timeout_secs,
"RGBW" if self.rgbw else "RGB",
)
return True
async def close(self) -> None:
"""Close the datagram endpoint."""
if self._transport is not None:
self._transport.close()
self._transport = None
self._protocol = None
logger.debug("Closed WLED realtime connection to %s:%d", self.host, self.port)
@property
def is_connected(self) -> bool:
return self._transport is not None
def _ensure_rgbw_buf(self, n: int) -> np.ndarray:
"""Return an ``(n, 4)`` uint8 RGBW buffer with the white channel zeroed."""
if self._rgbw_buf is None or self._rgbw_buf_n != n:
self._rgbw_buf = np.zeros((n, 4), dtype=np.uint8)
self._rgbw_buf_n = n
return self._rgbw_buf
def build_packets(self, pixels: np.ndarray) -> list[bytes]:
"""Build the realtime UDP packet(s) for one ``(N, 3)`` uint8 RGB frame.
Exposed (and pure) for unit testing the wire format. Picks DRGBW for
RGBW strips within range, DRGB for small RGB strips, otherwise DNRGB
chunks. The white channel is sent as 0 (colour comes from the RGB LEDs).
"""
pixels = np.ascontiguousarray(pixels, dtype=np.uint8)
n = len(pixels)
t = self.timeout_secs
if n == 0:
return []
if self.rgbw and n <= _MAX_DRGBW:
buf = self._ensure_rgbw_buf(n)
buf[:, 0:3] = pixels
# white channel already zeroed and left at 0
return [bytes([_DRGBW, t]) + buf.tobytes()]
if n <= _MAX_DRGB and not self.rgbw:
return [bytes([_DRGB, t]) + pixels.tobytes()]
# DNRGB: 16-bit start index, chunked. Covers >490 RGB and >367 RGBW
# (the white channel is dropped for oversized RGBW strips).
packets: list[bytes] = []
for start in range(0, n, _MAX_DNRGB_CHUNK):
end = min(start + _MAX_DNRGB_CHUNK, n)
header = bytes([_DNRGB, t, (start >> 8) & 0xFF, start & 0xFF])
packets.append(header + pixels[start:end].tobytes())
return packets
def send_pixels_numpy(self, pixels: np.ndarray) -> bool:
"""Send one frame of ``(N, 3)`` uint8 RGB pixels (fire-and-forget)."""
if self._transport is None:
return False
for packet in self.build_packets(pixels):
self._transport.sendto(packet)
return True
@@ -156,9 +156,15 @@ class WledTargetProcessor(TargetProcessor):
from ledgrab.core.devices.device_config import WLEDConfig as _WLEDConfig from ledgrab.core.devices.device_config import WLEDConfig as _WLEDConfig
config = _dev.to_config() config = _dev.to_config()
# use_ddp is a target-derived protocol setting — override on WLEDConfig # The target's protocol selects how we drive a WLED device:
# "ddp" -> DDP UDP (4048) "udp" -> WLED native realtime UDP (21324)
# "http" -> JSON API (use_ddp and use_realtime are exclusive)
if isinstance(config, _WLEDConfig): if isinstance(config, _WLEDConfig):
config = _replace(config, use_ddp=(self._protocol == "ddp")) config = _replace(
config,
use_ddp=(self._protocol == "ddp"),
use_realtime=(self._protocol == "udp"),
)
self._device_config = config self._device_config = config
# Connect to LED device # Connect to LED device
@@ -183,8 +183,13 @@ const targetEditorModal = new TargetEditorModal();
function _protocolBadge(device: any, target: any) { function _protocolBadge(device: any, target: any) {
const dt = device?.device_type; const dt = device?.device_type;
if (!dt || dt === 'wled') { if (!dt || dt === 'wled') {
const proto = target.protocol === 'http' ? 'HTTP' : 'DDP'; const wledMap: Record<string, [string, string]> = {
return `${target.protocol === 'http' ? ICON_GLOBE : ICON_RADIO} ${proto}`; http: [ICON_GLOBE, 'HTTP'],
udp: [ICON_RADIO, 'WLED UDP'],
ddp: [ICON_RADIO, 'DDP'],
};
const [icon, label] = wledMap[target.protocol] || wledMap.ddp;
return `${icon} ${label}`;
} }
const map = { const map = {
openrgb: [ICON_PALETTE, 'OpenRGB SDK'], openrgb: [ICON_PALETTE, 'OpenRGB SDK'],
@@ -313,10 +318,11 @@ function _ensureProtocolIconSelect() {
if (!sel) return; if (!sel) return;
const items = [ const items = [
{ value: 'ddp', icon: _pIcon(P.radio), label: t('targets.protocol.ddp'), desc: t('targets.protocol.ddp.desc') }, { value: 'ddp', icon: _pIcon(P.radio), label: t('targets.protocol.ddp'), desc: t('targets.protocol.ddp.desc') },
{ value: 'udp', icon: _pIcon(P.radio), label: t('targets.protocol.udp'), desc: t('targets.protocol.udp.desc') },
{ value: 'http', icon: _pIcon(P.globe), label: t('targets.protocol.http'), desc: t('targets.protocol.http.desc') }, { value: 'http', icon: _pIcon(P.globe), label: t('targets.protocol.http'), desc: t('targets.protocol.http.desc') },
]; ];
if (_protocolIconSelect) { _protocolIconSelect.updateItems(items); return; } if (_protocolIconSelect) { _protocolIconSelect.updateItems(items); return; }
_protocolIconSelect = new IconSelect({ target: sel as HTMLSelectElement, items, columns: 2 }); _protocolIconSelect = new IconSelect({ target: sel as HTMLSelectElement, items, columns: 3 });
} }
function _ensureBrightnessWidget(): BindableScalarWidget { function _ensureBrightnessWidget(): BindableScalarWidget {
@@ -2085,6 +2085,8 @@
"targets.power_limit.per_led": "mA per LED (full white):", "targets.power_limit.per_led": "mA per LED (full white):",
"targets.protocol.ddp": "DDP (UDP)", "targets.protocol.ddp": "DDP (UDP)",
"targets.protocol.ddp.desc": "Fast raw UDP packets — recommended", "targets.protocol.ddp.desc": "Fast raw UDP packets — recommended",
"targets.protocol.udp": "WLED UDP (realtime)",
"targets.protocol.udp.desc": "WLED native realtime — RGBW whites + auto-revert if the stream drops",
"targets.protocol.http": "HTTP", "targets.protocol.http": "HTTP",
"targets.protocol.http.desc": "JSON API — slower, ≤500 LEDs", "targets.protocol.http.desc": "JSON API — slower, ≤500 LEDs",
"targets.protocol.serial": "Serial", "targets.protocol.serial": "Serial",
@@ -1945,6 +1945,8 @@
"targets.power_limit.per_led": "мА на светодиод (полный белый):", "targets.power_limit.per_led": "мА на светодиод (полный белый):",
"targets.protocol.ddp": "DDP (UDP)", "targets.protocol.ddp": "DDP (UDP)",
"targets.protocol.ddp.desc": "Быстрые UDP-пакеты — рекомендуется", "targets.protocol.ddp.desc": "Быстрые UDP-пакеты — рекомендуется",
"targets.protocol.udp": "WLED UDP (realtime)",
"targets.protocol.udp.desc": "Нативный realtime WLED — корректный RGBW и авто-возврат при обрыве потока",
"targets.protocol.http": "HTTP", "targets.protocol.http": "HTTP",
"targets.protocol.http.desc": "JSON API — медленнее, ≤500 LED", "targets.protocol.http.desc": "JSON API — медленнее, ≤500 LED",
"targets.protocol.serial": "Serial", "targets.protocol.serial": "Serial",
@@ -1941,6 +1941,8 @@
"targets.power_limit.per_led": "每颗 LED 电流(全白):", "targets.power_limit.per_led": "每颗 LED 电流(全白):",
"targets.protocol.ddp": "DDP (UDP)", "targets.protocol.ddp": "DDP (UDP)",
"targets.protocol.ddp.desc": "快速UDP数据包 - 推荐", "targets.protocol.ddp.desc": "快速UDP数据包 - 推荐",
"targets.protocol.udp": "WLED UDP(实时)",
"targets.protocol.udp.desc": "WLED 原生实时 — 正确的 RGBW 白色,断流时自动恢复",
"targets.protocol.http": "HTTP", "targets.protocol.http": "HTTP",
"targets.protocol.http.desc": "JSON API - 较慢,≤500 LED", "targets.protocol.http.desc": "JSON API - 较慢,≤500 LED",
"targets.protocol.serial": "串口", "targets.protocol.serial": "串口",
@@ -123,6 +123,7 @@
<small class="input-hint" style="display:none" data-i18n="targets.protocol.hint">DDP sends pixels via fast UDP (recommended). HTTP uses the JSON API — slower but reliable, limited to ~500 LEDs.</small> <small class="input-hint" style="display:none" data-i18n="targets.protocol.hint">DDP sends pixels via fast UDP (recommended). HTTP uses the JSON API — slower but reliable, limited to ~500 LEDs.</small>
<select id="target-editor-protocol"> <select id="target-editor-protocol">
<option value="ddp">DDP (UDP)</option> <option value="ddp">DDP (UDP)</option>
<option value="udp">WLED UDP (realtime)</option>
<option value="http">HTTP</option> <option value="http">HTTP</option>
</select> </select>
</div> </div>
+94
View File
@@ -0,0 +1,94 @@
"""Unit tests for the WLED native realtime UDP packet builder."""
import numpy as np
from ledgrab.core.devices.wled_realtime_client import (
DEFAULT_REALTIME_TIMEOUT,
WledRealtimeClient,
_clamp_timeout,
)
def _rgb(n: int) -> np.ndarray:
return np.arange(n * 3, dtype=np.uint8).reshape(n, 3)
def test_drgb_small_rgb_strip():
c = WledRealtimeClient("1.2.3.4", timeout_secs=2)
pixels = _rgb(10)
packets = c.build_packets(pixels)
assert len(packets) == 1
p = packets[0]
assert p[0] == 2 # DRGB
assert p[1] == 2 # timeout seconds
assert len(p) == 2 + 10 * 3
assert p[2:] == pixels.tobytes()
def test_drgbw_sets_explicit_white_zero():
c = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=5)
pixels = np.full((4, 3), 200, dtype=np.uint8)
packets = c.build_packets(pixels)
assert len(packets) == 1
p = packets[0]
assert p[0] == 3 # DRGBW
assert p[1] == 5
assert len(p) == 2 + 4 * 4
body = np.frombuffer(p[2:], dtype=np.uint8).reshape(4, 4)
assert (body[:, 0:3] == 200).all()
assert (body[:, 3] == 0).all() # white channel zeroed
def test_dnrgb_chunks_large_rgb_strip():
c = WledRealtimeClient("1.2.3.4", timeout_secs=3)
n = 1000 # > 490 -> DNRGB, > 489 per chunk -> 3 packets (489+489+22)
pixels = _rgb(n)
packets = c.build_packets(pixels)
assert len(packets) == 3
# Each packet starts with [4][timeout][start_hi][start_lo]
starts = []
total_leds = 0
for p in packets:
assert p[0] == 4 # DNRGB
assert p[1] == 3 # timeout
start = (p[2] << 8) | p[3]
starts.append(start)
leds = (len(p) - 4) // 3
total_leds += leds
assert starts == [0, 489, 978]
assert total_leds == n
def test_dnrgb_reassembles_to_original():
c = WledRealtimeClient("1.2.3.4", timeout_secs=1)
n = 700
pixels = _rgb(n)
out = bytearray()
for p in c.build_packets(pixels):
out += p[4:]
assert bytes(out) == pixels.tobytes()
def test_empty_frame_no_packets():
c = WledRealtimeClient("1.2.3.4")
assert c.build_packets(np.zeros((0, 3), dtype=np.uint8)) == []
def test_timeout_clamped_to_wire_range():
assert _clamp_timeout(0) == 1
assert _clamp_timeout(-5) == 1
assert _clamp_timeout(255) == 255
assert _clamp_timeout(1000) == 255
assert WledRealtimeClient("h", timeout_secs=0).timeout_secs == 1
def test_rgbw_over_capacity_falls_back_to_dnrgb():
# 400 RGBW LEDs (> 367) can't use DRGBW; falls back to DNRGB (RGB).
c = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=2)
packets = c.build_packets(_rgb(400))
assert all(p[0] == 4 for p in packets) # DNRGB
def test_default_timeout_constant():
assert DEFAULT_REALTIME_TIMEOUT == 2
assert WledRealtimeClient("h").timeout_secs == 2