Files
ledgrab/server/tests/test_ws_auth.py
T
alexei.dolgolyov 123da1b5c4
Build Android APK / build-android (push) Failing after 1m45s
Lint & Test / test (push) Successful in 4m54s
fix: comprehensive security, stability, and code quality audit
Security:
- Force API key auth for LAN (non-loopback) requests; remove shipped dev key
- Block path-traversal in backup restore; require auth on backup endpoints
- SSRF protection: DNS resolve + private/loopback/link-local IP rejection
- AES-256-GCM encryption for HA tokens and MQTT passwords with auto-migration
- WebSocket auth migrated from query-string to first-message protocol
- Asset upload: extension allowlist, server-side mime, Content-Disposition
- Update installer: SHA256 verification, tar/zip member validation
- Tightened CORS (explicit methods/headers, no credentials)
- ADB serial regex allowlist, webhook rate-limit key fix, log scrubbing

Android:
- Root-capture: ordered teardown, screenrecord respawn watchdog, child reaping
- USB permission blocking API via CompletableDeferred
- Python init crash guard with fatal-error screen
- Moved root grant + QR generation off Main thread
- Cached PyObject engine for per-frame bridge calls
- Ordered ScreenCapture resource cleanup, allowBackup=false

Python:
- Replaced all asyncio.get_event_loop() with get_running_loop/to_thread
- Split color_strip_sources.py (1683->5 files) and color_strip_stream.py
  (1324->7 files) into packages
- Extracted FrameLimiter utility, migrated 9 stream loops
- Provider base-class reuse, WLED state caching + URL normalization
- Narrowed broad except-pass in WS routes, threading fixes in BaseStore

Frontend:
- XSS fix: escapeHtml on dynamic option labels, reconcile-based list renders
- Typed DOM helpers, safe localStorage access, AbortController listener hygiene
- openAuthedWs helper for first-message WS auth protocol
- Migrated remaining plain <select>s to IconSelect/EntitySelect

Design:
- WCAG AA primary color on light theme (#2e7d32, 5.4:1 contrast)
- Android TV 10-foot breakpoint (tv.css)
- Consolidated z-index tokens, unified easing, card-running GPU hints
2026-04-16 04:56:04 +03:00

220 lines
7.9 KiB
Python

"""Tests for WebSocket first-message authentication."""
import json
import pytest
from fastapi import FastAPI, WebSocket
from fastapi.testclient import TestClient
import ledgrab.config as config_mod
from ledgrab.config import AuthConfig, Config, ServerConfig, StorageConfig
# ---------------------------------------------------------------------------
# Minimal app with a single WS endpoint using verify_ws_auth
# ---------------------------------------------------------------------------
def _make_app() -> FastAPI:
app = FastAPI()
@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
from ledgrab.api.auth import WS_AUTH_CLOSE_CODE, verify_ws_auth
await websocket.accept()
label = await verify_ws_auth(websocket)
if label is None:
await websocket.close(code=WS_AUTH_CLOSE_CODE)
return
await websocket.send_json({"echo": "hello", "label": label})
# Keep alive until client disconnects
try:
while True:
await websocket.receive_text()
except Exception:
pass
return app
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def app():
return _make_app()
@pytest.fixture()
def _patch_config_with_keys(monkeypatch, tmp_path):
"""Patch global config to have a test API key."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True, exist_ok=True)
cfg = Config(
server=ServerConfig(host="127.0.0.1", port=9999),
auth=AuthConfig(api_keys={"dev": "secret-key-abc"}),
storage=StorageConfig(database_file=str(data_dir / "t.db")),
)
monkeypatch.setattr(config_mod, "config", cfg)
@pytest.fixture()
def _patch_config_no_keys(monkeypatch, tmp_path):
"""Patch global config with empty api_keys (loopback-only mode)."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True, exist_ok=True)
cfg = Config(
server=ServerConfig(host="127.0.0.1", port=9999),
auth=AuthConfig(api_keys={}),
storage=StorageConfig(database_file=str(data_dir / "t.db")),
)
monkeypatch.setattr(config_mod, "config", cfg)
# ---------------------------------------------------------------------------
# Tests — keys configured
# ---------------------------------------------------------------------------
class TestWsAuthWithKeys:
"""WS auth when api_keys are configured."""
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_valid_token(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "secret-key-abc"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["label"] == "dev"
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_invalid_token(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "wrong"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
assert "invalid" in resp["reason"].lower()
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_missing_token(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_non_auth_first_message(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "ping"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
assert "auth" in resp["reason"].lower()
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_invalid_json(self, app):
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text("not json at all")
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"
assert "json" in resp["reason"].lower()
# ---------------------------------------------------------------------------
# Tests — no keys (loopback anonymous)
# ---------------------------------------------------------------------------
class TestWsAuthLoopbackAnonymous:
"""WS auth when api_keys is empty — loopback clients get anonymous access.
The Starlette TestClient reports client host as "testclient" which
is in the _LOOPBACK_HOSTS set.
"""
@pytest.mark.usefixtures("_patch_config_no_keys")
def test_anonymous_with_auth_message(self, app):
"""Sending an auth message on loopback with no keys is a no-op — still succeeds."""
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": None}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["label"] == "anonymous"
@pytest.mark.usefixtures("_patch_config_no_keys")
def test_anonymous_with_token(self, app):
"""Sending a token on loopback with no keys is also fine."""
client = TestClient(app)
with client.websocket_connect("/ws") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "anything"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["label"] == "anonymous"
# ---------------------------------------------------------------------------
# Tests — accept_and_authenticate_ws helper
# ---------------------------------------------------------------------------
class TestAcceptAndAuthenticateWs:
"""Test the convenience wrapper."""
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_accept_and_auth_success(self):
app = FastAPI()
@app.websocket("/ws2")
async def ws2(websocket: WebSocket):
from ledgrab.api.auth import accept_and_authenticate_ws
label = await accept_and_authenticate_ws(websocket)
if label is None:
return
await websocket.send_json({"ok": True, "label": label})
try:
while True:
await websocket.receive_text()
except Exception:
pass
client = TestClient(app)
with client.websocket_connect("/ws2") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "secret-key-abc"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_ok"
data = json.loads(ws.receive_text())
assert data["ok"] is True
assert data["label"] == "dev"
@pytest.mark.usefixtures("_patch_config_with_keys")
def test_accept_and_auth_failure_closes(self):
app = FastAPI()
@app.websocket("/ws3")
async def ws3(websocket: WebSocket):
from ledgrab.api.auth import accept_and_authenticate_ws
label = await accept_and_authenticate_ws(websocket)
if label is None:
return
# Should not reach here
await websocket.send_json({"should": "not happen"})
client = TestClient(app)
with client.websocket_connect("/ws3") as ws:
ws.send_text(json.dumps({"type": "auth", "token": "wrong-key"}))
resp = json.loads(ws.receive_text())
assert resp["type"] == "auth_error"