Files
wled-screen-controller-mixed/server/src/wled_controller/api/routes/system.py
alexei.dolgolyov 2240471b67 Add demo mode: virtual hardware sandbox for testing without real devices
Demo mode provides a complete sandbox environment with:
- Virtual capture engine (radial rainbow test pattern on 3 displays)
- Virtual audio engine (synthetic music-like audio on 2 devices)
- Virtual LED device provider (strip/60, matrix/256, ring/24 LEDs)
- Isolated data directory (data/demo/) with auto-seeded sample entities
- Dedicated config (config/demo_config.yaml) with pre-configured API key
- Frontend indicator (DEMO badge + dismissible banner)
- Engine filtering (only demo engines visible in demo mode)
- Separate entry point: python -m wled_controller.demo (port 8081)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 16:17:14 +03:00

995 lines
34 KiB
Python

"""System routes: health, version, displays, performance, backup/restore, ADB."""
import asyncio
import io
import json
import logging
import platform
import subprocess
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import psutil
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from wled_controller import __version__
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_auto_backup_engine,
get_audio_source_store,
get_audio_template_store,
get_automation_store,
get_color_strip_store,
get_device_store,
get_output_target_store,
get_pattern_template_store,
get_picture_source_store,
get_pp_template_store,
get_processor_manager,
get_scene_preset_store,
get_sync_clock_store,
get_template_store,
get_value_source_store,
)
from wled_controller.api.schemas.system import (
AutoBackupSettings,
AutoBackupStatusResponse,
BackupFileInfo,
BackupListResponse,
DisplayInfo,
DisplayListResponse,
ExternalUrlRequest,
ExternalUrlResponse,
GpuInfo,
HealthResponse,
LogLevelRequest,
LogLevelResponse,
MQTTSettingsRequest,
MQTTSettingsResponse,
PerformanceResponse,
ProcessListResponse,
RestoreResponse,
VersionResponse,
)
from wled_controller.core.backup.auto_backup import AutoBackupEngine
from wled_controller.config import get_config, is_demo_mode
from wled_controller.core.capture.screen_capture import get_available_displays
from wled_controller.utils import atomic_write_json, get_logger
# Module-level logger obtained from the project's get_logger helper.
logger = get_logger(__name__)
# Prime psutil CPU counter (first call always returns 0.0) so that the
# non-blocking cpu_percent(interval=None) call in the performance endpoint
# has a previous sample to compare against.
psutil.cpu_percent(interval=None)
# GPU monitoring (initialized once in utils.gpu, shared with metrics_history)
from wled_controller.utils.gpu import nvml_available as _nvml_available, nvml as _nvml, nvml_handle as _nvml_handle
from wled_controller.storage.base_store import EntityNotFoundError
def _get_cpu_name() -> str | None:
"""Get a human-friendly CPU model name (cached at module level)."""
try:
if platform.system() == "Windows":
import winreg
key = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
r"HARDWARE\DESCRIPTION\System\CentralProcessor\0",
)
name, _ = winreg.QueryValueEx(key, "ProcessorNameString")
winreg.CloseKey(key)
return name.strip()
elif platform.system() == "Linux":
with open("/proc/cpuinfo") as f:
for line in f:
if "model name" in line:
return line.split(":")[1].strip()
elif platform.system() == "Darwin":
return (
subprocess.check_output(
["sysctl", "-n", "machdep.cpu.brand_string"]
)
.decode()
.strip()
)
except Exception as e:
logger.warning("CPU name detection failed: %s", e)
return platform.processor() or None
# CPU model name resolved once at import time; reused by the performance
# endpoint instead of being re-detected on every request.
_cpu_name: str | None = _get_cpu_name()
# Router on which every endpoint defined in this module is registered.
router = APIRouter()
@router.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
    """Report that the service is alive.

    The response carries a fixed ``"healthy"`` status, the current UTC
    timestamp, the application version and the demo flag from the config.
    """
    logger.info("Health check requested")
    now_utc = datetime.now(timezone.utc)
    demo_enabled = get_config().demo
    return HealthResponse(
        status="healthy",
        timestamp=now_utc,
        version=__version__,
        demo_mode=demo_enabled,
    )
@router.get("/api/v1/version", response_model=VersionResponse, tags=["Info"])
async def get_version():
    """Return application, Python and API version information.

    The Python version is rendered as ``major.minor.micro``; the API version
    is the constant ``"v1"``. The demo flag from the config is included too.
    """
    logger.info("Version info requested")
    # First three fields of version_info are major, minor, micro.
    interpreter_version = ".".join(str(part) for part in sys.version_info[:3])
    return VersionResponse(
        version=__version__,
        python_version=interpreter_version,
        api_version="v1",
        demo_mode=get_config().demo,
    )
