"""System routes: MQTT, external URL, ADB, logs WebSocket, log level.

Extracted from system.py to keep files under 800 lines.
"""

import asyncio
import json
import logging
import re
from pathlib import Path

from fastapi import APIRouter, HTTPException, Query, WebSocket, WebSocketDisconnect
from pydantic import BaseModel

from wled_controller.api.auth import AuthRequired
from wled_controller.api.schemas.system import (
    ExternalUrlRequest,
    ExternalUrlResponse,
    LogLevelRequest,
    LogLevelResponse,
    MQTTSettingsRequest,
    MQTTSettingsResponse,
)
from wled_controller.config import get_config
from wled_controller.utils import get_logger

logger = get_logger(__name__)

router = APIRouter()


# ---------------------------------------------------------------------------
# MQTT settings
# ---------------------------------------------------------------------------

# Lazily resolved location of the MQTT JSON override file (see
# _get_mqtt_settings_path); None until first use.
_MQTT_SETTINGS_FILE: Path | None = None


def _get_mqtt_settings_path() -> Path:
    """Return the path of the MQTT override file, resolving it on first use.

    The file is named ``mqtt_settings.json`` and lives next to the configured
    devices storage file. The result is cached in ``_MQTT_SETTINGS_FILE``.
    """
    global _MQTT_SETTINGS_FILE
    if _MQTT_SETTINGS_FILE is None:
        cfg = get_config()
        # Derive the data directory from any known storage file path
        data_dir = Path(cfg.storage.devices_file).parent
        _MQTT_SETTINGS_FILE = data_dir / "mqtt_settings.json"
    return _MQTT_SETTINGS_FILE


def _load_mqtt_settings() -> dict:
    """Load MQTT settings: YAML config defaults overridden by JSON overrides file.

    Returns:
        A dict with the keys ``enabled``, ``broker_host``, ``broker_port``,
        ``username``, ``password``, ``client_id`` and ``base_topic`` (plus any
        extra keys present in the override file). If the override file is
        missing, unreadable, or not a JSON object, the config defaults are
        returned unchanged; the latter two cases are logged as warnings.
    """
    cfg = get_config()
    defaults = {
        "enabled": cfg.mqtt.enabled,
        "broker_host": cfg.mqtt.broker_host,
        "broker_port": cfg.mqtt.broker_port,
        "username": cfg.mqtt.username,
        "password": cfg.mqtt.password,
        "client_id": cfg.mqtt.client_id,
        "base_topic": cfg.mqtt.base_topic,
    }
    path = _get_mqtt_settings_path()
    try:
        with open(path, "r", encoding="utf-8") as f:
            overrides = json.load(f)
    except FileNotFoundError:
        # No override file yet: the config defaults are the settings.
        return defaults
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.warning("Failed to load MQTT settings override file: %s", e)
        return defaults

    if not isinstance(overrides, dict):
        # dict.update() would accept some non-dict iterables and silently
        # inject bogus keys, so reject anything that is not a JSON object.
        logger.warning(
            "Failed to load MQTT settings override file: expected a JSON object, got %s",
            type(overrides).__name__,
        )
        return defaults

    defaults.update(overrides)
    return defaults


def _save_mqtt_settings(settings: dict) -> None:
    """Persist MQTT settings to the JSON override file."""
    from wled_controller.utils import atomic_write_json

    atomic_write_json(_get_mqtt_settings_path(), settings)


@router.get(
    "/api/v1/system/mqtt/settings",
    response_model=MQTTSettingsResponse,
    tags=["System"],
)
async def get_mqtt_settings(_: AuthRequired):
    """Get current MQTT broker settings. Password is masked.

    The password itself is never returned; ``password_set`` only reports
    whether a non-empty password is stored.
    """
    s = _load_mqtt_settings()
    return MQTTSettingsResponse(
        enabled=s["enabled"],
        broker_host=s["broker_host"],
        broker_port=s["broker_port"],
        username=s["username"],
        password_set=bool(s.get("password")),
        client_id=s["client_id"],
        base_topic=s["base_topic"],
    )


@router.put(
    "/api/v1/system/mqtt/settings",
    response_model=MQTTSettingsResponse,
    tags=["System"],
)
async def update_mqtt_settings(_: AuthRequired, body: MQTTSettingsRequest):
    """Update MQTT broker settings.

    If password is empty string, the existing password is preserved. The new
    settings are written to the JSON override file and echoed back with the
    password masked.
    """
    current = _load_mqtt_settings()

    # If caller sends an empty password, keep the existing one
    password = body.password if body.password else current.get("password", "")

    new_settings = {
        "enabled": body.enabled,
        "broker_host": body.broker_host,
        "broker_port": body.broker_port,
        "username": body.username,
        "password": password,
        "client_id": body.client_id,
        "base_topic": body.base_topic,
    }
    _save_mqtt_settings(new_settings)
    logger.info("MQTT settings updated")

    return MQTTSettingsResponse(
        enabled=new_settings["enabled"],
        broker_host=new_settings["broker_host"],
        broker_port=new_settings["broker_port"],
        username=new_settings["username"],
        password_set=bool(new_settings["password"]),
        client_id=new_settings["client_id"],
        base_topic=new_settings["base_topic"],
    )


