Files
ledgrab/server/tests/test_lifx.py
T
alexei.dolgolyov 8f9d490063 feat(devices): LIFX LAN target type
Adds support for LIFX smart bulbs and lightstrips that speak the LIFX
binary UDP protocol on port 56700, with broadcast LAN discovery via the
standard GetService/StateService probe.

Backend:
- LIFXClient is a single-pixel UDP adapter: averages the strip to one
  RGB triple, converts to LIFX HSBK (16-bit hue/saturation/brightness +
  kelvin), and pushes a tagged SetColor packet so all bulbs on the
  subnet act on it. Brightness folds into the HSBK brightness channel.
- Hand-rolled packet builder: 36-byte LIFX header (frame +
  frame-address + protocol-header) + variable-length payload. Source
  ID 'LGGR' identifies LedGrab in protocol logs.
- supports_fast_send=True with a synchronous send_pixels_fast hot path
  -- UDP costs nothing, so the default rate gate is 50 ms (~20 Hz) to
  match LIFX's documented <=20 cmd/sec recommendation.
- Broadcast discovery sends GetService and parses StateService replies
  back into IP + MAC + service-port triples. Broadcast failures yield
  [] rather than raising.
- Health check sends GetService and waits 1.5s for any reply on a
  one-shot UDP socket.
- LIFXConfig joins the typed config union; Device storage gains a
  lifx_min_interval_ms field; full to_dict/from_dict/to_config wiring.
- 47 unit tests cover URL parsing, RGB->HSBK conversion (red/green/
  blue/white/black/clamping), packet construction (size, msg type,
  tagged flag, target MAC, sequence byte), SetColor and SetPower
  payload layouts, StateService reply parsing (including rejection
  of wrong msg types and runt payloads), strip averaging, rate
  limiting, fast-send hot path, provider validate/discover/health,
  and Device.to_config round-trip.

Frontend:
- 'lifx' in DEVICE_TYPE_KEYS (next to 'wiz'), lightbulb icon
  (deliberate smart-bulb family grouping with Hue + Yeelight + WiZ).
- isLifxDevice predicate + per-type field show/hide in create and
  settings modals.
- Rate-limit number input (default 50 ms) in both modals with hint
  text referencing LIFX's documented <=20 cmd/sec ceiling.
- Locale strings in en/ru/zh.

LIFX bulbs are reachable from the existing "Scan network" button -- no
new discovery UI affordance was needed. No brightness_control capability
exposed; LIFX brightness is folded into the HSBK on the wire.
2026-05-16 02:30:30 +03:00

480 lines
14 KiB
Python

