feat(devices): LIFX LAN target type
Adds support for LIFX smart bulbs and lightstrips that speak the LIFX binary UDP protocol on port 56700, with broadcast LAN discovery via the standard GetService/StateService probe. Backend: - LIFXClient is a single-pixel UDP adapter: averages the strip to one RGB triple, converts to LIFX HSBK (16-bit hue/saturation/brightness + kelvin), and pushes a tagged SetColor packet so all bulbs on the subnet act on it. Brightness folds into the HSBK brightness channel. - Hand-rolled packet builder: 36-byte LIFX header (frame + frame-address + protocol-header) + variable-length payload. Source ID 'LGGR' identifies LedGrab in protocol logs. - supports_fast_send=True with a synchronous send_pixels_fast hot path -- UDP costs nothing, so the default rate gate is 50 ms (~20 Hz) to match LIFX's documented <=20 cmd/sec recommendation. - Broadcast discovery sends GetService and parses StateService replies back into IP + MAC + service-port triples. Broadcast failures yield [] rather than raising. - Health check sends GetService and waits 1.5s for any reply on a one-shot UDP socket. - LIFXConfig joins the typed config union; Device storage gains a lifx_min_interval_ms field; full to_dict/from_dict/to_config wiring. - 47 unit tests cover URL parsing, RGB->HSBK conversion (red/green/ blue/white/black/clamping), packet construction (size, msg type, tagged flag, target MAC, sequence byte), SetColor and SetPower payload layouts, StateService reply parsing (including rejection of wrong msg types and runt payloads), strip averaging, rate limiting, fast-send hot path, provider validate/discover/health, and Device.to_config round-trip. Frontend: - 'lifx' in DEVICE_TYPE_KEYS (next to 'wiz'), lightbulb icon (deliberate smart-bulb family grouping with Hue + Yeelight + WiZ). - isLifxDevice predicate + per-type field show/hide in create and settings modals. - Rate-limit number input (default 50 ms) in both modals with hint text referencing LIFX's documented <=20 cmd/sec ceiling. - Locale strings in en/ru/zh. LIFX bulbs are reachable from the existing "Scan network" button -- no new discovery UI affordance was needed. No brightness_control capability exposed; LIFX brightness is folded into the HSBK on the wire.
This commit is contained in:
@@ -723,9 +723,12 @@ After phase 1 the codebase will have 3 fresh examples of "ping the LAN, listen f
|
||||
|
||||
### Phase 4 — Major consumer brands
|
||||
|
||||
- [x] **LIFX LAN** — UDP binary protocol on port 56700; RGB→HSBK 16-bit
|
||||
conversion; broadcast discovery via GetService/StateService probe;
|
||||
47 unit tests. Single-pixel adapter shape, identical to WiZ
|
||||
structurally. Frontend wired via subagent.
|
||||
- [ ] Govee LAN API (2023+)
|
||||
- [ ] Twinkly
|
||||
- [ ] LIFX LAN
|
||||
- [ ] Nanoleaf OpenAPI
|
||||
- [ ] Mi-Light / MiBoxer UDP gateway
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ def _device_to_response(device) -> DeviceResponse:
|
||||
hue_entertainment_group_id=device.hue_entertainment_group_id,
|
||||
yeelight_min_interval_ms=device.yeelight_min_interval_ms,
|
||||
wiz_min_interval_ms=device.wiz_min_interval_ms,
|
||||
lifx_min_interval_ms=device.lifx_min_interval_ms,
|
||||
spi_speed_hz=device.spi_speed_hz,
|
||||
spi_led_type=device.spi_led_type,
|
||||
chroma_device_type=device.chroma_device_type,
|
||||
@@ -233,6 +234,11 @@ async def create_device(
|
||||
if device_data.wiz_min_interval_ms is not None
|
||||
else 50
|
||||
),
|
||||
lifx_min_interval_ms=(
|
||||
device_data.lifx_min_interval_ms
|
||||
if device_data.lifx_min_interval_ms is not None
|
||||
else 50
|
||||
),
|
||||
spi_speed_hz=device_data.spi_speed_hz or 800000,
|
||||
spi_led_type=device_data.spi_led_type or "WS2812B",
|
||||
chroma_device_type=device_data.chroma_device_type or "chromalink",
|
||||
@@ -497,6 +503,7 @@ async def update_device(
|
||||
hue_entertainment_group_id=update_data.hue_entertainment_group_id,
|
||||
yeelight_min_interval_ms=update_data.yeelight_min_interval_ms,
|
||||
wiz_min_interval_ms=update_data.wiz_min_interval_ms,
|
||||
lifx_min_interval_ms=update_data.lifx_min_interval_ms,
|
||||
spi_speed_hz=update_data.spi_speed_hz,
|
||||
spi_led_type=update_data.spi_led_type,
|
||||
chroma_device_type=update_data.chroma_device_type,
|
||||
|
||||
@@ -77,6 +77,13 @@ class DeviceCreate(BaseModel):
|
||||
le=10000,
|
||||
description="WiZ client-side rate limit between commands in ms (default 50)",
|
||||
)
|
||||
# LIFX fields
|
||||
lifx_min_interval_ms: Optional[int] = Field(
|
||||
None,
|
||||
ge=0,
|
||||
le=10000,
|
||||
description="LIFX client-side rate limit between commands in ms (default 50)",
|
||||
)
|
||||
# SPI Direct fields
|
||||
spi_speed_hz: Optional[int] = Field(
|
||||
None, ge=100000, le=4000000, description="SPI clock speed in Hz"
|
||||
@@ -171,6 +178,9 @@ class DeviceUpdate(BaseModel):
|
||||
wiz_min_interval_ms: Optional[int] = Field(
|
||||
None, ge=0, le=10000, description="WiZ client-side rate limit in ms"
|
||||
)
|
||||
lifx_min_interval_ms: Optional[int] = Field(
|
||||
None, ge=0, le=10000, description="LIFX client-side rate limit in ms"
|
||||
)
|
||||
spi_speed_hz: Optional[int] = Field(None, ge=100000, le=4000000, description="SPI clock speed")
|
||||
spi_led_type: Optional[str] = Field(None, description="LED chipset type")
|
||||
chroma_device_type: Optional[str] = Field(None, description="Chroma peripheral type")
|
||||
@@ -344,6 +354,7 @@ class DeviceResponse(BaseModel):
|
||||
default=500, description="Yeelight client-side rate limit in ms"
|
||||
)
|
||||
wiz_min_interval_ms: int = Field(default=50, description="WiZ client-side rate limit in ms")
|
||||
lifx_min_interval_ms: int = Field(default=50, description="LIFX client-side rate limit in ms")
|
||||
spi_speed_hz: int = Field(default=800000, description="SPI clock speed in Hz")
|
||||
spi_led_type: str = Field(default="WS2812B", description="LED chipset type")
|
||||
chroma_device_type: str = Field(default="chromalink", description="Chroma peripheral type")
|
||||
|
||||
@@ -100,6 +100,18 @@ class WiZConfig(BaseDeviceConfig):
|
||||
wiz_min_interval_ms: int = 50
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LIFXConfig(BaseDeviceConfig):
|
||||
"""LIFX LAN bulb / lightstrip.
|
||||
|
||||
LIFX recommends ≤20 commands/sec per device. ``lifx_min_interval_ms``
|
||||
defaults to 50 ms so we stay just under that ceiling.
|
||||
"""
|
||||
|
||||
device_type: Literal["lifx"] = "lifx"
|
||||
lifx_min_interval_ms: int = 50
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SPIConfig(BaseDeviceConfig):
|
||||
device_type: Literal["spi"] = "spi"
|
||||
@@ -172,6 +184,7 @@ DeviceConfig = Union[
|
||||
DDPConfig,
|
||||
YeelightConfig,
|
||||
WiZConfig,
|
||||
LIFXConfig,
|
||||
AdalightConfig,
|
||||
AmbiLEDConfig,
|
||||
DMXConfig,
|
||||
|
||||
@@ -346,6 +346,10 @@ def _register_builtin_providers():
|
||||
|
||||
register_provider(WiZDeviceProvider())
|
||||
|
||||
from ledgrab.core.devices.lifx_provider import LIFXDeviceProvider
|
||||
|
||||
register_provider(LIFXDeviceProvider())
|
||||
|
||||
# BLE support is optional — only register the provider if the ``bleak``
|
||||
# extra is installed. Importing the provider itself is safe (it doesn't
|
||||
# import bleak at module load), but we still want a clean skip on
|
||||
|
||||
@@ -0,0 +1,425 @@
|
||||
"""LIFX LAN LED client.
|
||||
|
||||
LIFX bulbs and lightstrips accept a binary UDP protocol on port 56700.
|
||||
Every packet has a 36-byte header (frame + frame-address + protocol-header)
|
||||
followed by a type-specific payload. Colors are HSBK 16-bit per channel.
|
||||
|
||||
URL scheme: ``lifx://<host>[:port]`` or bare ``<host>[:port]``. Default port 56700.
|
||||
|
||||
LIFX bulbs are reachable two ways:
|
||||
* Unicast — set the ``target`` field to the bulb's 48-bit MAC.
|
||||
* Broadcast — set ``target`` to all zeros and ``tagged=1``; all bulbs on
|
||||
the subnet act on the message. We use this for the SetColor hot path
|
||||
so we don't have to learn the MAC of every device a user owns.
|
||||
|
||||
Reference: https://lan.developer.lifx.com/docs/header-description
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional, Tuple, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
LIFX_PORT = 56700
|
||||
DEFAULT_MIN_INTERVAL_S = 0.05 # ~20 Hz — LIFX rate-limit guidance is 20/sec
|
||||
|
||||
# Message types we care about
|
||||
MSG_GET_SERVICE = 2
|
||||
MSG_STATE_SERVICE = 3
|
||||
MSG_SET_POWER = 21
|
||||
MSG_SET_COLOR = 102
|
||||
|
||||
# Frame field byte 0 of the protocol header: tagged=1, addressable=1, protocol=1024
|
||||
_FRAME_TAGGED = 0x3400
|
||||
_FRAME_UNTAGGED = 0x1400
|
||||
|
||||
_SOURCE_ID = 0x4C474752 # "LGGR" — identifies LedGrab in protocol logs
|
||||
|
||||
|
||||
def parse_lifx_url(url: str) -> Tuple[str, int]:
|
||||
"""Pull ``(host, port)`` from ``lifx://host[:port]`` or bare ``host[:port]``."""
|
||||
if not url:
|
||||
raise ValueError("LIFX URL is empty")
|
||||
raw = url.strip()
|
||||
if "://" in raw:
|
||||
parsed = urlparse(raw)
|
||||
host = parsed.hostname or ""
|
||||
port = parsed.port or LIFX_PORT
|
||||
else:
|
||||
parsed = urlparse(f"lifx://{raw}")
|
||||
host = parsed.hostname or ""
|
||||
port = parsed.port or LIFX_PORT
|
||||
if not host:
|
||||
raise ValueError(f"LIFX URL has no host: {url!r}")
|
||||
return host, port
|
||||
|
||||
|
||||
def _average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
def rgb_to_hsbk(r: int, g: int, b: int, kelvin: int = 3500) -> Tuple[int, int, int, int]:
|
||||
"""Convert 8-bit RGB to LIFX 16-bit HSBK.
|
||||
|
||||
The ``kelvin`` channel is irrelevant when saturation > 0 (the bulb
|
||||
interprets it as a hint, not a hard temperature), so we leave it at the
|
||||
LIFX default of ~3500 K. Outputs are clamped uint16.
|
||||
"""
|
||||
r_n = max(0, min(255, r)) / 255.0
|
||||
g_n = max(0, min(255, g)) / 255.0
|
||||
b_n = max(0, min(255, b)) / 255.0
|
||||
c_max = max(r_n, g_n, b_n)
|
||||
c_min = min(r_n, g_n, b_n)
|
||||
delta = c_max - c_min
|
||||
|
||||
# Hue (0-360 degrees → 0-65535)
|
||||
if delta == 0:
|
||||
h = 0.0
|
||||
elif c_max == r_n:
|
||||
h = 60.0 * (((g_n - b_n) / delta) % 6)
|
||||
elif c_max == g_n:
|
||||
h = 60.0 * (((b_n - r_n) / delta) + 2)
|
||||
else:
|
||||
h = 60.0 * (((r_n - g_n) / delta) + 4)
|
||||
hue_u16 = int((h / 360.0) * 65535) & 0xFFFF
|
||||
|
||||
# Saturation (0-1 → 0-65535)
|
||||
sat_u16 = 0 if c_max == 0 else int((delta / c_max) * 65535) & 0xFFFF
|
||||
# Brightness (0-1 → 0-65535)
|
||||
bri_u16 = int(c_max * 65535) & 0xFFFF
|
||||
kelvin_u16 = max(2500, min(9000, kelvin)) & 0xFFFF
|
||||
return hue_u16, sat_u16, bri_u16, kelvin_u16
|
||||
|
||||
|
||||
def _build_packet(
|
||||
*,
|
||||
msg_type: int,
|
||||
payload: bytes,
|
||||
target_mac: bytes = b"\x00\x00\x00\x00\x00\x00",
|
||||
sequence: int = 0,
|
||||
res_required: bool = False,
|
||||
ack_required: bool = False,
|
||||
tagged: bool = False,
|
||||
) -> bytes:
|
||||
"""Pack a LIFX packet: 36-byte header + payload.
|
||||
|
||||
See https://lan.developer.lifx.com/docs/header-description for the
|
||||
bit-level field layout. We construct the three sub-headers separately
|
||||
so the magic numbers are scoped to the fields they belong to.
|
||||
"""
|
||||
size = 36 + len(payload)
|
||||
# Frame header (8 bytes): size(2) | origin/tagged/addressable/protocol(2) | source(4)
|
||||
frame_field = _FRAME_TAGGED if tagged else _FRAME_UNTAGGED
|
||||
frame = struct.pack("<HHI", size, frame_field, _SOURCE_ID)
|
||||
# Frame address (16 bytes): target(8) | reserved(6) | res/ack flags(1) | sequence(1)
|
||||
flags = (0x01 if res_required else 0) | (0x02 if ack_required else 0)
|
||||
frame_addr = (
|
||||
target_mac + b"\x00\x00" + b"\x00\x00\x00\x00\x00\x00" + bytes([flags, sequence & 0xFF])
|
||||
)
|
||||
# Protocol header (12 bytes): reserved(8) | type(2) | reserved(2)
|
||||
proto = b"\x00" * 8 + struct.pack("<HH", msg_type, 0)
|
||||
return frame + frame_addr + proto + payload
|
||||
|
||||
|
||||
def _build_set_color_payload(h: int, s: int, b: int, k: int, duration_ms: int = 0) -> bytes:
|
||||
"""SetColor payload: reserved(1) | HSBK(8) | duration(4)."""
|
||||
return b"\x00" + struct.pack(
|
||||
"<HHHHI", h & 0xFFFF, s & 0xFFFF, b & 0xFFFF, k & 0xFFFF, duration_ms & 0xFFFFFFFF
|
||||
)
|
||||
|
||||
|
||||
def _build_set_power_payload(on: bool, duration_ms: int = 0) -> bytes:
|
||||
"""SetPower payload: level(2) | duration(4). Level is 0 or 65535."""
|
||||
return struct.pack("<HI", 65535 if on else 0, duration_ms & 0xFFFFFFFF)
|
||||
|
||||
|
||||
def _parse_state_service_reply(raw: bytes) -> Optional[dict]:
|
||||
"""Parse a LIFX StateService (discovery) reply.
|
||||
|
||||
Returns ``{"mac": "aabbccddeeff", "service": int, "port": int}`` or
|
||||
``None`` if the payload isn't a StateService.
|
||||
"""
|
||||
if len(raw) < 36 + 5:
|
||||
return None
|
||||
# Read msg_type from the protocol header at offset 32
|
||||
msg_type = struct.unpack_from("<H", raw, 32)[0]
|
||||
if msg_type != MSG_STATE_SERVICE:
|
||||
return None
|
||||
# Target MAC at offset 8, 6 bytes are the MAC.
|
||||
mac_bytes = raw[8:14]
|
||||
mac = mac_bytes.hex()
|
||||
# Payload at offset 36: service(1) + port(4)
|
||||
service, port = struct.unpack_from("<BI", raw, 36)
|
||||
return {"mac": mac, "service": int(service), "port": int(port)}
|
||||
|
||||
|
||||
class _LIFXProtocol(asyncio.DatagramProtocol):
|
||||
"""Write-only datagram protocol. Inbound replies dropped silently."""
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
|
||||
def datagram_received(self, data, addr):
|
||||
# LIFX bulbs sometimes echo back state on broadcast. We don't need it
|
||||
# for streaming ambilight — discard.
|
||||
pass
|
||||
|
||||
def error_received(self, exc):
|
||||
logger.debug("LIFX UDP error: %s", exc)
|
||||
|
||||
|
||||
class LIFXClient(LEDClient):
|
||||
"""LEDClient for a single LIFX bulb on the LAN."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
led_count: int = 1,
|
||||
*,
|
||||
min_interval_s: float = DEFAULT_MIN_INTERVAL_S,
|
||||
):
|
||||
host, port = parse_lifx_url(url)
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._led_count = led_count
|
||||
self._min_interval_s = max(0.0, min_interval_s)
|
||||
self._transport: Optional[asyncio.DatagramTransport] = None
|
||||
self._protocol: Optional[_LIFXProtocol] = None
|
||||
self._connected = False
|
||||
self._next_tx_at: float = 0.0
|
||||
self._sequence: int = 0
|
||||
|
||||
@property
|
||||
def host(self) -> str:
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
return self._port
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected and self._transport is not None
|
||||
|
||||
@property
|
||||
def device_led_count(self) -> Optional[int]:
|
||||
return self._led_count or None
|
||||
|
||||
async def connect(self) -> bool:
|
||||
if self._connected and self._transport is not None:
|
||||
return True
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
transport, protocol = await loop.create_datagram_endpoint(
|
||||
_LIFXProtocol, remote_addr=(self._host, self._port)
|
||||
)
|
||||
except OSError as exc:
|
||||
raise RuntimeError(f"Failed to open UDP to LIFX at {self._host}: {exc}") from exc
|
||||
self._transport = transport
|
||||
self._protocol = protocol # type: ignore[assignment]
|
||||
self._connected = True
|
||||
logger.info("LIFXClient connected to %s:%d", self._host, self._port)
|
||||
return True
|
||||
|
||||
async def close(self) -> None:
|
||||
if self._transport is not None:
|
||||
try:
|
||||
self._transport.close()
|
||||
except OSError:
|
||||
pass
|
||||
self._transport = None
|
||||
self._protocol = None
|
||||
self._connected = False
|
||||
|
||||
def _next_sequence(self) -> int:
|
||||
self._sequence = (self._sequence + 1) & 0xFF
|
||||
return self._sequence
|
||||
|
||||
def _send(self, msg_type: int, payload: bytes) -> None:
|
||||
"""Build and send one LIFX packet. Caller must hold an open transport."""
|
||||
assert self._transport is not None
|
||||
packet = _build_packet(
|
||||
msg_type=msg_type,
|
||||
payload=payload,
|
||||
sequence=self._next_sequence(),
|
||||
tagged=True, # broadcast within the unicast UDP socket — bulb still acts on it
|
||||
)
|
||||
self._transport.sendto(packet)
|
||||
|
||||
async def send_pixels(
|
||||
self,
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
brightness: int = 255,
|
||||
) -> bool:
|
||||
"""Average the strip → HSBK → SetColor."""
|
||||
if not self.is_connected:
|
||||
raise RuntimeError("LIFXClient not connected")
|
||||
now = time.monotonic()
|
||||
if now < self._next_tx_at:
|
||||
return True
|
||||
r, g, b = _average_color(pixels)
|
||||
if brightness < 255:
|
||||
scale = max(0, min(255, brightness)) / 255.0
|
||||
r = int(r * scale)
|
||||
g = int(g * scale)
|
||||
b = int(b * scale)
|
||||
h, s, br, k = rgb_to_hsbk(r, g, b)
|
||||
self._send(MSG_SET_COLOR, _build_set_color_payload(h, s, br, k, duration_ms=0))
|
||||
self._next_tx_at = now + self._min_interval_s
|
||||
return True
|
||||
|
||||
def send_pixels_fast(
|
||||
self,
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
brightness: int = 255,
|
||||
) -> None:
|
||||
"""Synchronous variant — same shape, runs on the hot loop."""
|
||||
if not self.is_connected or self._transport is None:
|
||||
raise RuntimeError("LIFXClient not connected")
|
||||
now = time.monotonic()
|
||||
if now < self._next_tx_at:
|
||||
return
|
||||
r, g, b = _average_color(pixels)
|
||||
if brightness < 255:
|
||||
scale = max(0, min(255, brightness)) / 255.0
|
||||
r = int(r * scale)
|
||||
g = int(g * scale)
|
||||
b = int(b * scale)
|
||||
h, s, br, k = rgb_to_hsbk(r, g, b)
|
||||
self._send(MSG_SET_COLOR, _build_set_color_payload(h, s, br, k, duration_ms=0))
|
||||
self._next_tx_at = now + self._min_interval_s
|
||||
|
||||
@property
|
||||
def supports_fast_send(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_color(self, r: int, g: int, b: int) -> None:
|
||||
if not self.is_connected:
|
||||
raise RuntimeError("LIFXClient not connected")
|
||||
h, s, br, k = rgb_to_hsbk(r, g, b)
|
||||
self._send(MSG_SET_COLOR, _build_set_color_payload(h, s, br, k, duration_ms=0))
|
||||
|
||||
async def set_power(self, on: bool) -> None:
|
||||
if not self.is_connected:
|
||||
raise RuntimeError("LIFXClient not connected")
|
||||
self._send(MSG_SET_POWER, _build_set_power_payload(on))
|
||||
|
||||
@classmethod
|
||||
async def check_health(
|
||||
cls,
|
||||
url: str,
|
||||
http_client,
|
||||
prev_health: Optional[DeviceHealth] = None,
|
||||
) -> DeviceHealth:
|
||||
"""Send GetService and wait briefly for a StateService reply."""
|
||||
now = datetime.now(timezone.utc)
|
||||
try:
|
||||
host, port = parse_lifx_url(url)
|
||||
except ValueError as exc:
|
||||
return DeviceHealth(online=False, last_checked=now, error=str(exc))
|
||||
loop = asyncio.get_running_loop()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
sock.setblocking(False)
|
||||
try:
|
||||
sock.bind(("", 0))
|
||||
probe = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
|
||||
start = loop.time()
|
||||
await loop.sock_sendto(sock, probe, (host, port))
|
||||
try:
|
||||
await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=1.5)
|
||||
except asyncio.TimeoutError:
|
||||
return DeviceHealth(
|
||||
online=False,
|
||||
last_checked=now,
|
||||
error=f"No LIFX reply from {host}:{port} within 1.5s",
|
||||
)
|
||||
latency_ms = (loop.time() - start) * 1000.0
|
||||
return DeviceHealth(online=True, latency_ms=latency_ms, last_checked=now)
|
||||
except OSError as exc:
|
||||
return DeviceHealth(
|
||||
online=False,
|
||||
last_checked=now,
|
||||
error=f"LIFX probe failed for {host}: {exc}",
|
||||
)
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Broadcast discovery
|
||||
# ============================================================================
|
||||
|
||||
|
||||
async def discover_lifx_bulbs(timeout: float = 2.0) -> List[dict]:
|
||||
"""Broadcast a GetService probe on the LAN and collect StateService replies.
|
||||
|
||||
Returns ``[{"ip": ..., "mac": ..., "port": ...}, ...]``.
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
sock.setblocking(False)
|
||||
try:
|
||||
sock.bind(("", 0))
|
||||
probe = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
|
||||
await loop.sock_sendto(sock, probe, ("255.255.255.255", LIFX_PORT))
|
||||
results: list[dict] = []
|
||||
seen_macs: set[str] = set()
|
||||
deadline = loop.time() + timeout
|
||||
while True:
|
||||
remaining = deadline - loop.time()
|
||||
if remaining <= 0:
|
||||
break
|
||||
try:
|
||||
raw, addr = await asyncio.wait_for(
|
||||
loop.sock_recvfrom(sock, 4096),
|
||||
timeout=remaining,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
break
|
||||
parsed = _parse_state_service_reply(raw)
|
||||
if not parsed:
|
||||
continue
|
||||
mac = parsed["mac"]
|
||||
if mac in seen_macs:
|
||||
continue
|
||||
seen_macs.add(mac)
|
||||
results.append(
|
||||
{
|
||||
"ip": addr[0],
|
||||
"mac": mac,
|
||||
"port": parsed["port"],
|
||||
}
|
||||
)
|
||||
return results
|
||||
finally:
|
||||
sock.close()
|
||||
@@ -0,0 +1,92 @@
|
||||
"""LIFX device provider — LAN-discoverable LIFX smart bulbs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, List
|
||||
|
||||
from ledgrab.core.devices.led_client import (
|
||||
DeviceHealth,
|
||||
DiscoveredDevice,
|
||||
LEDClient,
|
||||
LEDDeviceProvider,
|
||||
ProviderDeps,
|
||||
)
|
||||
from ledgrab.core.devices.lifx_client import (
|
||||
LIFXClient,
|
||||
discover_lifx_bulbs,
|
||||
parse_lifx_url,
|
||||
)
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ledgrab.core.devices.device_config import LIFXConfig
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class LIFXDeviceProvider(LEDDeviceProvider):
|
||||
"""Provider for LIFX smart bulbs / lightstrips.
|
||||
|
||||
Single-pixel adapter: averages the strip down to one color, encodes as
|
||||
HSBK (LIFX's native color model), and broadcasts a SetColor packet.
|
||||
"""
|
||||
|
||||
@property
|
||||
def device_type(self) -> str:
|
||||
return "lifx"
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set:
|
||||
return {
|
||||
"manual_led_count",
|
||||
"power_control",
|
||||
"static_color",
|
||||
"health_check",
|
||||
"single_pixel",
|
||||
}
|
||||
|
||||
def create_client(self, config: "LIFXConfig", *, deps: ProviderDeps) -> LEDClient:
|
||||
return LIFXClient(
|
||||
config.device_url,
|
||||
led_count=config.led_count,
|
||||
min_interval_s=max(0.0, config.lifx_min_interval_ms / 1000.0),
|
||||
)
|
||||
|
||||
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
|
||||
return await LIFXClient.check_health(url, http_client, prev_health)
|
||||
|
||||
async def validate_device(self, url: str) -> dict:
|
||||
try:
|
||||
host, port = parse_lifx_url(url)
|
||||
except ValueError as exc:
|
||||
raise ValueError(f"Invalid LIFX URL: {exc}") from exc
|
||||
logger.info("LIFX device URL validated: host=%s port=%d", host, port)
|
||||
return {}
|
||||
|
||||
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
|
||||
try:
|
||||
bulbs = await discover_lifx_bulbs(timeout=min(timeout, 5.0))
|
||||
except (OSError, RuntimeError) as exc:
|
||||
logger.warning("LIFX discovery failed: %s", exc)
|
||||
return []
|
||||
|
||||
results: List[DiscoveredDevice] = []
|
||||
for bulb in bulbs:
|
||||
ip = bulb.get("ip", "")
|
||||
if not ip:
|
||||
continue
|
||||
url = f"lifx://{ip}"
|
||||
mac = bulb.get("mac", "")
|
||||
results.append(
|
||||
DiscoveredDevice(
|
||||
name=f"LIFX {mac[-6:]}" if mac else "LIFX bulb",
|
||||
url=url,
|
||||
device_type="lifx",
|
||||
ip=ip,
|
||||
mac=mac,
|
||||
led_count=None,
|
||||
version=None,
|
||||
)
|
||||
)
|
||||
logger.info("LIFX broadcast scan found %d bulb(s)", len(results))
|
||||
return results
|
||||
@@ -167,6 +167,10 @@ export function isWizDevice(type: string) {
|
||||
return type === 'wiz';
|
||||
}
|
||||
|
||||
export function isLifxDevice(type: string) {
|
||||
return type === 'lifx';
|
||||
}
|
||||
|
||||
export function isUsbhidDevice(type: string) {
|
||||
return type === 'usbhid';
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ const _deviceTypeIcons = {
|
||||
wled: _svg(P.wifi), adalight: _svg(P.usb), ambiled: _svg(P.usb),
|
||||
mqtt: _svg(P.send), ws: _svg(P.globe), openrgb: _svg(P.palette),
|
||||
dmx: _svg(P.radio), ddp: _svg(P.send), mock: _svg(P.wrench),
|
||||
espnow: _svg(P.radio), hue: _svg(P.lightbulb), yeelight: _svg(P.lightbulb), wiz: _svg(P.lightbulb),
|
||||
espnow: _svg(P.radio), hue: _svg(P.lightbulb), yeelight: _svg(P.lightbulb), wiz: _svg(P.lightbulb), lifx: _svg(P.lightbulb),
|
||||
usbhid: _svg(P.usb),
|
||||
spi: _svg(P.plug), chroma: _svg(P.zap), gamesense: _svg(P.target),
|
||||
ble: _svg(P.bluetooth),
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
_discoveryCache, set_discoveryCache,
|
||||
csptCache,
|
||||
} from '../core/state.ts';
|
||||
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isEspnowDevice, isHueDevice, isYeelightDevice, isWizDevice, isBleDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, isGroupDevice, escapeHtml } from '../core/api.ts';
|
||||
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isEspnowDevice, isHueDevice, isYeelightDevice, isWizDevice, isLifxDevice, isBleDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, isGroupDevice, escapeHtml } from '../core/api.ts';
|
||||
import { devicesCache } from '../core/state.ts';
|
||||
import { t } from '../core/i18n.ts';
|
||||
import { showToast, desktopFocus } from '../core/ui.ts';
|
||||
@@ -43,6 +43,7 @@ class AddDeviceModal extends Modal {
|
||||
bleGoveeKey: (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value || '',
|
||||
yeelightMinInterval: (document.getElementById('device-yeelight-min-interval') as HTMLInputElement)?.value || '500',
|
||||
wizMinInterval: (document.getElementById('device-wiz-min-interval') as HTMLInputElement)?.value || '50',
|
||||
lifxMinInterval: (document.getElementById('device-lifx-min-interval') as HTMLInputElement)?.value || '50',
|
||||
groupChildren: JSON.stringify(_getGroupChildIds('device')),
|
||||
groupMode: (document.getElementById('device-group-mode-select') as HTMLSelectElement)?.value || 'sequence',
|
||||
};
|
||||
@@ -53,7 +54,7 @@ const addDeviceModal = new AddDeviceModal();
|
||||
|
||||
/* ── Icon-grid type selector ──────────────────────────────────── */
|
||||
|
||||
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'ddp', 'espnow', 'hue', 'yeelight', 'wiz', 'ble', 'usbhid', 'spi', 'chroma', 'gamesense', 'group', 'mock'];
|
||||
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'ddp', 'espnow', 'hue', 'yeelight', 'wiz', 'lifx', 'ble', 'usbhid', 'spi', 'chroma', 'gamesense', 'group', 'mock'];
|
||||
|
||||
function _buildDeviceTypeItems() {
|
||||
return DEVICE_TYPE_KEYS.map(key => ({
|
||||
@@ -282,6 +283,7 @@ export function onDeviceTypeChanged() {
|
||||
_showHueFields(false);
|
||||
_showYeelightFields(false);
|
||||
_showWizFields(false);
|
||||
_showLifxFields(false);
|
||||
_showBleFields(false);
|
||||
_showSpiFields(false);
|
||||
_showChromaFields(false);
|
||||
@@ -503,6 +505,28 @@ export function onDeviceTypeChanged() {
|
||||
} else {
|
||||
scanForDevices();
|
||||
}
|
||||
} else if (isLifxDevice(deviceType)) {
|
||||
// LIFX: binary UDP on port 56700. Show URL (LAN IP), LED count
|
||||
// (controls source mapping; LIFX is single-pixel — HSBK averaged
|
||||
// from the strip), rate-limit ms. Discovery uses UDP broadcast.
|
||||
urlGroup.style.display = '';
|
||||
urlInput.setAttribute('required', '');
|
||||
serialGroup.style.display = 'none';
|
||||
serialSelect.removeAttribute('required');
|
||||
ledCountGroup.style.display = '';
|
||||
baudRateGroup.style.display = 'none';
|
||||
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
|
||||
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
|
||||
if (scanBtn) scanBtn.style.display = '';
|
||||
_showLifxFields(true);
|
||||
if (urlLabel) urlLabel.textContent = t('device.lifx.url') || 'IP Address:';
|
||||
if (urlHint) urlHint.textContent = t('device.lifx.url.hint') || 'LAN IP of the LIFX bulb. UDP port 56700 is the protocol default.';
|
||||
urlInput.placeholder = t('device.lifx.url.placeholder') || '192.168.1.50';
|
||||
if (deviceType in _discoveryCache) {
|
||||
_renderDiscoveryList();
|
||||
} else {
|
||||
scanForDevices();
|
||||
}
|
||||
} else if (isBleDevice(deviceType)) {
|
||||
// BLE: show URL (ble://<address>), LED count, protocol family picker,
|
||||
// and a Govee-only AES key field that toggles with the family selection.
|
||||
@@ -856,6 +880,13 @@ export function showAddDevice(presetType: any = null, cloneData: any = null) {
|
||||
wmi.value = String(cloneData.wiz_min_interval_ms);
|
||||
}
|
||||
}
|
||||
// Prefill LIFX fields
|
||||
if (isLifxDevice(presetType)) {
|
||||
const lmi = document.getElementById('device-lifx-min-interval') as HTMLInputElement;
|
||||
if (lmi && cloneData.lifx_min_interval_ms != null) {
|
||||
lmi.value = String(cloneData.lifx_min_interval_ms);
|
||||
}
|
||||
}
|
||||
// Prefill CSPT template selector (after fetch completes)
|
||||
if (cloneData.default_css_processing_template_id) {
|
||||
csptCache.fetch().then(() => {
|
||||
@@ -1067,6 +1098,11 @@ export async function handleAddDevice(event: any) {
|
||||
const parsed = parseInt(raw || '50', 10);
|
||||
body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
|
||||
}
|
||||
if (isLifxDevice(deviceType)) {
|
||||
const raw = (document.getElementById('device-lifx-min-interval') as HTMLInputElement)?.value;
|
||||
const parsed = parseInt(raw || '50', 10);
|
||||
body.lifx_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
|
||||
}
|
||||
if (isBleDevice(deviceType)) {
|
||||
body.ble_family = (document.getElementById('device-ble-family') as HTMLSelectElement)?.value || 'sp110e';
|
||||
const goveeKey = (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value?.trim();
|
||||
@@ -1430,6 +1466,11 @@ function _showWizFields(show: boolean) {
|
||||
if (el) el.style.display = show ? '' : 'none';
|
||||
}
|
||||
|
||||
function _showLifxFields(show: boolean) {
|
||||
const el = document.getElementById('device-lifx-min-interval-group') as HTMLElement | null;
|
||||
if (el) el.style.display = show ? '' : 'none';
|
||||
}
|
||||
|
||||
// Tracks whether the BLE fields are currently shown — avoids reading
|
||||
// style.display strings in _updateBleGoveeKeyVisibility.
|
||||
let _bleFieldsVisible = false;
|
||||
|
||||
@@ -6,7 +6,7 @@ import {
|
||||
_deviceBrightnessCache, updateDeviceBrightness,
|
||||
csptCache,
|
||||
} from '../core/state.ts';
|
||||
import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isYeelightDevice, isWizDevice, isBleDevice, isGroupDevice } from '../core/api.ts';
|
||||
import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isYeelightDevice, isWizDevice, isLifxDevice, isBleDevice, isGroupDevice } from '../core/api.ts';
|
||||
import { devicesCache } from '../core/state.ts';
|
||||
import { _fetchOpenrgbZones, _getCheckedZones, _splitOpenrgbZone, _getZoneMode, ensureDmxProtocolIconSelect, destroyDmxProtocolIconSelect, ensureDdpColorOrderIconSelect, destroyDdpColorOrderIconSelect, ensureSpiLedTypeIconSelect, destroySpiLedTypeIconSelect, ensureGameSenseDeviceTypeIconSelect, destroyGameSenseDeviceTypeIconSelect, addGroupChildSettingsWithId as _addGroupChildSettingsWithId, ensureGroupModeIconSelect, destroyGroupModeIconSelect, ensureBleFamilyIconSelect, destroyBleFamilyIconSelect } from './device-discovery.ts';
|
||||
import { t } from '../core/i18n.ts';
|
||||
@@ -97,6 +97,7 @@ class DeviceSettingsModal extends Modal {
|
||||
bleGoveeKey: (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value || '',
|
||||
yeelightMinInterval: (document.getElementById('settings-yeelight-min-interval') as HTMLInputElement | null)?.value || '500',
|
||||
wizMinInterval: (document.getElementById('settings-wiz-min-interval') as HTMLInputElement | null)?.value || '50',
|
||||
lifxMinInterval: (document.getElementById('settings-lifx-min-interval') as HTMLInputElement | null)?.value || '50',
|
||||
csptId: (document.getElementById('settings-css-processing-template') as HTMLSelectElement | null)?.value || '',
|
||||
};
|
||||
}
|
||||
@@ -667,6 +668,24 @@ export async function showSettings(deviceId: any) {
|
||||
if (wizMinIntervalGroup) (wizMinIntervalGroup as HTMLElement).style.display = 'none';
|
||||
}
|
||||
|
||||
// LIFX-specific fields — binary UDP on port 56700, single-pixel
|
||||
// (HSBK averaged from the strip). LIFX recommends ≤20 cmd/sec per
|
||||
// device; default 50 ms matches that ceiling.
|
||||
const lifxMinIntervalGroup = document.getElementById('settings-lifx-min-interval-group');
|
||||
if (isLifxDevice(device.device_type)) {
|
||||
if (lifxMinIntervalGroup) (lifxMinIntervalGroup as HTMLElement).style.display = '';
|
||||
const lmi = device.lifx_min_interval_ms ?? 50;
|
||||
(document.getElementById('settings-lifx-min-interval') as HTMLInputElement).value = String(lmi);
|
||||
// Relabel URL field as IP Address (same pattern as WiZ/Yeelight/DMX/DDP)
|
||||
const urlLabel6 = urlGroup.querySelector('label[for="settings-device-url"]') as HTMLElement | null;
|
||||
const urlHint6 = urlGroup.querySelector('.input-hint') as HTMLElement | null;
|
||||
if (urlLabel6) urlLabel6.textContent = t('device.lifx.url');
|
||||
if (urlHint6) urlHint6.textContent = t('device.lifx.url.hint');
|
||||
urlInput.placeholder = t('device.lifx.url.placeholder') || '192.168.1.50';
|
||||
} else {
|
||||
if (lifxMinIntervalGroup) (lifxMinIntervalGroup as HTMLElement).style.display = 'none';
|
||||
}
|
||||
|
||||
// BLE-specific fields — exposed in the settings modal so the user
|
||||
// can fix a wrong protocol family pick without deleting+recreating
|
||||
// the device. Uses the shared IconSelect grid (project rule bans
|
||||
@@ -813,6 +832,11 @@ export async function saveDeviceSettings() {
|
||||
const parsed = parseInt(raw || '50', 10);
|
||||
body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
|
||||
}
|
||||
if (isLifxDevice(settingsModal.deviceType)) {
|
||||
const raw = (document.getElementById('settings-lifx-min-interval') as HTMLInputElement | null)?.value;
|
||||
const parsed = parseInt(raw || '50', 10);
|
||||
body.lifx_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
|
||||
}
|
||||
if (isBleDevice(settingsModal.deviceType)) {
|
||||
body.ble_family = (document.getElementById('settings-ble-family') as HTMLSelectElement | null)?.value || 'sp110e';
|
||||
const goveeKey = (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value?.trim() || '';
|
||||
|
||||
@@ -47,7 +47,7 @@ export function bindableColorSourceId(b: BindableColor | undefined): string {
|
||||
|
||||
export type DeviceType =
|
||||
| 'wled' | 'adalight' | 'ambiled' | 'mock' | 'mqtt' | 'ws'
|
||||
| 'openrgb' | 'dmx' | 'ddp' | 'espnow' | 'hue' | 'yeelight' | 'wiz'
|
||||
| 'openrgb' | 'dmx' | 'ddp' | 'espnow' | 'hue' | 'yeelight' | 'wiz' | 'lifx'
|
||||
| 'ble' | 'usbhid' | 'spi'
|
||||
| 'chroma' | 'gamesense' | 'group';
|
||||
|
||||
@@ -78,6 +78,7 @@ export interface Device {
|
||||
hue_entertainment_group_id: string;
|
||||
yeelight_min_interval_ms: number;
|
||||
wiz_min_interval_ms: number;
|
||||
lifx_min_interval_ms: number;
|
||||
spi_speed_hz: number;
|
||||
spi_led_type: string;
|
||||
chroma_device_type: string;
|
||||
|
||||
@@ -211,6 +211,13 @@
|
||||
"device.wiz.url.placeholder": "192.168.1.50",
|
||||
"device.wiz_min_interval": "Min Update Interval:",
|
||||
"device.wiz_min_interval.hint": "Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.",
|
||||
"device.type.lifx": "LIFX",
|
||||
"device.type.lifx.desc": "LIFX smart bulb / lightstrip over LAN",
|
||||
"device.lifx.url": "IP Address:",
|
||||
"device.lifx.url.hint": "LAN IP of the LIFX bulb. UDP port 56700 is the protocol default.",
|
||||
"device.lifx.url.placeholder": "192.168.1.50",
|
||||
"device.lifx_min_interval": "Min Update Interval:",
|
||||
"device.lifx_min_interval.hint": "Client-side rate limit between commands in ms. LIFX recommends ≤20 cmd/sec; default 50 ms matches that ceiling.",
|
||||
"device.type.ble": "BLE LED Controller",
|
||||
"device.type.ble.desc": "Bluetooth LE strips: SP110E, Triones, Zengge, Govee (whole-strip color)",
|
||||
"device.ble.url": "BLE Address:",
|
||||
|
||||
@@ -266,6 +266,13 @@
|
||||
"device.wiz.url.placeholder": "192.168.1.50",
|
||||
"device.wiz_min_interval": "Мин. интервал обновления:",
|
||||
"device.wiz_min_interval.hint": "Локальный лимит частоты команд (мс). UDP fire-and-forget справляется с быстрыми обновлениями; по умолчанию 50 мс ≈ 20 Гц.",
|
||||
"device.type.lifx": "LIFX",
|
||||
"device.type.lifx.desc": "Умная лампа / лента LIFX по LAN",
|
||||
"device.lifx.url": "IP-адрес:",
|
||||
"device.lifx.url.hint": "IP-адрес лампы LIFX в локальной сети. UDP-порт 56700 — по умолчанию.",
|
||||
"device.lifx.url.placeholder": "192.168.1.50",
|
||||
"device.lifx_min_interval": "Мин. интервал обновления:",
|
||||
"device.lifx_min_interval.hint": "Локальный лимит частоты команд (мс). LIFX рекомендует ≤20 команд/сек; по умолчанию 50 мс соответствует этому потолку.",
|
||||
"device.type.ble": "BLE LED контроллер",
|
||||
"device.type.ble.desc": "Bluetooth LE ленты: SP110E, Triones, Zengge, Govee (один цвет на всю ленту)",
|
||||
"device.ble.url": "BLE адрес:",
|
||||
|
||||
@@ -264,6 +264,13 @@
|
||||
"device.wiz.url.placeholder": "192.168.1.50",
|
||||
"device.wiz_min_interval": "最小更新间隔:",
|
||||
"device.wiz_min_interval.hint": "客户端命令速率限制(毫秒)。UDP 即发即忘可处理快速更新;默认 50 毫秒 ≈ 20 Hz。",
|
||||
"device.type.lifx": "LIFX",
|
||||
"device.type.lifx.desc": "通过局域网连接 LIFX 智能灯泡/灯带",
|
||||
"device.lifx.url": "IP 地址:",
|
||||
"device.lifx.url.hint": "LIFX 灯泡的局域网 IP。UDP 端口 56700 为协议默认值。",
|
||||
"device.lifx.url.placeholder": "192.168.1.50",
|
||||
"device.lifx_min_interval": "最小更新间隔:",
|
||||
"device.lifx_min_interval.hint": "客户端命令速率限制(毫秒)。LIFX 建议 ≤20 cmd/sec;默认 50 毫秒符合该上限。",
|
||||
"device.type.ble": "BLE LED 控制器",
|
||||
"device.type.ble.desc": "Bluetooth LE 灯带:SP110E、Triones、Zengge、Govee(整条灯带同色)",
|
||||
"device.ble.url": "BLE 地址:",
|
||||
|
||||
@@ -66,6 +66,8 @@ class Device:
|
||||
yeelight_min_interval_ms: int = 500,
|
||||
# WiZ fields
|
||||
wiz_min_interval_ms: int = 50,
|
||||
# LIFX fields
|
||||
lifx_min_interval_ms: int = 50,
|
||||
# SPI Direct fields
|
||||
spi_speed_hz: int = 800000,
|
||||
spi_led_type: str = "WS2812B",
|
||||
@@ -115,6 +117,7 @@ class Device:
|
||||
self.hue_entertainment_group_id = hue_entertainment_group_id
|
||||
self.yeelight_min_interval_ms = yeelight_min_interval_ms
|
||||
self.wiz_min_interval_ms = wiz_min_interval_ms
|
||||
self.lifx_min_interval_ms = lifx_min_interval_ms
|
||||
self.spi_speed_hz = spi_speed_hz
|
||||
self.spi_led_type = spi_led_type
|
||||
self.chroma_device_type = chroma_device_type
|
||||
@@ -153,6 +156,7 @@ class Device:
|
||||
MQTTConfig,
|
||||
OpenRGBConfig,
|
||||
SPIConfig,
|
||||
LIFXConfig,
|
||||
USBHIDConfig,
|
||||
WiZConfig,
|
||||
WLEDConfig,
|
||||
@@ -213,6 +217,11 @@ class Device:
|
||||
**base,
|
||||
wiz_min_interval_ms=self.wiz_min_interval_ms,
|
||||
)
|
||||
if dt == "lifx":
|
||||
return LIFXConfig(
|
||||
**base,
|
||||
lifx_min_interval_ms=self.lifx_min_interval_ms,
|
||||
)
|
||||
if dt == "spi":
|
||||
return SPIConfig(**base, spi_speed_hz=self.spi_speed_hz, spi_led_type=self.spi_led_type)
|
||||
if dt == "chroma":
|
||||
@@ -295,6 +304,8 @@ class Device:
|
||||
d["yeelight_min_interval_ms"] = self.yeelight_min_interval_ms
|
||||
if self.wiz_min_interval_ms != 50:
|
||||
d["wiz_min_interval_ms"] = self.wiz_min_interval_ms
|
||||
if self.lifx_min_interval_ms != 50:
|
||||
d["lifx_min_interval_ms"] = self.lifx_min_interval_ms
|
||||
if self.spi_speed_hz != 800000:
|
||||
d["spi_speed_hz"] = self.spi_speed_hz
|
||||
if self.spi_led_type != "WS2812B":
|
||||
@@ -352,6 +363,7 @@ class Device:
|
||||
hue_entertainment_group_id=data.get("hue_entertainment_group_id", ""),
|
||||
yeelight_min_interval_ms=data.get("yeelight_min_interval_ms", 500),
|
||||
wiz_min_interval_ms=data.get("wiz_min_interval_ms", 50),
|
||||
lifx_min_interval_ms=data.get("lifx_min_interval_ms", 50),
|
||||
spi_speed_hz=data.get("spi_speed_hz", 800000),
|
||||
spi_led_type=data.get("spi_led_type", "WS2812B"),
|
||||
chroma_device_type=data.get("chroma_device_type", "chromalink"),
|
||||
@@ -401,6 +413,7 @@ _UPDATABLE_FIELDS: frozenset[str] = frozenset(
|
||||
"hue_entertainment_group_id",
|
||||
"yeelight_min_interval_ms",
|
||||
"wiz_min_interval_ms",
|
||||
"lifx_min_interval_ms",
|
||||
"spi_speed_hz",
|
||||
"spi_led_type",
|
||||
"chroma_device_type",
|
||||
@@ -503,6 +516,7 @@ class DeviceStore(BaseSqliteStore[Device]):
|
||||
hue_entertainment_group_id: str = "",
|
||||
yeelight_min_interval_ms: int = 500,
|
||||
wiz_min_interval_ms: int = 50,
|
||||
lifx_min_interval_ms: int = 50,
|
||||
spi_speed_hz: int = 800000,
|
||||
spi_led_type: str = "WS2812B",
|
||||
chroma_device_type: str = "chromalink",
|
||||
@@ -548,6 +562,7 @@ class DeviceStore(BaseSqliteStore[Device]):
|
||||
hue_entertainment_group_id=hue_entertainment_group_id,
|
||||
yeelight_min_interval_ms=yeelight_min_interval_ms,
|
||||
wiz_min_interval_ms=wiz_min_interval_ms,
|
||||
lifx_min_interval_ms=lifx_min_interval_ms,
|
||||
spi_speed_hz=spi_speed_hz,
|
||||
spi_led_type=spi_led_type,
|
||||
chroma_device_type=chroma_device_type,
|
||||
|
||||
@@ -48,6 +48,7 @@
|
||||
<option value="hue">Philips Hue</option>
|
||||
<option value="yeelight">Yeelight</option>
|
||||
<option value="wiz">WiZ</option>
|
||||
<option value="lifx">LIFX</option>
|
||||
<option value="ble">BLE LED Controller</option>
|
||||
<option value="usbhid">USB HID</option>
|
||||
<option value="spi">SPI Direct</option>
|
||||
@@ -235,6 +236,15 @@
|
||||
<small class="input-hint" style="display:none" data-i18n="device.wiz_min_interval.hint">Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.</small>
|
||||
<input type="number" id="device-wiz-min-interval" min="0" max="10000" step="10" value="50">
|
||||
</div>
|
||||
<!-- LIFX fields -->
|
||||
<div class="form-group" id="device-lifx-min-interval-group" style="display: none;">
|
||||
<div class="label-row">
|
||||
<label for="device-lifx-min-interval" data-i18n="device.lifx_min_interval">Min Update Interval:</label>
|
||||
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
|
||||
</div>
|
||||
<small class="input-hint" style="display:none" data-i18n="device.lifx_min_interval.hint">Client-side rate limit between commands in ms. LIFX recommends ≤20 cmd/sec; default 50 ms matches that ceiling.</small>
|
||||
<input type="number" id="device-lifx-min-interval" min="0" max="10000" step="10" value="50">
|
||||
</div>
|
||||
<!-- ESP-NOW fields -->
|
||||
<div class="form-group" id="device-espnow-peer-mac-group" style="display: none;">
|
||||
<div class="label-row">
|
||||
|
||||
@@ -268,6 +268,15 @@
|
||||
<input type="number" id="settings-wiz-min-interval" min="0" max="10000" step="10" value="50">
|
||||
</div>
|
||||
|
||||
<div class="form-group" id="settings-lifx-min-interval-group" style="display: none;">
|
||||
<div class="label-row">
|
||||
<label for="settings-lifx-min-interval" data-i18n="device.lifx_min_interval">Min Update Interval:</label>
|
||||
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
|
||||
</div>
|
||||
<small class="input-hint" style="display:none" data-i18n="device.lifx_min_interval.hint">Client-side rate limit between commands in ms. LIFX recommends ≤20 cmd/sec; default 50 ms matches that ceiling.</small>
|
||||
<input type="number" id="settings-lifx-min-interval" min="0" max="10000" step="10" value="50">
|
||||
</div>
|
||||
|
||||
<div class="form-group" id="settings-send-latency-group" style="display: none;">
|
||||
<div class="label-row">
|
||||
<label for="settings-send-latency" data-i18n="device.send_latency">Send Latency (ms):</label>
|
||||
|
||||
@@ -0,0 +1,479 @@
|
||||
"""Tests for the LIFX LAN LED client + provider."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.devices.device_config import LIFXConfig
|
||||
from ledgrab.core.devices.led_client import ProviderDeps
|
||||
from ledgrab.core.devices.lifx_client import (
|
||||
LIFX_PORT,
|
||||
MSG_GET_SERVICE,
|
||||
MSG_SET_COLOR,
|
||||
MSG_SET_POWER,
|
||||
MSG_STATE_SERVICE,
|
||||
LIFXClient,
|
||||
_average_color,
|
||||
_build_packet,
|
||||
_build_set_color_payload,
|
||||
_build_set_power_payload,
|
||||
_parse_state_service_reply,
|
||||
parse_lifx_url,
|
||||
rgb_to_hsbk,
|
||||
)
|
||||
from ledgrab.core.devices.lifx_provider import LIFXDeviceProvider
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# URL parsing
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url,expected",
|
||||
[
|
||||
("lifx://192.168.1.50", ("192.168.1.50", LIFX_PORT)),
|
||||
("lifx://192.168.1.50:56700", ("192.168.1.50", 56700)),
|
||||
("192.168.1.50", ("192.168.1.50", LIFX_PORT)),
|
||||
("192.168.1.50:56700", ("192.168.1.50", 56700)),
|
||||
("bulb.local", ("bulb.local", LIFX_PORT)),
|
||||
],
|
||||
)
|
||||
def test_parse_lifx_url(url, expected):
|
||||
assert parse_lifx_url(url) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url", ["", " ", "lifx://", "://192.168.1.1"])
|
||||
def test_parse_lifx_url_rejects_empty(url):
|
||||
with pytest.raises(ValueError):
|
||||
parse_lifx_url(url)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# RGB → HSBK conversion
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_rgb_to_hsbk_pure_red():
|
||||
h, s, b, k = rgb_to_hsbk(255, 0, 0)
|
||||
assert h == 0
|
||||
assert s == 65535
|
||||
assert b == 65535
|
||||
assert 2500 <= k <= 9000
|
||||
|
||||
|
||||
def test_rgb_to_hsbk_pure_green():
|
||||
h, s, b, k = rgb_to_hsbk(0, 255, 0)
|
||||
# Green is at 120° → hue = 120/360 * 65535 ≈ 21845
|
||||
assert abs(h - 21845) < 5
|
||||
assert s == 65535
|
||||
assert b == 65535
|
||||
|
||||
|
||||
def test_rgb_to_hsbk_pure_blue():
|
||||
h, s, b, _k = rgb_to_hsbk(0, 0, 255)
|
||||
assert abs(h - 43690) < 5
|
||||
assert s == 65535
|
||||
assert b == 65535
|
||||
|
||||
|
||||
def test_rgb_to_hsbk_black_is_zero_brightness():
|
||||
h, s, b, _k = rgb_to_hsbk(0, 0, 0)
|
||||
assert b == 0
|
||||
assert s == 0
|
||||
|
||||
|
||||
def test_rgb_to_hsbk_white_has_full_brightness_zero_saturation():
|
||||
_h, s, b, _k = rgb_to_hsbk(255, 255, 255)
|
||||
assert s == 0
|
||||
assert b == 65535
|
||||
|
||||
|
||||
def test_rgb_to_hsbk_clamps_out_of_range_input():
|
||||
# Negative / >255 should still produce a valid HSBK
|
||||
h, s, b, _k = rgb_to_hsbk(-50, 999, 128)
|
||||
assert 0 <= h <= 0xFFFF
|
||||
assert 0 <= s <= 0xFFFF
|
||||
assert 0 <= b <= 0xFFFF
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Packet construction
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_build_packet_size_header_matches_total():
|
||||
packet = _build_packet(msg_type=MSG_SET_POWER, payload=b"\x00\x01\x02\x03\x04\x05")
|
||||
size = struct.unpack_from("<H", packet, 0)[0]
|
||||
assert size == len(packet)
|
||||
|
||||
|
||||
def test_build_packet_encodes_msg_type():
|
||||
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"")
|
||||
msg_type = struct.unpack_from("<H", packet, 32)[0]
|
||||
assert msg_type == MSG_SET_COLOR
|
||||
|
||||
|
||||
def test_build_packet_tagged_flag_set():
|
||||
packet = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
|
||||
frame_field = struct.unpack_from("<H", packet, 2)[0]
|
||||
# Tagged bit (0x2000) must be set in the frame header
|
||||
assert frame_field & 0x2000
|
||||
|
||||
|
||||
def test_build_packet_target_mac_at_offset_8():
|
||||
target = b"\xaa\xbb\xcc\xdd\xee\xff"
|
||||
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"", target_mac=target)
|
||||
assert packet[8:14] == target
|
||||
|
||||
|
||||
def test_build_packet_sequence_byte_at_offset_23():
|
||||
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"", sequence=42)
|
||||
assert packet[23] == 42
|
||||
|
||||
|
||||
def test_set_color_payload_layout():
|
||||
payload = _build_set_color_payload(h=10, s=20, b=30, k=4000, duration_ms=500)
|
||||
# reserved byte + four uint16 + uint32 = 13 bytes
|
||||
assert len(payload) == 13
|
||||
assert payload[0] == 0
|
||||
h, s, br, k, duration = struct.unpack_from("<HHHHI", payload, 1)
|
||||
assert (h, s, br, k, duration) == (10, 20, 30, 4000, 500)
|
||||
|
||||
|
||||
def test_set_power_payload_on_off():
|
||||
on_payload = _build_set_power_payload(True)
|
||||
off_payload = _build_set_power_payload(False)
|
||||
on_level = struct.unpack_from("<H", on_payload, 0)[0]
|
||||
off_level = struct.unpack_from("<H", off_payload, 0)[0]
|
||||
assert on_level == 65535
|
||||
assert off_level == 0
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# StateService reply parsing
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_parse_state_service_extracts_mac_and_port():
|
||||
"""Build a synthetic LIFX reply and parse it back."""
|
||||
mac = b"\xaa\xbb\xcc\xdd\xee\xff"
|
||||
payload = struct.pack("<BI", 1, 56700) # service=1, port=56700
|
||||
packet = _build_packet(
|
||||
msg_type=MSG_STATE_SERVICE,
|
||||
payload=payload,
|
||||
target_mac=mac,
|
||||
)
|
||||
|
||||
parsed = _parse_state_service_reply(packet)
|
||||
assert parsed is not None
|
||||
assert parsed["mac"] == mac.hex()
|
||||
assert parsed["service"] == 1
|
||||
assert parsed["port"] == 56700
|
||||
|
||||
|
||||
def test_parse_state_service_rejects_wrong_msg_type():
|
||||
"""A SetColor packet sent back must not be misread as a StateService."""
|
||||
payload = _build_set_color_payload(0, 0, 0, 0)
|
||||
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=payload)
|
||||
assert _parse_state_service_reply(packet) is None
|
||||
|
||||
|
||||
def test_parse_state_service_rejects_runt_payload():
|
||||
assert _parse_state_service_reply(b"") is None
|
||||
assert _parse_state_service_reply(b"\x00" * 35) is None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Helpers
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_average_color_numpy():
|
||||
pixels = np.array([[10, 20, 30], [40, 50, 60], [70, 80, 90]], dtype=np.uint8)
|
||||
assert _average_color(pixels) == (40, 50, 60)
|
||||
|
||||
|
||||
def test_average_color_list_and_empty():
|
||||
assert _average_color([(10, 0, 0), (20, 0, 0), (30, 0, 0)]) == (20, 0, 0)
|
||||
assert _average_color([]) == (0, 0, 0)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# LIFXClient (mocked transport)
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _make_connected_client(min_interval_s: float = 0.0) -> LIFXClient:
|
||||
client = LIFXClient("lifx://127.0.0.1", led_count=10, min_interval_s=min_interval_s)
|
||||
transport = MagicMock()
|
||||
transport.sendto = MagicMock()
|
||||
transport.close = MagicMock()
|
||||
client._transport = transport
|
||||
client._protocol = MagicMock()
|
||||
client._connected = True
|
||||
return client
|
||||
|
||||
|
||||
def _sent_packets(client: LIFXClient) -> list[bytes]:
|
||||
return [bytes(call.args[0]) for call in client._transport.sendto.call_args_list]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_pixels_emits_one_set_color_packet():
|
||||
client = _make_connected_client()
|
||||
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
|
||||
|
||||
await client.send_pixels(pixels)
|
||||
|
||||
packets = _sent_packets(client)
|
||||
assert len(packets) == 1
|
||||
msg_type = struct.unpack_from("<H", packets[0], 32)[0]
|
||||
assert msg_type == MSG_SET_COLOR
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_pixels_encodes_red_into_hsbk_payload():
|
||||
client = _make_connected_client()
|
||||
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
|
||||
|
||||
await client.send_pixels(pixels)
|
||||
|
||||
packet = _sent_packets(client)[0]
|
||||
# SetColor payload starts at offset 36 (header) + 1 (reserved byte) = 37
|
||||
h, s, b, _k, _dur = struct.unpack_from("<HHHHI", packet, 37)
|
||||
assert h == 0
|
||||
assert s == 65535
|
||||
assert b == 65535
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_pixels_scales_brightness_by_dimming_rgb():
|
||||
client = _make_connected_client()
|
||||
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
|
||||
|
||||
await client.send_pixels(pixels, brightness=128)
|
||||
|
||||
packet = _sent_packets(client)[0]
|
||||
_h, _s, b, _k, _dur = struct.unpack_from("<HHHHI", packet, 37)
|
||||
# 255 * 128/255 = 128 → brightness ≈ 128/255 * 65535 ≈ 32896
|
||||
assert abs(b - int((128 / 255) * 65535)) < 256
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rate_limit_drops_subsequent_frames():
|
||||
client = _make_connected_client(min_interval_s=10.0)
|
||||
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
|
||||
|
||||
await client.send_pixels(pixels)
|
||||
await client.send_pixels(pixels)
|
||||
await client.send_pixels(pixels)
|
||||
|
||||
assert len(_sent_packets(client)) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_pixels_when_not_connected_raises():
|
||||
client = LIFXClient("lifx://127.0.0.1", led_count=1)
|
||||
with pytest.raises(RuntimeError, match="not connected"):
|
||||
await client.send_pixels(np.array([[1, 2, 3]], dtype=np.uint8))
|
||||
|
||||
|
||||
def test_send_pixels_fast_runs_synchronously():
|
||||
client = _make_connected_client(min_interval_s=0.0)
|
||||
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
|
||||
|
||||
client.send_pixels_fast(pixels)
|
||||
|
||||
packets = _sent_packets(client)
|
||||
assert len(packets) == 1
|
||||
msg_type = struct.unpack_from("<H", packets[0], 32)[0]
|
||||
assert msg_type == MSG_SET_COLOR
|
||||
|
||||
|
||||
def test_supports_fast_send_is_true():
|
||||
assert LIFXClient("lifx://127.0.0.1", led_count=1).supports_fast_send is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_power_sends_set_power_msg():
|
||||
client = _make_connected_client()
|
||||
|
||||
await client.set_power(True)
|
||||
await client.set_power(False)
|
||||
|
||||
packets = _sent_packets(client)
|
||||
types = [struct.unpack_from("<H", p, 32)[0] for p in packets]
|
||||
assert types == [MSG_SET_POWER, MSG_SET_POWER]
|
||||
on_level = struct.unpack_from("<H", packets[0], 36)[0]
|
||||
off_level = struct.unpack_from("<H", packets[1], 36)[0]
|
||||
assert on_level == 65535
|
||||
assert off_level == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_color_sends_set_color_msg():
|
||||
client = _make_connected_client()
|
||||
|
||||
await client.set_color(255, 0, 0)
|
||||
|
||||
packet = _sent_packets(client)[0]
|
||||
msg_type = struct.unpack_from("<H", packet, 32)[0]
|
||||
assert msg_type == MSG_SET_COLOR
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_close_releases_transport():
|
||||
client = _make_connected_client()
|
||||
transport = client._transport
|
||||
await client.close()
|
||||
transport.close.assert_called_once()
|
||||
assert client._transport is None
|
||||
assert client.is_connected is False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Provider
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_provider_device_type_and_capabilities():
|
||||
provider = LIFXDeviceProvider()
|
||||
assert provider.device_type == "lifx"
|
||||
caps = provider.capabilities
|
||||
assert "manual_led_count" in caps
|
||||
assert "power_control" in caps
|
||||
assert "single_pixel" in caps
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_provider_validate_accepts_bare_host():
|
||||
provider = LIFXDeviceProvider()
|
||||
assert await provider.validate_device("192.168.1.50") == {}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_provider_validate_rejects_empty():
|
||||
provider = LIFXDeviceProvider()
|
||||
with pytest.raises(ValueError, match="Invalid LIFX URL"):
|
||||
await provider.validate_device("")
|
||||
|
||||
|
||||
def test_provider_create_client_threads_config():
|
||||
provider = LIFXDeviceProvider()
|
||||
config = LIFXConfig(
|
||||
device_id="device_test",
|
||||
device_url="lifx://192.168.1.50",
|
||||
led_count=30,
|
||||
lifx_min_interval_ms=100,
|
||||
)
|
||||
|
||||
client = provider.create_client(config, deps=ProviderDeps())
|
||||
|
||||
assert isinstance(client, LIFXClient)
|
||||
assert client.host == "192.168.1.50"
|
||||
assert client.port == LIFX_PORT
|
||||
assert client._led_count == 30
|
||||
assert client._min_interval_s == pytest.approx(0.1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_provider_discover_returns_empty_on_failure(monkeypatch):
|
||||
async def _explode(timeout):
|
||||
raise OSError("network unreachable")
|
||||
|
||||
monkeypatch.setattr("ledgrab.core.devices.lifx_provider.discover_lifx_bulbs", _explode)
|
||||
provider = LIFXDeviceProvider()
|
||||
assert await provider.discover() == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_provider_discover_maps_replies_to_discovered_devices(monkeypatch):
|
||||
async def _fake(timeout):
|
||||
return [
|
||||
{"ip": "192.168.1.50", "mac": "aabbccddeeff", "port": 56700},
|
||||
{"ip": "", "mac": "1234567890ab", "port": 56700},
|
||||
]
|
||||
|
||||
monkeypatch.setattr("ledgrab.core.devices.lifx_provider.discover_lifx_bulbs", _fake)
|
||||
provider = LIFXDeviceProvider()
|
||||
results = await provider.discover()
|
||||
|
||||
assert len(results) == 1
|
||||
[bulb] = results
|
||||
assert bulb.device_type == "lifx"
|
||||
assert bulb.url == "lifx://192.168.1.50"
|
||||
assert bulb.ip == "192.168.1.50"
|
||||
assert bulb.mac == "aabbccddeeff"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Device.to_config() round-trip
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_device_to_config_round_trip_lifx():
|
||||
from ledgrab.storage.device_store import Device
|
||||
|
||||
device = Device(
|
||||
device_id="device_abc12345",
|
||||
name="Office LIFX",
|
||||
url="lifx://192.168.1.42",
|
||||
led_count=30,
|
||||
device_type="lifx",
|
||||
lifx_min_interval_ms=100,
|
||||
)
|
||||
|
||||
config = device.to_config()
|
||||
|
||||
assert isinstance(config, LIFXConfig)
|
||||
assert config.device_url == "lifx://192.168.1.42"
|
||||
assert config.led_count == 30
|
||||
assert config.lifx_min_interval_ms == 100
|
||||
|
||||
|
||||
def test_device_to_dict_omits_lifx_default_interval():
|
||||
from ledgrab.storage.device_store import Device
|
||||
|
||||
device = Device(
|
||||
device_id="device_abc12345",
|
||||
name="Default",
|
||||
url="lifx://192.168.1.42",
|
||||
led_count=1,
|
||||
device_type="lifx",
|
||||
)
|
||||
assert "lifx_min_interval_ms" not in device.to_dict()
|
||||
|
||||
|
||||
def test_device_to_dict_preserves_non_default_lifx_interval():
|
||||
from ledgrab.storage.device_store import Device
|
||||
|
||||
device = Device(
|
||||
device_id="device_abc12345",
|
||||
name="Custom",
|
||||
url="lifx://192.168.1.42",
|
||||
led_count=1,
|
||||
device_type="lifx",
|
||||
lifx_min_interval_ms=200,
|
||||
)
|
||||
assert device.to_dict()["lifx_min_interval_ms"] == 200
|
||||
|
||||
|
||||
def test_device_from_dict_lifx_round_trip():
|
||||
from ledgrab.storage.device_store import Device
|
||||
|
||||
restored = Device.from_dict(
|
||||
{
|
||||
"id": "device_abc12345",
|
||||
"name": "Roundtrip",
|
||||
"url": "lifx://10.0.0.1",
|
||||
"led_count": 1,
|
||||
"device_type": "lifx",
|
||||
"lifx_min_interval_ms": 150,
|
||||
}
|
||||
)
|
||||
assert restored.lifx_min_interval_ms == 150
|
||||
Reference in New Issue
Block a user