refactor(value-source): MetricSpec registry for SystemMetricsValueStream

SystemMetricsValueStream used to dispatch on its ``self._metric`` string
across three independent if/elif chains (audit finding M5):

  * priming in ``start()`` (cpu_percent seed, initial network counter);
  * raw reading in ``_read_metric_psutil`` plus ``_read_metric_fallback``;
  * normalisation in ``_normalize`` (percent / min-max range / max-rate).

Adding a new metric meant editing all three chains plus the Android
fallback — and forgetting one branch made the metric silently return 0.

Lift each per-metric concern into a free function and register them as a
``MetricSpec(name, read_psutil, read_fallback, normalize, prime)`` in a
new ``core.processing.metric_readers`` module. Shared normalisers
(``_norm_percent`` / ``_norm_range`` / ``_norm_rate`` / ``_zero``) live
once. The stream's ``start()`` / ``_read_metric()`` / ``_normalize()``
collapse to a single registry lookup + delegation.

The stream still owns its mutable state (``_disk_path``,
``_sensor_label``, ``_gpu_unavailable``, ``_prev_net_bytes``,
``_prev_net_time``, etc.) — readers operate on the stream by
parameter, not by inheritance, so the kitchen-sink class shrinks by
~140 lines without losing the per-stream cadence bookkeeping. Each
spec function's docstring documents which fields it reads or mutates.

Tests: 16 new tests cover the 10-metric coverage set, callable shape
of every spec field, the three normaliser primitives' clamping +
divide-by-zero behaviour, prime-hook presence (only the three metrics
that need a baseline: cpu_load + network_rx + network_tx), and
fallback-path expectations (desktop-only sensors -> _zero, cpu/ram ->
real MetricsProvider).

