Files
ledgrab/server/src/ledgrab/api/auth.py
T
alexei.dolgolyov a5effba553 feat: aggregated snapshot + wiring-graph APIs, MQTT device brokers
Backend
- snapshot: GET /api/v1/snapshot aggregates targets, devices, sources,
  presets and system into one payload for the HA coordinator, collapsing
  the prior ~2N+M request fan-out; per-section ?include= gating.
- graph: GET /api/v1/graph{,/schema,/dependents} backed by a pure,
  unit-tested graph_schema engine — one authoritative connectable-field
  registry so the editor no longer hard-codes topology in two places.
- devices: thread mqtt_source_id through DeviceCreate/Update/Response and
  the routes for multi-broker MQTT; shared validate_mqtt_source_exists
  (_mqtt_validation.py) reused by device + output-target routes; stop
  update_device masking intentional 4xx as 500.
- shutdown: bound uvicorn graceful-shutdown via GRACEFUL_SHUTDOWN_TIMEOUT
  (shared by __main__, android_entry, demo) so a lingering events WebSocket
  can't strand LED targets or block process exit.
- access log: structured _access_log middleware attributing each request to
  its authenticated token label (never the secret); uvicorn access_log off.

Frontend
- graph editor: generic schema-driven port/edge rendering, layout and
  connection handling; service-worker refresh.
- device modals: MQTT broker EntitySelect for device_type=mqtt in add-device
  and settings, wired into load/save/validate/dirty-check/clone.
- i18n: en/ru/zh keys.

Tests: graph routes + schema, snapshot routes, access log, mqtt_source_id
device regressions, bounded-shutdown entrypoint. 1614 passed.
2026-05-28 22:51:04 +03:00

362 lines
13 KiB
Python

"""Authentication module for API key validation."""
import asyncio
import json
import secrets
from typing import Annotated
from fastapi import Depends, HTTPException, Request, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette.websockets import WebSocket, WebSocketDisconnect
from ledgrab.config import get_config
from ledgrab.utils import get_logger
from ledgrab.utils.net_classify import is_loopback as _classify_is_loopback
logger = get_logger(__name__)
# Security scheme for Bearer token
security = HTTPBearer(auto_error=False)
# Exceptions that can legitimately fire when we try to send on / close a
# WebSocket that is already shutting down: the peer dropped, the connection
# state moved under us, or the underlying socket is gone.
# Keeping this tuple narrow means a genuine programming error (AttributeError,
# TypeError) bubbles up to the caller instead of silently disappearing.
# NOTE(review): because TypeError / ValueError are not listed, a JSON-encoding
# failure raised as one of those types is NOT swallowed by handlers that catch
# this tuple — confirm that is the intended behaviour.
_WS_SEND_BENIGN_EXC: tuple[type[BaseException], ...] = (
WebSocketDisconnect,
RuntimeError,
ConnectionError,
OSError,
)
def is_auth_enabled() -> bool:
    """Report whether API-key authentication is active.

    Returns:
        True if the loaded configuration defines one or more API keys,
        False when the key mapping is empty or unset.
    """
    configured_keys = get_config().auth.api_keys
    return True if configured_keys else False
