39b0554444
A) Per-pixel smart-lights - LIFX multizone (SetExtendedColorZones msg 510, <=82 zones) + Tile (SetTileState64 715), auto-detected on connect with single-colour fallback; lifx_per_zone threaded like nanoleaf_per_panel - Hue gradient-lightstrip mapping: Entertainment v2 frame now keyed by channel id (was 1 light=1 LED), channels discovered on connect; hue_gradient_mode toggle (default on) B) Integrations bundle - Outbound webhook automation action (Discord/IFTTT/Zapier/Node-RED), SSRF-gated via validate_polling_url at both save and fire time; fires on activate/deactivate, best-effort, audited - Home Assistant MQTT auto-discovery: read-only binary_sensors per automation + connectivity, availability via birth/will, cleanup on disable/delete, live state from the engine Shared: pixel_reduce.resample_to_n nearest-neighbour helper. 57 new tests (lifx_multizone, hue_segment, webhook_action, ha_discovery). Gate: ruff + tsc + build clean, pytest 2719 passed / 2 skipped.
216 lines
7.9 KiB
Python
216 lines
7.9 KiB
Python
"""Tests for LIFX multizone (Z/Beam) + tile (Canvas) per-pixel streaming.
|
|
|
|
The device-side handshake (zone/tile auto-detection over UDP, firmware
|
|
fallback) needs real hardware to validate; here we lock down the parts that
|
|
DON'T: the exact packet framing for SetExtendedColorZones (510) and
|
|
SetTileState64 (715), the StateMultiZone / StateDeviceChain reply parsers,
|
|
the strip→element resample, the per-mode emit path, and the ``lifx_per_zone``
|
|
config round-trip through the device store.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import struct
|
|
|
|
import pytest
|
|
|
|
from ledgrab.core.devices.lifx_client import (
|
|
MSG_SET_COLOR,
|
|
MSG_SET_EXTENDED_COLOR_ZONES,
|
|
MSG_SET_TILE_STATE_64,
|
|
MSG_STATE_DEVICE_CHAIN,
|
|
MSG_STATE_MULTIZONE,
|
|
LIFXClient,
|
|
_build_packet,
|
|
_build_set_extended_color_zones_payload,
|
|
_build_set_tile_state64_payload,
|
|
_parse_multizone_reply,
|
|
_parse_state_device_chain,
|
|
rgb_to_hsbk,
|
|
)
|
|
from ledgrab.core.devices.pixel_reduce import resample_to_n
|
|
from ledgrab.storage.device_store import Device
|
|
|
|
_EXT_ZONE_MAX = 82
|
|
_TILE_PIXELS = 64
|
|
_TILE_STRUCT_SIZE = 55
|
|
_TILE_CHAIN_MAX = 16
|
|
|
|
|
|
class _FakeTransport:
|
|
"""Captures every datagram the client emits via ``_send``."""
|
|
|
|
def __init__(self) -> None:
|
|
self.sent: list[bytes] = []
|
|
|
|
def sendto(self, data: bytes) -> None:
|
|
self.sent.append(bytes(data))
|
|
|
|
|
|
def _client(mode: str = "single", *, zone_count: int = 0, tiles=None) -> LIFXClient:
|
|
c = LIFXClient("lifx://1.2.3.4", led_count=10, per_zone=True)
|
|
c._connected = True
|
|
c._transport = _FakeTransport() # type: ignore[assignment]
|
|
c._mode = mode
|
|
c._zone_count = zone_count
|
|
c._tiles = tiles or []
|
|
return c
|
|
|
|
|
|
def _msg_type(packet: bytes) -> int:
|
|
"""LIFX protocol-header message type lives at byte offset 32."""
|
|
return struct.unpack_from("<H", packet, 32)[0]
|
|
|
|
|
|
class TestResampleToN:
|
|
def test_nearest_neighbour_downsample(self):
|
|
pixels = [[10, 10, 10], [20, 20, 20], [30, 30, 30], [40, 40, 40]]
|
|
assert resample_to_n(pixels, 2) == [(10, 10, 10), (30, 30, 30)]
|
|
|
|
def test_empty_strip_is_black(self):
|
|
assert resample_to_n([], 3) == [(0, 0, 0), (0, 0, 0), (0, 0, 0)]
|
|
|
|
def test_upsample_repeats(self):
|
|
assert resample_to_n([[255, 0, 0]], 3) == [(255, 0, 0), (255, 0, 0), (255, 0, 0)]
|
|
|
|
def test_zero_n_is_empty(self):
|
|
assert resample_to_n([[1, 2, 3]], 0) == []
|
|
|
|
|
|
class TestExtendedColorZonesPayload:
|
|
def test_framing_is_byte_exact(self):
|
|
hsbk = [(100, 200, 300, 3500), (400, 500, 600, 3500)]
|
|
payload = _build_set_extended_color_zones_payload(hsbk, duration_ms=0, zone_index=0)
|
|
# header(8) + 82 fixed HSBK slots * 8 bytes
|
|
assert len(payload) == 8 + _EXT_ZONE_MAX * 8
|
|
duration, apply, zone_index, count = struct.unpack_from("<IBHB", payload, 0)
|
|
assert (duration, apply, zone_index, count) == (0, 1, 0, 2)
|
|
h, s, b, k = struct.unpack_from("<HHHH", payload, 8)
|
|
assert (h, s, b, k) == (100, 200, 300, 3500)
|
|
|
|
def test_unused_slots_are_zero_padded(self):
|
|
payload = _build_set_extended_color_zones_payload([(1, 2, 3, 4)])
|
|
# Third slot (index 2) onward must be zero.
|
|
assert payload[8 + 8 : 8 + _EXT_ZONE_MAX * 8] == b"\x00" * (8 * (_EXT_ZONE_MAX - 1))
|
|
|
|
def test_overflow_is_capped_at_82(self):
|
|
payload = _build_set_extended_color_zones_payload([(1, 1, 1, 1)] * 200)
|
|
_, _, _, count = struct.unpack_from("<IBHB", payload, 0)
|
|
assert count == _EXT_ZONE_MAX
|
|
assert len(payload) == 8 + _EXT_ZONE_MAX * 8
|
|
|
|
|
|
class TestTileState64Payload:
|
|
def test_framing_is_byte_exact(self):
|
|
hsbk = [(11, 22, 33, 3500)]
|
|
payload = _build_set_tile_state64_payload(hsbk, tile_index=2, x=0, y=0, width=8)
|
|
# header(10) + 64 fixed HSBK slots * 8 bytes
|
|
assert len(payload) == 10 + _TILE_PIXELS * 8
|
|
tile_index, length, reserved, x, y, width, duration = struct.unpack_from(
|
|
"<BBBBBBI", payload, 0
|
|
)
|
|
assert (tile_index, length, reserved, x, y, width, duration) == (2, 1, 0, 0, 0, 8, 0)
|
|
h, s, b, k = struct.unpack_from("<HHHH", payload, 10)
|
|
assert (h, s, b, k) == (11, 22, 33, 3500)
|
|
|
|
|
|
class TestMultizoneReplyParser:
|
|
def test_parses_state_multizone(self):
|
|
body = struct.pack("<BB", 16, 0) + b"\x00" * (8 * 8) # count=16, index=0, 8 HSBK
|
|
raw = _build_packet(msg_type=MSG_STATE_MULTIZONE, payload=body)
|
|
parsed = _parse_multizone_reply(raw)
|
|
assert parsed == {"count": 16, "index": 0}
|
|
|
|
def test_other_message_type_returns_none(self):
|
|
raw = _build_packet(msg_type=MSG_SET_COLOR, payload=b"\x00" * 16)
|
|
assert _parse_multizone_reply(raw) is None
|
|
|
|
|
|
class TestDeviceChainParser:
|
|
def _chain_packet(self, tile_specs: list[tuple[int, int]]) -> bytes:
|
|
payload = bytearray(1 + _TILE_STRUCT_SIZE * _TILE_CHAIN_MAX + 1)
|
|
payload[0] = 0 # start_index
|
|
for i, (w, h) in enumerate(tile_specs):
|
|
base = 1 + i * _TILE_STRUCT_SIZE
|
|
payload[base + 16] = w
|
|
payload[base + 17] = h
|
|
payload[1 + _TILE_STRUCT_SIZE * _TILE_CHAIN_MAX] = len(tile_specs) # tile count
|
|
return _build_packet(msg_type=MSG_STATE_DEVICE_CHAIN, payload=bytes(payload))
|
|
|
|
def test_parses_tile_widths_and_heights(self):
|
|
raw = self._chain_packet([(8, 8), (8, 8)])
|
|
parsed = _parse_state_device_chain(raw)
|
|
assert parsed == {"start_index": 0, "tiles": [(8, 8), (8, 8)]}
|
|
|
|
def test_short_packet_returns_none(self):
|
|
assert _parse_state_device_chain(b"\x00" * 40) is None
|
|
|
|
|
|
class TestEmitPixels:
|
|
def test_multizone_emits_one_extended_packet(self):
|
|
c = _client("multizone", zone_count=4)
|
|
c._emit_pixels([(255, 0, 0)] * 8, 255)
|
|
sent = c._transport.sent # type: ignore[attr-defined]
|
|
assert len(sent) == 1
|
|
assert _msg_type(sent[0]) == MSG_SET_EXTENDED_COLOR_ZONES
|
|
|
|
def test_tile_emits_one_packet_per_tile(self):
|
|
c = _client("tile", tiles=[(2, 2), (2, 2)])
|
|
c._emit_pixels([(0, 255, 0)] * 8, 255)
|
|
sent = c._transport.sent # type: ignore[attr-defined]
|
|
assert len(sent) == 2
|
|
assert all(_msg_type(p) == MSG_SET_TILE_STATE_64 for p in sent)
|
|
|
|
def test_single_mode_falls_back_to_set_color(self):
|
|
c = _client("single")
|
|
c._emit_pixels([(10, 20, 30)] * 4, 255)
|
|
sent = c._transport.sent # type: ignore[attr-defined]
|
|
assert len(sent) == 1
|
|
assert _msg_type(sent[0]) == MSG_SET_COLOR
|
|
|
|
def test_device_led_count_reflects_mode(self):
|
|
assert _client("multizone", zone_count=16).device_led_count == 16
|
|
assert _client("tile", tiles=[(8, 8), (8, 8)]).device_led_count == 128
|
|
assert _client("single").device_led_count == 10
|
|
|
|
|
|
class TestBrightnessScaling:
|
|
def test_brightness_scales_zone_colours(self):
|
|
c = _client("multizone", zone_count=1)
|
|
c._emit_pixels([(200, 100, 50)], 128)
|
|
payload = c._transport.sent[0][36:] # type: ignore[index]
|
|
h, s, b, k = struct.unpack_from("<HHHH", payload, 8)
|
|
scale = 128 / 255.0
|
|
exp_h, exp_s, exp_b, exp_k = rgb_to_hsbk(
|
|
int(200 * scale), int(100 * scale), int(50 * scale)
|
|
)
|
|
assert (h, s, b) == (exp_h, exp_s, exp_b)
|
|
|
|
|
|
class TestConfigRoundTrip:
|
|
def _device(self, per_zone: bool) -> Device:
|
|
return Device(
|
|
device_id="d1",
|
|
name="Beam",
|
|
url="lifx://1.2.3.4",
|
|
led_count=10,
|
|
device_type="lifx",
|
|
lifx_per_zone=per_zone,
|
|
)
|
|
|
|
def test_per_zone_round_trips_through_store(self):
|
|
d = self._device(True)
|
|
assert d.to_dict().get("lifx_per_zone") is True
|
|
back = Device.from_dict(d.to_dict())
|
|
assert back.lifx_per_zone is True
|
|
assert back.to_config().lifx_per_zone is True
|
|
|
|
def test_default_off_is_omitted(self):
|
|
d = self._device(False)
|
|
assert "lifx_per_zone" not in d.to_dict()
|
|
assert d.to_config().lifx_per_zone is False
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|