Add server-side metrics ring buffer, seed dashboard charts from server history
Background task samples system (CPU/RAM/GPU) and per-target (FPS/timing) metrics every 1s into a 120-sample ring buffer (~2 min). New API endpoint GET /system/metrics-history returns the buffer. Dashboard charts now seed from server history on load instead of sessionStorage, surviving page refreshes. Also removes emoji from brightness source labels. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
123
server/src/wled_controller/core/processing/metrics_history.py
Normal file
123
server/src/wled_controller/core/processing/metrics_history.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""Server-side ring buffer for system and per-target metrics."""
|
||||
|
||||
import asyncio
from collections import deque
from datetime import datetime, timezone
from typing import Dict, Optional

from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)

# Ring-buffer sizing: 120 samples at a 1-second cadence keeps roughly the
# last 2 minutes of history with constant memory use.
MAX_SAMPLES = 120  # ~2 minutes at 1-second interval
SAMPLE_INTERVAL = 1.0  # seconds
|
||||
|
||||
|
||||
def _collect_system_snapshot() -> dict:
    """Collect one CPU/RAM/GPU sample (blocking — run in a thread pool).

    Returns:
        A JSON-serializable dict with keys ``t`` (UTC ISO-8601 timestamp),
        ``cpu`` (percent), ``ram_pct``, ``ram_used``/``ram_total`` (MiB),
        and ``gpu_util``/``gpu_temp`` (floats, or ``None`` when NVML is
        unavailable).
    """
    import psutil

    mem = psutil.virtual_memory()
    snapshot = {
        # Timezone-aware UTC: naive utcnow() omits the +00:00 offset, and
        # offset-less ISO strings are parsed as *local* time by JS Date,
        # skewing the dashboard charts. (utcnow is also deprecated in 3.12.)
        "t": datetime.now(timezone.utc).isoformat(),
        # interval=None is non-blocking: percent CPU since the previous call.
        "cpu": psutil.cpu_percent(interval=None),
        "ram_pct": mem.percent,
        "ram_used": round(mem.used / 1024 / 1024, 1),
        "ram_total": round(mem.total / 1024 / 1024, 1),
        "gpu_util": None,
        "gpu_temp": None,
    }

    # GPU metrics are best-effort: reuse the NVML handle initialized by the
    # system routes module rather than re-initializing NVML here.
    try:
        from wled_controller.api.routes.system import _nvml_available, _nvml, _nvml_handle

        if _nvml_available:
            util = _nvml.nvmlDeviceGetUtilizationRates(_nvml_handle)
            temp = _nvml.nvmlDeviceGetTemperature(_nvml_handle, _nvml.NVML_TEMPERATURE_GPU)
            snapshot["gpu_util"] = float(util.gpu)
            snapshot["gpu_temp"] = float(temp)
    except Exception:
        # Deliberate best-effort: missing NVML/driver support must never
        # break system sampling; the None defaults stand.
        pass

    return snapshot
|
||||
|
||||
|
||||
class MetricsHistory:
    """In-memory ring buffer collecting system and per-target metrics.

    A background task samples every ``SAMPLE_INTERVAL`` seconds into bounded
    deques (``MAX_SAMPLES`` entries each), so memory use is constant —
    roughly the last 2 minutes of history at the default settings.
    """

    def __init__(self, processor_manager):
        # Manager supplying per-target state via get_all_target_states();
        # held by reference only, never mutated here.
        self._manager = processor_manager
        # System-wide samples (CPU/RAM/GPU), newest last.
        self._system: deque = deque(maxlen=MAX_SAMPLES)
        # target_id -> bounded deque of per-target samples.
        self._targets: Dict[str, deque] = {}
        # Background sampling task; None whenever sampling is not running.
        self._task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the background sampling loop (idempotent)."""
        if self._task and not self._task.done():
            return  # already running
        self._task = asyncio.create_task(self._sample_loop())
        logger.info("Metrics history sampling started")

    async def stop(self):
        """Stop the background sampling loop (no-op when not running)."""
        # Guard so we don't log "stopped" when sampling never started.
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass  # expected outcome of the cancel above
        self._task = None
        logger.info("Metrics history sampling stopped")

    async def _sample_loop(self):
        """Sample system + target metrics every SAMPLE_INTERVAL seconds."""
        while True:
            try:
                await self._sample()
            except asyncio.CancelledError:
                raise  # propagate cancellation requested by stop()
            except Exception as e:
                # Keep the loop alive across transient sampling failures.
                logger.warning(f"Metrics sampling error: {e}")
            await asyncio.sleep(SAMPLE_INTERVAL)

    async def _sample(self):
        """Collect one snapshot of system and per-target metrics."""
        # psutil/NVML calls block; run them off the event loop.
        sys_snap = await asyncio.to_thread(_collect_system_snapshot)
        self._system.append(sys_snap)

        try:
            all_states = self._manager.get_all_target_states()
        except Exception:
            all_states = {}  # best-effort: skip target sampling this tick

        # Timezone-aware UTC: a naive utcnow() timestamp lacks the +00:00
        # offset and is parsed as local time by JS Date on the dashboard.
        now = datetime.now(timezone.utc).isoformat()
        active_ids = set()
        for target_id, state in all_states.items():
            active_ids.add(target_id)
            if target_id not in self._targets:
                self._targets[target_id] = deque(maxlen=MAX_SAMPLES)
            # Record only while actively processing so idle stretches don't
            # flatten the FPS/timing charts with empty samples.
            if state.get("processing"):
                self._targets[target_id].append({
                    "t": now,
                    "fps": state.get("fps_actual"),
                    "fps_target": state.get("fps_target"),
                    "timing": state.get("timing_total_ms"),
                    "errors": state.get("errors_count", 0),
                })

        # Drop history for targets that are no longer registered.
        for tid in list(self._targets.keys()):
            if tid not in active_ids:
                del self._targets[tid]

    def get_history(self) -> dict:
        """Return all history as plain lists, ready for JSON serialization."""
        return {
            "system": list(self._system),
            "targets": {tid: list(dq) for tid, dq in self._targets.items()},
        }
