45f93fd30e
SP110E peripherals silently tear down the GATT link ~1s after connect unless a two-write vendor handshake (01 00 → FFE2, 01 B7 E3 D5 → FFE1) arrives immediately. Without it the first real write hangs for 30s, then reconnect-loops forever. Adds optional BLEProtocol.init_writes executed on connect, plumbs a per-write char_uuid through both transports, and fixes the SP110E color/power frames from an incorrect 5 bytes to the documented 4 bytes.

Windows/WinRT robustness:
- asyncio.wait_for hangs on bleak because WinRT IAsyncOperations refuse to cancel. _bounded_await() uses asyncio.wait() instead so timeouts actually return control even when the inner task is uncancellable.
- BleakClient connect by raw MAC string times out when WinRT guesses the address type wrong; switched to pre-scanning with BleakScanner and passing the resolved BLEDevice, which carries the address type.
- Target-start fetch timeout bumped to 30s with retry disabled so the UI doesn't abort during the BLE pre-scan + connect + handshake path.

UI:
- Settings modal exposes Protocol Family (IconSelect grid, shared with add-device via parameterized ensureBleFamilyIconSelect) so users can fix a wrong family pick without recreating the device. Govee AES key row toggles on/off with family selection.

Also turns LAN auth back on in default_config.yaml, logs start_processing requests on entry for easier diagnosis, and captures the full debug trail in docs/BLE_LED_CONTROLLERS.md for future BLE work.

Refs the mbullington SP110E protocol gist for the handshake bytes.
242 lines
9.6 KiB
Python
242 lines
9.6 KiB
Python
"""Tests for BLEClient using a fake transport — no bleak, no hardware."""
|
|
|
|
from typing import List
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from ledgrab.core.devices import ble_client as ble_client_module
|
|
from ledgrab.core.devices.ble_client import (
|
|
BLEClient,
|
|
_average_color,
|
|
_encrypt_govee_frame,
|
|
_strip_ble_scheme,
|
|
)
|
|
|
|
|
|
class FakeTransport:
|
|
"""In-memory stand-in for ``BLETransport`` — records every write."""
|
|
|
|
def __init__(self, *_, **__):
|
|
self.writes: List[bytes] = []
|
|
# Full log preserving char_uuid alongside payload so tests can
|
|
# assert on the SP110E vendor handshake's characteristic targets.
|
|
self.writes_detailed: List[tuple] = []
|
|
self._connected = False
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
return self._connected
|
|
|
|
async def connect(self) -> None:
|
|
self._connected = True
|
|
|
|
async def close(self) -> None:
|
|
self._connected = False
|
|
|
|
async def write(self, data: bytes, char_uuid: str | None = None) -> None:
|
|
if not self._connected:
|
|
raise RuntimeError("fake transport not connected")
|
|
self.writes.append(data)
|
|
self.writes_detailed.append((char_uuid, data))
|
|
|
|
@property
|
|
def color_writes(self) -> list:
|
|
"""Writes that went to the protocol's default write characteristic
|
|
(i.e. not the init handshake). Most existing tests want this view."""
|
|
return [data for (char_uuid, data) in self.writes_detailed if char_uuid is None]
|
|
|
|
|
|
@pytest.fixture
def fake_transport_cls(monkeypatch):
    """Route the BLE client module through ``FakeTransport`` for one test.

    Patches ``make_transport`` in the client module so every call yields a
    fresh ``FakeTransport`` regardless of arguments, and zeroes the minimum
    write interval. Returns the ``FakeTransport`` class itself.
    """

    def _build_fake(*_args, **_kwargs):
        return FakeTransport()

    # Disable the inter-write delay so tests don't sleep.
    monkeypatch.setattr(ble_client_module, "_MIN_WRITE_INTERVAL_SEC", 0.0)
    monkeypatch.setattr(ble_client_module, "make_transport", _build_fake)
    return FakeTransport
|
|
|
|
|
|
class TestStripBleScheme:
    """Expected outputs of ``_strip_ble_scheme`` for a few address spellings."""

    def test_strips_ble_prefix(self):
        stripped = _strip_ble_scheme("ble://AA:BB:CC:DD:EE:FF")
        assert stripped == "AA:BB:CC:DD:EE:FF"

    def test_leaves_bare_address_alone(self):
        bare = "AA:BB:CC:DD:EE:FF"
        assert _strip_ble_scheme(bare) == "AA:BB:CC:DD:EE:FF"

    def test_strips_trailing_slashes(self):
        stripped = _strip_ble_scheme("ble://abc/")
        assert stripped == "abc"
