4d1bb78c83
Lint & Test / test (push) Successful in 10s
When no api_tokens are configured (the new default), all endpoints are accessible without authentication. The frontend detects this via /api/health's auth_required field and skips the login form. - Backend: auth.py skips verification when api_tokens is empty - Frontend: shared getAuthHeaders()/hasCredentials() helpers replace scattered token logic across all JS modules - Health endpoint exposes auth_required for frontend discovery - config.example.yaml ships with tokens commented out - CLI --show-token and startup log reflect disabled state Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
167 lines
4.9 KiB
Python
167 lines
4.9 KiB
Python
"""Authentication middleware and utilities."""
|
|
|
|
import secrets
|
|
from contextvars import ContextVar
|
|
from typing import Optional
|
|
|
|
from fastapi import Depends, HTTPException, Query, Request, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from .config import settings
|
|
|
|
# Bearer-credential extractor shared by every dependency in this module.
# auto_error=False: the dependencies below check for ``credentials is None``
# themselves so they can pick their own 401 message (or skip auth entirely).
security = HTTPBearer(auto_error=False)

# Label of the token attached to the current request. The default "unknown"
# doubles as the "nothing validated yet" marker tested by the dependencies.
token_label_var: ContextVar[str] = ContextVar("token_label", default="unknown")
|
|
|
|
|
|
def auth_enabled() -> bool:
    """Report whether authentication is active.

    Returns:
        True when ``settings.api_tokens`` is non-empty (at least one token is
        configured), False otherwise.
    """
    if settings.api_tokens:
        return True
    return False
|
|
|
|
|
|
def get_token_label(token: str) -> Optional[str]:
    """Look up the label configured for a token.

    Every entry of ``settings.api_tokens`` (label -> token) is compared
    against ``token`` with a constant-time comparison.

    Args:
        token: The candidate token supplied by the client.

    Returns:
        The label of the first matching configured token, or None if no
        configured token matches.
    """
    # compare_digest() only accepts str operands that are pure ASCII and
    # raises TypeError otherwise. The candidate comes from the client, so
    # compare UTF-8 bytes instead: a non-ASCII token is then simply "no
    # match" rather than an unhandled exception.
    candidate = token.encode("utf-8")
    for label, stored_token in settings.api_tokens.items():
        if secrets.compare_digest(stored_token.encode("utf-8"), candidate):
            return label
    return None
|
|
|
|
|
|
async def verify_token(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
    """Resolve the token label for a request using the Authorization header.

    Resolution order:
      1. If no tokens are configured, the label is "anonymous".
      2. If ``token_label_var`` already holds something other than its
         default "unknown", that label is returned without re-checking the
         credentials.
      3. Otherwise the bearer credentials are looked up via
         ``get_token_label``.

    Args:
        request: The incoming request (not read by this function).
        credentials: Bearer credentials extracted by ``security``; None when
            the extractor found none.

    Returns:
        The token label, or "anonymous" when auth is disabled.

    Raises:
        HTTPException: 401 if credentials are missing or match no configured
            token (only when auth is enabled).
    """
    if not auth_enabled():
        token_label_var.set("anonymous")
        return "anonymous"

    # A non-default label means the token was already resolved earlier in
    # this context, so the scan over configured tokens can be skipped.
    cached_label = token_label_var.get("unknown")
    if cached_label != "unknown":
        return cached_label

    resolved = None
    failure_detail = None
    if credentials is None:
        failure_detail = "Missing authentication token"
    else:
        resolved = get_token_label(credentials.credentials)
        if resolved is None:
            failure_detail = "Invalid authentication token"

    if failure_detail is not None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=failure_detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Remember the label so later lookups in this context can reuse it.
    token_label_var.set(resolved)
    return resolved
|
|
|
|
|
|
class TokenAuth:
    """Callable dependency that authenticates a request by bearer token.

    With ``auto_error=True`` (the default) a missing or unrecognised token
    raises a 401; with ``auto_error=False`` the dependency returns None
    instead, leaving the decision to the caller.
    """

    def __init__(self, auto_error: bool = True):
        # Whether authentication failures raise (True) or yield None (False).
        self.auto_error = auto_error

    async def __call__(
        self,
        request: Request,
        credentials: HTTPAuthorizationCredentials = Depends(security),
    ) -> str | None:
        """Return the label for the request's token.

        Args:
            request: The incoming request (not read by this method).
            credentials: Bearer credentials extracted by ``security``; None
                when the extractor found none.

        Returns:
            "anonymous" when no tokens are configured, the matching token
            label on success, or None on failure when ``auto_error`` is
            False.

        Raises:
            HTTPException: 401 on a missing or invalid token when
                ``auto_error`` is True.
        """
        if not auth_enabled():
            token_label_var.set("anonymous")
            return "anonymous"

        if credentials is None:
            matched = None
            problem = "Missing authentication token"
        else:
            matched = get_token_label(credentials.credentials)
            problem = "Invalid authentication token" if matched is None else None

        if problem is None:
            # Publish the label in the context variable (used for logging).
            token_label_var.set(matched)
            return matched

        if not self.auto_error:
            return None

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=problem,
            headers={"WWW-Authenticate": "Bearer"},
        )
|
|
|
|
|
|
async def verify_token_or_query(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    token: Optional[str] = Query(None, description="API token as query parameter"),
) -> str:
    """Resolve the token label from the Authorization header or ``?token=``.

    Intended for endpoints that must be reachable through a plain URL (such
    as images), where setting a header is not possible. The header is tried
    first; the query parameter is only consulted if the header is absent or
    does not match a configured token.

    Args:
        credentials: Bearer credentials extracted by ``security``; None when
            the extractor found none.
        token: Token passed as the ``token`` query parameter, if any.

    Returns:
        The token label, or "anonymous" when no tokens are configured.

    Raises:
        HTTPException: 401 if neither source yields a configured token (only
            when auth is enabled).
    """
    if not auth_enabled():
        token_label_var.set("anonymous")
        return "anonymous"

    # A non-default label means the token was already resolved earlier in
    # this context; reuse it instead of scanning the configured tokens again.
    cached_label = token_label_var.get("unknown")
    if cached_label != "unknown":
        return cached_label

    # Candidate tokens in priority order: header first, then query parameter.
    candidates = []
    if credentials is not None:
        candidates.append(credentials.credentials)
    if token is not None:
        candidates.append(token)

    for candidate in candidates:
        label = get_token_label(candidate)
        if label is not None:
            token_label_var.set(label)
            return label

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Missing or invalid authentication token",
        headers={"WWW-Authenticate": "Bearer"},
    )
|