Files
media-player-server/media_server/main.py
T
alexei.dolgolyov d131ba461c
Lint & Test / test (push) Successful in 20s
fix: production-readiness hardening — security, perf, a11y, observability
Security
- Default scripts_management, callbacks_management, links_management, and
  media_folders_management to False so a leaked token cannot escalate to RCE
  through admin CRUD endpoints.
- TokenSpec + scope hierarchy (read | control | admin); legacy bare-string
  api_tokens entries promote to admin for back-compat. Management endpoints
  now require admin scope.
- WebSocket subprotocol auth (Sec-WebSocket-Protocol: media-server.token.<T>)
  preferred over ?token= query so the token no longer lands in URL/history/
  Referer; query fallback retained for HA integration back-compat.
- Origin allow-list check on the WS endpoint (CSWSH defence).
- In-process token-bucket rate limiter: 5/min for failed auths,
  10/min for /api/scripts/execute and /api/callbacks/execute.
- shell=False subprocess path (shlex.split) + per-parameter regex `pattern`
  in ScriptParameterConfig to harden shell=true scripts against parameter
  injection (Windows cmd.exe env-var expansion).
- CSP gains form-action, worker-src, manifest-src directives.
- Refuse cors_origins=["*"] at startup; strip token=... from uvicorn access
  logs; validate Gitea release tag against strict SemVer regex.
- noopener noreferrer + no-referrer referrerpolicy on every outbound link.
- icacls hardening of config.yaml on Windows (current user + SYSTEM +
  Administrators only); 0600 still enforced on POSIX.
- WS volume handler clamps input and never drops the socket on bad messages.

Performance
- Album-art read in windows_media gated by track key — was decoding the
  WinRT thumbnail twice per second regardless of track changes.
- /api/media/artwork returns content-derived ETag + Cache-Control so the
  browser sends If-None-Match and gets 304s on track repeats.
- Foreground-service ctypes argtypes hoisted to one-time module init
  (was re-declaring ~14 prototypes per probe).
- display_service _static_cache keyed by (edid_hash, ...) tuple with
  eviction of disappeared monitors — fixes stale capabilities on hot-plug
  swaps where the new topology has the same monitor count.
- Visualizer rAF loop paused on document.hidden, resumed on visible.

Reliability / bug fixes
- Lifespan rewritten as try/yield/finally so a partial-startup failure
  cannot orphan background tasks or executors.
- _run_callback in routes/media.py keeps a strong task ref (GC-safe) and
  uses the dedicated callback executor instead of the default pool.
- macos_media.set_volume() no longer always returns True.
- TrayManager._restart_requested initialised in __init__; set before
  signalling exit so the main thread observes it correctly.
- Missing static_dir now logs a WARNING instead of silent UI disable.

