fix(devices): SP110E vendor handshake + Windows/bleak robustness
Build Android APK / build-android (push) Failing after 1m38s
Lint & Test / test (push) Successful in 4m32s

SP110E peripherals silently tear down the GATT link ~1s after connect
unless a two-write vendor handshake (01 00 → FFE2, 01 B7 E3 D5 → FFE1)
arrives immediately. Without it the first real write hangs 30s then
reconnect-loops forever. Adds optional BLEProtocol.init_writes executed
on connect, plumbs a per-write char_uuid through both transports, and
fixes the SP110E color/power frames from an incorrect 5 bytes to the
documented 4 bytes.

Windows/WinRT robustness:
- asyncio.wait_for hangs on bleak because WinRT IAsyncOperations refuse
  to cancel. _bounded_await() uses asyncio.wait() instead so timeouts
  actually return control even when the inner task is uncancellable.
- BleakClient connect by raw MAC string times out when WinRT guesses
  address type wrong; switched to pre-scanning with BleakScanner and
  passing the resolved BLEDevice, which carries the address type.
- Target-start fetch timeout bumped to 30s with retry disabled so the
  UI doesn't abort during the BLE pre-scan + connect + handshake path.

UI:
- Settings modal exposes Protocol Family (IconSelect grid, shared with
  add-device via parameterized ensureBleFamilyIconSelect) so users can
  fix a wrong family pick without recreating the device. Govee AES key
  row toggles on/off with family selection.

Also turns LAN auth back on in default_config.yaml, logs start_processing
requests on entry for easier diagnosis, and captures the full debug trail
in docs/BLE_LED_CONTROLLERS.md for future BLE work.

