refactor(value-source): MetricSpec registry for SystemMetricsValueStream

SystemMetricsValueStream used to dispatch on its ``self._metric`` string
across three independent if/elif chains (audit finding M5):

  * priming in ``start()`` (cpu_percent seed, initial network counter);
  * raw reading in ``_read_metric_psutil`` plus ``_read_metric_fallback``;
  * normalisation in ``_normalize`` (percent / min-max range / max-rate).

Adding a new metric meant editing all three chains plus the Android
fallback — and forgetting one branch made the metric silently return 0.

Lift each per-metric concern into a free function and register them as a
``MetricSpec(name, read_psutil, read_fallback, normalize, prime)`` in a
new ``core.processing.metric_readers`` module. Shared normalisers
(``_norm_percent`` / ``_norm_range`` / ``_norm_rate`` / ``_zero``) live
once. The stream's ``start()`` / ``_read_metric()`` / ``_normalize()``
collapse to a single registry lookup + delegation.

The stream still owns its mutable state (``_disk_path``,
``_sensor_label``, ``_gpu_unavailable``, ``_prev_net_bytes``,
``_prev_net_time``, etc.) — readers operate on the stream by
parameter, not by inheritance, so the kitchen-sink class shrinks by
~140 lines without losing the per-stream cadence bookkeeping. Each
spec function's docstring documents which fields it reads or mutates.

Tests: 16 new tests cover the 10-metric coverage set, callable shape
of every spec field, the three normaliser primitives' clamping +
divide-by-zero behaviour, prime-hook presence (only the three metrics
that need a baseline: cpu_load + network_rx + network_tx), and
fallback-path expectations (desktop-only sensors -> _zero, cpu/ram ->
real MetricsProvider).