"""Tests for the LIFX LAN LED client + provider."""
from __future__ import annotations
import struct
from unittest.mock import MagicMock
import numpy as np
import pytest
from ledgrab.core.devices.device_config import LIFXConfig
from ledgrab.core.devices.led_client import ProviderDeps
from ledgrab.core.devices.lifx_client import (
LIFX_PORT,
MSG_GET_SERVICE,
MSG_SET_COLOR,
MSG_SET_POWER,
MSG_STATE_SERVICE,
LIFXClient,
_average_color,
_build_packet,
_build_set_color_payload,
_build_set_power_payload,
_parse_state_service_reply,
parse_lifx_url,
rgb_to_hsbk,
)
from ledgrab.core.devices.lifx_provider import LIFXDeviceProvider
# ============================================================================
# URL parsing
# ============================================================================
@pytest.mark.parametrize(
"url,expected",
[
("lifx://192.168.1.50", ("192.168.1.50", LIFX_PORT)),
("lifx://192.168.1.50:56700", ("192.168.1.50", 56700)),
("192.168.1.50", ("192.168.1.50", LIFX_PORT)),
("192.168.1.50:56700", ("192.168.1.50", 56700)),
("bulb.local", ("bulb.local", LIFX_PORT)),
],
)
def test_parse_lifx_url(url, expected):
assert parse_lifx_url(url) == expected
@pytest.mark.parametrize("url", ["", " ", "lifx://", "://192.168.1.1"])
def test_parse_lifx_url_rejects_empty(url):
with pytest.raises(ValueError):
parse_lifx_url(url)
# ============================================================================
# RGB → HSBK conversion
# ============================================================================
def test_rgb_to_hsbk_pure_red():
h, s, b, k = rgb_to_hsbk(255, 0, 0)
assert h == 0
assert s == 65535
assert b == 65535
assert 2500 <= k <= 9000
def test_rgb_to_hsbk_pure_green():
h, s, b, k = rgb_to_hsbk(0, 255, 0)
# Green is at 120° → hue = 120/360 * 65535 ≈ 21845
assert abs(h - 21845) < 5
assert s == 65535
assert b == 65535
def test_rgb_to_hsbk_pure_blue():
h, s, b, _k = rgb_to_hsbk(0, 0, 255)
assert abs(h - 43690) < 5
assert s == 65535
assert b == 65535
def test_rgb_to_hsbk_black_is_zero_brightness():
h, s, b, _k = rgb_to_hsbk(0, 0, 0)
assert b == 0
assert s == 0
def test_rgb_to_hsbk_white_has_full_brightness_zero_saturation():
_h, s, b, _k = rgb_to_hsbk(255, 255, 255)
assert s == 0
assert b == 65535
def test_rgb_to_hsbk_clamps_out_of_range_input():
# Negative / >255 should still produce a valid HSBK
h, s, b, _k = rgb_to_hsbk(-50, 999, 128)
assert 0 <= h <= 0xFFFF
assert 0 <= s <= 0xFFFF
assert 0 <= b <= 0xFFFF
# ============================================================================
# Packet construction
# ============================================================================
def test_build_packet_size_header_matches_total():
packet = _build_packet(msg_type=MSG_SET_POWER, payload=b"\x00\x01\x02\x03\x04\x05")
size = struct.unpack_from("<H", packet, 0)[0]
assert size == len(packet)
def test_build_packet_encodes_msg_type():
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"")
msg_type = struct.unpack_from("<H", packet, 32)[0]
assert msg_type == MSG_SET_COLOR
def test_build_packet_tagged_flag_set():
packet = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
frame_field = struct.unpack_from("<H", packet, 2)[0]
# Tagged bit (0x2000) must be set in the frame header
assert frame_field & 0x2000
def test_build_packet_target_mac_at_offset_8():
target = b"\xaa\xbb\xcc\xdd\xee\xff"
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"", target_mac=target)
assert packet[8:14] == target
def test_build_packet_sequence_byte_at_offset_23():
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"", sequence=42)
assert packet[23] == 42
def test_set_color_payload_layout():
payload = _build_set_color_payload(h=10, s=20, b=30, k=4000, duration_ms=500)
# reserved byte + four uint16 + uint32 = 13 bytes
assert len(payload) == 13
assert payload[0] == 0
h, s, br, k, duration = struct.unpack_from("<HHHHI", payload, 1)
assert (h, s, br, k, duration) == (10, 20, 30, 4000, 500)
def test_set_power_payload_on_off():
on_payload = _build_set_power_payload(True)
off_payload = _build_set_power_payload(False)
on_level = struct.unpack_from("<H", on_payload, 0)[0]
off_level = struct.unpack_from("<H", off_payload, 0)[0]
assert on_level == 65535
assert off_level == 0
# ============================================================================
# StateService reply parsing
# ============================================================================
def test_parse_state_service_extracts_mac_and_port():
"""Build a synthetic LIFX reply and parse it back."""
mac = b"\xaa\xbb\xcc\xdd\xee\xff"
payload = struct.pack("<BI", 1, 56700) # service=1, port=56700
packet = _build_packet(
msg_type=MSG_STATE_SERVICE,
payload=payload,
target_mac=mac,
)
parsed = _parse_state_service_reply(packet)
assert parsed is not None
assert parsed["mac"] == mac.hex()
assert parsed["service"] == 1
assert parsed["port"] == 56700
def test_parse_state_service_rejects_wrong_msg_type():
"""A SetColor packet sent back must not be misread as a StateService."""
payload = _build_set_color_payload(0, 0, 0, 0)
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=payload)
assert _parse_state_service_reply(packet) is None
def test_parse_state_service_rejects_runt_payload():
assert _parse_state_service_reply(b"") is None
assert _parse_state_service_reply(b"\x00" * 35) is None
# ============================================================================
# Helpers
# ============================================================================
def test_average_color_numpy():
pixels = np.array([[10, 20, 30], [40, 50, 60], [70, 80, 90]], dtype=np.uint8)
assert _average_color(pixels) == (40, 50, 60)
def test_average_color_list_and_empty():
assert _average_color([(10, 0, 0), (20, 0, 0), (30, 0, 0)]) == (20, 0, 0)
assert _average_color([]) == (0, 0, 0)
# ============================================================================
# LIFXClient (mocked transport)
# ============================================================================
def _make_connected_client(min_interval_s: float = 0.0) -> LIFXClient:
client = LIFXClient("lifx://127.0.0.1", led_count=10, min_interval_s=min_interval_s)
transport = MagicMock()
transport.sendto = MagicMock()
transport.close = MagicMock()
client._transport = transport
client._protocol = MagicMock()
client._connected = True
return client
def _sent_packets(client: LIFXClient) -> list[bytes]:
return [bytes(call.args[0]) for call in client._transport.sendto.call_args_list]
@pytest.mark.asyncio
async def test_send_pixels_emits_one_set_color_packet():
client = _make_connected_client()
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
await client.send_pixels(pixels)
packets = _sent_packets(client)
assert len(packets) == 1
msg_type = struct.unpack_from("<H", packets[0], 32)[0]
assert msg_type == MSG_SET_COLOR
@pytest.mark.asyncio
async def test_send_pixels_encodes_red_into_hsbk_payload():
client = _make_connected_client()
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
await client.send_pixels(pixels)
packet = _sent_packets(client)[0]
# SetColor payload starts at offset 36 (header) + 1 (reserved byte) = 37
h, s, b, _k, _dur = struct.unpack_from("<HHHHI", packet, 37)
assert h == 0
assert s == 65535
assert b == 65535
@pytest.mark.asyncio
async def test_send_pixels_scales_brightness_by_dimming_rgb():
client = _make_connected_client()
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
await client.send_pixels(pixels, brightness=128)
packet = _sent_packets(client)[0]
_h, _s, b, _k, _dur = struct.unpack_from("<HHHHI", packet, 37)
# 255 * 128/255 = 128 → brightness ≈ 128/255 * 65535 ≈ 32896
assert abs(b - int((128 / 255) * 65535)) < 256
@pytest.mark.asyncio
async def test_rate_limit_drops_subsequent_frames():
client = _make_connected_client(min_interval_s=10.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
assert len(_sent_packets(client)) == 1
@pytest.mark.asyncio
async def test_send_pixels_when_not_connected_raises():
client = LIFXClient("lifx://127.0.0.1", led_count=1)
with pytest.raises(RuntimeError, match="not connected"):
await client.send_pixels(np.array([[1, 2, 3]], dtype=np.uint8))
def test_send_pixels_fast_runs_synchronously():
client = _make_connected_client(min_interval_s=0.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
client.send_pixels_fast(pixels)
packets = _sent_packets(client)
assert len(packets) == 1
msg_type = struct.unpack_from("<H", packets[0], 32)[0]
assert msg_type == MSG_SET_COLOR
def test_supports_fast_send_is_true():
assert LIFXClient("lifx://127.0.0.1", led_count=1).supports_fast_send is True
@pytest.mark.asyncio
async def test_set_power_sends_set_power_msg():
client = _make_connected_client()
await client.set_power(True)
await client.set_power(False)
packets = _sent_packets(client)
types = [struct.unpack_from("<H", p, 32)[0] for p in packets]
assert types == [MSG_SET_POWER, MSG_SET_POWER]
on_level = struct.unpack_from("<H", packets[0], 36)[0]
off_level = struct.unpack_from("<H", packets[1], 36)[0]
assert on_level == 65535
assert off_level == 0
@pytest.mark.asyncio
async def test_set_color_sends_set_color_msg():
client = _make_connected_client()
await client.set_color(255, 0, 0)
packet = _sent_packets(client)[0]
msg_type = struct.unpack_from("<H", packet, 32)[0]
assert msg_type == MSG_SET_COLOR
@pytest.mark.asyncio
async def test_close_releases_transport():
client = _make_connected_client()
transport = client._transport
await client.close()
transport.close.assert_called_once()
assert client._transport is None
assert client.is_connected is False
# ============================================================================
# Provider
# ============================================================================
def test_provider_device_type_and_capabilities():
provider = LIFXDeviceProvider()
assert provider.device_type == "lifx"
caps = provider.capabilities
assert "manual_led_count" in caps
assert "power_control" in caps
assert "single_pixel" in caps
@pytest.mark.asyncio
async def test_provider_validate_accepts_bare_host():
provider = LIFXDeviceProvider()
assert await provider.validate_device("192.168.1.50") == {}
@pytest.mark.asyncio
async def test_provider_validate_rejects_empty():
provider = LIFXDeviceProvider()
with pytest.raises(ValueError, match="Invalid LIFX URL"):
await provider.validate_device("")
def test_provider_create_client_threads_config():
provider = LIFXDeviceProvider()
config = LIFXConfig(
device_id="device_test",
device_url="lifx://192.168.1.50",
led_count=30,
lifx_min_interval_ms=100,
)
client = provider.create_client(config, deps=ProviderDeps())
assert isinstance(client, LIFXClient)
assert client.host == "192.168.1.50"
assert client.port == LIFX_PORT
assert client._led_count == 30
assert client._min_interval_s == pytest.approx(0.1)
@pytest.mark.asyncio
async def test_provider_discover_returns_empty_on_failure(monkeypatch):
async def _explode(timeout):
raise OSError("network unreachable")
monkeypatch.setattr("ledgrab.core.devices.lifx_provider.discover_lifx_bulbs", _explode)
provider = LIFXDeviceProvider()
assert await provider.discover() == []
@pytest.mark.asyncio
async def test_provider_discover_maps_replies_to_discovered_devices(monkeypatch):
async def _fake(timeout):
return [
{"ip": "192.168.1.50", "mac": "aabbccddeeff", "port": 56700},
{"ip": "", "mac": "1234567890ab", "port": 56700},
]
monkeypatch.setattr("ledgrab.core.devices.lifx_provider.discover_lifx_bulbs", _fake)
provider = LIFXDeviceProvider()
results = await provider.discover()
assert len(results) == 1
[bulb] = results
assert bulb.device_type == "lifx"
assert bulb.url == "lifx://192.168.1.50"
assert bulb.ip == "192.168.1.50"
assert bulb.mac == "aabbccddeeff"
# ============================================================================
# Device.to_config() round-trip
# ============================================================================
def test_device_to_config_round_trip_lifx():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Office LIFX",
url="lifx://192.168.1.42",
led_count=30,
device_type="lifx",
lifx_min_interval_ms=100,
)
config = device.to_config()
assert isinstance(config, LIFXConfig)
assert config.device_url == "lifx://192.168.1.42"
assert config.led_count == 30
assert config.lifx_min_interval_ms == 100
def test_device_to_dict_omits_lifx_default_interval():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Default",
url="lifx://192.168.1.42",
led_count=1,
device_type="lifx",
)
assert "lifx_min_interval_ms" not in device.to_dict()
def test_device_to_dict_preserves_non_default_lifx_interval():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Custom",
url="lifx://192.168.1.42",
led_count=1,
device_type="lifx",
lifx_min_interval_ms=200,
)
assert device.to_dict()["lifx_min_interval_ms"] == 200
def test_device_from_dict_lifx_round_trip():
from ledgrab.storage.device_store import Device
restored = Device.from_dict(
{
"id": "device_abc12345",
"name": "Roundtrip",
"url": "lifx://10.0.0.1",
"led_count": 1,
"device_type": "lifx",
"lifx_min_interval_ms": 150,
}
)
assert restored.lifx_min_interval_ms == 150