Add 6 new device providers, IconSelect grids, and UI fixes

New device providers: ESP-NOW, Philips Hue, USB HID, SPI Direct,
Razer Chroma SDK, and SteelSeries GameSense — each with client,
provider, full backend registration, schemas, routes, and frontend
support including discovery, form fields, and i18n.

Add IconSelect grids for SPI LED chipset selector and GameSense
peripheral type selector with new Lucide icons (cpu, keyboard,
mouse, headphones).

Replace emoji graph overlay buttons (eye, bell) with proper SVG
path icons for consistent cross-platform rendering.

Fix connection overlay causing horizontal scroll by adding
overflow: hidden.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-14 20:32:28 +03:00
parent 51ec0970c3
commit 272cb69247
28 changed files with 2752 additions and 10 deletions

View File

@@ -57,6 +57,15 @@ def _device_to_response(device) -> DeviceResponse:
dmx_protocol=getattr(device, 'dmx_protocol', 'artnet'),
dmx_start_universe=getattr(device, 'dmx_start_universe', 0),
dmx_start_channel=getattr(device, 'dmx_start_channel', 1),
espnow_peer_mac=getattr(device, 'espnow_peer_mac', ''),
espnow_channel=getattr(device, 'espnow_channel', 1),
hue_username=getattr(device, 'hue_username', ''),
hue_client_key=getattr(device, 'hue_client_key', ''),
hue_entertainment_group_id=getattr(device, 'hue_entertainment_group_id', ''),
spi_speed_hz=getattr(device, 'spi_speed_hz', 800000),
spi_led_type=getattr(device, 'spi_led_type', 'WS2812B'),
chroma_device_type=getattr(device, 'chroma_device_type', 'chromalink'),
gamesense_device_type=getattr(device, 'gamesense_device_type', 'keyboard'),
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -135,6 +144,15 @@ async def create_device(
dmx_protocol=device_data.dmx_protocol or "artnet",
dmx_start_universe=device_data.dmx_start_universe or 0,
dmx_start_channel=device_data.dmx_start_channel or 1,
espnow_peer_mac=device_data.espnow_peer_mac or "",
espnow_channel=device_data.espnow_channel or 1,
hue_username=device_data.hue_username or "",
hue_client_key=device_data.hue_client_key or "",
hue_entertainment_group_id=device_data.hue_entertainment_group_id or "",
spi_speed_hz=device_data.spi_speed_hz or 800000,
spi_led_type=device_data.spi_led_type or "WS2812B",
chroma_device_type=device_data.chroma_device_type or "chromalink",
gamesense_device_type=device_data.gamesense_device_type or "keyboard",
)
# WS devices: auto-set URL to ws://{device_id}
@@ -322,6 +340,15 @@ async def update_device(
dmx_protocol=update_data.dmx_protocol,
dmx_start_universe=update_data.dmx_start_universe,
dmx_start_channel=update_data.dmx_start_channel,
espnow_peer_mac=update_data.espnow_peer_mac,
espnow_channel=update_data.espnow_channel,
hue_username=update_data.hue_username,
hue_client_key=update_data.hue_client_key,
hue_entertainment_group_id=update_data.hue_entertainment_group_id,
spi_speed_hz=update_data.spi_speed_hz,
spi_led_type=update_data.spi_led_type,
chroma_device_type=update_data.chroma_device_type,
gamesense_device_type=update_data.gamesense_device_type,
)
# Sync connection info in processor manager

View File

@@ -23,6 +23,20 @@ class DeviceCreate(BaseModel):
dmx_protocol: Optional[str] = Field(None, description="DMX protocol: artnet or sacn")
dmx_start_universe: Optional[int] = Field(None, ge=0, le=32767, description="DMX start universe")
dmx_start_channel: Optional[int] = Field(None, ge=1, le=512, description="DMX start channel (1-512)")
# ESP-NOW fields
espnow_peer_mac: Optional[str] = Field(None, description="ESP-NOW peer MAC address (e.g. AA:BB:CC:DD:EE:FF)")
espnow_channel: Optional[int] = Field(None, ge=1, le=14, description="ESP-NOW WiFi channel (1-14)")
# Philips Hue fields
hue_username: Optional[str] = Field(None, description="Hue bridge username (from pairing)")
hue_client_key: Optional[str] = Field(None, description="Hue entertainment client key (hex)")
hue_entertainment_group_id: Optional[str] = Field(None, description="Hue entertainment group/zone ID")
# SPI Direct fields
spi_speed_hz: Optional[int] = Field(None, ge=100000, le=4000000, description="SPI clock speed in Hz")
spi_led_type: Optional[str] = Field(None, description="LED chipset: WS2812, WS2812B, WS2811, SK6812, SK6812_RGBW")
# Razer Chroma fields
chroma_device_type: Optional[str] = Field(None, description="Chroma peripheral type: keyboard, mouse, mousepad, headset, chromalink, keypad")
# SteelSeries GameSense fields
gamesense_device_type: Optional[str] = Field(None, description="GameSense device type: keyboard, mouse, headset, mousepad, indicator")
class DeviceUpdate(BaseModel):
@@ -41,6 +55,15 @@ class DeviceUpdate(BaseModel):
dmx_protocol: Optional[str] = Field(None, description="DMX protocol: artnet or sacn")
dmx_start_universe: Optional[int] = Field(None, ge=0, le=32767, description="DMX start universe")
dmx_start_channel: Optional[int] = Field(None, ge=1, le=512, description="DMX start channel (1-512)")
espnow_peer_mac: Optional[str] = Field(None, description="ESP-NOW peer MAC address")
espnow_channel: Optional[int] = Field(None, ge=1, le=14, description="ESP-NOW WiFi channel")
hue_username: Optional[str] = Field(None, description="Hue bridge username")
hue_client_key: Optional[str] = Field(None, description="Hue entertainment client key")
hue_entertainment_group_id: Optional[str] = Field(None, description="Hue entertainment group ID")
spi_speed_hz: Optional[int] = Field(None, ge=100000, le=4000000, description="SPI clock speed")
spi_led_type: Optional[str] = Field(None, description="LED chipset type")
chroma_device_type: Optional[str] = Field(None, description="Chroma peripheral type")
gamesense_device_type: Optional[str] = Field(None, description="GameSense device type")
class CalibrationLineSchema(BaseModel):
@@ -138,6 +161,15 @@ class DeviceResponse(BaseModel):
dmx_protocol: str = Field(default="artnet", description="DMX protocol: artnet or sacn")
dmx_start_universe: int = Field(default=0, description="DMX start universe")
dmx_start_channel: int = Field(default=1, description="DMX start channel (1-512)")
espnow_peer_mac: str = Field(default="", description="ESP-NOW peer MAC address")
espnow_channel: int = Field(default=1, description="ESP-NOW WiFi channel")
hue_username: str = Field(default="", description="Hue bridge username")
hue_client_key: str = Field(default="", description="Hue entertainment client key")
hue_entertainment_group_id: str = Field(default="", description="Hue entertainment group ID")
spi_speed_hz: int = Field(default=800000, description="SPI clock speed in Hz")
spi_led_type: str = Field(default="WS2812B", description="LED chipset type")
chroma_device_type: str = Field(default="chromalink", description="Chroma peripheral type")
gamesense_device_type: str = Field(default="keyboard", description="GameSense device type")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")

View File

