"""Unit tests for the WLED native realtime UDP packet builder.""" import numpy as np from ledgrab.core.devices.wled_realtime_client import ( DEFAULT_REALTIME_TIMEOUT, WledRealtimeClient, _clamp_timeout, ) def _rgb(n: int) -> np.ndarray: return np.arange(n * 3, dtype=np.uint8).reshape(n, 3) def test_drgb_small_rgb_strip(): c = WledRealtimeClient("1.2.3.4", timeout_secs=2) pixels = _rgb(10) packets = c.build_packets(pixels) assert len(packets) == 1 p = packets[0] assert p[0] == 2 # DRGB assert p[1] == 2 # timeout seconds assert len(p) == 2 + 10 * 3 assert p[2:] == pixels.tobytes() def test_drgbw_sets_explicit_white_zero(): c = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=5) pixels = np.full((4, 3), 200, dtype=np.uint8) packets = c.build_packets(pixels) assert len(packets) == 1 p = packets[0] assert p[0] == 3 # DRGBW assert p[1] == 5 assert len(p) == 2 + 4 * 4 body = np.frombuffer(p[2:], dtype=np.uint8).reshape(4, 4) assert (body[:, 0:3] == 200).all() assert (body[:, 3] == 0).all() # white channel zeroed def test_dnrgb_chunks_large_rgb_strip(): c = WledRealtimeClient("1.2.3.4", timeout_secs=3) n = 1000 # > 490 -> DNRGB, > 489 per chunk -> 3 packets (489+489+22) pixels = _rgb(n) packets = c.build_packets(pixels) assert len(packets) == 3 # Each packet starts with [4][timeout][start_hi][start_lo] starts = [] total_leds = 0 for p in packets: assert p[0] == 4 # DNRGB assert p[1] == 3 # timeout start = (p[2] << 8) | p[3] starts.append(start) leds = (len(p) - 4) // 3 total_leds += leds assert starts == [0, 489, 978] assert total_leds == n def test_dnrgb_reassembles_to_original(): c = WledRealtimeClient("1.2.3.4", timeout_secs=1) n = 700 pixels = _rgb(n) out = bytearray() for p in c.build_packets(pixels): out += p[4:] assert bytes(out) == pixels.tobytes() def test_empty_frame_no_packets(): c = WledRealtimeClient("1.2.3.4") assert c.build_packets(np.zeros((0, 3), dtype=np.uint8)) == [] def test_timeout_clamped_to_wire_range(): assert _clamp_timeout(0) == 1 assert _clamp_timeout(-5) == 1 assert _clamp_timeout(255) == 255 assert _clamp_timeout(1000) == 255 assert WledRealtimeClient("h", timeout_secs=0).timeout_secs == 1 def test_rgbw_over_capacity_falls_back_to_dnrgb(): # 400 RGBW LEDs (> 367) can't use DRGBW; falls back to DNRGB (RGB). c = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=2) packets = c.build_packets(_rgb(400)) assert all(p[0] == 4 for p in packets) # DNRGB def test_default_timeout_constant(): assert DEFAULT_REALTIME_TIMEOUT == 2 assert WledRealtimeClient("h").timeout_secs == 2