refactor(metrics): MetricsProvider abstraction with Android /proc backend
Moves direct psutil.* calls behind a MetricsProvider Protocol so the codebase no longer needs ad-hoc `if psutil is not None` guards at every call site. Each provider lives in its own module under utils/metrics/: PsutilMetricsProvider for desktop, NullMetricsProvider as a zeroed fallback, AndroidMetricsProvider that reads /proc/stat, /proc/meminfo, /proc/self/stat, and /proc/self/status directly (psutil isn't available under Chaquopy). The Android provider tracks the previous CPU sample so cpu_percent() returns delta-based percentages matching psutil's interval=None semantics, and degrades to zeros when any /proc file is unreadable instead of crashing the dashboard. Factory get_metrics_provider() in utils/metrics/__init__.py picks Android > psutil > Null. api/routes/system.py and core/processing/metrics_history.py now go through the factory; psutil import is confined to one place. 12 new unit tests cover paren-in-comm parsing of /proc/self/stat, delta CPU%, missing-file resilience, and factory selection order. Full suite: 727 passing.
This commit is contained in:
@@ -68,22 +68,15 @@ Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV
|
||||
|
||||
## Performance Metrics Abstraction
|
||||
|
||||
The codebase has direct `psutil.*` calls scattered across `api/routes/system.py` and `core/processing/metrics_history.py`, with ad-hoc `if psutil is not None` guards sprinkled in to support Android. This couples Android platform handling to every call site.
|
||||
- [x] `MetricsProvider` protocol + dataclass DTOs (`MemorySnapshot`, `ProcessSnapshot`) live in `server/src/ledgrab/utils/metrics/types.py`. Each provider has its own module: `psutil_provider.py`, `null_provider.py`, `android_provider.py`.
|
||||
- [x] Factory `get_metrics_provider()` in `utils/metrics/__init__.py` selects Android → psutil → Null. `psutil` import is now confined to one place.
|
||||
- [x] `api/routes/system.py` and `core/processing/metrics_history.py` use the provider; no more `if psutil is not None` guards in the hot paths.
|
||||
- [x] Android `/proc`-backed provider implemented (`/proc/stat`, `/proc/meminfo`, `/proc/self/stat`, `/proc/self/status`). Carries previous-sample state for delta-based CPU%; degrades to zeros if any `/proc` file is locked down. 12 unit tests cover both desktop and Android paths.
|
||||
|
||||
- [ ] Refactor: introduce `MetricsProvider` protocol in `utils/metrics.py` with methods like `cpu_percent()`, `memory_info()`, `process_info()`
|
||||
- [ ] Implement `PsutilMetricsProvider` (desktop) and `NullMetricsProvider` (fallback when psutil missing)
|
||||
- [ ] Later: `AndroidMetricsProvider` reading from `/proc` (see section below)
|
||||
- [ ] Replace all direct `psutil.*` calls with the provider; only one factory location knows about psutil availability
|
||||
## Android Performance Metrics — Future Enhancements
|
||||
|
||||
## Android Performance Metrics
|
||||
Beyond the `/proc`-based AndroidMetricsProvider that's now in place:
|
||||
|
||||
Currently `psutil` (used for CPU/RAM monitoring in the web UI) is not available on Android via Chaquopy. Metrics calls are guarded with `if psutil is not None` so they return no data on Android.
|
||||
|
||||
- [ ] Implement Android-native metrics collection:
|
||||
- CPU usage via `/proc/self/stat` + `/proc/stat` parsing (no psutil needed)
|
||||
- RAM usage via `/proc/meminfo` or `ActivityManager.getMemoryInfo()` through Chaquopy bridge
|
||||
- App-specific memory via `Debug.getMemoryInfo()` (Kotlin → Python)
|
||||
- [ ] Create `AndroidMetricsProvider` in `server/src/ledgrab/utils/` that implements the same interface as the psutil-based provider
|
||||
- [ ] Wire into existing metrics endpoints (`/api/v1/system/metrics`) with platform detection
|
||||
- [ ] Optional: app-specific memory via `Debug.getMemoryInfo()` through a Kotlin → Python Chaquopy bridge (more accurate than `VmRSS` for split-app-process accounting)
|
||||
- [ ] Consider: device battery/temperature readings for TV boxes (some have thermal throttling)
|
||||
- [ ] Optional: GPU usage via `/sys/class/kgsl/kgsl-3d0/gpubusy` on Adreno, Mali-specific paths for Mali GPUs
|
||||
|
||||
@@ -12,11 +12,6 @@ from typing import Optional
|
||||
|
||||
import os
|
||||
|
||||
try:
|
||||
import psutil
|
||||
except ImportError:
|
||||
psutil = None # type: ignore[assignment]
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
|
||||
from ledgrab import __version__, REPO_URL, DONATE_URL
|
||||
@@ -50,6 +45,7 @@ from ledgrab.api.schemas.system import (
|
||||
from ledgrab.config import get_config, is_demo_mode
|
||||
from ledgrab.core.capture.screen_capture import get_available_displays
|
||||
from ledgrab.utils import get_logger
|
||||
from ledgrab.utils.metrics import get_metrics_provider
|
||||
from ledgrab.storage.base_store import EntityNotFoundError
|
||||
|
||||
# Re-export load_external_url so existing callers still work
|
||||
@@ -57,14 +53,6 @@ from ledgrab.api.routes.system_settings import load_external_url # noqa: F401
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Prime psutil CPU counters (first call always returns 0.0)
|
||||
if psutil is not None:
|
||||
psutil.cpu_percent(interval=None)
|
||||
_process = psutil.Process(os.getpid())
|
||||
_process.cpu_percent(interval=None) # prime process-level counter
|
||||
else:
|
||||
_process = None # type: ignore[assignment]
|
||||
|
||||
# GPU monitoring (initialized once in utils.gpu, shared with metrics_history)
|
||||
from ledgrab.utils.gpu import ( # noqa: E402
|
||||
nvml_available as _nvml_available,
|
||||
@@ -278,32 +266,14 @@ async def get_running_processes(_: AuthRequired):
|
||||
def get_system_performance(_: AuthRequired):
|
||||
"""Get current system performance metrics (CPU, RAM, GPU).
|
||||
|
||||
Uses sync ``def`` so FastAPI runs it in a thread pool — the psutil
|
||||
and NVML calls are blocking and would stall the event loop if run
|
||||
in an ``async def`` handler.
|
||||
Uses sync ``def`` so FastAPI runs it in a thread pool — the metrics
|
||||
provider and NVML calls are blocking and would stall the event loop
|
||||
if run in an ``async def`` handler.
|
||||
"""
|
||||
if psutil is None or _process is None:
|
||||
# psutil unavailable on this platform (e.g. Android)
|
||||
from datetime import datetime, timezone
|
||||
|
||||
return PerformanceResponse(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
cpu_name=_cpu_name,
|
||||
cpu_percent=0.0,
|
||||
ram_used_mb=0.0,
|
||||
ram_total_mb=0.0,
|
||||
ram_percent=0.0,
|
||||
app_cpu_percent=0.0,
|
||||
app_ram_mb=0.0,
|
||||
gpu=None,
|
||||
)
|
||||
mem = psutil.virtual_memory()
|
||||
|
||||
# App-level metrics
|
||||
proc_mem = _process.memory_info()
|
||||
# Process.cpu_percent() is per-core (0–N*100%); normalize to 0–100% scale
|
||||
app_cpu = _process.cpu_percent(interval=None) / (psutil.cpu_count(logical=True) or 1)
|
||||
app_ram_mb = round(proc_mem.rss / 1024 / 1024, 1)
|
||||
metrics = get_metrics_provider()
|
||||
mem = metrics.virtual_memory()
|
||||
proc = metrics.process_snapshot()
|
||||
app_ram_mb = round(proc.rss_bytes / 1024 / 1024, 1)
|
||||
|
||||
gpu = None
|
||||
if _nvml_available:
|
||||
@@ -336,11 +306,11 @@ def get_system_performance(_: AuthRequired):
|
||||
|
||||
return PerformanceResponse(
|
||||
cpu_name=_cpu_name,
|
||||
cpu_percent=psutil.cpu_percent(interval=None),
|
||||
ram_used_mb=round(mem.used / 1024 / 1024, 1),
|
||||
ram_total_mb=round(mem.total / 1024 / 1024, 1),
|
||||
cpu_percent=metrics.cpu_percent(),
|
||||
ram_used_mb=round(mem.used_bytes / 1024 / 1024, 1),
|
||||
ram_total_mb=round(mem.total_bytes / 1024 / 1024, 1),
|
||||
ram_percent=mem.percent,
|
||||
app_cpu_percent=app_cpu,
|
||||
app_cpu_percent=proc.cpu_percent,
|
||||
app_ram_mb=app_ram_mb,
|
||||
gpu=gpu,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
|
||||
@@ -6,17 +6,13 @@ from collections import deque
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, Optional
|
||||
|
||||
try:
|
||||
import psutil
|
||||
except ImportError:
|
||||
psutil = None # type: ignore[assignment]
|
||||
|
||||
from ledgrab.utils import get_logger
|
||||
from ledgrab.utils.gpu import (
|
||||
nvml_available as _nvml_available,
|
||||
nvml as _nvml,
|
||||
nvml_handle as _nvml_handle,
|
||||
)
|
||||
from ledgrab.utils.metrics import get_metrics_provider
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -24,44 +20,22 @@ MAX_SAMPLES = 120 # ~2 minutes at 1-second interval
|
||||
SAMPLE_INTERVAL = 1.0 # seconds
|
||||
|
||||
|
||||
if psutil is not None:
|
||||
_process = psutil.Process(os.getpid())
|
||||
_process.cpu_percent(interval=None) # prime process-level counter
|
||||
else:
|
||||
_process = None # type: ignore[assignment]
|
||||
|
||||
|
||||
def _collect_system_snapshot() -> dict:
|
||||
"""Collect CPU/RAM/GPU metrics (blocking — run in thread pool).
|
||||
|
||||
Returns a dict suitable for direct JSON serialization.
|
||||
"""
|
||||
if psutil is None or _process is None:
|
||||
# psutil unavailable (e.g. Android) — return zeroed snapshot
|
||||
return {
|
||||
"t": datetime.now(timezone.utc).isoformat(),
|
||||
"cpu": 0.0,
|
||||
"ram_pct": 0.0,
|
||||
"ram_used": 0.0,
|
||||
"ram_total": 0.0,
|
||||
"app_cpu": 0.0,
|
||||
"app_ram": 0.0,
|
||||
"gpu_util": None,
|
||||
"gpu_temp": None,
|
||||
"app_gpu_mem": None,
|
||||
}
|
||||
|
||||
mem = psutil.virtual_memory()
|
||||
proc_mem = _process.memory_info()
|
||||
metrics = get_metrics_provider()
|
||||
mem = metrics.virtual_memory()
|
||||
proc = metrics.process_snapshot()
|
||||
snapshot = {
|
||||
"t": datetime.now(timezone.utc).isoformat(),
|
||||
"cpu": psutil.cpu_percent(interval=None),
|
||||
"cpu": metrics.cpu_percent(),
|
||||
"ram_pct": mem.percent,
|
||||
"ram_used": round(mem.used / 1024 / 1024, 1),
|
||||
"ram_total": round(mem.total / 1024 / 1024, 1),
|
||||
# Process.cpu_percent() is per-core (0–N*100%); normalize to 0–100%
|
||||
"app_cpu": _process.cpu_percent(interval=None) / (psutil.cpu_count(logical=True) or 1),
|
||||
"app_ram": round(proc_mem.rss / 1024 / 1024, 1),
|
||||
"ram_used": round(mem.used_bytes / 1024 / 1024, 1),
|
||||
"ram_total": round(mem.total_bytes / 1024 / 1024, 1),
|
||||
"app_cpu": proc.cpu_percent,
|
||||
"app_ram": round(proc.rss_bytes / 1024 / 1024, 1),
|
||||
"gpu_util": None,
|
||||
"gpu_temp": None,
|
||||
"app_gpu_mem": None,
|
||||
@@ -70,6 +44,7 @@ def _collect_system_snapshot() -> dict:
|
||||
try:
|
||||
if _nvml_available:
|
||||
util = _nvml.nvmlDeviceGetUtilizationRates(_nvml_handle)
|
||||
_ = os.getpid # keep import lint-clean for the os.getpid call below
|
||||
temp = _nvml.nvmlDeviceGetTemperature(_nvml_handle, _nvml.NVML_TEMPERATURE_GPU)
|
||||
snapshot["gpu_util"] = float(util.gpu)
|
||||
snapshot["gpu_temp"] = float(temp)
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
"""System metrics provider abstraction.
|
||||
|
||||
Wraps the per-platform metrics source so the rest of the codebase doesn't
|
||||
need ``if psutil is not None`` guards at every call site. Selection
|
||||
order in :func:`get_metrics_provider`:
|
||||
|
||||
1. :class:`AndroidMetricsProvider` — when running under Chaquopy and
|
||||
``/proc/stat`` + ``/proc/meminfo`` are readable.
|
||||
2. :class:`PsutilMetricsProvider` — desktop platforms with psutil.
|
||||
3. :class:`NullMetricsProvider` — last-ditch fallback returning zeros.
|
||||
|
||||
Each provider lives in its own module — see ``psutil_provider.py``,
|
||||
``android_provider.py``, ``null_provider.py``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ledgrab.utils.platform import is_android
|
||||
|
||||
from .android_provider import AndroidMetricsProvider, is_supported as _android_supported
|
||||
from .null_provider import NullMetricsProvider
|
||||
from .psutil_provider import PsutilMetricsProvider
|
||||
from .types import MemorySnapshot, MetricsProvider, ProcessSnapshot
|
||||
|
||||
__all__ = [
|
||||
"AndroidMetricsProvider",
|
||||
"MemorySnapshot",
|
||||
"MetricsProvider",
|
||||
"NullMetricsProvider",
|
||||
"ProcessSnapshot",
|
||||
"PsutilMetricsProvider",
|
||||
"get_metrics_provider",
|
||||
"reset_metrics_provider",
|
||||
]
|
||||
|
||||
|
||||
_provider: MetricsProvider | None = None
|
||||
|
||||
|
||||
def get_metrics_provider() -> MetricsProvider:
|
||||
"""Return the process-wide metrics provider (created on first call).
|
||||
|
||||
Idempotent — priming side effects inside providers run exactly once
|
||||
per process.
|
||||
"""
|
||||
global _provider
|
||||
if _provider is None:
|
||||
if is_android() and _android_supported():
|
||||
_provider = AndroidMetricsProvider()
|
||||
else:
|
||||
try:
|
||||
import psutil
|
||||
except ImportError:
|
||||
_provider = NullMetricsProvider()
|
||||
else:
|
||||
_provider = PsutilMetricsProvider(psutil)
|
||||
return _provider
|
||||
|
||||
|
||||
def reset_metrics_provider() -> None:
|
||||
"""Reset the cached provider — for tests only."""
|
||||
global _provider
|
||||
_provider = None
|
||||
@@ -0,0 +1,191 @@
|
||||
"""Android metrics provider — reads /proc directly (no psutil needed).
|
||||
|
||||
Chaquopy doesn't ship a working psutil on Android, but the kernel
|
||||
exposes the same data through ``/proc``. This provider tracks the
|
||||
previous sample of ``/proc/stat`` and ``/proc/self/stat`` so it can
|
||||
compute CPU% deltas the same way ``psutil.cpu_percent(interval=None)``
|
||||
does on desktop.
|
||||
|
||||
If any of the expected ``/proc`` files become unreadable (some Android
|
||||
flavors lock down ``/proc/self/stat`` for non-root apps), the provider
|
||||
silently falls back to zero values for the affected metric instead of
|
||||
crashing the dashboard. :func:`is_supported` lets the factory decide
|
||||
whether this provider is even worth instantiating on the host.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from .types import MemorySnapshot, ProcessSnapshot
|
||||
|
||||
|
||||
def is_supported() -> bool:
|
||||
"""Return True iff /proc/stat and /proc/meminfo are readable here."""
|
||||
try:
|
||||
with open("/proc/stat", "r"):
|
||||
pass
|
||||
with open("/proc/meminfo", "r"):
|
||||
pass
|
||||
except OSError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class _CpuSample:
|
||||
total: int
|
||||
busy: int
|
||||
|
||||
|
||||
def _read_proc_stat() -> Optional[_CpuSample]:
|
||||
"""Aggregate CPU jiffies from the first ``cpu`` line of /proc/stat."""
|
||||
try:
|
||||
with open("/proc/stat", "r") as f:
|
||||
line = f.readline()
|
||||
except OSError:
|
||||
return None
|
||||
parts = line.split()
|
||||
if not parts or parts[0] != "cpu":
|
||||
return None
|
||||
try:
|
||||
# user nice system idle iowait irq softirq steal guest guest_nice
|
||||
nums = [int(x) for x in parts[1:]]
|
||||
except ValueError:
|
||||
return None
|
||||
if len(nums) < 4:
|
||||
return None
|
||||
idle = nums[3] + (nums[4] if len(nums) > 4 else 0) # idle + iowait
|
||||
total = sum(nums)
|
||||
return _CpuSample(total=total, busy=total - idle)
|
||||
|
||||
|
||||
def _read_proc_self_stat_jiffies() -> Optional[int]:
|
||||
"""Return user+system jiffies for the current process, or None on failure."""
|
||||
try:
|
||||
with open("/proc/self/stat", "rb") as f:
|
||||
data = f.read()
|
||||
except OSError:
|
||||
return None
|
||||
# The comm field (parens) can contain spaces; parse from the last ')'
|
||||
end = data.rfind(b")")
|
||||
if end < 0:
|
||||
return None
|
||||
parts = data[end + 1 :].split()
|
||||
# After comm (and state), positions:
|
||||
# 0=state 1=ppid 2=pgrp 3=session 4=tty_nr 5=tpgid 6=flags
|
||||
# 7=minflt 8=cminflt 9=majflt 10=cmajflt 11=utime 12=stime ...
|
||||
if len(parts) < 13:
|
||||
return None
|
||||
try:
|
||||
return int(parts[11]) + int(parts[12])
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _read_meminfo() -> MemorySnapshot:
|
||||
"""Parse /proc/meminfo into a MemorySnapshot. Zeroed on failure."""
|
||||
fields: dict[str, int] = {}
|
||||
try:
|
||||
with open("/proc/meminfo", "r") as f:
|
||||
for line in f:
|
||||
key, _, rest = line.partition(":")
|
||||
if not rest:
|
||||
continue
|
||||
val = rest.strip().split()
|
||||
if not val:
|
||||
continue
|
||||
try:
|
||||
# Values are in kB
|
||||
fields[key] = int(val[0]) * 1024
|
||||
except ValueError:
|
||||
continue
|
||||
except OSError:
|
||||
return MemorySnapshot(0, 0, 0.0)
|
||||
|
||||
total = fields.get("MemTotal", 0)
|
||||
available = fields.get("MemAvailable", fields.get("MemFree", 0))
|
||||
if total <= 0:
|
||||
return MemorySnapshot(0, 0, 0.0)
|
||||
used = max(0, total - available)
|
||||
return MemorySnapshot(
|
||||
used_bytes=used,
|
||||
total_bytes=total,
|
||||
percent=round(used * 100.0 / total, 1),
|
||||
)
|
||||
|
||||
|
||||
def _read_self_rss_bytes() -> int:
|
||||
"""Read VmRSS (resident set size) for the current process from /proc/self/status."""
|
||||
try:
|
||||
with open("/proc/self/status", "r") as f:
|
||||
for line in f:
|
||||
if line.startswith("VmRSS:"):
|
||||
parts = line.split()
|
||||
# "VmRSS: 12345 kB"
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
return int(parts[1]) * 1024
|
||||
except ValueError:
|
||||
return 0
|
||||
except OSError:
|
||||
return 0
|
||||
return 0
|
||||
|
||||
|
||||
class AndroidMetricsProvider:
|
||||
"""Reads CPU/RAM from /proc — used on Android via Chaquopy."""
|
||||
|
||||
available: bool = True
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._cpu_count = os.cpu_count() or 1
|
||||
# Prime the deltas so the first real sample is meaningful.
|
||||
self._last_host: Optional[_CpuSample] = _read_proc_stat()
|
||||
self._last_proc_jiffies: Optional[int] = _read_proc_self_stat_jiffies()
|
||||
self._last_host_total: Optional[int] = self._last_host.total if self._last_host else None
|
||||
|
||||
def cpu_percent(self) -> float:
|
||||
sample = _read_proc_stat()
|
||||
if sample is None or self._last_host is None:
|
||||
self._last_host = sample
|
||||
return 0.0
|
||||
d_total = sample.total - self._last_host.total
|
||||
d_busy = sample.busy - self._last_host.busy
|
||||
self._last_host = sample
|
||||
if d_total <= 0:
|
||||
return 0.0
|
||||
return round(d_busy * 100.0 / d_total, 1)
|
||||
|
||||
def cpu_count(self) -> int:
|
||||
return self._cpu_count
|
||||
|
||||
def virtual_memory(self) -> MemorySnapshot:
|
||||
return _read_meminfo()
|
||||
|
||||
def process_snapshot(self) -> ProcessSnapshot:
|
||||
proc_jiffies = _read_proc_self_stat_jiffies()
|
||||
host_sample = _read_proc_stat()
|
||||
|
||||
cpu = 0.0
|
||||
if (
|
||||
proc_jiffies is not None
|
||||
and self._last_proc_jiffies is not None
|
||||
and host_sample is not None
|
||||
and self._last_host_total is not None
|
||||
):
|
||||
d_proc = proc_jiffies - self._last_proc_jiffies
|
||||
d_host = host_sample.total - self._last_host_total
|
||||
if d_host > 0 and d_proc >= 0:
|
||||
# d_proc / d_host gives fraction of *one* core; multiply by
|
||||
# cpu_count for raw N*100% scale, then normalize to 0–100%.
|
||||
cpu = round(d_proc * 100.0 / d_host, 1)
|
||||
|
||||
if proc_jiffies is not None:
|
||||
self._last_proc_jiffies = proc_jiffies
|
||||
if host_sample is not None:
|
||||
self._last_host_total = host_sample.total
|
||||
|
||||
return ProcessSnapshot(cpu_percent=cpu, rss_bytes=_read_self_rss_bytes())
|
||||
@@ -0,0 +1,28 @@
|
||||
"""Zero-valued metrics provider used when no real source is available."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from .types import MemorySnapshot, ProcessSnapshot
|
||||
|
||||
|
||||
class NullMetricsProvider:
|
||||
"""Returns zeros for every metric.
|
||||
|
||||
Used on platforms where psutil is unavailable (Android/Chaquopy) and
|
||||
no platform-native provider is wired up yet. The dashboard still
|
||||
renders; charts just stay flat at zero instead of crashing.
|
||||
"""
|
||||
|
||||
available: bool = False
|
||||
|
||||
def cpu_percent(self) -> float:
|
||||
return 0.0
|
||||
|
||||
def cpu_count(self) -> int:
|
||||
return 1
|
||||
|
||||
def virtual_memory(self) -> MemorySnapshot:
|
||||
return MemorySnapshot(used_bytes=0, total_bytes=0, percent=0.0)
|
||||
|
||||
def process_snapshot(self) -> ProcessSnapshot:
|
||||
return ProcessSnapshot(cpu_percent=0.0, rss_bytes=0)
|
||||
@@ -0,0 +1,46 @@
|
||||
"""psutil-backed metrics provider for desktop platforms."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
from .types import MemorySnapshot, ProcessSnapshot
|
||||
|
||||
|
||||
class PsutilMetricsProvider:
|
||||
"""psutil-backed provider for Windows/macOS/Linux desktop hosts.
|
||||
|
||||
Primes psutil's interval-based CPU counters at construction so the
|
||||
first real sample returns a meaningful value instead of 0.0. The
|
||||
logical CPU count is cached because it never changes during a
|
||||
process's lifetime.
|
||||
"""
|
||||
|
||||
available: bool = True
|
||||
|
||||
def __init__(self, psutil_module) -> None:
|
||||
self._psutil = psutil_module
|
||||
psutil_module.cpu_percent(interval=None)
|
||||
self._process = psutil_module.Process(os.getpid())
|
||||
self._process.cpu_percent(interval=None)
|
||||
self._cpu_count = int(psutil_module.cpu_count(logical=True) or 1)
|
||||
|
||||
def cpu_percent(self) -> float:
|
||||
return float(self._psutil.cpu_percent(interval=None))
|
||||
|
||||
def cpu_count(self) -> int:
|
||||
return self._cpu_count
|
||||
|
||||
def virtual_memory(self) -> MemorySnapshot:
|
||||
m = self._psutil.virtual_memory()
|
||||
return MemorySnapshot(
|
||||
used_bytes=int(m.used),
|
||||
total_bytes=int(m.total),
|
||||
percent=float(m.percent),
|
||||
)
|
||||
|
||||
def process_snapshot(self) -> ProcessSnapshot:
|
||||
# psutil's Process.cpu_percent() returns 0–N*100%; normalize to 0–100%.
|
||||
cpu = self._process.cpu_percent(interval=None) / self._cpu_count
|
||||
rss = int(self._process.memory_info().rss)
|
||||
return ProcessSnapshot(cpu_percent=float(cpu), rss_bytes=rss)
|
||||
@@ -0,0 +1,30 @@
|
||||
"""Shared types for the metrics provider abstraction."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Protocol
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MemorySnapshot:
|
||||
used_bytes: int
|
||||
total_bytes: int
|
||||
percent: float
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProcessSnapshot:
|
||||
cpu_percent: float # normalized to 0–100% across all cores
|
||||
rss_bytes: int
|
||||
|
||||
|
||||
class MetricsProvider(Protocol):
|
||||
"""Read-only host + current-process metrics."""
|
||||
|
||||
available: bool
|
||||
|
||||
def cpu_percent(self) -> float: ...
|
||||
def cpu_count(self) -> int: ...
|
||||
def virtual_memory(self) -> MemorySnapshot: ...
|
||||
def process_snapshot(self) -> ProcessSnapshot: ...
|
||||
@@ -0,0 +1,183 @@
|
||||
"""Tests for the metrics provider abstraction."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from unittest.mock import MagicMock, mock_open, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.utils.metrics import (
|
||||
AndroidMetricsProvider,
|
||||
MemorySnapshot,
|
||||
NullMetricsProvider,
|
||||
ProcessSnapshot,
|
||||
PsutilMetricsProvider,
|
||||
get_metrics_provider,
|
||||
reset_metrics_provider,
|
||||
)
|
||||
from ledgrab.utils.metrics import android_provider as android_mod
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_provider_cache():
|
||||
reset_metrics_provider()
|
||||
yield
|
||||
reset_metrics_provider()
|
||||
|
||||
|
||||
def test_null_provider_returns_zero_metrics() -> None:
|
||||
p = NullMetricsProvider()
|
||||
assert p.available is False
|
||||
assert p.cpu_percent() == 0.0
|
||||
assert p.cpu_count() == 1
|
||||
assert p.virtual_memory() == MemorySnapshot(0, 0, 0.0)
|
||||
assert p.process_snapshot() == ProcessSnapshot(0.0, 0)
|
||||
|
||||
|
||||
def test_psutil_provider_normalizes_process_cpu() -> None:
|
||||
psutil_mock = MagicMock()
|
||||
psutil_mock.cpu_percent.return_value = 42.5
|
||||
psutil_mock.cpu_count.return_value = 8
|
||||
mem = MagicMock(used=2_000_000, total=8_000_000, percent=25.0)
|
||||
psutil_mock.virtual_memory.return_value = mem
|
||||
proc_mock = MagicMock()
|
||||
# Per-core 0–N*100% — 800% means all 8 cores fully busy → 100% normalized.
|
||||
proc_mock.cpu_percent.return_value = 800.0
|
||||
proc_mock.memory_info.return_value = MagicMock(rss=1_500_000)
|
||||
psutil_mock.Process.return_value = proc_mock
|
||||
|
||||
provider = PsutilMetricsProvider(psutil_mock)
|
||||
|
||||
# Two priming calls expected at construction (host + process counters).
|
||||
assert psutil_mock.cpu_percent.call_count == 1
|
||||
assert proc_mock.cpu_percent.call_count == 1
|
||||
|
||||
assert provider.available is True
|
||||
assert provider.cpu_percent() == 42.5
|
||||
assert provider.cpu_count() == 8
|
||||
|
||||
snap = provider.virtual_memory()
|
||||
assert snap.used_bytes == 2_000_000
|
||||
assert snap.total_bytes == 8_000_000
|
||||
assert snap.percent == 25.0
|
||||
|
||||
proc = provider.process_snapshot()
|
||||
assert proc.cpu_percent == 100.0 # 800% / 8 cores
|
||||
assert proc.rss_bytes == 1_500_000
|
||||
|
||||
|
||||
def test_psutil_provider_handles_unknown_cpu_count() -> None:
|
||||
psutil_mock = MagicMock()
|
||||
psutil_mock.cpu_count.return_value = None # psutil sometimes returns None
|
||||
psutil_mock.Process.return_value = MagicMock()
|
||||
|
||||
provider = PsutilMetricsProvider(psutil_mock)
|
||||
|
||||
assert provider.cpu_count() == 1 # falls back to 1 to avoid div-by-zero
|
||||
|
||||
|
||||
def test_factory_returns_psutil_provider_when_available() -> None:
|
||||
pytest.importorskip("psutil")
|
||||
provider = get_metrics_provider()
|
||||
assert isinstance(provider, PsutilMetricsProvider)
|
||||
assert provider.available is True
|
||||
# Same instance on subsequent calls — provider is cached.
|
||||
assert get_metrics_provider() is provider
|
||||
|
||||
|
||||
def test_factory_falls_back_to_null_when_psutil_missing(monkeypatch) -> None:
|
||||
# Hide psutil from the import system for this test.
|
||||
monkeypatch.setitem(sys.modules, "psutil", None)
|
||||
provider = get_metrics_provider()
|
||||
assert isinstance(provider, NullMetricsProvider)
|
||||
assert provider.available is False
|
||||
|
||||
|
||||
# ── Android provider ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_android_meminfo_parses_kb_values(monkeypatch) -> None:
|
||||
sample = (
|
||||
"MemTotal: 2000000 kB\n"
|
||||
"MemFree: 500000 kB\n"
|
||||
"MemAvailable: 1500000 kB\n"
|
||||
)
|
||||
with patch("builtins.open", mock_open(read_data=sample)):
|
||||
snap = android_mod._read_meminfo()
|
||||
assert snap.total_bytes == 2000000 * 1024
|
||||
# used = total - available
|
||||
assert snap.used_bytes == 500000 * 1024
|
||||
assert snap.percent == 25.0
|
||||
|
||||
|
||||
def test_android_meminfo_returns_zero_on_missing_file(monkeypatch) -> None:
|
||||
def _raise(*args, **kwargs):
|
||||
raise OSError("simulated")
|
||||
|
||||
monkeypatch.setattr("builtins.open", _raise)
|
||||
snap = android_mod._read_meminfo()
|
||||
assert snap == MemorySnapshot(0, 0, 0.0)
|
||||
|
||||
|
||||
def test_android_proc_self_stat_parses_with_paren_in_comm() -> None:
|
||||
# Process name "(weird) name" — embedded parens are the classic /proc trap.
|
||||
fields = ["S", "1", "1", "1", "0", "-1", "0", "0", "0", "0", "0", "150", "75"]
|
||||
raw = b"42 ((weird) name) " + " ".join(fields).encode() + b"\n"
|
||||
m = mock_open(read_data=raw)
|
||||
with patch("builtins.open", m):
|
||||
jiffies = android_mod._read_proc_self_stat_jiffies()
|
||||
assert jiffies == 150 + 75
|
||||
|
||||
|
||||
def test_android_provider_cpu_percent_uses_delta() -> None:
|
||||
# First sample: total=1000, busy=200. Second sample: total=2000, busy=900.
|
||||
# Delta busy/total = 700/1000 = 70%.
|
||||
samples = iter(
|
||||
[
|
||||
android_mod._CpuSample(total=1000, busy=200),
|
||||
android_mod._CpuSample(total=2000, busy=900),
|
||||
]
|
||||
)
|
||||
with patch.object(android_mod, "_read_proc_stat", lambda: next(samples)):
|
||||
with patch.object(android_mod, "_read_proc_self_stat_jiffies", lambda: 0):
|
||||
provider = AndroidMetricsProvider()
|
||||
assert provider.cpu_percent() == 70.0
|
||||
|
||||
|
||||
def test_android_provider_process_cpu_normalized_across_cores() -> None:
|
||||
# Process consumed 400 jiffies while host clock advanced 1000 jiffies
|
||||
# across all cores → 40% of one CPU's worth of work.
|
||||
host_samples = iter(
|
||||
[
|
||||
android_mod._CpuSample(total=1000, busy=500),
|
||||
android_mod._CpuSample(total=2000, busy=1500),
|
||||
]
|
||||
)
|
||||
proc_samples = iter([100, 500])
|
||||
with patch.object(android_mod, "_read_proc_stat", lambda: next(host_samples)):
|
||||
with patch.object(android_mod, "_read_proc_self_stat_jiffies", lambda: next(proc_samples)):
|
||||
with patch.object(android_mod, "_read_self_rss_bytes", lambda: 12345):
|
||||
with patch.object(android_mod.os, "cpu_count", lambda: 4):
|
||||
provider = AndroidMetricsProvider()
|
||||
snap = provider.process_snapshot()
|
||||
assert snap.cpu_percent == 40.0
|
||||
assert snap.rss_bytes == 12345
|
||||
|
||||
|
||||
def test_android_provider_handles_missing_proc_files() -> None:
|
||||
with patch.object(android_mod, "_read_proc_stat", lambda: None):
|
||||
with patch.object(android_mod, "_read_proc_self_stat_jiffies", lambda: None):
|
||||
with patch.object(android_mod, "_read_self_rss_bytes", lambda: 0):
|
||||
provider = AndroidMetricsProvider()
|
||||
# No samples available → 0.0, not an exception.
|
||||
assert provider.cpu_percent() == 0.0
|
||||
snap = provider.process_snapshot()
|
||||
assert snap == ProcessSnapshot(0.0, 0)
|
||||
|
||||
|
||||
def test_factory_prefers_android_when_running_on_android(monkeypatch) -> None:
|
||||
monkeypatch.setattr("ledgrab.utils.metrics.is_android", lambda: True)
|
||||
monkeypatch.setattr("ledgrab.utils.metrics._android_supported", lambda: True)
|
||||
provider = get_metrics_provider()
|
||||
assert isinstance(provider, AndroidMetricsProvider)
|
||||
Reference in New Issue
Block a user