|
||||
@@ -16,6 +16,7 @@ from wled_controller.core.devices.led_client import (
|
||||
from wled_controller.core.audio.audio_capture import AudioCaptureManager
|
||||
from wled_controller.core.processing.live_stream_manager import LiveStreamManager
|
||||
from wled_controller.core.processing.color_strip_stream_manager import ColorStripStreamManager
|
||||
from wled_controller.core.processing.metrics_history import MetricsHistory
|
||||
from wled_controller.core.processing.value_stream import ValueStreamManager
|
||||
from wled_controller.core.capture.screen_overlay import OverlayManager
|
||||
from wled_controller.core.processing.target_processor import (
|
||||
@@ -97,8 +98,13 @@ class ProcessorManager:
|
||||
) if value_source_store else None
|
||||
self._overlay_manager = OverlayManager()
|
||||
self._event_queues: List[asyncio.Queue] = []
|
||||
self._metrics_history = MetricsHistory(self)
|
||||
logger.info("Processor manager initialized")
|
||||
|
||||
@property
def metrics_history(self) -> MetricsHistory:
    """Read-only access to the server-side metrics ring buffer."""
    return self._metrics_history
|
||||
|
||||
# ===== SHARED CONTEXT (passed to target processors) =====
|
||||
|
||||
def _build_context(self) -> TargetContext:
|
||||
@@ -718,6 +724,7 @@ class ProcessorManager:
|
||||
|
||||
async def stop_all(self):
|
||||
"""Stop processing and health monitoring for all targets and devices."""
|
||||
await self._metrics_history.stop()
|
||||
await self.stop_health_monitoring()
|
||||
|
||||
# Stop all processors
|
||||
@@ -761,6 +768,7 @@ class ProcessorManager:
|
||||
self._health_monitoring_active = True
|
||||
for device_id in self._devices:
|
||||
self._start_device_health_check(device_id)
|
||||
await self._metrics_history.start()
|
||||
logger.info("Started health monitoring for all devices")
|
||||
|
||||
async def stop_health_monitoring(self):
|
||||
|
||||
Reference in New Issue
Block a user