754 existing core / storage / api tests stay green; ruff clean.
This commit is contained in:
2026-05-22 23:29:33 +03:00
parent 98fb61d932
commit 9f3f346543
3 changed files with 443 additions and 152 deletions
@@ -0,0 +1,271 @@
"""Per-metric reader / normaliser registry for ``SystemMetricsValueStream``.
The stream previously dispatched on a ``self._metric`` string across three
separate ``if / elif`` chains (priming in ``start()``, raw read in
``_read_metric_psutil`` + ``_read_metric_fallback``, normalisation in
``_normalize``). Adding a new metric meant editing every chain.
This module replaces all of that with a single ``METRIC_SPECS`` dict keyed
by metric name. Each :class:`MetricSpec` declares:
* ``read_psutil(stream)`` — the desktop path that uses ``stream._psutil``;
* ``read_fallback(stream)`` — the Android / no-psutil path (returns 0.0
for desktop-only sensors);
* ``normalize(stream, raw)`` — maps the raw reading to ``[0, 1]``;
* ``prime(stream)`` — optional one-time setup called from ``start()``.
The spec functions operate on the stream's existing attributes
(``_disk_path``, ``_sensor_label``, ``_min_val``, ``_max_val``,
``_max_rate``, ``_gpu_unavailable``, ``_prev_net_bytes``,
``_prev_net_time``). That is intentional: the readers are stateless
strategy callables, but the *stream's* state remains its own. Mutating it
from a reader is documented per function so the contract is explicit.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Dict, Optional
from ledgrab.utils import get_logger
if TYPE_CHECKING:
from ledgrab.core.processing.value_stream import SystemMetricsValueStream
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Spec dataclass
# ---------------------------------------------------------------------------
ReaderFn = Callable[["SystemMetricsValueStream"], float]
NormalizeFn = Callable[["SystemMetricsValueStream", float], float]
PrimeFn = Callable[["SystemMetricsValueStream"], None]
@dataclass(frozen=True)
class MetricSpec:
    """Immutable bundle of the callables that implement one system metric.

    Every callable receives the owning stream as its first argument and
    works on that stream's attributes; the spec itself holds no state.
    """

    # Metric identifier.
    name: str
    # Raw reader for the path that has ``stream._psutil`` available.
    read_psutil: ReaderFn
    # Raw reader for the path without psutil.
    read_fallback: ReaderFn
    # Converts a raw reading into the value handed on by the stream.
    normalize: NormalizeFn
    # Optional setup hook; ``None`` means the metric needs no priming.
    prime: Optional[PrimeFn] = None
# ---------------------------------------------------------------------------
# Normaliser primitives — shared across metrics
# ---------------------------------------------------------------------------
def _norm_percent(_s, raw: float) -> float:
"""Percent metrics (cpu_load, ram_usage, …) — raw is 0..100."""
return max(0.0, min(1.0, raw / 100.0))
def _norm_range(s, raw: float) -> float:
"""Temperature / fan-speed metrics normalise against (min_val, max_val)."""
rng = s._max_val - s._min_val
if abs(rng) < 1e-9:
return 0.5
return max(0.0, min(1.0, (raw - s._min_val) / rng))
def _norm_rate(s, raw: float) -> float:
"""Network rate normalises against ``_max_rate`` (bytes/s)."""
if s._max_rate <= 0:
return 0.5
return max(0.0, min(1.0, raw / s._max_rate))
def _zero(_s) -> float:
"""Desktop-only sensor on a no-psutil platform: report 0.0."""
return 0.0
# ---------------------------------------------------------------------------
# Read helpers — psutil paths
# ---------------------------------------------------------------------------
def _read_cpu_load_psutil(s) -> float:
return s._psutil.cpu_percent(interval=None)
def _read_ram_usage_psutil(s) -> float:
return s._psutil.virtual_memory().percent
def _read_disk_usage_psutil(s) -> float:
return s._psutil.disk_usage(s._disk_path).percent
def _read_battery_psutil(s) -> float:
bat = s._psutil.sensors_battery()
return bat.percent if bat else 0.0
def _read_cpu_temp_psutil(s) -> float:
psutil = s._psutil
if psutil is None:
return 0.0
temps = psutil.sensors_temperatures()
if not temps:
return 0.0
if s._sensor_label:
for group_name, entries in temps.items():
for entry in entries:
if entry.label == s._sensor_label or group_name == s._sensor_label:
return entry.current
for entries in temps.values():
if entries:
return entries[0].current
return 0.0
def _read_fan_speed_psutil(s) -> float:
psutil = s._psutil
if psutil is None:
return 0.0
fans = psutil.sensors_fans()
if not fans:
return 0.0
if s._sensor_label:
for group_name, entries in fans.items():
for entry in entries:
if entry.label == s._sensor_label or group_name == s._sensor_label:
return entry.current
for entries in fans.values():
if entries:
return entries[0].current
return 0.0
def _read_gpu(metric: str) -> ReaderFn:
"""Build a GPU reader for the given metric name ('gpu_load' or 'gpu_temp')."""
def _read(s) -> float:
if s._gpu_unavailable:
return 0.0
try:
from ledgrab.utils.gpu import nvml, nvml_available, nvml_handle
if not nvml_available or nvml_handle is None:
s._gpu_unavailable = True
return 0.0
if metric == "gpu_load":
util = nvml.nvmlDeviceGetUtilizationRates(nvml_handle)
return float(util.gpu)
# gpu_temp
return float(nvml.nvmlDeviceGetTemperature(nvml_handle, 0))
except Exception as e:
logger.debug("GPU metric read error: %s", e)
s._gpu_unavailable = True
return 0.0
return _read
def _read_network_rate(s) -> float:
"""Bytes/s rate for ``network_rx`` / ``network_tx``.
Mutates ``s._prev_net_bytes`` and ``s._prev_net_time`` to track the
delta between calls — the stream owns the cadence state, this reader
just bumps it forward.
"""
psutil = s._psutil
if psutil is None:
return 0.0
counters = psutil.net_io_counters()
if not counters:
return 0.0
current_bytes = counters.bytes_recv if s._metric == "network_rx" else counters.bytes_sent
now = time.monotonic()
if s._prev_net_bytes is None or s._prev_net_time is None:
s._prev_net_bytes = current_bytes
s._prev_net_time = now
return 0.0
dt = now - s._prev_net_time
if dt <= 0:
return 0.0
# Cap delta time to avoid spikes after long gaps
dt = min(dt, s._poll_interval * 2)
rate = (current_bytes - s._prev_net_bytes) / dt
s._prev_net_bytes = current_bytes
s._prev_net_time = now
return max(0.0, rate)
# ---------------------------------------------------------------------------
# Read helpers — fallback (no-psutil) paths
# ---------------------------------------------------------------------------
def _read_cpu_load_fallback(_s) -> float:
    """Return ``cpu_percent()`` from the project's metrics provider.

    Used on the no-psutil path. The stream argument is ignored; the provider
    is imported lazily at call time.
    """
    from ledgrab.utils.metrics import get_metrics_provider

    provider = get_metrics_provider()
    return provider.cpu_percent()
def _read_ram_usage_fallback(_s) -> float:
    """Return used memory as a percentage, computed from the metrics provider.

    Used on the no-psutil path. The stream argument is ignored. The value is
    ``used_bytes / total_bytes * 100``; a non-positive total yields 0.0
    rather than dividing.
    """
    from ledgrab.utils.metrics import get_metrics_provider

    memory = get_metrics_provider().virtual_memory()
    return (memory.used_bytes / memory.total_bytes) * 100.0 if memory.total_bytes > 0 else 0.0
# ---------------------------------------------------------------------------
# Prime helpers — one-time setup from start()
# ---------------------------------------------------------------------------
def _prime_cpu_load(s) -> None:
# Prime psutil.cpu_percent so the first real call returns meaningful data
if s._psutil is not None:
s._psutil.cpu_percent(interval=None)
def _prime_network(s) -> None:
"""Capture initial network counter so the first delta has a baseline."""
if s._psutil is None:
return
counters = s._psutil.net_io_counters()
if counters:
s._prev_net_bytes = (
counters.bytes_recv if s._metric == "network_rx" else counters.bytes_sent
)
s._prev_net_time = time.monotonic()
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Registry of every supported metric, keyed by each spec's own ``name`` so
# the key and the stored name cannot drift apart.
METRIC_SPECS: Dict[str, MetricSpec] = {
    spec.name: spec
    for spec in (
        MetricSpec(
            name="cpu_load",
            read_psutil=_read_cpu_load_psutil,
            read_fallback=_read_cpu_load_fallback,
            normalize=_norm_percent,
            prime=_prime_cpu_load,
        ),
        MetricSpec(
            name="ram_usage",
            read_psutil=_read_ram_usage_psutil,
            read_fallback=_read_ram_usage_fallback,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="disk_usage",
            read_psutil=_read_disk_usage_psutil,
            read_fallback=_zero,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="battery_level",
            read_psutil=_read_battery_psutil,
            read_fallback=_zero,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="cpu_temp",
            read_psutil=_read_cpu_temp_psutil,
            read_fallback=_zero,
            normalize=_norm_range,
        ),
        MetricSpec(
            name="fan_speed",
            read_psutil=_read_fan_speed_psutil,
            read_fallback=_zero,
            normalize=_norm_range,
        ),
        MetricSpec(
            name="gpu_load",
            read_psutil=_read_gpu("gpu_load"),
            read_fallback=_zero,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="gpu_temp",
            read_psutil=_read_gpu("gpu_temp"),
            read_fallback=_zero,
            normalize=_norm_range,
        ),
        MetricSpec(
            name="network_rx",
            read_psutil=_read_network_rate,
            read_fallback=_zero,
            normalize=_norm_rate,
            prime=_prime_network,
        ),
        MetricSpec(
            name="network_tx",
            read_psutil=_read_network_rate,
            read_fallback=_zero,
            normalize=_norm_rate,
            prime=_prime_network,
        ),
    )
}
def get_spec(metric: str) -> Optional[MetricSpec]:
    """Return the registered spec for ``metric``, or ``None`` if it is not registered."""
    try:
        return METRIC_SPECS[metric]
    except KeyError:
        return None
@@ -31,6 +31,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np import numpy as np
from ledgrab.core.processing import metric_readers as _metric_readers
from ledgrab.storage.base_store import EntityNotFoundError from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.utils import get_logger from ledgrab.utils import get_logger
@@ -1526,19 +1527,11 @@ class SystemMetricsValueStream(ValueStream):
self._psutil = None self._psutil = None
def start(self) -> None: def start(self) -> None:
if self._psutil is None: # Per-metric priming (e.g. seed cpu_percent or capture an initial
return # network counter) lives on the MetricSpec, keyed by ``self._metric``.
# Prime cpu_percent so the first real call returns meaningful data spec = _metric_readers.get_spec(self._metric)
if self._metric == "cpu_load": if spec is not None and spec.prime is not None:
self._psutil.cpu_percent(interval=None) spec.prime(self)
# Prime network counters
if self._metric in ("network_rx", "network_tx"):
counters = self._psutil.net_io_counters()
if counters:
self._prev_net_bytes = (
counters.bytes_recv if self._metric == "network_rx" else counters.bytes_sent
)
self._prev_net_time = time.monotonic()
def stop(self) -> None: def stop(self) -> None:
self._prev_value = None self._prev_value = None
@@ -1570,154 +1563,29 @@ class SystemMetricsValueStream(ValueStream):
return self._raw_value return self._raw_value
def _normalize(self, raw: float) -> float: def _normalize(self, raw: float) -> float:
"""Normalize raw value to [0, 1].""" """Normalize raw value to [0, 1] via the metric's registered normaliser."""
if self._metric in ("cpu_load", "ram_usage", "gpu_load", "battery_level", "disk_usage"): spec = _metric_readers.get_spec(self._metric)
return max(0.0, min(1.0, raw / 100.0)) if spec is None:
elif self._metric in ("cpu_temp", "gpu_temp", "fan_speed"): return 0.0
rng = self._max_val - self._min_val return spec.normalize(self, raw)
if abs(rng) < 1e-9:
return 0.5
return max(0.0, min(1.0, (raw - self._min_val) / rng))
elif self._metric in ("network_rx", "network_tx"):
if self._max_rate <= 0:
return 0.5
return max(0.0, min(1.0, raw / self._max_rate))
return 0.0
def _read_metric(self) -> float: def _read_metric(self) -> float:
"""Read the raw metric value from the system. """Read the raw metric value via the registered reader.
When psutil is unavailable (Android), falls back to the When psutil is unavailable (Android), the spec's ``read_fallback``
platform-aware MetricsProvider for cpu/memory and returns 0.0 path is used — desktop-only sensors return 0.0 there. Read errors
for desktop-only metrics. are swallowed and the last cached raw value is returned.
""" """
spec = _metric_readers.get_spec(self._metric)
if spec is None:
return 0.0
reader = spec.read_psutil if self._psutil is not None else spec.read_fallback
try: try:
if self._psutil is not None: return reader(self)
return self._read_metric_psutil()
return self._read_metric_fallback()
except Exception as e: except Exception as e:
logger.debug("SystemMetricsValueStream read error (%s): %s", self._metric, e) logger.debug("SystemMetricsValueStream read error (%s): %s", self._metric, e)
return self._raw_value if self._raw_value is not None else 0.0 return self._raw_value if self._raw_value is not None else 0.0
def _read_metric_psutil(self) -> float:
"""Read metrics via psutil (desktop path)."""
psutil = self._psutil
if self._metric == "cpu_load":
return psutil.cpu_percent(interval=None)
elif self._metric == "ram_usage":
return psutil.virtual_memory().percent
elif self._metric == "disk_usage":
return psutil.disk_usage(self._disk_path).percent
elif self._metric == "battery_level":
bat = psutil.sensors_battery()
return bat.percent if bat else 0.0
elif self._metric == "cpu_temp":
return self._read_cpu_temp()
elif self._metric == "fan_speed":
return self._read_fan_speed()
elif self._metric in ("gpu_load", "gpu_temp"):
return self._read_gpu_metric()
elif self._metric in ("network_rx", "network_tx"):
return self._read_network_rate()
return 0.0
def _read_metric_fallback(self) -> float:
"""Read metrics without psutil (Android / fallback path).
Uses the MetricsProvider abstraction for cpu/memory. Sensors,
battery, network, disk, and GPU are not available.
"""
from ledgrab.utils.metrics import get_metrics_provider
provider = get_metrics_provider()
if self._metric == "cpu_load":
return provider.cpu_percent()
elif self._metric == "ram_usage":
mem = provider.virtual_memory()
if mem.total_bytes > 0:
return (mem.used_bytes / mem.total_bytes) * 100.0
return 0.0
return 0.0
def _read_cpu_temp(self) -> float:
psutil = self._psutil
if psutil is None:
return 0.0
temps = psutil.sensors_temperatures()
if not temps:
return 0.0
# If sensor_label specified, try to find it
if self._sensor_label:
for group_name, entries in temps.items():
for entry in entries:
if entry.label == self._sensor_label or group_name == self._sensor_label:
return entry.current
# Fallback: first available sensor
for entries in temps.values():
if entries:
return entries[0].current
return 0.0
def _read_fan_speed(self) -> float:
psutil = self._psutil
if psutil is None:
return 0.0
fans = psutil.sensors_fans()
if not fans:
return 0.0
if self._sensor_label:
for group_name, entries in fans.items():
for entry in entries:
if entry.label == self._sensor_label or group_name == self._sensor_label:
return entry.current
# Fallback: first available fan
for entries in fans.values():
if entries:
return entries[0].current
return 0.0
def _read_gpu_metric(self) -> float:
if self._gpu_unavailable:
return 0.0
try:
from ledgrab.utils.gpu import nvml, nvml_available, nvml_handle
if not nvml_available or nvml_handle is None:
self._gpu_unavailable = True
return 0.0
if self._metric == "gpu_load":
util = nvml.nvmlDeviceGetUtilizationRates(nvml_handle)
return float(util.gpu)
else: # gpu_temp
return float(nvml.nvmlDeviceGetTemperature(nvml_handle, 0))
except Exception as e:
logger.debug("GPU metric read error: %s", e)
self._gpu_unavailable = True
return 0.0
def _read_network_rate(self) -> float:
psutil = self._psutil
if psutil is None:
return 0.0
counters = psutil.net_io_counters()
if not counters:
return 0.0
current_bytes = counters.bytes_recv if self._metric == "network_rx" else counters.bytes_sent
now = time.monotonic()
if self._prev_net_bytes is None or self._prev_net_time is None:
self._prev_net_bytes = current_bytes
self._prev_net_time = now
return 0.0
dt = now - self._prev_net_time
if dt <= 0:
return 0.0
# Cap delta time to avoid spikes after long gaps
dt = min(dt, self._poll_interval * 2)
rate = (current_bytes - self._prev_net_bytes) / dt
self._prev_net_bytes = current_bytes
self._prev_net_time = now
return max(0.0, rate)
def update_source(self, source: "ValueSource") -> None: def update_source(self, source: "ValueSource") -> None:
from ledgrab.storage.value_source import SystemMetricsValueSource from ledgrab.storage.value_source import SystemMetricsValueSource
@@ -0,0 +1,152 @@
"""Tests for the SystemMetricsValueStream metric reader registry.
Locks in the spec-driven dispatch that replaced the three independent
if/elif chains (priming, raw read, normalise) inside
``SystemMetricsValueStream``.
"""
from __future__ import annotations
from types import SimpleNamespace
import pytest
from ledgrab.core.processing.metric_readers import (
METRIC_SPECS,
MetricSpec,
_norm_percent,
_norm_range,
_norm_rate,
_zero,
get_spec,
)
# Every metric name the registry must expose, listed alphabetically.
EXPECTED_METRICS = {
    "battery_level",
    "cpu_load",
    "cpu_temp",
    "disk_usage",
    "fan_speed",
    "gpu_load",
    "gpu_temp",
    "network_rx",
    "network_tx",
    "ram_usage",
}
# ---------------------------------------------------------------------------
# Registry shape
# ---------------------------------------------------------------------------
def test_registry_covers_known_metrics():
    """The registry exposes exactly the expected metric names."""
    registered = set(METRIC_SPECS)
    assert registered == EXPECTED_METRICS
