Files
ledgrab/server/tests/test_ble_client.py
alexei.dolgolyov 45f93fd30e
Build Android APK / build-android (push) Failing after 1m38s
Lint & Test / test (push) Successful in 4m32s
fix(devices): SP110E vendor handshake + Windows/bleak robustness
SP110E peripherals silently tear down the GATT link ~1s after connect
unless a two-write vendor handshake (01 00 → FFE2, 01 B7 E3 D5 → FFE1)
arrives immediately. Without it the first real write hangs 30s then
reconnect-loops forever. Adds optional BLEProtocol.init_writes executed
on connect, plumbs a per-write char_uuid through both transports, and
fixes the SP110E color/power frames from an incorrect 5 bytes to the
documented 4 bytes.

Windows/WinRT robustness:
- asyncio.wait_for hangs on bleak because WinRT IAsyncOperations refuse
  to cancel. _bounded_await() uses asyncio.wait() instead so timeouts
  actually return control even when the inner task is uncancellable.
- BleakClient connect by raw MAC string times out when WinRT guesses
  address type wrong; switched to pre-scanning with BleakScanner and
  passing the resolved BLEDevice, which carries the address type.
- Target-start fetch timeout bumped to 30s with retry disabled so the
  UI doesn't abort during the BLE pre-scan + connect + handshake path.

UI:
- Settings modal exposes Protocol Family (IconSelect grid, shared with
  add-device via parameterized ensureBleFamilyIconSelect) so users can
  fix a wrong family pick without recreating the device. Govee AES key
  row toggles on/off with family selection.

Also turns LAN auth back on in default_config.yaml, logs start_processing
requests on entry for easier diagnosis, and captures the full debug trail
in docs/BLE_LED_CONTROLLERS.md for future BLE work.

Refs the mbullington SP110E protocol gist for the handshake bytes.
2026-04-21 17:45:21 +03:00

242 lines
9.6 KiB
Python

"""Tests for BLEClient using a fake transport — no bleak, no hardware."""
from typing import List
import numpy as np
import pytest
from ledgrab.core.devices import ble_client as ble_client_module
from ledgrab.core.devices.ble_client import (
BLEClient,
_average_color,
_encrypt_govee_frame,
_strip_ble_scheme,
)
class FakeTransport:
"""In-memory stand-in for ``BLETransport`` — records every write."""
def __init__(self, *_, **__):
self.writes: List[bytes] = []
# Full log preserving char_uuid alongside payload so tests can
# assert on the SP110E vendor handshake's characteristic targets.
self.writes_detailed: List[tuple] = []
self._connected = False
@property
def is_connected(self) -> bool:
return self._connected
async def connect(self) -> None:
self._connected = True
async def close(self) -> None:
self._connected = False
async def write(self, data: bytes, char_uuid: str | None = None) -> None:
if not self._connected:
raise RuntimeError("fake transport not connected")
self.writes.append(data)
self.writes_detailed.append((char_uuid, data))
@property
def color_writes(self) -> list:
"""Writes that went to the protocol's default write characteristic
(i.e. not the init handshake). Most existing tests want this view."""
return [data for (char_uuid, data) in self.writes_detailed if char_uuid is None]
@pytest.fixture
def fake_transport_cls(monkeypatch):
    """Patch the BLE client module so it builds ``FakeTransport`` objects.

    Replaces ``make_transport`` with a factory that ignores its arguments
    and returns a fresh ``FakeTransport``, and zeroes the minimum
    inter-write interval. Returns the ``FakeTransport`` class itself.
    """

    def _build_fake(*_, **__):
        return FakeTransport()

    # Zero the inter-write delay so tests don't sleep.
    monkeypatch.setattr(ble_client_module, "_MIN_WRITE_INTERVAL_SEC", 0.0)
    monkeypatch.setattr(ble_client_module, "make_transport", _build_fake)
    return FakeTransport
class TestStripBleScheme:
    """Checks for ``_strip_ble_scheme`` on prefixed, bare and slashed input."""

    def test_strips_ble_prefix(self):
        stripped = _strip_ble_scheme("ble://AA:BB:CC:DD:EE:FF")
        assert stripped == "AA:BB:CC:DD:EE:FF"

    def test_leaves_bare_address_alone(self):
        bare = "AA:BB:CC:DD:EE:FF"
        assert _strip_ble_scheme(bare) == "AA:BB:CC:DD:EE:FF"

    def test_strips_trailing_slashes(self):
        stripped = _strip_ble_scheme("ble://abc/")
        assert stripped == "abc"