@router.get("/api/v1/tags", tags=["Tags"])
async def list_all_tags(_: AuthRequired):
    """Collect the distinct tags of every entity in every store.

    Returns ``{"tags": [...]}`` with the tags sorted. A store whose getter
    raises ``RuntimeError`` is skipped.
    """
    providers = (
        get_device_store, get_output_target_store, get_color_strip_store,
        get_picture_source_store, get_audio_source_store, get_value_source_store,
        get_sync_clock_store, get_automation_store, get_scene_preset_store,
        get_template_store, get_audio_template_store, get_pp_template_store,
        get_pattern_template_store,
    )
    collected: set[str] = set()
    for provider in providers:
        try:
            store = provider()
        except RuntimeError:
            continue
        # BaseJsonStore subclasses provide get_all(); DeviceStore provides get_all_devices()
        list_entities = getattr(store, "get_all", None) or getattr(store, "get_all_devices", None)
        if not list_entities:
            continue
        for entity in list_entities() or ():
            collected.update(entity.tags)
    return {"tags": sorted(collected)}
@router.get("/api/v1/config/displays", response_model=DisplayListResponse, tags=["Config"])
async def get_displays(
    _: AuthRequired,
    engine_type: Optional[str] = Query(None, description="Engine type to get displays for"),
):
    """List the displays that can be captured.

    Source of the list:

    * ``engine_type`` given: that engine's own display enumeration
      (e.g. ``scrcpy`` returns connected Android devices instead of monitors).
    * demo mode without ``engine_type``: the best available engine, if any.
    * otherwise: the default ``get_available_displays`` detection.

    Raises:
        HTTPException: 404 for ``EntityNotFoundError``, 400 for ``ValueError``,
            500 for any other failure.
    """
    logger.info(f"Listing available displays (engine_type={engine_type})")
    try:
        from wled_controller.core.capture_engines import EngineRegistry

        # Decide which engine (if any) enumerates the displays.
        if engine_type:
            chosen_engine = engine_type
        elif is_demo_mode():
            # In demo mode, use the best available engine (demo engine at priority 1000)
            # instead of the mss-based real display detection
            chosen_engine = EngineRegistry.get_best_available_engine()
        else:
            chosen_engine = None

        if chosen_engine:
            enumerate_displays = EngineRegistry.get_engine(chosen_engine).get_available_displays
        else:
            enumerate_displays = get_available_displays
        # Enumeration is run in a worker thread to keep the event loop free.
        raw_displays = await asyncio.to_thread(enumerate_displays)

        # Convert dataclass DisplayInfo to Pydantic DisplayInfo
        displays = []
        for raw in raw_displays:
            displays.append(
                DisplayInfo(
                    index=raw.index,
                    name=raw.name,
                    width=raw.width,
                    height=raw.height,
                    x=raw.x,
                    y=raw.y,
                    is_primary=raw.is_primary,
                    refresh_rate=raw.refresh_rate,
                )
            )
        logger.info(f"Found {len(displays)} displays")
        return DisplayListResponse(displays=displays, count=len(displays))
    except EntityNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to get displays: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve display information: {str(e)}"
        )
@router.get("/api/v1/system/processes", response_model=ProcessListResponse, tags=["Config"])
async def get_running_processes(_: AuthRequired):
    """Return the sorted names of currently running processes.

    The list is intended for building automation conditions.

    Raises:
        HTTPException: 500 when the process list cannot be retrieved.
    """
    from wled_controller.core.automations.platform_detector import PlatformDetector

    try:
        names = sorted(await PlatformDetector().get_running_processes())
        return ProcessListResponse(processes=names, count=len(names))
    except Exception as e:
        logger.error(f"Failed to get processes: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve process list: {str(e)}"
        )
@router.get(
    "/api/v1/system/performance",
    response_model=PerformanceResponse,
    tags=["Config"],
)
def get_system_performance(_: AuthRequired):
    """Return a snapshot of CPU, RAM and (when NVML is available) GPU metrics.

    Declared as a sync ``def`` on purpose: FastAPI runs it in a thread pool,
    so the blocking psutil and NVML calls do not stall the event loop.
    A failing NVML query is logged at debug level and yields ``gpu=None``.
    """
    def _to_mb(num_bytes):
        # Bytes -> mebibytes, one decimal place.
        return round(num_bytes / 1024 / 1024, 1)

    ram = psutil.virtual_memory()

    gpu_info = None
    if _nvml_available:
        try:
            rates = _nvml.nvmlDeviceGetUtilizationRates(_nvml_handle)
            vram = _nvml.nvmlDeviceGetMemoryInfo(_nvml_handle)
            temperature = _nvml.nvmlDeviceGetTemperature(
                _nvml_handle, _nvml.NVML_TEMPERATURE_GPU
            )
            gpu_info = GpuInfo(
                name=_nvml.nvmlDeviceGetName(_nvml_handle),
                utilization=float(rates.gpu),
                memory_used_mb=_to_mb(vram.used),
                memory_total_mb=_to_mb(vram.total),
                temperature_c=float(temperature),
            )
        except Exception as exc:
            logger.debug("NVML query failed: %s", exc)

    return PerformanceResponse(
        cpu_name=_cpu_name,
        cpu_percent=psutil.cpu_percent(interval=None),
        ram_used_mb=_to_mb(ram.used),
        ram_total_mb=_to_mb(ram.total),
        ram_percent=ram.percent,
        gpu=gpu_info,
        timestamp=datetime.now(timezone.utc),
    )