def _is_loopback(host: str | None) -> bool:
"""Return True when *host* is a loopback address.
Delegates to :func:`ledgrab.utils.net_classify.is_loopback` so this
auth gate, the SSRF guard in ``safe_source``, and the LAN-default
inference in ``url_scheme`` share one classification source.
"""
if not host:
return False
return _classify_is_loopback(host)
def verify_api_key(
    request: Request,
    credentials: Annotated[HTTPAuthorizationCredentials | None, Security(security)],
) -> str:
    """Verify the API key carried in the ``Authorization: Bearer`` header.

    Behavior:
        - When no API keys are configured AND the request comes from a
          loopback address, anonymous access is allowed.
        - When no API keys are configured AND the request is from a
          non-loopback (LAN) address, the request is REJECTED with 401
          (security default — LAN access requires an API key).
        - When API keys ARE configured, valid Bearer credentials are required.

    On success the label is also stored on ``request.state.auth_label`` so
    the access-log middleware can attribute the request to a client without
    re-running the token comparison.

    Args:
        request: Incoming request (used to read the client host and to
            stash the resolved label on ``request.state``).
        credentials: Parsed HTTP authorization credentials, or ``None`` when
            the header is absent.

    Returns:
        Label/identifier of the authenticated client (``"anonymous"`` for
        loopback unauthenticated access).

    Raises:
        HTTPException: 401 if authentication is required but missing or
            invalid.
    """
    config = get_config()
    client_host = request.client.host if request.client else None

    if not config.auth.api_keys:
        # No keys configured — allow loopback only.
        if _is_loopback(client_host):
            request.state.auth_label = "anonymous"
            return "anonymous"
        # There are no keys to compare against, so a LAN caller cannot
        # authenticate even if it sends a token. Reject.
        logger.warning("Rejected LAN request from %s: no API key configured", client_host)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=(
                "LAN access requires an API key. Configure auth.api_keys in "
                "config.yaml (see config/default_config.yaml for the format)."
            ),
            headers={"WWW-Authenticate": "Bearer"},
        )

    if credentials is None:
        logger.warning("Request missing Authorization header")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing API key - authentication is required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Compare as bytes: ``secrets.compare_digest`` raises TypeError for str
    # operands containing non-ASCII characters, which would turn a malformed
    # (attacker-controlled) token into a 500 instead of a clean 401.
    token_bytes = credentials.credentials.encode("utf-8", "surrogatepass")

    # Find the matching key and remember its label (constant-time comparison).
    authenticated_as = None
    for label, api_key in config.auth.api_keys.items():
        if secrets.compare_digest(token_bytes, api_key.encode("utf-8", "surrogatepass")):
            authenticated_as = label
            break

    if not authenticated_as:
        logger.warning("Invalid API key attempt")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Only the friendly label is logged, never the secret itself.
    logger.debug("Authenticated as: %s", authenticated_as)
    # Stash the friendly label so the access-log middleware can attribute the
    # request to a client without re-running the token comparison.
    request.state.auth_label = authenticated_as
    return authenticated_as
# Dependency alias for protected routes.
# Declaring a route parameter as ``AuthRequired`` wires ``verify_api_key`` in
# through ``Depends``; the parameter's value is the label/identifier that
# ``verify_api_key`` returns for the authenticated client.
AuthRequired = Annotated[str, Depends(verify_api_key)]
def require_authenticated(label: str) -> None:
    """Refuse callers that were admitted under the anonymous loopback label.

    Intended for endpoints that must NEVER be reachable anonymously, even
    from loopback (e.g. backup download, secret reveal).

    Args:
        label: Auth label of the caller being checked.

    Raises:
        HTTPException: 401 if *label* is ``"anonymous"``.
    """
    # Guard clause: any label other than the anonymous one passes.
    if label != "anonymous":
        return None

    message = (
        "This endpoint requires an API key. Configure auth.api_keys "
        "in config.yaml and provide a Bearer token."
    )
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=message,
        headers={"WWW-Authenticate": "Bearer"},
    )