|
|
|
|
|
|
class TestAverageColor:
    """Expected outputs of ``_average_color`` for list and numpy inputs."""

    def test_returns_black_on_empty_list(self):
        result = _average_color([])
        assert result == (0, 0, 0)

    def test_averages_list_of_tuples(self):
        primaries = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
        assert _average_color(primaries) == (85, 85, 85)

    def test_returns_black_on_empty_array(self):
        no_pixels = np.empty((0, 3), dtype=np.uint8)
        assert _average_color(no_pixels) == (0, 0, 0)

    def test_averages_numpy_array(self):
        pixels = np.array([[100, 100, 100], [200, 200, 200]], dtype=np.uint8)
        result = _average_color(pixels)
        assert result == (150, 150, 150)
|
|
|
|
|
|
class TestBLEClientLifecycle:
    """Connect/close behaviour of ``BLEClient`` against the fake transport."""

    @pytest.mark.asyncio
    async def test_connect_sets_connected_flag(self, fake_transport_cls):
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        assert not strip.is_connected

        await strip.connect()

        assert strip.is_connected

    @pytest.mark.asyncio
    async def test_connect_runs_vendor_init_handshake(self, fake_transport_cls):
        # SP110E requires a two-write handshake on connect or the GATT
        # link silently drops — verify both writes actually go out.
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await strip.connect()

        handshake = strip._transport.writes_detailed
        assert len(handshake) == 2

        first, second = handshake
        first_char, first_payload = first
        second_char, second_payload = second
        assert "ffe2" in first_char
        assert first_payload == b"\x01\x00"
        assert "ffe1" in second_char
        assert second_payload == b"\x01\xb7\xe3\xd5"

    @pytest.mark.asyncio
    async def test_close_does_not_send_power_off(self, fake_transport_cls):
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await strip.connect()
        await strip.close()

        # Strip is left in whatever state it's in — rapid power toggles on
        # connect/close cause BLE stack hangs on Windows. (Only check color
        # writes since the vendor init handshake is unrelated.)
        power_off_frame = bytes((0, 0, 0, 0xAB))
        assert power_off_frame not in strip._transport.color_writes

    @pytest.mark.asyncio
    async def test_unknown_family_raises(self):
        # Construction alone must fail; no transport is needed for this.
        with pytest.raises(ValueError, match="Unknown BLE family"):
            BLEClient("ble://AA:BB:CC", ble_family="not-real")
|
|
|
|
|
|
class TestBLEClientSendPixels:
    """Frames produced by ``BLEClient.send_pixels`` as seen by the fake transport."""

    @pytest.mark.asyncio
    async def test_send_averages_and_writes_one_frame(self, fake_transport_cls):
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await strip.connect()

        sent = await strip.send_pixels([(255, 0, 0), (0, 0, 0)], brightness=255)
        assert sent

        # Color frames go to the default write characteristic; init-handshake
        # writes have their own char_uuid and don't count here.
        frames = strip._transport.color_writes
        assert len(frames) == 1
        # Averaged to (127, 0, 0), SP110E 4-byte frame with 0x1E cmd tail.
        expected = bytes((127, 0, 0, 0x1E))
        assert frames[0] == expected

    @pytest.mark.asyncio
    async def test_duplicate_frames_are_dropped(self, fake_transport_cls):
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await strip.connect()

        # Same colour three times in a row: only one frame should be recorded.
        for _ in range(3):
            await strip.send_pixels([(10, 20, 30)])

        assert len(strip._transport.color_writes) == 1

    @pytest.mark.asyncio
    async def test_send_returns_false_when_disconnected(self, fake_transport_cls):
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        # No connect() call.
        outcome = await strip.send_pixels([(1, 2, 3)])
        assert outcome is False

    @pytest.mark.asyncio
    async def test_triones_frame_format(self, fake_transport_cls):
        strip = BLEClient("ble://AA:BB:CC", ble_family="triones")
        await strip.connect()
        await strip.send_pixels([(100, 150, 200)])

        # Triones has no init handshake — first write is the color frame.
        expected = bytes((0x7E, 0x07, 0x05, 0x03, 100, 150, 200, 0x10, 0xEF))
        first_write = strip._transport.writes[0]
        assert first_write == expected
