From cc87fba0dde9d9a6b1fc7251f97aca07faefb54d Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Sat, 16 May 2026 04:14:36 +0300 Subject: [PATCH] refactor(devices): extract _average_color to pixel_reduce Six single-pixel LED clients (Yeelight, WiZ, LIFX, Govee, Nanoleaf, BLE) had byte-for-byte identical local copies of the strip-averaging helper. Consolidates into core/devices/pixel_reduce.average_color so the next single-pixel driver can drop the local copy and so behavior changes land in one place. Hue is intentionally left out -- its Entertainment API addresses up to seven lights individually rather than averaging. Behavior is byte-identical (each call site re-imports under the same underscore-prefixed local name). 1358 tests still pass. --- server/src/ledgrab/core/devices/ble_client.py | 20 +-------- .../src/ledgrab/core/devices/govee_client.py | 22 +--------- .../src/ledgrab/core/devices/lifx_client.py | 22 +--------- .../ledgrab/core/devices/nanoleaf_client.py | 22 +--------- .../src/ledgrab/core/devices/pixel_reduce.py | 42 +++++++++++++++++++ server/src/ledgrab/core/devices/wiz_client.py | 22 +--------- .../ledgrab/core/devices/yeelight_client.py | 22 +--------- 7 files changed, 48 insertions(+), 124 deletions(-) create mode 100644 server/src/ledgrab/core/devices/pixel_reduce.py diff --git a/server/src/ledgrab/core/devices/ble_client.py b/server/src/ledgrab/core/devices/ble_client.py index 2e982d8..729a32c 100644 --- a/server/src/ledgrab/core/devices/ble_client.py +++ b/server/src/ledgrab/core/devices/ble_client.py @@ -24,6 +24,7 @@ import numpy as np from ledgrab.core.devices.ble_protocols import BLEProtocol, get_protocol from ledgrab.core.devices.ble_transport import make_transport from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.pixel_reduce import average_color as _average_color from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -63,25 +64,6 @@ def _strip_ble_scheme(url: str) -> str: return url.strip("/") -def _average_color(pixels: Union[List[Tuple[int, int, int]], np.ndarray]) -> Tuple[int, int, int]: - """Reduce an N-pixel strip to one average RGB.""" - if isinstance(pixels, np.ndarray): - if pixels.size == 0: - return (0, 0, 0) - arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) - mean = arr.mean(axis=0) - return int(mean[0]), int(mean[1]), int(mean[2]) - if not pixels: - return (0, 0, 0) - total_r = total_g = total_b = 0 - for r, g, b in pixels: - total_r += r - total_g += g - total_b += b - n = len(pixels) - return total_r // n, total_g // n, total_b // n - - class BLEClient(LEDClient): """LED client for BLE controllers speaking one of the registered protocols. diff --git a/server/src/ledgrab/core/devices/govee_client.py b/server/src/ledgrab/core/devices/govee_client.py index a862f61..b892b7f 100644 --- a/server/src/ledgrab/core/devices/govee_client.py +++ b/server/src/ledgrab/core/devices/govee_client.py @@ -27,6 +27,7 @@ from urllib.parse import urlparse import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.pixel_reduce import average_color as _average_color from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -58,27 +59,6 @@ def parse_govee_url(url: str) -> str: return host -def _average_color( - pixels: Union[List[Tuple[int, int, int]], np.ndarray], -) -> Tuple[int, int, int]: - """Reduce an N-pixel strip to one average RGB triple.""" - if isinstance(pixels, np.ndarray): - if pixels.size == 0: - return (0, 0, 0) - arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) - mean = arr.mean(axis=0) - return int(mean[0]), int(mean[1]), int(mean[2]) - if not pixels: - return (0, 0, 0) - total_r = total_g = total_b = 0 - for r, g, b in pixels: - total_r += r - total_g += g - total_b += b - n = len(pixels) - return total_r // n, total_g // n, total_b // n - - class _GoveeProtocol(asyncio.DatagramProtocol): """Write-only datagram protocol. Bulb replies (on 4002) are not collected here.""" diff --git a/server/src/ledgrab/core/devices/lifx_client.py b/server/src/ledgrab/core/devices/lifx_client.py index 21ad622..0a34948 100644 --- a/server/src/ledgrab/core/devices/lifx_client.py +++ b/server/src/ledgrab/core/devices/lifx_client.py @@ -28,6 +28,7 @@ from urllib.parse import urlparse import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.pixel_reduce import average_color as _average_color from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -66,27 +67,6 @@ def parse_lifx_url(url: str) -> Tuple[str, int]: return host, port -def _average_color( - pixels: Union[List[Tuple[int, int, int]], np.ndarray], -) -> Tuple[int, int, int]: - """Reduce an N-pixel strip to one average RGB triple.""" - if isinstance(pixels, np.ndarray): - if pixels.size == 0: - return (0, 0, 0) - arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) - mean = arr.mean(axis=0) - return int(mean[0]), int(mean[1]), int(mean[2]) - if not pixels: - return (0, 0, 0) - total_r = total_g = total_b = 0 - for r, g, b in pixels: - total_r += r - total_g += g - total_b += b - n = len(pixels) - return total_r // n, total_g // n, total_b // n - - def rgb_to_hsbk(r: int, g: int, b: int, kelvin: int = 3500) -> Tuple[int, int, int, int]: """Convert 8-bit RGB to LIFX 16-bit HSBK. diff --git a/server/src/ledgrab/core/devices/nanoleaf_client.py b/server/src/ledgrab/core/devices/nanoleaf_client.py index 4d67264..04a68a6 100644 --- a/server/src/ledgrab/core/devices/nanoleaf_client.py +++ b/server/src/ledgrab/core/devices/nanoleaf_client.py @@ -31,6 +31,7 @@ import httpx import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient, PairingNotReady +from ledgrab.core.devices.pixel_reduce import average_color as _average_color from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -60,27 +61,6 @@ def parse_nanoleaf_url(url: str) -> str: return host -def _average_color( - pixels: Union[List[Tuple[int, int, int]], np.ndarray], -) -> Tuple[int, int, int]: - """Reduce an N-pixel strip to one average RGB triple.""" - if isinstance(pixels, np.ndarray): - if pixels.size == 0: - return (0, 0, 0) - arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) - mean = arr.mean(axis=0) - return int(mean[0]), int(mean[1]), int(mean[2]) - if not pixels: - return (0, 0, 0) - total_r = total_g = total_b = 0 - for r, g, b in pixels: - total_r += r - total_g += g - total_b += b - n = len(pixels) - return total_r // n, total_g // n, total_b // n - - def rgb_to_hsb(r: int, g: int, b: int) -> Tuple[int, int, int]: """Convert 8-bit RGB to Nanoleaf HSB (hue 0-360, sat 0-100, bri 0-100). diff --git a/server/src/ledgrab/core/devices/pixel_reduce.py b/server/src/ledgrab/core/devices/pixel_reduce.py new file mode 100644 index 0000000..9d3ccdd --- /dev/null +++ b/server/src/ledgrab/core/devices/pixel_reduce.py @@ -0,0 +1,42 @@ +"""Pixel-reduction helpers shared across single-pixel LED clients. + +Single-pixel devices (Yeelight, WiZ, LIFX, Govee, Nanoleaf, BLE bulbs) +all need to collapse an N-pixel strip down to one RGB triple before +sending it on the wire. This module is the single home for that reduction +so the next single-pixel driver can drop the local copy. + +Hue is the exception — its Entertainment API addresses up to seven +zones individually, so it doesn't reduce. +""" + +from __future__ import annotations + +from typing import List, Tuple, Union + +import numpy as np + + +def average_color( + pixels: Union[List[Tuple[int, int, int]], np.ndarray], +) -> Tuple[int, int, int]: + """Reduce an N-pixel strip to one average RGB triple. + + Accepts either a list of ``(r, g, b)`` tuples or an ``(N, 3)`` uint8 + numpy array. Empty inputs return ``(0, 0, 0)`` rather than raising so + callers can pass through a degenerate frame without branching. + """ + if isinstance(pixels, np.ndarray): + if pixels.size == 0: + return (0, 0, 0) + arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) + mean = arr.mean(axis=0) + return int(mean[0]), int(mean[1]), int(mean[2]) + if not pixels: + return (0, 0, 0) + total_r = total_g = total_b = 0 + for r, g, b in pixels: + total_r += r + total_g += g + total_b += b + n = len(pixels) + return total_r // n, total_g // n, total_b // n diff --git a/server/src/ledgrab/core/devices/wiz_client.py b/server/src/ledgrab/core/devices/wiz_client.py index ef25367..1cde22c 100644 --- a/server/src/ledgrab/core/devices/wiz_client.py +++ b/server/src/ledgrab/core/devices/wiz_client.py @@ -23,6 +23,7 @@ from urllib.parse import urlparse import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.pixel_reduce import average_color as _average_color from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -49,27 +50,6 @@ def parse_wiz_url(url: str) -> Tuple[str, int]: return host, port -def _average_color( - pixels: Union[List[Tuple[int, int, int]], np.ndarray], -) -> Tuple[int, int, int]: - """Reduce an N-pixel strip to one average RGB triple.""" - if isinstance(pixels, np.ndarray): - if pixels.size == 0: - return (0, 0, 0) - arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) - mean = arr.mean(axis=0) - return int(mean[0]), int(mean[1]), int(mean[2]) - if not pixels: - return (0, 0, 0) - total_r = total_g = total_b = 0 - for r, g, b in pixels: - total_r += r - total_g += g - total_b += b - n = len(pixels) - return total_r // n, total_g // n, total_b // n - - class _WiZProtocol(asyncio.DatagramProtocol): """Minimal protocol: sends only, drops any inbound packets silently.""" diff --git a/server/src/ledgrab/core/devices/yeelight_client.py b/server/src/ledgrab/core/devices/yeelight_client.py index 3196a2c..910fbd3 100644 --- a/server/src/ledgrab/core/devices/yeelight_client.py +++ b/server/src/ledgrab/core/devices/yeelight_client.py @@ -28,6 +28,7 @@ from urllib.parse import urlparse import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.pixel_reduce import average_color as _average_color from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -56,27 +57,6 @@ def parse_yeelight_url(url: str) -> str: return host -def _average_color( - pixels: Union[List[Tuple[int, int, int]], np.ndarray], -) -> Tuple[int, int, int]: - """Reduce an N-pixel strip to one average RGB triple.""" - if isinstance(pixels, np.ndarray): - if pixels.size == 0: - return (0, 0, 0) - arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3) - mean = arr.mean(axis=0) - return int(mean[0]), int(mean[1]), int(mean[2]) - if not pixels: - return (0, 0, 0) - total_r = total_g = total_b = 0 - for r, g, b in pixels: - total_r += r - total_g += g - total_b += b - n = len(pixels) - return total_r // n, total_g // n, total_b // n - - def _pack_rgb(r: int, g: int, b: int) -> int: """Pack an (R, G, B) triple into the 24-bit integer Yeelight expects.""" return ((r & 0xFF) << 16) | ((g & 0xFF) << 8) | (b & 0xFF)