"""Media Server - FastAPI application entry point.""" import argparse import logging import socket import sys from contextlib import asynccontextmanager from pathlib import Path import uvicorn from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from . import __version__ from .auth import get_token_label, token_label_var from .config import generate_default_config, get_config_dir, settings from .routes import ( audio_router, browser_router, callbacks_router, display_router, health_router, links_router, media_router, scripts_router, ) from .services import get_media_controller from .services.websocket_manager import ws_manager class TokenLabelFilter(logging.Filter): """Add token label to log records.""" def filter(self, record): record.token_label = token_label_var.get("unknown") return True def setup_logging(): """Configure application logging with token labels.""" # Create filter and handler token_filter = TokenLabelFilter() handler = logging.StreamHandler(sys.stdout) handler.addFilter(token_filter) logging.basicConfig( level=getattr(logging, settings.log_level.upper()), format="%(asctime)s - %(name)s - [%(token_label)s] - %(levelname)s - %(message)s", handlers=[handler], ) # Suppress noisy third-party loggers logging.getLogger("screen_brightness_control").setLevel(logging.ERROR) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler.""" setup_logging() logger = logging.getLogger(__name__) logger.info(f"Media Server starting on {settings.host}:{settings.port}") # Log authentication status if settings.api_tokens: for label, token in settings.api_tokens.items(): logger.info(f"API Token [{label}]: {token[:8]}...") else: logger.warning("No API tokens configured — authentication is DISABLED") # Start WebSocket status monitor controller = get_media_controller() await ws_manager.start_status_monitor(controller.get_status) logger.info("WebSocket status monitor started") # Start update checker update_checker = None if settings.update_check_enabled: from .services.gitea_release_provider import GiteaReleaseProvider from .services.update_checker import UpdateChecker provider = GiteaReleaseProvider() update_checker = UpdateChecker(provider, __version__) await update_checker.start(settings.update_check_interval) # Store globally so health endpoint can access cached result app.state.update_checker = update_checker # Register audio visualizer (capture starts on-demand when clients subscribe) analyzer = None if settings.visualizer_enabled: from .services.audio_analyzer import get_audio_analyzer analyzer = get_audio_analyzer( num_bins=settings.visualizer_bins, target_fps=settings.visualizer_fps, device_name=settings.visualizer_device, ) if analyzer.available: await ws_manager.start_audio_monitor(analyzer) logger.info("Audio visualizer available (capture on-demand)") else: logger.info("Audio visualizer unavailable (install soundcard + numpy)") yield # Stop update checker if update_checker is not None: await update_checker.stop() # Stop audio visualizer await ws_manager.stop_audio_monitor() if analyzer and analyzer.running: analyzer.stop() # Stop WebSocket status monitor await ws_manager.stop_status_monitor() # Clean up platform-specific resources import platform as _platform if _platform.system() == "Windows": from .services.windows_media import shutdown_executor shutdown_executor() logger.info("Media Server shutting down") def create_app() -> FastAPI: """Create and configure the FastAPI application.""" app = FastAPI( title="Media Server", description="REST API for controlling system media playback", version=__version__, lifespan=lifespan, ) # Compress responses > 1KB app.add_middleware(GZipMiddleware, minimum_size=1000) # Add CORS middleware for cross-origin requests # Token auth is via Authorization header, not cookies, so credentials are not needed app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) # Add token logging middleware @app.middleware("http") async def token_logging_middleware(request: Request, call_next): """Extract token label and set in context for logging.""" if not settings.api_tokens: token_label_var.set("anonymous") else: token_label = "unknown" # Try Authorization header auth_header = request.headers.get("authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] label = get_token_label(token) if label: token_label = label # Try query parameter (for artwork endpoint) elif "token" in request.query_params: token = request.query_params["token"] label = get_token_label(token) if label: token_label = label token_label_var.set(token_label) response = await call_next(request) return response # Register routers app.include_router(audio_router) app.include_router(browser_router) app.include_router(callbacks_router) app.include_router(display_router) app.include_router(health_router) app.include_router(links_router) app.include_router(media_router) app.include_router(scripts_router) # Mount static files and serve UI at root static_dir = Path(__file__).parent / "static" if static_dir.exists(): @app.get("/sw.js", include_in_schema=False) async def serve_service_worker(): """Serve service worker from root scope for PWA installability.""" return FileResponse( static_dir / "sw.js", media_type="application/javascript", headers={"Cache-Control": "no-cache"}, ) app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") @app.get("/", include_in_schema=False) async def serve_ui(): """Serve the Web UI.""" return FileResponse(static_dir / "index.html") return app app = create_app() def main(): """Main entry point for running the server.""" parser = argparse.ArgumentParser(description="Media Server") parser.add_argument( "--host", default=settings.host, help=f"Host to bind to (default: {settings.host})", ) parser.add_argument( "--port", type=int, default=settings.port, help=f"Port to bind to (default: {settings.port})", ) parser.add_argument( "--generate-config", action="store_true", help="Generate a default configuration file and exit", ) parser.add_argument( "--show-token", action="store_true", help="Show the current API token and exit", ) parser.add_argument( "--no-tray", action="store_true", help="Disable system tray icon (for headless/service mode)", ) args = parser.parse_args() if args.generate_config: config_path = generate_default_config() print(f"Configuration file generated at: {config_path}") print("Authentication is disabled by default. Add api_tokens to enable it.") return if args.show_token: print(f"Config directory: {get_config_dir()}") if settings.api_tokens: print("\nAPI Tokens:") for label, token in settings.api_tokens.items(): print(f" {label:20} {token}") else: print("\nAuthentication is DISABLED (no tokens configured)") return # Check if port is available before starting with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: sock.bind((args.host if args.host != "0.0.0.0" else "127.0.0.1", args.port)) except OSError: print( f"ERROR: Port {args.port} is already in use. " f"Another instance of Media Server may be running.\n" f"Stop the other process or use --port to pick a different port.", file=sys.stderr, ) sys.exit(1) from .tray import PYSTRAY_AVAILABLE, TrayManager use_tray = PYSTRAY_AVAILABLE and not args.no_tray if use_tray: import asyncio import threading # Run uvicorn in a background thread so tray owns the main thread message loop uv_config = uvicorn.Config( "media_server.main:app", host=args.host, port=args.port, log_level=settings.log_level.lower(), ) server = uvicorn.Server(uv_config) def run_server(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(server.serve()) server_thread = threading.Thread(target=run_server, daemon=True) server_thread.start() # Tray on main thread (blocking) tray = TrayManager( port=args.port, on_exit=lambda: setattr(server, "should_exit", True), ) tray.run() # Tray exited — wait for server to finish graceful shutdown server_thread.join(timeout=10) if tray.restart_requested: import subprocess # Always restart via `python -m media_server.main` — this works # regardless of how we were originally started (console_script, # python -m, or direct script invocation). cmd = [sys.executable, "-m", "media_server.main"] subprocess.Popen( cmd, cwd=Path.cwd(), start_new_session=True, ) else: uvicorn.run( "media_server.main:app", host=args.host, port=args.port, reload=False, ) if __name__ == "__main__": main()