refactor(devices): extract _average_color to pixel_reduce

Six single-pixel LED clients (Yeelight, WiZ, LIFX, Govee, Nanoleaf, BLE)
had byte-for-byte identical local copies of the strip-averaging helper.
Consolidates into core/devices/pixel_reduce.average_color so the next
single-pixel driver can drop the local copy and so behavior changes
land in one place.

Hue is intentionally left out -- its Entertainment API addresses up to
seven lights individually rather than averaging.

Behavior is byte-identical (each call site re-imports under the same
underscore-prefixed local name). 1358 tests still pass.
This commit is contained in:
2026-05-16 04:14:36 +03:00
parent 426484adf8
commit cc87fba0dd
7 changed files with 48 additions and 124 deletions
+1 -19
View File
@@ -24,6 +24,7 @@ import numpy as np
from ledgrab.core.devices.ble_protocols import BLEProtocol, get_protocol
from ledgrab.core.devices.ble_transport import make_transport
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -63,25 +64,6 @@ def _strip_ble_scheme(url: str) -> str:
return url.strip("/")
def _average_color(pixels: Union[List[Tuple[int, int, int]], np.ndarray]) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
class BLEClient(LEDClient):
"""LED client for BLE controllers speaking one of the registered protocols.
@@ -27,6 +27,7 @@ from urllib.parse import urlparse
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -58,27 +59,6 @@ def parse_govee_url(url: str) -> str:
return host
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
class _GoveeProtocol(asyncio.DatagramProtocol):
"""Write-only datagram protocol. Bulb replies (on 4002) are not collected here."""
+1 -21
View File
@@ -28,6 +28,7 @@ from urllib.parse import urlparse
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -66,27 +67,6 @@ def parse_lifx_url(url: str) -> Tuple[str, int]:
return host, port
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
def rgb_to_hsbk(r: int, g: int, b: int, kelvin: int = 3500) -> Tuple[int, int, int, int]:
"""Convert 8-bit RGB to LIFX 16-bit HSBK.
@@ -31,6 +31,7 @@ import httpx
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient, PairingNotReady
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -60,27 +61,6 @@ def parse_nanoleaf_url(url: str) -> str:
return host
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
def rgb_to_hsb(r: int, g: int, b: int) -> Tuple[int, int, int]:
"""Convert 8-bit RGB to Nanoleaf HSB (hue 0-360, sat 0-100, bri 0-100).
@@ -0,0 +1,42 @@
"""Pixel-reduction helpers shared across single-pixel LED clients.
Single-pixel devices (Yeelight, WiZ, LIFX, Govee, Nanoleaf, BLE bulbs)
all need to collapse an N-pixel strip down to one RGB triple before
sending it on the wire. This module is the single home for that reduction
so the next single-pixel driver can drop the local copy.
Hue is the exception — its Entertainment API addresses up to seven
zones individually, so it doesn't reduce.
"""
from __future__ import annotations
from typing import List, Tuple, Union
import numpy as np
def average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple.
Accepts either a list of ``(r, g, b)`` tuples or an ``(N, 3)`` uint8
numpy array. Empty inputs return ``(0, 0, 0)`` rather than raising so
callers can pass through a degenerate frame without branching.
"""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
+1 -21
View File
@@ -23,6 +23,7 @@ from urllib.parse import urlparse
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -49,27 +50,6 @@ def parse_wiz_url(url: str) -> Tuple[str, int]:
return host, port
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
class _WiZProtocol(asyncio.DatagramProtocol):
"""Minimal protocol: sends only, drops any inbound packets silently."""
@@ -28,6 +28,7 @@ from urllib.parse import urlparse
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -56,27 +57,6 @@ def parse_yeelight_url(url: str) -> str:
return host
def _average_color(
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
) -> Tuple[int, int, int]:
"""Reduce an N-pixel strip to one average RGB triple."""
if isinstance(pixels, np.ndarray):
if pixels.size == 0:
return (0, 0, 0)
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
mean = arr.mean(axis=0)
return int(mean[0]), int(mean[1]), int(mean[2])
if not pixels:
return (0, 0, 0)
total_r = total_g = total_b = 0
for r, g, b in pixels:
total_r += r
total_g += g
total_b += b
n = len(pixels)
return total_r // n, total_g // n, total_b // n
def _pack_rgb(r: int, g: int, b: int) -> int:
"""Pack an (R, G, B) triple into the 24-bit integer Yeelight expects."""
return ((r & 0xFF) << 16) | ((g & 0xFF) << 8) | (b & 0xFF)