"""System routes: health, version, displays, performance, backup/restore, ADB.""" import asyncio import io import json import platform import subprocess import sys import threading from datetime import datetime from pathlib import Path from typing import Optional import psutil from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile from fastapi.responses import StreamingResponse from pydantic import BaseModel from wled_controller import __version__ from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import get_auto_backup_engine, get_processor_manager from wled_controller.api.schemas.system import ( AutoBackupSettings, AutoBackupStatusResponse, BackupFileInfo, BackupListResponse, DisplayInfo, DisplayListResponse, GpuInfo, HealthResponse, PerformanceResponse, ProcessListResponse, RestoreResponse, VersionResponse, ) from wled_controller.core.backup.auto_backup import AutoBackupEngine from wled_controller.config import get_config from wled_controller.core.capture.screen_capture import get_available_displays from wled_controller.utils import atomic_write_json, get_logger logger = get_logger(__name__) # Prime psutil CPU counter (first call always returns 0.0) psutil.cpu_percent(interval=None) # Try to initialize NVIDIA GPU monitoring _nvml_available = False try: import pynvml as _pynvml_mod # nvidia-ml-py (the pynvml wrapper is deprecated) _pynvml_mod.nvmlInit() _nvml_handle = _pynvml_mod.nvmlDeviceGetHandleByIndex(0) _nvml_available = True _nvml = _pynvml_mod logger.info(f"NVIDIA GPU monitoring enabled: {_nvml.nvmlDeviceGetName(_nvml_handle)}") except Exception: _nvml = None logger.info("NVIDIA GPU monitoring unavailable (pynvml not installed or no NVIDIA GPU)") def _get_cpu_name() -> str | None: """Get a human-friendly CPU model name (cached at module level).""" try: if platform.system() == "Windows": import winreg key = winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, 
r"HARDWARE\DESCRIPTION\System\CentralProcessor\0", ) name, _ = winreg.QueryValueEx(key, "ProcessorNameString") winreg.CloseKey(key) return name.strip() elif platform.system() == "Linux": with open("/proc/cpuinfo") as f: for line in f: if "model name" in line: return line.split(":")[1].strip() elif platform.system() == "Darwin": return ( subprocess.check_output( ["sysctl", "-n", "machdep.cpu.brand_string"] ) .decode() .strip() ) except Exception: pass return platform.processor() or None _cpu_name: str | None = _get_cpu_name() router = APIRouter() @router.get("/health", response_model=HealthResponse, tags=["Health"]) async def health_check(): """Check service health status. Returns basic health information including status, version, and timestamp. """ logger.info("Health check requested") return HealthResponse( status="healthy", timestamp=datetime.utcnow(), version=__version__, ) @router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"]) async def get_version(): """Get version information. Returns application version, Python version, and API version. """ logger.info("Version info requested") return VersionResponse( version=__version__, python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", api_version="v1", ) @router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"]) async def get_displays( _: AuthRequired, engine_type: Optional[str] = Query(None, description="Engine type to get displays for"), ): """Get list of available displays. Returns information about all available monitors/displays that can be captured. When ``engine_type`` is provided, returns displays specific to that engine (e.g. ``scrcpy`` returns connected Android devices instead of desktop monitors). 
""" logger.info(f"Listing available displays (engine_type={engine_type})") try: if engine_type: from wled_controller.core.capture_engines import EngineRegistry engine_cls = EngineRegistry.get_engine(engine_type) display_dataclasses = engine_cls.get_available_displays() else: display_dataclasses = get_available_displays() # Convert dataclass DisplayInfo to Pydantic DisplayInfo displays = [ DisplayInfo( index=d.index, name=d.name, width=d.width, height=d.height, x=d.x, y=d.y, is_primary=d.is_primary, refresh_rate=d.refresh_rate, ) for d in display_dataclasses ] logger.info(f"Found {len(displays)} displays") return DisplayListResponse( displays=displays, count=len(displays), ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Failed to get displays: {e}") raise HTTPException( status_code=500, detail=f"Failed to retrieve display information: {str(e)}" ) @router.get("/api/v1/system/processes", response_model=ProcessListResponse, tags=["Config"]) async def get_running_processes(_: AuthRequired): """Get list of currently running process names. Returns a sorted list of unique process names for use in automation conditions. """ from wled_controller.core.automations.platform_detector import PlatformDetector try: detector = PlatformDetector() processes = await detector.get_running_processes() sorted_procs = sorted(processes) return ProcessListResponse(processes=sorted_procs, count=len(sorted_procs)) except Exception as e: logger.error(f"Failed to get processes: {e}") raise HTTPException( status_code=500, detail=f"Failed to retrieve process list: {str(e)}" ) @router.get( "/api/v1/system/performance", response_model=PerformanceResponse, tags=["Config"], ) def get_system_performance(_: AuthRequired): """Get current system performance metrics (CPU, RAM, GPU). 
Uses sync ``def`` so FastAPI runs it in a thread pool — the psutil and NVML calls are blocking and would stall the event loop if run in an ``async def`` handler. """ mem = psutil.virtual_memory() gpu = None if _nvml_available: try: util = _nvml.nvmlDeviceGetUtilizationRates(_nvml_handle) mem_info = _nvml.nvmlDeviceGetMemoryInfo(_nvml_handle) temp = _nvml.nvmlDeviceGetTemperature( _nvml_handle, _nvml.NVML_TEMPERATURE_GPU ) gpu = GpuInfo( name=_nvml.nvmlDeviceGetName(_nvml_handle), utilization=float(util.gpu), memory_used_mb=round(mem_info.used / 1024 / 1024, 1), memory_total_mb=round(mem_info.total / 1024 / 1024, 1), temperature_c=float(temp), ) except Exception: pass return PerformanceResponse( cpu_name=_cpu_name, cpu_percent=psutil.cpu_percent(interval=None), ram_used_mb=round(mem.used / 1024 / 1024, 1), ram_total_mb=round(mem.total / 1024 / 1024, 1), ram_percent=mem.percent, gpu=gpu, timestamp=datetime.utcnow(), ) @router.get("/api/v1/system/metrics-history", tags=["Config"]) async def get_metrics_history( _: AuthRequired, manager=Depends(get_processor_manager), ): """Return the last ~2 minutes of system and per-target metrics. Used by the dashboard to seed charts on page load so history survives browser refreshes. 
""" return manager.metrics_history.get_history() # --------------------------------------------------------------------------- # Configuration backup / restore # --------------------------------------------------------------------------- # Mapping: logical store name → StorageConfig attribute name STORE_MAP = { "devices": "devices_file", "capture_templates": "templates_file", "postprocessing_templates": "postprocessing_templates_file", "picture_sources": "picture_sources_file", "output_targets": "output_targets_file", "pattern_templates": "pattern_templates_file", "color_strip_sources": "color_strip_sources_file", "audio_sources": "audio_sources_file", "audio_templates": "audio_templates_file", "value_sources": "value_sources_file", "sync_clocks": "sync_clocks_file", "automations": "automations_file", "scene_presets": "scene_presets_file", } _SERVER_DIR = Path(__file__).resolve().parents[4] def _schedule_restart() -> None: """Spawn a restart script after a short delay so the HTTP response completes.""" def _restart(): import time time.sleep(1) if sys.platform == "win32": subprocess.Popen( ["powershell", "-ExecutionPolicy", "Bypass", "-File", str(_SERVER_DIR / "restart.ps1")], creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP, ) else: subprocess.Popen( ["bash", str(_SERVER_DIR / "restart.sh")], start_new_session=True, ) threading.Thread(target=_restart, daemon=True).start() @router.get("/api/v1/system/backup", tags=["System"]) def backup_config(_: AuthRequired): """Download all configuration as a single JSON backup file.""" config = get_config() stores = {} for store_key, config_attr in STORE_MAP.items(): file_path = Path(getattr(config.storage, config_attr)) if file_path.exists(): with open(file_path, "r", encoding="utf-8") as f: stores[store_key] = json.load(f) else: stores[store_key] = {} backup = { "meta": { "format": "ledgrab-backup", "format_version": 1, "app_version": __version__, "created_at": datetime.utcnow().isoformat() + 
"Z", "store_count": len(stores), }, "stores": stores, } content = json.dumps(backup, indent=2, ensure_ascii=False) timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H%M%S") filename = f"ledgrab-backup-{timestamp}.json" return StreamingResponse( io.BytesIO(content.encode("utf-8")), media_type="application/json", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.post("/api/v1/system/restore", response_model=RestoreResponse, tags=["System"]) async def restore_config( _: AuthRequired, file: UploadFile = File(...), ): """Upload a backup file to restore all configuration. Triggers server restart.""" # Read and parse try: raw = await file.read() if len(raw) > 10 * 1024 * 1024: # 10 MB limit raise HTTPException(status_code=400, detail="Backup file too large (max 10 MB)") backup = json.loads(raw) except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"Invalid JSON file: {e}") # Validate envelope meta = backup.get("meta") if not isinstance(meta, dict) or meta.get("format") != "ledgrab-backup": raise HTTPException(status_code=400, detail="Not a valid LED Grab backup file") fmt_version = meta.get("format_version", 0) if fmt_version > 1: raise HTTPException( status_code=400, detail=f"Backup format version {fmt_version} is not supported by this server version", ) stores = backup.get("stores") if not isinstance(stores, dict): raise HTTPException(status_code=400, detail="Backup file missing 'stores' section") known_keys = set(STORE_MAP.keys()) present_keys = known_keys & set(stores.keys()) if not present_keys: raise HTTPException(status_code=400, detail="Backup contains no recognized store data") for key in present_keys: if not isinstance(stores[key], dict): raise HTTPException(status_code=400, detail=f"Store '{key}' in backup is not a valid JSON object") # Write store files atomically (in thread to avoid blocking event loop) config = get_config() def _write_stores(): count = 0 for store_key, config_attr in STORE_MAP.items(): 
if store_key in stores: file_path = Path(getattr(config.storage, config_attr)) atomic_write_json(file_path, stores[store_key]) count += 1 logger.info(f"Restored store: {store_key} -> {file_path}") return count written = await asyncio.to_thread(_write_stores) logger.info(f"Restore complete: {written}/{len(STORE_MAP)} stores written. Scheduling restart...") _schedule_restart() missing = known_keys - present_keys return RestoreResponse( status="restored", stores_written=written, stores_total=len(STORE_MAP), missing_stores=sorted(missing) if missing else [], restart_scheduled=True, message=f"Restored {written} stores. Server restarting...", ) # --------------------------------------------------------------------------- # Auto-backup settings & saved backups # --------------------------------------------------------------------------- @router.get( "/api/v1/system/auto-backup/settings", response_model=AutoBackupStatusResponse, tags=["System"], ) async def get_auto_backup_settings( _: AuthRequired, engine: AutoBackupEngine = Depends(get_auto_backup_engine), ): """Get auto-backup settings and status.""" return engine.get_settings() @router.put( "/api/v1/system/auto-backup/settings", response_model=AutoBackupStatusResponse, tags=["System"], ) async def update_auto_backup_settings( _: AuthRequired, body: AutoBackupSettings, engine: AutoBackupEngine = Depends(get_auto_backup_engine), ): """Update auto-backup settings (enable/disable, interval, max backups).""" return await engine.update_settings( enabled=body.enabled, interval_hours=body.interval_hours, max_backups=body.max_backups, ) @router.get( "/api/v1/system/backups", response_model=BackupListResponse, tags=["System"], ) async def list_backups( _: AuthRequired, engine: AutoBackupEngine = Depends(get_auto_backup_engine), ): """List all saved backup files.""" backups = engine.list_backups() return BackupListResponse( backups=[BackupFileInfo(**b) for b in backups], count=len(backups), ) 
@router.get("/api/v1/system/backups/{filename}", tags=["System"])
def download_saved_backup(
    filename: str,
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Download a specific saved backup file.

    Raises:
        HTTPException: 404 when the engine rejects the filename
            (``ValueError``) or the file does not exist.
    """
    try:
        path = engine.get_backup_path(filename)
    except (ValueError, FileNotFoundError) as e:
        raise HTTPException(status_code=404, detail=str(e))

    content = path.read_bytes()
    return StreamingResponse(
        io.BytesIO(content),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


@router.delete("/api/v1/system/backups/{filename}", tags=["System"])
async def delete_saved_backup(
    filename: str,
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Delete a specific saved backup file.

    Raises:
        HTTPException: 404 when the engine rejects the filename
            (``ValueError``) or the file does not exist.
    """
    try:
        engine.delete_backup(filename)
    except (ValueError, FileNotFoundError) as e:
        raise HTTPException(status_code=404, detail=str(e))
    return {"status": "deleted", "filename": filename}


# ---------------------------------------------------------------------------
# ADB helpers (for Android / scrcpy engine)
# ---------------------------------------------------------------------------


class AdbConnectRequest(BaseModel):
    """Request body for the ADB connect/disconnect endpoints."""

    # Device address as sent by the client; connect appends ":5555" when it
    # contains no colon.
    address: str


def _get_adb_path() -> str:
    """Get the adb binary path from the scrcpy engine's resolver."""
    from wled_controller.core.capture_engines.scrcpy_engine import _get_adb

    return _get_adb()


@router.post("/api/v1/adb/connect", tags=["ADB"])
async def adb_connect(_: AuthRequired, request: AdbConnectRequest):
    """Connect to a WiFi ADB device by IP address.

    Appends ``:5555`` if no port is specified.

    Raises:
        HTTPException: 400 for an empty address or when adb's output does not
            report a connection, 500 when the adb binary is missing, 504 when
            adb does not finish within 10 seconds.
    """
    address = request.address.strip()
    if not address:
        raise HTTPException(status_code=400, detail="Address is required")
    if ":" not in address:
        address = f"{address}:5555"

    adb = _get_adb_path()
    logger.info(f"Connecting ADB device: {address}")
    try:
        # subprocess.run blocks for up to the timeout; run it in a worker
        # thread so the event loop keeps serving other requests.
        result = await asyncio.to_thread(
            subprocess.run,
            [adb, "connect", address],
            capture_output=True,
            text=True,
            timeout=10,
        )
        output = (result.stdout + result.stderr).strip()
        if "connected" in output.lower():
            return {"status": "connected", "address": address, "message": output}
        raise HTTPException(status_code=400, detail=output or "Connection failed")
    except FileNotFoundError:
        raise HTTPException(
            status_code=500,
            detail="adb not found on PATH. Install Android SDK Platform-Tools.",
        )
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=504, detail="ADB connect timed out")


@router.post("/api/v1/adb/disconnect", tags=["ADB"])
async def adb_disconnect(_: AuthRequired, request: AdbConnectRequest):
    """Disconnect a WiFi ADB device.

    Raises:
        HTTPException: 400 for an empty address, 500 when the adb binary is
            missing, 504 when adb does not finish within 10 seconds.
    """
    address = request.address.strip()
    if not address:
        raise HTTPException(status_code=400, detail="Address is required")

    adb = _get_adb_path()
    logger.info(f"Disconnecting ADB device: {address}")
    try:
        # Offload the blocking adb call so the event loop is not stalled.
        result = await asyncio.to_thread(
            subprocess.run,
            [adb, "disconnect", address],
            capture_output=True,
            text=True,
            timeout=10,
        )
        return {"status": "disconnected", "message": result.stdout.strip()}
    except FileNotFoundError:
        raise HTTPException(status_code=500, detail="adb not found on PATH")
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=504, detail="ADB disconnect timed out")