"""Script execution API endpoints.""" import asyncio import logging import subprocess from typing import Any from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field from ..auth import verify_token from ..config import settings router = APIRouter(prefix="/api/scripts", tags=["scripts"]) logger = logging.getLogger(__name__) class ScriptExecuteRequest(BaseModel): """Request model for script execution with optional arguments.""" args: list[str] = Field(default_factory=list, description="Additional arguments") class ScriptExecuteResponse(BaseModel): """Response model for script execution.""" success: bool script: str exit_code: int | None = None stdout: str = "" stderr: str = "" error: str | None = None class ScriptInfo(BaseModel): """Information about an available script.""" name: str label: str description: str icon: str | None = None timeout: int @router.get("/list") async def list_scripts(_: str = Depends(verify_token)) -> list[ScriptInfo]: """List all available scripts. Returns: List of available scripts with their descriptions """ return [ ScriptInfo( name=name, label=config.label or name.replace("_", " ").title(), description=config.description, icon=config.icon, timeout=config.timeout, ) for name, config in settings.scripts.items() ] @router.post("/execute/{script_name}") async def execute_script( script_name: str, request: ScriptExecuteRequest | None = None, _: str = Depends(verify_token), ) -> ScriptExecuteResponse: """Execute a pre-defined script by name. Args: script_name: Name of the script to execute (must be defined in config) request: Optional arguments to pass to the script Returns: Execution result including stdout, stderr, and exit code """ # Check if script exists if script_name not in settings.scripts: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Script '{script_name}' not found. Use /api/scripts/list to see available scripts.", ) script_config = settings.scripts[script_name] args = request.args if request else [] logger.info(f"Executing script: {script_name}") try: # Build command command = script_config.command if args: # Append arguments to command command = f"{command} {' '.join(args)}" # Execute in thread pool to not block loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: _run_script( command=command, timeout=script_config.timeout, shell=script_config.shell, working_dir=script_config.working_dir, ), ) return ScriptExecuteResponse( success=result["exit_code"] == 0, script=script_name, exit_code=result["exit_code"], stdout=result["stdout"], stderr=result["stderr"], ) except Exception as e: logger.error(f"Script execution error: {e}") return ScriptExecuteResponse( success=False, script=script_name, error=str(e), ) def _run_script( command: str, timeout: int, shell: bool, working_dir: str | None, ) -> dict[str, Any]: """Run a script synchronously. Args: command: Command to execute timeout: Timeout in seconds shell: Whether to run in shell working_dir: Working directory Returns: Dict with exit_code, stdout, stderr """ try: result = subprocess.run( command, shell=shell, cwd=working_dir, capture_output=True, text=True, timeout=timeout, ) return { "exit_code": result.returncode, "stdout": result.stdout[:10000], # Limit output size "stderr": result.stderr[:10000], } except subprocess.TimeoutExpired: return { "exit_code": -1, "stdout": "", "stderr": f"Script timed out after {timeout} seconds", } except Exception as e: return { "exit_code": -1, "stdout": "", "stderr": str(e), }