# Close code used by accept_and_authenticate_ws when the first-message auth
# handshake (verify_ws_auth) does not yield a caller label.
WS_AUTH_CLOSE_CODE = 4401
# Close code used by accept_and_authenticate_ws when the request's Origin
# header is not among the configured CORS origins.
WS_ORIGIN_CLOSE_CODE = 4403
"""Close code sent when a WebSocket request fails the Origin allowlist."""
def _is_origin_allowed(origin: str | None, allowed: list[str]) -> bool:
"""Return True when *origin* matches one of the configured CORS origins.
Non-browser clients (Python scripts, curl) don't send Origin — those are
allowed through; the Bearer-token check on the auth handshake is the
primary defence in that case. Browsers always set Origin, so this only
blocks cross-site WebSocket connection attempts (CSWSH).
"""
if not origin:
return True
return origin in set(allowed or [])
async def accept_and_authenticate_ws(websocket: WebSocket, timeout: float = 3.0) -> str | None:
    """Gate a WebSocket on Origin, accept it, then run the auth handshake.

    Convenience wrapper over :func:`verify_ws_auth` that takes care of
    ``websocket.accept()`` and of closing the connection when the caller is
    turned away: :data:`WS_ORIGIN_CLOSE_CODE` for an Origin mismatch (sent
    before the socket is accepted) and :data:`WS_AUTH_CLOSE_CODE` for a
    failed handshake.

    Args:
        websocket: The incoming, not-yet-accepted connection.
        timeout: Seconds to wait for the first (auth) message.

    Returns:
        The caller label on success, ``None`` on failure (a close has
        already been attempted on the connection).
    """

    async def _close_quietly(code: int) -> None:
        # Best-effort close: the peer may already be gone, which is fine.
        try:
            await websocket.close(code=code)
        except _WS_SEND_BENIGN_EXC:
            pass

    # Reject cross-site WebSocket attempts before accepting — a browser-based
    # attacker page cannot forge the Origin header, so a mismatch is a strong
    # signal even before the token check. Non-browser clients legitimately
    # omit Origin; those fall through to the auth handshake.
    config = get_config()
    request_origin = websocket.headers.get("origin")
    if not _is_origin_allowed(request_origin, config.server.cors_origins):
        logger.warning(
            "Rejected WebSocket from origin %r (not in cors_origins)",
            request_origin,
        )
        await _close_quietly(WS_ORIGIN_CLOSE_CODE)
        return None

    await websocket.accept()
    caller = await verify_ws_auth(websocket, timeout=timeout)
    if caller is not None:
        return caller

    await _close_quietly(WS_AUTH_CLOSE_CODE)
    return None


# NOTE: the string below is a detached description of WS_AUTH_CLOSE_CODE
# (defined earlier in this module); at this position it is a no-op expression.
"""Close code sent when a WebSocket fails first-message auth (timeout or bad token)."""
def _match_api_key(token: str) -> str | None:
    """Return the label of the configured API key equal to *token*, or None.

    The comparison is constant-time and performed on UTF-8 bytes rather than
    on ``str``: ``secrets.compare_digest`` raises ``TypeError`` for strings
    containing non-ASCII characters, and *token* is caller-supplied (it comes
    from a JSON auth message), so a str comparison would let a malformed
    token crash the handshake instead of simply failing to match.

    Args:
        token: Candidate API key; an empty string never matches.

    Returns:
        The label of the first configured key that equals *token*, or
        ``None`` when nothing matches.
    """
    config = get_config()
    if not token:
        return None

    # "surrogatepass" keeps encoding total: JSON can legally deliver lone
    # surrogates, which strict UTF-8 encoding would reject with an exception.
    candidate = token.encode("utf-8", "surrogatepass")
    for label, api_key in config.auth.api_keys.items():
        if secrets.compare_digest(candidate, api_key.encode("utf-8", "surrogatepass")):
            return label
    return None
