refactor(devices): extract _average_color to pixel_reduce
Six single-pixel LED clients (Yeelight, WiZ, LIFX, Govee, Nanoleaf, BLE) had byte-for-byte identical local copies of the strip-averaging helper. Consolidates into core/devices/pixel_reduce.average_color so the next single-pixel driver can drop the local copy and so behavior changes land in one place. Hue is intentionally left out -- its Entertainment API addresses up to seven lights individually rather than averaging. Behavior is byte-identical (each call site re-imports under the same underscore-prefixed local name). 1358 tests still pass.
This commit is contained in:
@@ -24,6 +24,7 @@ import numpy as np
|
||||
from ledgrab.core.devices.ble_protocols import BLEProtocol, get_protocol
|
||||
from ledgrab.core.devices.ble_transport import make_transport
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -63,25 +64,6 @@ def _strip_ble_scheme(url: str) -> str:
|
||||
return url.strip("/")
|
||||
|
||||
|
||||
def _average_color(pixels: Union[List[Tuple[int, int, int]], np.ndarray]) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
class BLEClient(LEDClient):
|
||||
"""LED client for BLE controllers speaking one of the registered protocols.
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ from urllib.parse import urlparse
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -58,27 +59,6 @@ def parse_govee_url(url: str) -> str:
|
||||
return host
|
||||
|
||||
|
||||
def _average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
class _GoveeProtocol(asyncio.DatagramProtocol):
|
||||
"""Write-only datagram protocol. Bulb replies (on 4002) are not collected here."""
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ from urllib.parse import urlparse
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -66,27 +67,6 @@ def parse_lifx_url(url: str) -> Tuple[str, int]:
|
||||
return host, port
|
||||
|
||||
|
||||
def _average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
def rgb_to_hsbk(r: int, g: int, b: int, kelvin: int = 3500) -> Tuple[int, int, int, int]:
|
||||
"""Convert 8-bit RGB to LIFX 16-bit HSBK.
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ import httpx
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient, PairingNotReady
|
||||
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -60,27 +61,6 @@ def parse_nanoleaf_url(url: str) -> str:
|
||||
return host
|
||||
|
||||
|
||||
def _average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
def rgb_to_hsb(r: int, g: int, b: int) -> Tuple[int, int, int]:
|
||||
"""Convert 8-bit RGB to Nanoleaf HSB (hue 0-360, sat 0-100, bri 0-100).
|
||||
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
"""Pixel-reduction helpers shared across single-pixel LED clients.
|
||||
|
||||
Single-pixel devices (Yeelight, WiZ, LIFX, Govee, Nanoleaf, BLE bulbs)
|
||||
all need to collapse an N-pixel strip down to one RGB triple before
|
||||
sending it on the wire. This module is the single home for that reduction
|
||||
so the next single-pixel driver can drop the local copy.
|
||||
|
||||
Hue is the exception — its Entertainment API addresses up to seven
|
||||
zones individually, so it doesn't reduce.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Tuple, Union
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple.
|
||||
|
||||
Accepts either a list of ``(r, g, b)`` tuples or an ``(N, 3)`` uint8
|
||||
numpy array. Empty inputs return ``(0, 0, 0)`` rather than raising so
|
||||
callers can pass through a degenerate frame without branching.
|
||||
"""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
@@ -23,6 +23,7 @@ from urllib.parse import urlparse
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -49,27 +50,6 @@ def parse_wiz_url(url: str) -> Tuple[str, int]:
|
||||
return host, port
|
||||
|
||||
|
||||
def _average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
class _WiZProtocol(asyncio.DatagramProtocol):
|
||||
"""Minimal protocol: sends only, drops any inbound packets silently."""
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ from urllib.parse import urlparse
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||
from ledgrab.core.devices.pixel_reduce import average_color as _average_color
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -56,27 +57,6 @@ def parse_yeelight_url(url: str) -> str:
|
||||
return host
|
||||
|
||||
|
||||
def _average_color(
|
||||
pixels: Union[List[Tuple[int, int, int]], np.ndarray],
|
||||
) -> Tuple[int, int, int]:
|
||||
"""Reduce an N-pixel strip to one average RGB triple."""
|
||||
if isinstance(pixels, np.ndarray):
|
||||
if pixels.size == 0:
|
||||
return (0, 0, 0)
|
||||
arr = pixels.reshape(-1, 3) if pixels.ndim > 1 else pixels[:3].reshape(1, 3)
|
||||
mean = arr.mean(axis=0)
|
||||
return int(mean[0]), int(mean[1]), int(mean[2])
|
||||
if not pixels:
|
||||
return (0, 0, 0)
|
||||
total_r = total_g = total_b = 0
|
||||
for r, g, b in pixels:
|
||||
total_r += r
|
||||
total_g += g
|
||||
total_b += b
|
||||
n = len(pixels)
|
||||
return total_r // n, total_g // n, total_b // n
|
||||
|
||||
|
||||
def _pack_rgb(r: int, g: int, b: int) -> int:
|
||||
"""Pack an (R, G, B) triple into the 24-bit integer Yeelight expects."""
|
||||
return ((r & 0xFF) << 16) | ((g & 0xFF) << 8) | (b & 0xFF)
|
||||
|
||||
Reference in New Issue
Block a user