|
|
|
|
|
|
class TestBLEClientHealth:
    """Results of ``BLEClient.check_health`` with and without a previous health record."""

    @pytest.mark.asyncio
    async def test_health_preserves_previous_state(self):
        from ledgrab.core.devices.led_client import DeviceHealth

        previous = DeviceHealth(online=True, device_name="Test", device_led_count=60)

        health = await BLEClient.check_health(
            "ble://AA:BB:CC",
            http_client=None,
            prev_health=previous,
        )

        # Every field supplied via prev_health should come back unchanged.
        assert health.online is True
        assert health.device_name == "Test"
        assert health.device_led_count == 60

    @pytest.mark.asyncio
    async def test_health_defaults_offline_when_no_prev(self):
        health = await BLEClient.check_health("ble://AA:BB:CC", http_client=None)

        assert health.online is False
        assert health.device_name == "AA:BB:CC"
|
|
|
|
|
|
class TestGoveeAESEncryption:
    """Checks for ``_encrypt_govee_frame`` and for how ``BLEClient`` applies
    the optional Govee key when sending frames."""

    # 16-byte test key; the hex decodes to ASCII "testtesttesttest".
    _KEY = bytes.fromhex("74657374746573747465737474657374")

    def test_encrypt_produces_32_bytes(self):
        from ledgrab.core.devices.ble_protocols.govee import encode_color

        plain = encode_color(255, 0, 0)
        cipher_text = _encrypt_govee_frame(plain, self._KEY)

        assert len(cipher_text) == 32

    def test_encrypt_is_deterministic(self):
        plain = bytes(range(20))

        first = _encrypt_govee_frame(plain, self._KEY)
        second = _encrypt_govee_frame(plain, self._KEY)

        assert first == second

    def test_encrypt_differs_from_plaintext(self):
        plain = bytes(range(20))

        cipher_text = _encrypt_govee_frame(plain, self._KEY)
        # The 20-byte frame zero-padded to 32 bytes is what an unencrypted
        # output of the same length would look like.
        zero_padded = plain + b"\x00" * 12

        assert cipher_text != zero_padded

    def test_decrypt_roundtrip(self):
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        plain = bytes(range(20))
        cipher_text = _encrypt_govee_frame(plain, self._KEY)

        # Decrypt with AES-ECB under the same key; the leading 20 bytes must
        # be the original frame.
        decryptor = Cipher(algorithms.AES(self._KEY), modes.ECB()).decryptor()
        recovered = decryptor.update(cipher_text) + decryptor.finalize()

        assert recovered[:20] == plain

    @pytest.mark.asyncio
    async def test_govee_client_encrypts_frames(self, fake_transport_cls):
        hex_key = self._KEY.hex()
        lamp = BLEClient("ble://AA:BB:CC", ble_family="govee", ble_govee_key=hex_key)
        await lamp.connect()
        await lamp.send_pixels([(255, 0, 0)])

        recorded = lamp._transport.writes
        assert len(recorded) == 1
        # Encrypted frame is 32 bytes, not the raw 20.
        assert len(recorded[0]) == 32

    @pytest.mark.asyncio
    async def test_govee_client_without_key_sends_plaintext(self, fake_transport_cls):
        lamp = BLEClient("ble://AA:BB:CC", ble_family="govee")
        await lamp.connect()
        await lamp.send_pixels([(255, 0, 0)])

        first_write = lamp._transport.writes[0]
        assert len(first_write) == 20

    @pytest.mark.asyncio
    async def test_invalid_key_hex_falls_back_gracefully(self, fake_transport_cls):
        lamp = BLEClient("ble://AA:BB:CC", ble_family="govee", ble_govee_key="not-hex!")
        await lamp.connect()

        sent = await lamp.send_pixels([(100, 100, 100)])

        assert sent is True
        first_write = lamp._transport.writes[0]
        assert len(first_write) == 20  # plaintext fallback

    @pytest.mark.asyncio
    async def test_govee_key_ignored_for_non_govee_family(self, fake_transport_cls):
        hex_key = self._KEY.hex()
        strip = BLEClient("ble://AA:BB:CC", ble_family="sp110e", ble_govee_key=hex_key)
        assert strip._aes_key is None

        await strip.connect()
        await strip.send_pixels([(100, 100, 100)])

        # SP110E color frame is 4 bytes (RR GG BB CMD), not encrypted.
        frames = strip._transport.color_writes
        assert len(frames) == 1
        assert len(frames[0]) == 4
|