"""Authentication module for API key validation.""" import secrets from typing import Annotated from fastapi import Depends, HTTPException, Security, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from wled_controller.config import get_config from wled_controller.utils import get_logger logger = get_logger(__name__) # Security scheme for Bearer token security = HTTPBearer(auto_error=False) def verify_api_key( credentials: Annotated[HTTPAuthorizationCredentials | None, Security(security)] ) -> str: """Verify API key from Authorization header. Args: credentials: HTTP authorization credentials Returns: Label/identifier of the authenticated client Raises: HTTPException: If authentication is required but invalid """ config = get_config() # Check if credentials are provided if not credentials: logger.warning("Request missing Authorization header") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key - authentication is required", headers={"WWW-Authenticate": "Bearer"}, ) # Extract token token = credentials.credentials # Verify against configured API keys if not config.auth.api_keys: logger.error("No API keys configured - server misconfiguration") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Server authentication not configured properly", ) # Find matching key and return its label using constant-time comparison authenticated_as = None for label, api_key in config.auth.api_keys.items(): if secrets.compare_digest(token, api_key): authenticated_as = label break if not authenticated_as: logger.warning("Invalid API key attempt") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key", headers={"WWW-Authenticate": "Bearer"}, ) # Log successful authentication logger.debug(f"Authenticated as: {authenticated_as}") return authenticated_as # Dependency for protected routes # Returns the label/identifier of the authenticated client AuthRequired = Annotated[str, Depends(verify_api_key)] def verify_ws_token(token: str) -> bool: """Check a WebSocket query-param token against configured API keys. Use this for WebSocket endpoints where FastAPI's Depends() isn't available. """ config = get_config() if token and config.auth.api_keys: for _label, api_key in config.auth.api_keys.items(): if secrets.compare_digest(token, api_key): return True return False