UX / accessibility / PWA
- manifest.json theme_color and background_color match the Studio Reference
  base (#0E0D0B); added id and scope for PWA installability.
- ARIA on mini-player icon buttons; inner SVGs marked aria-hidden.
- OS mediaSession API wired so headset / lockscreen / Bluetooth buttons
  drive play/pause/next/prev/seek and show track metadata + artwork.

Observability
- X-Request-ID middleware (accept upstream id if it matches a safe regex,
  otherwise UUID4); request_id_var added to ContextVars and included in
  every log line alongside the token label.
- Audit log (append-only JSONL) for every script + callback execution,
  including the on_play/on_pause/etc. event callbacks. Background-thread
  writer; queue capped; flushed in lifespan teardown.

Deployment
- proxy_headers + forwarded_allow_ips plumbed through Settings →
  uvicorn.Config for reverse-proxy installs.
- HTTPS support via ssl_certfile + ssl_keyfile (+ optional password);
  startup refuses to launch with only one of the pair set.
- Thumbnail cache moved from project-root .cache to
  %LOCALAPPDATA%/media-server/cache (Windows) and
  $XDG_CACHE_HOME/media-server/thumbnails (POSIX).

Tests
- 35 new tests across auth scopes, rate limiter, browser path traversal
  (../ NUL UNC absolute), script-param validation incl. regex, Gitea tag
  whitelist, config atomic write + POSIX perms. 47 passed / 4 skipped.
2026-05-22 22:25:54 +03:00

589 lines
21 KiB
Python

"""Media Server - FastAPI application entry point."""
import argparse
import logging
import socket
import sys
from contextlib import asynccontextmanager
from pathlib import Path
import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from . import __version__
from .auth import get_token_label, request_id_var, token_label_var
from .config import generate_default_config, get_config_dir, settings
from .routes import (
audio_router,
browser_router,
callbacks_router,
display_router,
foreground_router,
health_router,
links_router,
media_router,
scripts_router,
)
from .services import get_media_controller
from .services.websocket_manager import ws_manager
class TokenLabelFilter(logging.Filter):
"""Add token label + request_id to log records."""
def filter(self, record):
record.token_label = token_label_var.get("unknown")
record.request_id = request_id_var.get("-")
return True
class _StripTokenQueryFilter(logging.Filter):
"""Strip `token=...` from query strings before they hit the access log.
uvicorn's default access log format includes the full request line, so
`/api/media/artwork?token=SECRET` would otherwise be persisted verbatim
in stdout/journald/file sinks.
"""
import re as _re
_TOKEN_RE = _re.compile(r"([?&])token=[^&\s\"']+")
def filter(self, record): # type: ignore[override]
if isinstance(record.args, tuple):
record.args = tuple(
self._TOKEN_RE.sub(r"\1token=REDACTED", a) if isinstance(a, str) else a
for a in record.args
)
if isinstance(record.msg, str) and "token=" in record.msg:
record.msg = self._TOKEN_RE.sub(r"\1token=REDACTED", record.msg)
return True
def setup_logging():
"""Configure application logging with token labels."""
# Create filter and handler
token_filter = TokenLabelFilter()
handler = logging.StreamHandler(sys.stdout)
handler.addFilter(token_filter)
logging.basicConfig(
level=getattr(logging, settings.log_level.upper()),
format=(
"%(asctime)s - %(name)s - [%(token_label)s] [%(request_id)s]"
" - %(levelname)s - %(message)s"
),
handlers=[handler],
)
# Suppress noisy third-party loggers
logging.getLogger("screen_brightness_control").setLevel(logging.ERROR)
# Make sure the uvicorn access log never persists tokens leaked into the
# query string (the artwork + WS endpoints accept `?token=` for browser
# compatibility — see verify_token_or_query).
strip_filter = _StripTokenQueryFilter()
for name in ("uvicorn.access", "uvicorn"):
logging.getLogger(name).addFilter(strip_filter)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler.
All long-lived resources started during startup are kept in local refs and
torn down in a `finally:` so a partial-startup failure cannot orphan tasks
or thread pools.
"""
import asyncio
setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Media Server starting on {settings.host}:{settings.port}")
# Log authentication status — never log full or partial token material.
if settings.api_tokens:
labels = ", ".join(settings.api_tokens.keys())
logger.info(f"Authentication enabled. Tokens configured: [{labels}]")
else:
logger.warning("No API tokens configured — authentication is DISABLED")
update_checker = None
cleanup_task: asyncio.Task | None = None
analyzer = None
status_monitor_started = False
try:
# Start WebSocket status monitor
controller = get_media_controller()
await ws_manager.start_status_monitor(controller.get_status)
status_monitor_started = True
logger.info("WebSocket status monitor started")
# Start update checker
if settings.update_check_enabled:
from .services.gitea_release_provider import GiteaReleaseProvider
from .services.update_checker import UpdateChecker
provider = GiteaReleaseProvider()
update_checker = UpdateChecker(provider, __version__)
await update_checker.start(settings.update_check_interval)
# Store globally so health endpoint can access cached result
app.state.update_checker = update_checker
# Schedule periodic thumbnail cache cleanup so the 500 MB cap is actually
# enforced. Runs once at startup and then hourly until shutdown.
from .services.thumbnail_service import ThumbnailService
async def _thumbnail_cleanup_loop() -> None:
while True:
try:
await asyncio.to_thread(ThumbnailService.cleanup_cache)
except Exception as e:
logger.warning("Thumbnail cache cleanup failed: %s", e)
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
break
cleanup_task = asyncio.create_task(_thumbnail_cleanup_loop())
# Register audio visualizer (capture starts on-demand when clients subscribe)
if settings.visualizer_enabled:
from .services.audio_analyzer import get_audio_analyzer
analyzer = get_audio_analyzer(
num_bins=settings.visualizer_bins,
target_fps=settings.visualizer_fps,
device_name=settings.visualizer_device,
)
if analyzer.available:
await ws_manager.start_audio_monitor(analyzer)
logger.info("Audio visualizer available (capture on-demand)")
else:
logger.info("Audio visualizer unavailable (install soundcard + numpy)")
yield
finally:
# Stop update checker
if update_checker is not None:
try:
await update_checker.stop()
except Exception:
logger.exception("Error stopping update checker")
# Cancel periodic thumbnail cleanup
if cleanup_task is not None:
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Error awaiting thumbnail cleanup task")
# Stop audio visualizer
try:
await ws_manager.stop_audio_monitor()
except Exception:
logger.exception("Error stopping audio monitor")
if analyzer and analyzer.running:
try:
analyzer.stop()
except Exception:
logger.exception("Error stopping audio analyzer")
# Stop WebSocket status monitor
if status_monitor_started:
try:
await ws_manager.stop_status_monitor()
except Exception:
logger.exception("Error stopping status monitor")
# Shut down dedicated thread pools so pending scripts don't leak threads
try:
from .routes.callbacks import shutdown_callback_executor
from .routes.scripts import shutdown_script_executor
shutdown_script_executor()
shutdown_callback_executor()
except Exception:
logger.exception("Error shutting down script/callback executors")
# Flush audit log writer
try:
from .services.audit_log import shutdown_audit_log
shutdown_audit_log()
except Exception:
logger.exception("Error flushing audit log")
# Clean up platform-specific resources
import platform as _platform
if _platform.system() == "Windows":
try:
from .services.windows_media import shutdown_executor
shutdown_executor()
except Exception:
logger.exception("Error shutting down windows_media executor")
logger.info("Media Server shutting down")
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
app = FastAPI(
title="Media Server",
description="REST API for controlling system media playback",
version=__version__,
lifespan=lifespan,
)
# Compress responses > 1KB
app.add_middleware(GZipMiddleware, minimum_size=1000)
# CORS — restrict to same-origin by default; users that integrate the API
# from another origin (e.g. Home Assistant on a different host) can set
# cors_origins in config.yaml. Refuse "*" outright: combined with the
# admin endpoints this would let any origin in the universe run
# arbitrary shell. If users genuinely need every origin, they can list
# them explicitly.
if any(o.strip() == "*" for o in settings.cors_origins):
raise RuntimeError(
"cors_origins must not contain '*' — list exact origins instead. "
"This protects the script-execution endpoints from any-origin abuse."
)
cors_origins = settings.cors_origins or [
f"http://localhost:{settings.port}",
f"http://127.0.0.1:{settings.port}",
]
app.add_middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Authorization", "Content-Type"],
)
# Request correlation ID — accept upstream X-Request-ID if it's a sane
# ASCII id, otherwise mint a fresh UUID4. Emitted on the response so
# clients can quote it back in bug reports.
import re
import uuid as _uuid
_REQ_ID_RE = re.compile(r"^[A-Za-z0-9._\-]{1,128}$")
@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
incoming = request.headers.get("x-request-id", "")
req_id = incoming if _REQ_ID_RE.match(incoming) else _uuid.uuid4().hex[:16]
request_id_var.set(req_id)
response = await call_next(request)
response.headers["X-Request-ID"] = req_id
return response
# Security headers — strict CSP for the bundled UI, disallow framing, hide referrer.
@app.middleware("http")
async def security_headers_middleware(request: Request, call_next):
response = await call_next(request)
response.headers.setdefault(
"Content-Security-Policy",
(
"default-src 'self'; "
"img-src 'self' data: blob: https://api.iconify.design; "
"connect-src 'self' https://api.iconify.design ws: wss:; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"font-src 'self' data:; "
"frame-ancestors 'none'; "
"form-action 'self'; "
"worker-src 'self'; "
"manifest-src 'self'; "
"base-uri 'self'"
),
)
response.headers.setdefault("X-Frame-Options", "DENY")
response.headers.setdefault("X-Content-Type-Options", "nosniff")
response.headers.setdefault("Referrer-Policy", "no-referrer")
return response
# Add token logging middleware + auth-failure rate limit
from fastapi.responses import JSONResponse
from .services.rate_limit import check as ratelimit_check
from .services.rate_limit import get_peer
@app.middleware("http")
async def token_logging_middleware(request: Request, call_next):
"""Extract token label, set in context, and rate-limit failed auths."""
if not settings.api_tokens:
token_label_var.set("anonymous")
else:
token_label = "unknown"
token_present = False
token_valid = False
# Try Authorization header
auth_header = request.headers.get("authorization", "")
if auth_header.startswith("Bearer "):
token_present = True
token = auth_header[7:]
label = get_token_label(token)
if label:
token_label = label
token_valid = True
# Try query parameter (for artwork endpoint)
elif "token" in request.query_params:
token_present = True
token = request.query_params["token"]
label = get_token_label(token)
if label:
token_label = label
token_valid = True
token_label_var.set(token_label)
# Brute-force gate: a peer that produces a wrong/missing token gets
# 5 failures per minute before being throttled. Static-asset
# requests (GET /static/*, /, /sw.js) and the docs endpoint are
# exempt — they're served unauthenticated by design.
if token_present and not token_valid:
path = request.url.path
if not (
path == "/" or path == "/sw.js"
or path.startswith("/static/")
or path.startswith("/docs") or path.startswith("/openapi")
or path.startswith("/redoc")
):
allowed, retry_after = ratelimit_check("auth", get_peer(request))
if not allowed:
return JSONResponse(
status_code=429,
content={"detail": "Too many authentication failures"},
headers={"Retry-After": str(int(retry_after or 60))},
)
response = await call_next(request)
return response
# Register routers
app.include_router(audio_router)
app.include_router(browser_router)
app.include_router(callbacks_router)
app.include_router(display_router)
app.include_router(foreground_router)
app.include_router(health_router)
app.include_router(links_router)
app.include_router(media_router)
app.include_router(scripts_router)
# Mount static files and serve UI at root
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
@app.get("/sw.js", include_in_schema=False)
async def serve_service_worker():
"""Serve service worker from root scope for PWA installability."""
return FileResponse(
static_dir / "sw.js",
media_type="application/javascript",
headers={"Cache-Control": "no-cache"},
)
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
@app.get("/", include_in_schema=False)
async def serve_ui():
"""Serve the Web UI."""
return FileResponse(static_dir / "index.html")
else:
logging.getLogger(__name__).warning(
"static_dir not found at %s — Web UI disabled (API only)",
static_dir,
)
return app
app = create_app()
def main():
"""Main entry point for running the server."""
parser = argparse.ArgumentParser(description="Media Server")
parser.add_argument(
"--host",
default=settings.host,
help=f"Host to bind to (default: {settings.host})",
)
parser.add_argument(
"--port",
type=int,
default=settings.port,
help=f"Port to bind to (default: {settings.port})",
)
parser.add_argument(
"--generate-config",
action="store_true",
help="Generate a default configuration file and exit",
)
parser.add_argument(
"--show-token",
action="store_true",
help="Show the current API token and exit",
)
parser.add_argument(
"--no-tray",
action="store_true",
help="Disable system tray icon (for headless/service mode)",
)
args = parser.parse_args()
if args.generate_config:
config_path = generate_default_config()
print(f"Configuration file generated at: {config_path}")
print("A random API token was generated under api_tokens.default.")
print("Run `python -m media_server.main --show-token` to view it.")
return
if args.show_token:
print(f"Config directory: {get_config_dir()}")
if settings.api_tokens:
print("\nAPI Tokens:")
for label, spec in settings.api_tokens.items():
scope_str = ",".join(spec.scopes)
print(f" {label:20} {spec.token} [scopes: {scope_str}]")
else:
print("\nAuthentication is DISABLED (no tokens configured)")
return
# Stderr is invisible when launched via wscript / pythonw (Start Menu shortcut,
# autostart). Mirror pre-uvicorn failures to a file in the config dir so the
# next silent boot failure is diagnosable.
def _fatal(msg: str, exit_code: int = 1) -> None:
print(msg, file=sys.stderr)
try:
log_path = get_config_dir() / "startup-errors.log"
from datetime import datetime
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}\n")
except OSError:
pass
sys.exit(exit_code)
# First-run bootstrap: if no config has ever been written, generate one
# with a random token instead of starting in the insecure "no-auth" mode.
config_path = get_config_dir() / "config.yaml"
if not config_path.exists() and not settings.api_tokens:
try:
generate_default_config(config_path)
_fatal(
f"\nFirst run: generated default config at {config_path}.\n"
"Run --show-token to retrieve the API token, then restart.",
exit_code=0,
)
except OSError as e:
print(f"WARNING: could not bootstrap config: {e}", file=sys.stderr)
# Refuse to bind a non-loopback address with no tokens, unless explicitly opted in.
non_loopback = args.host not in ("127.0.0.1", "localhost", "::1")
if non_loopback and not settings.api_tokens and not settings.allow_lan_without_auth:
_fatal(
"ERROR: refusing to bind a non-loopback address with no api_tokens configured.\n"
"Either set api_tokens in config.yaml, bind to 127.0.0.1,"
" or set allow_lan_without_auth: true in config.yaml to override."
)
# Check if port is available before starting
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.bind((args.host if args.host != "0.0.0.0" else "127.0.0.1", args.port))
except OSError:
_fatal(
f"ERROR: Port {args.port} is already in use. "
f"Another instance of Media Server may be running.\n"
f"Stop the other process or use --port to pick a different port."
)
from .tray import PYSTRAY_AVAILABLE, TrayManager
use_tray = PYSTRAY_AVAILABLE and not args.no_tray
# Validate TLS pair consistency before either path so we don't fail late.
if bool(settings.ssl_certfile) ^ bool(settings.ssl_keyfile):
_fatal(
"ERROR: ssl_certfile and ssl_keyfile must both be set, or both unset."
)
def _uvicorn_kwargs() -> dict:
kw: dict = {
"host": args.host,
"port": args.port,
"log_level": settings.log_level.lower(),
"proxy_headers": settings.proxy_headers,
"forwarded_allow_ips": settings.forwarded_allow_ips,
}
if settings.ssl_certfile and settings.ssl_keyfile:
kw["ssl_certfile"] = settings.ssl_certfile
kw["ssl_keyfile"] = settings.ssl_keyfile
if settings.ssl_keyfile_password:
kw["ssl_keyfile_password"] = settings.ssl_keyfile_password
return kw
if use_tray:
import asyncio
import threading
# Run uvicorn in a background thread so tray owns the main thread message loop
uv_config = uvicorn.Config(
"media_server.main:app",
**_uvicorn_kwargs(),
)
server = uvicorn.Server(uv_config)
def run_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(server.serve())
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
# Tray on main thread (blocking)
tray = TrayManager(
port=args.port,
on_exit=lambda: setattr(server, "should_exit", True),
)
tray.run()
# Tray exited — wait for server to finish graceful shutdown
server_thread.join(timeout=10)
if tray.restart_requested:
import subprocess
# Always restart via `python -m media_server.main` — this works
# regardless of how we were originally started (console_script,
# python -m, or direct script invocation).
cmd = [sys.executable, "-m", "media_server.main"]
subprocess.Popen(
cmd,
cwd=Path.cwd(),
start_new_session=True,
)
else:
uvicorn.run(
"media_server.main:app",
reload=False,
**_uvicorn_kwargs(),
)
if __name__ == "__main__":
main()