"""Authentication module for API key validation.""" import secrets from typing import Annotated from fastapi import Depends, HTTPException, Security, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from wled_controller.config import get_config from wled_controller.utils import get_logger logger = get_logger(__name__) # Security scheme for Bearer token security = HTTPBearer(auto_error=False) def is_auth_enabled() -> bool: """Return True when at least one API key is configured.""" return bool(get_config().auth.api_keys) def verify_api_key( credentials: Annotated[HTTPAuthorizationCredentials | None, Security(security)] ) -> str: """Verify API key from Authorization header. When no API keys are configured, authentication is disabled and all requests are allowed through as "anonymous". Args: credentials: HTTP authorization credentials Returns: Label/identifier of the authenticated client Raises: HTTPException: If authentication is required but invalid """ config = get_config() # No keys configured → auth disabled, allow all requests if not config.auth.api_keys: return "anonymous" # Check if credentials are provided if not credentials: logger.warning("Request missing Authorization header") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key - authentication is required", headers={"WWW-Authenticate": "Bearer"}, ) # Extract token token = credentials.credentials # Find matching key and return its label using constant-time comparison authenticated_as = None for label, api_key in config.auth.api_keys.items(): if secrets.compare_digest(token, api_key): authenticated_as = label break if not authenticated_as: logger.warning("Invalid API key attempt") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key", headers={"WWW-Authenticate": "Bearer"}, ) # Log successful authentication logger.debug(f"Authenticated as: {authenticated_as}") return authenticated_as # Dependency for protected routes # Returns the label/identifier of the authenticated client AuthRequired = Annotated[str, Depends(verify_api_key)] def verify_ws_token(token: str) -> bool: """Check a WebSocket query-param token against configured API keys. When no API keys are configured, authentication is disabled and all WebSocket connections are allowed. """ config = get_config() # No keys configured → auth disabled, allow all connections if not config.auth.api_keys: return True if token: for _label, api_key in config.auth.api_keys.items(): if secrets.compare_digest(token, api_key): return True return False