754 existing core / storage / api tests stay green; ruff clean.
This commit is contained in:
2026-05-22 23:29:33 +03:00
parent 98fb61d932
commit 9f3f346543
3 changed files with 443 additions and 152 deletions
@@ -0,0 +1,271 @@
"""Per-metric reader / normaliser registry for ``SystemMetricsValueStream``.
The stream previously dispatched on a ``self._metric`` string across three
separate ``if / elif`` chains (priming in ``start()``, raw read in
``_read_metric_psutil`` + ``_read_metric_fallback``, normalisation in
``_normalize``). Adding a new metric meant editing every chain.
This module replaces all of that with a single ``METRIC_SPECS`` dict keyed
by metric name. Each :class:`MetricSpec` declares:
* ``read_psutil(stream)`` — the desktop path that uses ``stream._psutil``;
* ``read_fallback(stream)`` — the Android / no-psutil path (returns 0.0
for desktop-only sensors);
* ``normalize(stream, raw)`` — maps the raw reading to ``[0, 1]``;
* ``prime(stream)`` — optional one-time setup called from ``start()``.
The spec functions operate on the stream's existing attributes
(``_disk_path``, ``_sensor_label``, ``_min_val``, ``_max_val``,
``_max_rate``, ``_gpu_unavailable``, ``_prev_net_bytes``,
``_prev_net_time``). That is intentional: the readers are stateless
strategy callables, but the *stream's* state remains its own. Mutating it
from a reader is documented per function so the contract is explicit.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Dict, Optional
from ledgrab.utils import get_logger
if TYPE_CHECKING:
from ledgrab.core.processing.value_stream import SystemMetricsValueStream
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Spec dataclass
# ---------------------------------------------------------------------------
# Callable shapes shared by every spec. The stream type is spelled as a
# string forward reference, so these aliases never need the real class at
# runtime.
ReaderFn = Callable[["SystemMetricsValueStream"], float]
NormalizeFn = Callable[["SystemMetricsValueStream", float], float]
PrimeFn = Callable[["SystemMetricsValueStream"], None]


@dataclass(frozen=True)
class MetricSpec:
    """Immutable bundle of the strategy callables for one system metric.

    Instances are frozen: a spec is declared once and then only looked up
    and called. Every callable receives the stream as its first argument.
    """

    # Metric identifier.
    name: str
    # Raw reader taking the stream and returning a float (psutil path).
    read_psutil: ReaderFn
    # Raw reader taking the stream and returning a float (no-psutil path).
    read_fallback: ReaderFn
    # Maps ``(stream, raw)`` to the normalised value.
    normalize: NormalizeFn
    # Optional one-time setup hook; ``None`` when the metric needs none.
    prime: Optional[PrimeFn] = None
# ---------------------------------------------------------------------------
# Normaliser primitives — shared across metrics
# ---------------------------------------------------------------------------
def _norm_percent(_s, raw: float) -> float:
    """Scale a 0..100 percentage reading into ``[0, 1]``.

    The stream argument is accepted only so this matches the shared
    normaliser signature; it is never read. Out-of-range readings are
    clamped to the nearest bound.
    """
    fraction = raw / 100.0
    capped = min(1.0, fraction)
    return max(0.0, capped)
def _norm_range(s, raw: float) -> float:
    """Map ``raw`` onto ``[0, 1]`` within the stream's ``(_min_val, _max_val)`` window.

    Reads ``s._min_val`` and ``s._max_val``. A window narrower than 1e-9
    cannot be divided by, so the midpoint 0.5 is returned instead. Readings
    outside the window are clamped to 0.0 / 1.0.
    """
    low = s._min_val
    span = s._max_val - low
    degenerate = abs(span) < 1e-9
    if degenerate:
        return 0.5
    position = (raw - low) / span
    return max(0.0, min(1.0, position))
def _norm_rate(s, raw: float) -> float:
    """Scale a byte rate against the stream's ``_max_rate`` ceiling.

    Reads ``s._max_rate``. A ceiling of zero or below gives no usable
    scale, so the midpoint 0.5 is returned; otherwise the ratio is clamped
    to ``[0, 1]``.
    """
    ceiling = s._max_rate
    if ceiling <= 0:
        return 0.5
    ratio = raw / ceiling
    return max(0.0, min(1.0, ratio))
def _zero(_s) -> float:
    """Constant reader: always yields 0.0 and never touches the stream.

    Registered as the fallback reader for metrics that have no
    no-psutil implementation.
    """
    nothing = 0.0
    return nothing
# ---------------------------------------------------------------------------
# Read helpers — psutil paths
# ---------------------------------------------------------------------------
def _read_cpu_load_psutil(s) -> float:
    """Return ``s._psutil.cpu_percent(interval=None)`` unchanged."""
    psutil = s._psutil
    load = psutil.cpu_percent(interval=None)
    return load
def _read_ram_usage_psutil(s) -> float:
    """Return the ``percent`` field of ``s._psutil.virtual_memory()``."""
    memory = s._psutil.virtual_memory()
    return memory.percent
def _read_disk_usage_psutil(s) -> float:
    """Return the ``percent`` field of ``disk_usage`` for ``s._disk_path``.

    Reads ``s._psutil`` and ``s._disk_path``; mutates nothing.
    """
    usage = s._psutil.disk_usage(s._disk_path)
    return usage.percent
def _read_battery_psutil(s) -> float:
    """Return the battery ``percent``, or 0.0 when psutil reports no battery.

    ``sensors_battery()`` yields a falsy value (``None``) when no battery
    is found; that case maps to 0.0.
    """
    battery = s._psutil.sensors_battery()
    if not battery:
        return 0.0
    return battery.percent
def _read_cpu_temp_psutil(s) -> float:
    """Return one temperature reading from ``psutil.sensors_temperatures()``.

    Reads ``s._psutil`` and ``s._sensor_label``; mutates nothing.

    Selection order:

    1. If ``s._sensor_label`` is set, the first entry whose own label *or*
       whose group name equals it.
    2. Otherwise (or when nothing matches) the first entry of the first
       non-empty group.

    Returns 0.0 when psutil is missing, when this psutil build has no
    ``sensors_temperatures`` function, or when no sensor is reported.
    """
    # psutil only defines sensors_temperatures() on some platforms; look it
    # up defensively so an unsupported platform yields 0.0 instead of raising
    # AttributeError on every poll. This also covers ``s._psutil is None``.
    sensors_temperatures = getattr(s._psutil, "sensors_temperatures", None)
    if sensors_temperatures is None:
        return 0.0
    temps = sensors_temperatures()
    if not temps:
        return 0.0
    wanted = s._sensor_label
    if wanted:
        for group_name, entries in temps.items():
            for entry in entries:
                if entry.label == wanted or group_name == wanted:
                    return entry.current
    # No label requested, or the label matched nothing: first available sensor.
    for entries in temps.values():
        if entries:
            return entries[0].current
    return 0.0
def _read_fan_speed_psutil(s) -> float:
    """Return one fan reading from ``psutil.sensors_fans()``.

    Reads ``s._psutil`` and ``s._sensor_label``; mutates nothing.

    Selection order:

    1. If ``s._sensor_label`` is set, the first entry whose own label *or*
       whose group name equals it.
    2. Otherwise (or when nothing matches) the first entry of the first
       non-empty group.

    Returns 0.0 when psutil is missing, when this psutil build has no
    ``sensors_fans`` function, or when no fan is reported.
    """
    # psutil only defines sensors_fans() on some platforms; look it up
    # defensively so an unsupported platform yields 0.0 instead of raising
    # AttributeError on every poll. This also covers ``s._psutil is None``.
    sensors_fans = getattr(s._psutil, "sensors_fans", None)
    if sensors_fans is None:
        return 0.0
    fans = sensors_fans()
    if not fans:
        return 0.0
    wanted = s._sensor_label
    if wanted:
        for group_name, entries in fans.items():
            for entry in entries:
                if entry.label == wanted or group_name == wanted:
                    return entry.current
    # No label requested, or the label matched nothing: first available fan.
    for entries in fans.values():
        if entries:
            return entries[0].current
    return 0.0
def _read_gpu(metric: str) -> ReaderFn:
    """Return a reader closure for ``'gpu_load'``; any other name reads temperature.

    The returned reader reads and may set ``s._gpu_unavailable``: once NVML
    is reported unavailable, or any exception occurs during the read, the
    flag is latched to ``True`` and every later call returns 0.0 without
    touching NVML again.
    """
    wants_load = metric == "gpu_load"

    def _read(s) -> float:
        if s._gpu_unavailable:
            return 0.0
        try:
            # Imported lazily inside the try so an import failure is handled
            # like any other read error.
            from ledgrab.utils.gpu import nvml, nvml_available, nvml_handle

            if nvml_available and nvml_handle is not None:
                if wants_load:
                    utilisation = nvml.nvmlDeviceGetUtilizationRates(nvml_handle)
                    return float(utilisation.gpu)
                # gpu_temp — the literal 0 is presumably NVML_TEMPERATURE_GPU;
                # TODO confirm against the NVML bindings.
                return float(nvml.nvmlDeviceGetTemperature(nvml_handle, 0))
            s._gpu_unavailable = True
            return 0.0
        except Exception as e:
            logger.debug("GPU metric read error: %s", e)
            s._gpu_unavailable = True
            return 0.0

    return _read
def _read_network_rate(s) -> float:
    """Return the bytes/s rate for ``network_rx`` / ``network_tx``.

    Reads ``s._psutil`` and ``s._metric``; reads and mutates
    ``s._prev_net_bytes`` and ``s._prev_net_time`` to carry the counter
    baseline from one call to the next. The stream owns that state; this
    reader only advances it.

    Returns 0.0 when psutil is missing, when no counters are reported, on
    the very first call (which only records the baseline), and when no time
    has elapsed since the previous call. A negative delta (the counter went
    backwards) is clamped to 0.0.
    """
    psutil = s._psutil
    if psutil is None:
        return 0.0
    counters = psutil.net_io_counters()
    if not counters:
        return 0.0
    # Any metric other than "network_rx" is treated as transmit.
    current_bytes = counters.bytes_recv if s._metric == "network_rx" else counters.bytes_sent
    now = time.monotonic()
    if s._prev_net_bytes is None or s._prev_net_time is None:
        # No baseline yet: record one and report nothing for this tick.
        s._prev_net_bytes = current_bytes
        s._prev_net_time = now
        return 0.0
    elapsed = now - s._prev_net_time
    if elapsed <= 0:
        return 0.0
    # Divide by the real elapsed time. The byte delta covers the whole gap,
    # so shrinking the divisor (as a capped interval would) inflates the rate
    # after a stalled poll, and a zero poll interval would divide by zero.
    rate = (current_bytes - s._prev_net_bytes) / elapsed
    s._prev_net_bytes = current_bytes
    s._prev_net_time = now
    return max(0.0, rate)
# ---------------------------------------------------------------------------
# Read helpers — fallback (no-psutil) paths
# ---------------------------------------------------------------------------
def _read_cpu_load_fallback(_s) -> float:
    """Return ``cpu_percent()`` from the project's metrics provider.

    The stream argument is unused; the provider is imported lazily at call
    time.
    """
    from ledgrab.utils.metrics import get_metrics_provider

    provider = get_metrics_provider()
    return provider.cpu_percent()
def _read_ram_usage_fallback(_s) -> float:
    """Return used memory as a 0..100 percentage via the metrics provider.

    The stream argument is unused. When the provider reports a
    non-positive total there is nothing to divide by, so 0.0 is returned.
    """
    from ledgrab.utils.metrics import get_metrics_provider

    memory = get_metrics_provider().virtual_memory()
    if not memory.total_bytes > 0:
        return 0.0
    used_fraction = memory.used_bytes / memory.total_bytes
    return used_fraction * 100.0
# ---------------------------------------------------------------------------
# Prime helpers — one-time setup from start()
# ---------------------------------------------------------------------------
def _prime_cpu_load(s) -> None:
    """Make one throwaway ``cpu_percent(interval=None)`` call to seed psutil.

    The result is discarded; the call exists so the first real read
    returns meaningful data. Does nothing when ``s._psutil`` is ``None``.
    """
    psutil = s._psutil
    if psutil is None:
        return
    psutil.cpu_percent(interval=None)
def _prime_network(s) -> None:
    """Record the current network byte counter as the baseline for rate reads.

    Reads ``s._psutil`` and ``s._metric``; on success sets
    ``s._prev_net_bytes`` (received bytes for ``network_rx``, sent bytes
    for anything else) and ``s._prev_net_time``. Leaves the stream untouched
    when psutil is missing or reports no counters.
    """
    psutil = s._psutil
    if psutil is None:
        return
    counters = psutil.net_io_counters()
    if not counters:
        return
    if s._metric == "network_rx":
        baseline = counters.bytes_recv
    else:
        baseline = counters.bytes_sent
    s._prev_net_bytes = baseline
    s._prev_net_time = time.monotonic()
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Registry of every supported metric, keyed by the spec's own ``name`` so the
# key and the spec cannot drift apart. Declaration order is preserved.
METRIC_SPECS: Dict[str, MetricSpec] = {
    spec.name: spec
    for spec in (
        MetricSpec(
            name="cpu_load",
            read_psutil=_read_cpu_load_psutil,
            read_fallback=_read_cpu_load_fallback,
            normalize=_norm_percent,
            prime=_prime_cpu_load,
        ),
        MetricSpec(
            name="ram_usage",
            read_psutil=_read_ram_usage_psutil,
            read_fallback=_read_ram_usage_fallback,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="disk_usage",
            read_psutil=_read_disk_usage_psutil,
            read_fallback=_zero,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="battery_level",
            read_psutil=_read_battery_psutil,
            read_fallback=_zero,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="cpu_temp",
            read_psutil=_read_cpu_temp_psutil,
            read_fallback=_zero,
            normalize=_norm_range,
        ),
        MetricSpec(
            name="fan_speed",
            read_psutil=_read_fan_speed_psutil,
            read_fallback=_zero,
            normalize=_norm_range,
        ),
        MetricSpec(
            name="gpu_load",
            read_psutil=_read_gpu("gpu_load"),
            read_fallback=_zero,
            normalize=_norm_percent,
        ),
        MetricSpec(
            name="gpu_temp",
            read_psutil=_read_gpu("gpu_temp"),
            read_fallback=_zero,
            normalize=_norm_range,
        ),
        # Both network directions share one reader and one prime hook; the
        # direction is chosen at call time from the stream's ``_metric``.
        MetricSpec(
            name="network_rx",
            read_psutil=_read_network_rate,
            read_fallback=_zero,
            normalize=_norm_rate,
            prime=_prime_network,
        ),
        MetricSpec(
            name="network_tx",
            read_psutil=_read_network_rate,
            read_fallback=_zero,
            normalize=_norm_rate,
            prime=_prime_network,
        ),
    )
}
def get_spec(metric: str) -> Optional[MetricSpec]:
    """Return the registered spec for ``metric``, or ``None`` if it is unknown."""
    try:
        return METRIC_SPECS[metric]
    except KeyError:
        return None
@@ -31,6 +31,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np
from ledgrab.core.processing import metric_readers as _metric_readers
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.utils import get_logger
@@ -1526,19 +1527,11 @@ class SystemMetricsValueStream(ValueStream):
self._psutil = None
def start(self) -> None:
if self._psutil is None:
return
# Prime cpu_percent so the first real call returns meaningful data
if self._metric == "cpu_load":
self._psutil.cpu_percent(interval=None)
# Prime network counters
if self._metric in ("network_rx", "network_tx"):
counters = self._psutil.net_io_counters()
if counters:
self._prev_net_bytes = (
counters.bytes_recv if self._metric == "network_rx" else counters.bytes_sent
)
self._prev_net_time = time.monotonic()
# Per-metric priming (e.g. seed cpu_percent or capture an initial
# network counter) lives on the MetricSpec, keyed by ``self._metric``.
spec = _metric_readers.get_spec(self._metric)
if spec is not None and spec.prime is not None:
spec.prime(self)
def stop(self) -> None:
self._prev_value = None
@@ -1570,154 +1563,29 @@ class SystemMetricsValueStream(ValueStream):
return self._raw_value
def _normalize(self, raw: float) -> float:
"""Normalize raw value to [0, 1]."""
if self._metric in ("cpu_load", "ram_usage", "gpu_load", "battery_level", "disk_usage"):
return max(0.0, min(1.0, raw / 100.0))
elif self._metric in ("cpu_temp", "gpu_temp", "fan_speed"):
rng = self._max_val - self._min_val
if abs(rng) < 1e-9:
return 0.5
return max(0.0, min(1.0, (raw - self._min_val) / rng))
elif self._metric in ("network_rx", "network_tx"):
if self._max_rate <= 0:
return 0.5
return max(0.0, min(1.0, raw / self._max_rate))
return 0.0
"""Normalize raw value to [0, 1] via the metric's registered normaliser."""
spec = _metric_readers.get_spec(self._metric)
if spec is None:
return 0.0
return spec.normalize(self, raw)
def _read_metric(self) -> float:
"""Read the raw metric value from the system.
"""Read the raw metric value via the registered reader.
When psutil is unavailable (Android), falls back to the
platform-aware MetricsProvider for cpu/memory and returns 0.0
for desktop-only metrics.
When psutil is unavailable (Android), the spec's ``read_fallback``
path is used — desktop-only sensors return 0.0 there. Read errors
are swallowed and the last cached raw value is returned.
"""
spec = _metric_readers.get_spec(self._metric)
if spec is None:
return 0.0
reader = spec.read_psutil if self._psutil is not None else spec.read_fallback
try:
if self._psutil is not None:
return self._read_metric_psutil()
return self._read_metric_fallback()
return reader(self)
except Exception as e:
logger.debug("SystemMetricsValueStream read error (%s): %s", self._metric, e)
return self._raw_value if self._raw_value is not None else 0.0
def _read_metric_psutil(self) -> float:
"""Read metrics via psutil (desktop path)."""
psutil = self._psutil
if self._metric == "cpu_load":
return psutil.cpu_percent(interval=None)
elif self._metric == "ram_usage":
return psutil.virtual_memory().percent
elif self._metric == "disk_usage":
return psutil.disk_usage(self._disk_path).percent
elif self._metric == "battery_level":
bat = psutil.sensors_battery()
return bat.percent if bat else 0.0
elif self._metric == "cpu_temp":
return self._read_cpu_temp()
elif self._metric == "fan_speed":
return self._read_fan_speed()
elif self._metric in ("gpu_load", "gpu_temp"):
return self._read_gpu_metric()
elif self._metric in ("network_rx", "network_tx"):
return self._read_network_rate()
return 0.0
def _read_metric_fallback(self) -> float:
"""Read metrics without psutil (Android / fallback path).
Uses the MetricsProvider abstraction for cpu/memory. Sensors,
battery, network, disk, and GPU are not available.
"""
from ledgrab.utils.metrics import get_metrics_provider
provider = get_metrics_provider()
if self._metric == "cpu_load":
return provider.cpu_percent()
elif self._metric == "ram_usage":
mem = provider.virtual_memory()
if mem.total_bytes > 0:
return (mem.used_bytes / mem.total_bytes) * 100.0
return 0.0
return 0.0
def _read_cpu_temp(self) -> float:
psutil = self._psutil
if psutil is None:
return 0.0
temps = psutil.sensors_temperatures()
if not temps:
return 0.0
# If sensor_label specified, try to find it
if self._sensor_label:
for group_name, entries in temps.items():
for entry in entries:
if entry.label == self._sensor_label or group_name == self._sensor_label:
return entry.current
# Fallback: first available sensor
for entries in temps.values():
if entries:
return entries[0].current
return 0.0
def _read_fan_speed(self) -> float:
psutil = self._psutil
if psutil is None:
return 0.0
fans = psutil.sensors_fans()
if not fans:
return 0.0
if self._sensor_label:
for group_name, entries in fans.items():
for entry in entries:
if entry.label == self._sensor_label or group_name == self._sensor_label:
return entry.current
# Fallback: first available fan
for entries in fans.values():
if entries:
return entries[0].current
return 0.0
def _read_gpu_metric(self) -> float:
if self._gpu_unavailable:
return 0.0
try:
from ledgrab.utils.gpu import nvml, nvml_available, nvml_handle
if not nvml_available or nvml_handle is None:
self._gpu_unavailable = True
return 0.0
if self._metric == "gpu_load":
util = nvml.nvmlDeviceGetUtilizationRates(nvml_handle)
return float(util.gpu)
else: # gpu_temp
return float(nvml.nvmlDeviceGetTemperature(nvml_handle, 0))
except Exception as e:
logger.debug("GPU metric read error: %s", e)
self._gpu_unavailable = True
return 0.0
def _read_network_rate(self) -> float:
psutil = self._psutil
if psutil is None:
return 0.0
counters = psutil.net_io_counters()
if not counters:
return 0.0
current_bytes = counters.bytes_recv if self._metric == "network_rx" else counters.bytes_sent
now = time.monotonic()
if self._prev_net_bytes is None or self._prev_net_time is None:
self._prev_net_bytes = current_bytes
self._prev_net_time = now
return 0.0
dt = now - self._prev_net_time
if dt <= 0:
return 0.0
# Cap delta time to avoid spikes after long gaps
dt = min(dt, self._poll_interval * 2)
rate = (current_bytes - self._prev_net_bytes) / dt
self._prev_net_bytes = current_bytes
self._prev_net_time = now
return max(0.0, rate)
def update_source(self, source: "ValueSource") -> None:
from ledgrab.storage.value_source import SystemMetricsValueSource
@@ -0,0 +1,152 @@
"""Tests for the SystemMetricsValueStream metric reader registry.
Locks in the spec-driven dispatch that replaced the three independent
if/elif chains (priming, raw read, normalise) inside
``SystemMetricsValueStream``.
"""
from __future__ import annotations
from types import SimpleNamespace
import pytest
from ledgrab.core.processing.metric_readers import (
METRIC_SPECS,
MetricSpec,
_norm_percent,
_norm_range,
_norm_rate,
_zero,
get_spec,
)
# The exact set of metric names the registry must expose, grouped here by
# the normaliser each one is expected to use.
EXPECTED_METRICS = {
    # percent-scaled
    "cpu_load", "ram_usage", "disk_usage", "battery_level", "gpu_load",
    # min/max window
    "cpu_temp", "gpu_temp", "fan_speed",
    # byte rate
    "network_rx", "network_tx",
}
# ---------------------------------------------------------------------------
# Registry shape
# ---------------------------------------------------------------------------
def test_registry_covers_known_metrics():
    """The registry exposes exactly the expected metric names, no more and no fewer."""
    registered = set(METRIC_SPECS)
    assert registered == EXPECTED_METRICS
def test_every_spec_has_callable_readers_and_normalize():
    """Every registry value is a MetricSpec whose strategy fields are callable."""
    for name in METRIC_SPECS:
        spec = METRIC_SPECS[name]
        assert isinstance(spec, MetricSpec), f"{name} is not a MetricSpec"
        assert callable(spec.read_psutil), f"{name}.read_psutil not callable"
        assert callable(spec.read_fallback), f"{name}.read_fallback not callable"
        assert callable(spec.normalize), f"{name}.normalize not callable"
        # prime is optional: it only has to be callable when it is set.
        prime = spec.prime
        assert prime is None or callable(prime), f"{name}.prime present but not callable"
def test_get_spec_returns_none_for_unknown_metric():
    """Looking up an unregistered name yields ``None`` rather than raising."""
    missing = get_spec("totally_unregistered")
    assert missing is None
# ---------------------------------------------------------------------------
# Normaliser primitives
# ---------------------------------------------------------------------------
def test_norm_percent_clamps_to_unit_interval():
    """Percent readings scale by 1/100 and clamp to ``[0, 1]``."""
    stream = SimpleNamespace()  # normalise primitives ignore the stream
    cases = [
        (50.0, 0.5),
        (0.0, 0.0),
        (100.0, 1.0),
        # Out-of-range readings clamp to the nearest bound.
        (-10.0, 0.0),
        (200.0, 1.0),
    ]
    for raw, expected in cases:
        assert _norm_percent(stream, raw) == expected
def test_norm_range_handles_min_max_window():
    """Readings map linearly across (min_val, max_val) and clamp outside it."""
    stream = SimpleNamespace(_min_val=20.0, _max_val=70.0)
    cases = [
        (20.0, 0.0),
        (70.0, 1.0),
        (45.0, 0.5),
        # Readings outside the window clamp to the nearest bound.
        (10.0, 0.0),
        (100.0, 1.0),
    ]
    for raw, expected in cases:
        assert _norm_range(stream, raw) == expected
def test_norm_range_returns_half_on_zero_window():
    """A zero-width window (min == max) yields 0.5 instead of dividing by zero."""
    stream = SimpleNamespace(_min_val=50.0, _max_val=50.0)
    for raw in (50.0, 999.0):
        assert _norm_range(stream, raw) == 0.5
def test_norm_rate_uses_max_rate():
    """Rates scale against ``_max_rate`` and clamp at 1.0 above it."""
    stream = SimpleNamespace(_max_rate=100.0)
    expected_by_raw = {0.0: 0.0, 50.0: 0.5, 100.0: 1.0, 200.0: 1.0}
    for raw, expected in expected_by_raw.items():
        assert _norm_rate(stream, raw) == expected
def test_norm_rate_returns_half_on_zero_max():
    """With a zero ``_max_rate`` the normaliser returns the midpoint 0.5."""
    stream = SimpleNamespace(_max_rate=0.0)
    result = _norm_rate(stream, 12345.0)
    assert result == 0.5
def test_zero_reader_always_returns_zero():
    """``_zero`` returns 0.0 whatever is passed as the stream."""
    for stream in (None, SimpleNamespace()):
        assert _zero(stream) == 0.0
# ---------------------------------------------------------------------------
# Spec wiring sanity
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    "name, expected_normalize",
    [
        # Percent-scaled metrics.
        *(
            (metric, _norm_percent)
            for metric in ("cpu_load", "ram_usage", "disk_usage", "battery_level", "gpu_load")
        ),
        # Metrics mapped through the (min_val, max_val) window.
        *((metric, _norm_range) for metric in ("cpu_temp", "gpu_temp", "fan_speed")),
        # Byte-rate metrics scaled against max_rate.
        *((metric, _norm_rate) for metric in ("network_rx", "network_tx")),
    ],
)
def test_metric_uses_expected_normaliser(name, expected_normalize):
    """Each metric is wired to the very normaliser object expected for it."""
    wired = METRIC_SPECS[name].normalize
    assert wired is expected_normalize
def test_metrics_without_psutil_use_zero_fallback_for_desktop_only():
    """Desktop-only sensors fall back to a constant 0.0 when psutil is absent.

    Covers every metric the registry wires to ``_zero``, including the two
    network rates, so swapping any of those fallbacks fails here.
    """
    desktop_only = (
        "disk_usage",
        "battery_level",
        "cpu_temp",
        "fan_speed",
        "gpu_load",
        "gpu_temp",
        "network_rx",
        "network_tx",
    )
    for name in desktop_only:
        assert METRIC_SPECS[name].read_fallback is _zero, name
def test_cpu_and_ram_have_meaningful_fallbacks():
    """cpu_load and ram_usage keep a real fallback reader, not the ``_zero`` stub."""
    for name in ("cpu_load", "ram_usage"):
        fallback = METRIC_SPECS[name].read_fallback
        assert fallback is not _zero, name
def test_only_cpu_load_and_network_have_prime_hooks():
    """Only the metrics that need an initial baseline register a prime hook."""
    primed = set()
    for name, spec in METRIC_SPECS.items():
        if spec.prime is not None:
            primed.add(name)
    assert primed == {"cpu_load", "network_rx", "network_tx"}