@router.get("/api/v1/system/metrics-history", tags=["Config"])
async def get_metrics_history(
    _: AuthRequired,
    manager=Depends(get_processor_manager),
):
    """Return the recorded system and per-target metrics history.

    The dashboard calls this on page load to seed its charts, so the
    (roughly two minute) history survives a browser refresh.
    """
    history = manager.metrics_history
    return history.get_history()
# ---------------------------------------------------------------------------
# Configuration backup / restore
# ---------------------------------------------------------------------------
# Mapping: logical store name → StorageConfig attribute name
# The keys are the public store identifiers used in export/import URLs and as
# the keys of the "stores" section of a backup file; the values are attribute
# names looked up on ``config.storage`` to obtain each store's JSON file path.
STORE_MAP = {
    "devices": "devices_file",
    "capture_templates": "templates_file",
    "postprocessing_templates": "postprocessing_templates_file",
    "picture_sources": "picture_sources_file",
    "output_targets": "output_targets_file",
    "pattern_templates": "pattern_templates_file",
    "color_strip_sources": "color_strip_sources_file",
    "audio_sources": "audio_sources_file",
    "audio_templates": "audio_templates_file",
    "value_sources": "value_sources_file",
    "sync_clocks": "sync_clocks_file",
    "color_strip_processing_templates": "color_strip_processing_templates_file",
    "automations": "automations_file",
    "scene_presets": "scene_presets_file",
}
# Directory four levels above this file's directory; the restart scripts
# (restart.ps1 / restart.sh) are looked up here.
# NOTE(review): presumably the "server" directory of the repository — confirm.
_SERVER_DIR = Path(__file__).resolve().parents[4]
def _schedule_restart() -> None:
    """Launch the platform restart script from a background thread.

    The script is started one second later so the HTTP response that
    triggered the restart can be sent first. On Windows ``restart.ps1`` is
    run through PowerShell as a detached process; elsewhere ``restart.sh``
    is run through bash in a new session.
    """
    def _launch_after_delay():
        import time

        time.sleep(1)
        if sys.platform != "win32":
            subprocess.Popen(
                ["bash", str(_SERVER_DIR / "restart.sh")],
                start_new_session=True,
            )
            return
        # Detach so the script is not tied to this process's console/group.
        detached = subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
        powershell_cmd = [
            "powershell", "-ExecutionPolicy", "Bypass", "-File",
            str(_SERVER_DIR / "restart.ps1"),
        ]
        subprocess.Popen(powershell_cmd, creationflags=detached)

    worker = threading.Thread(target=_launch_after_delay, daemon=True)
    worker.start()
def _mask_api_key(key: str) -> str:
    """Return a display-safe form of *key*.

    The first and last four characters are shown only when the key is at
    least 16 characters long, so at least half of the secret always stays
    hidden. Shorter keys are replaced entirely by ``"****"`` (showing
    4 + 4 characters of an 8-character key would reveal the whole key).
    """
    if len(key) >= 16:
        return f"{key[:4]}****{key[-4:]}"
    return "****"


@router.get("/api/v1/system/api-keys", tags=["System"])
def list_api_keys(_: AuthRequired):
    """List API key labels (read-only; keys are defined in the YAML config file).

    Returns:
        ``{"keys": [{"label": ..., "masked": ...}, ...], "count": N}`` where
        ``masked`` never exposes a complete key.
    """
    config = get_config()
    keys = [
        {"label": label, "masked": _mask_api_key(key)}
        for label, key in config.auth.api_keys.items()
    ]
    return {"keys": keys, "count": len(keys)}
