diff --git a/server/src/ledgrab/storage/value_source_factories.py b/server/src/ledgrab/storage/value_source_factories.py new file mode 100644 index 0000000..674fbde --- /dev/null +++ b/server/src/ledgrab/storage/value_source_factories.py @@ -0,0 +1,740 @@ +"""Per-type factories for ValueSource create / update flows. + +``ValueSourceStore.create_source`` used to be a ~260-line ``if/elif`` +chain over 14 ``source_type`` strings; ``update_source`` did the same +thing again with 14 ``isinstance`` branches (audit finding C7). Adding +a new value-source kind meant editing both chains and remembering to +keep them aligned. + +This module replaces both ladders with a single ``CREATE_BUILDERS`` +plus a single ``UPDATE_APPLIERS`` dict, both keyed by ``source_type``. +Each builder is a small free function that knows the per-type +construction rules (defaults, validation, sub-mode dispatch for +``adaptive_*``); each applier mutates an existing instance in-place +with the same defaulting / validation rules. + +An import-time coverage assertion guarantees both registries stay in +sync with ``storage.value_source._VALUE_SOURCE_MAP`` so a new +``source_type`` cannot ship without both a builder and an applier. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional + +from ledgrab.storage.utils import resolve_ref +from ledgrab.storage.value_source import ( + VALID_SYSTEM_METRICS, + AdaptiveValueSource, + AdaptiveTimeColorValueSource, + AnimatedColorValueSource, + AnimatedValueSource, + AudioValueSource, + CSSExtractValueSource, + DaylightValueSource, + GameEventValueSource, + GradientMapValueSource, + HAEntityValueSource, + HTTPValueSource, + StaticColorValueSource, + StaticValueSource, + SystemMetricsValueSource, + ValueSource, + _VALUE_SOURCE_MAP, +) + + +CreateBuilder = Callable[..., ValueSource] +UpdateApplier = Callable[..., None] + + +# --------------------------------------------------------------------------- +# Common kwargs scaffold +# --------------------------------------------------------------------------- + + +def _common( + sid: str, + name: str, + source_type: str, + now: datetime, + description: Optional[str], + tags: Optional[List[str]], + icon: Optional[str], + icon_color: Optional[str], +) -> dict: + """Build the kwargs dict shared by every ValueSource subclass __init__.""" + return dict( + id=sid, + name=name, + source_type=source_type, + created_at=now, + updated_at=now, + description=description, + tags=tags or [], + icon=icon or "", + icon_color=icon_color or "", + ) + + +# --------------------------------------------------------------------------- +# Create builders — one per source_type +# Each builder takes the common scaffold plus the per-type kwargs from the +# store's CRUD method. Validation that's specific to one type lives here. +# --------------------------------------------------------------------------- + + +def _build_static(*, common: dict, value: Optional[float] = None, **_) -> ValueSource: + return StaticValueSource(**common, value=value if value is not None else 1.0) + + +def _build_animated( + *, + common: dict, + waveform: Optional[str] = None, + speed: Optional[float] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + **_, +) -> ValueSource: + return AnimatedValueSource( + **common, + waveform=waveform or "sine", + speed=speed if speed is not None else 10.0, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 1.0, + ) + + +def _build_audio( + *, + common: dict, + audio_source_id: Optional[str] = None, + mode: Optional[str] = None, + sensitivity: Optional[float] = None, + smoothing: Optional[float] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + auto_gain: Optional[bool] = None, + **_, +) -> ValueSource: + return AudioValueSource( + **common, + audio_source_id=audio_source_id or "", + mode=mode or "rms", + sensitivity=sensitivity if sensitivity is not None else 1.0, + smoothing=smoothing if smoothing is not None else 0.3, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 1.0, + auto_gain=bool(auto_gain) if auto_gain is not None else False, + ) + + +def _build_adaptive_time( + *, + common: dict, + schedule: Optional[list] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + **_, +) -> ValueSource: + schedule_data = schedule or [] + if len(schedule_data) < 2: + raise ValueError("Time of day schedule requires at least 2 points") + return AdaptiveValueSource( + **common, + schedule=schedule_data, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 1.0, + ) + + +def _build_adaptive_scene( + *, + common: dict, + picture_source_id: Optional[str] = None, + scene_behavior: Optional[str] = None, + sensitivity: Optional[float] = None, + smoothing: Optional[float] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + **_, +) -> ValueSource: + return AdaptiveValueSource( + **common, + picture_source_id=picture_source_id or "", + scene_behavior=scene_behavior or "complement", + sensitivity=sensitivity if sensitivity is not None else 1.0, + smoothing=smoothing if smoothing is not None else 0.3, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 1.0, + ) + + +def _build_daylight( + *, + common: dict, + speed: Optional[float] = None, + use_real_time: Optional[bool] = None, + latitude: Optional[float] = None, + longitude: Optional[float] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + **_, +) -> ValueSource: + return DaylightValueSource( + **common, + speed=speed if speed is not None else 1.0, + use_real_time=bool(use_real_time) if use_real_time is not None else False, + latitude=latitude if latitude is not None else 50.0, + longitude=longitude if longitude is not None else 0.0, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 1.0, + ) + + +def _build_static_color(*, common: dict, color: Optional[list] = None, **_) -> ValueSource: + rgb = color if isinstance(color, list) and len(color) == 3 else [255, 255, 255] + return StaticColorValueSource(**common, color=rgb) + + +def _build_animated_color( + *, + common: dict, + colors: Optional[list] = None, + speed: Optional[float] = None, + easing: Optional[str] = None, + clock_id: Optional[str] = None, + **_, +) -> ValueSource: + seed = ( + colors + if isinstance(colors, list) and len(colors) >= 2 + else [[255, 0, 0], [0, 255, 0], [0, 0, 255]] + ) + return AnimatedColorValueSource( + **common, + colors=seed, + speed=speed if speed is not None else 10.0, + easing=easing or "linear", + clock_id=clock_id or None, + ) + + +def _build_adaptive_time_color( + *, common: dict, schedule: Optional[list] = None, **_ +) -> ValueSource: + schedule_data = schedule or [] + if len(schedule_data) < 2: + raise ValueError("Color schedule requires at least 2 points") + return AdaptiveTimeColorValueSource(**common, schedule=schedule_data) + + +def _build_ha_entity( + *, + common: dict, + ha_source_id: Optional[str] = None, + entity_id: Optional[str] = None, + attribute: Optional[str] = None, + min_ha_value: Optional[float] = None, + max_ha_value: Optional[float] = None, + smoothing: Optional[float] = None, + **_, +) -> ValueSource: + if not ha_source_id: + raise ValueError("HA source ID is required for ha_entity type") + if not entity_id: + raise ValueError("Entity ID is required for ha_entity type") + return HAEntityValueSource( + **common, + ha_source_id=ha_source_id, + entity_id=entity_id, + attribute=attribute or "", + min_ha_value=min_ha_value if min_ha_value is not None else 0.0, + max_ha_value=max_ha_value if max_ha_value is not None else 100.0, + smoothing=smoothing if smoothing is not None else 0.0, + ) + + +def _build_gradient_map( + *, + common: dict, + value_source_id: Optional[str] = None, + gradient_id: Optional[str] = None, + easing: Optional[str] = None, + **_, +) -> ValueSource: + if not value_source_id: + raise ValueError("Value source ID is required for gradient_map type") + return GradientMapValueSource( + **common, + value_source_id=value_source_id, + gradient_id=gradient_id or "", + easing=easing or "linear", + ) + + +def _build_css_extract( + *, + common: dict, + color_strip_source_id: Optional[str] = None, + led_start: Optional[int] = None, + led_end: Optional[int] = None, + **_, +) -> ValueSource: + if not color_strip_source_id: + raise ValueError("Color strip source ID is required for css_extract type") + return CSSExtractValueSource( + **common, + color_strip_source_id=color_strip_source_id, + led_start=led_start if led_start is not None else 0, + led_end=led_end if led_end is not None else -1, + ) + + +def _build_system_metrics( + *, + common: dict, + metric: Optional[str] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + max_rate: Optional[float] = None, + disk_path: Optional[str] = None, + sensor_label: Optional[str] = None, + poll_interval: Optional[float] = None, + smoothing: Optional[float] = None, + **_, +) -> ValueSource: + m = metric or "cpu_load" + if m not in VALID_SYSTEM_METRICS: + raise ValueError(f"Invalid metric: {m}") + return SystemMetricsValueSource( + **common, + metric=m, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 100.0, + max_rate=max_rate if max_rate is not None else 125_000_000.0, + disk_path=disk_path or "", + sensor_label=sensor_label or "", + poll_interval=poll_interval if poll_interval is not None else 1.0, + smoothing=smoothing if smoothing is not None else 0.0, + ) + + +def _build_game_event( + *, + common: dict, + **_, +) -> ValueSource: + # GameEventValueSource has its own defaults; the store never previously + # exposed a way to create one through this path, but we register a + # builder anyway so the coverage assertion stays symmetric. + return GameEventValueSource(**common) + + +def _build_http( + *, + common: dict, + http_endpoint_id: Optional[str] = None, + json_path: Optional[str] = None, + interval_s: Optional[int] = None, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + smoothing: Optional[float] = None, + **_, +) -> ValueSource: + if not http_endpoint_id: + raise ValueError("http_endpoint_id is required for http type") + iv = interval_s if interval_s is not None else 60 + if iv < 1: + raise ValueError("interval_s must be >= 1") + return HTTPValueSource( + **common, + http_endpoint_id=http_endpoint_id, + json_path=json_path or "", + interval_s=iv, + min_value=min_value if min_value is not None else 0.0, + max_value=max_value if max_value is not None else 100.0, + smoothing=smoothing if smoothing is not None else 0.0, + ) + + +CREATE_BUILDERS: Dict[str, CreateBuilder] = { + "static": _build_static, + "animated": _build_animated, + "audio": _build_audio, + "adaptive_time": _build_adaptive_time, + "adaptive_scene": _build_adaptive_scene, + "daylight": _build_daylight, + "static_color": _build_static_color, + "animated_color": _build_animated_color, + "adaptive_time_color": _build_adaptive_time_color, + "ha_entity": _build_ha_entity, + "gradient_map": _build_gradient_map, + "css_extract": _build_css_extract, + "system_metrics": _build_system_metrics, + "game_event": _build_game_event, + "http": _build_http, +} + + +def build_source( + *, + source_type: str, + sid: str, + name: str, + now: datetime, + description: Optional[str], + tags: Optional[List[str]], + icon: Optional[str], + icon_color: Optional[str], + **type_specific: Any, +) -> ValueSource: + """Look up the per-type builder and produce a fresh ValueSource.""" + builder = CREATE_BUILDERS.get(source_type) + if builder is None: + raise ValueError(f"Invalid source type: {source_type}") + common = _common(sid, name, source_type, now, description, tags, icon, icon_color) + return builder(common=common, **type_specific) + + +# --------------------------------------------------------------------------- +# Update appliers — one per source_type (mutates the existing instance) +# --------------------------------------------------------------------------- + + +def _apply_static(source: StaticValueSource, *, value=None, **_) -> None: + if value is not None: + source.value = value + + +def _apply_animated( + source: AnimatedValueSource, + *, + waveform=None, + speed=None, + min_value=None, + max_value=None, + **_, +) -> None: + if waveform is not None: + source.waveform = waveform + if speed is not None: + source.speed = speed + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + + +def _apply_audio( + source: AudioValueSource, + *, + audio_source_id=None, + mode=None, + sensitivity=None, + smoothing=None, + min_value=None, + max_value=None, + auto_gain=None, + **_, +) -> None: + if audio_source_id is not None: + source.audio_source_id = resolve_ref(audio_source_id, source.audio_source_id) + if mode is not None: + source.mode = mode + if sensitivity is not None: + source.sensitivity = sensitivity + if smoothing is not None: + source.smoothing = smoothing + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + if auto_gain is not None: + source.auto_gain = auto_gain + + +def _apply_adaptive_time( + source: AdaptiveValueSource, + *, + schedule=None, + min_value=None, + max_value=None, + **_, +) -> None: + if schedule is not None: + if len(schedule) < 2: + raise ValueError("Time of day schedule requires at least 2 points") + source.schedule = schedule + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + + +def _apply_adaptive_scene( + source: AdaptiveValueSource, + *, + picture_source_id=None, + scene_behavior=None, + sensitivity=None, + smoothing=None, + min_value=None, + max_value=None, + **_, +) -> None: + if picture_source_id is not None: + source.picture_source_id = resolve_ref(picture_source_id, source.picture_source_id) + if scene_behavior is not None: + source.scene_behavior = scene_behavior + if sensitivity is not None: + source.sensitivity = sensitivity + if smoothing is not None: + source.smoothing = smoothing + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + + +def _apply_daylight( + source: DaylightValueSource, + *, + speed=None, + use_real_time=None, + latitude=None, + longitude=None, + min_value=None, + max_value=None, + **_, +) -> None: + if speed is not None: + source.speed = speed + if use_real_time is not None: + source.use_real_time = use_real_time + if latitude is not None: + source.latitude = latitude + if longitude is not None: + source.longitude = longitude + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + + +def _apply_static_color(source: StaticColorValueSource, *, color=None, **_) -> None: + if color is not None and isinstance(color, list) and len(color) == 3: + source.color = color + + +def _apply_animated_color( + source: AnimatedColorValueSource, + *, + colors=None, + speed=None, + easing=None, + clock_id=None, + **_, +) -> None: + if colors is not None and isinstance(colors, list): + source.colors = colors + if speed is not None: + source.speed = speed + if easing is not None: + source.easing = easing + if clock_id is not None: + source.clock_id = resolve_ref(clock_id, source.clock_id) + + +def _apply_adaptive_time_color(source: AdaptiveTimeColorValueSource, *, schedule=None, **_) -> None: + if schedule is not None: + if len(schedule) < 2: + raise ValueError("Color schedule requires at least 2 points") + source.schedule = schedule + + +def _apply_ha_entity( + source: HAEntityValueSource, + *, + ha_source_id=None, + entity_id=None, + attribute=None, + min_ha_value=None, + max_ha_value=None, + smoothing=None, + **_, +) -> None: + if ha_source_id is not None: + source.ha_source_id = resolve_ref(ha_source_id, source.ha_source_id) + if entity_id is not None: + source.entity_id = entity_id + if attribute is not None: + source.attribute = attribute + if min_ha_value is not None: + source.min_ha_value = min_ha_value + if max_ha_value is not None: + source.max_ha_value = max_ha_value + if smoothing is not None: + source.smoothing = smoothing + + +def _apply_gradient_map( + source: GradientMapValueSource, + *, + value_source_id=None, + gradient_id=None, + easing=None, + **_, +) -> None: + if value_source_id is not None: + source.value_source_id = resolve_ref(value_source_id, source.value_source_id) + if gradient_id is not None: + source.gradient_id = resolve_ref(gradient_id, source.gradient_id) + if easing is not None: + source.easing = easing + + +def _apply_css_extract( + source: CSSExtractValueSource, + *, + color_strip_source_id=None, + led_start=None, + led_end=None, + **_, +) -> None: + if color_strip_source_id is not None: + source.color_strip_source_id = resolve_ref( + color_strip_source_id, source.color_strip_source_id + ) + if led_start is not None: + source.led_start = led_start + if led_end is not None: + source.led_end = led_end + + +def _apply_system_metrics( + source: SystemMetricsValueSource, + *, + metric=None, + min_value=None, + max_value=None, + max_rate=None, + disk_path=None, + sensor_label=None, + poll_interval=None, + smoothing=None, + **_, +) -> None: + if metric is not None: + if metric not in VALID_SYSTEM_METRICS: + raise ValueError(f"Invalid metric: {metric}") + source.metric = metric + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + if max_rate is not None: + source.max_rate = max_rate + if disk_path is not None: + source.disk_path = disk_path + if sensor_label is not None: + source.sensor_label = sensor_label + if poll_interval is not None: + source.poll_interval = poll_interval + if smoothing is not None: + source.smoothing = smoothing + + +def _apply_game_event(source: GameEventValueSource, **_) -> None: + # No update fields are exposed through the store for game-event yet; + # registered for symmetry with CREATE_BUILDERS. + return None + + +def _apply_http( + source: HTTPValueSource, + *, + http_endpoint_id=None, + json_path=None, + interval_s=None, + min_value=None, + max_value=None, + smoothing=None, + **_, +) -> None: + if http_endpoint_id is not None: + source.http_endpoint_id = resolve_ref(http_endpoint_id, source.http_endpoint_id) + if json_path is not None: + source.json_path = json_path + if interval_s is not None: + if interval_s < 1: + raise ValueError("interval_s must be >= 1") + source.interval_s = interval_s + if min_value is not None: + source.min_value = min_value + if max_value is not None: + source.max_value = max_value + if smoothing is not None: + source.smoothing = smoothing + + +UPDATE_APPLIERS: Dict[str, UpdateApplier] = { + "static": _apply_static, + "animated": _apply_animated, + "audio": _apply_audio, + "adaptive_time": _apply_adaptive_time, + "adaptive_scene": _apply_adaptive_scene, + "daylight": _apply_daylight, + "static_color": _apply_static_color, + "animated_color": _apply_animated_color, + "adaptive_time_color": _apply_adaptive_time_color, + "ha_entity": _apply_ha_entity, + "gradient_map": _apply_gradient_map, + "css_extract": _apply_css_extract, + "system_metrics": _apply_system_metrics, + "game_event": _apply_game_event, + "http": _apply_http, +} + + +def apply_update(source: ValueSource, **kwargs: Any) -> None: + """Dispatch to the per-type applier; no-op for unknown source_type.""" + applier = UPDATE_APPLIERS.get(source.source_type) + if applier is None: + # Unknown source_type means the registry drifted from storage; the + # coverage assertion at import time should have caught this. + return + applier(source, **kwargs) + + +# --------------------------------------------------------------------------- +# Coverage assertion — both registries stay in sync with storage's map +# --------------------------------------------------------------------------- + + +def _assert_factory_coverage() -> None: + storage_kinds = set(_VALUE_SOURCE_MAP.keys()) + create_kinds = set(CREATE_BUILDERS.keys()) + update_kinds = set(UPDATE_APPLIERS.keys()) + + problems: list[str] = [] + if create_kinds != storage_kinds: + if missing := storage_kinds - create_kinds: + problems.append(f"CREATE_BUILDERS missing: {sorted(missing)}") + if extra := create_kinds - storage_kinds: + problems.append(f"CREATE_BUILDERS extra: {sorted(extra)}") + if update_kinds != storage_kinds: + if missing := storage_kinds - update_kinds: + problems.append(f"UPDATE_APPLIERS missing: {sorted(missing)}") + if extra := update_kinds - storage_kinds: + problems.append(f"UPDATE_APPLIERS extra: {sorted(extra)}") + if problems: + raise RuntimeError( + "value_source_factories registries are out of sync with " + "storage._VALUE_SOURCE_MAP: " + "; ".join(problems) + ) + + +_assert_factory_coverage() diff --git a/server/src/ledgrab/storage/value_source_store.py b/server/src/ledgrab/storage/value_source_store.py index a38e3f8..10a0edf 100644 --- a/server/src/ledgrab/storage/value_source_store.py +++ b/server/src/ledgrab/storage/value_source_store.py @@ -1,27 +1,21 @@ -"""Value source storage using SQLite.""" +"""Value source storage using SQLite. + +Per-type create / update logic lives in +``ledgrab.storage.value_source_factories`` so this module stays focused +on the BaseSqliteStore protocol (load, save, name uniqueness, common +fields). See audit finding C7 for context on the previous if/elif +ladder this layout replaces. +""" import uuid from datetime import datetime, timezone -from typing import List, Optional from ledgrab.storage.base_sqlite_store import BaseSqliteStore from ledgrab.storage.database import Database -from ledgrab.storage.utils import resolve_ref -from ledgrab.storage.value_source import ( - VALID_SYSTEM_METRICS, - AdaptiveValueSource, - AdaptiveTimeColorValueSource, - AnimatedColorValueSource, - AnimatedValueSource, - AudioValueSource, - CSSExtractValueSource, - DaylightValueSource, - GradientMapValueSource, - HAEntityValueSource, - StaticColorValueSource, - StaticValueSource, - SystemMetricsValueSource, - ValueSource, +from ledgrab.storage.value_source import ValueSource +from ledgrab.storage.value_source_factories import ( + apply_update as _apply_value_source_update, + build_source as _build_value_source, ) from ledgrab.utils import get_logger @@ -44,307 +38,29 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]): # ── CRUD ───────────────────────────────────────────────────────── - def create_source( - self, - name: str, - source_type: str, - value: Optional[float] = None, - waveform: Optional[str] = None, - speed: Optional[float] = None, - min_value: Optional[float] = None, - max_value: Optional[float] = None, - audio_source_id: Optional[str] = None, - mode: Optional[str] = None, - sensitivity: Optional[float] = None, - smoothing: Optional[float] = None, - description: Optional[str] = None, - schedule: Optional[list] = None, - picture_source_id: Optional[str] = None, - scene_behavior: Optional[str] = None, - auto_gain: Optional[bool] = None, - use_real_time: Optional[bool] = None, - latitude: Optional[float] = None, - longitude: Optional[float] = None, - color: Optional[list] = None, - colors: Optional[list] = None, - easing: Optional[str] = None, - tags: Optional[List[str]] = None, - ha_source_id: Optional[str] = None, - entity_id: Optional[str] = None, - attribute: Optional[str] = None, - min_ha_value: Optional[float] = None, - max_ha_value: Optional[float] = None, - value_source_id: Optional[str] = None, - gradient_id: Optional[str] = None, - color_strip_source_id: Optional[str] = None, - led_start: Optional[int] = None, - led_end: Optional[int] = None, - metric: Optional[str] = None, - max_rate: Optional[float] = None, - disk_path: Optional[str] = None, - sensor_label: Optional[str] = None, - poll_interval: Optional[float] = None, - clock_id: Optional[str] = None, - icon: Optional[str] = None, - icon_color: Optional[str] = None, - ) -> ValueSource: - _VALID = ( - "static", - "animated", - "audio", - "adaptive_time", - "adaptive_scene", - "daylight", - "static_color", - "animated_color", - "adaptive_time_color", - "ha_entity", - "gradient_map", - "css_extract", - "system_metrics", - ) - if source_type not in _VALID: - raise ValueError(f"Invalid source type: {source_type}") + def create_source(self, name: str, source_type: str, **kwargs) -> ValueSource: + """Create a new value source. + Per-type construction (defaults, validation, sub-mode dispatch) + lives in ``value_source_factories.CREATE_BUILDERS`` so the store + only owns name-uniqueness, id minting, and persistence. + """ self._check_name_unique(name) sid = f"vs_{uuid.uuid4().hex[:8]}" now = datetime.now(timezone.utc) - common_tags = tags or [] - common_icon = icon or "" - common_icon_color = icon_color or "" - - if source_type == "static": - source: ValueSource = StaticValueSource( - id=sid, - name=name, - source_type="static", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - value=value if value is not None else 1.0, - ) - elif source_type == "animated": - source = AnimatedValueSource( - id=sid, - name=name, - source_type="animated", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - waveform=waveform or "sine", - speed=speed if speed is not None else 10.0, - min_value=min_value if min_value is not None else 0.0, - max_value=max_value if max_value is not None else 1.0, - ) - elif source_type == "audio": - source = AudioValueSource( - id=sid, - name=name, - source_type="audio", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - audio_source_id=audio_source_id or "", - mode=mode or "rms", - sensitivity=sensitivity if sensitivity is not None else 1.0, - smoothing=smoothing if smoothing is not None else 0.3, - min_value=min_value if min_value is not None else 0.0, - max_value=max_value if max_value is not None else 1.0, - auto_gain=bool(auto_gain) if auto_gain is not None else False, - ) - elif source_type == "adaptive_time": - schedule_data = schedule or [] - if len(schedule_data) < 2: - raise ValueError("Time of day schedule requires at least 2 points") - source = AdaptiveValueSource( - id=sid, - name=name, - source_type="adaptive_time", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - schedule=schedule_data, - min_value=min_value if min_value is not None else 0.0, - max_value=max_value if max_value is not None else 1.0, - ) - elif source_type == "adaptive_scene": - source = AdaptiveValueSource( - id=sid, - name=name, - source_type="adaptive_scene", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - picture_source_id=picture_source_id or "", - scene_behavior=scene_behavior or "complement", - sensitivity=sensitivity if sensitivity is not None else 1.0, - smoothing=smoothing if smoothing is not None else 0.3, - min_value=min_value if min_value is not None else 0.0, - max_value=max_value if max_value is not None else 1.0, - ) - elif source_type == "daylight": - source = DaylightValueSource( - id=sid, - name=name, - source_type="daylight", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - speed=speed if speed is not None else 1.0, - use_real_time=bool(use_real_time) if use_real_time is not None else False, - latitude=latitude if latitude is not None else 50.0, - longitude=longitude if longitude is not None else 0.0, - min_value=min_value if min_value is not None else 0.0, - max_value=max_value if max_value is not None else 1.0, - ) - elif source_type == "static_color": - source = StaticColorValueSource( - id=sid, - name=name, - source_type="static_color", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - color=color if isinstance(color, list) and len(color) == 3 else [255, 255, 255], - ) - elif source_type == "animated_color": - source = AnimatedColorValueSource( - id=sid, - name=name, - source_type="animated_color", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - colors=( - colors - if isinstance(colors, list) and len(colors) >= 2 - else [[255, 0, 0], [0, 255, 0], [0, 0, 255]] - ), - speed=speed if speed is not None else 10.0, - easing=easing or "linear", - clock_id=clock_id or None, - ) - elif source_type == "adaptive_time_color": - schedule_data = schedule or [] - if len(schedule_data) < 2: - raise ValueError("Color schedule requires at least 2 points") - source = AdaptiveTimeColorValueSource( - id=sid, - name=name, - source_type="adaptive_time_color", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - schedule=schedule_data, - ) - elif source_type == "ha_entity": - if not ha_source_id: - raise ValueError("HA source ID is required for ha_entity type") - if not entity_id: - raise ValueError("Entity ID is required for ha_entity type") - source = HAEntityValueSource( - id=sid, - name=name, - source_type="ha_entity", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - ha_source_id=ha_source_id, - entity_id=entity_id, - attribute=attribute or "", - min_ha_value=min_ha_value if min_ha_value is not None else 0.0, - max_ha_value=max_ha_value if max_ha_value is not None else 100.0, - smoothing=smoothing if smoothing is not None else 0.0, - ) - elif source_type == "gradient_map": - if not value_source_id: - raise ValueError("Value source ID is required for gradient_map type") - source = GradientMapValueSource( - id=sid, - name=name, - source_type="gradient_map", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - value_source_id=value_source_id, - gradient_id=gradient_id or "", - easing=easing or "linear", - ) - elif source_type == "css_extract": - if not color_strip_source_id: - raise ValueError("Color strip source ID is required for css_extract type") - source = CSSExtractValueSource( - id=sid, - name=name, - source_type="css_extract", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - color_strip_source_id=color_strip_source_id, - led_start=led_start if led_start is not None else 0, - led_end=led_end if led_end is not None else -1, - ) - elif source_type == "system_metrics": - m = metric or "cpu_load" - if m not in VALID_SYSTEM_METRICS: - raise ValueError(f"Invalid metric: {m}") - source = SystemMetricsValueSource( - id=sid, - name=name, - source_type="system_metrics", - created_at=now, - updated_at=now, - description=description, - tags=common_tags, - icon=common_icon, - icon_color=common_icon_color, - metric=m, - min_value=min_value if min_value is not None else 0.0, - max_value=max_value if max_value is not None else 100.0, - max_rate=max_rate if max_rate is not None else 125_000_000.0, - disk_path=disk_path or "", - sensor_label=sensor_label or "", - poll_interval=poll_interval if poll_interval is not None else 1.0, - smoothing=smoothing if smoothing is not None else 0.0, - ) + source = _build_value_source( + source_type=source_type, + sid=sid, + name=name, + now=now, + description=kwargs.pop("description", None), + tags=kwargs.pop("tags", None), + icon=kwargs.pop("icon", None), + icon_color=kwargs.pop("icon_color", None), + **kwargs, + ) self._items[sid] = source self._save_item(sid, source) @@ -352,187 +68,41 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]): logger.info(f"Created value source: {name} ({sid}, type={source_type})") return source - def update_source( - self, - source_id: str, - name: Optional[str] = None, - value: Optional[float] = None, - waveform: Optional[str] = None, - speed: Optional[float] = None, - min_value: Optional[float] = None, - max_value: Optional[float] = None, - audio_source_id: Optional[str] = None, - mode: Optional[str] = None, - sensitivity: Optional[float] = None, - smoothing: Optional[float] = None, - description: Optional[str] = None, - schedule: Optional[list] = None, - picture_source_id: Optional[str] = None, - scene_behavior: Optional[str] = None, - auto_gain: Optional[bool] = None, - use_real_time: Optional[bool] = None, - latitude: Optional[float] = None, - longitude: Optional[float] = None, - color: Optional[list] = None, - colors: Optional[list] = None, - easing: Optional[str] = None, - tags: Optional[List[str]] = None, - ha_source_id: Optional[str] = None, - entity_id: Optional[str] = None, - attribute: Optional[str] = None, - min_ha_value: Optional[float] = None, - max_ha_value: Optional[float] = None, - value_source_id: Optional[str] = None, - gradient_id: Optional[str] = None, - color_strip_source_id: Optional[str] = None, - led_start: Optional[int] = None, - led_end: Optional[int] = None, - metric: Optional[str] = None, - max_rate: Optional[float] = None, - disk_path: Optional[str] = None, - sensor_label: Optional[str] = None, - poll_interval: Optional[float] = None, - clock_id: Optional[str] = None, - icon: Optional[str] = None, - icon_color: Optional[str] = None, - ) -> ValueSource: + def update_source(self, source_id: str, **kwargs) -> ValueSource: + """Update an existing value source. + + Common fields (name, description, tags, icon, icon_color) are + applied here; per-type field updates are delegated to + ``value_source_factories.apply_update`` (which dispatches on + ``source.source_type``). + """ source = self.get(source_id) + name = kwargs.pop("name", None) if name is not None: self._check_name_unique(name, exclude_id=source_id) source.name = name + description = kwargs.pop("description", None) if description is not None: source.description = description + + tags = kwargs.pop("tags", None) if tags is not None: source.tags = tags + + icon = kwargs.pop("icon", None) if icon is not None: source.icon = icon + + icon_color = kwargs.pop("icon_color", None) if icon_color is not None: source.icon_color = icon_color - if isinstance(source, StaticValueSource): - if value is not None: - source.value = value - elif isinstance(source, AnimatedValueSource): - if waveform is not None: - source.waveform = waveform - if speed is not None: - source.speed = speed - if min_value is not None: - source.min_value = min_value - if max_value is not None: - source.max_value = max_value - elif isinstance(source, AudioValueSource): - if audio_source_id is not None: - source.audio_source_id = resolve_ref(audio_source_id, source.audio_source_id) - if mode is not None: - source.mode = mode - if sensitivity is not None: - source.sensitivity = sensitivity - if smoothing is not None: - source.smoothing = smoothing - if min_value is not None: - source.min_value = min_value - if max_value is not None: - source.max_value = max_value - if auto_gain is not None: - source.auto_gain = auto_gain - elif isinstance(source, AdaptiveValueSource): - if schedule is not None: - if source.source_type == "adaptive_time" and len(schedule) < 2: - raise ValueError("Time of day schedule requires at least 2 points") - source.schedule = schedule - if picture_source_id is not None: - source.picture_source_id = resolve_ref(picture_source_id, source.picture_source_id) - if scene_behavior is not None: - source.scene_behavior = scene_behavior - if sensitivity is not None: - source.sensitivity = sensitivity - if smoothing is not None: - source.smoothing = smoothing - if min_value is not None: - source.min_value = min_value - if max_value is not None: - source.max_value = max_value - elif isinstance(source, DaylightValueSource): - if speed is not None: - source.speed = speed - if use_real_time is not None: - source.use_real_time = use_real_time - if latitude is not None: - source.latitude = latitude - if longitude is not None: - source.longitude = longitude - if min_value is not None: - source.min_value = min_value - if max_value is not None: - source.max_value = max_value - elif isinstance(source, StaticColorValueSource): - if color is not None and isinstance(color, list) and len(color) == 3: - source.color = color - elif isinstance(source, AnimatedColorValueSource): - if colors is not None and isinstance(colors, list): - source.colors = colors - if speed is not None: - source.speed = speed - if easing is not None: - source.easing = easing - if clock_id is not None: - source.clock_id = resolve_ref(clock_id, source.clock_id) - elif isinstance(source, AdaptiveTimeColorValueSource): - if schedule is not None: - if len(schedule) < 2: - raise ValueError("Color schedule requires at least 2 points") - source.schedule = schedule - elif isinstance(source, HAEntityValueSource): - if ha_source_id is not None: - source.ha_source_id = resolve_ref(ha_source_id, source.ha_source_id) - if entity_id is not None: - source.entity_id = entity_id - if attribute is not None: - source.attribute = attribute - if min_ha_value is not None: - source.min_ha_value = min_ha_value - if max_ha_value is not None: - source.max_ha_value = max_ha_value - if smoothing is not None: - source.smoothing = smoothing - elif isinstance(source, GradientMapValueSource): - if value_source_id is not None: - source.value_source_id = resolve_ref(value_source_id, source.value_source_id) - if gradient_id is not None: - source.gradient_id = resolve_ref(gradient_id, source.gradient_id) - if easing is not None: - source.easing = easing - elif isinstance(source, CSSExtractValueSource): - if color_strip_source_id is not None: - source.color_strip_source_id = resolve_ref( - color_strip_source_id, source.color_strip_source_id - ) - if led_start is not None: - source.led_start = led_start - if led_end is not None: - source.led_end = led_end - elif isinstance(source, SystemMetricsValueSource): - if metric is not None: - if metric not in VALID_SYSTEM_METRICS: - raise ValueError(f"Invalid metric: {metric}") - source.metric = metric - if min_value is not None: - source.min_value = min_value - if max_value is not None: - source.max_value = max_value - if max_rate is not None: - source.max_rate = max_rate - if disk_path is not None: - source.disk_path = disk_path - if sensor_label is not None: - source.sensor_label = sensor_label - if poll_interval is not None: - source.poll_interval = poll_interval - if smoothing is not None: - source.smoothing = smoothing + # Per-type field updates (and per-type validation) live in the + # factories module so adding a new value-source kind requires only + # one new builder + one new applier and no store edits. + _apply_value_source_update(source, **kwargs) source.updated_at = datetime.now(timezone.utc) self._save_item(source_id, source) diff --git a/server/tests/storage/test_value_source_factories.py b/server/tests/storage/test_value_source_factories.py new file mode 100644 index 0000000..8bac5da --- /dev/null +++ b/server/tests/storage/test_value_source_factories.py @@ -0,0 +1,251 @@ +"""Tests for the per-type ValueSource factory registry. + +Locks in the contract that ``CREATE_BUILDERS`` and ``UPDATE_APPLIERS`` +stay symmetric with storage's ``_VALUE_SOURCE_MAP``, and exercises a +couple of representative builders end-to-end. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from ledgrab.storage import value_source_factories as vsf +from ledgrab.storage.value_source import ( + AdaptiveValueSource, + HAEntityValueSource, + StaticValueSource, + _VALUE_SOURCE_MAP, +) + + +# --------------------------------------------------------------------------- +# Coverage / shape +# --------------------------------------------------------------------------- + + +def test_create_builders_cover_every_storage_kind(): + assert set(vsf.CREATE_BUILDERS.keys()) == set(_VALUE_SOURCE_MAP.keys()) + + +def test_update_appliers_cover_every_storage_kind(): + assert set(vsf.UPDATE_APPLIERS.keys()) == set(_VALUE_SOURCE_MAP.keys()) + + +def test_coverage_assertion_raises_when_create_drifts(monkeypatch): + """Removing a builder makes _assert_factory_coverage() fail loudly.""" + pruned = {k: v for k, v in vsf.CREATE_BUILDERS.items() if k != "static"} + monkeypatch.setattr(vsf, "CREATE_BUILDERS", pruned) + with pytest.raises(RuntimeError, match="static"): + vsf._assert_factory_coverage() + + +def test_coverage_assertion_raises_when_update_drifts(monkeypatch): + pruned = {k: v for k, v in vsf.UPDATE_APPLIERS.items() if k != "http"} + monkeypatch.setattr(vsf, "UPDATE_APPLIERS", pruned) + with pytest.raises(RuntimeError, match="http"): + vsf._assert_factory_coverage() + + +# --------------------------------------------------------------------------- +# build_source — representative paths +# --------------------------------------------------------------------------- + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +def test_build_source_static_uses_value_default(): + src = vsf.build_source( + source_type="static", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + ) + assert isinstance(src, StaticValueSource) + assert src.value == 1.0 + assert src.source_type == "static" + + +def test_build_source_static_honours_supplied_value(): + src = vsf.build_source( + source_type="static", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + value=0.42, + ) + assert src.value == 0.42 + + +def test_build_source_adaptive_time_requires_schedule_with_two_points(): + with pytest.raises(ValueError, match="Time of day schedule"): + vsf.build_source( + source_type="adaptive_time", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + schedule=[{"time": "00:00", "value": 0.5}], # only one point + ) + + +def test_build_source_adaptive_scene_routes_to_adaptive_class_with_scene_type(): + """``adaptive_scene`` and ``adaptive_time`` share AdaptiveValueSource but + differ in source_type — the builder must set the right discriminator.""" + src = vsf.build_source( + source_type="adaptive_scene", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + picture_source_id="pic_42", + ) + assert isinstance(src, AdaptiveValueSource) + assert src.source_type == "adaptive_scene" + assert src.picture_source_id == "pic_42" + + +def test_build_source_ha_entity_requires_ha_source_id_and_entity_id(): + with pytest.raises(ValueError, match="HA source ID"): + vsf.build_source( + source_type="ha_entity", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + entity_id="light.foo", # missing ha_source_id + ) + + with pytest.raises(ValueError, match="Entity ID"): + vsf.build_source( + source_type="ha_entity", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + ha_source_id="ha_main", # missing entity_id + ) + + +def test_build_source_http_validates_interval(): + with pytest.raises(ValueError, match="interval_s"): + vsf.build_source( + source_type="http", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + http_endpoint_id="ep_1", + interval_s=0, + ) + + +def test_build_source_unknown_type_raises(): + with pytest.raises(ValueError, match="Invalid source type"): + vsf.build_source( + source_type="totally_unregistered", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + ) + + +# --------------------------------------------------------------------------- +# apply_update — representative paths +# --------------------------------------------------------------------------- + + +def test_apply_update_static_mutates_value(): + src = vsf.build_source( + source_type="static", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + ) + vsf.apply_update(src, value=0.75) + assert src.value == 0.75 + + +def test_apply_update_ignores_unknown_kwargs(): + """Stray kwargs are silently dropped by each applier's ``**_``.""" + src = vsf.build_source( + source_type="static", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + ) + vsf.apply_update(src, value=0.5, totally_made_up_field="ignored") + assert src.value == 0.5 + + +def test_apply_update_ha_entity_validates_smoothing_path(): + src = vsf.build_source( + source_type="ha_entity", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + ha_source_id="ha_main", + entity_id="light.foo", + ) + assert isinstance(src, HAEntityValueSource) + vsf.apply_update(src, smoothing=0.5, attribute="brightness") + assert src.smoothing == 0.5 + assert src.attribute == "brightness" + + +def test_apply_update_http_rejects_zero_interval(): + src = vsf.build_source( + source_type="http", + sid="vs_t", + name="Test", + now=_now(), + description=None, + tags=None, + icon=None, + icon_color=None, + http_endpoint_id="ep_1", + ) + with pytest.raises(ValueError, match="interval_s"): + vsf.apply_update(src, interval_s=0)