refactor(storage,processing): kind registries + versioned data migrations
Two CRITICAL data-safety bugs from the architecture audit and the two
worst parallel-change problems are fixed in one coherent pass.
Audit findings addressed:
- C2 silent CSS response fallback. The previous _RESPONSE_MAP fell
through to a fabricated PictureCSSResponse whenever a source
class lacked an entry; in particular game_event sources were
silently mis-shaped. Now: GameEventCSSResponse/Create/Update
schemas exist, _RESPONSE_MAP is re-keyed by source_type string,
an import-time _assert_response_map_coverage() requires symmetric
agreement with storage._SOURCE_TYPE_MAP, and the runtime path
raises instead of fabricating a response.
- C11 string-replace JSON migration. ColorStripStore used
blob.replace('"source_type": "static"', '"source_type":
"single_color"') which can corrupt unrelated substrings (e.g.
an animation type named "static_wave") and provides no audit,
no transaction, no idempotency. Replaced with
storage.data_migrations.MigrationRunner backed by a
data_migrations audit table. Each migration runs inside one
db.transaction() that covers the applied-check, the apply(),
and the audit-INSERT — partial failures roll back atomically.
StaticToSingleColorMigration parses each row with json.loads
and mutates only the source_type field. Frozen-write databases
skip with a warning.
- C3+C4 color-strip stream dispatch. The 7-branch elif in
ColorStripStreamManager.acquire() and the duplicate one in
ws_stream._create_stream() now share a single STREAM_BUILDERS
registry in core.processing.color_strip_kinds, keyed by
source.source_type. Both call sites populate a StreamDeps bag
and delegate to build_stream(). _assert_stream_kind_coverage()
asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS
partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path
wraps each FastAPI-DI getter in _safe() so non-audio previews
no longer crash when audio/CSPT stores are not wired.
- C6+C7 value stream dispatch. The 14-branch isinstance ladder in
ValueStreamManager._create_stream and its silent
StaticValueStream(value=1.0) fallback are replaced by
core.processing.value_kinds.STREAM_BUILDERS, keyed by
source_type string (so AdaptiveValueSource's adaptive_time and
adaptive_scene route to different builders correctly). The
manager retains only the SyncClockRuntime pre-acquisition step
for animated_color (kinds needing this are listed explicitly
in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a
separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the
registry.
Bundled in: the static->single_color rename plus the HTTPValueStream
/ http_endpoint introduction that were already in flight on this
branch share these files; the registry refactor naturally absorbs
both via the new "single_color" / "static" alias entries and the
_build_http builder.
Tests: 26 new tests cover response-map coverage drift, migration
runner audit-table mechanics + transactional rollback +
frozen-write skip, and the two stream-builder registries. 343
existing storage / API / e2e tests stay green. Ruff clean.
This commit is contained in:
@@ -9,6 +9,7 @@ from ledgrab.api.schemas.color_strip_sources import (
|
|||||||
CompositeCSSResponse,
|
CompositeCSSResponse,
|
||||||
DaylightCSSResponse,
|
DaylightCSSResponse,
|
||||||
EffectCSSResponse,
|
EffectCSSResponse,
|
||||||
|
GameEventCSSResponse,
|
||||||
GradientCSSResponse,
|
GradientCSSResponse,
|
||||||
KeyColorsCSSResponse,
|
KeyColorsCSSResponse,
|
||||||
MappedCSSResponse,
|
MappedCSSResponse,
|
||||||
@@ -17,7 +18,7 @@ from ledgrab.api.schemas.color_strip_sources import (
|
|||||||
PictureAdvancedCSSResponse,
|
PictureAdvancedCSSResponse,
|
||||||
PictureCSSResponse,
|
PictureCSSResponse,
|
||||||
ProcessedCSSResponse,
|
ProcessedCSSResponse,
|
||||||
StaticCSSResponse,
|
SingleColorCSSResponse,
|
||||||
WeatherCSSResponse,
|
WeatherCSSResponse,
|
||||||
)
|
)
|
||||||
from ledgrab.api.schemas.devices import Calibration as CalibrationSchema
|
from ledgrab.api.schemas.devices import Calibration as CalibrationSchema
|
||||||
@@ -26,22 +27,7 @@ from ledgrab.core.capture.calibration import (
|
|||||||
calibration_to_dict,
|
calibration_to_dict,
|
||||||
)
|
)
|
||||||
from ledgrab.storage.color_strip_source import (
|
from ledgrab.storage.color_strip_source import (
|
||||||
AdvancedPictureColorStripSource,
|
_SOURCE_TYPE_MAP as _STORAGE_TYPE_MAP,
|
||||||
ApiInputColorStripSource,
|
|
||||||
AudioColorStripSource,
|
|
||||||
CandlelightColorStripSource,
|
|
||||||
CompositeColorStripSource,
|
|
||||||
DaylightColorStripSource,
|
|
||||||
EffectColorStripSource,
|
|
||||||
GradientColorStripSource,
|
|
||||||
KeyColorsColorStripSource,
|
|
||||||
MappedColorStripSource,
|
|
||||||
MathWaveColorStripSource,
|
|
||||||
NotificationColorStripSource,
|
|
||||||
PictureColorStripSource,
|
|
||||||
ProcessedColorStripSource,
|
|
||||||
StaticColorStripSource,
|
|
||||||
WeatherColorStripSource,
|
|
||||||
)
|
)
|
||||||
from ledgrab.storage.picture_source import (
|
from ledgrab.storage.picture_source import (
|
||||||
ProcessedPictureSource,
|
ProcessedPictureSource,
|
||||||
@@ -94,34 +80,46 @@ def _stops_schema(source) -> list[ColorStopSchema] | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
# Maps storage class → response builder lambda.
|
# Maps ``source_type`` string → response builder.
|
||||||
|
#
|
||||||
|
# Keying by source_type (rather than type(source)) lets the import-time
|
||||||
|
# coverage check use the storage registry's keys directly, with no
|
||||||
|
# inversion or duplicate-class handling for legacy aliases.
|
||||||
_RESPONSE_MAP: dict = {
|
_RESPONSE_MAP: dict = {
|
||||||
PictureColorStripSource: lambda s, kw: PictureCSSResponse(
|
"picture": lambda s, kw: PictureCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
picture_source_id=s.picture_source_id,
|
picture_source_id=s.picture_source_id,
|
||||||
smoothing=s.smoothing.to_dict(),
|
smoothing=s.smoothing.to_dict(),
|
||||||
interpolation_mode=s.interpolation_mode,
|
interpolation_mode=s.interpolation_mode,
|
||||||
calibration=_calibration_schema(s),
|
calibration=_calibration_schema(s),
|
||||||
),
|
),
|
||||||
AdvancedPictureColorStripSource: lambda s, kw: PictureAdvancedCSSResponse(
|
"picture_advanced": lambda s, kw: PictureAdvancedCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
smoothing=s.smoothing.to_dict(),
|
smoothing=s.smoothing.to_dict(),
|
||||||
interpolation_mode=s.interpolation_mode,
|
interpolation_mode=s.interpolation_mode,
|
||||||
calibration=_calibration_schema(s),
|
calibration=_calibration_schema(s),
|
||||||
),
|
),
|
||||||
StaticColorStripSource: lambda s, kw: StaticCSSResponse(
|
"single_color": lambda s, kw: SingleColorCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
color=s.color.to_dict(),
|
color=s.color.to_dict(),
|
||||||
animation=s.animation,
|
animation=s.animation,
|
||||||
),
|
),
|
||||||
GradientColorStripSource: lambda s, kw: GradientCSSResponse(
|
# Legacy alias: pre-rename rows used "static"; the data migration rewrites
|
||||||
|
# them on first store load but a stale in-flight instance would still
|
||||||
|
# carry source_type='static' until the next reload.
|
||||||
|
"static": lambda s, kw: SingleColorCSSResponse(
|
||||||
|
**kw,
|
||||||
|
color=s.color.to_dict(),
|
||||||
|
animation=s.animation,
|
||||||
|
),
|
||||||
|
"gradient": lambda s, kw: GradientCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
stops=_stops_schema(s),
|
stops=_stops_schema(s),
|
||||||
animation=s.animation,
|
animation=s.animation,
|
||||||
easing=s.easing,
|
easing=s.easing,
|
||||||
gradient_id=s.gradient_id,
|
gradient_id=s.gradient_id,
|
||||||
),
|
),
|
||||||
EffectColorStripSource: lambda s, kw: EffectCSSResponse(
|
"effect": lambda s, kw: EffectCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
effect_type=s.effect_type,
|
effect_type=s.effect_type,
|
||||||
palette=s.palette,
|
palette=s.palette,
|
||||||
@@ -132,15 +130,15 @@ _RESPONSE_MAP: dict = {
|
|||||||
mirror=s.mirror,
|
mirror=s.mirror,
|
||||||
custom_palette=s.custom_palette,
|
custom_palette=s.custom_palette,
|
||||||
),
|
),
|
||||||
CompositeColorStripSource: lambda s, kw: CompositeCSSResponse(
|
"composite": lambda s, kw: CompositeCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
layers=[dict(layer) for layer in s.layers],
|
layers=[dict(layer) for layer in s.layers],
|
||||||
),
|
),
|
||||||
MappedColorStripSource: lambda s, kw: MappedCSSResponse(
|
"mapped": lambda s, kw: MappedCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
zones=[dict(z) for z in s.zones],
|
zones=[dict(z) for z in s.zones],
|
||||||
),
|
),
|
||||||
AudioColorStripSource: lambda s, kw: AudioCSSResponse(
|
"audio": lambda s, kw: AudioCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
visualization_mode=s.visualization_mode,
|
visualization_mode=s.visualization_mode,
|
||||||
audio_source_id=s.audio_source_id,
|
audio_source_id=s.audio_source_id,
|
||||||
@@ -153,13 +151,13 @@ _RESPONSE_MAP: dict = {
|
|||||||
mirror=s.mirror,
|
mirror=s.mirror,
|
||||||
beat_decay=s.beat_decay.to_dict(),
|
beat_decay=s.beat_decay.to_dict(),
|
||||||
),
|
),
|
||||||
ApiInputColorStripSource: lambda s, kw: ApiInputCSSResponse(
|
"api_input": lambda s, kw: ApiInputCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
fallback_color=s.fallback_color.to_dict(),
|
fallback_color=s.fallback_color.to_dict(),
|
||||||
timeout=s.timeout.to_dict(),
|
timeout=s.timeout.to_dict(),
|
||||||
interpolation=s.interpolation,
|
interpolation=s.interpolation,
|
||||||
),
|
),
|
||||||
NotificationColorStripSource: lambda s, kw: NotificationCSSResponse(
|
"notification": lambda s, kw: NotificationCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
notification_effect=s.notification_effect,
|
notification_effect=s.notification_effect,
|
||||||
duration_ms=s.duration_ms.to_dict(),
|
duration_ms=s.duration_ms.to_dict(),
|
||||||
@@ -172,14 +170,14 @@ _RESPONSE_MAP: dict = {
|
|||||||
sound_volume=s.sound_volume.to_dict(),
|
sound_volume=s.sound_volume.to_dict(),
|
||||||
app_sounds=dict(s.app_sounds),
|
app_sounds=dict(s.app_sounds),
|
||||||
),
|
),
|
||||||
DaylightColorStripSource: lambda s, kw: DaylightCSSResponse(
|
"daylight": lambda s, kw: DaylightCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
speed=s.speed.to_dict(),
|
speed=s.speed.to_dict(),
|
||||||
use_real_time=s.use_real_time,
|
use_real_time=s.use_real_time,
|
||||||
latitude=s.latitude,
|
latitude=s.latitude,
|
||||||
longitude=s.longitude,
|
longitude=s.longitude,
|
||||||
),
|
),
|
||||||
CandlelightColorStripSource: lambda s, kw: CandlelightCSSResponse(
|
"candlelight": lambda s, kw: CandlelightCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
color=s.color.to_dict(),
|
color=s.color.to_dict(),
|
||||||
intensity=s.intensity.to_dict(),
|
intensity=s.intensity.to_dict(),
|
||||||
@@ -188,18 +186,18 @@ _RESPONSE_MAP: dict = {
|
|||||||
wind_strength=s.wind_strength.to_dict(),
|
wind_strength=s.wind_strength.to_dict(),
|
||||||
candle_type=s.candle_type,
|
candle_type=s.candle_type,
|
||||||
),
|
),
|
||||||
ProcessedColorStripSource: lambda s, kw: ProcessedCSSResponse(
|
"processed": lambda s, kw: ProcessedCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
input_source_id=s.input_source_id,
|
input_source_id=s.input_source_id,
|
||||||
processing_template_id=s.processing_template_id,
|
processing_template_id=s.processing_template_id,
|
||||||
),
|
),
|
||||||
WeatherColorStripSource: lambda s, kw: WeatherCSSResponse(
|
"weather": lambda s, kw: WeatherCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
weather_source_id=s.weather_source_id,
|
weather_source_id=s.weather_source_id,
|
||||||
speed=s.speed.to_dict(),
|
speed=s.speed.to_dict(),
|
||||||
temperature_influence=s.temperature_influence.to_dict(),
|
temperature_influence=s.temperature_influence.to_dict(),
|
||||||
),
|
),
|
||||||
KeyColorsColorStripSource: lambda s, kw: KeyColorsCSSResponse(
|
"key_colors": lambda s, kw: KeyColorsCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
picture_source_id=s.picture_source_id,
|
picture_source_id=s.picture_source_id,
|
||||||
rectangles=[r.to_dict() for r in s.rectangles],
|
rectangles=[r.to_dict() for r in s.rectangles],
|
||||||
@@ -207,28 +205,67 @@ _RESPONSE_MAP: dict = {
|
|||||||
smoothing=s.smoothing.to_dict(),
|
smoothing=s.smoothing.to_dict(),
|
||||||
brightness=s.brightness.to_dict(),
|
brightness=s.brightness.to_dict(),
|
||||||
),
|
),
|
||||||
MathWaveColorStripSource: lambda s, kw: MathWaveCSSResponse(
|
"math_wave": lambda s, kw: MathWaveCSSResponse(
|
||||||
**kw,
|
**kw,
|
||||||
waves=s.waves,
|
waves=s.waves,
|
||||||
speed=s.speed.to_dict(),
|
speed=s.speed.to_dict(),
|
||||||
gradient_id=s.gradient_id,
|
gradient_id=s.gradient_id,
|
||||||
),
|
),
|
||||||
|
"game_event": lambda s, kw: GameEventCSSResponse(
|
||||||
|
**kw,
|
||||||
|
game_integration_id=s.game_integration_id,
|
||||||
|
idle_color=s.idle_color.to_dict(),
|
||||||
|
event_mappings=[dict(m) for m in s.event_mappings],
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_response_map_coverage() -> None:
|
||||||
|
"""Verify _RESPONSE_MAP has a builder for every kind in storage's registry.
|
||||||
|
|
||||||
|
Runs at module import. Surfaces missing builders eagerly instead of
|
||||||
|
letting a request fall through to a silent / wrong response shape.
|
||||||
|
|
||||||
|
Contract note
|
||||||
|
-------------
|
||||||
|
This check is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``)
|
||||||
|
because every kind — sharable or not — needs a response shape. The
|
||||||
|
sister assertion in
|
||||||
|
``core/processing/color_strip_kinds.py::_assert_stream_kind_coverage``
|
||||||
|
is asymmetric because sharable kinds construct their streams via a
|
||||||
|
different path. Adding a new kind requires keeping all three registries
|
||||||
|
aligned: storage's ``_SOURCE_TYPE_MAP``, this ``_RESPONSE_MAP``, and
|
||||||
|
either ``STREAM_BUILDERS`` or ``SHARABLE_KINDS``.
|
||||||
|
"""
|
||||||
|
storage_kinds = set(_STORAGE_TYPE_MAP.keys())
|
||||||
|
builder_kinds = set(_RESPONSE_MAP.keys())
|
||||||
|
missing = storage_kinds - builder_kinds
|
||||||
|
extra = builder_kinds - storage_kinds
|
||||||
|
if missing or extra:
|
||||||
|
problems = []
|
||||||
|
if missing:
|
||||||
|
problems.append(f"missing builders for: {sorted(missing)}")
|
||||||
|
if extra:
|
||||||
|
problems.append(f"unregistered kinds in _RESPONSE_MAP: {sorted(extra)}")
|
||||||
|
raise RuntimeError(
|
||||||
|
"_RESPONSE_MAP is out of sync with storage._SOURCE_TYPE_MAP: " + "; ".join(problems)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_assert_response_map_coverage()
|
||||||
|
|
||||||
|
|
||||||
def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse:
|
def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse:
|
||||||
"""Convert a ColorStripSource to the matching per-type response schema."""
|
"""Convert a ColorStripSource to the matching per-type response schema."""
|
||||||
kw = _common_response_kwargs(source, overlay_active)
|
kw = _common_response_kwargs(source, overlay_active)
|
||||||
builder = _RESPONSE_MAP.get(type(source))
|
builder = _RESPONSE_MAP.get(source.source_type)
|
||||||
if builder is None:
|
if builder is None:
|
||||||
# Fallback: use to_dict() and build a PictureCSSResponse
|
# Coverage is asserted at import time, so reaching this branch means a
|
||||||
logger.warning("No response builder for %s, falling back", type(source).__name__)
|
# source was loaded with a source_type that is not registered.
|
||||||
return PictureCSSResponse(
|
# Surface the bug instead of silently returning a wrong-shaped response.
|
||||||
**kw,
|
raise RuntimeError(
|
||||||
picture_source_id="",
|
f"No CSS response builder registered for source_type "
|
||||||
smoothing=0.3,
|
f"{source.source_type!r} (class={type(source).__name__})"
|
||||||
interpolation_mode="average",
|
|
||||||
calibration=None,
|
|
||||||
)
|
)
|
||||||
return builder(source, kw)
|
return builder(source, kw)
|
||||||
|
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ router = APIRouter()
|
|||||||
|
|
||||||
|
|
||||||
_PREVIEW_ALLOWED_TYPES = {
|
_PREVIEW_ALLOWED_TYPES = {
|
||||||
"static",
|
"single_color",
|
||||||
"gradient",
|
"gradient",
|
||||||
"effect",
|
"effect",
|
||||||
"daylight",
|
"daylight",
|
||||||
@@ -97,65 +97,65 @@ async def preview_color_strip_ws(
|
|||||||
return ColorStripSource.from_dict(config)
|
return ColorStripSource.from_dict(config)
|
||||||
|
|
||||||
def _create_stream(source):
|
def _create_stream(source):
|
||||||
"""Instantiate and start the appropriate stream class for *source*."""
|
"""Instantiate and start the appropriate stream class for *source*.
|
||||||
from ledgrab.core.processing.color_strip_stream_manager import _SIMPLE_STREAM_MAP
|
|
||||||
|
|
||||||
mgr = get_processor_manager()
|
Delegates the per-kind dispatch to ``color_strip_kinds.build_stream``
|
||||||
csm = mgr.color_strip_stream_manager
|
so this preview path and the production ``ColorStripStreamManager``
|
||||||
|
share a single registry. Per-kind dependencies (CSPT store, audio
|
||||||
|
stores, weather manager, …) are gathered into a ``StreamDeps`` bag.
|
||||||
|
|
||||||
if source.source_type == "audio":
|
FastAPI-DI providers raise ``RuntimeError`` when they aren't wired,
|
||||||
|
so we resolve each one through ``_safe`` and pass ``None`` on
|
||||||
|
failure. The per-kind builder will still see a clear error if a
|
||||||
|
truly-required dep is missing for that kind, but unrelated previews
|
||||||
|
(e.g. a ``single_color`` preview on a fresh install where the CSPT
|
||||||
|
store isn't initialized yet) keep working.
|
||||||
|
"""
|
||||||
from ledgrab.api.dependencies import (
|
from ledgrab.api.dependencies import (
|
||||||
get_audio_processing_template_store,
|
get_audio_processing_template_store,
|
||||||
get_audio_source_store,
|
get_audio_source_store,
|
||||||
get_audio_template_store,
|
get_audio_template_store,
|
||||||
|
get_cspt_store,
|
||||||
)
|
)
|
||||||
from ledgrab.core.processing.audio_stream import AudioColorStripStream
|
from ledgrab.core.processing.color_strip_kinds import StreamDeps, build_stream
|
||||||
|
|
||||||
s = AudioColorStripStream(
|
def _safe(getter):
|
||||||
source,
|
try:
|
||||||
mgr.audio_capture_manager,
|
return getter()
|
||||||
get_audio_source_store(),
|
except RuntimeError as e:
|
||||||
get_audio_template_store(),
|
logger.debug("Preview dep not available (%s): %s", getter.__name__, e)
|
||||||
get_audio_processing_template_store(),
|
return None
|
||||||
)
|
|
||||||
elif source.source_type == "weather":
|
|
||||||
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
|
|
||||||
|
|
||||||
s = WeatherColorStripStream(source, mgr.weather_manager)
|
mgr = get_processor_manager()
|
||||||
elif source.source_type == "game_event":
|
csm = mgr.color_strip_stream_manager
|
||||||
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
|
|
||||||
|
|
||||||
s = GameEventColorStripStream(source)
|
# The game-event bus is optional in preview contexts.
|
||||||
try:
|
try:
|
||||||
from ledgrab.api.dependencies import get_game_event_bus
|
from ledgrab.api.dependencies import get_game_event_bus
|
||||||
|
|
||||||
bus = get_game_event_bus()
|
game_event_bus = get_game_event_bus()
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
logger.debug("Preview: no game event bus available: %s", e)
|
logger.debug("Preview: no game event bus available: %s", e)
|
||||||
else:
|
game_event_bus = None
|
||||||
if bus is not None:
|
|
||||||
s.set_event_bus(bus)
|
|
||||||
elif source.source_type == "mapped":
|
|
||||||
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
|
|
||||||
|
|
||||||
s = MappedColorStripStream(source, csm)
|
deps = StreamDeps(
|
||||||
elif source.source_type == "composite":
|
css_manager=csm,
|
||||||
from ledgrab.api.dependencies import get_cspt_store
|
value_stream_manager=mgr.value_stream_manager,
|
||||||
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
|
cspt_store=_safe(get_cspt_store),
|
||||||
|
weather_manager=mgr.weather_manager,
|
||||||
s = CompositeColorStripStream(
|
audio_capture_manager=mgr.audio_capture_manager,
|
||||||
source, csm, mgr.value_stream_manager, get_cspt_store(), depth=0
|
audio_source_store=_safe(get_audio_source_store),
|
||||||
|
audio_template_store=_safe(get_audio_template_store),
|
||||||
|
audio_processing_template_store=_safe(get_audio_processing_template_store),
|
||||||
|
game_event_bus=game_event_bus,
|
||||||
|
depth=0,
|
||||||
)
|
)
|
||||||
elif source.source_type == "processed":
|
try:
|
||||||
from ledgrab.api.dependencies import get_cspt_store
|
s = build_stream(source, deps)
|
||||||
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
|
except ValueError as e:
|
||||||
|
# Preserve the registry's original detail so the API consumer
|
||||||
s = ProcessedColorStripStream(source, csm, get_cspt_store())
|
# sees which kind was rejected, not just a generic message.
|
||||||
else:
|
raise ValueError(f"Unsupported preview source_type: {e}") from e
|
||||||
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
|
|
||||||
if not stream_cls:
|
|
||||||
raise ValueError(f"Unsupported preview source_type: {source.source_type}")
|
|
||||||
s = stream_cls(source)
|
|
||||||
# Inject gradient store for palette resolution
|
# Inject gradient store for palette resolution
|
||||||
if hasattr(s, "set_gradient_store"):
|
if hasattr(s, "set_gradient_store"):
|
||||||
try:
|
try:
|
||||||
@@ -428,8 +428,17 @@ async def css_api_input_ws(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
elif "bytes" in message:
|
elif "bytes" in message:
|
||||||
# Binary frame: raw RGBRGB... bytes (3 bytes per LED)
|
# Binary frame: raw RGBRGB... bytes (3 bytes per LED).
|
||||||
|
# Cap to a generous upper bound on the LED count — a hostile
|
||||||
|
# client could otherwise stream 100 MB frames and OOM the
|
||||||
|
# server before any application logic ran.
|
||||||
raw_bytes = message["bytes"]
|
raw_bytes = message["bytes"]
|
||||||
|
_MAX_BINARY_LEDS = 8192
|
||||||
|
if len(raw_bytes) > _MAX_BINARY_LEDS * 3:
|
||||||
|
await websocket.send_json(
|
||||||
|
{"error": f"Binary frame too large (max {_MAX_BINARY_LEDS} LEDs)"}
|
||||||
|
)
|
||||||
|
continue
|
||||||
if len(raw_bytes) % 3 != 0:
|
if len(raw_bytes) % 3 != 0:
|
||||||
await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"})
|
await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"})
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -122,9 +122,9 @@ class PictureAdvancedCSSResponse(_CSSResponseBase):
|
|||||||
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
||||||
|
|
||||||
|
|
||||||
class StaticCSSResponse(_CSSResponseBase):
|
class SingleColorCSSResponse(_CSSResponseBase):
|
||||||
source_type: Literal["static"] = "static"
|
source_type: Literal["single_color"] = "single_color"
|
||||||
color: Any = Field(description="Static RGB color")
|
color: Any = Field(description="Solid RGB color")
|
||||||
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
||||||
|
|
||||||
|
|
||||||
@@ -240,11 +240,18 @@ class MathWaveCSSResponse(_CSSResponseBase):
|
|||||||
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
||||||
|
|
||||||
|
|
||||||
|
class GameEventCSSResponse(_CSSResponseBase):
|
||||||
|
source_type: Literal["game_event"] = "game_event"
|
||||||
|
game_integration_id: str = Field(description="Game integration entity ID")
|
||||||
|
idle_color: Any = Field(description="Idle RGB color (bindable)")
|
||||||
|
event_mappings: List[dict] = Field(default_factory=list, description="Event-to-effect mappings")
|
||||||
|
|
||||||
|
|
||||||
ColorStripSourceResponse = Annotated[
|
ColorStripSourceResponse = Annotated[
|
||||||
Union[
|
Union[
|
||||||
Annotated[PictureCSSResponse, Tag("picture")],
|
Annotated[PictureCSSResponse, Tag("picture")],
|
||||||
Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")],
|
Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")],
|
||||||
Annotated[StaticCSSResponse, Tag("static")],
|
Annotated[SingleColorCSSResponse, Tag("single_color")],
|
||||||
Annotated[GradientCSSResponse, Tag("gradient")],
|
Annotated[GradientCSSResponse, Tag("gradient")],
|
||||||
Annotated[EffectCSSResponse, Tag("effect")],
|
Annotated[EffectCSSResponse, Tag("effect")],
|
||||||
Annotated[CompositeCSSResponse, Tag("composite")],
|
Annotated[CompositeCSSResponse, Tag("composite")],
|
||||||
@@ -258,6 +265,7 @@ ColorStripSourceResponse = Annotated[
|
|||||||
Annotated[WeatherCSSResponse, Tag("weather")],
|
Annotated[WeatherCSSResponse, Tag("weather")],
|
||||||
Annotated[KeyColorsCSSResponse, Tag("key_colors")],
|
Annotated[KeyColorsCSSResponse, Tag("key_colors")],
|
||||||
Annotated[MathWaveCSSResponse, Tag("math_wave")],
|
Annotated[MathWaveCSSResponse, Tag("math_wave")],
|
||||||
|
Annotated[GameEventCSSResponse, Tag("game_event")],
|
||||||
],
|
],
|
||||||
Discriminator("source_type"),
|
Discriminator("source_type"),
|
||||||
]
|
]
|
||||||
@@ -303,9 +311,9 @@ class PictureAdvancedCSSCreate(_CSSCreateBase):
|
|||||||
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
||||||
|
|
||||||
|
|
||||||
class StaticCSSCreate(_CSSCreateBase):
|
class SingleColorCSSCreate(_CSSCreateBase):
|
||||||
source_type: Literal["static"] = "static"
|
source_type: Literal["single_color"] = "single_color"
|
||||||
color: Any = Field(default=None, description="Static RGB color [R,G,B]")
|
color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
|
||||||
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
||||||
|
|
||||||
|
|
||||||
@@ -434,11 +442,18 @@ class MathWaveCSSCreate(_CSSCreateBase):
|
|||||||
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
||||||
|
|
||||||
|
|
||||||
|
class GameEventCSSCreate(_CSSCreateBase):
|
||||||
|
source_type: Literal["game_event"] = "game_event"
|
||||||
|
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
|
||||||
|
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
|
||||||
|
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
|
||||||
|
|
||||||
|
|
||||||
ColorStripSourceCreate = Annotated[
|
ColorStripSourceCreate = Annotated[
|
||||||
Union[
|
Union[
|
||||||
Annotated[PictureCSSCreate, Tag("picture")],
|
Annotated[PictureCSSCreate, Tag("picture")],
|
||||||
Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")],
|
Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")],
|
||||||
Annotated[StaticCSSCreate, Tag("static")],
|
Annotated[SingleColorCSSCreate, Tag("single_color")],
|
||||||
Annotated[GradientCSSCreate, Tag("gradient")],
|
Annotated[GradientCSSCreate, Tag("gradient")],
|
||||||
Annotated[EffectCSSCreate, Tag("effect")],
|
Annotated[EffectCSSCreate, Tag("effect")],
|
||||||
Annotated[CompositeCSSCreate, Tag("composite")],
|
Annotated[CompositeCSSCreate, Tag("composite")],
|
||||||
@@ -452,6 +467,7 @@ ColorStripSourceCreate = Annotated[
|
|||||||
Annotated[WeatherCSSCreate, Tag("weather")],
|
Annotated[WeatherCSSCreate, Tag("weather")],
|
||||||
Annotated[KeyColorsCSSCreate, Tag("key_colors")],
|
Annotated[KeyColorsCSSCreate, Tag("key_colors")],
|
||||||
Annotated[MathWaveCSSCreate, Tag("math_wave")],
|
Annotated[MathWaveCSSCreate, Tag("math_wave")],
|
||||||
|
Annotated[GameEventCSSCreate, Tag("game_event")],
|
||||||
],
|
],
|
||||||
Discriminator("source_type"),
|
Discriminator("source_type"),
|
||||||
]
|
]
|
||||||
@@ -497,9 +513,9 @@ class PictureAdvancedCSSUpdate(_CSSUpdateBase):
|
|||||||
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
||||||
|
|
||||||
|
|
||||||
class StaticCSSUpdate(_CSSUpdateBase):
|
class SingleColorCSSUpdate(_CSSUpdateBase):
|
||||||
source_type: Literal["static"] = "static"
|
source_type: Literal["single_color"] = "single_color"
|
||||||
color: Any = Field(default=None, description="Static RGB color [R,G,B]")
|
color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
|
||||||
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
||||||
|
|
||||||
|
|
||||||
@@ -626,11 +642,18 @@ class MathWaveCSSUpdate(_CSSUpdateBase):
|
|||||||
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
||||||
|
|
||||||
|
|
||||||
|
class GameEventCSSUpdate(_CSSUpdateBase):
|
||||||
|
source_type: Literal["game_event"] = "game_event"
|
||||||
|
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
|
||||||
|
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
|
||||||
|
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
|
||||||
|
|
||||||
|
|
||||||
ColorStripSourceUpdate = Annotated[
|
ColorStripSourceUpdate = Annotated[
|
||||||
Union[
|
Union[
|
||||||
Annotated[PictureCSSUpdate, Tag("picture")],
|
Annotated[PictureCSSUpdate, Tag("picture")],
|
||||||
Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")],
|
Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")],
|
||||||
Annotated[StaticCSSUpdate, Tag("static")],
|
Annotated[SingleColorCSSUpdate, Tag("single_color")],
|
||||||
Annotated[GradientCSSUpdate, Tag("gradient")],
|
Annotated[GradientCSSUpdate, Tag("gradient")],
|
||||||
Annotated[EffectCSSUpdate, Tag("effect")],
|
Annotated[EffectCSSUpdate, Tag("effect")],
|
||||||
Annotated[CompositeCSSUpdate, Tag("composite")],
|
Annotated[CompositeCSSUpdate, Tag("composite")],
|
||||||
@@ -644,6 +667,7 @@ ColorStripSourceUpdate = Annotated[
|
|||||||
Annotated[WeatherCSSUpdate, Tag("weather")],
|
Annotated[WeatherCSSUpdate, Tag("weather")],
|
||||||
Annotated[KeyColorsCSSUpdate, Tag("key_colors")],
|
Annotated[KeyColorsCSSUpdate, Tag("key_colors")],
|
||||||
Annotated[MathWaveCSSUpdate, Tag("math_wave")],
|
Annotated[MathWaveCSSUpdate, Tag("math_wave")],
|
||||||
|
Annotated[GameEventCSSUpdate, Tag("game_event")],
|
||||||
],
|
],
|
||||||
Discriminator("source_type"),
|
Discriminator("source_type"),
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -0,0 +1,273 @@
|
|||||||
|
"""Single source of truth for non-sharable color-strip stream construction.
|
||||||
|
|
||||||
|
Both the preview WebSocket (``api/routes/color_strip_sources/ws_stream.py``)
|
||||||
|
and the production ``ColorStripStreamManager.acquire`` used to maintain
|
||||||
|
parallel ``if source.source_type == "..." elif ... else ..._SIMPLE_STREAM_MAP``
|
||||||
|
chains. Adding a new kind required keeping those two chains in lockstep, and
|
||||||
|
silently fell through to a generic stream class when an entry was missed.
|
||||||
|
|
||||||
|
This module replaces both chains with a single ``STREAM_BUILDERS`` registry
|
||||||
|
plus a small ``StreamDeps`` dependency bag. Each caller populates the bag
|
||||||
|
from its own context (DI container, processor manager, etc.) and looks the
|
||||||
|
builder up by ``source.source_type``. A coverage assertion at import time
|
||||||
|
guarantees every kind in ``storage._SOURCE_TYPE_MAP`` is either sharable or
|
||||||
|
has a builder here — silent fall-throughs are no longer possible.
|
||||||
|
|
||||||
|
Sharable kinds (``picture``, ``picture_advanced``, ``key_colors``) are NOT in
|
||||||
|
this registry: they require an injected ``LiveStream`` whose acquisition is
|
||||||
|
intertwined with the source's calibration, which does not fit a uniform
|
||||||
|
factory signature. Those continue to use bespoke paths inside
|
||||||
|
``ColorStripStreamManager``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class StreamDeps:
|
||||||
|
"""Dependency bag for non-sharable stream construction.
|
||||||
|
|
||||||
|
Each call site (preview WebSocket, production stream manager) builds one
|
||||||
|
of these from its own context before invoking :func:`build_stream`.
|
||||||
|
Fields are ``Any`` (with ``None`` defaults) because individual builders
|
||||||
|
only consume a subset; tests can supply a minimal bag.
|
||||||
|
|
||||||
|
``frozen=True`` guards against a builder accidentally reassigning a
|
||||||
|
field; it does NOT make the referenced objects immutable — the
|
||||||
|
``css_manager``, stores, etc. are live mutable services.
|
||||||
|
|
||||||
|
``css_manager`` is needed by composite/mapped/processed builders so they
|
||||||
|
can recursively acquire dependent streams. Single-kind builders ignore
|
||||||
|
it. The field has no default so callers are forced to think about which
|
||||||
|
manager they are wiring through.
|
||||||
|
"""
|
||||||
|
|
||||||
|
css_manager: Any
|
||||||
|
value_stream_manager: Any = None
|
||||||
|
cspt_store: Any = None # ColorStripProcessingTemplateStore
|
||||||
|
weather_manager: Any = None
|
||||||
|
audio_capture_manager: Any = None
|
||||||
|
audio_source_store: Any = None
|
||||||
|
audio_template_store: Any = None
|
||||||
|
audio_processing_template_store: Any = None
|
||||||
|
game_event_bus: Any = None
|
||||||
|
depth: int = 0 # composite nesting depth — passed through verbatim
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-kind builders
|
||||||
|
#
|
||||||
|
# Each builder is a small free function ``(source, deps) -> ColorStripStream``.
|
||||||
|
# Imports are deferred to keep this module cheap to import (the production
|
||||||
|
# processing graph is large).
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _build_audio(source, d: StreamDeps):
|
||||||
|
from ledgrab.core.processing.audio_stream import AudioColorStripStream
|
||||||
|
|
||||||
|
return AudioColorStripStream(
|
||||||
|
source,
|
||||||
|
d.audio_capture_manager,
|
||||||
|
d.audio_source_store,
|
||||||
|
d.audio_template_store,
|
||||||
|
d.audio_processing_template_store,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_composite(source, d: StreamDeps):
|
||||||
|
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
|
||||||
|
|
||||||
|
return CompositeColorStripStream(
|
||||||
|
source,
|
||||||
|
d.css_manager,
|
||||||
|
d.value_stream_manager,
|
||||||
|
d.cspt_store,
|
||||||
|
depth=d.depth,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_mapped(source, d: StreamDeps):
|
||||||
|
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
|
||||||
|
|
||||||
|
return MappedColorStripStream(source, d.css_manager)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_processed(source, d: StreamDeps):
|
||||||
|
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
|
||||||
|
|
||||||
|
return ProcessedColorStripStream(source, d.css_manager, d.cspt_store)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_weather(source, d: StreamDeps):
|
||||||
|
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
|
||||||
|
|
||||||
|
return WeatherColorStripStream(source, d.weather_manager)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_game_event(source, d: StreamDeps):
|
||||||
|
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
|
||||||
|
|
||||||
|
stream = GameEventColorStripStream(source)
|
||||||
|
if d.game_event_bus is not None:
|
||||||
|
stream.set_event_bus(d.game_event_bus)
|
||||||
|
return stream
|
||||||
|
|
||||||
|
|
||||||
|
def _make_source_only_builder(loader: Callable[[], type]) -> Callable[[Any, StreamDeps], Any]:
|
||||||
|
"""Wrap a class-loader so it produces a uniform ``(source, deps) -> stream`` builder.
|
||||||
|
|
||||||
|
The loader is called on each invocation but module caching makes the
|
||||||
|
import a single dict lookup after the first call.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _build(source, _deps: StreamDeps):
|
||||||
|
return loader()(source)
|
||||||
|
|
||||||
|
return _build
|
||||||
|
|
||||||
|
|
||||||
|
def _single_color_cls() -> type:
|
||||||
|
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
|
||||||
|
|
||||||
|
return SingleColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _gradient_cls() -> type:
|
||||||
|
from ledgrab.core.processing.color_strip_stream import GradientColorStripStream
|
||||||
|
|
||||||
|
return GradientColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _effect_cls() -> type:
|
||||||
|
from ledgrab.core.processing.effect_stream import EffectColorStripStream
|
||||||
|
|
||||||
|
return EffectColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _api_input_cls() -> type:
|
||||||
|
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
|
||||||
|
|
||||||
|
return ApiInputColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _notification_cls() -> type:
|
||||||
|
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
|
||||||
|
|
||||||
|
return NotificationColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _daylight_cls() -> type:
|
||||||
|
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
|
||||||
|
|
||||||
|
return DaylightColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _candlelight_cls() -> type:
|
||||||
|
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
|
||||||
|
|
||||||
|
return CandlelightColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
def _math_wave_cls() -> type:
|
||||||
|
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
|
||||||
|
|
||||||
|
return MathWaveColorStripStream
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Registry
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
StreamBuilder = Callable[[Any, StreamDeps], Any]
|
||||||
|
|
||||||
|
STREAM_BUILDERS: dict[str, StreamBuilder] = {
|
||||||
|
"audio": _build_audio,
|
||||||
|
"composite": _build_composite,
|
||||||
|
"mapped": _build_mapped,
|
||||||
|
"processed": _build_processed,
|
||||||
|
"weather": _build_weather,
|
||||||
|
"game_event": _build_game_event,
|
||||||
|
"single_color": _make_source_only_builder(_single_color_cls),
|
||||||
|
# Legacy alias: pre-rename rows used "static". The data migration rewrites
|
||||||
|
# the on-disk source_type on startup, but this alias keeps an in-flight
|
||||||
|
# legacy entry resolving to the right stream class.
|
||||||
|
"static": _make_source_only_builder(_single_color_cls),
|
||||||
|
"gradient": _make_source_only_builder(_gradient_cls),
|
||||||
|
"effect": _make_source_only_builder(_effect_cls),
|
||||||
|
"api_input": _make_source_only_builder(_api_input_cls),
|
||||||
|
"notification": _make_source_only_builder(_notification_cls),
|
||||||
|
"daylight": _make_source_only_builder(_daylight_cls),
|
||||||
|
"candlelight": _make_source_only_builder(_candlelight_cls),
|
||||||
|
"math_wave": _make_source_only_builder(_math_wave_cls),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Sharable kinds are handled by dedicated LiveStream-acquisition paths in
|
||||||
|
# ColorStripStreamManager (their construction depends on calibration → picture
|
||||||
|
# source resolution, which does not fit a uniform factory signature).
|
||||||
|
SHARABLE_KINDS: frozenset[str] = frozenset({"picture", "picture_advanced", "key_colors"})
|
||||||
|
|
||||||
|
|
||||||
|
def build_stream(source, deps: StreamDeps):
|
||||||
|
"""Build a non-sharable color-strip stream for *source*.
|
||||||
|
|
||||||
|
Raises ``ValueError`` if the kind has no registered builder (which
|
||||||
|
would indicate a sharable source slipped through the caller's
|
||||||
|
``sharable`` gate, or a new kind missing from this registry).
|
||||||
|
"""
|
||||||
|
builder = STREAM_BUILDERS.get(source.source_type)
|
||||||
|
if builder is None:
|
||||||
|
raise ValueError(
|
||||||
|
f"No stream builder for non-sharable color-strip-source kind "
|
||||||
|
f"{source.source_type!r} (id={getattr(source, 'id', '?')!r})"
|
||||||
|
)
|
||||||
|
return builder(source, deps)
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_stream_kind_coverage() -> None:
|
||||||
|
"""Verify the registry is a strict partition: every kind from storage is
|
||||||
|
either listed in SHARABLE_KINDS or has a STREAM_BUILDERS entry.
|
||||||
|
|
||||||
|
Runs at module import so a kind added to ``_SOURCE_TYPE_MAP`` without a
|
||||||
|
corresponding builder fails the server boot loudly instead of silently
|
||||||
|
falling through at request time.
|
||||||
|
|
||||||
|
Contract note
|
||||||
|
-------------
|
||||||
|
This check is **asymmetric** (``STREAM_BUILDERS ∪ SHARABLE_KINDS ==
|
||||||
|
storage_kinds``) because sharable kinds are constructed by a separate
|
||||||
|
path inside ``ColorStripStreamManager``. The sister assertion in
|
||||||
|
``api/routes/color_strip_sources/_helpers.py::_assert_response_map_coverage``
|
||||||
|
is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``) because every
|
||||||
|
kind, sharable or not, still needs a response shape. Both assertions key
|
||||||
|
by the ``source_type`` string; adding a new kind requires updates to
|
||||||
|
storage, ``_RESPONSE_MAP``, and either ``STREAM_BUILDERS`` or
|
||||||
|
``SHARABLE_KINDS``. Both assertions catch missing entries; only this one
|
||||||
|
expects a subset relationship.
|
||||||
|
"""
|
||||||
|
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
|
||||||
|
|
||||||
|
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
|
||||||
|
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||||
|
expected_non_sharable = storage_kinds - SHARABLE_KINDS
|
||||||
|
|
||||||
|
missing = expected_non_sharable - builder_kinds
|
||||||
|
extra = builder_kinds - storage_kinds
|
||||||
|
if missing or extra:
|
||||||
|
problems = []
|
||||||
|
if missing:
|
||||||
|
problems.append(f"missing builders: {sorted(missing)}")
|
||||||
|
if extra:
|
||||||
|
problems.append(f"unregistered kinds: {sorted(extra)}")
|
||||||
|
raise RuntimeError(
|
||||||
|
"color_strip_kinds.STREAM_BUILDERS is out of sync with storage._SOURCE_TYPE_MAP: "
|
||||||
|
+ "; ".join(problems)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_assert_stream_kind_coverage()
|
||||||
@@ -3,7 +3,7 @@
|
|||||||
PictureColorStripStreams (expensive screen capture) are shared across multiple
|
PictureColorStripStreams (expensive screen capture) are shared across multiple
|
||||||
consumers via reference counting — processing runs once, not once per target.
|
consumers via reference counting — processing runs once, not once per target.
|
||||||
|
|
||||||
Count-dependent streams (static, gradient, effect) are NOT shared.
|
Count-dependent streams (single_color, gradient, effect) are NOT shared.
|
||||||
Each consumer gets its own instance so it can configure an independent LED count
|
Each consumer gets its own instance so it can configure an independent LED count
|
||||||
without interfering with other targets.
|
without interfering with other targets.
|
||||||
"""
|
"""
|
||||||
@@ -11,37 +11,18 @@ without interfering with other targets.
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
|
|
||||||
|
from ledgrab.core.processing.color_strip_kinds import (
|
||||||
|
StreamDeps,
|
||||||
|
build_stream,
|
||||||
|
)
|
||||||
from ledgrab.core.processing.color_strip_stream import (
|
from ledgrab.core.processing.color_strip_stream import (
|
||||||
ColorStripStream,
|
ColorStripStream,
|
||||||
GradientColorStripStream,
|
|
||||||
PictureColorStripStream,
|
PictureColorStripStream,
|
||||||
StaticColorStripStream,
|
|
||||||
)
|
)
|
||||||
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
|
|
||||||
from ledgrab.core.processing.effect_stream import EffectColorStripStream
|
|
||||||
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
|
|
||||||
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
|
|
||||||
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
|
|
||||||
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
|
|
||||||
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
|
|
||||||
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
|
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
# source_type → stream class for non-picture (non-sharable) sources
|
|
||||||
_SIMPLE_STREAM_MAP = {
|
|
||||||
"static": StaticColorStripStream,
|
|
||||||
"gradient": GradientColorStripStream,
|
|
||||||
"effect": EffectColorStripStream,
|
|
||||||
"api_input": ApiInputColorStripStream,
|
|
||||||
"notification": NotificationColorStripStream,
|
|
||||||
"daylight": DaylightColorStripStream,
|
|
||||||
"candlelight": CandlelightColorStripStream,
|
|
||||||
"game_event": GameEventColorStripStream,
|
|
||||||
"math_wave": MathWaveColorStripStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class _ColorStripEntry:
|
class _ColorStripEntry:
|
||||||
@@ -241,43 +222,27 @@ class ColorStripStreamManager:
|
|||||||
"""
|
"""
|
||||||
source = self._color_strip_store.get_source(css_id)
|
source = self._color_strip_store.get_source(css_id)
|
||||||
|
|
||||||
# Non-sharable: always create a fresh per-consumer instance
|
# Non-sharable: always create a fresh per-consumer instance.
|
||||||
|
# Construction is delegated to the per-kind registry in
|
||||||
|
# ``color_strip_kinds`` so the dispatch lives in exactly one place.
|
||||||
if not source.sharable:
|
if not source.sharable:
|
||||||
if source.source_type == "audio":
|
deps = StreamDeps(
|
||||||
from ledgrab.core.processing.audio_stream import AudioColorStripStream
|
css_manager=self,
|
||||||
|
value_stream_manager=self._value_stream_manager,
|
||||||
css_stream = AudioColorStripStream(
|
cspt_store=self._cspt_store,
|
||||||
source,
|
weather_manager=self._weather_manager,
|
||||||
self._audio_capture_manager,
|
audio_capture_manager=self._audio_capture_manager,
|
||||||
self._audio_source_store,
|
audio_source_store=self._audio_source_store,
|
||||||
self._audio_template_store,
|
audio_template_store=self._audio_template_store,
|
||||||
self._audio_processing_template_store,
|
audio_processing_template_store=self._audio_processing_template_store,
|
||||||
|
game_event_bus=self._game_event_bus,
|
||||||
|
depth=depth,
|
||||||
)
|
)
|
||||||
elif source.source_type == "composite":
|
try:
|
||||||
from ledgrab.core.processing.composite_stream import (
|
css_stream = build_stream(source, deps)
|
||||||
CompositeColorStripStream,
|
except ValueError as e:
|
||||||
)
|
# Surface the css_id alongside the registry's error.
|
||||||
|
raise ValueError(f"{e} (css_id={css_id})") from e
|
||||||
css_stream = CompositeColorStripStream(
|
|
||||||
source, self, self._value_stream_manager, self._cspt_store, depth=depth
|
|
||||||
)
|
|
||||||
elif source.source_type == "mapped":
|
|
||||||
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
|
|
||||||
|
|
||||||
css_stream = MappedColorStripStream(source, self)
|
|
||||||
elif source.source_type == "processed":
|
|
||||||
css_stream = ProcessedColorStripStream(source, self, self._cspt_store)
|
|
||||||
elif source.source_type == "weather":
|
|
||||||
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
|
|
||||||
|
|
||||||
css_stream = WeatherColorStripStream(source, self._weather_manager)
|
|
||||||
else:
|
|
||||||
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
|
|
||||||
if not stream_cls:
|
|
||||||
raise ValueError(
|
|
||||||
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
|
|
||||||
)
|
|
||||||
css_stream = stream_cls(source)
|
|
||||||
# Inject gradient store for palette resolution
|
# Inject gradient store for palette resolution
|
||||||
if self._gradient_store and hasattr(css_stream, "set_gradient_store"):
|
if self._gradient_store and hasattr(css_stream, "set_gradient_store"):
|
||||||
css_stream.set_gradient_store(self._gradient_store)
|
css_stream.set_gradient_store(self._gradient_store)
|
||||||
|
|||||||
@@ -0,0 +1,354 @@
|
|||||||
|
"""Single source of truth for value-stream construction.
|
||||||
|
|
||||||
|
``ValueStreamManager._create_stream`` used to be a 168-line ``isinstance``
|
||||||
|
ladder over 14 ``ValueSource`` subclasses with a silent fallback to
|
||||||
|
``StaticValueStream(value=1.0)``. The ladder forced any new value kind to
|
||||||
|
edit the factory plus the storage subclass plus the schemas plus the store's
|
||||||
|
``create_source`` — and a missing branch corrupted the stream at runtime.
|
||||||
|
|
||||||
|
This module replaces the ladder with a single ``STREAM_BUILDERS`` registry
|
||||||
|
keyed by the ``source_type`` string (matching the storage layer's
|
||||||
|
``_VALUE_SOURCE_MAP``). An import-time coverage assertion guarantees the two
|
||||||
|
registries stay aligned.
|
||||||
|
|
||||||
|
Builders take a ``(source, deps: ValueStreamDeps) -> ValueStream`` shape so
|
||||||
|
both the production manager and any preview / test harness can populate the
|
||||||
|
deps from their own context.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import TYPE_CHECKING, Any, Callable, Optional
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
# Typed forward references so mypy/pyright catch typos like
|
||||||
|
# ``d.gradient_stroe`` at static-analysis time. At runtime these are
|
||||||
|
# ``Any`` — the live objects come from the FastAPI / manager wiring.
|
||||||
|
from ledgrab.core.audio.audio_capture_manager import AudioCaptureManager
|
||||||
|
from ledgrab.core.game_integration.event_bus import GameEventBus
|
||||||
|
from ledgrab.core.home_assistant.ha_manager import HomeAssistantManager
|
||||||
|
from ledgrab.core.processing.color_strip_stream_manager import (
|
||||||
|
ColorStripStreamManager,
|
||||||
|
)
|
||||||
|
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
|
||||||
|
from ledgrab.core.processing.sync_clock_manager import SyncClockRuntime
|
||||||
|
from ledgrab.storage.audio_processing_template_store import (
|
||||||
|
AudioProcessingTemplateStore,
|
||||||
|
)
|
||||||
|
from ledgrab.storage.audio_source_store import AudioSourceStore
|
||||||
|
from ledgrab.storage.audio_template_store import AudioTemplateStore
|
||||||
|
from ledgrab.storage.gradient_store import GradientStore
|
||||||
|
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ValueStreamDeps:
|
||||||
|
"""Dependency bag for value-stream construction.
|
||||||
|
|
||||||
|
Builders only read the subset they need. ``value_stream_manager`` is the
|
||||||
|
one truly load-bearing field — it is referenced by
|
||||||
|
``GradientMapValueStream`` so it can recursively acquire its source
|
||||||
|
value stream.
|
||||||
|
|
||||||
|
``clock_runtime`` is pre-acquired by the manager exclusively for the
|
||||||
|
``animated_color`` kind (see :data:`NEEDS_CLOCK_RUNTIME`). The manager
|
||||||
|
owns the bookkeeping for tracking ``vs_id → clock_id`` so the builder
|
||||||
|
stays a pure ``(source, deps) -> stream`` mapping. If a new kind ever
|
||||||
|
grows a clock dependency, add it to ``NEEDS_CLOCK_RUNTIME`` AND surface
|
||||||
|
a separate field — sharing ``clock_runtime`` across kinds invites the
|
||||||
|
wrong runtime being passed to the wrong builder.
|
||||||
|
|
||||||
|
Field types are quoted so they stay informational under
|
||||||
|
``TYPE_CHECKING`` and the dataclass still accepts plain mocks at
|
||||||
|
runtime. Builders therefore get IDE/lint help against typos like
|
||||||
|
``d.gradient_stroe`` while production code remains duck-typed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
value_stream_manager: "Any"
|
||||||
|
audio_capture_manager: Optional["AudioCaptureManager"] = None
|
||||||
|
audio_source_store: Optional["AudioSourceStore"] = None
|
||||||
|
audio_template_store: Optional["AudioTemplateStore"] = None
|
||||||
|
audio_processing_template_store: Optional["AudioProcessingTemplateStore"] = None
|
||||||
|
live_stream_manager: Optional["LiveStreamManager"] = None
|
||||||
|
ha_manager: Optional["HomeAssistantManager"] = None
|
||||||
|
gradient_store: Optional["GradientStore"] = None
|
||||||
|
css_stream_manager: Optional["ColorStripStreamManager"] = None
|
||||||
|
event_bus: Optional["GameEventBus"] = None
|
||||||
|
http_endpoint_store: Optional["HTTPEndpointStore"] = None
|
||||||
|
clock_runtime: Optional["SyncClockRuntime"] = None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-kind builders
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _build_static(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import StaticValueStream
|
||||||
|
|
||||||
|
return StaticValueStream(value=source.value)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_animated(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import AnimatedValueStream
|
||||||
|
|
||||||
|
return AnimatedValueStream(
|
||||||
|
waveform=source.waveform,
|
||||||
|
speed=source.speed,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_audio(source, d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import AudioValueStream
|
||||||
|
|
||||||
|
return AudioValueStream(
|
||||||
|
audio_source_id=source.audio_source_id,
|
||||||
|
mode=source.mode,
|
||||||
|
sensitivity=source.sensitivity,
|
||||||
|
smoothing=source.smoothing,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
auto_gain=source.auto_gain,
|
||||||
|
audio_capture_manager=d.audio_capture_manager,
|
||||||
|
audio_source_store=d.audio_source_store,
|
||||||
|
audio_template_store=d.audio_template_store,
|
||||||
|
audio_processing_template_store=d.audio_processing_template_store,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_daylight(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import DaylightValueStream
|
||||||
|
|
||||||
|
return DaylightValueStream(
|
||||||
|
speed=source.speed,
|
||||||
|
use_real_time=source.use_real_time,
|
||||||
|
latitude=source.latitude,
|
||||||
|
longitude=source.longitude,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_adaptive_time(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import TimeOfDayValueStream
|
||||||
|
|
||||||
|
return TimeOfDayValueStream(
|
||||||
|
schedule=source.schedule,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_adaptive_scene(source, d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import SceneValueStream
|
||||||
|
|
||||||
|
return SceneValueStream(
|
||||||
|
picture_source_id=source.picture_source_id,
|
||||||
|
scene_behavior=source.scene_behavior,
|
||||||
|
sensitivity=source.sensitivity,
|
||||||
|
smoothing=source.smoothing,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
live_stream_manager=d.live_stream_manager,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_static_color(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import StaticColorValueStream
|
||||||
|
|
||||||
|
return StaticColorValueStream(color=source.color)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_animated_color(source, d: ValueStreamDeps):
|
||||||
|
# See NEEDS_CLOCK_RUNTIME below: ``d.clock_runtime`` is pre-acquired by
|
||||||
|
# ``ValueStreamManager._create_stream`` exclusively for this kind. Any
|
||||||
|
# other builder that ever needs a clock should add its own deps field
|
||||||
|
# rather than read this one.
|
||||||
|
from ledgrab.core.processing.value_stream import AnimatedColorValueStream
|
||||||
|
|
||||||
|
return AnimatedColorValueStream(
|
||||||
|
colors=source.colors,
|
||||||
|
speed=source.speed,
|
||||||
|
easing=source.easing,
|
||||||
|
clock=d.clock_runtime,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_adaptive_time_color(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import AdaptiveTimeColorValueStream
|
||||||
|
|
||||||
|
return AdaptiveTimeColorValueStream(schedule=source.schedule)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_ha_entity(source, d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import HAEntityValueStream
|
||||||
|
|
||||||
|
return HAEntityValueStream(
|
||||||
|
ha_source_id=source.ha_source_id,
|
||||||
|
entity_id=source.entity_id,
|
||||||
|
attribute=source.attribute,
|
||||||
|
min_ha_value=source.min_ha_value,
|
||||||
|
max_ha_value=source.max_ha_value,
|
||||||
|
smoothing=source.smoothing,
|
||||||
|
ha_manager=d.ha_manager,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_gradient_map(source, d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import GradientMapValueStream
|
||||||
|
|
||||||
|
return GradientMapValueStream(
|
||||||
|
value_source_id=source.value_source_id,
|
||||||
|
gradient_id=source.gradient_id,
|
||||||
|
easing=source.easing,
|
||||||
|
value_stream_manager=d.value_stream_manager,
|
||||||
|
gradient_store=d.gradient_store,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_css_extract(source, d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import CSSExtractValueStream
|
||||||
|
|
||||||
|
return CSSExtractValueStream(
|
||||||
|
color_strip_source_id=source.color_strip_source_id,
|
||||||
|
led_start=source.led_start,
|
||||||
|
led_end=source.led_end,
|
||||||
|
css_stream_manager=d.css_stream_manager,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_system_metrics(source, _d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import SystemMetricsValueStream
|
||||||
|
|
||||||
|
return SystemMetricsValueStream(
|
||||||
|
metric=source.metric,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
max_rate=source.max_rate,
|
||||||
|
disk_path=source.disk_path,
|
||||||
|
sensor_label=source.sensor_label,
|
||||||
|
poll_interval=source.poll_interval,
|
||||||
|
smoothing=source.smoothing,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_game_event(source, d: ValueStreamDeps):
|
||||||
|
# Late import: ``GameEventValueStream`` lives in a separate sub-package
|
||||||
|
# to keep the game-event subsystem (which transitively pulls in the
|
||||||
|
# game-integration adapters) off the cold-start path for installs that
|
||||||
|
# never use game events.
|
||||||
|
from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream
|
||||||
|
|
||||||
|
return GameEventValueStream(
|
||||||
|
event_type=source.event_type,
|
||||||
|
min_game_value=source.min_game_value,
|
||||||
|
max_game_value=source.max_game_value,
|
||||||
|
smoothing=source.smoothing,
|
||||||
|
default_value=source.default_value,
|
||||||
|
timeout=source.timeout,
|
||||||
|
event_bus=d.event_bus,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_http(source, d: ValueStreamDeps):
|
||||||
|
from ledgrab.core.processing.value_stream import HTTPValueStream
|
||||||
|
|
||||||
|
return HTTPValueStream(
|
||||||
|
endpoint_id=source.http_endpoint_id,
|
||||||
|
json_path=source.json_path,
|
||||||
|
interval_s=source.interval_s,
|
||||||
|
min_value=source.min_value,
|
||||||
|
max_value=source.max_value,
|
||||||
|
smoothing=source.smoothing,
|
||||||
|
http_endpoint_store=d.http_endpoint_store,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Registry
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
StreamBuilder = Callable[[Any, ValueStreamDeps], Any]
|
||||||
|
|
||||||
|
STREAM_BUILDERS: dict[str, StreamBuilder] = {
|
||||||
|
"static": _build_static,
|
||||||
|
"animated": _build_animated,
|
||||||
|
"audio": _build_audio,
|
||||||
|
"daylight": _build_daylight,
|
||||||
|
"adaptive_time": _build_adaptive_time,
|
||||||
|
"adaptive_scene": _build_adaptive_scene,
|
||||||
|
"static_color": _build_static_color,
|
||||||
|
"animated_color": _build_animated_color,
|
||||||
|
"adaptive_time_color": _build_adaptive_time_color,
|
||||||
|
"ha_entity": _build_ha_entity,
|
||||||
|
"gradient_map": _build_gradient_map,
|
||||||
|
"css_extract": _build_css_extract,
|
||||||
|
"system_metrics": _build_system_metrics,
|
||||||
|
"game_event": _build_game_event,
|
||||||
|
"http": _build_http,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ``animated_color`` is the only kind that needs a pre-acquired SyncClockRuntime
|
||||||
|
# from the manager; the rest derive everything they need from ``source`` and
|
||||||
|
# ``deps``. Exposing this set lets the manager perform the side-effecting
|
||||||
|
# acquisition step exactly once, before delegating to the registry.
|
||||||
|
NEEDS_CLOCK_RUNTIME: frozenset[str] = frozenset({"animated_color"})
|
||||||
|
|
||||||
|
|
||||||
|
def build_stream(source, deps: ValueStreamDeps):
|
||||||
|
"""Build a ValueStream for *source*.
|
||||||
|
|
||||||
|
Raises ``ValueError`` when no builder is registered for
|
||||||
|
``source.source_type``. Coverage is asserted at module import, so this
|
||||||
|
only fires for an in-flight instance whose ``source_type`` somehow
|
||||||
|
drifted from the registered set.
|
||||||
|
"""
|
||||||
|
builder = STREAM_BUILDERS.get(source.source_type)
|
||||||
|
if builder is None:
|
||||||
|
raise ValueError(
|
||||||
|
f"No value-stream builder for source_type {source.source_type!r} "
|
||||||
|
f"(id={getattr(source, 'id', '?')!r})"
|
||||||
|
)
|
||||||
|
return builder(source, deps)
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_value_kind_coverage() -> None:
|
||||||
|
"""Verify the registry and storage's ``_VALUE_SOURCE_MAP`` agree.
|
||||||
|
|
||||||
|
Runs at module import. Symmetric: every kind in storage must have a
|
||||||
|
builder, and every builder must correspond to a real storage kind.
|
||||||
|
Also asserts ``NEEDS_CLOCK_RUNTIME`` only names kinds that exist in the
|
||||||
|
registry, so a typo there fails the boot loudly instead of silently
|
||||||
|
leaking a ``SyncClockRuntime`` acquisition.
|
||||||
|
"""
|
||||||
|
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
|
||||||
|
|
||||||
|
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
|
||||||
|
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||||
|
missing = storage_kinds - builder_kinds
|
||||||
|
extra = builder_kinds - storage_kinds
|
||||||
|
if missing or extra:
|
||||||
|
problems = []
|
||||||
|
if missing:
|
||||||
|
problems.append(f"missing builders: {sorted(missing)}")
|
||||||
|
if extra:
|
||||||
|
problems.append(f"unregistered kinds: {sorted(extra)}")
|
||||||
|
raise RuntimeError(
|
||||||
|
"value_kinds.STREAM_BUILDERS is out of sync with storage._VALUE_SOURCE_MAP: "
|
||||||
|
+ "; ".join(problems)
|
||||||
|
)
|
||||||
|
|
||||||
|
rogue_clock_kinds = NEEDS_CLOCK_RUNTIME - builder_kinds
|
||||||
|
if rogue_clock_kinds:
|
||||||
|
raise RuntimeError(
|
||||||
|
"value_kinds.NEEDS_CLOCK_RUNTIME names kinds with no registered "
|
||||||
|
f"builder: {sorted(rogue_clock_kinds)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_assert_value_kind_coverage()
|
||||||
@@ -21,7 +21,9 @@ ValueStreamManager owns all running ValueStreams, keyed by
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import json
|
||||||
import math
|
import math
|
||||||
|
import re
|
||||||
import time
|
import time
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@@ -29,8 +31,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from ledgrab.storage.base_store import EntityNotFoundError
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
|
# Compiled once — used by ``_extract_simple_path`` on every poll.
|
||||||
|
_NAME_HEAD_RE = re.compile(r"^([^\[]*)")
|
||||||
|
_INDEX_RE = re.compile(r"^\[(\d+)\]")
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ledgrab.core.audio.audio_capture import AudioCaptureManager
|
from ledgrab.core.audio.audio_capture import AudioCaptureManager
|
||||||
from ledgrab.core.game_integration.event_bus import GameEventBus
|
from ledgrab.core.game_integration.event_bus import GameEventBus
|
||||||
@@ -39,6 +46,7 @@ if TYPE_CHECKING:
|
|||||||
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
|
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
|
||||||
from ledgrab.core.processing.sync_clock_manager import SyncClockManager
|
from ledgrab.core.processing.sync_clock_manager import SyncClockManager
|
||||||
from ledgrab.storage.audio_source_store import AudioSourceStore
|
from ledgrab.storage.audio_source_store import AudioSourceStore
|
||||||
|
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||||
from ledgrab.storage.value_source import ValueSource
|
from ledgrab.storage.value_source import ValueSource
|
||||||
from ledgrab.storage.value_source_store import ValueSourceStore
|
from ledgrab.storage.value_source_store import ValueSourceStore
|
||||||
|
|
||||||
@@ -1017,6 +1025,211 @@ class HAEntityValueStream(ValueStream):
|
|||||||
logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e)
|
logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# HTTP poll
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class HTTPValueStream(ValueStream):
|
||||||
|
"""Periodically polls an HTTPEndpoint and extracts a value via json_path.
|
||||||
|
|
||||||
|
Exposes two accessors:
|
||||||
|
|
||||||
|
- ``get_value()`` returns a normalized float in [0, 1] for use as a
|
||||||
|
modulator (brightness, color, etc.). The raw extracted value is
|
||||||
|
coerced to float; non-numeric values yield 0.0.
|
||||||
|
- ``get_raw_value()`` returns the un-normalized extracted value
|
||||||
|
(str / int / float / bool / None) for consumers that need it
|
||||||
|
verbatim — e.g. an automation rule comparing ``"playing"``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
endpoint_id: str,
|
||||||
|
json_path: str,
|
||||||
|
interval_s: int,
|
||||||
|
min_value: float,
|
||||||
|
max_value: float,
|
||||||
|
smoothing: float,
|
||||||
|
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
|
||||||
|
) -> None:
|
||||||
|
self._endpoint_id = endpoint_id
|
||||||
|
self._json_path = json_path
|
||||||
|
self._interval_s = max(1, int(interval_s))
|
||||||
|
self._min_value = min_value
|
||||||
|
self._max_value = max_value
|
||||||
|
self._smoothing = smoothing
|
||||||
|
self._http_endpoint_store = http_endpoint_store
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._raw_value: Any = None
|
||||||
|
self._prev_normalized: Optional[float] = None
|
||||||
|
# Kept as private attrs for internal/log diagnostics; not exposed via
|
||||||
|
# public properties or API until a status endpoint consumes them.
|
||||||
|
self._last_fetched_at: Optional[datetime] = None
|
||||||
|
self._last_status_code: Optional[int] = None
|
||||||
|
self._last_error: Optional[str] = None
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
if self._task is not None:
|
||||||
|
return
|
||||||
|
if not self._endpoint_id or self._http_endpoint_store is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
# No running loop — can't poll. Construction in a sync test
|
||||||
|
# context is fine; the stream just stays idle until started
|
||||||
|
# from an async context.
|
||||||
|
return
|
||||||
|
self._task = loop.create_task(self._poll_loop())
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
task = self._task
|
||||||
|
self._task = None
|
||||||
|
if task is not None:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
def get_value(self) -> float:
|
||||||
|
raw = self._raw_value
|
||||||
|
if raw is None:
|
||||||
|
return self._prev_normalized if self._prev_normalized is not None else 0.0
|
||||||
|
try:
|
||||||
|
numeric = float(raw)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return self._prev_normalized if self._prev_normalized is not None else 0.0
|
||||||
|
|
||||||
|
rng = self._max_value - self._min_value
|
||||||
|
if abs(rng) < 1e-9:
|
||||||
|
normalized = 0.5
|
||||||
|
else:
|
||||||
|
normalized = (numeric - self._min_value) / rng
|
||||||
|
normalized = max(0.0, min(1.0, normalized))
|
||||||
|
|
||||||
|
if self._smoothing > 0.0 and self._prev_normalized is not None:
|
||||||
|
normalized = (
|
||||||
|
self._smoothing * self._prev_normalized + (1.0 - self._smoothing) * normalized
|
||||||
|
)
|
||||||
|
self._prev_normalized = normalized
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
def get_raw_value(self) -> Any:
|
||||||
|
"""Return the last raw extracted value (string, int, float, etc.)."""
|
||||||
|
return self._raw_value
|
||||||
|
|
||||||
|
def update_source(self, source: "ValueSource") -> None:
|
||||||
|
from ledgrab.storage.value_source import HTTPValueSource
|
||||||
|
|
||||||
|
if not isinstance(source, HTTPValueSource):
|
||||||
|
return
|
||||||
|
self._endpoint_id = source.http_endpoint_id
|
||||||
|
self._json_path = source.json_path
|
||||||
|
self._interval_s = max(1, int(source.interval_s))
|
||||||
|
self._min_value = source.min_value
|
||||||
|
self._max_value = source.max_value
|
||||||
|
self._smoothing = source.smoothing
|
||||||
|
|
||||||
|
async def _poll_loop(self) -> None:
|
||||||
|
from ledgrab.utils.safe_source import safe_request_bounded
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
endpoint = self._http_endpoint_store.get(self._endpoint_id)
|
||||||
|
except EntityNotFoundError:
|
||||||
|
# The endpoint was deleted out from under us. Stop the
|
||||||
|
# poll task so it doesn't spin forever; the next entity
|
||||||
|
# event (or the value source being deleted) will tidy
|
||||||
|
# the rest of the bookkeeping.
|
||||||
|
logger.warning(
|
||||||
|
"HTTPValueStream stopping: endpoint %s no longer exists",
|
||||||
|
self._endpoint_id,
|
||||||
|
)
|
||||||
|
self._last_error = "endpoint_deleted"
|
||||||
|
self._raw_value = None
|
||||||
|
self._task = None
|
||||||
|
return
|
||||||
|
except Exception as exc:
|
||||||
|
self._last_error = f"Endpoint lookup failed: {type(exc).__name__}"
|
||||||
|
self._raw_value = None
|
||||||
|
await asyncio.sleep(self._interval_s)
|
||||||
|
continue
|
||||||
|
|
||||||
|
headers = endpoint.build_request_headers()
|
||||||
|
try:
|
||||||
|
status, body_bytes, error = await safe_request_bounded(
|
||||||
|
endpoint.method,
|
||||||
|
endpoint.url,
|
||||||
|
headers=headers,
|
||||||
|
timeout=endpoint.timeout_s,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
# safe_request_bounded raises HTTPException on URL
|
||||||
|
# validation failure; treat that as a recoverable poll
|
||||||
|
# error and try again next cycle.
|
||||||
|
self._last_status_code = None
|
||||||
|
self._last_error = f"URL validation failed: {type(exc).__name__}"
|
||||||
|
self._raw_value = None
|
||||||
|
await asyncio.sleep(self._interval_s)
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._last_status_code = status if status else None
|
||||||
|
self._last_error = error
|
||||||
|
if not error and status:
|
||||||
|
try:
|
||||||
|
body_text = body_bytes.decode("utf-8", errors="replace")
|
||||||
|
except Exception:
|
||||||
|
body_text = ""
|
||||||
|
body_json: Any
|
||||||
|
try:
|
||||||
|
body_json = json.loads(body_text) if body_text else None
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
body_json = None
|
||||||
|
self._raw_value = _extract_simple_path(body_json, self._json_path, body_text)
|
||||||
|
else:
|
||||||
|
self._raw_value = None
|
||||||
|
self._last_fetched_at = datetime.now()
|
||||||
|
|
||||||
|
await asyncio.sleep(self._interval_s)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_simple_path(body_json: Any, path: str, body_text: str) -> Any:
|
||||||
|
"""Extract a value via a dot-path (with optional ``[N]`` indices).
|
||||||
|
|
||||||
|
Uses module-level compiled regexes so repeated polls don't recompile.
|
||||||
|
Returns ``None`` when the path doesn't resolve — runtime callers just
|
||||||
|
need "the value, or nothing." Empty path returns the raw body text so
|
||||||
|
plain-text endpoints work too.
|
||||||
|
"""
|
||||||
|
if not path:
|
||||||
|
return body_text or None
|
||||||
|
if body_json is None:
|
||||||
|
return None
|
||||||
|
current: Any = body_json
|
||||||
|
for raw_segment in path.split("."):
|
||||||
|
segment = raw_segment.strip()
|
||||||
|
if not segment:
|
||||||
|
continue
|
||||||
|
name_match = _NAME_HEAD_RE.match(segment)
|
||||||
|
name_part = name_match.group(1) if name_match else ""
|
||||||
|
remainder = segment[len(name_part) :]
|
||||||
|
if name_part:
|
||||||
|
if not isinstance(current, dict) or name_part not in current:
|
||||||
|
return None
|
||||||
|
current = current[name_part]
|
||||||
|
while remainder:
|
||||||
|
idx_match = _INDEX_RE.match(remainder)
|
||||||
|
if not idx_match:
|
||||||
|
return None
|
||||||
|
idx = int(idx_match.group(1))
|
||||||
|
if not isinstance(current, list) or idx < 0 or idx >= len(current):
|
||||||
|
return None
|
||||||
|
current = current[idx]
|
||||||
|
remainder = remainder[idx_match.end() :]
|
||||||
|
return current
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Gradient Map
|
# Gradient Map
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -1547,6 +1760,7 @@ class ValueStreamManager:
|
|||||||
event_bus: Optional["GameEventBus"] = None,
|
event_bus: Optional["GameEventBus"] = None,
|
||||||
audio_processing_template_store=None,
|
audio_processing_template_store=None,
|
||||||
sync_clock_manager: Optional["SyncClockManager"] = None,
|
sync_clock_manager: Optional["SyncClockManager"] = None,
|
||||||
|
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
|
||||||
):
|
):
|
||||||
self._value_source_store = value_source_store
|
self._value_source_store = value_source_store
|
||||||
self._audio_capture_manager = audio_capture_manager
|
self._audio_capture_manager = audio_capture_manager
|
||||||
@@ -1559,6 +1773,7 @@ class ValueStreamManager:
|
|||||||
self._event_bus = event_bus
|
self._event_bus = event_bus
|
||||||
self._audio_processing_template_store = audio_processing_template_store
|
self._audio_processing_template_store = audio_processing_template_store
|
||||||
self._sync_clock_manager = sync_clock_manager
|
self._sync_clock_manager = sync_clock_manager
|
||||||
|
self._http_endpoint_store = http_endpoint_store
|
||||||
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
|
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
|
||||||
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
|
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
|
||||||
# Tracks which clock_id (if any) was acquired for each stream so we
|
# Tracks which clock_id (if any) was acquired for each stream so we
|
||||||
@@ -1602,6 +1817,17 @@ class ValueStreamManager:
|
|||||||
else:
|
else:
|
||||||
logger.info(f"Released ref for value stream {vs_id} (refs={refs})")
|
logger.info(f"Released ref for value stream {vs_id} (refs={refs})")
|
||||||
|
|
||||||
|
def peek(self, vs_id: str) -> Optional[ValueStream]:
|
||||||
|
"""Read-only accessor: return the running ValueStream for ``vs_id``
|
||||||
|
if one exists, else ``None``.
|
||||||
|
|
||||||
|
Does NOT change ref counts. Use for consumer-driven reads where the
|
||||||
|
caller already holds a reference via :meth:`acquire` (e.g. the
|
||||||
|
:class:`AutomationEngine` evaluating an ``HTTPPollRule`` against a
|
||||||
|
value source it has already acquired in ``_sync_value_stream_refs``).
|
||||||
|
"""
|
||||||
|
return self._streams.get(vs_id)
|
||||||
|
|
||||||
def update_source(self, vs_id: str) -> None:
|
def update_source(self, vs_id: str) -> None:
|
||||||
"""Hot-update the shared stream for the given ValueSource."""
|
"""Hot-update the shared stream for the given ValueSource."""
|
||||||
try:
|
try:
|
||||||
@@ -1699,158 +1925,52 @@ class ValueStreamManager:
|
|||||||
logger.info("Released all value streams")
|
logger.info("Released all value streams")
|
||||||
|
|
||||||
def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream:
|
def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream:
|
||||||
"""Factory: create the appropriate ValueStream for a ValueSource."""
|
"""Build a ValueStream for *source* via the central kind registry.
|
||||||
from ledgrab.storage.value_source import (
|
|
||||||
AdaptiveValueSource,
|
The 14-branch ``isinstance`` ladder this method used to host was the
|
||||||
AnimatedValueSource,
|
canonical example of the parallel-change smell flagged in the
|
||||||
AudioValueSource,
|
architecture audit. The actual per-kind construction now lives in
|
||||||
CSSExtractValueSource,
|
``ledgrab.core.processing.value_kinds.STREAM_BUILDERS``, keyed by
|
||||||
DaylightValueSource,
|
``source.source_type``. This method only handles the manager-side
|
||||||
GameEventValueSource,
|
bookkeeping that does not fit a uniform builder signature — namely
|
||||||
GradientMapValueSource,
|
the optional :class:`SyncClockRuntime` acquisition for
|
||||||
HAEntityValueSource,
|
``animated_color`` sources, whose ``vs_id → clock_id`` mapping the
|
||||||
StaticValueSource,
|
manager owns for symmetric release at teardown.
|
||||||
StaticColorValueSource,
|
"""
|
||||||
AnimatedColorValueSource,
|
from ledgrab.core.processing.value_kinds import (
|
||||||
AdaptiveTimeColorValueSource,
|
NEEDS_CLOCK_RUNTIME,
|
||||||
SystemMetricsValueSource,
|
ValueStreamDeps,
|
||||||
|
build_stream,
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(source, StaticValueSource):
|
clock_runtime = None
|
||||||
return StaticValueStream(value=source.value)
|
if source.source_type in NEEDS_CLOCK_RUNTIME:
|
||||||
|
clock_id = getattr(source, "clock_id", None)
|
||||||
if isinstance(source, AnimatedValueSource):
|
if clock_id and self._sync_clock_manager:
|
||||||
return AnimatedValueStream(
|
try:
|
||||||
waveform=source.waveform,
|
clock_runtime = self._sync_clock_manager.acquire(clock_id)
|
||||||
speed=source.speed,
|
if vs_id is not None:
|
||||||
min_value=source.min_value,
|
self._stream_clock_ids[vs_id] = clock_id
|
||||||
max_value=source.max_value,
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"Could not acquire sync clock %s for value source %s: %s",
|
||||||
|
clock_id,
|
||||||
|
source.id,
|
||||||
|
e,
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(source, AudioValueSource):
|
deps = ValueStreamDeps(
|
||||||
return AudioValueStream(
|
value_stream_manager=self,
|
||||||
audio_source_id=source.audio_source_id,
|
|
||||||
mode=source.mode,
|
|
||||||
sensitivity=source.sensitivity,
|
|
||||||
smoothing=source.smoothing,
|
|
||||||
min_value=source.min_value,
|
|
||||||
max_value=source.max_value,
|
|
||||||
auto_gain=source.auto_gain,
|
|
||||||
audio_capture_manager=self._audio_capture_manager,
|
audio_capture_manager=self._audio_capture_manager,
|
||||||
audio_source_store=self._audio_source_store,
|
audio_source_store=self._audio_source_store,
|
||||||
audio_template_store=self._audio_template_store,
|
audio_template_store=self._audio_template_store,
|
||||||
audio_processing_template_store=self._audio_processing_template_store,
|
audio_processing_template_store=self._audio_processing_template_store,
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, DaylightValueSource):
|
|
||||||
return DaylightValueStream(
|
|
||||||
speed=source.speed,
|
|
||||||
use_real_time=source.use_real_time,
|
|
||||||
latitude=source.latitude,
|
|
||||||
longitude=source.longitude,
|
|
||||||
min_value=source.min_value,
|
|
||||||
max_value=source.max_value,
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, AdaptiveValueSource):
|
|
||||||
if source.source_type == "adaptive_scene":
|
|
||||||
return SceneValueStream(
|
|
||||||
picture_source_id=source.picture_source_id,
|
|
||||||
scene_behavior=source.scene_behavior,
|
|
||||||
sensitivity=source.sensitivity,
|
|
||||||
smoothing=source.smoothing,
|
|
||||||
min_value=source.min_value,
|
|
||||||
max_value=source.max_value,
|
|
||||||
live_stream_manager=self._live_stream_manager,
|
live_stream_manager=self._live_stream_manager,
|
||||||
)
|
|
||||||
return TimeOfDayValueStream(
|
|
||||||
schedule=source.schedule,
|
|
||||||
min_value=source.min_value,
|
|
||||||
max_value=source.max_value,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Color streams
|
|
||||||
if isinstance(source, StaticColorValueSource):
|
|
||||||
return StaticColorValueStream(color=source.color)
|
|
||||||
|
|
||||||
if isinstance(source, AnimatedColorValueSource):
|
|
||||||
clock_runtime = None
|
|
||||||
if source.clock_id and self._sync_clock_manager:
|
|
||||||
try:
|
|
||||||
clock_runtime = self._sync_clock_manager.acquire(source.clock_id)
|
|
||||||
if vs_id is not None:
|
|
||||||
self._stream_clock_ids[vs_id] = source.clock_id
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
"Could not acquire sync clock %s for value source %s: %s",
|
|
||||||
source.clock_id,
|
|
||||||
source.id,
|
|
||||||
e,
|
|
||||||
)
|
|
||||||
return AnimatedColorValueStream(
|
|
||||||
colors=source.colors,
|
|
||||||
speed=source.speed,
|
|
||||||
easing=source.easing,
|
|
||||||
clock=clock_runtime,
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, AdaptiveTimeColorValueSource):
|
|
||||||
return AdaptiveTimeColorValueStream(schedule=source.schedule)
|
|
||||||
|
|
||||||
if isinstance(source, HAEntityValueSource):
|
|
||||||
return HAEntityValueStream(
|
|
||||||
ha_source_id=source.ha_source_id,
|
|
||||||
entity_id=source.entity_id,
|
|
||||||
attribute=source.attribute,
|
|
||||||
min_ha_value=source.min_ha_value,
|
|
||||||
max_ha_value=source.max_ha_value,
|
|
||||||
smoothing=source.smoothing,
|
|
||||||
ha_manager=self._ha_manager,
|
ha_manager=self._ha_manager,
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, GradientMapValueSource):
|
|
||||||
return GradientMapValueStream(
|
|
||||||
value_source_id=source.value_source_id,
|
|
||||||
gradient_id=source.gradient_id,
|
|
||||||
easing=source.easing,
|
|
||||||
value_stream_manager=self,
|
|
||||||
gradient_store=self._gradient_store,
|
gradient_store=self._gradient_store,
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, CSSExtractValueSource):
|
|
||||||
return CSSExtractValueStream(
|
|
||||||
color_strip_source_id=source.color_strip_source_id,
|
|
||||||
led_start=source.led_start,
|
|
||||||
led_end=source.led_end,
|
|
||||||
css_stream_manager=self._css_stream_manager,
|
css_stream_manager=self._css_stream_manager,
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, SystemMetricsValueSource):
|
|
||||||
return SystemMetricsValueStream(
|
|
||||||
metric=source.metric,
|
|
||||||
min_value=source.min_value,
|
|
||||||
max_value=source.max_value,
|
|
||||||
max_rate=source.max_rate,
|
|
||||||
disk_path=source.disk_path,
|
|
||||||
sensor_label=source.sensor_label,
|
|
||||||
poll_interval=source.poll_interval,
|
|
||||||
smoothing=source.smoothing,
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(source, GameEventValueSource):
|
|
||||||
from ledgrab.core.value_sources.game_event_value_source import (
|
|
||||||
GameEventValueStream,
|
|
||||||
)
|
|
||||||
|
|
||||||
return GameEventValueStream(
|
|
||||||
event_type=source.event_type,
|
|
||||||
min_game_value=source.min_game_value,
|
|
||||||
max_game_value=source.max_game_value,
|
|
||||||
smoothing=source.smoothing,
|
|
||||||
default_value=source.default_value,
|
|
||||||
timeout=source.timeout,
|
|
||||||
event_bus=self._event_bus,
|
event_bus=self._event_bus,
|
||||||
|
http_endpoint_store=self._http_endpoint_store,
|
||||||
|
clock_runtime=clock_runtime,
|
||||||
)
|
)
|
||||||
|
return build_stream(source, deps)
|
||||||
# Fallback
|
|
||||||
return StaticValueStream(value=1.0)
|
|
||||||
|
|||||||
@@ -32,6 +32,15 @@ class BaseSqliteStore(Generic[T]):
|
|||||||
self._items: Dict[str, T] = {}
|
self._items: Dict[str, T] = {}
|
||||||
self._deserializer = deserializer
|
self._deserializer = deserializer
|
||||||
self._lock = threading.RLock()
|
self._lock = threading.RLock()
|
||||||
|
# Apply pending JSON-blob data migrations before loading rows so the
|
||||||
|
# in-memory cache reflects the canonical, post-migration shape. The
|
||||||
|
# runner is idempotent across stores — it consults a dedicated
|
||||||
|
# ``data_migrations`` audit table — so each store construction may
|
||||||
|
# invoke it without re-running already-applied migrations.
|
||||||
|
# Imported lazily to avoid a circular dependency at module load.
|
||||||
|
from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner
|
||||||
|
|
||||||
|
MigrationRunner(db).run(ALL_MIGRATIONS)
|
||||||
self._load()
|
self._load()
|
||||||
|
|
||||||
# -- I/O -----------------------------------------------------------------
|
# -- I/O -----------------------------------------------------------------
|
||||||
|
|||||||
@@ -23,7 +23,13 @@ MAX_COMPOSITE_DEPTH = 4
|
|||||||
|
|
||||||
|
|
||||||
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
||||||
"""Persistent storage for color strip sources."""
|
"""Persistent storage for color strip sources.
|
||||||
|
|
||||||
|
JSON-blob field renames (e.g. legacy ``source_type='static'`` →
|
||||||
|
``'single_color'``) are handled by the central
|
||||||
|
:mod:`ledgrab.storage.data_migrations` runner, which executes once per
|
||||||
|
database from :meth:`Database._ensure_schema`.
|
||||||
|
"""
|
||||||
|
|
||||||
_table_name = "color_strip_sources"
|
_table_name = "color_strip_sources"
|
||||||
_entity_name = "Color strip source"
|
_entity_name = "Color strip source"
|
||||||
|
|||||||
@@ -0,0 +1,219 @@
|
|||||||
|
"""Versioned data migrations for stored JSON entities.
|
||||||
|
|
||||||
|
Each migration is a small class with a stable ``name`` (used as the idempotency
|
||||||
|
key) and an ``apply(db)`` method that performs the change inside a transaction.
|
||||||
|
The ``MigrationRunner`` keeps a ``data_migrations`` table that records which
|
||||||
|
migrations have already executed; subsequent runs are a no-op.
|
||||||
|
|
||||||
|
This replaces ad-hoc per-store migrations that were rewriting raw JSON via
|
||||||
|
``str.replace`` — that approach corrupted nested fields whose values happened
|
||||||
|
to share a substring with the renamed key, and had no transaction or audit.
|
||||||
|
|
||||||
|
Adding a new migration
|
||||||
|
----------------------
|
||||||
|
|
||||||
|
1. Subclass :class:`DataMigration`, give it a unique ``name`` (prefix with the
|
||||||
|
next sequence number, e.g. ``"002_..."``) and implement ``apply``.
|
||||||
|
2. Append it to :data:`ALL_MIGRATIONS` in commit order. Never reorder existing
|
||||||
|
entries — the runner records each by name.
|
||||||
|
3. The first ``ColorStripStore`` (or any other store) construction triggers the
|
||||||
|
runner; nothing else has to change.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sqlite3
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
from ledgrab.storage.database import Database, is_writes_frozen
|
||||||
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DataMigration(ABC):
|
||||||
|
"""One JSON-blob migration step.
|
||||||
|
|
||||||
|
Subclasses must declare a class-level ``name`` and implement ``apply``.
|
||||||
|
Implementations must operate on the supplied ``conn`` (an already-open
|
||||||
|
transaction). They MUST NOT commit or open nested transactions — the
|
||||||
|
:class:`MigrationRunner` owns the transaction so the data UPDATEs and the
|
||||||
|
audit-table INSERT are atomic.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str = ""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def apply(self, conn: sqlite3.Connection) -> int:
|
||||||
|
"""Perform the migration inside the supplied transaction.
|
||||||
|
|
||||||
|
Return the number of rows changed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class MigrationRecord:
|
||||||
|
"""A row in the ``data_migrations`` audit table."""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
applied_at: str
|
||||||
|
rows_changed: int
|
||||||
|
|
||||||
|
|
||||||
|
class MigrationRunner:
|
||||||
|
"""Apply pending data migrations exactly once per database.
|
||||||
|
|
||||||
|
Each call to :meth:`run` opens a single transaction that covers (a) the
|
||||||
|
"is this migration already applied?" check, (b) the migration body, and
|
||||||
|
(c) the audit INSERT. That guarantees:
|
||||||
|
|
||||||
|
* a partial-failure cannot leave data rewritten but unrecorded;
|
||||||
|
* concurrent stores constructing on multiple threads cannot race each
|
||||||
|
other into a UNIQUE-constraint crash on the audit table.
|
||||||
|
|
||||||
|
The runner skips silently when writes are frozen so a post-restore boot
|
||||||
|
does not mutate the freshly-restored database before the imminent restart.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_TABLE = "data_migrations"
|
||||||
|
|
||||||
|
def __init__(self, db: Database):
|
||||||
|
self._db = db
|
||||||
|
self._ensure_table()
|
||||||
|
|
||||||
|
def _ensure_table(self) -> None:
|
||||||
|
# CREATE TABLE is idempotent thanks to IF NOT EXISTS; running on every
|
||||||
|
# startup is cheap. ``Database.execute`` does NOT honour the
|
||||||
|
# ``is_writes_frozen`` guard (DDL is always permitted), so the audit
|
||||||
|
# table reliably exists even on a frozen boot.
|
||||||
|
self._db.execute(
|
||||||
|
f"""
|
||||||
|
CREATE TABLE IF NOT EXISTS {self._TABLE} (
|
||||||
|
name TEXT PRIMARY KEY,
|
||||||
|
applied_at TEXT NOT NULL,
|
||||||
|
rows_changed INTEGER NOT NULL DEFAULT 0
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
def applied_names(self) -> set[str]:
|
||||||
|
"""Return the set of migration names already recorded as applied."""
|
||||||
|
cursor = self._db.execute(f"SELECT name FROM {self._TABLE}")
|
||||||
|
return {row["name"] for row in cursor.fetchall()}
|
||||||
|
|
||||||
|
def run(self, migrations: list[DataMigration]) -> list[MigrationRecord]:
|
||||||
|
"""Apply each migration in *migrations* that hasn't been recorded yet.
|
||||||
|
|
||||||
|
Returns the records for migrations that actually executed this call.
|
||||||
|
Returns an empty list when writes are frozen.
|
||||||
|
"""
|
||||||
|
if is_writes_frozen():
|
||||||
|
# Frozen-write databases are read-only between a restore and the
|
||||||
|
# imminent restart. Surface this clearly because the in-memory
|
||||||
|
# state may briefly reflect pre-migration data.
|
||||||
|
logger.warning("Data migrations skipped: writes are frozen (restart expected).")
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Validate names up-front so an unnamed migration aborts before any
|
||||||
|
# transaction work.
|
||||||
|
for migration in migrations:
|
||||||
|
if not migration.name:
|
||||||
|
raise ValueError(
|
||||||
|
f"DataMigration {type(migration).__name__} must declare a non-empty name"
|
||||||
|
)
|
||||||
|
|
||||||
|
executed: list[MigrationRecord] = []
|
||||||
|
for migration in migrations:
|
||||||
|
# Use a separate transaction per migration so a failure rolls back
|
||||||
|
# only the failing one and any earlier (already-recorded)
|
||||||
|
# migrations keep their audit rows.
|
||||||
|
with self._db.transaction() as conn:
|
||||||
|
already_applied = conn.execute(
|
||||||
|
f"SELECT 1 FROM {self._TABLE} WHERE name = ?",
|
||||||
|
(migration.name,),
|
||||||
|
).fetchone()
|
||||||
|
if already_applied:
|
||||||
|
continue
|
||||||
|
logger.info("Applying data migration: %s", migration.name)
|
||||||
|
rows_changed = int(migration.apply(conn))
|
||||||
|
applied_at = datetime.now(timezone.utc).isoformat()
|
||||||
|
# INSERT OR IGNORE defends against a hypothetical concurrent
|
||||||
|
# writer that recorded the same name between the SELECT above
|
||||||
|
# and this INSERT (the RLock serialises this in practice, but
|
||||||
|
# the constraint is the durable contract).
|
||||||
|
conn.execute(
|
||||||
|
f"INSERT OR IGNORE INTO {self._TABLE} "
|
||||||
|
f"(name, applied_at, rows_changed) VALUES (?, ?, ?)",
|
||||||
|
(migration.name, applied_at, rows_changed),
|
||||||
|
)
|
||||||
|
executed.append(
|
||||||
|
MigrationRecord(
|
||||||
|
name=migration.name,
|
||||||
|
applied_at=applied_at,
|
||||||
|
rows_changed=rows_changed,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if rows_changed:
|
||||||
|
logger.warning("Migration %s rewrote %d row(s)", migration.name, rows_changed)
|
||||||
|
|
||||||
|
if executed:
|
||||||
|
logger.info(
|
||||||
|
"Applied %d data migration(s): %s",
|
||||||
|
len(executed),
|
||||||
|
", ".join(r.name for r in executed),
|
||||||
|
)
|
||||||
|
return executed
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Concrete migrations
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class StaticToSingleColorMigration(DataMigration):
|
||||||
|
"""Rename the legacy ``source_type='static'`` color-strip kind to ``single_color``.
|
||||||
|
|
||||||
|
The pre-rename rows in ``color_strip_sources`` used ``source_type='static'``;
|
||||||
|
the new canonical value is ``single_color``. The previous in-line string
|
||||||
|
replace risked rewriting unrelated substrings (e.g. an animation type named
|
||||||
|
``static_wave``). This migration parses the JSON, mutates the single field
|
||||||
|
we care about, and writes the canonical serialization back. Runs inside
|
||||||
|
the runner's transaction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "001_color_strip_static_to_single_color"
|
||||||
|
|
||||||
|
def apply(self, conn: sqlite3.Connection) -> int:
|
||||||
|
rows_changed = 0
|
||||||
|
rows = conn.execute("SELECT id, data FROM [color_strip_sources]").fetchall()
|
||||||
|
for row in rows:
|
||||||
|
blob = row["data"]
|
||||||
|
try:
|
||||||
|
parsed = json.loads(blob)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
# A corrupt blob is a pre-existing problem; do not crash the
|
||||||
|
# migration. The store's own load path will surface it.
|
||||||
|
logger.warning(
|
||||||
|
"Skipping corrupt color_strip_sources row %s during migration: %s",
|
||||||
|
row["id"],
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
if parsed.get("source_type") != "static":
|
||||||
|
continue
|
||||||
|
parsed["source_type"] = "single_color"
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE [color_strip_sources] SET data = ? WHERE id = ?",
|
||||||
|
(json.dumps(parsed, ensure_ascii=False), row["id"]),
|
||||||
|
)
|
||||||
|
rows_changed += 1
|
||||||
|
return rows_changed
|
||||||
|
|
||||||
|
|
||||||
|
# Master list — ORDER MATTERS. Append new migrations; never reorder.
|
||||||
|
ALL_MIGRATIONS: list[DataMigration] = [
|
||||||
|
StaticToSingleColorMigration(),
|
||||||
|
]
|
||||||
@@ -0,0 +1,85 @@
|
|||||||
|
"""Regression tests for the CSS response builder dispatch.
|
||||||
|
|
||||||
|
These lock in the safety net introduced when the silent "fallback to a fake
|
||||||
|
PictureCSSResponse" branch was removed:
|
||||||
|
|
||||||
|
* every concrete ColorStripSource subclass registered in ``_SOURCE_TYPE_MAP``
|
||||||
|
must have a builder in ``_RESPONSE_MAP`` (verified at module import); and
|
||||||
|
* the previously-missing ``GameEventColorStripSource`` round-trips through
|
||||||
|
the response builder into the matching ``GameEventCSSResponse`` schema.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ledgrab.api.routes.color_strip_sources import _helpers
|
||||||
|
from ledgrab.api.schemas.color_strip_sources import GameEventCSSResponse
|
||||||
|
from ledgrab.storage.color_strip_source import (
|
||||||
|
ColorStripSource,
|
||||||
|
GameEventColorStripSource,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_response_map_covers_every_registered_source_type():
|
||||||
|
"""Every source_type in _SOURCE_TYPE_MAP has a response builder."""
|
||||||
|
storage_kinds = set(_helpers._STORAGE_TYPE_MAP.keys())
|
||||||
|
builder_kinds = set(_helpers._RESPONSE_MAP.keys())
|
||||||
|
missing = storage_kinds - builder_kinds
|
||||||
|
assert not missing, f"_RESPONSE_MAP is missing builders for: {sorted(missing)}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_assert_helper_raises_when_a_kind_is_unregistered(monkeypatch):
|
||||||
|
"""Adding a kind to the storage registry without a builder is caught."""
|
||||||
|
|
||||||
|
class _PhantomColorStripSource(ColorStripSource):
|
||||||
|
pass
|
||||||
|
|
||||||
|
monkeypatch.setitem(_helpers._STORAGE_TYPE_MAP, "phantom", _PhantomColorStripSource)
|
||||||
|
with pytest.raises(RuntimeError, match="phantom"):
|
||||||
|
_helpers._assert_response_map_coverage()
|
||||||
|
|
||||||
|
|
||||||
|
def test_game_event_source_serialises_to_game_event_response():
|
||||||
|
"""A GameEventColorStripSource is rendered as GameEventCSSResponse (not the
|
||||||
|
fake PictureCSSResponse that the silent fallback used to return).
|
||||||
|
"""
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
source = GameEventColorStripSource(
|
||||||
|
id="css_gev0001",
|
||||||
|
name="Counter-Strike feedback",
|
||||||
|
source_type="game_event",
|
||||||
|
created_at=now,
|
||||||
|
updated_at=now,
|
||||||
|
game_integration_id="gi_cs2_main",
|
||||||
|
event_mappings=[{"event": "hit", "effect": "flash"}],
|
||||||
|
led_count=60,
|
||||||
|
)
|
||||||
|
|
||||||
|
response = _helpers._css_to_response(source)
|
||||||
|
|
||||||
|
assert isinstance(response, GameEventCSSResponse)
|
||||||
|
assert response.source_type == "game_event"
|
||||||
|
assert response.game_integration_id == "gi_cs2_main"
|
||||||
|
assert response.event_mappings == [{"event": "hit", "effect": "flash"}]
|
||||||
|
assert response.led_count == 60
|
||||||
|
|
||||||
|
|
||||||
|
def test_unregistered_source_type_raises_in_css_to_response():
|
||||||
|
"""Reaching ``_css_to_response`` with an unmapped source_type raises loudly."""
|
||||||
|
|
||||||
|
class _UnregisteredCSS(ColorStripSource):
|
||||||
|
pass
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
source = _UnregisteredCSS(
|
||||||
|
id="css_xxx",
|
||||||
|
name="rogue",
|
||||||
|
source_type="rogue_unregistered",
|
||||||
|
created_at=now,
|
||||||
|
updated_at=now,
|
||||||
|
)
|
||||||
|
with pytest.raises(RuntimeError, match="No CSS response builder"):
|
||||||
|
_helpers._css_to_response(source)
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
"""Tests for the unified color-strip stream-builder registry."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ledgrab.core.processing import color_strip_kinds
|
||||||
|
from ledgrab.core.processing.color_strip_kinds import (
|
||||||
|
SHARABLE_KINDS,
|
||||||
|
STREAM_BUILDERS,
|
||||||
|
StreamDeps,
|
||||||
|
build_stream,
|
||||||
|
)
|
||||||
|
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_builders_partition_storage_kinds_with_sharable():
|
||||||
|
"""STREAM_BUILDERS ∪ SHARABLE_KINDS == storage source_types."""
|
||||||
|
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
|
||||||
|
covered = set(STREAM_BUILDERS.keys()) | SHARABLE_KINDS
|
||||||
|
assert (
|
||||||
|
storage_kinds == covered
|
||||||
|
), f"missing={storage_kinds - covered}, extra={covered - storage_kinds}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_sharable_kinds_list_matches_actual_sharable_property():
|
||||||
|
"""The hand-coded SHARABLE_KINDS list mirrors the `sharable` @property values.
|
||||||
|
|
||||||
|
Every ColorStripSource subclass currently implements ``sharable`` as a
|
||||||
|
literal ``return True``/``return False`` that does not touch ``self``,
|
||||||
|
so ``fget(None)`` is a safe introspection. If a future subclass changes
|
||||||
|
that contract — e.g. by computing sharability from instance state — this
|
||||||
|
test will raise a clear error rather than silently misclassify the kind,
|
||||||
|
forcing the maintainer to update both ``SHARABLE_KINDS`` and this test.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _is_sharable(name: str, cls) -> bool:
|
||||||
|
prop = getattr(cls, "sharable", None)
|
||||||
|
if not isinstance(prop, property) or prop.fget is None:
|
||||||
|
raise AssertionError(
|
||||||
|
f"{cls.__name__} (source_type={name!r}) does not expose `sharable` "
|
||||||
|
f"as a @property; SHARABLE_KINDS introspection no longer works."
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
return bool(prop.fget(None))
|
||||||
|
except (AttributeError, TypeError) as e:
|
||||||
|
raise AssertionError(
|
||||||
|
f"{cls.__name__}.sharable now depends on `self` ({e}); update "
|
||||||
|
"this test and SHARABLE_KINDS together."
|
||||||
|
) from e
|
||||||
|
|
||||||
|
actually_sharable = {name for name, cls in _SOURCE_TYPE_MAP.items() if _is_sharable(name, cls)}
|
||||||
|
assert actually_sharable == set(SHARABLE_KINDS), (
|
||||||
|
"SHARABLE_KINDS drifted from the @property values: "
|
||||||
|
f"actually_sharable={sorted(actually_sharable)}, "
|
||||||
|
f"SHARABLE_KINDS={sorted(SHARABLE_KINDS)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_stream_raises_on_unknown_kind():
|
||||||
|
class _FakeSource:
|
||||||
|
source_type = "totally_unregistered_kind"
|
||||||
|
id = "fake"
|
||||||
|
|
||||||
|
deps = StreamDeps(css_manager=None)
|
||||||
|
with pytest.raises(ValueError, match="totally_unregistered_kind"):
|
||||||
|
build_stream(_FakeSource(), deps)
|
||||||
|
|
||||||
|
|
||||||
|
def test_coverage_assertion_raises_on_drift(monkeypatch):
|
||||||
|
"""Adding a kind to storage's _SOURCE_TYPE_MAP without a builder fails fast."""
|
||||||
|
|
||||||
|
monkeypatch.setitem(_SOURCE_TYPE_MAP, "phantom", object)
|
||||||
|
with pytest.raises(RuntimeError, match="phantom"):
|
||||||
|
color_strip_kinds._assert_stream_kind_coverage()
|
||||||
|
|
||||||
|
|
||||||
|
def test_legacy_static_alias_resolves_to_single_color_builder():
|
||||||
|
"""The legacy 'static' source_type still maps to the single_color builder."""
|
||||||
|
assert STREAM_BUILDERS["static"] is not None
|
||||||
|
assert STREAM_BUILDERS["single_color"] is not None
|
||||||
|
# The two route to the same loader, but builders are distinct closures —
|
||||||
|
# call them with a tiny source stub and confirm same class.
|
||||||
|
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
|
||||||
|
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
src = SingleColorStripSource(
|
||||||
|
id="t",
|
||||||
|
name="t",
|
||||||
|
source_type="static",
|
||||||
|
created_at=datetime.now(timezone.utc),
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
deps = StreamDeps(css_manager=None)
|
||||||
|
s = STREAM_BUILDERS["static"](src, deps)
|
||||||
|
assert isinstance(s, SingleColorStripStream)
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
"""Tests for the unified value-stream-builder registry."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ledgrab.core.processing import value_kinds
|
||||||
|
from ledgrab.core.processing.value_kinds import (
|
||||||
|
NEEDS_CLOCK_RUNTIME,
|
||||||
|
STREAM_BUILDERS,
|
||||||
|
ValueStreamDeps,
|
||||||
|
build_stream,
|
||||||
|
)
|
||||||
|
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
|
||||||
|
|
||||||
|
|
||||||
|
def test_builders_cover_every_storage_kind():
|
||||||
|
"""STREAM_BUILDERS keys == storage._VALUE_SOURCE_MAP keys."""
|
||||||
|
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
|
||||||
|
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||||
|
assert storage_kinds == builder_kinds, (
|
||||||
|
f"missing={storage_kinds - builder_kinds}, " f"extra={builder_kinds - storage_kinds}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_stream_raises_on_unknown_kind():
|
||||||
|
class _Fake:
|
||||||
|
source_type = "totally_unregistered"
|
||||||
|
id = "fake"
|
||||||
|
|
||||||
|
deps = ValueStreamDeps(value_stream_manager=None)
|
||||||
|
with pytest.raises(ValueError, match="totally_unregistered"):
|
||||||
|
build_stream(_Fake(), deps)
|
||||||
|
|
||||||
|
|
||||||
|
def test_coverage_assertion_raises_on_drift(monkeypatch):
|
||||||
|
"""A kind added to storage without a builder fails the import-time check."""
|
||||||
|
monkeypatch.setitem(_VALUE_SOURCE_MAP, "phantom_kind", object)
|
||||||
|
with pytest.raises(RuntimeError, match="phantom_kind"):
|
||||||
|
value_kinds._assert_value_kind_coverage()
|
||||||
|
|
||||||
|
|
||||||
|
def test_needs_clock_runtime_is_subset_of_registered_kinds():
|
||||||
|
assert NEEDS_CLOCK_RUNTIME.issubset(set(STREAM_BUILDERS.keys()))
|
||||||
|
|
||||||
|
|
||||||
|
def test_static_builder_returns_static_value_stream():
|
||||||
|
from ledgrab.core.processing.value_stream import StaticValueStream
|
||||||
|
from ledgrab.storage.value_source import StaticValueSource
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
src = StaticValueSource(
|
||||||
|
id="vs_t",
|
||||||
|
name="t",
|
||||||
|
source_type="static",
|
||||||
|
created_at=datetime.now(timezone.utc),
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
value=0.42,
|
||||||
|
)
|
||||||
|
deps = ValueStreamDeps(value_stream_manager=None)
|
||||||
|
stream = build_stream(src, deps)
|
||||||
|
assert isinstance(stream, StaticValueStream)
|
||||||
|
# Sanity: the value flowed through
|
||||||
|
assert stream.get_value() == 0.42
|
||||||
@@ -0,0 +1,90 @@
|
|||||||
|
"""Tests for ColorStripStore startup migrations."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||||
|
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||||
|
from ledgrab.storage.database import Database
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_db(tmp_path):
|
||||||
|
db = Database(tmp_path / "test.db")
|
||||||
|
yield db
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _insert_legacy_row(db: Database, item_id: str, name: str, color: list[int]) -> dict:
|
||||||
|
"""Insert a pre-rename color strip row with ``source_type='static'``."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
blob = {
|
||||||
|
"id": item_id,
|
||||||
|
"name": name,
|
||||||
|
"source_type": "static",
|
||||||
|
"color": color,
|
||||||
|
"animation": None,
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": now,
|
||||||
|
"updated_at": now,
|
||||||
|
"icon": "",
|
||||||
|
"icon_color": "",
|
||||||
|
}
|
||||||
|
db.upsert("color_strip_sources", item_id, name, blob)
|
||||||
|
return blob
|
||||||
|
|
||||||
|
|
||||||
|
def test_legacy_static_row_is_rewritten_to_single_color(tmp_db):
|
||||||
|
"""A row with ``source_type='static'`` is rewritten on first store load."""
|
||||||
|
_insert_legacy_row(tmp_db, "css_legacy01", "Legacy Solid", [255, 0, 0])
|
||||||
|
|
||||||
|
# Sanity: legacy blob really is on disk
|
||||||
|
raw_before = tmp_db.load_all("color_strip_sources")
|
||||||
|
assert raw_before[0]["source_type"] == "static"
|
||||||
|
|
||||||
|
store = ColorStripStore(tmp_db)
|
||||||
|
|
||||||
|
# In-memory item is a SingleColorStripSource with the canonical type
|
||||||
|
item = store.get("css_legacy01")
|
||||||
|
assert isinstance(item, SingleColorStripSource)
|
||||||
|
assert item.source_type == "single_color"
|
||||||
|
|
||||||
|
# The on-disk row was rewritten too — no second migration on next boot
|
||||||
|
raw_after = tmp_db.load_all("color_strip_sources")
|
||||||
|
assert raw_after[0]["source_type"] == "single_color"
|
||||||
|
|
||||||
|
|
||||||
|
def test_already_migrated_row_is_left_alone(tmp_db):
|
||||||
|
"""Rows already using ``single_color`` are not touched."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
blob = {
|
||||||
|
"id": "css_new01",
|
||||||
|
"name": "Already Migrated",
|
||||||
|
"source_type": "single_color",
|
||||||
|
"color": [0, 128, 255],
|
||||||
|
"animation": None,
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": now,
|
||||||
|
"updated_at": now,
|
||||||
|
"icon": "",
|
||||||
|
"icon_color": "",
|
||||||
|
}
|
||||||
|
tmp_db.upsert("color_strip_sources", "css_new01", "Already Migrated", blob)
|
||||||
|
|
||||||
|
raw_before = tmp_db.load_all("color_strip_sources")
|
||||||
|
before_json = json.dumps(raw_before[0], sort_keys=True)
|
||||||
|
|
||||||
|
store = ColorStripStore(tmp_db)
|
||||||
|
item = store.get("css_new01")
|
||||||
|
assert isinstance(item, SingleColorStripSource)
|
||||||
|
|
||||||
|
raw_after = tmp_db.load_all("color_strip_sources")
|
||||||
|
after_json = json.dumps(raw_after[0], sort_keys=True)
|
||||||
|
assert before_json == after_json
|
||||||
@@ -0,0 +1,309 @@
|
|||||||
|
"""Tests for the versioned data-migration runner.
|
||||||
|
|
||||||
|
These verify the safety properties that replaced the string-replace migration:
|
||||||
|
|
||||||
|
* migrations parse JSON properly (no substring corruption);
|
||||||
|
* the runner records each migration name in ``data_migrations`` so subsequent
|
||||||
|
runs are no-ops;
|
||||||
|
* a migration that touches no rows still gets recorded as applied;
|
||||||
|
* a migration that raises does not get recorded (so a fixed migration can be
|
||||||
|
re-attempted on the next boot);
|
||||||
|
* the legacy ``static`` rename only rewrites rows whose ``source_type`` field
|
||||||
|
actually equals ``static`` — values that merely contain the substring (e.g.
|
||||||
|
an animation type ``static_wave``) are left untouched.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ledgrab.storage.data_migrations import (
|
||||||
|
ALL_MIGRATIONS,
|
||||||
|
DataMigration,
|
||||||
|
MigrationRunner,
|
||||||
|
StaticToSingleColorMigration,
|
||||||
|
)
|
||||||
|
from ledgrab.storage.database import Database
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_db(tmp_path):
|
||||||
|
db = Database(tmp_path / "test.db")
|
||||||
|
yield db
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _now() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def _insert_css(db: Database, item_id: str, blob: dict) -> None:
|
||||||
|
db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Runner mechanics
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_creates_audit_table(tmp_db):
|
||||||
|
MigrationRunner(tmp_db)
|
||||||
|
cursor = tmp_db.execute(
|
||||||
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'"
|
||||||
|
)
|
||||||
|
assert cursor.fetchone() is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_records_applied_migration(tmp_db):
|
||||||
|
class _Noop(DataMigration):
|
||||||
|
name = "test_noop"
|
||||||
|
|
||||||
|
def apply(self, conn):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
records = runner.run([_Noop()])
|
||||||
|
|
||||||
|
assert [r.name for r in records] == ["test_noop"]
|
||||||
|
assert "test_noop" in runner.applied_names()
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_skips_already_applied_migration(tmp_db):
|
||||||
|
calls = 0
|
||||||
|
|
||||||
|
class _CountedMigration(DataMigration):
|
||||||
|
name = "test_counted"
|
||||||
|
|
||||||
|
def apply(self, conn):
|
||||||
|
nonlocal calls
|
||||||
|
calls += 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
runner.run([_CountedMigration()])
|
||||||
|
runner.run([_CountedMigration()])
|
||||||
|
runner.run([_CountedMigration()])
|
||||||
|
|
||||||
|
assert calls == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db):
|
||||||
|
"""Even when a migration changes 0 rows, it is recorded as applied."""
|
||||||
|
|
||||||
|
class _NoMatch(DataMigration):
|
||||||
|
name = "test_no_match"
|
||||||
|
|
||||||
|
def apply(self, conn):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
runner.run([_NoMatch()])
|
||||||
|
|
||||||
|
assert "test_no_match" in runner.applied_names()
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_does_not_record_failed_migration(tmp_db):
|
||||||
|
class _Boom(DataMigration):
|
||||||
|
name = "test_boom"
|
||||||
|
|
||||||
|
def apply(self, conn):
|
||||||
|
raise RuntimeError("kaboom")
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
with pytest.raises(RuntimeError, match="kaboom"):
|
||||||
|
runner.run([_Boom()])
|
||||||
|
|
||||||
|
assert "test_boom" not in runner.applied_names()
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_rejects_unnamed_migration(tmp_db):
|
||||||
|
class _Unnamed(DataMigration):
|
||||||
|
def apply(self, conn):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
with pytest.raises(ValueError, match="non-empty name"):
|
||||||
|
runner.run([_Unnamed()])
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_rolls_back_data_change_when_migration_raises(tmp_db):
|
||||||
|
"""A migration that raises mid-way leaves no partial UPDATEs behind."""
|
||||||
|
blob = {
|
||||||
|
"id": "css_partial",
|
||||||
|
"name": "Partial",
|
||||||
|
"source_type": "static",
|
||||||
|
"color": [10, 20, 30],
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": _now(),
|
||||||
|
"updated_at": _now(),
|
||||||
|
}
|
||||||
|
_insert_css(tmp_db, "css_partial", blob)
|
||||||
|
|
||||||
|
class _PartialBoom(DataMigration):
|
||||||
|
name = "test_partial_boom"
|
||||||
|
|
||||||
|
def apply(self, conn):
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE color_strip_sources SET data = ? WHERE id = ?",
|
||||||
|
(json.dumps({**blob, "source_type": "single_color"}), "css_partial"),
|
||||||
|
)
|
||||||
|
raise RuntimeError("explode after update")
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
with pytest.raises(RuntimeError, match="explode after update"):
|
||||||
|
runner.run([_PartialBoom()])
|
||||||
|
|
||||||
|
rows = tmp_db.load_all("color_strip_sources")
|
||||||
|
assert rows[0]["source_type"] == "static", "UPDATE was not rolled back"
|
||||||
|
assert "test_partial_boom" not in runner.applied_names()
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch):
|
||||||
|
"""Frozen-write databases must not be mutated by the runner."""
|
||||||
|
monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True)
|
||||||
|
|
||||||
|
calls = 0
|
||||||
|
|
||||||
|
class _Counted(DataMigration):
|
||||||
|
name = "test_frozen"
|
||||||
|
|
||||||
|
def apply(self, conn):
|
||||||
|
nonlocal calls
|
||||||
|
calls += 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
records = runner.run([_Counted()])
|
||||||
|
|
||||||
|
assert records == []
|
||||||
|
assert calls == 0
|
||||||
|
assert "test_frozen" not in runner.applied_names()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Legacy static-rename specifics
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_static_rename_only_touches_source_type_field(tmp_db):
|
||||||
|
"""A row whose ``animation.type`` happens to contain ``static`` survives intact."""
|
||||||
|
blob = {
|
||||||
|
"id": "css_safe01",
|
||||||
|
"name": "Has static_wave anim",
|
||||||
|
"source_type": "gradient", # not "static" — should be untouched
|
||||||
|
"animation": {"type": "static_wave", "speed": 1.0}, # substring trap
|
||||||
|
"stops": [{"position": 0.0, "color": [10, 20, 30]}],
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": _now(),
|
||||||
|
"updated_at": _now(),
|
||||||
|
}
|
||||||
|
_insert_css(tmp_db, "css_safe01", blob)
|
||||||
|
|
||||||
|
# Run the migration via the runner (clears the existing audit row first
|
||||||
|
# because the Database fixture already triggered it through any incidental
|
||||||
|
# store creation — here we drive it directly).
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
runner.run([StaticToSingleColorMigration()])
|
||||||
|
|
||||||
|
rows = tmp_db.load_all("color_strip_sources")
|
||||||
|
assert len(rows) == 1
|
||||||
|
assert rows[0]["source_type"] == "gradient"
|
||||||
|
assert rows[0]["animation"]["type"] == "static_wave"
|
||||||
|
|
||||||
|
|
||||||
|
def test_static_rename_rewrites_only_legacy_rows(tmp_db):
|
||||||
|
legacy = {
|
||||||
|
"id": "css_legacy01",
|
||||||
|
"name": "Legacy",
|
||||||
|
"source_type": "static",
|
||||||
|
"color": [255, 0, 0],
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": _now(),
|
||||||
|
"updated_at": _now(),
|
||||||
|
}
|
||||||
|
current = {
|
||||||
|
"id": "css_modern01",
|
||||||
|
"name": "Modern",
|
||||||
|
"source_type": "single_color",
|
||||||
|
"color": [0, 128, 255],
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": _now(),
|
||||||
|
"updated_at": _now(),
|
||||||
|
}
|
||||||
|
_insert_css(tmp_db, "css_legacy01", legacy)
|
||||||
|
_insert_css(tmp_db, "css_modern01", current)
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
runner.run([StaticToSingleColorMigration()])
|
||||||
|
|
||||||
|
rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")}
|
||||||
|
assert rows["css_legacy01"]["source_type"] == "single_color"
|
||||||
|
assert rows["css_modern01"]["source_type"] == "single_color"
|
||||||
|
|
||||||
|
|
||||||
|
def test_static_rename_survives_corrupt_blob(tmp_db):
|
||||||
|
"""A row whose JSON blob is corrupt does not crash the migration."""
|
||||||
|
# Insert a row with deliberately broken JSON via direct SQL (bypass upsert
|
||||||
|
# which would re-serialize). We use a hand-crafted blob.
|
||||||
|
tmp_db.execute(
|
||||||
|
"INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)",
|
||||||
|
("css_bad01", "Bad", "{not valid json"),
|
||||||
|
)
|
||||||
|
legacy = {
|
||||||
|
"id": "css_legacy02",
|
||||||
|
"name": "Legacy after corrupt",
|
||||||
|
"source_type": "static",
|
||||||
|
"color": [1, 2, 3],
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": _now(),
|
||||||
|
"updated_at": _now(),
|
||||||
|
}
|
||||||
|
_insert_css(tmp_db, "css_legacy02", legacy)
|
||||||
|
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
runner.run([StaticToSingleColorMigration()])
|
||||||
|
|
||||||
|
# Legacy row still migrated, corrupt row untouched, no exception
|
||||||
|
cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources")
|
||||||
|
out = {row["id"]: row["data"] for row in cursor.fetchall()}
|
||||||
|
assert out["css_bad01"] == "{not valid json"
|
||||||
|
assert json.loads(out["css_legacy02"])["source_type"] == "single_color"
|
||||||
|
|
||||||
|
|
||||||
|
def test_all_migrations_have_unique_names():
|
||||||
|
names = [m.name for m in ALL_MIGRATIONS]
|
||||||
|
assert len(names) == len(set(names)), f"duplicate names: {names}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_color_strip_store_construction_records_static_migration(tmp_db):
|
||||||
|
"""Integration: constructing the store applies AND records the migration."""
|
||||||
|
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||||
|
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||||
|
|
||||||
|
legacy = {
|
||||||
|
"id": "css_int01",
|
||||||
|
"name": "Integration legacy",
|
||||||
|
"source_type": "static",
|
||||||
|
"color": [1, 2, 3],
|
||||||
|
"tags": [],
|
||||||
|
"led_count": 30,
|
||||||
|
"created_at": _now(),
|
||||||
|
"updated_at": _now(),
|
||||||
|
}
|
||||||
|
_insert_css(tmp_db, "css_int01", legacy)
|
||||||
|
|
||||||
|
store = ColorStripStore(tmp_db)
|
||||||
|
|
||||||
|
# In-memory shape is correct
|
||||||
|
assert isinstance(store.get("css_int01"), SingleColorStripSource)
|
||||||
|
# Audit row exists
|
||||||
|
runner = MigrationRunner(tmp_db)
|
||||||
|
assert "001_color_strip_static_to_single_color" in runner.applied_names()
|
||||||
Reference in New Issue
Block a user