fix: comprehensive security, stability, and code quality audit
Build Android APK / build-android (push) Failing after 1m45s
Lint & Test / test (push) Successful in 4m54s

Security:
- Force API key auth for LAN (non-loopback) requests; remove shipped dev key
- Block path-traversal in backup restore; require auth on backup endpoints
- SSRF protection: DNS resolve + private/loopback/link-local IP rejection
- AES-256-GCM encryption for HA tokens and MQTT passwords with auto-migration
- WebSocket auth migrated from query-string to first-message protocol
- Asset upload: extension allowlist, server-side mime, Content-Disposition
- Update installer: SHA256 verification, tar/zip member validation
- Tightened CORS (explicit methods/headers, no credentials)
- ADB serial regex allowlist, webhook rate-limit key fix, log scrubbing

Android:
- Root-capture: ordered teardown, screenrecord respawn watchdog, child reaping
- USB permission blocking API via CompletableDeferred
- Python init crash guard with fatal-error screen
- Moved root grant + QR generation off Main thread
- Cached PyObject engine for per-frame bridge calls
- Ordered ScreenCapture resource cleanup, allowBackup=false

Python:
- Replaced all asyncio.get_event_loop() with get_running_loop/to_thread
- Split color_strip_sources.py (1683->5 files) and color_strip_stream.py
  (1324->7 files) into packages
- Extracted FrameLimiter utility, migrated 9 stream loops
- Provider base-class reuse, WLED state caching + URL normalization
- Narrowed broad except-pass in WS routes, threading fixes in BaseStore

Frontend:
- XSS fix: escapeHtml on dynamic option labels, reconcile-based list renders
- Typed DOM helpers, safe localStorage access, AbortController listener hygiene
- openAuthedWs helper for first-message WS auth protocol
- Migrated remaining plain <select>s to IconSelect/EntitySelect

