Stability: Add outer try/except/finally with _running=False cleanup to all 6
processing loop methods (live, color_strip, effect, audio, composite, mapped).
Add exponential backoff on consecutive capture errors in live_stream. Move
audio stream.stop() outside lock scope.
Performance: Replace per-pixel Python loop with np.array().tobytes() in
ddp_client. Vectorize pixelate filter with cv2.resize down+up. Vectorize
gradient rendering with np.searchsorted.
Frontend: Add lockBody/unlockBody re-entrancy counter. Add {once:true} to
fetchWithAuth abort listener. Null ws.onclose before ws.close() in LED preview.
Backend: Remove auth token prefix from log messages. Add atomic_write_json
helper (tempfile + os.replace) and update all 10 stores. Add name uniqueness
checks to all update methods. Fix DELETE status codes to 204 in audio_sources
and value_sources. Fix get_source() silent bug in color_strip_sources.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
"""Authentication module for API key validation."""
|
|
|
|
import secrets
|
|
from typing import Annotated
|
|
|
|
from fastapi import Depends, HTTPException, Security, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from wled_controller.config import get_config
|
|
from wled_controller.utils import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# Security scheme for Bearer token
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def verify_api_key(
|
|
credentials: Annotated[HTTPAuthorizationCredentials | None, Security(security)]
|
|
) -> str:
|
|
"""Verify API key from Authorization header.
|
|
|
|
Args:
|
|
credentials: HTTP authorization credentials
|
|
|
|
Returns:
|
|
Label/identifier of the authenticated client
|
|
|
|
Raises:
|
|
HTTPException: If authentication is required but invalid
|
|
"""
|
|
config = get_config()
|
|
|
|
# Check if credentials are provided
|
|
if not credentials:
|
|
logger.warning("Request missing Authorization header")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Missing API key - authentication is required",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Extract token
|
|
token = credentials.credentials
|
|
|
|
# Verify against configured API keys
|
|
if not config.auth.api_keys:
|
|
logger.error("No API keys configured - server misconfiguration")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Server authentication not configured properly",
|
|
)
|
|
|
|
# Find matching key and return its label using constant-time comparison
|
|
authenticated_as = None
|
|
for label, api_key in config.auth.api_keys.items():
|
|
if secrets.compare_digest(token, api_key):
|
|
authenticated_as = label
|
|
break
|
|
|
|
if not authenticated_as:
|
|
logger.warning("Invalid API key attempt")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid API key",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Log successful authentication
|
|
logger.debug(f"Authenticated as: {authenticated_as}")
|
|
|
|
return authenticated_as
|
|
|
|
|
|
# Dependency for protected routes
|
|
# Returns the label/identifier of the authenticated client
|
|
AuthRequired = Annotated[str, Depends(verify_api_key)]
|