Refs the mbullington SP110E protocol gist for the handshake bytes.
This commit is contained in:
2026-04-21 17:45:21 +03:00
parent 2b5dac2c42
commit 45f93fd30e
14 changed files with 448 additions and 55 deletions
+38 -10
View File
@@ -19,6 +19,9 @@ class FakeTransport:
def __init__(self, *_, **__):
self.writes: List[bytes] = []
# Full log preserving char_uuid alongside payload so tests can
# assert on the SP110E vendor handshake's characteristic targets.
self.writes_detailed: List[tuple] = []
self._connected = False
@property
@@ -31,10 +34,17 @@ class FakeTransport:
async def close(self) -> None:
self._connected = False
async def write(self, data: bytes) -> None:
async def write(self, data: bytes, char_uuid: str | None = None) -> None:
if not self._connected:
raise RuntimeError("fake transport not connected")
self.writes.append(data)
self.writes_detailed.append((char_uuid, data))
@property
def color_writes(self) -> list:
"""Writes that went to the protocol's default write characteristic
(i.e. not the init handshake). Most existing tests want this view."""
return [data for (char_uuid, data) in self.writes_detailed if char_uuid is None]
@pytest.fixture
@@ -79,14 +89,27 @@ class TestBLEClientLifecycle:
await client.connect()
assert client.is_connected
@pytest.mark.asyncio
async def test_connect_runs_vendor_init_handshake(self, fake_transport_cls):
# SP110E requires a two-write handshake on connect or the GATT
# link silently drops — verify both writes actually go out.
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
await client.connect()
init_writes = client._transport.writes_detailed
assert len(init_writes) == 2
(char_a, payload_a), (char_b, payload_b) = init_writes
assert "ffe2" in char_a and payload_a == b"\x01\x00"
assert "ffe1" in char_b and payload_b == b"\x01\xb7\xe3\xd5"
@pytest.mark.asyncio
async def test_close_does_not_send_power_off(self, fake_transport_cls):
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
await client.connect()
await client.close()
# Strip is left in whatever state it's in — rapid power toggles on
# connect/close cause BLE stack hangs on Windows.
assert bytes((0, 0, 0, 0, 0xAB)) not in client._transport.writes
# connect/close cause BLE stack hangs on Windows. (Only check color
# writes since the vendor init handshake is unrelated.)
assert bytes((0, 0, 0, 0xAB)) not in client._transport.color_writes
@pytest.mark.asyncio
async def test_unknown_family_raises(self):
@@ -101,10 +124,12 @@ class TestBLEClientSendPixels:
await client.connect()
ok = await client.send_pixels([(255, 0, 0), (0, 0, 0)], brightness=255)
assert ok
assert len(client._transport.writes) == 1
frame = client._transport.writes[0]
# Averaged to (127, 0, 0), SP110E trailer 00 1E
assert frame == bytes((127, 0, 0, 0, 0x1E))
# Color frames go to the default write characteristic; init-handshake
# writes have their own char_uuid and don't count here.
color_writes = client._transport.color_writes
assert len(color_writes) == 1
# Averaged to (127, 0, 0), SP110E 4-byte frame with 0x1E cmd tail.
assert color_writes[0] == bytes((127, 0, 0, 0x1E))
@pytest.mark.asyncio
async def test_duplicate_frames_are_dropped(self, fake_transport_cls):
@@ -113,7 +138,7 @@ class TestBLEClientSendPixels:
await client.send_pixels([(10, 20, 30)])
await client.send_pixels([(10, 20, 30)])
await client.send_pixels([(10, 20, 30)])
assert len(client._transport.writes) == 1
assert len(client._transport.color_writes) == 1
@pytest.mark.asyncio
async def test_send_returns_false_when_disconnected(self, fake_transport_cls):
@@ -127,6 +152,7 @@ class TestBLEClientSendPixels:
client = BLEClient("ble://AA:BB:CC", ble_family="triones")
await client.connect()
await client.send_pixels([(100, 150, 200)])
# Triones has no init handshake — first write is the color frame.
frame = client._transport.writes[0]
assert frame == bytes((0x7E, 0x07, 0x05, 0x03, 100, 150, 200, 0x10, 0xEF))
@@ -209,5 +235,7 @@ class TestGoveeAESEncryption:
assert client._aes_key is None
await client.connect()
await client.send_pixels([(100, 100, 100)])
# SP110E frame is 5 bytes, not encrypted.
assert len(client._transport.writes[0]) == 5
# SP110E color frame is 4 bytes (RR GG BB CMD), not encrypted.
color_writes = client._transport.color_writes
assert len(color_writes) == 1
assert len(color_writes[0]) == 4
+17 -7
View File
@@ -18,24 +18,34 @@ from ledgrab.core.devices.ble_protocols import (
class TestSP110E:
def test_color_frame_is_five_bytes_with_cmd_tail(self):
def test_color_frame_is_four_bytes_with_cmd_tail(self):
frame = sp110e.encode_color(255, 128, 64)
assert frame == bytes((255, 128, 64, 0x00, 0x1E))
assert frame == bytes((255, 128, 64, 0x1E))
def test_brightness_scales_rgb(self):
frame = sp110e.encode_color(200, 200, 200, brightness=128)
# 200 * 128 // 255 == 100
assert frame == bytes((100, 100, 100, 0x00, 0x1E))
assert frame == bytes((100, 100, 100, 0x1E))
def test_brightness_255_is_passthrough(self):
assert sp110e.encode_color(1, 2, 3, 255) == bytes((1, 2, 3, 0x00, 0x1E))
assert sp110e.encode_color(1, 2, 3, 255) == bytes((1, 2, 3, 0x1E))
def test_clamps_out_of_range(self):
assert sp110e.encode_color(-5, 300, 128) == bytes((0, 255, 128, 0x00, 0x1E))
assert sp110e.encode_color(-5, 300, 128) == bytes((0, 255, 128, 0x1E))
def test_power_frames(self):
assert sp110e.encode_power(True) == bytes((0, 0, 0, 0, 0xAA))
assert sp110e.encode_power(False) == bytes((0, 0, 0, 0, 0xAB))
assert sp110e.encode_power(True) == bytes((0, 0, 0, 0xAA))
assert sp110e.encode_power(False) == bytes((0, 0, 0, 0xAB))
def test_init_handshake_is_defined(self):
# SP110E silently drops the GATT link within ~1s of connect unless
# this two-write handshake arrives — see module docstring.
assert len(sp110e.PROTOCOL.init_writes) == 2
(ffe2_uuid, ffe2_payload), (ffe1_uuid, ffe1_payload) = sp110e.PROTOCOL.init_writes
assert ffe2_uuid.endswith("ffe2-0000-1000-8000-00805f9b34fb") or "ffe2" in ffe2_uuid
assert ffe2_payload == b"\x01\x00"
assert ffe1_uuid == sp110e.PROTOCOL.write_char_uuid
assert ffe1_payload == b"\x01\xb7\xe3\xd5"
class TestTriones: