Files
media-player-server/media_server/main.py
T

335 lines
10 KiB
Python

"""Media Server - FastAPI application entry point."""
import argparse
import logging
import socket
import sys
from contextlib import asynccontextmanager
from pathlib import Path
import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from . import __version__
from .auth import get_token_label, token_label_var
from .config import generate_default_config, get_config_dir, settings
from .routes import (
audio_router,
browser_router,
callbacks_router,
display_router,
health_router,
links_router,
media_router,
scripts_router,
)
from .services import get_media_controller
from .services.websocket_manager import ws_manager
class TokenLabelFilter(logging.Filter):
"""Add token label to log records."""
def filter(self, record):
record.token_label = token_label_var.get("unknown")
return True
def setup_logging():
"""Configure application logging with token labels."""
# Create filter and handler
token_filter = TokenLabelFilter()
handler = logging.StreamHandler(sys.stdout)
handler.addFilter(token_filter)
logging.basicConfig(
level=getattr(logging, settings.log_level.upper()),
format="%(asctime)s - %(name)s - [%(token_label)s] - %(levelname)s - %(message)s",
handlers=[handler],
)
# Suppress noisy third-party loggers
logging.getLogger("screen_brightness_control").setLevel(logging.ERROR)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Media Server starting on {settings.host}:{settings.port}")
# Log authentication status
if settings.api_tokens:
for label, token in settings.api_tokens.items():
logger.info(f"API Token [{label}]: {token[:8]}...")
else:
logger.warning("No API tokens configured — authentication is DISABLED")
# Start WebSocket status monitor
controller = get_media_controller()
await ws_manager.start_status_monitor(controller.get_status)
logger.info("WebSocket status monitor started")
# Start update checker
update_checker = None
if settings.update_check_enabled:
from .services.gitea_release_provider import GiteaReleaseProvider
from .services.update_checker import UpdateChecker
provider = GiteaReleaseProvider()
update_checker = UpdateChecker(provider, __version__)
await update_checker.start(settings.update_check_interval)
# Store globally so health endpoint can access cached result
app.state.update_checker = update_checker
# Register audio visualizer (capture starts on-demand when clients subscribe)
analyzer = None
if settings.visualizer_enabled:
from .services.audio_analyzer import get_audio_analyzer
analyzer = get_audio_analyzer(
num_bins=settings.visualizer_bins,
target_fps=settings.visualizer_fps,
device_name=settings.visualizer_device,
)
if analyzer.available:
await ws_manager.start_audio_monitor(analyzer)
logger.info("Audio visualizer available (capture on-demand)")
else:
logger.info("Audio visualizer unavailable (install soundcard + numpy)")
yield
# Stop update checker
if update_checker is not None:
await update_checker.stop()
# Stop audio visualizer
await ws_manager.stop_audio_monitor()
if analyzer and analyzer.running:
analyzer.stop()
# Stop WebSocket status monitor
await ws_manager.stop_status_monitor()
# Clean up platform-specific resources
import platform as _platform
if _platform.system() == "Windows":
from .services.windows_media import shutdown_executor
shutdown_executor()
logger.info("Media Server shutting down")
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
app = FastAPI(
title="Media Server",
description="REST API for controlling system media playback",
version=__version__,
lifespan=lifespan,
)
# Compress responses > 1KB
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Add CORS middleware for cross-origin requests
# Token auth is via Authorization header, not cookies, so credentials are not needed
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# Add token logging middleware
@app.middleware("http")
async def token_logging_middleware(request: Request, call_next):
"""Extract token label and set in context for logging."""
if not settings.api_tokens:
token_label_var.set("anonymous")
else:
token_label = "unknown"
# Try Authorization header
auth_header = request.headers.get("authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
label = get_token_label(token)
if label:
token_label = label
# Try query parameter (for artwork endpoint)
elif "token" in request.query_params:
token = request.query_params["token"]
label = get_token_label(token)
if label:
token_label = label
token_label_var.set(token_label)
response = await call_next(request)
return response
# Register routers
app.include_router(audio_router)
app.include_router(browser_router)
app.include_router(callbacks_router)
app.include_router(display_router)
app.include_router(health_router)
app.include_router(links_router)
app.include_router(media_router)
app.include_router(scripts_router)
# Mount static files and serve UI at root
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
@app.get("/sw.js", include_in_schema=False)
async def serve_service_worker():
"""Serve service worker from root scope for PWA installability."""
return FileResponse(
static_dir / "sw.js",
media_type="application/javascript",
headers={"Cache-Control": "no-cache"},
)
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
@app.get("/", include_in_schema=False)
async def serve_ui():
"""Serve the Web UI."""
return FileResponse(static_dir / "index.html")
return app
app = create_app()
def main():
"""Main entry point for running the server."""
parser = argparse.ArgumentParser(description="Media Server")
parser.add_argument(
"--host",
default=settings.host,
help=f"Host to bind to (default: {settings.host})",
)
parser.add_argument(
"--port",
type=int,
default=settings.port,
help=f"Port to bind to (default: {settings.port})",
)
parser.add_argument(
"--generate-config",
action="store_true",
help="Generate a default configuration file and exit",
)
parser.add_argument(
"--show-token",
action="store_true",
help="Show the current API token and exit",
)
parser.add_argument(
"--no-tray",
action="store_true",
help="Disable system tray icon (for headless/service mode)",
)
args = parser.parse_args()
if args.generate_config:
config_path = generate_default_config()
print(f"Configuration file generated at: {config_path}")
print("Authentication is disabled by default. Add api_tokens to enable it.")
return
if args.show_token:
print(f"Config directory: {get_config_dir()}")
if settings.api_tokens:
print("\nAPI Tokens:")
for label, token in settings.api_tokens.items():
print(f" {label:20} {token}")
else:
print("\nAuthentication is DISABLED (no tokens configured)")
return
# Check if port is available before starting
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.bind((args.host if args.host != "0.0.0.0" else "127.0.0.1", args.port))
except OSError:
print(
f"ERROR: Port {args.port} is already in use. "
f"Another instance of Media Server may be running.\n"
f"Stop the other process or use --port to pick a different port.",
file=sys.stderr,
)
sys.exit(1)
from .tray import PYSTRAY_AVAILABLE, TrayManager
use_tray = PYSTRAY_AVAILABLE and not args.no_tray
if use_tray:
import asyncio
import threading
# Run uvicorn in a background thread so tray owns the main thread message loop
uv_config = uvicorn.Config(
"media_server.main:app",
host=args.host,
port=args.port,
log_level=settings.log_level.lower(),
)
server = uvicorn.Server(uv_config)
def run_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(server.serve())
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
# Tray on main thread (blocking)
tray = TrayManager(
port=args.port,
on_exit=lambda: setattr(server, "should_exit", True),
)
tray.run()
# Tray exited — wait for server to finish graceful shutdown
server_thread.join(timeout=10)
if tray.restart_requested:
import subprocess
# Always restart via `python -m media_server.main` — this works
# regardless of how we were originally started (console_script,
# python -m, or direct script invocation).
cmd = [sys.executable, "-m", "media_server.main"]
subprocess.Popen(
cmd,
cwd=Path.cwd(),
start_new_session=True,
)
else:
uvicorn.run(
"media_server.main:app",
host=args.host,
port=args.port,
reload=False,
)
if __name__ == "__main__":
main()