# ---------------------------------------------------------------------------
# External URL setting
# ---------------------------------------------------------------------------

# Lazily resolved location of the external-URL JSON file; None until first use.
_EXTERNAL_URL_FILE: Path | None = None


def _get_external_url_path() -> Path:
    """Return the path of ``external_url.json``, resolving it on first use.

    The file lives next to the configured devices storage file. The result is
    cached in ``_EXTERNAL_URL_FILE``.
    """
    global _EXTERNAL_URL_FILE
    if _EXTERNAL_URL_FILE is None:
        cfg = get_config()
        data_dir = Path(cfg.storage.devices_file).parent
        _EXTERNAL_URL_FILE = data_dir / "external_url.json"
    return _EXTERNAL_URL_FILE


def load_external_url() -> str:
    """Load the external URL setting. Returns empty string if not set.

    An unreadable or malformed file is logged as a warning and treated as
    "not set" rather than raising.
    """
    path = _get_external_url_path()
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been saved yet.
        return ""
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.warning("Failed to load external URL file: %s", e)
        return ""

    if not isinstance(data, dict):
        logger.warning(
            "Failed to load external URL file: expected a JSON object, got %s",
            type(data).__name__,
        )
        return ""

    url = data.get("external_url", "")
    # Honour the declared return type even if the file was edited by hand.
    return url if isinstance(url, str) else ""


def _save_external_url(url: str) -> None:
    """Persist the external URL to its JSON file."""
    from wled_controller.utils import atomic_write_json

    atomic_write_json(_get_external_url_path(), {"external_url": url})


@router.get(
    "/api/v1/system/external-url",
    response_model=ExternalUrlResponse,
    tags=["System"],
)
async def get_external_url(_: AuthRequired):
    """Get the configured external base URL."""
    return ExternalUrlResponse(external_url=load_external_url())


@router.put(
    "/api/v1/system/external-url",
    response_model=ExternalUrlResponse,
    tags=["System"],
)
async def update_external_url(_: AuthRequired, body: ExternalUrlRequest):
    """Set the external base URL used in webhook URLs and other user-visible URLs.

    Surrounding whitespace and trailing slashes are removed before saving; an
    empty result clears the setting.
    """
    url = body.external_url.strip().rstrip("/")
    _save_external_url(url)
    logger.info("External URL updated: %s", url or "(cleared)")
    return ExternalUrlResponse(external_url=url)


# ---------------------------------------------------------------------------
# Live log viewer WebSocket
# ---------------------------------------------------------------------------


@router.websocket("/api/v1/system/logs/ws")
async def logs_ws(
    websocket: WebSocket,
    token: str = Query(""),
):
    """WebSocket that streams server log lines in real time.

    Auth via the ``token`` query parameter; an invalid token closes the socket
    with code 4001.

    On connect, sends the last ~500 buffered lines as individual text
    messages, then pushes new lines as they appear. An empty text message is
    sent after 30 seconds without log output as a keepalive.
    """
    from wled_controller.api.auth import verify_ws_token
    from wled_controller.utils import log_broadcaster

    if not verify_ws_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await websocket.accept()

    # Ensure the broadcaster knows the event loop (may be first connection)
    log_broadcaster.ensure_loop()

    # Subscribe *before* reading the backlog so no lines slip through
    queue = log_broadcaster.subscribe()

    try:
        # Send backlog first
        for line in log_broadcaster.get_backlog():
            await websocket.send_text(line)

        # Stream new lines
        while True:
            try:
                line = await asyncio.wait_for(queue.get(), timeout=30.0)
                await websocket.send_text(line)
            except asyncio.TimeoutError:
                # Send a keepalive ping so the connection stays alive
                try:
                    await websocket.send_text("")
                except Exception:
                    break
    except WebSocketDisconnect:
        pass
    except Exception:
        # Still best-effort (the stream simply ends), but leave a trace
        # instead of discarding the error entirely.
        logger.debug("Log WebSocket stream ended with an error", exc_info=True)
    finally:
        log_broadcaster.unsubscribe(queue)


# ---------------------------------------------------------------------------
# ADB helpers (for Android / scrcpy engine)
# ---------------------------------------------------------------------------

# Regex: IPv4 address with optional port, e.g. "192.168.1.5" or "192.168.1.5:5555"
# re.ASCII restricts \d to 0-9 so non-ASCII Unicode digits are rejected.
_ADB_ADDRESS_RE = re.compile(
    r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(:\d{1,5})?$",
    re.ASCII,
)


class AdbConnectRequest(BaseModel):
    """Request body for the ADB connect/disconnect endpoints."""

    address: str


