31c6c3abb2
Adds support for Open Pixel Control receivers (Fadecandy boards, xLights/Falcon endpoints, OPC bridges, art-installation controllers, hobbyist LED driver software). OPC is a tiny TCP protocol on port 7890 with a 4-byte header [channel][cmd][len_hi][len_lo] + RGB body. Backend: - OPCClient opens one persistent TCP connection and streams frames as header+body byte pairs. Channel 0 broadcasts to every output on the OPC server; channels 1-255 address a specific channel on multi-output servers (Fadecandy with multiple Open Pixel chains). - supports_fast_send=True with a synchronous send_pixels_fast hot path. The fast path skips the async drain so the OS write-buffer flushes on its own schedule -- exactly what ambilight streaming wants. - Brightness applies client-side before the frame is sent (OPC has no reply channel for hardware-side brightness). - Health check opens a TCP connection and closes it. - OPCConfig joins the typed config union; storage gains an opc_channel field; full to_dict/from_dict/to_config wiring. - 36 unit tests cover URL parsing, header construction, send_pixels emitting header+body in order, brightness application, list and flat-array input shapes, drain behavior, connection lifecycle, provider validate/discover/capabilities, Device.to_config round-trip. Frontend: - 'opc' in DEVICE_TYPE_KEYS (next to 'ddp'), paper-plane icon -- same as DDP since both are open pixel-streaming protocols. - isOpcDevice predicate + per-type field show/hide. - Optional channel number input (default 0 = broadcast) with hint copy explaining the channel semantics. - Locale strings in en/ru/zh. No native discovery (OPC has no discovery protocol); users supply the receiver IP manually.
342 lines
9.9 KiB
Python
342 lines
9.9 KiB
Python
"""Tests for the Open Pixel Control LED client + provider."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from ledgrab.core.devices.device_config import OPCConfig
|
|
from ledgrab.core.devices.led_client import ProviderDeps
|
|
from ledgrab.core.devices.opc_client import (
|
|
OPC_CMD_SET_PIXELS,
|
|
OPC_PORT,
|
|
OPCClient,
|
|
_build_set_pixels_header,
|
|
parse_opc_url,
|
|
)
|
|
from ledgrab.core.devices.opc_provider import OPCDeviceProvider
|
|
|
|
|
|
# ============================================================================
|
|
# URL parsing
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url,expected",
    [
        ("opc://192.168.1.50", ("192.168.1.50", OPC_PORT)),
        ("opc://192.168.1.50:7890", ("192.168.1.50", 7890)),
        ("opc://192.168.1.50:9000", ("192.168.1.50", 9000)),
        ("192.168.1.50", ("192.168.1.50", OPC_PORT)),
        ("192.168.1.50:7890", ("192.168.1.50", 7890)),
        ("fadecandy.local", ("fadecandy.local", OPC_PORT)),
    ],
)
def test_parse_opc_url(url, expected):
    """Scheme-prefixed and bare addresses parse to the expected tuple.

    Cases that carry no explicit port expect ``OPC_PORT`` in the result.
    """
    parsed = parse_opc_url(url)
    assert parsed == expected
|
|
|
|
|
|
@pytest.mark.parametrize("url", ["", " ", "opc://", "://192.168.1.1"])
def test_parse_opc_url_rejects_empty(url):
    """Each listed malformed input must make the parser raise ``ValueError``."""
    expect_value_error = pytest.raises(ValueError)
    with expect_value_error:
        parse_opc_url(url)
|
|
|
|
|
|
# ============================================================================
|
|
# Header construction
|
|
# ============================================================================
|
|
|
|
|
|
def test_build_set_pixels_header_layout():
    """4-byte header: [channel, command, length_hi, length_lo]"""
    body_len = 300
    header = _build_set_pixels_header(channel=1, body_len=body_len)

    assert len(header) == 4
    assert header[0] == 1  # channel byte
    assert header[1] == OPC_CMD_SET_PIXELS  # command byte
    assert header[2] == (body_len >> 8)  # high length byte -> 1
    assert header[3] == (body_len & 0xFF)  # low length byte -> 44
|
|
|
|
|
|
def test_build_set_pixels_header_broadcast_channel():
    """Channel 0 is written unchanged into the first header byte."""
    channel_byte = _build_set_pixels_header(channel=0, body_len=3)[0]
    assert channel_byte == 0
|
|
|
|
|
|
def test_build_set_pixels_header_clamps_channel_to_byte():
    """An out-of-range channel is reduced to its low 8 bits in the header."""
    oversized_channel = 300
    header = _build_set_pixels_header(channel=oversized_channel, body_len=3)
    low_byte = oversized_channel & 0xFF  # 44
    assert header[0] == low_byte
|
|
|
|
|
|
def test_build_set_pixels_header_encodes_large_body_length():
    """A body length of 0xFFFF sets both length bytes to 0xFF."""
    header = _build_set_pixels_header(channel=0, body_len=0xFFFF)
    length_hi = header[2]
    length_lo = header[3]
    assert length_hi == 0xFF
    assert length_lo == 0xFF
|
|
|
|
|
|
# ============================================================================
|
|
# OPCClient (mocked transport)
|
|
# ============================================================================
|
|
|
|
|
|
def _make_connected_client(channel: int = 0) -> OPCClient:
    """Build an ``OPCClient`` backed by mock streams and marked as connected.

    No socket is opened: the private ``_writer``, ``_reader`` and
    ``_connected`` attributes are assigned directly so tests can inspect
    what the client writes.
    """
    opc = OPCClient("opc://127.0.0.1", led_count=10, channel=channel)
    # Keyword arguments to MagicMock become attributes on the mock; the
    # awaited methods need AsyncMock so they can be used with ``await``.
    fake_writer = MagicMock(
        write=MagicMock(),
        drain=AsyncMock(),
        close=MagicMock(),
        wait_closed=AsyncMock(),
    )
    opc._writer = fake_writer
    opc._reader = MagicMock()
    opc._connected = True
    return opc
|
|
|
|
|
|
def _sent_bytes(client: OPCClient) -> bytes:
    """Join the first positional argument of every recorded ``write`` call."""
    recorded_calls = client._writer.write.call_args_list
    chunks = [recorded.args[0] for recorded in recorded_calls]
    return b"".join(chunks)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_emits_header_then_body():
    """``send_pixels`` writes the 4-byte header followed by the RGB body."""
    client = _make_connected_client(channel=1)
    frame = np.array([[10, 20, 30], [40, 50, 60]], dtype=np.uint8)

    await client.send_pixels(frame)

    wire = _sent_bytes(client)
    assert len(wire) == 4 + 6  # header + 2 pixels * 3 bytes
    header, body = wire[:4], wire[4:]
    assert header[0] == 1  # channel
    assert header[1] == OPC_CMD_SET_PIXELS
    assert header[2:] == bytes([0, 6])  # body length 6
    assert body == bytes([10, 20, 30, 40, 50, 60])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_applies_brightness():
    """The body sent at brightness=128 holds each component scaled down."""
    client = _make_connected_client()
    rgb = (200, 100, 50)

    await client.send_pixels(np.array([rgb], dtype=np.uint8), brightness=128)

    body = _sent_bytes(client)[4:]
    # Each channel scales as int(x * 128 / 255)
    expected = bytes(int(component * 128 / 255) for component in rgb)
    assert body == expected
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_brightness_zero_blacks_out():
    """With brightness=0 every body byte sent is zero."""
    client = _make_connected_client()
    frame = np.array([[200, 100, 50], [255, 255, 255]], dtype=np.uint8)

    await client.send_pixels(frame, brightness=0)

    sent = _sent_bytes(client)
    all_zero = b"\x00" * 6  # 2 pixels * 3 bytes
    assert sent[4:] == all_zero
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_accepts_list():
    """A plain list of RGB tuples is sent as the flattened byte sequence."""
    client = _make_connected_client()
    rgb_tuples = [(1, 2, 3), (4, 5, 6)]

    await client.send_pixels(rgb_tuples)

    sent = _sent_bytes(client)
    assert sent[4:] == bytes([1, 2, 3, 4, 5, 6])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_reshapes_flat_array():
    """A 1-D uint8 array is accepted and its values form the body in order."""
    client = _make_connected_client()
    values = [1, 2, 3, 4, 5, 6]

    await client.send_pixels(np.array(values, dtype=np.uint8))

    sent = _sent_bytes(client)
    assert sent[4:] == bytes(values)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_drains_after_write():
    """The awaited send path must await the writer's ``drain``."""
    client = _make_connected_client()
    single_pixel = np.array([[1, 2, 3]], dtype=np.uint8)

    await client.send_pixels(single_pixel)

    drain_mock = client._writer.drain
    drain_mock.assert_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_pixels_when_not_connected_raises():
    """A freshly constructed client raises ``RuntimeError`` on send."""
    unconnected = OPCClient("opc://127.0.0.1", led_count=1)
    expect_error = pytest.raises(RuntimeError, match="not connected")
    with expect_error:
        await unconnected.send_pixels([(1, 2, 3)])
