Add OpenRGB per-zone LED control with separate/combined modes and zone preview

- Zone picker UI in device add/settings modals with per-zone checkbox selection
- Combined mode: pixels distributed sequentially across zones
- Separate mode: full effect resampled independently to each zone via linear interpolation
- Per-zone LED preview in target cards: one canvas strip per zone with hover overlay labels
- Zone badges on device cards enriched with actual LED counts from OpenRGB API
- Fix stale led_count by using device_led_count discovered at connect time

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 20:35:51 +03:00
parent aafcf83896
commit 52ee4bdeb6
19 changed files with 769 additions and 55 deletions

View File

@@ -101,6 +101,11 @@ class LEDClient(ABC):
"""
raise NotImplementedError("send_pixels_fast not supported for this device type")
@property
def device_led_count(self) -> Optional[int]:
"""Actual LED count discovered after connect(). None if not available."""
return None
async def snapshot_device_state(self) -> Optional[dict]:
"""Snapshot device state before streaming starts.

View File

@@ -13,20 +13,31 @@ from wled_controller.utils import get_logger
logger = get_logger(__name__)
def parse_openrgb_url(url: str) -> Tuple[str, int, int]:
"""Parse an openrgb:// URL into (host, port, device_index).
def parse_openrgb_url(url: str) -> Tuple[str, int, int, List[str]]:
"""Parse an openrgb:// URL into (host, port, device_index, zone_names).
Format: openrgb://host:port/device_index
Defaults: host=localhost, port=6742, device_index=0
Format: openrgb://host:port/device_index[/zone1+zone2+...]
Defaults: host=localhost, port=6742, device_index=0, zone_names=[]
When *zone_names* is non-empty, only LEDs in those zones are addressed.
Multiple zones are separated by ``+``.
"""
zones_str: Optional[str] = None
if url.startswith("openrgb://"):
url = url[len("openrgb://"):]
else:
return ("localhost", 6742, 0)
return ("localhost", 6742, 0, [])
# Split path from host:port
if "/" in url:
host_port, index_str = url.split("/", 1)
host_port, path = url.split("/", 1)
# path may be "0" or "0/JRAINBOW2" or "0/JRAINBOW1+JRAINBOW2"
if "/" in path:
index_str, zones_str = path.split("/", 1)
zones_str = zones_str.strip() or None
else:
index_str = path
try:
device_index = int(index_str)
except ValueError:
@@ -46,7 +57,11 @@ def parse_openrgb_url(url: str) -> Tuple[str, int, int]:
host = host_port if host_port else "localhost"
port = 6742
return (host, port, device_index)
zone_names: List[str] = []
if zones_str:
zone_names = [z.strip() for z in zones_str.split("+") if z.strip()]
return (host, port, device_index, zone_names)
class OpenRGBLEDClient(LEDClient):
@@ -61,10 +76,12 @@ class OpenRGBLEDClient(LEDClient):
def __init__(self, url: str, **kwargs):
self._url = url
host, port, device_index = parse_openrgb_url(url)
host, port, device_index, zone_names = parse_openrgb_url(url)
self._host = host
self._port = port
self._device_index = device_index
self._zone_filters: List[str] = zone_names # e.g. ["JRAINBOW2"]
self._zone_mode: str = kwargs.pop("zone_mode", "combined")
self._client: Any = None # openrgb.OpenRGBClient
self._device: Any = None # openrgb.Device
self._connected = False
@@ -77,10 +94,38 @@ class OpenRGBLEDClient(LEDClient):
self._client, self._device = await asyncio.to_thread(self._connect_sync)
self._connected = True
self._device_name = self._device.name
self._device_led_count = len(self._device.leds)
# Resolve zone filter → target zones list
all_zone_info = ", ".join(
f"{z.name}({len(z.leds)})" for z in self._device.zones
)
if self._zone_filters:
# Case-insensitive match by name(s)
filt_set = {n.lower() for n in self._zone_filters}
self._target_zones = [
z for z in self._device.zones
if z.name.lower() in filt_set
]
if not self._target_zones:
avail = [z.name for z in self._device.zones]
raise ValueError(
f"Zone(s) {self._zone_filters} not found on device "
f"'{self._device.name}'. Available zones: {avail}"
)
else:
self._target_zones = list(self._device.zones)
self._zone_sizes: List[int] = [len(z.leds) for z in self._target_zones]
self._device_led_count = sum(self._zone_sizes)
target_info = ", ".join(
f"{z.name}({len(z.leds)})" for z in self._target_zones
)
logger.info(
f"Connected to OpenRGB device '{self._device_name}' "
f"({self._device_led_count} LEDs) at {self._host}:{self._port}/{self._device_index}"
f"(all zones: {all_zone_info}) "
f"targeting {target_info} = {self._device_led_count} LEDs "
f"at {self._host}:{self._port}/{self._device_index}"
)
return True
except Exception as e:
@@ -131,6 +176,10 @@ class OpenRGBLEDClient(LEDClient):
def is_connected(self) -> bool:
return self._connected and self._client is not None
@property
def device_led_count(self) -> Optional[int]:
return self._device_led_count
async def send_pixels(
self,
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
@@ -158,8 +207,13 @@ class OpenRGBLEDClient(LEDClient):
) -> None:
"""Synchronous fire-and-forget send for the processing hot loop.
Converts numpy (N,3) array to List[RGBColor] and calls
device.set_colors(colors, fast=True) to skip the re-fetch round trip.
Converts numpy (N,3) array to List[RGBColor] and distributes colors
across target zones using zone-level updateZoneLeds packets. This is
more compatible than device-level updateLeds — some motherboards (e.g.
MSI) only respond to zone-level commands.
When a zone filter is configured (e.g. openrgb://…/0/JRAINBOW2),
only that zone receives colors; other zones are left untouched.
"""
if not self.is_connected or self._device is None:
return
@@ -176,19 +230,45 @@ class OpenRGBLEDClient(LEDClient):
if brightness < 255:
pixel_array = (pixel_array.astype(np.uint16) * brightness >> 8).astype(np.uint8)
# Truncate or pad to match device LED count
n_device = len(self._device.leds)
# Truncate or pad to match target LED count
n_target = self._device_led_count
n_pixels = len(pixel_array)
if n_pixels > n_device:
pixel_array = pixel_array[:n_device]
if n_pixels > n_target:
pixel_array = pixel_array[:n_target]
# Separate mode: resample full pixel array independently per zone
if self._zone_mode == "separate" and len(self._target_zones) > 1:
n_src = len(pixel_array)
if n_src < 2:
# Single pixel — replicate to all zones
c = pixel_array[0] if n_src == 1 else np.array([0, 0, 0], dtype=np.uint8)
color_obj = RGBColor(int(c[0]), int(c[1]), int(c[2]))
for zone, zone_size in zip(self._target_zones, self._zone_sizes):
zone.set_colors([color_obj] * zone_size, fast=True)
else:
src_indices = np.linspace(0, 1, n_src)
for zone, zone_size in zip(self._target_zones, self._zone_sizes):
dst_indices = np.linspace(0, 1, zone_size)
resampled = np.column_stack([
np.interp(dst_indices, src_indices, pixel_array[:, ch])
for ch in range(3)
]).astype(np.uint8)
colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in resampled]
zone.set_colors(colors, fast=True)
return
# Combined mode: distribute pixels sequentially across zones
colors = [RGBColor(int(r), int(g), int(b)) for r, g, b in pixel_array]
# Pad with black if fewer pixels than device LEDs
if len(colors) < n_device:
colors.extend([RGBColor(0, 0, 0)] * (n_device - len(colors)))
# Pad with black if fewer pixels than target LEDs
if len(colors) < n_target:
colors.extend([RGBColor(0, 0, 0)] * (n_target - len(colors)))
self._device.set_colors(colors, fast=True)
offset = 0
for zone, zone_size in zip(self._target_zones, self._zone_sizes):
zone_colors = colors[offset:offset + zone_size]
zone.set_colors(zone_colors, fast=True)
offset += zone_size
except Exception as e:
logger.error(f"OpenRGB send_pixels_fast failed: {e}")
self._connected = False
@@ -227,7 +307,7 @@ class OpenRGBLEDClient(LEDClient):
Uses a lightweight socket probe instead of full library init to avoid
re-downloading all device data every health check cycle (~30s).
"""
host, port, device_index = parse_openrgb_url(url)
host, port, device_index, _zones = parse_openrgb_url(url)
start = asyncio.get_event_loop().time()
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

View File

@@ -30,11 +30,12 @@ class OpenRGBDeviceProvider(LEDDeviceProvider):
return {"health_check", "auto_restore", "static_color"}
def create_client(self, url: str, **kwargs) -> LEDClient:
zone_mode = kwargs.pop("zone_mode", "combined")
kwargs.pop("led_count", None)
kwargs.pop("baud_rate", None)
kwargs.pop("send_latency_ms", None)
kwargs.pop("rgbw", None)
return OpenRGBLEDClient(url, **kwargs)
return OpenRGBLEDClient(url, zone_mode=zone_mode, **kwargs)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
return await OpenRGBLEDClient.check_health(url, http_client, prev_health)
@@ -48,7 +49,7 @@ class OpenRGBDeviceProvider(LEDDeviceProvider):
Raises:
Exception on validation failure.
"""
host, port, device_index = parse_openrgb_url(url)
host, port, device_index, zone_names = parse_openrgb_url(url)
def _validate_sync():
from openrgb import OpenRGBClient
@@ -62,10 +63,26 @@ class OpenRGBDeviceProvider(LEDDeviceProvider):
f"(server has {len(devices)} device(s))"
)
device = devices[device_index]
led_count = len(device.leds)
logger.info(
f"OpenRGB device validated: '{device.name}' ({led_count} LEDs)"
)
if zone_names:
filt_set = {n.lower() for n in zone_names}
matching = [z for z in device.zones if z.name.lower() in filt_set]
if not matching:
avail = [z.name for z in device.zones]
raise ValueError(
f"Zone(s) {zone_names} not found on '{device.name}'. "
f"Available zones: {avail}"
)
led_count = sum(len(z.leds) for z in matching)
logger.info(
f"OpenRGB device validated: '{device.name}' "
f"zones {zone_names} ({led_count} LEDs)"
)
else:
led_count = len(device.leds)
logger.info(
f"OpenRGB device validated: '{device.name}' ({led_count} LEDs)"
)
return {"led_count": led_count}
finally:
client.disconnect()
@@ -111,8 +128,8 @@ class OpenRGBDeviceProvider(LEDDeviceProvider):
return await asyncio.to_thread(_discover_sync)
async def set_color(self, url: str, color: Tuple[int, int, int], **kwargs) -> None:
"""Set all LEDs on the OpenRGB device to a solid color."""
host, port, device_index = parse_openrgb_url(url)
"""Set all LEDs on the OpenRGB device (or target zone) to a solid color."""
host, port, device_index, zone_names = parse_openrgb_url(url)
def _set_color_sync():
from openrgb import OpenRGBClient
@@ -125,7 +142,15 @@ class OpenRGBDeviceProvider(LEDDeviceProvider):
raise ValueError(f"Device index {device_index} out of range")
device = devices[device_index]
color_obj = RGBColor(color[0], color[1], color[2])
device.set_color(color_obj)
# Use zone-level calls for better motherboard compatibility
if zone_names:
filt_set = {n.lower() for n in zone_names}
for zone in device.zones:
if zone.name.lower() in filt_set:
zone.set_color(color_obj)
else:
for zone in device.zones:
zone.set_color(color_obj)
finally:
client.disconnect()