Files
ledgrab/server/tests/test_opc.py
T
alexei.dolgolyov 31c6c3abb2 feat(devices): Open Pixel Control (OPC) target type
Adds support for Open Pixel Control receivers (Fadecandy boards,
xLights/Falcon endpoints, OPC bridges, art-installation controllers,
hobbyist LED driver software). OPC is a tiny TCP protocol on port
7890 with a 4-byte header [channel][cmd][len_hi][len_lo] + RGB body.

Backend:
- OPCClient opens one persistent TCP connection and streams frames as
  header+body byte pairs. Channel 0 broadcasts to every output on the
  OPC server; channels 1-255 address a specific channel on multi-output
  servers (Fadecandy with multiple Open Pixel chains).
- supports_fast_send=True with a synchronous send_pixels_fast hot path.
  The fast path skips the async drain so the OS write-buffer flushes
  on its own schedule -- exactly what ambilight streaming wants.
- Brightness applies client-side before the frame is sent (OPC has no
  reply channel for hardware-side brightness).
- Health check opens a TCP connection and closes it.
- OPCConfig joins the typed config union; storage gains an opc_channel
  field; full to_dict/from_dict/to_config wiring.
- 36 unit tests cover URL parsing, header construction, send_pixels
  emitting header+body in order, brightness application, list and
  flat-array input shapes, drain behavior, connection lifecycle,
  provider validate/discover/capabilities, Device.to_config round-trip.

Frontend:
- 'opc' in DEVICE_TYPE_KEYS (next to 'ddp'), paper-plane icon -- same
  as DDP since both are open pixel-streaming protocols.
- isOpcDevice predicate + per-type field show/hide.
- Optional channel number input (default 0 = broadcast) with hint copy
  explaining the channel semantics.
- Locale strings in en/ru/zh.

No native discovery (OPC has no discovery protocol); users supply
the receiver IP manually.
2026-05-16 03:02:41 +03:00

342 lines
9.9 KiB
Python