|
|
|
|
|
|
def test_send_pixels_fast_writes_synchronously_without_drain():
    """Hot path skips drain so OS write-buffer flushes on its own schedule."""
    client = _make_connected_client()
    single_pixel = np.array([[1, 2, 3]], dtype=np.uint8)

    client.send_pixels_fast(single_pixel)

    expected_header = bytes([0, OPC_CMD_SET_PIXELS, 0, 3])
    expected_body = bytes([1, 2, 3])
    assert _sent_bytes(client) == expected_header + expected_body
    client._writer.drain.assert_not_called()
|
|
|
|
|
|
def test_send_pixels_fast_when_not_connected_raises():
    """The synchronous fast path also raises ``RuntimeError`` when unconnected."""
    unconnected = OPCClient("opc://127.0.0.1", led_count=1)
    expect_error = pytest.raises(RuntimeError, match="not connected")
    with expect_error:
        unconnected.send_pixels_fast([(1, 2, 3)])
|
|
|
|
|
|
def test_supports_fast_send_is_true():
    """``supports_fast_send`` is exactly ``True`` on a new client."""
    client = OPCClient("opc://127.0.0.1", led_count=1)
    assert client.supports_fast_send is True
|
|
|
|
|
|
def test_channel_clamps_to_byte():
    """A constructor channel above 255 is stored as its low 8 bits."""
    oversized_channel = 300
    client = OPCClient("opc://127.0.0.1", led_count=1, channel=oversized_channel)
    low_byte = oversized_channel & 0xFF
    assert client.channel == low_byte