@router.get("/api/v1/system/export/{store_key}", tags=["System"])
def export_store(store_key: str, _: AuthRequired):
    """Download a single entity store as a JSON file.

    Args:
        store_key: Logical store name; must be a key of ``STORE_MAP``.

    Returns:
        A JSON attachment with a ``meta`` envelope
        (format ``ledgrab-partial-export``) and the store content under
        ``store``. A store whose file does not exist is exported as ``{}``.

    Raises:
        HTTPException: 404 when ``store_key`` is unknown.
    """
    if store_key not in STORE_MAP:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown store '{store_key}'. Valid keys: {sorted(STORE_MAP.keys())}",
        )
    config = get_config()
    file_path = Path(getattr(config.storage, STORE_MAP[store_key]))
    if file_path.exists():
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    else:
        data = {}

    now = datetime.now(timezone.utc)
    export = {
        "meta": {
            "format": "ledgrab-partial-export",
            "format_version": 1,
            "store_key": store_key,
            "app_version": __version__,
            # isoformat() of an aware UTC datetime already ends in "+00:00";
            # appending "Z" to that would give an invalid "+00:00Z" suffix.
            "created_at": now.isoformat().replace("+00:00", "Z"),
        },
        "store": data,
    }
    content = json.dumps(export, indent=2, ensure_ascii=False)
    timestamp = now.strftime("%Y-%m-%dT%H%M%S")
    filename = f"ledgrab-{store_key}-{timestamp}.json"
    return StreamingResponse(
        io.BytesIO(content.encode("utf-8")),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@router.post("/api/v1/system/import/{store_key}", tags=["System"])
async def import_store(
    store_key: str,
    _: AuthRequired,
    file: UploadFile = File(...),
    merge: bool = Query(False, description="Merge into existing data instead of replacing"),
):
    """Upload a partial export file to replace or merge one entity store. Triggers server restart.

    Accepts either a full backup (format ``ledgrab-backup``, from which only
    ``store_key`` is taken) or a partial export (format
    ``ledgrab-partial-export``) made for the same ``store_key``.

    Args:
        store_key: Logical store name; must be a key of ``STORE_MAP``.
        file: Uploaded JSON file, at most 10 MB.
        merge: When true and the store file exists and holds a JSON object,
            incoming entries are merged over the existing ones; otherwise
            the store file is replaced.

    Raises:
        HTTPException: 404 for an unknown store; 400 for an oversized file,
            invalid JSON, or a payload that is not a valid backup/export
            for this store.
    """
    invalid_file = "Not a valid LED Grab backup or partial export file"

    if store_key not in STORE_MAP:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown store '{store_key}'. Valid keys: {sorted(STORE_MAP.keys())}",
        )

    raw = await file.read()
    if len(raw) > 10 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large (max 10 MB)")
    try:
        payload = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # json.loads raises UnicodeDecodeError for bytes that are not valid
        # UTF-8/16/32; that is a client error just like malformed JSON.
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") from e

    # A top-level array/scalar or a missing/odd "meta" is a bad upload (400),
    # not something that should surface as an AttributeError (500).
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail=invalid_file)
    meta = payload.get("meta")
    if not isinstance(meta, dict):
        raise HTTPException(status_code=400, detail=invalid_file)

    # Support both full-backup format and partial-export format
    if "stores" in payload:
        # Full backup: extract the specific store
        if meta.get("format") != "ledgrab-backup":
            raise HTTPException(status_code=400, detail=invalid_file)
        stores = payload["stores"]
        if not isinstance(stores, dict) or store_key not in stores:
            raise HTTPException(status_code=400, detail=f"Backup does not contain store '{store_key}'")
        incoming = stores[store_key]
    elif meta.get("format") == "ledgrab-partial-export":
        # Partial export format
        file_store_key = meta.get("store_key")
        if file_store_key != store_key:
            # .get() above: a file without "store_key" must produce this 400,
            # not a KeyError while formatting the message.
            raise HTTPException(
                status_code=400,
                detail=f"File is for store '{file_store_key}', not '{store_key}'",
            )
        incoming = payload.get("store", {})
    else:
        raise HTTPException(status_code=400, detail=invalid_file)

    if not isinstance(incoming, dict):
        raise HTTPException(status_code=400, detail="Store data must be a JSON object")

    config = get_config()
    file_path = Path(getattr(config.storage, STORE_MAP[store_key]))

    def _write():
        # Runs in a worker thread; returns the number of entries written.
        if merge and file_path.exists():
            with open(file_path, "r", encoding="utf-8") as f:
                existing = json.load(f)
            if isinstance(existing, dict):
                existing.update(incoming)
                atomic_write_json(file_path, existing)
                return len(existing)
        atomic_write_json(file_path, incoming)
        return len(incoming)

    count = await asyncio.to_thread(_write)
    logger.info(f"Imported store '{store_key}' ({count} entries, merge={merge}). Scheduling restart...")
    _schedule_restart()
    return {
        "status": "imported",
        "store_key": store_key,
        "entries": count,
        "merge": merge,
        "restart_scheduled": True,
        "message": f"Imported {count} entries for '{store_key}'. Server restarting...",
    }
