Files
ledgrab/server/tests/test_lifx_multizone.py
T
alexei.dolgolyov 39b0554444 feat: roadmap round two (2026-06-23) — per-pixel smart-lights + integrations
A) Per-pixel smart-lights
- LIFX multizone (SetExtendedColorZones msg 510, <=82 zones) + Tile
  (SetTileState64 715), auto-detected on connect with single-colour
  fallback; lifx_per_zone threaded like nanoleaf_per_panel
- Hue gradient-lightstrip mapping: Entertainment v2 frame now keyed by
  channel id (was 1 light=1 LED), channels discovered on connect;
  hue_gradient_mode toggle (default on)

B) Integrations bundle
- Outbound webhook automation action (Discord/IFTTT/Zapier/Node-RED),
  SSRF-gated via validate_polling_url at both save and fire time; fires
  on activate/deactivate, best-effort, audited
- Home Assistant MQTT auto-discovery: read-only binary_sensors per
  automation + connectivity, availability via birth/will, cleanup on
  disable/delete, live state from the engine

Shared: pixel_reduce.resample_to_n nearest-neighbour helper.
57 new tests (lifx_multizone, hue_segment, webhook_action, ha_discovery).
Gate: ruff + tsc + build clean, pytest 2719 passed / 2 skipped.
2026-06-23 00:50:22 +03:00

216 lines
7.9 KiB
Python