async def verify_ws_auth(
    websocket: WebSocket,
    timeout: float = 3.0,
) -> str | None:
    """Authenticate a WebSocket via a first-message auth handshake.

    Protocol:
        1. The caller must have already ``await websocket.accept()`` ed the
           connection.
        2. This function waits up to *timeout* seconds for the first message,
           which must be JSON of the form ``{"type": "auth", "token": "<key>"}``.
           ``token`` may be null/missing on loopback when no API keys are
           configured.
        3. On success, sends ``{"type": "auth_ok"}`` and returns the caller
           label (e.g. ``"dev"``, or ``"anonymous"`` for loopback with no
           configured keys).
        4. On failure, sends ``{"type": "auth_error", "reason": ...}`` and
           returns ``None``. The caller is responsible for calling
           ``await websocket.close(code=WS_AUTH_CLOSE_CODE)``.

    Loopback policy mirrors :func:`verify_api_key`:
        - No API keys configured + loopback client → anonymous access allowed.
          If a client sends an ``auth`` message anyway, it's accepted as a
          no-op so the protocol stays uniform.
        - No API keys configured + non-loopback client → rejected.
        - Keys configured → valid token required regardless of loopback.

    Every outbound frame is sent best-effort: if the peer has already gone
    away (one of ``_WS_SEND_BENIGN_EXC``), the handshake is treated as failed
    and ``None`` is returned rather than letting the error escape.

    Args:
        websocket: An already-accepted WebSocket connection.
        timeout: Seconds to wait for the first message.

    Returns:
        Caller label on success, ``None`` on failure.
    """
    config = get_config()
    client_host = websocket.client.host if websocket.client else None
    loopback = _is_loopback(client_host)
    keys_configured = bool(config.auth.api_keys)

    # Try to read the auth message with a timeout.
    try:
        raw = await asyncio.wait_for(websocket.receive_text(), timeout=timeout)
    except asyncio.TimeoutError:
        if not keys_configured and loopback:
            # Loopback anonymous: no auth message arrived, but none is required.
            if await _ws_send_best_effort(websocket, {"type": "auth_ok"}):
                return "anonymous"
            return None
        logger.warning("WebSocket auth timeout after %.1fs from %s", timeout, client_host)
        await _ws_send_auth_error(websocket, "auth timeout")
        return None
    except WebSocketDisconnect:
        return None
    except (RuntimeError, ConnectionError, OSError) as exc:
        # The peer hung up mid-handshake or the underlying socket is gone.
        logger.debug("WebSocket auth receive error: %s", exc)
        return None
    except Exception:
        # Unexpected — log the full traceback so we can see what we missed
        # without leaving the connection half-open. Nothing is re-raised; the
        # caller will close on the None return.
        logger.exception("Unexpected error during WebSocket auth handshake")
        return None

    # Parse the auth message (an empty frame is treated as an empty object).
    try:
        msg = json.loads(raw) if raw else {}
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so this covers both.
        await _ws_send_auth_error(websocket, "invalid JSON in auth message")
        return None

    if not isinstance(msg, dict) or msg.get("type") != "auth":
        await _ws_send_auth_error(websocket, "first message must be {type:'auth'}")
        return None

    token = msg.get("token")
    if token is not None and not isinstance(token, str):
        await _ws_send_auth_error(websocket, "token must be a string or null")
        return None

    if not keys_configured:
        if loopback:
            # Loopback + no keys configured: accept regardless of token
            # contents. The send is guarded like every other send here, so a
            # peer that vanished right now yields None instead of raising.
            if await _ws_send_best_effort(websocket, {"type": "auth_ok"}):
                return "anonymous"
            return None
        logger.warning("Rejected LAN WebSocket from %s: no API key configured", client_host)
        await _ws_send_auth_error(websocket, "LAN access requires an API key")
        return None

    # Keys configured: require a matching token.
    label = _match_api_key(token or "")
    if not label:
        logger.warning("Invalid WebSocket auth attempt from %s", client_host)
        await _ws_send_auth_error(websocket, "invalid token")
        return None

    if not await _ws_send_best_effort(websocket, {"type": "auth_ok"}):
        return None
    logger.debug("WebSocket authenticated as: %s", label)
    return label


async def _ws_send_best_effort(websocket: WebSocket, payload: dict[str, str]) -> bool:
    """Send *payload* as JSON, tolerating a connection that is going away.

    Returns:
        True when the send completed, False when one of the benign errors in
        ``_WS_SEND_BENIGN_EXC`` was raised. Any other exception propagates.
    """
    try:
        await websocket.send_json(payload)
    except _WS_SEND_BENIGN_EXC:
        return False
    return True


async def _ws_send_auth_error(websocket: WebSocket, reason: str) -> None:
    """Best-effort delivery of an ``auth_error`` frame carrying *reason*."""
    await _ws_send_best_effort(websocket, {"type": "auth_error", "reason": reason})