@router.get("/api/v1/system/backup", tags=["System"])
def backup_config(_: AuthRequired):
    """Download all configuration as a single JSON backup file.

    Every store listed in ``STORE_MAP`` is included under ``stores``; a
    store whose file does not exist is written as ``{}``. The ``meta``
    envelope uses format ``ledgrab-backup``.
    """
    config = get_config()
    stores = {}
    for store_key, config_attr in STORE_MAP.items():
        file_path = Path(getattr(config.storage, config_attr))
        if file_path.exists():
            with open(file_path, "r", encoding="utf-8") as f:
                stores[store_key] = json.load(f)
        else:
            stores[store_key] = {}

    now = datetime.now(timezone.utc)
    backup = {
        "meta": {
            "format": "ledgrab-backup",
            "format_version": 1,
            "app_version": __version__,
            # isoformat() of an aware UTC datetime already ends in "+00:00";
            # appending "Z" to that would give an invalid "+00:00Z" suffix.
            "created_at": now.isoformat().replace("+00:00", "Z"),
            "store_count": len(stores),
        },
        "stores": stores,
    }
    content = json.dumps(backup, indent=2, ensure_ascii=False)
    timestamp = now.strftime("%Y-%m-%dT%H%M%S")
    filename = f"ledgrab-backup-{timestamp}.json"
    return StreamingResponse(
        io.BytesIO(content.encode("utf-8")),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@router.post("/api/v1/system/restart", tags=["System"])
def restart_server(_: AuthRequired):
    """Queue a server restart and answer right away with ``{"status": "restarting"}``."""
    response = {"status": "restarting"}
    _schedule_restart()
    return response
@router.post("/api/v1/system/restore", response_model=RestoreResponse, tags=["System"])
async def restore_config(
    _: AuthRequired,
    file: UploadFile = File(...),
):
    """Upload a backup file to restore all configuration. Triggers server restart.

    Only stores named in ``STORE_MAP`` are written; stores absent from the
    backup are left untouched and reported in ``missing_stores``.

    Args:
        file: Uploaded backup (format ``ledgrab-backup``), at most 10 MB.

    Raises:
        HTTPException: 400 for an oversized file, invalid JSON, a wrong or
            unsupported envelope, or when no recognised store is present.
    """
    # Read and parse
    raw = await file.read()
    if len(raw) > 10 * 1024 * 1024:  # 10 MB limit
        raise HTTPException(status_code=400, detail="Backup file too large (max 10 MB)")
    try:
        backup = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Undecodable bytes are a client error, same as malformed JSON.
        raise HTTPException(status_code=400, detail=f"Invalid JSON file: {e}") from e

    # Validate envelope (a top-level array/scalar has no .get -> reject as 400)
    meta = backup.get("meta") if isinstance(backup, dict) else None
    if not isinstance(meta, dict) or meta.get("format") != "ledgrab-backup":
        raise HTTPException(status_code=400, detail="Not a valid LED Grab backup file")

    fmt_version = meta.get("format_version", 0)
    # A non-numeric version cannot be compared with 1; treat it as unsupported
    # instead of letting the comparison raise TypeError.
    if not isinstance(fmt_version, (int, float)) or fmt_version > 1:
        raise HTTPException(
            status_code=400,
            detail=f"Backup format version {fmt_version} is not supported by this server version",
        )

    stores = backup.get("stores")
    if not isinstance(stores, dict):
        raise HTTPException(status_code=400, detail="Backup file missing 'stores' section")

    known_keys = set(STORE_MAP.keys())
    present_keys = known_keys & set(stores.keys())
    if not present_keys:
        raise HTTPException(status_code=400, detail="Backup contains no recognized store data")
    for key in present_keys:
        if not isinstance(stores[key], dict):
            raise HTTPException(status_code=400, detail=f"Store '{key}' in backup is not a valid JSON object")

    # Write store files atomically (in thread to avoid blocking event loop)
    config = get_config()

    def _write_stores():
        count = 0
        for store_key, config_attr in STORE_MAP.items():
            if store_key in stores:
                file_path = Path(getattr(config.storage, config_attr))
                atomic_write_json(file_path, stores[store_key])
                count += 1
                logger.info(f"Restored store: {store_key} -> {file_path}")
        return count

    written = await asyncio.to_thread(_write_stores)
    logger.info(f"Restore complete: {written}/{len(STORE_MAP)} stores written. Scheduling restart...")
    _schedule_restart()

    missing = known_keys - present_keys
    return RestoreResponse(
        status="restored",
        stores_written=written,
        stores_total=len(STORE_MAP),
        missing_stores=sorted(missing) if missing else [],
        restart_scheduled=True,
        message=f"Restored {written} stores. Server restarting...",
    )
# ---------------------------------------------------------------------------
# Auto-backup settings & saved backups
# ---------------------------------------------------------------------------
@router.get(
    "/api/v1/system/auto-backup/settings",
    response_model=AutoBackupStatusResponse,
    tags=["System"],
)
async def get_auto_backup_settings(
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Return what the auto-backup engine reports from ``get_settings()``."""
    current = engine.get_settings()
    return current
@router.put(
    "/api/v1/system/auto-backup/settings",
    response_model=AutoBackupStatusResponse,
    tags=["System"],
)
async def update_auto_backup_settings(
    _: AuthRequired,
    body: AutoBackupSettings,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Apply new auto-backup settings (enabled flag, interval in hours, max backups).

    Returns whatever the engine's ``update_settings`` coroutine returns.
    """
    requested = {
        "enabled": body.enabled,
        "interval_hours": body.interval_hours,
        "max_backups": body.max_backups,
    }
    return await engine.update_settings(**requested)
@router.post("/api/v1/system/auto-backup/trigger", tags=["System"])
async def trigger_backup(
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Run a backup immediately and return the engine's result under ``backup``."""
    return {"status": "ok", "backup": await engine.trigger_backup()}
@router.get(
    "/api/v1/system/backups",
    response_model=BackupListResponse,
    tags=["System"],
)
async def list_backups(
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Return every saved backup known to the engine, with a total count."""
    saved = engine.list_backups()
    # Each engine entry is expanded into BackupFileInfo keyword arguments.
    entries = [BackupFileInfo(**info) for info in saved]
    return BackupListResponse(backups=entries, count=len(saved))
@router.get("/api/v1/system/backups/{filename}", tags=["System"])
def download_saved_backup(
    filename: str,
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Send one saved backup file as a JSON attachment.

    Raises:
        HTTPException: 404 when the engine rejects the name with
            ``ValueError`` or ``FileNotFoundError``.
    """
    try:
        backup_path = engine.get_backup_path(filename)
    except (ValueError, FileNotFoundError) as err:
        raise HTTPException(status_code=404, detail=str(err))

    body_stream = io.BytesIO(backup_path.read_bytes())
    disposition = f'attachment; filename="{filename}"'
    return StreamingResponse(
        body_stream,
        media_type="application/json",
        headers={"Content-Disposition": disposition},
    )
@router.delete("/api/v1/system/backups/{filename}", tags=["System"])
async def delete_saved_backup(
    filename: str,
    _: AuthRequired,
    engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
    """Remove one saved backup file.

    Raises:
        HTTPException: 404 when the engine raises ``ValueError`` or
            ``FileNotFoundError`` for the given name.
    """
    try:
        engine.delete_backup(filename)
    except (ValueError, FileNotFoundError) as err:
        raise HTTPException(status_code=404, detail=str(err))
    else:
        return {"status": "deleted", "filename": filename}
# ---------------------------------------------------------------------------
# MQTT settings
# ---------------------------------------------------------------------------
# Lazily resolved location of the MQTT override file (None until first use).
_MQTT_SETTINGS_FILE: Path | None = None


def _get_mqtt_settings_path() -> Path:
    """Return the path of ``mqtt_settings.json``, resolving it on first call.

    The file lives in the directory that contains the devices store file.
    """
    global _MQTT_SETTINGS_FILE
    if _MQTT_SETTINGS_FILE is not None:
        return _MQTT_SETTINGS_FILE
    # Derive the data directory from any known storage file path
    storage_dir = Path(get_config().storage.devices_file).parent
    _MQTT_SETTINGS_FILE = storage_dir / "mqtt_settings.json"
    return _MQTT_SETTINGS_FILE
def _load_mqtt_settings() -> dict:
    """Return the effective MQTT settings.

    Values from the YAML config act as the baseline; entries found in the
    JSON override file replace them. An unreadable override file is logged
    as a warning.
    """
    mqtt_cfg = get_config().mqtt
    field_names = (
        "enabled",
        "broker_host",
        "broker_port",
        "username",
        "password",
        "client_id",
        "base_topic",
    )
    settings = {field: getattr(mqtt_cfg, field) for field in field_names}

    override_path = _get_mqtt_settings_path()
    if not override_path.exists():
        return settings
    try:
        with open(override_path, "r", encoding="utf-8") as fh:
            settings.update(json.load(fh))
    except Exception as e:
        logger.warning(f"Failed to load MQTT settings override file: {e}")
    return settings
def _save_mqtt_settings(settings: dict) -> None:
    """Write *settings* to the MQTT JSON override file via ``atomic_write_json``."""
    from wled_controller.utils import atomic_write_json

    target = _get_mqtt_settings_path()
    atomic_write_json(target, settings)
@router.get(
    "/api/v1/system/mqtt/settings",
    response_model=MQTTSettingsResponse,
    tags=["System"],
)
async def get_mqtt_settings(_: AuthRequired):
    """Return the effective MQTT broker settings.

    The password itself is never returned; ``password_set`` only says
    whether one is configured.
    """
    settings = _load_mqtt_settings()
    has_password = bool(settings.get("password"))
    return MQTTSettingsResponse(
        enabled=settings["enabled"],
        broker_host=settings["broker_host"],
        broker_port=settings["broker_port"],
        username=settings["username"],
        password_set=has_password,
        client_id=settings["client_id"],
        base_topic=settings["base_topic"],
    )
@router.put(
    "/api/v1/system/mqtt/settings",
    response_model=MQTTSettingsResponse,
    tags=["System"],
)
async def update_mqtt_settings(_: AuthRequired, body: MQTTSettingsRequest):
    """Persist new MQTT broker settings and return them with the password hidden.

    An empty password in the request means "keep the stored password".
    """
    previous = _load_mqtt_settings()
    # If caller sends an empty password, keep the existing one
    effective_password = body.password or previous.get("password", "")

    updated = dict(
        enabled=body.enabled,
        broker_host=body.broker_host,
        broker_port=body.broker_port,
        username=body.username,
        password=effective_password,
        client_id=body.client_id,
        base_topic=body.base_topic,
    )
    _save_mqtt_settings(updated)
    logger.info("MQTT settings updated")

    return MQTTSettingsResponse(
        enabled=updated["enabled"],
        broker_host=updated["broker_host"],
        broker_port=updated["broker_port"],
        username=updated["username"],
        password_set=bool(updated["password"]),
        client_id=updated["client_id"],
        base_topic=updated["base_topic"],
    )
# ---------------------------------------------------------------------------
# External URL setting
# ---------------------------------------------------------------------------
# Lazily resolved location of the external-URL file (None until first use).
_EXTERNAL_URL_FILE: Path | None = None


def _get_external_url_path() -> Path:
    """Return the path of ``external_url.json``, resolving it on first call.

    The file lives in the directory that contains the devices store file.
    """
    global _EXTERNAL_URL_FILE
    if _EXTERNAL_URL_FILE is not None:
        return _EXTERNAL_URL_FILE
    storage_dir = Path(get_config().storage.devices_file).parent
    _EXTERNAL_URL_FILE = storage_dir / "external_url.json"
    return _EXTERNAL_URL_FILE
def load_external_url() -> str:
    """Load the external URL setting. Returns empty string if not set.

    An empty string is also returned when the settings file is missing,
    unreadable, not a JSON object, or holds a non-string value. Read and
    parse failures are logged as a warning rather than silently ignored,
    matching how the MQTT override file is handled.
    """
    path = _get_external_url_path()
    if not path.exists():
        return ""
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        # Best effort: a corrupt file must not break callers, but it should
        # be visible in the log.
        logger.warning(f"Failed to load external URL settings file: {e}")
        return ""
    if not isinstance(data, dict):
        logger.warning("External URL settings file does not contain a JSON object")
        return ""
    url = data.get("external_url", "")
    # Honour the declared ``str`` return type even if the file was hand-edited.
    return url if isinstance(url, str) else ""
def _save_external_url(url: str) -> None:
    """Write *url* to the external-URL file under the ``external_url`` key."""
    from wled_controller.utils import atomic_write_json

    document = {"external_url": url}
    atomic_write_json(_get_external_url_path(), document)
@router.get(
    "/api/v1/system/external-url",
    response_model=ExternalUrlResponse,
    tags=["System"],
)
async def get_external_url(_: AuthRequired):
    """Return the stored external base URL (empty string when unset)."""
    stored_url = load_external_url()
    return ExternalUrlResponse(external_url=stored_url)
@router.put(
    "/api/v1/system/external-url",
    response_model=ExternalUrlResponse,
    tags=["System"],
)
async def update_external_url(_: AuthRequired, body: ExternalUrlRequest):
    """Store the external base URL used in webhook and other user-visible URLs.

    Surrounding whitespace and trailing slashes are removed before saving;
    an empty result clears the setting.
    """
    trimmed = body.external_url.strip()
    normalized = trimmed.rstrip("/")
    _save_external_url(normalized)
    logger.info("External URL updated: %s", normalized or "(cleared)")
    return ExternalUrlResponse(external_url=normalized)
# ---------------------------------------------------------------------------
# Live log viewer WebSocket
# ---------------------------------------------------------------------------
@router.websocket("/api/v1/system/logs/ws")
async def logs_ws(
    websocket: WebSocket,
    token: str = Query(""),
):
    """Stream server log lines to a WebSocket client.

    The client authenticates with ``?token=<api_key>``; a rejected token
    closes the socket with code 4001. After the socket is accepted, the
    buffered backlog is sent line by line, followed by new lines as they
    arrive. An empty text frame is sent as a keepalive after 30 seconds
    without a new line.
    """
    from wled_controller.api.auth import verify_ws_token
    from wled_controller.utils import log_broadcaster

    authorized = verify_ws_token(token)
    if not authorized:
        await websocket.close(code=4001, reason="Unauthorized")
        return
    await websocket.accept()

    # Ensure the broadcaster knows the event loop (may be first connection)
    log_broadcaster.ensure_loop()
    # Subscribe *before* reading the backlog so no lines slip through
    subscription = log_broadcaster.subscribe()
    try:
        # Replay what is already buffered.
        for buffered in log_broadcaster.get_backlog():
            await websocket.send_text(buffered)

        # Then forward live lines until the client goes away.
        streaming = True
        while streaming:
            try:
                fresh = await asyncio.wait_for(subscription.get(), timeout=30.0)
                await websocket.send_text(fresh)
            except asyncio.TimeoutError:
                # Send a keepalive ping so the connection stays alive
                try:
                    await websocket.send_text("")
                except Exception:
                    streaming = False
    except Exception:
        # Covers WebSocketDisconnect as well: any failure simply ends the stream.
        pass
    finally:
        log_broadcaster.unsubscribe(subscription)
# ---------------------------------------------------------------------------
# ADB helpers (for Android / scrcpy engine)
# ---------------------------------------------------------------------------
class AdbConnectRequest(BaseModel):
    """Request body shared by the ADB connect and disconnect endpoints."""

    # Device address; the connect endpoint appends ":5555" when it has no ":".
    address: str
def _get_adb_path() -> str:
    """Return the adb executable path as resolved by the scrcpy engine module."""
    from wled_controller.core.capture_engines.scrcpy_engine import _get_adb as resolve_adb

    return resolve_adb()
async def _run_adb(adb: str, *args: str, timeout: float = 10) -> tuple[str, str]:
    """Run ``adb`` with *args* and return its decoded ``(stdout, stderr)``.

    On timeout the child process is killed and reaped before
    ``asyncio.TimeoutError`` is re-raised, so a hung adb invocation is not
    left running. ``FileNotFoundError`` propagates when the binary is missing.
    """
    proc = await asyncio.create_subprocess_exec(
        adb, *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            pass
        await proc.wait()
        raise
    # adb output is informational; never fail on undecodable bytes.
    return stdout.decode(errors="replace"), stderr.decode(errors="replace")


@router.post("/api/v1/adb/connect", tags=["ADB"])
async def adb_connect(_: AuthRequired, request: AdbConnectRequest):
    """Connect to a WiFi ADB device by IP address.

    Appends ``:5555`` if no port is specified. The connection counts as
    successful when adb's combined output contains "connected".

    Raises:
        HTTPException: 400 for an empty address or a failed connection,
            500 when adb is not found, 504 when adb does not answer within
            10 seconds.
    """
    address = request.address.strip()
    if not address:
        raise HTTPException(status_code=400, detail="Address is required")
    if ":" not in address:
        address = f"{address}:5555"
    adb = _get_adb_path()
    logger.info(f"Connecting ADB device: {address}")
    try:
        stdout, stderr = await _run_adb(adb, "connect", address)
    except FileNotFoundError as e:
        raise HTTPException(
            status_code=500,
            detail="adb not found on PATH. Install Android SDK Platform-Tools.",
        ) from e
    except asyncio.TimeoutError as e:
        raise HTTPException(status_code=504, detail="ADB connect timed out") from e
    output = (stdout + stderr).strip()
    if "connected" in output.lower():
        return {"status": "connected", "address": address, "message": output}
    raise HTTPException(status_code=400, detail=output or "Connection failed")


@router.post("/api/v1/adb/disconnect", tags=["ADB"])
async def adb_disconnect(_: AuthRequired, request: AdbConnectRequest):
    """Disconnect a WiFi ADB device.

    Raises:
        HTTPException: 400 for an empty address, 500 when adb is not found,
            504 when adb does not answer within 10 seconds.
    """
    address = request.address.strip()
    if not address:
        raise HTTPException(status_code=400, detail="Address is required")
    adb = _get_adb_path()
    logger.info(f"Disconnecting ADB device: {address}")
    try:
        stdout, _stderr = await _run_adb(adb, "disconnect", address)
    except FileNotFoundError as e:
        raise HTTPException(status_code=500, detail="adb not found on PATH") from e
    except asyncio.TimeoutError as e:
        raise HTTPException(status_code=504, detail="ADB disconnect timed out") from e
    return {"status": "disconnected", "message": stdout.strip()}
# ─── Log level ─────────────────────────────────────────────────
# Level names accepted by the log-level PUT endpoint (compared after upper-casing).
_VALID_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
@router.get("/api/v1/system/log-level", response_model=LogLevelResponse, tags=["System"])
async def get_log_level(_: AuthRequired):
    """Return the name of the root logger's effective level."""
    root_logger = logging.getLogger()
    level_name = logging.getLevelName(root_logger.getEffectiveLevel())
    return LogLevelResponse(level=level_name)
@router.put("/api/v1/system/log-level", response_model=LogLevelResponse, tags=["System"])
async def set_log_level(_: AuthRequired, body: LogLevelRequest):
    """Switch the root logger (and its handlers) to a new level at runtime.

    The requested name is matched case-insensitively; no restart is needed.

    Raises:
        HTTPException: 400 when the name is not a recognised level.
    """
    requested = body.level.upper()
    if requested not in _VALID_LOG_LEVELS:
        allowed = ', '.join(sorted(_VALID_LOG_LEVELS))
        raise HTTPException(
            status_code=400,
            detail=f"Invalid log level '{body.level}'. Must be one of: {allowed}",
        )

    numeric_level = getattr(logging, requested)
    root_logger = logging.getLogger()
    root_logger.setLevel(numeric_level)
    # Also update all handlers so they actually emit at the new level
    for attached in root_logger.handlers:
        attached.setLevel(numeric_level)

    logger.info("Log level changed to %s", requested)
    return LogLevelResponse(level=requested)