"""Tests for AdalightClient frame construction (the LED hot path).""" import numpy as np import pytest from ledgrab.core.devices.adalight_client import ( AdalightClient, _build_adalight_header, ) def _make_client(led_count: int = 10) -> AdalightClient: """Build a client without opening the serial port.""" return AdalightClient(url="COM99", led_count=led_count) def test_adalight_header_format(): # Adalight protocol: 'A' 'd' 'a' # where count = led_count - 1, checksum = hi ^ lo ^ 0x55 header = _build_adalight_header(10) count = 9 hi = (count >> 8) & 0xFF lo = count & 0xFF expected_checksum = hi ^ lo ^ 0x55 assert header == bytes([ord("A"), ord("d"), ord("a"), hi, lo, expected_checksum]) def test_build_frame_uint8_fast_path(): client = _make_client(led_count=3) pixels = np.array( [[10, 20, 30], [40, 50, 60], [70, 80, 90]], dtype=np.uint8, ) frame = client._build_frame(pixels, brightness=255) # Header (6 bytes) + RGB (9 bytes) assert len(frame) == 6 + 9 assert bytes(frame[:6]) == client._header assert bytes(frame[6:]) == bytes([10, 20, 30, 40, 50, 60, 70, 80, 90]) def test_build_frame_reuses_buffer_across_calls(): client = _make_client(led_count=3) pixels = np.array([[1, 2, 3]] * 3, dtype=np.uint8) frame1 = client._build_frame(pixels, brightness=255) buf_id_1 = id(client._frame_buf) frame2 = client._build_frame(pixels, brightness=255) buf_id_2 = id(client._frame_buf) # Same bytearray reused — frame returned IS the internal buffer assert buf_id_1 == buf_id_2 assert frame1 is frame2 # Header preserved at front, payload identical assert bytes(frame2[:6]) == client._header def test_build_frame_handles_non_contiguous_input(): """Slicing a wider array yields a non-contiguous view; payload must still serialise to the same bytes as a contiguous copy.""" client = _make_client(led_count=4) wide = np.arange(48, dtype=np.uint8).reshape(4, 4, 3) pixels = wide[:, 1, :] # (4, 3) view, possibly non-contiguous frame = client._build_frame(pixels, brightness=255) expected = np.ascontiguousarray(pixels).tobytes() assert bytes(frame[6:]) == expected def test_build_frame_clamps_wider_int_dtypes(): """Wider integer inputs (e.g. uint16 from legacy code) clamp to [0, 255] before narrowing, matching historical behaviour.""" client = _make_client(led_count=2) pixels = np.array([[100, 200, 300], [400, 50, 25]], dtype=np.uint16) frame = client._build_frame(pixels, brightness=255) # Values >255 clamp to 255, values <=255 pass through. assert bytes(frame[6:]) == bytes([100, 200, 255, 255, 50, 25]) def test_build_frame_accepts_list_of_tuples(): """Legacy callers pass list[tuple]; should produce same output.""" client = _make_client(led_count=2) pixels = [(10, 20, 30), (40, 50, 60)] frame = client._build_frame(pixels, brightness=255) assert bytes(frame[6:]) == bytes([10, 20, 30, 40, 50, 60]) def test_build_frame_resizes_buffer_on_led_count_change(): """If led count changes between calls, the buffer is reallocated.""" client = _make_client(led_count=5) small = np.zeros((3, 3), dtype=np.uint8) big = np.zeros((100, 3), dtype=np.uint8) client._build_frame(small, brightness=255) small_len = len(client._frame_buf) client._build_frame(big, brightness=255) big_len = len(client._frame_buf) # 6-byte header + 3-byte/LED RGB assert small_len == 6 + 3 * 3 assert big_len == 6 + 100 * 3 def test_build_frame_no_uint16_round_trip(): """The hot path must NOT promote uint8 → uint16 → uint8. Asserts the scratch buffer (used only for non-uint8 input) is never allocated when the input is already uint8. """ client = _make_client(led_count=3) pixels = np.array([[1, 2, 3]] * 3, dtype=np.uint8) assert client._u8_scratch is None client._build_frame(pixels, brightness=255) # Scratch never touched on the fast path. assert client._u8_scratch is None @pytest.mark.parametrize("led_count", [1, 50, 300, 1000]) def test_build_frame_total_length(led_count): client = _make_client(led_count=led_count) pixels = np.zeros((led_count, 3), dtype=np.uint8) frame = client._build_frame(pixels, brightness=255) assert len(frame) == 6 + led_count * 3 async def test_close_settles_before_port_close(monkeypatch): """close() must let the board paint the black frame before resetting it. The black frame has to be written AND given settle time before ``serial.close()`` toggles DTR (Arduino auto-reset). If the reset wins the race the strip latches its last lit frame and "stays on". This guards the ordering: write → flush → sleep(settle) → close. """ import concurrent.futures from unittest.mock import MagicMock from ledgrab.core.devices import adalight_client as mod client = _make_client(led_count=3) events: list[str] = [] serial = MagicMock() serial.is_open = True serial.write.side_effect = lambda *_a, **_k: events.append("write") serial.flush.side_effect = lambda *_a, **_k: events.append("flush") serial.close.side_effect = lambda *_a, **_k: events.append("close") client._serial = serial client._connected = True client._tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) async def fake_sleep(_seconds): events.append(f"sleep:{_seconds}") monkeypatch.setattr(mod.asyncio, "sleep", fake_sleep) await client.close() # Black frame is written and flushed, the board is given settle time, and # ONLY THEN is the port closed (which resets the board). assert events == ["write", "flush", f"sleep:{mod.BLACK_FRAME_SETTLE_DELAY}", "close"] assert mod.BLACK_FRAME_SETTLE_DELAY > 0