diff --git a/server/src/wled_controller/core/devices/openrgb_client.py b/server/src/wled_controller/core/devices/openrgb_client.py index a5f760e..8717ba7 100644 --- a/server/src/wled_controller/core/devices/openrgb_client.py +++ b/server/src/wled_controller/core/devices/openrgb_client.py @@ -2,6 +2,8 @@ import asyncio import socket +import struct +import threading from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Union @@ -88,6 +90,14 @@ class OpenRGBLEDClient(LEDClient): self._device_name: Optional[str] = None self._device_led_count: Optional[int] = None + # Background sender thread — decouples processing loop from blocking TCP writes + self._send_lock = threading.Lock() + self._send_event = threading.Event() + self._send_pending: Optional[Tuple[np.ndarray, int]] = None # (pixels, brightness) + self._send_thread: Optional[threading.Thread] = None + self._send_stop = threading.Event() + self._last_sent_pixels: Optional[np.ndarray] = None # for change-threshold dedup + async def connect(self) -> bool: """Connect to OpenRGB server and access the target device.""" try: @@ -121,12 +131,32 @@ class OpenRGBLEDClient(LEDClient): target_info = ", ".join( f"{z.name}({len(z.leds)})" for z in self._target_zones ) + # Optimize socket: disable Nagle, reduce timeout + try: + sock = self._client.comms.sock + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(2.0) + except Exception as e: + logger.debug(f"Could not optimize OpenRGB socket: {e}") + + # Cache zone IDs for direct packet construction + self._zone_ids: List[int] = [z.id for z in self._target_zones] + logger.info( f"Connected to OpenRGB device '{self._device_name}' " f"(all zones: {all_zone_info}) " f"targeting {target_info} = {self._device_led_count} LEDs " f"at {self._host}:{self._port}/{self._device_index}" ) + + # Start background sender thread + self._send_stop.clear() + self._send_thread = threading.Thread( + target=self._sender_loop, daemon=True, + name=f"orgb-send-{self._host}:{self._port}/{self._device_index}", + ) + self._send_thread.start() + return True except Exception as e: self._connected = False @@ -162,6 +192,13 @@ class OpenRGBLEDClient(LEDClient): async def close(self) -> None: """Disconnect from the OpenRGB server.""" + # Stop sender thread first + self._send_stop.set() + self._send_event.set() # wake it so it can exit + if self._send_thread and self._send_thread.is_alive(): + self._send_thread.join(timeout=2.0) + self._send_thread = None + if self._client is not None: try: await asyncio.to_thread(self._client.disconnect) @@ -205,73 +242,141 @@ class OpenRGBLEDClient(LEDClient): pixels: Union[List[Tuple[int, int, int]], np.ndarray], brightness: int = 255, ) -> None: - """Synchronous fire-and-forget send for the processing hot loop. + """Non-blocking fire-and-forget send for the processing hot loop. - Converts numpy (N,3) array to List[RGBColor] and distributes colors - across target zones using zone-level updateZoneLeds packets. This is - more compatible than device-level updateLeds — some motherboards (e.g. - MSI) only respond to zone-level commands. - - When a zone filter is configured (e.g. openrgb://…/0/JRAINBOW2), - only that zone receives colors; other zones are left untouched. + Enqueues the latest frame into a single-slot buffer and returns + immediately. A background thread picks it up and does the blocking + TCP write to the OpenRGB server, so the asyncio event loop is never + stalled by slow socket operations. """ if not self.is_connected or self._device is None: return - try: - from openrgb.utils import RGBColor + if isinstance(pixels, np.ndarray): + pixel_array = pixels.copy() + else: + pixel_array = np.array(pixels, dtype=np.uint8) - if isinstance(pixels, np.ndarray): - pixel_array = pixels - else: - pixel_array = np.array(pixels, dtype=np.uint8) + with self._send_lock: + self._send_pending = (pixel_array, brightness) + self._send_event.set() - # Apply brightness scaling - if brightness < 255: - pixel_array = (pixel_array.astype(np.uint16) * brightness >> 8).astype(np.uint8) + def _sender_loop(self) -> None: + """Background thread that drains the single-slot buffer.""" + while not self._send_stop.is_set(): + self._send_event.wait() + if self._send_stop.is_set(): + break + self._send_event.clear() - # Truncate or pad to match target LED count - n_target = self._device_led_count - n_pixels = len(pixel_array) - if n_pixels > n_target: - pixel_array = pixel_array[:n_target] + with self._send_lock: + pending = self._send_pending + self._send_pending = None + if pending is None: + continue - # Separate mode: resample full pixel array independently per zone - if self._zone_mode == "separate" and len(self._target_zones) > 1: - n_src = len(pixel_array) - if n_src < 2: - # Single pixel — replicate to all zones - c = pixel_array[0] if n_src == 1 else np.array([0, 0, 0], dtype=np.uint8) - color_obj = RGBColor(int(c[0]), int(c[1]), int(c[2])) - for zone, zone_size in zip(self._target_zones, self._zone_sizes): - zone.set_colors([color_obj] * zone_size, fast=True) - else: - src_indices = np.linspace(0, 1, n_src) - for zone, zone_size in zip(self._target_zones, self._zone_sizes): - dst_indices = np.linspace(0, 1, zone_size) - resampled = np.column_stack([ - np.interp(dst_indices, src_indices, pixel_array[:, ch]) - for ch in range(3) - ]).astype(np.uint8) - colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in resampled] - zone.set_colors(colors, fast=True) + pixel_array, brightness = pending + try: + self._do_send(pixel_array, brightness) + except Exception as e: + logger.error(f"OpenRGB sender thread failed: {e}") + self._connected = False + + def _do_send(self, pixel_array: np.ndarray, brightness: int) -> None: + """Actual blocking send — runs in the sender thread. + + Builds raw OpenRGB UpdateZoneLeds packets directly with struct.pack, + bypassing RGBColor object creation to avoid GC pressure. + """ + # Apply brightness scaling + if brightness < 255: + pixel_array = (pixel_array.astype(np.uint16) * brightness >> 8).astype(np.uint8) + + # Truncate to match target LED count + n_target = self._device_led_count + if len(pixel_array) > n_target: + pixel_array = pixel_array[:n_target] + + # Change-threshold dedup — skip if average per-LED color change < 2 + # GPU I2C/SMBus writes cause system-wide stalls; minimizing writes is critical. + if self._last_sent_pixels is not None and self._last_sent_pixels.shape == pixel_array.shape: + diff = np.mean(np.abs(pixel_array.astype(np.int16) - self._last_sent_pixels.astype(np.int16))) + if diff < 2.0: return + self._last_sent_pixels = pixel_array.copy() - # Combined mode: distribute pixels sequentially across zones - colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in pixel_array] + # Separate mode: resample full pixel array independently per zone + if self._zone_mode == "separate" and len(self._target_zones) > 1: + n_src = len(pixel_array) + if n_src < 2: + c = pixel_array[0] if n_src == 1 else np.array([0, 0, 0], dtype=np.uint8) + for zone_id, zone_size in zip(self._zone_ids, self._zone_sizes): + self._send_zone_raw(zone_id, np.tile(c, (zone_size, 1))) + else: + src_indices = np.linspace(0, 1, n_src) + for zone_id, zone_size in zip(self._zone_ids, self._zone_sizes): + dst_indices = np.linspace(0, 1, zone_size) + resampled = np.column_stack([ + np.interp(dst_indices, src_indices, pixel_array[:, ch]) + for ch in range(3) + ]).astype(np.uint8) + self._send_zone_raw(zone_id, resampled) + return - # Pad with black if fewer pixels than target LEDs - if len(colors) < n_target: - colors.extend([RGBColor(0, 0, 0)] * (n_target - len(colors))) + # Combined mode: distribute pixels sequentially across zones + # Pad with black if fewer pixels than target LEDs + if len(pixel_array) < n_target: + pixel_array = np.vstack([ + pixel_array, + np.zeros((n_target - len(pixel_array), 3), dtype=np.uint8), + ]) - offset = 0 - for zone, zone_size in zip(self._target_zones, self._zone_sizes): - zone_colors = colors[offset:offset + zone_size] - zone.set_colors(zone_colors, fast=True) - offset += zone_size - except Exception as e: - logger.error(f"OpenRGB send_pixels_fast failed: {e}") + offset = 0 + for zone_id, zone_size in zip(self._zone_ids, self._zone_sizes): + self._send_zone_raw(zone_id, pixel_array[offset:offset + zone_size]) + offset += zone_size + + def _send_zone_raw(self, zone_id: int, colors: np.ndarray) -> None: + """Send UpdateZoneLeds packet directly via the library's socket. + + Constructs the raw binary packet instead of creating RGBColor objects, + avoiding hundreds of Python object allocations per frame. + """ + from openrgb.utils import PacketType + + n_leds = len(colors) + # Build zone LED data: data_size(u32) + zone_id(i32) + num_colors(u16) + colors(r,g,b,pad)*n + color_bytes = np.column_stack([ + colors, + np.zeros(n_leds, dtype=np.uint8), # padding byte per LED + ]).tobytes() + zone_data = struct.pack("iH", zone_id, n_leds) + color_bytes + data_with_size = struct.pack("I", len(zone_data) + 4) + zone_data + packet_size = len(data_with_size) + + # Use the library's comms lock + socket directly + comms = self._client.comms + if not comms.connected: self._connected = False + return + + if not comms.lock.acquire(timeout=2): + logger.warning("OpenRGB lock timeout — skipping frame") + return + try: + header = struct.pack( + 'ccccIII', b'O', b'R', b'G', b'B', + self._device_index, + PacketType.RGBCONTROLLER_UPDATEZONELEDS, + packet_size, + ) + comms.sock.sendall(header + data_with_size) + except Exception: + comms.stop_connection() + self._connected = False + raise + finally: + comms.lock.release() async def snapshot_device_state(self) -> Optional[dict]: """Save the active mode index before streaming."""