Files
ledgrab/server/tests/test_adalight_client.py
T
alexei.dolgolyov 6745e25b20 feat: roadmap batch (2026-06-19) — solar/linear-light/dither/nanoleaf + integrations
Eight roadmap features from the 2026-06-19 review, each a full vertical
(backend + tests + frontend + i18n en/ru/zh); ~67 new unit tests:

- automations: SolarRule sunrise/sunset trigger (new utils/solar.py, shared
  with the daylight cycle; window logic mirrors TimeOfDayRule)
- ci: best-effort arm64 multi-arch Docker manifest via QEMU + docker manifest
  (release.yml; amd64 path untouched, continue-on-error)
- game-integration: wire the orphaned LoLPoller via a LoLPollManager + a shared
  runtime_state module (poll lifecycle on enable/CRUD/startup/shutdown)
- ui: color-harmony gradient generator (complementary/analogous/triadic/...)
- effects: audio-reactive palette modulation (new audio_energy_tap; brightness/
  saturation modulation across all 12 procedural effects)
- capture: linear-light blending + spatio-temporal dithering, opt-in per
  calibration (new utils/linear_light.py, utils/dither.py)
- devices: Nanoleaf extControl v2 per-panel UDP streaming (per_panel mode)

Also bundles the pending 2026-06-18 production-review fixes and other
in-progress work already in the working tree (manual-trigger rule, etc.),
since they share files and could not be cleanly separated.

Gate: ruff + tsc clean; pytest 2654 passed / 2 skipped. The single failing
test (automation manual_trigger handler coverage) is a separate in-progress
item owned elsewhere, intentionally left as-is.
2026-06-22 23:21:24 +03:00

174 lines
5.8 KiB
Python

