"""Configuration management for the media server.""" import logging import os import secrets from pathlib import Path from typing import Optional import yaml from pydantic import BaseModel, Field, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict logger = logging.getLogger(__name__) # Token scopes form a strict hierarchy: admin > control > read. Helper utility # used by both auth.py and the validator below. SCOPE_HIERARCHY: dict[str, frozenset[str]] = { "read": frozenset({"read"}), "control": frozenset({"read", "control"}), "admin": frozenset({"read", "control", "admin"}), } ALL_SCOPES: frozenset[str] = frozenset(SCOPE_HIERARCHY.keys()) class TokenSpec(BaseModel): """Per-token authentication entry with explicit scopes.""" token: str = Field(..., min_length=8, description="Secret token value") scopes: list[str] = Field( default_factory=lambda: ["admin"], description="Granted scopes (subset of read|control|admin).", ) @field_validator("scopes") @classmethod def _validate_scopes(cls, v: list[str]) -> list[str]: if not v: raise ValueError("scopes must list at least one of read|control|admin") unknown = set(v) - ALL_SCOPES if unknown: raise ValueError(f"unknown scopes: {sorted(unknown)}; valid={sorted(ALL_SCOPES)}") return v def grants(self, required: str) -> bool: """Whether this token grants the requested scope (with hierarchy expansion).""" granted: set[str] = set() for s in self.scopes: granted |= SCOPE_HIERARCHY.get(s, frozenset({s})) return required in granted class MediaFolderConfig(BaseModel): """Configuration for a media folder.""" path: str = Field(..., description="Absolute path to media folder") label: str = Field(..., description="Human-readable display label") enabled: bool = Field(default=True, description="Whether this folder is active") class CallbackConfig(BaseModel): """Configuration for a callback script (no label/description needed).""" command: str = Field(..., description="Command or script to execute") timeout: int = Field(default=30, description="Execution timeout in seconds", ge=1, le=300) working_dir: Optional[str] = Field(default=None, description="Working directory") shell: bool = Field(default=True, description="Run command in shell") class ScriptParameterConfig(BaseModel): """Configuration for a script parameter.""" type: str = Field( ..., description="Parameter type: string, integer, float, boolean, select", pattern=r"^(string|integer|float|boolean|select)$", ) description: str = Field(default="", description="Parameter description") required: bool = Field(default=False, description="Whether the parameter is required") default: Optional[str | int | float | bool] = Field( default=None, description="Default value if not provided" ) min: Optional[float] = Field(default=None, description="Minimum value (numeric types only)") max: Optional[float] = Field(default=None, description="Maximum value (numeric types only)") options: Optional[list[str]] = Field( default=None, description="Allowed values (select type only)" ) pattern: Optional[str] = Field( default=None, description=( "Optional regex (Python flavour) that string-typed values must match." " Use to harden parameters that flow into shell=true scripts." ), ) class ScriptConfig(BaseModel): """Configuration for a custom script.""" command: str = Field(..., description="Command or script to execute") label: Optional[str] = Field(default=None, description="User-friendly display label") description: str = Field(default="", description="Script description") icon: Optional[str] = Field(default=None, description="Custom icon (e.g., 'mdi:power')") timeout: int = Field(default=30, description="Execution timeout in seconds", ge=1, le=300) working_dir: Optional[str] = Field(default=None, description="Working directory") shell: bool = Field(default=True, description="Run command in shell") parameters: dict[str, ScriptParameterConfig] = Field( default_factory=dict, description="Named parameters with type and validation rules" ) class LinkConfig(BaseModel): """Configuration for a header quick link.""" url: str = Field(..., description="URL to open") icon: str = Field(default="mdi:link", description="MDI icon name (e.g., 'mdi:led-strip-variant')") label: str = Field(default="", description="Tooltip text") description: str = Field(default="", description="Optional description") class Settings(BaseSettings): """Application settings loaded from environment or config file.""" model_config = SettingsConfigDict( env_prefix="MEDIA_SERVER_", env_file=".env", env_file_encoding="utf-8", extra="ignore", ) # Server settings host: str = Field( default="127.0.0.1", description=( "Server bind address. Use 127.0.0.1 for loopback-only (default, safest)," " or 0.0.0.0 to expose on the LAN (requires api_tokens to be set)." ), ) port: int = Field(default=8765, description="Server port") allow_lan_without_auth: bool = Field( default=False, description=( "Allow binding to a non-loopback address with no api_tokens configured." " Off by default to prevent unauthenticated LAN exposure." ), ) cors_origins: list[str] = Field( default_factory=list, description=( "Allowed CORS origins. Empty (default) means only same-origin requests" " from http://localhost: and http://127.0.0.1:." ), ) # Reverse-proxy deployment: when serving the API behind nginx/Caddy/Traefik, # uvicorn must trust the X-Forwarded-* headers from the proxy so that the # `Origin` allow-list, request URLs, and logs reflect the public-facing # values. Off by default — only enable when there's a real proxy in front # (otherwise clients can spoof their own IP). proxy_headers: bool = Field( default=False, description="Honor X-Forwarded-For / X-Forwarded-Proto from upstream proxy.", ) forwarded_allow_ips: str = Field( default="127.0.0.1", description=( "Comma-separated IPs / CIDRs that uvicorn should trust X-Forwarded-* from." " Use '*' to trust all (only safe when bound to a private interface)." ), ) # HTTPS / TLS. Both must be set together to enable TLS; if only one is set # the server refuses to start. Use `mkcert` or letsencrypt to generate the # pair; the server reads them at startup. ssl_certfile: Optional[str] = Field( default=None, description="Path to TLS certificate (PEM). Pair with ssl_keyfile.", ) ssl_keyfile: Optional[str] = Field( default=None, description="Path to TLS private key (PEM). Pair with ssl_certfile.", ) ssl_keyfile_password: Optional[str] = Field( default=None, description="Optional password for the private key if encrypted.", ) # Admin-grade operations (script / callback / link / folder create/update/delete). # When True the same token used for read/play can also persist arbitrary shell # commands. Default False so a single leaked token cannot escalate to RCE; opt # in explicitly to manage scripts/callbacks/links via the Web UI. scripts_management: bool = Field(default=False, description="Allow scripts CRUD via API") callbacks_management: bool = Field(default=False, description="Allow callbacks CRUD via API") links_management: bool = Field(default=False, description="Allow links CRUD via API") # Authentication (empty = auth disabled, anyone can access the API). # # Each entry can be either: # • a bare string (legacy form, treated as scopes = ["admin"] for back-compat), OR # • a mapping with explicit scopes, e.g. # "ha": {token: "", scopes: ["read", "control"]} # "kiosk": {token: "", scopes: ["read"]} # "ops": {token: "", scopes: ["admin"]} # # Available scopes: # read — GET /api/* (status, list, browse) but no state-changing calls. # control — read + media transport, display/audio, script EXECUTE, callback EXECUTE. # admin — control + CRUD on scripts/callbacks/links/folders. # # Validation normalises both forms to TokenSpec at load time. api_tokens: dict[str, TokenSpec] = Field( default_factory=dict, description=( "Named API tokens. Value can be a bare token string (= admin scope) or" " a {token, scopes} mapping. See TokenSpec for scope definitions." ), ) @field_validator("api_tokens", mode="before") @classmethod def _normalise_tokens(cls, v): """Accept legacy `label: ` form and promote to TokenSpec.""" if not isinstance(v, dict): return v out: dict[str, dict | TokenSpec] = {} for label, entry in v.items(): if isinstance(entry, str): out[label] = {"token": entry, "scopes": ["admin"]} else: out[label] = entry return out # Media controller settings poll_interval: float = Field( default=1.0, description="Media status poll interval in seconds" ) # Audio device settings audio_device: Optional[str] = Field( default=None, description=( "Audio device name to control (None = default device)." " Use /api/audio/devices to list available devices." ), ) # Logging log_level: str = Field(default="INFO", description="Logging level") # Custom scripts (loaded separately from YAML) scripts: dict[str, ScriptConfig] = Field( default_factory=dict, description="Custom scripts that can be executed via API", ) # Callback scripts (executed by integration events, not shown in UI) callbacks: dict[str, CallbackConfig] = Field( default_factory=dict, description="Callback scripts executed by integration events (on_turn_on, on_turn_off, on_toggle)", ) # Media folders for browsing media_folders: dict[str, MediaFolderConfig] = Field( default_factory=dict, description="Media folders available for browsing in the media browser", ) media_folders_management: bool = Field( default=False, description="Allow adding, editing, and deleting media folders from the Web UI", ) # Thumbnail settings thumbnail_size: str = Field( default="medium", description='Thumbnail size: "small" (150x150), "medium" (300x300), or "both"', ) # Header quick links links: dict[str, LinkConfig] = Field( default_factory=dict, description="Quick links displayed as icons in the header", ) # Audio visualizer visualizer_enabled: bool = Field( default=True, description="Enable audio spectrum visualizer (requires soundcard + numpy)", ) visualizer_fps: int = Field( default=30, description="Visualizer update rate in frames per second", ge=10, le=60, ) visualizer_bins: int = Field( default=32, description="Number of frequency bins for the visualizer", ge=8, le=128, ) visualizer_device: Optional[str] = Field( default=None, description="Loopback audio device name for visualizer (None = auto-detect)", ) # Update checker update_check_enabled: bool = Field( default=True, description="Check for new versions on startup and periodically", ) update_check_interval: int = Field( default=21600, description="Update check interval in seconds (default: 6 hours)", ge=600, ) @classmethod def load_from_yaml(cls, path: Optional[Path] = None) -> "Settings": """Load settings from a YAML configuration file.""" if path is None: # Look for config in standard locations search_paths = [ Path("config.yaml"), Path("config.yml"), ] # Add platform-specific config directory if os.name == "nt": # Windows appdata = os.environ.get("APPDATA", "") if appdata: search_paths.append(Path(appdata) / "media-server" / "config.yaml") else: # Linux/Unix/macOS search_paths.append(Path.home() / ".config" / "media-server" / "config.yaml") search_paths.append(Path("/etc/media-server/config.yaml")) for search_path in search_paths: if search_path.exists(): path = search_path break if path and path.exists(): with open(path, "r", encoding="utf-8") as f: config_data = yaml.safe_load(f) or {} return cls(**config_data) return cls() def get_config_dir() -> Path: """Get the configuration directory path.""" if os.name == "nt": # Windows config_dir = Path(os.environ.get("APPDATA", "")) / "media-server" else: # Linux/Unix config_dir = Path.home() / ".config" / "media-server" config_dir.mkdir(parents=True, exist_ok=True) return config_dir def generate_default_config(path: Optional[Path] = None) -> Path: """Generate a default configuration file with a freshly generated API token. The token is written into ``api_tokens.default`` and printed to the logger so first-run users can copy it. Subsequent runs preserve whatever the user has set. """ if path is None: path = get_config_dir() / "config.yaml" default_token = secrets.token_urlsafe(32) config = { "host": "127.0.0.1", "port": 8765, # Default token grants "admin" scope (full access). To create a # read-only or control-only token, add a second entry: # ha_readonly: {token: "", scopes: ["read"]} "api_tokens": { "default": {"token": default_token, "scopes": ["admin"]}, }, "poll_interval": 1.0, "log_level": "INFO", "scripts": { "example_script": { "command": "echo Hello from Media Server!", "description": "Example script - echoes a message", "timeout": 10, "shell": True, }, }, } path.parent.mkdir(parents=True, exist_ok=True) _write_yaml_atomic(path, config) _restrict_config_perms(path) logger.info("Generated default config at %s", path) logger.info("API token (label=default): %s", default_token) return path def _write_yaml_atomic(path: Path, data: dict) -> None: """Write YAML to disk atomically via tmp file + rename, with restricted perms.""" tmp = path.with_suffix(path.suffix + ".tmp") with open(tmp, "w", encoding="utf-8") as f: yaml.dump(data, f, default_flow_style=False, sort_keys=False) _restrict_config_perms(tmp) os.replace(tmp, path) def _restrict_config_perms(path: Path) -> None: """Ensure config file is readable only by its owner. POSIX → ``chmod 0600``. On Windows the default NTFS ACL leaves the file readable by every interactive user on the machine (Users group has Read), which is bad given the file stores plaintext API tokens. Use ``icacls`` to grant exclusive access to the current user + SYSTEM + Administrators and strip inheritance. """ if os.name == "nt": _restrict_config_perms_windows(path) return try: os.chmod(path, 0o600) os.chmod(path.parent, 0o700) except OSError: logger.debug("Could not chmod %s", path, exc_info=True) def _restrict_config_perms_windows(path: Path) -> None: """Apply restrictive NTFS ACL to a config file (Windows only).""" import subprocess try: username = os.environ.get("USERNAME") or os.environ.get("USER") if not username: logger.debug("Cannot detect current user; skipping icacls hardening") return # Disable inheritance and remove every existing ACE, then grant access # only to current user, SYSTEM, and Administrators. /Q suppresses # progress output; /C lets per-file errors not abort the batch. subprocess.run( ["icacls", str(path), "/inheritance:r"], check=False, capture_output=True, timeout=5, ) for principal in (username, "SYSTEM", "Administrators"): subprocess.run( ["icacls", str(path), "/grant:r", f"{principal}:(R,W)"], check=False, capture_output=True, timeout=5, ) except (FileNotFoundError, subprocess.TimeoutExpired, OSError): # `icacls` missing or sandboxed — leave the default ACL in place. logger.debug("icacls hardening failed for %s", path, exc_info=True) # Global settings instance settings = Settings.load_from_yaml()