feat(devices): BLE LED controller support (SP110E/Triones/Zengge/Govee)
End-to-end BLE streaming: provider + client + per-protocol wire encoders with whole-strip averaging, desktop (bleak) and Android (Kotlin BleBridge via Chaquopy) transports, discovery with protocol-family detection that auto-fills the UI, throttled not-connected warning + 10 s reconnect cooldown so a dropped link no longer stalls the pipeline at ~30 s/frame, and an explicit asyncio.wait_for wrapper around bleak connect() since the WinRT backend doesn't always honor the timeout kwarg. Also rewrites server/restart.ps1 to be parameterized (-Port / -Module / -PythonVersion / timeouts / -Quiet), pick the right interpreter via the py launcher, pre-flight the target module, poll port readiness on both shutdown and startup, redirect child stdout/stderr so Start-Process doesn't hang on inherited Git-Bash handles, and return proper exit codes. Rolls in concurrent work: Android BLE permissions + launcher icons + ru/zh resources, Chaquopy-safe value_stream psutil fallback, setup-required modal, asset-store test coverage, and misc system/config touch-ups.
This commit is contained in:
@@ -0,0 +1,155 @@
|
||||
"""Tests for AssetStore — focusing on self-heal behavior on startup.
|
||||
|
||||
Regression guard for the "restored backup loses notification sounds" class
|
||||
of bug: when the SQLite DB is restored from elsewhere but the ``assets/``
|
||||
directory hasn't moved with it, prebuilt sound files end up with stale
|
||||
DB rows pointing at missing files. ``heal_prebuilt_files`` is the safety
|
||||
net.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.asset_store import AssetStore
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store_with_prebuilt(tmp_path):
|
||||
"""AssetStore seeded with the shipped prebuilt sounds."""
|
||||
db = Database(tmp_path / "test.db")
|
||||
assets_dir = tmp_path / "assets"
|
||||
prebuilt_dir = tmp_path / "prebuilt"
|
||||
prebuilt_dir.mkdir()
|
||||
# Minimal valid WAV-ish bytes — content doesn't matter for these tests,
|
||||
# only that the file exists and is copied around correctly.
|
||||
for name in ("alert.wav", "bell.wav"):
|
||||
(prebuilt_dir / name).write_bytes(b"RIFF\x00\x00\x00\x00WAVEfmt " + name.encode())
|
||||
|
||||
store = AssetStore(db, assets_dir)
|
||||
imported = store.import_prebuilt_sounds(prebuilt_dir)
|
||||
assert len(imported) == 2
|
||||
yield store, prebuilt_dir, assets_dir
|
||||
db.close()
|
||||
|
||||
|
||||
class TestHealPrebuiltFiles:
|
||||
def test_heals_missing_stored_file(self, store_with_prebuilt):
|
||||
store, prebuilt_dir, assets_dir = store_with_prebuilt
|
||||
alert = next(a for a in store.get_all_assets() if a.filename == "alert.wav")
|
||||
|
||||
# Simulate a partial restore: DB row survived, file vanished.
|
||||
(assets_dir / alert.stored_filename).unlink()
|
||||
assert store.get_file_path(alert.id) is None
|
||||
|
||||
healed = store.heal_prebuilt_files(prebuilt_dir)
|
||||
|
||||
assert [a.id for a in healed] == [alert.id]
|
||||
restored_path = assets_dir / alert.stored_filename
|
||||
assert restored_path.exists()
|
||||
# File content round-trips from the prebuilt source.
|
||||
assert restored_path.read_bytes() == (prebuilt_dir / "alert.wav").read_bytes()
|
||||
# Size metadata is updated to match the re-copied file.
|
||||
assert store.get_asset(alert.id).size_bytes == restored_path.stat().st_size
|
||||
|
||||
def test_idempotent_when_files_present(self, store_with_prebuilt):
|
||||
store, prebuilt_dir, _ = store_with_prebuilt
|
||||
first = store.heal_prebuilt_files(prebuilt_dir)
|
||||
second = store.heal_prebuilt_files(prebuilt_dir)
|
||||
assert first == [] and second == []
|
||||
|
||||
def test_skips_soft_deleted_prebuilt(self, store_with_prebuilt):
|
||||
store, prebuilt_dir, assets_dir = store_with_prebuilt
|
||||
alert = next(a for a in store.get_all_assets() if a.filename == "alert.wav")
|
||||
store.delete_asset(alert.id) # prebuilt → soft-delete; file removed
|
||||
assert store.get_asset(alert.id).deleted is True
|
||||
|
||||
healed = store.heal_prebuilt_files(prebuilt_dir)
|
||||
# Heal must not resurrect a soft-deleted asset — that is
|
||||
# `restore_prebuilt`'s job and is an explicit user action.
|
||||
assert healed == []
|
||||
assert not (assets_dir / alert.stored_filename).exists()
|
||||
|
||||
def test_returns_empty_when_source_also_missing(self, store_with_prebuilt):
|
||||
"""If both the stored file and the prebuilt source are gone, the
|
||||
heal is a no-op — the asset stays broken and the method does not
|
||||
raise (so startup is not aborted)."""
|
||||
store, prebuilt_dir, assets_dir = store_with_prebuilt
|
||||
alert = next(a for a in store.get_all_assets() if a.filename == "alert.wav")
|
||||
(assets_dir / alert.stored_filename).unlink()
|
||||
(prebuilt_dir / "alert.wav").unlink()
|
||||
|
||||
healed = store.heal_prebuilt_files(prebuilt_dir)
|
||||
|
||||
assert healed == []
|
||||
# Unrelated assets still heal correctly.
|
||||
bell = next(a for a in store.get_all_assets() if a.filename == "bell.wav")
|
||||
(assets_dir / bell.stored_filename).unlink()
|
||||
healed = store.heal_prebuilt_files(prebuilt_dir)
|
||||
assert [a.id for a in healed] == [bell.id]
|
||||
|
||||
def test_import_prebuilt_sounds_runs_heal(self, tmp_path):
|
||||
"""End-to-end: restoring a DB and re-running startup heals missing files."""
|
||||
# First run: import both sounds normally.
|
||||
db = Database(tmp_path / "first.db")
|
||||
assets_dir = tmp_path / "assets"
|
||||
prebuilt_dir = tmp_path / "prebuilt"
|
||||
prebuilt_dir.mkdir()
|
||||
(prebuilt_dir / "alert.wav").write_bytes(b"hello-alert")
|
||||
|
||||
store = AssetStore(db, assets_dir)
|
||||
imported = store.import_prebuilt_sounds(prebuilt_dir)
|
||||
assert len(imported) == 1
|
||||
alert_stored = imported[0].stored_filename
|
||||
db.close()
|
||||
|
||||
# Simulate partial restore: file gone, DB row preserved.
|
||||
(assets_dir / alert_stored).unlink()
|
||||
|
||||
# Second run: reopen the same DB + assets dir.
|
||||
db2 = Database(tmp_path / "first.db")
|
||||
store2 = AssetStore(db2, assets_dir)
|
||||
store2.import_prebuilt_sounds(prebuilt_dir)
|
||||
assert (assets_dir / alert_stored).exists()
|
||||
db2.close()
|
||||
|
||||
|
||||
class TestWarnMissingCustomFiles:
|
||||
def test_reports_custom_asset_with_missing_file(self, tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
assets_dir = tmp_path / "assets"
|
||||
store = AssetStore(db, assets_dir)
|
||||
|
||||
asset = store.create_asset(
|
||||
name="User Sound",
|
||||
filename="user.wav",
|
||||
file_data=b"RIFF\x00\x00\x00\x00WAVEfmt user",
|
||||
)
|
||||
(assets_dir / asset.stored_filename).unlink()
|
||||
|
||||
missing = store.warn_missing_custom_files()
|
||||
|
||||
assert [a.id for a in missing] == [asset.id]
|
||||
db.close()
|
||||
|
||||
def test_quiet_when_all_custom_files_present(self, tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
store = AssetStore(db, tmp_path / "assets")
|
||||
store.create_asset(name="User", filename="user.wav", file_data=b"data123")
|
||||
assert store.warn_missing_custom_files() == []
|
||||
db.close()
|
||||
|
||||
def test_ignores_prebuilt_and_soft_deleted(self, tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
assets_dir = tmp_path / "assets"
|
||||
prebuilt_dir = tmp_path / "prebuilt"
|
||||
prebuilt_dir.mkdir()
|
||||
(prebuilt_dir / "alert.wav").write_bytes(b"prebuilt-alert")
|
||||
|
||||
store = AssetStore(db, assets_dir)
|
||||
store.import_prebuilt_sounds(prebuilt_dir)
|
||||
prebuilt_asset = store.get_all_assets()[0]
|
||||
(assets_dir / prebuilt_asset.stored_filename).unlink()
|
||||
|
||||
# Missing prebuilt files are NOT reported here (that's heal's job).
|
||||
assert store.warn_missing_custom_files() == []
|
||||
db.close()
|
||||
@@ -0,0 +1,213 @@
|
||||
"""Tests for BLEClient using a fake transport — no bleak, no hardware."""
|
||||
|
||||
from typing import List
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.devices import ble_client as ble_client_module
|
||||
from ledgrab.core.devices.ble_client import (
|
||||
BLEClient,
|
||||
_average_color,
|
||||
_encrypt_govee_frame,
|
||||
_strip_ble_scheme,
|
||||
)
|
||||
|
||||
|
||||
class FakeTransport:
|
||||
"""In-memory stand-in for ``BLETransport`` — records every write."""
|
||||
|
||||
def __init__(self, *_, **__):
|
||||
self.writes: List[bytes] = []
|
||||
self._connected = False
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected
|
||||
|
||||
async def connect(self) -> None:
|
||||
self._connected = True
|
||||
|
||||
async def close(self) -> None:
|
||||
self._connected = False
|
||||
|
||||
async def write(self, data: bytes) -> None:
|
||||
if not self._connected:
|
||||
raise RuntimeError("fake transport not connected")
|
||||
self.writes.append(data)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_transport_cls(monkeypatch):
|
||||
monkeypatch.setattr(ble_client_module, "make_transport", lambda *_, **__: FakeTransport())
|
||||
# Disable the inter-write delay so tests don't sleep.
|
||||
monkeypatch.setattr(ble_client_module, "_MIN_WRITE_INTERVAL_SEC", 0.0)
|
||||
return FakeTransport
|
||||
|
||||
|
||||
class TestStripBleScheme:
|
||||
def test_strips_ble_prefix(self):
|
||||
assert _strip_ble_scheme("ble://AA:BB:CC:DD:EE:FF") == "AA:BB:CC:DD:EE:FF"
|
||||
|
||||
def test_leaves_bare_address_alone(self):
|
||||
assert _strip_ble_scheme("AA:BB:CC:DD:EE:FF") == "AA:BB:CC:DD:EE:FF"
|
||||
|
||||
def test_strips_trailing_slashes(self):
|
||||
assert _strip_ble_scheme("ble://abc/") == "abc"
|
||||
|
||||
|
||||
class TestAverageColor:
|
||||
def test_returns_black_on_empty_list(self):
|
||||
assert _average_color([]) == (0, 0, 0)
|
||||
|
||||
def test_averages_list_of_tuples(self):
|
||||
assert _average_color([(255, 0, 0), (0, 255, 0), (0, 0, 255)]) == (85, 85, 85)
|
||||
|
||||
def test_returns_black_on_empty_array(self):
|
||||
assert _average_color(np.empty((0, 3), dtype=np.uint8)) == (0, 0, 0)
|
||||
|
||||
def test_averages_numpy_array(self):
|
||||
arr = np.array([[100, 100, 100], [200, 200, 200]], dtype=np.uint8)
|
||||
assert _average_color(arr) == (150, 150, 150)
|
||||
|
||||
|
||||
class TestBLEClientLifecycle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_sets_connected_flag(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
|
||||
assert not client.is_connected
|
||||
await client.connect()
|
||||
assert client.is_connected
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_close_does_not_send_power_off(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
|
||||
await client.connect()
|
||||
await client.close()
|
||||
# Strip is left in whatever state it's in — rapid power toggles on
|
||||
# connect/close cause BLE stack hangs on Windows.
|
||||
assert bytes((0, 0, 0, 0, 0xAB)) not in client._transport.writes
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_family_raises(self):
|
||||
with pytest.raises(ValueError, match="Unknown BLE family"):
|
||||
BLEClient("ble://AA:BB:CC", ble_family="not-real")
|
||||
|
||||
|
||||
class TestBLEClientSendPixels:
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_averages_and_writes_one_frame(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
|
||||
await client.connect()
|
||||
ok = await client.send_pixels([(255, 0, 0), (0, 0, 0)], brightness=255)
|
||||
assert ok
|
||||
assert len(client._transport.writes) == 1
|
||||
frame = client._transport.writes[0]
|
||||
# Averaged to (127, 0, 0), SP110E trailer 00 1E
|
||||
assert frame == bytes((127, 0, 0, 0, 0x1E))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_duplicate_frames_are_dropped(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
|
||||
await client.connect()
|
||||
await client.send_pixels([(10, 20, 30)])
|
||||
await client.send_pixels([(10, 20, 30)])
|
||||
await client.send_pixels([(10, 20, 30)])
|
||||
assert len(client._transport.writes) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_returns_false_when_disconnected(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e")
|
||||
# No connect() call.
|
||||
ok = await client.send_pixels([(1, 2, 3)])
|
||||
assert ok is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_triones_frame_format(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="triones")
|
||||
await client.connect()
|
||||
await client.send_pixels([(100, 150, 200)])
|
||||
frame = client._transport.writes[0]
|
||||
assert frame == bytes((0x7E, 0x07, 0x05, 0x03, 100, 150, 200, 0x10, 0xEF))
|
||||
|
||||
|
||||
class TestBLEClientHealth:
|
||||
@pytest.mark.asyncio
|
||||
async def test_health_preserves_previous_state(self):
|
||||
from ledgrab.core.devices.led_client import DeviceHealth
|
||||
|
||||
prev = DeviceHealth(online=True, device_name="Test", device_led_count=60)
|
||||
result = await BLEClient.check_health("ble://AA:BB:CC", http_client=None, prev_health=prev)
|
||||
assert result.online is True
|
||||
assert result.device_name == "Test"
|
||||
assert result.device_led_count == 60
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_health_defaults_offline_when_no_prev(self):
|
||||
result = await BLEClient.check_health("ble://AA:BB:CC", http_client=None)
|
||||
assert result.online is False
|
||||
assert result.device_name == "AA:BB:CC"
|
||||
|
||||
|
||||
class TestGoveeAESEncryption:
|
||||
_KEY = bytes.fromhex("74657374746573747465737474657374") # "testtesttesttest"
|
||||
|
||||
def test_encrypt_produces_32_bytes(self):
|
||||
from ledgrab.core.devices.ble_protocols.govee import encode_color
|
||||
|
||||
frame = encode_color(255, 0, 0)
|
||||
encrypted = _encrypt_govee_frame(frame, self._KEY)
|
||||
assert len(encrypted) == 32
|
||||
|
||||
def test_encrypt_is_deterministic(self):
|
||||
frame = bytes(range(20))
|
||||
assert _encrypt_govee_frame(frame, self._KEY) == _encrypt_govee_frame(frame, self._KEY)
|
||||
|
||||
def test_encrypt_differs_from_plaintext(self):
|
||||
frame = bytes(range(20))
|
||||
assert _encrypt_govee_frame(frame, self._KEY) != frame + b"\x00" * 12
|
||||
|
||||
def test_decrypt_roundtrip(self):
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
|
||||
frame = bytes(range(20))
|
||||
encrypted = _encrypt_govee_frame(frame, self._KEY)
|
||||
cipher = Cipher(algorithms.AES(self._KEY), modes.ECB())
|
||||
dec = cipher.decryptor()
|
||||
decrypted = dec.update(encrypted) + dec.finalize()
|
||||
assert decrypted[:20] == frame
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_govee_client_encrypts_frames(self, fake_transport_cls):
|
||||
key_hex = self._KEY.hex()
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="govee", ble_govee_key=key_hex)
|
||||
await client.connect()
|
||||
await client.send_pixels([(255, 0, 0)])
|
||||
assert len(client._transport.writes) == 1
|
||||
# Encrypted frame is 32 bytes, not the raw 20.
|
||||
assert len(client._transport.writes[0]) == 32
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_govee_client_without_key_sends_plaintext(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="govee")
|
||||
await client.connect()
|
||||
await client.send_pixels([(255, 0, 0)])
|
||||
assert len(client._transport.writes[0]) == 20
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_key_hex_falls_back_gracefully(self, fake_transport_cls):
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="govee", ble_govee_key="not-hex!")
|
||||
await client.connect()
|
||||
ok = await client.send_pixels([(100, 100, 100)])
|
||||
assert ok is True
|
||||
assert len(client._transport.writes[0]) == 20 # plaintext fallback
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_govee_key_ignored_for_non_govee_family(self, fake_transport_cls):
|
||||
key_hex = self._KEY.hex()
|
||||
client = BLEClient("ble://AA:BB:CC", ble_family="sp110e", ble_govee_key=key_hex)
|
||||
assert client._aes_key is None
|
||||
await client.connect()
|
||||
await client.send_pixels([(100, 100, 100)])
|
||||
# SP110E frame is 5 bytes, not encrypted.
|
||||
assert len(client._transport.writes[0]) == 5
|
||||
@@ -0,0 +1,130 @@
|
||||
"""Unit tests for BLE LED controller wire protocols.
|
||||
|
||||
These tests exercise the pure byte-encoding functions for each family,
|
||||
so they run without ``bleak`` installed and without any real BLE hardware.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.devices.ble_protocols import (
|
||||
all_protocols,
|
||||
get_protocol,
|
||||
identify_family,
|
||||
govee,
|
||||
sp110e,
|
||||
triones,
|
||||
zengge,
|
||||
)
|
||||
|
||||
|
||||
class TestSP110E:
|
||||
def test_color_frame_is_five_bytes_with_cmd_tail(self):
|
||||
frame = sp110e.encode_color(255, 128, 64)
|
||||
assert frame == bytes((255, 128, 64, 0x00, 0x1E))
|
||||
|
||||
def test_brightness_scales_rgb(self):
|
||||
frame = sp110e.encode_color(200, 200, 200, brightness=128)
|
||||
# 200 * 128 // 255 == 100
|
||||
assert frame == bytes((100, 100, 100, 0x00, 0x1E))
|
||||
|
||||
def test_brightness_255_is_passthrough(self):
|
||||
assert sp110e.encode_color(1, 2, 3, 255) == bytes((1, 2, 3, 0x00, 0x1E))
|
||||
|
||||
def test_clamps_out_of_range(self):
|
||||
assert sp110e.encode_color(-5, 300, 128) == bytes((0, 255, 128, 0x00, 0x1E))
|
||||
|
||||
def test_power_frames(self):
|
||||
assert sp110e.encode_power(True) == bytes((0, 0, 0, 0, 0xAA))
|
||||
assert sp110e.encode_power(False) == bytes((0, 0, 0, 0, 0xAB))
|
||||
|
||||
|
||||
class TestTriones:
|
||||
def test_color_frame_matches_documented_template(self):
|
||||
# 7E 07 05 03 RR GG BB 10 EF
|
||||
frame = triones.encode_color(0xAA, 0xBB, 0xCC)
|
||||
assert frame == bytes((0x7E, 0x07, 0x05, 0x03, 0xAA, 0xBB, 0xCC, 0x10, 0xEF))
|
||||
|
||||
def test_frame_is_nine_bytes(self):
|
||||
assert len(triones.encode_color(10, 20, 30)) == 9
|
||||
|
||||
def test_brightness_scales(self):
|
||||
frame = triones.encode_color(255, 255, 255, brightness=51) # 20%
|
||||
# 255 * 51 // 255 == 51
|
||||
assert frame[4:7] == bytes((51, 51, 51))
|
||||
|
||||
def test_power_on_off_distinct(self):
|
||||
on = triones.encode_power(True)
|
||||
off = triones.encode_power(False)
|
||||
assert on != off
|
||||
assert on[0] == 0x7E and off[0] == 0x7E
|
||||
assert on[-1] == 0xEF and off[-1] == 0xEF
|
||||
|
||||
|
||||
class TestZengge:
|
||||
def test_color_frame_format(self):
|
||||
# 56 RR GG BB 00 F0 AA
|
||||
frame = zengge.encode_color(0x11, 0x22, 0x33)
|
||||
assert frame == bytes((0x56, 0x11, 0x22, 0x33, 0x00, 0xF0, 0xAA))
|
||||
|
||||
def test_frame_is_seven_bytes(self):
|
||||
assert len(zengge.encode_color(1, 2, 3)) == 7
|
||||
|
||||
def test_power_frames(self):
|
||||
assert zengge.encode_power(True) == bytes((0xCC, 0x23, 0x33))
|
||||
assert zengge.encode_power(False) == bytes((0xCC, 0x24, 0x33))
|
||||
|
||||
|
||||
class TestGovee:
|
||||
def test_color_frame_is_twenty_bytes_with_checksum(self):
|
||||
frame = govee.encode_color(255, 0, 0)
|
||||
assert len(frame) == 20
|
||||
# Verify XOR checksum
|
||||
expected_checksum = 0
|
||||
for i in range(19):
|
||||
expected_checksum ^= frame[i]
|
||||
assert frame[19] == expected_checksum
|
||||
|
||||
def test_color_frame_header(self):
|
||||
# 33 05 02 RR GG BB ...
|
||||
frame = govee.encode_color(0x10, 0x20, 0x30)
|
||||
assert frame[0] == 0x33
|
||||
assert frame[1] == 0x05
|
||||
assert frame[2] == 0x02
|
||||
assert frame[3:6] == bytes((0x10, 0x20, 0x30))
|
||||
|
||||
def test_brightness_scales(self):
|
||||
frame = govee.encode_color(100, 100, 100, brightness=0)
|
||||
assert frame[3:6] == bytes((0, 0, 0))
|
||||
|
||||
def test_power_frames_have_valid_checksum(self):
|
||||
for state in (True, False):
|
||||
frame = govee.encode_power(state)
|
||||
assert len(frame) == 20
|
||||
checksum = 0
|
||||
for i in range(19):
|
||||
checksum ^= frame[i]
|
||||
assert frame[19] == checksum
|
||||
|
||||
|
||||
class TestRegistry:
|
||||
def test_all_four_families_registered(self):
|
||||
families = set(all_protocols().keys())
|
||||
assert families == {"sp110e", "triones", "zengge", "govee"}
|
||||
|
||||
def test_get_protocol_raises_on_unknown(self):
|
||||
with pytest.raises(ValueError, match="Unknown BLE family"):
|
||||
get_protocol("not-a-real-family")
|
||||
|
||||
def test_get_protocol_returns_correct_family(self):
|
||||
assert get_protocol("sp110e").family == "sp110e"
|
||||
assert get_protocol("triones").family == "triones"
|
||||
|
||||
def test_identify_family_by_name(self):
|
||||
assert identify_family("SP110E_A1B2C3") == "sp110e"
|
||||
assert identify_family("Triones-ABCDEF") == "triones"
|
||||
assert identify_family("Zengge-123456") == "zengge"
|
||||
assert identify_family("ihoment_H6008_xxxx") == "govee"
|
||||
|
||||
def test_identify_family_returns_none_for_unknown(self):
|
||||
assert identify_family("SomeRandomDevice") is None
|
||||
assert identify_family("") is None
|
||||
@@ -1,5 +1,7 @@
|
||||
"""Tests for configuration management."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
@@ -18,10 +20,23 @@ class TestDefaultConfig:
|
||||
assert config.server.port == 8080
|
||||
assert config.server.log_level == "INFO"
|
||||
|
||||
def test_default_storage_paths(self):
|
||||
def test_default_storage_paths(self, monkeypatch):
|
||||
monkeypatch.delenv("LEDGRAB_DATA_DIR", raising=False)
|
||||
config = Config()
|
||||
assert config.storage.database_file == "data/ledgrab.db"
|
||||
|
||||
def test_data_dir_env_override(self, monkeypatch, tmp_path):
|
||||
monkeypatch.setenv("LEDGRAB_DATA_DIR", str(tmp_path / "custom"))
|
||||
# default_data_dir reads the env var, but the module-level default
|
||||
# was evaluated at import time — so re-import paths() value via the
|
||||
# helper to confirm the contract.
|
||||
from importlib import reload
|
||||
|
||||
from ledgrab import paths as paths_mod
|
||||
|
||||
reload(paths_mod)
|
||||
assert paths_mod.default_data_dir() == Path(str(tmp_path / "custom"))
|
||||
|
||||
def test_default_mqtt_disabled(self):
|
||||
config = Config()
|
||||
assert config.mqtt.enabled is False
|
||||
@@ -71,9 +86,15 @@ class TestServerConfig:
|
||||
class TestDemoMode:
|
||||
def test_demo_rewrites_storage_paths(self):
|
||||
config = Config(demo=True)
|
||||
assert config.storage.database_file.startswith("data/demo/")
|
||||
db_path = Path(config.storage.database_file)
|
||||
assert db_path.parent.name == "demo"
|
||||
assert db_path.name == "ledgrab.db"
|
||||
assets_path = Path(config.assets.assets_dir)
|
||||
assert assets_path.parent.name == "demo"
|
||||
assert assets_path.name == "assets"
|
||||
|
||||
def test_non_demo_keeps_original_paths(self):
|
||||
def test_non_demo_keeps_original_paths(self, monkeypatch):
|
||||
monkeypatch.delenv("LEDGRAB_DATA_DIR", raising=False)
|
||||
config = Config(demo=False)
|
||||
assert config.storage.database_file == "data/ledgrab.db"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user