@@ -0,0 +1,229 @@
"""Razer Chroma SDK LED client — controls Razer RGB peripherals via REST API."""
import asyncio
import json
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Chroma SDK REST API base
CHROMA_SDK_URL = "http://localhost:54235/razer/rest"
# Device type → (endpoint suffix, max LEDs)
CHROMA_DEVICES = {
"keyboard": ("keyboard", 132), # 22 columns × 6 rows
"mouse": ("mouse", 30),
"mousepad": ("mousepad", 15),
"headset": ("headset", 5), # left + right + 3 zones
"chromalink": ("chromalink", 5),
"keypad": ("keypad", 20), # 5×4 grid
}
class ChromaClient(LEDClient):
"""LED client that controls Razer peripherals via the Chroma SDK REST API.
The Chroma SDK exposes a local REST API. Workflow:
1. POST /razer/rest to init a session → get session URL
2. PUT effects to {session_url}/{device_type}
3. DELETE session on close
Uses CUSTOM effect type for per-LED RGB control.
"""
HEARTBEAT_INTERVAL = 10 # seconds — SDK kills session after 15s idle
def __init__(
self,
url: str = "",
led_count: int = 0,
chroma_device_type: str = "chromalink",
**kwargs,
):
self._base_url = url or CHROMA_SDK_URL
self._led_count = led_count
self._chroma_device_type = chroma_device_type
self._session_url: Optional[str] = None
self._connected = False
self._heartbeat_task: Optional[asyncio.Task] = None
self._http_client = None
async def connect(self) -> bool:
import httpx
self._http_client = httpx.AsyncClient(timeout=5.0)
# Initialize Chroma SDK session
init_payload = {
"title": "WLED Screen Controller",
"description": "LED pixel streaming from WLED Screen Controller",
"author": {"name": "WLED-SC", "contact": "https://github.com"},
"device_supported": [self._chroma_device_type],
"category": "application",
}
try:
resp = await self._http_client.post(self._base_url, json=init_payload)
resp.raise_for_status()
data = resp.json()
self._session_url = data.get("uri") or data.get("sessionid")
if not self._session_url:
raise RuntimeError(f"Chroma SDK init returned no session URL: {data}")
except Exception as e:
logger.error("Chroma SDK init failed: %s", e)
await self._http_client.aclose()
self._http_client = None
raise
self._connected = True
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info(
"Chroma client connected: device=%s session=%s leds=%d",
self._chroma_device_type, self._session_url, self._led_count,
)
return True
async def _heartbeat_loop(self):
"""Keep the Chroma SDK session alive."""
while self._connected and self._session_url:
try:
await asyncio.sleep(self.HEARTBEAT_INTERVAL)
if self._http_client and self._session_url:
await self._http_client.put(
f"{self._session_url}/heartbeat",
json={},
)
except asyncio.CancelledError:
break
except Exception as e:
logger.debug("Chroma heartbeat error: %s", e)
async def close(self) -> None:
self._connected = False
if self._heartbeat_task:
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
self._heartbeat_task = None
if self._http_client and self._session_url:
try:
await self._http_client.delete(self._session_url)
except Exception:
pass
if self._http_client:
await self._http_client.aclose()
self._http_client = None
self._session_url = None
logger.info("Chroma client closed")
@property
def is_connected(self) -> bool:
return self._connected and self._session_url is not None
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self.is_connected or not self._http_client:
return False
if isinstance(pixels, np.ndarray):
pixel_arr = pixels
else:
pixel_arr = np.array(pixels, dtype=np.uint8)
bri_scale = brightness / 255.0
device_info = CHROMA_DEVICES.get(self._chroma_device_type)
if not device_info:
return False
endpoint, max_leds = device_info
n = min(len(pixel_arr), max_leds, self._led_count or max_leds)
# Chroma uses BGR packed as 0x00BBGGRR integers
colors = []
for i in range(n):
r, g, b = pixel_arr[i]
r = int(r * bri_scale)
g = int(g * bri_scale)
b = int(b * bri_scale)
colors.append(r | (g << 8) | (b << 16))
# Pad to max_leds if needed
while len(colors) < max_leds:
colors.append(0)
# Build effect payload based on device type
if self._chroma_device_type == "keyboard":
# Keyboard uses 2D array: 6 rows × 22 columns
grid = []
idx = 0
for row in range(6):
row_colors = []
for col in range(22):
row_colors.append(colors[idx] if idx < len(colors) else 0)
idx += 1
grid.append(row_colors)
effect = {"effect": "CHROMA_CUSTOM", "param": grid}
elif self._chroma_device_type == "keypad":
# Keypad uses 2D array: 4 rows × 5 columns
grid = []
idx = 0
for row in range(4):
row_colors = []
for col in range(5):
row_colors.append(colors[idx] if idx < len(colors) else 0)
idx += 1
grid.append(row_colors)
effect = {"effect": "CHROMA_CUSTOM", "param": grid}
else:
# 1D devices: mouse, mousepad, headset, chromalink
effect = {"effect": "CHROMA_CUSTOM", "param": colors[:max_leds]}
try:
url = f"{self._session_url}/{endpoint}"
resp = await self._http_client.put(url, json=effect)
return resp.status_code == 200
except Exception as e:
logger.error("Chroma send failed: %s", e)
return False
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if Chroma SDK is running."""
base = url or CHROMA_SDK_URL
try:
resp = await http_client.get(base, timeout=3.0)
if resp.status_code < 500:
return DeviceHealth(
online=True,
latency_ms=0.0,
device_name="Razer Chroma SDK",
last_checked=datetime.now(timezone.utc),
)
except Exception:
pass
return DeviceHealth(
online=False,
error="Chroma SDK not responding (is Razer Synapse running?)",
last_checked=datetime.now(timezone.utc),
)

View File

@@ -0,0 +1,104 @@
"""Razer Chroma SDK device provider — control Razer RGB peripherals."""
from datetime import datetime, timezone
from typing import List
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.chroma_client import (
ChromaClient,
CHROMA_DEVICES,
CHROMA_SDK_URL,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ChromaDeviceProvider(LEDDeviceProvider):
"""Provider for Razer Chroma SDK RGB peripherals.
URL format: chroma://device_type (e.g. chroma://keyboard, chroma://chromalink)
Requires Razer Synapse with Chroma SDK enabled.
"""
@property
def device_type(self) -> str:
return "chroma"
@property
def capabilities(self) -> set:
return {"manual_led_count", "health_check"}
def create_client(self, url: str, **kwargs) -> LEDClient:
chroma_device_type = _parse_chroma_url(url)
return ChromaClient(
url=CHROMA_SDK_URL,
led_count=kwargs.get("led_count", 0),
chroma_device_type=chroma_device_type,
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await ChromaClient.check_health(CHROMA_SDK_URL, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate Chroma SDK is reachable."""
chroma_type = _parse_chroma_url(url)
if chroma_type not in CHROMA_DEVICES:
raise ValueError(
f"Unknown Chroma device type '{chroma_type}'. "
f"Supported: {', '.join(CHROMA_DEVICES.keys())}"
)
import httpx
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(CHROMA_SDK_URL)
if resp.status_code >= 500:
raise ValueError("Chroma SDK returned server error")
except httpx.ConnectError:
raise ValueError(
"Cannot connect to Chroma SDK. "
"Ensure Razer Synapse is running with Chroma SDK enabled."
)
_, max_leds = CHROMA_DEVICES[chroma_type]
return {"led_count": max_leds}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover available Chroma device types if SDK is running."""
import httpx
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(CHROMA_SDK_URL)
if resp.status_code >= 500:
return []
except Exception:
return []
# SDK is running — offer all device types
results = []
for dev_type, (_, max_leds) in CHROMA_DEVICES.items():
results.append(
DiscoveredDevice(
name=f"Razer {dev_type.title()}",
url=f"chroma://{dev_type}",
device_type="chroma",
ip="127.0.0.1",
mac="",
led_count=max_leds,
version=None,
)
)
return results
def _parse_chroma_url(url: str) -> str:
"""Parse 'chroma://device_type' → device_type string."""
return url.replace("chroma://", "").strip().lower()

View File

@@ -0,0 +1,162 @@
"""ESP-NOW LED client — sends pixel data via serial to an ESP32 gateway which forwards over ESP-NOW."""
import asyncio
import struct
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Gateway serial protocol constants
FRAME_START = 0xEE
FRAME_END = 0xEF
def _mac_str_to_bytes(mac: str) -> bytes:
"""Convert 'AA:BB:CC:DD:EE:FF' to 6 raw bytes."""
parts = mac.replace("-", ":").split(":")
if len(parts) != 6:
raise ValueError(f"Invalid MAC address: {mac}")
return bytes(int(p, 16) for p in parts)
def _build_frame(peer_mac: bytes, pixels: bytes, brightness: int) -> bytes:
"""Build a serial frame for the ESP-NOW gateway.
Wire format:
[0xEE][MAC 6B][LED_COUNT 2B LE][BRIGHTNESS 1B][RGB...][CHECKSUM 1B][0xEF]
Checksum is XOR of all bytes between START and END (exclusive).
"""
led_count = len(pixels) // 3
header = struct.pack("<B6sHB", FRAME_START, peer_mac, led_count, brightness)
payload = header[1:] + pixels # everything after START for checksum
checksum = 0
for b in payload:
checksum ^= b
return header + pixels + bytes([checksum, FRAME_END])
class ESPNowClient(LEDClient):
"""LED client that sends pixel data to an ESP32 ESP-NOW gateway over serial.
The gateway ESP32 receives serial frames and forwards LED data via
ESP-NOW to a peer ESP32 which drives the LED strip.
"""
def __init__(
self,
url: str = "",
led_count: int = 0,
baud_rate: int = 921600,
espnow_peer_mac: str = "FF:FF:FF:FF:FF:FF",
espnow_channel: int = 1,
**kwargs,
):
self._port = url
self._led_count = led_count
self._baud_rate = baud_rate
self._peer_mac = _mac_str_to_bytes(espnow_peer_mac)
self._channel = espnow_channel
self._serial = None
self._connected = False
self._lock = asyncio.Lock()
async def connect(self) -> bool:
try:
import serial as pyserial
except ImportError:
raise RuntimeError("pyserial is required for ESP-NOW devices: pip install pyserial")
loop = asyncio.get_event_loop()
self._serial = await loop.run_in_executor(
None,
lambda: pyserial.Serial(self._port, self._baud_rate, timeout=1),
)
self._connected = True
logger.info(
"ESP-NOW client connected: port=%s baud=%d peer=%s channel=%d leds=%d",
self._port, self._baud_rate,
":".join(f"{b:02X}" for b in self._peer_mac),
self._channel, self._led_count,
)
return True
async def close(self) -> None:
if self._serial and self._serial.is_open:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._serial.close)
self._serial = None
self._connected = False
logger.info("ESP-NOW client closed: port=%s", self._port)
@property
def is_connected(self) -> bool:
return self._connected and self._serial is not None and self._serial.is_open
@property
def supports_fast_send(self) -> bool:
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
if not self.is_connected:
return
if isinstance(pixels, np.ndarray):
pixel_bytes = pixels.astype(np.uint8).tobytes()
else:
pixel_bytes = bytes(c for rgb in pixels for c in rgb)
frame = _build_frame(self._peer_mac, pixel_bytes, brightness)
try:
self._serial.write(frame)
except Exception as e:
logger.warning("ESP-NOW send failed: %s", e)
self._connected = False
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self.is_connected:
return False
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(
None,
lambda: self.send_pixels_fast(pixels, brightness),
)
return True
except Exception as e:
logger.error("ESP-NOW async send failed: %s", e)
return False
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the serial port is available."""
try:
import serial as pyserial
s = pyserial.Serial(url, timeout=0.1)
s.close()
return DeviceHealth(online=True, latency_ms=0.0, last_checked=datetime.now(timezone.utc))
except Exception as e:
return DeviceHealth(
online=False,
error=str(e),
last_checked=datetime.now(timezone.utc),
)

View File

