feat(devices): LIFX LAN target type

Adds support for LIFX smart bulbs and lightstrips that speak the LIFX
binary UDP protocol on port 56700, with broadcast LAN discovery via the
standard GetService/StateService probe.

Backend:
- LIFXClient is a single-pixel UDP adapter: averages the strip to one
  RGB triple, converts to LIFX HSBK (16-bit hue/saturation/brightness +
  kelvin), and pushes a tagged SetColor packet so all bulbs on the
  subnet act on it. Brightness folds into the HSBK brightness channel.
- Hand-rolled packet builder: 36-byte LIFX header (frame +
  frame-address + protocol-header) + variable-length payload. Source
  ID 'LGGR' identifies LedGrab in protocol logs.
- supports_fast_send=True with a synchronous send_pixels_fast hot path
  -- UDP costs nothing, so the default rate gate is 50 ms (~20 Hz) to
  match LIFX's documented <=20 cmd/sec recommendation.
- Broadcast discovery sends GetService and parses StateService replies
  back into IP + MAC + service-port triples. Broadcast failures yield
  [] rather than raising.
- Health check sends GetService and waits 1.5s for any reply on a
  one-shot UDP socket.
- LIFXConfig joins the typed config union; Device storage gains a
  lifx_min_interval_ms field; full to_dict/from_dict/to_config wiring.
- 47 unit tests cover URL parsing, RGB->HSBK conversion (red/green/
  blue/white/black/clamping), packet construction (size, msg type,
  tagged flag, target MAC, sequence byte), SetColor and SetPower
  payload layouts, StateService reply parsing (including rejection
  of wrong msg types and runt payloads), strip averaging, rate
  limiting, fast-send hot path, provider validate/discover/health,
  and Device.to_config round-trip.

Frontend:
- 'lifx' in DEVICE_TYPE_KEYS (next to 'wiz'), lightbulb icon
  (deliberate smart-bulb family grouping with Hue + Yeelight + WiZ).
- isLifxDevice predicate + per-type field show/hide in create and
  settings modals.
- Rate-limit number input (default 50 ms) in both modals with hint
  text referencing LIFX's documented <=20 cmd/sec ceiling.
- Locale strings in en/ru/zh.

