Files
ledgrab/server/tests/test_wled_realtime.py
alexei.dolgolyov 7728aecb4f feat(wled): native realtime UDP output (DRGB/DRGBW/DNRGB) with auto-revert
Add WLED's native realtime UDP protocol (port 21324) as a third output mode for
LED targets, alongside DDP and HTTP. For the device LedGrab drives most, this
brings three user-visible wins DDP lacks:

- Auto-revert: every packet carries a timeout byte, so if the stream stops
  (host hiccup/sleep/crash) WLED returns to its preset instead of freezing on
  the last frame.
- Correct RGBW whites: the DRGBW variant carries an explicit white channel.
- Lighter on weak Wi-Fi: raw RGB with a 2-byte header.

New WledRealtimeClient auto-selects the packet type by LED count: DRGB (<=490
LEDs), DRGBW (<=367 LEDs on RGBW strips), or chunked DNRGB (>490 LEDs, or RGBW
strips over 367 LEDs). WLED applies its own per-bus colour order in realtime
mode, so we send plain RGB and the user's colour-order config just works.
Protocol 'udp' is
threaded through WLEDConfig/provider/processor and the schema pattern; the
target editor gains a protocol option + badge + i18n (en/ru/zh).

8 unit tests for the packet builder; full suite green (1919 passed).
2026-06-04 23:34:26 +03:00

95 lines
2.8 KiB
Python

"""Unit tests for the WLED native realtime UDP packet builder."""
import numpy as np
from ledgrab.core.devices.wled_realtime_client import (
DEFAULT_REALTIME_TIMEOUT,
WledRealtimeClient,
_clamp_timeout,
)
def _rgb(n: int) -> np.ndarray:
    """Build an ``(n, 3)`` uint8 test frame from consecutive ``np.arange`` values.

    The frame is only meant to give every byte position a predictable value
    so packet payloads can be compared against ``tobytes()``.
    """
    flat = np.arange(n * 3, dtype=np.uint8)
    return flat.reshape(n, 3)
def test_drgb_small_rgb_strip():
    """A 10-LED RGB frame yields one DRGB packet: 2-byte header plus raw RGB."""
    led_count = 10
    client = WledRealtimeClient("1.2.3.4", timeout_secs=2)
    frame = _rgb(led_count)

    packets = client.build_packets(frame)
    assert len(packets) == 1

    packet = packets[0]
    header, payload = packet[:2], packet[2:]
    assert header[0] == 2  # DRGB
    assert header[1] == 2  # timeout seconds
    assert len(packet) == 2 + led_count * 3
    assert payload == frame.tobytes()
def test_drgbw_sets_explicit_white_zero():
    """An RGBW client emits one DRGBW packet whose fourth byte per LED is zero."""
    led_count, level = 4, 200
    client = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=5)
    frame = np.full((led_count, 3), level, dtype=np.uint8)

    packets = client.build_packets(frame)
    assert len(packets) == 1

    packet = packets[0]
    assert packet[0] == 3  # DRGBW
    assert packet[1] == 5
    assert len(packet) == 2 + led_count * 4

    # View the payload as one row of four bytes per LED.
    rgbw = np.frombuffer(packet[2:], dtype=np.uint8).reshape(led_count, 4)
    assert np.all(rgbw[:, 0:3] == level)
    assert np.all(rgbw[:, 3] == 0)  # white channel zeroed
def test_dnrgb_chunks_large_rgb_strip():
    """A 1000-LED RGB frame becomes three DNRGB packets starting at 0, 489, 978."""
    client = WledRealtimeClient("1.2.3.4", timeout_secs=3)
    led_count = 1000  # > 490 -> DNRGB; 489 LEDs per chunk -> 3 packets (489+489+22)
    packets = client.build_packets(_rgb(led_count))
    assert len(packets) == 3

    # Each packet starts with [4][timeout][start_hi][start_lo].
    assert all(pkt[0] == 4 for pkt in packets)  # DNRGB
    assert all(pkt[1] == 3 for pkt in packets)  # timeout
    offsets = [(pkt[2] << 8) | pkt[3] for pkt in packets]
    led_total = sum((len(pkt) - 4) // 3 for pkt in packets)

    assert offsets == [0, 489, 978]
    assert led_total == led_count
def test_dnrgb_reassembles_to_original():
    """Dropping each packet's first four bytes and joining the rest restores the frame."""
    client = WledRealtimeClient("1.2.3.4", timeout_secs=1)
    frame = _rgb(700)
    payload = b"".join(pkt[4:] for pkt in client.build_packets(frame))
    assert payload == frame.tobytes()
def test_empty_frame_no_packets():
    """A frame with zero LEDs produces an empty packet list."""
    client = WledRealtimeClient("1.2.3.4")
    empty_frame = np.zeros((0, 3), dtype=np.uint8)
    packets = client.build_packets(empty_frame)
    assert packets == []
def test_timeout_clamped_to_wire_range():
    """Out-of-range timeouts clamp to 1 or 255, in the helper and in the client."""
    # Requested timeout -> value expected after clamping.
    expected_by_input = {0: 1, -5: 1, 255: 255, 1000: 255}
    for requested, clamped in expected_by_input.items():
        assert _clamp_timeout(requested) == clamped

    client = WledRealtimeClient("h", timeout_secs=0)
    assert client.timeout_secs == 1
def test_rgbw_over_capacity_falls_back_to_dnrgb():
    """An RGBW frame too large for DRGBW is sent as DNRGB packets instead."""
    # 400 RGBW LEDs (> 367) can't use DRGBW; falls back to DNRGB (RGB).
    client = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=2)
    packets = client.build_packets(_rgb(400))
    # all() is vacuously true on an empty list, so require at least one
    # packet before checking the packet type.
    assert len(packets) > 0
    assert all(pkt[0] == 4 for pkt in packets)  # DNRGB
def test_default_timeout_constant():
    """The default timeout constant is 2, and a client built without one reports 2."""
    assert DEFAULT_REALTIME_TIMEOUT == 2
    default_client = WledRealtimeClient("h")
    assert default_client.timeout_secs == 2