Add adaptive FPS and honest device reachability during streaming

DDP uses fire-and-forget UDP, so when a WiFi device becomes overwhelmed
by sustained traffic, sends appear successful while the device is
actually unreachable. This adds:

- HTTP liveness probe (GET /json/info, 2s timeout) every 10s during
  streaming, exposed as device_streaming_reachable in target state
- Adaptive FPS (opt-in): exponential backoff when device is unreachable,
  gradual recovery when it stabilizes — finds sustainable send rate
- Honest health checks: removed the lie that forced device_online=true
  during streaming; now runs actual health checks regardless
- Target editor toggle, FPS display shows effective rate when throttled,
  health dot reflects streaming reachability, red highlight when
  unreachable
- Auto-backup scheduling support in settings modal

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 20:22:58 +03:00
parent f8656b72a6
commit cadef971e7
23 changed files with 873 additions and 21 deletions

View File

@@ -0,0 +1,220 @@
"""Auto-backup engine — periodic background backups of all configuration stores."""
import asyncio
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from wled_controller import __version__
from wled_controller.utils import atomic_write_json, get_logger
logger = get_logger(__name__)
DEFAULT_SETTINGS = {
"enabled": False,
"interval_hours": 24,
"max_backups": 10,
}
class AutoBackupEngine:
"""Creates periodic backups of all configuration stores."""
def __init__(
self,
settings_path: Path,
backup_dir: Path,
store_map: Dict[str, str],
storage_config: Any,
):
self._settings_path = Path(settings_path)
self._backup_dir = Path(backup_dir)
self._store_map = store_map
self._storage_config = storage_config
self._task: Optional[asyncio.Task] = None
self._last_backup_time: Optional[datetime] = None
self._settings = self._load_settings()
self._backup_dir.mkdir(parents=True, exist_ok=True)
# ─── Settings persistence ──────────────────────────────────
def _load_settings(self) -> dict:
if self._settings_path.exists():
try:
with open(self._settings_path, "r", encoding="utf-8") as f:
data = json.load(f)
return {**DEFAULT_SETTINGS, **data}
except Exception as e:
logger.warning(f"Failed to load auto-backup settings: {e}")
return dict(DEFAULT_SETTINGS)
def _save_settings(self) -> None:
atomic_write_json(self._settings_path, {
"enabled": self._settings["enabled"],
"interval_hours": self._settings["interval_hours"],
"max_backups": self._settings["max_backups"],
})
# ─── Lifecycle ─────────────────────────────────────────────
async def start(self) -> None:
if self._settings["enabled"]:
self._start_loop()
logger.info(
f"Auto-backup engine started (every {self._settings['interval_hours']}h, "
f"max {self._settings['max_backups']})"
)
else:
logger.info("Auto-backup engine initialized (disabled)")
async def stop(self) -> None:
self._cancel_loop()
logger.info("Auto-backup engine stopped")
def _start_loop(self) -> None:
self._cancel_loop()
self._task = asyncio.create_task(self._backup_loop())
def _cancel_loop(self) -> None:
if self._task is not None:
self._task.cancel()
self._task = None
async def _backup_loop(self) -> None:
try:
# Perform first backup immediately on start
await self._perform_backup()
self._prune_old_backups()
interval_secs = self._settings["interval_hours"] * 3600
while True:
await asyncio.sleep(interval_secs)
try:
await self._perform_backup()
self._prune_old_backups()
except Exception as e:
logger.error(f"Auto-backup failed: {e}", exc_info=True)
except asyncio.CancelledError:
pass
# ─── Backup operations ─────────────────────────────────────
async def _perform_backup(self) -> None:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._perform_backup_sync)
def _perform_backup_sync(self) -> None:
stores = {}
for store_key, config_attr in self._store_map.items():
file_path = Path(getattr(self._storage_config, config_attr))
if file_path.exists():
with open(file_path, "r", encoding="utf-8") as f:
stores[store_key] = json.load(f)
else:
stores[store_key] = {}
now = datetime.now(timezone.utc)
backup = {
"meta": {
"format": "ledgrab-backup",
"format_version": 1,
"app_version": __version__,
"created_at": now.isoformat(),
"store_count": len(stores),
"auto_backup": True,
},
"stores": stores,
}
timestamp = now.strftime("%Y-%m-%dT%H%M%S")
filename = f"ledgrab-autobackup-{timestamp}.json"
file_path = self._backup_dir / filename
content = json.dumps(backup, indent=2, ensure_ascii=False)
file_path.write_text(content, encoding="utf-8")
self._last_backup_time = now
logger.info(f"Auto-backup created: {filename}")
def _prune_old_backups(self) -> None:
max_backups = self._settings["max_backups"]
files = sorted(self._backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
excess = len(files) - max_backups
if excess > 0:
for f in files[:excess]:
try:
f.unlink()
logger.info(f"Pruned old backup: {f.name}")
except Exception as e:
logger.warning(f"Failed to prune {f.name}: {e}")
# ─── Public API ────────────────────────────────────────────
def get_settings(self) -> dict:
next_backup = None
if self._settings["enabled"] and self._last_backup_time:
from datetime import timedelta
next_backup = (
self._last_backup_time + timedelta(hours=self._settings["interval_hours"])
).isoformat()
return {
"enabled": self._settings["enabled"],
"interval_hours": self._settings["interval_hours"],
"max_backups": self._settings["max_backups"],
"last_backup_time": self._last_backup_time.isoformat() if self._last_backup_time else None,
"next_backup_time": next_backup,
}
async def update_settings(self, enabled: bool, interval_hours: float, max_backups: int) -> dict:
self._settings["enabled"] = enabled
self._settings["interval_hours"] = interval_hours
self._settings["max_backups"] = max_backups
self._save_settings()
# Restart or stop the loop
if enabled:
self._start_loop()
logger.info(
f"Auto-backup enabled (every {interval_hours}h, max {max_backups})"
)
else:
self._cancel_loop()
logger.info("Auto-backup disabled")
# Prune if max_backups was reduced
self._prune_old_backups()
return self.get_settings()
def list_backups(self) -> List[dict]:
backups = []
for f in sorted(self._backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True):
stat = f.stat()
backups.append({
"filename": f.name,
"size_bytes": stat.st_size,
"created_at": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
})
return backups
def delete_backup(self, filename: str) -> None:
# Validate filename to prevent path traversal
if os.sep in filename or "/" in filename or ".." in filename:
raise ValueError("Invalid filename")
target = self._backup_dir / filename
if not target.exists():
raise FileNotFoundError(f"Backup not found: {filename}")
target.unlink()
logger.info(f"Deleted backup: {filename}")
def get_backup_path(self, filename: str) -> Path:
if os.sep in filename or "/" in filename or ".." in filename:
raise ValueError("Invalid filename")
target = self._backup_dir / filename
if not target.exists():
raise FileNotFoundError(f"Backup not found: {filename}")
return target

View File

@@ -322,6 +322,7 @@ class ProcessorManager:
state_check_interval: int = DEFAULT_STATE_CHECK_INTERVAL,
brightness_value_source_id: str = "",
min_brightness_threshold: int = 0,
adaptive_fps: bool = False,
):
"""Register a WLED target processor."""
if target_id in self._processors:
@@ -338,6 +339,7 @@ class ProcessorManager:
state_check_interval=state_check_interval,
brightness_value_source_id=brightness_value_source_id,
min_brightness_threshold=min_brightness_threshold,
adaptive_fps=adaptive_fps,
ctx=self._build_context(),
)
self._processors[target_id] = proc
@@ -834,11 +836,7 @@ class ProcessorManager:
try:
while self._health_monitoring_active:
if not self._device_is_processing(device_id):
await self._check_device_health(device_id)
else:
if state.health:
state.health.online = True
await self._check_device_health(device_id)
await asyncio.sleep(check_interval)
except asyncio.CancelledError:
pass