def _validate_adb_address(address: str) -> None:
    """Raise 400 if *address* is not a valid IP:port for ADB.

    Accepts a dotted IPv4 address with an optional ``:port`` suffix. Each
    octet must be 0-255 and the port, when present, 1-65535.

    Raises:
        HTTPException: with status 400 describing what is wrong.
    """
    # fullmatch (not match) so a trailing newline cannot slip past "$".
    if not _ADB_ADDRESS_RE.fullmatch(address):
        raise HTTPException(
            status_code=400,
            detail=(
                f"Invalid ADB address '{address}'. "
                "Expected format: IP or IP:PORT, e.g. 192.168.1.5 or 192.168.1.5:5555"
            ),
        )

    # Validate each octet is 0-255 and port is 1-65535
    host, sep, port_text = address.partition(":")
    for octet in host.split("."):
        if not (0 <= int(octet) <= 255):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid IP octet '{octet}' in address '{address}'. Each octet must be 0-255.",
            )
    if sep:
        if not (1 <= int(port_text) <= 65535):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid port '{port_text}' in address '{address}'. Port must be 1-65535.",
            )


def _get_adb_path() -> str:
    """Get the adb binary path from the scrcpy engine's resolver."""
    from wled_controller.core.capture_engines.scrcpy_engine import _get_adb

    return _get_adb()


async def _run_adb(adb: str, *args: str, timeout: float = 10) -> tuple[str, str]:
    """Run ``adb`` with *args* and return its decoded ``(stdout, stderr)``.

    Raises:
        FileNotFoundError: if the adb binary cannot be found.
        asyncio.TimeoutError: if the command does not finish within *timeout*
            seconds. The child process is killed and reaped before the
            exception propagates, so no process is left behind.
    """
    proc = await asyncio.create_subprocess_exec(
        adb,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            pass
        await proc.wait()
        raise
    # adb output is not guaranteed to be valid UTF-8; never fail on decoding.
    return stdout.decode(errors="replace"), stderr.decode(errors="replace")


@router.post("/api/v1/adb/connect", tags=["ADB"])
async def adb_connect(_: AuthRequired, request: AdbConnectRequest):
    """Connect to a WiFi ADB device by IP address.

    Appends ``:5555`` if no port is specified.

    Raises:
        HTTPException: 400 for an empty/invalid address or when adb's output
            does not contain "connected"; 500 when adb is not found; 504 when
            the command times out.
    """
    address = request.address.strip()
    if not address:
        raise HTTPException(status_code=400, detail="Address is required")
    _validate_adb_address(address)
    if ":" not in address:
        address = f"{address}:5555"

    adb = _get_adb_path()
    logger.info("Connecting ADB device: %s", address)
    try:
        stdout, stderr = await _run_adb(adb, "connect", address)
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=500,
            detail="adb not found on PATH. Install Android SDK Platform-Tools.",
        ) from exc
    except asyncio.TimeoutError as exc:
        raise HTTPException(status_code=504, detail="ADB connect timed out") from exc

    output = (stdout + stderr).strip()
    if "connected" in output.lower():
        return {"status": "connected", "address": address, "message": output}
    raise HTTPException(status_code=400, detail=output or "Connection failed")


@router.post("/api/v1/adb/disconnect", tags=["ADB"])
async def adb_disconnect(_: AuthRequired, request: AdbConnectRequest):
    """Disconnect a WiFi ADB device.

    Raises:
        HTTPException: 400 for an empty address; 500 when adb is not found;
            504 when the command times out.
    """
    address = request.address.strip()
    if not address:
        raise HTTPException(status_code=400, detail="Address is required")

    adb = _get_adb_path()
    logger.info("Disconnecting ADB device: %s", address)
    try:
        stdout, _stderr = await _run_adb(adb, "disconnect", address)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=500, detail="adb not found on PATH") from exc
    except asyncio.TimeoutError as exc:
        raise HTTPException(status_code=504, detail="ADB disconnect timed out") from exc
    return {"status": "disconnected", "message": stdout.strip()}


# --- Log level -----

_VALID_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


@router.get("/api/v1/system/log-level", response_model=LogLevelResponse, tags=["System"])
async def get_log_level(_: AuthRequired):
    """Get the current root logger log level."""
    level_int = logging.getLogger().getEffectiveLevel()
    return LogLevelResponse(level=logging.getLevelName(level_int))


@router.put("/api/v1/system/log-level", response_model=LogLevelResponse, tags=["System"])
async def set_log_level(_: AuthRequired, body: LogLevelRequest):
    """Change the root logger log level at runtime (no server restart required).

    The level name is matched case-insensitively against DEBUG, INFO,
    WARNING, ERROR and CRITICAL.

    Raises:
        HTTPException: 400 if the requested level is not one of those names.
    """
    level_name = body.level.upper()
    if level_name not in _VALID_LOG_LEVELS:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid log level '{body.level}'. Must be one of: {', '.join(sorted(_VALID_LOG_LEVELS))}",
        )

    level_int = getattr(logging, level_name)
    root = logging.getLogger()
    root.setLevel(level_int)
    # Also update all handlers so they actually emit at the new level
    for handler in root.handlers:
        handler.setLevel(level_int)

    logger.info("Log level changed to %s", level_name)
    return LogLevelResponse(level=level_name)