refactor: rename project to LedGrab, split HA integration into separate repo
Lint & Test / test (push) Successful in 1m56s
Lint & Test / test (push) Successful in 1m56s
- Rename Python package: wled_controller -> ledgrab - Rename env var prefix: WLED_ -> LEDGRAB_ (with auto-migration for old vars) - Rename localStorage key: wled_api_key -> ledgrab_api_key (with migration) - Rename HA integration domain: wled_screen_controller -> ledgrab - Update all imports, build scripts, Docker, installer, config, docs - Remove HA integration (moved to ledgrab-haos-integration repo) - Remove hacs.json (belongs in HA repo now) - Add startup warning for users with old WLED_ env vars - All tests pass (715/715), ruff clean, tsc clean, frontend builds
This commit is contained in:
@@ -0,0 +1,98 @@
|
||||
"""Authentication module for API key validation."""
|
||||
|
||||
import secrets
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, HTTPException, Security, status
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
|
||||
from ledgrab.config import get_config
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Security scheme for Bearer token
|
||||
security = HTTPBearer(auto_error=False)
|
||||
|
||||
|
||||
def is_auth_enabled() -> bool:
|
||||
"""Return True when at least one API key is configured."""
|
||||
return bool(get_config().auth.api_keys)
|
||||
|
||||
|
||||
def verify_api_key(
|
||||
credentials: Annotated[HTTPAuthorizationCredentials | None, Security(security)]
|
||||
) -> str:
|
||||
"""Verify API key from Authorization header.
|
||||
|
||||
When no API keys are configured, authentication is disabled and all
|
||||
requests are allowed through as "anonymous".
|
||||
|
||||
Args:
|
||||
credentials: HTTP authorization credentials
|
||||
|
||||
Returns:
|
||||
Label/identifier of the authenticated client
|
||||
|
||||
Raises:
|
||||
HTTPException: If authentication is required but invalid
|
||||
"""
|
||||
config = get_config()
|
||||
|
||||
# No keys configured → auth disabled, allow all requests
|
||||
if not config.auth.api_keys:
|
||||
return "anonymous"
|
||||
|
||||
# Check if credentials are provided
|
||||
if not credentials:
|
||||
logger.warning("Request missing Authorization header")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Missing API key - authentication is required",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
# Extract token
|
||||
token = credentials.credentials
|
||||
|
||||
# Find matching key and return its label using constant-time comparison
|
||||
authenticated_as = None
|
||||
for label, api_key in config.auth.api_keys.items():
|
||||
if secrets.compare_digest(token, api_key):
|
||||
authenticated_as = label
|
||||
break
|
||||
|
||||
if not authenticated_as:
|
||||
logger.warning("Invalid API key attempt")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid API key",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
# Log successful authentication
|
||||
logger.debug(f"Authenticated as: {authenticated_as}")
|
||||
|
||||
return authenticated_as
|
||||
|
||||
|
||||
# Dependency for protected routes
|
||||
# Returns the label/identifier of the authenticated client
|
||||
AuthRequired = Annotated[str, Depends(verify_api_key)]
|
||||
|
||||
|
||||
def verify_ws_token(token: str) -> bool:
|
||||
"""Check a WebSocket query-param token against configured API keys.
|
||||
|
||||
When no API keys are configured, authentication is disabled and all
|
||||
WebSocket connections are allowed.
|
||||
"""
|
||||
config = get_config()
|
||||
# No keys configured → auth disabled, allow all connections
|
||||
if not config.auth.api_keys:
|
||||
return True
|
||||
if token:
|
||||
for _label, api_key in config.auth.api_keys.items():
|
||||
if secrets.compare_digest(token, api_key):
|
||||
return True
|
||||
return False
|
||||
Reference in New Issue
Block a user