"""Tests for the Open Pixel Control LED client + provider."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import numpy as np
import pytest
from ledgrab.core.devices.device_config import OPCConfig
from ledgrab.core.devices.led_client import ProviderDeps
from ledgrab.core.devices.opc_client import (
OPC_CMD_SET_PIXELS,
OPC_PORT,
OPCClient,
_build_set_pixels_header,
parse_opc_url,
)
from ledgrab.core.devices.opc_provider import OPCDeviceProvider
# ============================================================================
# URL parsing
# ============================================================================
@pytest.mark.parametrize(
"url,expected",
[
("opc://192.168.1.50", ("192.168.1.50", OPC_PORT)),
("opc://192.168.1.50:7890", ("192.168.1.50", 7890)),
("opc://192.168.1.50:9000", ("192.168.1.50", 9000)),
("192.168.1.50", ("192.168.1.50", OPC_PORT)),
("192.168.1.50:7890", ("192.168.1.50", 7890)),
("fadecandy.local", ("fadecandy.local", OPC_PORT)),
],
)
def test_parse_opc_url(url, expected):
assert parse_opc_url(url) == expected
@pytest.mark.parametrize("url", ["", " ", "opc://", "://192.168.1.1"])
def test_parse_opc_url_rejects_empty(url):
with pytest.raises(ValueError):
parse_opc_url(url)
# ============================================================================
# Header construction
# ============================================================================
def test_build_set_pixels_header_layout():
"""4-byte header: [channel, command, length_hi, length_lo]"""
header = _build_set_pixels_header(channel=1, body_len=300)
assert len(header) == 4
assert header[0] == 1 # channel
assert header[1] == OPC_CMD_SET_PIXELS # command
assert header[2] == 1 # length_hi (300 >> 8)
assert header[3] == 44 # length_lo (300 & 0xFF)
def test_build_set_pixels_header_broadcast_channel():
header = _build_set_pixels_header(channel=0, body_len=3)
assert header[0] == 0
def test_build_set_pixels_header_clamps_channel_to_byte():
header = _build_set_pixels_header(channel=300, body_len=3)
assert header[0] == 300 & 0xFF # 44
def test_build_set_pixels_header_encodes_large_body_length():
header = _build_set_pixels_header(channel=0, body_len=0xFFFF)
assert header[2] == 0xFF
assert header[3] == 0xFF
# ============================================================================
# OPCClient (mocked transport)
# ============================================================================
def _make_connected_client(channel: int = 0) -> OPCClient:
client = OPCClient("opc://127.0.0.1", led_count=10, channel=channel)
writer = MagicMock()
writer.write = MagicMock()
writer.drain = AsyncMock()
writer.close = MagicMock()
writer.wait_closed = AsyncMock()
client._writer = writer
client._reader = MagicMock()
client._connected = True
return client
def _sent_bytes(client: OPCClient) -> bytes:
"""Concatenate every write made to the mock writer into one byte string."""
return b"".join(call.args[0] for call in client._writer.write.call_args_list)
@pytest.mark.asyncio
async def test_send_pixels_emits_header_then_body():
client = _make_connected_client(channel=1)
pixels = np.array([[10, 20, 30], [40, 50, 60]], dtype=np.uint8)
await client.send_pixels(pixels)
data = _sent_bytes(client)
assert len(data) == 4 + 6 # header + 2 pixels * 3 bytes
assert data[0] == 1 # channel
assert data[1] == OPC_CMD_SET_PIXELS
assert data[2:4] == bytes([0, 6]) # body length 6
assert data[4:] == bytes([10, 20, 30, 40, 50, 60])
@pytest.mark.asyncio
async def test_send_pixels_applies_brightness():
client = _make_connected_client()
pixels = np.array([[200, 100, 50]], dtype=np.uint8)
await client.send_pixels(pixels, brightness=128)
data = _sent_bytes(client)
body = data[4:]
# Each channel scales as int(x * 128 / 255)
expected = bytes([int(200 * 128 / 255), int(100 * 128 / 255), int(50 * 128 / 255)])
assert body == expected
@pytest.mark.asyncio
async def test_send_pixels_brightness_zero_blacks_out():
client = _make_connected_client()
pixels = np.array([[200, 100, 50], [255, 255, 255]], dtype=np.uint8)
await client.send_pixels(pixels, brightness=0)
body = _sent_bytes(client)[4:]
assert body == bytes(6) # all zeros
@pytest.mark.asyncio
async def test_send_pixels_accepts_list():
client = _make_connected_client()
await client.send_pixels([(1, 2, 3), (4, 5, 6)])
body = _sent_bytes(client)[4:]
assert body == bytes([1, 2, 3, 4, 5, 6])
@pytest.mark.asyncio
async def test_send_pixels_reshapes_flat_array():
client = _make_connected_client()
flat = np.array([1, 2, 3, 4, 5, 6], dtype=np.uint8)
await client.send_pixels(flat)
body = _sent_bytes(client)[4:]
assert body == bytes([1, 2, 3, 4, 5, 6])
@pytest.mark.asyncio
async def test_send_pixels_drains_after_write():
client = _make_connected_client()
await client.send_pixels(np.array([[1, 2, 3]], dtype=np.uint8))
client._writer.drain.assert_awaited()
@pytest.mark.asyncio
async def test_send_pixels_when_not_connected_raises():
client = OPCClient("opc://127.0.0.1", led_count=1)
with pytest.raises(RuntimeError, match="not connected"):
await client.send_pixels([(1, 2, 3)])
def test_send_pixels_fast_writes_synchronously_without_drain():
"""Hot path skips drain so OS write-buffer flushes on its own schedule."""
client = _make_connected_client()
client.send_pixels_fast(np.array([[1, 2, 3]], dtype=np.uint8))
data = _sent_bytes(client)
assert data == bytes([0, OPC_CMD_SET_PIXELS, 0, 3, 1, 2, 3])
client._writer.drain.assert_not_called()
def test_send_pixels_fast_when_not_connected_raises():
client = OPCClient("opc://127.0.0.1", led_count=1)
with pytest.raises(RuntimeError, match="not connected"):
client.send_pixels_fast([(1, 2, 3)])
def test_supports_fast_send_is_true():
assert OPCClient("opc://127.0.0.1", led_count=1).supports_fast_send is True
def test_channel_clamps_to_byte():
client = OPCClient("opc://127.0.0.1", led_count=1, channel=300)
assert client.channel == 300 & 0xFF
def test_default_channel_is_broadcast():
client = OPCClient("opc://127.0.0.1", led_count=1)
assert client.channel == 0
@pytest.mark.asyncio
async def test_close_releases_transport():
client = _make_connected_client()
writer = client._writer
await client.close()
writer.close.assert_called_once()
assert client._writer is None
assert client.is_connected is False
# ============================================================================
# Provider
# ============================================================================
def test_provider_device_type_and_capabilities():
provider = OPCDeviceProvider()
assert provider.device_type == "opc"
caps = provider.capabilities
assert "manual_led_count" in caps
assert "health_check" in caps
# OPC has no reply channel; no native power/brightness control
assert "power_control" not in caps
assert "brightness_control" not in caps
@pytest.mark.asyncio
async def test_provider_validate_accepts_bare_host():
provider = OPCDeviceProvider()
assert await provider.validate_device("192.168.1.50") == {}
@pytest.mark.asyncio
async def test_provider_validate_rejects_empty():
provider = OPCDeviceProvider()
with pytest.raises(ValueError, match="Invalid OPC URL"):
await provider.validate_device("")
@pytest.mark.asyncio
async def test_provider_discover_returns_empty():
"""OPC has no native discovery — provider must return [], not raise."""
provider = OPCDeviceProvider()
assert await provider.discover() == []
def test_provider_create_client_threads_config():
provider = OPCDeviceProvider()
config = OPCConfig(
device_id="device_test",
device_url="opc://192.168.1.50:9000",
led_count=144,
opc_channel=3,
)
client = provider.create_client(config, deps=ProviderDeps())
assert isinstance(client, OPCClient)
assert client.host == "192.168.1.50"
assert client.port == 9000
assert client._led_count == 144
assert client.channel == 3
# ============================================================================
# Device.to_config() round-trip
# ============================================================================
def test_device_to_config_round_trip_opc():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Fadecandy 1",
url="opc://192.168.1.42",
led_count=512,
device_type="opc",
opc_channel=2,
)
config = device.to_config()
assert isinstance(config, OPCConfig)
assert config.device_url == "opc://192.168.1.42"
assert config.led_count == 512
assert config.opc_channel == 2
def test_device_to_dict_omits_opc_default_channel():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Default",
url="opc://192.168.1.42",
led_count=1,
device_type="opc",
)
assert "opc_channel" not in device.to_dict()
def test_device_to_dict_preserves_non_default_opc_channel():
from ledgrab.storage.device_store import Device
device = Device(
device_id="device_abc12345",
name="Channel 3",
url="opc://192.168.1.42",
led_count=1,
device_type="opc",
opc_channel=3,
)
assert device.to_dict()["opc_channel"] == 3
def test_device_from_dict_opc_round_trip():
from ledgrab.storage.device_store import Device
restored = Device.from_dict(
{
"id": "device_abc12345",
"name": "Roundtrip",
"url": "opc://10.0.0.1",
"led_count": 64,
"device_type": "opc",
"opc_channel": 7,
}
)
assert restored.opc_channel == 7