From 563cbac88cce5af31168c72646a898402f206ed4 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Fri, 22 May 2026 22:45:28 +0300 Subject: [PATCH] refactor(storage,processing): kind registries + versioned data migrations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two CRITICAL data-safety bugs from the architecture audit and the two worst parallel-change problems are fixed in one coherent pass. Audit findings addressed: - C2 silent CSS response fallback. The previous _RESPONSE_MAP fell through to a fabricated PictureCSSResponse whenever a source class lacked an entry; in particular game_event sources were silently mis-shaped. Now: GameEventCSSResponse/Create/Update schemas exist, _RESPONSE_MAP is re-keyed by source_type string, an import-time _assert_response_map_coverage() requires symmetric agreement with storage._SOURCE_TYPE_MAP, and the runtime path raises instead of fabricating a response. - C11 string-replace JSON migration. ColorStripStore used blob.replace('"source_type": "static"', '"source_type": "single_color"') which can corrupt unrelated substrings (e.g. an animation type named "static_wave") and provides no audit, no transaction, no idempotency. Replaced with storage.data_migrations.MigrationRunner backed by a data_migrations audit table. Each migration runs inside one db.transaction() that covers the applied-check, the apply(), and the audit-INSERT — partial failures roll back atomically. StaticToSingleColorMigration parses each row with json.loads and mutates only the source_type field. Frozen-write databases skip with a warning. - C3+C4 color-strip stream dispatch. The 7-branch elif in ColorStripStreamManager.acquire() and the duplicate one in ws_stream._create_stream() now share a single STREAM_BUILDERS registry in core.processing.color_strip_kinds, keyed by source.source_type. Both call sites populate a StreamDeps bag and delegate to build_stream(). _assert_stream_kind_coverage() asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path wraps each FastAPI-DI getter in _safe() so non-audio previews no longer crash when audio/CSPT stores are not wired. - C6+C7 value stream dispatch. The 14-branch isinstance ladder in ValueStreamManager._create_stream and its silent StaticValueStream(value=1.0) fallback are replaced by core.processing.value_kinds.STREAM_BUILDERS, keyed by source_type string (so AdaptiveValueSource's adaptive_time and adaptive_scene route to different builders correctly). The manager retains only the SyncClockRuntime pre-acquisition step for animated_color (kinds needing this are listed explicitly in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the registry. Bundled in: the static->single_color rename plus the HTTPValueStream / http_endpoint introduction that were already in flight on this branch share these files; the registry refactor naturally absorbs both via the new "single_color" / "static" alias entries and the _build_http builder. Tests: 26 new tests cover response-map coverage drift, migration runner audit-table mechanics + transactional rollback + frozen-write skip, and the two stream-builder registries. 343 existing storage / API / e2e tests stay green. Ruff clean. --- .../routes/color_strip_sources/_helpers.py | 123 ++++-- .../routes/color_strip_sources/ws_stream.py | 119 ++--- .../api/schemas/color_strip_sources.py | 48 ++- .../core/processing/color_strip_kinds.py | 273 ++++++++++++ .../processing/color_strip_stream_manager.py | 85 ++-- .../ledgrab/core/processing/value_kinds.py | 354 +++++++++++++++ .../ledgrab/core/processing/value_stream.py | 408 +++++++++++------- .../src/ledgrab/storage/base_sqlite_store.py | 9 + .../src/ledgrab/storage/color_strip_store.py | 8 +- server/src/ledgrab/storage/data_migrations.py | 219 ++++++++++ .../test_color_strip_sources_response_map.py | 85 ++++ .../core/processing/test_color_strip_kinds.py | 97 +++++ .../tests/core/processing/test_value_kinds.py | 64 +++ .../test_color_strip_store_migration.py | 90 ++++ server/tests/storage/test_data_migrations.py | 309 +++++++++++++ 15 files changed, 1976 insertions(+), 315 deletions(-) create mode 100644 server/src/ledgrab/core/processing/color_strip_kinds.py create mode 100644 server/src/ledgrab/core/processing/value_kinds.py create mode 100644 server/src/ledgrab/storage/data_migrations.py create mode 100644 server/tests/api/routes/test_color_strip_sources_response_map.py create mode 100644 server/tests/core/processing/test_color_strip_kinds.py create mode 100644 server/tests/core/processing/test_value_kinds.py create mode 100644 server/tests/storage/test_color_strip_store_migration.py create mode 100644 server/tests/storage/test_data_migrations.py diff --git a/server/src/ledgrab/api/routes/color_strip_sources/_helpers.py b/server/src/ledgrab/api/routes/color_strip_sources/_helpers.py index 4a18d2f..f07a060 100644 --- a/server/src/ledgrab/api/routes/color_strip_sources/_helpers.py +++ b/server/src/ledgrab/api/routes/color_strip_sources/_helpers.py @@ -9,6 +9,7 @@ from ledgrab.api.schemas.color_strip_sources import ( CompositeCSSResponse, DaylightCSSResponse, EffectCSSResponse, + GameEventCSSResponse, GradientCSSResponse, KeyColorsCSSResponse, MappedCSSResponse, @@ -17,7 +18,7 @@ from ledgrab.api.schemas.color_strip_sources import ( PictureAdvancedCSSResponse, PictureCSSResponse, ProcessedCSSResponse, - StaticCSSResponse, + SingleColorCSSResponse, WeatherCSSResponse, ) from ledgrab.api.schemas.devices import Calibration as CalibrationSchema @@ -26,22 +27,7 @@ from ledgrab.core.capture.calibration import ( calibration_to_dict, ) from ledgrab.storage.color_strip_source import ( - AdvancedPictureColorStripSource, - ApiInputColorStripSource, - AudioColorStripSource, - CandlelightColorStripSource, - CompositeColorStripSource, - DaylightColorStripSource, - EffectColorStripSource, - GradientColorStripSource, - KeyColorsColorStripSource, - MappedColorStripSource, - MathWaveColorStripSource, - NotificationColorStripSource, - PictureColorStripSource, - ProcessedColorStripSource, - StaticColorStripSource, - WeatherColorStripSource, + _SOURCE_TYPE_MAP as _STORAGE_TYPE_MAP, ) from ledgrab.storage.picture_source import ( ProcessedPictureSource, @@ -94,34 +80,46 @@ def _stops_schema(source) -> list[ColorStopSchema] | None: return None -# Maps storage class → response builder lambda. +# Maps ``source_type`` string → response builder. +# +# Keying by source_type (rather than type(source)) lets the import-time +# coverage check use the storage registry's keys directly, with no +# inversion or duplicate-class handling for legacy aliases. _RESPONSE_MAP: dict = { - PictureColorStripSource: lambda s, kw: PictureCSSResponse( + "picture": lambda s, kw: PictureCSSResponse( **kw, picture_source_id=s.picture_source_id, smoothing=s.smoothing.to_dict(), interpolation_mode=s.interpolation_mode, calibration=_calibration_schema(s), ), - AdvancedPictureColorStripSource: lambda s, kw: PictureAdvancedCSSResponse( + "picture_advanced": lambda s, kw: PictureAdvancedCSSResponse( **kw, smoothing=s.smoothing.to_dict(), interpolation_mode=s.interpolation_mode, calibration=_calibration_schema(s), ), - StaticColorStripSource: lambda s, kw: StaticCSSResponse( + "single_color": lambda s, kw: SingleColorCSSResponse( **kw, color=s.color.to_dict(), animation=s.animation, ), - GradientColorStripSource: lambda s, kw: GradientCSSResponse( + # Legacy alias: pre-rename rows used "static"; the data migration rewrites + # them on first store load but a stale in-flight instance would still + # carry source_type='static' until the next reload. + "static": lambda s, kw: SingleColorCSSResponse( + **kw, + color=s.color.to_dict(), + animation=s.animation, + ), + "gradient": lambda s, kw: GradientCSSResponse( **kw, stops=_stops_schema(s), animation=s.animation, easing=s.easing, gradient_id=s.gradient_id, ), - EffectColorStripSource: lambda s, kw: EffectCSSResponse( + "effect": lambda s, kw: EffectCSSResponse( **kw, effect_type=s.effect_type, palette=s.palette, @@ -132,15 +130,15 @@ _RESPONSE_MAP: dict = { mirror=s.mirror, custom_palette=s.custom_palette, ), - CompositeColorStripSource: lambda s, kw: CompositeCSSResponse( + "composite": lambda s, kw: CompositeCSSResponse( **kw, layers=[dict(layer) for layer in s.layers], ), - MappedColorStripSource: lambda s, kw: MappedCSSResponse( + "mapped": lambda s, kw: MappedCSSResponse( **kw, zones=[dict(z) for z in s.zones], ), - AudioColorStripSource: lambda s, kw: AudioCSSResponse( + "audio": lambda s, kw: AudioCSSResponse( **kw, visualization_mode=s.visualization_mode, audio_source_id=s.audio_source_id, @@ -153,13 +151,13 @@ _RESPONSE_MAP: dict = { mirror=s.mirror, beat_decay=s.beat_decay.to_dict(), ), - ApiInputColorStripSource: lambda s, kw: ApiInputCSSResponse( + "api_input": lambda s, kw: ApiInputCSSResponse( **kw, fallback_color=s.fallback_color.to_dict(), timeout=s.timeout.to_dict(), interpolation=s.interpolation, ), - NotificationColorStripSource: lambda s, kw: NotificationCSSResponse( + "notification": lambda s, kw: NotificationCSSResponse( **kw, notification_effect=s.notification_effect, duration_ms=s.duration_ms.to_dict(), @@ -172,14 +170,14 @@ _RESPONSE_MAP: dict = { sound_volume=s.sound_volume.to_dict(), app_sounds=dict(s.app_sounds), ), - DaylightColorStripSource: lambda s, kw: DaylightCSSResponse( + "daylight": lambda s, kw: DaylightCSSResponse( **kw, speed=s.speed.to_dict(), use_real_time=s.use_real_time, latitude=s.latitude, longitude=s.longitude, ), - CandlelightColorStripSource: lambda s, kw: CandlelightCSSResponse( + "candlelight": lambda s, kw: CandlelightCSSResponse( **kw, color=s.color.to_dict(), intensity=s.intensity.to_dict(), @@ -188,18 +186,18 @@ _RESPONSE_MAP: dict = { wind_strength=s.wind_strength.to_dict(), candle_type=s.candle_type, ), - ProcessedColorStripSource: lambda s, kw: ProcessedCSSResponse( + "processed": lambda s, kw: ProcessedCSSResponse( **kw, input_source_id=s.input_source_id, processing_template_id=s.processing_template_id, ), - WeatherColorStripSource: lambda s, kw: WeatherCSSResponse( + "weather": lambda s, kw: WeatherCSSResponse( **kw, weather_source_id=s.weather_source_id, speed=s.speed.to_dict(), temperature_influence=s.temperature_influence.to_dict(), ), - KeyColorsColorStripSource: lambda s, kw: KeyColorsCSSResponse( + "key_colors": lambda s, kw: KeyColorsCSSResponse( **kw, picture_source_id=s.picture_source_id, rectangles=[r.to_dict() for r in s.rectangles], @@ -207,28 +205,67 @@ _RESPONSE_MAP: dict = { smoothing=s.smoothing.to_dict(), brightness=s.brightness.to_dict(), ), - MathWaveColorStripSource: lambda s, kw: MathWaveCSSResponse( + "math_wave": lambda s, kw: MathWaveCSSResponse( **kw, waves=s.waves, speed=s.speed.to_dict(), gradient_id=s.gradient_id, ), + "game_event": lambda s, kw: GameEventCSSResponse( + **kw, + game_integration_id=s.game_integration_id, + idle_color=s.idle_color.to_dict(), + event_mappings=[dict(m) for m in s.event_mappings], + ), } +def _assert_response_map_coverage() -> None: + """Verify _RESPONSE_MAP has a builder for every kind in storage's registry. + + Runs at module import. Surfaces missing builders eagerly instead of + letting a request fall through to a silent / wrong response shape. + + Contract note + ------------- + This check is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``) + because every kind — sharable or not — needs a response shape. The + sister assertion in + ``core/processing/color_strip_kinds.py::_assert_stream_kind_coverage`` + is asymmetric because sharable kinds construct their streams via a + different path. Adding a new kind requires keeping all three registries + aligned: storage's ``_SOURCE_TYPE_MAP``, this ``_RESPONSE_MAP``, and + either ``STREAM_BUILDERS`` or ``SHARABLE_KINDS``. + """ + storage_kinds = set(_STORAGE_TYPE_MAP.keys()) + builder_kinds = set(_RESPONSE_MAP.keys()) + missing = storage_kinds - builder_kinds + extra = builder_kinds - storage_kinds + if missing or extra: + problems = [] + if missing: + problems.append(f"missing builders for: {sorted(missing)}") + if extra: + problems.append(f"unregistered kinds in _RESPONSE_MAP: {sorted(extra)}") + raise RuntimeError( + "_RESPONSE_MAP is out of sync with storage._SOURCE_TYPE_MAP: " + "; ".join(problems) + ) + + +_assert_response_map_coverage() + + def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse: """Convert a ColorStripSource to the matching per-type response schema.""" kw = _common_response_kwargs(source, overlay_active) - builder = _RESPONSE_MAP.get(type(source)) + builder = _RESPONSE_MAP.get(source.source_type) if builder is None: - # Fallback: use to_dict() and build a PictureCSSResponse - logger.warning("No response builder for %s, falling back", type(source).__name__) - return PictureCSSResponse( - **kw, - picture_source_id="", - smoothing=0.3, - interpolation_mode="average", - calibration=None, + # Coverage is asserted at import time, so reaching this branch means a + # source was loaded with a source_type that is not registered. + # Surface the bug instead of silently returning a wrong-shaped response. + raise RuntimeError( + f"No CSS response builder registered for source_type " + f"{source.source_type!r} (class={type(source).__name__})" ) return builder(source, kw) diff --git a/server/src/ledgrab/api/routes/color_strip_sources/ws_stream.py b/server/src/ledgrab/api/routes/color_strip_sources/ws_stream.py index c00cee4..6342280 100644 --- a/server/src/ledgrab/api/routes/color_strip_sources/ws_stream.py +++ b/server/src/ledgrab/api/routes/color_strip_sources/ws_stream.py @@ -29,7 +29,7 @@ router = APIRouter() _PREVIEW_ALLOWED_TYPES = { - "static", + "single_color", "gradient", "effect", "daylight", @@ -97,65 +97,65 @@ async def preview_color_strip_ws( return ColorStripSource.from_dict(config) def _create_stream(source): - """Instantiate and start the appropriate stream class for *source*.""" - from ledgrab.core.processing.color_strip_stream_manager import _SIMPLE_STREAM_MAP + """Instantiate and start the appropriate stream class for *source*. + + Delegates the per-kind dispatch to ``color_strip_kinds.build_stream`` + so this preview path and the production ``ColorStripStreamManager`` + share a single registry. Per-kind dependencies (CSPT store, audio + stores, weather manager, …) are gathered into a ``StreamDeps`` bag. + + FastAPI-DI providers raise ``RuntimeError`` when they aren't wired, + so we resolve each one through ``_safe`` and pass ``None`` on + failure. The per-kind builder will still see a clear error if a + truly-required dep is missing for that kind, but unrelated previews + (e.g. a ``single_color`` preview on a fresh install where the CSPT + store isn't initialized yet) keep working. + """ + from ledgrab.api.dependencies import ( + get_audio_processing_template_store, + get_audio_source_store, + get_audio_template_store, + get_cspt_store, + ) + from ledgrab.core.processing.color_strip_kinds import StreamDeps, build_stream + + def _safe(getter): + try: + return getter() + except RuntimeError as e: + logger.debug("Preview dep not available (%s): %s", getter.__name__, e) + return None mgr = get_processor_manager() csm = mgr.color_strip_stream_manager - if source.source_type == "audio": - from ledgrab.api.dependencies import ( - get_audio_processing_template_store, - get_audio_source_store, - get_audio_template_store, - ) - from ledgrab.core.processing.audio_stream import AudioColorStripStream + # The game-event bus is optional in preview contexts. + try: + from ledgrab.api.dependencies import get_game_event_bus - s = AudioColorStripStream( - source, - mgr.audio_capture_manager, - get_audio_source_store(), - get_audio_template_store(), - get_audio_processing_template_store(), - ) - elif source.source_type == "weather": - from ledgrab.core.processing.weather_stream import WeatherColorStripStream + game_event_bus = get_game_event_bus() + except RuntimeError as e: + logger.debug("Preview: no game event bus available: %s", e) + game_event_bus = None - s = WeatherColorStripStream(source, mgr.weather_manager) - elif source.source_type == "game_event": - from ledgrab.core.processing.game_event_stream import GameEventColorStripStream - - s = GameEventColorStripStream(source) - try: - from ledgrab.api.dependencies import get_game_event_bus - - bus = get_game_event_bus() - except RuntimeError as e: - logger.debug("Preview: no game event bus available: %s", e) - else: - if bus is not None: - s.set_event_bus(bus) - elif source.source_type == "mapped": - from ledgrab.core.processing.mapped_stream import MappedColorStripStream - - s = MappedColorStripStream(source, csm) - elif source.source_type == "composite": - from ledgrab.api.dependencies import get_cspt_store - from ledgrab.core.processing.composite_stream import CompositeColorStripStream - - s = CompositeColorStripStream( - source, csm, mgr.value_stream_manager, get_cspt_store(), depth=0 - ) - elif source.source_type == "processed": - from ledgrab.api.dependencies import get_cspt_store - from ledgrab.core.processing.processed_stream import ProcessedColorStripStream - - s = ProcessedColorStripStream(source, csm, get_cspt_store()) - else: - stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type) - if not stream_cls: - raise ValueError(f"Unsupported preview source_type: {source.source_type}") - s = stream_cls(source) + deps = StreamDeps( + css_manager=csm, + value_stream_manager=mgr.value_stream_manager, + cspt_store=_safe(get_cspt_store), + weather_manager=mgr.weather_manager, + audio_capture_manager=mgr.audio_capture_manager, + audio_source_store=_safe(get_audio_source_store), + audio_template_store=_safe(get_audio_template_store), + audio_processing_template_store=_safe(get_audio_processing_template_store), + game_event_bus=game_event_bus, + depth=0, + ) + try: + s = build_stream(source, deps) + except ValueError as e: + # Preserve the registry's original detail so the API consumer + # sees which kind was rejected, not just a generic message. + raise ValueError(f"Unsupported preview source_type: {e}") from e # Inject gradient store for palette resolution if hasattr(s, "set_gradient_store"): try: @@ -428,8 +428,17 @@ async def css_api_input_ws( continue elif "bytes" in message: - # Binary frame: raw RGBRGB... bytes (3 bytes per LED) + # Binary frame: raw RGBRGB... bytes (3 bytes per LED). + # Cap to a generous upper bound on the LED count — a hostile + # client could otherwise stream 100 MB frames and OOM the + # server before any application logic ran. raw_bytes = message["bytes"] + _MAX_BINARY_LEDS = 8192 + if len(raw_bytes) > _MAX_BINARY_LEDS * 3: + await websocket.send_json( + {"error": f"Binary frame too large (max {_MAX_BINARY_LEDS} LEDs)"} + ) + continue if len(raw_bytes) % 3 != 0: await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"}) continue diff --git a/server/src/ledgrab/api/schemas/color_strip_sources.py b/server/src/ledgrab/api/schemas/color_strip_sources.py index 6a19f56..bd4446f 100644 --- a/server/src/ledgrab/api/schemas/color_strip_sources.py +++ b/server/src/ledgrab/api/schemas/color_strip_sources.py @@ -122,9 +122,9 @@ class PictureAdvancedCSSResponse(_CSSResponseBase): calibration: Optional[Calibration] = Field(None, description="LED calibration") -class StaticCSSResponse(_CSSResponseBase): - source_type: Literal["static"] = "static" - color: Any = Field(description="Static RGB color") +class SingleColorCSSResponse(_CSSResponseBase): + source_type: Literal["single_color"] = "single_color" + color: Any = Field(description="Solid RGB color") animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config") @@ -240,11 +240,18 @@ class MathWaveCSSResponse(_CSSResponseBase): gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping") +class GameEventCSSResponse(_CSSResponseBase): + source_type: Literal["game_event"] = "game_event" + game_integration_id: str = Field(description="Game integration entity ID") + idle_color: Any = Field(description="Idle RGB color (bindable)") + event_mappings: List[dict] = Field(default_factory=list, description="Event-to-effect mappings") + + ColorStripSourceResponse = Annotated[ Union[ Annotated[PictureCSSResponse, Tag("picture")], Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")], - Annotated[StaticCSSResponse, Tag("static")], + Annotated[SingleColorCSSResponse, Tag("single_color")], Annotated[GradientCSSResponse, Tag("gradient")], Annotated[EffectCSSResponse, Tag("effect")], Annotated[CompositeCSSResponse, Tag("composite")], @@ -258,6 +265,7 @@ ColorStripSourceResponse = Annotated[ Annotated[WeatherCSSResponse, Tag("weather")], Annotated[KeyColorsCSSResponse, Tag("key_colors")], Annotated[MathWaveCSSResponse, Tag("math_wave")], + Annotated[GameEventCSSResponse, Tag("game_event")], ], Discriminator("source_type"), ] @@ -303,9 +311,9 @@ class PictureAdvancedCSSCreate(_CSSCreateBase): calibration: Optional[Calibration] = Field(None, description="LED calibration") -class StaticCSSCreate(_CSSCreateBase): - source_type: Literal["static"] = "static" - color: Any = Field(default=None, description="Static RGB color [R,G,B]") +class SingleColorCSSCreate(_CSSCreateBase): + source_type: Literal["single_color"] = "single_color" + color: Any = Field(default=None, description="Solid RGB color [R,G,B]") animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config") @@ -434,11 +442,18 @@ class MathWaveCSSCreate(_CSSCreateBase): gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping") +class GameEventCSSCreate(_CSSCreateBase): + source_type: Literal["game_event"] = "game_event" + game_integration_id: Optional[str] = Field(None, description="Game integration entity ID") + idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)") + event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings") + + ColorStripSourceCreate = Annotated[ Union[ Annotated[PictureCSSCreate, Tag("picture")], Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")], - Annotated[StaticCSSCreate, Tag("static")], + Annotated[SingleColorCSSCreate, Tag("single_color")], Annotated[GradientCSSCreate, Tag("gradient")], Annotated[EffectCSSCreate, Tag("effect")], Annotated[CompositeCSSCreate, Tag("composite")], @@ -452,6 +467,7 @@ ColorStripSourceCreate = Annotated[ Annotated[WeatherCSSCreate, Tag("weather")], Annotated[KeyColorsCSSCreate, Tag("key_colors")], Annotated[MathWaveCSSCreate, Tag("math_wave")], + Annotated[GameEventCSSCreate, Tag("game_event")], ], Discriminator("source_type"), ] @@ -497,9 +513,9 @@ class PictureAdvancedCSSUpdate(_CSSUpdateBase): calibration: Optional[Calibration] = Field(None, description="LED calibration") -class StaticCSSUpdate(_CSSUpdateBase): - source_type: Literal["static"] = "static" - color: Any = Field(default=None, description="Static RGB color [R,G,B]") +class SingleColorCSSUpdate(_CSSUpdateBase): + source_type: Literal["single_color"] = "single_color" + color: Any = Field(default=None, description="Solid RGB color [R,G,B]") animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config") @@ -626,11 +642,18 @@ class MathWaveCSSUpdate(_CSSUpdateBase): gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping") +class GameEventCSSUpdate(_CSSUpdateBase): + source_type: Literal["game_event"] = "game_event" + game_integration_id: Optional[str] = Field(None, description="Game integration entity ID") + idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)") + event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings") + + ColorStripSourceUpdate = Annotated[ Union[ Annotated[PictureCSSUpdate, Tag("picture")], Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")], - Annotated[StaticCSSUpdate, Tag("static")], + Annotated[SingleColorCSSUpdate, Tag("single_color")], Annotated[GradientCSSUpdate, Tag("gradient")], Annotated[EffectCSSUpdate, Tag("effect")], Annotated[CompositeCSSUpdate, Tag("composite")], @@ -644,6 +667,7 @@ ColorStripSourceUpdate = Annotated[ Annotated[WeatherCSSUpdate, Tag("weather")], Annotated[KeyColorsCSSUpdate, Tag("key_colors")], Annotated[MathWaveCSSUpdate, Tag("math_wave")], + Annotated[GameEventCSSUpdate, Tag("game_event")], ], Discriminator("source_type"), ] diff --git a/server/src/ledgrab/core/processing/color_strip_kinds.py b/server/src/ledgrab/core/processing/color_strip_kinds.py new file mode 100644 index 0000000..807c701 --- /dev/null +++ b/server/src/ledgrab/core/processing/color_strip_kinds.py @@ -0,0 +1,273 @@ +"""Single source of truth for non-sharable color-strip stream construction. + +Both the preview WebSocket (``api/routes/color_strip_sources/ws_stream.py``) +and the production ``ColorStripStreamManager.acquire`` used to maintain +parallel ``if source.source_type == "..." elif ... else ..._SIMPLE_STREAM_MAP`` +chains. Adding a new kind required keeping those two chains in lockstep, and +silently fell through to a generic stream class when an entry was missed. + +This module replaces both chains with a single ``STREAM_BUILDERS`` registry +plus a small ``StreamDeps`` dependency bag. Each caller populates the bag +from its own context (DI container, processor manager, etc.) and looks the +builder up by ``source.source_type``. A coverage assertion at import time +guarantees every kind in ``storage._SOURCE_TYPE_MAP`` is either sharable or +has a builder here — silent fall-throughs are no longer possible. + +Sharable kinds (``picture``, ``picture_advanced``, ``key_colors``) are NOT in +this registry: they require an injected ``LiveStream`` whose acquisition is +intertwined with the source's calibration, which does not fit a uniform +factory signature. Those continue to use bespoke paths inside +``ColorStripStreamManager``. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Callable + + +@dataclass(frozen=True) +class StreamDeps: + """Dependency bag for non-sharable stream construction. + + Each call site (preview WebSocket, production stream manager) builds one + of these from its own context before invoking :func:`build_stream`. + Fields are ``Any`` (with ``None`` defaults) because individual builders + only consume a subset; tests can supply a minimal bag. + + ``frozen=True`` guards against a builder accidentally reassigning a + field; it does NOT make the referenced objects immutable — the + ``css_manager``, stores, etc. are live mutable services. + + ``css_manager`` is needed by composite/mapped/processed builders so they + can recursively acquire dependent streams. Single-kind builders ignore + it. The field has no default so callers are forced to think about which + manager they are wiring through. + """ + + css_manager: Any + value_stream_manager: Any = None + cspt_store: Any = None # ColorStripProcessingTemplateStore + weather_manager: Any = None + audio_capture_manager: Any = None + audio_source_store: Any = None + audio_template_store: Any = None + audio_processing_template_store: Any = None + game_event_bus: Any = None + depth: int = 0 # composite nesting depth — passed through verbatim + + +# --------------------------------------------------------------------------- +# Per-kind builders +# +# Each builder is a small free function ``(source, deps) -> ColorStripStream``. +# Imports are deferred to keep this module cheap to import (the production +# processing graph is large). +# --------------------------------------------------------------------------- + + +def _build_audio(source, d: StreamDeps): + from ledgrab.core.processing.audio_stream import AudioColorStripStream + + return AudioColorStripStream( + source, + d.audio_capture_manager, + d.audio_source_store, + d.audio_template_store, + d.audio_processing_template_store, + ) + + +def _build_composite(source, d: StreamDeps): + from ledgrab.core.processing.composite_stream import CompositeColorStripStream + + return CompositeColorStripStream( + source, + d.css_manager, + d.value_stream_manager, + d.cspt_store, + depth=d.depth, + ) + + +def _build_mapped(source, d: StreamDeps): + from ledgrab.core.processing.mapped_stream import MappedColorStripStream + + return MappedColorStripStream(source, d.css_manager) + + +def _build_processed(source, d: StreamDeps): + from ledgrab.core.processing.processed_stream import ProcessedColorStripStream + + return ProcessedColorStripStream(source, d.css_manager, d.cspt_store) + + +def _build_weather(source, d: StreamDeps): + from ledgrab.core.processing.weather_stream import WeatherColorStripStream + + return WeatherColorStripStream(source, d.weather_manager) + + +def _build_game_event(source, d: StreamDeps): + from ledgrab.core.processing.game_event_stream import GameEventColorStripStream + + stream = GameEventColorStripStream(source) + if d.game_event_bus is not None: + stream.set_event_bus(d.game_event_bus) + return stream + + +def _make_source_only_builder(loader: Callable[[], type]) -> Callable[[Any, StreamDeps], Any]: + """Wrap a class-loader so it produces a uniform ``(source, deps) -> stream`` builder. + + The loader is called on each invocation but module caching makes the + import a single dict lookup after the first call. + """ + + def _build(source, _deps: StreamDeps): + return loader()(source) + + return _build + + +def _single_color_cls() -> type: + from ledgrab.core.processing.color_strip_stream import SingleColorStripStream + + return SingleColorStripStream + + +def _gradient_cls() -> type: + from ledgrab.core.processing.color_strip_stream import GradientColorStripStream + + return GradientColorStripStream + + +def _effect_cls() -> type: + from ledgrab.core.processing.effect_stream import EffectColorStripStream + + return EffectColorStripStream + + +def _api_input_cls() -> type: + from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream + + return ApiInputColorStripStream + + +def _notification_cls() -> type: + from ledgrab.core.processing.notification_stream import NotificationColorStripStream + + return NotificationColorStripStream + + +def _daylight_cls() -> type: + from ledgrab.core.processing.daylight_stream import DaylightColorStripStream + + return DaylightColorStripStream + + +def _candlelight_cls() -> type: + from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream + + return CandlelightColorStripStream + + +def _math_wave_cls() -> type: + from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream + + return MathWaveColorStripStream + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + + +StreamBuilder = Callable[[Any, StreamDeps], Any] + +STREAM_BUILDERS: dict[str, StreamBuilder] = { + "audio": _build_audio, + "composite": _build_composite, + "mapped": _build_mapped, + "processed": _build_processed, + "weather": _build_weather, + "game_event": _build_game_event, + "single_color": _make_source_only_builder(_single_color_cls), + # Legacy alias: pre-rename rows used "static". The data migration rewrites + # the on-disk source_type on startup, but this alias keeps an in-flight + # legacy entry resolving to the right stream class. + "static": _make_source_only_builder(_single_color_cls), + "gradient": _make_source_only_builder(_gradient_cls), + "effect": _make_source_only_builder(_effect_cls), + "api_input": _make_source_only_builder(_api_input_cls), + "notification": _make_source_only_builder(_notification_cls), + "daylight": _make_source_only_builder(_daylight_cls), + "candlelight": _make_source_only_builder(_candlelight_cls), + "math_wave": _make_source_only_builder(_math_wave_cls), +} + + +# Sharable kinds are handled by dedicated LiveStream-acquisition paths in +# ColorStripStreamManager (their construction depends on calibration → picture +# source resolution, which does not fit a uniform factory signature). +SHARABLE_KINDS: frozenset[str] = frozenset({"picture", "picture_advanced", "key_colors"}) + + +def build_stream(source, deps: StreamDeps): + """Build a non-sharable color-strip stream for *source*. + + Raises ``ValueError`` if the kind has no registered builder (which + would indicate a sharable source slipped through the caller's + ``sharable`` gate, or a new kind missing from this registry). + """ + builder = STREAM_BUILDERS.get(source.source_type) + if builder is None: + raise ValueError( + f"No stream builder for non-sharable color-strip-source kind " + f"{source.source_type!r} (id={getattr(source, 'id', '?')!r})" + ) + return builder(source, deps) + + +def _assert_stream_kind_coverage() -> None: + """Verify the registry is a strict partition: every kind from storage is + either listed in SHARABLE_KINDS or has a STREAM_BUILDERS entry. + + Runs at module import so a kind added to ``_SOURCE_TYPE_MAP`` without a + corresponding builder fails the server boot loudly instead of silently + falling through at request time. + + Contract note + ------------- + This check is **asymmetric** (``STREAM_BUILDERS ∪ SHARABLE_KINDS == + storage_kinds``) because sharable kinds are constructed by a separate + path inside ``ColorStripStreamManager``. The sister assertion in + ``api/routes/color_strip_sources/_helpers.py::_assert_response_map_coverage`` + is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``) because every + kind, sharable or not, still needs a response shape. Both assertions key + by the ``source_type`` string; adding a new kind requires updates to + storage, ``_RESPONSE_MAP``, and either ``STREAM_BUILDERS`` or + ``SHARABLE_KINDS``. Both assertions catch missing entries; only this one + expects a subset relationship. + """ + from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP + + storage_kinds = set(_SOURCE_TYPE_MAP.keys()) + builder_kinds = set(STREAM_BUILDERS.keys()) + expected_non_sharable = storage_kinds - SHARABLE_KINDS + + missing = expected_non_sharable - builder_kinds + extra = builder_kinds - storage_kinds + if missing or extra: + problems = [] + if missing: + problems.append(f"missing builders: {sorted(missing)}") + if extra: + problems.append(f"unregistered kinds: {sorted(extra)}") + raise RuntimeError( + "color_strip_kinds.STREAM_BUILDERS is out of sync with storage._SOURCE_TYPE_MAP: " + + "; ".join(problems) + ) + + +_assert_stream_kind_coverage() diff --git a/server/src/ledgrab/core/processing/color_strip_stream_manager.py b/server/src/ledgrab/core/processing/color_strip_stream_manager.py index e684fb7..9ff4db7 100644 --- a/server/src/ledgrab/core/processing/color_strip_stream_manager.py +++ b/server/src/ledgrab/core/processing/color_strip_stream_manager.py @@ -3,7 +3,7 @@ PictureColorStripStreams (expensive screen capture) are shared across multiple consumers via reference counting — processing runs once, not once per target. -Count-dependent streams (static, gradient, effect) are NOT shared. +Count-dependent streams (single_color, gradient, effect) are NOT shared. Each consumer gets its own instance so it can configure an independent LED count without interfering with other targets. """ @@ -11,37 +11,18 @@ without interfering with other targets. from dataclasses import dataclass from typing import Dict, Optional +from ledgrab.core.processing.color_strip_kinds import ( + StreamDeps, + build_stream, +) from ledgrab.core.processing.color_strip_stream import ( ColorStripStream, - GradientColorStripStream, PictureColorStripStream, - StaticColorStripStream, ) -from ledgrab.core.processing.processed_stream import ProcessedColorStripStream -from ledgrab.core.processing.effect_stream import EffectColorStripStream -from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream -from ledgrab.core.processing.notification_stream import NotificationColorStripStream -from ledgrab.core.processing.daylight_stream import DaylightColorStripStream -from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream -from ledgrab.core.processing.game_event_stream import GameEventColorStripStream -from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream from ledgrab.utils import get_logger logger = get_logger(__name__) -# source_type → stream class for non-picture (non-sharable) sources -_SIMPLE_STREAM_MAP = { - "static": StaticColorStripStream, - "gradient": GradientColorStripStream, - "effect": EffectColorStripStream, - "api_input": ApiInputColorStripStream, - "notification": NotificationColorStripStream, - "daylight": DaylightColorStripStream, - "candlelight": CandlelightColorStripStream, - "game_event": GameEventColorStripStream, - "math_wave": MathWaveColorStripStream, -} - @dataclass class _ColorStripEntry: @@ -241,43 +222,27 @@ class ColorStripStreamManager: """ source = self._color_strip_store.get_source(css_id) - # Non-sharable: always create a fresh per-consumer instance + # Non-sharable: always create a fresh per-consumer instance. + # Construction is delegated to the per-kind registry in + # ``color_strip_kinds`` so the dispatch lives in exactly one place. if not source.sharable: - if source.source_type == "audio": - from ledgrab.core.processing.audio_stream import AudioColorStripStream - - css_stream = AudioColorStripStream( - source, - self._audio_capture_manager, - self._audio_source_store, - self._audio_template_store, - self._audio_processing_template_store, - ) - elif source.source_type == "composite": - from ledgrab.core.processing.composite_stream import ( - CompositeColorStripStream, - ) - - css_stream = CompositeColorStripStream( - source, self, self._value_stream_manager, self._cspt_store, depth=depth - ) - elif source.source_type == "mapped": - from ledgrab.core.processing.mapped_stream import MappedColorStripStream - - css_stream = MappedColorStripStream(source, self) - elif source.source_type == "processed": - css_stream = ProcessedColorStripStream(source, self, self._cspt_store) - elif source.source_type == "weather": - from ledgrab.core.processing.weather_stream import WeatherColorStripStream - - css_stream = WeatherColorStripStream(source, self._weather_manager) - else: - stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type) - if not stream_cls: - raise ValueError( - f"Unsupported color strip source type '{source.source_type}' for {css_id}" - ) - css_stream = stream_cls(source) + deps = StreamDeps( + css_manager=self, + value_stream_manager=self._value_stream_manager, + cspt_store=self._cspt_store, + weather_manager=self._weather_manager, + audio_capture_manager=self._audio_capture_manager, + audio_source_store=self._audio_source_store, + audio_template_store=self._audio_template_store, + audio_processing_template_store=self._audio_processing_template_store, + game_event_bus=self._game_event_bus, + depth=depth, + ) + try: + css_stream = build_stream(source, deps) + except ValueError as e: + # Surface the css_id alongside the registry's error. + raise ValueError(f"{e} (css_id={css_id})") from e # Inject gradient store for palette resolution if self._gradient_store and hasattr(css_stream, "set_gradient_store"): css_stream.set_gradient_store(self._gradient_store) diff --git a/server/src/ledgrab/core/processing/value_kinds.py b/server/src/ledgrab/core/processing/value_kinds.py new file mode 100644 index 0000000..91c6d61 --- /dev/null +++ b/server/src/ledgrab/core/processing/value_kinds.py @@ -0,0 +1,354 @@ +"""Single source of truth for value-stream construction. + +``ValueStreamManager._create_stream`` used to be a 168-line ``isinstance`` +ladder over 14 ``ValueSource`` subclasses with a silent fallback to +``StaticValueStream(value=1.0)``. The ladder forced any new value kind to +edit the factory plus the storage subclass plus the schemas plus the store's +``create_source`` — and a missing branch corrupted the stream at runtime. + +This module replaces the ladder with a single ``STREAM_BUILDERS`` registry +keyed by the ``source_type`` string (matching the storage layer's +``_VALUE_SOURCE_MAP``). An import-time coverage assertion guarantees the two +registries stay aligned. + +Builders take a ``(source, deps: ValueStreamDeps) -> ValueStream`` shape so +both the production manager and any preview / test harness can populate the +deps from their own context. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Callable, Optional + +if TYPE_CHECKING: + # Typed forward references so mypy/pyright catch typos like + # ``d.gradient_stroe`` at static-analysis time. At runtime these are + # ``Any`` — the live objects come from the FastAPI / manager wiring. + from ledgrab.core.audio.audio_capture_manager import AudioCaptureManager + from ledgrab.core.game_integration.event_bus import GameEventBus + from ledgrab.core.home_assistant.ha_manager import HomeAssistantManager + from ledgrab.core.processing.color_strip_stream_manager import ( + ColorStripStreamManager, + ) + from ledgrab.core.processing.live_stream_manager import LiveStreamManager + from ledgrab.core.processing.sync_clock_manager import SyncClockRuntime + from ledgrab.storage.audio_processing_template_store import ( + AudioProcessingTemplateStore, + ) + from ledgrab.storage.audio_source_store import AudioSourceStore + from ledgrab.storage.audio_template_store import AudioTemplateStore + from ledgrab.storage.gradient_store import GradientStore + from ledgrab.storage.http_endpoint_store import HTTPEndpointStore + + +@dataclass(frozen=True) +class ValueStreamDeps: + """Dependency bag for value-stream construction. + + Builders only read the subset they need. ``value_stream_manager`` is the + one truly load-bearing field — it is referenced by + ``GradientMapValueStream`` so it can recursively acquire its source + value stream. + + ``clock_runtime`` is pre-acquired by the manager exclusively for the + ``animated_color`` kind (see :data:`NEEDS_CLOCK_RUNTIME`). The manager + owns the bookkeeping for tracking ``vs_id → clock_id`` so the builder + stays a pure ``(source, deps) -> stream`` mapping. If a new kind ever + grows a clock dependency, add it to ``NEEDS_CLOCK_RUNTIME`` AND surface + a separate field — sharing ``clock_runtime`` across kinds invites the + wrong runtime being passed to the wrong builder. + + Field types are quoted so they stay informational under + ``TYPE_CHECKING`` and the dataclass still accepts plain mocks at + runtime. Builders therefore get IDE/lint help against typos like + ``d.gradient_stroe`` while production code remains duck-typed. + """ + + value_stream_manager: "Any" + audio_capture_manager: Optional["AudioCaptureManager"] = None + audio_source_store: Optional["AudioSourceStore"] = None + audio_template_store: Optional["AudioTemplateStore"] = None + audio_processing_template_store: Optional["AudioProcessingTemplateStore"] = None + live_stream_manager: Optional["LiveStreamManager"] = None + ha_manager: Optional["HomeAssistantManager"] = None + gradient_store: Optional["GradientStore"] = None + css_stream_manager: Optional["ColorStripStreamManager"] = None + event_bus: Optional["GameEventBus"] = None + http_endpoint_store: Optional["HTTPEndpointStore"] = None + clock_runtime: Optional["SyncClockRuntime"] = None + + +# --------------------------------------------------------------------------- +# Per-kind builders +# --------------------------------------------------------------------------- + + +def _build_static(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import StaticValueStream + + return StaticValueStream(value=source.value) + + +def _build_animated(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import AnimatedValueStream + + return AnimatedValueStream( + waveform=source.waveform, + speed=source.speed, + min_value=source.min_value, + max_value=source.max_value, + ) + + +def _build_audio(source, d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import AudioValueStream + + return AudioValueStream( + audio_source_id=source.audio_source_id, + mode=source.mode, + sensitivity=source.sensitivity, + smoothing=source.smoothing, + min_value=source.min_value, + max_value=source.max_value, + auto_gain=source.auto_gain, + audio_capture_manager=d.audio_capture_manager, + audio_source_store=d.audio_source_store, + audio_template_store=d.audio_template_store, + audio_processing_template_store=d.audio_processing_template_store, + ) + + +def _build_daylight(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import DaylightValueStream + + return DaylightValueStream( + speed=source.speed, + use_real_time=source.use_real_time, + latitude=source.latitude, + longitude=source.longitude, + min_value=source.min_value, + max_value=source.max_value, + ) + + +def _build_adaptive_time(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import TimeOfDayValueStream + + return TimeOfDayValueStream( + schedule=source.schedule, + min_value=source.min_value, + max_value=source.max_value, + ) + + +def _build_adaptive_scene(source, d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import SceneValueStream + + return SceneValueStream( + picture_source_id=source.picture_source_id, + scene_behavior=source.scene_behavior, + sensitivity=source.sensitivity, + smoothing=source.smoothing, + min_value=source.min_value, + max_value=source.max_value, + live_stream_manager=d.live_stream_manager, + ) + + +def _build_static_color(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import StaticColorValueStream + + return StaticColorValueStream(color=source.color) + + +def _build_animated_color(source, d: ValueStreamDeps): + # See NEEDS_CLOCK_RUNTIME below: ``d.clock_runtime`` is pre-acquired by + # ``ValueStreamManager._create_stream`` exclusively for this kind. Any + # other builder that ever needs a clock should add its own deps field + # rather than read this one. + from ledgrab.core.processing.value_stream import AnimatedColorValueStream + + return AnimatedColorValueStream( + colors=source.colors, + speed=source.speed, + easing=source.easing, + clock=d.clock_runtime, + ) + + +def _build_adaptive_time_color(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import AdaptiveTimeColorValueStream + + return AdaptiveTimeColorValueStream(schedule=source.schedule) + + +def _build_ha_entity(source, d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import HAEntityValueStream + + return HAEntityValueStream( + ha_source_id=source.ha_source_id, + entity_id=source.entity_id, + attribute=source.attribute, + min_ha_value=source.min_ha_value, + max_ha_value=source.max_ha_value, + smoothing=source.smoothing, + ha_manager=d.ha_manager, + ) + + +def _build_gradient_map(source, d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import GradientMapValueStream + + return GradientMapValueStream( + value_source_id=source.value_source_id, + gradient_id=source.gradient_id, + easing=source.easing, + value_stream_manager=d.value_stream_manager, + gradient_store=d.gradient_store, + ) + + +def _build_css_extract(source, d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import CSSExtractValueStream + + return CSSExtractValueStream( + color_strip_source_id=source.color_strip_source_id, + led_start=source.led_start, + led_end=source.led_end, + css_stream_manager=d.css_stream_manager, + ) + + +def _build_system_metrics(source, _d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import SystemMetricsValueStream + + return SystemMetricsValueStream( + metric=source.metric, + min_value=source.min_value, + max_value=source.max_value, + max_rate=source.max_rate, + disk_path=source.disk_path, + sensor_label=source.sensor_label, + poll_interval=source.poll_interval, + smoothing=source.smoothing, + ) + + +def _build_game_event(source, d: ValueStreamDeps): + # Late import: ``GameEventValueStream`` lives in a separate sub-package + # to keep the game-event subsystem (which transitively pulls in the + # game-integration adapters) off the cold-start path for installs that + # never use game events. + from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream + + return GameEventValueStream( + event_type=source.event_type, + min_game_value=source.min_game_value, + max_game_value=source.max_game_value, + smoothing=source.smoothing, + default_value=source.default_value, + timeout=source.timeout, + event_bus=d.event_bus, + ) + + +def _build_http(source, d: ValueStreamDeps): + from ledgrab.core.processing.value_stream import HTTPValueStream + + return HTTPValueStream( + endpoint_id=source.http_endpoint_id, + json_path=source.json_path, + interval_s=source.interval_s, + min_value=source.min_value, + max_value=source.max_value, + smoothing=source.smoothing, + http_endpoint_store=d.http_endpoint_store, + ) + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + + +StreamBuilder = Callable[[Any, ValueStreamDeps], Any] + +STREAM_BUILDERS: dict[str, StreamBuilder] = { + "static": _build_static, + "animated": _build_animated, + "audio": _build_audio, + "daylight": _build_daylight, + "adaptive_time": _build_adaptive_time, + "adaptive_scene": _build_adaptive_scene, + "static_color": _build_static_color, + "animated_color": _build_animated_color, + "adaptive_time_color": _build_adaptive_time_color, + "ha_entity": _build_ha_entity, + "gradient_map": _build_gradient_map, + "css_extract": _build_css_extract, + "system_metrics": _build_system_metrics, + "game_event": _build_game_event, + "http": _build_http, +} + + +# ``animated_color`` is the only kind that needs a pre-acquired SyncClockRuntime +# from the manager; the rest derive everything they need from ``source`` and +# ``deps``. Exposing this set lets the manager perform the side-effecting +# acquisition step exactly once, before delegating to the registry. +NEEDS_CLOCK_RUNTIME: frozenset[str] = frozenset({"animated_color"}) + + +def build_stream(source, deps: ValueStreamDeps): + """Build a ValueStream for *source*. + + Raises ``ValueError`` when no builder is registered for + ``source.source_type``. Coverage is asserted at module import, so this + only fires for an in-flight instance whose ``source_type`` somehow + drifted from the registered set. + """ + builder = STREAM_BUILDERS.get(source.source_type) + if builder is None: + raise ValueError( + f"No value-stream builder for source_type {source.source_type!r} " + f"(id={getattr(source, 'id', '?')!r})" + ) + return builder(source, deps) + + +def _assert_value_kind_coverage() -> None: + """Verify the registry and storage's ``_VALUE_SOURCE_MAP`` agree. + + Runs at module import. Symmetric: every kind in storage must have a + builder, and every builder must correspond to a real storage kind. + Also asserts ``NEEDS_CLOCK_RUNTIME`` only names kinds that exist in the + registry, so a typo there fails the boot loudly instead of silently + leaking a ``SyncClockRuntime`` acquisition. + """ + from ledgrab.storage.value_source import _VALUE_SOURCE_MAP + + storage_kinds = set(_VALUE_SOURCE_MAP.keys()) + builder_kinds = set(STREAM_BUILDERS.keys()) + missing = storage_kinds - builder_kinds + extra = builder_kinds - storage_kinds + if missing or extra: + problems = [] + if missing: + problems.append(f"missing builders: {sorted(missing)}") + if extra: + problems.append(f"unregistered kinds: {sorted(extra)}") + raise RuntimeError( + "value_kinds.STREAM_BUILDERS is out of sync with storage._VALUE_SOURCE_MAP: " + + "; ".join(problems) + ) + + rogue_clock_kinds = NEEDS_CLOCK_RUNTIME - builder_kinds + if rogue_clock_kinds: + raise RuntimeError( + "value_kinds.NEEDS_CLOCK_RUNTIME names kinds with no registered " + f"builder: {sorted(rogue_clock_kinds)}" + ) + + +_assert_value_kind_coverage() diff --git a/server/src/ledgrab/core/processing/value_stream.py b/server/src/ledgrab/core/processing/value_stream.py index f634faf..272c564 100644 --- a/server/src/ledgrab/core/processing/value_stream.py +++ b/server/src/ledgrab/core/processing/value_stream.py @@ -21,7 +21,9 @@ ValueStreamManager owns all running ValueStreams, keyed by from __future__ import annotations import asyncio +import json import math +import re import time from abc import ABC, abstractmethod from datetime import datetime @@ -29,8 +31,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import numpy as np +from ledgrab.storage.base_store import EntityNotFoundError from ledgrab.utils import get_logger +# Compiled once — used by ``_extract_simple_path`` on every poll. +_NAME_HEAD_RE = re.compile(r"^([^\[]*)") +_INDEX_RE = re.compile(r"^\[(\d+)\]") + if TYPE_CHECKING: from ledgrab.core.audio.audio_capture import AudioCaptureManager from ledgrab.core.game_integration.event_bus import GameEventBus @@ -39,6 +46,7 @@ if TYPE_CHECKING: from ledgrab.core.processing.live_stream_manager import LiveStreamManager from ledgrab.core.processing.sync_clock_manager import SyncClockManager from ledgrab.storage.audio_source_store import AudioSourceStore + from ledgrab.storage.http_endpoint_store import HTTPEndpointStore from ledgrab.storage.value_source import ValueSource from ledgrab.storage.value_source_store import ValueSourceStore @@ -1017,6 +1025,211 @@ class HAEntityValueStream(ValueStream): logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e) +# --------------------------------------------------------------------------- +# HTTP poll +# --------------------------------------------------------------------------- + + +class HTTPValueStream(ValueStream): + """Periodically polls an HTTPEndpoint and extracts a value via json_path. + + Exposes two accessors: + + - ``get_value()`` returns a normalized float in [0, 1] for use as a + modulator (brightness, color, etc.). The raw extracted value is + coerced to float; non-numeric values yield 0.0. + - ``get_raw_value()`` returns the un-normalized extracted value + (str / int / float / bool / None) for consumers that need it + verbatim — e.g. an automation rule comparing ``"playing"``. + """ + + def __init__( + self, + endpoint_id: str, + json_path: str, + interval_s: int, + min_value: float, + max_value: float, + smoothing: float, + http_endpoint_store: Optional["HTTPEndpointStore"] = None, + ) -> None: + self._endpoint_id = endpoint_id + self._json_path = json_path + self._interval_s = max(1, int(interval_s)) + self._min_value = min_value + self._max_value = max_value + self._smoothing = smoothing + self._http_endpoint_store = http_endpoint_store + self._task: Optional[asyncio.Task] = None + self._raw_value: Any = None + self._prev_normalized: Optional[float] = None + # Kept as private attrs for internal/log diagnostics; not exposed via + # public properties or API until a status endpoint consumes them. + self._last_fetched_at: Optional[datetime] = None + self._last_status_code: Optional[int] = None + self._last_error: Optional[str] = None + + def start(self) -> None: + if self._task is not None: + return + if not self._endpoint_id or self._http_endpoint_store is None: + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop — can't poll. Construction in a sync test + # context is fine; the stream just stays idle until started + # from an async context. + return + self._task = loop.create_task(self._poll_loop()) + + def stop(self) -> None: + task = self._task + self._task = None + if task is not None: + task.cancel() + + def get_value(self) -> float: + raw = self._raw_value + if raw is None: + return self._prev_normalized if self._prev_normalized is not None else 0.0 + try: + numeric = float(raw) + except (TypeError, ValueError): + return self._prev_normalized if self._prev_normalized is not None else 0.0 + + rng = self._max_value - self._min_value + if abs(rng) < 1e-9: + normalized = 0.5 + else: + normalized = (numeric - self._min_value) / rng + normalized = max(0.0, min(1.0, normalized)) + + if self._smoothing > 0.0 and self._prev_normalized is not None: + normalized = ( + self._smoothing * self._prev_normalized + (1.0 - self._smoothing) * normalized + ) + self._prev_normalized = normalized + return normalized + + def get_raw_value(self) -> Any: + """Return the last raw extracted value (string, int, float, etc.).""" + return self._raw_value + + def update_source(self, source: "ValueSource") -> None: + from ledgrab.storage.value_source import HTTPValueSource + + if not isinstance(source, HTTPValueSource): + return + self._endpoint_id = source.http_endpoint_id + self._json_path = source.json_path + self._interval_s = max(1, int(source.interval_s)) + self._min_value = source.min_value + self._max_value = source.max_value + self._smoothing = source.smoothing + + async def _poll_loop(self) -> None: + from ledgrab.utils.safe_source import safe_request_bounded + + try: + while True: + try: + endpoint = self._http_endpoint_store.get(self._endpoint_id) + except EntityNotFoundError: + # The endpoint was deleted out from under us. Stop the + # poll task so it doesn't spin forever; the next entity + # event (or the value source being deleted) will tidy + # the rest of the bookkeeping. + logger.warning( + "HTTPValueStream stopping: endpoint %s no longer exists", + self._endpoint_id, + ) + self._last_error = "endpoint_deleted" + self._raw_value = None + self._task = None + return + except Exception as exc: + self._last_error = f"Endpoint lookup failed: {type(exc).__name__}" + self._raw_value = None + await asyncio.sleep(self._interval_s) + continue + + headers = endpoint.build_request_headers() + try: + status, body_bytes, error = await safe_request_bounded( + endpoint.method, + endpoint.url, + headers=headers, + timeout=endpoint.timeout_s, + ) + except Exception as exc: + # safe_request_bounded raises HTTPException on URL + # validation failure; treat that as a recoverable poll + # error and try again next cycle. + self._last_status_code = None + self._last_error = f"URL validation failed: {type(exc).__name__}" + self._raw_value = None + await asyncio.sleep(self._interval_s) + continue + + self._last_status_code = status if status else None + self._last_error = error + if not error and status: + try: + body_text = body_bytes.decode("utf-8", errors="replace") + except Exception: + body_text = "" + body_json: Any + try: + body_json = json.loads(body_text) if body_text else None + except (ValueError, TypeError): + body_json = None + self._raw_value = _extract_simple_path(body_json, self._json_path, body_text) + else: + self._raw_value = None + self._last_fetched_at = datetime.now() + + await asyncio.sleep(self._interval_s) + except asyncio.CancelledError: + raise + + +def _extract_simple_path(body_json: Any, path: str, body_text: str) -> Any: + """Extract a value via a dot-path (with optional ``[N]`` indices). + + Uses module-level compiled regexes so repeated polls don't recompile. + Returns ``None`` when the path doesn't resolve — runtime callers just + need "the value, or nothing." Empty path returns the raw body text so + plain-text endpoints work too. + """ + if not path: + return body_text or None + if body_json is None: + return None + current: Any = body_json + for raw_segment in path.split("."): + segment = raw_segment.strip() + if not segment: + continue + name_match = _NAME_HEAD_RE.match(segment) + name_part = name_match.group(1) if name_match else "" + remainder = segment[len(name_part) :] + if name_part: + if not isinstance(current, dict) or name_part not in current: + return None + current = current[name_part] + while remainder: + idx_match = _INDEX_RE.match(remainder) + if not idx_match: + return None + idx = int(idx_match.group(1)) + if not isinstance(current, list) or idx < 0 or idx >= len(current): + return None + current = current[idx] + remainder = remainder[idx_match.end() :] + return current + + # --------------------------------------------------------------------------- # Gradient Map # --------------------------------------------------------------------------- @@ -1547,6 +1760,7 @@ class ValueStreamManager: event_bus: Optional["GameEventBus"] = None, audio_processing_template_store=None, sync_clock_manager: Optional["SyncClockManager"] = None, + http_endpoint_store: Optional["HTTPEndpointStore"] = None, ): self._value_source_store = value_source_store self._audio_capture_manager = audio_capture_manager @@ -1559,6 +1773,7 @@ class ValueStreamManager: self._event_bus = event_bus self._audio_processing_template_store = audio_processing_template_store self._sync_clock_manager = sync_clock_manager + self._http_endpoint_store = http_endpoint_store self._streams: Dict[str, ValueStream] = {} # vs_id → stream self._ref_counts: Dict[str, int] = {} # vs_id → ref count # Tracks which clock_id (if any) was acquired for each stream so we @@ -1602,6 +1817,17 @@ class ValueStreamManager: else: logger.info(f"Released ref for value stream {vs_id} (refs={refs})") + def peek(self, vs_id: str) -> Optional[ValueStream]: + """Read-only accessor: return the running ValueStream for ``vs_id`` + if one exists, else ``None``. + + Does NOT change ref counts. Use for consumer-driven reads where the + caller already holds a reference via :meth:`acquire` (e.g. the + :class:`AutomationEngine` evaluating an ``HTTPPollRule`` against a + value source it has already acquired in ``_sync_value_stream_refs``). + """ + return self._streams.get(vs_id) + def update_source(self, vs_id: str) -> None: """Hot-update the shared stream for the given ValueSource.""" try: @@ -1699,158 +1925,52 @@ class ValueStreamManager: logger.info("Released all value streams") def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream: - """Factory: create the appropriate ValueStream for a ValueSource.""" - from ledgrab.storage.value_source import ( - AdaptiveValueSource, - AnimatedValueSource, - AudioValueSource, - CSSExtractValueSource, - DaylightValueSource, - GameEventValueSource, - GradientMapValueSource, - HAEntityValueSource, - StaticValueSource, - StaticColorValueSource, - AnimatedColorValueSource, - AdaptiveTimeColorValueSource, - SystemMetricsValueSource, + """Build a ValueStream for *source* via the central kind registry. + + The 14-branch ``isinstance`` ladder this method used to host was the + canonical example of the parallel-change smell flagged in the + architecture audit. The actual per-kind construction now lives in + ``ledgrab.core.processing.value_kinds.STREAM_BUILDERS``, keyed by + ``source.source_type``. This method only handles the manager-side + bookkeeping that does not fit a uniform builder signature — namely + the optional :class:`SyncClockRuntime` acquisition for + ``animated_color`` sources, whose ``vs_id → clock_id`` mapping the + manager owns for symmetric release at teardown. + """ + from ledgrab.core.processing.value_kinds import ( + NEEDS_CLOCK_RUNTIME, + ValueStreamDeps, + build_stream, ) - if isinstance(source, StaticValueSource): - return StaticValueStream(value=source.value) - - if isinstance(source, AnimatedValueSource): - return AnimatedValueStream( - waveform=source.waveform, - speed=source.speed, - min_value=source.min_value, - max_value=source.max_value, - ) - - if isinstance(source, AudioValueSource): - return AudioValueStream( - audio_source_id=source.audio_source_id, - mode=source.mode, - sensitivity=source.sensitivity, - smoothing=source.smoothing, - min_value=source.min_value, - max_value=source.max_value, - auto_gain=source.auto_gain, - audio_capture_manager=self._audio_capture_manager, - audio_source_store=self._audio_source_store, - audio_template_store=self._audio_template_store, - audio_processing_template_store=self._audio_processing_template_store, - ) - - if isinstance(source, DaylightValueSource): - return DaylightValueStream( - speed=source.speed, - use_real_time=source.use_real_time, - latitude=source.latitude, - longitude=source.longitude, - min_value=source.min_value, - max_value=source.max_value, - ) - - if isinstance(source, AdaptiveValueSource): - if source.source_type == "adaptive_scene": - return SceneValueStream( - picture_source_id=source.picture_source_id, - scene_behavior=source.scene_behavior, - sensitivity=source.sensitivity, - smoothing=source.smoothing, - min_value=source.min_value, - max_value=source.max_value, - live_stream_manager=self._live_stream_manager, - ) - return TimeOfDayValueStream( - schedule=source.schedule, - min_value=source.min_value, - max_value=source.max_value, - ) - - # Color streams - if isinstance(source, StaticColorValueSource): - return StaticColorValueStream(color=source.color) - - if isinstance(source, AnimatedColorValueSource): - clock_runtime = None - if source.clock_id and self._sync_clock_manager: + clock_runtime = None + if source.source_type in NEEDS_CLOCK_RUNTIME: + clock_id = getattr(source, "clock_id", None) + if clock_id and self._sync_clock_manager: try: - clock_runtime = self._sync_clock_manager.acquire(source.clock_id) + clock_runtime = self._sync_clock_manager.acquire(clock_id) if vs_id is not None: - self._stream_clock_ids[vs_id] = source.clock_id + self._stream_clock_ids[vs_id] = clock_id except Exception as e: logger.warning( "Could not acquire sync clock %s for value source %s: %s", - source.clock_id, + clock_id, source.id, e, ) - return AnimatedColorValueStream( - colors=source.colors, - speed=source.speed, - easing=source.easing, - clock=clock_runtime, - ) - if isinstance(source, AdaptiveTimeColorValueSource): - return AdaptiveTimeColorValueStream(schedule=source.schedule) - - if isinstance(source, HAEntityValueSource): - return HAEntityValueStream( - ha_source_id=source.ha_source_id, - entity_id=source.entity_id, - attribute=source.attribute, - min_ha_value=source.min_ha_value, - max_ha_value=source.max_ha_value, - smoothing=source.smoothing, - ha_manager=self._ha_manager, - ) - - if isinstance(source, GradientMapValueSource): - return GradientMapValueStream( - value_source_id=source.value_source_id, - gradient_id=source.gradient_id, - easing=source.easing, - value_stream_manager=self, - gradient_store=self._gradient_store, - ) - - if isinstance(source, CSSExtractValueSource): - return CSSExtractValueStream( - color_strip_source_id=source.color_strip_source_id, - led_start=source.led_start, - led_end=source.led_end, - css_stream_manager=self._css_stream_manager, - ) - - if isinstance(source, SystemMetricsValueSource): - return SystemMetricsValueStream( - metric=source.metric, - min_value=source.min_value, - max_value=source.max_value, - max_rate=source.max_rate, - disk_path=source.disk_path, - sensor_label=source.sensor_label, - poll_interval=source.poll_interval, - smoothing=source.smoothing, - ) - - if isinstance(source, GameEventValueSource): - from ledgrab.core.value_sources.game_event_value_source import ( - GameEventValueStream, - ) - - return GameEventValueStream( - event_type=source.event_type, - min_game_value=source.min_game_value, - max_game_value=source.max_game_value, - smoothing=source.smoothing, - default_value=source.default_value, - timeout=source.timeout, - event_bus=self._event_bus, - ) - - # Fallback - return StaticValueStream(value=1.0) + deps = ValueStreamDeps( + value_stream_manager=self, + audio_capture_manager=self._audio_capture_manager, + audio_source_store=self._audio_source_store, + audio_template_store=self._audio_template_store, + audio_processing_template_store=self._audio_processing_template_store, + live_stream_manager=self._live_stream_manager, + ha_manager=self._ha_manager, + gradient_store=self._gradient_store, + css_stream_manager=self._css_stream_manager, + event_bus=self._event_bus, + http_endpoint_store=self._http_endpoint_store, + clock_runtime=clock_runtime, + ) + return build_stream(source, deps) diff --git a/server/src/ledgrab/storage/base_sqlite_store.py b/server/src/ledgrab/storage/base_sqlite_store.py index eabeb18..96d4a07 100644 --- a/server/src/ledgrab/storage/base_sqlite_store.py +++ b/server/src/ledgrab/storage/base_sqlite_store.py @@ -32,6 +32,15 @@ class BaseSqliteStore(Generic[T]): self._items: Dict[str, T] = {} self._deserializer = deserializer self._lock = threading.RLock() + # Apply pending JSON-blob data migrations before loading rows so the + # in-memory cache reflects the canonical, post-migration shape. The + # runner is idempotent across stores — it consults a dedicated + # ``data_migrations`` audit table — so each store construction may + # invoke it without re-running already-applied migrations. + # Imported lazily to avoid a circular dependency at module load. + from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner + + MigrationRunner(db).run(ALL_MIGRATIONS) self._load() # -- I/O ----------------------------------------------------------------- diff --git a/server/src/ledgrab/storage/color_strip_store.py b/server/src/ledgrab/storage/color_strip_store.py index 2556565..5c7acc1 100644 --- a/server/src/ledgrab/storage/color_strip_store.py +++ b/server/src/ledgrab/storage/color_strip_store.py @@ -23,7 +23,13 @@ MAX_COMPOSITE_DEPTH = 4 class ColorStripStore(BaseSqliteStore[ColorStripSource]): - """Persistent storage for color strip sources.""" + """Persistent storage for color strip sources. + + JSON-blob field renames (e.g. legacy ``source_type='static'`` → + ``'single_color'``) are handled by the central + :mod:`ledgrab.storage.data_migrations` runner, which executes once per + database from :meth:`Database._ensure_schema`. + """ _table_name = "color_strip_sources" _entity_name = "Color strip source" diff --git a/server/src/ledgrab/storage/data_migrations.py b/server/src/ledgrab/storage/data_migrations.py new file mode 100644 index 0000000..b8515d1 --- /dev/null +++ b/server/src/ledgrab/storage/data_migrations.py @@ -0,0 +1,219 @@ +"""Versioned data migrations for stored JSON entities. + +Each migration is a small class with a stable ``name`` (used as the idempotency +key) and an ``apply(db)`` method that performs the change inside a transaction. +The ``MigrationRunner`` keeps a ``data_migrations`` table that records which +migrations have already executed; subsequent runs are a no-op. + +This replaces ad-hoc per-store migrations that were rewriting raw JSON via +``str.replace`` — that approach corrupted nested fields whose values happened +to share a substring with the renamed key, and had no transaction or audit. + +Adding a new migration +---------------------- + +1. Subclass :class:`DataMigration`, give it a unique ``name`` (prefix with the + next sequence number, e.g. ``"002_..."``) and implement ``apply``. +2. Append it to :data:`ALL_MIGRATIONS` in commit order. Never reorder existing + entries — the runner records each by name. +3. The first ``ColorStripStore`` (or any other store) construction triggers the + runner; nothing else has to change. +""" + +from __future__ import annotations + +import json +import sqlite3 +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime, timezone + +from ledgrab.storage.database import Database, is_writes_frozen +from ledgrab.utils import get_logger + +logger = get_logger(__name__) + + +class DataMigration(ABC): + """One JSON-blob migration step. + + Subclasses must declare a class-level ``name`` and implement ``apply``. + Implementations must operate on the supplied ``conn`` (an already-open + transaction). They MUST NOT commit or open nested transactions — the + :class:`MigrationRunner` owns the transaction so the data UPDATEs and the + audit-table INSERT are atomic. + """ + + name: str = "" + + @abstractmethod + def apply(self, conn: sqlite3.Connection) -> int: + """Perform the migration inside the supplied transaction. + + Return the number of rows changed. + """ + + +@dataclass(frozen=True) +class MigrationRecord: + """A row in the ``data_migrations`` audit table.""" + + name: str + applied_at: str + rows_changed: int + + +class MigrationRunner: + """Apply pending data migrations exactly once per database. + + Each call to :meth:`run` opens a single transaction that covers (a) the + "is this migration already applied?" check, (b) the migration body, and + (c) the audit INSERT. That guarantees: + + * a partial-failure cannot leave data rewritten but unrecorded; + * concurrent stores constructing on multiple threads cannot race each + other into a UNIQUE-constraint crash on the audit table. + + The runner skips silently when writes are frozen so a post-restore boot + does not mutate the freshly-restored database before the imminent restart. + """ + + _TABLE = "data_migrations" + + def __init__(self, db: Database): + self._db = db + self._ensure_table() + + def _ensure_table(self) -> None: + # CREATE TABLE is idempotent thanks to IF NOT EXISTS; running on every + # startup is cheap. ``Database.execute`` does NOT honour the + # ``is_writes_frozen`` guard (DDL is always permitted), so the audit + # table reliably exists even on a frozen boot. + self._db.execute( + f""" + CREATE TABLE IF NOT EXISTS {self._TABLE} ( + name TEXT PRIMARY KEY, + applied_at TEXT NOT NULL, + rows_changed INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + + def applied_names(self) -> set[str]: + """Return the set of migration names already recorded as applied.""" + cursor = self._db.execute(f"SELECT name FROM {self._TABLE}") + return {row["name"] for row in cursor.fetchall()} + + def run(self, migrations: list[DataMigration]) -> list[MigrationRecord]: + """Apply each migration in *migrations* that hasn't been recorded yet. + + Returns the records for migrations that actually executed this call. + Returns an empty list when writes are frozen. + """ + if is_writes_frozen(): + # Frozen-write databases are read-only between a restore and the + # imminent restart. Surface this clearly because the in-memory + # state may briefly reflect pre-migration data. + logger.warning("Data migrations skipped: writes are frozen (restart expected).") + return [] + + # Validate names up-front so an unnamed migration aborts before any + # transaction work. + for migration in migrations: + if not migration.name: + raise ValueError( + f"DataMigration {type(migration).__name__} must declare a non-empty name" + ) + + executed: list[MigrationRecord] = [] + for migration in migrations: + # Use a separate transaction per migration so a failure rolls back + # only the failing one and any earlier (already-recorded) + # migrations keep their audit rows. + with self._db.transaction() as conn: + already_applied = conn.execute( + f"SELECT 1 FROM {self._TABLE} WHERE name = ?", + (migration.name,), + ).fetchone() + if already_applied: + continue + logger.info("Applying data migration: %s", migration.name) + rows_changed = int(migration.apply(conn)) + applied_at = datetime.now(timezone.utc).isoformat() + # INSERT OR IGNORE defends against a hypothetical concurrent + # writer that recorded the same name between the SELECT above + # and this INSERT (the RLock serialises this in practice, but + # the constraint is the durable contract). + conn.execute( + f"INSERT OR IGNORE INTO {self._TABLE} " + f"(name, applied_at, rows_changed) VALUES (?, ?, ?)", + (migration.name, applied_at, rows_changed), + ) + executed.append( + MigrationRecord( + name=migration.name, + applied_at=applied_at, + rows_changed=rows_changed, + ) + ) + if rows_changed: + logger.warning("Migration %s rewrote %d row(s)", migration.name, rows_changed) + + if executed: + logger.info( + "Applied %d data migration(s): %s", + len(executed), + ", ".join(r.name for r in executed), + ) + return executed + + +# --------------------------------------------------------------------------- +# Concrete migrations +# --------------------------------------------------------------------------- + + +class StaticToSingleColorMigration(DataMigration): + """Rename the legacy ``source_type='static'`` color-strip kind to ``single_color``. + + The pre-rename rows in ``color_strip_sources`` used ``source_type='static'``; + the new canonical value is ``single_color``. The previous in-line string + replace risked rewriting unrelated substrings (e.g. an animation type named + ``static_wave``). This migration parses the JSON, mutates the single field + we care about, and writes the canonical serialization back. Runs inside + the runner's transaction. + """ + + name = "001_color_strip_static_to_single_color" + + def apply(self, conn: sqlite3.Connection) -> int: + rows_changed = 0 + rows = conn.execute("SELECT id, data FROM [color_strip_sources]").fetchall() + for row in rows: + blob = row["data"] + try: + parsed = json.loads(blob) + except json.JSONDecodeError as e: + # A corrupt blob is a pre-existing problem; do not crash the + # migration. The store's own load path will surface it. + logger.warning( + "Skipping corrupt color_strip_sources row %s during migration: %s", + row["id"], + e, + ) + continue + if parsed.get("source_type") != "static": + continue + parsed["source_type"] = "single_color" + conn.execute( + "UPDATE [color_strip_sources] SET data = ? WHERE id = ?", + (json.dumps(parsed, ensure_ascii=False), row["id"]), + ) + rows_changed += 1 + return rows_changed + + +# Master list — ORDER MATTERS. Append new migrations; never reorder. +ALL_MIGRATIONS: list[DataMigration] = [ + StaticToSingleColorMigration(), +] diff --git a/server/tests/api/routes/test_color_strip_sources_response_map.py b/server/tests/api/routes/test_color_strip_sources_response_map.py new file mode 100644 index 0000000..1f8115f --- /dev/null +++ b/server/tests/api/routes/test_color_strip_sources_response_map.py @@ -0,0 +1,85 @@ +"""Regression tests for the CSS response builder dispatch. + +These lock in the safety net introduced when the silent "fallback to a fake +PictureCSSResponse" branch was removed: + +* every concrete ColorStripSource subclass registered in ``_SOURCE_TYPE_MAP`` + must have a builder in ``_RESPONSE_MAP`` (verified at module import); and +* the previously-missing ``GameEventColorStripSource`` round-trips through + the response builder into the matching ``GameEventCSSResponse`` schema. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from ledgrab.api.routes.color_strip_sources import _helpers +from ledgrab.api.schemas.color_strip_sources import GameEventCSSResponse +from ledgrab.storage.color_strip_source import ( + ColorStripSource, + GameEventColorStripSource, +) + + +def test_response_map_covers_every_registered_source_type(): + """Every source_type in _SOURCE_TYPE_MAP has a response builder.""" + storage_kinds = set(_helpers._STORAGE_TYPE_MAP.keys()) + builder_kinds = set(_helpers._RESPONSE_MAP.keys()) + missing = storage_kinds - builder_kinds + assert not missing, f"_RESPONSE_MAP is missing builders for: {sorted(missing)}" + + +def test_assert_helper_raises_when_a_kind_is_unregistered(monkeypatch): + """Adding a kind to the storage registry without a builder is caught.""" + + class _PhantomColorStripSource(ColorStripSource): + pass + + monkeypatch.setitem(_helpers._STORAGE_TYPE_MAP, "phantom", _PhantomColorStripSource) + with pytest.raises(RuntimeError, match="phantom"): + _helpers._assert_response_map_coverage() + + +def test_game_event_source_serialises_to_game_event_response(): + """A GameEventColorStripSource is rendered as GameEventCSSResponse (not the + fake PictureCSSResponse that the silent fallback used to return). + """ + now = datetime.now(timezone.utc) + source = GameEventColorStripSource( + id="css_gev0001", + name="Counter-Strike feedback", + source_type="game_event", + created_at=now, + updated_at=now, + game_integration_id="gi_cs2_main", + event_mappings=[{"event": "hit", "effect": "flash"}], + led_count=60, + ) + + response = _helpers._css_to_response(source) + + assert isinstance(response, GameEventCSSResponse) + assert response.source_type == "game_event" + assert response.game_integration_id == "gi_cs2_main" + assert response.event_mappings == [{"event": "hit", "effect": "flash"}] + assert response.led_count == 60 + + +def test_unregistered_source_type_raises_in_css_to_response(): + """Reaching ``_css_to_response`` with an unmapped source_type raises loudly.""" + + class _UnregisteredCSS(ColorStripSource): + pass + + now = datetime.now(timezone.utc) + source = _UnregisteredCSS( + id="css_xxx", + name="rogue", + source_type="rogue_unregistered", + created_at=now, + updated_at=now, + ) + with pytest.raises(RuntimeError, match="No CSS response builder"): + _helpers._css_to_response(source) diff --git a/server/tests/core/processing/test_color_strip_kinds.py b/server/tests/core/processing/test_color_strip_kinds.py new file mode 100644 index 0000000..f09cb3a --- /dev/null +++ b/server/tests/core/processing/test_color_strip_kinds.py @@ -0,0 +1,97 @@ +"""Tests for the unified color-strip stream-builder registry.""" + +from __future__ import annotations + +import pytest + +from ledgrab.core.processing import color_strip_kinds +from ledgrab.core.processing.color_strip_kinds import ( + SHARABLE_KINDS, + STREAM_BUILDERS, + StreamDeps, + build_stream, +) +from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP + + +def test_stream_builders_partition_storage_kinds_with_sharable(): + """STREAM_BUILDERS ∪ SHARABLE_KINDS == storage source_types.""" + storage_kinds = set(_SOURCE_TYPE_MAP.keys()) + covered = set(STREAM_BUILDERS.keys()) | SHARABLE_KINDS + assert ( + storage_kinds == covered + ), f"missing={storage_kinds - covered}, extra={covered - storage_kinds}" + + +def test_sharable_kinds_list_matches_actual_sharable_property(): + """The hand-coded SHARABLE_KINDS list mirrors the `sharable` @property values. + + Every ColorStripSource subclass currently implements ``sharable`` as a + literal ``return True``/``return False`` that does not touch ``self``, + so ``fget(None)`` is a safe introspection. If a future subclass changes + that contract — e.g. by computing sharability from instance state — this + test will raise a clear error rather than silently misclassify the kind, + forcing the maintainer to update both ``SHARABLE_KINDS`` and this test. + """ + + def _is_sharable(name: str, cls) -> bool: + prop = getattr(cls, "sharable", None) + if not isinstance(prop, property) or prop.fget is None: + raise AssertionError( + f"{cls.__name__} (source_type={name!r}) does not expose `sharable` " + f"as a @property; SHARABLE_KINDS introspection no longer works." + ) + try: + return bool(prop.fget(None)) + except (AttributeError, TypeError) as e: + raise AssertionError( + f"{cls.__name__}.sharable now depends on `self` ({e}); update " + "this test and SHARABLE_KINDS together." + ) from e + + actually_sharable = {name for name, cls in _SOURCE_TYPE_MAP.items() if _is_sharable(name, cls)} + assert actually_sharable == set(SHARABLE_KINDS), ( + "SHARABLE_KINDS drifted from the @property values: " + f"actually_sharable={sorted(actually_sharable)}, " + f"SHARABLE_KINDS={sorted(SHARABLE_KINDS)}" + ) + + +def test_build_stream_raises_on_unknown_kind(): + class _FakeSource: + source_type = "totally_unregistered_kind" + id = "fake" + + deps = StreamDeps(css_manager=None) + with pytest.raises(ValueError, match="totally_unregistered_kind"): + build_stream(_FakeSource(), deps) + + +def test_coverage_assertion_raises_on_drift(monkeypatch): + """Adding a kind to storage's _SOURCE_TYPE_MAP without a builder fails fast.""" + + monkeypatch.setitem(_SOURCE_TYPE_MAP, "phantom", object) + with pytest.raises(RuntimeError, match="phantom"): + color_strip_kinds._assert_stream_kind_coverage() + + +def test_legacy_static_alias_resolves_to_single_color_builder(): + """The legacy 'static' source_type still maps to the single_color builder.""" + assert STREAM_BUILDERS["static"] is not None + assert STREAM_BUILDERS["single_color"] is not None + # The two route to the same loader, but builders are distinct closures — + # call them with a tiny source stub and confirm same class. + from ledgrab.core.processing.color_strip_stream import SingleColorStripStream + from ledgrab.storage.color_strip_source import SingleColorStripSource + from datetime import datetime, timezone + + src = SingleColorStripSource( + id="t", + name="t", + source_type="static", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + ) + deps = StreamDeps(css_manager=None) + s = STREAM_BUILDERS["static"](src, deps) + assert isinstance(s, SingleColorStripStream) diff --git a/server/tests/core/processing/test_value_kinds.py b/server/tests/core/processing/test_value_kinds.py new file mode 100644 index 0000000..0dfaf43 --- /dev/null +++ b/server/tests/core/processing/test_value_kinds.py @@ -0,0 +1,64 @@ +"""Tests for the unified value-stream-builder registry.""" + +from __future__ import annotations + +import pytest + +from ledgrab.core.processing import value_kinds +from ledgrab.core.processing.value_kinds import ( + NEEDS_CLOCK_RUNTIME, + STREAM_BUILDERS, + ValueStreamDeps, + build_stream, +) +from ledgrab.storage.value_source import _VALUE_SOURCE_MAP + + +def test_builders_cover_every_storage_kind(): + """STREAM_BUILDERS keys == storage._VALUE_SOURCE_MAP keys.""" + storage_kinds = set(_VALUE_SOURCE_MAP.keys()) + builder_kinds = set(STREAM_BUILDERS.keys()) + assert storage_kinds == builder_kinds, ( + f"missing={storage_kinds - builder_kinds}, " f"extra={builder_kinds - storage_kinds}" + ) + + +def test_build_stream_raises_on_unknown_kind(): + class _Fake: + source_type = "totally_unregistered" + id = "fake" + + deps = ValueStreamDeps(value_stream_manager=None) + with pytest.raises(ValueError, match="totally_unregistered"): + build_stream(_Fake(), deps) + + +def test_coverage_assertion_raises_on_drift(monkeypatch): + """A kind added to storage without a builder fails the import-time check.""" + monkeypatch.setitem(_VALUE_SOURCE_MAP, "phantom_kind", object) + with pytest.raises(RuntimeError, match="phantom_kind"): + value_kinds._assert_value_kind_coverage() + + +def test_needs_clock_runtime_is_subset_of_registered_kinds(): + assert NEEDS_CLOCK_RUNTIME.issubset(set(STREAM_BUILDERS.keys())) + + +def test_static_builder_returns_static_value_stream(): + from ledgrab.core.processing.value_stream import StaticValueStream + from ledgrab.storage.value_source import StaticValueSource + from datetime import datetime, timezone + + src = StaticValueSource( + id="vs_t", + name="t", + source_type="static", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + value=0.42, + ) + deps = ValueStreamDeps(value_stream_manager=None) + stream = build_stream(src, deps) + assert isinstance(stream, StaticValueStream) + # Sanity: the value flowed through + assert stream.get_value() == 0.42 diff --git a/server/tests/storage/test_color_strip_store_migration.py b/server/tests/storage/test_color_strip_store_migration.py new file mode 100644 index 0000000..30d3666 --- /dev/null +++ b/server/tests/storage/test_color_strip_store_migration.py @@ -0,0 +1,90 @@ +"""Tests for ColorStripStore startup migrations.""" + +import json + +import pytest + +from ledgrab.storage.color_strip_source import SingleColorStripSource +from ledgrab.storage.color_strip_store import ColorStripStore +from ledgrab.storage.database import Database + + +@pytest.fixture +def tmp_db(tmp_path): + db = Database(tmp_path / "test.db") + yield db + db.close() + + +def _insert_legacy_row(db: Database, item_id: str, name: str, color: list[int]) -> dict: + """Insert a pre-rename color strip row with ``source_type='static'``.""" + from datetime import datetime, timezone + + now = datetime.now(timezone.utc).isoformat() + blob = { + "id": item_id, + "name": name, + "source_type": "static", + "color": color, + "animation": None, + "tags": [], + "led_count": 30, + "created_at": now, + "updated_at": now, + "icon": "", + "icon_color": "", + } + db.upsert("color_strip_sources", item_id, name, blob) + return blob + + +def test_legacy_static_row_is_rewritten_to_single_color(tmp_db): + """A row with ``source_type='static'`` is rewritten on first store load.""" + _insert_legacy_row(tmp_db, "css_legacy01", "Legacy Solid", [255, 0, 0]) + + # Sanity: legacy blob really is on disk + raw_before = tmp_db.load_all("color_strip_sources") + assert raw_before[0]["source_type"] == "static" + + store = ColorStripStore(tmp_db) + + # In-memory item is a SingleColorStripSource with the canonical type + item = store.get("css_legacy01") + assert isinstance(item, SingleColorStripSource) + assert item.source_type == "single_color" + + # The on-disk row was rewritten too — no second migration on next boot + raw_after = tmp_db.load_all("color_strip_sources") + assert raw_after[0]["source_type"] == "single_color" + + +def test_already_migrated_row_is_left_alone(tmp_db): + """Rows already using ``single_color`` are not touched.""" + from datetime import datetime, timezone + + now = datetime.now(timezone.utc).isoformat() + blob = { + "id": "css_new01", + "name": "Already Migrated", + "source_type": "single_color", + "color": [0, 128, 255], + "animation": None, + "tags": [], + "led_count": 30, + "created_at": now, + "updated_at": now, + "icon": "", + "icon_color": "", + } + tmp_db.upsert("color_strip_sources", "css_new01", "Already Migrated", blob) + + raw_before = tmp_db.load_all("color_strip_sources") + before_json = json.dumps(raw_before[0], sort_keys=True) + + store = ColorStripStore(tmp_db) + item = store.get("css_new01") + assert isinstance(item, SingleColorStripSource) + + raw_after = tmp_db.load_all("color_strip_sources") + after_json = json.dumps(raw_after[0], sort_keys=True) + assert before_json == after_json diff --git a/server/tests/storage/test_data_migrations.py b/server/tests/storage/test_data_migrations.py new file mode 100644 index 0000000..1f5d704 --- /dev/null +++ b/server/tests/storage/test_data_migrations.py @@ -0,0 +1,309 @@ +"""Tests for the versioned data-migration runner. + +These verify the safety properties that replaced the string-replace migration: + +* migrations parse JSON properly (no substring corruption); +* the runner records each migration name in ``data_migrations`` so subsequent + runs are no-ops; +* a migration that touches no rows still gets recorded as applied; +* a migration that raises does not get recorded (so a fixed migration can be + re-attempted on the next boot); +* the legacy ``static`` rename only rewrites rows whose ``source_type`` field + actually equals ``static`` — values that merely contain the substring (e.g. + an animation type ``static_wave``) are left untouched. +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone + +import pytest + +from ledgrab.storage.data_migrations import ( + ALL_MIGRATIONS, + DataMigration, + MigrationRunner, + StaticToSingleColorMigration, +) +from ledgrab.storage.database import Database + + +@pytest.fixture +def tmp_db(tmp_path): + db = Database(tmp_path / "test.db") + yield db + db.close() + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _insert_css(db: Database, item_id: str, blob: dict) -> None: + db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob) + + +# --------------------------------------------------------------------------- +# Runner mechanics +# --------------------------------------------------------------------------- + + +def test_runner_creates_audit_table(tmp_db): + MigrationRunner(tmp_db) + cursor = tmp_db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'" + ) + assert cursor.fetchone() is not None + + +def test_runner_records_applied_migration(tmp_db): + class _Noop(DataMigration): + name = "test_noop" + + def apply(self, conn): + return 0 + + runner = MigrationRunner(tmp_db) + records = runner.run([_Noop()]) + + assert [r.name for r in records] == ["test_noop"] + assert "test_noop" in runner.applied_names() + + +def test_runner_skips_already_applied_migration(tmp_db): + calls = 0 + + class _CountedMigration(DataMigration): + name = "test_counted" + + def apply(self, conn): + nonlocal calls + calls += 1 + return 0 + + runner = MigrationRunner(tmp_db) + runner.run([_CountedMigration()]) + runner.run([_CountedMigration()]) + runner.run([_CountedMigration()]) + + assert calls == 1 + + +def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db): + """Even when a migration changes 0 rows, it is recorded as applied.""" + + class _NoMatch(DataMigration): + name = "test_no_match" + + def apply(self, conn): + return 0 + + runner = MigrationRunner(tmp_db) + runner.run([_NoMatch()]) + + assert "test_no_match" in runner.applied_names() + + +def test_runner_does_not_record_failed_migration(tmp_db): + class _Boom(DataMigration): + name = "test_boom" + + def apply(self, conn): + raise RuntimeError("kaboom") + + runner = MigrationRunner(tmp_db) + with pytest.raises(RuntimeError, match="kaboom"): + runner.run([_Boom()]) + + assert "test_boom" not in runner.applied_names() + + +def test_runner_rejects_unnamed_migration(tmp_db): + class _Unnamed(DataMigration): + def apply(self, conn): + return 0 + + runner = MigrationRunner(tmp_db) + with pytest.raises(ValueError, match="non-empty name"): + runner.run([_Unnamed()]) + + +def test_runner_rolls_back_data_change_when_migration_raises(tmp_db): + """A migration that raises mid-way leaves no partial UPDATEs behind.""" + blob = { + "id": "css_partial", + "name": "Partial", + "source_type": "static", + "color": [10, 20, 30], + "tags": [], + "led_count": 30, + "created_at": _now(), + "updated_at": _now(), + } + _insert_css(tmp_db, "css_partial", blob) + + class _PartialBoom(DataMigration): + name = "test_partial_boom" + + def apply(self, conn): + conn.execute( + "UPDATE color_strip_sources SET data = ? WHERE id = ?", + (json.dumps({**blob, "source_type": "single_color"}), "css_partial"), + ) + raise RuntimeError("explode after update") + + runner = MigrationRunner(tmp_db) + with pytest.raises(RuntimeError, match="explode after update"): + runner.run([_PartialBoom()]) + + rows = tmp_db.load_all("color_strip_sources") + assert rows[0]["source_type"] == "static", "UPDATE was not rolled back" + assert "test_partial_boom" not in runner.applied_names() + + +def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch): + """Frozen-write databases must not be mutated by the runner.""" + monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True) + + calls = 0 + + class _Counted(DataMigration): + name = "test_frozen" + + def apply(self, conn): + nonlocal calls + calls += 1 + return 0 + + runner = MigrationRunner(tmp_db) + records = runner.run([_Counted()]) + + assert records == [] + assert calls == 0 + assert "test_frozen" not in runner.applied_names() + + +# --------------------------------------------------------------------------- +# Legacy static-rename specifics +# --------------------------------------------------------------------------- + + +def test_static_rename_only_touches_source_type_field(tmp_db): + """A row whose ``animation.type`` happens to contain ``static`` survives intact.""" + blob = { + "id": "css_safe01", + "name": "Has static_wave anim", + "source_type": "gradient", # not "static" — should be untouched + "animation": {"type": "static_wave", "speed": 1.0}, # substring trap + "stops": [{"position": 0.0, "color": [10, 20, 30]}], + "tags": [], + "led_count": 30, + "created_at": _now(), + "updated_at": _now(), + } + _insert_css(tmp_db, "css_safe01", blob) + + # Run the migration via the runner (clears the existing audit row first + # because the Database fixture already triggered it through any incidental + # store creation — here we drive it directly). + runner = MigrationRunner(tmp_db) + runner.run([StaticToSingleColorMigration()]) + + rows = tmp_db.load_all("color_strip_sources") + assert len(rows) == 1 + assert rows[0]["source_type"] == "gradient" + assert rows[0]["animation"]["type"] == "static_wave" + + +def test_static_rename_rewrites_only_legacy_rows(tmp_db): + legacy = { + "id": "css_legacy01", + "name": "Legacy", + "source_type": "static", + "color": [255, 0, 0], + "tags": [], + "led_count": 30, + "created_at": _now(), + "updated_at": _now(), + } + current = { + "id": "css_modern01", + "name": "Modern", + "source_type": "single_color", + "color": [0, 128, 255], + "tags": [], + "led_count": 30, + "created_at": _now(), + "updated_at": _now(), + } + _insert_css(tmp_db, "css_legacy01", legacy) + _insert_css(tmp_db, "css_modern01", current) + + runner = MigrationRunner(tmp_db) + runner.run([StaticToSingleColorMigration()]) + + rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")} + assert rows["css_legacy01"]["source_type"] == "single_color" + assert rows["css_modern01"]["source_type"] == "single_color" + + +def test_static_rename_survives_corrupt_blob(tmp_db): + """A row whose JSON blob is corrupt does not crash the migration.""" + # Insert a row with deliberately broken JSON via direct SQL (bypass upsert + # which would re-serialize). We use a hand-crafted blob. + tmp_db.execute( + "INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)", + ("css_bad01", "Bad", "{not valid json"), + ) + legacy = { + "id": "css_legacy02", + "name": "Legacy after corrupt", + "source_type": "static", + "color": [1, 2, 3], + "tags": [], + "led_count": 30, + "created_at": _now(), + "updated_at": _now(), + } + _insert_css(tmp_db, "css_legacy02", legacy) + + runner = MigrationRunner(tmp_db) + runner.run([StaticToSingleColorMigration()]) + + # Legacy row still migrated, corrupt row untouched, no exception + cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources") + out = {row["id"]: row["data"] for row in cursor.fetchall()} + assert out["css_bad01"] == "{not valid json" + assert json.loads(out["css_legacy02"])["source_type"] == "single_color" + + +def test_all_migrations_have_unique_names(): + names = [m.name for m in ALL_MIGRATIONS] + assert len(names) == len(set(names)), f"duplicate names: {names}" + + +def test_color_strip_store_construction_records_static_migration(tmp_db): + """Integration: constructing the store applies AND records the migration.""" + from ledgrab.storage.color_strip_source import SingleColorStripSource + from ledgrab.storage.color_strip_store import ColorStripStore + + legacy = { + "id": "css_int01", + "name": "Integration legacy", + "source_type": "static", + "color": [1, 2, 3], + "tags": [], + "led_count": 30, + "created_at": _now(), + "updated_at": _now(), + } + _insert_css(tmp_db, "css_int01", legacy) + + store = ColorStripStore(tmp_db) + + # In-memory shape is correct + assert isinstance(store.get("css_int01"), SingleColorStripSource) + # Audit row exists + runner = MigrationRunner(tmp_db) + assert "001_color_strip_static_to_single_color" in runner.applied_names()