fix: resolve comprehensive review findings (security, concurrency, perf, Android, UI)

Multi-dimension review of v0.8.2. Excludes the deliberately deferred
default_config.yaml weak-default-key item (C1).

Backend:
- calibration: create_default_calibration no longer exceeds led_count for small
  odd counts (bounded trim + regression test)
- game-integration: generic webhook now requires auth_token; constant-time
  compare_digest in all adapters; per-IP failed-auth rate limit on the ingest
  route; auth_token encrypted at rest via secret_box (migration-safe)
- playlist engine: serialize _state/_task under the lifecycle lock to close a
  delete-mid-play race (+ concurrency tests)
- main: stop the calibration session on shutdown (restore prior target)
- home_assistant: validate HA host via the LAN classifier on create/update
- perf: drop slow preview-WS clients instead of blocking the send loop; cache
  composite full-strip resize linspaces; effect_stream lava reuses scratch

Frontend:
- setup/auto-calibration wizard: guard _state after awaits (cancel-safe), await
  session teardown before output start, busy-gate skip-calibration, manual
  display input keeps focus, move focus on step change
- calibration: destroy EntitySelect on modal close
- color-strips test: dirty-flag-gated render + cached ctx/ImageData
- a11y/TV: focus-visible for new wizard/auto-cal/corner controls, aria-labels on
  the spatial corner/edge picker; theme-aware syntax tokens; dead/undefined CSS
  tokens removed; .modal-error styled; i18n titles (en/ru/zh)

Android:
- ApiKeyManager: EncryptedSharedPreferences with verified, data-safe legacy
  migration that never rotates an existing key
- CaptureService: validate MediaProjection token before promoting; satisfy the
  startForeground 5s contract on the bail path
- NotificationListener: connection-scoped executor with lazy fallback
- BLE: request BLUETOOTH_SCAN/CONNECT at runtime + guard handler-thread
  SecurityExceptions
- Root: cancellation-aware su grant probe

