refactor(storage,processing): kind registries + versioned data migrations

Two CRITICAL data-safety bugs from the architecture audit and the two
worst parallel-change problems are fixed in one coherent pass.

Audit findings addressed:

- C2  silent CSS response fallback. The previous _RESPONSE_MAP fell
      through to a fabricated PictureCSSResponse whenever a source
      class lacked an entry; in particular game_event sources were
      silently mis-shaped. Now: GameEventCSSResponse/Create/Update
      schemas exist, _RESPONSE_MAP is re-keyed by source_type string,
      an import-time _assert_response_map_coverage() requires symmetric
      agreement with storage._SOURCE_TYPE_MAP, and the runtime path
      raises instead of fabricating a response.

- C11 string-replace JSON migration. ColorStripStore used
      blob.replace('"source_type": "static"', '"source_type":
      "single_color"') which can corrupt unrelated substrings (e.g.
      an animation type named "static_wave") and provides no audit,
      no transaction, no idempotency. Replaced with
      storage.data_migrations.MigrationRunner backed by a
      data_migrations audit table. Each migration runs inside one
      db.transaction() that covers the applied-check, the apply(),
      and the audit-INSERT — partial failures roll back atomically.
      StaticToSingleColorMigration parses each row with json.loads
      and mutates only the source_type field. Frozen-write databases
      skip with a warning.

- C3+C4 color-strip stream dispatch. The 7-branch elif in
      ColorStripStreamManager.acquire() and the duplicate one in
      ws_stream._create_stream() now share a single STREAM_BUILDERS
      registry in core.processing.color_strip_kinds, keyed by
      source.source_type. Both call sites populate a StreamDeps bag
      and delegate to build_stream(). _assert_stream_kind_coverage()
      asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS
      partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path
      wraps each FastAPI-DI getter in _safe() so non-audio previews
      no longer crash when audio/CSPT stores are not wired.