"""Tests for AdalightClient frame construction (the LED hot path)."""
import numpy as np
import pytest
from ledgrab.core.devices.adalight_client import (
AdalightClient,
_build_adalight_header,
)
def _make_client(led_count: int = 10) -> AdalightClient:
"""Build a client without opening the serial port."""
return AdalightClient(url="COM99", led_count=led_count)
def test_adalight_header_format():
# Adalight protocol: 'A' 'd' 'a' <count_hi> <count_lo> <checksum>
# where count = led_count - 1, checksum = hi ^ lo ^ 0x55
header = _build_adalight_header(10)
count = 9
hi = (count >> 8) & 0xFF
lo = count & 0xFF
expected_checksum = hi ^ lo ^ 0x55
assert header == bytes([ord("A"), ord("d"), ord("a"), hi, lo, expected_checksum])
def test_build_frame_uint8_fast_path():
client = _make_client(led_count=3)
pixels = np.array(
[[10, 20, 30], [40, 50, 60], [70, 80, 90]],
dtype=np.uint8,
)
frame = client._build_frame(pixels, brightness=255)
# Header (6 bytes) + RGB (9 bytes)
assert len(frame) == 6 + 9
assert bytes(frame[:6]) == client._header
assert bytes(frame[6:]) == bytes([10, 20, 30, 40, 50, 60, 70, 80, 90])
def test_build_frame_reuses_buffer_across_calls():
client = _make_client(led_count=3)
pixels = np.array([[1, 2, 3]] * 3, dtype=np.uint8)
frame1 = client._build_frame(pixels, brightness=255)
buf_id_1 = id(client._frame_buf)
frame2 = client._build_frame(pixels, brightness=255)
buf_id_2 = id(client._frame_buf)
# Same bytearray reused — frame returned IS the internal buffer
assert buf_id_1 == buf_id_2
assert frame1 is frame2
# Header preserved at front, payload identical
assert bytes(frame2[:6]) == client._header
def test_build_frame_handles_non_contiguous_input():
"""Slicing a wider array yields a non-contiguous view; payload must
still serialise to the same bytes as a contiguous copy."""
client = _make_client(led_count=4)
wide = np.arange(48, dtype=np.uint8).reshape(4, 4, 3)
pixels = wide[:, 1, :] # (4, 3) view, possibly non-contiguous
frame = client._build_frame(pixels, brightness=255)
expected = np.ascontiguousarray(pixels).tobytes()
assert bytes(frame[6:]) == expected
def test_build_frame_clamps_wider_int_dtypes():
"""Wider integer inputs (e.g. uint16 from legacy code) clamp to [0, 255]
before narrowing, matching historical behaviour."""
client = _make_client(led_count=2)
pixels = np.array([[100, 200, 300], [400, 50, 25]], dtype=np.uint16)
frame = client._build_frame(pixels, brightness=255)
# Values >255 clamp to 255, values <=255 pass through.
assert bytes(frame[6:]) == bytes([100, 200, 255, 255, 50, 25])
def test_build_frame_accepts_list_of_tuples():
"""Legacy callers pass list[tuple]; should produce same output."""
client = _make_client(led_count=2)
pixels = [(10, 20, 30), (40, 50, 60)]
frame = client._build_frame(pixels, brightness=255)
assert bytes(frame[6:]) == bytes([10, 20, 30, 40, 50, 60])
def test_build_frame_resizes_buffer_on_led_count_change():
"""If led count changes between calls, the buffer is reallocated."""
client = _make_client(led_count=5)
small = np.zeros((3, 3), dtype=np.uint8)
big = np.zeros((100, 3), dtype=np.uint8)
client._build_frame(small, brightness=255)
small_len = len(client._frame_buf)
client._build_frame(big, brightness=255)
big_len = len(client._frame_buf)
# 6-byte header + 3-byte/LED RGB
assert small_len == 6 + 3 * 3
assert big_len == 6 + 100 * 3
def test_build_frame_no_uint16_round_trip():
"""The hot path must NOT promote uint8 → uint16 → uint8.
Asserts the scratch buffer (used only for non-uint8 input) is never
allocated when the input is already uint8.
"""
client = _make_client(led_count=3)
pixels = np.array([[1, 2, 3]] * 3, dtype=np.uint8)
assert client._u8_scratch is None
client._build_frame(pixels, brightness=255)
# Scratch never touched on the fast path.
assert client._u8_scratch is None
@pytest.mark.parametrize("led_count", [1, 50, 300, 1000])
def test_build_frame_total_length(led_count):
client = _make_client(led_count=led_count)
pixels = np.zeros((led_count, 3), dtype=np.uint8)
frame = client._build_frame(pixels, brightness=255)
assert len(frame) == 6 + led_count * 3
async def test_close_settles_before_port_close(monkeypatch):
"""close() must let the board paint the black frame before resetting it.
The black frame has to be written AND given settle time before
``serial.close()`` toggles DTR (Arduino auto-reset). If the reset wins the
race the strip latches its last lit frame and "stays on". This guards the
ordering: write → flush → sleep(settle) → close.
"""
import concurrent.futures
from unittest.mock import MagicMock
from ledgrab.core.devices import adalight_client as mod
client = _make_client(led_count=3)
events: list[str] = []
serial = MagicMock()
serial.is_open = True
serial.write.side_effect = lambda *_a, **_k: events.append("write")
serial.flush.side_effect = lambda *_a, **_k: events.append("flush")
serial.close.side_effect = lambda *_a, **_k: events.append("close")
client._serial = serial
client._connected = True
client._tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
async def fake_sleep(_seconds):
events.append(f"sleep:{_seconds}")
monkeypatch.setattr(mod.asyncio, "sleep", fake_sleep)
await client.close()
# Black frame is written and flushed, the board is given settle time, and
# ONLY THEN is the port closed (which resets the board).
assert events == ["write", "flush", f"sleep:{mod.BLACK_FRAME_SETTLE_DELAY}", "close"]
assert mod.BLACK_FRAME_SETTLE_DELAY > 0