media-player-server/media_server/routes/scripts.py
alexei.dolgolyov bcc6d40ed7
fix: comprehensive security, bug, performance, and UI/UX audit
Security
- Default bind 127.0.0.1; first-run bootstrap generates random api_token
  and refuses to bind non-loopback without auth unless explicitly opted in
- Path-traversal hardened: BrowserService.validate_path rejects absolute
  paths, drive letters, UNC, NUL bytes. /api/browser/{play,metadata,
  thumbnail} now require folder_id and a folder-relative path
- Pydantic validators on links: http(s) URLs only, mdi:<slug> icons only
- Scripts/callbacks/links create/update/delete gated by *_management flags
- Strict CSP, X-Frame-Options DENY, Referrer-Policy no-referrer,
  X-Content-Type-Options nosniff
- CORS locked to localhost:<port> + 127.0.0.1:<port> by default; configurable
- config.yaml writes atomic (tmp + os.replace) and 0o600 on POSIX
- Subprocesses spawned in their own process group / new session so timeout
  kills the whole tree (Windows CREATE_NEW_PROCESS_GROUP, POSIX
  start_new_session=True)
- Frontend XSS: monitor name + details escapeHtml'd; power button moved to
  delegated data-action handler; remote MDI SVGs parsed and sanitized
  (strip script/foreignObject/on*/javascript: hrefs) before innerHTML
- All dynamic URL segments now wrapped in encodeURIComponent
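
The atomic config write mentioned above (tmp + os.replace, 0o600 on POSIX) could look roughly like the following. This is a minimal sketch, not the project's code: `atomic_write_text` is a hypothetical helper name, and the temp-file prefix and suffix are illustrative assumptions.

```python
import os
import tempfile


def atomic_write_text(path: str, text: str) -> None:
    """Write text to path so readers never observe a half-written file.

    Hypothetical helper for illustration; not taken from the project.
    """
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live in the same directory as the target so that
    # os.replace stays on one filesystem and remains atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".yaml")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
            handle.flush()
            os.fsync(handle.fileno())
        if os.name == "posix":
            # Owner read/write only; mkstemp already does this on POSIX,
            # the explicit chmod just makes the intent visible.
            os.chmod(tmp_path, 0o600)
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if anything failed before the replace.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise
```

A crash between the write and the `os.replace` call leaves the previous config.yaml untouched, which is the point of the tmp-then-replace pattern.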

Bugs
- WebSocket reconnect: close previous socket before opening new, clear
  ping interval per-socket, clear reconnectTimeout up-front, retry on
  online/visibilitychange, try/catch JSON.parse
- Artwork fetch race: AbortController + generation guard
- _broadcast_after_open: initialize status, swallow per-poll errors,
  background tasks tracked in a strong-ref set with done-callback cleanup
- Audio analyzer: sticky _unavailable flag prevents infinite start/stop
  spin when no loopback device exists; cleared by set_device()
- Volume short-circuit cache invalidated when server reports remote volume
- Browser thumbnail race: per-folder generation counter + isConnected
  checks; aborts in-flight fetches on navigation
- Track-skip uses cached title instead of full WinRT status round-trip

Performance
- Linux MPRIS/pactl and /api/display DDC-CI handlers wrapped in
  asyncio.to_thread so blocking IO never stalls the event loop
- browse_directory moved off the event loop (SMB shares could freeze it)
- Windows status poll caches one asyncio loop per worker thread via
  threading.local instead of new_event_loop/close on every 0.5s tick
- broadcast() serializes JSON once and uses send_text to all clients
- Hourly thumbnail cache cleanup scheduled in lifespan (was never invoked
  — cache grew unbounded)
- Progress drag listeners attached only while dragging
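
To illustrate the "serialize JSON once, then send_text to all clients" item: the sketch below uses a stand-in `FakeClient` class (an assumption for this example) in place of real WebSocket connections, and the `broadcast` signature is illustrative rather than the project's actual one.

```python
import asyncio
import json
from typing import Any


class FakeClient:
    """Stand-in for a WebSocket connection; only send_text is modelled."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.received: list[str] = []

    async def send_text(self, payload: str) -> None:
        if self.fail:
            raise ConnectionError("client went away")
        self.received.append(payload)


async def broadcast(clients: list[FakeClient], message: dict[str, Any]) -> int:
    """Send message to every client; return how many sends succeeded."""
    # Encode once, instead of once per client as a per-client JSON send would.
    payload = json.dumps(message, separators=(",", ":"))
    results = await asyncio.gather(
        *(client.send_text(payload) for client in clients),
        return_exceptions=True,  # one dead client must not abort the rest
    )
    return sum(1 for result in results if not isinstance(result, BaseException))
```

With N connected clients this does one `json.dumps` call instead of N, which matters most for frequent status updates.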

Quality
- All asyncio.get_event_loop() in coroutines → get_running_loop()
- ThreadPoolExecutors shut down cleanly during lifespan teardown
- config_manager dedup: 12 near-identical methods collapsed onto generic
  _upsert/_delete helpers (~290 lines removed)
- Service worker no longer acts as a pass-through for every fetch
- M3U playlist written via NamedTemporaryFile (no fixed-path symlink
  clobber race)
- __version__ now prefers live pyproject.toml in dev checkouts so
  pip install -e . users see the source-of-truth version, not the stale
  package-metadata version baked in at install time
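
The M3U item above relies on a uniquely named temp file rather than a fixed path. A minimal sketch of that idea follows, assuming a hypothetical `write_m3u` helper; the prefix and the caller-deletes convention are illustrative choices, not the project's.

```python
import os
import tempfile


def write_m3u(paths: list[str]) -> str:
    """Write a playlist to a uniquely named temp file and return its path.

    Hypothetical helper: a fixed path such as /tmp/playlist.m3u could be
    pre-created as a symlink by another user; a random name avoids that.
    """
    with tempfile.NamedTemporaryFile(
        mode="w",
        suffix=".m3u",
        prefix="playlist-",
        delete=False,  # the player opens it after we close it
        encoding="utf-8",
    ) as handle:
        handle.write("#EXTM3U\n")
        for path in paths:
            handle.write(path + "\n")
        return handle.name
```

Because `delete=False` is used, the caller is responsible for removing the file (for example with `os.unlink`) once the player no longer needs it.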

UI/UX (Studio Reference)
- Green leftover focus rings (rgba(29,185,84,...)) all replaced with
  copper accent (rgba(var(--copper-rgb),...))
- Dialogs: square corners, copper top hairline, unified with editorial
  chrome
- .browser-item: transparent with copper hover border (was filled card)
- Audio device select uses var(--sans) instead of generic system font
- Mobile container padding tuned for ≤480px screens
- Breadcrumb home is a real <button> with aria-label; aria-current on root
- i18n: filled display.msg.power_*, execution.*, scripts.params.execute,
  callbacks.empty in both en + ru
2026-05-16 13:22:46 +03:00


"""Script execution API endpoints."""

import asyncio
import logging
import os
import re
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field

from ..auth import verify_token
from ..config import ScriptConfig, ScriptParameterConfig, settings
from ..config_manager import config_manager
from ..services.websocket_manager import ws_manager

router = APIRouter(prefix="/api/scripts", tags=["scripts"])

# Dedicated executor for script/subprocess execution (avoids blocking the default pool)
_script_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="script")

logger = logging.getLogger(__name__)


def shutdown_script_executor() -> None:
    """Shut down the dedicated executor cleanly on application teardown."""
    _script_executor.shutdown(wait=False, cancel_futures=True)


def _require_scripts_management() -> None:
    if not settings.scripts_management:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "Scripts management is disabled. Set scripts_management: true"
                " in config.yaml to enable."
            ),
        )


class ScriptExecuteRequest(BaseModel):
    """Request model for script execution with optional parameters."""

    params: dict[str, str | int | float | bool] = Field(
        default_factory=dict, description="Named parameters (validated against script schema)"
    )


class ScriptExecuteResponse(BaseModel):
    """Response model for script execution."""

    success: bool
    script: str
    exit_code: int | None = None
    stdout: str = ""
    stderr: str = ""
    error: str | None = None
    execution_time: float | None = None


class ScriptParameterInfo(BaseModel):
    """Information about a script parameter."""

    type: str
    description: str = ""
    required: bool = False
    default: str | int | float | bool | None = None
    min: float | None = None
    max: float | None = None
    options: list[str] | None = None


class ScriptInfo(BaseModel):
    """Information about an available script."""

    name: str
    label: str
    command: str
    description: str
    icon: str | None = None
    timeout: int
    parameters: dict[str, ScriptParameterInfo] = Field(default_factory=dict)


@router.get("/list")
async def list_scripts(_: str = Depends(verify_token)) -> list[ScriptInfo]:
    """List all available scripts.

    Returns:
        List of available scripts with their descriptions
    """
    return [
        ScriptInfo(
            name=name,
            label=config.label or name.replace("_", " ").title(),
            command=config.command,
            description=config.description,
            icon=config.icon,
            timeout=config.timeout,
            parameters={
                pname: ScriptParameterInfo(**pconfig.model_dump())
                for pname, pconfig in config.parameters.items()
            },
        )
        for name, config in settings.scripts.items()
    ]


def _validate_params(
    params: dict[str, str | int | float | bool],
    param_defs: dict[str, ScriptParameterConfig],
) -> dict[str, str]:
    """Validate parameters against script schema and return env vars.

    Args:
        params: User-supplied parameter values.
        param_defs: Parameter definitions from script config.

    Returns:
        Dict of environment variables (SCRIPT_PARAM_<NAME> -> str value).

    Raises:
        HTTPException: On validation failure.
    """
    # Reject unknown parameters
    unknown = set(params.keys()) - set(param_defs.keys())
    if unknown:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unknown parameters: {', '.join(sorted(unknown))}",
        )

    env_vars: dict[str, str] = {}

    for pname, pdef in param_defs.items():
        value = params.get(pname)

        # Apply default if missing
        if value is None and pdef.default is not None:
            value = pdef.default

        # Check required
        if value is None:
            if pdef.required:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Required parameter '{pname}' is missing",
                )
            continue

        # Type validation and coercion
        if pdef.type == "integer":
            try:
                value = int(value)
            except (TypeError, ValueError):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be an integer, got: {value!r}",
                )
            if pdef.min is not None and value < pdef.min:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be >= {pdef.min}, got: {value}",
                )
            if pdef.max is not None and value > pdef.max:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be <= {pdef.max}, got: {value}",
                )
        elif pdef.type == "float":
            try:
                value = float(value)
            except (TypeError, ValueError):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be a number, got: {value!r}",
                )
            if pdef.min is not None and value < pdef.min:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be >= {pdef.min}, got: {value}",
                )
            if pdef.max is not None and value > pdef.max:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be <= {pdef.max}, got: {value}",
                )
        elif pdef.type == "boolean":
            if isinstance(value, str):
                if value.lower() in ("true", "1", "yes"):
                    value = True
                elif value.lower() in ("false", "0", "no"):
                    value = False
                else:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Parameter '{pname}' must be a boolean, got: {value!r}",
                    )
            elif not isinstance(value, bool):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be a boolean, got: {value!r}",
                )
        elif pdef.type == "select":
            value = str(value)
            if pdef.options and value not in pdef.options:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' must be one of {pdef.options}, got: {value!r}",
                )
        else:
            # string — just convert to str
            value = str(value)

        env_vars[f"SCRIPT_PARAM_{pname.upper()}"] = str(value)

    return env_vars


@router.post("/execute/{script_name}")
async def execute_script(
    script_name: str,
    request: ScriptExecuteRequest | None = None,
    _: str = Depends(verify_token),
) -> ScriptExecuteResponse:
    """Execute a pre-defined script by name.

    Args:
        script_name: Name of the script to execute (must be defined in config)
        request: Optional parameters to pass to the script

    Returns:
        Execution result including stdout, stderr, and exit code
    """
    # Check if script exists
    if script_name not in settings.scripts:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Script '{script_name}' not found. Use /api/scripts/list to see available scripts.",
        )

    script_config = settings.scripts[script_name]
    params = request.params if request else {}

    # Validate parameters and build env vars
    extra_env = _validate_params(params, script_config.parameters)

    logger.info(f"Executing script: {script_name}")

    try:
        # Execute in dedicated thread pool to not block the default executor
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            _script_executor,
            lambda: _run_script(
                command=script_config.command,
                timeout=script_config.timeout,
                shell=script_config.shell,
                working_dir=script_config.working_dir,
                extra_env=extra_env,
            ),
        )

        return ScriptExecuteResponse(
            success=result["exit_code"] == 0,
            script=script_name,
            exit_code=result["exit_code"],
            stdout=result["stdout"],
            stderr=result["stderr"],
            execution_time=result.get("execution_time"),
        )
    except Exception as e:
        logger.error(f"Script execution error: {e}")
        return ScriptExecuteResponse(
            success=False,
            script=script_name,
            error=str(e),
        )


def _run_script(
    command: str,
    timeout: int,
    shell: bool,
    working_dir: str | None,
    extra_env: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Run a script synchronously.

    Args:
        command: Command to execute
        timeout: Timeout in seconds
        shell: Whether to run in shell
        working_dir: Working directory
        extra_env: Additional environment variables (e.g. SCRIPT_PARAM_*)

    Returns:
        Dict with exit_code, stdout, stderr, execution_time
    """
    start_time = time.time()

    env = None
    if extra_env:
        env = {**os.environ, **extra_env}

    # Spawn the script in its own process group / job so a timeout kills the
    # whole tree, not just the shell (POSIX) and not just the parent (Windows).
    popen_kwargs: dict[str, Any] = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        popen_kwargs["start_new_session"] = True

    try:
        result = subprocess.run(
            command,
            shell=shell,
            cwd=working_dir,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
            **popen_kwargs,
        )
        execution_time = time.time() - start_time
        return {
            "exit_code": result.returncode,
            "stdout": result.stdout[:10000],  # Limit output size
            "stderr": result.stderr[:10000],
            "execution_time": execution_time,
        }
    except subprocess.TimeoutExpired:
        execution_time = time.time() - start_time
        return {
            "exit_code": -1,
            "stdout": "",
            "stderr": f"Script timed out after {timeout} seconds",
            "execution_time": execution_time,
        }
    except Exception as e:
        execution_time = time.time() - start_time
        return {
            "exit_code": -1,
            "stdout": "",
            "stderr": str(e),
            "execution_time": execution_time,
        }


# Script management endpoints


class ScriptParameterCreateRequest(BaseModel):
    """Request model for a script parameter definition."""

    type: str = Field(
        ..., description="Parameter type: string, integer, float, boolean, select"
    )
    description: str = Field(default="", description="Parameter description")
    required: bool = Field(default=False, description="Whether the parameter is required")
    default: str | int | float | bool | None = Field(
        default=None, description="Default value if not provided"
    )
    min: float | None = Field(default=None, description="Minimum value (numeric types only)")
    max: float | None = Field(default=None, description="Maximum value (numeric types only)")
    options: list[str] | None = Field(
        default=None, description="Allowed values (select type only)"
    )


class ScriptCreateRequest(BaseModel):
    """Request model for creating or updating a script."""

    command: str = Field(..., description="Command to execute", min_length=1)
    label: str | None = Field(default=None, description="User-friendly label")
    description: str = Field(default="", description="Script description")
    icon: str | None = Field(default=None, description="Custom MDI icon (e.g., 'mdi:power')")
    timeout: int = Field(default=30, description="Execution timeout in seconds", ge=1, le=300)
    working_dir: str | None = Field(default=None, description="Working directory")
    shell: bool = Field(default=True, description="Run command in shell")
    parameters: dict[str, ScriptParameterCreateRequest] = Field(
        default_factory=dict, description="Named parameters with type and validation rules"
    )


def _validate_parameter_definitions(
    parameters: dict[str, ScriptParameterCreateRequest],
) -> None:
    """Validate parameter definitions are well-formed.

    Args:
        parameters: Parameter definitions to validate.

    Raises:
        HTTPException: If any definition is invalid.
    """
    valid_types = {"string", "integer", "float", "boolean", "select"}

    for pname, pdef in parameters.items():
        if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]*$", pname):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=(
                    f"Parameter name '{pname}' must start with a letter"
                    " and contain only alphanumeric characters and underscores"
                ),
            )

        if pdef.type not in valid_types:
            allowed = ", ".join(sorted(valid_types))
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Parameter '{pname}' has invalid type '{pdef.type}'. Must be one of: {allowed}",
            )

        if pdef.type == "select":
            if not pdef.options or len(pdef.options) == 0:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}' of type 'select' must have a non-empty 'options' list",
                )

        if pdef.type not in ("integer", "float"):
            if pdef.min is not None or pdef.max is not None:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Parameter '{pname}': 'min'/'max' are only valid for integer/float types",
                )

        if pdef.min is not None and pdef.max is not None and pdef.min > pdef.max:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Parameter '{pname}': 'min' ({pdef.min}) must be <= 'max' ({pdef.max})",
            )


def _validate_script_name(name: str) -> None:
    """Validate script name.

    Args:
        name: Script name to validate.

    Raises:
        HTTPException: If name is invalid.
    """
    if not name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Script name cannot be empty",
        )

    if not re.match(r"^[a-zA-Z0-9_]+$", name):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Script name must contain only alphanumeric characters and underscores",
        )

    if len(name) > 64:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Script name must be 64 characters or less",
        )


@router.post("/create/{script_name}")
async def create_script(
    script_name: str,
    request: ScriptCreateRequest,
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Create a new script.

    Args:
        script_name: Name for the new script (alphanumeric + underscore only).
        request: Script configuration.

    Returns:
        Success response with script name.

    Raises:
        HTTPException: If script already exists or name is invalid.
    """
    _require_scripts_management()
    _validate_script_name(script_name)

    # Check if script already exists
    if script_name in settings.scripts:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Script '{script_name}' already exists. Use PUT /api/scripts/update/{script_name} to update it.",
        )

    # Validate parameter definitions
    _validate_parameter_definitions(request.parameters)

    # Build ScriptConfig with ScriptParameterConfig instances
    data = request.model_dump()
    data["parameters"] = {
        pname: ScriptParameterConfig(**pdef)
        for pname, pdef in data.get("parameters", {}).items()
    }
    script_config = ScriptConfig(**data)

    # Add to config file and in-memory
    try:
        config_manager.add_script(script_name, script_config)
    except Exception as e:
        logger.error(f"Failed to add script '{script_name}': {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add script: {str(e)}",
        )

    # Notify WebSocket clients
    await ws_manager.broadcast_scripts_changed()

    logger.info(f"Script '{script_name}' created successfully")
    return {"success": True, "script": script_name}


@router.put("/update/{script_name}")
async def update_script(
    script_name: str,
    request: ScriptCreateRequest,
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Update an existing script.

    Args:
        script_name: Name of the script to update.
        request: Updated script configuration.

    Returns:
        Success response with script name.

    Raises:
        HTTPException: If script does not exist.
    """
    _require_scripts_management()
    _validate_script_name(script_name)

    # Check if script exists
    if script_name not in settings.scripts:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Script '{script_name}' not found. Use POST /api/scripts/create/{script_name} to create it.",
        )

    # Validate parameter definitions
    _validate_parameter_definitions(request.parameters)

    # Build ScriptConfig with ScriptParameterConfig instances
    data = request.model_dump()
    data["parameters"] = {
        pname: ScriptParameterConfig(**pdef)
        for pname, pdef in data.get("parameters", {}).items()
    }
    script_config = ScriptConfig(**data)

    # Update config file and in-memory
    try:
        config_manager.update_script(script_name, script_config)
    except Exception as e:
        logger.error(f"Failed to update script '{script_name}': {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update script: {str(e)}",
        )

    # Notify WebSocket clients
    await ws_manager.broadcast_scripts_changed()

    logger.info(f"Script '{script_name}' updated successfully")
    return {"success": True, "script": script_name}


@router.delete("/delete/{script_name}")
async def delete_script(
    script_name: str,
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Delete a script.

    Args:
        script_name: Name of the script to delete.

    Returns:
        Success response with script name.

    Raises:
        HTTPException: If script does not exist.
    """
    _require_scripts_management()
    _validate_script_name(script_name)

    # Check if script exists
    if script_name not in settings.scripts:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Script '{script_name}' not found",
        )

    # Delete from config file and in-memory
    try:
        config_manager.delete_script(script_name)
    except Exception as e:
        logger.error(f"Failed to delete script '{script_name}': {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete script: {str(e)}",
        )

    # Notify WebSocket clients
    await ws_manager.broadcast_scripts_changed()

    logger.info(f"Script '{script_name}' deleted successfully")
    return {"success": True, "script": script_name}
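
A note on `_run_script` and timeouts: as far as the standard library goes, `subprocess.run` kills only the direct child when its timeout expires, so placing the child in a new session or process group does not by itself take down grandchildren. The sketch below shows one way the group could be signalled explicitly on POSIX. `run_with_tree_kill` is a hypothetical helper, not part of this module, and the Windows branch is deliberately left as a plain `kill()` of the direct child because a full tree kill there needs a different mechanism.

```python
import os
import signal
import subprocess
import sys
from typing import Any


def run_with_tree_kill(args: list[str], timeout: float) -> dict[str, Any]:
    """Run args; on timeout, kill the child's whole process group (POSIX)."""
    kwargs: dict[str, Any] = {}
    if sys.platform == "win32":
        kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        # New session: the child becomes a group leader, so pgid == pid.
        kwargs["start_new_session"] = True

    proc = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs
    )
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
        return {
            "exit_code": proc.returncode,
            "stdout": stdout,
            "stderr": stderr,
            "timed_out": False,
        }
    except subprocess.TimeoutExpired:
        if sys.platform == "win32":
            proc.kill()  # direct child only in this sketch
        else:
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except ProcessLookupError:
                pass  # the group exited between the timeout and the kill
        # Reap the child and drain the pipes so no zombie is left behind.
        stdout, stderr = proc.communicate()
        return {
            "exit_code": -1,
            "stdout": stdout,
            "stderr": stderr,
            "timed_out": True,
        }
```

For example, `run_with_tree_kill([sys.executable, "-c", "import time; time.sleep(30)"], timeout=0.5)` returns after roughly half a second with `timed_out` set to `True`, and any output captured before the kill is preserved rather than discarded.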