feat(value-sources): optional normalization for magnitude sources
Add an opt-out `normalize` flag to the four magnitude value sources (ha_entity, http, system_metrics, game_event). get_value() stays in [0,1] for every source (the normalized scalar-bus invariant), so all existing consumers — brightness sinks, gradient_map, template `name` bindings, and color-strip bindable floats — are unaffected. - normalize=False is a clamp-passthrough: skip the min/max rescale and clamp the raw reading into [0,1] (for sources already reporting a 0..1 fraction). The un-normalized magnitude stays available via get_raw_value() / template raw[name] / automations. - Add get_raw_value() + a raw channel to GameEventValueStream (it had none). game_event's flag is model/stream-level only (no CRUD schema). - Finite-safe clamp01() util; harden the composite-layer brightness multiply (latent negative-wrap / >=1 skip) with it. - Preview WebSocket: tolerate non-numeric raw, generalize raw-range. - Frontend: settings-toggle slider per HA/system_metrics/http editor with min/max grey-out; toggle hidden for fixed-mapping percent metrics. en/ru/zh locale keys. - Additive optional field (default True) — JSON round-trip, no migration. Tests: store create/update round-trip, clamp-passthrough, live normalize flip, game_event raw channel + build_stream forwarding, and finite-safe clamp01.
This commit is contained in:
@@ -0,0 +1,380 @@
|
||||
"""Tests for the optional-normalization feature on magnitude value sources.
|
||||
|
||||
Covers, for HA / HTTP / system_metrics / game_event:
|
||||
* the ``normalize`` flag round-trips through to_dict/from_dict and defaults to
|
||||
True for old rows that predate the field,
|
||||
* ``normalize=False`` makes get_value() a finite-safe clamp passthrough while
|
||||
get_value() stays in [0, 1] (the normalized scalar-bus contract), and
|
||||
* the raw magnitude remains available via get_raw_value() (added to
|
||||
game_event, which had no raw channel before).
|
||||
Plus a unit test for the shared finite-safe ``clamp01`` helper.
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.game_integration.event_bus import GameEventBus
|
||||
from ledgrab.core.game_integration.events import GameEvent
|
||||
from ledgrab.core.processing.value_kinds import ValueStreamDeps, build_stream
|
||||
from ledgrab.core.processing.value_stream import (
|
||||
HAEntityValueStream,
|
||||
HTTPValueStream,
|
||||
SystemMetricsValueStream,
|
||||
)
|
||||
from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream
|
||||
from ledgrab.storage.value_source import ValueSource
|
||||
from ledgrab.utils import clamp01
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# clamp01 finite-safe helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestClamp01:
|
||||
def test_in_range_passthrough(self):
|
||||
assert clamp01(0.0) == 0.0
|
||||
assert clamp01(0.42) == pytest.approx(0.42)
|
||||
assert clamp01(1.0) == 1.0
|
||||
|
||||
def test_out_of_range_clamped(self):
|
||||
assert clamp01(-2.5) == 0.0
|
||||
assert clamp01(5.0) == 1.0
|
||||
|
||||
def test_non_finite_rejected_to_default(self):
|
||||
assert clamp01(float("nan")) == 0.0
|
||||
assert clamp01(float("inf")) == 0.0
|
||||
assert clamp01(float("-inf")) == 0.0
|
||||
# custom default is honoured
|
||||
assert clamp01(float("nan"), default=0.5) == 0.5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round-trip persistence of the normalize flag
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _base(source_type: str, **extra) -> dict:
|
||||
return {
|
||||
"id": f"vs_{source_type}",
|
||||
"name": source_type,
|
||||
"source_type": source_type,
|
||||
"created_at": "2025-01-01T00:00:00+00:00",
|
||||
"updated_at": "2025-01-01T00:00:00+00:00",
|
||||
**extra,
|
||||
}
|
||||
|
||||
|
||||
class TestNormalizeRoundTrip:
|
||||
@pytest.mark.parametrize(
|
||||
"source_type,extra",
|
||||
[
|
||||
("ha_entity", {"ha_source_id": "ha1", "entity_id": "sensor.temp"}),
|
||||
("http", {"http_endpoint_id": "ep1"}),
|
||||
("system_metrics", {"metric": "cpu_load"}),
|
||||
("game_event", {}),
|
||||
],
|
||||
)
|
||||
def test_normalize_false_round_trips(self, source_type, extra):
|
||||
src = ValueSource.from_dict(_base(source_type, normalize=False, **extra))
|
||||
assert src.normalize is False
|
||||
assert src.to_dict()["normalize"] is False
|
||||
restored = ValueSource.from_dict(src.to_dict())
|
||||
assert restored.normalize is False
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"source_type,extra",
|
||||
[
|
||||
("ha_entity", {"ha_source_id": "ha1", "entity_id": "sensor.temp"}),
|
||||
("http", {"http_endpoint_id": "ep1"}),
|
||||
("system_metrics", {"metric": "cpu_load"}),
|
||||
("game_event", {}),
|
||||
],
|
||||
)
|
||||
def test_missing_key_defaults_true(self, source_type, extra):
|
||||
"""An old row written before the field existed reads back normalize=True."""
|
||||
src = ValueSource.from_dict(_base(source_type, **extra))
|
||||
assert src.normalize is True
|
||||
assert src.to_dict()["normalize"] is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTTP stream — clamp passthrough + verbatim raw
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _http(normalize: bool, raw) -> HTTPValueStream:
|
||||
s = HTTPValueStream(
|
||||
endpoint_id="ep",
|
||||
json_path="",
|
||||
interval_s=60,
|
||||
min_value=0.0,
|
||||
max_value=100.0,
|
||||
smoothing=0.0,
|
||||
normalize=normalize,
|
||||
)
|
||||
s._raw_value = raw # bypass the poll loop; exercise get_value() directly
|
||||
return s
|
||||
|
||||
|
||||
class TestHttpNormalize:
|
||||
def test_normalize_true_rescales(self):
|
||||
assert _http(True, 50.0).get_value() == pytest.approx(0.5)
|
||||
|
||||
def test_normalize_false_clamps_in_range(self):
|
||||
assert _http(False, 0.7).get_value() == pytest.approx(0.7)
|
||||
|
||||
def test_normalize_false_clamps_out_of_range(self):
|
||||
assert _http(False, 5.0).get_value() == 1.0
|
||||
assert _http(False, -3.0).get_value() == 0.0
|
||||
|
||||
def test_normalize_false_non_finite_safe(self):
|
||||
assert _http(False, float("inf")).get_value() == 0.0
|
||||
assert _http(False, float("nan")).get_value() == 0.0
|
||||
|
||||
def test_non_numeric_raw_keeps_get_value_numeric_and_raw_verbatim(self):
|
||||
s = _http(False, "playing")
|
||||
assert s.get_value() == 0.0 # float("playing") fails -> safe fallback
|
||||
assert s.get_raw_value() == "playing" # verbatim for automations/templates
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HA stream — clamp passthrough via a minimal fake HA manager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeState:
|
||||
def __init__(self, state, attributes=None):
|
||||
self.state = state
|
||||
self.attributes = attributes or {}
|
||||
|
||||
|
||||
class _FakeHA:
|
||||
def __init__(self, state):
|
||||
self._state = state
|
||||
|
||||
def get_state(self, _source_id, _entity_id):
|
||||
return self._state
|
||||
|
||||
|
||||
def _ha(normalize: bool, state_value: str) -> HAEntityValueStream:
|
||||
return HAEntityValueStream(
|
||||
ha_source_id="ha1",
|
||||
entity_id="sensor.temp",
|
||||
attribute="",
|
||||
min_ha_value=0.0,
|
||||
max_ha_value=100.0,
|
||||
smoothing=0.0,
|
||||
normalize=normalize,
|
||||
ha_manager=_FakeHA(_FakeState(state_value)),
|
||||
)
|
||||
|
||||
|
||||
class TestHaNormalize:
|
||||
def test_normalize_true_rescales(self):
|
||||
assert _ha(True, "50").get_value() == pytest.approx(0.5)
|
||||
|
||||
def test_normalize_false_clamps_and_keeps_raw(self):
|
||||
s = _ha(False, "21.5") # e.g. °C — out of [0,1]
|
||||
assert s.get_value() == 1.0 # clamped
|
||||
assert s.get_raw_value() == pytest.approx(21.5) # un-clamped magnitude
|
||||
|
||||
def test_normalize_false_in_range_fraction(self):
|
||||
s = _ha(False, "0.3")
|
||||
assert s.get_value() == pytest.approx(0.3)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# system_metrics stream — clamp passthrough (raw reader monkeypatched)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSystemMetricsNormalize:
|
||||
def test_normalize_false_clamps_and_keeps_raw(self):
|
||||
s = SystemMetricsValueStream(
|
||||
metric="cpu_load", min_value=0.0, max_value=100.0, normalize=False
|
||||
)
|
||||
s._read_metric = lambda: 65.0 # type: ignore[method-assign]
|
||||
assert s.get_value() == 1.0 # 65 clamped to 1.0
|
||||
assert s.get_raw_value() == pytest.approx(65.0)
|
||||
|
||||
def test_normalize_false_in_range_fraction(self):
|
||||
s = SystemMetricsValueStream(
|
||||
metric="cpu_load", min_value=0.0, max_value=100.0, normalize=False
|
||||
)
|
||||
s._read_metric = lambda: 0.25 # type: ignore[method-assign]
|
||||
assert s.get_value() == pytest.approx(0.25)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# game_event stream — new raw channel + normalize flag
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _evt(value: float) -> GameEvent:
|
||||
return GameEvent(adapter_id="t", event_type="health", value=value)
|
||||
|
||||
|
||||
class TestGameEventNormalizeAndRaw:
|
||||
def test_raw_value_none_before_first_event(self):
|
||||
bus = GameEventBus()
|
||||
stream = GameEventValueStream(event_type="health", event_bus=bus)
|
||||
stream.start()
|
||||
assert stream.get_raw_value() is None
|
||||
stream.stop()
|
||||
|
||||
def test_raw_value_exposed_after_event(self):
|
||||
bus = GameEventBus()
|
||||
stream = GameEventValueStream(
|
||||
event_type="health", min_game_value=0.0, max_game_value=100.0, event_bus=bus
|
||||
)
|
||||
stream.start()
|
||||
bus.publish(_evt(73.0))
|
||||
assert stream.get_value() == pytest.approx(0.73) # normalized output
|
||||
assert stream.get_raw_value() == pytest.approx(73.0) # un-clamped raw
|
||||
stream.stop()
|
||||
|
||||
def test_normalize_false_clamps_output_keeps_raw(self):
|
||||
bus = GameEventBus()
|
||||
stream = GameEventValueStream(
|
||||
event_type="health",
|
||||
min_game_value=0.0,
|
||||
max_game_value=100.0,
|
||||
normalize=False,
|
||||
event_bus=bus,
|
||||
)
|
||||
stream.start()
|
||||
bus.publish(_evt(50.0)) # 50 is > 1 -> clamped to 1.0 in passthrough mode
|
||||
assert stream.get_value() == 1.0
|
||||
assert stream.get_raw_value() == pytest.approx(50.0)
|
||||
stream.stop()
|
||||
|
||||
def test_normalize_false_in_range_fraction(self):
|
||||
bus = GameEventBus()
|
||||
stream = GameEventValueStream(event_type="health", normalize=False, event_bus=bus)
|
||||
stream.start()
|
||||
bus.publish(_evt(0.4))
|
||||
assert stream.get_value() == pytest.approx(0.4)
|
||||
stream.stop()
|
||||
|
||||
def test_get_value_always_finite_in_passthrough(self):
|
||||
bus = GameEventBus()
|
||||
stream = GameEventValueStream(event_type="health", normalize=False, event_bus=bus)
|
||||
stream.start()
|
||||
bus.publish(_evt(math.inf))
|
||||
assert stream.get_value() == 0.0 # finite-safe clamp
|
||||
stream.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Live update_source() flip of the normalize flag on a running stream
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# The design's load-bearing invariant: flipping normalize live never blends a
|
||||
# raw magnitude against a fraction in the EMA, because BOTH modes emit [0,1].
|
||||
# These tests lock that in (a future regression dropping a stream's
|
||||
# _normalize_enabled reassignment would otherwise pass every other test).
|
||||
|
||||
|
||||
class TestLiveNormalizeFlip:
|
||||
def test_http_flip_true_to_false_and_back(self):
|
||||
s = _http(True, 50.0)
|
||||
assert s.get_value() == pytest.approx(0.5) # rescaled
|
||||
s.update_source(
|
||||
ValueSource.from_dict(
|
||||
_base(
|
||||
"http", http_endpoint_id="ep", min_value=0.0, max_value=100.0, normalize=False
|
||||
)
|
||||
)
|
||||
)
|
||||
assert s.get_value() == 1.0 # now clamp-passthrough of 50.0
|
||||
s.update_source(
|
||||
ValueSource.from_dict(
|
||||
_base("http", http_endpoint_id="ep", min_value=0.0, max_value=100.0, normalize=True)
|
||||
)
|
||||
)
|
||||
assert s.get_value() == pytest.approx(0.5) # rescaled again, no stale-cache blend
|
||||
|
||||
def test_ha_flip_true_to_false(self):
|
||||
s = _ha(True, "50")
|
||||
assert s.get_value() == pytest.approx(0.5)
|
||||
s.update_source(
|
||||
ValueSource.from_dict(
|
||||
_base(
|
||||
"ha_entity",
|
||||
ha_source_id="ha1",
|
||||
entity_id="sensor.temp",
|
||||
min_ha_value=0.0,
|
||||
max_ha_value=100.0,
|
||||
normalize=False,
|
||||
)
|
||||
)
|
||||
)
|
||||
assert s.get_value() == 1.0
|
||||
|
||||
def test_system_metrics_flip_true_to_false(self):
|
||||
s = SystemMetricsValueStream(
|
||||
metric="cpu_load", min_value=0.0, max_value=100.0, normalize=True
|
||||
)
|
||||
s._read_metric = lambda: 65.0 # type: ignore[method-assign]
|
||||
first = s.get_value() # rescaled via the cpu_load spec
|
||||
assert 0.0 <= first <= 1.0
|
||||
s.update_source(
|
||||
ValueSource.from_dict(
|
||||
_base(
|
||||
"system_metrics",
|
||||
metric="cpu_load",
|
||||
min_value=0.0,
|
||||
max_value=100.0,
|
||||
normalize=False,
|
||||
)
|
||||
)
|
||||
)
|
||||
s._read_metric = lambda: 65.0 # type: ignore[method-assign]
|
||||
s._prev_value = None # force a fresh poll (bypass the poll-interval cache)
|
||||
assert s.get_value() == 1.0 # clamp-passthrough of 65.0
|
||||
|
||||
def test_game_event_flip_true_to_false(self):
|
||||
bus = GameEventBus()
|
||||
stream = GameEventValueStream(
|
||||
event_type="health",
|
||||
min_game_value=0.0,
|
||||
max_game_value=100.0,
|
||||
normalize=True,
|
||||
event_bus=bus,
|
||||
)
|
||||
stream.start()
|
||||
bus.publish(_evt(50.0))
|
||||
assert stream.get_value() == pytest.approx(0.5)
|
||||
stream.update_source(
|
||||
ValueSource.from_dict(
|
||||
_base("game_event", min_game_value=0.0, max_game_value=100.0, normalize=False)
|
||||
)
|
||||
)
|
||||
bus.publish(_evt(50.0))
|
||||
assert stream.get_value() == 1.0 # clamp-passthrough
|
||||
assert stream.get_raw_value() == pytest.approx(50.0)
|
||||
stream.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_stream() forwarding — game_event's SOLE wiring path (no CRUD/schema)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBuildStreamForwarding:
|
||||
def test_game_event_builder_forwards_normalize(self):
|
||||
src = ValueSource.from_dict(
|
||||
_base("game_event", min_game_value=0.0, max_game_value=100.0, normalize=False)
|
||||
)
|
||||
bus = GameEventBus()
|
||||
deps = ValueStreamDeps(value_stream_manager=None, event_bus=bus)
|
||||
stream = build_stream(src, deps)
|
||||
assert isinstance(stream, GameEventValueStream)
|
||||
assert stream._normalize_enabled is False # forwarded from the dataclass
|
||||
stream.start()
|
||||
bus.publish(_evt(50.0))
|
||||
assert stream.get_value() == 1.0 # passthrough clamp, not 0.5 rescale
|
||||
assert stream.get_raw_value() == pytest.approx(50.0)
|
||||
stream.stop()
|
||||
Reference in New Issue
Block a user