|
|
|
|
|
|
def test_default_channel_is_broadcast():
    """Omitting ``channel`` leaves the client on channel 0."""
    default_channel = OPCClient("opc://127.0.0.1", led_count=1).channel
    assert default_channel == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_close_releases_transport():
    """``close`` closes the writer, clears it, and reports disconnected."""
    client = _make_connected_client()
    # Hold a reference now; the assertions below expect close() to reset
    # client._writer to None.
    original_writer = client._writer

    await client.close()

    original_writer.close.assert_called_once()
    assert client._writer is None
    assert client.is_connected is False
|
|
|
|
|
|
# ============================================================================
|
|
# Provider
|
|
# ============================================================================
|
|
|
|
|
|
def test_provider_device_type_and_capabilities():
    """Provider identifies as ``opc`` and advertises the expected capabilities."""
    provider = OPCDeviceProvider()
    assert provider.device_type == "opc"

    caps = provider.capabilities
    for required in ("manual_led_count", "health_check"):
        assert required in caps
    # OPC has no reply channel; no native power/brightness control
    for absent in ("power_control", "brightness_control"):
        assert absent not in caps
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_provider_validate_accepts_bare_host():
    """Validating a bare IP address yields an empty dict."""
    provider = OPCDeviceProvider()
    validation = await provider.validate_device("192.168.1.50")
    assert validation == {}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_provider_validate_rejects_empty():
    """An empty URL fails validation with an "Invalid OPC URL" ``ValueError``."""
    provider = OPCDeviceProvider()
    expect_error = pytest.raises(ValueError, match="Invalid OPC URL")
    with expect_error:
        await provider.validate_device("")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_provider_discover_returns_empty():
    """OPC has no native discovery — provider must return [], not raise."""
    provider = OPCDeviceProvider()
    found = await provider.discover()
    assert found == []
|
|
|
|
|
|
def test_provider_create_client_threads_config():
    """``create_client`` carries host, port, LED count and channel from config."""
    provider = OPCDeviceProvider()
    settings = OPCConfig(
        device_id="device_test",
        device_url="opc://192.168.1.50:9000",
        led_count=144,
        opc_channel=3,
    )

    built = provider.create_client(settings, deps=ProviderDeps())

    assert isinstance(built, OPCClient)
    observed = (built.host, built.port, built._led_count, built.channel)
    assert observed == ("192.168.1.50", 9000, 144, 3)
|
|
|
|
|
|
# ============================================================================
|
|
# Device.to_config() round-trip
|
|
# ============================================================================
|
|
|
|
|
|
def test_device_to_config_round_trip_opc():
    """An ``opc`` device converts to an ``OPCConfig`` with matching fields."""
    from ledgrab.storage.device_store import Device

    stored = Device(
        device_id="device_abc12345",
        name="Fadecandy 1",
        url="opc://192.168.1.42",
        led_count=512,
        device_type="opc",
        opc_channel=2,
    )

    typed = stored.to_config()

    assert isinstance(typed, OPCConfig)
    observed = (typed.device_url, typed.led_count, typed.opc_channel)
    assert observed == ("opc://192.168.1.42", 512, 2)
|
|
|
|
|
|
def test_device_to_dict_omits_opc_default_channel():
    """A device built without ``opc_channel`` serializes without that key."""
    from ledgrab.storage.device_store import Device

    stored = Device(
        device_id="device_abc12345",
        name="Default",
        url="opc://192.168.1.42",
        led_count=1,
        device_type="opc",
    )

    serialized = stored.to_dict()
    assert "opc_channel" not in serialized
|
|
|
|
|
|
def test_device_to_dict_preserves_non_default_opc_channel():
    """An explicit ``opc_channel=3`` appears in the serialized dict."""
    from ledgrab.storage.device_store import Device

    stored = Device(
        device_id="device_abc12345",
        name="Channel 3",
        url="opc://192.168.1.42",
        led_count=1,
        device_type="opc",
        opc_channel=3,
    )

    serialized = stored.to_dict()
    assert serialized["opc_channel"] == 3
|
|
|
|
|
|
def test_device_from_dict_opc_round_trip():
    """``Device.from_dict`` restores ``opc_channel`` from a stored payload."""
    from ledgrab.storage.device_store import Device

    payload = {
        "id": "device_abc12345",
        "name": "Roundtrip",
        "url": "opc://10.0.0.1",
        "led_count": 64,
        "device_type": "opc",
        "opc_channel": 7,
    }

    restored = Device.from_dict(payload)

    assert restored.opc_channel == 7
|