"""Media Server - FastAPI application entry point.""" import argparse import logging import socket import sys from contextlib import asynccontextmanager from pathlib import Path import uvicorn from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from . import __version__ from .auth import get_token_label, token_label_var from .config import generate_default_config, get_config_dir, settings from .routes import ( audio_router, browser_router, callbacks_router, display_router, health_router, links_router, media_router, scripts_router, ) from .services import get_media_controller from .services.websocket_manager import ws_manager class TokenLabelFilter(logging.Filter): """Add token label to log records.""" def filter(self, record): record.token_label = token_label_var.get("unknown") return True def setup_logging(): """Configure application logging with token labels.""" # Create filter and handler token_filter = TokenLabelFilter() handler = logging.StreamHandler(sys.stdout) handler.addFilter(token_filter) logging.basicConfig( level=getattr(logging, settings.log_level.upper()), format="%(asctime)s - %(name)s - [%(token_label)s] - %(levelname)s - %(message)s", handlers=[handler], ) # Suppress noisy third-party loggers logging.getLogger("screen_brightness_control").setLevel(logging.ERROR) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler.""" setup_logging() logger = logging.getLogger(__name__) logger.info(f"Media Server starting on {settings.host}:{settings.port}") # Log authentication status — never log full or partial token material. if settings.api_tokens: labels = ", ".join(settings.api_tokens.keys()) logger.info(f"Authentication enabled. Tokens configured: [{labels}]") else: logger.warning("No API tokens configured — authentication is DISABLED") # Start WebSocket status monitor controller = get_media_controller() await ws_manager.start_status_monitor(controller.get_status) logger.info("WebSocket status monitor started") # Start update checker update_checker = None if settings.update_check_enabled: from .services.gitea_release_provider import GiteaReleaseProvider from .services.update_checker import UpdateChecker provider = GiteaReleaseProvider() update_checker = UpdateChecker(provider, __version__) await update_checker.start(settings.update_check_interval) # Store globally so health endpoint can access cached result app.state.update_checker = update_checker # Schedule periodic thumbnail cache cleanup so the 500 MB cap is actually # enforced. Runs once at startup and then hourly until shutdown. from .services.thumbnail_service import ThumbnailService async def _thumbnail_cleanup_loop() -> None: while True: try: await asyncio.to_thread(ThumbnailService.cleanup_cache) except Exception as e: logger.warning("Thumbnail cache cleanup failed: %s", e) try: await asyncio.sleep(3600) except asyncio.CancelledError: break import asyncio cleanup_task = asyncio.create_task(_thumbnail_cleanup_loop()) # Register audio visualizer (capture starts on-demand when clients subscribe) analyzer = None if settings.visualizer_enabled: from .services.audio_analyzer import get_audio_analyzer analyzer = get_audio_analyzer( num_bins=settings.visualizer_bins, target_fps=settings.visualizer_fps, device_name=settings.visualizer_device, ) if analyzer.available: await ws_manager.start_audio_monitor(analyzer) logger.info("Audio visualizer available (capture on-demand)") else: logger.info("Audio visualizer unavailable (install soundcard + numpy)") yield # Stop update checker if update_checker is not None: await update_checker.stop() # Cancel periodic thumbnail cleanup cleanup_task.cancel() try: await cleanup_task except asyncio.CancelledError: pass # Stop audio visualizer await ws_manager.stop_audio_monitor() if analyzer and analyzer.running: analyzer.stop() # Stop WebSocket status monitor await ws_manager.stop_status_monitor() # Shut down dedicated thread pools so pending scripts don't leak threads from .routes.callbacks import shutdown_callback_executor from .routes.scripts import shutdown_script_executor shutdown_script_executor() shutdown_callback_executor() # Clean up platform-specific resources import platform as _platform if _platform.system() == "Windows": from .services.windows_media import shutdown_executor shutdown_executor() logger.info("Media Server shutting down") def create_app() -> FastAPI: """Create and configure the FastAPI application.""" app = FastAPI( title="Media Server", description="REST API for controlling system media playback", version=__version__, lifespan=lifespan, ) # Compress responses > 1KB app.add_middleware(GZipMiddleware, minimum_size=1000) # CORS — restrict to same-origin by default; users that integrate the API # from another origin (e.g. Home Assistant on a different host) can set # cors_origins in config.yaml. cors_origins = settings.cors_origins or [ f"http://localhost:{settings.port}", f"http://127.0.0.1:{settings.port}", ] app.add_middleware( CORSMiddleware, allow_origins=cors_origins, allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["Authorization", "Content-Type"], ) # Security headers — strict CSP for the bundled UI, disallow framing, hide referrer. @app.middleware("http") async def security_headers_middleware(request: Request, call_next): response = await call_next(request) response.headers.setdefault( "Content-Security-Policy", ( "default-src 'self'; " "img-src 'self' data: blob: https://api.iconify.design; " "connect-src 'self' https://api.iconify.design ws: wss:; " "script-src 'self'; " "style-src 'self' 'unsafe-inline'; " "font-src 'self' data:; " "frame-ancestors 'none'; " "base-uri 'self'" ), ) response.headers.setdefault("X-Frame-Options", "DENY") response.headers.setdefault("X-Content-Type-Options", "nosniff") response.headers.setdefault("Referrer-Policy", "no-referrer") return response # Add token logging middleware @app.middleware("http") async def token_logging_middleware(request: Request, call_next): """Extract token label and set in context for logging.""" if not settings.api_tokens: token_label_var.set("anonymous") else: token_label = "unknown" # Try Authorization header auth_header = request.headers.get("authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] label = get_token_label(token) if label: token_label = label # Try query parameter (for artwork endpoint) elif "token" in request.query_params: token = request.query_params["token"] label = get_token_label(token) if label: token_label = label token_label_var.set(token_label) response = await call_next(request) return response # Register routers app.include_router(audio_router) app.include_router(browser_router) app.include_router(callbacks_router) app.include_router(display_router) app.include_router(health_router) app.include_router(links_router) app.include_router(media_router) app.include_router(scripts_router) # Mount static files and serve UI at root static_dir = Path(__file__).parent / "static" if static_dir.exists(): @app.get("/sw.js", include_in_schema=False) async def serve_service_worker(): """Serve service worker from root scope for PWA installability.""" return FileResponse( static_dir / "sw.js", media_type="application/javascript", headers={"Cache-Control": "no-cache"}, ) app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") @app.get("/", include_in_schema=False) async def serve_ui(): """Serve the Web UI.""" return FileResponse(static_dir / "index.html") return app app = create_app() def main(): """Main entry point for running the server.""" parser = argparse.ArgumentParser(description="Media Server") parser.add_argument( "--host", default=settings.host, help=f"Host to bind to (default: {settings.host})", ) parser.add_argument( "--port", type=int, default=settings.port, help=f"Port to bind to (default: {settings.port})", ) parser.add_argument( "--generate-config", action="store_true", help="Generate a default configuration file and exit", ) parser.add_argument( "--show-token", action="store_true", help="Show the current API token and exit", ) parser.add_argument( "--no-tray", action="store_true", help="Disable system tray icon (for headless/service mode)", ) args = parser.parse_args() if args.generate_config: config_path = generate_default_config() print(f"Configuration file generated at: {config_path}") print("A random API token was generated under api_tokens.default.") print("Run `python -m media_server.main --show-token` to view it.") return if args.show_token: print(f"Config directory: {get_config_dir()}") if settings.api_tokens: print("\nAPI Tokens:") for label, token in settings.api_tokens.items(): print(f" {label:20} {token}") else: print("\nAuthentication is DISABLED (no tokens configured)") return # First-run bootstrap: if no config has ever been written, generate one # with a random token instead of starting in the insecure "no-auth" mode. config_path = get_config_dir() / "config.yaml" if not config_path.exists() and not settings.api_tokens: try: generate_default_config(config_path) print( f"\nFirst run: generated default config at {config_path}.\n" "Run --show-token to retrieve the API token, then restart.", file=sys.stderr, ) sys.exit(0) except OSError as e: print(f"WARNING: could not bootstrap config: {e}", file=sys.stderr) # Refuse to bind a non-loopback address with no tokens, unless explicitly opted in. non_loopback = args.host not in ("127.0.0.1", "localhost", "::1") if non_loopback and not settings.api_tokens and not settings.allow_lan_without_auth: print( "ERROR: refusing to bind a non-loopback address with no api_tokens configured.\n" "Either set api_tokens in config.yaml, bind to 127.0.0.1," " or set allow_lan_without_auth: true in config.yaml to override.", file=sys.stderr, ) sys.exit(1) # Check if port is available before starting with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: sock.bind((args.host if args.host != "0.0.0.0" else "127.0.0.1", args.port)) except OSError: print( f"ERROR: Port {args.port} is already in use. " f"Another instance of Media Server may be running.\n" f"Stop the other process or use --port to pick a different port.", file=sys.stderr, ) sys.exit(1) from .tray import PYSTRAY_AVAILABLE, TrayManager use_tray = PYSTRAY_AVAILABLE and not args.no_tray if use_tray: import asyncio import threading # Run uvicorn in a background thread so tray owns the main thread message loop uv_config = uvicorn.Config( "media_server.main:app", host=args.host, port=args.port, log_level=settings.log_level.lower(), ) server = uvicorn.Server(uv_config) def run_server(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(server.serve()) server_thread = threading.Thread(target=run_server, daemon=True) server_thread.start() # Tray on main thread (blocking) tray = TrayManager( port=args.port, on_exit=lambda: setattr(server, "should_exit", True), ) tray.run() # Tray exited — wait for server to finish graceful shutdown server_thread.join(timeout=10) if tray.restart_requested: import subprocess # Always restart via `python -m media_server.main` — this works # regardless of how we were originally started (console_script, # python -m, or direct script invocation). cmd = [sys.executable, "-m", "media_server.main"] subprocess.Popen( cmd, cwd=Path.cwd(), start_new_session=True, ) else: uvicorn.run( "media_server.main:app", host=args.host, port=args.port, reload=False, ) if __name__ == "__main__": main()