Design:
- WCAG AA primary color on light theme (#2e7d32, 5.4:1 contrast)
- Android TV 10-foot breakpoint (tv.css)
- Consolidated z-index tokens, unified easing, card-running GPU hints
This commit is contained in:
2026-04-16 04:56:04 +03:00
parent 5fcb9f82bd
commit 123da1b5c4
124 changed files with 6276 additions and 3705 deletions
@@ -7,9 +7,20 @@ from ledgrab.config import get_config
# Ensure audio filters registered
import ledgrab.core.audio.filters # noqa: F401
_config = get_config()
_api_key = next(iter(_config.auth.api_keys.values()), "")
AUTH = {"Authorization": f"Bearer {_api_key}"} if _api_key else {}
AUTH: dict = {}
@pytest.fixture(autouse=True, scope="module")
def _populate_auth():
"""Resolve the active API key once per module — the global config may
have been swapped by a session-scoped conftest after this module was
first imported, so we cannot capture at import time."""
key = next(iter(get_config().auth.api_keys.values()), "")
AUTH.clear()
if key:
AUTH["Authorization"] = f"Bearer {key}"
yield
AUTH.clear()
@pytest.fixture(scope="module")
+3
View File
@@ -32,6 +32,9 @@ _test_config = _original_config.model_copy(
assets_dir=_test_assets_dir,
max_file_size_mb=_original_config.assets.max_file_size_mb,
),
# Inject a test API key so auth-enforcement tests have something
# concrete to authenticate against and reject when omitted.
"auth": _config_mod.AuthConfig(api_keys={"test": "test-api-key-12345"}),
},
)
_config_mod.config = _test_config
+3
View File
@@ -27,6 +27,8 @@ _test_assets_dir = str(_e2e_tmp / "test_assets")
import ledgrab.config as _config_mod # noqa: E402
# Build a Config that mirrors production settings but with isolated paths.
# Always inject a test API key so auth-enforcement tests have something
# concrete to authenticate against (and reject when omitted).
_original_config = _config_mod.Config.load()
_test_config = _original_config.model_copy(
update={
@@ -35,6 +37,7 @@ _test_config = _original_config.model_copy(
assets_dir=_test_assets_dir,
max_file_size_mb=_original_config.assets.max_file_size_mb,
),
"auth": _config_mod.AuthConfig(api_keys={"e2e": "e2e-test-api-key-12345"}),
},
)
# Install as the global singleton so all subsequent get_config() calls
+49
View File
@@ -0,0 +1,49 @@
"""Security tests for backup/restore endpoints (e2e — uses live app)."""
import io
import zipfile
from tests.e2e.conftest import API_KEY # type: ignore
def test_restore_rejects_path_traversal_zip(client):
"""ZIP entries with ../ must be rejected."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(
"ledgrab.db",
b"SQLite format 3\x00" + b"\x00" * 200,
)
zf.writestr("assets/../evil.txt", b"bad")
buf.seek(0)
resp = client.post(
"/api/v1/system/restore",
files={"file": ("backup.zip", buf.getvalue(), "application/zip")},
)
assert resp.status_code == 400
assert "traversal" in resp.text.lower()
def test_backup_download_requires_non_anonymous(client):
"""Backup download rejects anonymous (loopback) callers."""
import ledgrab.config as cm
from ledgrab.config import AuthConfig
saved = cm.config
cm.config = saved.model_copy(update={"auth": AuthConfig(api_keys={})})
try:
client.headers.pop("Authorization", None)
resp = client.get("/api/v1/system/backup")
assert resp.status_code == 401
finally:
cm.config = saved
client.headers["Authorization"] = f"Bearer {API_KEY}"
def test_backup_roundtrip_authenticated(client):
"""A normal authenticated download still works."""
resp = client.get("/api/v1/system/backup")
assert resp.status_code == 200
assert resp.headers["content-type"] == "application/zip"
assert resp.content[:4] == b"PK\x03\x04"
+56
View File
@@ -0,0 +1,56 @@
"""Tests for SSRF protection in safe_source.validate_image_url."""
import pytest
from fastapi import HTTPException
from ledgrab.utils.safe_source import validate_image_url
@pytest.mark.parametrize(
"url",
[
"http://127.0.0.1/",
"http://127.0.0.1:8080/x.png",
"http://[::1]/",
"http://169.254.169.254/latest/meta-data/",
"http://10.0.0.1/",
"http://192.168.1.1/x.png",
"http://172.16.0.5/",
"http://0.0.0.0/",
"http://224.0.0.1/", # multicast
"ftp://example.com/",
"file:///etc/passwd",
"gopher://example.com/",
"http:///no-host",
],
)
def test_validate_image_url_rejects_dangerous_targets(url):
with pytest.raises(HTTPException) as exc:
validate_image_url(url)
assert exc.value.status_code == 400
def test_validate_image_url_accepts_public(monkeypatch):
"""Accept a public host whose DNS resolves to a non-private IP."""
import ledgrab.utils.safe_source as ss
def fake_getaddrinfo(host, _port):
# Pretend example.com resolves to a public IP
return [(0, 0, 0, "", ("93.184.216.34", 0))]
monkeypatch.setattr(ss.socket, "getaddrinfo", fake_getaddrinfo)
# Should not raise
validate_image_url("https://example.com/image.png")
def test_validate_image_url_blocks_dns_to_private(monkeypatch):
"""Hostname that resolves to a private IP must be rejected."""
import ledgrab.utils.safe_source as ss
def fake_getaddrinfo(host, _port):
return [(0, 0, 0, "", ("10.0.0.5", 0))]
monkeypatch.setattr(ss.socket, "getaddrinfo", fake_getaddrinfo)
with pytest.raises(HTTPException) as exc:
validate_image_url("http://internal.example.com/")
assert exc.value.status_code == 400
+219
View File
@@ -0,0 +1,219 @@
"""Tests for WebSocket first-message authentication."""
import json
import pytest
from fastapi import FastAPI, WebSocket
from fastapi.testclient import TestClient
import ledgrab.config as config_mod
from ledgrab.config import AuthConfig, Config, ServerConfig, StorageConfig
# ---------------------------------------------------------------------------
# Minimal app with a single WS endpoint using verify_ws_auth
# ---------------------------------------------------------------------------
def _make_app() -> FastAPI:
app = FastAPI()
@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
from ledgrab.api.auth import WS_AUTH_CLOSE_CODE, verify_ws_auth
await websocket.accept()
label = await verify_ws_auth(websocket)
if label is None:
await websocket.close(code=WS_AUTH_CLOSE_CODE)
return
await websocket.send_json({"echo": "hello", "label": label})
# Keep alive until client disconnects
try:
while True:
await websocket.receive_text()
except Exception:
pass
return app
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def app():
return _make_app()
@pytest.fixture()
def _patch_config_with_keys(monkeypatch, tmp_path):
"""Patch global config to have a test API key."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True, exist_ok=True)
cfg = Config(
server=ServerConfig(host="127.0.0.1", port=9999),
auth=AuthConfig(api_keys={"dev": "secret-key-abc"}),
storage=StorageConfig(database_file=str(data_dir / "t.db")),
)
monkeypatch.setattr(config_mod, "config", cfg)
@pytest.fixture()
def _patch_config_no_keys(monkeypatch, tmp_path):
"""Patch global config with empty api_keys (loopback-only mode)."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True, exist_ok=True)
cfg = Config(
server=ServerConfig(host="127.0.0.1", port=9999),
auth=AuthConfig(api_keys={}),
storage=StorageConfig(database_file=str(data_dir / "t.db")),
)
monkeypatch.setattr(config_mod, "config", cfg)
# ---------------------------------------------------------------------------
# Tests — keys configured
# ---------------------------------------------------------------------------
class TestWsAuthWithKeys:
"""WS auth when api_keys are configured."""
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_valid_token(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "secret-key-abc"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["label"] == "dev"
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_invalid_token(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "wrong"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
assert "invalid" in resp["reason"].lower()
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_missing_token(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_non_auth_first_message(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "ping"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
assert "auth" in resp["reason"].lower()
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_invalid_json(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text("not json at all")
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
assert "json" in resp["reason"].lower()
# ---------------------------------------------------------------------------
# Tests — no keys (loopback anonymous)
# ---------------------------------------------------------------------------
class TestWsAuthLoopbackAnonymous:
"""WS auth when api_keys is empty — loopback clients get anonymous access.
The Starlette TestClient reports client host as "testclient" which
is in the _LOOPBACK_HOSTS set.
"""
@pytest.mark.usefixtures("_patch_config_no_keys")
def test_anonymous_with_auth_message(self, app):
"""Sending an auth message on loopback with no keys is a no-op — still succeeds."""
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": None}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["label"] == "anonymous"
@pytest.mark.usefixtures("_patch_config_no_keys")
def test_anonymous_with_token(self, app):
"""Sending a token on loopback with no keys is also fine."""
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "anything"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["label"] == "anonymous"
# ---------------------------------------------------------------------------
# Tests — accept_and_authenticate_ws helper
# ---------------------------------------------------------------------------
class TestAcceptAndAuthenticateWs:
"""Test the convenience wrapper."""
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_accept_and_auth_success(self):
app = FastAPI()
@app.websocket("/ws2")
async def ws2(websocket: WebSocket):
from ledgrab.api.auth import accept_and_authenticate_ws
label = await accept_and_authenticate_ws(websocket)
if label is None:
return
await websocket.send_json({"ok": True, "label": label})
try:
while True:
await websocket.receive_text()
except Exception:
pass
client = TestClient(app)
with client.websocket_connect("/ws2") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "secret-key-abc"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["ok"] is True
assert data["label"] == "dev"
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_accept_and_auth_failure_closes(self):
app = FastAPI()
@app.websocket("/ws3")
async def ws3(websocket: WebSocket):
from ledgrab.api.auth import accept_and_authenticate_ws
label = await accept_and_authenticate_ws(websocket)
if label is None:
return
# Should not reach here
await websocket.send_json({"should": "not happen"})
client = TestClient(app)
with client.websocket_connect("/ws3") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "wrong-key"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"