Adds 14 tests. Gate: ruff + tsc 5.9.3 + esbuild + pytest (2185 passed) +
compileDebugKotlin all green.
This commit is contained in:
2026-06-09 16:35:08 +03:00
parent 7a12f39f49
commit 17dd2e02ba
42 changed files with 1358 additions and 152 deletions
@@ -341,6 +341,64 @@ class TestEventIngestion:
assert len(recent) == 1
# ---------------------------------------------------------------------------
# Failed-auth rate limiting (brute-force defence on the ingest route)
# ---------------------------------------------------------------------------
@pytest.fixture
def _reset_auth_fail_limiter():
"""Clear the module-level failed-auth hit map before and after each test.
The limiter keeps per-IP state in a process-global dict, so without this
reset, attempts from earlier tests would bleed into later ones.
"""
from ledgrab.api.routes import game_integration as gi
gi._auth_fail_hits.clear()
yield
gi._auth_fail_hits.clear()
class TestIngestRateLimiting:
def test_failed_auth_attempts_are_rate_limited(self, client, _reset_auth_fail_limiter):
from ledgrab.api.routes import game_integration as gi
created = _create_integration(
client,
adapter_config={"auth_token": "correct_token"},
)
integration_id = created["id"]
url = f"/api/v1/game-integrations/{integration_id}/event"
bad = {"x-auth-token": "wrong_token"}
# Burn through the failed-auth budget — each returns 403.
for _ in range(gi._AUTH_FAIL_LIMIT):
resp = client.post(url, json={"data": {"health": 1}}, headers=bad)
assert resp.status_code == 403
# The next attempt from the same IP is throttled with 429.
resp = client.post(url, json={"data": {"health": 1}}, headers=bad)
assert resp.status_code == 429
def test_successful_ingest_not_rate_limited(self, client, event_bus, _reset_auth_fail_limiter):
from ledgrab.api.routes import game_integration as gi
created = _create_integration(
client,
adapter_config={"auth_token": "correct_token"},
)
integration_id = created["id"]
url = f"/api/v1/game-integrations/{integration_id}/event"
good = {"x-auth-token": "correct_token"}
# High-rate legitimate ingestion well past the failed-auth threshold
# must NOT be throttled — only failures count toward the limit.
for _ in range(gi._AUTH_FAIL_LIMIT + 10):
resp = client.post(url, json={"data": {"health": 50}}, headers=good)
assert resp.status_code == 204
# ---------------------------------------------------------------------------
# Status / diagnostics tests
# ---------------------------------------------------------------------------
@@ -102,9 +102,20 @@ class TestGenericWebhookParsing:
class TestGenericWebhookAuth:
def test_no_auth_configured(self) -> None:
def test_no_auth_configured_rejects(self) -> None:
# Secure-by-default: a network-facing webhook adapter with no token
# configured must REJECT, not accept (otherwise it is open to the LAN).
result = GenericWebhookAdapter.validate_auth({}, {}, {})
assert result is True
assert result is False
def test_empty_token_rejects(self) -> None:
# An explicitly empty token is still "no token" → reject.
result = GenericWebhookAdapter.validate_auth(
{"Authorization": "Bearer "},
{},
{"auth_token": ""},
)
assert result is False
def test_bearer_auth_valid(self) -> None:
result = GenericWebhookAdapter.validate_auth(
@@ -156,6 +167,8 @@ class TestGenericWebhookMetadata:
assert "auth_token" in schema["properties"]
assert "mappings" in schema["properties"]
assert "auth_header" in schema["properties"]
# auth_token is mandatory (secure-by-default).
assert "auth_token" in schema.get("required", [])
def test_setup_instructions(self) -> None:
instructions = GenericWebhookAdapter.get_setup_instructions()
+172
View File
@@ -315,3 +315,175 @@ class TestEvents:
async def test_stop_when_idle_fires_nothing(self, engine):
await engine.stop()
assert _fired_actions(engine) == []
# ---------------------------------------------------------------------------
# Concurrency — delete-mid-play race (H5). The engine reads/mutates _task and
# _state across await boundaries; a deletion firing stop_if_running() while the
# _run loop is between its get_playlist re-read and the natural-end clear must
# still produce exactly ONE terminal 'stopped' transition (no duplicate /
# contradictory WS events), and get_state() must never raise nor return a
# half-cleared snapshot during the window.
# ---------------------------------------------------------------------------
class _DeletableStore:
"""Wraps a real playlist store; ``delete_playlist`` records a tombstone so
a later ``get_playlist`` re-read raises (like the real store after a delete)
and the engine's ``_run`` loop breaks out at the cycle boundary.
"""
def __init__(self, real):
self._real = real
self._deleted: set[str] = set()
def get_playlist(self, playlist_id):
if playlist_id in self._deleted:
raise KeyError(playlist_id)
return self._real.get_playlist(playlist_id)
def delete_playlist(self, playlist_id):
self._deleted.add(playlist_id)
class TestConcurrencyDeleteRace:
async def test_delete_and_stop_if_running_emit_single_stopped(
self, engine, playlist_store, applied
):
# A looping playlist so _run re-reads get_playlist every cycle, giving
# us a deterministic parking point to interleave the delete + stop.
pl = _make_playlist(playlist_store, "Racy", [("scene_a", 50), ("scene_b", 50)], loop=True)
gated = _DeletableStore(playlist_store)
engine._playlist_store = gated
# The dwell sleep is the engine's only await between the get_playlist
# re-read and the natural-end clear. We gate the FIRST dwell: when _run
# parks there we set `entered`, then await `release`. While _run is
# suspended the test deletes the playlist and kicks stop_if_running, so
# they race _run's resumed natural-end clear under the lifecycle lock.
entered = asyncio.Event()
release = asyncio.Event()
first_dwell = {"done": False}
async def _interleaving_sleep(_duration):
if not first_dwell["done"]:
first_dwell["done"] = True
entered.set()
await release.wait()
await _REAL_SLEEP(0.005)
with patch("asyncio.sleep", _interleaving_sleep):
await engine.start_playlist(pl.id)
# Wait until _run is parked inside the first dwell (mid-cycle).
await asyncio.wait_for(entered.wait(), timeout=2.0)
# get_state during the window must not raise and must be coherent:
# either fully running or fully idle — never half-cleared.
snap = engine.get_state()
assert isinstance(snap, dict)
assert "is_running" in snap
# Concurrently: delete from the store AND call stop_if_running for
# the same id. Kick stop_if_running first as a task so it contends
# for the lifecycle lock with the (soon to resume) _run natural-end.
gated.delete_playlist(pl.id)
stop_task = asyncio.create_task(engine.stop_if_running(pl.id))
# Release _run: it finishes the dwell, re-reads (KeyError -> break),
# and races stop_task into the natural-end clear under the lock.
release.set()
await asyncio.wait_for(stop_task, timeout=2.0)
# Drain whatever remains of the run task.
run_task = engine._task
if run_task is not None:
try:
await asyncio.wait_for(asyncio.shield(run_task), timeout=2.0)
except (asyncio.CancelledError, KeyError):
pass
# Exactly one terminal 'stopped' transition (no duplicate/contradictory).
stopped = [e for e in _fired_events(engine) if e.get("action") == "stopped"]
assert len(stopped) == 1, f"expected exactly one stopped, got {len(stopped)}: {stopped}"
assert stopped[0]["playlist_id"] == pl.id
# Engine fully idle and get_state coherent afterwards.
assert engine.is_running() is False
final = engine.get_state()
assert final["is_running"] is False
assert final["playlist_id"] is None
async def test_natural_end_after_delete_emits_single_stopped(
self, engine, playlist_store, applied
):
# Drive the OTHER ordering: the delete causes _run to break and run its
# (now lock-wrapped) natural-end clear+'stopped', while a stop_if_running
# for the same id is fired AFTER the dwell is released but races into the
# same lock. Whoever wins, exactly one 'stopped' must surface and the
# late stop_if_running must be a clean no-op (state already cleared).
pl = _make_playlist(playlist_store, "Ending", [("scene_a", 50)], loop=False)
gated = _DeletableStore(playlist_store)
engine._playlist_store = gated
entered = asyncio.Event()
release = asyncio.Event()
first_dwell = {"done": False}
async def _interleaving_sleep(_duration):
if not first_dwell["done"]:
first_dwell["done"] = True
entered.set()
await release.wait()
await _REAL_SLEEP(0.005)
with patch("asyncio.sleep", _interleaving_sleep):
await engine.start_playlist(pl.id)
await asyncio.wait_for(entered.wait(), timeout=2.0)
# Delete and release so _run resumes -> non-loop break -> natural end.
gated.delete_playlist(pl.id)
release.set()
# Let the run task drive its natural-end clear to completion.
run_task = engine._task
if run_task is not None:
await asyncio.wait_for(asyncio.shield(run_task), timeout=2.0)
# A late stop_if_running for the same id is now a clean no-op.
await engine.stop_if_running(pl.id)
stopped = [e for e in _fired_events(engine) if e.get("action") == "stopped"]
assert len(stopped) == 1, f"expected exactly one stopped, got {len(stopped)}: {stopped}"
assert stopped[0]["playlist_id"] == pl.id
assert engine.is_running() is False
assert engine.get_state()["playlist_id"] is None
async def test_get_state_never_half_cleared_under_concurrent_stop(self, engine, playlist_store):
# Hammer get_state() while start/stop churn the lifecycle, asserting it
# never raises and never reports running with a None playlist_id.
pl = _make_playlist(playlist_store, "Churn", [("scene_a", 50)], loop=True)
async def _churn():
for _ in range(20):
await engine.start_playlist(pl.id)
await engine.stop()
async def _poll():
for _ in range(500):
s = engine.get_state()
# Coherence invariant: running implies a concrete playlist_id.
if s["is_running"]:
assert s["playlist_id"] is not None
await _REAL_SLEEP(0)
async def _fast(_duration):
await _REAL_SLEEP(0)
with patch("asyncio.sleep", _fast):
await asyncio.gather(_churn(), _poll())
await engine.stop()
assert engine.is_running() is False
@@ -5,6 +5,7 @@ import pytest
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.storage.game_integration import EventMapping, GameIntegrationConfig
from ledgrab.storage.game_integration_store import GameIntegrationStore
from ledgrab.utils import secret_box
@pytest.fixture
@@ -272,3 +273,83 @@ class TestGetReferences:
config = store.create_integration(name="Test", adapter_type="webhook")
refs = store.get_references(config.id)
assert refs == []
# ---------------------------------------------------------------------------
# Secret encryption (adapter_config.auth_token) tests
# ---------------------------------------------------------------------------
class TestAdapterConfigSecretEncryption:
def test_auth_token_round_trips(self):
config = GameIntegrationConfig.create_from_kwargs(
name="Webhook",
adapter_type="generic_webhook",
adapter_config={"auth_token": "super-secret", "auth_header": "X-Token"},
)
restored = GameIntegrationConfig.from_dict(config.to_dict())
# Decrypted at runtime — plaintext is recovered.
assert restored.adapter_config["auth_token"] == "super-secret"
# Non-secret fields are untouched.
assert restored.adapter_config["auth_header"] == "X-Token"
def test_stored_token_is_not_plaintext(self):
config = GameIntegrationConfig.create_from_kwargs(
name="Webhook",
adapter_type="generic_webhook",
adapter_config={"auth_token": "plaintext-secret"},
)
stored = config.to_dict()
raw_token = stored["adapter_config"]["auth_token"]
assert raw_token != "plaintext-secret"
assert secret_box.is_encrypted(raw_token)
def test_legacy_plaintext_row_still_decrypts(self):
# Simulate a row written before encryption was added: auth_token is
# bare plaintext (not an ENC: envelope). It must still load.
legacy = {
"id": "gi_legacy01",
"name": "Legacy Webhook",
"adapter_type": "generic_webhook",
"enabled": True,
"adapter_config": {"auth_token": "legacy-plaintext"},
"event_mappings": [],
"created_at": "2024-01-01T00:00:00+00:00",
"updated_at": "2024-01-01T00:00:00+00:00",
}
restored = GameIntegrationConfig.from_dict(legacy)
assert restored.adapter_config["auth_token"] == "legacy-plaintext"
def test_no_auth_token_key_is_safe(self):
config = GameIntegrationConfig.create_from_kwargs(
name="NoSecret",
adapter_type="cs2",
adapter_config={"max_gold": 99999},
)
restored = GameIntegrationConfig.from_dict(config.to_dict())
assert restored.adapter_config == {"max_gold": 99999}
def test_db_stores_token_encrypted(self, tmp_db):
store = GameIntegrationStore(tmp_db)
store.create_integration(
name="Webhook",
adapter_type="generic_webhook",
adapter_config={"auth_token": "db-secret"},
)
rows = tmp_db.load_all("game_integrations")
assert len(rows) == 1
raw_token = rows[0]["adapter_config"]["auth_token"]
assert raw_token != "db-secret"
assert secret_box.is_encrypted(raw_token)
def test_db_round_trip_after_reload(self, tmp_db):
store1 = GameIntegrationStore(tmp_db)
created = store1.create_integration(
name="Webhook",
adapter_type="generic_webhook",
adapter_config={"auth_token": "reload-secret"},
)
# New store instance reads from disk and must decrypt.
store2 = GameIntegrationStore(tmp_db)
fetched = store2.get_integration(created.id)
assert fetched.adapter_config["auth_token"] == "reload-secret"
+16
View File
@@ -291,6 +291,22 @@ def test_create_default_calibration_small_count():
assert config.get_total_leds() == 4
@pytest.mark.parametrize("led_count", [4, 5, 6, 7, 9, 11, 13, 60, 144, 300])
def test_create_default_calibration_total_matches_count(led_count):
"""Total LED count must equal led_count exactly, with every edge >= 1.
Regression for small odd counts where the per-edge max(1, ...) floor used
to push the total above led_count (e.g. led_count=5 produced a total of 6).
"""
config = create_default_calibration(led_count)
assert config.get_total_leds() == led_count
assert config.leds_top >= 1
assert config.leds_right >= 1
assert config.leds_bottom >= 1
assert config.leds_left >= 1
def test_create_default_calibration_invalid():
"""Test default calibration with invalid LED count."""
with pytest.raises(ValueError):
@@ -0,0 +1,52 @@
"""Tests for Home Assistant source host classification.
The HA source host is user-supplied and stored as ``host:port`` (e.g.
``192.168.1.100:8123``). Before the WebSocket runtime connects to it, the
route layer gates the host with the shared LAN classifier so an
(authenticated or default-anonymous) caller cannot weaponise it into a
public network-scan oracle — mirroring the LED device providers.
"""
import pytest
from ledgrab.api.routes.home_assistant import _validate_ha_host
@pytest.mark.parametrize(
"host",
[
"127.0.0.1:8123",
"10.0.0.5:8123",
"192.168.1.100:8123",
"172.16.0.10", # no port
"homeassistant.local:8123", # mDNS label
"hass", # bare hostname
"[fe80::1]:8123", # bracketed IPv6 link-local + port
"[fc00::1]:8123", # bracketed IPv6 ULA + port
"169.254.169.254:80", # link-local — allowed by the shared LAN policy
"", # empty passes (upstream schema requires non-empty)
None, # update path may pass None (host unchanged)
],
)
def test_validate_ha_host_accepts_lan(host) -> None:
"""Loopback / private / link-local / hostnames must not raise.
Link-local (169.254/16, fe80::/10) is part of the shared
``validate_lan_host`` LAN policy used by every LED device provider, so
HA reuses it unchanged rather than hand-rolling a stricter variant.
"""
_validate_ha_host(host)
@pytest.mark.parametrize(
"host",
[
"1.1.1.1:8123", # public IPv4 + port
"8.8.8.8", # public IPv4, no port
"[2606:4700:4700::1111]:8123", # public IPv6 + port
],
)
def test_validate_ha_host_rejects_public(host) -> None:
"""Genuinely-public IPs (the network-scan-oracle risk) are rejected."""
with pytest.raises(ValueError, match="LedGrab is LAN-only"):
_validate_ha_host(host)