LIFX bulbs are reachable from the existing "Scan network" button -- no
new discovery UI affordance was needed. No brightness_control capability
exposed; LIFX brightness is folded into the HSBK on the wire.
This commit is contained in:
2026-05-16 02:30:30 +03:00
parent ede627b4ac
commit 8f9d490063
19 changed files with 1165 additions and 6 deletions
+4 -1
View File
@@ -723,9 +723,12 @@ After phase 1 the codebase will have 3 fresh examples of "ping the LAN, listen f
### Phase 4 — Major consumer brands ### Phase 4 — Major consumer brands
- [x] **LIFX LAN** — UDP binary protocol on port 56700; RGB→HSBK 16-bit
conversion; broadcast discovery via GetService/StateService probe;
47 unit tests. Single-pixel adapter shape, identical to WiZ
structurally. Frontend wired via subagent.
- [ ] Govee LAN API (2023+) - [ ] Govee LAN API (2023+)
- [ ] Twinkly - [ ] Twinkly
- [ ] LIFX LAN
- [ ] Nanoleaf OpenAPI - [ ] Nanoleaf OpenAPI
- [ ] Mi-Light / MiBoxer UDP gateway - [ ] Mi-Light / MiBoxer UDP gateway
+7
View File
@@ -68,6 +68,7 @@ def _device_to_response(device) -> DeviceResponse:
hue_entertainment_group_id=device.hue_entertainment_group_id, hue_entertainment_group_id=device.hue_entertainment_group_id,
yeelight_min_interval_ms=device.yeelight_min_interval_ms, yeelight_min_interval_ms=device.yeelight_min_interval_ms,
wiz_min_interval_ms=device.wiz_min_interval_ms, wiz_min_interval_ms=device.wiz_min_interval_ms,
lifx_min_interval_ms=device.lifx_min_interval_ms,
spi_speed_hz=device.spi_speed_hz, spi_speed_hz=device.spi_speed_hz,
spi_led_type=device.spi_led_type, spi_led_type=device.spi_led_type,
chroma_device_type=device.chroma_device_type, chroma_device_type=device.chroma_device_type,
@@ -233,6 +234,11 @@ async def create_device(
if device_data.wiz_min_interval_ms is not None if device_data.wiz_min_interval_ms is not None
else 50 else 50
), ),
lifx_min_interval_ms=(
device_data.lifx_min_interval_ms
if device_data.lifx_min_interval_ms is not None
else 50
),
spi_speed_hz=device_data.spi_speed_hz or 800000, spi_speed_hz=device_data.spi_speed_hz or 800000,
spi_led_type=device_data.spi_led_type or "WS2812B", spi_led_type=device_data.spi_led_type or "WS2812B",
chroma_device_type=device_data.chroma_device_type or "chromalink", chroma_device_type=device_data.chroma_device_type or "chromalink",
@@ -497,6 +503,7 @@ async def update_device(
hue_entertainment_group_id=update_data.hue_entertainment_group_id, hue_entertainment_group_id=update_data.hue_entertainment_group_id,
yeelight_min_interval_ms=update_data.yeelight_min_interval_ms, yeelight_min_interval_ms=update_data.yeelight_min_interval_ms,
wiz_min_interval_ms=update_data.wiz_min_interval_ms, wiz_min_interval_ms=update_data.wiz_min_interval_ms,
lifx_min_interval_ms=update_data.lifx_min_interval_ms,
spi_speed_hz=update_data.spi_speed_hz, spi_speed_hz=update_data.spi_speed_hz,
spi_led_type=update_data.spi_led_type, spi_led_type=update_data.spi_led_type,
chroma_device_type=update_data.chroma_device_type, chroma_device_type=update_data.chroma_device_type,
+11
View File
@@ -77,6 +77,13 @@ class DeviceCreate(BaseModel):
le=10000, le=10000,
description="WiZ client-side rate limit between commands in ms (default 50)", description="WiZ client-side rate limit between commands in ms (default 50)",
) )
# LIFX fields
lifx_min_interval_ms: Optional[int] = Field(
None,
ge=0,
le=10000,
description="LIFX client-side rate limit between commands in ms (default 50)",
)
# SPI Direct fields # SPI Direct fields
spi_speed_hz: Optional[int] = Field( spi_speed_hz: Optional[int] = Field(
None, ge=100000, le=4000000, description="SPI clock speed in Hz" None, ge=100000, le=4000000, description="SPI clock speed in Hz"
@@ -171,6 +178,9 @@ class DeviceUpdate(BaseModel):
wiz_min_interval_ms: Optional[int] = Field( wiz_min_interval_ms: Optional[int] = Field(
None, ge=0, le=10000, description="WiZ client-side rate limit in ms" None, ge=0, le=10000, description="WiZ client-side rate limit in ms"
) )
lifx_min_interval_ms: Optional[int] = Field(
None, ge=0, le=10000, description="LIFX client-side rate limit in ms"
)
spi_speed_hz: Optional[int] = Field(None, ge=100000, le=4000000, description="SPI clock speed") spi_speed_hz: Optional[int] = Field(None, ge=100000, le=4000000, description="SPI clock speed")
spi_led_type: Optional[str] = Field(None, description="LED chipset type") spi_led_type: Optional[str] = Field(None, description="LED chipset type")
chroma_device_type: Optional[str] = Field(None, description="Chroma peripheral type") chroma_device_type: Optional[str] = Field(None, description="Chroma peripheral type")
@@ -344,6 +354,7 @@ class DeviceResponse(BaseModel):
default=500, description="Yeelight client-side rate limit in ms" default=500, description="Yeelight client-side rate limit in ms"
) )
wiz_min_interval_ms: int = Field(default=50, description="WiZ client-side rate limit in ms") wiz_min_interval_ms: int = Field(default=50, description="WiZ client-side rate limit in ms")
lifx_min_interval_ms: int = Field(default=50, description="LIFX client-side rate limit in ms")
spi_speed_hz: int = Field(default=800000, description="SPI clock speed in Hz") spi_speed_hz: int = Field(default=800000, description="SPI clock speed in Hz")
spi_led_type: str = Field(default="WS2812B", description="LED chipset type") spi_led_type: str = Field(default="WS2812B", description="LED chipset type")
chroma_device_type: str = Field(default="chromalink", description="Chroma peripheral type") chroma_device_type: str = Field(default="chromalink", description="Chroma peripheral type")
@@ -100,6 +100,18 @@ class WiZConfig(BaseDeviceConfig):
wiz_min_interval_ms: int = 50 wiz_min_interval_ms: int = 50
@dataclass(frozen=True)
class LIFXConfig(BaseDeviceConfig):
"""LIFX LAN bulb / lightstrip.
LIFX recommends ≤20 commands/sec per device. ``lifx_min_interval_ms``
defaults to 50 ms so we stay just under that ceiling.
"""
device_type: Literal["lifx"] = "lifx"
lifx_min_interval_ms: int = 50
@dataclass(frozen=True) @dataclass(frozen=True)
class SPIConfig(BaseDeviceConfig): class SPIConfig(BaseDeviceConfig):
device_type: Literal["spi"] = "spi" device_type: Literal["spi"] = "spi"
@@ -172,6 +184,7 @@ DeviceConfig = Union[
DDPConfig, DDPConfig,
YeelightConfig, YeelightConfig,
WiZConfig, WiZConfig,
LIFXConfig,
AdalightConfig, AdalightConfig,
AmbiLEDConfig, AmbiLEDConfig,
DMXConfig, DMXConfig,
@@ -346,6 +346,10 @@ def _register_builtin_providers():
register_provider(WiZDeviceProvider()) register_provider(WiZDeviceProvider())
from ledgrab.core.devices.lifx_provider import LIFXDeviceProvider
register_provider(LIFXDeviceProvider())
# BLE support is optional — only register the provider if the ``bleak`` # BLE support is optional — only register the provider if the ``bleak``
# extra is installed. Importing the provider itself is safe (it doesn't # extra is installed. Importing the provider itself is safe (it doesn't
# import bleak at module load), but we still want a clean skip on # import bleak at module load), but we still want a clean skip on
@@ -0,0 +1,425 @@
"""LIFX LAN LED client.
LIFX bulbs and lightstrips accept a binary UDP protocol on port 56700.
Every packet has a 36-byte header (frame + frame-address + protocol-header)
followed by a type-specific payload. Colors are HSBK 16-bit per channel.
URL scheme: ``lifx://<host>[:port]`` or bare ``<host>[:port]``. Default port 56700.
LIFX bulbs are reachable two ways:
* Unicast — set the ``target`` field to the bulb's 48-bit MAC.
* Broadcast — set ``target`` to all zeros and ``tagged=1``; all bulbs on
the subnet act on the message. We use this for the SetColor hot path
so we don't have to learn the MAC of every device a user owns.
Reference: https://lan.developer.lifx.com/docs/header-description
"""
from __future__ import annotations
import asyncio
import socket
import struct
import time
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
from urllib.parse import urlparse
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.utils import get_logger
logger = get_logger(__name__)
LIFX_PORT = 56700
DEFAULT_MIN_INTERVAL_S = 0.05 # ~20 Hz — LIFX rate-limit guidance is 20/sec
# Message types we care about
MSG_GET_SERVICE = 2
MSG_STATE_SERVICE = 3
MSG_SET_POWER = 21
MSG_SET_COLOR = 102
# Frame field byte 0 of the protocol header: tagged=1, addressable=1, protocol=1024
_FRAME_TAGGED = 0x3400
_FRAME_UNTAGGED = 0x1400
_SOURCE_ID = 0x4C474752 # "LGGR" — identifies LedGrab in protocol logs
def parse_lifx_url(url: str) -> Tuple[str, int]:
"""Pull ``(host, port)`` from ``lifx://host[:port]`` or bare ``host[:port]``."""
if not url:
raise ValueError("LIFX URL is empty")
raw = url.strip()
if "://" in raw:
parsed = urlparse(raw)
host = parsed.hostname or ""
port = parsed.port or LIFX_PORT
else:
parsed = urlparse(f"lifx://{raw}")
host = parsed.hostname or ""
port = parsed.port or LIFX_PORT
if not host:
raise ValueError(f"LIFX URL has no host: {url!r}")
return host, port
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
def rgb_to_hsbk(r: int, g: int, b: int, kelvin: int = 3500) -> Tuple[int, int, int, int]:
"""Convert 8-bit RGB to LIFX 16-bit HSBK.
The ``kelvin`` channel is irrelevant when saturation > 0 (the bulb
interprets it as a hint, not a hard temperature), so we leave it at the
LIFX default of ~3500 K. Outputs are clamped uint16.
"""
r_n = max(0, min(255, r)) / 255.0
g_n = max(0, min(255, g)) / 255.0
b_n = max(0, min(255, b)) / 255.0
c_max = max(r_n, g_n, b_n)
c_min = min(r_n, g_n, b_n)
delta = c_max - c_min
# Hue (0-360 degrees → 0-65535)
if delta == 0:
h = 0.0
elif c_max == r_n:
h = 60.0 * (((g_n - b_n) / delta) % 6)
elif c_max == g_n:
h = 60.0 * (((b_n - r_n) / delta) + 2)
else:
h = 60.0 * (((r_n - g_n) / delta) + 4)
hue_u16 = int((h / 360.0) * 65535) & 0xFFFF
# Saturation (0-1 → 0-65535)
sat_u16 = 0 if c_max == 0 else int((delta / c_max) * 65535) & 0xFFFF
# Brightness (0-1 → 0-65535)
bri_u16 = int(c_max * 65535) & 0xFFFF
kelvin_u16 = max(2500, min(9000, kelvin)) & 0xFFFF
return hue_u16, sat_u16, bri_u16, kelvin_u16
def _build_packet(
*,
msg_type: int,
payload: bytes,
target_mac: bytes = b"\x00\x00\x00\x00\x00\x00",
sequence: int = 0,
res_required: bool = False,
ack_required: bool = False,
tagged: bool = False,
) -> bytes:
"""Pack a LIFX packet: 36-byte header + payload.
See https://lan.developer.lifx.com/docs/header-description for the
bit-level field layout. We construct the three sub-headers separately
so the magic numbers are scoped to the fields they belong to.
"""
size = 36 + len(payload)
# Frame header (8 bytes): size(2) | origin/tagged/addressable/protocol(2) | source(4)
frame_field = _FRAME_TAGGED if tagged else _FRAME_UNTAGGED
frame = struct.pack("<HHI", size, frame_field, _SOURCE_ID)
# Frame address (16 bytes): target(8) | reserved(6) | res/ack flags(1) | sequence(1)
flags = (0x01 if res_required else 0) | (0x02 if ack_required else 0)
frame_addr = (
target_mac + b"\x00\x00" + b"\x00\x00\x00\x00\x00\x00" + bytes([flags, sequence & 0xFF])
)
# Protocol header (12 bytes): reserved(8) | type(2) | reserved(2)
proto = b"\x00" * 8 + struct.pack("<HH", msg_type, 0)
return frame + frame_addr + proto + payload
def _build_set_color_payload(h: int, s: int, b: int, k: int, duration_ms: int = 0) -> bytes:
"""SetColor payload: reserved(1) | HSBK(8) | duration(4)."""
return b"\x00" + struct.pack(
"<HHHHI", h & 0xFFFF, s & 0xFFFF, b & 0xFFFF, k & 0xFFFF, duration_ms & 0xFFFFFFFF
)
def _build_set_power_payload(on: bool, duration_ms: int = 0) -> bytes:
"""SetPower payload: level(2) | duration(4). Level is 0 or 65535."""
return struct.pack("<HI", 65535 if on else 0, duration_ms & 0xFFFFFFFF)
def _parse_state_service_reply(raw: bytes) -> Optional[dict]:
"""Parse a LIFX StateService (discovery) reply.
Returns ``{"mac": "aabbccddeeff", "service": int, "port": int}`` or
``None`` if the payload isn't a StateService.
"""
if len(raw) < 36 + 5:
return None
# Read msg_type from the protocol header at offset 32
msg_type = struct.unpack_from("<H", raw, 32)[0]
if msg_type != MSG_STATE_SERVICE:
return None
# Target MAC at offset 8, 6 bytes are the MAC.
mac_bytes = raw[8:14]
mac = mac_bytes.hex()
# Payload at offset 36: service(1) + port(4)
service, port = struct.unpack_from("<BI", raw, 36)
return {"mac": mac, "service": int(service), "port": int(port)}
class _LIFXProtocol(asyncio.DatagramProtocol):
"""Write-only datagram protocol. Inbound replies dropped silently."""
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
# LIFX bulbs sometimes echo back state on broadcast. We don't need it
# for streaming ambilight — discard.
pass
def error_received(self, exc):
logger.debug("LIFX UDP error: %s", exc)
class LIFXClient(LEDClient):
"""LEDClient for a single LIFX bulb on the LAN."""
def __init__(
self,
url: str,
led_count: int = 1,
*,
min_interval_s: float = DEFAULT_MIN_INTERVAL_S,
):
host, port = parse_lifx_url(url)
self._host = host
self._port = port
self._led_count = led_count
self._min_interval_s = max(0.0, min_interval_s)
self._transport: Optional[asyncio.DatagramTransport] = None
self._protocol: Optional[_LIFXProtocol] = None
self._connected = False
self._next_tx_at: float = 0.0
self._sequence: int = 0
@property
def host(self) -> str:
return self._host
@property
def port(self) -> int:
return self._port
@property
def is_connected(self) -> bool:
return self._connected and self._transport is not None
@property
def device_led_count(self) -> Optional[int]:
return self._led_count or None
async def connect(self) -> bool:
if self._connected and self._transport is not None:
return True
loop = asyncio.get_running_loop()
try:
transport, protocol = await loop.create_datagram_endpoint(
_LIFXProtocol, remote_addr=(self._host, self._port)
)
except OSError as exc:
raise RuntimeError(f"Failed to open UDP to LIFX at {self._host}: {exc}") from exc
self._transport = transport
self._protocol = protocol # type: ignore[assignment]
self._connected = True
logger.info("LIFXClient connected to %s:%d", self._host, self._port)
return True
async def close(self) -> None:
if self._transport is not None:
try:
self._transport.close()
except OSError:
pass
self._transport = None
self._protocol = None
self._connected = False
def _next_sequence(self) -> int:
self._sequence = (self._sequence + 1) & 0xFF
return self._sequence
def _send(self, msg_type: int, payload: bytes) -> None:
"""Build and send one LIFX packet. Caller must hold an open transport."""
assert self._transport is not None
packet = _build_packet(
msg_type=msg_type,
payload=payload,
sequence=self._next_sequence(),
tagged=True, # broadcast within the unicast UDP socket — bulb still acts on it
)
self._transport.sendto(packet)
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
"""Average the strip → HSBK → SetColor."""
if not self.is_connected:
raise RuntimeError("LIFXClient not connected")
now = time.monotonic()
if now < self._next_tx_at:
return True
r, g, b = _average_color(pixels)
if brightness < 255:
scale = max(0, min(255, brightness)) / 255.0
r = int(r * scale)
g = int(g * scale)
b = int(b * scale)
h, s, br, k = rgb_to_hsbk(r, g, b)
self._send(MSG_SET_COLOR, _build_set_color_payload(h, s, br, k, duration_ms=0))
self._next_tx_at = now + self._min_interval_s
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
"""Synchronous variant — same shape, runs on the hot loop."""
if not self.is_connected or self._transport is None:
raise RuntimeError("LIFXClient not connected")
now = time.monotonic()
if now < self._next_tx_at:
return
r, g, b = _average_color(pixels)
if brightness < 255:
scale = max(0, min(255, brightness)) / 255.0
r = int(r * scale)
g = int(g * scale)
b = int(b * scale)
h, s, br, k = rgb_to_hsbk(r, g, b)
self._send(MSG_SET_COLOR, _build_set_color_payload(h, s, br, k, duration_ms=0))
self._next_tx_at = now + self._min_interval_s
@property
def supports_fast_send(self) -> bool:
return True
async def set_color(self, r: int, g: int, b: int) -> None:
if not self.is_connected:
raise RuntimeError("LIFXClient not connected")
h, s, br, k = rgb_to_hsbk(r, g, b)
self._send(MSG_SET_COLOR, _build_set_color_payload(h, s, br, k, duration_ms=0))
async def set_power(self, on: bool) -> None:
if not self.is_connected:
raise RuntimeError("LIFXClient not connected")
self._send(MSG_SET_POWER, _build_set_power_payload(on))
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Send GetService and wait briefly for a StateService reply."""
now = datetime.now(timezone.utc)
try:
host, port = parse_lifx_url(url)
except ValueError as exc:
return DeviceHealth(online=False, last_checked=now, error=str(exc))
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setblocking(False)
try:
sock.bind(("", 0))
probe = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
start = loop.time()
await loop.sock_sendto(sock, probe, (host, port))
try:
await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=1.5)
except asyncio.TimeoutError:
return DeviceHealth(
online=False,
last_checked=now,
error=f"No LIFX reply from {host}:{port} within 1.5s",
)
latency_ms = (loop.time() - start) * 1000.0
return DeviceHealth(online=True, latency_ms=latency_ms, last_checked=now)
except OSError as exc:
return DeviceHealth(
online=False,
last_checked=now,
error=f"LIFX probe failed for {host}: {exc}",
)
finally:
sock.close()
# ============================================================================
# Broadcast discovery
# ============================================================================
async def discover_lifx_bulbs(timeout: float = 2.0) -> List[dict]:
"""Broadcast a GetService probe on the LAN and collect StateService replies.
Returns ``[{"ip": ..., "mac": ..., "port": ...}, ...]``.
"""
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setblocking(False)
try:
sock.bind(("", 0))
probe = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
await loop.sock_sendto(sock, probe, ("255.255.255.255", LIFX_PORT))
results: list[dict] = []
seen_macs: set[str] = set()
deadline = loop.time() + timeout
while True:
remaining = deadline - loop.time()
if remaining <= 0:
break
try:
raw, addr = await asyncio.wait_for(
loop.sock_recvfrom(sock, 4096),
timeout=remaining,
)
except asyncio.TimeoutError:
break
parsed = _parse_state_service_reply(raw)
if not parsed:
continue
mac = parsed["mac"]
if mac in seen_macs:
continue
seen_macs.add(mac)
results.append(
{
"ip": addr[0],
"mac": mac,
"port": parsed["port"],
}
)
return results
finally:
sock.close()
@@ -0,0 +1,92 @@
"""LIFX device provider — LAN-discoverable LIFX smart bulbs."""
from __future__ import annotations
from typing import TYPE_CHECKING, List
from ledgrab.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
ProviderDeps,
)
from ledgrab.core.devices.lifx_client import (
LIFXClient,
discover_lifx_bulbs,
parse_lifx_url,
)
from ledgrab.utils import get_logger
if TYPE_CHECKING:
from ledgrab.core.devices.device_config import LIFXConfig
logger = get_logger(__name__)
class LIFXDeviceProvider(LEDDeviceProvider):
"""Provider for LIFX smart bulbs / lightstrips.
Single-pixel adapter: averages the strip down to one color, encodes as
HSBK (LIFX's native color model), and broadcasts a SetColor packet.
"""
@property
def device_type(self) -> str:
return "lifx"
@property
def capabilities(self) -> set:
return {
"manual_led_count",
"power_control",
"static_color",
"health_check",
"single_pixel",
}
def create_client(self, config: "LIFXConfig", *, deps: ProviderDeps) -> LEDClient:
return LIFXClient(
config.device_url,
led_count=config.led_count,
min_interval_s=max(0.0, config.lifx_min_interval_ms / 1000.0),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await LIFXClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
try:
host, port = parse_lifx_url(url)
except ValueError as exc:
raise ValueError(f"Invalid LIFX URL: {exc}") from exc
logger.info("LIFX device URL validated: host=%s port=%d", host, port)
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
try:
bulbs = await discover_lifx_bulbs(timeout=min(timeout, 5.0))
except (OSError, RuntimeError) as exc:
logger.warning("LIFX discovery failed: %s", exc)
return []
results: List[DiscoveredDevice] = []
for bulb in bulbs:
ip = bulb.get("ip", "")
if not ip:
continue
url = f"lifx://{ip}"
mac = bulb.get("mac", "")
results.append(
DiscoveredDevice(
name=f"LIFX {mac[-6:]}" if mac else "LIFX bulb",
url=url,
device_type="lifx",
ip=ip,
mac=mac,
led_count=None,
version=None,
)
)
logger.info("LIFX broadcast scan found %d bulb(s)", len(results))
return results
+4
View File
@@ -167,6 +167,10 @@ export function isWizDevice(type: string) {
return type === 'wiz'; return type === 'wiz';
} }
export function isLifxDevice(type: string) {
return type === 'lifx';
}
export function isUsbhidDevice(type: string) { export function isUsbhidDevice(type: string) {
return type === 'usbhid'; return type === 'usbhid';
} }
+1 -1
View File
@@ -48,7 +48,7 @@ const _deviceTypeIcons = {
wled: _svg(P.wifi), adalight: _svg(P.usb), ambiled: _svg(P.usb), wled: _svg(P.wifi), adalight: _svg(P.usb), ambiled: _svg(P.usb),
mqtt: _svg(P.send), ws: _svg(P.globe), openrgb: _svg(P.palette), mqtt: _svg(P.send), ws: _svg(P.globe), openrgb: _svg(P.palette),
dmx: _svg(P.radio), ddp: _svg(P.send), mock: _svg(P.wrench), dmx: _svg(P.radio), ddp: _svg(P.send), mock: _svg(P.wrench),
espnow: _svg(P.radio), hue: _svg(P.lightbulb), yeelight: _svg(P.lightbulb), wiz: _svg(P.lightbulb), espnow: _svg(P.radio), hue: _svg(P.lightbulb), yeelight: _svg(P.lightbulb), wiz: _svg(P.lightbulb), lifx: _svg(P.lightbulb),
usbhid: _svg(P.usb), usbhid: _svg(P.usb),
spi: _svg(P.plug), chroma: _svg(P.zap), gamesense: _svg(P.target), spi: _svg(P.plug), chroma: _svg(P.zap), gamesense: _svg(P.target),
ble: _svg(P.bluetooth), ble: _svg(P.bluetooth),
@@ -7,7 +7,7 @@ import {
_discoveryCache, set_discoveryCache, _discoveryCache, set_discoveryCache,
csptCache, csptCache,
} from '../core/state.ts'; } from '../core/state.ts';
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isEspnowDevice, isHueDevice, isYeelightDevice, isWizDevice, isBleDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, isGroupDevice, escapeHtml } from '../core/api.ts'; import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isEspnowDevice, isHueDevice, isYeelightDevice, isWizDevice, isLifxDevice, isBleDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, isGroupDevice, escapeHtml } from '../core/api.ts';
import { devicesCache } from '../core/state.ts'; import { devicesCache } from '../core/state.ts';
import { t } from '../core/i18n.ts'; import { t } from '../core/i18n.ts';
import { showToast, desktopFocus } from '../core/ui.ts'; import { showToast, desktopFocus } from '../core/ui.ts';
@@ -43,6 +43,7 @@ class AddDeviceModal extends Modal {
bleGoveeKey: (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value || '', bleGoveeKey: (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value || '',
yeelightMinInterval: (document.getElementById('device-yeelight-min-interval') as HTMLInputElement)?.value || '500', yeelightMinInterval: (document.getElementById('device-yeelight-min-interval') as HTMLInputElement)?.value || '500',
wizMinInterval: (document.getElementById('device-wiz-min-interval') as HTMLInputElement)?.value || '50', wizMinInterval: (document.getElementById('device-wiz-min-interval') as HTMLInputElement)?.value || '50',
lifxMinInterval: (document.getElementById('device-lifx-min-interval') as HTMLInputElement)?.value || '50',
groupChildren: JSON.stringify(_getGroupChildIds('device')), groupChildren: JSON.stringify(_getGroupChildIds('device')),
groupMode: (document.getElementById('device-group-mode-select') as HTMLSelectElement)?.value || 'sequence', groupMode: (document.getElementById('device-group-mode-select') as HTMLSelectElement)?.value || 'sequence',
}; };
@@ -53,7 +54,7 @@ const addDeviceModal = new AddDeviceModal();
/* ── Icon-grid type selector ──────────────────────────────────── */ /* ── Icon-grid type selector ──────────────────────────────────── */
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'ddp', 'espnow', 'hue', 'yeelight', 'wiz', 'ble', 'usbhid', 'spi', 'chroma', 'gamesense', 'group', 'mock']; const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'ddp', 'espnow', 'hue', 'yeelight', 'wiz', 'lifx', 'ble', 'usbhid', 'spi', 'chroma', 'gamesense', 'group', 'mock'];
function _buildDeviceTypeItems() { function _buildDeviceTypeItems() {
return DEVICE_TYPE_KEYS.map(key => ({ return DEVICE_TYPE_KEYS.map(key => ({
@@ -282,6 +283,7 @@ export function onDeviceTypeChanged() {
_showHueFields(false); _showHueFields(false);
_showYeelightFields(false); _showYeelightFields(false);
_showWizFields(false); _showWizFields(false);
_showLifxFields(false);
_showBleFields(false); _showBleFields(false);
_showSpiFields(false); _showSpiFields(false);
_showChromaFields(false); _showChromaFields(false);
@@ -503,6 +505,28 @@ export function onDeviceTypeChanged() {
} else { } else {
scanForDevices(); scanForDevices();
} }
} else if (isLifxDevice(deviceType)) {
// LIFX: binary UDP on port 56700. Show URL (LAN IP), LED count
// (controls source mapping; LIFX is single-pixel — HSBK averaged
// from the strip), rate-limit ms. Discovery uses UDP broadcast.
urlGroup.style.display = '';
urlInput.setAttribute('required', '');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
_showLifxFields(true);
if (urlLabel) urlLabel.textContent = t('device.lifx.url') || 'IP Address:';
if (urlHint) urlHint.textContent = t('device.lifx.url.hint') || 'LAN IP of the LIFX bulb. UDP port 56700 is the protocol default.';
urlInput.placeholder = t('device.lifx.url.placeholder') || '192.168.1.50';
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else if (isBleDevice(deviceType)) { } else if (isBleDevice(deviceType)) {
// BLE: show URL (ble://<address>), LED count, protocol family picker, // BLE: show URL (ble://<address>), LED count, protocol family picker,
// and a Govee-only AES key field that toggles with the family selection. // and a Govee-only AES key field that toggles with the family selection.
@@ -856,6 +880,13 @@ export function showAddDevice(presetType: any = null, cloneData: any = null) {
wmi.value = String(cloneData.wiz_min_interval_ms); wmi.value = String(cloneData.wiz_min_interval_ms);
} }
} }
// Prefill LIFX fields
if (isLifxDevice(presetType)) {
const lmi = document.getElementById('device-lifx-min-interval') as HTMLInputElement;
if (lmi && cloneData.lifx_min_interval_ms != null) {
lmi.value = String(cloneData.lifx_min_interval_ms);
}
}
// Prefill CSPT template selector (after fetch completes) // Prefill CSPT template selector (after fetch completes)
if (cloneData.default_css_processing_template_id) { if (cloneData.default_css_processing_template_id) {
csptCache.fetch().then(() => { csptCache.fetch().then(() => {
@@ -1067,6 +1098,11 @@ export async function handleAddDevice(event: any) {
const parsed = parseInt(raw || '50', 10); const parsed = parseInt(raw || '50', 10);
body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50; body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
} }
if (isLifxDevice(deviceType)) {
const raw = (document.getElementById('device-lifx-min-interval') as HTMLInputElement)?.value;
const parsed = parseInt(raw || '50', 10);
body.lifx_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
}
if (isBleDevice(deviceType)) { if (isBleDevice(deviceType)) {
body.ble_family = (document.getElementById('device-ble-family') as HTMLSelectElement)?.value || 'sp110e'; body.ble_family = (document.getElementById('device-ble-family') as HTMLSelectElement)?.value || 'sp110e';
const goveeKey = (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value?.trim(); const goveeKey = (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value?.trim();
@@ -1430,6 +1466,11 @@ function _showWizFields(show: boolean) {
if (el) el.style.display = show ? '' : 'none'; if (el) el.style.display = show ? '' : 'none';
} }
function _showLifxFields(show: boolean) {
const el = document.getElementById('device-lifx-min-interval-group') as HTMLElement | null;
if (el) el.style.display = show ? '' : 'none';
}
// Tracks whether the BLE fields are currently shown — avoids reading // Tracks whether the BLE fields are currently shown — avoids reading
// style.display strings in _updateBleGoveeKeyVisibility. // style.display strings in _updateBleGoveeKeyVisibility.
let _bleFieldsVisible = false; let _bleFieldsVisible = false;
@@ -6,7 +6,7 @@ import {
_deviceBrightnessCache, updateDeviceBrightness, _deviceBrightnessCache, updateDeviceBrightness,
csptCache, csptCache,
} from '../core/state.ts'; } from '../core/state.ts';
import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isYeelightDevice, isWizDevice, isBleDevice, isGroupDevice } from '../core/api.ts'; import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isYeelightDevice, isWizDevice, isLifxDevice, isBleDevice, isGroupDevice } from '../core/api.ts';
import { devicesCache } from '../core/state.ts'; import { devicesCache } from '../core/state.ts';
import { _fetchOpenrgbZones, _getCheckedZones, _splitOpenrgbZone, _getZoneMode, ensureDmxProtocolIconSelect, destroyDmxProtocolIconSelect, ensureDdpColorOrderIconSelect, destroyDdpColorOrderIconSelect, ensureSpiLedTypeIconSelect, destroySpiLedTypeIconSelect, ensureGameSenseDeviceTypeIconSelect, destroyGameSenseDeviceTypeIconSelect, addGroupChildSettingsWithId as _addGroupChildSettingsWithId, ensureGroupModeIconSelect, destroyGroupModeIconSelect, ensureBleFamilyIconSelect, destroyBleFamilyIconSelect } from './device-discovery.ts'; import { _fetchOpenrgbZones, _getCheckedZones, _splitOpenrgbZone, _getZoneMode, ensureDmxProtocolIconSelect, destroyDmxProtocolIconSelect, ensureDdpColorOrderIconSelect, destroyDdpColorOrderIconSelect, ensureSpiLedTypeIconSelect, destroySpiLedTypeIconSelect, ensureGameSenseDeviceTypeIconSelect, destroyGameSenseDeviceTypeIconSelect, addGroupChildSettingsWithId as _addGroupChildSettingsWithId, ensureGroupModeIconSelect, destroyGroupModeIconSelect, ensureBleFamilyIconSelect, destroyBleFamilyIconSelect } from './device-discovery.ts';
import { t } from '../core/i18n.ts'; import { t } from '../core/i18n.ts';
@@ -97,6 +97,7 @@ class DeviceSettingsModal extends Modal {
bleGoveeKey: (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value || '', bleGoveeKey: (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value || '',
yeelightMinInterval: (document.getElementById('settings-yeelight-min-interval') as HTMLInputElement | null)?.value || '500', yeelightMinInterval: (document.getElementById('settings-yeelight-min-interval') as HTMLInputElement | null)?.value || '500',
wizMinInterval: (document.getElementById('settings-wiz-min-interval') as HTMLInputElement | null)?.value || '50', wizMinInterval: (document.getElementById('settings-wiz-min-interval') as HTMLInputElement | null)?.value || '50',
lifxMinInterval: (document.getElementById('settings-lifx-min-interval') as HTMLInputElement | null)?.value || '50',
csptId: (document.getElementById('settings-css-processing-template') as HTMLSelectElement | null)?.value || '', csptId: (document.getElementById('settings-css-processing-template') as HTMLSelectElement | null)?.value || '',
}; };
} }
@@ -667,6 +668,24 @@ export async function showSettings(deviceId: any) {
if (wizMinIntervalGroup) (wizMinIntervalGroup as HTMLElement).style.display = 'none'; if (wizMinIntervalGroup) (wizMinIntervalGroup as HTMLElement).style.display = 'none';
} }
// LIFX-specific fields — binary UDP on port 56700, single-pixel
// (HSBK averaged from the strip). LIFX recommends ≤20 cmd/sec per
// device; default 50 ms matches that ceiling.
const lifxMinIntervalGroup = document.getElementById('settings-lifx-min-interval-group');
if (isLifxDevice(device.device_type)) {
if (lifxMinIntervalGroup) (lifxMinIntervalGroup as HTMLElement).style.display = '';
const lmi = device.lifx_min_interval_ms ?? 50;
(document.getElementById('settings-lifx-min-interval') as HTMLInputElement).value = String(lmi);
// Relabel URL field as IP Address (same pattern as WiZ/Yeelight/DMX/DDP)
const urlLabel6 = urlGroup.querySelector('label[for="settings-device-url"]') as HTMLElement | null;
const urlHint6 = urlGroup.querySelector('.input-hint') as HTMLElement | null;
if (urlLabel6) urlLabel6.textContent = t('device.lifx.url');
if (urlHint6) urlHint6.textContent = t('device.lifx.url.hint');
urlInput.placeholder = t('device.lifx.url.placeholder') || '192.168.1.50';
} else {
if (lifxMinIntervalGroup) (lifxMinIntervalGroup as HTMLElement).style.display = 'none';
}
// BLE-specific fields — exposed in the settings modal so the user // BLE-specific fields — exposed in the settings modal so the user
// can fix a wrong protocol family pick without deleting+recreating // can fix a wrong protocol family pick without deleting+recreating
// the device. Uses the shared IconSelect grid (project rule bans // the device. Uses the shared IconSelect grid (project rule bans
@@ -813,6 +832,11 @@ export async function saveDeviceSettings() {
const parsed = parseInt(raw || '50', 10); const parsed = parseInt(raw || '50', 10);
body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50; body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
} }
if (isLifxDevice(settingsModal.deviceType)) {
const raw = (document.getElementById('settings-lifx-min-interval') as HTMLInputElement | null)?.value;
const parsed = parseInt(raw || '50', 10);
body.lifx_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
}
if (isBleDevice(settingsModal.deviceType)) { if (isBleDevice(settingsModal.deviceType)) {
body.ble_family = (document.getElementById('settings-ble-family') as HTMLSelectElement | null)?.value || 'sp110e'; body.ble_family = (document.getElementById('settings-ble-family') as HTMLSelectElement | null)?.value || 'sp110e';
const goveeKey = (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value?.trim() || ''; const goveeKey = (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value?.trim() || '';
+2 -1
View File
@@ -47,7 +47,7 @@ export function bindableColorSourceId(b: BindableColor | undefined): string {
export type DeviceType = export type DeviceType =
| 'wled' | 'adalight' | 'ambiled' | 'mock' | 'mqtt' | 'ws' | 'wled' | 'adalight' | 'ambiled' | 'mock' | 'mqtt' | 'ws'
| 'openrgb' | 'dmx' | 'ddp' | 'espnow' | 'hue' | 'yeelight' | 'wiz' | 'openrgb' | 'dmx' | 'ddp' | 'espnow' | 'hue' | 'yeelight' | 'wiz' | 'lifx'
| 'ble' | 'usbhid' | 'spi' | 'ble' | 'usbhid' | 'spi'
| 'chroma' | 'gamesense' | 'group'; | 'chroma' | 'gamesense' | 'group';
@@ -78,6 +78,7 @@ export interface Device {
hue_entertainment_group_id: string; hue_entertainment_group_id: string;
yeelight_min_interval_ms: number; yeelight_min_interval_ms: number;
wiz_min_interval_ms: number; wiz_min_interval_ms: number;
lifx_min_interval_ms: number;
spi_speed_hz: number; spi_speed_hz: number;
spi_led_type: string; spi_led_type: string;
chroma_device_type: string; chroma_device_type: string;
@@ -211,6 +211,13 @@
"device.wiz.url.placeholder": "192.168.1.50", "device.wiz.url.placeholder": "192.168.1.50",
"device.wiz_min_interval": "Min Update Interval:", "device.wiz_min_interval": "Min Update Interval:",
"device.wiz_min_interval.hint": "Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.", "device.wiz_min_interval.hint": "Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.",
"device.type.lifx": "LIFX",
"device.type.lifx.desc": "LIFX smart bulb / lightstrip over LAN",
"device.lifx.url": "IP Address:",
"device.lifx.url.hint": "LAN IP of the LIFX bulb. UDP port 56700 is the protocol default.",
"device.lifx.url.placeholder": "192.168.1.50",
"device.lifx_min_interval": "Min Update Interval:",
"device.lifx_min_interval.hint": "Client-side rate limit between commands in ms. LIFX recommends ≤20 cmd/sec; default 50 ms matches that ceiling.",
"device.type.ble": "BLE LED Controller", "device.type.ble": "BLE LED Controller",
"device.type.ble.desc": "Bluetooth LE strips: SP110E, Triones, Zengge, Govee (whole-strip color)", "device.type.ble.desc": "Bluetooth LE strips: SP110E, Triones, Zengge, Govee (whole-strip color)",
"device.ble.url": "BLE Address:", "device.ble.url": "BLE Address:",
@@ -266,6 +266,13 @@
"device.wiz.url.placeholder": "192.168.1.50", "device.wiz.url.placeholder": "192.168.1.50",
"device.wiz_min_interval": "Мин. интервал обновления:", "device.wiz_min_interval": "Мин. интервал обновления:",
"device.wiz_min_interval.hint": "Локальный лимит частоты команд (мс). UDP fire-and-forget справляется с быстрыми обновлениями; по умолчанию 50 мс ≈ 20 Гц.", "device.wiz_min_interval.hint": "Локальный лимит частоты команд (мс). UDP fire-and-forget справляется с быстрыми обновлениями; по умолчанию 50 мс ≈ 20 Гц.",
"device.type.lifx": "LIFX",
"device.type.lifx.desc": "Умная лампа / лента LIFX по LAN",
"device.lifx.url": "IP-адрес:",
"device.lifx.url.hint": "IP-адрес лампы LIFX в локальной сети. UDP-порт 56700 — по умолчанию.",
"device.lifx.url.placeholder": "192.168.1.50",
"device.lifx_min_interval": "Мин. интервал обновления:",
"device.lifx_min_interval.hint": "Локальный лимит частоты команд (мс). LIFX рекомендует ≤20 команд/сек; по умолчанию 50 мс соответствует этому потолку.",
"device.type.ble": "BLE LED контроллер", "device.type.ble": "BLE LED контроллер",
"device.type.ble.desc": "Bluetooth LE ленты: SP110E, Triones, Zengge, Govee (один цвет на всю ленту)", "device.type.ble.desc": "Bluetooth LE ленты: SP110E, Triones, Zengge, Govee (один цвет на всю ленту)",
"device.ble.url": "BLE адрес:", "device.ble.url": "BLE адрес:",
@@ -264,6 +264,13 @@
"device.wiz.url.placeholder": "192.168.1.50", "device.wiz.url.placeholder": "192.168.1.50",
"device.wiz_min_interval": "最小更新间隔:", "device.wiz_min_interval": "最小更新间隔:",
"device.wiz_min_interval.hint": "客户端命令速率限制(毫秒)。UDP 即发即忘可处理快速更新;默认 50 毫秒 ≈ 20 Hz。", "device.wiz_min_interval.hint": "客户端命令速率限制(毫秒)。UDP 即发即忘可处理快速更新;默认 50 毫秒 ≈ 20 Hz。",
"device.type.lifx": "LIFX",
"device.type.lifx.desc": "通过局域网连接 LIFX 智能灯泡/灯带",
"device.lifx.url": "IP 地址:",
"device.lifx.url.hint": "LIFX 灯泡的局域网 IP。UDP 端口 56700 为协议默认值。",
"device.lifx.url.placeholder": "192.168.1.50",
"device.lifx_min_interval": "最小更新间隔:",
"device.lifx_min_interval.hint": "客户端命令速率限制(毫秒)。LIFX 建议 ≤20 cmd/sec;默认 50 毫秒符合该上限。",
"device.type.ble": "BLE LED 控制器", "device.type.ble": "BLE LED 控制器",
"device.type.ble.desc": "Bluetooth LE 灯带:SP110E、Triones、Zengge、Govee(整条灯带同色)", "device.type.ble.desc": "Bluetooth LE 灯带:SP110E、Triones、Zengge、Govee(整条灯带同色)",
"device.ble.url": "BLE 地址:", "device.ble.url": "BLE 地址:",
@@ -66,6 +66,8 @@ class Device:
yeelight_min_interval_ms: int = 500, yeelight_min_interval_ms: int = 500,
# WiZ fields # WiZ fields
wiz_min_interval_ms: int = 50, wiz_min_interval_ms: int = 50,
# LIFX fields
lifx_min_interval_ms: int = 50,
# SPI Direct fields # SPI Direct fields
spi_speed_hz: int = 800000, spi_speed_hz: int = 800000,
spi_led_type: str = "WS2812B", spi_led_type: str = "WS2812B",
@@ -115,6 +117,7 @@ class Device:
self.hue_entertainment_group_id = hue_entertainment_group_id self.hue_entertainment_group_id = hue_entertainment_group_id
self.yeelight_min_interval_ms = yeelight_min_interval_ms self.yeelight_min_interval_ms = yeelight_min_interval_ms
self.wiz_min_interval_ms = wiz_min_interval_ms self.wiz_min_interval_ms = wiz_min_interval_ms
self.lifx_min_interval_ms = lifx_min_interval_ms
self.spi_speed_hz = spi_speed_hz self.spi_speed_hz = spi_speed_hz
self.spi_led_type = spi_led_type self.spi_led_type = spi_led_type
self.chroma_device_type = chroma_device_type self.chroma_device_type = chroma_device_type
@@ -153,6 +156,7 @@ class Device:
MQTTConfig, MQTTConfig,
OpenRGBConfig, OpenRGBConfig,
SPIConfig, SPIConfig,
LIFXConfig,
USBHIDConfig, USBHIDConfig,
WiZConfig, WiZConfig,
WLEDConfig, WLEDConfig,
@@ -213,6 +217,11 @@ class Device:
**base, **base,
wiz_min_interval_ms=self.wiz_min_interval_ms, wiz_min_interval_ms=self.wiz_min_interval_ms,
) )
if dt == "lifx":
return LIFXConfig(
**base,
lifx_min_interval_ms=self.lifx_min_interval_ms,
)
if dt == "spi": if dt == "spi":
return SPIConfig(**base, spi_speed_hz=self.spi_speed_hz, spi_led_type=self.spi_led_type) return SPIConfig(**base, spi_speed_hz=self.spi_speed_hz, spi_led_type=self.spi_led_type)
if dt == "chroma": if dt == "chroma":
@@ -295,6 +304,8 @@ class Device:
d["yeelight_min_interval_ms"] = self.yeelight_min_interval_ms d["yeelight_min_interval_ms"] = self.yeelight_min_interval_ms
if self.wiz_min_interval_ms != 50: if self.wiz_min_interval_ms != 50:
d["wiz_min_interval_ms"] = self.wiz_min_interval_ms d["wiz_min_interval_ms"] = self.wiz_min_interval_ms
if self.lifx_min_interval_ms != 50:
d["lifx_min_interval_ms"] = self.lifx_min_interval_ms
if self.spi_speed_hz != 800000: if self.spi_speed_hz != 800000:
d["spi_speed_hz"] = self.spi_speed_hz d["spi_speed_hz"] = self.spi_speed_hz
if self.spi_led_type != "WS2812B": if self.spi_led_type != "WS2812B":
@@ -352,6 +363,7 @@ class Device:
hue_entertainment_group_id=data.get("hue_entertainment_group_id", ""), hue_entertainment_group_id=data.get("hue_entertainment_group_id", ""),
yeelight_min_interval_ms=data.get("yeelight_min_interval_ms", 500), yeelight_min_interval_ms=data.get("yeelight_min_interval_ms", 500),
wiz_min_interval_ms=data.get("wiz_min_interval_ms", 50), wiz_min_interval_ms=data.get("wiz_min_interval_ms", 50),
lifx_min_interval_ms=data.get("lifx_min_interval_ms", 50),
spi_speed_hz=data.get("spi_speed_hz", 800000), spi_speed_hz=data.get("spi_speed_hz", 800000),
spi_led_type=data.get("spi_led_type", "WS2812B"), spi_led_type=data.get("spi_led_type", "WS2812B"),
chroma_device_type=data.get("chroma_device_type", "chromalink"), chroma_device_type=data.get("chroma_device_type", "chromalink"),
@@ -401,6 +413,7 @@ _UPDATABLE_FIELDS: frozenset[str] = frozenset(
"hue_entertainment_group_id", "hue_entertainment_group_id",
"yeelight_min_interval_ms", "yeelight_min_interval_ms",
"wiz_min_interval_ms", "wiz_min_interval_ms",
"lifx_min_interval_ms",
"spi_speed_hz", "spi_speed_hz",
"spi_led_type", "spi_led_type",
"chroma_device_type", "chroma_device_type",
@@ -503,6 +516,7 @@ class DeviceStore(BaseSqliteStore[Device]):
hue_entertainment_group_id: str = "", hue_entertainment_group_id: str = "",
yeelight_min_interval_ms: int = 500, yeelight_min_interval_ms: int = 500,
wiz_min_interval_ms: int = 50, wiz_min_interval_ms: int = 50,
lifx_min_interval_ms: int = 50,
spi_speed_hz: int = 800000, spi_speed_hz: int = 800000,
spi_led_type: str = "WS2812B", spi_led_type: str = "WS2812B",
chroma_device_type: str = "chromalink", chroma_device_type: str = "chromalink",
@@ -548,6 +562,7 @@ class DeviceStore(BaseSqliteStore[Device]):
hue_entertainment_group_id=hue_entertainment_group_id, hue_entertainment_group_id=hue_entertainment_group_id,
yeelight_min_interval_ms=yeelight_min_interval_ms, yeelight_min_interval_ms=yeelight_min_interval_ms,
wiz_min_interval_ms=wiz_min_interval_ms, wiz_min_interval_ms=wiz_min_interval_ms,
lifx_min_interval_ms=lifx_min_interval_ms,
spi_speed_hz=spi_speed_hz, spi_speed_hz=spi_speed_hz,
spi_led_type=spi_led_type, spi_led_type=spi_led_type,
chroma_device_type=chroma_device_type, chroma_device_type=chroma_device_type,
@@ -48,6 +48,7 @@
<option value="hue">Philips Hue</option> <option value="hue">Philips Hue</option>
<option value="yeelight">Yeelight</option> <option value="yeelight">Yeelight</option>
<option value="wiz">WiZ</option> <option value="wiz">WiZ</option>
<option value="lifx">LIFX</option>
<option value="ble">BLE LED Controller</option> <option value="ble">BLE LED Controller</option>
<option value="usbhid">USB HID</option> <option value="usbhid">USB HID</option>
<option value="spi">SPI Direct</option> <option value="spi">SPI Direct</option>
@@ -235,6 +236,15 @@
<small class="input-hint" style="display:none" data-i18n="device.wiz_min_interval.hint">Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.</small> <small class="input-hint" style="display:none" data-i18n="device.wiz_min_interval.hint">Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.</small>
<input type="number" id="device-wiz-min-interval" min="0" max="10000" step="10" value="50"> <input type="number" id="device-wiz-min-interval" min="0" max="10000" step="10" value="50">
</div> </div>
<!-- LIFX fields -->
<div class="form-group" id="device-lifx-min-interval-group" style="display: none;">
<div class="label-row">
<label for="device-lifx-min-interval" data-i18n="device.lifx_min_interval">Min Update Interval:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.lifx_min_interval.hint">Client-side rate limit between commands in ms. LIFX recommends ≤20 cmd/sec; default 50 ms matches that ceiling.</small>
<input type="number" id="device-lifx-min-interval" min="0" max="10000" step="10" value="50">
</div>
<!-- ESP-NOW fields --> <!-- ESP-NOW fields -->
<div class="form-group" id="device-espnow-peer-mac-group" style="display: none;"> <div class="form-group" id="device-espnow-peer-mac-group" style="display: none;">
<div class="label-row"> <div class="label-row">
@@ -268,6 +268,15 @@
<input type="number" id="settings-wiz-min-interval" min="0" max="10000" step="10" value="50"> <input type="number" id="settings-wiz-min-interval" min="0" max="10000" step="10" value="50">
</div> </div>
<div class="form-group" id="settings-lifx-min-interval-group" style="display: none;">
<div class="label-row">
<label for="settings-lifx-min-interval" data-i18n="device.lifx_min_interval">Min Update Interval:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.lifx_min_interval.hint">Client-side rate limit between commands in ms. LIFX recommends ≤20 cmd/sec; default 50 ms matches that ceiling.</small>
<input type="number" id="settings-lifx-min-interval" min="0" max="10000" step="10" value="50">
</div>
<div class="form-group" id="settings-send-latency-group" style="display: none;"> <div class="form-group" id="settings-send-latency-group" style="display: none;">
<div class="label-row"> <div class="label-row">
<label for="settings-send-latency" data-i18n="device.send_latency">Send Latency (ms):</label> <label for="settings-send-latency" data-i18n="device.send_latency">Send Latency (ms):</label>
+479
View File
@@ -0,0 +1,479 @@
"""Tests for the LIFX LAN LED client + provider."""
from __future__ import annotations
import struct
from unittest.mock import MagicMock
import numpy as np
import pytest
from ledgrab.core.devices.device_config import LIFXConfig
from ledgrab.core.devices.led_client import ProviderDeps
from ledgrab.core.devices.lifx_client import (
LIFX_PORT,
MSG_GET_SERVICE,
MSG_SET_COLOR,
MSG_SET_POWER,
MSG_STATE_SERVICE,
LIFXClient,
_average_color,
_build_packet,
_build_set_color_payload,
_build_set_power_payload,
_parse_state_service_reply,
parse_lifx_url,
rgb_to_hsbk,
)
from ledgrab.core.devices.lifx_provider import LIFXDeviceProvider
# ============================================================================
# URL parsing
# ============================================================================
@pytest.mark.parametrize(
"url,expected",
[
("lifx://192.168.1.50", ("192.168.1.50", LIFX_PORT)),
("lifx://192.168.1.50:56700", ("192.168.1.50", 56700)),
("192.168.1.50", ("192.168.1.50", LIFX_PORT)),
("192.168.1.50:56700", ("192.168.1.50", 56700)),
("bulb.local", ("bulb.local", LIFX_PORT)),
],
)
def test_parse_lifx_url(url, expected):
assert parse_lifx_url(url) == expected
@pytest.mark.parametrize("url", ["", " ", "lifx://", "://192.168.1.1"])
def test_parse_lifx_url_rejects_empty(url):
with pytest.raises(ValueError):
parse_lifx_url(url)
# ============================================================================
# RGB → HSBK conversion
# ============================================================================
def test_rgb_to_hsbk_pure_red():
h, s, b, k = rgb_to_hsbk(255, 0, 0)
assert h == 0
assert s == 65535
assert b == 65535
assert 2500 <= k <= 9000
def test_rgb_to_hsbk_pure_green():
h, s, b, k = rgb_to_hsbk(0, 255, 0)
# Green is at 120° → hue = 120/360 * 65535 ≈ 21845
assert abs(h - 21845) < 5
assert s == 65535
assert b == 65535
def test_rgb_to_hsbk_pure_blue():
h, s, b, _k = rgb_to_hsbk(0, 0, 255)
assert abs(h - 43690) < 5
assert s == 65535
assert b == 65535
def test_rgb_to_hsbk_black_is_zero_brightness():
h, s, b, _k = rgb_to_hsbk(0, 0, 0)
assert b == 0
assert s == 0
def test_rgb_to_hsbk_white_has_full_brightness_zero_saturation():
_h, s, b, _k = rgb_to_hsbk(255, 255, 255)
assert s == 0
assert b == 65535
def test_rgb_to_hsbk_clamps_out_of_range_input():
# Negative / >255 should still produce a valid HSBK
h, s, b, _k = rgb_to_hsbk(-50, 999, 128)
assert 0 <= h <= 0xFFFF
assert 0 <= s <= 0xFFFF
assert 0 <= b <= 0xFFFF
# ============================================================================
# Packet construction
# ============================================================================
def test_build_packet_size_header_matches_total():
packet = _build_packet(msg_type=MSG_SET_POWER, payload=b"\x00\x01\x02\x03\x04\x05")
size = struct.unpack_from("<H", packet, 0)[0]
assert size == len(packet)
def test_build_packet_encodes_msg_type():
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"")
msg_type = struct.unpack_from("<H", packet, 32)[0]
assert msg_type == MSG_SET_COLOR
def test_build_packet_tagged_flag_set():
packet = _build_packet(msg_type=MSG_GET_SERVICE, payload=b"", tagged=True)
frame_field = struct.unpack_from("<H", packet, 2)[0]
# Tagged bit (0x2000) must be set in the frame header
assert frame_field & 0x2000
def test_build_packet_target_mac_at_offset_8():
target = b"\xaa\xbb\xcc\xdd\xee\xff"
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"", target_mac=target)
assert packet[8:14] == target
def test_build_packet_sequence_byte_at_offset_23():
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=b"", sequence=42)
assert packet[23] == 42
def test_set_color_payload_layout():
payload = _build_set_color_payload(h=10, s=20, b=30, k=4000, duration_ms=500)
# reserved byte + four uint16 + uint32 = 13 bytes
assert len(payload) == 13
assert payload[0] == 0
h, s, br, k, duration = struct.unpack_from("<HHHHI", payload, 1)
assert (h, s, br, k, duration) == (10, 20, 30, 4000, 500)
def test_set_power_payload_on_off():
on_payload = _build_set_power_payload(True)
off_payload = _build_set_power_payload(False)
on_level = struct.unpack_from("<H", on_payload, 0)[0]
off_level = struct.unpack_from("<H", off_payload, 0)[0]
assert on_level == 65535
assert off_level == 0
# ============================================================================
# StateService reply parsing
# ============================================================================
def test_parse_state_service_extracts_mac_and_port():
"""Build a synthetic LIFX reply and parse it back."""
mac = b"\xaa\xbb\xcc\xdd\xee\xff"
payload = struct.pack("<BI", 1, 56700) # service=1, port=56700
packet = _build_packet(
msg_type=MSG_STATE_SERVICE,
payload=payload,
target_mac=mac,
)
parsed = _parse_state_service_reply(packet)
assert parsed is not None
assert parsed["mac"] == mac.hex()
assert parsed["service"] == 1
assert parsed["port"] == 56700
def test_parse_state_service_rejects_wrong_msg_type():
"""A SetColor packet sent back must not be misread as a StateService."""
payload = _build_set_color_payload(0, 0, 0, 0)
packet = _build_packet(msg_type=MSG_SET_COLOR, payload=payload)
assert _parse_state_service_reply(packet) is None
def test_parse_state_service_rejects_runt_payload():
assert _parse_state_service_reply(b"") is None
assert _parse_state_service_reply(b"\x00" * 35) is None
# ============================================================================
# Helpers
# ============================================================================
def test_average_color_numpy():
pixels = np.array([[10, 20, 30], [40, 50, 60], [70, 80, 90]], dtype=np.uint8)
assert _average_color(pixels) == (40, 50, 60)
def test_average_color_list_and_empty():
assert _average_color([(10, 0, 0), (20, 0, 0), (30, 0, 0)]) == (20, 0, 0)
assert _average_color([]) == (0, 0, 0)
# ============================================================================
# LIFXClient (mocked transport)
# ============================================================================
def _make_connected_client(min_interval_s: float = 0.0) -> LIFXClient:
client = LIFXClient("lifx://127.0.0.1", led_count=10, min_interval_s=min_interval_s)
transport = MagicMock()
transport.sendto = MagicMock()
transport.close = MagicMock()
client._transport = transport
client._protocol = MagicMock()
client._connected = True
return client
def _sent_packets(client: LIFXClient) -> list[bytes]:
return [bytes(call.args[0]) for call in client._transport.sendto.call_args_list]
@pytest.mark.asyncio
async def test_send_pixels_emits_one_set_color_packet():
client = _make_connected_client()
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
await client.send_pixels(pixels)
packets = _sent_packets(client)
assert len(packets) == 1
msg_type = struct.unpack_from("<H", packets[0], 32)[0]
assert msg_type == MSG_SET_COLOR
@pytest.mark.asyncio
async def test_send_pixels_encodes_red_into_hsbk_payload():
client = _make_connected_client()
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
await client.send_pixels(pixels)
packet = _sent_packets(client)[0]
# SetColor payload starts at offset 36 (header) + 1 (reserved byte) = 37
h, s, b, _k, _dur = struct.unpack_from("<HHHHI", packet, 37)
assert h == 0
assert s == 65535
assert b == 65535
@pytest.mark.asyncio
async def test_send_pixels_scales_brightness_by_dimming_rgb():
client = _make_connected_client()
pixels = np.array([[255, 0, 0]], dtype=np.uint8)
await client.send_pixels(pixels, brightness=128)
packet = _sent_packets(client)[0]
_h, _s, b, _k, _dur = struct.unpack_from("<HHHHI", packet, 37)
# 255 * 128/255 = 128 → brightness ≈ 128/255 * 65535 ≈ 32896
assert abs(b - int((128 / 255) * 65535)) < 256
@pytest.mark.asyncio
async def test_rate_limit_drops_subsequent_frames():
client = _make_connected_client(min_interval_s=10.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
assert len(_sent_packets(client)) == 1
@pytest.mark.asyncio
async def test_send_pixels_when_not_connected_raises():
client = LIFXClient("lifx://127.0.0.1", led_count=1)
with pytest.raises(RuntimeError, match="not connected"):
await client.send_pixels(np.array([[1, 2, 3]], dtype=np.uint8))
def test_send_pixels_fast_runs_synchronously():
client = _make_connected_client(min_interval_s=0.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
client.send_pixels_fast(pixels)
packets = _sent_packets(client)
assert len(packets) == 1
msg_type = struct.unpack_from("<H", packets[0], 32)[0]
assert msg_type == MSG_SET_COLOR
def test_supports_fast_send_is_true():
assert LIFXClient("lifx://127.0.0.1", led_count=1).supports_fast_send is True
@pytest.mark.asyncio
async def test_set_power_sends_set_power_msg():
client = _make_connected_client()
await client.set_power(True)
await client.set_power(False)
packets = _sent_packets(client)
types = [struct.unpack_from("<H", p, 32)[0] for p in packets]
assert types == [MSG_SET_POWER, MSG_SET_POWER]
on_level = struct.unpack_from("<H", packets[0], 36)[0]
off_level = struct.unpack_from("<H", packets[1], 36)[0]
assert on_level == 65535
assert off_level == 0
@pytest.mark.asyncio
async def test_set_color_sends_set_color_msg():
client = _make_connected_client()
await client.set_color(255, 0, 0)
packet = _sent_packets(client)[0]
msg_type = struct.unpack_from("<H", packet, 32)[0]
assert msg_type == MSG_SET_COLOR
@pytest.mark.asyncio
async def test_close_releases_transport():
client = _make_connected_client()
transport = client._transport
await client.close()
transport.close.assert_called_once()
assert client._transport is None
assert client.is_connected is False
# ============================================================================
# Provider
# ============================================================================
def test_provider_device_type_and_capabilities():
provider = LIFXDeviceProvider()
assert provider.device_type == "lifx"
caps = provider.capabilities
assert "manual_led_count" in caps
assert "power_control" in caps
assert "single_pixel" in caps
@pytest.mark.asyncio
async def test_provider_validate_accepts_bare_host():
provider = LIFXDeviceProvider()
assert await provider.validate_device("192.168.1.50") == {}
@pytest.mark.asyncio
async def test_provider_validate_rejects_empty():
provider = LIFXDeviceProvider()
with pytest.raises(ValueError, match="Invalid LIFX URL"):
await provider.validate_device("")
def test_provider_create_client_threads_config():
provider = LIFXDeviceProvider()
config = LIFXConfig(
device_id="device_test",
device_url="lifx://192.168.1.50",
led_count=30,
lifx_min_interval_ms=100,
)
client = provider.create_client(config, deps=ProviderDeps())
assert isinstance(client, LIFXClient)
assert client.host == "192.168.1.50"
assert client.port == LIFX_PORT
assert client._led_count == 30
assert client._min_interval_s == pytest.approx(0.1)
@pytest.mark.asyncio
async def test_provider_discover_returns_empty_on_failure(monkeypatch):
async def _explode(timeout):
raise OSError("network unreachable")
monkeypatch.setattr("ledgrab.core.devices.lifx_provider.discover_lifx_bulbs", _explode)
provider = LIFXDeviceProvider()
assert await provider.discover() == []
@pytest.mark.asyncio
async def test_provider_discover_maps_replies_to_discovered_devices(monkeypatch):
async def _fake(timeout):
return [
{"ip": "192.168.1.50", "mac": "aabbccddeeff", "port": 56700},
{"ip": "", "mac": "1234567890ab", "port": 56700},
]
monkeypatch.setattr("ledgrab.core.devices.lifx_provider.discover_lifx_bulbs", _fake)
provider = LIFXDeviceProvider()
results = await provider.discover()
assert len(results) == 1
[bulb] = results
assert bulb.device_type == "lifx"
assert bulb.url == "lifx://192.168.1.50"
assert bulb.ip == "192.168.1.50"
assert bulb.mac == "aabbccddeeff"
# ============================================================================
# Device.to_config() round-trip
# ============================================================================
def test_device_to_config_round_trip_lifx():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Office LIFX",
url="lifx://192.168.1.42",
led_count=30,
device_type="lifx",
lifx_min_interval_ms=100,
)
config = device.to_config()
assert isinstance(config, LIFXConfig)
assert config.device_url == "lifx://192.168.1.42"
assert config.led_count == 30
assert config.lifx_min_interval_ms == 100
def test_device_to_dict_omits_lifx_default_interval():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Default",
url="lifx://192.168.1.42",
led_count=1,
device_type="lifx",
)
assert "lifx_min_interval_ms" not in device.to_dict()
def test_device_to_dict_preserves_non_default_lifx_interval():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Custom",
url="lifx://192.168.1.42",
led_count=1,
device_type="lifx",
lifx_min_interval_ms=200,
)
assert device.to_dict()["lifx_min_interval_ms"] == 200
def test_device_from_dict_lifx_round_trip():
from ledgrab.storage.device_store import Device
restored = Device.from_dict(
{
"id": "device_abc12345",
"name": "Roundtrip",
"url": "lifx://10.0.0.1",
"led_count": 1,
"device_type": "lifx",
"lifx_min_interval_ms": 150,
}
)
assert restored.lifx_min_interval_ms == 150