refactor(value-source): per-type factories for create / update dispatch

ValueSourceStore.create_source used to be a ~260-line if/elif chain
over 14 source_type strings; update_source did the same dance again
with 14 isinstance branches (audit finding C7 store-side). Each
branch duplicated the common-fields scaffold and the per-type
defaulting + validation logic.

Lift each per-type create / update body into a free function in a
new ``storage.value_source_factories`` module:

  * ``CREATE_BUILDERS[source_type]`` — owns defaulting + per-type
    validation (HA needs ha_source_id + entity_id; gradient_map
    needs value_source_id; system_metrics validates against
    VALID_SYSTEM_METRICS; http rejects interval_s < 1; the two
    adaptive_* sub-modes route to the same AdaptiveValueSource
    class with different source_type discriminators).
  * ``UPDATE_APPLIERS[source_type]`` — mirrors the above on the
    update side; ``resolve_ref`` is applied to cross-entity
    references so empty-string clears keep working.
  * ``build_source(...)`` / ``apply_update(source, **kwargs)`` are
    the public entry points the store calls.
  * ``_assert_factory_coverage()`` runs at module import and
    requires BOTH registries to match storage's _VALUE_SOURCE_MAP
    exactly.

The store's ``create_source`` shrinks from ~260 lines to ~25;
``update_source`` from ~200 lines to ~40.