- C6+C7 value stream dispatch. The 14-branch isinstance ladder in
      ValueStreamManager._create_stream and its silent
      StaticValueStream(value=1.0) fallback are replaced by
      core.processing.value_kinds.STREAM_BUILDERS, keyed by
      source_type string (so AdaptiveValueSource's adaptive_time and
      adaptive_scene route to different builders correctly). The
      manager retains only the SyncClockRuntime pre-acquisition step
      for animated_color (kinds needing this are listed explicitly
      in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a
      separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the
      registry.

Bundled in: the static->single_color rename plus the HTTPValueStream
/ http_endpoint introduction that were already in flight on this
branch share these files; the registry refactor naturally absorbs
both via the new "single_color" / "static" alias entries and the
_build_http builder.

Tests: 26 new tests cover response-map coverage drift, migration
runner audit-table mechanics + transactional rollback +
frozen-write skip, and the two stream-builder registries. 343
existing storage / API / e2e tests stay green. Ruff clean.
This commit is contained in:
2026-05-22 22:45:28 +03:00
parent e24f9d33cc
commit 563cbac88c
15 changed files with 1976 additions and 315 deletions
@@ -9,6 +9,7 @@ from ledgrab.api.schemas.color_strip_sources import (
CompositeCSSResponse,
DaylightCSSResponse,
EffectCSSResponse,
GameEventCSSResponse,
GradientCSSResponse,
KeyColorsCSSResponse,
MappedCSSResponse,
@@ -17,7 +18,7 @@ from ledgrab.api.schemas.color_strip_sources import (
PictureAdvancedCSSResponse,
PictureCSSResponse,
ProcessedCSSResponse,
StaticCSSResponse,
SingleColorCSSResponse,
WeatherCSSResponse,
)
from ledgrab.api.schemas.devices import Calibration as CalibrationSchema
@@ -26,22 +27,7 @@ from ledgrab.core.capture.calibration import (
calibration_to_dict,
)
from ledgrab.storage.color_strip_source import (
AdvancedPictureColorStripSource,
ApiInputColorStripSource,
AudioColorStripSource,
CandlelightColorStripSource,
CompositeColorStripSource,
DaylightColorStripSource,
EffectColorStripSource,
GradientColorStripSource,
KeyColorsColorStripSource,
MappedColorStripSource,
MathWaveColorStripSource,
NotificationColorStripSource,
PictureColorStripSource,
ProcessedColorStripSource,
StaticColorStripSource,
WeatherColorStripSource,
_SOURCE_TYPE_MAP as _STORAGE_TYPE_MAP,
)
from ledgrab.storage.picture_source import (
ProcessedPictureSource,
@@ -94,34 +80,46 @@ def _stops_schema(source) -> list[ColorStopSchema] | None:
return None
# Maps storage class → response builder lambda.
# Maps ``source_type`` string → response builder.
#
# Keying by source_type (rather than type(source)) lets the import-time
# coverage check use the storage registry's keys directly, with no
# inversion or duplicate-class handling for legacy aliases.
_RESPONSE_MAP: dict = {
PictureColorStripSource: lambda s, kw: PictureCSSResponse(
"picture": lambda s, kw: PictureCSSResponse(
**kw,
picture_source_id=s.picture_source_id,
smoothing=s.smoothing.to_dict(),
interpolation_mode=s.interpolation_mode,
calibration=_calibration_schema(s),
),
AdvancedPictureColorStripSource: lambda s, kw: PictureAdvancedCSSResponse(
"picture_advanced": lambda s, kw: PictureAdvancedCSSResponse(
**kw,
smoothing=s.smoothing.to_dict(),
interpolation_mode=s.interpolation_mode,
calibration=_calibration_schema(s),
),
StaticColorStripSource: lambda s, kw: StaticCSSResponse(
"single_color": lambda s, kw: SingleColorCSSResponse(
**kw,
color=s.color.to_dict(),
animation=s.animation,
),
GradientColorStripSource: lambda s, kw: GradientCSSResponse(
# Legacy alias: pre-rename rows used "static"; the data migration rewrites
# them on first store load but a stale in-flight instance would still
# carry source_type='static' until the next reload.
"static": lambda s, kw: SingleColorCSSResponse(
**kw,
color=s.color.to_dict(),
animation=s.animation,
),
"gradient": lambda s, kw: GradientCSSResponse(
**kw,
stops=_stops_schema(s),
animation=s.animation,
easing=s.easing,
gradient_id=s.gradient_id,
),
EffectColorStripSource: lambda s, kw: EffectCSSResponse(
"effect": lambda s, kw: EffectCSSResponse(
**kw,
effect_type=s.effect_type,
palette=s.palette,
@@ -132,15 +130,15 @@ _RESPONSE_MAP: dict = {
mirror=s.mirror,
custom_palette=s.custom_palette,
),
CompositeColorStripSource: lambda s, kw: CompositeCSSResponse(
"composite": lambda s, kw: CompositeCSSResponse(
**kw,
layers=[dict(layer) for layer in s.layers],
),
MappedColorStripSource: lambda s, kw: MappedCSSResponse(
"mapped": lambda s, kw: MappedCSSResponse(
**kw,
zones=[dict(z) for z in s.zones],
),
AudioColorStripSource: lambda s, kw: AudioCSSResponse(
"audio": lambda s, kw: AudioCSSResponse(
**kw,
visualization_mode=s.visualization_mode,
audio_source_id=s.audio_source_id,
@@ -153,13 +151,13 @@ _RESPONSE_MAP: dict = {
mirror=s.mirror,
beat_decay=s.beat_decay.to_dict(),
),
ApiInputColorStripSource: lambda s, kw: ApiInputCSSResponse(
"api_input": lambda s, kw: ApiInputCSSResponse(
**kw,
fallback_color=s.fallback_color.to_dict(),
timeout=s.timeout.to_dict(),
interpolation=s.interpolation,
),
NotificationColorStripSource: lambda s, kw: NotificationCSSResponse(
"notification": lambda s, kw: NotificationCSSResponse(
**kw,
notification_effect=s.notification_effect,
duration_ms=s.duration_ms.to_dict(),
@@ -172,14 +170,14 @@ _RESPONSE_MAP: dict = {
sound_volume=s.sound_volume.to_dict(),
app_sounds=dict(s.app_sounds),
),
DaylightColorStripSource: lambda s, kw: DaylightCSSResponse(
"daylight": lambda s, kw: DaylightCSSResponse(
**kw,
speed=s.speed.to_dict(),
use_real_time=s.use_real_time,
latitude=s.latitude,
longitude=s.longitude,
),
CandlelightColorStripSource: lambda s, kw: CandlelightCSSResponse(
"candlelight": lambda s, kw: CandlelightCSSResponse(
**kw,
color=s.color.to_dict(),
intensity=s.intensity.to_dict(),
@@ -188,18 +186,18 @@ _RESPONSE_MAP: dict = {
wind_strength=s.wind_strength.to_dict(),
candle_type=s.candle_type,
),
ProcessedColorStripSource: lambda s, kw: ProcessedCSSResponse(
"processed": lambda s, kw: ProcessedCSSResponse(
**kw,
input_source_id=s.input_source_id,
processing_template_id=s.processing_template_id,
),
WeatherColorStripSource: lambda s, kw: WeatherCSSResponse(
"weather": lambda s, kw: WeatherCSSResponse(
**kw,
weather_source_id=s.weather_source_id,
speed=s.speed.to_dict(),
temperature_influence=s.temperature_influence.to_dict(),
),
KeyColorsColorStripSource: lambda s, kw: KeyColorsCSSResponse(
"key_colors": lambda s, kw: KeyColorsCSSResponse(
**kw,
picture_source_id=s.picture_source_id,
rectangles=[r.to_dict() for r in s.rectangles],
@@ -207,28 +205,67 @@ _RESPONSE_MAP: dict = {
smoothing=s.smoothing.to_dict(),
brightness=s.brightness.to_dict(),
),
MathWaveColorStripSource: lambda s, kw: MathWaveCSSResponse(
"math_wave": lambda s, kw: MathWaveCSSResponse(
**kw,
waves=s.waves,
speed=s.speed.to_dict(),
gradient_id=s.gradient_id,
),
"game_event": lambda s, kw: GameEventCSSResponse(
**kw,
game_integration_id=s.game_integration_id,
idle_color=s.idle_color.to_dict(),
event_mappings=[dict(m) for m in s.event_mappings],
),
}
def _assert_response_map_coverage() -> None:
"""Verify _RESPONSE_MAP has a builder for every kind in storage's registry.
Runs at module import. Surfaces missing builders eagerly instead of
letting a request fall through to a silent / wrong response shape.
Contract note
-------------
This check is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``)
because every kind — sharable or not — needs a response shape. The
sister assertion in
``core/processing/color_strip_kinds.py::_assert_stream_kind_coverage``
is asymmetric because sharable kinds construct their streams via a
different path. Adding a new kind requires keeping all three registries
aligned: storage's ``_SOURCE_TYPE_MAP``, this ``_RESPONSE_MAP``, and
either ``STREAM_BUILDERS`` or ``SHARABLE_KINDS``.
"""
storage_kinds = set(_STORAGE_TYPE_MAP.keys())
builder_kinds = set(_RESPONSE_MAP.keys())
missing = storage_kinds - builder_kinds
extra = builder_kinds - storage_kinds
if missing or extra:
problems = []
if missing:
problems.append(f"missing builders for: {sorted(missing)}")
if extra:
problems.append(f"unregistered kinds in _RESPONSE_MAP: {sorted(extra)}")
raise RuntimeError(
"_RESPONSE_MAP is out of sync with storage._SOURCE_TYPE_MAP: " + "; ".join(problems)
)
_assert_response_map_coverage()
def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse:
"""Convert a ColorStripSource to the matching per-type response schema."""
kw = _common_response_kwargs(source, overlay_active)
builder = _RESPONSE_MAP.get(type(source))
builder = _RESPONSE_MAP.get(source.source_type)
if builder is None:
# Fallback: use to_dict() and build a PictureCSSResponse
logger.warning("No response builder for %s, falling back", type(source).__name__)
return PictureCSSResponse(
**kw,
picture_source_id="",
smoothing=0.3,
interpolation_mode="average",
calibration=None,
# Coverage is asserted at import time, so reaching this branch means a
# source was loaded with a source_type that is not registered.
# Surface the bug instead of silently returning a wrong-shaped response.
raise RuntimeError(
f"No CSS response builder registered for source_type "
f"{source.source_type!r} (class={type(source).__name__})"
)
return builder(source, kw)
@@ -29,7 +29,7 @@ router = APIRouter()
_PREVIEW_ALLOWED_TYPES = {
"static",
"single_color",
"gradient",
"effect",
"daylight",
@@ -97,65 +97,65 @@ async def preview_color_strip_ws(
return ColorStripSource.from_dict(config)
def _create_stream(source):
"""Instantiate and start the appropriate stream class for *source*."""
from ledgrab.core.processing.color_strip_stream_manager import _SIMPLE_STREAM_MAP
"""Instantiate and start the appropriate stream class for *source*.
mgr = get_processor_manager()
csm = mgr.color_strip_stream_manager
Delegates the per-kind dispatch to ``color_strip_kinds.build_stream``
so this preview path and the production ``ColorStripStreamManager``
share a single registry. Per-kind dependencies (CSPT store, audio
stores, weather manager, …) are gathered into a ``StreamDeps`` bag.
if source.source_type == "audio":
FastAPI-DI providers raise ``RuntimeError`` when they aren't wired,
so we resolve each one through ``_safe`` and pass ``None`` on
failure. The per-kind builder will still see a clear error if a
truly-required dep is missing for that kind, but unrelated previews
(e.g. a ``single_color`` preview on a fresh install where the CSPT
store isn't initialized yet) keep working.
"""
from ledgrab.api.dependencies import (
get_audio_processing_template_store,
get_audio_source_store,
get_audio_template_store,
get_cspt_store,
)
from ledgrab.core.processing.audio_stream import AudioColorStripStream
from ledgrab.core.processing.color_strip_kinds import StreamDeps, build_stream
s = AudioColorStripStream(
source,
mgr.audio_capture_manager,
get_audio_source_store(),
get_audio_template_store(),
get_audio_processing_template_store(),
)
elif source.source_type == "weather":
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
def _safe(getter):
try:
return getter()
except RuntimeError as e:
logger.debug("Preview dep not available (%s): %s", getter.__name__, e)
return None
s = WeatherColorStripStream(source, mgr.weather_manager)
elif source.source_type == "game_event":
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
mgr = get_processor_manager()
csm = mgr.color_strip_stream_manager
s = GameEventColorStripStream(source)
# The game-event bus is optional in preview contexts.
try:
from ledgrab.api.dependencies import get_game_event_bus
bus = get_game_event_bus()
game_event_bus = get_game_event_bus()
except RuntimeError as e:
logger.debug("Preview: no game event bus available: %s", e)
else:
if bus is not None:
s.set_event_bus(bus)
elif source.source_type == "mapped":
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
game_event_bus = None
s = MappedColorStripStream(source, csm)
elif source.source_type == "composite":
from ledgrab.api.dependencies import get_cspt_store
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
s = CompositeColorStripStream(
source, csm, mgr.value_stream_manager, get_cspt_store(), depth=0
deps = StreamDeps(
css_manager=csm,
value_stream_manager=mgr.value_stream_manager,
cspt_store=_safe(get_cspt_store),
weather_manager=mgr.weather_manager,
audio_capture_manager=mgr.audio_capture_manager,
audio_source_store=_safe(get_audio_source_store),
audio_template_store=_safe(get_audio_template_store),
audio_processing_template_store=_safe(get_audio_processing_template_store),
game_event_bus=game_event_bus,
depth=0,
)
elif source.source_type == "processed":
from ledgrab.api.dependencies import get_cspt_store
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
s = ProcessedColorStripStream(source, csm, get_cspt_store())
else:
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
if not stream_cls:
raise ValueError(f"Unsupported preview source_type: {source.source_type}")
s = stream_cls(source)
try:
s = build_stream(source, deps)
except ValueError as e:
# Preserve the registry's original detail so the API consumer
# sees which kind was rejected, not just a generic message.
raise ValueError(f"Unsupported preview source_type: {e}") from e
# Inject gradient store for palette resolution
if hasattr(s, "set_gradient_store"):
try:
@@ -428,8 +428,17 @@ async def css_api_input_ws(
continue
elif "bytes" in message:
# Binary frame: raw RGBRGB... bytes (3 bytes per LED)
# Binary frame: raw RGBRGB... bytes (3 bytes per LED).
# Cap to a generous upper bound on the LED count — a hostile
# client could otherwise stream 100 MB frames and OOM the
# server before any application logic ran.
raw_bytes = message["bytes"]
_MAX_BINARY_LEDS = 8192
if len(raw_bytes) > _MAX_BINARY_LEDS * 3:
await websocket.send_json(
{"error": f"Binary frame too large (max {_MAX_BINARY_LEDS} LEDs)"}
)
continue
if len(raw_bytes) % 3 != 0:
await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"})
continue
@@ -122,9 +122,9 @@ class PictureAdvancedCSSResponse(_CSSResponseBase):
calibration: Optional[Calibration] = Field(None, description="LED calibration")
class StaticCSSResponse(_CSSResponseBase):
source_type: Literal["static"] = "static"
color: Any = Field(description="Static RGB color")
class SingleColorCSSResponse(_CSSResponseBase):
source_type: Literal["single_color"] = "single_color"
color: Any = Field(description="Solid RGB color")
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
@@ -240,11 +240,18 @@ class MathWaveCSSResponse(_CSSResponseBase):
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
class GameEventCSSResponse(_CSSResponseBase):
source_type: Literal["game_event"] = "game_event"
game_integration_id: str = Field(description="Game integration entity ID")
idle_color: Any = Field(description="Idle RGB color (bindable)")
event_mappings: List[dict] = Field(default_factory=list, description="Event-to-effect mappings")
ColorStripSourceResponse = Annotated[
Union[
Annotated[PictureCSSResponse, Tag("picture")],
Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")],
Annotated[StaticCSSResponse, Tag("static")],
Annotated[SingleColorCSSResponse, Tag("single_color")],
Annotated[GradientCSSResponse, Tag("gradient")],
Annotated[EffectCSSResponse, Tag("effect")],
Annotated[CompositeCSSResponse, Tag("composite")],
@@ -258,6 +265,7 @@ ColorStripSourceResponse = Annotated[
Annotated[WeatherCSSResponse, Tag("weather")],
Annotated[KeyColorsCSSResponse, Tag("key_colors")],
Annotated[MathWaveCSSResponse, Tag("math_wave")],
Annotated[GameEventCSSResponse, Tag("game_event")],
],
Discriminator("source_type"),
]
@@ -303,9 +311,9 @@ class PictureAdvancedCSSCreate(_CSSCreateBase):
calibration: Optional[Calibration] = Field(None, description="LED calibration")
class StaticCSSCreate(_CSSCreateBase):
source_type: Literal["static"] = "static"
color: Any = Field(default=None, description="Static RGB color [R,G,B]")
class SingleColorCSSCreate(_CSSCreateBase):
source_type: Literal["single_color"] = "single_color"
color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
@@ -434,11 +442,18 @@ class MathWaveCSSCreate(_CSSCreateBase):
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
class GameEventCSSCreate(_CSSCreateBase):
source_type: Literal["game_event"] = "game_event"
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
ColorStripSourceCreate = Annotated[
Union[
Annotated[PictureCSSCreate, Tag("picture")],
Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")],
Annotated[StaticCSSCreate, Tag("static")],
Annotated[SingleColorCSSCreate, Tag("single_color")],
Annotated[GradientCSSCreate, Tag("gradient")],
Annotated[EffectCSSCreate, Tag("effect")],
Annotated[CompositeCSSCreate, Tag("composite")],
@@ -452,6 +467,7 @@ ColorStripSourceCreate = Annotated[
Annotated[WeatherCSSCreate, Tag("weather")],
Annotated[KeyColorsCSSCreate, Tag("key_colors")],
Annotated[MathWaveCSSCreate, Tag("math_wave")],
Annotated[GameEventCSSCreate, Tag("game_event")],
],
Discriminator("source_type"),
]
@@ -497,9 +513,9 @@ class PictureAdvancedCSSUpdate(_CSSUpdateBase):
calibration: Optional[Calibration] = Field(None, description="LED calibration")
class StaticCSSUpdate(_CSSUpdateBase):
source_type: Literal["static"] = "static"
color: Any = Field(default=None, description="Static RGB color [R,G,B]")
class SingleColorCSSUpdate(_CSSUpdateBase):
source_type: Literal["single_color"] = "single_color"
color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
@@ -626,11 +642,18 @@ class MathWaveCSSUpdate(_CSSUpdateBase):
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
class GameEventCSSUpdate(_CSSUpdateBase):
source_type: Literal["game_event"] = "game_event"
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
ColorStripSourceUpdate = Annotated[
Union[
Annotated[PictureCSSUpdate, Tag("picture")],
Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")],
Annotated[StaticCSSUpdate, Tag("static")],
Annotated[SingleColorCSSUpdate, Tag("single_color")],
Annotated[GradientCSSUpdate, Tag("gradient")],
Annotated[EffectCSSUpdate, Tag("effect")],
Annotated[CompositeCSSUpdate, Tag("composite")],
@@ -644,6 +667,7 @@ ColorStripSourceUpdate = Annotated[
Annotated[WeatherCSSUpdate, Tag("weather")],
Annotated[KeyColorsCSSUpdate, Tag("key_colors")],
Annotated[MathWaveCSSUpdate, Tag("math_wave")],
Annotated[GameEventCSSUpdate, Tag("game_event")],
],
Discriminator("source_type"),
]
@@ -0,0 +1,273 @@
"""Single source of truth for non-sharable color-strip stream construction.
Both the preview WebSocket (``api/routes/color_strip_sources/ws_stream.py``)
and the production ``ColorStripStreamManager.acquire`` used to maintain
parallel ``if source.source_type == "..." elif ... else ..._SIMPLE_STREAM_MAP``
chains. Adding a new kind required keeping those two chains in lockstep, and
silently fell through to a generic stream class when an entry was missed.
This module replaces both chains with a single ``STREAM_BUILDERS`` registry
plus a small ``StreamDeps`` dependency bag. Each caller populates the bag
from its own context (DI container, processor manager, etc.) and looks the
builder up by ``source.source_type``. A coverage assertion at import time
guarantees every kind in ``storage._SOURCE_TYPE_MAP`` is either sharable or
has a builder here — silent fall-throughs are no longer possible.
Sharable kinds (``picture``, ``picture_advanced``, ``key_colors``) are NOT in
this registry: they require an injected ``LiveStream`` whose acquisition is
intertwined with the source's calibration, which does not fit a uniform
factory signature. Those continue to use bespoke paths inside
``ColorStripStreamManager``.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable
@dataclass(frozen=True)
class StreamDeps:
"""Dependency bag for non-sharable stream construction.
Each call site (preview WebSocket, production stream manager) builds one
of these from its own context before invoking :func:`build_stream`.
Fields are ``Any`` (with ``None`` defaults) because individual builders
only consume a subset; tests can supply a minimal bag.
``frozen=True`` guards against a builder accidentally reassigning a
field; it does NOT make the referenced objects immutable — the
``css_manager``, stores, etc. are live mutable services.
``css_manager`` is needed by composite/mapped/processed builders so they
can recursively acquire dependent streams. Single-kind builders ignore
it. The field has no default so callers are forced to think about which
manager they are wiring through.
"""
css_manager: Any
value_stream_manager: Any = None
cspt_store: Any = None # ColorStripProcessingTemplateStore
weather_manager: Any = None
audio_capture_manager: Any = None
audio_source_store: Any = None
audio_template_store: Any = None
audio_processing_template_store: Any = None
game_event_bus: Any = None
depth: int = 0 # composite nesting depth — passed through verbatim
# ---------------------------------------------------------------------------
# Per-kind builders
#
# Each builder is a small free function ``(source, deps) -> ColorStripStream``.
# Imports are deferred to keep this module cheap to import (the production
# processing graph is large).
# ---------------------------------------------------------------------------
def _build_audio(source, d: StreamDeps):
from ledgrab.core.processing.audio_stream import AudioColorStripStream
return AudioColorStripStream(
source,
d.audio_capture_manager,
d.audio_source_store,
d.audio_template_store,
d.audio_processing_template_store,
)
def _build_composite(source, d: StreamDeps):
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
return CompositeColorStripStream(
source,
d.css_manager,
d.value_stream_manager,
d.cspt_store,
depth=d.depth,
)
def _build_mapped(source, d: StreamDeps):
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
return MappedColorStripStream(source, d.css_manager)
def _build_processed(source, d: StreamDeps):
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
return ProcessedColorStripStream(source, d.css_manager, d.cspt_store)
def _build_weather(source, d: StreamDeps):
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
return WeatherColorStripStream(source, d.weather_manager)
def _build_game_event(source, d: StreamDeps):
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
stream = GameEventColorStripStream(source)
if d.game_event_bus is not None:
stream.set_event_bus(d.game_event_bus)
return stream
def _make_source_only_builder(loader: Callable[[], type]) -> Callable[[Any, StreamDeps], Any]:
"""Wrap a class-loader so it produces a uniform ``(source, deps) -> stream`` builder.
The loader is called on each invocation but module caching makes the
import a single dict lookup after the first call.
"""
def _build(source, _deps: StreamDeps):
return loader()(source)
return _build
def _single_color_cls() -> type:
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
return SingleColorStripStream
def _gradient_cls() -> type:
from ledgrab.core.processing.color_strip_stream import GradientColorStripStream
return GradientColorStripStream
def _effect_cls() -> type:
from ledgrab.core.processing.effect_stream import EffectColorStripStream
return EffectColorStripStream
def _api_input_cls() -> type:
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
return ApiInputColorStripStream
def _notification_cls() -> type:
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
return NotificationColorStripStream
def _daylight_cls() -> type:
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
return DaylightColorStripStream
def _candlelight_cls() -> type:
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
return CandlelightColorStripStream
def _math_wave_cls() -> type:
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
return MathWaveColorStripStream
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
StreamBuilder = Callable[[Any, StreamDeps], Any]
STREAM_BUILDERS: dict[str, StreamBuilder] = {
"audio": _build_audio,
"composite": _build_composite,
"mapped": _build_mapped,
"processed": _build_processed,
"weather": _build_weather,
"game_event": _build_game_event,
"single_color": _make_source_only_builder(_single_color_cls),
# Legacy alias: pre-rename rows used "static". The data migration rewrites
# the on-disk source_type on startup, but this alias keeps an in-flight
# legacy entry resolving to the right stream class.
"static": _make_source_only_builder(_single_color_cls),
"gradient": _make_source_only_builder(_gradient_cls),
"effect": _make_source_only_builder(_effect_cls),
"api_input": _make_source_only_builder(_api_input_cls),
"notification": _make_source_only_builder(_notification_cls),
"daylight": _make_source_only_builder(_daylight_cls),
"candlelight": _make_source_only_builder(_candlelight_cls),
"math_wave": _make_source_only_builder(_math_wave_cls),
}
# Sharable kinds are handled by dedicated LiveStream-acquisition paths in
# ColorStripStreamManager (their construction depends on calibration → picture
# source resolution, which does not fit a uniform factory signature).
SHARABLE_KINDS: frozenset[str] = frozenset({"picture", "picture_advanced", "key_colors"})
def build_stream(source, deps: StreamDeps):
"""Build a non-sharable color-strip stream for *source*.
Raises ``ValueError`` if the kind has no registered builder (which
would indicate a sharable source slipped through the caller's
``sharable`` gate, or a new kind missing from this registry).
"""
builder = STREAM_BUILDERS.get(source.source_type)
if builder is None:
raise ValueError(
f"No stream builder for non-sharable color-strip-source kind "
f"{source.source_type!r} (id={getattr(source, 'id', '?')!r})"
)
return builder(source, deps)
def _assert_stream_kind_coverage() -> None:
"""Verify the registry is a strict partition: every kind from storage is
either listed in SHARABLE_KINDS or has a STREAM_BUILDERS entry.
Runs at module import so a kind added to ``_SOURCE_TYPE_MAP`` without a
corresponding builder fails the server boot loudly instead of silently
falling through at request time.
Contract note
-------------
This check is **asymmetric** (``STREAM_BUILDERS SHARABLE_KINDS ==
storage_kinds``) because sharable kinds are constructed by a separate
path inside ``ColorStripStreamManager``. The sister assertion in
``api/routes/color_strip_sources/_helpers.py::_assert_response_map_coverage``
is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``) because every
kind, sharable or not, still needs a response shape. Both assertions key
by the ``source_type`` string; adding a new kind requires updates to
storage, ``_RESPONSE_MAP``, and either ``STREAM_BUILDERS`` or
``SHARABLE_KINDS``. Both assertions catch missing entries; only this one
expects a subset relationship.
"""
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
builder_kinds = set(STREAM_BUILDERS.keys())
expected_non_sharable = storage_kinds - SHARABLE_KINDS
missing = expected_non_sharable - builder_kinds
extra = builder_kinds - storage_kinds
if missing or extra:
problems = []
if missing:
problems.append(f"missing builders: {sorted(missing)}")
if extra:
problems.append(f"unregistered kinds: {sorted(extra)}")
raise RuntimeError(
"color_strip_kinds.STREAM_BUILDERS is out of sync with storage._SOURCE_TYPE_MAP: "
+ "; ".join(problems)
)
_assert_stream_kind_coverage()
@@ -3,7 +3,7 @@
PictureColorStripStreams (expensive screen capture) are shared across multiple
consumers via reference counting — processing runs once, not once per target.
Count-dependent streams (static, gradient, effect) are NOT shared.
Count-dependent streams (single_color, gradient, effect) are NOT shared.
Each consumer gets its own instance so it can configure an independent LED count
without interfering with other targets.
"""
@@ -11,37 +11,18 @@ without interfering with other targets.
from dataclasses import dataclass
from typing import Dict, Optional
from ledgrab.core.processing.color_strip_kinds import (
StreamDeps,
build_stream,
)
from ledgrab.core.processing.color_strip_stream import (
ColorStripStream,
GradientColorStripStream,
PictureColorStripStream,
StaticColorStripStream,
)
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
from ledgrab.core.processing.effect_stream import EffectColorStripStream
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
from ledgrab.utils import get_logger
logger = get_logger(__name__)
# source_type → stream class for non-picture (non-sharable) sources
_SIMPLE_STREAM_MAP = {
"static": StaticColorStripStream,
"gradient": GradientColorStripStream,
"effect": EffectColorStripStream,
"api_input": ApiInputColorStripStream,
"notification": NotificationColorStripStream,
"daylight": DaylightColorStripStream,
"candlelight": CandlelightColorStripStream,
"game_event": GameEventColorStripStream,
"math_wave": MathWaveColorStripStream,
}
@dataclass
class _ColorStripEntry:
@@ -241,43 +222,27 @@ class ColorStripStreamManager:
"""
source = self._color_strip_store.get_source(css_id)
# Non-sharable: always create a fresh per-consumer instance
# Non-sharable: always create a fresh per-consumer instance.
# Construction is delegated to the per-kind registry in
# ``color_strip_kinds`` so the dispatch lives in exactly one place.
if not source.sharable:
if source.source_type == "audio":
from ledgrab.core.processing.audio_stream import AudioColorStripStream
css_stream = AudioColorStripStream(
source,
self._audio_capture_manager,
self._audio_source_store,
self._audio_template_store,
self._audio_processing_template_store,
deps = StreamDeps(
css_manager=self,
value_stream_manager=self._value_stream_manager,
cspt_store=self._cspt_store,
weather_manager=self._weather_manager,
audio_capture_manager=self._audio_capture_manager,
audio_source_store=self._audio_source_store,
audio_template_store=self._audio_template_store,
audio_processing_template_store=self._audio_processing_template_store,
game_event_bus=self._game_event_bus,
depth=depth,
)
elif source.source_type == "composite":
from ledgrab.core.processing.composite_stream import (
CompositeColorStripStream,
)
css_stream = CompositeColorStripStream(
source, self, self._value_stream_manager, self._cspt_store, depth=depth
)
elif source.source_type == "mapped":
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
css_stream = MappedColorStripStream(source, self)
elif source.source_type == "processed":
css_stream = ProcessedColorStripStream(source, self, self._cspt_store)
elif source.source_type == "weather":
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
css_stream = WeatherColorStripStream(source, self._weather_manager)
else:
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
if not stream_cls:
raise ValueError(
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
)
css_stream = stream_cls(source)
try:
css_stream = build_stream(source, deps)
except ValueError as e:
# Surface the css_id alongside the registry's error.
raise ValueError(f"{e} (css_id={css_id})") from e
# Inject gradient store for palette resolution
if self._gradient_store and hasattr(css_stream, "set_gradient_store"):
css_stream.set_gradient_store(self._gradient_store)
@@ -0,0 +1,354 @@
"""Single source of truth for value-stream construction.
``ValueStreamManager._create_stream`` used to be a 168-line ``isinstance``
ladder over 14 ``ValueSource`` subclasses with a silent fallback to
``StaticValueStream(value=1.0)``. The ladder forced any new value kind to
edit the factory plus the storage subclass plus the schemas plus the store's
``create_source`` — and a missing branch corrupted the stream at runtime.
This module replaces the ladder with a single ``STREAM_BUILDERS`` registry
keyed by the ``source_type`` string (matching the storage layer's
``_VALUE_SOURCE_MAP``). An import-time coverage assertion guarantees the two
registries stay aligned.
Builders take a ``(source, deps: ValueStreamDeps) -> ValueStream`` shape so
both the production manager and any preview / test harness can populate the
deps from their own context.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Optional
if TYPE_CHECKING:
# Typed forward references so mypy/pyright catch typos like
# ``d.gradient_stroe`` at static-analysis time. At runtime these are
# ``Any`` — the live objects come from the FastAPI / manager wiring.
from ledgrab.core.audio.audio_capture_manager import AudioCaptureManager
from ledgrab.core.game_integration.event_bus import GameEventBus
from ledgrab.core.home_assistant.ha_manager import HomeAssistantManager
from ledgrab.core.processing.color_strip_stream_manager import (
ColorStripStreamManager,
)
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
from ledgrab.core.processing.sync_clock_manager import SyncClockRuntime
from ledgrab.storage.audio_processing_template_store import (
AudioProcessingTemplateStore,
)
from ledgrab.storage.audio_source_store import AudioSourceStore
from ledgrab.storage.audio_template_store import AudioTemplateStore
from ledgrab.storage.gradient_store import GradientStore
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
@dataclass(frozen=True)
class ValueStreamDeps:
"""Dependency bag for value-stream construction.
Builders only read the subset they need. ``value_stream_manager`` is the
one truly load-bearing field — it is referenced by
``GradientMapValueStream`` so it can recursively acquire its source
value stream.
``clock_runtime`` is pre-acquired by the manager exclusively for the
``animated_color`` kind (see :data:`NEEDS_CLOCK_RUNTIME`). The manager
owns the bookkeeping for tracking ``vs_id → clock_id`` so the builder
stays a pure ``(source, deps) -> stream`` mapping. If a new kind ever
grows a clock dependency, add it to ``NEEDS_CLOCK_RUNTIME`` AND surface
a separate field — sharing ``clock_runtime`` across kinds invites the
wrong runtime being passed to the wrong builder.
Field types are quoted so they stay informational under
``TYPE_CHECKING`` and the dataclass still accepts plain mocks at
runtime. Builders therefore get IDE/lint help against typos like
``d.gradient_stroe`` while production code remains duck-typed.
"""
value_stream_manager: "Any"
audio_capture_manager: Optional["AudioCaptureManager"] = None
audio_source_store: Optional["AudioSourceStore"] = None
audio_template_store: Optional["AudioTemplateStore"] = None
audio_processing_template_store: Optional["AudioProcessingTemplateStore"] = None
live_stream_manager: Optional["LiveStreamManager"] = None
ha_manager: Optional["HomeAssistantManager"] = None
gradient_store: Optional["GradientStore"] = None
css_stream_manager: Optional["ColorStripStreamManager"] = None
event_bus: Optional["GameEventBus"] = None
http_endpoint_store: Optional["HTTPEndpointStore"] = None
clock_runtime: Optional["SyncClockRuntime"] = None
# ---------------------------------------------------------------------------
# Per-kind builders
# ---------------------------------------------------------------------------
def _build_static(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import StaticValueStream
return StaticValueStream(value=source.value)
def _build_animated(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import AnimatedValueStream
return AnimatedValueStream(
waveform=source.waveform,
speed=source.speed,
min_value=source.min_value,
max_value=source.max_value,
)
def _build_audio(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import AudioValueStream
return AudioValueStream(
audio_source_id=source.audio_source_id,
mode=source.mode,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
auto_gain=source.auto_gain,
audio_capture_manager=d.audio_capture_manager,
audio_source_store=d.audio_source_store,
audio_template_store=d.audio_template_store,
audio_processing_template_store=d.audio_processing_template_store,
)
def _build_daylight(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import DaylightValueStream
return DaylightValueStream(
speed=source.speed,
use_real_time=source.use_real_time,
latitude=source.latitude,
longitude=source.longitude,
min_value=source.min_value,
max_value=source.max_value,
)
def _build_adaptive_time(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import TimeOfDayValueStream
return TimeOfDayValueStream(
schedule=source.schedule,
min_value=source.min_value,
max_value=source.max_value,
)
def _build_adaptive_scene(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import SceneValueStream
return SceneValueStream(
picture_source_id=source.picture_source_id,
scene_behavior=source.scene_behavior,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
live_stream_manager=d.live_stream_manager,
)
def _build_static_color(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import StaticColorValueStream
return StaticColorValueStream(color=source.color)
def _build_animated_color(source, d: ValueStreamDeps):
# See NEEDS_CLOCK_RUNTIME below: ``d.clock_runtime`` is pre-acquired by
# ``ValueStreamManager._create_stream`` exclusively for this kind. Any
# other builder that ever needs a clock should add its own deps field
# rather than read this one.
from ledgrab.core.processing.value_stream import AnimatedColorValueStream
return AnimatedColorValueStream(
colors=source.colors,
speed=source.speed,
easing=source.easing,
clock=d.clock_runtime,
)
def _build_adaptive_time_color(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import AdaptiveTimeColorValueStream
return AdaptiveTimeColorValueStream(schedule=source.schedule)
def _build_ha_entity(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import HAEntityValueStream
return HAEntityValueStream(
ha_source_id=source.ha_source_id,
entity_id=source.entity_id,
attribute=source.attribute,
min_ha_value=source.min_ha_value,
max_ha_value=source.max_ha_value,
smoothing=source.smoothing,
ha_manager=d.ha_manager,
)
def _build_gradient_map(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import GradientMapValueStream
return GradientMapValueStream(
value_source_id=source.value_source_id,
gradient_id=source.gradient_id,
easing=source.easing,
value_stream_manager=d.value_stream_manager,
gradient_store=d.gradient_store,
)
def _build_css_extract(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import CSSExtractValueStream
return CSSExtractValueStream(
color_strip_source_id=source.color_strip_source_id,
led_start=source.led_start,
led_end=source.led_end,
css_stream_manager=d.css_stream_manager,
)
def _build_system_metrics(source, _d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import SystemMetricsValueStream
return SystemMetricsValueStream(
metric=source.metric,
min_value=source.min_value,
max_value=source.max_value,
max_rate=source.max_rate,
disk_path=source.disk_path,
sensor_label=source.sensor_label,
poll_interval=source.poll_interval,
smoothing=source.smoothing,
)
def _build_game_event(source, d: ValueStreamDeps):
# Late import: ``GameEventValueStream`` lives in a separate sub-package
# to keep the game-event subsystem (which transitively pulls in the
# game-integration adapters) off the cold-start path for installs that
# never use game events.
from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream
return GameEventValueStream(
event_type=source.event_type,
min_game_value=source.min_game_value,
max_game_value=source.max_game_value,
smoothing=source.smoothing,
default_value=source.default_value,
timeout=source.timeout,
event_bus=d.event_bus,
)
def _build_http(source, d: ValueStreamDeps):
from ledgrab.core.processing.value_stream import HTTPValueStream
return HTTPValueStream(
endpoint_id=source.http_endpoint_id,
json_path=source.json_path,
interval_s=source.interval_s,
min_value=source.min_value,
max_value=source.max_value,
smoothing=source.smoothing,
http_endpoint_store=d.http_endpoint_store,
)
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
StreamBuilder = Callable[[Any, ValueStreamDeps], Any]
STREAM_BUILDERS: dict[str, StreamBuilder] = {
"static": _build_static,
"animated": _build_animated,
"audio": _build_audio,
"daylight": _build_daylight,
"adaptive_time": _build_adaptive_time,
"adaptive_scene": _build_adaptive_scene,
"static_color": _build_static_color,
"animated_color": _build_animated_color,
"adaptive_time_color": _build_adaptive_time_color,
"ha_entity": _build_ha_entity,
"gradient_map": _build_gradient_map,
"css_extract": _build_css_extract,
"system_metrics": _build_system_metrics,
"game_event": _build_game_event,
"http": _build_http,
}
# ``animated_color`` is the only kind that needs a pre-acquired SyncClockRuntime
# from the manager; the rest derive everything they need from ``source`` and
# ``deps``. Exposing this set lets the manager perform the side-effecting
# acquisition step exactly once, before delegating to the registry.
NEEDS_CLOCK_RUNTIME: frozenset[str] = frozenset({"animated_color"})
def build_stream(source, deps: ValueStreamDeps):
"""Build a ValueStream for *source*.
Raises ``ValueError`` when no builder is registered for
``source.source_type``. Coverage is asserted at module import, so this
only fires for an in-flight instance whose ``source_type`` somehow
drifted from the registered set.
"""
builder = STREAM_BUILDERS.get(source.source_type)
if builder is None:
raise ValueError(
f"No value-stream builder for source_type {source.source_type!r} "
f"(id={getattr(source, 'id', '?')!r})"
)
return builder(source, deps)
def _assert_value_kind_coverage() -> None:
"""Verify the registry and storage's ``_VALUE_SOURCE_MAP`` agree.
Runs at module import. Symmetric: every kind in storage must have a
builder, and every builder must correspond to a real storage kind.
Also asserts ``NEEDS_CLOCK_RUNTIME`` only names kinds that exist in the
registry, so a typo there fails the boot loudly instead of silently
leaking a ``SyncClockRuntime`` acquisition.
"""
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
builder_kinds = set(STREAM_BUILDERS.keys())
missing = storage_kinds - builder_kinds
extra = builder_kinds - storage_kinds
if missing or extra:
problems = []
if missing:
problems.append(f"missing builders: {sorted(missing)}")
if extra:
problems.append(f"unregistered kinds: {sorted(extra)}")
raise RuntimeError(
"value_kinds.STREAM_BUILDERS is out of sync with storage._VALUE_SOURCE_MAP: "
+ "; ".join(problems)
)
rogue_clock_kinds = NEEDS_CLOCK_RUNTIME - builder_kinds
if rogue_clock_kinds:
raise RuntimeError(
"value_kinds.NEEDS_CLOCK_RUNTIME names kinds with no registered "
f"builder: {sorted(rogue_clock_kinds)}"
)
_assert_value_kind_coverage()
+261 -141
View File
@@ -21,7 +21,9 @@ ValueStreamManager owns all running ValueStreams, keyed by
from __future__ import annotations
import asyncio
import json
import math
import re
import time
from abc import ABC, abstractmethod
from datetime import datetime
@@ -29,8 +31,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.utils import get_logger
# Compiled once — used by ``_extract_simple_path`` on every poll.
_NAME_HEAD_RE = re.compile(r"^([^\[]*)")
_INDEX_RE = re.compile(r"^\[(\d+)\]")
if TYPE_CHECKING:
from ledgrab.core.audio.audio_capture import AudioCaptureManager
from ledgrab.core.game_integration.event_bus import GameEventBus
@@ -39,6 +46,7 @@ if TYPE_CHECKING:
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
from ledgrab.core.processing.sync_clock_manager import SyncClockManager
from ledgrab.storage.audio_source_store import AudioSourceStore
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.storage.value_source import ValueSource
from ledgrab.storage.value_source_store import ValueSourceStore
@@ -1017,6 +1025,211 @@ class HAEntityValueStream(ValueStream):
logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e)
# ---------------------------------------------------------------------------
# HTTP poll
# ---------------------------------------------------------------------------
class HTTPValueStream(ValueStream):
"""Periodically polls an HTTPEndpoint and extracts a value via json_path.
Exposes two accessors:
- ``get_value()`` returns a normalized float in [0, 1] for use as a
modulator (brightness, color, etc.). The raw extracted value is
coerced to float; non-numeric values yield 0.0.
- ``get_raw_value()`` returns the un-normalized extracted value
(str / int / float / bool / None) for consumers that need it
verbatim — e.g. an automation rule comparing ``"playing"``.
"""
def __init__(
self,
endpoint_id: str,
json_path: str,
interval_s: int,
min_value: float,
max_value: float,
smoothing: float,
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
) -> None:
self._endpoint_id = endpoint_id
self._json_path = json_path
self._interval_s = max(1, int(interval_s))
self._min_value = min_value
self._max_value = max_value
self._smoothing = smoothing
self._http_endpoint_store = http_endpoint_store
self._task: Optional[asyncio.Task] = None
self._raw_value: Any = None
self._prev_normalized: Optional[float] = None
# Kept as private attrs for internal/log diagnostics; not exposed via
# public properties or API until a status endpoint consumes them.
self._last_fetched_at: Optional[datetime] = None
self._last_status_code: Optional[int] = None
self._last_error: Optional[str] = None
def start(self) -> None:
if self._task is not None:
return
if not self._endpoint_id or self._http_endpoint_store is None:
return
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop — can't poll. Construction in a sync test
# context is fine; the stream just stays idle until started
# from an async context.
return
self._task = loop.create_task(self._poll_loop())
def stop(self) -> None:
task = self._task
self._task = None
if task is not None:
task.cancel()
def get_value(self) -> float:
raw = self._raw_value
if raw is None:
return self._prev_normalized if self._prev_normalized is not None else 0.0
try:
numeric = float(raw)
except (TypeError, ValueError):
return self._prev_normalized if self._prev_normalized is not None else 0.0
rng = self._max_value - self._min_value
if abs(rng) < 1e-9:
normalized = 0.5
else:
normalized = (numeric - self._min_value) / rng
normalized = max(0.0, min(1.0, normalized))
if self._smoothing > 0.0 and self._prev_normalized is not None:
normalized = (
self._smoothing * self._prev_normalized + (1.0 - self._smoothing) * normalized
)
self._prev_normalized = normalized
return normalized
def get_raw_value(self) -> Any:
"""Return the last raw extracted value (string, int, float, etc.)."""
return self._raw_value
def update_source(self, source: "ValueSource") -> None:
from ledgrab.storage.value_source import HTTPValueSource
if not isinstance(source, HTTPValueSource):
return
self._endpoint_id = source.http_endpoint_id
self._json_path = source.json_path
self._interval_s = max(1, int(source.interval_s))
self._min_value = source.min_value
self._max_value = source.max_value
self._smoothing = source.smoothing
async def _poll_loop(self) -> None:
from ledgrab.utils.safe_source import safe_request_bounded
try:
while True:
try:
endpoint = self._http_endpoint_store.get(self._endpoint_id)
except EntityNotFoundError:
# The endpoint was deleted out from under us. Stop the
# poll task so it doesn't spin forever; the next entity
# event (or the value source being deleted) will tidy
# the rest of the bookkeeping.
logger.warning(
"HTTPValueStream stopping: endpoint %s no longer exists",
self._endpoint_id,
)
self._last_error = "endpoint_deleted"
self._raw_value = None
self._task = None
return
except Exception as exc:
self._last_error = f"Endpoint lookup failed: {type(exc).__name__}"
self._raw_value = None
await asyncio.sleep(self._interval_s)
continue
headers = endpoint.build_request_headers()
try:
status, body_bytes, error = await safe_request_bounded(
endpoint.method,
endpoint.url,
headers=headers,
timeout=endpoint.timeout_s,
)
except Exception as exc:
# safe_request_bounded raises HTTPException on URL
# validation failure; treat that as a recoverable poll
# error and try again next cycle.
self._last_status_code = None
self._last_error = f"URL validation failed: {type(exc).__name__}"
self._raw_value = None
await asyncio.sleep(self._interval_s)
continue
self._last_status_code = status if status else None
self._last_error = error
if not error and status:
try:
body_text = body_bytes.decode("utf-8", errors="replace")
except Exception:
body_text = ""
body_json: Any
try:
body_json = json.loads(body_text) if body_text else None
except (ValueError, TypeError):
body_json = None
self._raw_value = _extract_simple_path(body_json, self._json_path, body_text)
else:
self._raw_value = None
self._last_fetched_at = datetime.now()
await asyncio.sleep(self._interval_s)
except asyncio.CancelledError:
raise
def _extract_simple_path(body_json: Any, path: str, body_text: str) -> Any:
"""Extract a value via a dot-path (with optional ``[N]`` indices).
Uses module-level compiled regexes so repeated polls don't recompile.
Returns ``None`` when the path doesn't resolve — runtime callers just
need "the value, or nothing." Empty path returns the raw body text so
plain-text endpoints work too.
"""
if not path:
return body_text or None
if body_json is None:
return None
current: Any = body_json
for raw_segment in path.split("."):
segment = raw_segment.strip()
if not segment:
continue
name_match = _NAME_HEAD_RE.match(segment)
name_part = name_match.group(1) if name_match else ""
remainder = segment[len(name_part) :]
if name_part:
if not isinstance(current, dict) or name_part not in current:
return None
current = current[name_part]
while remainder:
idx_match = _INDEX_RE.match(remainder)
if not idx_match:
return None
idx = int(idx_match.group(1))
if not isinstance(current, list) or idx < 0 or idx >= len(current):
return None
current = current[idx]
remainder = remainder[idx_match.end() :]
return current
# ---------------------------------------------------------------------------
# Gradient Map
# ---------------------------------------------------------------------------
@@ -1547,6 +1760,7 @@ class ValueStreamManager:
event_bus: Optional["GameEventBus"] = None,
audio_processing_template_store=None,
sync_clock_manager: Optional["SyncClockManager"] = None,
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
):
self._value_source_store = value_source_store
self._audio_capture_manager = audio_capture_manager
@@ -1559,6 +1773,7 @@ class ValueStreamManager:
self._event_bus = event_bus
self._audio_processing_template_store = audio_processing_template_store
self._sync_clock_manager = sync_clock_manager
self._http_endpoint_store = http_endpoint_store
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
# Tracks which clock_id (if any) was acquired for each stream so we
@@ -1602,6 +1817,17 @@ class ValueStreamManager:
else:
logger.info(f"Released ref for value stream {vs_id} (refs={refs})")
def peek(self, vs_id: str) -> Optional[ValueStream]:
"""Read-only accessor: return the running ValueStream for ``vs_id``
if one exists, else ``None``.
Does NOT change ref counts. Use for consumer-driven reads where the
caller already holds a reference via :meth:`acquire` (e.g. the
:class:`AutomationEngine` evaluating an ``HTTPPollRule`` against a
value source it has already acquired in ``_sync_value_stream_refs``).
"""
return self._streams.get(vs_id)
def update_source(self, vs_id: str) -> None:
"""Hot-update the shared stream for the given ValueSource."""
try:
@@ -1699,158 +1925,52 @@ class ValueStreamManager:
logger.info("Released all value streams")
def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream:
"""Factory: create the appropriate ValueStream for a ValueSource."""
from ledgrab.storage.value_source import (
AdaptiveValueSource,
AnimatedValueSource,
AudioValueSource,
CSSExtractValueSource,
DaylightValueSource,
GameEventValueSource,
GradientMapValueSource,
HAEntityValueSource,
StaticValueSource,
StaticColorValueSource,
AnimatedColorValueSource,
AdaptiveTimeColorValueSource,
SystemMetricsValueSource,
"""Build a ValueStream for *source* via the central kind registry.
The 14-branch ``isinstance`` ladder this method used to host was the
canonical example of the parallel-change smell flagged in the
architecture audit. The actual per-kind construction now lives in
``ledgrab.core.processing.value_kinds.STREAM_BUILDERS``, keyed by
``source.source_type``. This method only handles the manager-side
bookkeeping that does not fit a uniform builder signature — namely
the optional :class:`SyncClockRuntime` acquisition for
``animated_color`` sources, whose ``vs_id → clock_id`` mapping the
manager owns for symmetric release at teardown.
"""
from ledgrab.core.processing.value_kinds import (
NEEDS_CLOCK_RUNTIME,
ValueStreamDeps,
build_stream,
)
if isinstance(source, StaticValueSource):
return StaticValueStream(value=source.value)
if isinstance(source, AnimatedValueSource):
return AnimatedValueStream(
waveform=source.waveform,
speed=source.speed,
min_value=source.min_value,
max_value=source.max_value,
clock_runtime = None
if source.source_type in NEEDS_CLOCK_RUNTIME:
clock_id = getattr(source, "clock_id", None)
if clock_id and self._sync_clock_manager:
try:
clock_runtime = self._sync_clock_manager.acquire(clock_id)
if vs_id is not None:
self._stream_clock_ids[vs_id] = clock_id
except Exception as e:
logger.warning(
"Could not acquire sync clock %s for value source %s: %s",
clock_id,
source.id,
e,
)
if isinstance(source, AudioValueSource):
return AudioValueStream(
audio_source_id=source.audio_source_id,
mode=source.mode,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
auto_gain=source.auto_gain,
deps = ValueStreamDeps(
value_stream_manager=self,
audio_capture_manager=self._audio_capture_manager,
audio_source_store=self._audio_source_store,
audio_template_store=self._audio_template_store,
audio_processing_template_store=self._audio_processing_template_store,
)
if isinstance(source, DaylightValueSource):
return DaylightValueStream(
speed=source.speed,
use_real_time=source.use_real_time,
latitude=source.latitude,
longitude=source.longitude,
min_value=source.min_value,
max_value=source.max_value,
)
if isinstance(source, AdaptiveValueSource):
if source.source_type == "adaptive_scene":
return SceneValueStream(
picture_source_id=source.picture_source_id,
scene_behavior=source.scene_behavior,
sensitivity=source.sensitivity,
smoothing=source.smoothing,
min_value=source.min_value,
max_value=source.max_value,
live_stream_manager=self._live_stream_manager,
)
return TimeOfDayValueStream(
schedule=source.schedule,
min_value=source.min_value,
max_value=source.max_value,
)
# Color streams
if isinstance(source, StaticColorValueSource):
return StaticColorValueStream(color=source.color)
if isinstance(source, AnimatedColorValueSource):
clock_runtime = None
if source.clock_id and self._sync_clock_manager:
try:
clock_runtime = self._sync_clock_manager.acquire(source.clock_id)
if vs_id is not None:
self._stream_clock_ids[vs_id] = source.clock_id
except Exception as e:
logger.warning(
"Could not acquire sync clock %s for value source %s: %s",
source.clock_id,
source.id,
e,
)
return AnimatedColorValueStream(
colors=source.colors,
speed=source.speed,
easing=source.easing,
clock=clock_runtime,
)
if isinstance(source, AdaptiveTimeColorValueSource):
return AdaptiveTimeColorValueStream(schedule=source.schedule)
if isinstance(source, HAEntityValueSource):
return HAEntityValueStream(
ha_source_id=source.ha_source_id,
entity_id=source.entity_id,
attribute=source.attribute,
min_ha_value=source.min_ha_value,
max_ha_value=source.max_ha_value,
smoothing=source.smoothing,
ha_manager=self._ha_manager,
)
if isinstance(source, GradientMapValueSource):
return GradientMapValueStream(
value_source_id=source.value_source_id,
gradient_id=source.gradient_id,
easing=source.easing,
value_stream_manager=self,
gradient_store=self._gradient_store,
)
if isinstance(source, CSSExtractValueSource):
return CSSExtractValueStream(
color_strip_source_id=source.color_strip_source_id,
led_start=source.led_start,
led_end=source.led_end,
css_stream_manager=self._css_stream_manager,
)
if isinstance(source, SystemMetricsValueSource):
return SystemMetricsValueStream(
metric=source.metric,
min_value=source.min_value,
max_value=source.max_value,
max_rate=source.max_rate,
disk_path=source.disk_path,
sensor_label=source.sensor_label,
poll_interval=source.poll_interval,
smoothing=source.smoothing,
)
if isinstance(source, GameEventValueSource):
from ledgrab.core.value_sources.game_event_value_source import (
GameEventValueStream,
)
return GameEventValueStream(
event_type=source.event_type,
min_game_value=source.min_game_value,
max_game_value=source.max_game_value,
smoothing=source.smoothing,
default_value=source.default_value,
timeout=source.timeout,
event_bus=self._event_bus,
http_endpoint_store=self._http_endpoint_store,
clock_runtime=clock_runtime,
)
# Fallback
return StaticValueStream(value=1.0)
return build_stream(source, deps)
@@ -32,6 +32,15 @@ class BaseSqliteStore(Generic[T]):
self._items: Dict[str, T] = {}
self._deserializer = deserializer
self._lock = threading.RLock()
# Apply pending JSON-blob data migrations before loading rows so the
# in-memory cache reflects the canonical, post-migration shape. The
# runner is idempotent across stores — it consults a dedicated
# ``data_migrations`` audit table — so each store construction may
# invoke it without re-running already-applied migrations.
# Imported lazily to avoid a circular dependency at module load.
from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner
MigrationRunner(db).run(ALL_MIGRATIONS)
self._load()
# -- I/O -----------------------------------------------------------------
@@ -23,7 +23,13 @@ MAX_COMPOSITE_DEPTH = 4
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
"""Persistent storage for color strip sources."""
"""Persistent storage for color strip sources.
JSON-blob field renames (e.g. legacy ``source_type='static'`` →
``'single_color'``) are handled by the central
:mod:`ledgrab.storage.data_migrations` runner, which executes once per
database from :meth:`Database._ensure_schema`.
"""
_table_name = "color_strip_sources"
_entity_name = "Color strip source"
@@ -0,0 +1,219 @@
"""Versioned data migrations for stored JSON entities.
Each migration is a small class with a stable ``name`` (used as the idempotency
key) and an ``apply(db)`` method that performs the change inside a transaction.
The ``MigrationRunner`` keeps a ``data_migrations`` table that records which
migrations have already executed; subsequent runs are a no-op.
This replaces ad-hoc per-store migrations that were rewriting raw JSON via
``str.replace`` — that approach corrupted nested fields whose values happened
to share a substring with the renamed key, and had no transaction or audit.
Adding a new migration
----------------------
1. Subclass :class:`DataMigration`, give it a unique ``name`` (prefix with the
next sequence number, e.g. ``"002_..."``) and implement ``apply``.
2. Append it to :data:`ALL_MIGRATIONS` in commit order. Never reorder existing
entries — the runner records each by name.
3. The first ``ColorStripStore`` (or any other store) construction triggers the
runner; nothing else has to change.
"""
from __future__ import annotations
import json
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from ledgrab.storage.database import Database, is_writes_frozen
from ledgrab.utils import get_logger
logger = get_logger(__name__)
class DataMigration(ABC):
"""One JSON-blob migration step.
Subclasses must declare a class-level ``name`` and implement ``apply``.
Implementations must operate on the supplied ``conn`` (an already-open
transaction). They MUST NOT commit or open nested transactions — the
:class:`MigrationRunner` owns the transaction so the data UPDATEs and the
audit-table INSERT are atomic.
"""
name: str = ""
@abstractmethod
def apply(self, conn: sqlite3.Connection) -> int:
"""Perform the migration inside the supplied transaction.
Return the number of rows changed.
"""
@dataclass(frozen=True)
class MigrationRecord:
"""A row in the ``data_migrations`` audit table."""
name: str
applied_at: str
rows_changed: int
class MigrationRunner:
"""Apply pending data migrations exactly once per database.
Each call to :meth:`run` opens a single transaction that covers (a) the
"is this migration already applied?" check, (b) the migration body, and
(c) the audit INSERT. That guarantees:
* a partial-failure cannot leave data rewritten but unrecorded;
* concurrent stores constructing on multiple threads cannot race each
other into a UNIQUE-constraint crash on the audit table.
The runner skips silently when writes are frozen so a post-restore boot
does not mutate the freshly-restored database before the imminent restart.
"""
_TABLE = "data_migrations"
def __init__(self, db: Database):
self._db = db
self._ensure_table()
def _ensure_table(self) -> None:
# CREATE TABLE is idempotent thanks to IF NOT EXISTS; running on every
# startup is cheap. ``Database.execute`` does NOT honour the
# ``is_writes_frozen`` guard (DDL is always permitted), so the audit
# table reliably exists even on a frozen boot.
self._db.execute(
f"""
CREATE TABLE IF NOT EXISTS {self._TABLE} (
name TEXT PRIMARY KEY,
applied_at TEXT NOT NULL,
rows_changed INTEGER NOT NULL DEFAULT 0
)
"""
)
def applied_names(self) -> set[str]:
"""Return the set of migration names already recorded as applied."""
cursor = self._db.execute(f"SELECT name FROM {self._TABLE}")
return {row["name"] for row in cursor.fetchall()}
def run(self, migrations: list[DataMigration]) -> list[MigrationRecord]:
"""Apply each migration in *migrations* that hasn't been recorded yet.
Returns the records for migrations that actually executed this call.
Returns an empty list when writes are frozen.
"""
if is_writes_frozen():
# Frozen-write databases are read-only between a restore and the
# imminent restart. Surface this clearly because the in-memory
# state may briefly reflect pre-migration data.
logger.warning("Data migrations skipped: writes are frozen (restart expected).")
return []
# Validate names up-front so an unnamed migration aborts before any
# transaction work.
for migration in migrations:
if not migration.name:
raise ValueError(
f"DataMigration {type(migration).__name__} must declare a non-empty name"
)
executed: list[MigrationRecord] = []
for migration in migrations:
# Use a separate transaction per migration so a failure rolls back
# only the failing one and any earlier (already-recorded)
# migrations keep their audit rows.
with self._db.transaction() as conn:
already_applied = conn.execute(
f"SELECT 1 FROM {self._TABLE} WHERE name = ?",
(migration.name,),
).fetchone()
if already_applied:
continue
logger.info("Applying data migration: %s", migration.name)
rows_changed = int(migration.apply(conn))
applied_at = datetime.now(timezone.utc).isoformat()
# INSERT OR IGNORE defends against a hypothetical concurrent
# writer that recorded the same name between the SELECT above
# and this INSERT (the RLock serialises this in practice, but
# the constraint is the durable contract).
conn.execute(
f"INSERT OR IGNORE INTO {self._TABLE} "
f"(name, applied_at, rows_changed) VALUES (?, ?, ?)",
(migration.name, applied_at, rows_changed),
)
executed.append(
MigrationRecord(
name=migration.name,
applied_at=applied_at,
rows_changed=rows_changed,
)
)
if rows_changed:
logger.warning("Migration %s rewrote %d row(s)", migration.name, rows_changed)
if executed:
logger.info(
"Applied %d data migration(s): %s",
len(executed),
", ".join(r.name for r in executed),
)
return executed
# ---------------------------------------------------------------------------
# Concrete migrations
# ---------------------------------------------------------------------------
class StaticToSingleColorMigration(DataMigration):
"""Rename the legacy ``source_type='static'`` color-strip kind to ``single_color``.
The pre-rename rows in ``color_strip_sources`` used ``source_type='static'``;
the new canonical value is ``single_color``. The previous in-line string
replace risked rewriting unrelated substrings (e.g. an animation type named
``static_wave``). This migration parses the JSON, mutates the single field
we care about, and writes the canonical serialization back. Runs inside
the runner's transaction.
"""
name = "001_color_strip_static_to_single_color"
def apply(self, conn: sqlite3.Connection) -> int:
rows_changed = 0
rows = conn.execute("SELECT id, data FROM [color_strip_sources]").fetchall()
for row in rows:
blob = row["data"]
try:
parsed = json.loads(blob)
except json.JSONDecodeError as e:
# A corrupt blob is a pre-existing problem; do not crash the
# migration. The store's own load path will surface it.
logger.warning(
"Skipping corrupt color_strip_sources row %s during migration: %s",
row["id"],
e,
)
continue
if parsed.get("source_type") != "static":
continue
parsed["source_type"] = "single_color"
conn.execute(
"UPDATE [color_strip_sources] SET data = ? WHERE id = ?",
(json.dumps(parsed, ensure_ascii=False), row["id"]),
)
rows_changed += 1
return rows_changed
# Master list — ORDER MATTERS. Append new migrations; never reorder.
ALL_MIGRATIONS: list[DataMigration] = [
StaticToSingleColorMigration(),
]
@@ -0,0 +1,85 @@
"""Regression tests for the CSS response builder dispatch.
These lock in the safety net introduced when the silent "fallback to a fake
PictureCSSResponse" branch was removed:
* every concrete ColorStripSource subclass registered in ``_SOURCE_TYPE_MAP``
must have a builder in ``_RESPONSE_MAP`` (verified at module import); and
* the previously-missing ``GameEventColorStripSource`` round-trips through
the response builder into the matching ``GameEventCSSResponse`` schema.
"""
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from ledgrab.api.routes.color_strip_sources import _helpers
from ledgrab.api.schemas.color_strip_sources import GameEventCSSResponse
from ledgrab.storage.color_strip_source import (
ColorStripSource,
GameEventColorStripSource,
)
def test_response_map_covers_every_registered_source_type():
"""Every source_type in _SOURCE_TYPE_MAP has a response builder."""
storage_kinds = set(_helpers._STORAGE_TYPE_MAP.keys())
builder_kinds = set(_helpers._RESPONSE_MAP.keys())
missing = storage_kinds - builder_kinds
assert not missing, f"_RESPONSE_MAP is missing builders for: {sorted(missing)}"
def test_assert_helper_raises_when_a_kind_is_unregistered(monkeypatch):
"""Adding a kind to the storage registry without a builder is caught."""
class _PhantomColorStripSource(ColorStripSource):
pass
monkeypatch.setitem(_helpers._STORAGE_TYPE_MAP, "phantom", _PhantomColorStripSource)
with pytest.raises(RuntimeError, match="phantom"):
_helpers._assert_response_map_coverage()
def test_game_event_source_serialises_to_game_event_response():
"""A GameEventColorStripSource is rendered as GameEventCSSResponse (not the
fake PictureCSSResponse that the silent fallback used to return).
"""
now = datetime.now(timezone.utc)
source = GameEventColorStripSource(
id="css_gev0001",
name="Counter-Strike feedback",
source_type="game_event",
created_at=now,
updated_at=now,
game_integration_id="gi_cs2_main",
event_mappings=[{"event": "hit", "effect": "flash"}],
led_count=60,
)
response = _helpers._css_to_response(source)
assert isinstance(response, GameEventCSSResponse)
assert response.source_type == "game_event"
assert response.game_integration_id == "gi_cs2_main"
assert response.event_mappings == [{"event": "hit", "effect": "flash"}]
assert response.led_count == 60
def test_unregistered_source_type_raises_in_css_to_response():
"""Reaching ``_css_to_response`` with an unmapped source_type raises loudly."""
class _UnregisteredCSS(ColorStripSource):
pass
now = datetime.now(timezone.utc)
source = _UnregisteredCSS(
id="css_xxx",
name="rogue",
source_type="rogue_unregistered",
created_at=now,
updated_at=now,
)
with pytest.raises(RuntimeError, match="No CSS response builder"):
_helpers._css_to_response(source)
@@ -0,0 +1,97 @@
"""Tests for the unified color-strip stream-builder registry."""
from __future__ import annotations
import pytest
from ledgrab.core.processing import color_strip_kinds
from ledgrab.core.processing.color_strip_kinds import (
SHARABLE_KINDS,
STREAM_BUILDERS,
StreamDeps,
build_stream,
)
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
def test_stream_builders_partition_storage_kinds_with_sharable():
"""STREAM_BUILDERS SHARABLE_KINDS == storage source_types."""
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
covered = set(STREAM_BUILDERS.keys()) | SHARABLE_KINDS
assert (
storage_kinds == covered
), f"missing={storage_kinds - covered}, extra={covered - storage_kinds}"
def test_sharable_kinds_list_matches_actual_sharable_property():
"""The hand-coded SHARABLE_KINDS list mirrors the `sharable` @property values.
Every ColorStripSource subclass currently implements ``sharable`` as a
literal ``return True``/``return False`` that does not touch ``self``,
so ``fget(None)`` is a safe introspection. If a future subclass changes
that contract — e.g. by computing sharability from instance state — this
test will raise a clear error rather than silently misclassify the kind,
forcing the maintainer to update both ``SHARABLE_KINDS`` and this test.
"""
def _is_sharable(name: str, cls) -> bool:
prop = getattr(cls, "sharable", None)
if not isinstance(prop, property) or prop.fget is None:
raise AssertionError(
f"{cls.__name__} (source_type={name!r}) does not expose `sharable` "
f"as a @property; SHARABLE_KINDS introspection no longer works."
)
try:
return bool(prop.fget(None))
except (AttributeError, TypeError) as e:
raise AssertionError(
f"{cls.__name__}.sharable now depends on `self` ({e}); update "
"this test and SHARABLE_KINDS together."
) from e
actually_sharable = {name for name, cls in _SOURCE_TYPE_MAP.items() if _is_sharable(name, cls)}
assert actually_sharable == set(SHARABLE_KINDS), (
"SHARABLE_KINDS drifted from the @property values: "
f"actually_sharable={sorted(actually_sharable)}, "
f"SHARABLE_KINDS={sorted(SHARABLE_KINDS)}"
)
def test_build_stream_raises_on_unknown_kind():
class _FakeSource:
source_type = "totally_unregistered_kind"
id = "fake"
deps = StreamDeps(css_manager=None)
with pytest.raises(ValueError, match="totally_unregistered_kind"):
build_stream(_FakeSource(), deps)
def test_coverage_assertion_raises_on_drift(monkeypatch):
"""Adding a kind to storage's _SOURCE_TYPE_MAP without a builder fails fast."""
monkeypatch.setitem(_SOURCE_TYPE_MAP, "phantom", object)
with pytest.raises(RuntimeError, match="phantom"):
color_strip_kinds._assert_stream_kind_coverage()
def test_legacy_static_alias_resolves_to_single_color_builder():
"""The legacy 'static' source_type still maps to the single_color builder."""
assert STREAM_BUILDERS["static"] is not None
assert STREAM_BUILDERS["single_color"] is not None
# The two route to the same loader, but builders are distinct closures —
# call them with a tiny source stub and confirm same class.
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
from ledgrab.storage.color_strip_source import SingleColorStripSource
from datetime import datetime, timezone
src = SingleColorStripSource(
id="t",
name="t",
source_type="static",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
deps = StreamDeps(css_manager=None)
s = STREAM_BUILDERS["static"](src, deps)
assert isinstance(s, SingleColorStripStream)
@@ -0,0 +1,64 @@
"""Tests for the unified value-stream-builder registry."""
from __future__ import annotations
import pytest
from ledgrab.core.processing import value_kinds
from ledgrab.core.processing.value_kinds import (
NEEDS_CLOCK_RUNTIME,
STREAM_BUILDERS,
ValueStreamDeps,
build_stream,
)
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
def test_builders_cover_every_storage_kind():
"""STREAM_BUILDERS keys == storage._VALUE_SOURCE_MAP keys."""
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
builder_kinds = set(STREAM_BUILDERS.keys())
assert storage_kinds == builder_kinds, (
f"missing={storage_kinds - builder_kinds}, " f"extra={builder_kinds - storage_kinds}"
)
def test_build_stream_raises_on_unknown_kind():
class _Fake:
source_type = "totally_unregistered"
id = "fake"
deps = ValueStreamDeps(value_stream_manager=None)
with pytest.raises(ValueError, match="totally_unregistered"):
build_stream(_Fake(), deps)
def test_coverage_assertion_raises_on_drift(monkeypatch):
"""A kind added to storage without a builder fails the import-time check."""
monkeypatch.setitem(_VALUE_SOURCE_MAP, "phantom_kind", object)
with pytest.raises(RuntimeError, match="phantom_kind"):
value_kinds._assert_value_kind_coverage()
def test_needs_clock_runtime_is_subset_of_registered_kinds():
assert NEEDS_CLOCK_RUNTIME.issubset(set(STREAM_BUILDERS.keys()))
def test_static_builder_returns_static_value_stream():
from ledgrab.core.processing.value_stream import StaticValueStream
from ledgrab.storage.value_source import StaticValueSource
from datetime import datetime, timezone
src = StaticValueSource(
id="vs_t",
name="t",
source_type="static",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
value=0.42,
)
deps = ValueStreamDeps(value_stream_manager=None)
stream = build_stream(src, deps)
assert isinstance(stream, StaticValueStream)
# Sanity: the value flowed through
assert stream.get_value() == 0.42
@@ -0,0 +1,90 @@
"""Tests for ColorStripStore startup migrations."""
import json
import pytest
from ledgrab.storage.color_strip_source import SingleColorStripSource
from ledgrab.storage.color_strip_store import ColorStripStore
from ledgrab.storage.database import Database
@pytest.fixture
def tmp_db(tmp_path):
db = Database(tmp_path / "test.db")
yield db
db.close()
def _insert_legacy_row(db: Database, item_id: str, name: str, color: list[int]) -> dict:
"""Insert a pre-rename color strip row with ``source_type='static'``."""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
blob = {
"id": item_id,
"name": name,
"source_type": "static",
"color": color,
"animation": None,
"tags": [],
"led_count": 30,
"created_at": now,
"updated_at": now,
"icon": "",
"icon_color": "",
}
db.upsert("color_strip_sources", item_id, name, blob)
return blob
def test_legacy_static_row_is_rewritten_to_single_color(tmp_db):
"""A row with ``source_type='static'`` is rewritten on first store load."""
_insert_legacy_row(tmp_db, "css_legacy01", "Legacy Solid", [255, 0, 0])
# Sanity: legacy blob really is on disk
raw_before = tmp_db.load_all("color_strip_sources")
assert raw_before[0]["source_type"] == "static"
store = ColorStripStore(tmp_db)
# In-memory item is a SingleColorStripSource with the canonical type
item = store.get("css_legacy01")
assert isinstance(item, SingleColorStripSource)
assert item.source_type == "single_color"
# The on-disk row was rewritten too — no second migration on next boot
raw_after = tmp_db.load_all("color_strip_sources")
assert raw_after[0]["source_type"] == "single_color"
def test_already_migrated_row_is_left_alone(tmp_db):
"""Rows already using ``single_color`` are not touched."""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
blob = {
"id": "css_new01",
"name": "Already Migrated",
"source_type": "single_color",
"color": [0, 128, 255],
"animation": None,
"tags": [],
"led_count": 30,
"created_at": now,
"updated_at": now,
"icon": "",
"icon_color": "",
}
tmp_db.upsert("color_strip_sources", "css_new01", "Already Migrated", blob)
raw_before = tmp_db.load_all("color_strip_sources")
before_json = json.dumps(raw_before[0], sort_keys=True)
store = ColorStripStore(tmp_db)
item = store.get("css_new01")
assert isinstance(item, SingleColorStripSource)
raw_after = tmp_db.load_all("color_strip_sources")
after_json = json.dumps(raw_after[0], sort_keys=True)
assert before_json == after_json
@@ -0,0 +1,309 @@
"""Tests for the versioned data-migration runner.
These verify the safety properties that replaced the string-replace migration:
* migrations parse JSON properly (no substring corruption);
* the runner records each migration name in ``data_migrations`` so subsequent
runs are no-ops;
* a migration that touches no rows still gets recorded as applied;
* a migration that raises does not get recorded (so a fixed migration can be
re-attempted on the next boot);
* the legacy ``static`` rename only rewrites rows whose ``source_type`` field
actually equals ``static`` — values that merely contain the substring (e.g.
an animation type ``static_wave``) are left untouched.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
import pytest
from ledgrab.storage.data_migrations import (
ALL_MIGRATIONS,
DataMigration,
MigrationRunner,
StaticToSingleColorMigration,
)
from ledgrab.storage.database import Database
@pytest.fixture
def tmp_db(tmp_path):
db = Database(tmp_path / "test.db")
yield db
db.close()
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _insert_css(db: Database, item_id: str, blob: dict) -> None:
db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob)
# ---------------------------------------------------------------------------
# Runner mechanics
# ---------------------------------------------------------------------------
def test_runner_creates_audit_table(tmp_db):
MigrationRunner(tmp_db)
cursor = tmp_db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'"
)
assert cursor.fetchone() is not None
def test_runner_records_applied_migration(tmp_db):
class _Noop(DataMigration):
name = "test_noop"
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
records = runner.run([_Noop()])
assert [r.name for r in records] == ["test_noop"]
assert "test_noop" in runner.applied_names()
def test_runner_skips_already_applied_migration(tmp_db):
calls = 0
class _CountedMigration(DataMigration):
name = "test_counted"
def apply(self, conn):
nonlocal calls
calls += 1
return 0
runner = MigrationRunner(tmp_db)
runner.run([_CountedMigration()])
runner.run([_CountedMigration()])
runner.run([_CountedMigration()])
assert calls == 1
def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db):
"""Even when a migration changes 0 rows, it is recorded as applied."""
class _NoMatch(DataMigration):
name = "test_no_match"
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
runner.run([_NoMatch()])
assert "test_no_match" in runner.applied_names()
def test_runner_does_not_record_failed_migration(tmp_db):
class _Boom(DataMigration):
name = "test_boom"
def apply(self, conn):
raise RuntimeError("kaboom")
runner = MigrationRunner(tmp_db)
with pytest.raises(RuntimeError, match="kaboom"):
runner.run([_Boom()])
assert "test_boom" not in runner.applied_names()
def test_runner_rejects_unnamed_migration(tmp_db):
class _Unnamed(DataMigration):
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
with pytest.raises(ValueError, match="non-empty name"):
runner.run([_Unnamed()])
def test_runner_rolls_back_data_change_when_migration_raises(tmp_db):
"""A migration that raises mid-way leaves no partial UPDATEs behind."""
blob = {
"id": "css_partial",
"name": "Partial",
"source_type": "static",
"color": [10, 20, 30],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_partial", blob)
class _PartialBoom(DataMigration):
name = "test_partial_boom"
def apply(self, conn):
conn.execute(
"UPDATE color_strip_sources SET data = ? WHERE id = ?",
(json.dumps({**blob, "source_type": "single_color"}), "css_partial"),
)
raise RuntimeError("explode after update")
runner = MigrationRunner(tmp_db)
with pytest.raises(RuntimeError, match="explode after update"):
runner.run([_PartialBoom()])
rows = tmp_db.load_all("color_strip_sources")
assert rows[0]["source_type"] == "static", "UPDATE was not rolled back"
assert "test_partial_boom" not in runner.applied_names()
def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch):
"""Frozen-write databases must not be mutated by the runner."""
monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True)
calls = 0
class _Counted(DataMigration):
name = "test_frozen"
def apply(self, conn):
nonlocal calls
calls += 1
return 0
runner = MigrationRunner(tmp_db)
records = runner.run([_Counted()])
assert records == []
assert calls == 0
assert "test_frozen" not in runner.applied_names()
# ---------------------------------------------------------------------------
# Legacy static-rename specifics
# ---------------------------------------------------------------------------
def test_static_rename_only_touches_source_type_field(tmp_db):
"""A row whose ``animation.type`` happens to contain ``static`` survives intact."""
blob = {
"id": "css_safe01",
"name": "Has static_wave anim",
"source_type": "gradient", # not "static" — should be untouched
"animation": {"type": "static_wave", "speed": 1.0}, # substring trap
"stops": [{"position": 0.0, "color": [10, 20, 30]}],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_safe01", blob)
# Run the migration via the runner (clears the existing audit row first
# because the Database fixture already triggered it through any incidental
# store creation — here we drive it directly).
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
rows = tmp_db.load_all("color_strip_sources")
assert len(rows) == 1
assert rows[0]["source_type"] == "gradient"
assert rows[0]["animation"]["type"] == "static_wave"
def test_static_rename_rewrites_only_legacy_rows(tmp_db):
legacy = {
"id": "css_legacy01",
"name": "Legacy",
"source_type": "static",
"color": [255, 0, 0],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
current = {
"id": "css_modern01",
"name": "Modern",
"source_type": "single_color",
"color": [0, 128, 255],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_legacy01", legacy)
_insert_css(tmp_db, "css_modern01", current)
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")}
assert rows["css_legacy01"]["source_type"] == "single_color"
assert rows["css_modern01"]["source_type"] == "single_color"
def test_static_rename_survives_corrupt_blob(tmp_db):
"""A row whose JSON blob is corrupt does not crash the migration."""
# Insert a row with deliberately broken JSON via direct SQL (bypass upsert
# which would re-serialize). We use a hand-crafted blob.
tmp_db.execute(
"INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)",
("css_bad01", "Bad", "{not valid json"),
)
legacy = {
"id": "css_legacy02",
"name": "Legacy after corrupt",
"source_type": "static",
"color": [1, 2, 3],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_legacy02", legacy)
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
# Legacy row still migrated, corrupt row untouched, no exception
cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources")
out = {row["id"]: row["data"] for row in cursor.fetchall()}
assert out["css_bad01"] == "{not valid json"
assert json.loads(out["css_legacy02"])["source_type"] == "single_color"
def test_all_migrations_have_unique_names():
names = [m.name for m in ALL_MIGRATIONS]
assert len(names) == len(set(names)), f"duplicate names: {names}"
def test_color_strip_store_construction_records_static_migration(tmp_db):
"""Integration: constructing the store applies AND records the migration."""
from ledgrab.storage.color_strip_source import SingleColorStripSource
from ledgrab.storage.color_strip_store import ColorStripStore
legacy = {
"id": "css_int01",
"name": "Integration legacy",
"source_type": "static",
"color": [1, 2, 3],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_int01", legacy)
store = ColorStripStore(tmp_db)
# In-memory shape is correct
assert isinstance(store.get("css_int01"), SingleColorStripSource)
# Audit row exists
runner = MigrationRunner(tmp_db)
assert "001_color_strip_static_to_single_color" in runner.applied_names()