def test_every_spec_has_callable_readers_and_normalize():
    """Every registry entry is a well-formed ``MetricSpec``.

    Checks the entry type, that the spec's own ``name`` agrees with the key
    it is registered under, and that each reader / normaliser (and the
    optional prime hook, when present) is callable.
    """
    for name, spec in METRIC_SPECS.items():
        assert isinstance(spec, MetricSpec), f"{name} is not a MetricSpec"
        # The name is stored twice (dict key and spec field); a copy-paste
        # slip between the two would otherwise go unnoticed.
        assert spec.name == name, f"{name} is registered with spec.name {spec.name!r}"
        assert callable(spec.read_psutil), f"{name}.read_psutil not callable"
        assert callable(spec.read_fallback), f"{name}.read_fallback not callable"
        assert callable(spec.normalize), f"{name}.normalize not callable"
        # prime is Optional[PrimeFn]
        if spec.prime is not None:
            assert callable(spec.prime), f"{name}.prime present but not callable"
def test_get_spec_returns_none_for_unknown_metric():
    """Looking up an unregistered name yields ``None`` instead of raising."""
    missing = get_spec("totally_unregistered")
    assert missing is None
# ---------------------------------------------------------------------------
# Normaliser primitives
# ---------------------------------------------------------------------------
def test_norm_percent_clamps_to_unit_interval():
    """Percent readings scale by 1/100 and are clamped to ``[0, 1]``."""
    stream = SimpleNamespace()  # the percent normaliser never reads the stream
    cases = [
        (50.0, 0.5),
        (0.0, 0.0),
        (100.0, 1.0),
        # Out-of-range readings clamp to the nearest bound.
        (-10.0, 0.0),
        (200.0, 1.0),
    ]
    for raw, expected in cases:
        assert _norm_percent(stream, raw) == expected