@@ -0,0 +1,87 @@
"""ESP-NOW device provider — ultra-low-latency LED control via ESP32 gateway."""
from datetime import datetime, timezone
from typing import List
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.espnow_client import ESPNowClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ESPNowDeviceProvider(LEDDeviceProvider):
"""Provider for ESP-NOW LED devices via serial ESP32 gateway.
URL = serial port of the gateway ESP32 (e.g. COM3, /dev/ttyUSB0).
Each device represents one remote ESP32 peer identified by MAC address.
Multiple devices can share the same gateway (serial port).
"""
@property
def device_type(self) -> str:
return "espnow"
@property
def capabilities(self) -> set:
return {"manual_led_count", "health_check"}
def create_client(self, url: str, **kwargs) -> LEDClient:
return ESPNowClient(
url,
led_count=kwargs.get("led_count", 0),
baud_rate=kwargs.get("baud_rate", 921600),
espnow_peer_mac=kwargs.get("espnow_peer_mac", "FF:FF:FF:FF:FF:FF"),
espnow_channel=kwargs.get("espnow_channel", 1),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await ESPNowClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate serial port is accessible. LED count is manual."""
try:
import serial as pyserial
s = pyserial.Serial(url, timeout=0.5)
s.close()
except ImportError:
raise ValueError("pyserial is required for ESP-NOW devices: pip install pyserial")
except Exception as e:
raise ValueError(f"Cannot open serial port {url}: {e}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover available serial ports that could be ESP32 gateways."""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
results = []
for port in ports:
# Look for ESP32 USB descriptors
desc = (port.description or "").lower()
vid = port.vid or 0
# Common ESP32 USB VIDs: Espressif (0x303A), Silicon Labs CP210x (0x10C4),
# FTDI (0x0403), WCH CH340 (0x1A86)
esp_vids = {0x303A, 0x10C4, 0x0403, 0x1A86}
if vid in esp_vids or "cp210" in desc or "ch340" in desc or "esp" in desc:
results.append(
DiscoveredDevice(
name=f"ESP-NOW Gateway ({port.description})",
url=port.device,
device_type="espnow",
ip="",
mac="",
led_count=None,
version=None,
)
)
return results
except ImportError:
return []

View File

@@ -0,0 +1,262 @@
"""SteelSeries GameSense LED client — controls SteelSeries RGB peripherals via REST API."""
import asyncio
import json
import os
import platform
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Game registration constants
GAME_NAME = "WLED_SCREEN_CTRL"
GAME_DISPLAY_NAME = "WLED Screen Controller"
EVENT_NAME = "PIXEL_DATA"
def _get_gamesense_address() -> Optional[str]:
"""Discover the SteelSeries GameSense address from coreProps.json."""
if platform.system() == "Windows":
props_path = os.path.join(
os.environ.get("PROGRAMDATA", r"C:\ProgramData"),
"SteelSeries", "SteelSeries Engine 3", "coreProps.json",
)
elif platform.system() == "Darwin":
props_path = os.path.expanduser(
"~/Library/Application Support/SteelSeries Engine 3/coreProps.json"
)
else:
# Linux — SteelSeries Engine not officially supported
props_path = os.path.expanduser(
"~/.config/SteelSeries Engine 3/coreProps.json"
)
try:
with open(props_path, "r") as f:
data = json.load(f)
return data.get("address")
except (FileNotFoundError, json.JSONDecodeError, KeyError):
return None
# GameSense device-zone mapping
GAMESENSE_ZONES = {
"keyboard": ("rgb-per-key-zones", "all"),
"mouse": ("mouse", "wheel"),
"headset": ("headset", "earcups"),
"mousepad": ("mousepad", "all"),
"indicator": ("indicator", "one"),
}
class GameSenseClient(LEDClient):
"""LED client that controls SteelSeries peripherals via GameSense REST API.
GameSense uses a register-bind-send pattern:
1. Register game + event
2. Bind event to device zone with context-color handler
3. Send color events with RGB data in frame payload
The API address is discovered from coreProps.json at runtime.
"""
def __init__(
self,
url: str = "",
led_count: int = 0,
gamesense_device_type: str = "keyboard",
**kwargs,
):
self._address = url.replace("gamesense://", "").strip() if url else ""
self._led_count = led_count
self._gs_device_type = gamesense_device_type
self._connected = False
self._http_client = None
self._base_url: Optional[str] = None
async def connect(self) -> bool:
import httpx
# Discover GameSense address
address = self._address or _get_gamesense_address()
if not address:
raise RuntimeError(
"Cannot find SteelSeries Engine. "
"Ensure SteelSeries GG/Engine is running."
)
self._base_url = f"http://{address}"
self._http_client = httpx.AsyncClient(timeout=5.0)
try:
# Register game
await self._http_client.post(
f"{self._base_url}/game_metadata",
json={
"game": GAME_NAME,
"game_display_name": GAME_DISPLAY_NAME,
"developer": "WLED-SC",
},
)
# Register event
await self._http_client.post(
f"{self._base_url}/register_game_event",
json={
"game": GAME_NAME,
"event": EVENT_NAME,
"min_value": 0,
"max_value": 100,
"value_optional": True,
},
)
# Bind event to device zone with context-color mode
zone_info = GAMESENSE_ZONES.get(
self._gs_device_type, ("rgb-per-key-zones", "all")
)
device_type, zone = zone_info
await self._http_client.post(
f"{self._base_url}/bind_game_event",
json={
"game": GAME_NAME,
"event": EVENT_NAME,
"handlers": [
{
"device-type": device_type,
"zone": zone,
"mode": "context-color",
"context-frame-key": "zone-color",
}
],
},
)
except Exception as e:
logger.error("GameSense setup failed: %s", e)
await self._http_client.aclose()
self._http_client = None
raise
self._connected = True
logger.info(
"GameSense client connected: device=%s address=%s leds=%d",
self._gs_device_type, address, self._led_count,
)
return True
async def close(self) -> None:
if self._http_client and self._base_url:
try:
# Remove game registration
await self._http_client.post(
f"{self._base_url}/remove_game",
json={"game": GAME_NAME},
)
except Exception:
pass
if self._http_client:
await self._http_client.aclose()
self._http_client = None
self._connected = False
self._base_url = None
logger.info("GameSense client closed")
@property
def is_connected(self) -> bool:
return self._connected and self._http_client is not None
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self.is_connected or not self._http_client:
return False
if isinstance(pixels, np.ndarray):
pixel_arr = pixels
else:
pixel_arr = np.array(pixels, dtype=np.uint8)
bri_scale = brightness / 255.0
# Use average color for single-zone devices, or first N for multi-zone
if len(pixel_arr) == 0:
return True
# Compute average color for the zone
avg = pixel_arr.mean(axis=0)
r = int(avg[0] * bri_scale)
g = int(avg[1] * bri_scale)
b = int(avg[2] * bri_scale)
event_data = {
"game": GAME_NAME,
"event": EVENT_NAME,
"data": {
"value": 100,
"frame": {
"zone-color": {
"red": r,
"green": g,
"blue": b,
}
},
},
}
try:
resp = await self._http_client.post(
f"{self._base_url}/game_event",
json=event_data,
)
return resp.status_code == 200
except Exception as e:
logger.error("GameSense send failed: %s", e)
return False
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if SteelSeries Engine is running."""
address = url.replace("gamesense://", "").strip() if url else None
if not address:
address = _get_gamesense_address()
if not address:
return DeviceHealth(
online=False,
error="SteelSeries Engine not found (coreProps.json missing)",
last_checked=datetime.now(timezone.utc),
)
try:
resp = await http_client.get(f"http://{address}", timeout=3.0)
if resp.status_code < 500:
return DeviceHealth(
online=True,
latency_ms=0.0,
device_name="SteelSeries GameSense",
last_checked=datetime.now(timezone.utc),
)
except Exception:
pass
return DeviceHealth(
online=False,
error="SteelSeries Engine not responding",
last_checked=datetime.now(timezone.utc),
)

View File

@@ -0,0 +1,104 @@
"""SteelSeries GameSense device provider — control SteelSeries RGB peripherals."""
from datetime import datetime, timezone
from typing import List
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.gamesense_client import (
GameSenseClient,
GAMESENSE_ZONES,
_get_gamesense_address,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class GameSenseDeviceProvider(LEDDeviceProvider):
"""Provider for SteelSeries GameSense RGB peripherals.
URL format: gamesense://address or gamesense://auto (auto-discover from coreProps.json)
Requires SteelSeries GG / SteelSeries Engine 3.
"""
@property
def device_type(self) -> str:
return "gamesense"
@property
def capabilities(self) -> set:
return {"manual_led_count", "health_check"}
def create_client(self, url: str, **kwargs) -> LEDClient:
return GameSenseClient(
url=url,
led_count=kwargs.get("led_count", 0),
gamesense_device_type=kwargs.get("gamesense_device_type", "keyboard"),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await GameSenseClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate GameSense is reachable."""
address = url.replace("gamesense://", "").strip()
if not address or address == "auto":
address = _get_gamesense_address()
if not address:
raise ValueError(
"Cannot find SteelSeries Engine. "
"Ensure SteelSeries GG is running."
)
import httpx
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(f"http://{address}")
if resp.status_code >= 500:
raise ValueError("SteelSeries Engine returned server error")
except httpx.ConnectError:
raise ValueError(
"Cannot connect to SteelSeries Engine at "
f"{address}. Ensure it is running."
)
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover SteelSeries Engine if running."""
address = _get_gamesense_address()
if not address:
return []
import httpx
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(f"http://{address}")
if resp.status_code >= 500:
return []
except Exception:
return []
# Engine is running — offer device types
results = []
for dev_type in GAMESENSE_ZONES:
results.append(
DiscoveredDevice(
name=f"SteelSeries {dev_type.title()}",
url=f"gamesense://{address}",
device_type="gamesense",
ip=address.split(":")[0],
mac="",
led_count=None,
version=None,
)
)
return results

View File

@@ -0,0 +1,240 @@
"""Philips Hue LED client — streams color data via the Hue Entertainment API."""
import asyncio
import socket
import struct
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Hue Entertainment API constants
HUE_ENT_PORT = 2100
PROTOCOL_NAME = b"HueStream"
VERSION_MAJOR = 2
VERSION_MINOR = 0
COLOR_SPACE_RGB = 0x00
# API version 2 header: "HueStream" + version(2B) + sequence(1B) + reserved(2B)
# + color_space(1B) + reserved(1B) = 16 bytes
HEADER_SIZE = 16
def _build_entertainment_frame(
lights: List[Tuple[int, int, int]],
brightness: int = 255,
sequence: int = 0,
) -> bytes:
"""Build a Hue Entertainment API v2 UDP frame.
Each light gets 7 bytes: [light_id(2B)][R(2B)][G(2B)][B(2B)]
Colors are 16-bit (0-65535). We scale 8-bit RGB + brightness.
"""
# Header
header = bytearray(HEADER_SIZE)
header[0:9] = PROTOCOL_NAME
header[9] = VERSION_MAJOR
header[10] = VERSION_MINOR
header[11] = sequence & 0xFF
header[12] = 0x00 # reserved
header[13] = 0x00 # reserved
header[14] = COLOR_SPACE_RGB
header[15] = 0x00 # reserved
# Light data
bri_scale = brightness / 255.0
data = bytearray()
for idx, (r, g, b) in enumerate(lights):
light_id = idx # 0-based light index in entertainment group
r16 = int(r * bri_scale * 257) # scale 0-255 to 0-65535
g16 = int(g * bri_scale * 257)
b16 = int(b * bri_scale * 257)
data += struct.pack(">BHHH", light_id, r16, g16, b16)
return bytes(header) + bytes(data)
class HueClient(LEDClient):
"""LED client for Philips Hue Entertainment API streaming.
Uses UDP (optionally DTLS) to stream color data at ~25 fps to a Hue
entertainment group. Each light in the group is treated as one "LED".
"""
def __init__(
self,
url: str = "",
led_count: int = 0,
hue_username: str = "",
hue_client_key: str = "",
hue_entertainment_group_id: str = "",
**kwargs,
):
self._bridge_ip = url.replace("hue://", "").rstrip("/")
self._led_count = led_count
self._username = hue_username
self._client_key = hue_client_key
self._group_id = hue_entertainment_group_id
self._sock: Optional[socket.socket] = None
self._connected = False
self._sequence = 0
self._dtls_sock = None
async def connect(self) -> bool:
loop = asyncio.get_event_loop()
# Activate entertainment streaming via REST API
await self._activate_streaming(True)
# Open UDP socket for entertainment streaming
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setblocking(False)
# Try DTLS if dtls library is available
try:
from dtls import do_patch
do_patch()
import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_ciphers("PSK-AES128-GCM-SHA256")
# PSK identity = username, key = client_key (hex decoded)
psk = bytes.fromhex(self._client_key)
ctx.set_psk_client_callback(lambda hint: (self._username.encode(), psk))
self._dtls_sock = ctx.wrap_socket(self._sock, server_hostname=self._bridge_ip)
await loop.run_in_executor(
None,
lambda: self._dtls_sock.connect((self._bridge_ip, HUE_ENT_PORT)),
)
logger.info("Hue DTLS connection established to %s", self._bridge_ip)
except (ImportError, Exception) as e:
# Fall back to plain UDP (works for local testing / older bridges)
logger.warning(
"Hue DTLS not available (%s), falling back to plain UDP. "
"Install 'dtls' package for encrypted streaming.",
e,
)
self._dtls_sock = None
self._connected = True
logger.info(
"Hue client connected: bridge=%s group=%s lights=%d",
self._bridge_ip, self._group_id, self._led_count,
)
return True
async def _activate_streaming(self, active: bool) -> None:
"""Activate/deactivate entertainment streaming via Hue REST API."""
import httpx
url = f"https://{self._bridge_ip}/clip/v2/resource/entertainment_configuration/{self._group_id}"
payload = {"action": "start" if active else "stop"}
headers = {"hue-application-key": self._username}
async with httpx.AsyncClient(verify=False, timeout=5.0) as client:
resp = await client.put(url, json=payload, headers=headers)
if resp.status_code not in (200, 207):
logger.warning("Hue streaming %s failed: %s", "start" if active else "stop", resp.text)
async def close(self) -> None:
if self._connected:
try:
await self._activate_streaming(False)
except Exception:
pass
if self._dtls_sock:
try:
self._dtls_sock.close()
except Exception:
pass
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
self._dtls_sock = None
self._connected = False
logger.info("Hue client closed: bridge=%s", self._bridge_ip)
@property
def is_connected(self) -> bool:
return self._connected
@property
def supports_fast_send(self) -> bool:
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
if not self._connected:
return
if isinstance(pixels, np.ndarray):
light_colors = [tuple(pixels[i]) for i in range(min(len(pixels), self._led_count))]
else:
light_colors = pixels[: self._led_count]
frame = _build_entertainment_frame(light_colors, brightness, self._sequence)
self._sequence = (self._sequence + 1) & 0xFF
try:
if self._dtls_sock:
self._dtls_sock.send(frame)
elif self._sock:
self._sock.sendto(frame, (self._bridge_ip, HUE_ENT_PORT))
except Exception as e:
logger.warning("Hue send failed: %s", e)
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self._connected:
return False
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: self.send_pixels_fast(pixels, brightness),
)
return True
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the Hue bridge is reachable."""
bridge_ip = url.replace("hue://", "").rstrip("/")
try:
import httpx
import time
start = time.time()
async with httpx.AsyncClient(verify=False, timeout=3.0) as client:
resp = await client.get(f"https://{bridge_ip}/api/0/config")
latency = (time.time() - start) * 1000
if resp.status_code == 200:
data = resp.json()
return DeviceHealth(
online=True,
latency_ms=round(latency, 1),
device_name=data.get("name", "Hue Bridge"),
device_version=data.get("swversion"),
last_checked=datetime.now(timezone.utc),
)
return DeviceHealth(online=False, error="Unexpected response", last_checked=datetime.now(timezone.utc))
except Exception as e:
return DeviceHealth(online=False, error=str(e), last_checked=datetime.now(timezone.utc))

View File

@@ -0,0 +1,195 @@
"""Philips Hue device provider — entertainment streaming to Hue lights."""
import asyncio
from datetime import datetime, timezone
from typing import List, Tuple
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.hue_client import HueClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class HueDeviceProvider(LEDDeviceProvider):
"""Provider for Philips Hue Entertainment API streaming.
URL format: hue://<bridge-ip>
Each device = one entertainment group on the bridge.
LED count = number of lights in the entertainment group.
"""
@property
def device_type(self) -> str:
return "hue"
@property
def capabilities(self) -> set:
return {
"manual_led_count",
"brightness_control",
"power_control",
"health_check",
"static_color",
}
def create_client(self, url: str, **kwargs) -> LEDClient:
return HueClient(
url,
led_count=kwargs.get("led_count", 0),
hue_username=kwargs.get("hue_username", ""),
hue_client_key=kwargs.get("hue_client_key", ""),
hue_entertainment_group_id=kwargs.get("hue_entertainment_group_id", ""),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await HueClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate Hue bridge is reachable. LED count is manual (depends on entertainment group)."""
bridge_ip = url.replace("hue://", "").rstrip("/")
try:
import httpx
async with httpx.AsyncClient(verify=False, timeout=5.0) as client:
resp = await client.get(f"https://{bridge_ip}/api/0/config")
if resp.status_code == 200:
data = resp.json()
if "bridgeid" in data or "name" in data:
return {}
raise ValueError(f"Device at {bridge_ip} does not appear to be a Hue bridge")
except ValueError:
raise
except Exception as e:
raise ValueError(f"Cannot reach Hue bridge at {bridge_ip}: {e}")
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover Hue bridges via mDNS."""
results = []
try:
from zeroconf import ServiceBrowser, Zeroconf
found = []
class Listener:
def add_service(self, zc, type_, name):
info = zc.get_service_info(type_, name)
if info:
found.append(info)
def remove_service(self, zc, type_, name):
pass
def update_service(self, zc, type_, name):
pass
zc = Zeroconf()
try:
ServiceBrowser(zc, "_hue._tcp.local.", Listener())
await asyncio.sleep(min(timeout, 3.0))
for info in found:
addresses = info.parsed_addresses()
if not addresses:
continue
ip = addresses[0]
name = info.server.rstrip(".")
results.append(
DiscoveredDevice(
name=f"Hue Bridge ({name})",
url=f"hue://{ip}",
device_type="hue",
ip=ip,
mac="",
led_count=None,
version=None,
)
)
finally:
zc.close()
except ImportError:
logger.debug("zeroconf not available for Hue discovery")
return results
async def get_brightness(self, url: str) -> int:
"""Get bridge group brightness (not per-light)."""
raise NotImplementedError
async def set_brightness(self, url: str, brightness: int) -> None:
"""Software brightness — handled at send time."""
raise NotImplementedError
async def get_power(self, url: str, **kwargs) -> bool:
return True
async def set_power(self, url: str, on: bool, **kwargs) -> None:
"""Turn all lights in group on/off via REST API."""
bridge_ip = url.replace("hue://", "").rstrip("/")
hue_username = kwargs.get("hue_username", "")
group_id = kwargs.get("hue_entertainment_group_id", "")
if not hue_username or not group_id:
return
try:
import httpx
api_url = f"https://{bridge_ip}/clip/v2/resource/grouped_light"
headers = {"hue-application-key": hue_username}
async with httpx.AsyncClient(verify=False, timeout=5.0) as client:
resp = await client.get(api_url, headers=headers)
if resp.status_code == 200:
# Find the grouped_light for our entertainment config
for item in resp.json().get("data", []):
await client.put(
f"{api_url}/{item['id']}",
json={"on": {"on": on}},
headers=headers,
)
except Exception as e:
logger.error("Failed to set Hue power: %s", e)
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
"""Set all lights to a solid color via REST API."""
bridge_ip = url.replace("hue://", "").rstrip("/")
hue_username = kwargs.get("hue_username", "")
if not hue_username:
return
try:
import httpx
headers = {"hue-application-key": hue_username}
# Convert RGB to CIE xy for Hue API
r, g, b = [c / 255.0 for c in color]
# sRGB to linear
r = r / 12.92 if r <= 0.04045 else ((r + 0.055) / 1.055) ** 2.4
g = g / 12.92 if g <= 0.04045 else ((g + 0.055) / 1.055) ** 2.4
b = b / 12.92 if b <= 0.04045 else ((b + 0.055) / 1.055) ** 2.4
X = r * 0.4124 + g * 0.3576 + b * 0.1805
Y = r * 0.2126 + g * 0.7152 + b * 0.0722
Z = r * 0.0193 + g * 0.1192 + b * 0.9505
total = X + Y + Z
if total > 0:
x = X / total
y = Y / total
else:
x, y = 0.3127, 0.3290 # D65 white point
api_url = f"https://{bridge_ip}/clip/v2/resource/light"
async with httpx.AsyncClient(verify=False, timeout=5.0) as client:
resp = await client.get(api_url, headers=headers)
if resp.status_code == 200:
for light in resp.json().get("data", []):
await client.put(
f"{api_url}/{light['id']}",
json={
"on": {"on": True},
"color": {"xy": {"x": round(x, 4), "y": round(y, 4)}},
},
headers=headers,
)
except Exception as e:
logger.error("Failed to set Hue color: %s", e)

View File

@@ -299,5 +299,23 @@ def _register_builtin_providers():
from wled_controller.core.devices.dmx_provider import DMXDeviceProvider
register_provider(DMXDeviceProvider())
from wled_controller.core.devices.espnow_provider import ESPNowDeviceProvider
register_provider(ESPNowDeviceProvider())
from wled_controller.core.devices.hue_provider import HueDeviceProvider
register_provider(HueDeviceProvider())
from wled_controller.core.devices.usbhid_provider import USBHIDDeviceProvider
register_provider(USBHIDDeviceProvider())
from wled_controller.core.devices.spi_provider import SPIDeviceProvider
register_provider(SPIDeviceProvider())
from wled_controller.core.devices.chroma_provider import ChromaDeviceProvider
register_provider(ChromaDeviceProvider())
from wled_controller.core.devices.gamesense_provider import GameSenseDeviceProvider
register_provider(GameSenseDeviceProvider())
_register_builtin_providers()

View File

@@ -0,0 +1,264 @@
"""SPI Direct LED client — drives WS2812/SK6812 strips via Raspberry Pi GPIO/SPI."""
import asyncio
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Supported LED chipsets
LED_TYPES = {
"WS2812": {"freq_hz": 800000, "invert": False, "strip_type": None},
"WS2812B": {"freq_hz": 800000, "invert": False, "strip_type": None},
"WS2811": {"freq_hz": 800000, "invert": False, "strip_type": None},
"SK6812": {"freq_hz": 800000, "invert": False, "strip_type": None},
"SK6812_RGBW": {"freq_hz": 800000, "invert": False, "strip_type": None},
}
# SPI speed for bitbang protocol
DEFAULT_SPI_SPEED = 800000
# GPIO pin for rpi_ws281x (GPIO 18 = PWM0, GPIO 10 = SPI0 MOSI)
DEFAULT_GPIO_PIN = 18
def _parse_spi_url(url: str) -> dict:
"""Parse SPI URL format.
Formats:
spi://gpio:18 — rpi_ws281x via GPIO pin 18
spi://spidev:0.0 — SPI device /dev/spidev0.0
spi://0 — shorthand for GPIO pin 0 (actually pin 18 default)
"""
path = url.replace("spi://", "")
if path.startswith("gpio:"):
pin = int(path.split(":")[1])
return {"method": "gpio", "gpio_pin": pin}
elif path.startswith("spidev:"):
dev = path.split(":")[1]
bus, device = dev.split(".")
return {"method": "spidev", "bus": int(bus), "device": int(device)}
else:
# Default: gpio pin
try:
pin = int(path) if path else DEFAULT_GPIO_PIN
except ValueError:
pin = DEFAULT_GPIO_PIN
return {"method": "gpio", "gpio_pin": pin}
class SPIClient(LEDClient):
"""LED client that drives addressable LED strips directly via Raspberry Pi GPIO/SPI.
Uses rpi_ws281x library for GPIO-based control, or spidev for raw SPI.
Requires root privileges or SPI group membership.
"""
def __init__(
self,
url: str = "",
led_count: int = 0,
spi_speed_hz: int = DEFAULT_SPI_SPEED,
spi_led_type: str = "WS2812B",
**kwargs,
):
self._config = _parse_spi_url(url)
self._led_count = led_count
self._speed_hz = spi_speed_hz
self._led_type = spi_led_type
self._strip = None
self._spi = None
self._connected = False
async def connect(self) -> bool:
loop = asyncio.get_event_loop()
if self._config["method"] == "gpio":
await loop.run_in_executor(None, self._connect_rpi_ws281x)
else:
await loop.run_in_executor(None, self._connect_spidev)
self._connected = True
logger.info(
"SPI client connected: method=%s leds=%d type=%s",
self._config["method"], self._led_count, self._led_type,
)
return True
def _connect_rpi_ws281x(self):
"""Connect via rpi_ws281x library (GPIO PWM/DMA)."""
try:
from rpi_ws281x import PixelStrip, Color
except ImportError:
raise RuntimeError(
"rpi_ws281x is required for GPIO LED control: "
"pip install rpi_ws281x (Linux/RPi only)"
)
led_info = LED_TYPES.get(self._led_type, LED_TYPES["WS2812B"])
gpio_pin = self._config["gpio_pin"]
self._strip = PixelStrip(
self._led_count,
gpio_pin,
led_info["freq_hz"],
10, # DMA channel
led_info["invert"],
255, # max brightness
0, # channel (0 for GPIO 18, 1 for GPIO 13)
)
self._strip.begin()
def _connect_spidev(self):
"""Connect via spidev (raw SPI bus)."""
try:
import spidev
except ImportError:
raise RuntimeError("spidev is required for SPI LED control: pip install spidev")
bus = self._config["bus"]
device = self._config["device"]
self._spi = spidev.SpiDev()
self._spi.open(bus, device)
self._spi.max_speed_hz = self._speed_hz
self._spi.mode = 0
async def close(self) -> None:
loop = asyncio.get_event_loop()
if self._strip:
# Turn off all LEDs
def _clear():
for i in range(self._led_count):
self._strip.setPixelColor(i, 0)
self._strip.show()
await loop.run_in_executor(None, _clear)
self._strip = None
if self._spi:
await loop.run_in_executor(None, self._spi.close)
self._spi = None
self._connected = False
logger.info("SPI client closed")
@property
def is_connected(self) -> bool:
return self._connected
@property
def supports_fast_send(self) -> bool:
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
if not self._connected:
return
bri_scale = brightness / 255.0
if isinstance(pixels, np.ndarray):
pixel_arr = pixels
else:
pixel_arr = np.array(pixels, dtype=np.uint8)
if self._strip:
# rpi_ws281x path
try:
from rpi_ws281x import Color
except ImportError:
return
self._strip.setBrightness(brightness)
for i in range(min(len(pixel_arr), self._led_count)):
r, g, b = pixel_arr[i]
self._strip.setPixelColor(i, Color(int(r), int(g), int(b)))
self._strip.show()
elif self._spi:
# SPI bitbang path: convert RGB to WS2812 wire format
# Each bit is encoded as 3 SPI bits: 1=110, 0=100
scaled = (pixel_arr[:self._led_count].astype(np.float32) * bri_scale).astype(np.uint8)
# GRB order for WS2812
grb = scaled[:, [1, 0, 2]]
raw_bytes = grb.tobytes()
# Encode each byte as 3 SPI bytes (8 bits → 24 SPI bits)
spi_data = bytearray()
for byte in raw_bytes:
for bit in range(7, -1, -1):
if byte & (1 << bit):
spi_data.append(0b110)
else:
spi_data.append(0b100)
# Reset pulse (>50us low)
spi_data.extend(b'\x00' * 20)
self._spi.writebytes2(list(spi_data))
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self._connected:
return False
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(
None,
lambda: self.send_pixels_fast(pixels, brightness),
)
return True
except Exception as e:
logger.error("SPI send failed: %s", e)
return False
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the SPI/GPIO device is accessible."""
import platform
if platform.system() != "Linux":
return DeviceHealth(
online=False,
error="SPI direct is only available on Linux (Raspberry Pi)",
last_checked=datetime.now(timezone.utc),
)
config = _parse_spi_url(url)
if config["method"] == "spidev":
import os
dev_path = f"/dev/spidev{config['bus']}.{config['device']}"
if os.path.exists(dev_path):
return DeviceHealth(online=True, latency_ms=0.0, last_checked=datetime.now(timezone.utc))
return DeviceHealth(
online=False,
error=f"SPI device {dev_path} not found",
last_checked=datetime.now(timezone.utc),
)
else:
# GPIO — check if we can import rpi_ws281x
try:
import rpi_ws281x
return DeviceHealth(online=True, latency_ms=0.0, last_checked=datetime.now(timezone.utc))
except ImportError:
return DeviceHealth(
online=False,
error="rpi_ws281x not installed",
last_checked=datetime.now(timezone.utc),
)

View File

@@ -0,0 +1,133 @@
"""SPI Direct device provider — Raspberry Pi GPIO/SPI direct LED strip control."""
import platform
from datetime import datetime, timezone
from typing import List
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.spi_client import SPIClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class SPIDeviceProvider(LEDDeviceProvider):
"""Provider for direct SPI/GPIO LED strip control on Raspberry Pi.
URL formats:
spi://gpio:18 — rpi_ws281x via GPIO pin 18 (PWM)
spi://gpio:10 — rpi_ws281x via GPIO pin 10 (SPI MOSI)
spi://spidev:0.0 — Raw SPI device /dev/spidev0.0
Requires Linux (Raspberry Pi) and root privileges or appropriate permissions.
"""
@property
def device_type(self) -> str:
return "spi"
@property
def capabilities(self) -> set:
return {"manual_led_count", "health_check", "power_control", "brightness_control"}
def create_client(self, url: str, **kwargs) -> LEDClient:
return SPIClient(
url,
led_count=kwargs.get("led_count", 0),
spi_speed_hz=kwargs.get("spi_speed_hz", 800000),
spi_led_type=kwargs.get("spi_led_type", "WS2812B"),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await SPIClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate SPI/GPIO is accessible. LED count is manual."""
if platform.system() != "Linux":
raise ValueError("SPI direct control is only available on Linux (Raspberry Pi)")
from wled_controller.core.devices.spi_client import _parse_spi_url
config = _parse_spi_url(url)
if config["method"] == "spidev":
import os
dev_path = f"/dev/spidev{config['bus']}.{config['device']}"
if not os.path.exists(dev_path):
raise ValueError(f"SPI device {dev_path} not found. Enable SPI in raspi-config.")
else:
try:
import rpi_ws281x # noqa: F401
except ImportError:
raise ValueError("rpi_ws281x library required: pip install rpi_ws281x")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover available SPI devices on Raspberry Pi."""
if platform.system() != "Linux":
return []
results = []
# Check for SPI devices
import os
for bus in range(2):
for device in range(2):
path = f"/dev/spidev{bus}.{device}"
if os.path.exists(path):
results.append(
DiscoveredDevice(
name=f"SPI Bus {bus} Device {device}",
url=f"spi://spidev:{bus}.{device}",
device_type="spi",
ip="",
mac="",
led_count=None,
version=None,
)
)
# Check for GPIO availability
try:
import rpi_ws281x # noqa: F401
results.append(
DiscoveredDevice(
name="GPIO 18 (PWM0)",
url="spi://gpio:18",
device_type="spi",
ip="",
mac="",
led_count=None,
version=None,
)
)
results.append(
DiscoveredDevice(
name="GPIO 10 (SPI MOSI)",
url="spi://gpio:10",
device_type="spi",
ip="",
mac="",
led_count=None,
version=None,
)
)
except ImportError:
pass
return results
async def get_power(self, url: str, **kwargs) -> bool:
return True
async def set_power(self, url: str, on: bool, **kwargs) -> None:
"""Power off = send all-black frame."""
pass # Handled at target processor level

View File

@@ -0,0 +1,178 @@
"""USB HID LED client — controls RGB peripherals via HID protocol using hidapi."""
import asyncio
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def _parse_hid_url(url: str) -> Tuple[int, int]:
"""Parse 'hid://vendor_id:product_id' to (vid, pid) ints."""
path = url.replace("hid://", "")
parts = path.split(":")
if len(parts) != 2:
raise ValueError(f"Invalid HID URL: {url}. Expected format: hid://VID:PID (hex)")
return int(parts[0], 16), int(parts[1], 16)
class USBHIDClient(LEDClient):
"""LED client that controls RGB peripherals via USB HID reports.
Uses the hidapi library to send raw HID reports containing LED color data.
The report format is device-specific but follows a common pattern:
[REPORT_ID][MODE][LED_INDEX][R][G][B]...
For generic HID RGB devices, a bulk-set report is used:
[REPORT_ID=0x00][CMD=0x0E][ZONE_COUNT][R G B R G B ...]
"""
# Common HID report IDs and commands
REPORT_ID_LED = 0x00
CMD_SET_LEDS = 0x0E
MAX_REPORT_SIZE = 64 # typical HID report size
def __init__(
self,
url: str = "",
led_count: int = 0,
hid_usage_page: int = 0,
**kwargs,
):
self._vid, self._pid = _parse_hid_url(url)
self._led_count = led_count
self._usage_page = hid_usage_page
self._device = None
self._connected = False
async def connect(self) -> bool:
try:
import hid
except ImportError:
raise RuntimeError("hidapi is required for USB HID devices: pip install hidapi")
loop = asyncio.get_event_loop()
def _open():
device = hid.device()
device.open(self._vid, self._pid)
device.set_nonblocking(True)
return device
self._device = await loop.run_in_executor(None, _open)
self._connected = True
manufacturer = self._device.get_manufacturer_string() or "Unknown"
product = self._device.get_product_string() or "Unknown"
logger.info(
"USB HID client connected: %04X:%04X (%s %s) leds=%d",
self._vid, self._pid, manufacturer, product, self._led_count,
)
return True
async def close(self) -> None:
if self._device:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._device.close)
self._device = None
self._connected = False
logger.info("USB HID client closed: %04X:%04X", self._vid, self._pid)
@property
def is_connected(self) -> bool:
return self._connected and self._device is not None
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
if not self.is_connected:
return False
if isinstance(pixels, np.ndarray):
pixel_list = pixels.tolist()
else:
pixel_list = list(pixels)
bri_scale = brightness / 255.0
# Build HID reports — split across multiple reports if needed
# Each report: [REPORT_ID][CMD][OFFSET_LO][OFFSET_HI][COUNT][R G B R G B ...]
max_leds_per_report = (self.MAX_REPORT_SIZE - 5) // 3
offset = 0
reports = []
while offset < len(pixel_list):
chunk = pixel_list[offset: offset + max_leds_per_report]
report = bytearray(self.MAX_REPORT_SIZE)
report[0] = self.REPORT_ID_LED
report[1] = self.CMD_SET_LEDS
report[2] = offset & 0xFF
report[3] = (offset >> 8) & 0xFF
report[4] = len(chunk)
for i, (r, g, b) in enumerate(chunk):
base = 5 + i * 3
report[base] = int(r * bri_scale)
report[base + 1] = int(g * bri_scale)
report[base + 2] = int(b * bri_scale)
reports.append(bytes(report))
offset += len(chunk)
loop = asyncio.get_event_loop()
def _send_all():
for report in reports:
self._device.write(report)
try:
await loop.run_in_executor(None, _send_all)
return True
except Exception as e:
logger.error("USB HID send failed: %s", e)
self._connected = False
return False
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the HID device is present."""
try:
import hid
vid, pid = _parse_hid_url(url)
devices = hid.enumerate(vid, pid)
if devices:
info = devices[0]
return DeviceHealth(
online=True,
latency_ms=0.0,
device_name=info.get("product_string", "USB HID Device"),
last_checked=datetime.now(timezone.utc),
)
return DeviceHealth(
online=False,
error=f"HID device {vid:04X}:{pid:04X} not found",
last_checked=datetime.now(timezone.utc),
)
except ImportError:
return DeviceHealth(
online=False, error="hidapi not installed",
last_checked=datetime.now(timezone.utc),
)
except Exception as e:
return DeviceHealth(
online=False, error=str(e),
last_checked=datetime.now(timezone.utc),
)

View File

@@ -0,0 +1,103 @@
"""USB HID LED device provider — control RGB peripherals via USB HID."""
from datetime import datetime, timezone
from typing import List
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.usbhid_client import USBHIDClient, _parse_hid_url
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Known RGB peripheral vendor IDs and names
KNOWN_RGB_VENDORS = {
0x1532: "Razer",
0x1B1C: "Corsair",
0x1038: "SteelSeries",
0x046D: "Logitech",
0x2516: "Cooler Master",
0x0951: "HyperX",
0x3633: "Glorious",
0x320F: "NZXT",
}
class USBHIDDeviceProvider(LEDDeviceProvider):
"""Provider for USB HID RGB peripheral devices.
URL format: hid://VID:PID (hex, e.g. hid://1532:0084)
LED count = number of addressable zones/keys on the device.
"""
@property
def device_type(self) -> str:
return "usbhid"
@property
def capabilities(self) -> set:
return {"manual_led_count", "health_check"}
def create_client(self, url: str, **kwargs) -> LEDClient:
return USBHIDClient(
url,
led_count=kwargs.get("led_count", 0),
hid_usage_page=kwargs.get("hid_usage_page", 0),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await USBHIDClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate HID device exists. LED count is manual."""
try:
import hid
vid, pid = _parse_hid_url(url)
devices = hid.enumerate(vid, pid)
if not devices:
raise ValueError(f"No HID device found with VID:PID {vid:04X}:{pid:04X}")
return {}
except ImportError:
raise ValueError("hidapi is required for USB HID devices: pip install hidapi")
except ValueError:
raise
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover connected USB HID devices with known RGB vendor IDs."""
try:
import hid
results = []
seen = set()
for info in hid.enumerate():
vid = info.get("vendor_id", 0)
pid = info.get("product_id", 0)
key = (vid, pid)
if key in seen or vid == 0:
continue
seen.add(key)
vendor = KNOWN_RGB_VENDORS.get(vid)
if not vendor:
continue
product = info.get("product_string", "Unknown Device")
results.append(
DiscoveredDevice(
name=f"{vendor} {product}",
url=f"hid://{vid:04x}:{pid:04x}",
device_type="usbhid",
ip="",
mac="",
led_count=None,
version=None,
)
)
return results
except ImportError:
return []

View File

@@ -175,6 +175,15 @@ class ProcessorManager:
dmx_protocol = "artnet"
dmx_start_universe = 0
dmx_start_channel = 1
espnow_peer_mac = ""
espnow_channel = 1
hue_username = ""
hue_client_key = ""
hue_entertainment_group_id = ""
spi_speed_hz = 800000
spi_led_type = "WS2812B"
chroma_device_type = "chromalink"
gamesense_device_type = "keyboard"
if self._device_store:
dev = self._device_store.get_device(ds.device_id)
if dev:
@@ -183,6 +192,15 @@ class ProcessorManager:
dmx_protocol = getattr(dev, "dmx_protocol", "artnet")
dmx_start_universe = getattr(dev, "dmx_start_universe", 0)
dmx_start_channel = getattr(dev, "dmx_start_channel", 1)
espnow_peer_mac = getattr(dev, "espnow_peer_mac", "")
espnow_channel = getattr(dev, "espnow_channel", 1)
hue_username = getattr(dev, "hue_username", "")
hue_client_key = getattr(dev, "hue_client_key", "")
hue_entertainment_group_id = getattr(dev, "hue_entertainment_group_id", "")
spi_speed_hz = getattr(dev, "spi_speed_hz", 800000)
spi_led_type = getattr(dev, "spi_led_type", "WS2812B")
chroma_device_type = getattr(dev, "chroma_device_type", "chromalink")
gamesense_device_type = getattr(dev, "gamesense_device_type", "keyboard")
return DeviceInfo(
device_id=ds.device_id,
@@ -199,6 +217,15 @@ class ProcessorManager:
dmx_protocol=dmx_protocol,
dmx_start_universe=dmx_start_universe,
dmx_start_channel=dmx_start_channel,
espnow_peer_mac=espnow_peer_mac,
espnow_channel=espnow_channel,
hue_username=hue_username,
hue_client_key=hue_client_key,
hue_entertainment_group_id=hue_entertainment_group_id,
spi_speed_hz=spi_speed_hz,
spi_led_type=spi_led_type,
chroma_device_type=chroma_device_type,
gamesense_device_type=gamesense_device_type,
)
# ===== EVENT SYSTEM (state change notifications) =====

View File

@@ -80,6 +80,20 @@ class DeviceInfo:
dmx_protocol: str = "artnet"
dmx_start_universe: int = 0
dmx_start_channel: int = 1
# ESP-NOW fields
espnow_peer_mac: str = ""
espnow_channel: int = 1
# Philips Hue fields
hue_username: str = ""
hue_client_key: str = ""
hue_entertainment_group_id: str = ""
# SPI Direct fields
spi_speed_hz: int = 800000
spi_led_type: str = "WS2812B"
# Razer Chroma fields
chroma_device_type: str = "chromalink"
# SteelSeries GameSense fields
gamesense_device_type: str = "keyboard"
@dataclass

View File

@@ -112,6 +112,15 @@ class WledTargetProcessor(TargetProcessor):
dmx_protocol=device_info.dmx_protocol,
dmx_start_universe=device_info.dmx_start_universe,
dmx_start_channel=device_info.dmx_start_channel,
espnow_peer_mac=device_info.espnow_peer_mac,
espnow_channel=device_info.espnow_channel,
hue_username=device_info.hue_username,
hue_client_key=device_info.hue_client_key,
hue_entertainment_group_id=device_info.hue_entertainment_group_id,
spi_speed_hz=device_info.spi_speed_hz,
spi_led_type=device_info.spi_led_type,
chroma_device_type=device_info.chroma_device_type,
gamesense_device_type=device_info.gamesense_device_type,
)
await self._led_client.connect()

View File

@@ -139,6 +139,7 @@ h2 {
justify-content: center;
background: rgba(0, 0, 0, 0.7);
backdrop-filter: blur(4px);
overflow: hidden;
}
.connection-overlay-content {

View File

@@ -94,6 +94,30 @@ export function isDmxDevice(type) {
return type === 'dmx';
}
export function isEspnowDevice(type) {
return type === 'espnow';
}
export function isHueDevice(type) {
return type === 'hue';
}
export function isUsbhidDevice(type) {
return type === 'usbhid';
}
export function isSpiDevice(type) {
return type === 'spi';
}
export function isChromaDevice(type) {
return type === 'chroma';
}
export function isGameSenseDevice(type) {
return type === 'gamesense';
}
export function handle401Error() {
if (!apiKey) return; // Already handled or no session
localStorage.removeItem('wled_api_key');

View File

@@ -315,12 +315,12 @@ function _createOverlay(node, nodeWidth, callbacks) {
// Test button for applicable kinds
if (TEST_KINDS.has(node.kind) || (node.kind === 'output_target' && node.subtype === 'key_colors')) {
btns.push({ icon: '\uD83D\uDC41', action: 'test', cls: '' }); // 👁 test/preview
btns.push({ svgPath: P.eye, action: 'test', cls: '' });
}
// Notification test for notification color strip sources
if (node.kind === 'color_strip_source' && node.subtype === 'notification') {
btns.push({ icon: '\uD83D\uDD14', action: 'notify', cls: '' }); // 🔔
btns.push({ svgPath: P.bellRing, action: 'notify', cls: '' });
}
// Always: edit and delete
@@ -359,9 +359,22 @@ function _createOverlay(node, nodeWidth, callbacks) {
const by = oy + 2;
const bg = svgEl('g', { class: `graph-node-overlay-btn ${btn.cls}` });
bg.appendChild(svgEl('rect', { x: bx, y: by, width: btnSize, height: btnSize }));
const txt = svgEl('text', { x: bx + btnSize / 2, y: by + btnSize / 2 });
txt.textContent = btn.icon;
bg.appendChild(txt);
if (btn.svgPath) {
const iconG = svgEl('g', {
transform: `translate(${bx + 2}, ${by + 2}) scale(${(btnSize - 4) / 24})`,
});
iconG.innerHTML = btn.svgPath;
iconG.setAttribute('fill', 'none');
iconG.setAttribute('stroke', 'currentColor');
iconG.setAttribute('stroke-width', '2');
iconG.setAttribute('stroke-linecap', 'round');
iconG.setAttribute('stroke-linejoin', 'round');
bg.appendChild(iconG);
} else {
const txt = svgEl('text', { x: bx + btnSize / 2, y: by + btnSize / 2 });
txt.textContent = btn.icon;
bg.appendChild(txt);
}
const btnTip = svgEl('title');
btnTip.textContent = ACTION_LABELS[btn.action] || btn.action;
bg.appendChild(btnTip);

View File

@@ -74,3 +74,7 @@ export const power = '<path d="M18.36 6.64a9 9 0 1 1-12.73 0"/><line x1="
export const wifi = '<path d="M12 20h.01"/><path d="M2 8.82a15 15 0 0 1 20 0"/><path d="M5 12.859a10 10 0 0 1 14 0"/><path d="M8.5 16.429a5 5 0 0 1 7 0"/>';
export const flame = '<path d="M8.5 14.5A2.5 2.5 0 0 0 11 12c0-1.38-.5-2-1-3-1.072-2.143-.224-4.054 2-6 .5 2.5 2 4.9 4 6.5 2 1.6 3 3.5 3 5.5a7 7 0 1 1-14 0c0-1.153.433-2.294 1-3a2.5 2.5 0 0 0 2.5 2.5z"/>';
export const usb = '<circle cx="10" cy="7" r="1"/><circle cx="4" cy="20" r="1"/><path d="M4.7 19.3 19 5"/><path d="m21 3-3 1 2 2Z"/><path d="M10 8v3a1 1 0 0 1-1 1H4"/><path d="M14 12v2a1 1 0 0 0 1 1h3"/><circle cx="20" cy="15" r="1"/>';
export const cpu = '<rect x="4" y="4" width="16" height="16" rx="2"/><rect x="9" y="9" width="6" height="6"/><path d="M15 2v2"/><path d="M15 20v2"/><path d="M2 15h2"/><path d="M2 9h2"/><path d="M20 15h2"/><path d="M20 9h2"/><path d="M9 2v2"/><path d="M9 20v2"/>';
export const keyboard = '<path d="M10 8h.01"/><path d="M12 12h.01"/><path d="M14 8h.01"/><path d="M16 12h.01"/><path d="M18 8h.01"/><path d="M6 8h.01"/><path d="M7 16h10"/><path d="M8 12h.01"/><rect width="20" height="16" x="2" y="4" rx="2"/>';
export const mouse = '<rect x="5" y="2" width="14" height="20" rx="7"/><path d="M12 6v4"/>';
export const headphones = '<path d="M3 14h3a2 2 0 0 1 2 2v3a2 2 0 0 1-2 2H5a2 2 0 0 1-2-2v-7a9 9 0 0 1 18 0v7a2 2 0 0 1-2 2h-1a2 2 0 0 1-2-2v-3a2 2 0 0 1 2-2h3"/>';

View File

@@ -37,6 +37,8 @@ const _deviceTypeIcons = {
wled: _svg(P.wifi), adalight: _svg(P.usb), ambiled: _svg(P.usb),
mqtt: _svg(P.send), ws: _svg(P.globe), openrgb: _svg(P.palette),
dmx: _svg(P.radio), mock: _svg(P.wrench),
espnow: _svg(P.radio), hue: _svg(P.lightbulb), usbhid: _svg(P.usb),
spi: _svg(P.plug), chroma: _svg(P.zap), gamesense: _svg(P.target),
};
const _engineTypeIcons = {
mss: _svg(P.monitor), dxcam: _svg(P.zap), bettercam: _svg(P.rocket),
@@ -171,3 +173,7 @@ export const ICON_UNDO = _svg(P.undo2);
export const ICON_SCENE = _svg(P.sparkles);
export const ICON_CAPTURE = _svg(P.camera);
export const ICON_BELL = _svg(P.bellRing);
export const ICON_CPU = _svg(P.cpu);
export const ICON_KEYBOARD = _svg(P.keyboard);
export const ICON_MOUSE = _svg(P.mouse);
export const ICON_HEADPHONES = _svg(P.headphones);

View File

@@ -6,13 +6,13 @@ import {
_discoveryScanRunning, set_discoveryScanRunning,
_discoveryCache, set_discoveryCache,
} from '../core/state.js';
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, escapeHtml } from '../core/api.js';
import { API_BASE, fetchWithAuth, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice, isEspnowDevice, isHueDevice, isUsbhidDevice, isSpiDevice, isChromaDevice, isGameSenseDevice, escapeHtml } from '../core/api.js';
import { devicesCache } from '../core/state.js';
import { t } from '../core/i18n.js';
import { showToast, desktopFocus } from '../core/ui.js';
import { Modal } from '../core/modal.js';
import { _computeMaxFps, _renderFpsHint } from './devices.js';
import { getDeviceTypeIcon, ICON_RADIO, ICON_GLOBE } from '../core/icons.js';
import { getDeviceTypeIcon, ICON_RADIO, ICON_GLOBE, ICON_CPU, ICON_KEYBOARD, ICON_MOUSE, ICON_HEADPHONES, ICON_PLUG, ICON_TARGET_ICON, ICON_ACTIVITY } from '../core/icons.js';
import { IconSelect } from '../core/icon-select.js';
class AddDeviceModal extends Modal {
@@ -41,7 +41,7 @@ const addDeviceModal = new AddDeviceModal();
/* ── Icon-grid type selector ──────────────────────────────────── */
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'mock'];
const DEVICE_TYPE_KEYS = ['wled', 'adalight', 'ambiled', 'mqtt', 'ws', 'openrgb', 'dmx', 'espnow', 'hue', 'usbhid', 'spi', 'chroma', 'gamesense', 'mock'];
function _buildDeviceTypeItems() {
return DEVICE_TYPE_KEYS.map(key => ({
@@ -93,6 +93,76 @@ export function destroyDmxProtocolIconSelect(selectId) {
}
}
/* ── Icon-grid SPI LED chipset selector ──────────────────────── */
function _buildSpiLedTypeItems() {
return [
{ value: 'WS2812B', icon: ICON_CPU, label: 'WS2812B', desc: t('device.spi.led_type.ws2812b.desc') },
{ value: 'WS2812', icon: ICON_CPU, label: 'WS2812', desc: t('device.spi.led_type.ws2812.desc') },
{ value: 'WS2811', icon: ICON_CPU, label: 'WS2811', desc: t('device.spi.led_type.ws2811.desc') },
{ value: 'SK6812', icon: ICON_CPU, label: 'SK6812 (RGB)', desc: t('device.spi.led_type.sk6812.desc') },
{ value: 'SK6812_RGBW', icon: ICON_CPU, label: 'SK6812 (RGBW)', desc: t('device.spi.led_type.sk6812_rgbw.desc') },
];
}
const _spiLedTypeIconSelects = {};
export function ensureSpiLedTypeIconSelect(selectId) {
const sel = document.getElementById(selectId);
if (!sel) return;
if (_spiLedTypeIconSelects[selectId]) {
_spiLedTypeIconSelects[selectId].updateItems(_buildSpiLedTypeItems());
return;
}
_spiLedTypeIconSelects[selectId] = new IconSelect({
target: sel,
items: _buildSpiLedTypeItems(),
columns: 3,
});
}
export function destroySpiLedTypeIconSelect(selectId) {
if (_spiLedTypeIconSelects[selectId]) {
_spiLedTypeIconSelects[selectId].destroy();
delete _spiLedTypeIconSelects[selectId];
}
}
/* ── Icon-grid GameSense peripheral type selector ────────────── */
function _buildGameSenseDeviceTypeItems() {
return [
{ value: 'keyboard', icon: ICON_KEYBOARD, label: t('device.gamesense.peripheral.keyboard'), desc: t('device.gamesense.peripheral.keyboard.desc') },
{ value: 'mouse', icon: ICON_MOUSE, label: t('device.gamesense.peripheral.mouse'), desc: t('device.gamesense.peripheral.mouse.desc') },
{ value: 'headset', icon: ICON_HEADPHONES, label: t('device.gamesense.peripheral.headset'), desc: t('device.gamesense.peripheral.headset.desc') },
{ value: 'mousepad', icon: ICON_PLUG, label: t('device.gamesense.peripheral.mousepad'), desc: t('device.gamesense.peripheral.mousepad.desc') },
{ value: 'indicator', icon: ICON_ACTIVITY, label: t('device.gamesense.peripheral.indicator'), desc: t('device.gamesense.peripheral.indicator.desc') },
];
}
const _gameSenseDeviceTypeIconSelects = {};
export function ensureGameSenseDeviceTypeIconSelect(selectId) {
const sel = document.getElementById(selectId);
if (!sel) return;
if (_gameSenseDeviceTypeIconSelects[selectId]) {
_gameSenseDeviceTypeIconSelects[selectId].updateItems(_buildGameSenseDeviceTypeItems());
return;
}
_gameSenseDeviceTypeIconSelects[selectId] = new IconSelect({
target: sel,
items: _buildGameSenseDeviceTypeItems(),
columns: 3,
});
}
export function destroyGameSenseDeviceTypeIconSelect(selectId) {
if (_gameSenseDeviceTypeIconSelects[selectId]) {
_gameSenseDeviceTypeIconSelects[selectId].destroy();
delete _gameSenseDeviceTypeIconSelects[selectId];
}
}
export function onDeviceTypeChanged() {
const deviceType = document.getElementById('device-type').value;
if (_deviceTypeIconSelect) _deviceTypeIconSelect.setValue(deviceType);
@@ -126,6 +196,13 @@ export function onDeviceTypeChanged() {
if (dmxStartUniverseGroup) dmxStartUniverseGroup.style.display = 'none';
if (dmxStartChannelGroup) dmxStartChannelGroup.style.display = 'none';
// Hide new device type fields by default
_showEspnowFields(false);
_showHueFields(false);
_showSpiFields(false);
_showChromaFields(false);
_showGameSenseFields(false);
if (isMqttDevice(deviceType)) {
// MQTT: show URL (topic), LED count; hide serial/baud/led-type/latency/discovery
urlGroup.style.display = '';
@@ -228,6 +305,126 @@ export function onDeviceTypeChanged() {
} else {
scanForDevices();
}
} else if (isEspnowDevice(deviceType)) {
// ESP-NOW: serial port for gateway, LED count, baud rate, + ESP-NOW fields
urlGroup.style.display = 'none';
urlInput.removeAttribute('required');
serialGroup.style.display = '';
serialSelect.setAttribute('required', '');
ledCountGroup.style.display = '';
baudRateGroup.style.display = '';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (discoverySection) discoverySection.style.display = 'none';
if (scanBtn) scanBtn.style.display = 'none';
// Show ESP-NOW specific fields
_showEspnowFields(true);
// Populate serial ports
if (deviceType in _discoveryCache) {
_populateSerialPortDropdown(_discoveryCache[deviceType]);
} else {
serialSelect.innerHTML = '';
const opt = document.createElement('option');
opt.value = '';
opt.textContent = t('device.serial_port.hint') || 'Click to discover ports...';
opt.disabled = true;
serialSelect.appendChild(opt);
}
} else if (isHueDevice(deviceType)) {
// Hue: show URL (bridge IP), LED count, + Hue auth fields
urlGroup.style.display = '';
urlInput.setAttribute('required', '');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
_showHueFields(true);
if (urlLabel) urlLabel.textContent = t('device.hue.url') || 'Bridge IP';
if (urlHint) urlHint.textContent = t('device.hue.url.hint') || 'IP address of your Hue bridge';
urlInput.placeholder = 'hue://192.168.1.2';
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else if (isUsbhidDevice(deviceType)) {
// USB HID: show URL (VID:PID), LED count
urlGroup.style.display = '';
urlInput.setAttribute('required', '');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
if (urlLabel) urlLabel.textContent = t('device.usbhid.url') || 'VID:PID';
if (urlHint) urlHint.textContent = t('device.usbhid.url.hint') || 'USB Vendor:Product ID in hex';
urlInput.placeholder = 'hid://1532:0084';
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else if (isSpiDevice(deviceType)) {
// SPI Direct: show URL (gpio/spidev), LED count, + SPI fields
urlGroup.style.display = '';
urlInput.setAttribute('required', '');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
_showSpiFields(true);
ensureSpiLedTypeIconSelect('device-spi-led-type');
if (urlLabel) urlLabel.textContent = t('device.spi.url') || 'GPIO/SPI Path';
if (urlHint) urlHint.textContent = t('device.spi.url.hint') || 'GPIO pin or SPI device path';
urlInput.placeholder = 'spi://gpio:18';
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else if (isChromaDevice(deviceType)) {
// Razer Chroma: auto URL, LED count, + peripheral type selector
urlGroup.style.display = 'none';
urlInput.removeAttribute('required');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
_showChromaFields(true);
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else if (isGameSenseDevice(deviceType)) {
// SteelSeries GameSense: auto URL, LED count, + device type selector
urlGroup.style.display = 'none';
urlInput.removeAttribute('required');
serialGroup.style.display = 'none';
serialSelect.removeAttribute('required');
ledCountGroup.style.display = '';
baudRateGroup.style.display = 'none';
if (ledTypeGroup) ledTypeGroup.style.display = 'none';
if (sendLatencyGroup) sendLatencyGroup.style.display = 'none';
if (scanBtn) scanBtn.style.display = '';
_showGameSenseFields(true);
ensureGameSenseDeviceTypeIconSelect('device-gamesense-device-type');
if (deviceType in _discoveryCache) {
_renderDiscoveryList();
} else {
scanForDevices();
}
} else {
urlGroup.style.display = '';
urlInput.setAttribute('required', '');
@@ -476,8 +673,13 @@ export async function handleAddDevice(event) {
url = 'mock://';
} else if (isWsDevice(deviceType)) {
url = 'ws://';
} else if (isSerialDevice(deviceType)) {
} else if (isSerialDevice(deviceType) || isEspnowDevice(deviceType)) {
url = document.getElementById('device-serial-port').value;
} else if (isChromaDevice(deviceType)) {
const chromaType = document.getElementById('device-chroma-device-type')?.value || 'chromalink';
url = `chroma://${chromaType}`;
} else if (isGameSenseDevice(deviceType)) {
url = 'gamesense://auto';
} else {
url = document.getElementById('device-url').value.trim();
}
@@ -525,6 +727,26 @@ export async function handleAddDevice(event) {
body.dmx_start_universe = parseInt(document.getElementById('device-dmx-start-universe')?.value || '0', 10);
body.dmx_start_channel = parseInt(document.getElementById('device-dmx-start-channel')?.value || '1', 10);
}
if (isEspnowDevice(deviceType)) {
body.espnow_peer_mac = document.getElementById('device-espnow-peer-mac')?.value || '';
body.espnow_channel = parseInt(document.getElementById('device-espnow-channel')?.value || '1', 10);
body.baud_rate = parseInt(document.getElementById('device-baud-rate')?.value || '921600', 10);
}
if (isHueDevice(deviceType)) {
body.hue_username = document.getElementById('device-hue-username')?.value || '';
body.hue_client_key = document.getElementById('device-hue-client-key')?.value || '';
body.hue_entertainment_group_id = document.getElementById('device-hue-group-id')?.value || '';
}
if (isSpiDevice(deviceType)) {
body.spi_speed_hz = parseInt(document.getElementById('device-spi-speed')?.value || '800000', 10);
body.spi_led_type = document.getElementById('device-spi-led-type')?.value || 'WS2812B';
}
if (isChromaDevice(deviceType)) {
body.chroma_device_type = document.getElementById('device-chroma-device-type')?.value || 'chromalink';
}
if (isGameSenseDevice(deviceType)) {
body.gamesense_device_type = document.getElementById('device-gamesense-device-type')?.value || 'keyboard';
}
if (lastTemplateId) body.capture_template_id = lastTemplateId;
const response = await fetchWithAuth('/devices', {
@@ -666,3 +888,39 @@ export function _getZoneMode(radioName = 'device-zone-mode') {
const radio = document.querySelector(`input[name="${radioName}"]:checked`);
return radio ? radio.value : 'combined';
}
/* ── New device type field visibility helpers ──────────────────── */
function _showEspnowFields(show) {
const ids = ['device-espnow-peer-mac-group', 'device-espnow-channel-group'];
ids.forEach(id => {
const el = document.getElementById(id);
if (el) el.style.display = show ? '' : 'none';
});
}
function _showHueFields(show) {
const ids = ['device-hue-username-group', 'device-hue-client-key-group', 'device-hue-group-id-group'];
ids.forEach(id => {
const el = document.getElementById(id);
if (el) el.style.display = show ? '' : 'none';
});
}
function _showSpiFields(show) {
const ids = ['device-spi-speed-group', 'device-spi-led-type-group'];
ids.forEach(id => {
const el = document.getElementById(id);
if (el) el.style.display = show ? '' : 'none';
});
}
function _showChromaFields(show) {
const el = document.getElementById('device-chroma-device-type-group');
if (el) el.style.display = show ? '' : 'none';
}
function _showGameSenseFields(show) {
const el = document.getElementById('device-gamesense-device-type-group');
if (el) el.style.display = show ? '' : 'none';
}

View File

@@ -7,7 +7,7 @@ import {
} from '../core/state.js';
import { API_BASE, getHeaders, fetchWithAuth, escapeHtml, isSerialDevice, isMockDevice, isMqttDevice, isWsDevice, isOpenrgbDevice, isDmxDevice } from '../core/api.js';
import { devicesCache } from '../core/state.js';
import { _fetchOpenrgbZones, _getCheckedZones, _splitOpenrgbZone, _getZoneMode, ensureDmxProtocolIconSelect, destroyDmxProtocolIconSelect } from './device-discovery.js';
import { _fetchOpenrgbZones, _getCheckedZones, _splitOpenrgbZone, _getZoneMode, ensureDmxProtocolIconSelect, destroyDmxProtocolIconSelect, ensureSpiLedTypeIconSelect, destroySpiLedTypeIconSelect, ensureGameSenseDeviceTypeIconSelect, destroyGameSenseDeviceTypeIconSelect } from './device-discovery.js';
import { t } from '../core/i18n.js';
import { showToast, showConfirm, desktopFocus } from '../core/ui.js';
import { Modal } from '../core/modal.js';

View File

@@ -146,6 +146,57 @@
"device.type.dmx.desc": "Art-Net / sACN (E1.31) stage lighting",
"device.type.mock": "Mock",
"device.type.mock.desc": "Virtual device for testing",
"device.type.espnow": "ESP-NOW",
"device.type.espnow.desc": "Ultra-low-latency via ESP32 gateway",
"device.type.hue": "Philips Hue",
"device.type.hue.desc": "Hue Entertainment API streaming",
"device.type.usbhid": "USB HID",
"device.type.usbhid.desc": "USB RGB peripherals (keyboards, mice)",
"device.type.spi": "SPI Direct",
"device.type.spi.desc": "Raspberry Pi GPIO/SPI LED strips",
"device.type.chroma": "Razer Chroma",
"device.type.chroma.desc": "Razer peripherals via Chroma SDK",
"device.type.gamesense": "SteelSeries",
"device.type.gamesense.desc": "SteelSeries peripherals via GameSense",
"device.chroma.device_type": "Peripheral Type:",
"device.chroma.device_type.hint": "Which Razer peripheral to control via Chroma SDK",
"device.gamesense.device_type": "Peripheral Type:",
"device.gamesense.device_type.hint": "Which SteelSeries peripheral to control via GameSense",
"device.espnow.peer_mac": "Peer MAC:",
"device.espnow.peer_mac.hint": "MAC address of the remote ESP32 receiver (e.g. AA:BB:CC:DD:EE:FF)",
"device.espnow.channel": "WiFi Channel:",
"device.espnow.channel.hint": "WiFi channel (1-14). Must match the receiver's channel.",
"device.hue.url": "Bridge IP:",
"device.hue.url.hint": "IP address of your Hue bridge",
"device.hue.username": "Bridge Username:",
"device.hue.username.hint": "Hue bridge application key from pairing",
"device.hue.client_key": "Client Key:",
"device.hue.client_key.hint": "Entertainment API client key (hex string from pairing)",
"device.hue.group_id": "Entertainment Group:",
"device.hue.group_id.hint": "Entertainment configuration ID from your Hue bridge",
"device.usbhid.url": "VID:PID:",
"device.usbhid.url.hint": "USB Vendor:Product ID in hex (e.g. 1532:0084)",
"device.spi.url": "GPIO/SPI Path:",
"device.spi.url.hint": "GPIO pin or SPI device path (e.g. spi://gpio:18)",
"device.spi.speed": "SPI Speed (Hz):",
"device.spi.speed.hint": "SPI clock speed. 800000 Hz for WS2812, 2400000 Hz for APA102.",
"device.spi.led_type": "LED Chipset:",
"device.spi.led_type.hint": "Type of addressable LED strip connected to the GPIO/SPI pin",
"device.spi.led_type.ws2812b.desc": "Most common, 800 KHz data, 3-wire RGB",
"device.spi.led_type.ws2812.desc": "Original WS2812, 800 KHz, 3-wire RGB",
"device.spi.led_type.ws2811.desc": "External driver IC, 400 KHz, 12V strips",
"device.spi.led_type.sk6812.desc": "Samsung LED, 800 KHz, 3-wire RGB",
"device.spi.led_type.sk6812_rgbw.desc": "SK6812 with dedicated white channel",
"device.gamesense.peripheral.keyboard": "Keyboard",
"device.gamesense.peripheral.keyboard.desc": "Per-key RGB illumination",
"device.gamesense.peripheral.mouse": "Mouse",
"device.gamesense.peripheral.mouse.desc": "Mouse RGB zones",
"device.gamesense.peripheral.headset": "Headset",
"device.gamesense.peripheral.headset.desc": "Headset earcup lighting",
"device.gamesense.peripheral.mousepad": "Mousepad",
"device.gamesense.peripheral.mousepad.desc": "Mousepad edge lighting zones",
"device.gamesense.peripheral.indicator": "Indicator",
"device.gamesense.peripheral.indicator.desc": "OLED/LED status indicator",
"device.dmx_protocol": "DMX Protocol:",
"device.dmx_protocol.hint": "Art-Net uses UDP port 6454, sACN (E1.31) uses UDP port 5568",
"device.dmx_protocol.artnet.desc": "UDP unicast, port 6454",

View File

@@ -38,6 +38,20 @@ class Device:
dmx_protocol: str = "artnet",
dmx_start_universe: int = 0,
dmx_start_channel: int = 1,
# ESP-NOW fields
espnow_peer_mac: str = "",
espnow_channel: int = 1,
# Philips Hue fields
hue_username: str = "",
hue_client_key: str = "",
hue_entertainment_group_id: str = "",
# SPI Direct fields
spi_speed_hz: int = 800000,
spi_led_type: str = "WS2812B",
# Razer Chroma fields
chroma_device_type: str = "chromalink",
# SteelSeries GameSense fields
gamesense_device_type: str = "keyboard",
created_at: Optional[datetime] = None,
updated_at: Optional[datetime] = None,
):
@@ -57,6 +71,15 @@ class Device:
self.dmx_protocol = dmx_protocol
self.dmx_start_universe = dmx_start_universe
self.dmx_start_channel = dmx_start_channel
self.espnow_peer_mac = espnow_peer_mac
self.espnow_channel = espnow_channel
self.hue_username = hue_username
self.hue_client_key = hue_client_key
self.hue_entertainment_group_id = hue_entertainment_group_id
self.spi_speed_hz = spi_speed_hz
self.spi_led_type = spi_led_type
self.chroma_device_type = chroma_device_type
self.gamesense_device_type = gamesense_device_type
self.created_at = created_at or datetime.now(timezone.utc)
self.updated_at = updated_at or datetime.now(timezone.utc)
@@ -92,6 +115,24 @@ class Device:
d["dmx_start_universe"] = self.dmx_start_universe
if self.dmx_start_channel != 1:
d["dmx_start_channel"] = self.dmx_start_channel
if self.espnow_peer_mac:
d["espnow_peer_mac"] = self.espnow_peer_mac
if self.espnow_channel != 1:
d["espnow_channel"] = self.espnow_channel
if self.hue_username:
d["hue_username"] = self.hue_username
if self.hue_client_key:
d["hue_client_key"] = self.hue_client_key
if self.hue_entertainment_group_id:
d["hue_entertainment_group_id"] = self.hue_entertainment_group_id
if self.spi_speed_hz != 800000:
d["spi_speed_hz"] = self.spi_speed_hz
if self.spi_led_type != "WS2812B":
d["spi_led_type"] = self.spi_led_type
if self.chroma_device_type != "chromalink":
d["chroma_device_type"] = self.chroma_device_type
if self.gamesense_device_type != "keyboard":
d["gamesense_device_type"] = self.gamesense_device_type
return d
@classmethod
@@ -114,6 +155,15 @@ class Device:
dmx_protocol=data.get("dmx_protocol", "artnet"),
dmx_start_universe=data.get("dmx_start_universe", 0),
dmx_start_channel=data.get("dmx_start_channel", 1),
espnow_peer_mac=data.get("espnow_peer_mac", ""),
espnow_channel=data.get("espnow_channel", 1),
hue_username=data.get("hue_username", ""),
hue_client_key=data.get("hue_client_key", ""),
hue_entertainment_group_id=data.get("hue_entertainment_group_id", ""),
spi_speed_hz=data.get("spi_speed_hz", 800000),
spi_led_type=data.get("spi_led_type", "WS2812B"),
chroma_device_type=data.get("chroma_device_type", "chromalink"),
gamesense_device_type=data.get("gamesense_device_type", "keyboard"),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)
@@ -202,6 +252,15 @@ class DeviceStore:
dmx_protocol: str = "artnet",
dmx_start_universe: int = 0,
dmx_start_channel: int = 1,
espnow_peer_mac: str = "",
espnow_channel: int = 1,
hue_username: str = "",
hue_client_key: str = "",
hue_entertainment_group_id: str = "",
spi_speed_hz: int = 800000,
spi_led_type: str = "WS2812B",
chroma_device_type: str = "chromalink",
gamesense_device_type: str = "keyboard",
) -> Device:
"""Create a new device."""
device_id = f"device_{uuid.uuid4().hex[:8]}"
@@ -225,6 +284,15 @@ class DeviceStore:
dmx_protocol=dmx_protocol,
dmx_start_universe=dmx_start_universe,
dmx_start_channel=dmx_start_channel,
espnow_peer_mac=espnow_peer_mac,
espnow_channel=espnow_channel,
hue_username=hue_username,
hue_client_key=hue_client_key,
hue_entertainment_group_id=hue_entertainment_group_id,
spi_speed_hz=spi_speed_hz,
spi_led_type=spi_led_type,
chroma_device_type=chroma_device_type,
gamesense_device_type=gamesense_device_type,
)
self._devices[device_id] = device
@@ -257,6 +325,15 @@ class DeviceStore:
dmx_protocol: Optional[str] = None,
dmx_start_universe: Optional[int] = None,
dmx_start_channel: Optional[int] = None,
espnow_peer_mac: Optional[str] = None,
espnow_channel: Optional[int] = None,
hue_username: Optional[str] = None,
hue_client_key: Optional[str] = None,
hue_entertainment_group_id: Optional[str] = None,
spi_speed_hz: Optional[int] = None,
spi_led_type: Optional[str] = None,
chroma_device_type: Optional[str] = None,
gamesense_device_type: Optional[str] = None,
) -> Device:
"""Update device."""
device = self._devices.get(device_id)
@@ -289,6 +366,24 @@ class DeviceStore:
device.dmx_start_universe = dmx_start_universe
if dmx_start_channel is not None:
device.dmx_start_channel = dmx_start_channel
if espnow_peer_mac is not None:
device.espnow_peer_mac = espnow_peer_mac
if espnow_channel is not None:
device.espnow_channel = espnow_channel
if hue_username is not None:
device.hue_username = hue_username
if hue_client_key is not None:
device.hue_client_key = hue_client_key
if hue_entertainment_group_id is not None:
device.hue_entertainment_group_id = hue_entertainment_group_id
if spi_speed_hz is not None:
device.spi_speed_hz = spi_speed_hz
if spi_led_type is not None:
device.spi_led_type = spi_led_type
if chroma_device_type is not None:
device.chroma_device_type = chroma_device_type
if gamesense_device_type is not None:
device.gamesense_device_type = gamesense_device_type
device.updated_at = datetime.now(timezone.utc)
self.save()

View File

@@ -34,6 +34,12 @@
<option value="ws">WebSocket</option>
<option value="openrgb">OpenRGB</option>
<option value="dmx">DMX</option>
<option value="espnow">ESP-NOW</option>
<option value="hue">Philips Hue</option>
<option value="usbhid">USB HID</option>
<option value="spi">SPI Direct</option>
<option value="chroma">Razer Chroma</option>
<option value="gamesense">SteelSeries</option>
<option value="mock">Mock</option>
</select>
</div>
@@ -154,6 +160,102 @@
<small class="input-hint" style="display:none" data-i18n="device.dmx_start_channel.hint">First DMX channel within the universe (1-512)</small>
<input type="number" id="device-dmx-start-channel" min="1" max="512" value="1">
</div>
<!-- ESP-NOW fields -->
<div class="form-group" id="device-espnow-peer-mac-group" style="display: none;">
<div class="label-row">
<label for="device-espnow-peer-mac" data-i18n="device.espnow.peer_mac">Peer MAC:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.espnow.peer_mac.hint">MAC address of the remote ESP32 receiver (e.g. AA:BB:CC:DD:EE:FF)</small>
<input type="text" id="device-espnow-peer-mac" placeholder="AA:BB:CC:DD:EE:FF" pattern="^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$">
</div>
<div class="form-group" id="device-espnow-channel-group" style="display: none;">
<div class="label-row">
<label for="device-espnow-channel" data-i18n="device.espnow.channel">WiFi Channel:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.espnow.channel.hint">WiFi channel (1-14). Must match the receiver's channel.</small>
<input type="number" id="device-espnow-channel" min="1" max="14" value="1">
</div>
<!-- Philips Hue fields -->
<div class="form-group" id="device-hue-username-group" style="display: none;">
<div class="label-row">
<label for="device-hue-username" data-i18n="device.hue.username">Bridge Username:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.hue.username.hint">Hue bridge application key from pairing</small>
<input type="text" id="device-hue-username" placeholder="Hue application key">
</div>
<div class="form-group" id="device-hue-client-key-group" style="display: none;">
<div class="label-row">
<label for="device-hue-client-key" data-i18n="device.hue.client_key">Client Key:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.hue.client_key.hint">Entertainment API client key (hex string from pairing)</small>
<input type="text" id="device-hue-client-key" placeholder="Hex client key">
</div>
<div class="form-group" id="device-hue-group-id-group" style="display: none;">
<div class="label-row">
<label for="device-hue-group-id" data-i18n="device.hue.group_id">Entertainment Group:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.hue.group_id.hint">Entertainment configuration ID from your Hue bridge</small>
<input type="text" id="device-hue-group-id" placeholder="Entertainment group ID">
</div>
<!-- SPI Direct fields -->
<div class="form-group" id="device-spi-speed-group" style="display: none;">
<div class="label-row">
<label for="device-spi-speed" data-i18n="device.spi.speed">SPI Speed (Hz):</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.spi.speed.hint">SPI clock speed. 800000 Hz for WS2812, 2400000 Hz for APA102.</small>
<input type="number" id="device-spi-speed" min="100000" max="4000000" value="800000">
</div>
<div class="form-group" id="device-spi-led-type-group" style="display: none;">
<div class="label-row">
<label for="device-spi-led-type" data-i18n="device.spi.led_type">LED Chipset:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.spi.led_type.hint">Type of addressable LED strip connected to the GPIO/SPI pin</small>
<select id="device-spi-led-type">
<option value="WS2812B">WS2812B</option>
<option value="WS2812">WS2812</option>
<option value="WS2811">WS2811</option>
<option value="SK6812">SK6812 (RGB)</option>
<option value="SK6812_RGBW">SK6812 (RGBW)</option>
</select>
</div>
<!-- Razer Chroma fields -->
<div class="form-group" id="device-chroma-device-type-group" style="display: none;">
<div class="label-row">
<label for="device-chroma-device-type" data-i18n="device.chroma.device_type">Peripheral Type:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.chroma.device_type.hint">Which Razer peripheral to control</small>
<select id="device-chroma-device-type">
<option value="chromalink">ChromaLink (5 zones)</option>
<option value="keyboard">Keyboard (132 keys)</option>
<option value="mouse">Mouse (30 LEDs)</option>
<option value="mousepad">Mousepad (15 LEDs)</option>
<option value="headset">Headset (5 zones)</option>
<option value="keypad">Keypad (20 keys)</option>
</select>
</div>
<!-- SteelSeries GameSense fields -->
<div class="form-group" id="device-gamesense-device-type-group" style="display: none;">
<div class="label-row">
<label for="device-gamesense-device-type" data-i18n="device.gamesense.device_type">Peripheral Type:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="device.gamesense.device_type.hint">Which SteelSeries peripheral to control</small>
<select id="device-gamesense-device-type">
<option value="keyboard">Keyboard</option>
<option value="mouse">Mouse</option>
<option value="headset">Headset</option>
<option value="mousepad">Mousepad</option>
<option value="indicator">Indicator</option>
</select>
</div>
<div id="add-device-error" class="error-message" style="display: none;"></div>
</form>
</div>