Add OpenRGB device support for PC peripheral ambient lighting

New device type enabling control of keyboards, mice, RAM, GPU, and fans
via the OpenRGB SDK server (TCP port 6742). Includes auto-discovery,
health monitoring, state snapshot/restore, and fast synchronous pixel
send with brightness scaling. Also updates TODO.md with complexity notes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 11:30:02 +03:00
parent bf2fd5ca69
commit b9ec509f56
12 changed files with 455 additions and 2 deletions

View File

@@ -288,5 +288,8 @@ def _register_builtin_providers():
from wled_controller.core.devices.ws_provider import WSDeviceProvider
register_provider(WSDeviceProvider())
from wled_controller.core.devices.openrgb_provider import OpenRGBDeviceProvider
register_provider(OpenRGBDeviceProvider())
_register_builtin_providers()

View File

@@ -0,0 +1,255 @@
"""OpenRGB LED client — controls RGB peripherals via the OpenRGB SDK server."""
import asyncio
import socket
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from wled_controller.core.devices.led_client import DeviceHealth, LEDClient
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def parse_openrgb_url(url: str) -> Tuple[str, int, int]:
"""Parse an openrgb:// URL into (host, port, device_index).
Format: openrgb://host:port/device_index
Defaults: host=localhost, port=6742, device_index=0
"""
if url.startswith("openrgb://"):
url = url[len("openrgb://"):]
else:
return ("localhost", 6742, 0)
# Split path from host:port
if "/" in url:
host_port, index_str = url.split("/", 1)
try:
device_index = int(index_str)
except ValueError:
device_index = 0
else:
host_port = url
device_index = 0
# Split host and port
if ":" in host_port:
host, port_str = host_port.rsplit(":", 1)
try:
port = int(port_str)
except ValueError:
port = 6742
else:
host = host_port if host_port else "localhost"
port = 6742
return (host, port, device_index)
class OpenRGBLEDClient(LEDClient):
"""Controls an OpenRGB device via the openrgb-python library.
The OpenRGB SDK server (TCP port 6742) exposes PC peripherals such as
keyboards, mice, RAM, GPU, and fans for live RGB control.
Uses asyncio.to_thread() for blocking library calls in connect/close,
and direct synchronous calls for send_pixels_fast (runs in processing thread).
"""
def __init__(self, url: str, **kwargs):
self._url = url
host, port, device_index = parse_openrgb_url(url)
self._host = host
self._port = port
self._device_index = device_index
self._client: Any = None # openrgb.OpenRGBClient
self._device: Any = None # openrgb.Device
self._connected = False
self._device_name: Optional[str] = None
self._device_led_count: Optional[int] = None
async def connect(self) -> bool:
"""Connect to OpenRGB server and access the target device."""
try:
self._client, self._device = await asyncio.to_thread(self._connect_sync)
self._connected = True
self._device_name = self._device.name
self._device_led_count = len(self._device.leds)
logger.info(
f"Connected to OpenRGB device '{self._device_name}' "
f"({self._device_led_count} LEDs) at {self._host}:{self._port}/{self._device_index}"
)
return True
except Exception as e:
self._connected = False
raise ConnectionError(f"Failed to connect to OpenRGB: {e}") from e
def _connect_sync(self) -> Tuple[Any, Any]:
"""Synchronous connect — runs in thread pool."""
from openrgb import OpenRGBClient
from openrgb.utils import DeviceType
client = OpenRGBClient(self._host, self._port, name="WLED Controller")
devices = client.devices
if self._device_index >= len(devices):
client.disconnect()
raise ValueError(
f"Device index {self._device_index} out of range "
f"(server has {len(devices)} device(s))"
)
device = devices[self._device_index]
# Set direct mode for live control (try direct first, fall back)
try:
device.set_mode("direct")
except Exception:
try:
# Some devices use different mode names
for mode in device.modes:
if mode.name.lower() in ("direct", "static", "custom"):
device.set_mode(mode.id)
break
except Exception as e:
logger.warning(f"Could not set direct mode on '{device.name}': {e}")
return client, device
async def close(self) -> None:
"""Disconnect from the OpenRGB server."""
if self._client is not None:
try:
await asyncio.to_thread(self._client.disconnect)
except Exception as e:
logger.debug(f"Error disconnecting OpenRGB: {e}")
finally:
self._client = None
self._device = None
self._connected = False
@property
def is_connected(self) -> bool:
return self._connected and self._client is not None
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> bool:
"""Send pixel colors to the OpenRGB device (async wrapper)."""
if not self.is_connected:
return False
try:
await asyncio.to_thread(self.send_pixels_fast, pixels, brightness)
return True
except Exception as e:
logger.error(f"OpenRGB send_pixels failed: {e}")
self._connected = False
return False
@property
def supports_fast_send(self) -> bool:
return True
def send_pixels_fast(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
brightness: int = 255,
) -> None:
"""Synchronous fire-and-forget send for the processing hot loop.
Converts numpy (N,3) array to List[RGBColor] and calls
device.set_colors(colors, fast=True) to skip the re-fetch round trip.
"""
if not self.is_connected or self._device is None:
return
try:
from openrgb.utils import RGBColor
if isinstance(pixels, np.ndarray):
pixel_array = pixels
else:
pixel_array = np.array(pixels, dtype=np.uint8)
# Apply brightness scaling
if brightness < 255:
pixel_array = (pixel_array.astype(np.uint16) * brightness >> 8).astype(np.uint8)
# Truncate or pad to match device LED count
n_device = len(self._device.leds)
n_pixels = len(pixel_array)
if n_pixels > n_device:
pixel_array = pixel_array[:n_device]
colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in pixel_array]
# Pad with black if fewer pixels than device LEDs
if len(colors) < n_device:
colors.extend([RGBColor(0, 0, 0)] * (n_device - len(colors)))
self._device.set_colors(colors, fast=True)
except Exception as e:
logger.error(f"OpenRGB send_pixels_fast failed: {e}")
self._connected = False
async def snapshot_device_state(self) -> Optional[dict]:
"""Save the active mode index before streaming."""
if self._device is None:
return None
try:
return {"active_mode": self._device.active_mode}
except Exception as e:
logger.warning(f"Could not snapshot OpenRGB device state: {e}")
return None
async def restore_device_state(self, state: Optional[dict]) -> None:
"""Restore the original mode after streaming stops."""
if not state or self._device is None:
return
try:
mode_id = state.get("active_mode")
if mode_id is not None:
await asyncio.to_thread(self._device.set_mode, mode_id)
logger.info(f"Restored OpenRGB device mode to {mode_id}")
except Exception as e:
logger.warning(f"Could not restore OpenRGB device state: {e}")
@classmethod
async def check_health(
cls,
url: str,
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check OpenRGB server reachability via raw TCP socket connect.
Uses a lightweight socket probe instead of full library init to avoid
re-downloading all device data every health check cycle (~30s).
"""
host, port, device_index = parse_openrgb_url(url)
start = asyncio.get_event_loop().time()
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3.0)
await asyncio.to_thread(sock.connect, (host, port))
sock.close()
latency = (asyncio.get_event_loop().time() - start) * 1000
# Preserve cached device metadata from previous health check
device_name = prev_health.device_name if prev_health else None
device_led_count = prev_health.device_led_count if prev_health else None
return DeviceHealth(
online=True,
latency_ms=latency,
last_checked=datetime.utcnow(),
device_name=device_name,
device_led_count=device_led_count,
)
except Exception as e:
return DeviceHealth(
online=False,
last_checked=datetime.utcnow(),
error=str(e),
)

View File

@@ -0,0 +1,132 @@
"""OpenRGB device provider — factory, validation, health checks, discovery."""
import asyncio
from typing import List, Optional, Tuple
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.openrgb_client import (
OpenRGBLEDClient,
parse_openrgb_url,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class OpenRGBDeviceProvider(LEDDeviceProvider):
"""Provider for OpenRGB-controlled PC peripherals."""
@property
def device_type(self) -> str:
return "openrgb"
@property
def capabilities(self) -> set:
return {"health_check", "auto_restore", "static_color"}
def create_client(self, url: str, **kwargs) -> LEDClient:
kwargs.pop("led_count", None)
kwargs.pop("baud_rate", None)
kwargs.pop("send_latency_ms", None)
kwargs.pop("rgbw", None)
return OpenRGBLEDClient(url, **kwargs)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await OpenRGBLEDClient.check_health(url, http_client, prev_health)
async def validate_device(self, url: str) -> dict:
"""Validate an OpenRGB device URL by connecting and reading device info.
Returns:
dict with 'led_count' key.
Raises:
Exception on validation failure.
"""
host, port, device_index = parse_openrgb_url(url)
def _validate_sync():
from openrgb import OpenRGBClient
client = OpenRGBClient(host, port, name="WLED Controller (validate)")
try:
devices = client.devices
if device_index >= len(devices):
raise ValueError(
f"Device index {device_index} out of range "
f"(server has {len(devices)} device(s))"
)
device = devices[device_index]
led_count = len(device.leds)
logger.info(
f"OpenRGB device validated: '{device.name}' ({led_count} LEDs)"
)
return {"led_count": led_count}
finally:
client.disconnect()
return await asyncio.to_thread(_validate_sync)
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover OpenRGB devices on localhost."""
def _discover_sync():
from openrgb import OpenRGBClient
results = []
try:
client = OpenRGBClient("localhost", 6742, name="WLED Controller (discover)")
except Exception as e:
logger.debug(f"OpenRGB discovery failed (server not running?): {e}")
return results
try:
for idx, device in enumerate(client.devices):
results.append(
DiscoveredDevice(
name=f"{device.name}",
url=f"openrgb://localhost:6742/{idx}",
device_type="openrgb",
ip="localhost",
mac="",
led_count=len(device.leds),
version=None,
)
)
except Exception as e:
logger.warning(f"OpenRGB discovery error enumerating devices: {e}")
finally:
try:
client.disconnect()
except Exception:
pass
return results
return await asyncio.to_thread(_discover_sync)
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
"""Set all LEDs on the OpenRGB device to a solid color."""
host, port, device_index = parse_openrgb_url(url)
def _set_color_sync():
from openrgb import OpenRGBClient
from openrgb.utils import RGBColor
client = OpenRGBClient(host, port, name="WLED Controller (color)")
try:
devices = client.devices
if device_index >= len(devices):
raise ValueError(f"Device index {device_index} out of range")
device = devices[device_index]
color_obj = RGBColor(color[0], color[1], color[2])
device.set_color(color_obj)
finally:
client.disconnect()
await asyncio.to_thread(_set_color_sync)