def test_norm_range_handles_min_max_window():
    """Range readings map linearly across (min_val, max_val) and clamp outside it."""
    stream = SimpleNamespace(_min_val=20.0, _max_val=70.0)
    cases = [
        (20.0, 0.0),
        (70.0, 1.0),
        (45.0, 0.5),
        # Below / above the window clamps.
        (10.0, 0.0),
        (100.0, 1.0),
    ]
    for raw, expected in cases:
        assert _norm_range(stream, raw) == expected
def test_norm_range_returns_half_on_zero_window():
    """With min == max there is nothing to divide by; 0.5 is reported for any input."""
    stream = SimpleNamespace(_min_val=50.0, _max_val=50.0)
    for raw in (50.0, 999.0):
        assert _norm_range(stream, raw) == 0.5
def test_norm_rate_uses_max_rate():
    """Rates are expressed as a fraction of ``_max_rate`` and capped at 1.0."""
    stream = SimpleNamespace(_max_rate=100.0)
    cases = [(0.0, 0.0), (50.0, 0.5), (100.0, 1.0), (200.0, 1.0)]
    for raw, expected in cases:
        assert _norm_rate(stream, raw) == expected
def test_norm_rate_returns_half_on_zero_max():
    """A zero ``_max_rate`` cannot be scaled against; 0.5 is reported."""
    stream = SimpleNamespace(_max_rate=0.0)
    result = _norm_rate(stream, 12345.0)
    assert result == 0.5