"""Tests for LIFX multizone (Z/Beam) + tile (Canvas) per-pixel streaming.
The device-side handshake (zone/tile auto-detection over UDP, firmware
fallback) needs real hardware to validate; here we lock down the parts that
DON'T: the exact packet framing for SetExtendedColorZones (510) and
SetTileState64 (715), the StateMultiZone / StateDeviceChain reply parsers,
the strip→element resample, the per-mode emit path, and the ``lifx_per_zone``
config round-trip through the device store.
"""
from __future__ import annotations
import struct
import pytest
from ledgrab.core.devices.lifx_client import (
MSG_SET_COLOR,
MSG_SET_EXTENDED_COLOR_ZONES,
MSG_SET_TILE_STATE_64,
MSG_STATE_DEVICE_CHAIN,
MSG_STATE_MULTIZONE,
LIFXClient,
_build_packet,
_build_set_extended_color_zones_payload,
_build_set_tile_state64_payload,
_parse_multizone_reply,
_parse_state_device_chain,
rgb_to_hsbk,
)
from ledgrab.core.devices.pixel_reduce import resample_to_n
from ledgrab.storage.device_store import Device
_EXT_ZONE_MAX = 82
_TILE_PIXELS = 64
_TILE_STRUCT_SIZE = 55
_TILE_CHAIN_MAX = 16
class _FakeTransport:
"""Captures every datagram the client emits via ``_send``."""
def __init__(self) -> None:
self.sent: list[bytes] = []
def sendto(self, data: bytes) -> None:
self.sent.append(bytes(data))
def _client(mode: str = "single", *, zone_count: int = 0, tiles=None) -> LIFXClient:
c = LIFXClient("lifx://1.2.3.4", led_count=10, per_zone=True)
c._connected = True
c._transport = _FakeTransport() # type: ignore[assignment]
c._mode = mode
c._zone_count = zone_count
c._tiles = tiles or []
return c
def _msg_type(packet: bytes) -> int:
"""LIFX protocol-header message type lives at byte offset 32."""
return struct.unpack_from("<H", packet, 32)[0]
class TestResampleToN:
def test_nearest_neighbour_downsample(self):
pixels = [[10, 10, 10], [20, 20, 20], [30, 30, 30], [40, 40, 40]]
assert resample_to_n(pixels, 2) == [(10, 10, 10), (30, 30, 30)]
def test_empty_strip_is_black(self):
assert resample_to_n([], 3) == [(0, 0, 0), (0, 0, 0), (0, 0, 0)]
def test_upsample_repeats(self):
assert resample_to_n([[255, 0, 0]], 3) == [(255, 0, 0), (255, 0, 0), (255, 0, 0)]
def test_zero_n_is_empty(self):
assert resample_to_n([[1, 2, 3]], 0) == []
class TestExtendedColorZonesPayload:
def test_framing_is_byte_exact(self):
hsbk = [(100, 200, 300, 3500), (400, 500, 600, 3500)]
payload = _build_set_extended_color_zones_payload(hsbk, duration_ms=0, zone_index=0)
# header(8) + 82 fixed HSBK slots * 8 bytes
assert len(payload) == 8 + _EXT_ZONE_MAX * 8
duration, apply, zone_index, count = struct.unpack_from("<IBHB", payload, 0)
assert (duration, apply, zone_index, count) == (0, 1, 0, 2)
h, s, b, k = struct.unpack_from("<HHHH", payload, 8)
assert (h, s, b, k) == (100, 200, 300, 3500)
def test_unused_slots_are_zero_padded(self):
payload = _build_set_extended_color_zones_payload([(1, 2, 3, 4)])
# Third slot (index 2) onward must be zero.
assert payload[8 + 8 : 8 + _EXT_ZONE_MAX * 8] == b"\x00" * (8 * (_EXT_ZONE_MAX - 1))
def test_overflow_is_capped_at_82(self):
payload = _build_set_extended_color_zones_payload([(1, 1, 1, 1)] * 200)
_, _, _, count = struct.unpack_from("<IBHB", payload, 0)
assert count == _EXT_ZONE_MAX
assert len(payload) == 8 + _EXT_ZONE_MAX * 8
class TestTileState64Payload:
def test_framing_is_byte_exact(self):
hsbk = [(11, 22, 33, 3500)]
payload = _build_set_tile_state64_payload(hsbk, tile_index=2, x=0, y=0, width=8)
# header(10) + 64 fixed HSBK slots * 8 bytes
assert len(payload) == 10 + _TILE_PIXELS * 8
tile_index, length, reserved, x, y, width, duration = struct.unpack_from(
"<BBBBBBI", payload, 0
)
assert (tile_index, length, reserved, x, y, width, duration) == (2, 1, 0, 0, 0, 8, 0)
h, s, b, k = struct.unpack_from("<HHHH", payload, 10)
assert (h, s, b, k) == (11, 22, 33, 3500)
class TestMultizoneReplyParser:
def test_parses_state_multizone(self):
body = struct.pack("<BB", 16, 0) + b"\x00" * (8 * 8) # count=16, index=0, 8 HSBK
raw = _build_packet(msg_type=MSG_STATE_MULTIZONE, payload=body)
parsed = _parse_multizone_reply(raw)
assert parsed == {"count": 16, "index": 0}
def test_other_message_type_returns_none(self):
raw = _build_packet(msg_type=MSG_SET_COLOR, payload=b"\x00" * 16)
assert _parse_multizone_reply(raw) is None
class TestDeviceChainParser:
def _chain_packet(self, tile_specs: list[tuple[int, int]]) -> bytes:
payload = bytearray(1 + _TILE_STRUCT_SIZE * _TILE_CHAIN_MAX + 1)
payload[0] = 0 # start_index
for i, (w, h) in enumerate(tile_specs):
base = 1 + i * _TILE_STRUCT_SIZE
payload[base + 16] = w
payload[base + 17] = h
payload[1 + _TILE_STRUCT_SIZE * _TILE_CHAIN_MAX] = len(tile_specs) # tile count
return _build_packet(msg_type=MSG_STATE_DEVICE_CHAIN, payload=bytes(payload))
def test_parses_tile_widths_and_heights(self):
raw = self._chain_packet([(8, 8), (8, 8)])
parsed = _parse_state_device_chain(raw)
assert parsed == {"start_index": 0, "tiles": [(8, 8), (8, 8)]}
def test_short_packet_returns_none(self):
assert _parse_state_device_chain(b"\x00" * 40) is None
class TestEmitPixels:
def test_multizone_emits_one_extended_packet(self):
c = _client("multizone", zone_count=4)
c._emit_pixels([(255, 0, 0)] * 8, 255)
sent = c._transport.sent # type: ignore[attr-defined]
assert len(sent) == 1
assert _msg_type(sent[0]) == MSG_SET_EXTENDED_COLOR_ZONES
def test_tile_emits_one_packet_per_tile(self):
c = _client("tile", tiles=[(2, 2), (2, 2)])
c._emit_pixels([(0, 255, 0)] * 8, 255)
sent = c._transport.sent # type: ignore[attr-defined]
assert len(sent) == 2
assert all(_msg_type(p) == MSG_SET_TILE_STATE_64 for p in sent)
def test_single_mode_falls_back_to_set_color(self):
c = _client("single")
c._emit_pixels([(10, 20, 30)] * 4, 255)
sent = c._transport.sent # type: ignore[attr-defined]
assert len(sent) == 1
assert _msg_type(sent[0]) == MSG_SET_COLOR
def test_device_led_count_reflects_mode(self):
assert _client("multizone", zone_count=16).device_led_count == 16
assert _client("tile", tiles=[(8, 8), (8, 8)]).device_led_count == 128
assert _client("single").device_led_count == 10
class TestBrightnessScaling:
def test_brightness_scales_zone_colours(self):
c = _client("multizone", zone_count=1)
c._emit_pixels([(200, 100, 50)], 128)
payload = c._transport.sent[0][36:] # type: ignore[index]
h, s, b, k = struct.unpack_from("<HHHH", payload, 8)
scale = 128 / 255.0
exp_h, exp_s, exp_b, exp_k = rgb_to_hsbk(
int(200 * scale), int(100 * scale), int(50 * scale)
)
assert (h, s, b) == (exp_h, exp_s, exp_b)
class TestConfigRoundTrip:
def _device(self, per_zone: bool) -> Device:
return Device(
device_id="d1",
name="Beam",
url="lifx://1.2.3.4",
led_count=10,
device_type="lifx",
lifx_per_zone=per_zone,
)
def test_per_zone_round_trips_through_store(self):
d = self._device(True)
assert d.to_dict().get("lifx_per_zone") is True
back = Device.from_dict(d.to_dict())
assert back.lifx_per_zone is True
assert back.to_config().lifx_per_zone is True
def test_default_off_is_omitted(self):
d = self._device(False)
assert "lifx_per_zone" not in d.to_dict()
assert d.to_config().lifx_per_zone is False
if __name__ == "__main__":
pytest.main([__file__, "-v"])