Optimize OpenRGB client: background sender thread, raw packets, change-threshold dedup
- Decouple processing loop from blocking TCP writes via single-slot buffer sender thread - Build raw UpdateZoneLeds packets with struct.pack instead of RGBColor objects (reduces GC pressure) - Add change-threshold frame dedup to minimize GPU I2C/SMBus writes that cause system stalls - Set TCP_NODELAY and reduce socket timeout for lower latency - Cache zone IDs for direct packet construction Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
|
||||
import asyncio
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
@@ -88,6 +90,14 @@ class OpenRGBLEDClient(LEDClient):
|
||||
self._device_name: Optional[str] = None
|
||||
self._device_led_count: Optional[int] = None
|
||||
|
||||
# Background sender thread — decouples processing loop from blocking TCP writes
|
||||
self._send_lock = threading.Lock()
|
||||
self._send_event = threading.Event()
|
||||
self._send_pending: Optional[Tuple[np.ndarray, int]] = None # (pixels, brightness)
|
||||
self._send_thread: Optional[threading.Thread] = None
|
||||
self._send_stop = threading.Event()
|
||||
self._last_sent_pixels: Optional[np.ndarray] = None # for change-threshold dedup
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to OpenRGB server and access the target device."""
|
||||
try:
|
||||
@@ -121,12 +131,32 @@ class OpenRGBLEDClient(LEDClient):
|
||||
target_info = ", ".join(
|
||||
f"{z.name}({len(z.leds)})" for z in self._target_zones
|
||||
)
|
||||
# Optimize socket: disable Nagle, reduce timeout
|
||||
try:
|
||||
sock = self._client.comms.sock
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
sock.settimeout(2.0)
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not optimize OpenRGB socket: {e}")
|
||||
|
||||
# Cache zone IDs for direct packet construction
|
||||
self._zone_ids: List[int] = [z.id for z in self._target_zones]
|
||||
|
||||
logger.info(
|
||||
f"Connected to OpenRGB device '{self._device_name}' "
|
||||
f"(all zones: {all_zone_info}) "
|
||||
f"targeting {target_info} = {self._device_led_count} LEDs "
|
||||
f"at {self._host}:{self._port}/{self._device_index}"
|
||||
)
|
||||
|
||||
# Start background sender thread
|
||||
self._send_stop.clear()
|
||||
self._send_thread = threading.Thread(
|
||||
target=self._sender_loop, daemon=True,
|
||||
name=f"orgb-send-{self._host}:{self._port}/{self._device_index}",
|
||||
)
|
||||
self._send_thread.start()
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
self._connected = False
|
||||
@@ -162,6 +192,13 @@ class OpenRGBLEDClient(LEDClient):
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Disconnect from the OpenRGB server."""
|
||||
# Stop sender thread first
|
||||
self._send_stop.set()
|
||||
self._send_event.set() # wake it so it can exit
|
||||
if self._send_thread and self._send_thread.is_alive():
|
||||
self._send_thread.join(timeout=2.0)
|
||||
self._send_thread = None
|
||||
|
||||
if self._client is not None:
|
||||
try:
|
||||
await asyncio.to_thread(self._client.disconnect)
|
||||
@@ -205,73 +242,141 @@ class OpenRGBLEDClient(LEDClient):
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
brightness: int = 255,
|
||||
) -> None:
|
||||
"""Synchronous fire-and-forget send for the processing hot loop.
|
||||
"""Non-blocking fire-and-forget send for the processing hot loop.
|
||||
|
||||
Converts numpy (N,3) array to List[RGBColor] and distributes colors
|
||||
across target zones using zone-level updateZoneLeds packets. This is
|
||||
more compatible than device-level updateLeds — some motherboards (e.g.
|
||||
MSI) only respond to zone-level commands.
|
||||
|
||||
When a zone filter is configured (e.g. openrgb://…/0/JRAINBOW2),
|
||||
only that zone receives colors; other zones are left untouched.
|
||||
Enqueues the latest frame into a single-slot buffer and returns
|
||||
immediately. A background thread picks it up and does the blocking
|
||||
TCP write to the OpenRGB server, so the asyncio event loop is never
|
||||
stalled by slow socket operations.
|
||||
"""
|
||||
if not self.is_connected or self._device is None:
|
||||
return
|
||||
|
||||
try:
|
||||
from openrgb.utils import RGBColor
|
||||
|
||||
if isinstance(pixels, np.ndarray):
|
||||
pixel_array = pixels
|
||||
pixel_array = pixels.copy()
|
||||
else:
|
||||
pixel_array = np.array(pixels, dtype=np.uint8)
|
||||
|
||||
with self._send_lock:
|
||||
self._send_pending = (pixel_array, brightness)
|
||||
self._send_event.set()
|
||||
|
||||
def _sender_loop(self) -> None:
|
||||
"""Background thread that drains the single-slot buffer."""
|
||||
while not self._send_stop.is_set():
|
||||
self._send_event.wait()
|
||||
if self._send_stop.is_set():
|
||||
break
|
||||
self._send_event.clear()
|
||||
|
||||
with self._send_lock:
|
||||
pending = self._send_pending
|
||||
self._send_pending = None
|
||||
if pending is None:
|
||||
continue
|
||||
|
||||
pixel_array, brightness = pending
|
||||
try:
|
||||
self._do_send(pixel_array, brightness)
|
||||
except Exception as e:
|
||||
logger.error(f"OpenRGB sender thread failed: {e}")
|
||||
self._connected = False
|
||||
|
||||
def _do_send(self, pixel_array: np.ndarray, brightness: int) -> None:
|
||||
"""Actual blocking send — runs in the sender thread.
|
||||
|
||||
Builds raw OpenRGB UpdateZoneLeds packets directly with struct.pack,
|
||||
bypassing RGBColor object creation to avoid GC pressure.
|
||||
"""
|
||||
# Apply brightness scaling
|
||||
if brightness < 255:
|
||||
pixel_array = (pixel_array.astype(np.uint16) * brightness >> 8).astype(np.uint8)
|
||||
|
||||
# Truncate or pad to match target LED count
|
||||
# Truncate to match target LED count
|
||||
n_target = self._device_led_count
|
||||
n_pixels = len(pixel_array)
|
||||
if n_pixels > n_target:
|
||||
if len(pixel_array) > n_target:
|
||||
pixel_array = pixel_array[:n_target]
|
||||
|
||||
# Change-threshold dedup — skip if average per-LED color change < 2
|
||||
# GPU I2C/SMBus writes cause system-wide stalls; minimizing writes is critical.
|
||||
if self._last_sent_pixels is not None and self._last_sent_pixels.shape == pixel_array.shape:
|
||||
diff = np.mean(np.abs(pixel_array.astype(np.int16) - self._last_sent_pixels.astype(np.int16)))
|
||||
if diff < 2.0:
|
||||
return
|
||||
self._last_sent_pixels = pixel_array.copy()
|
||||
|
||||
# Separate mode: resample full pixel array independently per zone
|
||||
if self._zone_mode == "separate" and len(self._target_zones) > 1:
|
||||
n_src = len(pixel_array)
|
||||
if n_src < 2:
|
||||
# Single pixel — replicate to all zones
|
||||
c = pixel_array[0] if n_src == 1 else np.array([0, 0, 0], dtype=np.uint8)
|
||||
color_obj = RGBColor(int(c[0]), int(c[1]), int(c[2]))
|
||||
for zone, zone_size in zip(self._target_zones, self._zone_sizes):
|
||||
zone.set_colors([color_obj] * zone_size, fast=True)
|
||||
for zone_id, zone_size in zip(self._zone_ids, self._zone_sizes):
|
||||
self._send_zone_raw(zone_id, np.tile(c, (zone_size, 1)))
|
||||
else:
|
||||
src_indices = np.linspace(0, 1, n_src)
|
||||
for zone, zone_size in zip(self._target_zones, self._zone_sizes):
|
||||
for zone_id, zone_size in zip(self._zone_ids, self._zone_sizes):
|
||||
dst_indices = np.linspace(0, 1, zone_size)
|
||||
resampled = np.column_stack([
|
||||
np.interp(dst_indices, src_indices, pixel_array[:, ch])
|
||||
for ch in range(3)
|
||||
]).astype(np.uint8)
|
||||
colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in resampled]
|
||||
zone.set_colors(colors, fast=True)
|
||||
self._send_zone_raw(zone_id, resampled)
|
||||
return
|
||||
|
||||
# Combined mode: distribute pixels sequentially across zones
|
||||
colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in pixel_array]
|
||||
|
||||
# Pad with black if fewer pixels than target LEDs
|
||||
if len(colors) < n_target:
|
||||
colors.extend([RGBColor(0, 0, 0)] * (n_target - len(colors)))
|
||||
if len(pixel_array) < n_target:
|
||||
pixel_array = np.vstack([
|
||||
pixel_array,
|
||||
np.zeros((n_target - len(pixel_array), 3), dtype=np.uint8),
|
||||
])
|
||||
|
||||
offset = 0
|
||||
for zone, zone_size in zip(self._target_zones, self._zone_sizes):
|
||||
zone_colors = colors[offset:offset + zone_size]
|
||||
zone.set_colors(zone_colors, fast=True)
|
||||
for zone_id, zone_size in zip(self._zone_ids, self._zone_sizes):
|
||||
self._send_zone_raw(zone_id, pixel_array[offset:offset + zone_size])
|
||||
offset += zone_size
|
||||
except Exception as e:
|
||||
logger.error(f"OpenRGB send_pixels_fast failed: {e}")
|
||||
|
||||
def _send_zone_raw(self, zone_id: int, colors: np.ndarray) -> None:
|
||||
"""Send UpdateZoneLeds packet directly via the library's socket.
|
||||
|
||||
Constructs the raw binary packet instead of creating RGBColor objects,
|
||||
avoiding hundreds of Python object allocations per frame.
|
||||
"""
|
||||
from openrgb.utils import PacketType
|
||||
|
||||
n_leds = len(colors)
|
||||
# Build zone LED data: data_size(u32) + zone_id(i32) + num_colors(u16) + colors(r,g,b,pad)*n
|
||||
color_bytes = np.column_stack([
|
||||
colors,
|
||||
np.zeros(n_leds, dtype=np.uint8), # padding byte per LED
|
||||
]).tobytes()
|
||||
zone_data = struct.pack("iH", zone_id, n_leds) + color_bytes
|
||||
data_with_size = struct.pack("I", len(zone_data) + 4) + zone_data
|
||||
packet_size = len(data_with_size)
|
||||
|
||||
# Use the library's comms lock + socket directly
|
||||
comms = self._client.comms
|
||||
if not comms.connected:
|
||||
self._connected = False
|
||||
return
|
||||
|
||||
if not comms.lock.acquire(timeout=2):
|
||||
logger.warning("OpenRGB lock timeout — skipping frame")
|
||||
return
|
||||
try:
|
||||
header = struct.pack(
|
||||
'ccccIII', b'O', b'R', b'G', b'B',
|
||||
self._device_index,
|
||||
PacketType.RGBCONTROLLER_UPDATEZONELEDS,
|
||||
packet_size,
|
||||
)
|
||||
comms.sock.sendall(header + data_with_size)
|
||||
except Exception:
|
||||
comms.stop_connection()
|
||||
self._connected = False
|
||||
raise
|
||||
finally:
|
||||
comms.lock.release()
|
||||
|
||||
async def snapshot_device_state(self) -> Optional[dict]:
|
||||
"""Save the active mode index before streaming."""
|
||||
|
||||
Reference in New Issue
Block a user