def test_zero_reader_always_returns_zero():
    """``_zero`` ignores its argument entirely."""
    for stream in (None, SimpleNamespace()):
        assert _zero(stream) == 0.0
# ---------------------------------------------------------------------------
# Spec wiring sanity
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    ("name", "expected_normalize"),
    [
        # Percent-style metrics
        ("cpu_load", _norm_percent),
        ("ram_usage", _norm_percent),
        ("disk_usage", _norm_percent),
        ("battery_level", _norm_percent),
        ("gpu_load", _norm_percent),
        # Min/max window metrics
        ("cpu_temp", _norm_range),
        ("gpu_temp", _norm_range),
        ("fan_speed", _norm_range),
        # Rate metrics
        ("network_rx", _norm_rate),
        ("network_tx", _norm_rate),
    ],
)
def test_metric_uses_expected_normaliser(name, expected_normalize):
    """Each metric is wired to the normaliser primitive intended for it."""
    spec = METRIC_SPECS[name]
    assert spec.normalize is expected_normalize
def test_metrics_without_psutil_use_zero_fallback_for_desktop_only():
    """Metrics with no non-psutil implementation fall back to the constant ``_zero``."""
    desktop_only = (
        "disk_usage",
        "battery_level",
        "cpu_temp",
        "fan_speed",
        "gpu_load",
        "gpu_temp",
    )
    for name in desktop_only:
        fallback = METRIC_SPECS[name].read_fallback
        assert fallback is _zero, name
def test_cpu_and_ram_have_meaningful_fallbacks():
    """cpu_load and ram_usage must not be wired to the constant ``_zero`` fallback."""
    for name in ("cpu_load", "ram_usage"):
        fallback = METRIC_SPECS[name].read_fallback
        assert fallback is not _zero, name
def test_only_cpu_load_and_network_have_prime_hooks():
    """Exactly three metrics register a prime hook: cpu_load and both network rates."""
    primed = set()
    for name, spec in METRIC_SPECS.items():
        if spec.prime is not None:
            primed.add(name)
    assert primed == {"cpu_load", "network_rx", "network_tx"}