Add multi-token authentication with client labels

- Replace single api_token with api_tokens dict (label: token pairs)
- Add context-aware logging to track which client made each request
- Implement token label lookup with secure comparison
- Add logging middleware to inject token labels into request context
- Update logging format to display [label] in all log messages
- Fix WebSocket authentication to use new multi-token system
- Update CLI --show-token to display all tokens with labels
- Update config generation to use api_tokens format
- Update README with multi-token documentation
- Update config.example.yaml with multiple token examples

Benefits:
- Easy identification of clients in logs (Home Assistant, mobile, web UI, etc.)
- Per-client token management and revocation
- Better security and auditability

Example log output:
2026-02-06 03:36:20,806 - [home_assistant] - WebSocket client connected

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-06 03:37:35 +03:00
parent 5342cffac7
commit 71a0a6e6d1
6 changed files with 155 additions and 32 deletions

View File

@@ -1,5 +1,7 @@
"""Authentication middleware and utilities."""
import secrets
from contextvars import ContextVar
from typing import Optional
from fastapi import Depends, HTTPException, Query, Request, status
@@ -9,6 +11,24 @@ from .config import settings
security = HTTPBearer(auto_error=False)
# Context variable to store current request's token label
token_label_var: ContextVar[str] = ContextVar("token_label", default="unknown")
def get_token_label(token: str) -> Optional[str]:
"""Get the label for a token. Returns None if token is invalid.
Args:
token: The token to look up
Returns:
The label for the token, or None if invalid
"""
for label, stored_token in settings.api_tokens.items():
if secrets.compare_digest(stored_token, token):
return label
return None
async def verify_token(
request: Request,
@@ -21,7 +41,7 @@ async def verify_token(
credentials: The bearer token credentials
Returns:
The validated token
The token label
Raises:
HTTPException: If the token is missing or invalid
@@ -33,14 +53,17 @@ async def verify_token(
headers={"WWW-Authenticate": "Bearer"},
)
if credentials.credentials != settings.api_token:
label = get_token_label(credentials.credentials)
if label is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
return credentials.credentials
# Set label in context for logging
token_label_var.set(label)
return label
class TokenAuth:
@@ -54,7 +77,7 @@ class TokenAuth:
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str | None:
"""Verify the token and return it or raise an exception."""
"""Verify the token and return the label or raise an exception."""
if credentials is None:
if self.auto_error:
raise HTTPException(
@@ -64,7 +87,8 @@ class TokenAuth:
)
return None
if credentials.credentials != settings.api_token:
label = get_token_label(credentials.credentials)
if label is None:
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -73,7 +97,9 @@ class TokenAuth:
)
return None
return credentials.credentials
# Set label in context for logging
token_label_var.set(label)
return label
async def verify_token_or_query(
@@ -89,23 +115,28 @@ async def verify_token_or_query(
token: Token from query parameter
Returns:
The validated token
The token label
Raises:
HTTPException: If the token is missing or invalid
"""
label = None
# Try header first
if credentials is not None:
if credentials.credentials == settings.api_token:
return credentials.credentials
label = get_token_label(credentials.credentials)
# Try query parameter
if token is not None:
if token == settings.api_token:
return token
if label is None and token is not None:
label = get_token_label(token)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing or invalid authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
if label is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing or invalid authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
# Set label in context for logging
token_label_var.set(label)
return label