class TestAverageColor:
    """Checks for ``_average_color`` on list and numpy inputs."""

    def test_returns_black_on_empty_list(self):
        no_pixels = []
        assert _average_color(no_pixels) == (0, 0, 0)

    def test_averages_list_of_tuples(self):
        # One pure-red, one pure-green and one pure-blue pixel.
        primaries = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
        assert _average_color(primaries) == (85, 85, 85)

    def test_returns_black_on_empty_array(self):
        no_pixels = np.empty((0, 3), dtype=np.uint8)
        assert _average_color(no_pixels) == (0, 0, 0)

    def test_averages_numpy_array(self):
        pixels = np.array([[100, 100, 100], [200, 200, 200]], dtype=np.uint8)
        mean_rgb = _average_color(pixels)
        assert mean_rgb == (150, 150, 150)
class TestBLEClientLifecycle:
    """Connect / close / construction behaviour of ``BLEClient``."""

    @pytest.mark.asyncio
    async def test_connect_sets_connected_flag(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        assert not ble.is_connected
        await ble.connect()
        assert ble.is_connected

    @pytest.mark.asyncio
    async def test_connect_runs_vendor_init_handshake(self, fake_transport_cls):
        # Without its two-write handshake right after connect, an SP110E
        # silently drops the GATT link — so both writes must be emitted.
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await ble.connect()
        handshake = ble._transport.writes_detailed
        assert len(handshake) == 2
        first, second = handshake
        first_char, first_payload = first
        second_char, second_payload = second
        assert "ffe2" in first_char and first_payload == b"\x01\x00"
        assert "ffe1" in second_char and second_payload == b"\x01\xb7\xe3\xd5"

    @pytest.mark.asyncio
    async def test_close_does_not_send_power_off(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await ble.connect()
        await ble.close()
        # The strip keeps whatever state it had: rapid power toggles on
        # connect/close cause BLE stack hangs on Windows. Only the
        # default-characteristic writes are inspected because the vendor
        # init handshake is unrelated to power state.
        power_off_frame = bytes((0, 0, 0, 0xAB))
        assert power_off_frame not in ble._transport.color_writes

    @pytest.mark.asyncio
    async def test_unknown_family_raises(self):
        # Construction alone must reject an unrecognised family name.
        with pytest.raises(ValueError, match="Unknown BLE family"):
            BLEClient("ble://AA:BB:CC", ble_family="not-real")
class TestBLEClientSendPixels:
    """Frame encoding and de-duplication in ``BLEClient.send_pixels``."""

    @pytest.mark.asyncio
    async def test_send_averages_and_writes_one_frame(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await ble.connect()
        pixels = [(255, 0, 0), (0, 0, 0)]
        sent = await ble.send_pixels(pixels, brightness=255)
        assert sent
        # Only default-characteristic writes are counted; the init
        # handshake carries its own char_uuid and is filtered out.
        frames = ble._transport.color_writes
        assert len(frames) == 1
        # Average is (127, 0, 0); SP110E frame is 4 bytes ending in 0x1E.
        expected = bytes((127, 0, 0, 0x1E))
        assert frames[0] == expected

    @pytest.mark.asyncio
    async def test_duplicate_frames_are_dropped(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        await ble.connect()
        # Same colour three times in a row should yield a single write.
        for _ in range(3):
            await ble.send_pixels([(10, 20, 30)])
        assert len(ble._transport.color_writes) == 1

    @pytest.mark.asyncio
    async def test_send_returns_false_when_disconnected(self, fake_transport_cls):
        # connect() is deliberately never called here.
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
        sent = await ble.send_pixels([(1, 2, 3)])
        assert sent is False

    @pytest.mark.asyncio
    async def test_triones_frame_format(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="triones")
        await ble.connect()
        await ble.send_pixels([(100, 150, 200)])
        # Triones performs no init handshake, so the very first recorded
        # write is already the colour frame.
        expected = bytes((0x7E, 0x07, 0x05, 0x03, 100, 150, 200, 0x10, 0xEF))
        first_write = ble._transport.writes[0]
        assert first_write == expected
class TestBLEClientHealth:
    """``BLEClient.check_health`` with and without a previous health record."""

    @pytest.mark.asyncio
    async def test_health_preserves_previous_state(self):
        from ledgrab.core.devices.led_client import DeviceHealth

        previous = DeviceHealth(online=True, device_name="Test", device_led_count=60)
        health = await BLEClient.check_health(
            "ble://AA:BB:CC",
            http_client=None,
            prev_health=previous,
        )
        # Every field supplied in the previous record must come back as-is.
        assert health.online is True
        assert health.device_name == "Test"
        assert health.device_led_count == 60

    @pytest.mark.asyncio
    async def test_health_defaults_offline_when_no_prev(self):
        health = await BLEClient.check_health("ble://AA:BB:CC", http_client=None)
        assert health.online is False
        # With no previous record the name is the address minus the scheme.
        assert health.device_name == "AA:BB:CC"
class TestGoveeAESEncryption:
    """Govee frame encryption, both standalone and through ``BLEClient``."""

    # 16-byte test key; the hex decodes to ASCII "testtesttesttest".
    _KEY = bytes.fromhex("74657374746573747465737474657374")  # "testtesttesttest"

    def test_encrypt_produces_32_bytes(self):
        from ledgrab.core.devices.ble_protocols.govee import encode_color

        plain = encode_color(255, 0, 0)
        ciphertext = _encrypt_govee_frame(plain, self._KEY)
        assert len(ciphertext) == 32

    def test_encrypt_is_deterministic(self):
        plain = bytes(range(20))
        first = _encrypt_govee_frame(plain, self._KEY)
        second = _encrypt_govee_frame(plain, self._KEY)
        assert first == second

    def test_encrypt_differs_from_plaintext(self):
        plain = bytes(range(20))
        ciphertext = _encrypt_govee_frame(plain, self._KEY)
        zero_padded_plain = plain + b"\x00" * 12
        assert ciphertext != zero_padded_plain

    def test_decrypt_roundtrip(self):
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        plain = bytes(range(20))
        ciphertext = _encrypt_govee_frame(plain, self._KEY)
        # Decrypt with AES-ECB under the same key; the first 20 bytes of
        # the result must be the original frame.
        decryptor = Cipher(algorithms.AES(self._KEY), modes.ECB()).decryptor()
        recovered = decryptor.update(ciphertext) + decryptor.finalize()
        assert recovered[:20] == plain

    @pytest.mark.asyncio
    async def test_govee_client_encrypts_frames(self, fake_transport_cls):
        hex_key = self._KEY.hex()
        ble = BLEClient("ble://AA:BB:CC", ble_family="govee", ble_govee_key=hex_key)
        await ble.connect()
        await ble.send_pixels([(255, 0, 0)])
        recorded = ble._transport.writes
        assert len(recorded) == 1
        # An encrypted frame is 32 bytes long rather than the raw 20.
        assert len(recorded[0]) == 32

    @pytest.mark.asyncio
    async def test_govee_client_without_key_sends_plaintext(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="govee")
        await ble.connect()
        await ble.send_pixels([(255, 0, 0)])
        first_write = ble._transport.writes[0]
        assert len(first_write) == 20

    @pytest.mark.asyncio
    async def test_invalid_key_hex_falls_back_gracefully(self, fake_transport_cls):
        ble = BLEClient("ble://AA:BB:CC", ble_family="govee", ble_govee_key="not-hex!")
        await ble.connect()
        sent = await ble.send_pixels([(100, 100, 100)])
        assert sent is True
        first_write = ble._transport.writes[0]
        assert len(first_write) == 20  # plaintext fallback

    @pytest.mark.asyncio
    async def test_govee_key_ignored_for_non_govee_family(self, fake_transport_cls):
        hex_key = self._KEY.hex()
        ble = BLEClient("ble://AA:BB:CC", ble_family="sp110e", ble_govee_key=hex_key)
        # The key must be discarded at construction for a non-Govee family.
        assert ble._aes_key is None
        await ble.connect()
        await ble.send_pixels([(100, 100, 100)])
        # SP110E colour frame stays 4 bytes (RR GG BB CMD), unencrypted.
        frames = ble._transport.color_writes
        assert len(frames) == 1
        assert len(frames[0]) == 4