refactor(storage,processing): kind registries + versioned data migrations

Two CRITICAL data-safety bugs from the architecture audit and the two
worst parallel-change problems are fixed in one coherent pass.

Audit findings addressed:

- C2  silent CSS response fallback. The previous _RESPONSE_MAP fell
      through to a fabricated PictureCSSResponse whenever a source
      class lacked an entry; in particular game_event sources were
      silently mis-shaped. Now: GameEventCSSResponse/Create/Update
      schemas exist, _RESPONSE_MAP is re-keyed by source_type string,
      an import-time _assert_response_map_coverage() requires symmetric
      agreement with storage._SOURCE_TYPE_MAP, and the runtime path
      raises instead of fabricating a response.

- C11 string-replace JSON migration. ColorStripStore used
      blob.replace('"source_type": "static"', '"source_type":
      "single_color"') which can corrupt unrelated substrings (e.g.
      an animation type named "static_wave") and provides no audit,
      no transaction, no idempotency. Replaced with
      storage.data_migrations.MigrationRunner backed by a
      data_migrations audit table. Each migration runs inside one
      db.transaction() that covers the applied-check, the apply(),
      and the audit-INSERT — partial failures roll back atomically.
      StaticToSingleColorMigration parses each row with json.loads
      and mutates only the source_type field. Frozen-write databases
      skip with a warning.

- C3+C4 color-strip stream dispatch. The 7-branch elif in
      ColorStripStreamManager.acquire() and the duplicate one in
      ws_stream._create_stream() now share a single STREAM_BUILDERS
      registry in core.processing.color_strip_kinds, keyed by
      source.source_type. Both call sites populate a StreamDeps bag
      and delegate to build_stream(). _assert_stream_kind_coverage()
      asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS
      partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path
      wraps each FastAPI-DI getter in _safe() so non-audio previews
      no longer crash when audio/CSPT stores are not wired.

