feat(value-sources): optional normalization for magnitude sources

Add an opt-out `normalize` flag to the four magnitude value sources
(ha_entity, http, system_metrics, game_event). get_value() stays in
[0,1] for every source (the normalized scalar-bus invariant), so all
existing consumers — brightness sinks, gradient_map, template `name`
bindings, and color-strip bindable floats — are unaffected.

- normalize=False is a clamp-passthrough: skip the min/max rescale and
  clamp the raw reading into [0,1] (for sources already reporting a
  0..1 fraction). The un-normalized magnitude stays available via
  get_raw_value() / template raw[name] / automations.
- Add get_raw_value() + a raw channel to GameEventValueStream (it had
  none). game_event's flag is model/stream-level only (no CRUD schema).
- Finite-safe clamp01() util; harden the composite-layer brightness
  multiply (latent negative-wrap / >=1 skip) with it.
- Preview WebSocket: tolerate non-numeric raw, generalize raw-range.
- Frontend: settings-toggle slider per HA/system_metrics/http editor
  with min/max grey-out; toggle hidden for fixed-mapping percent
  metrics. en/ru/zh locale keys.
- Additive optional field (default True) — JSON round-trip, no migration.

Tests: store create/update round-trip, clamp-passthrough, live
normalize flip, game_event raw channel + build_stream forwarding,
and finite-safe clamp01.
This commit is contained in:
2026-06-02 02:24:40 +03:00
parent 6de61b965e
commit 669ae20824
20 changed files with 753 additions and 30 deletions
+40 -4
View File
@@ -174,6 +174,7 @@ _RESPONSE_MAP = {
min_ha_value=s.min_ha_value,
max_ha_value=s.max_ha_value,
smoothing=s.smoothing,
normalize=s.normalize,
),
GradientMapValueSource: lambda s: GradientMapValueSourceResponse(
id=s.id,
@@ -218,6 +219,7 @@ _RESPONSE_MAP = {
sensor_label=s.sensor_label,
poll_interval=s.poll_interval,
smoothing=s.smoothing,
normalize=s.normalize,
),
HTTPValueSource: lambda s: HTTPValueSourceResponse(
id=s.id,
@@ -234,6 +236,7 @@ _RESPONSE_MAP = {
min_value=s.min_value,
max_value=s.max_value,
smoothing=s.smoothing,
normalize=s.normalize,
),
TemplateValueSource: lambda s: TemplateValueSourceResponse(
id=s.id,
@@ -525,6 +528,27 @@ def _source_exists(store: ValueSourceStore, source_id: str) -> bool:
return False
# Per-stream (min, max) attribute pairs for the normalization range, so the
# preview can show where the raw value maps. Attribute names differ per stream
# type (historical), so probe each pair rather than assume one.
_RAW_RANGE_ATTRS: tuple[tuple[str, str], ...] = (
("_min_ha", "_max_ha"), # HAEntityValueStream
("_min_value", "_max_value"), # HTTPValueStream
("_min_val", "_max_val"), # SystemMetricsValueStream
("_min_game", "_max_game"), # GameEventValueStream
)
def _stream_raw_range(stream) -> list | None:
"""Return ``[min, max]`` for the stream's normalization range, or None."""
for lo_attr, hi_attr in _RAW_RANGE_ATTRS:
lo = getattr(stream, lo_attr, None)
hi = getattr(stream, hi_attr, None)
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
return [lo, hi]
return None
# ===== REAL-TIME VALUE SOURCE TEST WEBSOCKET =====
@@ -588,10 +612,22 @@ async def test_value_source_ws(
msg["input_value"] = round(stream.get_input_value(), 4)
if hasattr(stream, "get_raw_value"):
raw = stream.get_raw_value()
if raw is not None:
msg["raw_value"] = round(raw, 4)
if hasattr(stream, "_min_ha"):
msg["raw_range"] = [stream._min_ha, stream._max_ha]
if isinstance(raw, bool):
# bool is a subclass of int — send as-is (don't coerce/round).
msg["raw_value"] = raw
elif isinstance(raw, (int, float)):
msg["raw_value"] = round(float(raw), 4)
elif raw is not None:
# Non-numeric raw (e.g. an HTTP string payload) — send verbatim
# rather than crash the socket on round().
msg["raw_value"] = raw
rng = _stream_raw_range(stream)
if rng is not None:
msg["raw_range"] = rng
# Tell the client whether this source is currently normalizing, so the
# preview can render the value as a fraction vs a clamped passthrough.
if hasattr(stream, "_normalize_enabled"):
msg["normalized"] = bool(stream._normalize_enabled)
await websocket.send_json(msg)
await asyncio.sleep(0.05)
except WebSocketDisconnect:
@@ -131,6 +131,9 @@ class HAEntityValueSourceResponse(_ValueSourceResponseBase):
min_ha_value: float = Field(description="Raw HA value mapped to output 0.0")
max_ha_value: float = Field(description="Raw HA value mapped to output 1.0")
smoothing: float = Field(description="EMA smoothing factor (0.0-1.0)")
normalize: bool = Field(
description="Rescale raw value to [0,1] via min/max; false clamps the raw value as-is"
)
class GradientMapValueSourceResponse(_ValueSourceResponseBase):
@@ -160,6 +163,9 @@ class SystemMetricsValueSourceResponse(_ValueSourceResponseBase):
sensor_label: str = Field(description="Sensor label for cpu_temp/fan_speed")
poll_interval: float = Field(description="Seconds between reads")
smoothing: float = Field(description="EMA smoothing factor (0.0-1.0)")
normalize: bool = Field(
description="Rescale raw value to [0,1] via min/max; false clamps the raw value as-is"
)
class HTTPValueSourceResponse(_ValueSourceResponseBase):
@@ -171,6 +177,9 @@ class HTTPValueSourceResponse(_ValueSourceResponseBase):
min_value: float = Field(description="Raw value mapped to output 0.0")
max_value: float = Field(description="Raw value mapped to output 1.0")
smoothing: float = Field(description="EMA smoothing factor (0.0-1.0)")
normalize: bool = Field(
description="Rescale raw value to [0,1] via min/max; false clamps the raw value as-is"
)
class TemplateValueSourceResponse(_ValueSourceResponseBase):
@@ -317,6 +326,9 @@ class HAEntityValueSourceCreate(_ValueSourceCreateBase):
min_ha_value: float = Field(0.0, description="Raw HA value mapped to output 0.0")
max_ha_value: float = Field(100.0, description="Raw HA value mapped to output 1.0")
smoothing: float = Field(0.0, description="EMA smoothing (0.0-1.0)", ge=0.0, le=1.0)
normalize: bool = Field(
True, description="Rescale raw value to [0,1] via min/max; false clamps the raw value as-is"
)
class GradientMapValueSourceCreate(_ValueSourceCreateBase):
@@ -343,6 +355,9 @@ class SystemMetricsValueSourceCreate(_ValueSourceCreateBase):
sensor_label: str = Field("", description="Sensor label for cpu_temp/fan_speed")
poll_interval: float = Field(1.0, description="Poll interval in seconds", ge=0.1, le=60.0)
smoothing: float = Field(0.0, description="EMA smoothing (0.0-1.0)", ge=0.0, le=1.0)
normalize: bool = Field(
True, description="Rescale raw value to [0,1] via min/max; false clamps the raw value as-is"
)
class HTTPValueSourceCreate(_ValueSourceCreateBase):
@@ -353,6 +368,9 @@ class HTTPValueSourceCreate(_ValueSourceCreateBase):
min_value: float = Field(0.0, description="Raw value mapped to output 0.0")
max_value: float = Field(100.0, description="Raw value mapped to output 1.0")
smoothing: float = Field(0.0, description="EMA smoothing (0.0-1.0)", ge=0.0, le=1.0)
normalize: bool = Field(
True, description="Rescale raw value to [0,1] via min/max; false clamps the raw value as-is"
)
class TemplateValueSourceCreate(_ValueSourceCreateBase):
@@ -499,6 +517,9 @@ class HAEntityValueSourceUpdate(_ValueSourceUpdateBase):
min_ha_value: float | None = Field(None, description="Min HA value")
max_ha_value: float | None = Field(None, description="Max HA value")
smoothing: float | None = Field(None, description="EMA smoothing", ge=0.0, le=1.0)
normalize: bool | None = Field(
None, description="Rescale raw via min/max (false = clamp as-is)"
)
class GradientMapValueSourceUpdate(_ValueSourceUpdateBase):
@@ -525,6 +546,9 @@ class SystemMetricsValueSourceUpdate(_ValueSourceUpdateBase):
sensor_label: str | None = Field(None, description="Sensor label")
poll_interval: float | None = Field(None, description="Poll interval", ge=0.1, le=60.0)
smoothing: float | None = Field(None, description="EMA smoothing", ge=0.0, le=1.0)
normalize: bool | None = Field(
None, description="Rescale raw via min/max (false = clamp as-is)"
)
class HTTPValueSourceUpdate(_ValueSourceUpdateBase):
@@ -535,6 +559,9 @@ class HTTPValueSourceUpdate(_ValueSourceUpdateBase):
min_value: float | None = Field(None, description="Raw value mapped to 0.0")
max_value: float | None = Field(None, description="Raw value mapped to 1.0")
smoothing: float | None = Field(None, description="EMA smoothing", ge=0.0, le=1.0)
normalize: bool | None = Field(
None, description="Rescale raw via min/max (false = clamp as-is)"
)
class TemplateValueSourceUpdate(_ValueSourceUpdateBase):
@@ -8,7 +8,7 @@ import numpy as np
from ledgrab.core.processing.color_strip_stream import ColorStripStream
from ledgrab.storage.bindable import bfloat
from ledgrab.utils import get_logger
from ledgrab.utils import clamp01, get_logger
from ledgrab.utils.frame_limiter import FrameLimiter
logger = get_logger(__name__)
@@ -176,7 +176,7 @@ class CompositeColorStripStream(ColorStripStream):
if i in self._brightness_streams:
_vs_id, vs = self._brightness_streams[i]
try:
result.append(vs.get_value())
result.append(clamp01(vs.get_value()))
except Exception:
result.append(None)
else:
@@ -660,10 +660,14 @@ class CompositeColorStripStream(ColorStripStream):
if layer.get("reverse", False):
colors = colors[::-1].copy()
# Apply per-layer brightness from value source
# Apply per-layer brightness from value source.
# clamp01 is finite-safe: it rejects nan/inf (which would
# crash the int() cast) and pins out-of-range values into
# [0,1] so the uint16 fixed-point multiply can't wrap on a
# negative. bri == 1.0 correctly skips the scale (no-op).
if i in self._brightness_streams:
_vs_id, vs = self._brightness_streams[i]
bri = vs.get_value()
bri = clamp01(vs.get_value())
if bri < 1.0:
colors = (colors.astype(np.uint16) * int(bri * 256) >> 8).astype(
np.uint8
@@ -193,6 +193,7 @@ def _build_ha_entity(source, d: ValueStreamDeps):
min_ha_value=source.min_ha_value,
max_ha_value=source.max_ha_value,
smoothing=source.smoothing,
normalize=source.normalize,
ha_manager=d.ha_manager,
)
@@ -232,6 +233,7 @@ def _build_system_metrics(source, _d: ValueStreamDeps):
sensor_label=source.sensor_label,
poll_interval=source.poll_interval,
smoothing=source.smoothing,
normalize=source.normalize,
)
@@ -249,6 +251,7 @@ def _build_game_event(source, d: ValueStreamDeps):
smoothing=source.smoothing,
default_value=source.default_value,
timeout=source.timeout,
normalize=source.normalize,
event_bus=d.event_bus,
)
@@ -263,6 +266,7 @@ def _build_http(source, d: ValueStreamDeps):
min_value=source.min_value,
max_value=source.max_value,
smoothing=source.smoothing,
normalize=source.normalize,
http_endpoint_store=d.http_endpoint_store,
)
@@ -33,7 +33,7 @@ import numpy as np
from ledgrab.core.processing import metric_readers as _metric_readers
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.utils import get_logger
from ledgrab.utils import clamp01, get_logger
from ledgrab.utils.template_expr import (
TemplateValidationError,
compile_template,
@@ -915,6 +915,7 @@ class HAEntityValueStream(ValueStream):
min_ha_value: float = 0.0,
max_ha_value: float = 100.0,
smoothing: float = 0.0,
normalize: bool = True,
ha_manager: Any | None = None,
):
self._ha_source_id = ha_source_id
@@ -923,6 +924,7 @@ class HAEntityValueStream(ValueStream):
self._min_ha = min_ha_value
self._max_ha = max_ha_value
self._smoothing = smoothing
self._normalize_enabled = normalize
self._ha_manager = ha_manager
self._prev_value: float | None = None
self._raw_value: float | None = None
@@ -987,16 +989,23 @@ class HAEntityValueStream(ValueStream):
self._raw_value = raw
# Normalize to [0, 1]
if self._normalize_enabled:
# Normalize to [0, 1] via the configured min/max range.
ha_range = self._max_ha - self._min_ha
if abs(ha_range) < 1e-9:
normalized = 0.5
else:
normalized = (raw - self._min_ha) / ha_range
normalized = max(0.0, min(1.0, normalized))
else:
# Skip the rescale: treat the raw reading as already a 01 fraction
# (finite-safe clamp). The un-clamped magnitude stays on
# get_raw_value(); get_value() never leaves [0, 1] either way.
normalized = clamp01(raw)
# EMA smoothing
# EMA smoothing — both branches produce a [0, 1] value, so _prev_value
# is always normalized and flipping ``normalize`` live never blends a
# raw magnitude against a fraction.
if self._smoothing > 0.0 and self._prev_value is not None:
normalized = self._smoothing * self._prev_value + (1.0 - self._smoothing) * normalized
@@ -1020,6 +1029,7 @@ class HAEntityValueStream(ValueStream):
self._min_ha = source.min_ha_value
self._max_ha = source.max_ha_value
self._smoothing = source.smoothing
self._normalize_enabled = source.normalize
# If HA source changed, swap runtime
if source.ha_source_id != old_ha_source and self._ha_manager:
@@ -1063,6 +1073,7 @@ class HTTPValueStream(ValueStream):
min_value: float,
max_value: float,
smoothing: float,
normalize: bool = True,
http_endpoint_store: "HTTPEndpointStore" | None = None,
) -> None:
self._endpoint_id = endpoint_id
@@ -1071,6 +1082,7 @@ class HTTPValueStream(ValueStream):
self._min_value = min_value
self._max_value = max_value
self._smoothing = smoothing
self._normalize_enabled = normalize
self._http_endpoint_store = http_endpoint_store
self._task: asyncio.Task | None = None
self._raw_value: Any = None
@@ -1110,12 +1122,19 @@ class HTTPValueStream(ValueStream):
except (TypeError, ValueError):
return self._prev_normalized if self._prev_normalized is not None else 0.0
if self._normalize_enabled:
rng = self._max_value - self._min_value
if abs(rng) < 1e-9:
normalized = 0.5
else:
normalized = (numeric - self._min_value) / rng
normalized = max(0.0, min(1.0, normalized))
else:
# Skip the rescale: treat the extracted number as already a 01
# fraction (finite-safe clamp). The verbatim extracted value (which
# may be non-numeric) stays on get_raw_value(); get_value() is always
# a float in [0, 1].
normalized = clamp01(numeric)
if self._smoothing > 0.0 and self._prev_normalized is not None:
normalized = (
@@ -1139,6 +1158,7 @@ class HTTPValueStream(ValueStream):
self._min_value = source.min_value
self._max_value = source.max_value
self._smoothing = source.smoothing
self._normalize_enabled = source.normalize
async def _poll_loop(self) -> None:
from ledgrab.utils.safe_source import safe_request_bounded
@@ -1674,6 +1694,7 @@ class SystemMetricsValueStream(ValueStream):
sensor_label: str = "",
poll_interval: float = 1.0,
smoothing: float = 0.0,
normalize: bool = True,
):
self._metric = metric
self._min_val = min_value
@@ -1683,6 +1704,7 @@ class SystemMetricsValueStream(ValueStream):
self._sensor_label = sensor_label
self._poll_interval = max(0.1, poll_interval)
self._smoothing = smoothing
self._normalize_enabled = normalize
self._prev_value: float | None = None
self._raw_value: float | None = None
self._last_poll: float = 0.0
@@ -1721,10 +1743,16 @@ class SystemMetricsValueStream(ValueStream):
raw = self._read_metric()
self._raw_value = raw
# Normalize
if self._normalize_enabled:
normalized = self._normalize(raw)
else:
# Skip the rescale: treat the raw reading as already a 01 fraction
# (finite-safe clamp). The un-clamped reading stays on
# get_raw_value(); get_value() never leaves [0, 1].
normalized = clamp01(raw)
# EMA smoothing
# EMA smoothing — both branches output [0, 1], so _prev_value is always
# normalized and a live ``normalize`` flip never blends raw vs fraction.
if self._smoothing > 0.0 and self._prev_value is not None:
normalized = self._smoothing * self._prev_value + (1.0 - self._smoothing) * normalized
@@ -1772,6 +1800,7 @@ class SystemMetricsValueStream(ValueStream):
self._sensor_label = source.sensor_label
self._poll_interval = max(0.1, source.poll_interval)
self._smoothing = source.smoothing
self._normalize_enabled = source.normalize
# ---------------------------------------------------------------------------
@@ -15,7 +15,7 @@ import time
from typing import TYPE_CHECKING
from ledgrab.core.processing.value_stream import ValueStream
from ledgrab.utils import get_logger
from ledgrab.utils import clamp01, get_logger
if TYPE_CHECKING:
from ledgrab.core.game_integration.event_bus import GameEventBus
@@ -41,6 +41,7 @@ class GameEventValueStream(ValueStream):
smoothing: float = 0.0,
default_value: float = 0.5,
timeout: float = 5.0,
normalize: bool = True,
event_bus: "GameEventBus" | None = None,
) -> None:
self._event_type = event_type
@@ -49,10 +50,15 @@ class GameEventValueStream(ValueStream):
self._smoothing = max(0.0, min(1.0, smoothing))
self._default_value = max(0.0, min(1.0, default_value))
self._timeout = max(0.0, timeout)
# When False, skip the min/max rescale: the [0,1] output clamps the raw
# value as-is. get_value() stays in [0,1] either way; the un-clamped
# value is exposed via get_raw_value() for templates/automations.
self._normalize_enabled = normalize
self._event_bus = event_bus
self._lock = threading.Lock()
self._current_value: float = self._default_value
self._current_raw: float | None = None
self._last_event_time: float | None = None
self._subscription_id: str | None = None
self._has_received_event: bool = False
@@ -82,11 +88,18 @@ class GameEventValueStream(ValueStream):
self._subscription_id = None
with self._lock:
self._current_value = self._default_value
self._current_raw = None
self._last_event_time = None
self._has_received_event = False
def get_value(self) -> float:
"""Return current normalized value (0.0-1.0), or default if timed out."""
"""Return current value in [0,1], or default_value if timed out.
Always in [0,1]: ``_current_value`` holds the smoothed output computed
at event time under the active ``normalize`` mode (rescaled, or the raw
value clamped into [0,1]). ``default_value`` is itself in [0,1], so the
timeout fallback is valid in both modes.
"""
with self._lock:
if not self._has_received_event:
return self._default_value
@@ -98,6 +111,15 @@ class GameEventValueStream(ValueStream):
return self._current_value
def get_raw_value(self) -> float | None:
"""Return the last raw game value before normalization.
``None`` until the first event arrives (mirrors HA/HTTP/SystemMetrics).
Exposes the un-clamped magnitude to template ``raw[]`` and automations.
"""
with self._lock:
return self._current_raw
def get_color(self) -> tuple:
"""Game event value source only provides scalars, not colors."""
raise NotImplementedError("GameEventValueStream does not produce colors")
@@ -115,6 +137,7 @@ class GameEventValueStream(ValueStream):
self._smoothing = max(0.0, min(1.0, source.smoothing))
self._default_value = max(0.0, min(1.0, source.default_value))
self._timeout = max(0.0, source.timeout)
self._normalize_enabled = source.normalize
def _on_event(self, event: "GameEvent") -> None:
"""EventBus callback — normalize and apply smoothing.
@@ -122,14 +145,17 @@ class GameEventValueStream(ValueStream):
Called from the publisher's thread; must be thread-safe.
"""
raw_value = event.value
normalized = self._normalize(raw_value)
# Output is always in [0,1]: rescale via min/max, or (normalize off)
# clamp the raw value as-is. The un-clamped raw is kept for get_raw_value().
out = self._normalize(raw_value) if self._normalize_enabled else clamp01(raw_value)
with self._lock:
self._current_raw = raw_value
if self._smoothing > 0.0 and self._has_received_event:
alpha = 1.0 - self._smoothing
normalized = alpha * normalized + self._smoothing * self._current_value
out = alpha * out + self._smoothing * self._current_value
self._current_value = normalized
self._current_value = out
self._last_event_time = time.monotonic()
self._has_received_event = True
+2
View File
@@ -184,6 +184,7 @@ import {
showValueSourceModal, closeValueSourceModal, saveValueSource,
editValueSource, cloneValueSource, deleteValueSource, onValueSourceTypeChange,
onDaylightVSRealTimeChange,
onValueSourceNormalizeChange,
addSchedulePoint,
addAnimatedColor, removeAnimatedColor,
addColorSchedulePoint, removeColorSchedulePoint,
@@ -580,6 +581,7 @@ Object.assign(window, {
deleteValueSource,
onValueSourceTypeChange,
onDaylightVSRealTimeChange,
onValueSourceNormalizeChange,
addSchedulePoint,
addAnimatedColor,
removeAnimatedColor,
@@ -418,6 +418,16 @@ function _onMetricChange(metric: string) {
if (networkFields) networkFields.style.display = networkMetrics.includes(metric) ? '' : 'none';
if (diskFields) diskFields.style.display = metric === 'disk_usage' ? '' : 'none';
if (sensorFields) sensorFields.style.display = sensorMetrics.includes(metric) ? '' : 'none';
// The Normalize toggle only makes sense where a Min/Max range is shown
// (rangeMetrics). Percent/network/disk metrics have a fixed natural→0-1
// mapping in their reader, so "clamp the raw value as-is" would just
// saturate them; hide the toggle and force normalization on for those.
const normGroup = document.getElementById('value-source-sysmetric-normalize-group') as HTMLElement | null;
const normCb = document.getElementById('value-source-sysmetric-normalize') as HTMLInputElement | null;
const showNorm = rangeMetrics.includes(metric);
if (normGroup) normGroup.style.display = showNorm ? '' : 'none';
if (!showNorm && normCb) normCb.checked = true;
_syncVsNormalizeUI();
}
// ── Game Event Value Source helpers ──
@@ -619,6 +629,8 @@ export async function showValueSourceModal(editData: any, presetType: any = null
(document.getElementById('value-source-min-ha-value') as HTMLInputElement).value = String(editData.min_ha_value ?? 0);
(document.getElementById('value-source-max-ha-value') as HTMLInputElement).value = String(editData.max_ha_value ?? 100);
_setSlider('value-source-ha-smoothing', editData.smoothing ?? 0);
(document.getElementById('value-source-ha-normalize') as HTMLInputElement).checked = editData.normalize !== false;
_syncVsNormalizeUI();
} else if (editData.source_type === 'gradient_map') {
_populateGradientInputDropdown(editData.value_source_id || '');
_populateGradientEntityDropdown(editData.gradient_id || '');
@@ -638,7 +650,9 @@ export async function showValueSourceModal(editData: any, presetType: any = null
(document.getElementById('value-source-sensor-label') as HTMLInputElement).value = editData.sensor_label || '';
_setSlider('value-source-poll-interval', editData.poll_interval ?? 1.0);
_setSlider('value-source-sysmetric-smoothing', editData.smoothing ?? 0);
(document.getElementById('value-source-sysmetric-normalize') as HTMLInputElement).checked = editData.normalize !== false;
_onMetricChange(editData.metric || 'cpu_load');
_syncVsNormalizeUI();
} else if (editData.source_type === 'game_event') {
_populateVSGameIntegrationDropdown(editData.game_integration_id || '');
_populateVSGameEventTypeDropdown(editData.event_type || 'health');
@@ -655,6 +669,8 @@ export async function showValueSourceModal(editData: any, presetType: any = null
(document.getElementById('value-source-http-min') as HTMLInputElement).value = String(editData.min_value ?? 0);
(document.getElementById('value-source-http-max') as HTMLInputElement).value = String(editData.max_value ?? 100);
_setSlider('value-source-http-smoothing', editData.smoothing ?? 0);
(document.getElementById('value-source-http-normalize') as HTMLInputElement).checked = editData.normalize !== false;
_syncVsNormalizeUI();
} else if (editData.source_type === 'template') {
(document.getElementById('value-source-template-expression') as HTMLTextAreaElement).value = editData.template || '';
_setSlider('value-source-template-default-value', editData.default_value ?? 0.0);
@@ -714,6 +730,7 @@ export async function showValueSourceModal(editData: any, presetType: any = null
(document.getElementById('value-source-min-ha-value') as HTMLInputElement).value = '0';
(document.getElementById('value-source-max-ha-value') as HTMLInputElement).value = '100';
_setSlider('value-source-ha-smoothing', 0);
(document.getElementById('value-source-ha-normalize') as HTMLInputElement).checked = true;
// Gradient map defaults
(document.getElementById('value-source-gradient-easing') as HTMLSelectElement).value = 'linear';
// CSS extract defaults
@@ -728,6 +745,7 @@ export async function showValueSourceModal(editData: any, presetType: any = null
(document.getElementById('value-source-sensor-label') as HTMLInputElement).value = '';
_setSlider('value-source-poll-interval', 1.0);
_setSlider('value-source-sysmetric-smoothing', 0);
(document.getElementById('value-source-sysmetric-normalize') as HTMLInputElement).checked = true;
// HTTP value source defaults
const httpJsonPath = document.getElementById('value-source-http-json-path') as HTMLInputElement | null;
if (httpJsonPath) httpJsonPath.value = '';
@@ -738,6 +756,9 @@ export async function showValueSourceModal(editData: any, presetType: any = null
const httpMax = document.getElementById('value-source-http-max') as HTMLInputElement | null;
if (httpMax) httpMax.value = '100';
_setSlider('value-source-http-smoothing', 0);
const httpNormalize = document.getElementById('value-source-http-normalize') as HTMLInputElement | null;
if (httpNormalize) httpNormalize.checked = true;
_syncVsNormalizeUI();
// Template value source defaults
const tmplExpr = document.getElementById('value-source-template-expression') as HTMLTextAreaElement | null;
if (tmplExpr) tmplExpr.value = '';
@@ -874,6 +895,35 @@ function _syncDaylightVSSpeedVisibility() {
(document.getElementById('value-source-daylight-speed-group') as HTMLElement).style.display = rt ? 'none' : '';
}
// ── Normalize toggle (magnitude sources: ha_entity / system_metrics / http) ──
export function onValueSourceNormalizeChange() {
_syncVsNormalizeUI();
}
// Grey out + disable the Min/Max range inputs when a magnitude source is in
// clamp-passthrough mode (normalize off) — they are inert there. Only the
// visible section's pair is relevant, but syncing all three is harmless.
function _syncVsNormalizeUI() {
const groups: Array<[string, string[]]> = [
['value-source-ha-normalize', ['value-source-min-ha-value', 'value-source-max-ha-value']],
['value-source-sysmetric-normalize', ['value-source-sysmetric-min', 'value-source-sysmetric-max']],
['value-source-http-normalize', ['value-source-http-min', 'value-source-http-max']],
];
for (const [cbId, inputIds] of groups) {
const cb = document.getElementById(cbId) as HTMLInputElement | null;
if (!cb) continue;
const on = cb.checked;
for (const id of inputIds) {
const inp = document.getElementById(id) as HTMLInputElement | null;
if (!inp) continue;
inp.disabled = !on;
const fg = inp.closest('.form-group') as HTMLElement | null;
if (fg) fg.style.opacity = on ? '' : '0.45';
}
}
}
// ── Save ──────────────────────────────────────────────────────
export async function saveValueSource() {
@@ -948,6 +998,7 @@ export async function saveValueSource() {
payload.min_ha_value = parseFloat((document.getElementById('value-source-min-ha-value') as HTMLInputElement).value);
payload.max_ha_value = parseFloat((document.getElementById('value-source-max-ha-value') as HTMLInputElement).value);
payload.smoothing = parseFloat((document.getElementById('value-source-ha-smoothing') as HTMLInputElement).value);
payload.normalize = (document.getElementById('value-source-ha-normalize') as HTMLInputElement).checked;
if (!payload.ha_source_id) {
errorEl.textContent = t('value_source.ha_source') + ' required';
errorEl.style.display = '';
@@ -991,6 +1042,7 @@ export async function saveValueSource() {
payload.sensor_label = (document.getElementById('value-source-sensor-label') as HTMLInputElement).value;
payload.poll_interval = parseFloat((document.getElementById('value-source-poll-interval') as HTMLInputElement).value) || 1.0;
payload.smoothing = parseFloat((document.getElementById('value-source-sysmetric-smoothing') as HTMLInputElement).value) || 0;
payload.normalize = (document.getElementById('value-source-sysmetric-normalize') as HTMLInputElement).checked;
} else if (sourceType === 'game_event') {
payload.game_integration_id = (document.getElementById('value-source-game-integration') as HTMLSelectElement).value;
payload.event_type = (document.getElementById('value-source-game-event-type') as HTMLSelectElement).value;
@@ -1011,6 +1063,7 @@ export async function saveValueSource() {
payload.min_value = parseFloat((document.getElementById('value-source-http-min') as HTMLInputElement).value) || 0;
payload.max_value = parseFloat((document.getElementById('value-source-http-max') as HTMLInputElement).value) || 100;
payload.smoothing = parseFloat((document.getElementById('value-source-http-smoothing') as HTMLInputElement).value) || 0;
payload.normalize = (document.getElementById('value-source-http-normalize') as HTMLInputElement).checked;
if (!payload.http_endpoint_id) {
errorEl.textContent = t('value_source.http.endpoint_required');
errorEl.style.display = '';
+1
View File
@@ -331,6 +331,7 @@ startTargetOverlay: (...args: any[]) => any;
deleteValueSource: (...args: any[]) => any;
onValueSourceTypeChange: (...args: any[]) => any;
onDaylightVSRealTimeChange: (...args: any[]) => any;
onValueSourceNormalizeChange: (...args: any[]) => any;
addSchedulePoint: (...args: any[]) => any;
addAnimatedColor: (...args: any[]) => any;
removeAnimatedColor: (...args: any[]) => any;
@@ -126,6 +126,7 @@ export interface HAEntityValueSource extends ValueSourceBase {
min_ha_value: number;
max_ha_value: number;
smoothing: number;
normalize: boolean;
}
export interface GradientMapValueSource extends ValueSourceBase {
@@ -155,6 +156,7 @@ export interface SystemMetricsValueSource extends ValueSourceBase {
sensor_label: string;
poll_interval: number;
smoothing: number;
normalize: boolean;
}
export interface GameEventValueSource extends ValueSourceBase {
@@ -178,6 +180,7 @@ export interface HTTPValueSource extends ValueSourceBase {
min_value: number;
max_value: number;
smoothing: number;
normalize: boolean;
}
export interface TemplateValueSource extends ValueSourceBase {
@@ -1985,6 +1985,8 @@
"value_source.daylight.speed.hint": "Cycle speed multiplier. 1.0 = full day/night cycle in ~4 minutes. Higher values cycle faster.",
"value_source.daylight.use_real_time": "Use Real Time:",
"value_source.daylight.use_real_time.hint": "When enabled, the value follows the actual time of day. Speed is ignored.",
"value_source.normalize": "Normalize to 01:",
"value_source.normalize.hint": "On: rescale the raw value to 01 using Min/Max. Off: the value is clamped to 01 as-is (for sources that already report a 01 fraction). The raw value stays available to templates (raw[name]) and automations.",
"value_source.daylight.enable_real_time": "Follow wall clock",
"value_source.daylight.latitude": "Latitude:",
"value_source.daylight.latitude.hint": "Your geographic latitude (-90 to 90). Steepens or flattens the sunrise/sunset edges of the cycle.",
@@ -1845,6 +1845,8 @@
"value_source.daylight.speed.hint": "Множитель скорости цикла. 1.0 = полный цикл день/ночь за ~4 минуты.",
"value_source.daylight.use_real_time": "Реальное время:",
"value_source.daylight.use_real_time.hint": "Значение следует за реальным временем суток. Скорость игнорируется.",
"value_source.normalize": "Нормализовать в 0–1:",
"value_source.normalize.hint": "Вкл.: масштабировать сырое значение в 0–1 по Min/Max. Выкл.: значение ограничивается диапазоном 0–1 как есть (для источников, уже выдающих долю 0–1). Сырое значение остаётся доступным в шаблонах (raw[name]) и автоматизациях.",
"value_source.daylight.enable_real_time": "Следовать за часами",
"value_source.daylight.latitude": "Широта:",
"value_source.daylight.latitude.hint": "Географическая широта (-90 до 90). Делает переходы рассвета и заката круче или плавнее.",
@@ -1841,6 +1841,8 @@
"value_source.daylight.speed.hint": "周期速度倍率。1.0 = 完整日夜周期约4分钟。",
"value_source.daylight.use_real_time": "使用实时:",
"value_source.daylight.use_real_time.hint": "启用后,数值跟随实际时间。速度设置将被忽略。",
"value_source.normalize": "归一化到 01",
"value_source.normalize.hint": "开启:使用最小/最大值将原始值缩放到 0–1。关闭:直接将数值钳制到 0–1(适用于本身就输出 0–1 比例的来源)。原始值始终可在模板(raw[name])和自动化中使用。",
"value_source.daylight.enable_real_time": "跟随系统时钟",
"value_source.daylight.latitude": "纬度:",
"value_source.daylight.latitude.hint": "地理纬度(-90到90)。使日出/日落过渡更陡峭或更平缓。",
@@ -396,6 +396,11 @@ class HAEntityValueSource(ValueSource):
min_ha_value: float = 0.0 # raw HA value mapped to output 0.0
max_ha_value: float = 100.0 # raw HA value mapped to output 1.0
smoothing: float = 0.0 # EMA smoothing factor (0.01.0)
# When False, skip the min/max rescale: get_value() clamps the raw value
# into [0,1] as-is (for entities that already report a 01 fraction). The
# un-clamped magnitude stays available via get_raw_value(). get_value() is
# always in [0,1] regardless, so the normalized scalar bus is preserved.
normalize: bool = True
def to_dict(self) -> dict:
d = super().to_dict()
@@ -405,6 +410,7 @@ class HAEntityValueSource(ValueSource):
d["min_ha_value"] = self.min_ha_value
d["max_ha_value"] = self.max_ha_value
d["smoothing"] = self.smoothing
d["normalize"] = self.normalize
return d
@classmethod
@@ -421,6 +427,7 @@ class HAEntityValueSource(ValueSource):
data.get("max_ha_value") if data.get("max_ha_value") is not None else 100.0
),
smoothing=float(data.get("smoothing") or 0.0),
normalize=bool(data.get("normalize", True)),
)
@@ -519,6 +526,13 @@ class GameEventValueSource(ValueSource):
smoothing: float = 0.0 # EMA smoothing factor (0.0-1.0)
default_value: float = 0.5 # value when timed out or no events
timeout: float = 5.0 # seconds before reverting to default
# When False, skip the min/max rescale: get_value() clamps the raw game
# value into [0,1] as-is. The un-clamped value stays available via
# get_raw_value(). get_value() is always in [0,1]. See HAEntityValueSource.
# NOTE: game_event has no value-source CRUD schema/API (constructed only via
# the game-integration path), so this flag is settable only there, not over
# POST/PUT /value-sources.
normalize: bool = True
def to_dict(self) -> dict:
d = super().to_dict()
@@ -529,6 +543,7 @@ class GameEventValueSource(ValueSource):
d["smoothing"] = self.smoothing
d["default_value"] = self.default_value
d["timeout"] = self.timeout
d["normalize"] = self.normalize
return d
@classmethod
@@ -548,6 +563,7 @@ class GameEventValueSource(ValueSource):
data.get("default_value") if data.get("default_value") is not None else 0.5
),
timeout=float(data.get("timeout") if data.get("timeout") is not None else 5.0),
normalize=bool(data.get("normalize", True)),
)
@@ -567,6 +583,10 @@ class SystemMetricsValueSource(ValueSource):
sensor_label: str = "" # for cpu_temp/fan_speed (empty = first available)
poll_interval: float = 1.0 # seconds between reads
smoothing: float = 0.0 # EMA smoothing factor
# When False, skip the min/max rescale: get_value() clamps the raw metric
# into [0,1] as-is. The un-clamped reading stays available via
# get_raw_value(). get_value() is always in [0,1]. See HAEntityValueSource.
normalize: bool = True
def to_dict(self) -> dict:
d = super().to_dict()
@@ -578,6 +598,7 @@ class SystemMetricsValueSource(ValueSource):
d["sensor_label"] = self.sensor_label
d["poll_interval"] = self.poll_interval
d["smoothing"] = self.smoothing
d["normalize"] = self.normalize
return d
@classmethod
@@ -596,6 +617,7 @@ class SystemMetricsValueSource(ValueSource):
sensor_label=data.get("sensor_label") or "",
poll_interval=float(data.get("poll_interval") or 1.0),
smoothing=float(data.get("smoothing") or 0.0),
normalize=bool(data.get("normalize", True)),
)
@@ -622,6 +644,11 @@ class HTTPValueSource(ValueSource):
min_value: float = 0.0 # raw value → 0.0
max_value: float = 100.0 # raw value → 1.0
smoothing: float = 0.0 # EMA smoothing on the normalized output
# When False, skip the min/max rescale: get_value() coerces the extracted
# value to float and clamps it into [0,1] as-is. The verbatim extracted
# value (which may be str/bool) stays available via get_raw_value().
# get_value() is always a float in [0,1]. See HAEntityValueSource.
normalize: bool = True
def to_dict(self) -> dict:
d = super().to_dict()
@@ -631,6 +658,7 @@ class HTTPValueSource(ValueSource):
d["min_value"] = self.min_value
d["max_value"] = self.max_value
d["smoothing"] = self.smoothing
d["normalize"] = self.normalize
return d
@classmethod
@@ -645,6 +673,7 @@ class HTTPValueSource(ValueSource):
min_value=float(data.get("min_value") or 0.0),
max_value=float(data.get("max_value") if data.get("max_value") is not None else 100.0),
smoothing=float(data.get("smoothing") or 0.0),
normalize=bool(data.get("normalize", True)),
)
@@ -238,6 +238,7 @@ def _build_ha_entity(
min_ha_value: float | None = None,
max_ha_value: float | None = None,
smoothing: float | None = None,
normalize: bool | None = None,
**_,
) -> ValueSource:
if not ha_source_id:
@@ -252,6 +253,7 @@ def _build_ha_entity(
min_ha_value=min_ha_value if min_ha_value is not None else 0.0,
max_ha_value=max_ha_value if max_ha_value is not None else 100.0,
smoothing=smoothing if smoothing is not None else 0.0,
normalize=normalize if normalize is not None else True,
)
@@ -302,6 +304,7 @@ def _build_system_metrics(
sensor_label: str | None = None,
poll_interval: float | None = None,
smoothing: float | None = None,
normalize: bool | None = None,
**_,
) -> ValueSource:
m = metric or "cpu_load"
@@ -317,6 +320,7 @@ def _build_system_metrics(
sensor_label=sensor_label or "",
poll_interval=poll_interval if poll_interval is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.0,
normalize=normalize if normalize is not None else True,
)
@@ -348,6 +352,7 @@ def _build_http(
min_value: float | None = None,
max_value: float | None = None,
smoothing: float | None = None,
normalize: bool | None = None,
**_,
) -> ValueSource:
if not http_endpoint_id:
@@ -363,6 +368,7 @@ def _build_http(
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 100.0,
smoothing=smoothing if smoothing is not None else 0.0,
normalize=normalize if normalize is not None else True,
)
@@ -644,6 +650,7 @@ def _apply_ha_entity(
min_ha_value=None,
max_ha_value=None,
smoothing=None,
normalize=None,
**_,
) -> None:
if ha_source_id is not None:
@@ -658,6 +665,8 @@ def _apply_ha_entity(
source.max_ha_value = max_ha_value
if smoothing is not None:
source.smoothing = smoothing
if normalize is not None:
source.normalize = normalize
def _apply_gradient_map(
@@ -705,6 +714,7 @@ def _apply_system_metrics(
sensor_label=None,
poll_interval=None,
smoothing=None,
normalize=None,
**_,
) -> None:
if metric is not None:
@@ -725,6 +735,8 @@ def _apply_system_metrics(
source.poll_interval = poll_interval
if smoothing is not None:
source.smoothing = smoothing
if normalize is not None:
source.normalize = normalize
def _apply_game_event(source: GameEventValueSource, **_) -> None:
@@ -742,6 +754,7 @@ def _apply_http(
min_value=None,
max_value=None,
smoothing=None,
normalize=None,
**_,
) -> None:
if http_endpoint_id is not None:
@@ -758,6 +771,8 @@ def _apply_http(
source.max_value = max_value
if smoothing is not None:
source.smoothing = smoothing
if normalize is not None:
source.normalize = normalize
def _apply_template(
@@ -395,6 +395,18 @@
<input type="text" id="value-source-attribute" placeholder="">
</div>
<div class="form-group">
<div class="label-row">
<label for="value-source-ha-normalize" data-i18n="value_source.normalize">Normalize to 01:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.normalize.hint">On: rescale the raw value to 01 using Min/Max below. Off: the value is clamped to 01 as-is (for entities that already report a 01 fraction). The raw value is always available to templates (raw[name]) and automations.</small>
<label class="settings-toggle">
<input type="checkbox" id="value-source-ha-normalize" checked onchange="onValueSourceNormalizeChange()">
<span class="settings-toggle-slider"></span>
</label>
</div>
<div class="form-group">
<div class="label-row">
<label for="value-source-min-ha-value"><span data-i18n="value_source.min_ha_value">Min HA Value:</span> <span id="value-source-min-ha-value-display">0</span></label>
@@ -586,6 +598,18 @@
</select>
</div>
<div class="form-group" id="value-source-sysmetric-normalize-group">
<div class="label-row">
<label for="value-source-sysmetric-normalize" data-i18n="value_source.normalize">Normalize to 01:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.normalize.hint">On: rescale the raw value to 01 using Min/Max below. Off: the value is clamped to 01 as-is (for metrics that already report a 01 fraction). The raw value is always available to templates (raw[name]) and automations.</small>
<label class="settings-toggle">
<input type="checkbox" id="value-source-sysmetric-normalize" checked onchange="onValueSourceNormalizeChange()">
<span class="settings-toggle-slider"></span>
</label>
</div>
<div id="value-source-sysmetric-range" style="display:none">
<div class="form-group">
<label for="value-source-sysmetric-min"><span data-i18n="value_source.sysmetric.min">Min Value:</span></label>
@@ -747,6 +771,17 @@
<summary data-i18n="value_source.http.modulator.summary">Modulator mapping (optional)</summary>
<div class="form-collapse-body">
<small class="input-hint" data-i18n="value_source.http.modulator.hint">Only used when this source drives brightness or color. Automation rules read the raw extracted value and ignore these settings.</small>
<div class="form-group">
<div class="label-row">
<label for="value-source-http-normalize" data-i18n="value_source.normalize">Normalize to 01:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.normalize.hint">On: rescale the raw value to 01 using Min/Max below. Off: the value is clamped to 01 as-is (for endpoints that already return a 01 fraction). The raw value is always available to templates (raw[name]) and automations.</small>
<label class="settings-toggle">
<input type="checkbox" id="value-source-http-normalize" checked onchange="onValueSourceNormalizeChange()">
<span class="settings-toggle-slider"></span>
</label>
</div>
<div class="form-group">
<div class="label-row">
<label for="value-source-http-min" data-i18n="value_source.http.min_value">Min Value:</label>
+2
View File
@@ -3,6 +3,7 @@
from .file_ops import atomic_write_json, read_upload_capped
from .logger import setup_logging, get_logger
from .monitor_names import get_monitor_names, get_monitor_name, get_monitor_refresh_rates
from .numeric import clamp01
from .timer import high_resolution_timer
from .log_broadcaster import broadcaster as log_broadcaster, install_broadcast_handler
from .url_scheme import infer_http_scheme
@@ -15,6 +16,7 @@ __all__ = [
"get_monitor_names",
"get_monitor_name",
"get_monitor_refresh_rates",
"clamp01",
"high_resolution_timer",
"log_broadcaster",
"install_broadcast_handler",
+24
View File
@@ -0,0 +1,24 @@
"""Small numeric helpers shared across the processing layer."""
from __future__ import annotations
import math
def clamp01(x: float, default: float = 0.0) -> float:
"""Clamp ``x`` into the unit interval ``[0.0, 1.0]``, finite-safe.
NaN/inf are rejected to ``default`` *before* clamping they are valid
floats, so ``max/min`` alone would silently pass them through (and an
``int(base * inf)`` cast downstream raises OverflowError, ``int(nan)``
raises ValueError). Use this at any boundary that feeds a value into a
fixed-point / uint brightness multiply where a non-finite or out-of-range
value would corrupt or crash the math.
"""
if not math.isfinite(x):
return default
if x < 0.0:
return 0.0
if x > 1.0:
return 1.0
return x
@@ -0,0 +1,380 @@
"""Tests for the optional-normalization feature on magnitude value sources.
Covers, for HA / HTTP / system_metrics / game_event:
* the ``normalize`` flag round-trips through to_dict/from_dict and defaults to
True for old rows that predate the field,
* ``normalize=False`` makes get_value() a finite-safe clamp passthrough while
get_value() stays in [0, 1] (the normalized scalar-bus contract), and
* the raw magnitude remains available via get_raw_value() (added to
game_event, which had no raw channel before).
Plus a unit test for the shared finite-safe ``clamp01`` helper.
"""
import math
import pytest
from ledgrab.core.game_integration.event_bus import GameEventBus
from ledgrab.core.game_integration.events import GameEvent
from ledgrab.core.processing.value_kinds import ValueStreamDeps, build_stream
from ledgrab.core.processing.value_stream import (
HAEntityValueStream,
HTTPValueStream,
SystemMetricsValueStream,
)
from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream
from ledgrab.storage.value_source import ValueSource
from ledgrab.utils import clamp01
# ---------------------------------------------------------------------------
# clamp01 finite-safe helper
# ---------------------------------------------------------------------------
class TestClamp01:
def test_in_range_passthrough(self):
assert clamp01(0.0) == 0.0
assert clamp01(0.42) == pytest.approx(0.42)
assert clamp01(1.0) == 1.0
def test_out_of_range_clamped(self):
assert clamp01(-2.5) == 0.0
assert clamp01(5.0) == 1.0
def test_non_finite_rejected_to_default(self):
assert clamp01(float("nan")) == 0.0
assert clamp01(float("inf")) == 0.0
assert clamp01(float("-inf")) == 0.0
# custom default is honoured
assert clamp01(float("nan"), default=0.5) == 0.5
# ---------------------------------------------------------------------------
# Round-trip persistence of the normalize flag
# ---------------------------------------------------------------------------
def _base(source_type: str, **extra) -> dict:
return {
"id": f"vs_{source_type}",
"name": source_type,
"source_type": source_type,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
**extra,
}
class TestNormalizeRoundTrip:
@pytest.mark.parametrize(
"source_type,extra",
[
("ha_entity", {"ha_source_id": "ha1", "entity_id": "sensor.temp"}),
("http", {"http_endpoint_id": "ep1"}),
("system_metrics", {"metric": "cpu_load"}),
("game_event", {}),
],
)
def test_normalize_false_round_trips(self, source_type, extra):
src = ValueSource.from_dict(_base(source_type, normalize=False, **extra))
assert src.normalize is False
assert src.to_dict()["normalize"] is False
restored = ValueSource.from_dict(src.to_dict())
assert restored.normalize is False
@pytest.mark.parametrize(
"source_type,extra",
[
("ha_entity", {"ha_source_id": "ha1", "entity_id": "sensor.temp"}),
("http", {"http_endpoint_id": "ep1"}),
("system_metrics", {"metric": "cpu_load"}),
("game_event", {}),
],
)
def test_missing_key_defaults_true(self, source_type, extra):
"""An old row written before the field existed reads back normalize=True."""
src = ValueSource.from_dict(_base(source_type, **extra))
assert src.normalize is True
assert src.to_dict()["normalize"] is True
# ---------------------------------------------------------------------------
# HTTP stream — clamp passthrough + verbatim raw
# ---------------------------------------------------------------------------
def _http(normalize: bool, raw) -> HTTPValueStream:
s = HTTPValueStream(
endpoint_id="ep",
json_path="",
interval_s=60,
min_value=0.0,
max_value=100.0,
smoothing=0.0,
normalize=normalize,
)
s._raw_value = raw # bypass the poll loop; exercise get_value() directly
return s
class TestHttpNormalize:
def test_normalize_true_rescales(self):
assert _http(True, 50.0).get_value() == pytest.approx(0.5)
def test_normalize_false_clamps_in_range(self):
assert _http(False, 0.7).get_value() == pytest.approx(0.7)
def test_normalize_false_clamps_out_of_range(self):
assert _http(False, 5.0).get_value() == 1.0
assert _http(False, -3.0).get_value() == 0.0
def test_normalize_false_non_finite_safe(self):
assert _http(False, float("inf")).get_value() == 0.0
assert _http(False, float("nan")).get_value() == 0.0
def test_non_numeric_raw_keeps_get_value_numeric_and_raw_verbatim(self):
s = _http(False, "playing")
assert s.get_value() == 0.0 # float("playing") fails -> safe fallback
assert s.get_raw_value() == "playing" # verbatim for automations/templates
# ---------------------------------------------------------------------------
# HA stream — clamp passthrough via a minimal fake HA manager
# ---------------------------------------------------------------------------
class _FakeState:
def __init__(self, state, attributes=None):
self.state = state
self.attributes = attributes or {}
class _FakeHA:
def __init__(self, state):
self._state = state
def get_state(self, _source_id, _entity_id):
return self._state
def _ha(normalize: bool, state_value: str) -> HAEntityValueStream:
return HAEntityValueStream(
ha_source_id="ha1",
entity_id="sensor.temp",
attribute="",
min_ha_value=0.0,
max_ha_value=100.0,
smoothing=0.0,
normalize=normalize,
ha_manager=_FakeHA(_FakeState(state_value)),
)
class TestHaNormalize:
def test_normalize_true_rescales(self):
assert _ha(True, "50").get_value() == pytest.approx(0.5)
def test_normalize_false_clamps_and_keeps_raw(self):
s = _ha(False, "21.5") # e.g. °C — out of [0,1]
assert s.get_value() == 1.0 # clamped
assert s.get_raw_value() == pytest.approx(21.5) # un-clamped magnitude
def test_normalize_false_in_range_fraction(self):
s = _ha(False, "0.3")
assert s.get_value() == pytest.approx(0.3)
# ---------------------------------------------------------------------------
# system_metrics stream — clamp passthrough (raw reader monkeypatched)
# ---------------------------------------------------------------------------
class TestSystemMetricsNormalize:
def test_normalize_false_clamps_and_keeps_raw(self):
s = SystemMetricsValueStream(
metric="cpu_load", min_value=0.0, max_value=100.0, normalize=False
)
s._read_metric = lambda: 65.0 # type: ignore[method-assign]
assert s.get_value() == 1.0 # 65 clamped to 1.0
assert s.get_raw_value() == pytest.approx(65.0)
def test_normalize_false_in_range_fraction(self):
s = SystemMetricsValueStream(
metric="cpu_load", min_value=0.0, max_value=100.0, normalize=False
)
s._read_metric = lambda: 0.25 # type: ignore[method-assign]
assert s.get_value() == pytest.approx(0.25)
# ---------------------------------------------------------------------------
# game_event stream — new raw channel + normalize flag
# ---------------------------------------------------------------------------
def _evt(value: float) -> GameEvent:
return GameEvent(adapter_id="t", event_type="health", value=value)
class TestGameEventNormalizeAndRaw:
def test_raw_value_none_before_first_event(self):
bus = GameEventBus()
stream = GameEventValueStream(event_type="health", event_bus=bus)
stream.start()
assert stream.get_raw_value() is None
stream.stop()
def test_raw_value_exposed_after_event(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health", min_game_value=0.0, max_game_value=100.0, event_bus=bus
)
stream.start()
bus.publish(_evt(73.0))
assert stream.get_value() == pytest.approx(0.73) # normalized output
assert stream.get_raw_value() == pytest.approx(73.0) # un-clamped raw
stream.stop()
def test_normalize_false_clamps_output_keeps_raw(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
normalize=False,
event_bus=bus,
)
stream.start()
bus.publish(_evt(50.0)) # 50 is > 1 -> clamped to 1.0 in passthrough mode
assert stream.get_value() == 1.0
assert stream.get_raw_value() == pytest.approx(50.0)
stream.stop()
def test_normalize_false_in_range_fraction(self):
bus = GameEventBus()
stream = GameEventValueStream(event_type="health", normalize=False, event_bus=bus)
stream.start()
bus.publish(_evt(0.4))
assert stream.get_value() == pytest.approx(0.4)
stream.stop()
def test_get_value_always_finite_in_passthrough(self):
bus = GameEventBus()
stream = GameEventValueStream(event_type="health", normalize=False, event_bus=bus)
stream.start()
bus.publish(_evt(math.inf))
assert stream.get_value() == 0.0 # finite-safe clamp
stream.stop()
# ---------------------------------------------------------------------------
# Live update_source() flip of the normalize flag on a running stream
# ---------------------------------------------------------------------------
# The design's load-bearing invariant: flipping normalize live never blends a
# raw magnitude against a fraction in the EMA, because BOTH modes emit [0,1].
# These tests lock that in (a future regression dropping a stream's
# _normalize_enabled reassignment would otherwise pass every other test).
class TestLiveNormalizeFlip:
def test_http_flip_true_to_false_and_back(self):
s = _http(True, 50.0)
assert s.get_value() == pytest.approx(0.5) # rescaled
s.update_source(
ValueSource.from_dict(
_base(
"http", http_endpoint_id="ep", min_value=0.0, max_value=100.0, normalize=False
)
)
)
assert s.get_value() == 1.0 # now clamp-passthrough of 50.0
s.update_source(
ValueSource.from_dict(
_base("http", http_endpoint_id="ep", min_value=0.0, max_value=100.0, normalize=True)
)
)
assert s.get_value() == pytest.approx(0.5) # rescaled again, no stale-cache blend
def test_ha_flip_true_to_false(self):
s = _ha(True, "50")
assert s.get_value() == pytest.approx(0.5)
s.update_source(
ValueSource.from_dict(
_base(
"ha_entity",
ha_source_id="ha1",
entity_id="sensor.temp",
min_ha_value=0.0,
max_ha_value=100.0,
normalize=False,
)
)
)
assert s.get_value() == 1.0
def test_system_metrics_flip_true_to_false(self):
s = SystemMetricsValueStream(
metric="cpu_load", min_value=0.0, max_value=100.0, normalize=True
)
s._read_metric = lambda: 65.0 # type: ignore[method-assign]
first = s.get_value() # rescaled via the cpu_load spec
assert 0.0 <= first <= 1.0
s.update_source(
ValueSource.from_dict(
_base(
"system_metrics",
metric="cpu_load",
min_value=0.0,
max_value=100.0,
normalize=False,
)
)
)
s._read_metric = lambda: 65.0 # type: ignore[method-assign]
s._prev_value = None # force a fresh poll (bypass the poll-interval cache)
assert s.get_value() == 1.0 # clamp-passthrough of 65.0
def test_game_event_flip_true_to_false(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
normalize=True,
event_bus=bus,
)
stream.start()
bus.publish(_evt(50.0))
assert stream.get_value() == pytest.approx(0.5)
stream.update_source(
ValueSource.from_dict(
_base("game_event", min_game_value=0.0, max_game_value=100.0, normalize=False)
)
)
bus.publish(_evt(50.0))
assert stream.get_value() == 1.0 # clamp-passthrough
assert stream.get_raw_value() == pytest.approx(50.0)
stream.stop()
# ---------------------------------------------------------------------------
# build_stream() forwarding — game_event's SOLE wiring path (no CRUD/schema)
# ---------------------------------------------------------------------------
class TestBuildStreamForwarding:
def test_game_event_builder_forwards_normalize(self):
src = ValueSource.from_dict(
_base("game_event", min_game_value=0.0, max_game_value=100.0, normalize=False)
)
bus = GameEventBus()
deps = ValueStreamDeps(value_stream_manager=None, event_bus=bus)
stream = build_stream(src, deps)
assert isinstance(stream, GameEventValueStream)
assert stream._normalize_enabled is False # forwarded from the dataclass
stream.start()
bus.publish(_evt(50.0))
assert stream.get_value() == 1.0 # passthrough clamp, not 0.5 rescale
assert stream.get_raw_value() == pytest.approx(50.0)
stream.stop()
@@ -7,7 +7,10 @@ from ledgrab.storage.value_source import (
AnimatedValueSource,
AudioValueSource,
DaylightValueSource,
HAEntityValueSource,
HTTPValueSource,
StaticValueSource,
SystemMetricsValueSource,
ValueSource,
)
from ledgrab.storage.value_source_store import ValueSourceStore
@@ -238,6 +241,50 @@ class TestValueSourceStoreCRUD:
assert updated.speed == 30.0
# ---------------------------------------------------------------------------
# normalize flag — full store create/update path (factory builder + applier)
# ---------------------------------------------------------------------------
class TestValueSourceNormalizeStoreRoundTrip:
"""The ``normalize`` flag must survive the real write path —
create_source -> CREATE_BUILDERS and update_source -> UPDATE_APPLIERS
not just dataclass from_dict/to_dict. A dropped builder/applier line would
silently revert/ignore the flag while every dataclass-level test stayed
green, so these exercise the factory layer end-to-end (game_event is
intentionally excluded it has no store/CRUD path).
"""
_CASES = [
("ha_entity", {"ha_source_id": "ha1", "entity_id": "sensor.temp"}, HAEntityValueSource),
("http", {"http_endpoint_id": "ep1"}, HTTPValueSource),
("system_metrics", {"metric": "cpu_load"}, SystemMetricsValueSource),
]
@pytest.mark.parametrize("source_type,kwargs,cls", _CASES)
def test_create_normalize_false_persists(self, store, source_type, kwargs, cls):
s = store.create_source(
name=f"{source_type}_n", source_type=source_type, normalize=False, **kwargs
)
assert isinstance(s, cls)
assert s.normalize is False
# Re-read exercises the SQLite blob persistence round-trip as well.
assert store.get_source(s.id).normalize is False
@pytest.mark.parametrize("source_type,kwargs,cls", _CASES)
def test_create_normalize_defaults_true(self, store, source_type, kwargs, cls):
s = store.create_source(name=f"{source_type}_d", source_type=source_type, **kwargs)
assert s.normalize is True
@pytest.mark.parametrize("source_type,kwargs,cls", _CASES)
def test_update_normalize_flip(self, store, source_type, kwargs, cls):
s = store.create_source(
name=f"{source_type}_u", source_type=source_type, normalize=False, **kwargs
)
assert store.update_source(s.id, normalize=True).normalize is True
assert store.update_source(s.id, normalize=False).normalize is False
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------