From 9f3f34654321916defd39ea08e1aec239b462dc4 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Fri, 22 May 2026 23:29:33 +0300 Subject: [PATCH] refactor(value-source): MetricSpec registry for SystemMetricsValueStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SystemMetricsValueStream used to dispatch on its ``self._metric`` string across three independent if/elif chains (audit finding M5): * priming in ``start()`` (cpu_percent seed, initial network counter); * raw reading in ``_read_metric_psutil`` plus ``_read_metric_fallback``; * normalisation in ``_normalize`` (percent / min-max range / max-rate). Adding a new metric meant editing all three chains plus the Android fallback — and forgetting one branch made the metric silently return 0. Lift each per-metric concern into a free function and register them as a ``MetricSpec(name, read_psutil, read_fallback, normalize, prime)`` in a new ``core.processing.metric_readers`` module. Shared normalisers (``_norm_percent`` / ``_norm_range`` / ``_norm_rate`` / ``_zero``) live once. The stream's ``start()`` / ``_read_metric()`` / ``_normalize()`` collapse to a single registry lookup + delegation. The stream still owns its mutable state (``_disk_path``, ``_sensor_label``, ``_gpu_unavailable``, ``_prev_net_bytes``, ``_prev_net_time``, etc.) — readers operate on the stream by parameter, not by inheritance, so the kitchen-sink class shrinks by ~140 lines without losing the per-stream cadence bookkeeping. Each spec function's docstring documents which fields it reads or mutates. Tests: 13 new test functions (22 collected cases, one check being parametrised over all 10 metrics) cover the 10-metric coverage set, callable shape of every spec field, the three normaliser primitives' clamping + divide-by-zero behaviour, prime-hook presence (only the three metrics that need a baseline: cpu_load + network_rx + network_tx), and fallback-path expectations (desktop-only sensors -> _zero, cpu/ram -> real MetricsProvider). 
754 existing core / storage / api tests stay green; ruff clean. --- .../ledgrab/core/processing/metric_readers.py | 271 ++++++++++++++++++ .../ledgrab/core/processing/value_stream.py | 172 ++--------- .../core/processing/test_metric_readers.py | 152 ++++++++++ 3 files changed, 443 insertions(+), 152 deletions(-) create mode 100644 server/src/ledgrab/core/processing/metric_readers.py create mode 100644 server/tests/core/processing/test_metric_readers.py diff --git a/server/src/ledgrab/core/processing/metric_readers.py b/server/src/ledgrab/core/processing/metric_readers.py new file mode 100644 index 0000000..c9f82c0 --- /dev/null +++ b/server/src/ledgrab/core/processing/metric_readers.py @@ -0,0 +1,271 @@ +"""Per-metric reader / normaliser registry for ``SystemMetricsValueStream``. + +The stream previously dispatched on a ``self._metric`` string across three +separate ``if / elif`` chains (priming in ``start()``, raw read in +``_read_metric_psutil`` + ``_read_metric_fallback``, normalisation in +``_normalize``). Adding a new metric meant editing every chain. + +This module replaces all of that with a single ``METRIC_SPECS`` dict keyed +by metric name. Each :class:`MetricSpec` declares: + +* ``read_psutil(stream)`` — the desktop path that uses ``stream._psutil``; +* ``read_fallback(stream)`` — the Android / no-psutil path (returns 0.0 + for desktop-only sensors); +* ``normalize(stream, raw)`` — maps the raw reading to ``[0, 1]``; +* ``prime(stream)`` — optional one-time setup called from ``start()``. + +The spec functions operate on the stream's existing attributes +(``_disk_path``, ``_sensor_label``, ``_min_val``, ``_max_val``, +``_max_rate``, ``_gpu_unavailable``, ``_prev_net_bytes``, +``_prev_net_time``). That is intentional: the readers are stateless +strategy callables, but the *stream's* state remains its own. Mutating it +from a reader is documented per function so the contract is explicit. 
+""" + +from __future__ import annotations + +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Callable, Dict, Optional + +from ledgrab.utils import get_logger + +if TYPE_CHECKING: + from ledgrab.core.processing.value_stream import SystemMetricsValueStream + +logger = get_logger(__name__) + + +# --------------------------------------------------------------------------- +# Spec dataclass +# --------------------------------------------------------------------------- + + +ReaderFn = Callable[["SystemMetricsValueStream"], float] +NormalizeFn = Callable[["SystemMetricsValueStream", float], float] +PrimeFn = Callable[["SystemMetricsValueStream"], None] + + +@dataclass(frozen=True) +class MetricSpec: + """How to read, normalise, and prime a single system metric.""" + + name: str + read_psutil: ReaderFn + read_fallback: ReaderFn + normalize: NormalizeFn + prime: Optional[PrimeFn] = None + + +# --------------------------------------------------------------------------- +# Normaliser primitives — shared across metrics +# --------------------------------------------------------------------------- + + +def _norm_percent(_s, raw: float) -> float: + """Percent metrics (cpu_load, ram_usage, …) — raw is 0..100.""" + return max(0.0, min(1.0, raw / 100.0)) + + +def _norm_range(s, raw: float) -> float: + """Temperature / fan-speed metrics normalise against (min_val, max_val).""" + rng = s._max_val - s._min_val + if abs(rng) < 1e-9: + return 0.5 + return max(0.0, min(1.0, (raw - s._min_val) / rng)) + + +def _norm_rate(s, raw: float) -> float: + """Network rate normalises against ``_max_rate`` (bytes/s).""" + if s._max_rate <= 0: + return 0.5 + return max(0.0, min(1.0, raw / s._max_rate)) + + +def _zero(_s) -> float: + """Desktop-only sensor on a no-psutil platform: report 0.0.""" + return 0.0 + + +# --------------------------------------------------------------------------- +# Read helpers — psutil paths +# 
--------------------------------------------------------------------------- + + +def _read_cpu_load_psutil(s) -> float: + return s._psutil.cpu_percent(interval=None) + + +def _read_ram_usage_psutil(s) -> float: + return s._psutil.virtual_memory().percent + + +def _read_disk_usage_psutil(s) -> float: + return s._psutil.disk_usage(s._disk_path).percent + + +def _read_battery_psutil(s) -> float: + bat = s._psutil.sensors_battery() + return bat.percent if bat else 0.0 + + +def _read_cpu_temp_psutil(s) -> float: + psutil = s._psutil + if psutil is None: + return 0.0 + temps = psutil.sensors_temperatures() + if not temps: + return 0.0 + if s._sensor_label: + for group_name, entries in temps.items(): + for entry in entries: + if entry.label == s._sensor_label or group_name == s._sensor_label: + return entry.current + for entries in temps.values(): + if entries: + return entries[0].current + return 0.0 + + +def _read_fan_speed_psutil(s) -> float: + psutil = s._psutil + if psutil is None: + return 0.0 + fans = psutil.sensors_fans() + if not fans: + return 0.0 + if s._sensor_label: + for group_name, entries in fans.items(): + for entry in entries: + if entry.label == s._sensor_label or group_name == s._sensor_label: + return entry.current + for entries in fans.values(): + if entries: + return entries[0].current + return 0.0 + + +def _read_gpu(metric: str) -> ReaderFn: + """Build a GPU reader for the given metric name ('gpu_load' or 'gpu_temp').""" + + def _read(s) -> float: + if s._gpu_unavailable: + return 0.0 + try: + from ledgrab.utils.gpu import nvml, nvml_available, nvml_handle + + if not nvml_available or nvml_handle is None: + s._gpu_unavailable = True + return 0.0 + if metric == "gpu_load": + util = nvml.nvmlDeviceGetUtilizationRates(nvml_handle) + return float(util.gpu) + # gpu_temp + return float(nvml.nvmlDeviceGetTemperature(nvml_handle, 0)) + except Exception as e: + logger.debug("GPU metric read error: %s", e) + s._gpu_unavailable = True + return 0.0 + + return 
_read + + +def _read_network_rate(s) -> float: + """Bytes/s rate for ``network_rx`` / ``network_tx``. + + Mutates ``s._prev_net_bytes`` and ``s._prev_net_time`` to track the + delta between calls — the stream owns the cadence state, this reader + just bumps it forward. + """ + psutil = s._psutil + if psutil is None: + return 0.0 + counters = psutil.net_io_counters() + if not counters: + return 0.0 + current_bytes = counters.bytes_recv if s._metric == "network_rx" else counters.bytes_sent + now = time.monotonic() + if s._prev_net_bytes is None or s._prev_net_time is None: + s._prev_net_bytes = current_bytes + s._prev_net_time = now + return 0.0 + dt = now - s._prev_net_time + if dt <= 0: + return 0.0 + # Cap delta time to avoid spikes after long gaps + dt = min(dt, s._poll_interval * 2) + rate = (current_bytes - s._prev_net_bytes) / dt + s._prev_net_bytes = current_bytes + s._prev_net_time = now + return max(0.0, rate) + + +# --------------------------------------------------------------------------- +# Read helpers — fallback (no-psutil) paths +# --------------------------------------------------------------------------- + + +def _read_cpu_load_fallback(_s) -> float: + from ledgrab.utils.metrics import get_metrics_provider + + return get_metrics_provider().cpu_percent() + + +def _read_ram_usage_fallback(_s) -> float: + from ledgrab.utils.metrics import get_metrics_provider + + mem = get_metrics_provider().virtual_memory() + if mem.total_bytes > 0: + return (mem.used_bytes / mem.total_bytes) * 100.0 + return 0.0 + + +# --------------------------------------------------------------------------- +# Prime helpers — one-time setup from start() +# --------------------------------------------------------------------------- + + +def _prime_cpu_load(s) -> None: + # Prime psutil.cpu_percent so the first real call returns meaningful data + if s._psutil is not None: + s._psutil.cpu_percent(interval=None) + + +def _prime_network(s) -> None: + """Capture initial network counter 
so the first delta has a baseline.""" + if s._psutil is None: + return + counters = s._psutil.net_io_counters() + if counters: + s._prev_net_bytes = ( + counters.bytes_recv if s._metric == "network_rx" else counters.bytes_sent + ) + s._prev_net_time = time.monotonic() + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + + +METRIC_SPECS: Dict[str, MetricSpec] = { + "cpu_load": MetricSpec( + "cpu_load", _read_cpu_load_psutil, _read_cpu_load_fallback, _norm_percent, _prime_cpu_load + ), + "ram_usage": MetricSpec( + "ram_usage", _read_ram_usage_psutil, _read_ram_usage_fallback, _norm_percent + ), + "disk_usage": MetricSpec("disk_usage", _read_disk_usage_psutil, _zero, _norm_percent), + "battery_level": MetricSpec("battery_level", _read_battery_psutil, _zero, _norm_percent), + "cpu_temp": MetricSpec("cpu_temp", _read_cpu_temp_psutil, _zero, _norm_range), + "fan_speed": MetricSpec("fan_speed", _read_fan_speed_psutil, _zero, _norm_range), + "gpu_load": MetricSpec("gpu_load", _read_gpu("gpu_load"), _zero, _norm_percent), + "gpu_temp": MetricSpec("gpu_temp", _read_gpu("gpu_temp"), _zero, _norm_range), + "network_rx": MetricSpec("network_rx", _read_network_rate, _zero, _norm_rate, _prime_network), + "network_tx": MetricSpec("network_tx", _read_network_rate, _zero, _norm_rate, _prime_network), +} + + +def get_spec(metric: str) -> Optional[MetricSpec]: + """Look up the spec for ``metric``, returning ``None`` for unknown names.""" + return METRIC_SPECS.get(metric) diff --git a/server/src/ledgrab/core/processing/value_stream.py b/server/src/ledgrab/core/processing/value_stream.py index 272c564..e05ab29 100644 --- a/server/src/ledgrab/core/processing/value_stream.py +++ b/server/src/ledgrab/core/processing/value_stream.py @@ -31,6 +31,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import numpy as np +from ledgrab.core.processing 
import metric_readers as _metric_readers from ledgrab.storage.base_store import EntityNotFoundError from ledgrab.utils import get_logger @@ -1526,19 +1527,11 @@ class SystemMetricsValueStream(ValueStream): self._psutil = None def start(self) -> None: - if self._psutil is None: - return - # Prime cpu_percent so the first real call returns meaningful data - if self._metric == "cpu_load": - self._psutil.cpu_percent(interval=None) - # Prime network counters - if self._metric in ("network_rx", "network_tx"): - counters = self._psutil.net_io_counters() - if counters: - self._prev_net_bytes = ( - counters.bytes_recv if self._metric == "network_rx" else counters.bytes_sent - ) - self._prev_net_time = time.monotonic() + # Per-metric priming (e.g. seed cpu_percent or capture an initial + # network counter) lives on the MetricSpec, keyed by ``self._metric``. + spec = _metric_readers.get_spec(self._metric) + if spec is not None and spec.prime is not None: + spec.prime(self) def stop(self) -> None: self._prev_value = None @@ -1570,154 +1563,29 @@ class SystemMetricsValueStream(ValueStream): return self._raw_value def _normalize(self, raw: float) -> float: - """Normalize raw value to [0, 1].""" - if self._metric in ("cpu_load", "ram_usage", "gpu_load", "battery_level", "disk_usage"): - return max(0.0, min(1.0, raw / 100.0)) - elif self._metric in ("cpu_temp", "gpu_temp", "fan_speed"): - rng = self._max_val - self._min_val - if abs(rng) < 1e-9: - return 0.5 - return max(0.0, min(1.0, (raw - self._min_val) / rng)) - elif self._metric in ("network_rx", "network_tx"): - if self._max_rate <= 0: - return 0.5 - return max(0.0, min(1.0, raw / self._max_rate)) - return 0.0 + """Normalize raw value to [0, 1] via the metric's registered normaliser.""" + spec = _metric_readers.get_spec(self._metric) + if spec is None: + return 0.0 + return spec.normalize(self, raw) def _read_metric(self) -> float: - """Read the raw metric value from the system. 
+ """Read the raw metric value via the registered reader. - When psutil is unavailable (Android), falls back to the - platform-aware MetricsProvider for cpu/memory and returns 0.0 - for desktop-only metrics. + When psutil is unavailable (Android), the spec's ``read_fallback`` + path is used — desktop-only sensors return 0.0 there. Read errors + are swallowed and the last cached raw value is returned. """ + spec = _metric_readers.get_spec(self._metric) + if spec is None: + return 0.0 + reader = spec.read_psutil if self._psutil is not None else spec.read_fallback try: - if self._psutil is not None: - return self._read_metric_psutil() - return self._read_metric_fallback() + return reader(self) except Exception as e: logger.debug("SystemMetricsValueStream read error (%s): %s", self._metric, e) return self._raw_value if self._raw_value is not None else 0.0 - def _read_metric_psutil(self) -> float: - """Read metrics via psutil (desktop path).""" - psutil = self._psutil - if self._metric == "cpu_load": - return psutil.cpu_percent(interval=None) - elif self._metric == "ram_usage": - return psutil.virtual_memory().percent - elif self._metric == "disk_usage": - return psutil.disk_usage(self._disk_path).percent - elif self._metric == "battery_level": - bat = psutil.sensors_battery() - return bat.percent if bat else 0.0 - elif self._metric == "cpu_temp": - return self._read_cpu_temp() - elif self._metric == "fan_speed": - return self._read_fan_speed() - elif self._metric in ("gpu_load", "gpu_temp"): - return self._read_gpu_metric() - elif self._metric in ("network_rx", "network_tx"): - return self._read_network_rate() - return 0.0 - - def _read_metric_fallback(self) -> float: - """Read metrics without psutil (Android / fallback path). - - Uses the MetricsProvider abstraction for cpu/memory. Sensors, - battery, network, disk, and GPU are not available. 
- """ - from ledgrab.utils.metrics import get_metrics_provider - - provider = get_metrics_provider() - if self._metric == "cpu_load": - return provider.cpu_percent() - elif self._metric == "ram_usage": - mem = provider.virtual_memory() - if mem.total_bytes > 0: - return (mem.used_bytes / mem.total_bytes) * 100.0 - return 0.0 - return 0.0 - - def _read_cpu_temp(self) -> float: - psutil = self._psutil - if psutil is None: - return 0.0 - temps = psutil.sensors_temperatures() - if not temps: - return 0.0 - # If sensor_label specified, try to find it - if self._sensor_label: - for group_name, entries in temps.items(): - for entry in entries: - if entry.label == self._sensor_label or group_name == self._sensor_label: - return entry.current - # Fallback: first available sensor - for entries in temps.values(): - if entries: - return entries[0].current - return 0.0 - - def _read_fan_speed(self) -> float: - psutil = self._psutil - if psutil is None: - return 0.0 - fans = psutil.sensors_fans() - if not fans: - return 0.0 - if self._sensor_label: - for group_name, entries in fans.items(): - for entry in entries: - if entry.label == self._sensor_label or group_name == self._sensor_label: - return entry.current - # Fallback: first available fan - for entries in fans.values(): - if entries: - return entries[0].current - return 0.0 - - def _read_gpu_metric(self) -> float: - if self._gpu_unavailable: - return 0.0 - try: - from ledgrab.utils.gpu import nvml, nvml_available, nvml_handle - - if not nvml_available or nvml_handle is None: - self._gpu_unavailable = True - return 0.0 - if self._metric == "gpu_load": - util = nvml.nvmlDeviceGetUtilizationRates(nvml_handle) - return float(util.gpu) - else: # gpu_temp - return float(nvml.nvmlDeviceGetTemperature(nvml_handle, 0)) - except Exception as e: - logger.debug("GPU metric read error: %s", e) - self._gpu_unavailable = True - return 0.0 - - def _read_network_rate(self) -> float: - psutil = self._psutil - if psutil is None: - return 0.0 
- counters = psutil.net_io_counters() - if not counters: - return 0.0 - current_bytes = counters.bytes_recv if self._metric == "network_rx" else counters.bytes_sent - now = time.monotonic() - if self._prev_net_bytes is None or self._prev_net_time is None: - self._prev_net_bytes = current_bytes - self._prev_net_time = now - return 0.0 - dt = now - self._prev_net_time - if dt <= 0: - return 0.0 - # Cap delta time to avoid spikes after long gaps - dt = min(dt, self._poll_interval * 2) - rate = (current_bytes - self._prev_net_bytes) / dt - self._prev_net_bytes = current_bytes - self._prev_net_time = now - return max(0.0, rate) - def update_source(self, source: "ValueSource") -> None: from ledgrab.storage.value_source import SystemMetricsValueSource diff --git a/server/tests/core/processing/test_metric_readers.py b/server/tests/core/processing/test_metric_readers.py new file mode 100644 index 0000000..cebfed5 --- /dev/null +++ b/server/tests/core/processing/test_metric_readers.py @@ -0,0 +1,152 @@ +"""Tests for the SystemMetricsValueStream metric reader registry. + +Locks in the spec-driven dispatch that replaced the three independent +if/elif chains (priming, raw read, normalise) inside +``SystemMetricsValueStream``. 
+""" + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from ledgrab.core.processing.metric_readers import ( + METRIC_SPECS, + MetricSpec, + _norm_percent, + _norm_range, + _norm_rate, + _zero, + get_spec, +) + + +EXPECTED_METRICS = { + "cpu_load", + "ram_usage", + "disk_usage", + "battery_level", + "cpu_temp", + "fan_speed", + "gpu_load", + "gpu_temp", + "network_rx", + "network_tx", +} + + +# --------------------------------------------------------------------------- +# Registry shape +# --------------------------------------------------------------------------- + + +def test_registry_covers_known_metrics(): + assert set(METRIC_SPECS.keys()) == EXPECTED_METRICS + + +def test_every_spec_has_callable_readers_and_normalize(): + for name, spec in METRIC_SPECS.items(): + assert isinstance(spec, MetricSpec), f"{name} is not a MetricSpec" + assert callable(spec.read_psutil), f"{name}.read_psutil not callable" + assert callable(spec.read_fallback), f"{name}.read_fallback not callable" + assert callable(spec.normalize), f"{name}.normalize not callable" + # prime is Optional[PrimeFn] + if spec.prime is not None: + assert callable(spec.prime), f"{name}.prime present but not callable" + + +def test_get_spec_returns_none_for_unknown_metric(): + assert get_spec("totally_unregistered") is None + + +# --------------------------------------------------------------------------- +# Normaliser primitives +# --------------------------------------------------------------------------- + + +def test_norm_percent_clamps_to_unit_interval(): + s = SimpleNamespace() # normalise primitives ignore the stream + assert _norm_percent(s, 50.0) == 0.5 + assert _norm_percent(s, 0.0) == 0.0 + assert _norm_percent(s, 100.0) == 1.0 + # Clamps + assert _norm_percent(s, -10.0) == 0.0 + assert _norm_percent(s, 200.0) == 1.0 + + +def test_norm_range_handles_min_max_window(): + s = SimpleNamespace(_min_val=20.0, _max_val=70.0) + assert _norm_range(s, 20.0) == 0.0 
+ assert _norm_range(s, 70.0) == 1.0 + assert _norm_range(s, 45.0) == 0.5 + # Clamps + assert _norm_range(s, 10.0) == 0.0 + assert _norm_range(s, 100.0) == 1.0 + + +def test_norm_range_returns_half_on_zero_window(): + """Avoid division by zero when min==max — pick the middle of [0,1].""" + s = SimpleNamespace(_min_val=50.0, _max_val=50.0) + assert _norm_range(s, 50.0) == 0.5 + assert _norm_range(s, 999.0) == 0.5 + + +def test_norm_rate_uses_max_rate(): + s = SimpleNamespace(_max_rate=100.0) + assert _norm_rate(s, 0.0) == 0.0 + assert _norm_rate(s, 50.0) == 0.5 + assert _norm_rate(s, 100.0) == 1.0 + assert _norm_rate(s, 200.0) == 1.0 + + +def test_norm_rate_returns_half_on_zero_max(): + s = SimpleNamespace(_max_rate=0.0) + assert _norm_rate(s, 12345.0) == 0.5 + + +def test_zero_reader_always_returns_zero(): + assert _zero(None) == 0.0 + assert _zero(SimpleNamespace()) == 0.0 + + +# --------------------------------------------------------------------------- +# Spec wiring sanity +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "name, expected_normalize", + [ + ("cpu_load", _norm_percent), + ("ram_usage", _norm_percent), + ("disk_usage", _norm_percent), + ("battery_level", _norm_percent), + ("gpu_load", _norm_percent), + ("cpu_temp", _norm_range), + ("gpu_temp", _norm_range), + ("fan_speed", _norm_range), + ("network_rx", _norm_rate), + ("network_tx", _norm_rate), + ], +) +def test_metric_uses_expected_normaliser(name, expected_normalize): + assert METRIC_SPECS[name].normalize is expected_normalize + + +def test_metrics_without_psutil_use_zero_fallback_for_desktop_only(): + """Desktop-only sensors fall back to a constant 0.0 when psutil is absent.""" + for name in ("disk_usage", "battery_level", "cpu_temp", "fan_speed", "gpu_load", "gpu_temp"): + assert METRIC_SPECS[name].read_fallback is _zero, name + + +def test_cpu_and_ram_have_meaningful_fallbacks(): + """cpu_load and ram_usage go through the 
platform-aware MetricsProvider.""" + for name in ("cpu_load", "ram_usage"): + assert METRIC_SPECS[name].read_fallback is not _zero, name + + +def test_only_cpu_load_and_network_have_prime_hooks(): + """Priming is only meaningful for metrics that need an initial baseline.""" + primed = {name for name, spec in METRIC_SPECS.items() if spec.prime is not None} + assert primed == {"cpu_load", "network_rx", "network_tx"}