- C6+C7 value stream dispatch. The 14-branch isinstance ladder in
      ValueStreamManager._create_stream and its silent
      StaticValueStream(value=1.0) fallback are replaced by
      core.processing.value_kinds.STREAM_BUILDERS, keyed by
      source_type string (so AdaptiveValueSource's adaptive_time and
      adaptive_scene route to different builders correctly). The
      manager retains only the SyncClockRuntime pre-acquisition step
      for animated_color (kinds needing this are listed explicitly
      in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a
      separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the
      registry.

Bundled in: the static->single_color rename plus the HTTPValueStream
/ http_endpoint introduction that were already in flight on this
branch share these files; the registry refactor naturally absorbs
both via the new "single_color" / "static" alias entries and the
_build_http builder.

Tests: 26 new tests cover response-map coverage drift, migration
runner audit-table mechanics + transactional rollback +
frozen-write skip, and the two stream-builder registries. 343
existing storage / API / e2e tests stay green. Ruff clean.
This commit is contained in:
2026-05-22 22:45:28 +03:00
parent e24f9d33cc
commit 563cbac88c
15 changed files with 1976 additions and 315 deletions
@@ -9,6 +9,7 @@ from ledgrab.api.schemas.color_strip_sources import (
CompositeCSSResponse, CompositeCSSResponse,
DaylightCSSResponse, DaylightCSSResponse,
EffectCSSResponse, EffectCSSResponse,
GameEventCSSResponse,
GradientCSSResponse, GradientCSSResponse,
KeyColorsCSSResponse, KeyColorsCSSResponse,
MappedCSSResponse, MappedCSSResponse,
@@ -17,7 +18,7 @@ from ledgrab.api.schemas.color_strip_sources import (
PictureAdvancedCSSResponse, PictureAdvancedCSSResponse,
PictureCSSResponse, PictureCSSResponse,
ProcessedCSSResponse, ProcessedCSSResponse,
StaticCSSResponse, SingleColorCSSResponse,
WeatherCSSResponse, WeatherCSSResponse,
) )
from ledgrab.api.schemas.devices import Calibration as CalibrationSchema from ledgrab.api.schemas.devices import Calibration as CalibrationSchema
@@ -26,22 +27,7 @@ from ledgrab.core.capture.calibration import (
calibration_to_dict, calibration_to_dict,
) )
from ledgrab.storage.color_strip_source import ( from ledgrab.storage.color_strip_source import (
AdvancedPictureColorStripSource, _SOURCE_TYPE_MAP as _STORAGE_TYPE_MAP,
ApiInputColorStripSource,
AudioColorStripSource,
CandlelightColorStripSource,
CompositeColorStripSource,
DaylightColorStripSource,
EffectColorStripSource,
GradientColorStripSource,
KeyColorsColorStripSource,
MappedColorStripSource,
MathWaveColorStripSource,
NotificationColorStripSource,
PictureColorStripSource,
ProcessedColorStripSource,
StaticColorStripSource,
WeatherColorStripSource,
) )
from ledgrab.storage.picture_source import ( from ledgrab.storage.picture_source import (
ProcessedPictureSource, ProcessedPictureSource,
@@ -94,34 +80,46 @@ def _stops_schema(source) -> list[ColorStopSchema] | None:
return None return None
# Maps storage class → response builder lambda. # Maps ``source_type`` string → response builder.
#
# Keying by source_type (rather than type(source)) lets the import-time
# coverage check use the storage registry's keys directly, with no
# inversion or duplicate-class handling for legacy aliases.
_RESPONSE_MAP: dict = { _RESPONSE_MAP: dict = {
PictureColorStripSource: lambda s, kw: PictureCSSResponse( "picture": lambda s, kw: PictureCSSResponse(
**kw, **kw,
picture_source_id=s.picture_source_id, picture_source_id=s.picture_source_id,
smoothing=s.smoothing.to_dict(), smoothing=s.smoothing.to_dict(),
interpolation_mode=s.interpolation_mode, interpolation_mode=s.interpolation_mode,
calibration=_calibration_schema(s), calibration=_calibration_schema(s),
), ),
AdvancedPictureColorStripSource: lambda s, kw: PictureAdvancedCSSResponse( "picture_advanced": lambda s, kw: PictureAdvancedCSSResponse(
**kw, **kw,
smoothing=s.smoothing.to_dict(), smoothing=s.smoothing.to_dict(),
interpolation_mode=s.interpolation_mode, interpolation_mode=s.interpolation_mode,
calibration=_calibration_schema(s), calibration=_calibration_schema(s),
), ),
StaticColorStripSource: lambda s, kw: StaticCSSResponse( "single_color": lambda s, kw: SingleColorCSSResponse(
**kw, **kw,
color=s.color.to_dict(), color=s.color.to_dict(),
animation=s.animation, animation=s.animation,
), ),
GradientColorStripSource: lambda s, kw: GradientCSSResponse( # Legacy alias: pre-rename rows used "static"; the data migration rewrites
# them on first store load but a stale in-flight instance would still
# carry source_type='static' until the next reload.
"static": lambda s, kw: SingleColorCSSResponse(
**kw,
color=s.color.to_dict(),
animation=s.animation,
),
"gradient": lambda s, kw: GradientCSSResponse(
**kw, **kw,
stops=_stops_schema(s), stops=_stops_schema(s),
animation=s.animation, animation=s.animation,
easing=s.easing, easing=s.easing,
gradient_id=s.gradient_id, gradient_id=s.gradient_id,
), ),
EffectColorStripSource: lambda s, kw: EffectCSSResponse( "effect": lambda s, kw: EffectCSSResponse(
**kw, **kw,
effect_type=s.effect_type, effect_type=s.effect_type,
palette=s.palette, palette=s.palette,
@@ -132,15 +130,15 @@ _RESPONSE_MAP: dict = {
mirror=s.mirror, mirror=s.mirror,
custom_palette=s.custom_palette, custom_palette=s.custom_palette,
), ),
CompositeColorStripSource: lambda s, kw: CompositeCSSResponse( "composite": lambda s, kw: CompositeCSSResponse(
**kw, **kw,
layers=[dict(layer) for layer in s.layers], layers=[dict(layer) for layer in s.layers],
), ),
MappedColorStripSource: lambda s, kw: MappedCSSResponse( "mapped": lambda s, kw: MappedCSSResponse(
**kw, **kw,
zones=[dict(z) for z in s.zones], zones=[dict(z) for z in s.zones],
), ),
AudioColorStripSource: lambda s, kw: AudioCSSResponse( "audio": lambda s, kw: AudioCSSResponse(
**kw, **kw,
visualization_mode=s.visualization_mode, visualization_mode=s.visualization_mode,
audio_source_id=s.audio_source_id, audio_source_id=s.audio_source_id,
@@ -153,13 +151,13 @@ _RESPONSE_MAP: dict = {
mirror=s.mirror, mirror=s.mirror,
beat_decay=s.beat_decay.to_dict(), beat_decay=s.beat_decay.to_dict(),
), ),
ApiInputColorStripSource: lambda s, kw: ApiInputCSSResponse( "api_input": lambda s, kw: ApiInputCSSResponse(
**kw, **kw,
fallback_color=s.fallback_color.to_dict(), fallback_color=s.fallback_color.to_dict(),
timeout=s.timeout.to_dict(), timeout=s.timeout.to_dict(),
interpolation=s.interpolation, interpolation=s.interpolation,
), ),
NotificationColorStripSource: lambda s, kw: NotificationCSSResponse( "notification": lambda s, kw: NotificationCSSResponse(
**kw, **kw,
notification_effect=s.notification_effect, notification_effect=s.notification_effect,
duration_ms=s.duration_ms.to_dict(), duration_ms=s.duration_ms.to_dict(),
@@ -172,14 +170,14 @@ _RESPONSE_MAP: dict = {
sound_volume=s.sound_volume.to_dict(), sound_volume=s.sound_volume.to_dict(),
app_sounds=dict(s.app_sounds), app_sounds=dict(s.app_sounds),
), ),
DaylightColorStripSource: lambda s, kw: DaylightCSSResponse( "daylight": lambda s, kw: DaylightCSSResponse(
**kw, **kw,
speed=s.speed.to_dict(), speed=s.speed.to_dict(),
use_real_time=s.use_real_time, use_real_time=s.use_real_time,
latitude=s.latitude, latitude=s.latitude,
longitude=s.longitude, longitude=s.longitude,
), ),
CandlelightColorStripSource: lambda s, kw: CandlelightCSSResponse( "candlelight": lambda s, kw: CandlelightCSSResponse(
**kw, **kw,
color=s.color.to_dict(), color=s.color.to_dict(),
intensity=s.intensity.to_dict(), intensity=s.intensity.to_dict(),
@@ -188,18 +186,18 @@ _RESPONSE_MAP: dict = {
wind_strength=s.wind_strength.to_dict(), wind_strength=s.wind_strength.to_dict(),
candle_type=s.candle_type, candle_type=s.candle_type,
), ),
ProcessedColorStripSource: lambda s, kw: ProcessedCSSResponse( "processed": lambda s, kw: ProcessedCSSResponse(
**kw, **kw,
input_source_id=s.input_source_id, input_source_id=s.input_source_id,
processing_template_id=s.processing_template_id, processing_template_id=s.processing_template_id,
), ),
WeatherColorStripSource: lambda s, kw: WeatherCSSResponse( "weather": lambda s, kw: WeatherCSSResponse(
**kw, **kw,
weather_source_id=s.weather_source_id, weather_source_id=s.weather_source_id,
speed=s.speed.to_dict(), speed=s.speed.to_dict(),
temperature_influence=s.temperature_influence.to_dict(), temperature_influence=s.temperature_influence.to_dict(),
), ),
KeyColorsColorStripSource: lambda s, kw: KeyColorsCSSResponse( "key_colors": lambda s, kw: KeyColorsCSSResponse(
**kw, **kw,
picture_source_id=s.picture_source_id, picture_source_id=s.picture_source_id,
rectangles=[r.to_dict() for r in s.rectangles], rectangles=[r.to_dict() for r in s.rectangles],
@@ -207,28 +205,67 @@ _RESPONSE_MAP: dict = {
smoothing=s.smoothing.to_dict(), smoothing=s.smoothing.to_dict(),
brightness=s.brightness.to_dict(), brightness=s.brightness.to_dict(),
), ),
MathWaveColorStripSource: lambda s, kw: MathWaveCSSResponse( "math_wave": lambda s, kw: MathWaveCSSResponse(
**kw, **kw,
waves=s.waves, waves=s.waves,
speed=s.speed.to_dict(), speed=s.speed.to_dict(),
gradient_id=s.gradient_id, gradient_id=s.gradient_id,
), ),
"game_event": lambda s, kw: GameEventCSSResponse(
**kw,
game_integration_id=s.game_integration_id,
idle_color=s.idle_color.to_dict(),
event_mappings=[dict(m) for m in s.event_mappings],
),
} }
def _assert_response_map_coverage() -> None:
"""Verify _RESPONSE_MAP has a builder for every kind in storage's registry.
Runs at module import. Surfaces missing builders eagerly instead of
letting a request fall through to a silent / wrong response shape.
Contract note
-------------
This check is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``)
because every kind — sharable or not — needs a response shape. The
sister assertion in
``core/processing/color_strip_kinds.py::_assert_stream_kind_coverage``
is asymmetric because sharable kinds construct their streams via a
different path. Adding a new kind requires keeping all three registries
aligned: storage's ``_SOURCE_TYPE_MAP``, this ``_RESPONSE_MAP``, and
either ``STREAM_BUILDERS`` or ``SHARABLE_KINDS``.
"""
storage_kinds = set(_STORAGE_TYPE_MAP.keys())
builder_kinds = set(_RESPONSE_MAP.keys())
missing = storage_kinds - builder_kinds
extra = builder_kinds - storage_kinds
if missing or extra:
problems = []
if missing:
problems.append(f"missing builders for: {sorted(missing)}")
if extra:
problems.append(f"unregistered kinds in _RESPONSE_MAP: {sorted(extra)}")
raise RuntimeError(
"_RESPONSE_MAP is out of sync with storage._SOURCE_TYPE_MAP: " + "; ".join(problems)
)
_assert_response_map_coverage()
def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse: def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse:
"""Convert a ColorStripSource to the matching per-type response schema.""" """Convert a ColorStripSource to the matching per-type response schema."""
kw = _common_response_kwargs(source, overlay_active) kw = _common_response_kwargs(source, overlay_active)
builder = _RESPONSE_MAP.get(type(source)) builder = _RESPONSE_MAP.get(source.source_type)
if builder is None: if builder is None:
# Fallback: use to_dict() and build a PictureCSSResponse # Coverage is asserted at import time, so reaching this branch means a
logger.warning("No response builder for %s, falling back", type(source).__name__) # source was loaded with a source_type that is not registered.
return PictureCSSResponse( # Surface the bug instead of silently returning a wrong-shaped response.
**kw, raise RuntimeError(
picture_source_id="", f"No CSS response builder registered for source_type "
smoothing=0.3, f"{source.source_type!r} (class={type(source).__name__})"
interpolation_mode="average",
calibration=None,
) )
return builder(source, kw) return builder(source, kw)
@@ -29,7 +29,7 @@ router = APIRouter()
_PREVIEW_ALLOWED_TYPES = { _PREVIEW_ALLOWED_TYPES = {
"static", "single_color",
"gradient", "gradient",
"effect", "effect",
"daylight", "daylight",
@@ -97,65 +97,65 @@ async def preview_color_strip_ws(
return ColorStripSource.from_dict(config) return ColorStripSource.from_dict(config)
def _create_stream(source): def _create_stream(source):
"""Instantiate and start the appropriate stream class for *source*.""" """Instantiate and start the appropriate stream class for *source*.
from ledgrab.core.processing.color_strip_stream_manager import _SIMPLE_STREAM_MAP
mgr = get_processor_manager() Delegates the per-kind dispatch to ``color_strip_kinds.build_stream``
csm = mgr.color_strip_stream_manager so this preview path and the production ``ColorStripStreamManager``
share a single registry. Per-kind dependencies (CSPT store, audio
stores, weather manager, …) are gathered into a ``StreamDeps`` bag.
if source.source_type == "audio": FastAPI-DI providers raise ``RuntimeError`` when they aren't wired,
so we resolve each one through ``_safe`` and pass ``None`` on
failure. The per-kind builder will still see a clear error if a
truly-required dep is missing for that kind, but unrelated previews
(e.g. a ``single_color`` preview on a fresh install where the CSPT
store isn't initialized yet) keep working.
"""
from ledgrab.api.dependencies import ( from ledgrab.api.dependencies import (
get_audio_processing_template_store, get_audio_processing_template_store,
get_audio_source_store, get_audio_source_store,
get_audio_template_store, get_audio_template_store,
get_cspt_store,
) )
from ledgrab.core.processing.audio_stream import AudioColorStripStream from ledgrab.core.processing.color_strip_kinds import StreamDeps, build_stream
s = AudioColorStripStream( def _safe(getter):
source, try:
mgr.audio_capture_manager, return getter()
get_audio_source_store(), except RuntimeError as e:
get_audio_template_store(), logger.debug("Preview dep not available (%s): %s", getter.__name__, e)
get_audio_processing_template_store(), return None
)
elif source.source_type == "weather":
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
s = WeatherColorStripStream(source, mgr.weather_manager) mgr = get_processor_manager()
elif source.source_type == "game_event": csm = mgr.color_strip_stream_manager
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
s = GameEventColorStripStream(source) # The game-event bus is optional in preview contexts.
try: try:
from ledgrab.api.dependencies import get_game_event_bus from ledgrab.api.dependencies import get_game_event_bus
bus = get_game_event_bus() game_event_bus = get_game_event_bus()
except RuntimeError as e: except RuntimeError as e:
logger.debug("Preview: no game event bus available: %s", e) logger.debug("Preview: no game event bus available: %s", e)
else: game_event_bus = None
if bus is not None:
s.set_event_bus(bus)
elif source.source_type == "mapped":
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
s = MappedColorStripStream(source, csm) deps = StreamDeps(
elif source.source_type == "composite": css_manager=csm,
from ledgrab.api.dependencies import get_cspt_store value_stream_manager=mgr.value_stream_manager,
from ledgrab.core.processing.composite_stream import CompositeColorStripStream cspt_store=_safe(get_cspt_store),
weather_manager=mgr.weather_manager,
s = CompositeColorStripStream( audio_capture_manager=mgr.audio_capture_manager,
source, csm, mgr.value_stream_manager, get_cspt_store(), depth=0 audio_source_store=_safe(get_audio_source_store),
audio_template_store=_safe(get_audio_template_store),
audio_processing_template_store=_safe(get_audio_processing_template_store),
game_event_bus=game_event_bus,
depth=0,
) )
elif source.source_type == "processed": try:
from ledgrab.api.dependencies import get_cspt_store s = build_stream(source, deps)
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream except ValueError as e:
# Preserve the registry's original detail so the API consumer
s = ProcessedColorStripStream(source, csm, get_cspt_store()) # sees which kind was rejected, not just a generic message.
else: raise ValueError(f"Unsupported preview source_type: {e}") from e
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
if not stream_cls:
raise ValueError(f"Unsupported preview source_type: {source.source_type}")
s = stream_cls(source)
# Inject gradient store for palette resolution # Inject gradient store for palette resolution
if hasattr(s, "set_gradient_store"): if hasattr(s, "set_gradient_store"):
try: try:
@@ -428,8 +428,17 @@ async def css_api_input_ws(
continue continue
elif "bytes" in message: elif "bytes" in message:
# Binary frame: raw RGBRGB... bytes (3 bytes per LED) # Binary frame: raw RGBRGB... bytes (3 bytes per LED).
# Cap to a generous upper bound on the LED count — a hostile
# client could otherwise stream 100 MB frames and OOM the
# server before any application logic ran.
raw_bytes = message["bytes"] raw_bytes = message["bytes"]
_MAX_BINARY_LEDS = 8192
if len(raw_bytes) > _MAX_BINARY_LEDS * 3:
await websocket.send_json(
{"error": f"Binary frame too large (max {_MAX_BINARY_LEDS} LEDs)"}
)
continue
if len(raw_bytes) % 3 != 0: if len(raw_bytes) % 3 != 0:
await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"}) await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"})
continue continue
@@ -122,9 +122,9 @@ class PictureAdvancedCSSResponse(_CSSResponseBase):
calibration: Optional[Calibration] = Field(None, description="LED calibration") calibration: Optional[Calibration] = Field(None, description="LED calibration")
class StaticCSSResponse(_CSSResponseBase): class SingleColorCSSResponse(_CSSResponseBase):
source_type: Literal["static"] = "static" source_type: Literal["single_color"] = "single_color"
color: Any = Field(description="Static RGB color") color: Any = Field(description="Solid RGB color")
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config") animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
@@ -240,11 +240,18 @@ class MathWaveCSSResponse(_CSSResponseBase):
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping") gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
class GameEventCSSResponse(_CSSResponseBase):
source_type: Literal["game_event"] = "game_event"
game_integration_id: str = Field(description="Game integration entity ID")
idle_color: Any = Field(description="Idle RGB color (bindable)")
event_mappings: List[dict] = Field(default_factory=list, description="Event-to-effect mappings")
ColorStripSourceResponse = Annotated[ ColorStripSourceResponse = Annotated[
Union[ Union[
Annotated[PictureCSSResponse, Tag("picture")], Annotated[PictureCSSResponse, Tag("picture")],
Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")], Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")],
Annotated[StaticCSSResponse, Tag("static")], Annotated[SingleColorCSSResponse, Tag("single_color")],
Annotated[GradientCSSResponse, Tag("gradient")], Annotated[GradientCSSResponse, Tag("gradient")],
Annotated[EffectCSSResponse, Tag("effect")], Annotated[EffectCSSResponse, Tag("effect")],
Annotated[CompositeCSSResponse, Tag("composite")], Annotated[CompositeCSSResponse, Tag("composite")],
@@ -258,6 +265,7 @@ ColorStripSourceResponse = Annotated[
Annotated[WeatherCSSResponse, Tag("weather")], Annotated[WeatherCSSResponse, Tag("weather")],
Annotated[KeyColorsCSSResponse, Tag("key_colors")], Annotated[KeyColorsCSSResponse, Tag("key_colors")],
Annotated[MathWaveCSSResponse, Tag("math_wave")], Annotated[MathWaveCSSResponse, Tag("math_wave")],
Annotated[GameEventCSSResponse, Tag("game_event")],
], ],
Discriminator("source_type"), Discriminator("source_type"),
] ]
@@ -303,9 +311,9 @@ class PictureAdvancedCSSCreate(_CSSCreateBase):
calibration: Optional[Calibration] = Field(None, description="LED calibration") calibration: Optional[Calibration] = Field(None, description="LED calibration")
class StaticCSSCreate(_CSSCreateBase): class SingleColorCSSCreate(_CSSCreateBase):
source_type: Literal["static"] = "static" source_type: Literal["single_color"] = "single_color"
color: Any = Field(default=None, description="Static RGB color [R,G,B]") color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config") animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
@@ -434,11 +442,18 @@ class MathWaveCSSCreate(_CSSCreateBase):
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping") gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
class GameEventCSSCreate(_CSSCreateBase):
source_type: Literal["game_event"] = "game_event"
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
ColorStripSourceCreate = Annotated[ ColorStripSourceCreate = Annotated[
Union[ Union[
Annotated[PictureCSSCreate, Tag("picture")], Annotated[PictureCSSCreate, Tag("picture")],
Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")], Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")],
Annotated[StaticCSSCreate, Tag("static")], Annotated[SingleColorCSSCreate, Tag("single_color")],
Annotated[GradientCSSCreate, Tag("gradient")], Annotated[GradientCSSCreate, Tag("gradient")],
Annotated[EffectCSSCreate, Tag("effect")], Annotated[EffectCSSCreate, Tag("effect")],
Annotated[CompositeCSSCreate, Tag("composite")], Annotated[CompositeCSSCreate, Tag("composite")],
@@ -452,6 +467,7 @@ ColorStripSourceCreate = Annotated[
Annotated[WeatherCSSCreate, Tag("weather")], Annotated[WeatherCSSCreate, Tag("weather")],
Annotated[KeyColorsCSSCreate, Tag("key_colors")], Annotated[KeyColorsCSSCreate, Tag("key_colors")],
Annotated[MathWaveCSSCreate, Tag("math_wave")], Annotated[MathWaveCSSCreate, Tag("math_wave")],
Annotated[GameEventCSSCreate, Tag("game_event")],
], ],
Discriminator("source_type"), Discriminator("source_type"),
] ]
@@ -497,9 +513,9 @@ class PictureAdvancedCSSUpdate(_CSSUpdateBase):
calibration: Optional[Calibration] = Field(None, description="LED calibration") calibration: Optional[Calibration] = Field(None, description="LED calibration")
class StaticCSSUpdate(_CSSUpdateBase): class SingleColorCSSUpdate(_CSSUpdateBase):
source_type: Literal["static"] = "static" source_type: Literal["single_color"] = "single_color"
color: Any = Field(default=None, description="Static RGB color [R,G,B]") color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config") animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
@@ -626,11 +642,18 @@ class MathWaveCSSUpdate(_CSSUpdateBase):
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping") gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
class GameEventCSSUpdate(_CSSUpdateBase):
source_type: Literal["game_event"] = "game_event"
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
ColorStripSourceUpdate = Annotated[ ColorStripSourceUpdate = Annotated[
Union[ Union[
Annotated[PictureCSSUpdate, Tag("picture")], Annotated[PictureCSSUpdate, Tag("picture")],
Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")], Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")],
Annotated[StaticCSSUpdate, Tag("static")], Annotated[SingleColorCSSUpdate, Tag("single_color")],
Annotated[GradientCSSUpdate, Tag("gradient")], Annotated[GradientCSSUpdate, Tag("gradient")],
Annotated[EffectCSSUpdate, Tag("effect")], Annotated[EffectCSSUpdate, Tag("effect")],
Annotated[CompositeCSSUpdate, Tag("composite")], Annotated[CompositeCSSUpdate, Tag("composite")],
@@ -644,6 +667,7 @@ ColorStripSourceUpdate = Annotated[
Annotated[WeatherCSSUpdate, Tag("weather")], Annotated[WeatherCSSUpdate, Tag("weather")],
Annotated[KeyColorsCSSUpdate, Tag("key_colors")], Annotated[KeyColorsCSSUpdate, Tag("key_colors")],
Annotated[MathWaveCSSUpdate, Tag("math_wave")], Annotated[MathWaveCSSUpdate, Tag("math_wave")],
Annotated[GameEventCSSUpdate, Tag("game_event")],
], ],
Discriminator("source_type"), Discriminator("source_type"),
] ]
@@ -0,0 +1,273 @@
"""Single source of truth for non-sharable color-strip stream construction.
Both the preview WebSocket (``api/routes/color_strip_sources/ws_stream.py``)
and the production ``ColorStripStreamManager.acquire`` used to maintain
parallel ``if source.source_type == "..." elif ... else ..._SIMPLE_STREAM_MAP``
chains. Adding a new kind required keeping those two chains in lockstep, and
silently fell through to a generic stream class when an entry was missed.
This module replaces both chains with a single ``STREAM_BUILDERS`` registry
plus a small ``StreamDeps`` dependency bag. Each caller populates the bag
from its own context (DI container, processor manager, etc.) and looks the
builder up by ``source.source_type``. A coverage assertion at import time
guarantees every kind in ``storage._SOURCE_TYPE_MAP`` is either sharable or
has a builder here — silent fall-throughs are no longer possible.
Sharable kinds (``picture``, ``picture_advanced``, ``key_colors``) are NOT in
this registry: they require an injected ``LiveStream`` whose acquisition is
intertwined with the source's calibration, which does not fit a uniform
factory signature. Those continue to use bespoke paths inside
``ColorStripStreamManager``.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable
@dataclass(frozen=True)
class StreamDeps:
"""Dependency bag for non-sharable stream construction.
Each call site (preview WebSocket, production stream manager) builds one
of these from its own context before invoking :func:`build_stream`.
Fields are ``Any`` (with ``None`` defaults) because individual builders
only consume a subset; tests can supply a minimal bag.
``frozen=True`` guards against a builder accidentally reassigning a
field; it does NOT make the referenced objects immutable — the
``css_manager``, stores, etc. are live mutable services.
``css_manager`` is needed by composite/mapped/processed builders so they
can recursively acquire dependent streams. Single-kind builders ignore
it. The field has no default so callers are forced to think about which
manager they are wiring through.
"""
css_manager: Any
value_stream_manager: Any = None
cspt_store: Any = None # ColorStripProcessingTemplateStore
weather_manager: Any = None
audio_capture_manager: Any = None
audio_source_store: Any = None
audio_template_store: Any = None
audio_processing_template_store: Any = None
game_event_bus: Any = None
depth: int = 0 # composite nesting depth — passed through verbatim
# ---------------------------------------------------------------------------
# Per-kind builders
#
# Each builder is a small free function ``(source, deps) -> ColorStripStream``.
# Imports are deferred to keep this module cheap to import (the production
# processing graph is large).
# ---------------------------------------------------------------------------
def _build_audio(source, d: StreamDeps):
from ledgrab.core.processing.audio_stream import AudioColorStripStream
return AudioColorStripStream(
source,
d.audio_capture_manager,
d.audio_source_store,
d.audio_template_store,
d.audio_processing_template_store,
)
def _build_composite(source, d: StreamDeps):
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
return CompositeColorStripStream(
source,
d.css_manager,
d.value_stream_manager,
d.cspt_store,
depth=d.depth,
)
def _build_mapped(source, d: StreamDeps):
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
return MappedColorStripStream(source, d.css_manager)
def _build_processed(source, d: StreamDeps):
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
return ProcessedColorStripStream(source, d.css_manager, d.cspt_store)
def _build_weather(source, d: StreamDeps):
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
return WeatherColorStripStream(source, d.weather_manager)
def _build_game_event(source, d: StreamDeps):
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
stream = GameEventColorStripStream(source)
if d.game_event_bus is not None:
stream.set_event_bus(d.game_event_bus)
return stream
def _make_source_only_builder(loader: Callable[[], type]) -> Callable[[Any, StreamDeps], Any]:
"""Wrap a class-loader so it produces a uniform ``(source, deps) -> stream`` builder.
The loader is called on each invocation but module caching makes the
import a single dict lookup after the first call.
"""
def _build(source, _deps: StreamDeps):
return loader()(source)
return _build
def _single_color_cls() -> type:
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
return SingleColorStripStream
def _gradient_cls() -> type:
from ledgrab.core.processing.color_strip_stream import GradientColorStripStream
return GradientColorStripStream
def _effect_cls() -> type:
from ledgrab.core.processing.effect_stream import EffectColorStripStream
return EffectColorStripStream
def _api_input_cls() -> type:
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
return ApiInputColorStripStream
def _notification_cls() -> type:
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
return NotificationColorStripStream
def _daylight_cls() -> type:
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
return DaylightColorStripStream
def _candlelight_cls() -> type:
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
return CandlelightColorStripStream
def _math_wave_cls() -> type:
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
return MathWaveColorStripStream
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
StreamBuilder = Callable[[Any, StreamDeps], Any]
STREAM_BUILDERS: dict[str, StreamBuilder] = {
"audio": _build_audio,
"composite": _build_composite,
"mapped": _build_mapped,
"processed": _build_processed,
"weather": _build_weather,
"game_event": _build_game_event,
"single_color": _make_source_only_builder(_single_color_cls),
# Legacy alias: pre-rename rows used "static". The data migration rewrites
# the on-disk source_type on startup, but this alias keeps an in-flight
# legacy entry resolving to the right stream class.
"static": _make_source_only_builder(_single_color_cls),
"gradient": _make_source_only_builder(_gradient_cls),
"effect": _make_source_only_builder(_effect_cls),
"api_input": _make_source_only_builder(_api_input_cls),
"notification": _make_source_only_builder(_notification_cls),
"daylight": _make_source_only_builder(_daylight_cls),
"candlelight": _make_source_only_builder(_candlelight_cls),
"math_wave": _make_source_only_builder(_math_wave_cls),
}
# Sharable kinds are handled by dedicated LiveStream-acquisition paths in
# ColorStripStreamManager (their construction depends on calibration → picture
# source resolution, which does not fit a uniform factory signature).
SHARABLE_KINDS: frozenset[str] = frozenset({"picture", "picture_advanced", "key_colors"})
def build_stream(source, deps: StreamDeps):
"""Build a non-sharable color-strip stream for *source*.
Raises ``ValueError`` if the kind has no registered builder (which
would indicate a sharable source slipped through the caller's
``sharable`` gate, or a new kind missing from this registry).
"""
builder = STREAM_BUILDERS.get(source.source_type)
if builder is None:
raise ValueError(
f"No stream builder for non-sharable color-strip-source kind "
f"{source.source_type!r} (id={getattr(source, 'id', '?')!r})"
)
return builder(source, deps)
def _assert_stream_kind_coverage() -> None:
"""Verify the registry is a strict partition: every kind from storage is
either listed in SHARABLE_KINDS or has a STREAM_BUILDERS entry.
Runs at module import so a kind added to ``_SOURCE_TYPE_MAP`` without a
corresponding builder fails the server boot loudly instead of silently
falling through at request time.
Contract note
-------------
This check is **asymmetric** (``STREAM_BUILDERS SHARABLE_KINDS ==
storage_kinds``) because sharable kinds are constructed by a separate
path inside ``ColorStripStreamManager``. The sister assertion in
``api/routes/color_strip_sources/_helpers.py::_assert_response_map_coverage``
is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``) because every
kind, sharable or not, still needs a response shape. Both assertions key
by the ``source_type`` string; adding a new kind requires updates to
storage, ``_RESPONSE_MAP``, and either ``STREAM_BUILDERS`` or
``SHARABLE_KINDS``. Both assertions catch missing entries; only this one
expects a subset relationship.
"""
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
builder_kinds = set(STREAM_BUILDERS.keys())
expected_non_sharable = storage_kinds - SHARABLE_KINDS
missing = expected_non_sharable - builder_kinds
extra = builder_kinds - storage_kinds
if missing or extra:
problems = []
if missing:
problems.append(f"missing builders: {sorted(missing)}")
if extra:
problems.append(f"unregistered kinds: {sorted(extra)}")
raise RuntimeError(
"color_strip_kinds.STREAM_BUILDERS is out of sync with storage._SOURCE_TYPE_MAP: "
+ "; ".join(problems)
)
_assert_stream_kind_coverage()
@@ -3,7 +3,7 @@
PictureColorStripStreams (expensive screen capture) are shared across multiple PictureColorStripStreams (expensive screen capture) are shared across multiple
consumers via reference counting — processing runs once, not once per target. consumers via reference counting — processing runs once, not once per target.
Count-dependent streams (static, gradient, effect) are NOT shared. Count-dependent streams (single_color, gradient, effect) are NOT shared.
Each consumer gets its own instance so it can configure an independent LED count Each consumer gets its own instance so it can configure an independent LED count
without interfering with other targets. without interfering with other targets.
""" """
@@ -11,37 +11,18 @@ without interfering with other targets.
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, Optional from typing import Dict, Optional
from ledgrab.core.processing.color_strip_kinds import (
StreamDeps,
build_stream,
)
from ledgrab.core.processing.color_strip_stream import ( from ledgrab.core.processing.color_strip_stream import (
ColorStripStream, ColorStripStream,
GradientColorStripStream,
PictureColorStripStream, PictureColorStripStream,
StaticColorStripStream,
) )
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
from ledgrab.core.processing.effect_stream import EffectColorStripStream
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
from ledgrab.utils import get_logger from ledgrab.utils import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
# source_type → stream class for non-picture (non-sharable) sources
_SIMPLE_STREAM_MAP = {
"static": StaticColorStripStream,
"gradient": GradientColorStripStream,
"effect": EffectColorStripStream,
"api_input": ApiInputColorStripStream,
"notification": NotificationColorStripStream,
"daylight": DaylightColorStripStream,
"candlelight": CandlelightColorStripStream,
"game_event": GameEventColorStripStream,
"math_wave": MathWaveColorStripStream,
}
@dataclass @dataclass
class _ColorStripEntry: class _ColorStripEntry:
@@ -241,43 +222,27 @@ class ColorStripStreamManager:
""" """
source = self._color_strip_store.get_source(css_id) source = self._color_strip_store.get_source(css_id)
# Non-sharable: always create a fresh per-consumer instance # Non-sharable: always create a fresh per-consumer instance.
# Construction is delegated to the per-kind registry in
# ``color_strip_kinds`` so the dispatch lives in exactly one place.
if not source.sharable: if not source.sharable:
if source.source_type == "audio": deps = StreamDeps(
from ledgrab.core.processing.audio_stream import AudioColorStripStream css_manager=self,
value_stream_manager=self._value_stream_manager,
css_stream = AudioColorStripStream( cspt_store=self._cspt_store,
source, weather_manager=self._weather_manager,
self._audio_capture_manager, audio_capture_manager=self._audio_capture_manager,
self._audio_source_store, audio_source_store=self._audio_source_store,
self._audio_template_store, audio_template_store=self._audio_template_store,
self._audio_processing_template_store, audio_processing_template_store=self._audio_processing_template_store,
game_event_bus=self._game_event_bus,
depth=depth,
) )
elif source.source_type == "composite": try:
from ledgrab.core.processing.composite_stream import ( css_stream = build_stream(source, deps)
CompositeColorStripStream, except ValueError as e:
) # Surface the css_id alongside the registry's error.
raise ValueError(f"{e} (css_id={css_id})") from e
css_stream = CompositeColorStripStream(
source, self, self._value_stream_manager, self._cspt_store, depth=depth
)
elif source.source_type == "mapped":
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
css_stream = MappedColorStripStream(source, self)
elif source.source_type == "processed":
css_stream = ProcessedColorStripStream(source, self, self._cspt_store)
elif source.source_type == "weather":
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
css_stream = WeatherColorStripStream(source, self._weather_manager)
else:
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
if not stream_cls:
raise ValueError(
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
)
css_stream = stream_cls(source)
# Inject gradient store for palette resolution # Inject gradient store for palette resolution
if self._gradient_store and hasattr(css_stream, "set_gradient_store"): if self._gradient_store and hasattr(css_stream, "set_gradient_store"):
css_stream.set_gradient_store(self._gradient_store) css_stream.set_gradient_store(self._gradient_store)
@@ -0,0 +1,354 @@
"""Single source of truth for value-stream construction.
``ValueStreamManager._create_stream`` used to be a 168-line ``isinstance``
ladder over 14 ``ValueSource`` subclasses with a silent fallback to
``StaticValueStream(value=1.0)``. The ladder forced any new value kind to
edit the factory plus the storage subclass plus the schemas plus the store's
``create_source`` — and a missing branch corrupted the stream at runtime.
This module replaces the ladder with a single ``STREAM_BUILDERS`` registry
keyed by the ``source_type`` string (matching the storage layer's
``_VALUE_SOURCE_MAP``). An import-time coverage assertion guarantees the two
registries stay aligned.
Builders take a ``(source, deps: ValueStreamDeps) -> ValueStream`` shape so
both the production manager and any preview / test harness can populate the
deps from their own context.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Optional
if TYPE_CHECKING:
# Typed forward references so mypy/pyright catch typos like
# ``d.gradient_stroe`` at static-analysis time. At runtime these are
# ``Any`` — the live objects come from the FastAPI / manager wiring.
from ledgrab.core.audio.audio_capture_manager import AudioCaptureManager
from ledgrab.core.game_integration.event_bus import GameEventBus
from ledgrab.core.home_assistant.ha_manager import HomeAssistantManager
from ledgrab.core.processing.color_strip_stream_manager import (
ColorStripStreamManager,
)
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
from ledgrab.core.processing.sync_clock_manager import SyncClockRuntime
from ledgrab.storage.audio_processing_template_store import (
AudioProcessingTemplateStore,
)
from ledgrab.storage.audio_source_store import AudioSourceStore
from ledgrab.storage.audio_template_store import AudioTemplateStore
from ledgrab.storage.gradient_store import GradientStore
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
@dataclass(frozen=True)
class ValueStreamDeps:
"""Dependency bag for value-stream construction.
Builders only read the subset they need. ``value_stream_manager`` is the
one truly load-bearing field — it is referenced by
``GradientMapValueStream`` so it can recursively acquire its source
value stream.
``clock_runtime`` is pre-acquired by the manager exclusively for the
``animated_color`` kind (see :data:`NEEDS_CLOCK_RUNTIME`). The manager
owns the bookkeeping for tracking ``vs_id → clock_id`` so the builder
stays a pure ``(source, deps) -> stream`` mapping. If a new kind ever
grows a clock dependency, add it to ``NEEDS_CLOCK_RUNTIME`` AND surface
a separate field — sharing ``clock_runtime`` across kinds invites the
wrong runtime being passed to the wrong builder.
Field types are quoted so they stay informational under
``TYPE_CHECKING`` and the dataclass still accepts plain mocks at
runtime. Builders therefore get IDE/lint help against typos like
``d.gradient_stroe`` while production code remains duck-typed.
"""
value_stream_manager: "Any"
audio_capture_manager: Optional["AudioCaptureManager"] = None
audio_source_store: Optional["AudioSourceStore"] = None
audio_template_store: Optional["AudioTemplateStore"] = None
audio_processing_template_store: Optional["AudioProcessingTemplateStore"] = None
live_stream_manager: Optional["LiveStreamManager"] = None
ha_manager: Optional["HomeAssistantManager"] = None
gradient_store: Optional["GradientStore"] = None
css_stream_manager: Optional["ColorStripStreamManager"] = None
event_bus: Optional["GameEventBus"] = None
http_endpoint_store: Optional["HTTPEndpointStore"] = None
clock_runtime: Optional["SyncClockRuntime"] = None
# ---------------------------------------------------------------------------
# Per-kind builders
# ---------------------------------------------------------------------------
def _build_static(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import StaticValueStream
return StaticValueStream(value=source.value)
def _build_animated(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import AnimatedValueStream
return AnimatedValueStream(
waveform=source.waveform,
speed=source.speed,
min_value=source.min_value,
max_value=source.max_value,
)
def _build_audio(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import AudioValueStream
return AudioValueStream(
audio_source_id=source.audio_source_id,
mode=source.mode,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
auto_gain=source.auto_gain,
audio_capture_manager=d.audio_capture_manager,
audio_source_store=d.audio_source_store,
audio_template_store=d.audio_template_store,
audio_processing_template_store=d.audio_processing_template_store,
)
def _build_daylight(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import DaylightValueStream
return DaylightValueStream(
speed=source.speed,
use_real_time=source.use_real_time,
latitude=source.latitude,
longitude=source.longitude,
min_value=source.min_value,
max_value=source.max_value,
)
def _build_adaptive_time(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import TimeOfDayValueStream
return TimeOfDayValueStream(
schedule=source.schedule,
min_value=source.min_value,
max_value=source.max_value,
)
def _build_adaptive_scene(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import SceneValueStream
return SceneValueStream(
picture_source_id=source.picture_source_id,
scene_behavior=source.scene_behavior,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
live_stream_manager=d.live_stream_manager,
)
def _build_static_color(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import StaticColorValueStream
return StaticColorValueStream(color=source.color)
def _build_animated_color(source, d: ValueStreamDeps):
# See NEEDS_CLOCK_RUNTIME below: ``d.clock_runtime`` is pre-acquired by
# ``ValueStreamManager._create_stream`` exclusively for this kind. Any
# other builder that ever needs a clock should add its own deps field
# rather than read this one.
from ledgrab.core.processing.value_stream import AnimatedColorValueStream
return AnimatedColorValueStream(
colors=source.colors,
speed=source.speed,
easing=source.easing,
clock=d.clock_runtime,
)
def _build_adaptive_time_color(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import AdaptiveTimeColorValueStream
return AdaptiveTimeColorValueStream(schedule=source.schedule)
def _build_ha_entity(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import HAEntityValueStream
return HAEntityValueStream(
ha_source_id=source.ha_source_id,
entity_id=source.entity_id,
attribute=source.attribute,
min_ha_value=source.min_ha_value,
max_ha_value=source.max_ha_value,
smoothing=source.smoothing,
ha_manager=d.ha_manager,
)
def _build_gradient_map(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import GradientMapValueStream
return GradientMapValueStream(
value_source_id=source.value_source_id,
gradient_id=source.gradient_id,
easing=source.easing,
value_stream_manager=d.value_stream_manager,
gradient_store=d.gradient_store,
)
def _build_css_extract(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import CSSExtractValueStream
return CSSExtractValueStream(
color_strip_source_id=source.color_strip_source_id,
led_start=source.led_start,
led_end=source.led_end,
css_stream_manager=d.css_stream_manager,
)
def _build_system_metrics(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import SystemMetricsValueStream
return SystemMetricsValueStream(
metric=source.metric,
min_value=source.min_value,
max_value=source.max_value,
max_rate=source.max_rate,
disk_path=source.disk_path,
sensor_label=source.sensor_label,
poll_interval=source.poll_interval,
smoothing=source.smoothing,
)
def _build_game_event(source, d: ValueStreamDeps):
# Late import: ``GameEventValueStream`` lives in a separate sub-package
# to keep the game-event subsystem (which transitively pulls in the
# game-integration adapters) off the cold-start path for installs that
# never use game events.
from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream
return GameEventValueStream(
event_type=source.event_type,
min_game_value=source.min_game_value,
max_game_value=source.max_game_value,
smoothing=source.smoothing,
default_value=source.default_value,
timeout=source.timeout,
event_bus=d.event_bus,
)
def _build_http(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import HTTPValueStream
return HTTPValueStream(
endpoint_id=source.http_endpoint_id,
json_path=source.json_path,
interval_s=source.interval_s,
min_value=source.min_value,
max_value=source.max_value,
smoothing=source.smoothing,
http_endpoint_store=d.http_endpoint_store,
)
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
StreamBuilder = Callable[[Any, ValueStreamDeps], Any]
STREAM_BUILDERS: dict[str, StreamBuilder] = {
"static": _build_static,
"animated": _build_animated,
"audio": _build_audio,
"daylight": _build_daylight,
"adaptive_time": _build_adaptive_time,
"adaptive_scene": _build_adaptive_scene,
"static_color": _build_static_color,
"animated_color": _build_animated_color,
"adaptive_time_color": _build_adaptive_time_color,
"ha_entity": _build_ha_entity,
"gradient_map": _build_gradient_map,
"css_extract": _build_css_extract,
"system_metrics": _build_system_metrics,
"game_event": _build_game_event,
"http": _build_http,
}
# ``animated_color`` is the only kind that needs a pre-acquired SyncClockRuntime
# from the manager; the rest derive everything they need from ``source`` and
# ``deps``. Exposing this set lets the manager perform the side-effecting
# acquisition step exactly once, before delegating to the registry.
NEEDS_CLOCK_RUNTIME: frozenset[str] = frozenset({"animated_color"})
def build_stream(source, deps: ValueStreamDeps):
"""Build a ValueStream for *source*.
Raises ``ValueError`` when no builder is registered for
``source.source_type``. Coverage is asserted at module import, so this
only fires for an in-flight instance whose ``source_type`` somehow
drifted from the registered set.
"""
builder = STREAM_BUILDERS.get(source.source_type)
if builder is None:
raise ValueError(
f"No value-stream builder for source_type {source.source_type!r} "
f"(id={getattr(source, 'id', '?')!r})"
)
return builder(source, deps)
def _assert_value_kind_coverage() -> None:
"""Verify the registry and storage's ``_VALUE_SOURCE_MAP`` agree.
Runs at module import. Symmetric: every kind in storage must have a
builder, and every builder must correspond to a real storage kind.
Also asserts ``NEEDS_CLOCK_RUNTIME`` only names kinds that exist in the
registry, so a typo there fails the boot loudly instead of silently
leaking a ``SyncClockRuntime`` acquisition.
"""
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
builder_kinds = set(STREAM_BUILDERS.keys())
missing = storage_kinds - builder_kinds
extra = builder_kinds - storage_kinds
if missing or extra:
problems = []
if missing:
problems.append(f"missing builders: {sorted(missing)}")
if extra:
problems.append(f"unregistered kinds: {sorted(extra)}")
raise RuntimeError(
"value_kinds.STREAM_BUILDERS is out of sync with storage._VALUE_SOURCE_MAP: "
+ "; ".join(problems)
)
rogue_clock_kinds = NEEDS_CLOCK_RUNTIME - builder_kinds
if rogue_clock_kinds:
raise RuntimeError(
"value_kinds.NEEDS_CLOCK_RUNTIME names kinds with no registered "
f"builder: {sorted(rogue_clock_kinds)}"
)
_assert_value_kind_coverage()
+261 -141
View File
@@ -21,7 +21,9 @@ ValueStreamManager owns all running ValueStreams, keyed by
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import json
import math import math
import re
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime from datetime import datetime
@@ -29,8 +31,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np import numpy as np
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.utils import get_logger from ledgrab.utils import get_logger
# Compiled once — used by ``_extract_simple_path`` on every poll.
_NAME_HEAD_RE = re.compile(r"^([^\[]*)")
_INDEX_RE = re.compile(r"^\[(\d+)\]")
if TYPE_CHECKING: if TYPE_CHECKING:
from ledgrab.core.audio.audio_capture import AudioCaptureManager from ledgrab.core.audio.audio_capture import AudioCaptureManager
from ledgrab.core.game_integration.event_bus import GameEventBus from ledgrab.core.game_integration.event_bus import GameEventBus
@@ -39,6 +46,7 @@ if TYPE_CHECKING:
from ledgrab.core.processing.live_stream_manager import LiveStreamManager from ledgrab.core.processing.live_stream_manager import LiveStreamManager
from ledgrab.core.processing.sync_clock_manager import SyncClockManager from ledgrab.core.processing.sync_clock_manager import SyncClockManager
from ledgrab.storage.audio_source_store import AudioSourceStore from ledgrab.storage.audio_source_store import AudioSourceStore
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.storage.value_source import ValueSource from ledgrab.storage.value_source import ValueSource
from ledgrab.storage.value_source_store import ValueSourceStore from ledgrab.storage.value_source_store import ValueSourceStore
@@ -1017,6 +1025,211 @@ class HAEntityValueStream(ValueStream):
logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e) logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e)
# ---------------------------------------------------------------------------
# HTTP poll
# ---------------------------------------------------------------------------
class HTTPValueStream(ValueStream):
"""Periodically polls an HTTPEndpoint and extracts a value via json_path.
Exposes two accessors:
- ``get_value()`` returns a normalized float in [0, 1] for use as a
modulator (brightness, color, etc.). The raw extracted value is
coerced to float; non-numeric values yield 0.0.
- ``get_raw_value()`` returns the un-normalized extracted value
(str / int / float / bool / None) for consumers that need it
verbatim — e.g. an automation rule comparing ``"playing"``.
"""
def __init__(
self,
endpoint_id: str,
json_path: str,
interval_s: int,
min_value: float,
max_value: float,
smoothing: float,
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
) -> None:
self._endpoint_id = endpoint_id
self._json_path = json_path
self._interval_s = max(1, int(interval_s))
self._min_value = min_value
self._max_value = max_value
self._smoothing = smoothing
self._http_endpoint_store = http_endpoint_store
self._task: Optional[asyncio.Task] = None
self._raw_value: Any = None
self._prev_normalized: Optional[float] = None
# Kept as private attrs for internal/log diagnostics; not exposed via
# public properties or API until a status endpoint consumes them.
self._last_fetched_at: Optional[datetime] = None
self._last_status_code: Optional[int] = None
self._last_error: Optional[str] = None
def start(self) -> None:
if self._task is not None:
return
if not self._endpoint_id or self._http_endpoint_store is None:
return
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop — can't poll. Construction in a sync test
# context is fine; the stream just stays idle until started
# from an async context.
return
self._task = loop.create_task(self._poll_loop())
def stop(self) -> None:
task = self._task
self._task = None
if task is not None:
task.cancel()
def get_value(self) -> float:
raw = self._raw_value
if raw is None:
return self._prev_normalized if self._prev_normalized is not None else 0.0
try:
numeric = float(raw)
except (TypeError, ValueError):
return self._prev_normalized if self._prev_normalized is not None else 0.0
rng = self._max_value - self._min_value
if abs(rng) < 1e-9:
normalized = 0.5
else:
normalized = (numeric - self._min_value) / rng
normalized = max(0.0, min(1.0, normalized))
if self._smoothing > 0.0 and self._prev_normalized is not None:
normalized = (
self._smoothing * self._prev_normalized + (1.0 - self._smoothing) * normalized
)
self._prev_normalized = normalized
return normalized
def get_raw_value(self) -> Any:
"""Return the last raw extracted value (string, int, float, etc.)."""
return self._raw_value
def update_source(self, source: "ValueSource") -> None:
from ledgrab.storage.value_source import HTTPValueSource
if not isinstance(source, HTTPValueSource):
return
self._endpoint_id = source.http_endpoint_id
self._json_path = source.json_path
self._interval_s = max(1, int(source.interval_s))
self._min_value = source.min_value
self._max_value = source.max_value
self._smoothing = source.smoothing
async def _poll_loop(self) -> None:
from ledgrab.utils.safe_source import safe_request_bounded
try:
while True:
try:
endpoint = self._http_endpoint_store.get(self._endpoint_id)
except EntityNotFoundError:
# The endpoint was deleted out from under us. Stop the
# poll task so it doesn't spin forever; the next entity
# event (or the value source being deleted) will tidy
# the rest of the bookkeeping.
logger.warning(
"HTTPValueStream stopping: endpoint %s no longer exists",
self._endpoint_id,
)
self._last_error = "endpoint_deleted"
self._raw_value = None
self._task = None
return
except Exception as exc:
self._last_error = f"Endpoint lookup failed: {type(exc).__name__}"
self._raw_value = None
await asyncio.sleep(self._interval_s)
continue
headers = endpoint.build_request_headers()
try:
status, body_bytes, error = await safe_request_bounded(
endpoint.method,
endpoint.url,
headers=headers,
timeout=endpoint.timeout_s,
)
except Exception as exc:
# safe_request_bounded raises HTTPException on URL
# validation failure; treat that as a recoverable poll
# error and try again next cycle.
self._last_status_code = None
self._last_error = f"URL validation failed: {type(exc).__name__}"
self._raw_value = None
await asyncio.sleep(self._interval_s)
continue
self._last_status_code = status if status else None
self._last_error = error
if not error and status:
try:
body_text = body_bytes.decode("utf-8", errors="replace")
except Exception:
body_text = ""
body_json: Any
try:
body_json = json.loads(body_text) if body_text else None
except (ValueError, TypeError):
body_json = None
self._raw_value = _extract_simple_path(body_json, self._json_path, body_text)
else:
self._raw_value = None
self._last_fetched_at = datetime.now()
await asyncio.sleep(self._interval_s)
except asyncio.CancelledError:
raise
def _extract_simple_path(body_json: Any, path: str, body_text: str) -> Any:
"""Extract a value via a dot-path (with optional ``[N]`` indices).
Uses module-level compiled regexes so repeated polls don't recompile.
Returns ``None`` when the path doesn't resolve — runtime callers just
need "the value, or nothing." Empty path returns the raw body text so
plain-text endpoints work too.
"""
if not path:
return body_text or None
if body_json is None:
return None
current: Any = body_json
for raw_segment in path.split("."):
segment = raw_segment.strip()
if not segment:
continue
name_match = _NAME_HEAD_RE.match(segment)
name_part = name_match.group(1) if name_match else ""
remainder = segment[len(name_part) :]
if name_part:
if not isinstance(current, dict) or name_part not in current:
return None
current = current[name_part]
while remainder:
idx_match = _INDEX_RE.match(remainder)
if not idx_match:
return None
idx = int(idx_match.group(1))
if not isinstance(current, list) or idx < 0 or idx >= len(current):
return None
current = current[idx]
remainder = remainder[idx_match.end() :]
return current
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Gradient Map # Gradient Map
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -1547,6 +1760,7 @@ class ValueStreamManager:
event_bus: Optional["GameEventBus"] = None, event_bus: Optional["GameEventBus"] = None,
audio_processing_template_store=None, audio_processing_template_store=None,
sync_clock_manager: Optional["SyncClockManager"] = None, sync_clock_manager: Optional["SyncClockManager"] = None,
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
): ):
self._value_source_store = value_source_store self._value_source_store = value_source_store
self._audio_capture_manager = audio_capture_manager self._audio_capture_manager = audio_capture_manager
@@ -1559,6 +1773,7 @@ class ValueStreamManager:
self._event_bus = event_bus self._event_bus = event_bus
self._audio_processing_template_store = audio_processing_template_store self._audio_processing_template_store = audio_processing_template_store
self._sync_clock_manager = sync_clock_manager self._sync_clock_manager = sync_clock_manager
self._http_endpoint_store = http_endpoint_store
self._streams: Dict[str, ValueStream] = {} # vs_id → stream self._streams: Dict[str, ValueStream] = {} # vs_id → stream
self._ref_counts: Dict[str, int] = {} # vs_id → ref count self._ref_counts: Dict[str, int] = {} # vs_id → ref count
# Tracks which clock_id (if any) was acquired for each stream so we # Tracks which clock_id (if any) was acquired for each stream so we
@@ -1602,6 +1817,17 @@ class ValueStreamManager:
else: else:
logger.info(f"Released ref for value stream {vs_id} (refs={refs})") logger.info(f"Released ref for value stream {vs_id} (refs={refs})")
def peek(self, vs_id: str) -> Optional[ValueStream]:
"""Read-only accessor: return the running ValueStream for ``vs_id``
if one exists, else ``None``.
Does NOT change ref counts. Use for consumer-driven reads where the
caller already holds a reference via :meth:`acquire` (e.g. the
:class:`AutomationEngine` evaluating an ``HTTPPollRule`` against a
value source it has already acquired in ``_sync_value_stream_refs``).
"""
return self._streams.get(vs_id)
def update_source(self, vs_id: str) -> None: def update_source(self, vs_id: str) -> None:
"""Hot-update the shared stream for the given ValueSource.""" """Hot-update the shared stream for the given ValueSource."""
try: try:
@@ -1699,158 +1925,52 @@ class ValueStreamManager:
logger.info("Released all value streams") logger.info("Released all value streams")
def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream: def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream:
"""Factory: create the appropriate ValueStream for a ValueSource.""" """Build a ValueStream for *source* via the central kind registry.
from ledgrab.storage.value_source import (
AdaptiveValueSource, The 14-branch ``isinstance`` ladder this method used to host was the
AnimatedValueSource, canonical example of the parallel-change smell flagged in the
AudioValueSource, architecture audit. The actual per-kind construction now lives in
CSSExtractValueSource, ``ledgrab.core.processing.value_kinds.STREAM_BUILDERS``, keyed by
DaylightValueSource, ``source.source_type``. This method only handles the manager-side
GameEventValueSource, bookkeeping that does not fit a uniform builder signature — namely
GradientMapValueSource, the optional :class:`SyncClockRuntime` acquisition for
HAEntityValueSource, ``animated_color`` sources, whose ``vs_id → clock_id`` mapping the
StaticValueSource, manager owns for symmetric release at teardown.
StaticColorValueSource, """
AnimatedColorValueSource, from ledgrab.core.processing.value_kinds import (
AdaptiveTimeColorValueSource, NEEDS_CLOCK_RUNTIME,
SystemMetricsValueSource, ValueStreamDeps,
build_stream,
) )
if isinstance(source, StaticValueSource): clock_runtime = None
return StaticValueStream(value=source.value) if source.source_type in NEEDS_CLOCK_RUNTIME:
clock_id = getattr(source, "clock_id", None)
if isinstance(source, AnimatedValueSource): if clock_id and self._sync_clock_manager:
return AnimatedValueStream( try:
waveform=source.waveform, clock_runtime = self._sync_clock_manager.acquire(clock_id)
speed=source.speed, if vs_id is not None:
min_value=source.min_value, self._stream_clock_ids[vs_id] = clock_id
max_value=source.max_value, except Exception as e:
logger.warning(
"Could not acquire sync clock %s for value source %s: %s",
clock_id,
source.id,
e,
) )
if isinstance(source, AudioValueSource): deps = ValueStreamDeps(
return AudioValueStream( value_stream_manager=self,
audio_source_id=source.audio_source_id,
mode=source.mode,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
auto_gain=source.auto_gain,
audio_capture_manager=self._audio_capture_manager, audio_capture_manager=self._audio_capture_manager,
audio_source_store=self._audio_source_store, audio_source_store=self._audio_source_store,
audio_template_store=self._audio_template_store, audio_template_store=self._audio_template_store,
audio_processing_template_store=self._audio_processing_template_store, audio_processing_template_store=self._audio_processing_template_store,
)
if isinstance(source, DaylightValueSource):
return DaylightValueStream(
speed=source.speed,
use_real_time=source.use_real_time,
latitude=source.latitude,
longitude=source.longitude,
min_value=source.min_value,
max_value=source.max_value,
)
if isinstance(source, AdaptiveValueSource):
if source.source_type == "adaptive_scene":
return SceneValueStream(
picture_source_id=source.picture_source_id,
scene_behavior=source.scene_behavior,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
live_stream_manager=self._live_stream_manager, live_stream_manager=self._live_stream_manager,
)
return TimeOfDayValueStream(
schedule=source.schedule,
min_value=source.min_value,
max_value=source.max_value,
)
# Color streams
if isinstance(source, StaticColorValueSource):
return StaticColorValueStream(color=source.color)
if isinstance(source, AnimatedColorValueSource):
clock_runtime = None
if source.clock_id and self._sync_clock_manager:
try:
clock_runtime = self._sync_clock_manager.acquire(source.clock_id)
if vs_id is not None:
self._stream_clock_ids[vs_id] = source.clock_id
except Exception as e:
logger.warning(
"Could not acquire sync clock %s for value source %s: %s",
source.clock_id,
source.id,
e,
)
return AnimatedColorValueStream(
colors=source.colors,
speed=source.speed,
easing=source.easing,
clock=clock_runtime,
)
if isinstance(source, AdaptiveTimeColorValueSource):
return AdaptiveTimeColorValueStream(schedule=source.schedule)
if isinstance(source, HAEntityValueSource):
return HAEntityValueStream(
ha_source_id=source.ha_source_id,
entity_id=source.entity_id,
attribute=source.attribute,
min_ha_value=source.min_ha_value,
max_ha_value=source.max_ha_value,
smoothing=source.smoothing,
ha_manager=self._ha_manager, ha_manager=self._ha_manager,
)
if isinstance(source, GradientMapValueSource):
return GradientMapValueStream(
value_source_id=source.value_source_id,
gradient_id=source.gradient_id,
easing=source.easing,
value_stream_manager=self,
gradient_store=self._gradient_store, gradient_store=self._gradient_store,
)
if isinstance(source, CSSExtractValueSource):
return CSSExtractValueStream(
color_strip_source_id=source.color_strip_source_id,
led_start=source.led_start,
led_end=source.led_end,
css_stream_manager=self._css_stream_manager, css_stream_manager=self._css_stream_manager,
)
if isinstance(source, SystemMetricsValueSource):
return SystemMetricsValueStream(
metric=source.metric,
min_value=source.min_value,
max_value=source.max_value,
max_rate=source.max_rate,
disk_path=source.disk_path,
sensor_label=source.sensor_label,
poll_interval=source.poll_interval,
smoothing=source.smoothing,
)
if isinstance(source, GameEventValueSource):
from ledgrab.core.value_sources.game_event_value_source import (
GameEventValueStream,
)
return GameEventValueStream(
event_type=source.event_type,
min_game_value=source.min_game_value,
max_game_value=source.max_game_value,
smoothing=source.smoothing,
default_value=source.default_value,
timeout=source.timeout,
event_bus=self._event_bus, event_bus=self._event_bus,
http_endpoint_store=self._http_endpoint_store,
clock_runtime=clock_runtime,
) )
return build_stream(source, deps)
# Fallback
return StaticValueStream(value=1.0)
@@ -32,6 +32,15 @@ class BaseSqliteStore(Generic[T]):
self._items: Dict[str, T] = {} self._items: Dict[str, T] = {}
self._deserializer = deserializer self._deserializer = deserializer
self._lock = threading.RLock() self._lock = threading.RLock()
# Apply pending JSON-blob data migrations before loading rows so the
# in-memory cache reflects the canonical, post-migration shape. The
# runner is idempotent across stores — it consults a dedicated
# ``data_migrations`` audit table — so each store construction may
# invoke it without re-running already-applied migrations.
# Imported lazily to avoid a circular dependency at module load.
from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner
MigrationRunner(db).run(ALL_MIGRATIONS)
self._load() self._load()
# -- I/O ----------------------------------------------------------------- # -- I/O -----------------------------------------------------------------
@@ -23,7 +23,13 @@ MAX_COMPOSITE_DEPTH = 4
class ColorStripStore(BaseSqliteStore[ColorStripSource]): class ColorStripStore(BaseSqliteStore[ColorStripSource]):
"""Persistent storage for color strip sources.""" """Persistent storage for color strip sources.
JSON-blob field renames (e.g. legacy ``source_type='static'`` →
``'single_color'``) are handled by the central
:mod:`ledgrab.storage.data_migrations` runner, which executes once per
database from :meth:`Database._ensure_schema`.
"""
_table_name = "color_strip_sources" _table_name = "color_strip_sources"
_entity_name = "Color strip source" _entity_name = "Color strip source"
@@ -0,0 +1,219 @@
"""Versioned data migrations for stored JSON entities.
Each migration is a small class with a stable ``name`` (used as the idempotency
key) and an ``apply(db)`` method that performs the change inside a transaction.
The ``MigrationRunner`` keeps a ``data_migrations`` table that records which
migrations have already executed; subsequent runs are a no-op.
This replaces ad-hoc per-store migrations that were rewriting raw JSON via
``str.replace`` — that approach corrupted nested fields whose values happened
to share a substring with the renamed key, and had no transaction or audit.
Adding a new migration
----------------------
1. Subclass :class:`DataMigration`, give it a unique ``name`` (prefix with the
next sequence number, e.g. ``"002_..."``) and implement ``apply``.
2. Append it to :data:`ALL_MIGRATIONS` in commit order. Never reorder existing
entries — the runner records each by name.
3. The first ``ColorStripStore`` (or any other store) construction triggers the
runner; nothing else has to change.
"""
from __future__ import annotations
import json
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from ledgrab.storage.database import Database, is_writes_frozen
from ledgrab.utils import get_logger
logger = get_logger(__name__)
class DataMigration(ABC):
"""One JSON-blob migration step.
Subclasses must declare a class-level ``name`` and implement ``apply``.
Implementations must operate on the supplied ``conn`` (an already-open
transaction). They MUST NOT commit or open nested transactions — the
:class:`MigrationRunner` owns the transaction so the data UPDATEs and the
audit-table INSERT are atomic.
"""
name: str = ""
@abstractmethod
def apply(self, conn: sqlite3.Connection) -> int:
"""Perform the migration inside the supplied transaction.
Return the number of rows changed.
"""
@dataclass(frozen=True)
class MigrationRecord:
"""A row in the ``data_migrations`` audit table."""
name: str
applied_at: str
rows_changed: int
class MigrationRunner:
"""Apply pending data migrations exactly once per database.
Each call to :meth:`run` opens a single transaction that covers (a) the
"is this migration already applied?" check, (b) the migration body, and
(c) the audit INSERT. That guarantees:
* a partial-failure cannot leave data rewritten but unrecorded;
* concurrent stores constructing on multiple threads cannot race each
other into a UNIQUE-constraint crash on the audit table.
The runner skips silently when writes are frozen so a post-restore boot
does not mutate the freshly-restored database before the imminent restart.
"""
_TABLE = "data_migrations"
def __init__(self, db: Database):
self._db = db
self._ensure_table()
def _ensure_table(self) -> None:
# CREATE TABLE is idempotent thanks to IF NOT EXISTS; running on every
# startup is cheap. ``Database.execute`` does NOT honour the
# ``is_writes_frozen`` guard (DDL is always permitted), so the audit
# table reliably exists even on a frozen boot.
self._db.execute(
f"""
CREATE TABLE IF NOT EXISTS {self._TABLE} (
name TEXT PRIMARY KEY,
applied_at TEXT NOT NULL,
rows_changed INTEGER NOT NULL DEFAULT 0
)
"""
)
def applied_names(self) -> set[str]:
"""Return the set of migration names already recorded as applied."""
cursor = self._db.execute(f"SELECT name FROM {self._TABLE}")
return {row["name"] for row in cursor.fetchall()}
def run(self, migrations: list[DataMigration]) -> list[MigrationRecord]:
"""Apply each migration in *migrations* that hasn't been recorded yet.
Returns the records for migrations that actually executed this call.
Returns an empty list when writes are frozen.
"""
if is_writes_frozen():
# Frozen-write databases are read-only between a restore and the
# imminent restart. Surface this clearly because the in-memory
# state may briefly reflect pre-migration data.
logger.warning("Data migrations skipped: writes are frozen (restart expected).")
return []
# Validate names up-front so an unnamed migration aborts before any
# transaction work.
for migration in migrations:
if not migration.name:
raise ValueError(
f"DataMigration {type(migration).__name__} must declare a non-empty name"
)
executed: list[MigrationRecord] = []
for migration in migrations:
# Use a separate transaction per migration so a failure rolls back
# only the failing one and any earlier (already-recorded)
# migrations keep their audit rows.
with self._db.transaction() as conn:
already_applied = conn.execute(
f"SELECT 1 FROM {self._TABLE} WHERE name = ?",
(migration.name,),
).fetchone()
if already_applied:
continue
logger.info("Applying data migration: %s", migration.name)
rows_changed = int(migration.apply(conn))
applied_at = datetime.now(timezone.utc).isoformat()
# INSERT OR IGNORE defends against a hypothetical concurrent
# writer that recorded the same name between the SELECT above
# and this INSERT (the RLock serialises this in practice, but
# the constraint is the durable contract).
conn.execute(
f"INSERT OR IGNORE INTO {self._TABLE} "
f"(name, applied_at, rows_changed) VALUES (?, ?, ?)",
(migration.name, applied_at, rows_changed),
)
executed.append(
MigrationRecord(
name=migration.name,
applied_at=applied_at,
rows_changed=rows_changed,
)
)
if rows_changed:
logger.warning("Migration %s rewrote %d row(s)", migration.name, rows_changed)
if executed:
logger.info(
"Applied %d data migration(s): %s",
len(executed),
", ".join(r.name for r in executed),
)
return executed
# ---------------------------------------------------------------------------
# Concrete migrations
# ---------------------------------------------------------------------------
class StaticToSingleColorMigration(DataMigration):
"""Rename the legacy ``source_type='static'`` color-strip kind to ``single_color``.
The pre-rename rows in ``color_strip_sources`` used ``source_type='static'``;
the new canonical value is ``single_color``. The previous in-line string
replace risked rewriting unrelated substrings (e.g. an animation type named
``static_wave``). This migration parses the JSON, mutates the single field
we care about, and writes the canonical serialization back. Runs inside
the runner's transaction.
"""
name = "001_color_strip_static_to_single_color"
def apply(self, conn: sqlite3.Connection) -> int:
rows_changed = 0
rows = conn.execute("SELECT id, data FROM [color_strip_sources]").fetchall()
for row in rows:
blob = row["data"]
try:
parsed = json.loads(blob)
except json.JSONDecodeError as e:
# A corrupt blob is a pre-existing problem; do not crash the
# migration. The store's own load path will surface it.
logger.warning(
"Skipping corrupt color_strip_sources row %s during migration: %s",
row["id"],
e,
)
continue
if parsed.get("source_type") != "static":
continue
parsed["source_type"] = "single_color"
conn.execute(
"UPDATE [color_strip_sources] SET data = ? WHERE id = ?",
(json.dumps(parsed, ensure_ascii=False), row["id"]),
)
rows_changed += 1
return rows_changed
# Master list — ORDER MATTERS. Append new migrations; never reorder.
ALL_MIGRATIONS: list[DataMigration] = [
StaticToSingleColorMigration(),
]
@@ -0,0 +1,85 @@
"""Regression tests for the CSS response builder dispatch.
These lock in the safety net introduced when the silent "fallback to a fake
PictureCSSResponse" branch was removed:
* every concrete ColorStripSource subclass registered in ``_SOURCE_TYPE_MAP``
must have a builder in ``_RESPONSE_MAP`` (verified at module import); and
* the previously-missing ``GameEventColorStripSource`` round-trips through
the response builder into the matching ``GameEventCSSResponse`` schema.
"""
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from ledgrab.api.routes.color_strip_sources import _helpers
from ledgrab.api.schemas.color_strip_sources import GameEventCSSResponse
from ledgrab.storage.color_strip_source import (
ColorStripSource,
GameEventColorStripSource,
)
def test_response_map_covers_every_registered_source_type():
"""Every source_type in _SOURCE_TYPE_MAP has a response builder."""
storage_kinds = set(_helpers._STORAGE_TYPE_MAP.keys())
builder_kinds = set(_helpers._RESPONSE_MAP.keys())
missing = storage_kinds - builder_kinds
assert not missing, f"_RESPONSE_MAP is missing builders for: {sorted(missing)}"
def test_assert_helper_raises_when_a_kind_is_unregistered(monkeypatch):
"""Adding a kind to the storage registry without a builder is caught."""
class _PhantomColorStripSource(ColorStripSource):
pass
monkeypatch.setitem(_helpers._STORAGE_TYPE_MAP, "phantom", _PhantomColorStripSource)
with pytest.raises(RuntimeError, match="phantom"):
_helpers._assert_response_map_coverage()
def test_game_event_source_serialises_to_game_event_response():
"""A GameEventColorStripSource is rendered as GameEventCSSResponse (not the
fake PictureCSSResponse that the silent fallback used to return).
"""
now = datetime.now(timezone.utc)
source = GameEventColorStripSource(
id="css_gev0001",
name="Counter-Strike feedback",
source_type="game_event",
created_at=now,
updated_at=now,
game_integration_id="gi_cs2_main",
event_mappings=[{"event": "hit", "effect": "flash"}],
led_count=60,
)
response = _helpers._css_to_response(source)
assert isinstance(response, GameEventCSSResponse)
assert response.source_type == "game_event"
assert response.game_integration_id == "gi_cs2_main"
assert response.event_mappings == [{"event": "hit", "effect": "flash"}]
assert response.led_count == 60
def test_unregistered_source_type_raises_in_css_to_response():
"""Reaching ``_css_to_response`` with an unmapped source_type raises loudly."""
class _UnregisteredCSS(ColorStripSource):
pass
now = datetime.now(timezone.utc)
source = _UnregisteredCSS(
id="css_xxx",
name="rogue",
source_type="rogue_unregistered",
created_at=now,
updated_at=now,
)
with pytest.raises(RuntimeError, match="No CSS response builder"):
_helpers._css_to_response(source)
@@ -0,0 +1,97 @@
"""Tests for the unified color-strip stream-builder registry."""
from __future__ import annotations
import pytest
from ledgrab.core.processing import color_strip_kinds
from ledgrab.core.processing.color_strip_kinds import (
SHARABLE_KINDS,
STREAM_BUILDERS,
StreamDeps,
build_stream,
)
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
def test_stream_builders_partition_storage_kinds_with_sharable():
"""STREAM_BUILDERS SHARABLE_KINDS == storage source_types."""
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
covered = set(STREAM_BUILDERS.keys()) | SHARABLE_KINDS
assert (
storage_kinds == covered
), f"missing={storage_kinds - covered}, extra={covered - storage_kinds}"
def test_sharable_kinds_list_matches_actual_sharable_property():
"""The hand-coded SHARABLE_KINDS list mirrors the `sharable` @property values.
Every ColorStripSource subclass currently implements ``sharable`` as a
literal ``return True``/``return False`` that does not touch ``self``,
so ``fget(None)`` is a safe introspection. If a future subclass changes
that contract — e.g. by computing sharability from instance state — this
test will raise a clear error rather than silently misclassify the kind,
forcing the maintainer to update both ``SHARABLE_KINDS`` and this test.
"""
def _is_sharable(name: str, cls) -> bool:
prop = getattr(cls, "sharable", None)
if not isinstance(prop, property) or prop.fget is None:
raise AssertionError(
f"{cls.__name__} (source_type={name!r}) does not expose `sharable` "
f"as a @property; SHARABLE_KINDS introspection no longer works."
)
try:
return bool(prop.fget(None))
except (AttributeError, TypeError) as e:
raise AssertionError(
f"{cls.__name__}.sharable now depends on `self` ({e}); update "
"this test and SHARABLE_KINDS together."
) from e
actually_sharable = {name for name, cls in _SOURCE_TYPE_MAP.items() if _is_sharable(name, cls)}
assert actually_sharable == set(SHARABLE_KINDS), (
"SHARABLE_KINDS drifted from the @property values: "
f"actually_sharable={sorted(actually_sharable)}, "
f"SHARABLE_KINDS={sorted(SHARABLE_KINDS)}"
)
def test_build_stream_raises_on_unknown_kind():
class _FakeSource:
source_type = "totally_unregistered_kind"
id = "fake"
deps = StreamDeps(css_manager=None)
with pytest.raises(ValueError, match="totally_unregistered_kind"):
build_stream(_FakeSource(), deps)
def test_coverage_assertion_raises_on_drift(monkeypatch):
"""Adding a kind to storage's _SOURCE_TYPE_MAP without a builder fails fast."""
monkeypatch.setitem(_SOURCE_TYPE_MAP, "phantom", object)
with pytest.raises(RuntimeError, match="phantom"):
color_strip_kinds._assert_stream_kind_coverage()
def test_legacy_static_alias_resolves_to_single_color_builder():
"""The legacy 'static' source_type still maps to the single_color builder."""
assert STREAM_BUILDERS["static"] is not None
assert STREAM_BUILDERS["single_color"] is not None
# The two route to the same loader, but builders are distinct closures —
# call them with a tiny source stub and confirm same class.
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
from ledgrab.storage.color_strip_source import SingleColorStripSource
from datetime import datetime, timezone
src = SingleColorStripSource(
id="t",
name="t",
source_type="static",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
deps = StreamDeps(css_manager=None)
s = STREAM_BUILDERS["static"](src, deps)
assert isinstance(s, SingleColorStripStream)
@@ -0,0 +1,64 @@
"""Tests for the unified value-stream-builder registry."""
from __future__ import annotations
import pytest
from ledgrab.core.processing import value_kinds
from ledgrab.core.processing.value_kinds import (
NEEDS_CLOCK_RUNTIME,
STREAM_BUILDERS,
ValueStreamDeps,
build_stream,
)
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
def test_builders_cover_every_storage_kind():
"""STREAM_BUILDERS keys == storage._VALUE_SOURCE_MAP keys."""
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
builder_kinds = set(STREAM_BUILDERS.keys())
assert storage_kinds == builder_kinds, (
f"missing={storage_kinds - builder_kinds}, " f"extra={builder_kinds - storage_kinds}"
)
def test_build_stream_raises_on_unknown_kind():
class _Fake:
source_type = "totally_unregistered"
id = "fake"
deps = ValueStreamDeps(value_stream_manager=None)
with pytest.raises(ValueError, match="totally_unregistered"):
build_stream(_Fake(), deps)
def test_coverage_assertion_raises_on_drift(monkeypatch):
"""A kind added to storage without a builder fails the import-time check."""
monkeypatch.setitem(_VALUE_SOURCE_MAP, "phantom_kind", object)
with pytest.raises(RuntimeError, match="phantom_kind"):
value_kinds._assert_value_kind_coverage()
def test_needs_clock_runtime_is_subset_of_registered_kinds():
assert NEEDS_CLOCK_RUNTIME.issubset(set(STREAM_BUILDERS.keys()))
def test_static_builder_returns_static_value_stream():
from ledgrab.core.processing.value_stream import StaticValueStream
from ledgrab.storage.value_source import StaticValueSource
from datetime import datetime, timezone
src = StaticValueSource(
id="vs_t",
name="t",
source_type="static",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
value=0.42,
)
deps = ValueStreamDeps(value_stream_manager=None)
stream = build_stream(src, deps)
assert isinstance(stream, StaticValueStream)
# Sanity: the value flowed through
assert stream.get_value() == 0.42
@@ -0,0 +1,90 @@
"""Tests for ColorStripStore startup migrations."""
import json
import pytest
from ledgrab.storage.color_strip_source import SingleColorStripSource
from ledgrab.storage.color_strip_store import ColorStripStore
from ledgrab.storage.database import Database
@pytest.fixture
def tmp_db(tmp_path):
db = Database(tmp_path / "test.db")
yield db
db.close()
def _insert_legacy_row(db: Database, item_id: str, name: str, color: list[int]) -> dict:
"""Insert a pre-rename color strip row with ``source_type='static'``."""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
blob = {
"id": item_id,
"name": name,
"source_type": "static",
"color": color,
"animation": None,
"tags": [],
"led_count": 30,
"created_at": now,
"updated_at": now,
"icon": "",
"icon_color": "",
}
db.upsert("color_strip_sources", item_id, name, blob)
return blob
def test_legacy_static_row_is_rewritten_to_single_color(tmp_db):
"""A row with ``source_type='static'`` is rewritten on first store load."""
_insert_legacy_row(tmp_db, "css_legacy01", "Legacy Solid", [255, 0, 0])
# Sanity: legacy blob really is on disk
raw_before = tmp_db.load_all("color_strip_sources")
assert raw_before[0]["source_type"] == "static"
store = ColorStripStore(tmp_db)
# In-memory item is a SingleColorStripSource with the canonical type
item = store.get("css_legacy01")
assert isinstance(item, SingleColorStripSource)
assert item.source_type == "single_color"
# The on-disk row was rewritten too — no second migration on next boot
raw_after = tmp_db.load_all("color_strip_sources")
assert raw_after[0]["source_type"] == "single_color"
def test_already_migrated_row_is_left_alone(tmp_db):
"""Rows already using ``single_color`` are not touched."""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
blob = {
"id": "css_new01",
"name": "Already Migrated",
"source_type": "single_color",
"color": [0, 128, 255],
"animation": None,
"tags": [],
"led_count": 30,
"created_at": now,
"updated_at": now,
"icon": "",
"icon_color": "",
}
tmp_db.upsert("color_strip_sources", "css_new01", "Already Migrated", blob)
raw_before = tmp_db.load_all("color_strip_sources")
before_json = json.dumps(raw_before[0], sort_keys=True)
store = ColorStripStore(tmp_db)
item = store.get("css_new01")
assert isinstance(item, SingleColorStripSource)
raw_after = tmp_db.load_all("color_strip_sources")
after_json = json.dumps(raw_after[0], sort_keys=True)
assert before_json == after_json
@@ -0,0 +1,309 @@
"""Tests for the versioned data-migration runner.
These verify the safety properties that replaced the string-replace migration:
* migrations parse JSON properly (no substring corruption);
* the runner records each migration name in ``data_migrations`` so subsequent
runs are no-ops;
* a migration that touches no rows still gets recorded as applied;
* a migration that raises does not get recorded (so a fixed migration can be
re-attempted on the next boot);
* the legacy ``static`` rename only rewrites rows whose ``source_type`` field
actually equals ``static`` values that merely contain the substring (e.g.
an animation type ``static_wave``) are left untouched.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
import pytest
from ledgrab.storage.data_migrations import (
ALL_MIGRATIONS,
DataMigration,
MigrationRunner,
StaticToSingleColorMigration,
)
from ledgrab.storage.database import Database
@pytest.fixture
def tmp_db(tmp_path):
db = Database(tmp_path / "test.db")
yield db
db.close()
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _insert_css(db: Database, item_id: str, blob: dict) -> None:
db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob)
# ---------------------------------------------------------------------------
# Runner mechanics
# ---------------------------------------------------------------------------
def test_runner_creates_audit_table(tmp_db):
MigrationRunner(tmp_db)
cursor = tmp_db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'"
)
assert cursor.fetchone() is not None
def test_runner_records_applied_migration(tmp_db):
class _Noop(DataMigration):
name = "test_noop"
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
records = runner.run([_Noop()])
assert [r.name for r in records] == ["test_noop"]
assert "test_noop" in runner.applied_names()
def test_runner_skips_already_applied_migration(tmp_db):
calls = 0
class _CountedMigration(DataMigration):
name = "test_counted"
def apply(self, conn):
nonlocal calls
calls += 1
return 0
runner = MigrationRunner(tmp_db)
runner.run([_CountedMigration()])
runner.run([_CountedMigration()])
runner.run([_CountedMigration()])
assert calls == 1
def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db):
"""Even when a migration changes 0 rows, it is recorded as applied."""
class _NoMatch(DataMigration):
name = "test_no_match"
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
runner.run([_NoMatch()])
assert "test_no_match" in runner.applied_names()
def test_runner_does_not_record_failed_migration(tmp_db):
class _Boom(DataMigration):
name = "test_boom"
def apply(self, conn):
raise RuntimeError("kaboom")
runner = MigrationRunner(tmp_db)
with pytest.raises(RuntimeError, match="kaboom"):
runner.run([_Boom()])
assert "test_boom" not in runner.applied_names()
def test_runner_rejects_unnamed_migration(tmp_db):
class _Unnamed(DataMigration):
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
with pytest.raises(ValueError, match="non-empty name"):
runner.run([_Unnamed()])
def test_runner_rolls_back_data_change_when_migration_raises(tmp_db):
"""A migration that raises mid-way leaves no partial UPDATEs behind."""
blob = {
"id": "css_partial",
"name": "Partial",
"source_type": "static",
"color": [10, 20, 30],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_partial", blob)
class _PartialBoom(DataMigration):
name = "test_partial_boom"
def apply(self, conn):
conn.execute(
"UPDATE color_strip_sources SET data = ? WHERE id = ?",
(json.dumps({**blob, "source_type": "single_color"}), "css_partial"),
)
raise RuntimeError("explode after update")
runner = MigrationRunner(tmp_db)
with pytest.raises(RuntimeError, match="explode after update"):
runner.run([_PartialBoom()])
rows = tmp_db.load_all("color_strip_sources")
assert rows[0]["source_type"] == "static", "UPDATE was not rolled back"
assert "test_partial_boom" not in runner.applied_names()
def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch):
"""Frozen-write databases must not be mutated by the runner."""
monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True)
calls = 0
class _Counted(DataMigration):
name = "test_frozen"
def apply(self, conn):
nonlocal calls
calls += 1
return 0
runner = MigrationRunner(tmp_db)
records = runner.run([_Counted()])
assert records == []
assert calls == 0
assert "test_frozen" not in runner.applied_names()
# ---------------------------------------------------------------------------
# Legacy static-rename specifics
# ---------------------------------------------------------------------------
def test_static_rename_only_touches_source_type_field(tmp_db):
"""A row whose ``animation.type`` happens to contain ``static`` survives intact."""
blob = {
"id": "css_safe01",
"name": "Has static_wave anim",
"source_type": "gradient", # not "static" — should be untouched
"animation": {"type": "static_wave", "speed": 1.0}, # substring trap
"stops": [{"position": 0.0, "color": [10, 20, 30]}],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_safe01", blob)
# Run the migration via the runner (clears the existing audit row first
# because the Database fixture already triggered it through any incidental
# store creation — here we drive it directly).
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
rows = tmp_db.load_all("color_strip_sources")
assert len(rows) == 1
assert rows[0]["source_type"] == "gradient"
assert rows[0]["animation"]["type"] == "static_wave"
def test_static_rename_rewrites_only_legacy_rows(tmp_db):
legacy = {
"id": "css_legacy01",
"name": "Legacy",
"source_type": "static",
"color": [255, 0, 0],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
current = {
"id": "css_modern01",
"name": "Modern",
"source_type": "single_color",
"color": [0, 128, 255],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_legacy01", legacy)
_insert_css(tmp_db, "css_modern01", current)
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")}
assert rows["css_legacy01"]["source_type"] == "single_color"
assert rows["css_modern01"]["source_type"] == "single_color"
def test_static_rename_survives_corrupt_blob(tmp_db):
"""A row whose JSON blob is corrupt does not crash the migration."""
# Insert a row with deliberately broken JSON via direct SQL (bypass upsert
# which would re-serialize). We use a hand-crafted blob.
tmp_db.execute(
"INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)",
("css_bad01", "Bad", "{not valid json"),
)
legacy = {
"id": "css_legacy02",
"name": "Legacy after corrupt",
"source_type": "static",
"color": [1, 2, 3],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_legacy02", legacy)
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
# Legacy row still migrated, corrupt row untouched, no exception
cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources")
out = {row["id"]: row["data"] for row in cursor.fetchall()}
assert out["css_bad01"] == "{not valid json"
assert json.loads(out["css_legacy02"])["source_type"] == "single_color"
def test_all_migrations_have_unique_names():
names = [m.name for m in ALL_MIGRATIONS]
assert len(names) == len(set(names)), f"duplicate names: {names}"
def test_color_strip_store_construction_records_static_migration(tmp_db):
"""Integration: constructing the store applies AND records the migration."""
from ledgrab.storage.color_strip_source import SingleColorStripSource
from ledgrab.storage.color_strip_store import ColorStripStore
legacy = {
"id": "css_int01",
"name": "Integration legacy",
"source_type": "static",
"color": [1, 2, 3],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_int01", legacy)
store = ColorStripStore(tmp_db)
# In-memory shape is correct
assert isinstance(store.get("css_int01"), SingleColorStripSource)
# Audit row exists
runner = MigrationRunner(tmp_db)
assert "001_color_strip_static_to_single_color" in runner.applied_names()