View File

@@ -56,6 +56,9 @@ class ProcessingMetrics:
# KC targets
timing_calc_colors_ms: float = 0.0
timing_broadcast_ms: float = 0.0
# Streaming liveness (HTTP probe during DDP)
device_streaming_reachable: Optional[bool] = None
fps_effective: int = 0
@dataclass

View File

@@ -8,6 +8,7 @@ import time
from datetime import datetime
from typing import Optional
import httpx
import numpy as np
from wled_controller.core.devices.led_client import LEDClient, create_led_client, get_device_capabilities
@@ -37,6 +38,7 @@ class WledTargetProcessor(TargetProcessor):
state_check_interval: int = 30,
brightness_value_source_id: str = "",
min_brightness_threshold: int = 0,
adaptive_fps: bool = False,
ctx: TargetContext = None,
):
super().__init__(target_id, ctx)
@@ -47,6 +49,11 @@ class WledTargetProcessor(TargetProcessor):
self._css_id = color_strip_source_id
self._brightness_vs_id = brightness_value_source_id
self._min_brightness_threshold = min_brightness_threshold
self._adaptive_fps = adaptive_fps
# Adaptive FPS / liveness probe runtime state
self._effective_fps: int = self._target_fps
self._device_reachable: Optional[bool] = None # None = not yet probed
# Runtime state (populated on start)
self._led_client: Optional[LEDClient] = None
@@ -60,6 +67,8 @@ class WledTargetProcessor(TargetProcessor):
# LED preview WebSocket clients
self._preview_clients: list = []
self._last_preview_colors: np.ndarray | None = None
self._last_preview_brightness: int = 255
# ----- Properties -----
@@ -205,6 +214,7 @@ class WledTargetProcessor(TargetProcessor):
if isinstance(settings, dict):
if "fps" in settings:
self._target_fps = settings["fps"] if settings["fps"] > 0 else 30
self._effective_fps = self._target_fps # reset adaptive
css_manager = self._ctx.color_strip_stream_manager
if css_manager and self._is_running and self._css_id:
css_manager.notify_target_fps(self._css_id, self._target_id, self._target_fps)
@@ -214,6 +224,10 @@ class WledTargetProcessor(TargetProcessor):
self._state_check_interval = settings["state_check_interval"]
if "min_brightness_threshold" in settings:
self._min_brightness_threshold = settings["min_brightness_threshold"]
if "adaptive_fps" in settings:
self._adaptive_fps = settings["adaptive_fps"]
if not self._adaptive_fps:
self._effective_fps = self._target_fps
logger.info(f"Updated settings for target {self._target_id}")
def update_device(self, device_id: str) -> None:
@@ -285,6 +299,14 @@ class WledTargetProcessor(TargetProcessor):
logger.info(f"Hot-swapped brightness VS for {self._target_id}: {old_vs_id} -> {vs_id}")
async def _probe_device(self, device_url: str, client: httpx.AsyncClient) -> bool:
"""HTTP liveness probe — lightweight GET to check if device is reachable."""
try:
resp = await client.get(f"{device_url}/json/info")
return resp.status_code == 200
except Exception:
return False
def get_display_index(self) -> Optional[int]:
"""Display index being captured, from the active stream."""
if self._resolved_display_index is not None:
@@ -349,6 +371,8 @@ class WledTargetProcessor(TargetProcessor):
"needs_keepalive": self._needs_keepalive,
"last_update": metrics.last_update,
"errors": [metrics.last_error] if metrics.last_error else [],
"device_streaming_reachable": self._device_reachable if self._is_running else None,
"fps_effective": self._effective_fps if self._is_running else None,
}
def get_metrics(self) -> dict:
@@ -432,6 +456,17 @@ class WledTargetProcessor(TargetProcessor):
def add_led_preview_client(self, ws) -> None:
self._preview_clients.append(ws)
# Send last known frame immediately so late joiners see current state
if self._last_preview_colors is not None:
data = bytes([self._last_preview_brightness]) + self._last_preview_colors.astype(np.uint8).tobytes()
asyncio.ensure_future(self._send_preview_to(ws, data))
@staticmethod
async def _send_preview_to(ws, data: bytes) -> None:
try:
await ws.send_bytes(data)
except Exception:
pass
def remove_led_preview_client(self, ws) -> None:
if ws in self._preview_clients:
@@ -536,9 +571,22 @@ class WledTargetProcessor(TargetProcessor):
_diag_device_info: Optional[DeviceInfo] = None
_diag_device_info_age = 0
# --- Liveness probe + adaptive FPS ---
_device_url = _init_device_info.device_url if _init_device_info else ""
_probe_enabled = _device_url.startswith("http")
_probe_interval = 10.0 # seconds between probes
_last_probe_time = 0.0 # force first probe soon (after 10s)
_probe_task: Optional[asyncio.Task] = None
_probe_client: Optional[httpx.AsyncClient] = None
if _probe_enabled:
_probe_client = httpx.AsyncClient(timeout=httpx.Timeout(2.0))
self._effective_fps = self._target_fps
self._device_reachable = None
logger.info(
f"Processing loop started for target {self._target_id} "
f"(css={self._css_id}, {_total_leds} LEDs, fps={self._target_fps})"
f"(css={self._css_id}, {_total_leds} LEDs, fps={self._target_fps}"
f"{', adaptive' if self._adaptive_fps else ''})"
)
next_frame_time = time.perf_counter()
@@ -548,7 +596,61 @@ class WledTargetProcessor(TargetProcessor):
while self._is_running:
loop_start = now = time.perf_counter()
target_fps = self._target_fps if self._target_fps > 0 else 30
frame_time = 1.0 / target_fps
# --- Liveness probe ---
# Collect result as soon as it's done (every iteration)
if _probe_task is not None and _probe_task.done():
try:
reachable = _probe_task.result()
except Exception:
reachable = False
prev_reachable = self._device_reachable
self._device_reachable = reachable
self._metrics.device_streaming_reachable = reachable
_probe_task = None
if self._adaptive_fps:
if not reachable:
# Backoff: halve effective FPS
old_eff = self._effective_fps
self._effective_fps = max(1, self._effective_fps // 2)
if old_eff != self._effective_fps:
logger.warning(
f"[ADAPTIVE] {self._target_id} device unreachable, "
f"FPS {old_eff}{self._effective_fps}"
)
next_frame_time = time.perf_counter()
else:
# Recovery: gradually increase
if self._effective_fps < target_fps:
step = max(1, target_fps // 8)
old_eff = self._effective_fps
self._effective_fps = min(target_fps, self._effective_fps + step)
if old_eff != self._effective_fps:
logger.info(
f"[ADAPTIVE] {self._target_id} device reachable, "
f"FPS {old_eff}{self._effective_fps}"
)
next_frame_time = time.perf_counter()
if prev_reachable != reachable:
logger.info(
f"[PROBE] {self._target_id} device "
f"{'reachable' if reachable else 'UNREACHABLE'}"
)
# Fire new probe every _probe_interval seconds
if _probe_enabled and _probe_task is None and (now - _last_probe_time) >= _probe_interval:
if _probe_client is not None:
_last_probe_time = now
_probe_task = asyncio.create_task(
self._probe_device(_device_url, _probe_client)
)
# Use effective FPS for frame timing
effective_fps = self._effective_fps if self._adaptive_fps else target_fps
self._metrics.fps_effective = effective_fps
frame_time = 1.0 / effective_fps
keepalive_interval = self._keepalive_interval
# Detect hot-swapped CSS stream
@@ -640,6 +742,8 @@ class WledTargetProcessor(TargetProcessor):
# Fit to device LED count and apply brightness
device_colors = self._fit_to_device(frame, _total_leds)
send_colors = _cached_brightness(device_colors, cur_brightness)
self._last_preview_colors = send_colors
self._last_preview_brightness = cur_brightness
# Send to LED device
if not self._is_running or self._led_client is None:
@@ -752,4 +856,11 @@ class WledTargetProcessor(TargetProcessor):
self._is_running = False
raise
finally:
# Clean up probe client
if _probe_client is not None:
await _probe_client.aclose()
if _probe_task is not None and not _probe_task.done():
_probe_task.cancel()
self._device_reachable = None
self._metrics.device_streaming_reachable = None
logger.info(f"Processing loop ended for target {self._target_id}")