1410a8d2cb
- Add ScriptParameterConfig model (string, integer, float, boolean, select types) - Server-side validation at both define-time and execute-time - Parameters passed as SCRIPT_PARAM_* environment variables - Web UI parameter editor in script create/edit dialog (add/remove/reorder) - Icon-grid selector component (ported from wled-screen-controller) - Replace audio device dropdown with icon-grid selector - Replace callback event dropdown with icon-grid selector - Localization for parameter UI (en, ru)
593 lines
19 KiB
Python
593 lines
19 KiB
Python
"""Script execution API endpoints."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from pydantic import BaseModel, Field
|
|
|
|
from ..auth import verify_token
|
|
from ..config import ScriptConfig, ScriptParameterConfig, settings
|
|
from ..config_manager import config_manager
|
|
from ..services.websocket_manager import ws_manager
|
|
|
|
router = APIRouter(prefix="/api/scripts", tags=["scripts"])
|
|
|
|
# Dedicated executor for script/subprocess execution (avoids blocking the default pool)
|
|
_script_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="script")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ScriptExecuteRequest(BaseModel):
|
|
"""Request model for script execution with optional parameters."""
|
|
|
|
params: dict[str, str | int | float | bool] = Field(
|
|
default_factory=dict, description="Named parameters (validated against script schema)"
|
|
)
|
|
|
|
|
|
class ScriptExecuteResponse(BaseModel):
|
|
"""Response model for script execution."""
|
|
|
|
success: bool
|
|
script: str
|
|
exit_code: int | None = None
|
|
stdout: str = ""
|
|
stderr: str = ""
|
|
error: str | None = None
|
|
execution_time: float | None = None
|
|
|
|
|
|
class ScriptParameterInfo(BaseModel):
|
|
"""Information about a script parameter."""
|
|
|
|
type: str
|
|
description: str = ""
|
|
required: bool = False
|
|
default: str | int | float | bool | None = None
|
|
min: float | None = None
|
|
max: float | None = None
|
|
options: list[str] | None = None
|
|
|
|
|
|
class ScriptInfo(BaseModel):
|
|
"""Information about an available script."""
|
|
|
|
name: str
|
|
label: str
|
|
command: str
|
|
description: str
|
|
icon: str | None = None
|
|
timeout: int
|
|
parameters: dict[str, ScriptParameterInfo] = Field(default_factory=dict)
|
|
|
|
|
|
@router.get("/list")
|
|
async def list_scripts(_: str = Depends(verify_token)) -> list[ScriptInfo]:
|
|
"""List all available scripts.
|
|
|
|
Returns:
|
|
List of available scripts with their descriptions
|
|
"""
|
|
return [
|
|
ScriptInfo(
|
|
name=name,
|
|
label=config.label or name.replace("_", " ").title(),
|
|
command=config.command,
|
|
description=config.description,
|
|
icon=config.icon,
|
|
timeout=config.timeout,
|
|
parameters={
|
|
pname: ScriptParameterInfo(**pconfig.model_dump())
|
|
for pname, pconfig in config.parameters.items()
|
|
},
|
|
)
|
|
for name, config in settings.scripts.items()
|
|
]
|
|
|
|
|
|
def _validate_params(
|
|
params: dict[str, str | int | float | bool],
|
|
param_defs: dict[str, ScriptParameterConfig],
|
|
) -> dict[str, str]:
|
|
"""Validate parameters against script schema and return env vars.
|
|
|
|
Args:
|
|
params: User-supplied parameter values.
|
|
param_defs: Parameter definitions from script config.
|
|
|
|
Returns:
|
|
Dict of environment variables (SCRIPT_PARAM_<NAME> -> str value).
|
|
|
|
Raises:
|
|
HTTPException: On validation failure.
|
|
"""
|
|
# Reject unknown parameters
|
|
unknown = set(params.keys()) - set(param_defs.keys())
|
|
if unknown:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Unknown parameters: {', '.join(sorted(unknown))}",
|
|
)
|
|
|
|
env_vars: dict[str, str] = {}
|
|
|
|
for pname, pdef in param_defs.items():
|
|
value = params.get(pname)
|
|
|
|
# Apply default if missing
|
|
if value is None and pdef.default is not None:
|
|
value = pdef.default
|
|
|
|
# Check required
|
|
if value is None:
|
|
if pdef.required:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Required parameter '{pname}' is missing",
|
|
)
|
|
continue
|
|
|
|
# Type validation and coercion
|
|
if pdef.type == "integer":
|
|
try:
|
|
value = int(value)
|
|
except (TypeError, ValueError):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be an integer, got: {value!r}",
|
|
)
|
|
if pdef.min is not None and value < pdef.min:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be >= {pdef.min}, got: {value}",
|
|
)
|
|
if pdef.max is not None and value > pdef.max:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be <= {pdef.max}, got: {value}",
|
|
)
|
|
elif pdef.type == "float":
|
|
try:
|
|
value = float(value)
|
|
except (TypeError, ValueError):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be a number, got: {value!r}",
|
|
)
|
|
if pdef.min is not None and value < pdef.min:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be >= {pdef.min}, got: {value}",
|
|
)
|
|
if pdef.max is not None and value > pdef.max:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be <= {pdef.max}, got: {value}",
|
|
)
|
|
elif pdef.type == "boolean":
|
|
if isinstance(value, str):
|
|
if value.lower() in ("true", "1", "yes"):
|
|
value = True
|
|
elif value.lower() in ("false", "0", "no"):
|
|
value = False
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be a boolean, got: {value!r}",
|
|
)
|
|
elif not isinstance(value, bool):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be a boolean, got: {value!r}",
|
|
)
|
|
elif pdef.type == "select":
|
|
value = str(value)
|
|
if pdef.options and value not in pdef.options:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' must be one of {pdef.options}, got: {value!r}",
|
|
)
|
|
else:
|
|
# string — just convert to str
|
|
value = str(value)
|
|
|
|
env_vars[f"SCRIPT_PARAM_{pname.upper()}"] = str(value)
|
|
|
|
return env_vars
|
|
|
|
|
|
@router.post("/execute/{script_name}")
|
|
async def execute_script(
|
|
script_name: str,
|
|
request: ScriptExecuteRequest | None = None,
|
|
_: str = Depends(verify_token),
|
|
) -> ScriptExecuteResponse:
|
|
"""Execute a pre-defined script by name.
|
|
|
|
Args:
|
|
script_name: Name of the script to execute (must be defined in config)
|
|
request: Optional parameters to pass to the script
|
|
|
|
Returns:
|
|
Execution result including stdout, stderr, and exit code
|
|
"""
|
|
# Check if script exists
|
|
if script_name not in settings.scripts:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Script '{script_name}' not found. Use /api/scripts/list to see available scripts.",
|
|
)
|
|
script_config = settings.scripts[script_name]
|
|
params = request.params if request else {}
|
|
|
|
# Validate parameters and build env vars
|
|
extra_env = _validate_params(params, script_config.parameters)
|
|
|
|
logger.info(f"Executing script: {script_name}")
|
|
|
|
try:
|
|
# Execute in dedicated thread pool to not block the default executor
|
|
loop = asyncio.get_event_loop()
|
|
result = await loop.run_in_executor(
|
|
_script_executor,
|
|
lambda: _run_script(
|
|
command=script_config.command,
|
|
timeout=script_config.timeout,
|
|
shell=script_config.shell,
|
|
working_dir=script_config.working_dir,
|
|
extra_env=extra_env,
|
|
),
|
|
)
|
|
|
|
return ScriptExecuteResponse(
|
|
success=result["exit_code"] == 0,
|
|
script=script_name,
|
|
exit_code=result["exit_code"],
|
|
stdout=result["stdout"],
|
|
stderr=result["stderr"],
|
|
execution_time=result.get("execution_time"),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Script execution error: {e}")
|
|
return ScriptExecuteResponse(
|
|
success=False,
|
|
script=script_name,
|
|
error=str(e),
|
|
)
|
|
|
|
|
|
def _run_script(
|
|
command: str,
|
|
timeout: int,
|
|
shell: bool,
|
|
working_dir: str | None,
|
|
extra_env: dict[str, str] | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Run a script synchronously.
|
|
|
|
Args:
|
|
command: Command to execute
|
|
timeout: Timeout in seconds
|
|
shell: Whether to run in shell
|
|
working_dir: Working directory
|
|
extra_env: Additional environment variables (e.g. SCRIPT_PARAM_*)
|
|
|
|
Returns:
|
|
Dict with exit_code, stdout, stderr, execution_time
|
|
"""
|
|
start_time = time.time()
|
|
env = None
|
|
if extra_env:
|
|
import os
|
|
env = {**os.environ, **extra_env}
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
shell=shell,
|
|
cwd=working_dir,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout,
|
|
env=env,
|
|
)
|
|
execution_time = time.time() - start_time
|
|
return {
|
|
"exit_code": result.returncode,
|
|
"stdout": result.stdout[:10000], # Limit output size
|
|
"stderr": result.stderr[:10000],
|
|
"execution_time": execution_time,
|
|
}
|
|
except subprocess.TimeoutExpired:
|
|
execution_time = time.time() - start_time
|
|
return {
|
|
"exit_code": -1,
|
|
"stdout": "",
|
|
"stderr": f"Script timed out after {timeout} seconds",
|
|
"execution_time": execution_time,
|
|
}
|
|
except Exception as e:
|
|
execution_time = time.time() - start_time
|
|
return {
|
|
"exit_code": -1,
|
|
"stdout": "",
|
|
"stderr": str(e),
|
|
"execution_time": execution_time,
|
|
}
|
|
|
|
|
|
# Script management endpoints
|
|
|
|
|
|
class ScriptParameterCreateRequest(BaseModel):
|
|
"""Request model for a script parameter definition."""
|
|
|
|
type: str = Field(
|
|
..., description="Parameter type: string, integer, float, boolean, select"
|
|
)
|
|
description: str = Field(default="", description="Parameter description")
|
|
required: bool = Field(default=False, description="Whether the parameter is required")
|
|
default: str | int | float | bool | None = Field(
|
|
default=None, description="Default value if not provided"
|
|
)
|
|
min: float | None = Field(default=None, description="Minimum value (numeric types only)")
|
|
max: float | None = Field(default=None, description="Maximum value (numeric types only)")
|
|
options: list[str] | None = Field(
|
|
default=None, description="Allowed values (select type only)"
|
|
)
|
|
|
|
|
|
class ScriptCreateRequest(BaseModel):
|
|
"""Request model for creating or updating a script."""
|
|
|
|
command: str = Field(..., description="Command to execute", min_length=1)
|
|
label: str | None = Field(default=None, description="User-friendly label")
|
|
description: str = Field(default="", description="Script description")
|
|
icon: str | None = Field(default=None, description="Custom MDI icon (e.g., 'mdi:power')")
|
|
timeout: int = Field(default=30, description="Execution timeout in seconds", ge=1, le=300)
|
|
working_dir: str | None = Field(default=None, description="Working directory")
|
|
shell: bool = Field(default=True, description="Run command in shell")
|
|
parameters: dict[str, ScriptParameterCreateRequest] = Field(
|
|
default_factory=dict, description="Named parameters with type and validation rules"
|
|
)
|
|
|
|
|
|
def _validate_parameter_definitions(
|
|
parameters: dict[str, ScriptParameterCreateRequest],
|
|
) -> None:
|
|
"""Validate parameter definitions are well-formed.
|
|
|
|
Args:
|
|
parameters: Parameter definitions to validate.
|
|
|
|
Raises:
|
|
HTTPException: If any definition is invalid.
|
|
"""
|
|
valid_types = {"string", "integer", "float", "boolean", "select"}
|
|
|
|
for pname, pdef in parameters.items():
|
|
if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]*$", pname):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=(
|
|
f"Parameter name '{pname}' must start with a letter"
|
|
" and contain only alphanumeric characters and underscores"
|
|
),
|
|
)
|
|
|
|
if pdef.type not in valid_types:
|
|
allowed = ", ".join(sorted(valid_types))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' has invalid type '{pdef.type}'. Must be one of: {allowed}",
|
|
)
|
|
|
|
if pdef.type == "select":
|
|
if not pdef.options or len(pdef.options) == 0:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}' of type 'select' must have a non-empty 'options' list",
|
|
)
|
|
|
|
if pdef.type not in ("integer", "float"):
|
|
if pdef.min is not None or pdef.max is not None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}': 'min'/'max' are only valid for integer/float types",
|
|
)
|
|
|
|
if pdef.min is not None and pdef.max is not None and pdef.min > pdef.max:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Parameter '{pname}': 'min' ({pdef.min}) must be <= 'max' ({pdef.max})",
|
|
)
|
|
|
|
|
|
def _validate_script_name(name: str) -> None:
|
|
"""Validate script name.
|
|
|
|
Args:
|
|
name: Script name to validate.
|
|
|
|
Raises:
|
|
HTTPException: If name is invalid.
|
|
"""
|
|
if not name:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Script name cannot be empty",
|
|
)
|
|
|
|
if not re.match(r"^[a-zA-Z0-9_]+$", name):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Script name must contain only alphanumeric characters and underscores",
|
|
)
|
|
|
|
if len(name) > 64:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Script name must be 64 characters or less",
|
|
)
|
|
|
|
|
|
@router.post("/create/{script_name}")
|
|
async def create_script(
|
|
script_name: str,
|
|
request: ScriptCreateRequest,
|
|
_: str = Depends(verify_token),
|
|
) -> dict[str, Any]:
|
|
"""Create a new script.
|
|
|
|
Args:
|
|
script_name: Name for the new script (alphanumeric + underscore only).
|
|
request: Script configuration.
|
|
|
|
Returns:
|
|
Success response with script name.
|
|
|
|
Raises:
|
|
HTTPException: If script already exists or name is invalid.
|
|
"""
|
|
# Validate name
|
|
_validate_script_name(script_name)
|
|
|
|
# Check if script already exists
|
|
if script_name in settings.scripts:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Script '{script_name}' already exists. Use PUT /api/scripts/update/{script_name} to update it.",
|
|
)
|
|
|
|
# Validate parameter definitions
|
|
_validate_parameter_definitions(request.parameters)
|
|
|
|
# Build ScriptConfig with ScriptParameterConfig instances
|
|
data = request.model_dump()
|
|
data["parameters"] = {
|
|
pname: ScriptParameterConfig(**pdef)
|
|
for pname, pdef in data.get("parameters", {}).items()
|
|
}
|
|
script_config = ScriptConfig(**data)
|
|
|
|
# Add to config file and in-memory
|
|
try:
|
|
config_manager.add_script(script_name, script_config)
|
|
except Exception as e:
|
|
logger.error(f"Failed to add script '{script_name}': {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to add script: {str(e)}",
|
|
)
|
|
|
|
# Notify WebSocket clients
|
|
await ws_manager.broadcast_scripts_changed()
|
|
|
|
logger.info(f"Script '{script_name}' created successfully")
|
|
return {"success": True, "script": script_name}
|
|
|
|
|
|
@router.put("/update/{script_name}")
|
|
async def update_script(
|
|
script_name: str,
|
|
request: ScriptCreateRequest,
|
|
_: str = Depends(verify_token),
|
|
) -> dict[str, Any]:
|
|
"""Update an existing script.
|
|
|
|
Args:
|
|
script_name: Name of the script to update.
|
|
request: Updated script configuration.
|
|
|
|
Returns:
|
|
Success response with script name.
|
|
|
|
Raises:
|
|
HTTPException: If script does not exist.
|
|
"""
|
|
# Validate name
|
|
_validate_script_name(script_name)
|
|
|
|
# Check if script exists
|
|
if script_name not in settings.scripts:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Script '{script_name}' not found. Use POST /api/scripts/create/{script_name} to create it.",
|
|
)
|
|
|
|
# Validate parameter definitions
|
|
_validate_parameter_definitions(request.parameters)
|
|
|
|
# Build ScriptConfig with ScriptParameterConfig instances
|
|
data = request.model_dump()
|
|
data["parameters"] = {
|
|
pname: ScriptParameterConfig(**pdef)
|
|
for pname, pdef in data.get("parameters", {}).items()
|
|
}
|
|
script_config = ScriptConfig(**data)
|
|
|
|
# Update config file and in-memory
|
|
try:
|
|
config_manager.update_script(script_name, script_config)
|
|
except Exception as e:
|
|
logger.error(f"Failed to update script '{script_name}': {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to update script: {str(e)}",
|
|
)
|
|
|
|
# Notify WebSocket clients
|
|
await ws_manager.broadcast_scripts_changed()
|
|
|
|
logger.info(f"Script '{script_name}' updated successfully")
|
|
return {"success": True, "script": script_name}
|
|
|
|
|
|
@router.delete("/delete/{script_name}")
|
|
async def delete_script(
|
|
script_name: str,
|
|
_: str = Depends(verify_token),
|
|
) -> dict[str, Any]:
|
|
"""Delete a script.
|
|
|
|
Args:
|
|
script_name: Name of the script to delete.
|
|
|
|
Returns:
|
|
Success response with script name.
|
|
|
|
Raises:
|
|
HTTPException: If script does not exist.
|
|
"""
|
|
# Validate name
|
|
_validate_script_name(script_name)
|
|
|
|
# Check if script exists
|
|
if script_name not in settings.scripts:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Script '{script_name}' not found",
|
|
)
|
|
|
|
# Delete from config file and in-memory
|
|
try:
|
|
config_manager.delete_script(script_name)
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete script '{script_name}': {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to delete script: {str(e)}",
|
|
)
|
|
|
|
# Notify WebSocket clients
|
|
await ws_manager.broadcast_scripts_changed()
|
|
|
|
logger.info(f"Script '{script_name}' deleted successfully")
|
|
return {"success": True, "script": script_name}
|