feat(devices): WiZ Connected LAN target type

Adds support for WiZ Connected (Philips' budget-tier) smart bulbs that
accept JSON commands as UDP datagrams on port 38899 with broadcast LAN
discovery on 255.255.255.255:38899.

Backend:
- WiZClient is a single-pixel UDP adapter: averages the incoming strip
  to one RGB triple and pushes it via setPilot with r/g/b params.
  Brightness folds into the RGB scaling so we burn one packet per frame
  instead of two.
- UDP fire-and-forget tolerates high update rates with no ack overhead,
  so the default rate gate is 50 ms (~20 Hz) -- 10x faster than Yeelight.
- supports_fast_send=True with a synchronous send_pixels_fast hot path.
- Broadcast discovery sends the standard registration envelope; bulb
  replies are parsed for IP+MAC and surfaced as DiscoveredDevice
  entries. Broadcast failures (no network, firewall) yield [] rather
  than raising.
- Health check sends getPilot and waits 1.5s for any reply on a
  one-shot UDP socket.
- WiZConfig joins the typed config union; Device storage gains a
  wiz_min_interval_ms field; full to_dict/from_dict/to_config wiring.
- 36 unit tests cover URL parsing, MAC extraction, strip averaging,
  rate limiting, fast-send hot path, provider validate/discover/health,
  and Device.to_config round-trip.

Frontend:
- 'wiz' in DEVICE_TYPE_KEYS (next to 'yeelight'), lightbulb icon
  (deliberate smart-bulb family grouping with Hue + Yeelight).
- isWizDevice predicate + per-type field show/hide in create and
  settings modals.
- Rate-limit number input (default 50 ms) in both modals with hint
  text noting the UDP fire-and-forget characteristic.
- Locale strings in en/ru/zh.

WiZ bulbs are reachable from the existing "Scan network" button -- no
new discovery UI affordance was needed.
This commit is contained in:
2026-05-16 02:12:01 +03:00
parent 4b65005823
commit ede627b4ac
19 changed files with 962 additions and 8 deletions
+10 -3
View File
@@ -695,9 +695,16 @@ Xiaomi/Yeelight bulbs, port 55443 TCP JSON. Direct protocol (no
Philips' UDP-local budget tier. Port 38899 JSON UDP.
- [ ] Reuse the discovery scaffolding from Yeelight (UDP broadcast pattern)
- [ ] `WiZConfig` + `WiZLEDClient` + `WiZDeviceProvider`
- [ ] Frontend additions + locales
- [x] `WiZConfig` + `WiZClient` + `WiZDeviceProvider`
- [x] UDP broadcast discovery on 255.255.255.255:38899 with the standard
`registration` envelope; replies parsed for IP+MAC.
- [x] Sync `send_pixels_fast` for the hot loop (UDP is fire-and-forget,
no async needed). 50 ms default min interval → ~20 Hz cap.
- [x] Health check sends `getPilot` and waits for any reply.
- [x] Storage + API schemas + route handler wiring
- [x] 36 unit tests
- [ ] Frontend: WiZ in device-type picker + edit form
- [ ] Locale strings (en/ru/zh)
### Phase 2 — Unified discovery + pairing UX layer
+7
View File
@@ -67,6 +67,7 @@ def _device_to_response(device) -> DeviceResponse:
hue_client_key=device.hue_client_key,
hue_entertainment_group_id=device.hue_entertainment_group_id,
yeelight_min_interval_ms=device.yeelight_min_interval_ms,
wiz_min_interval_ms=device.wiz_min_interval_ms,
spi_speed_hz=device.spi_speed_hz,
spi_led_type=device.spi_led_type,
chroma_device_type=device.chroma_device_type,
@@ -227,6 +228,11 @@ async def create_device(
if device_data.yeelight_min_interval_ms is not None
else 500
),
wiz_min_interval_ms=(
device_data.wiz_min_interval_ms
if device_data.wiz_min_interval_ms is not None
else 50
),
spi_speed_hz=device_data.spi_speed_hz or 800000,
spi_led_type=device_data.spi_led_type or "WS2812B",
chroma_device_type=device_data.chroma_device_type or "chromalink",
@@ -490,6 +496,7 @@ async def update_device(
hue_client_key=update_data.hue_client_key,
hue_entertainment_group_id=update_data.hue_entertainment_group_id,
yeelight_min_interval_ms=update_data.yeelight_min_interval_ms,
wiz_min_interval_ms=update_data.wiz_min_interval_ms,
spi_speed_hz=update_data.spi_speed_hz,
spi_led_type=update_data.spi_led_type,
chroma_device_type=update_data.chroma_device_type,
+11
View File
@@ -70,6 +70,13 @@ class DeviceCreate(BaseModel):
le=10000,
description="Yeelight client-side rate limit between commands in ms (default 500)",
)
# WiZ fields
wiz_min_interval_ms: Optional[int] = Field(
None,
ge=0,
le=10000,
description="WiZ client-side rate limit between commands in ms (default 50)",
)
# SPI Direct fields
spi_speed_hz: Optional[int] = Field(
None, ge=100000, le=4000000, description="SPI clock speed in Hz"
@@ -161,6 +168,9 @@ class DeviceUpdate(BaseModel):
yeelight_min_interval_ms: Optional[int] = Field(
None, ge=0, le=10000, description="Yeelight client-side rate limit in ms"
)
wiz_min_interval_ms: Optional[int] = Field(
None, ge=0, le=10000, description="WiZ client-side rate limit in ms"
)
spi_speed_hz: Optional[int] = Field(None, ge=100000, le=4000000, description="SPI clock speed")
spi_led_type: Optional[str] = Field(None, description="LED chipset type")
chroma_device_type: Optional[str] = Field(None, description="Chroma peripheral type")
@@ -333,6 +343,7 @@ class DeviceResponse(BaseModel):
yeelight_min_interval_ms: int = Field(
default=500, description="Yeelight client-side rate limit in ms"
)
wiz_min_interval_ms: int = Field(default=50, description="WiZ client-side rate limit in ms")
spi_speed_hz: int = Field(default=800000, description="SPI clock speed in Hz")
spi_led_type: str = Field(default="WS2812B", description="LED chipset type")
chroma_device_type: str = Field(default="chromalink", description="Chroma peripheral type")
@@ -88,6 +88,18 @@ class YeelightConfig(BaseDeviceConfig):
yeelight_min_interval_ms: int = 500
@dataclass(frozen=True)
class WiZConfig(BaseDeviceConfig):
"""WiZ Connected (Philips budget-tier) UDP LAN bulb.
``wiz_min_interval_ms`` is a client-side rate gate. WiZ tolerates much
higher rates than Yeelight (UDP, no ack) so the default is 50 ms ≈ 20 Hz.
"""
device_type: Literal["wiz"] = "wiz"
wiz_min_interval_ms: int = 50
@dataclass(frozen=True)
class SPIConfig(BaseDeviceConfig):
device_type: Literal["spi"] = "spi"
@@ -159,6 +171,7 @@ DeviceConfig = Union[
WLEDConfig,
DDPConfig,
YeelightConfig,
WiZConfig,
AdalightConfig,
AmbiLEDConfig,
DMXConfig,
@@ -342,6 +342,10 @@ def _register_builtin_providers():
register_provider(YeelightDeviceProvider())
from ledgrab.core.devices.wiz_provider import WiZDeviceProvider
register_provider(WiZDeviceProvider())
# BLE support is optional — only register the provider if the ``bleak``
# extra is installed. Importing the provider itself is safe (it doesn't
# import bleak at module load), but we still want a clean skip on
@@ -0,0 +1,323 @@
"""WiZ Connected (Philips' budget tier) LAN LED client.
WiZ bulbs accept JSON commands as UDP datagrams on port 38899. There's no
persistent connection — every frame is fire-and-forget — so the client is
simpler than Yeelight and tolerates higher update rates.
URL scheme: ``wiz://<host>[:port]`` or bare ``<host>``. Default port 38899.
Discovery: UDP broadcast of a ``registration`` envelope on
255.255.255.255:38899 — bulbs reply unicast with their MAC and state.
"""
from __future__ import annotations
import asyncio
import json
import socket
import time
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
from urllib.parse import urlparse
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.utils import get_logger
logger = get_logger(__name__)
WIZ_PORT = 38899
DEFAULT_MIN_INTERVAL_S = 0.05 # ~20 Hz cap; bulbs tolerate it, UDP costs nothing
def parse_wiz_url(url: str) -> Tuple[str, int]:
"""Pull ``(host, port)`` from ``wiz://host[:port]`` or a bare ``host[:port]``."""
if not url:
raise ValueError("WiZ URL is empty")
raw = url.strip()
if "://" in raw:
parsed = urlparse(raw)
host = parsed.hostname or ""
port = parsed.port or WIZ_PORT
else:
parsed = urlparse(f"wiz://{raw}")
host = parsed.hostname or ""
port = parsed.port or WIZ_PORT
if not host:
raise ValueError(f"WiZ URL has no host: {url!r}")
return host, port
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
class _WiZProtocol(asyncio.DatagramProtocol):
"""Minimal protocol: sends only, drops any inbound packets silently."""
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
# WiZ bulbs reply to setPilot with a small ack. We don't need it for
# ambilight streaming — just drop the bytes on the floor.
pass
def error_received(self, exc):
# UDP errors (ICMP unreachable, route changes) surface here. Log
# once and let the next frame retry; transient drops are normal.
logger.debug("WiZ UDP error: %s", exc)
class WiZClient(LEDClient):
"""LEDClient for a single WiZ Connected bulb on the LAN."""
def __init__(
self,
url: str,
led_count: int = 1,
*,
min_interval_s: float = DEFAULT_MIN_INTERVAL_S,
):
host, port = parse_wiz_url(url)
self._host = host
self._port = port
self._led_count = led_count
self._min_interval_s = max(0.0, min_interval_s)
self._transport: Optional[asyncio.DatagramTransport] = None
self._protocol: Optional[_WiZProtocol] = None
self._connected = False
self._next_tx_at: float = 0.0
@property
def host(self) -> str:
return self._host
@property
def port(self) -> int:
return self._port
@property
def is_connected(self) -> bool:
return self._connected and self._transport is not None
@property
def device_led_count(self) -> Optional[int]:
return self._led_count or None
async def connect(self) -> bool:
if self._connected and self._transport is not None:
return True
loop = asyncio.get_running_loop()
try:
transport, protocol = await loop.create_datagram_endpoint(
_WiZProtocol, remote_addr=(self._host, self._port)
)
except OSError as exc:
raise RuntimeError(f"Failed to open UDP to WiZ at {self._host}: {exc}") from exc
self._transport = transport
self._protocol = protocol # type: ignore[assignment]
self._connected = True
logger.info("WiZClient connected to %s:%d", self._host, self._port)
return True
async def close(self) -> None:
if self._transport is not None:
try:
self._transport.close()
except OSError:
pass
self._transport = None
self._protocol = None
self._connected = False
def _send_json(self, payload: dict) -> None:
"""Fire one JSON UDP packet. Caller must hold an open transport."""
assert self._transport is not None
raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
self._transport.sendto(raw)
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
"""Average the pixel strip to one color and push ``setPilot``."""
if not self.is_connected:
raise RuntimeError("WiZClient not connected")
now = time.monotonic()
if now < self._next_tx_at:
return True
r, g, b = _average_color(pixels)
# WiZ has a separate "dimming" param (1-100). For ambilight we keep
# things linear and fold brightness into the RGB scalars — that's
# what the bulb shows anyway with state=on.
if brightness < 255:
scale = max(0, min(255, brightness)) / 255.0
r = int(r * scale)
g = int(g * scale)
b = int(b * scale)
self._send_json({"method": "setPilot", "params": {"r": r, "g": g, "b": b}})
self._next_tx_at = now + self._min_interval_s
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
"""Synchronous variant for the hot path. Same shape as send_pixels."""
if not self.is_connected or self._transport is None:
raise RuntimeError("WiZClient not connected")
now = time.monotonic()
if now < self._next_tx_at:
return
r, g, b = _average_color(pixels)
if brightness < 255:
scale = max(0, min(255, brightness)) / 255.0
r = int(r * scale)
g = int(g * scale)
b = int(b * scale)
self._send_json({"method": "setPilot", "params": {"r": r, "g": g, "b": b}})
self._next_tx_at = now + self._min_interval_s
@property
def supports_fast_send(self) -> bool:
# WiZ is UDP fire-and-forget — perfect candidate for the sync hot path.
return True
async def set_color(self, r: int, g: int, b: int) -> None:
if not self.is_connected:
raise RuntimeError("WiZClient not connected")
self._send_json({"method": "setPilot", "params": {"r": r, "g": g, "b": b}})
async def set_brightness(self, brightness_0_100: int) -> None:
if not self.is_connected:
raise RuntimeError("WiZClient not connected")
clamped = max(10, min(100, brightness_0_100)) # WiZ rejects <10
self._send_json({"method": "setPilot", "params": {"dimming": clamped}})
async def set_power(self, on: bool) -> None:
if not self.is_connected:
raise RuntimeError("WiZClient not connected")
self._send_json({"method": "setPilot", "params": {"state": on}})
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Send a getPilot and wait briefly for any reply on a one-shot socket."""
now = datetime.now(timezone.utc)
try:
host, port = parse_wiz_url(url)
except ValueError as exc:
return DeviceHealth(online=False, last_checked=now, error=str(exc))
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setblocking(False)
try:
sock.bind(("", 0))
start = loop.time()
await loop.sock_sendto(sock, b'{"method":"getPilot","params":{}}', (host, port))
try:
await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=1.5)
except asyncio.TimeoutError:
return DeviceHealth(
online=False,
last_checked=now,
error=f"No WiZ reply from {host}:{port} within 1.5s",
)
latency_ms = (loop.time() - start) * 1000.0
return DeviceHealth(online=True, latency_ms=latency_ms, last_checked=now)
except OSError as exc:
return DeviceHealth(
online=False,
last_checked=now,
error=f"WiZ probe failed for {host}: {exc}",
)
finally:
sock.close()
# ============================================================================
# Broadcast discovery
# ============================================================================
_DISCOVERY_REQUEST = (
b'{"method":"registration","params":{"phoneMac":"AAAAAAAAAAAA","register":false,'
b'"phoneIp":"0.0.0.0","id":"1"}}'
)
def _extract_mac(payload: dict) -> str:
"""Pull a bulb MAC out of the standard ``result`` envelope, if present."""
result = payload.get("result")
if isinstance(result, dict):
return str(result.get("mac", "")).lower()
return ""
async def discover_wiz_bulbs(timeout: float = 2.0) -> List[dict]:
"""Broadcast a registration probe and collect bulb replies.
Returns a list of ``{"ip": ..., "mac": ..., "raw": <parsed_json>}`` dicts.
Multicast / broadcast failures (no network, firewall) raise OSError;
callers handle that by returning an empty discovery list.
"""
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setblocking(False)
try:
sock.bind(("", 0))
await loop.sock_sendto(sock, _DISCOVERY_REQUEST, ("255.255.255.255", WIZ_PORT))
results: list[dict] = []
seen_ips: set[str] = set()
deadline = loop.time() + timeout
while True:
remaining = deadline - loop.time()
if remaining <= 0:
break
try:
raw, addr = await asyncio.wait_for(
loop.sock_recvfrom(sock, 2048),
timeout=remaining,
)
except asyncio.TimeoutError:
break
ip = addr[0]
if ip in seen_ips:
continue
try:
payload = json.loads(raw.decode("utf-8", errors="replace"))
except (json.JSONDecodeError, UnicodeDecodeError):
continue
if not isinstance(payload, dict):
continue
seen_ips.add(ip)
results.append({"ip": ip, "mac": _extract_mac(payload), "raw": payload})
return results
finally:
sock.close()
@@ -0,0 +1,92 @@
"""WiZ Connected device provider — LAN-discoverable Philips budget-tier bulbs."""
from __future__ import annotations
from typing import TYPE_CHECKING, List
from ledgrab.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
ProviderDeps,
)
from ledgrab.core.devices.wiz_client import (
WiZClient,
discover_wiz_bulbs,
parse_wiz_url,
)
from ledgrab.utils import get_logger
if TYPE_CHECKING:
from ledgrab.core.devices.device_config import WiZConfig
logger = get_logger(__name__)
class WiZDeviceProvider(LEDDeviceProvider):
"""Provider for WiZ Connected (Philips budget-tier) bulbs.
Single-pixel device, identical adaptation shape as Yeelight/Hue.
"""
@property
def device_type(self) -> str:
return "wiz"
@property
def capabilities(self) -> set:
return {
"manual_led_count",
"power_control",
"brightness_control",
"static_color",
"health_check",
"single_pixel",
}
def create_client(self, config: "WiZConfig", *, deps: ProviderDeps) -> LEDClient:
return WiZClient(
config.device_url,
led_count=config.led_count,
min_interval_s=max(0.0, config.wiz_min_interval_ms / 1000.0),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await WiZClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
try:
host, port = parse_wiz_url(url)
except ValueError as exc:
raise ValueError(f"Invalid WiZ URL: {exc}") from exc
logger.info("WiZ device URL validated: host=%s port=%d", host, port)
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
try:
bulbs = await discover_wiz_bulbs(timeout=min(timeout, 5.0))
except (OSError, RuntimeError) as exc:
logger.warning("WiZ discovery failed: %s", exc)
return []
results: List[DiscoveredDevice] = []
for bulb in bulbs:
ip = bulb.get("ip", "")
if not ip:
continue
url = f"wiz://{ip}"
mac = bulb.get("mac", "")
results.append(
DiscoveredDevice(
name=f"WiZ {mac[-6:]}" if mac else "WiZ bulb",
url=url,
device_type="wiz",
ip=ip,
mac=mac,
led_count=None,
version=None,
)
)
logger.info("WiZ broadcast scan found %d bulb(s)", len(results))
return results
+4
View File
@@ -163,6 +163,10 @@ export function isYeelightDevice(type: string) {
return type === 'yeelight';
}
export function isWizDevice(type: string) {
return type === 'wiz';
}
export function isUsbhidDevice(type: string) {
return type === 'usbhid';
}
+1 -1
View File
@@ -48,7 +48,7 @@ const _deviceTypeIcons = {
wled: _svg(P.wifi), adalight: _svg(P.usb), ambiled: _svg(P.usb),
mqtt: _svg(P.send), ws: _svg(P.globe), openrgb: _svg(P.palette),
dmx: _svg(P.radio), ddp: _svg(P.send), mock: _svg(P.wrench),
espnow: _svg(P.radio), hue: _svg(P.lightbulb), yeelight: _svg(P.lightbulb),
espnow: _svg(P.radio), hue: _svg(P.lightbulb), yeelight: _svg(P.lightbulb), wiz: _svg(P.lightbulb),
usbhid: _svg(P.usb),
spi: _svg(P.plug), chroma: _svg(P.zap), gamesense: _svg(P.target),
ble: _svg(P.bluetooth),
@@ -7,7 +7,7 @@ import {
_discoveryCache, set_discoveryCache,
csptCache,
} from '../core/state.ts';
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isEspnowDevice, isHueDevice, isYeelightDevice, isBleDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, isGroupDevice, escapeHtml } from '../core/api.ts';
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isEspnowDevice, isHueDevice, isYeelightDevice, isWizDevice, isBleDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, isGroupDevice, escapeHtml } from '../core/api.ts';
import { devicesCache } from '../core/state.ts';
import { t } from '../core/i18n.ts';
import { showToast, desktopFocus } from '../core/ui.ts';
@@ -42,6 +42,7 @@ class AddDeviceModal extends Modal {
bleFamily: (document.getElementById('device-ble-family') as HTMLSelectElement)?.value || '',
bleGoveeKey: (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value || '',
yeelightMinInterval: (document.getElementById('device-yeelight-min-interval') as HTMLInputElement)?.value || '500',
wizMinInterval: (document.getElementById('device-wiz-min-interval') as HTMLInputElement)?.value || '50',
groupChildren: JSON.stringify(_getGroupChildIds('device')),
groupMode: (document.getElementById('device-group-mode-select') as HTMLSelectElement)?.value || 'sequence',
};
@@ -52,7 +53,7 @@ const addDeviceModal = new AddDeviceModal();
/* ── Icon-grid type selector ──────────────────────────────────── */
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'ddp', 'espnow', 'hue', 'yeelight', 'ble', 'usbhid', 'spi', 'chroma', 'gamesense', 'group', 'mock'];
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'ddp', 'espnow', 'hue', 'yeelight', 'wiz', 'ble', 'usbhid', 'spi', 'chroma', 'gamesense', 'group', 'mock'];
function _buildDeviceTypeItems() {
return DEVICE_TYPE_KEYS.map(key => ({
@@ -280,6 +281,7 @@ export function onDeviceTypeChanged() {
_showEspnowFields(false);
_showHueFields(false);
_showYeelightFields(false);
_showWizFields(false);
_showBleFields(false);
_showSpiFields(false);
_showChromaFields(false);
@@ -479,6 +481,28 @@ export function onDeviceTypeChanged() {
} else {
scanForDevices();
}
} else if (isWizDevice(deviceType)) {
// WiZ: UDP fire-and-forget on port 38899. Show URL (LAN IP), LED
// count (controls source mapping; the bulb averages to one color),
// rate-limit ms. Discovery uses UDP broadcast — same scan flow.
urlGroup.style.display = '';
urlInput.setAttribute('required', '');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
_showWizFields(true);
if (urlLabel) urlLabel.textContent = t('device.wiz.url') || 'IP Address:';
if (urlHint) urlHint.textContent = t('device.wiz.url.hint') || 'LAN IP of the WiZ bulb. UDP port 38899 is the protocol default.';
urlInput.placeholder = t('device.wiz.url.placeholder') || '192.168.1.50';
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else if (isBleDevice(deviceType)) {
// BLE: show URL (ble://<address>), LED count, protocol family picker,
// and a Govee-only AES key field that toggles with the family selection.
@@ -825,6 +849,13 @@ export function showAddDevice(presetType: any = null, cloneData: any = null) {
ymi.value = String(cloneData.yeelight_min_interval_ms);
}
}
// Prefill WiZ fields
if (isWizDevice(presetType)) {
const wmi = document.getElementById('device-wiz-min-interval') as HTMLInputElement;
if (wmi && cloneData.wiz_min_interval_ms != null) {
wmi.value = String(cloneData.wiz_min_interval_ms);
}
}
// Prefill CSPT template selector (after fetch completes)
if (cloneData.default_css_processing_template_id) {
csptCache.fetch().then(() => {
@@ -1031,6 +1062,11 @@ export async function handleAddDevice(event: any) {
const parsed = parseInt(raw || '500', 10);
body.yeelight_min_interval_ms = Number.isFinite(parsed) ? parsed : 500;
}
if (isWizDevice(deviceType)) {
const raw = (document.getElementById('device-wiz-min-interval') as HTMLInputElement)?.value;
const parsed = parseInt(raw || '50', 10);
body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
}
if (isBleDevice(deviceType)) {
body.ble_family = (document.getElementById('device-ble-family') as HTMLSelectElement)?.value || 'sp110e';
const goveeKey = (document.getElementById('device-ble-govee-key') as HTMLInputElement)?.value?.trim();
@@ -1389,6 +1425,11 @@ function _showYeelightFields(show: boolean) {
if (el) el.style.display = show ? '' : 'none';
}
function _showWizFields(show: boolean) {
const el = document.getElementById('device-wiz-min-interval-group') as HTMLElement | null;
if (el) el.style.display = show ? '' : 'none';
}
// Tracks whether the BLE fields are currently shown — avoids reading
// style.display strings in _updateBleGoveeKeyVisibility.
let _bleFieldsVisible = false;
@@ -6,7 +6,7 @@ import {
_deviceBrightnessCache, updateDeviceBrightness,
csptCache,
} from '../core/state.ts';
import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isYeelightDevice, isBleDevice, isGroupDevice } from '../core/api.ts';
import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isDdpDevice, isYeelightDevice, isWizDevice, isBleDevice, isGroupDevice } from '../core/api.ts';
import { devicesCache } from '../core/state.ts';
import { _fetchOpenrgbZones, _getCheckedZones, _splitOpenrgbZone, _getZoneMode, ensureDmxProtocolIconSelect, destroyDmxProtocolIconSelect, ensureDdpColorOrderIconSelect, destroyDdpColorOrderIconSelect, ensureSpiLedTypeIconSelect, destroySpiLedTypeIconSelect, ensureGameSenseDeviceTypeIconSelect, destroyGameSenseDeviceTypeIconSelect, addGroupChildSettingsWithId as _addGroupChildSettingsWithId, ensureGroupModeIconSelect, destroyGroupModeIconSelect, ensureBleFamilyIconSelect, destroyBleFamilyIconSelect } from './device-discovery.ts';
import { t } from '../core/i18n.ts';
@@ -96,6 +96,7 @@ class DeviceSettingsModal extends Modal {
bleFamily: (document.getElementById('settings-ble-family') as HTMLSelectElement | null)?.value || '',
bleGoveeKey: (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value || '',
yeelightMinInterval: (document.getElementById('settings-yeelight-min-interval') as HTMLInputElement | null)?.value || '500',
wizMinInterval: (document.getElementById('settings-wiz-min-interval') as HTMLInputElement | null)?.value || '50',
csptId: (document.getElementById('settings-css-processing-template') as HTMLSelectElement | null)?.value || '',
};
}
@@ -648,6 +649,24 @@ export async function showSettings(deviceId: any) {
if (yeelightMinIntervalGroup) (yeelightMinIntervalGroup as HTMLElement).style.display = 'none';
}
// WiZ-specific fields — UDP fire-and-forget, no wire-level rate cap
// beyond what the bulb's MCU can keep up with. Default 50 ms ≈ 20 Hz
// is comfortable; users can push lower if they want to experiment.
const wizMinIntervalGroup = document.getElementById('settings-wiz-min-interval-group');
if (isWizDevice(device.device_type)) {
if (wizMinIntervalGroup) (wizMinIntervalGroup as HTMLElement).style.display = '';
const wmi = device.wiz_min_interval_ms ?? 50;
(document.getElementById('settings-wiz-min-interval') as HTMLInputElement).value = String(wmi);
// Relabel URL field as IP Address (same pattern as Yeelight/DMX/DDP)
const urlLabel5 = urlGroup.querySelector('label[for="settings-device-url"]') as HTMLElement | null;
const urlHint5 = urlGroup.querySelector('.input-hint') as HTMLElement | null;
if (urlLabel5) urlLabel5.textContent = t('device.wiz.url');
if (urlHint5) urlHint5.textContent = t('device.wiz.url.hint');
urlInput.placeholder = t('device.wiz.url.placeholder') || '192.168.1.50';
} else {
if (wizMinIntervalGroup) (wizMinIntervalGroup as HTMLElement).style.display = 'none';
}
// BLE-specific fields — exposed in the settings modal so the user
// can fix a wrong protocol family pick without deleting+recreating
// the device. Uses the shared IconSelect grid (project rule bans
@@ -789,6 +808,11 @@ export async function saveDeviceSettings() {
const parsed = parseInt(raw || '500', 10);
body.yeelight_min_interval_ms = Number.isFinite(parsed) ? parsed : 500;
}
if (isWizDevice(settingsModal.deviceType)) {
const raw = (document.getElementById('settings-wiz-min-interval') as HTMLInputElement | null)?.value;
const parsed = parseInt(raw || '50', 10);
body.wiz_min_interval_ms = Number.isFinite(parsed) ? parsed : 50;
}
if (isBleDevice(settingsModal.deviceType)) {
body.ble_family = (document.getElementById('settings-ble-family') as HTMLSelectElement | null)?.value || 'sp110e';
const goveeKey = (document.getElementById('settings-ble-govee-key') as HTMLInputElement | null)?.value?.trim() || '';
+2 -1
View File
@@ -47,7 +47,7 @@ export function bindableColorSourceId(b: BindableColor | undefined): string {
export type DeviceType =
| 'wled' | 'adalight' | 'ambiled' | 'mock' | 'mqtt' | 'ws'
| 'openrgb' | 'dmx' | 'ddp' | 'espnow' | 'hue' | 'yeelight'
| 'openrgb' | 'dmx' | 'ddp' | 'espnow' | 'hue' | 'yeelight' | 'wiz'
| 'ble' | 'usbhid' | 'spi'
| 'chroma' | 'gamesense' | 'group';
@@ -77,6 +77,7 @@ export interface Device {
hue_client_key: string;
hue_entertainment_group_id: string;
yeelight_min_interval_ms: number;
wiz_min_interval_ms: number;
spi_speed_hz: number;
spi_led_type: string;
chroma_device_type: string;
@@ -204,6 +204,13 @@
"device.yeelight.url.placeholder": "192.168.1.50",
"device.yeelight_min_interval": "Min Update Interval:",
"device.yeelight_min_interval.hint": "Client-side rate limit between commands in ms. Default 500 ms keeps bulbs under their ~1 cmd/sec cap; lower values risk throttling.",
"device.type.wiz": "WiZ",
"device.type.wiz.desc": "WiZ Connected (Philips) UDP LAN bulb",
"device.wiz.url": "IP Address:",
"device.wiz.url.hint": "LAN IP of the WiZ bulb. UDP port 38899 is the protocol default.",
"device.wiz.url.placeholder": "192.168.1.50",
"device.wiz_min_interval": "Min Update Interval:",
"device.wiz_min_interval.hint": "Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.",
"device.type.ble": "BLE LED Controller",
"device.type.ble.desc": "Bluetooth LE strips: SP110E, Triones, Zengge, Govee (whole-strip color)",
"device.ble.url": "BLE Address:",
@@ -259,6 +259,13 @@
"device.yeelight.url.placeholder": "192.168.1.50",
"device.yeelight_min_interval": "Мин. интервал обновления:",
"device.yeelight_min_interval.hint": "Локальный лимит частоты команд (мс). По умолчанию 500 мс держит лампу под ограничением ~1 команда/сек; меньшие значения могут вызвать троттлинг.",
"device.type.wiz": "WiZ",
"device.type.wiz.desc": "Лампа WiZ Connected (Philips) по UDP",
"device.wiz.url": "IP-адрес:",
"device.wiz.url.hint": "IP-адрес лампы WiZ в локальной сети. UDP-порт 38899 — по умолчанию.",
"device.wiz.url.placeholder": "192.168.1.50",
"device.wiz_min_interval": "Мин. интервал обновления:",
"device.wiz_min_interval.hint": "Локальный лимит частоты команд (мс). UDP fire-and-forget справляется с быстрыми обновлениями; по умолчанию 50 мс ≈ 20 Гц.",
"device.type.ble": "BLE LED контроллер",
"device.type.ble.desc": "Bluetooth LE ленты: SP110E, Triones, Zengge, Govee (один цвет на всю ленту)",
"device.ble.url": "BLE адрес:",
@@ -257,6 +257,13 @@
"device.yeelight.url.placeholder": "192.168.1.50",
"device.yeelight_min_interval": "最小更新间隔:",
"device.yeelight_min_interval.hint": "客户端命令速率限制(毫秒)。默认 500 毫秒可使灯泡保持在约 1 cmd/sec 限制下;较低的值可能导致节流。",
"device.type.wiz": "WiZ",
"device.type.wiz.desc": "WiZ Connected (飞利浦) UDP 局域网灯泡",
"device.wiz.url": "IP 地址:",
"device.wiz.url.hint": "WiZ 灯泡的局域网 IP。UDP 端口 38899 为协议默认值。",
"device.wiz.url.placeholder": "192.168.1.50",
"device.wiz_min_interval": "最小更新间隔:",
"device.wiz_min_interval.hint": "客户端命令速率限制(毫秒)。UDP 即发即忘可处理快速更新;默认 50 毫秒 ≈ 20 Hz。",
"device.type.ble": "BLE LED 控制器",
"device.type.ble.desc": "Bluetooth LE 灯带:SP110E、Triones、Zengge、Govee(整条灯带同色)",
"device.ble.url": "BLE 地址:",
@@ -64,6 +64,8 @@ class Device:
hue_entertainment_group_id: str = "",
# Yeelight fields
yeelight_min_interval_ms: int = 500,
# WiZ fields
wiz_min_interval_ms: int = 50,
# SPI Direct fields
spi_speed_hz: int = 800000,
spi_led_type: str = "WS2812B",
@@ -112,6 +114,7 @@ class Device:
self.hue_client_key = hue_client_key
self.hue_entertainment_group_id = hue_entertainment_group_id
self.yeelight_min_interval_ms = yeelight_min_interval_ms
self.wiz_min_interval_ms = wiz_min_interval_ms
self.spi_speed_hz = spi_speed_hz
self.spi_led_type = spi_led_type
self.chroma_device_type = chroma_device_type
@@ -151,6 +154,7 @@ class Device:
OpenRGBConfig,
SPIConfig,
USBHIDConfig,
WiZConfig,
WLEDConfig,
WSConfig,
YeelightConfig,
@@ -204,6 +208,11 @@ class Device:
**base,
yeelight_min_interval_ms=self.yeelight_min_interval_ms,
)
if dt == "wiz":
return WiZConfig(
**base,
wiz_min_interval_ms=self.wiz_min_interval_ms,
)
if dt == "spi":
return SPIConfig(**base, spi_speed_hz=self.spi_speed_hz, spi_led_type=self.spi_led_type)
if dt == "chroma":
@@ -284,6 +293,8 @@ class Device:
d["hue_entertainment_group_id"] = self.hue_entertainment_group_id
if self.yeelight_min_interval_ms != 500:
d["yeelight_min_interval_ms"] = self.yeelight_min_interval_ms
if self.wiz_min_interval_ms != 50:
d["wiz_min_interval_ms"] = self.wiz_min_interval_ms
if self.spi_speed_hz != 800000:
d["spi_speed_hz"] = self.spi_speed_hz
if self.spi_led_type != "WS2812B":
@@ -340,6 +351,7 @@ class Device:
hue_client_key=_dec(data.get("hue_client_key", "")),
hue_entertainment_group_id=data.get("hue_entertainment_group_id", ""),
yeelight_min_interval_ms=data.get("yeelight_min_interval_ms", 500),
wiz_min_interval_ms=data.get("wiz_min_interval_ms", 50),
spi_speed_hz=data.get("spi_speed_hz", 800000),
spi_led_type=data.get("spi_led_type", "WS2812B"),
chroma_device_type=data.get("chroma_device_type", "chromalink"),
@@ -388,6 +400,7 @@ _UPDATABLE_FIELDS: frozenset[str] = frozenset(
"hue_client_key",
"hue_entertainment_group_id",
"yeelight_min_interval_ms",
"wiz_min_interval_ms",
"spi_speed_hz",
"spi_led_type",
"chroma_device_type",
@@ -489,6 +502,7 @@ class DeviceStore(BaseSqliteStore[Device]):
hue_client_key: str = "",
hue_entertainment_group_id: str = "",
yeelight_min_interval_ms: int = 500,
wiz_min_interval_ms: int = 50,
spi_speed_hz: int = 800000,
spi_led_type: str = "WS2812B",
chroma_device_type: str = "chromalink",
@@ -533,6 +547,7 @@ class DeviceStore(BaseSqliteStore[Device]):
hue_client_key=hue_client_key,
hue_entertainment_group_id=hue_entertainment_group_id,
yeelight_min_interval_ms=yeelight_min_interval_ms,
wiz_min_interval_ms=wiz_min_interval_ms,
spi_speed_hz=spi_speed_hz,
spi_led_type=spi_led_type,
chroma_device_type=chroma_device_type,
@@ -47,6 +47,7 @@
<option value="espnow">ESP-NOW</option>
<option value="hue">Philips Hue</option>
<option value="yeelight">Yeelight</option>
<option value="wiz">WiZ</option>
<option value="ble">BLE LED Controller</option>
<option value="usbhid">USB HID</option>
<option value="spi">SPI Direct</option>
@@ -225,6 +226,15 @@
<small class="input-hint" style="display:none" data-i18n="device.yeelight_min_interval.hint">Client-side rate limit between commands in ms. Default 500 ms keeps the bulb under its ~1 cmd/sec cap. Lower = faster updates but risk throttling.</small>
<input type="number" id="device-yeelight-min-interval" min="0" max="10000" step="50" value="500">
</div>
<!-- WiZ fields -->
<div class="form-group" id="device-wiz-min-interval-group" style="display: none;">
<div class="label-row">
<label for="device-wiz-min-interval" data-i18n="device.wiz_min_interval">Min Update Interval:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.wiz_min_interval.hint">Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.</small>
<input type="number" id="device-wiz-min-interval" min="0" max="10000" step="10" value="50">
</div>
<!-- ESP-NOW fields -->
<div class="form-group" id="device-espnow-peer-mac-group" style="display: none;">
<div class="label-row">
@@ -259,6 +259,15 @@
<input type="number" id="settings-yeelight-min-interval" min="0" max="10000" step="50" value="500">
</div>
<div class="form-group" id="settings-wiz-min-interval-group" style="display: none;">
<div class="label-row">
<label for="settings-wiz-min-interval" data-i18n="device.wiz_min_interval">Min Update Interval:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.wiz_min_interval.hint">Client-side rate limit between commands in ms. UDP fire-and-forget tolerates fast updates; default 50 ms ≈ 20 Hz.</small>
<input type="number" id="settings-wiz-min-interval" min="0" max="10000" step="10" value="50">
</div>
<div class="form-group" id="settings-send-latency-group" style="display: none;">
<div class="label-row">
<label for="settings-send-latency" data-i18n="device.send_latency">Send Latency (ms):</label>
+372
View File
@@ -0,0 +1,372 @@
"""Tests for the WiZ Connected UDP LED client + provider."""
from __future__ import annotations
import json
from unittest.mock import MagicMock
import numpy as np
import pytest
from ledgrab.core.devices.device_config import WiZConfig
from ledgrab.core.devices.led_client import ProviderDeps
from ledgrab.core.devices.wiz_client import (
WIZ_PORT,
WiZClient,
_average_color,
_extract_mac,
parse_wiz_url,
)
from ledgrab.core.devices.wiz_provider import WiZDeviceProvider
# ============================================================================
# parse_wiz_url
# ============================================================================
@pytest.mark.parametrize(
"url,expected",
[
("wiz://192.168.1.50", ("192.168.1.50", WIZ_PORT)),
("wiz://192.168.1.50:38899", ("192.168.1.50", 38899)),
("wiz://192.168.1.50:40000", ("192.168.1.50", 40000)),
("192.168.1.50", ("192.168.1.50", WIZ_PORT)),
("192.168.1.50:38899", ("192.168.1.50", 38899)),
("bulb.local", ("bulb.local", WIZ_PORT)),
],
)
def test_parse_wiz_url(url, expected):
assert parse_wiz_url(url) == expected
@pytest.mark.parametrize("url", ["", " ", "wiz://", "://192.168.1.1"])
def test_parse_wiz_url_rejects_empty(url):
with pytest.raises(ValueError):
parse_wiz_url(url)
# ============================================================================
# Helpers
# ============================================================================
def test_average_color_numpy():
pixels = np.array([[10, 20, 30], [40, 50, 60], [70, 80, 90]], dtype=np.uint8)
assert _average_color(pixels) == (40, 50, 60)
def test_average_color_list_and_empty():
assert _average_color([(10, 0, 0), (20, 0, 0), (30, 0, 0)]) == (20, 0, 0)
assert _average_color([]) == (0, 0, 0)
def test_extract_mac_from_registration_reply():
assert _extract_mac({"result": {"mac": "AABBCCDDEEFF"}}) == "aabbccddeeff"
assert _extract_mac({"result": {}}) == ""
assert _extract_mac({"error": "oops"}) == ""
assert _extract_mac({}) == ""
# ============================================================================
# WiZClient (mocked transport)
# ============================================================================
def _make_connected_client(min_interval_s: float = 0.0) -> WiZClient:
client = WiZClient("wiz://127.0.0.1", led_count=10, min_interval_s=min_interval_s)
transport = MagicMock()
transport.sendto = MagicMock()
transport.close = MagicMock()
client._transport = transport
client._protocol = MagicMock()
client._connected = True
return client
def _sent_payloads(client: WiZClient) -> list[dict]:
return [
json.loads(call.args[0].decode("utf-8")) for call in client._transport.sendto.call_args_list
]
@pytest.mark.asyncio
async def test_send_pixels_averages_to_set_pilot_rgb():
client = _make_connected_client()
pixels = np.array(
[[255, 0, 0], [0, 255, 0], [0, 0, 255]],
dtype=np.uint8,
)
await client.send_pixels(pixels)
payloads = _sent_payloads(client)
assert len(payloads) == 1
assert payloads[0]["method"] == "setPilot"
# Average of (255,0,0), (0,255,0), (0,0,255) is (85, 85, 85)
assert payloads[0]["params"] == {"r": 85, "g": 85, "b": 85}
@pytest.mark.asyncio
async def test_send_pixels_scales_for_brightness():
client = _make_connected_client()
pixels = np.array([[200, 100, 50]], dtype=np.uint8)
await client.send_pixels(pixels, brightness=128)
payloads = _sent_payloads(client)
expected_r = int(200 * 128 / 255)
expected_g = int(100 * 128 / 255)
expected_b = int(50 * 128 / 255)
assert payloads[0]["params"] == {"r": expected_r, "g": expected_g, "b": expected_b}
@pytest.mark.asyncio
async def test_send_pixels_zero_brightness_blacks_out():
client = _make_connected_client()
pixels = np.array([[200, 100, 50]], dtype=np.uint8)
await client.send_pixels(pixels, brightness=0)
payloads = _sent_payloads(client)
assert payloads[0]["params"] == {"r": 0, "g": 0, "b": 0}
@pytest.mark.asyncio
async def test_rate_limit_drops_subsequent_frames_within_window():
client = _make_connected_client(min_interval_s=10.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
assert len(_sent_payloads(client)) == 1
@pytest.mark.asyncio
async def test_zero_interval_sends_every_frame():
client = _make_connected_client(min_interval_s=0.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
await client.send_pixels(pixels)
assert len(_sent_payloads(client)) == 3
@pytest.mark.asyncio
async def test_send_pixels_when_not_connected_raises():
client = WiZClient("wiz://127.0.0.1", led_count=1)
with pytest.raises(RuntimeError, match="not connected"):
await client.send_pixels(np.array([[1, 2, 3]], dtype=np.uint8))
def test_send_pixels_fast_runs_synchronously():
"""Hot path: synchronous fast-send must dispatch over the UDP transport."""
client = _make_connected_client(min_interval_s=0.0)
pixels = np.array([[10, 20, 30]], dtype=np.uint8)
client.send_pixels_fast(pixels)
payloads = _sent_payloads(client)
assert payloads[0]["params"] == {"r": 10, "g": 20, "b": 30}
def test_send_pixels_fast_when_not_connected_raises():
client = WiZClient("wiz://127.0.0.1", led_count=1)
with pytest.raises(RuntimeError, match="not connected"):
client.send_pixels_fast([(1, 2, 3)])
def test_supports_fast_send_is_true():
assert WiZClient("wiz://127.0.0.1", led_count=1).supports_fast_send is True
@pytest.mark.asyncio
async def test_set_power_sends_state_param():
client = _make_connected_client()
await client.set_power(True)
await client.set_power(False)
payloads = _sent_payloads(client)
assert payloads[0]["params"] == {"state": True}
assert payloads[1]["params"] == {"state": False}
@pytest.mark.asyncio
async def test_set_brightness_clamps_to_10_100():
"""WiZ rejects dimming values below ~10."""
client = _make_connected_client()
await client.set_brightness(5)
await client.set_brightness(50)
await client.set_brightness(200)
payloads = _sent_payloads(client)
assert payloads[0]["params"] == {"dimming": 10}
assert payloads[1]["params"] == {"dimming": 50}
assert payloads[2]["params"] == {"dimming": 100}
@pytest.mark.asyncio
async def test_set_color_sends_rgb_set_pilot():
client = _make_connected_client()
await client.set_color(12, 34, 56)
payloads = _sent_payloads(client)
assert payloads[0] == {"method": "setPilot", "params": {"r": 12, "g": 34, "b": 56}}
@pytest.mark.asyncio
async def test_close_releases_transport():
client = _make_connected_client()
transport = client._transport
await client.close()
transport.close.assert_called_once()
assert client._transport is None
assert client.is_connected is False
# ============================================================================
# Provider
# ============================================================================
def test_provider_device_type_and_capabilities():
provider = WiZDeviceProvider()
assert provider.device_type == "wiz"
caps = provider.capabilities
assert "manual_led_count" in caps
assert "power_control" in caps
assert "brightness_control" in caps
assert "single_pixel" in caps
@pytest.mark.asyncio
async def test_provider_validate_accepts_bare_host():
provider = WiZDeviceProvider()
assert await provider.validate_device("192.168.1.50") == {}
@pytest.mark.asyncio
async def test_provider_validate_rejects_empty():
provider = WiZDeviceProvider()
with pytest.raises(ValueError, match="Invalid WiZ URL"):
await provider.validate_device("")
def test_provider_create_client_threads_config():
provider = WiZDeviceProvider()
config = WiZConfig(
device_id="device_test",
device_url="wiz://192.168.1.50",
led_count=30,
wiz_min_interval_ms=100,
)
client = provider.create_client(config, deps=ProviderDeps())
assert isinstance(client, WiZClient)
assert client.host == "192.168.1.50"
assert client.port == WIZ_PORT
assert client._led_count == 30
assert client._min_interval_s == pytest.approx(0.1)
@pytest.mark.asyncio
async def test_provider_discover_returns_empty_on_failure(monkeypatch):
async def _explode(timeout):
raise OSError("network unreachable")
monkeypatch.setattr("ledgrab.core.devices.wiz_provider.discover_wiz_bulbs", _explode)
provider = WiZDeviceProvider()
assert await provider.discover() == []
@pytest.mark.asyncio
async def test_provider_discover_maps_replies_to_discovered_devices(monkeypatch):
async def _fake(timeout):
return [
{"ip": "192.168.1.50", "mac": "aabbccddeeff", "raw": {}},
# Missing IP should be skipped silently.
{"ip": "", "mac": "1234567890ab", "raw": {}},
]
monkeypatch.setattr("ledgrab.core.devices.wiz_provider.discover_wiz_bulbs", _fake)
provider = WiZDeviceProvider()
results = await provider.discover()
assert len(results) == 1
[bulb] = results
assert bulb.device_type == "wiz"
assert bulb.url == "wiz://192.168.1.50"
assert bulb.ip == "192.168.1.50"
assert bulb.mac == "aabbccddeeff"
# Last 6 chars of the MAC end up in the surface name for easy ID
assert "ddeeff" in bulb.name.lower()
# ============================================================================
# Device.to_config() round-trip
# ============================================================================
def test_device_to_config_round_trip_wiz():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Bedroom WiZ",
url="wiz://192.168.1.42",
led_count=30,
device_type="wiz",
wiz_min_interval_ms=100,
)
config = device.to_config()
assert isinstance(config, WiZConfig)
assert config.device_url == "wiz://192.168.1.42"
assert config.led_count == 30
assert config.wiz_min_interval_ms == 100
def test_device_to_dict_omits_wiz_default_interval():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Default",
url="wiz://192.168.1.42",
led_count=1,
device_type="wiz",
)
assert "wiz_min_interval_ms" not in device.to_dict()
def test_device_to_dict_preserves_non_default_wiz_interval():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Custom",
url="wiz://192.168.1.42",
led_count=1,
device_type="wiz",
wiz_min_interval_ms=200,
)
assert device.to_dict()["wiz_min_interval_ms"] == 200
def test_device_from_dict_wiz_round_trip():
from ledgrab.storage.device_store import Device
restored = Device.from_dict(
{
"id": "device_abc12345",
"name": "Roundtrip",
"url": "wiz://10.0.0.1",
"led_count": 1,
"device_type": "wiz",
"wiz_min_interval_ms": 150,
}
)
assert restored.wiz_min_interval_ms == 150