Tests: 14 new tests cover registry coverage in both directions
plus drift assertions, representative builder paths (static /
adaptive_time / adaptive_scene / ha_entity / http / unknown),
the AdaptiveValueSource dual-source-type discriminator, and
several applier paths including ``**_`` swallowing unknown kwargs
and HTTP zero-interval rejection. 47 existing value-source store
tests stay green; 769 storage / core / api tests in aggregate.
Ruff clean.
This commit is contained in:
2026-05-22 23:56:10 +03:00
parent 05f73eedf9
commit 3b8f00e3f9
3 changed files with 1040 additions and 479 deletions
@@ -0,0 +1,740 @@
"""Per-type factories for ValueSource create / update flows.
``ValueSourceStore.create_source`` used to be a ~260-line ``if/elif``
chain over 14 ``source_type`` strings; ``update_source`` did the same
thing again with 14 ``isinstance`` branches (audit finding C7). Adding
a new value-source kind meant editing both chains and remembering to
keep them aligned.
This module replaces both ladders with a single ``CREATE_BUILDERS``
plus a single ``UPDATE_APPLIERS`` dict, both keyed by ``source_type``.
Each builder is a small free function that knows the per-type
construction rules (defaults, validation, sub-mode dispatch for
``adaptive_*``); each applier mutates an existing instance in-place
with the same defaulting / validation rules.
An import-time coverage assertion guarantees both registries stay in
sync with ``storage.value_source._VALUE_SOURCE_MAP`` so a new
``source_type`` cannot ship without both a builder and an applier.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from ledgrab.storage.utils import resolve_ref
from ledgrab.storage.value_source import (
VALID_SYSTEM_METRICS,
AdaptiveValueSource,
AdaptiveTimeColorValueSource,
AnimatedColorValueSource,
AnimatedValueSource,
AudioValueSource,
CSSExtractValueSource,
DaylightValueSource,
GameEventValueSource,
GradientMapValueSource,
HAEntityValueSource,
HTTPValueSource,
StaticColorValueSource,
StaticValueSource,
SystemMetricsValueSource,
ValueSource,
_VALUE_SOURCE_MAP,
)
CreateBuilder = Callable[..., ValueSource]
UpdateApplier = Callable[..., None]
# ---------------------------------------------------------------------------
# Common kwargs scaffold
# ---------------------------------------------------------------------------
def _common(
sid: str,
name: str,
source_type: str,
now: datetime,
description: Optional[str],
tags: Optional[List[str]],
icon: Optional[str],
icon_color: Optional[str],
) -> dict:
"""Build the kwargs dict shared by every ValueSource subclass __init__."""
return dict(
id=sid,
name=name,
source_type=source_type,
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
icon=icon or "",
icon_color=icon_color or "",
)
# ---------------------------------------------------------------------------
# Create builders — one per source_type
# Each builder takes the common scaffold plus the per-type kwargs from the
# store's CRUD method. Validation that's specific to one type lives here.
# ---------------------------------------------------------------------------
def _build_static(*, common: dict, value: Optional[float] = None, **_) -> ValueSource:
return StaticValueSource(**common, value=value if value is not None else 1.0)
def _build_animated(
*,
common: dict,
waveform: Optional[str] = None,
speed: Optional[float] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
**_,
) -> ValueSource:
return AnimatedValueSource(
**common,
waveform=waveform or "sine",
speed=speed if speed is not None else 10.0,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
def _build_audio(
*,
common: dict,
audio_source_id: Optional[str] = None,
mode: Optional[str] = None,
sensitivity: Optional[float] = None,
smoothing: Optional[float] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
auto_gain: Optional[bool] = None,
**_,
) -> ValueSource:
return AudioValueSource(
**common,
audio_source_id=audio_source_id or "",
mode=mode or "rms",
sensitivity=sensitivity if sensitivity is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.3,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
auto_gain=bool(auto_gain) if auto_gain is not None else False,
)
def _build_adaptive_time(
*,
common: dict,
schedule: Optional[list] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
**_,
) -> ValueSource:
schedule_data = schedule or []
if len(schedule_data) < 2:
raise ValueError("Time of day schedule requires at least 2 points")
return AdaptiveValueSource(
**common,
schedule=schedule_data,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
def _build_adaptive_scene(
*,
common: dict,
picture_source_id: Optional[str] = None,
scene_behavior: Optional[str] = None,
sensitivity: Optional[float] = None,
smoothing: Optional[float] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
**_,
) -> ValueSource:
return AdaptiveValueSource(
**common,
picture_source_id=picture_source_id or "",
scene_behavior=scene_behavior or "complement",
sensitivity=sensitivity if sensitivity is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.3,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
def _build_daylight(
*,
common: dict,
speed: Optional[float] = None,
use_real_time: Optional[bool] = None,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
**_,
) -> ValueSource:
return DaylightValueSource(
**common,
speed=speed if speed is not None else 1.0,
use_real_time=bool(use_real_time) if use_real_time is not None else False,
latitude=latitude if latitude is not None else 50.0,
longitude=longitude if longitude is not None else 0.0,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
def _build_static_color(*, common: dict, color: Optional[list] = None, **_) -> ValueSource:
rgb = color if isinstance(color, list) and len(color) == 3 else [255, 255, 255]
return StaticColorValueSource(**common, color=rgb)
def _build_animated_color(
*,
common: dict,
colors: Optional[list] = None,
speed: Optional[float] = None,
easing: Optional[str] = None,
clock_id: Optional[str] = None,
**_,
) -> ValueSource:
seed = (
colors
if isinstance(colors, list) and len(colors) >= 2
else [[255, 0, 0], [0, 255, 0], [0, 0, 255]]
)
return AnimatedColorValueSource(
**common,
colors=seed,
speed=speed if speed is not None else 10.0,
easing=easing or "linear",
clock_id=clock_id or None,
)
def _build_adaptive_time_color(
*, common: dict, schedule: Optional[list] = None, **_
) -> ValueSource:
schedule_data = schedule or []
if len(schedule_data) < 2:
raise ValueError("Color schedule requires at least 2 points")
return AdaptiveTimeColorValueSource(**common, schedule=schedule_data)
def _build_ha_entity(
*,
common: dict,
ha_source_id: Optional[str] = None,
entity_id: Optional[str] = None,
attribute: Optional[str] = None,
min_ha_value: Optional[float] = None,
max_ha_value: Optional[float] = None,
smoothing: Optional[float] = None,
**_,
) -> ValueSource:
if not ha_source_id:
raise ValueError("HA source ID is required for ha_entity type")
if not entity_id:
raise ValueError("Entity ID is required for ha_entity type")
return HAEntityValueSource(
**common,
ha_source_id=ha_source_id,
entity_id=entity_id,
attribute=attribute or "",
min_ha_value=min_ha_value if min_ha_value is not None else 0.0,
max_ha_value=max_ha_value if max_ha_value is not None else 100.0,
smoothing=smoothing if smoothing is not None else 0.0,
)
def _build_gradient_map(
*,
common: dict,
value_source_id: Optional[str] = None,
gradient_id: Optional[str] = None,
easing: Optional[str] = None,
**_,
) -> ValueSource:
if not value_source_id:
raise ValueError("Value source ID is required for gradient_map type")
return GradientMapValueSource(
**common,
value_source_id=value_source_id,
gradient_id=gradient_id or "",
easing=easing or "linear",
)
def _build_css_extract(
*,
common: dict,
color_strip_source_id: Optional[str] = None,
led_start: Optional[int] = None,
led_end: Optional[int] = None,
**_,
) -> ValueSource:
if not color_strip_source_id:
raise ValueError("Color strip source ID is required for css_extract type")
return CSSExtractValueSource(
**common,
color_strip_source_id=color_strip_source_id,
led_start=led_start if led_start is not None else 0,
led_end=led_end if led_end is not None else -1,
)
def _build_system_metrics(
*,
common: dict,
metric: Optional[str] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
max_rate: Optional[float] = None,
disk_path: Optional[str] = None,
sensor_label: Optional[str] = None,
poll_interval: Optional[float] = None,
smoothing: Optional[float] = None,
**_,
) -> ValueSource:
m = metric or "cpu_load"
if m not in VALID_SYSTEM_METRICS:
raise ValueError(f"Invalid metric: {m}")
return SystemMetricsValueSource(
**common,
metric=m,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 100.0,
max_rate=max_rate if max_rate is not None else 125_000_000.0,
disk_path=disk_path or "",
sensor_label=sensor_label or "",
poll_interval=poll_interval if poll_interval is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.0,
)
def _build_game_event(
*,
common: dict,
**_,
) -> ValueSource:
# GameEventValueSource has its own defaults; the store never previously
# exposed a way to create one through this path, but we register a
# builder anyway so the coverage assertion stays symmetric.
return GameEventValueSource(**common)
def _build_http(
*,
common: dict,
http_endpoint_id: Optional[str] = None,
json_path: Optional[str] = None,
interval_s: Optional[int] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
smoothing: Optional[float] = None,
**_,
) -> ValueSource:
if not http_endpoint_id:
raise ValueError("http_endpoint_id is required for http type")
iv = interval_s if interval_s is not None else 60
if iv < 1:
raise ValueError("interval_s must be >= 1")
return HTTPValueSource(
**common,
http_endpoint_id=http_endpoint_id,
json_path=json_path or "",
interval_s=iv,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 100.0,
smoothing=smoothing if smoothing is not None else 0.0,
)
CREATE_BUILDERS: Dict[str, CreateBuilder] = {
"static": _build_static,
"animated": _build_animated,
"audio": _build_audio,
"adaptive_time": _build_adaptive_time,
"adaptive_scene": _build_adaptive_scene,
"daylight": _build_daylight,
"static_color": _build_static_color,
"animated_color": _build_animated_color,
"adaptive_time_color": _build_adaptive_time_color,
"ha_entity": _build_ha_entity,
"gradient_map": _build_gradient_map,
"css_extract": _build_css_extract,
"system_metrics": _build_system_metrics,
"game_event": _build_game_event,
"http": _build_http,
}
def build_source(
*,
source_type: str,
sid: str,
name: str,
now: datetime,
description: Optional[str],
tags: Optional[List[str]],
icon: Optional[str],
icon_color: Optional[str],
**type_specific: Any,
) -> ValueSource:
"""Look up the per-type builder and produce a fresh ValueSource."""
builder = CREATE_BUILDERS.get(source_type)
if builder is None:
raise ValueError(f"Invalid source type: {source_type}")
common = _common(sid, name, source_type, now, description, tags, icon, icon_color)
return builder(common=common, **type_specific)
# ---------------------------------------------------------------------------
# Update appliers — one per source_type (mutates the existing instance)
# ---------------------------------------------------------------------------
def _apply_static(source: StaticValueSource, *, value=None, **_) -> None:
if value is not None:
source.value = value
def _apply_animated(
source: AnimatedValueSource,
*,
waveform=None,
speed=None,
min_value=None,
max_value=None,
**_,
) -> None:
if waveform is not None:
source.waveform = waveform
if speed is not None:
source.speed = speed
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
def _apply_audio(
source: AudioValueSource,
*,
audio_source_id=None,
mode=None,
sensitivity=None,
smoothing=None,
min_value=None,
max_value=None,
auto_gain=None,
**_,
) -> None:
if audio_source_id is not None:
source.audio_source_id = resolve_ref(audio_source_id, source.audio_source_id)
if mode is not None:
source.mode = mode
if sensitivity is not None:
source.sensitivity = sensitivity
if smoothing is not None:
source.smoothing = smoothing
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
if auto_gain is not None:
source.auto_gain = auto_gain
def _apply_adaptive_time(
source: AdaptiveValueSource,
*,
schedule=None,
min_value=None,
max_value=None,
**_,
) -> None:
if schedule is not None:
if len(schedule) < 2:
raise ValueError("Time of day schedule requires at least 2 points")
source.schedule = schedule
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
def _apply_adaptive_scene(
source: AdaptiveValueSource,
*,
picture_source_id=None,
scene_behavior=None,
sensitivity=None,
smoothing=None,
min_value=None,
max_value=None,
**_,
) -> None:
if picture_source_id is not None:
source.picture_source_id = resolve_ref(picture_source_id, source.picture_source_id)
if scene_behavior is not None:
source.scene_behavior = scene_behavior
if sensitivity is not None:
source.sensitivity = sensitivity
if smoothing is not None:
source.smoothing = smoothing
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
def _apply_daylight(
source: DaylightValueSource,
*,
speed=None,
use_real_time=None,
latitude=None,
longitude=None,
min_value=None,
max_value=None,
**_,
) -> None:
if speed is not None:
source.speed = speed
if use_real_time is not None:
source.use_real_time = use_real_time
if latitude is not None:
source.latitude = latitude
if longitude is not None:
source.longitude = longitude
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
def _apply_static_color(source: StaticColorValueSource, *, color=None, **_) -> None:
if color is not None and isinstance(color, list) and len(color) == 3:
source.color = color
def _apply_animated_color(
source: AnimatedColorValueSource,
*,
colors=None,
speed=None,
easing=None,
clock_id=None,
**_,
) -> None:
if colors is not None and isinstance(colors, list):
source.colors = colors
if speed is not None:
source.speed = speed
if easing is not None:
source.easing = easing
if clock_id is not None:
source.clock_id = resolve_ref(clock_id, source.clock_id)
def _apply_adaptive_time_color(source: AdaptiveTimeColorValueSource, *, schedule=None, **_) -> None:
if schedule is not None:
if len(schedule) < 2:
raise ValueError("Color schedule requires at least 2 points")
source.schedule = schedule
def _apply_ha_entity(
source: HAEntityValueSource,
*,
ha_source_id=None,
entity_id=None,
attribute=None,
min_ha_value=None,
max_ha_value=None,
smoothing=None,
**_,
) -> None:
if ha_source_id is not None:
source.ha_source_id = resolve_ref(ha_source_id, source.ha_source_id)
if entity_id is not None:
source.entity_id = entity_id
if attribute is not None:
source.attribute = attribute
if min_ha_value is not None:
source.min_ha_value = min_ha_value
if max_ha_value is not None:
source.max_ha_value = max_ha_value
if smoothing is not None:
source.smoothing = smoothing
def _apply_gradient_map(
source: GradientMapValueSource,
*,
value_source_id=None,
gradient_id=None,
easing=None,
**_,
) -> None:
if value_source_id is not None:
source.value_source_id = resolve_ref(value_source_id, source.value_source_id)
if gradient_id is not None:
source.gradient_id = resolve_ref(gradient_id, source.gradient_id)
if easing is not None:
source.easing = easing
def _apply_css_extract(
source: CSSExtractValueSource,
*,
color_strip_source_id=None,
led_start=None,
led_end=None,
**_,
) -> None:
if color_strip_source_id is not None:
source.color_strip_source_id = resolve_ref(
color_strip_source_id, source.color_strip_source_id
)
if led_start is not None:
source.led_start = led_start
if led_end is not None:
source.led_end = led_end
def _apply_system_metrics(
source: SystemMetricsValueSource,
*,
metric=None,
min_value=None,
max_value=None,
max_rate=None,
disk_path=None,
sensor_label=None,
poll_interval=None,
smoothing=None,
**_,
) -> None:
if metric is not None:
if metric not in VALID_SYSTEM_METRICS:
raise ValueError(f"Invalid metric: {metric}")
source.metric = metric
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
if max_rate is not None:
source.max_rate = max_rate
if disk_path is not None:
source.disk_path = disk_path
if sensor_label is not None:
source.sensor_label = sensor_label
if poll_interval is not None:
source.poll_interval = poll_interval
if smoothing is not None:
source.smoothing = smoothing
def _apply_game_event(source: GameEventValueSource, **_) -> None:
# No update fields are exposed through the store for game-event yet;
# registered for symmetry with CREATE_BUILDERS.
return None
def _apply_http(
source: HTTPValueSource,
*,
http_endpoint_id=None,
json_path=None,
interval_s=None,
min_value=None,
max_value=None,
smoothing=None,
**_,
) -> None:
if http_endpoint_id is not None:
source.http_endpoint_id = resolve_ref(http_endpoint_id, source.http_endpoint_id)
if json_path is not None:
source.json_path = json_path
if interval_s is not None:
if interval_s < 1:
raise ValueError("interval_s must be >= 1")
source.interval_s = interval_s
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
if smoothing is not None:
source.smoothing = smoothing
UPDATE_APPLIERS: Dict[str, UpdateApplier] = {
"static": _apply_static,
"animated": _apply_animated,
"audio": _apply_audio,
"adaptive_time": _apply_adaptive_time,
"adaptive_scene": _apply_adaptive_scene,
"daylight": _apply_daylight,
"static_color": _apply_static_color,
"animated_color": _apply_animated_color,
"adaptive_time_color": _apply_adaptive_time_color,
"ha_entity": _apply_ha_entity,
"gradient_map": _apply_gradient_map,
"css_extract": _apply_css_extract,
"system_metrics": _apply_system_metrics,
"game_event": _apply_game_event,
"http": _apply_http,
}
def apply_update(source: ValueSource, **kwargs: Any) -> None:
"""Dispatch to the per-type applier; no-op for unknown source_type."""
applier = UPDATE_APPLIERS.get(source.source_type)
if applier is None:
# Unknown source_type means the registry drifted from storage; the
# coverage assertion at import time should have caught this.
return
applier(source, **kwargs)
# ---------------------------------------------------------------------------
# Coverage assertion — both registries stay in sync with storage's map
# ---------------------------------------------------------------------------
def _assert_factory_coverage() -> None:
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
create_kinds = set(CREATE_BUILDERS.keys())
update_kinds = set(UPDATE_APPLIERS.keys())
problems: list[str] = []
if create_kinds != storage_kinds:
if missing := storage_kinds - create_kinds:
problems.append(f"CREATE_BUILDERS missing: {sorted(missing)}")
if extra := create_kinds - storage_kinds:
problems.append(f"CREATE_BUILDERS extra: {sorted(extra)}")
if update_kinds != storage_kinds:
if missing := storage_kinds - update_kinds:
problems.append(f"UPDATE_APPLIERS missing: {sorted(missing)}")
if extra := update_kinds - storage_kinds:
problems.append(f"UPDATE_APPLIERS extra: {sorted(extra)}")
if problems:
raise RuntimeError(
"value_source_factories registries are out of sync with "
"storage._VALUE_SOURCE_MAP: " + "; ".join(problems)
)
_assert_factory_coverage()
+47 -477
View File
@@ -1,27 +1,21 @@
"""Value source storage using SQLite."""
"""Value source storage using SQLite.
Per-type create / update logic lives in
``ledgrab.storage.value_source_factories`` so this module stays focused
on the BaseSqliteStore protocol (load, save, name uniqueness, common
fields). See audit finding C7 for context on the previous if/elif
ladder this layout replaces.
"""
import uuid
from datetime import datetime, timezone
from typing import List, Optional
from ledgrab.storage.base_sqlite_store import BaseSqliteStore
from ledgrab.storage.database import Database
from ledgrab.storage.utils import resolve_ref
from ledgrab.storage.value_source import (
VALID_SYSTEM_METRICS,
AdaptiveValueSource,
AdaptiveTimeColorValueSource,
AnimatedColorValueSource,
AnimatedValueSource,
AudioValueSource,
CSSExtractValueSource,
DaylightValueSource,
GradientMapValueSource,
HAEntityValueSource,
StaticColorValueSource,
StaticValueSource,
SystemMetricsValueSource,
ValueSource,
from ledgrab.storage.value_source import ValueSource
from ledgrab.storage.value_source_factories import (
apply_update as _apply_value_source_update,
build_source as _build_value_source,
)
from ledgrab.utils import get_logger
@@ -44,306 +38,28 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]):
# ── CRUD ─────────────────────────────────────────────────────────
def create_source(
self,
name: str,
source_type: str,
value: Optional[float] = None,
waveform: Optional[str] = None,
speed: Optional[float] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
audio_source_id: Optional[str] = None,
mode: Optional[str] = None,
sensitivity: Optional[float] = None,
smoothing: Optional[float] = None,
description: Optional[str] = None,
schedule: Optional[list] = None,
picture_source_id: Optional[str] = None,
scene_behavior: Optional[str] = None,
auto_gain: Optional[bool] = None,
use_real_time: Optional[bool] = None,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
color: Optional[list] = None,
colors: Optional[list] = None,
easing: Optional[str] = None,
tags: Optional[List[str]] = None,
ha_source_id: Optional[str] = None,
entity_id: Optional[str] = None,
attribute: Optional[str] = None,
min_ha_value: Optional[float] = None,
max_ha_value: Optional[float] = None,
value_source_id: Optional[str] = None,
gradient_id: Optional[str] = None,
color_strip_source_id: Optional[str] = None,
led_start: Optional[int] = None,
led_end: Optional[int] = None,
metric: Optional[str] = None,
max_rate: Optional[float] = None,
disk_path: Optional[str] = None,
sensor_label: Optional[str] = None,
poll_interval: Optional[float] = None,
clock_id: Optional[str] = None,
icon: Optional[str] = None,
icon_color: Optional[str] = None,
) -> ValueSource:
_VALID = (
"static",
"animated",
"audio",
"adaptive_time",
"adaptive_scene",
"daylight",
"static_color",
"animated_color",
"adaptive_time_color",
"ha_entity",
"gradient_map",
"css_extract",
"system_metrics",
)
if source_type not in _VALID:
raise ValueError(f"Invalid source type: {source_type}")
def create_source(self, name: str, source_type: str, **kwargs) -> ValueSource:
"""Create a new value source.
Per-type construction (defaults, validation, sub-mode dispatch)
lives in ``value_source_factories.CREATE_BUILDERS`` so the store
only owns name-uniqueness, id minting, and persistence.
"""
self._check_name_unique(name)
sid = f"vs_{uuid.uuid4().hex[:8]}"
now = datetime.now(timezone.utc)
common_tags = tags or []
common_icon = icon or ""
common_icon_color = icon_color or ""
if source_type == "static":
source: ValueSource = StaticValueSource(
id=sid,
source = _build_value_source(
source_type=source_type,
sid=sid,
name=name,
source_type="static",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
value=value if value is not None else 1.0,
)
elif source_type == "animated":
source = AnimatedValueSource(
id=sid,
name=name,
source_type="animated",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
waveform=waveform or "sine",
speed=speed if speed is not None else 10.0,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
elif source_type == "audio":
source = AudioValueSource(
id=sid,
name=name,
source_type="audio",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
audio_source_id=audio_source_id or "",
mode=mode or "rms",
sensitivity=sensitivity if sensitivity is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.3,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
auto_gain=bool(auto_gain) if auto_gain is not None else False,
)
elif source_type == "adaptive_time":
schedule_data = schedule or []
if len(schedule_data) < 2:
raise ValueError("Time of day schedule requires at least 2 points")
source = AdaptiveValueSource(
id=sid,
name=name,
source_type="adaptive_time",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
schedule=schedule_data,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
elif source_type == "adaptive_scene":
source = AdaptiveValueSource(
id=sid,
name=name,
source_type="adaptive_scene",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
picture_source_id=picture_source_id or "",
scene_behavior=scene_behavior or "complement",
sensitivity=sensitivity if sensitivity is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.3,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
elif source_type == "daylight":
source = DaylightValueSource(
id=sid,
name=name,
source_type="daylight",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
speed=speed if speed is not None else 1.0,
use_real_time=bool(use_real_time) if use_real_time is not None else False,
latitude=latitude if latitude is not None else 50.0,
longitude=longitude if longitude is not None else 0.0,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
)
elif source_type == "static_color":
source = StaticColorValueSource(
id=sid,
name=name,
source_type="static_color",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
color=color if isinstance(color, list) and len(color) == 3 else [255, 255, 255],
)
elif source_type == "animated_color":
source = AnimatedColorValueSource(
id=sid,
name=name,
source_type="animated_color",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
colors=(
colors
if isinstance(colors, list) and len(colors) >= 2
else [[255, 0, 0], [0, 255, 0], [0, 0, 255]]
),
speed=speed if speed is not None else 10.0,
easing=easing or "linear",
clock_id=clock_id or None,
)
elif source_type == "adaptive_time_color":
schedule_data = schedule or []
if len(schedule_data) < 2:
raise ValueError("Color schedule requires at least 2 points")
source = AdaptiveTimeColorValueSource(
id=sid,
name=name,
source_type="adaptive_time_color",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
schedule=schedule_data,
)
elif source_type == "ha_entity":
if not ha_source_id:
raise ValueError("HA source ID is required for ha_entity type")
if not entity_id:
raise ValueError("Entity ID is required for ha_entity type")
source = HAEntityValueSource(
id=sid,
name=name,
source_type="ha_entity",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
ha_source_id=ha_source_id,
entity_id=entity_id,
attribute=attribute or "",
min_ha_value=min_ha_value if min_ha_value is not None else 0.0,
max_ha_value=max_ha_value if max_ha_value is not None else 100.0,
smoothing=smoothing if smoothing is not None else 0.0,
)
elif source_type == "gradient_map":
if not value_source_id:
raise ValueError("Value source ID is required for gradient_map type")
source = GradientMapValueSource(
id=sid,
name=name,
source_type="gradient_map",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
value_source_id=value_source_id,
gradient_id=gradient_id or "",
easing=easing or "linear",
)
elif source_type == "css_extract":
if not color_strip_source_id:
raise ValueError("Color strip source ID is required for css_extract type")
source = CSSExtractValueSource(
id=sid,
name=name,
source_type="css_extract",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
color_strip_source_id=color_strip_source_id,
led_start=led_start if led_start is not None else 0,
led_end=led_end if led_end is not None else -1,
)
elif source_type == "system_metrics":
m = metric or "cpu_load"
if m not in VALID_SYSTEM_METRICS:
raise ValueError(f"Invalid metric: {m}")
source = SystemMetricsValueSource(
id=sid,
name=name,
source_type="system_metrics",
created_at=now,
updated_at=now,
description=description,
tags=common_tags,
icon=common_icon,
icon_color=common_icon_color,
metric=m,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 100.0,
max_rate=max_rate if max_rate is not None else 125_000_000.0,
disk_path=disk_path or "",
sensor_label=sensor_label or "",
poll_interval=poll_interval if poll_interval is not None else 1.0,
smoothing=smoothing if smoothing is not None else 0.0,
now=now,
description=kwargs.pop("description", None),
tags=kwargs.pop("tags", None),
icon=kwargs.pop("icon", None),
icon_color=kwargs.pop("icon_color", None),
**kwargs,
)
self._items[sid] = source
@@ -352,187 +68,41 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]):
logger.info(f"Created value source: {name} ({sid}, type={source_type})")
return source
def update_source(
self,
source_id: str,
name: Optional[str] = None,
value: Optional[float] = None,
waveform: Optional[str] = None,
speed: Optional[float] = None,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
audio_source_id: Optional[str] = None,
mode: Optional[str] = None,
sensitivity: Optional[float] = None,
smoothing: Optional[float] = None,
description: Optional[str] = None,
schedule: Optional[list] = None,
picture_source_id: Optional[str] = None,
scene_behavior: Optional[str] = None,
auto_gain: Optional[bool] = None,
use_real_time: Optional[bool] = None,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
color: Optional[list] = None,
colors: Optional[list] = None,
easing: Optional[str] = None,
tags: Optional[List[str]] = None,
ha_source_id: Optional[str] = None,
entity_id: Optional[str] = None,
attribute: Optional[str] = None,
min_ha_value: Optional[float] = None,
max_ha_value: Optional[float] = None,
value_source_id: Optional[str] = None,
gradient_id: Optional[str] = None,
color_strip_source_id: Optional[str] = None,
led_start: Optional[int] = None,
led_end: Optional[int] = None,
metric: Optional[str] = None,
max_rate: Optional[float] = None,
disk_path: Optional[str] = None,
sensor_label: Optional[str] = None,
poll_interval: Optional[float] = None,
clock_id: Optional[str] = None,
icon: Optional[str] = None,
icon_color: Optional[str] = None,
) -> ValueSource:
def update_source(self, source_id: str, **kwargs) -> ValueSource:
"""Update an existing value source.
Common fields (name, description, tags, icon, icon_color) are
applied here; per-type field updates are delegated to
``value_source_factories.apply_update`` (which dispatches on
``source.source_type``).
"""
source = self.get(source_id)
name = kwargs.pop("name", None)
if name is not None:
self._check_name_unique(name, exclude_id=source_id)
source.name = name
description = kwargs.pop("description", None)
if description is not None:
source.description = description
tags = kwargs.pop("tags", None)
if tags is not None:
source.tags = tags
icon = kwargs.pop("icon", None)
if icon is not None:
source.icon = icon
icon_color = kwargs.pop("icon_color", None)
if icon_color is not None:
source.icon_color = icon_color
if isinstance(source, StaticValueSource):
if value is not None:
source.value = value
elif isinstance(source, AnimatedValueSource):
if waveform is not None:
source.waveform = waveform
if speed is not None:
source.speed = speed
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
elif isinstance(source, AudioValueSource):
if audio_source_id is not None:
source.audio_source_id = resolve_ref(audio_source_id, source.audio_source_id)
if mode is not None:
source.mode = mode
if sensitivity is not None:
source.sensitivity = sensitivity
if smoothing is not None:
source.smoothing = smoothing
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
if auto_gain is not None:
source.auto_gain = auto_gain
elif isinstance(source, AdaptiveValueSource):
if schedule is not None:
if source.source_type == "adaptive_time" and len(schedule) < 2:
raise ValueError("Time of day schedule requires at least 2 points")
source.schedule = schedule
if picture_source_id is not None:
source.picture_source_id = resolve_ref(picture_source_id, source.picture_source_id)
if scene_behavior is not None:
source.scene_behavior = scene_behavior
if sensitivity is not None:
source.sensitivity = sensitivity
if smoothing is not None:
source.smoothing = smoothing
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
elif isinstance(source, DaylightValueSource):
if speed is not None:
source.speed = speed
if use_real_time is not None:
source.use_real_time = use_real_time
if latitude is not None:
source.latitude = latitude
if longitude is not None:
source.longitude = longitude
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
elif isinstance(source, StaticColorValueSource):
if color is not None and isinstance(color, list) and len(color) == 3:
source.color = color
elif isinstance(source, AnimatedColorValueSource):
if colors is not None and isinstance(colors, list):
source.colors = colors
if speed is not None:
source.speed = speed
if easing is not None:
source.easing = easing
if clock_id is not None:
source.clock_id = resolve_ref(clock_id, source.clock_id)
elif isinstance(source, AdaptiveTimeColorValueSource):
if schedule is not None:
if len(schedule) < 2:
raise ValueError("Color schedule requires at least 2 points")
source.schedule = schedule
elif isinstance(source, HAEntityValueSource):
if ha_source_id is not None:
source.ha_source_id = resolve_ref(ha_source_id, source.ha_source_id)
if entity_id is not None:
source.entity_id = entity_id
if attribute is not None:
source.attribute = attribute
if min_ha_value is not None:
source.min_ha_value = min_ha_value
if max_ha_value is not None:
source.max_ha_value = max_ha_value
if smoothing is not None:
source.smoothing = smoothing
elif isinstance(source, GradientMapValueSource):
if value_source_id is not None:
source.value_source_id = resolve_ref(value_source_id, source.value_source_id)
if gradient_id is not None:
source.gradient_id = resolve_ref(gradient_id, source.gradient_id)
if easing is not None:
source.easing = easing
elif isinstance(source, CSSExtractValueSource):
if color_strip_source_id is not None:
source.color_strip_source_id = resolve_ref(
color_strip_source_id, source.color_strip_source_id
)
if led_start is not None:
source.led_start = led_start
if led_end is not None:
source.led_end = led_end
elif isinstance(source, SystemMetricsValueSource):
if metric is not None:
if metric not in VALID_SYSTEM_METRICS:
raise ValueError(f"Invalid metric: {metric}")
source.metric = metric
if min_value is not None:
source.min_value = min_value
if max_value is not None:
source.max_value = max_value
if max_rate is not None:
source.max_rate = max_rate
if disk_path is not None:
source.disk_path = disk_path
if sensor_label is not None:
source.sensor_label = sensor_label
if poll_interval is not None:
source.poll_interval = poll_interval
if smoothing is not None:
source.smoothing = smoothing
# Per-type field updates (and per-type validation) live in the
# factories module so adding a new value-source kind requires only
# one new builder + one new applier and no store edits.
_apply_value_source_update(source, **kwargs)
source.updated_at = datetime.now(timezone.utc)
self._save_item(source_id, source)
@@ -0,0 +1,251 @@
"""Tests for the per-type ValueSource factory registry.
Locks in the contract that ``CREATE_BUILDERS`` and ``UPDATE_APPLIERS``
stay symmetric with storage's ``_VALUE_SOURCE_MAP``, and exercises a
couple of representative builders end-to-end.
"""
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from ledgrab.storage import value_source_factories as vsf
from ledgrab.storage.value_source import (
AdaptiveValueSource,
HAEntityValueSource,
StaticValueSource,
_VALUE_SOURCE_MAP,
)
# ---------------------------------------------------------------------------
# Coverage / shape
# ---------------------------------------------------------------------------
def test_create_builders_cover_every_storage_kind():
assert set(vsf.CREATE_BUILDERS.keys()) == set(_VALUE_SOURCE_MAP.keys())
def test_update_appliers_cover_every_storage_kind():
assert set(vsf.UPDATE_APPLIERS.keys()) == set(_VALUE_SOURCE_MAP.keys())
def test_coverage_assertion_raises_when_create_drifts(monkeypatch):
"""Removing a builder makes _assert_factory_coverage() fail loudly."""
pruned = {k: v for k, v in vsf.CREATE_BUILDERS.items() if k != "static"}
monkeypatch.setattr(vsf, "CREATE_BUILDERS", pruned)
with pytest.raises(RuntimeError, match="static"):
vsf._assert_factory_coverage()
def test_coverage_assertion_raises_when_update_drifts(monkeypatch):
pruned = {k: v for k, v in vsf.UPDATE_APPLIERS.items() if k != "http"}
monkeypatch.setattr(vsf, "UPDATE_APPLIERS", pruned)
with pytest.raises(RuntimeError, match="http"):
vsf._assert_factory_coverage()
# ---------------------------------------------------------------------------
# build_source — representative paths
# ---------------------------------------------------------------------------
def _now() -> datetime:
return datetime.now(timezone.utc)
def test_build_source_static_uses_value_default():
src = vsf.build_source(
source_type="static",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
)
assert isinstance(src, StaticValueSource)
assert src.value == 1.0
assert src.source_type == "static"
def test_build_source_static_honours_supplied_value():
src = vsf.build_source(
source_type="static",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
value=0.42,
)
assert src.value == 0.42
def test_build_source_adaptive_time_requires_schedule_with_two_points():
with pytest.raises(ValueError, match="Time of day schedule"):
vsf.build_source(
source_type="adaptive_time",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
schedule=[{"time": "00:00", "value": 0.5}], # only one point
)
def test_build_source_adaptive_scene_routes_to_adaptive_class_with_scene_type():
"""``adaptive_scene`` and ``adaptive_time`` share AdaptiveValueSource but
differ in source_type — the builder must set the right discriminator."""
src = vsf.build_source(
source_type="adaptive_scene",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
picture_source_id="pic_42",
)
assert isinstance(src, AdaptiveValueSource)
assert src.source_type == "adaptive_scene"
assert src.picture_source_id == "pic_42"
def test_build_source_ha_entity_requires_ha_source_id_and_entity_id():
with pytest.raises(ValueError, match="HA source ID"):
vsf.build_source(
source_type="ha_entity",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
entity_id="light.foo", # missing ha_source_id
)
with pytest.raises(ValueError, match="Entity ID"):
vsf.build_source(
source_type="ha_entity",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
ha_source_id="ha_main", # missing entity_id
)
def test_build_source_http_validates_interval():
with pytest.raises(ValueError, match="interval_s"):
vsf.build_source(
source_type="http",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
http_endpoint_id="ep_1",
interval_s=0,
)
def test_build_source_unknown_type_raises():
with pytest.raises(ValueError, match="Invalid source type"):
vsf.build_source(
source_type="totally_unregistered",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
)
# ---------------------------------------------------------------------------
# apply_update — representative paths
# ---------------------------------------------------------------------------
def test_apply_update_static_mutates_value():
src = vsf.build_source(
source_type="static",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
)
vsf.apply_update(src, value=0.75)
assert src.value == 0.75
def test_apply_update_ignores_unknown_kwargs():
"""Stray kwargs are silently dropped by each applier's ``**_``."""
src = vsf.build_source(
source_type="static",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
)
vsf.apply_update(src, value=0.5, totally_made_up_field="ignored")
assert src.value == 0.5
def test_apply_update_ha_entity_validates_smoothing_path():
src = vsf.build_source(
source_type="ha_entity",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
ha_source_id="ha_main",
entity_id="light.foo",
)
assert isinstance(src, HAEntityValueSource)
vsf.apply_update(src, smoothing=0.5, attribute="brightness")
assert src.smoothing == 0.5
assert src.attribute == "brightness"
def test_apply_update_http_rejects_zero_interval():
src = vsf.build_source(
source_type="http",
sid="vs_t",
name="Test",
now=_now(),
description=None,
tags=None,
icon=None,
icon_color=None,
http_endpoint_id="ep_1",
)
with pytest.raises(ValueError, match="interval_s"):
vsf.apply_update(src, interval_s=0)