Files
wled-screen-controller-mixed/server/src/wled_controller/api/routes/system_settings.py
alexei.dolgolyov e2e1107df7
Some checks failed
Lint & Test / test (push) Has been cancelled
feat: asset-based image/video sources, notification sounds, UI improvements
- Replace URL-based image_source/url fields with image_asset_id/video_asset_id
  on StaticImagePictureSource and VideoCaptureSource (clean break, no migration)
- Resolve asset IDs to file paths at runtime via AssetStore.get_file_path()
- Add EntitySelect asset pickers for image/video in stream editor modal
- Add notification sound configuration (global sound + per-app overrides)
- Unify per-app color and sound overrides into single "Per-App Overrides" section
- Persist notification history between server restarts
- Add asset management system (upload, edit, delete, soft-delete)
- Replace emoji buttons with SVG icons throughout UI
- Various backend improvements: SQLite stores, auth, backup, MQTT, webhooks
2026-03-26 20:40:25 +03:00

339 lines
12 KiB
Python

"""System routes: MQTT, external URL, ADB, logs WebSocket, log level.
Extracted from system.py to keep files under 800 lines.
"""
import asyncio
import logging
import re
from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import get_database
from wled_controller.api.schemas.system import (
ExternalUrlRequest,
ExternalUrlResponse,
LogLevelRequest,
LogLevelResponse,
MQTTSettingsRequest,
MQTTSettingsResponse,
)
from wled_controller.config import get_config
from wled_controller.storage.database import Database
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
# ---------------------------------------------------------------------------
# MQTT settings
# ---------------------------------------------------------------------------
def _load_mqtt_settings(db: Database) -> dict:
"""Load MQTT settings: YAML config defaults overridden by DB settings."""
cfg = get_config()
defaults = {
"enabled": cfg.mqtt.enabled,
"broker_host": cfg.mqtt.broker_host,
"broker_port": cfg.mqtt.broker_port,
"username": cfg.mqtt.username,
"password": cfg.mqtt.password,
"client_id": cfg.mqtt.client_id,
"base_topic": cfg.mqtt.base_topic,
}
overrides = db.get_setting("mqtt")
if overrides:
defaults.update(overrides)
return defaults
@router.get(
"/api/v1/system/mqtt/settings",
response_model=MQTTSettingsResponse,
tags=["System"],
)
async def get_mqtt_settings(_: AuthRequired, db: Database = Depends(get_database)):
"""Get current MQTT broker settings. Password is masked."""
s = _load_mqtt_settings(db)
return MQTTSettingsResponse(
enabled=s["enabled"],
broker_host=s["broker_host"],
broker_port=s["broker_port"],
username=s["username"],
password_set=bool(s.get("password")),
client_id=s["client_id"],
base_topic=s["base_topic"],
)
@router.put(
"/api/v1/system/mqtt/settings",
response_model=MQTTSettingsResponse,
tags=["System"],
)
async def update_mqtt_settings(_: AuthRequired, body: MQTTSettingsRequest, db: Database = Depends(get_database)):
"""Update MQTT broker settings. If password is empty string, the existing password is preserved."""
current = _load_mqtt_settings(db)
# If caller sends an empty password, keep the existing one
password = body.password if body.password else current.get("password", "")
new_settings = {
"enabled": body.enabled,
"broker_host": body.broker_host,
"broker_port": body.broker_port,
"username": body.username,
"password": password,
"client_id": body.client_id,
"base_topic": body.base_topic,
}
db.set_setting("mqtt", new_settings)
logger.info("MQTT settings updated")
return MQTTSettingsResponse(
enabled=new_settings["enabled"],
broker_host=new_settings["broker_host"],
broker_port=new_settings["broker_port"],
username=new_settings["username"],
password_set=bool(new_settings["password"]),
client_id=new_settings["client_id"],
base_topic=new_settings["base_topic"],
)
# ---------------------------------------------------------------------------
# External URL setting
# ---------------------------------------------------------------------------
def load_external_url(db: Database | None = None) -> str:
"""Load the external URL setting. Returns empty string if not set."""
if db is None:
from wled_controller.api.dependencies import get_database
db = get_database()
data = db.get_setting("external_url")
if data:
return data.get("external_url", "")
return ""
@router.get(
"/api/v1/system/external-url",
response_model=ExternalUrlResponse,
tags=["System"],
)
async def get_external_url(_: AuthRequired, db: Database = Depends(get_database)):
"""Get the configured external base URL."""
return ExternalUrlResponse(external_url=load_external_url(db))
@router.put(
"/api/v1/system/external-url",
response_model=ExternalUrlResponse,
tags=["System"],
)
async def update_external_url(_: AuthRequired, body: ExternalUrlRequest, db: Database = Depends(get_database)):
"""Set the external base URL used in webhook URLs and other user-visible URLs."""
url = body.external_url.strip().rstrip("/")
db.set_setting("external_url", {"external_url": url})
logger.info("External URL updated: %s", url or "(cleared)")
return ExternalUrlResponse(external_url=url)
# ---------------------------------------------------------------------------
# Live log viewer WebSocket
# ---------------------------------------------------------------------------
@router.websocket("/api/v1/system/logs/ws")
async def logs_ws(
websocket: WebSocket,
token: str = Query(""),
):
"""WebSocket that streams server log lines in real time.
Auth via ``?token=<api_key>``. On connect, sends the last ~500 buffered
lines as individual text messages, then pushes new lines as they appear.
"""
from wled_controller.api.auth import verify_ws_token
from wled_controller.utils import log_broadcaster
if not verify_ws_token(token):
await websocket.close(code=4001, reason="Unauthorized")
return
await websocket.accept()
# Ensure the broadcaster knows the event loop (may be first connection)
log_broadcaster.ensure_loop()
# Subscribe *before* reading the backlog so no lines slip through
queue = log_broadcaster.subscribe()
try:
# Send backlog first
for line in log_broadcaster.get_backlog():
await websocket.send_text(line)
# Stream new lines
while True:
try:
line = await asyncio.wait_for(queue.get(), timeout=30.0)
await websocket.send_text(line)
except asyncio.TimeoutError:
# Send a keepalive ping so the connection stays alive
try:
await websocket.send_text("")
except Exception:
break
except WebSocketDisconnect:
logger.debug("Log stream WebSocket disconnected")
pass
except Exception as e:
logger.debug("Log stream WebSocket error: %s", e)
pass
finally:
log_broadcaster.unsubscribe(queue)
# ---------------------------------------------------------------------------
# ADB helpers (for Android / scrcpy engine)
# ---------------------------------------------------------------------------
# Regex: IPv4 address with optional port, e.g. "192.168.1.5" or "192.168.1.5:5555"
_ADB_ADDRESS_RE = re.compile(
r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(:\d{1,5})?$"
)
class AdbConnectRequest(BaseModel):
address: str
def _validate_adb_address(address: str) -> None:
"""Raise 400 if *address* is not a valid IP:port for ADB."""
if not _ADB_ADDRESS_RE.match(address):
raise HTTPException(
status_code=400,
detail=(
f"Invalid ADB address '{address}'. "
"Expected format: <IP> or <IP>:<port>, e.g. 192.168.1.5 or 192.168.1.5:5555"
),
)
# Validate each octet is 0-255 and port is 1-65535
parts = address.split(":")
ip_parts = parts[0].split(".")
for octet in ip_parts:
if not (0 <= int(octet) <= 255):
raise HTTPException(
status_code=400,
detail=f"Invalid IP octet '{octet}' in address '{address}'. Each octet must be 0-255.",
)
if len(parts) == 2:
port = int(parts[1])
if not (1 <= port <= 65535):
raise HTTPException(
status_code=400,
detail=f"Invalid port '{parts[1]}' in address '{address}'. Port must be 1-65535.",
)
def _get_adb_path() -> str:
"""Get the adb binary path from the scrcpy engine's resolver."""
from wled_controller.core.capture_engines.scrcpy_engine import _get_adb
return _get_adb()
@router.post("/api/v1/adb/connect", tags=["ADB"])
async def adb_connect(_: AuthRequired, request: AdbConnectRequest):
"""Connect to a WiFi ADB device by IP address.
Appends ``:5555`` if no port is specified.
"""
address = request.address.strip()
if not address:
raise HTTPException(status_code=400, detail="Address is required")
_validate_adb_address(address)
if ":" not in address:
address = f"{address}:5555"
adb = _get_adb_path()
logger.info(f"Connecting ADB device: {address}")
try:
proc = await asyncio.create_subprocess_exec(
adb, "connect", address,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
output = (stdout.decode() + stderr.decode()).strip()
if "connected" in output.lower():
return {"status": "connected", "address": address, "message": output}
raise HTTPException(status_code=400, detail=output or "Connection failed")
except FileNotFoundError:
raise HTTPException(
status_code=500,
detail="adb not found on PATH. Install Android SDK Platform-Tools.",
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="ADB connect timed out")
@router.post("/api/v1/adb/disconnect", tags=["ADB"])
async def adb_disconnect(_: AuthRequired, request: AdbConnectRequest):
"""Disconnect a WiFi ADB device."""
address = request.address.strip()
if not address:
raise HTTPException(status_code=400, detail="Address is required")
_validate_adb_address(address)
adb = _get_adb_path()
logger.info(f"Disconnecting ADB device: {address}")
try:
proc = await asyncio.create_subprocess_exec(
adb, "disconnect", address,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
return {"status": "disconnected", "message": stdout.decode().strip()}
except FileNotFoundError:
raise HTTPException(status_code=500, detail="adb not found on PATH")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="ADB disconnect timed out")
# --- Log level -----
_VALID_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
@router.get("/api/v1/system/log-level", response_model=LogLevelResponse, tags=["System"])
async def get_log_level(_: AuthRequired):
"""Get the current root logger log level."""
level_int = logging.getLogger().getEffectiveLevel()
return LogLevelResponse(level=logging.getLevelName(level_int))
@router.put("/api/v1/system/log-level", response_model=LogLevelResponse, tags=["System"])
async def set_log_level(_: AuthRequired, body: LogLevelRequest):
"""Change the root logger log level at runtime (no server restart required)."""
level_name = body.level.upper()
if level_name not in _VALID_LOG_LEVELS:
raise HTTPException(
status_code=400,
detail=f"Invalid log level '{body.level}'. Must be one of: {', '.join(sorted(_VALID_LOG_LEVELS))}",
)
level_int = getattr(logging, level_name)
root = logging.getLogger()
root.setLevel(level_int)
# Also update all handlers so they actually emit at the new level
for handler in root.handlers:
handler.setLevel(level_int)
logger.info("Log level changed to %s", level_name)
return LogLevelResponse(level=level_name)