refactor(storage,processing): kind registries + versioned data migrations
Two CRITICAL data-safety bugs from the architecture audit and the two
worst parallel-change problems are fixed in one coherent pass.
Audit findings addressed:
- C2 silent CSS response fallback. The previous _RESPONSE_MAP fell
through to a fabricated PictureCSSResponse whenever a source
class lacked an entry; in particular game_event sources were
silently mis-shaped. Now: GameEventCSSResponse/Create/Update
schemas exist, _RESPONSE_MAP is re-keyed by source_type string,
an import-time _assert_response_map_coverage() requires symmetric
agreement with storage._SOURCE_TYPE_MAP, and the runtime path
raises instead of fabricating a response.
- C11 string-replace JSON migration. ColorStripStore used
blob.replace('"source_type": "static"', '"source_type":
"single_color"') which can corrupt unrelated substrings (e.g.
an animation type named "static_wave") and provides no audit,
no transaction, no idempotency. Replaced with
storage.data_migrations.MigrationRunner backed by a
data_migrations audit table. Each migration runs inside one
db.transaction() that covers the applied-check, the apply(),
and the audit-INSERT — partial failures roll back atomically.
StaticToSingleColorMigration parses each row with json.loads
and mutates only the source_type field. Frozen-write databases
skip with a warning.
- C3+C4 color-strip stream dispatch. The 7-branch elif in
ColorStripStreamManager.acquire() and the duplicate one in
ws_stream._create_stream() now share a single STREAM_BUILDERS
registry in core.processing.color_strip_kinds, keyed by
source.source_type. Both call sites populate a StreamDeps bag
and delegate to build_stream(). _assert_stream_kind_coverage()
asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS
partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path
wraps each FastAPI-DI getter in _safe() so non-audio previews
no longer crash when audio/CSPT stores are not wired.
- C6+C7 value stream dispatch. The 14-branch isinstance ladder in
ValueStreamManager._create_stream and its silent
StaticValueStream(value=1.0) fallback are replaced by
core.processing.value_kinds.STREAM_BUILDERS, keyed by
source_type string (so AdaptiveValueSource's adaptive_time and
adaptive_scene route to different builders correctly). The
manager retains only the SyncClockRuntime pre-acquisition step
for animated_color (kinds needing this are listed explicitly
in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a
separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the
registry.
Bundled in: the static->single_color rename plus the HTTPValueStream
/ http_endpoint introduction that were already in flight on this
branch share these files; the registry refactor naturally absorbs
both via the new "single_color" / "static" alias entries and the
_build_http builder.
Tests: 26 new tests cover response-map coverage drift, migration
runner audit-table mechanics + transactional rollback +
frozen-write skip, and the two stream-builder registries. 343
existing storage / API / e2e tests stay green. Ruff clean.
This commit is contained in:
@@ -9,6 +9,7 @@ from ledgrab.api.schemas.color_strip_sources import (
|
||||
CompositeCSSResponse,
|
||||
DaylightCSSResponse,
|
||||
EffectCSSResponse,
|
||||
GameEventCSSResponse,
|
||||
GradientCSSResponse,
|
||||
KeyColorsCSSResponse,
|
||||
MappedCSSResponse,
|
||||
@@ -17,7 +18,7 @@ from ledgrab.api.schemas.color_strip_sources import (
|
||||
PictureAdvancedCSSResponse,
|
||||
PictureCSSResponse,
|
||||
ProcessedCSSResponse,
|
||||
StaticCSSResponse,
|
||||
SingleColorCSSResponse,
|
||||
WeatherCSSResponse,
|
||||
)
|
||||
from ledgrab.api.schemas.devices import Calibration as CalibrationSchema
|
||||
@@ -26,22 +27,7 @@ from ledgrab.core.capture.calibration import (
|
||||
calibration_to_dict,
|
||||
)
|
||||
from ledgrab.storage.color_strip_source import (
|
||||
AdvancedPictureColorStripSource,
|
||||
ApiInputColorStripSource,
|
||||
AudioColorStripSource,
|
||||
CandlelightColorStripSource,
|
||||
CompositeColorStripSource,
|
||||
DaylightColorStripSource,
|
||||
EffectColorStripSource,
|
||||
GradientColorStripSource,
|
||||
KeyColorsColorStripSource,
|
||||
MappedColorStripSource,
|
||||
MathWaveColorStripSource,
|
||||
NotificationColorStripSource,
|
||||
PictureColorStripSource,
|
||||
ProcessedColorStripSource,
|
||||
StaticColorStripSource,
|
||||
WeatherColorStripSource,
|
||||
_SOURCE_TYPE_MAP as _STORAGE_TYPE_MAP,
|
||||
)
|
||||
from ledgrab.storage.picture_source import (
|
||||
ProcessedPictureSource,
|
||||
@@ -94,34 +80,46 @@ def _stops_schema(source) -> list[ColorStopSchema] | None:
|
||||
return None
|
||||
|
||||
|
||||
# Maps storage class → response builder lambda.
|
||||
# Maps ``source_type`` string → response builder.
|
||||
#
|
||||
# Keying by source_type (rather than type(source)) lets the import-time
|
||||
# coverage check use the storage registry's keys directly, with no
|
||||
# inversion or duplicate-class handling for legacy aliases.
|
||||
_RESPONSE_MAP: dict = {
|
||||
PictureColorStripSource: lambda s, kw: PictureCSSResponse(
|
||||
"picture": lambda s, kw: PictureCSSResponse(
|
||||
**kw,
|
||||
picture_source_id=s.picture_source_id,
|
||||
smoothing=s.smoothing.to_dict(),
|
||||
interpolation_mode=s.interpolation_mode,
|
||||
calibration=_calibration_schema(s),
|
||||
),
|
||||
AdvancedPictureColorStripSource: lambda s, kw: PictureAdvancedCSSResponse(
|
||||
"picture_advanced": lambda s, kw: PictureAdvancedCSSResponse(
|
||||
**kw,
|
||||
smoothing=s.smoothing.to_dict(),
|
||||
interpolation_mode=s.interpolation_mode,
|
||||
calibration=_calibration_schema(s),
|
||||
),
|
||||
StaticColorStripSource: lambda s, kw: StaticCSSResponse(
|
||||
"single_color": lambda s, kw: SingleColorCSSResponse(
|
||||
**kw,
|
||||
color=s.color.to_dict(),
|
||||
animation=s.animation,
|
||||
),
|
||||
GradientColorStripSource: lambda s, kw: GradientCSSResponse(
|
||||
# Legacy alias: pre-rename rows used "static"; the data migration rewrites
|
||||
# them on first store load but a stale in-flight instance would still
|
||||
# carry source_type='static' until the next reload.
|
||||
"static": lambda s, kw: SingleColorCSSResponse(
|
||||
**kw,
|
||||
color=s.color.to_dict(),
|
||||
animation=s.animation,
|
||||
),
|
||||
"gradient": lambda s, kw: GradientCSSResponse(
|
||||
**kw,
|
||||
stops=_stops_schema(s),
|
||||
animation=s.animation,
|
||||
easing=s.easing,
|
||||
gradient_id=s.gradient_id,
|
||||
),
|
||||
EffectColorStripSource: lambda s, kw: EffectCSSResponse(
|
||||
"effect": lambda s, kw: EffectCSSResponse(
|
||||
**kw,
|
||||
effect_type=s.effect_type,
|
||||
palette=s.palette,
|
||||
@@ -132,15 +130,15 @@ _RESPONSE_MAP: dict = {
|
||||
mirror=s.mirror,
|
||||
custom_palette=s.custom_palette,
|
||||
),
|
||||
CompositeColorStripSource: lambda s, kw: CompositeCSSResponse(
|
||||
"composite": lambda s, kw: CompositeCSSResponse(
|
||||
**kw,
|
||||
layers=[dict(layer) for layer in s.layers],
|
||||
),
|
||||
MappedColorStripSource: lambda s, kw: MappedCSSResponse(
|
||||
"mapped": lambda s, kw: MappedCSSResponse(
|
||||
**kw,
|
||||
zones=[dict(z) for z in s.zones],
|
||||
),
|
||||
AudioColorStripSource: lambda s, kw: AudioCSSResponse(
|
||||
"audio": lambda s, kw: AudioCSSResponse(
|
||||
**kw,
|
||||
visualization_mode=s.visualization_mode,
|
||||
audio_source_id=s.audio_source_id,
|
||||
@@ -153,13 +151,13 @@ _RESPONSE_MAP: dict = {
|
||||
mirror=s.mirror,
|
||||
beat_decay=s.beat_decay.to_dict(),
|
||||
),
|
||||
ApiInputColorStripSource: lambda s, kw: ApiInputCSSResponse(
|
||||
"api_input": lambda s, kw: ApiInputCSSResponse(
|
||||
**kw,
|
||||
fallback_color=s.fallback_color.to_dict(),
|
||||
timeout=s.timeout.to_dict(),
|
||||
interpolation=s.interpolation,
|
||||
),
|
||||
NotificationColorStripSource: lambda s, kw: NotificationCSSResponse(
|
||||
"notification": lambda s, kw: NotificationCSSResponse(
|
||||
**kw,
|
||||
notification_effect=s.notification_effect,
|
||||
duration_ms=s.duration_ms.to_dict(),
|
||||
@@ -172,14 +170,14 @@ _RESPONSE_MAP: dict = {
|
||||
sound_volume=s.sound_volume.to_dict(),
|
||||
app_sounds=dict(s.app_sounds),
|
||||
),
|
||||
DaylightColorStripSource: lambda s, kw: DaylightCSSResponse(
|
||||
"daylight": lambda s, kw: DaylightCSSResponse(
|
||||
**kw,
|
||||
speed=s.speed.to_dict(),
|
||||
use_real_time=s.use_real_time,
|
||||
latitude=s.latitude,
|
||||
longitude=s.longitude,
|
||||
),
|
||||
CandlelightColorStripSource: lambda s, kw: CandlelightCSSResponse(
|
||||
"candlelight": lambda s, kw: CandlelightCSSResponse(
|
||||
**kw,
|
||||
color=s.color.to_dict(),
|
||||
intensity=s.intensity.to_dict(),
|
||||
@@ -188,18 +186,18 @@ _RESPONSE_MAP: dict = {
|
||||
wind_strength=s.wind_strength.to_dict(),
|
||||
candle_type=s.candle_type,
|
||||
),
|
||||
ProcessedColorStripSource: lambda s, kw: ProcessedCSSResponse(
|
||||
"processed": lambda s, kw: ProcessedCSSResponse(
|
||||
**kw,
|
||||
input_source_id=s.input_source_id,
|
||||
processing_template_id=s.processing_template_id,
|
||||
),
|
||||
WeatherColorStripSource: lambda s, kw: WeatherCSSResponse(
|
||||
"weather": lambda s, kw: WeatherCSSResponse(
|
||||
**kw,
|
||||
weather_source_id=s.weather_source_id,
|
||||
speed=s.speed.to_dict(),
|
||||
temperature_influence=s.temperature_influence.to_dict(),
|
||||
),
|
||||
KeyColorsColorStripSource: lambda s, kw: KeyColorsCSSResponse(
|
||||
"key_colors": lambda s, kw: KeyColorsCSSResponse(
|
||||
**kw,
|
||||
picture_source_id=s.picture_source_id,
|
||||
rectangles=[r.to_dict() for r in s.rectangles],
|
||||
@@ -207,28 +205,67 @@ _RESPONSE_MAP: dict = {
|
||||
smoothing=s.smoothing.to_dict(),
|
||||
brightness=s.brightness.to_dict(),
|
||||
),
|
||||
MathWaveColorStripSource: lambda s, kw: MathWaveCSSResponse(
|
||||
"math_wave": lambda s, kw: MathWaveCSSResponse(
|
||||
**kw,
|
||||
waves=s.waves,
|
||||
speed=s.speed.to_dict(),
|
||||
gradient_id=s.gradient_id,
|
||||
),
|
||||
"game_event": lambda s, kw: GameEventCSSResponse(
|
||||
**kw,
|
||||
game_integration_id=s.game_integration_id,
|
||||
idle_color=s.idle_color.to_dict(),
|
||||
event_mappings=[dict(m) for m in s.event_mappings],
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def _assert_response_map_coverage() -> None:
|
||||
"""Verify _RESPONSE_MAP has a builder for every kind in storage's registry.
|
||||
|
||||
Runs at module import. Surfaces missing builders eagerly instead of
|
||||
letting a request fall through to a silent / wrong response shape.
|
||||
|
||||
Contract note
|
||||
-------------
|
||||
This check is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``)
|
||||
because every kind — sharable or not — needs a response shape. The
|
||||
sister assertion in
|
||||
``core/processing/color_strip_kinds.py::_assert_stream_kind_coverage``
|
||||
is asymmetric because sharable kinds construct their streams via a
|
||||
different path. Adding a new kind requires keeping all three registries
|
||||
aligned: storage's ``_SOURCE_TYPE_MAP``, this ``_RESPONSE_MAP``, and
|
||||
either ``STREAM_BUILDERS`` or ``SHARABLE_KINDS``.
|
||||
"""
|
||||
storage_kinds = set(_STORAGE_TYPE_MAP.keys())
|
||||
builder_kinds = set(_RESPONSE_MAP.keys())
|
||||
missing = storage_kinds - builder_kinds
|
||||
extra = builder_kinds - storage_kinds
|
||||
if missing or extra:
|
||||
problems = []
|
||||
if missing:
|
||||
problems.append(f"missing builders for: {sorted(missing)}")
|
||||
if extra:
|
||||
problems.append(f"unregistered kinds in _RESPONSE_MAP: {sorted(extra)}")
|
||||
raise RuntimeError(
|
||||
"_RESPONSE_MAP is out of sync with storage._SOURCE_TYPE_MAP: " + "; ".join(problems)
|
||||
)
|
||||
|
||||
|
||||
_assert_response_map_coverage()
|
||||
|
||||
|
||||
def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse:
|
||||
"""Convert a ColorStripSource to the matching per-type response schema."""
|
||||
kw = _common_response_kwargs(source, overlay_active)
|
||||
builder = _RESPONSE_MAP.get(type(source))
|
||||
builder = _RESPONSE_MAP.get(source.source_type)
|
||||
if builder is None:
|
||||
# Fallback: use to_dict() and build a PictureCSSResponse
|
||||
logger.warning("No response builder for %s, falling back", type(source).__name__)
|
||||
return PictureCSSResponse(
|
||||
**kw,
|
||||
picture_source_id="",
|
||||
smoothing=0.3,
|
||||
interpolation_mode="average",
|
||||
calibration=None,
|
||||
# Coverage is asserted at import time, so reaching this branch means a
|
||||
# source was loaded with a source_type that is not registered.
|
||||
# Surface the bug instead of silently returning a wrong-shaped response.
|
||||
raise RuntimeError(
|
||||
f"No CSS response builder registered for source_type "
|
||||
f"{source.source_type!r} (class={type(source).__name__})"
|
||||
)
|
||||
return builder(source, kw)
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ router = APIRouter()
|
||||
|
||||
|
||||
_PREVIEW_ALLOWED_TYPES = {
|
||||
"static",
|
||||
"single_color",
|
||||
"gradient",
|
||||
"effect",
|
||||
"daylight",
|
||||
@@ -97,65 +97,65 @@ async def preview_color_strip_ws(
|
||||
return ColorStripSource.from_dict(config)
|
||||
|
||||
def _create_stream(source):
|
||||
"""Instantiate and start the appropriate stream class for *source*."""
|
||||
from ledgrab.core.processing.color_strip_stream_manager import _SIMPLE_STREAM_MAP
|
||||
"""Instantiate and start the appropriate stream class for *source*.
|
||||
|
||||
Delegates the per-kind dispatch to ``color_strip_kinds.build_stream``
|
||||
so this preview path and the production ``ColorStripStreamManager``
|
||||
share a single registry. Per-kind dependencies (CSPT store, audio
|
||||
stores, weather manager, …) are gathered into a ``StreamDeps`` bag.
|
||||
|
||||
FastAPI-DI providers raise ``RuntimeError`` when they aren't wired,
|
||||
so we resolve each one through ``_safe`` and pass ``None`` on
|
||||
failure. The per-kind builder will still see a clear error if a
|
||||
truly-required dep is missing for that kind, but unrelated previews
|
||||
(e.g. a ``single_color`` preview on a fresh install where the CSPT
|
||||
store isn't initialized yet) keep working.
|
||||
"""
|
||||
from ledgrab.api.dependencies import (
|
||||
get_audio_processing_template_store,
|
||||
get_audio_source_store,
|
||||
get_audio_template_store,
|
||||
get_cspt_store,
|
||||
)
|
||||
from ledgrab.core.processing.color_strip_kinds import StreamDeps, build_stream
|
||||
|
||||
def _safe(getter):
|
||||
try:
|
||||
return getter()
|
||||
except RuntimeError as e:
|
||||
logger.debug("Preview dep not available (%s): %s", getter.__name__, e)
|
||||
return None
|
||||
|
||||
mgr = get_processor_manager()
|
||||
csm = mgr.color_strip_stream_manager
|
||||
|
||||
if source.source_type == "audio":
|
||||
from ledgrab.api.dependencies import (
|
||||
get_audio_processing_template_store,
|
||||
get_audio_source_store,
|
||||
get_audio_template_store,
|
||||
)
|
||||
from ledgrab.core.processing.audio_stream import AudioColorStripStream
|
||||
# The game-event bus is optional in preview contexts.
|
||||
try:
|
||||
from ledgrab.api.dependencies import get_game_event_bus
|
||||
|
||||
s = AudioColorStripStream(
|
||||
source,
|
||||
mgr.audio_capture_manager,
|
||||
get_audio_source_store(),
|
||||
get_audio_template_store(),
|
||||
get_audio_processing_template_store(),
|
||||
)
|
||||
elif source.source_type == "weather":
|
||||
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
|
||||
game_event_bus = get_game_event_bus()
|
||||
except RuntimeError as e:
|
||||
logger.debug("Preview: no game event bus available: %s", e)
|
||||
game_event_bus = None
|
||||
|
||||
s = WeatherColorStripStream(source, mgr.weather_manager)
|
||||
elif source.source_type == "game_event":
|
||||
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
|
||||
|
||||
s = GameEventColorStripStream(source)
|
||||
try:
|
||||
from ledgrab.api.dependencies import get_game_event_bus
|
||||
|
||||
bus = get_game_event_bus()
|
||||
except RuntimeError as e:
|
||||
logger.debug("Preview: no game event bus available: %s", e)
|
||||
else:
|
||||
if bus is not None:
|
||||
s.set_event_bus(bus)
|
||||
elif source.source_type == "mapped":
|
||||
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
|
||||
|
||||
s = MappedColorStripStream(source, csm)
|
||||
elif source.source_type == "composite":
|
||||
from ledgrab.api.dependencies import get_cspt_store
|
||||
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
|
||||
|
||||
s = CompositeColorStripStream(
|
||||
source, csm, mgr.value_stream_manager, get_cspt_store(), depth=0
|
||||
)
|
||||
elif source.source_type == "processed":
|
||||
from ledgrab.api.dependencies import get_cspt_store
|
||||
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
|
||||
|
||||
s = ProcessedColorStripStream(source, csm, get_cspt_store())
|
||||
else:
|
||||
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
|
||||
if not stream_cls:
|
||||
raise ValueError(f"Unsupported preview source_type: {source.source_type}")
|
||||
s = stream_cls(source)
|
||||
deps = StreamDeps(
|
||||
css_manager=csm,
|
||||
value_stream_manager=mgr.value_stream_manager,
|
||||
cspt_store=_safe(get_cspt_store),
|
||||
weather_manager=mgr.weather_manager,
|
||||
audio_capture_manager=mgr.audio_capture_manager,
|
||||
audio_source_store=_safe(get_audio_source_store),
|
||||
audio_template_store=_safe(get_audio_template_store),
|
||||
audio_processing_template_store=_safe(get_audio_processing_template_store),
|
||||
game_event_bus=game_event_bus,
|
||||
depth=0,
|
||||
)
|
||||
try:
|
||||
s = build_stream(source, deps)
|
||||
except ValueError as e:
|
||||
# Preserve the registry's original detail so the API consumer
|
||||
# sees which kind was rejected, not just a generic message.
|
||||
raise ValueError(f"Unsupported preview source_type: {e}") from e
|
||||
# Inject gradient store for palette resolution
|
||||
if hasattr(s, "set_gradient_store"):
|
||||
try:
|
||||
@@ -428,8 +428,17 @@ async def css_api_input_ws(
|
||||
continue
|
||||
|
||||
elif "bytes" in message:
|
||||
# Binary frame: raw RGBRGB... bytes (3 bytes per LED)
|
||||
# Binary frame: raw RGBRGB... bytes (3 bytes per LED).
|
||||
# Cap to a generous upper bound on the LED count — a hostile
|
||||
# client could otherwise stream 100 MB frames and OOM the
|
||||
# server before any application logic ran.
|
||||
raw_bytes = message["bytes"]
|
||||
_MAX_BINARY_LEDS = 8192
|
||||
if len(raw_bytes) > _MAX_BINARY_LEDS * 3:
|
||||
await websocket.send_json(
|
||||
{"error": f"Binary frame too large (max {_MAX_BINARY_LEDS} LEDs)"}
|
||||
)
|
||||
continue
|
||||
if len(raw_bytes) % 3 != 0:
|
||||
await websocket.send_json({"error": "Binary data must be multiple of 3 bytes"})
|
||||
continue
|
||||
|
||||
@@ -122,9 +122,9 @@ class PictureAdvancedCSSResponse(_CSSResponseBase):
|
||||
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
||||
|
||||
|
||||
class StaticCSSResponse(_CSSResponseBase):
|
||||
source_type: Literal["static"] = "static"
|
||||
color: Any = Field(description="Static RGB color")
|
||||
class SingleColorCSSResponse(_CSSResponseBase):
|
||||
source_type: Literal["single_color"] = "single_color"
|
||||
color: Any = Field(description="Solid RGB color")
|
||||
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
||||
|
||||
|
||||
@@ -240,11 +240,18 @@ class MathWaveCSSResponse(_CSSResponseBase):
|
||||
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
||||
|
||||
|
||||
class GameEventCSSResponse(_CSSResponseBase):
|
||||
source_type: Literal["game_event"] = "game_event"
|
||||
game_integration_id: str = Field(description="Game integration entity ID")
|
||||
idle_color: Any = Field(description="Idle RGB color (bindable)")
|
||||
event_mappings: List[dict] = Field(default_factory=list, description="Event-to-effect mappings")
|
||||
|
||||
|
||||
ColorStripSourceResponse = Annotated[
|
||||
Union[
|
||||
Annotated[PictureCSSResponse, Tag("picture")],
|
||||
Annotated[PictureAdvancedCSSResponse, Tag("picture_advanced")],
|
||||
Annotated[StaticCSSResponse, Tag("static")],
|
||||
Annotated[SingleColorCSSResponse, Tag("single_color")],
|
||||
Annotated[GradientCSSResponse, Tag("gradient")],
|
||||
Annotated[EffectCSSResponse, Tag("effect")],
|
||||
Annotated[CompositeCSSResponse, Tag("composite")],
|
||||
@@ -258,6 +265,7 @@ ColorStripSourceResponse = Annotated[
|
||||
Annotated[WeatherCSSResponse, Tag("weather")],
|
||||
Annotated[KeyColorsCSSResponse, Tag("key_colors")],
|
||||
Annotated[MathWaveCSSResponse, Tag("math_wave")],
|
||||
Annotated[GameEventCSSResponse, Tag("game_event")],
|
||||
],
|
||||
Discriminator("source_type"),
|
||||
]
|
||||
@@ -303,9 +311,9 @@ class PictureAdvancedCSSCreate(_CSSCreateBase):
|
||||
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
||||
|
||||
|
||||
class StaticCSSCreate(_CSSCreateBase):
|
||||
source_type: Literal["static"] = "static"
|
||||
color: Any = Field(default=None, description="Static RGB color [R,G,B]")
|
||||
class SingleColorCSSCreate(_CSSCreateBase):
|
||||
source_type: Literal["single_color"] = "single_color"
|
||||
color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
|
||||
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
||||
|
||||
|
||||
@@ -434,11 +442,18 @@ class MathWaveCSSCreate(_CSSCreateBase):
|
||||
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
||||
|
||||
|
||||
class GameEventCSSCreate(_CSSCreateBase):
|
||||
source_type: Literal["game_event"] = "game_event"
|
||||
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
|
||||
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
|
||||
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
|
||||
|
||||
|
||||
ColorStripSourceCreate = Annotated[
|
||||
Union[
|
||||
Annotated[PictureCSSCreate, Tag("picture")],
|
||||
Annotated[PictureAdvancedCSSCreate, Tag("picture_advanced")],
|
||||
Annotated[StaticCSSCreate, Tag("static")],
|
||||
Annotated[SingleColorCSSCreate, Tag("single_color")],
|
||||
Annotated[GradientCSSCreate, Tag("gradient")],
|
||||
Annotated[EffectCSSCreate, Tag("effect")],
|
||||
Annotated[CompositeCSSCreate, Tag("composite")],
|
||||
@@ -452,6 +467,7 @@ ColorStripSourceCreate = Annotated[
|
||||
Annotated[WeatherCSSCreate, Tag("weather")],
|
||||
Annotated[KeyColorsCSSCreate, Tag("key_colors")],
|
||||
Annotated[MathWaveCSSCreate, Tag("math_wave")],
|
||||
Annotated[GameEventCSSCreate, Tag("game_event")],
|
||||
],
|
||||
Discriminator("source_type"),
|
||||
]
|
||||
@@ -497,9 +513,9 @@ class PictureAdvancedCSSUpdate(_CSSUpdateBase):
|
||||
calibration: Optional[Calibration] = Field(None, description="LED calibration")
|
||||
|
||||
|
||||
class StaticCSSUpdate(_CSSUpdateBase):
|
||||
source_type: Literal["static"] = "static"
|
||||
color: Any = Field(default=None, description="Static RGB color [R,G,B]")
|
||||
class SingleColorCSSUpdate(_CSSUpdateBase):
|
||||
source_type: Literal["single_color"] = "single_color"
|
||||
color: Any = Field(default=None, description="Solid RGB color [R,G,B]")
|
||||
animation: Optional[AnimationConfig] = Field(None, description="Procedural animation config")
|
||||
|
||||
|
||||
@@ -626,11 +642,18 @@ class MathWaveCSSUpdate(_CSSUpdateBase):
|
||||
gradient_id: Optional[str] = Field(None, description="Gradient entity ID for color mapping")
|
||||
|
||||
|
||||
class GameEventCSSUpdate(_CSSUpdateBase):
|
||||
source_type: Literal["game_event"] = "game_event"
|
||||
game_integration_id: Optional[str] = Field(None, description="Game integration entity ID")
|
||||
idle_color: Any = Field(default=None, description="Idle RGB color [R,G,B] (bindable)")
|
||||
event_mappings: Optional[List[dict]] = Field(None, description="Event-to-effect mappings")
|
||||
|
||||
|
||||
ColorStripSourceUpdate = Annotated[
|
||||
Union[
|
||||
Annotated[PictureCSSUpdate, Tag("picture")],
|
||||
Annotated[PictureAdvancedCSSUpdate, Tag("picture_advanced")],
|
||||
Annotated[StaticCSSUpdate, Tag("static")],
|
||||
Annotated[SingleColorCSSUpdate, Tag("single_color")],
|
||||
Annotated[GradientCSSUpdate, Tag("gradient")],
|
||||
Annotated[EffectCSSUpdate, Tag("effect")],
|
||||
Annotated[CompositeCSSUpdate, Tag("composite")],
|
||||
@@ -644,6 +667,7 @@ ColorStripSourceUpdate = Annotated[
|
||||
Annotated[WeatherCSSUpdate, Tag("weather")],
|
||||
Annotated[KeyColorsCSSUpdate, Tag("key_colors")],
|
||||
Annotated[MathWaveCSSUpdate, Tag("math_wave")],
|
||||
Annotated[GameEventCSSUpdate, Tag("game_event")],
|
||||
],
|
||||
Discriminator("source_type"),
|
||||
]
|
||||
|
||||
@@ -0,0 +1,273 @@
|
||||
"""Single source of truth for non-sharable color-strip stream construction.
|
||||
|
||||
Both the preview WebSocket (``api/routes/color_strip_sources/ws_stream.py``)
|
||||
and the production ``ColorStripStreamManager.acquire`` used to maintain
|
||||
parallel ``if source.source_type == "..." elif ... else ..._SIMPLE_STREAM_MAP``
|
||||
chains. Adding a new kind required keeping those two chains in lockstep, and
|
||||
silently fell through to a generic stream class when an entry was missed.
|
||||
|
||||
This module replaces both chains with a single ``STREAM_BUILDERS`` registry
|
||||
plus a small ``StreamDeps`` dependency bag. Each caller populates the bag
|
||||
from its own context (DI container, processor manager, etc.) and looks the
|
||||
builder up by ``source.source_type``. A coverage assertion at import time
|
||||
guarantees every kind in ``storage._SOURCE_TYPE_MAP`` is either sharable or
|
||||
has a builder here — silent fall-throughs are no longer possible.
|
||||
|
||||
Sharable kinds (``picture``, ``picture_advanced``, ``key_colors``) are NOT in
|
||||
this registry: they require an injected ``LiveStream`` whose acquisition is
|
||||
intertwined with the source's calibration, which does not fit a uniform
|
||||
factory signature. Those continue to use bespoke paths inside
|
||||
``ColorStripStreamManager``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class StreamDeps:
|
||||
"""Dependency bag for non-sharable stream construction.
|
||||
|
||||
Each call site (preview WebSocket, production stream manager) builds one
|
||||
of these from its own context before invoking :func:`build_stream`.
|
||||
Fields are ``Any`` (with ``None`` defaults) because individual builders
|
||||
only consume a subset; tests can supply a minimal bag.
|
||||
|
||||
``frozen=True`` guards against a builder accidentally reassigning a
|
||||
field; it does NOT make the referenced objects immutable — the
|
||||
``css_manager``, stores, etc. are live mutable services.
|
||||
|
||||
``css_manager`` is needed by composite/mapped/processed builders so they
|
||||
can recursively acquire dependent streams. Single-kind builders ignore
|
||||
it. The field has no default so callers are forced to think about which
|
||||
manager they are wiring through.
|
||||
"""
|
||||
|
||||
css_manager: Any
|
||||
value_stream_manager: Any = None
|
||||
cspt_store: Any = None # ColorStripProcessingTemplateStore
|
||||
weather_manager: Any = None
|
||||
audio_capture_manager: Any = None
|
||||
audio_source_store: Any = None
|
||||
audio_template_store: Any = None
|
||||
audio_processing_template_store: Any = None
|
||||
game_event_bus: Any = None
|
||||
depth: int = 0 # composite nesting depth — passed through verbatim
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-kind builders
|
||||
#
|
||||
# Each builder is a small free function ``(source, deps) -> ColorStripStream``.
|
||||
# Imports are deferred to keep this module cheap to import (the production
|
||||
# processing graph is large).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_audio(source, d: StreamDeps):
|
||||
from ledgrab.core.processing.audio_stream import AudioColorStripStream
|
||||
|
||||
return AudioColorStripStream(
|
||||
source,
|
||||
d.audio_capture_manager,
|
||||
d.audio_source_store,
|
||||
d.audio_template_store,
|
||||
d.audio_processing_template_store,
|
||||
)
|
||||
|
||||
|
||||
def _build_composite(source, d: StreamDeps):
|
||||
from ledgrab.core.processing.composite_stream import CompositeColorStripStream
|
||||
|
||||
return CompositeColorStripStream(
|
||||
source,
|
||||
d.css_manager,
|
||||
d.value_stream_manager,
|
||||
d.cspt_store,
|
||||
depth=d.depth,
|
||||
)
|
||||
|
||||
|
||||
def _build_mapped(source, d: StreamDeps):
|
||||
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
|
||||
|
||||
return MappedColorStripStream(source, d.css_manager)
|
||||
|
||||
|
||||
def _build_processed(source, d: StreamDeps):
|
||||
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
|
||||
|
||||
return ProcessedColorStripStream(source, d.css_manager, d.cspt_store)
|
||||
|
||||
|
||||
def _build_weather(source, d: StreamDeps):
|
||||
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
|
||||
|
||||
return WeatherColorStripStream(source, d.weather_manager)
|
||||
|
||||
|
||||
def _build_game_event(source, d: StreamDeps):
|
||||
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
|
||||
|
||||
stream = GameEventColorStripStream(source)
|
||||
if d.game_event_bus is not None:
|
||||
stream.set_event_bus(d.game_event_bus)
|
||||
return stream
|
||||
|
||||
|
||||
def _make_source_only_builder(loader: Callable[[], type]) -> Callable[[Any, StreamDeps], Any]:
|
||||
"""Wrap a class-loader so it produces a uniform ``(source, deps) -> stream`` builder.
|
||||
|
||||
The loader is called on each invocation but module caching makes the
|
||||
import a single dict lookup after the first call.
|
||||
"""
|
||||
|
||||
def _build(source, _deps: StreamDeps):
|
||||
return loader()(source)
|
||||
|
||||
return _build
|
||||
|
||||
|
||||
def _single_color_cls() -> type:
|
||||
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
|
||||
|
||||
return SingleColorStripStream
|
||||
|
||||
|
||||
def _gradient_cls() -> type:
|
||||
from ledgrab.core.processing.color_strip_stream import GradientColorStripStream
|
||||
|
||||
return GradientColorStripStream
|
||||
|
||||
|
||||
def _effect_cls() -> type:
|
||||
from ledgrab.core.processing.effect_stream import EffectColorStripStream
|
||||
|
||||
return EffectColorStripStream
|
||||
|
||||
|
||||
def _api_input_cls() -> type:
|
||||
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
|
||||
|
||||
return ApiInputColorStripStream
|
||||
|
||||
|
||||
def _notification_cls() -> type:
|
||||
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
|
||||
|
||||
return NotificationColorStripStream
|
||||
|
||||
|
||||
def _daylight_cls() -> type:
|
||||
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
|
||||
|
||||
return DaylightColorStripStream
|
||||
|
||||
|
||||
def _candlelight_cls() -> type:
|
||||
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
|
||||
|
||||
return CandlelightColorStripStream
|
||||
|
||||
|
||||
def _math_wave_cls() -> type:
|
||||
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
|
||||
|
||||
return MathWaveColorStripStream
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
StreamBuilder = Callable[[Any, StreamDeps], Any]
|
||||
|
||||
STREAM_BUILDERS: dict[str, StreamBuilder] = {
|
||||
"audio": _build_audio,
|
||||
"composite": _build_composite,
|
||||
"mapped": _build_mapped,
|
||||
"processed": _build_processed,
|
||||
"weather": _build_weather,
|
||||
"game_event": _build_game_event,
|
||||
"single_color": _make_source_only_builder(_single_color_cls),
|
||||
# Legacy alias: pre-rename rows used "static". The data migration rewrites
|
||||
# the on-disk source_type on startup, but this alias keeps an in-flight
|
||||
# legacy entry resolving to the right stream class.
|
||||
"static": _make_source_only_builder(_single_color_cls),
|
||||
"gradient": _make_source_only_builder(_gradient_cls),
|
||||
"effect": _make_source_only_builder(_effect_cls),
|
||||
"api_input": _make_source_only_builder(_api_input_cls),
|
||||
"notification": _make_source_only_builder(_notification_cls),
|
||||
"daylight": _make_source_only_builder(_daylight_cls),
|
||||
"candlelight": _make_source_only_builder(_candlelight_cls),
|
||||
"math_wave": _make_source_only_builder(_math_wave_cls),
|
||||
}
|
||||
|
||||
|
||||
# Sharable kinds are handled by dedicated LiveStream-acquisition paths in
|
||||
# ColorStripStreamManager (their construction depends on calibration → picture
|
||||
# source resolution, which does not fit a uniform factory signature).
|
||||
SHARABLE_KINDS: frozenset[str] = frozenset({"picture", "picture_advanced", "key_colors"})
|
||||
|
||||
|
||||
def build_stream(source, deps: StreamDeps):
|
||||
"""Build a non-sharable color-strip stream for *source*.
|
||||
|
||||
Raises ``ValueError`` if the kind has no registered builder (which
|
||||
would indicate a sharable source slipped through the caller's
|
||||
``sharable`` gate, or a new kind missing from this registry).
|
||||
"""
|
||||
builder = STREAM_BUILDERS.get(source.source_type)
|
||||
if builder is None:
|
||||
raise ValueError(
|
||||
f"No stream builder for non-sharable color-strip-source kind "
|
||||
f"{source.source_type!r} (id={getattr(source, 'id', '?')!r})"
|
||||
)
|
||||
return builder(source, deps)
|
||||
|
||||
|
||||
def _assert_stream_kind_coverage() -> None:
|
||||
"""Verify the registry is a strict partition: every kind from storage is
|
||||
either listed in SHARABLE_KINDS or has a STREAM_BUILDERS entry.
|
||||
|
||||
Runs at module import so a kind added to ``_SOURCE_TYPE_MAP`` without a
|
||||
corresponding builder fails the server boot loudly instead of silently
|
||||
falling through at request time.
|
||||
|
||||
Contract note
|
||||
-------------
|
||||
This check is **asymmetric** (``STREAM_BUILDERS ∪ SHARABLE_KINDS ==
|
||||
storage_kinds``) because sharable kinds are constructed by a separate
|
||||
path inside ``ColorStripStreamManager``. The sister assertion in
|
||||
``api/routes/color_strip_sources/_helpers.py::_assert_response_map_coverage``
|
||||
is **symmetric** (``_RESPONSE_MAP keys == storage_kinds``) because every
|
||||
kind, sharable or not, still needs a response shape. Both assertions key
|
||||
by the ``source_type`` string; adding a new kind requires updates to
|
||||
storage, ``_RESPONSE_MAP``, and either ``STREAM_BUILDERS`` or
|
||||
``SHARABLE_KINDS``. Both assertions catch missing entries; only this one
|
||||
expects a subset relationship.
|
||||
"""
|
||||
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
|
||||
|
||||
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
|
||||
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||
expected_non_sharable = storage_kinds - SHARABLE_KINDS
|
||||
|
||||
missing = expected_non_sharable - builder_kinds
|
||||
extra = builder_kinds - storage_kinds
|
||||
if missing or extra:
|
||||
problems = []
|
||||
if missing:
|
||||
problems.append(f"missing builders: {sorted(missing)}")
|
||||
if extra:
|
||||
problems.append(f"unregistered kinds: {sorted(extra)}")
|
||||
raise RuntimeError(
|
||||
"color_strip_kinds.STREAM_BUILDERS is out of sync with storage._SOURCE_TYPE_MAP: "
|
||||
+ "; ".join(problems)
|
||||
)
|
||||
|
||||
|
||||
_assert_stream_kind_coverage()
|
||||
@@ -3,7 +3,7 @@
|
||||
PictureColorStripStreams (expensive screen capture) are shared across multiple
|
||||
consumers via reference counting — processing runs once, not once per target.
|
||||
|
||||
Count-dependent streams (static, gradient, effect) are NOT shared.
|
||||
Count-dependent streams (single_color, gradient, effect) are NOT shared.
|
||||
Each consumer gets its own instance so it can configure an independent LED count
|
||||
without interfering with other targets.
|
||||
"""
|
||||
@@ -11,37 +11,18 @@ without interfering with other targets.
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional
|
||||
|
||||
from ledgrab.core.processing.color_strip_kinds import (
|
||||
StreamDeps,
|
||||
build_stream,
|
||||
)
|
||||
from ledgrab.core.processing.color_strip_stream import (
|
||||
ColorStripStream,
|
||||
GradientColorStripStream,
|
||||
PictureColorStripStream,
|
||||
StaticColorStripStream,
|
||||
)
|
||||
from ledgrab.core.processing.processed_stream import ProcessedColorStripStream
|
||||
from ledgrab.core.processing.effect_stream import EffectColorStripStream
|
||||
from ledgrab.core.processing.api_input_stream import ApiInputColorStripStream
|
||||
from ledgrab.core.processing.notification_stream import NotificationColorStripStream
|
||||
from ledgrab.core.processing.daylight_stream import DaylightColorStripStream
|
||||
from ledgrab.core.processing.candlelight_stream import CandlelightColorStripStream
|
||||
from ledgrab.core.processing.game_event_stream import GameEventColorStripStream
|
||||
from ledgrab.core.processing.math_wave_stream import MathWaveColorStripStream
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# source_type → stream class for non-picture (non-sharable) sources
|
||||
_SIMPLE_STREAM_MAP = {
|
||||
"static": StaticColorStripStream,
|
||||
"gradient": GradientColorStripStream,
|
||||
"effect": EffectColorStripStream,
|
||||
"api_input": ApiInputColorStripStream,
|
||||
"notification": NotificationColorStripStream,
|
||||
"daylight": DaylightColorStripStream,
|
||||
"candlelight": CandlelightColorStripStream,
|
||||
"game_event": GameEventColorStripStream,
|
||||
"math_wave": MathWaveColorStripStream,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class _ColorStripEntry:
|
||||
@@ -241,43 +222,27 @@ class ColorStripStreamManager:
|
||||
"""
|
||||
source = self._color_strip_store.get_source(css_id)
|
||||
|
||||
# Non-sharable: always create a fresh per-consumer instance
|
||||
# Non-sharable: always create a fresh per-consumer instance.
|
||||
# Construction is delegated to the per-kind registry in
|
||||
# ``color_strip_kinds`` so the dispatch lives in exactly one place.
|
||||
if not source.sharable:
|
||||
if source.source_type == "audio":
|
||||
from ledgrab.core.processing.audio_stream import AudioColorStripStream
|
||||
|
||||
css_stream = AudioColorStripStream(
|
||||
source,
|
||||
self._audio_capture_manager,
|
||||
self._audio_source_store,
|
||||
self._audio_template_store,
|
||||
self._audio_processing_template_store,
|
||||
)
|
||||
elif source.source_type == "composite":
|
||||
from ledgrab.core.processing.composite_stream import (
|
||||
CompositeColorStripStream,
|
||||
)
|
||||
|
||||
css_stream = CompositeColorStripStream(
|
||||
source, self, self._value_stream_manager, self._cspt_store, depth=depth
|
||||
)
|
||||
elif source.source_type == "mapped":
|
||||
from ledgrab.core.processing.mapped_stream import MappedColorStripStream
|
||||
|
||||
css_stream = MappedColorStripStream(source, self)
|
||||
elif source.source_type == "processed":
|
||||
css_stream = ProcessedColorStripStream(source, self, self._cspt_store)
|
||||
elif source.source_type == "weather":
|
||||
from ledgrab.core.processing.weather_stream import WeatherColorStripStream
|
||||
|
||||
css_stream = WeatherColorStripStream(source, self._weather_manager)
|
||||
else:
|
||||
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
|
||||
if not stream_cls:
|
||||
raise ValueError(
|
||||
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
|
||||
)
|
||||
css_stream = stream_cls(source)
|
||||
deps = StreamDeps(
|
||||
css_manager=self,
|
||||
value_stream_manager=self._value_stream_manager,
|
||||
cspt_store=self._cspt_store,
|
||||
weather_manager=self._weather_manager,
|
||||
audio_capture_manager=self._audio_capture_manager,
|
||||
audio_source_store=self._audio_source_store,
|
||||
audio_template_store=self._audio_template_store,
|
||||
audio_processing_template_store=self._audio_processing_template_store,
|
||||
game_event_bus=self._game_event_bus,
|
||||
depth=depth,
|
||||
)
|
||||
try:
|
||||
css_stream = build_stream(source, deps)
|
||||
except ValueError as e:
|
||||
# Surface the css_id alongside the registry's error.
|
||||
raise ValueError(f"{e} (css_id={css_id})") from e
|
||||
# Inject gradient store for palette resolution
|
||||
if self._gradient_store and hasattr(css_stream, "set_gradient_store"):
|
||||
css_stream.set_gradient_store(self._gradient_store)
|
||||
|
||||
@@ -0,0 +1,354 @@
|
||||
"""Single source of truth for value-stream construction.
|
||||
|
||||
``ValueStreamManager._create_stream`` used to be a 168-line ``isinstance``
|
||||
ladder over 14 ``ValueSource`` subclasses with a silent fallback to
|
||||
``StaticValueStream(value=1.0)``. The ladder forced any new value kind to
|
||||
edit the factory plus the storage subclass plus the schemas plus the store's
|
||||
``create_source`` — and a missing branch corrupted the stream at runtime.
|
||||
|
||||
This module replaces the ladder with a single ``STREAM_BUILDERS`` registry
|
||||
keyed by the ``source_type`` string (matching the storage layer's
|
||||
``_VALUE_SOURCE_MAP``). An import-time coverage assertion guarantees the two
|
||||
registries stay aligned.
|
||||
|
||||
Builders take a ``(source, deps: ValueStreamDeps) -> ValueStream`` shape so
|
||||
both the production manager and any preview / test harness can populate the
|
||||
deps from their own context.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, Callable, Optional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# Typed forward references so mypy/pyright catch typos like
|
||||
# ``d.gradient_stroe`` at static-analysis time. At runtime these are
|
||||
# ``Any`` — the live objects come from the FastAPI / manager wiring.
|
||||
from ledgrab.core.audio.audio_capture_manager import AudioCaptureManager
|
||||
from ledgrab.core.game_integration.event_bus import GameEventBus
|
||||
from ledgrab.core.home_assistant.ha_manager import HomeAssistantManager
|
||||
from ledgrab.core.processing.color_strip_stream_manager import (
|
||||
ColorStripStreamManager,
|
||||
)
|
||||
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
|
||||
from ledgrab.core.processing.sync_clock_manager import SyncClockRuntime
|
||||
from ledgrab.storage.audio_processing_template_store import (
|
||||
AudioProcessingTemplateStore,
|
||||
)
|
||||
from ledgrab.storage.audio_source_store import AudioSourceStore
|
||||
from ledgrab.storage.audio_template_store import AudioTemplateStore
|
||||
from ledgrab.storage.gradient_store import GradientStore
|
||||
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ValueStreamDeps:
|
||||
"""Dependency bag for value-stream construction.
|
||||
|
||||
Builders only read the subset they need. ``value_stream_manager`` is the
|
||||
one truly load-bearing field — it is referenced by
|
||||
``GradientMapValueStream`` so it can recursively acquire its source
|
||||
value stream.
|
||||
|
||||
``clock_runtime`` is pre-acquired by the manager exclusively for the
|
||||
``animated_color`` kind (see :data:`NEEDS_CLOCK_RUNTIME`). The manager
|
||||
owns the bookkeeping for tracking ``vs_id → clock_id`` so the builder
|
||||
stays a pure ``(source, deps) -> stream`` mapping. If a new kind ever
|
||||
grows a clock dependency, add it to ``NEEDS_CLOCK_RUNTIME`` AND surface
|
||||
a separate field — sharing ``clock_runtime`` across kinds invites the
|
||||
wrong runtime being passed to the wrong builder.
|
||||
|
||||
Field types are quoted so they stay informational under
|
||||
``TYPE_CHECKING`` and the dataclass still accepts plain mocks at
|
||||
runtime. Builders therefore get IDE/lint help against typos like
|
||||
``d.gradient_stroe`` while production code remains duck-typed.
|
||||
"""
|
||||
|
||||
value_stream_manager: "Any"
|
||||
audio_capture_manager: Optional["AudioCaptureManager"] = None
|
||||
audio_source_store: Optional["AudioSourceStore"] = None
|
||||
audio_template_store: Optional["AudioTemplateStore"] = None
|
||||
audio_processing_template_store: Optional["AudioProcessingTemplateStore"] = None
|
||||
live_stream_manager: Optional["LiveStreamManager"] = None
|
||||
ha_manager: Optional["HomeAssistantManager"] = None
|
||||
gradient_store: Optional["GradientStore"] = None
|
||||
css_stream_manager: Optional["ColorStripStreamManager"] = None
|
||||
event_bus: Optional["GameEventBus"] = None
|
||||
http_endpoint_store: Optional["HTTPEndpointStore"] = None
|
||||
clock_runtime: Optional["SyncClockRuntime"] = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-kind builders
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_static(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import StaticValueStream
|
||||
|
||||
return StaticValueStream(value=source.value)
|
||||
|
||||
|
||||
def _build_animated(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import AnimatedValueStream
|
||||
|
||||
return AnimatedValueStream(
|
||||
waveform=source.waveform,
|
||||
speed=source.speed,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
)
|
||||
|
||||
|
||||
def _build_audio(source, d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import AudioValueStream
|
||||
|
||||
return AudioValueStream(
|
||||
audio_source_id=source.audio_source_id,
|
||||
mode=source.mode,
|
||||
sensitivity=source.sensitivity,
|
||||
smoothing=source.smoothing,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
auto_gain=source.auto_gain,
|
||||
audio_capture_manager=d.audio_capture_manager,
|
||||
audio_source_store=d.audio_source_store,
|
||||
audio_template_store=d.audio_template_store,
|
||||
audio_processing_template_store=d.audio_processing_template_store,
|
||||
)
|
||||
|
||||
|
||||
def _build_daylight(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import DaylightValueStream
|
||||
|
||||
return DaylightValueStream(
|
||||
speed=source.speed,
|
||||
use_real_time=source.use_real_time,
|
||||
latitude=source.latitude,
|
||||
longitude=source.longitude,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
)
|
||||
|
||||
|
||||
def _build_adaptive_time(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import TimeOfDayValueStream
|
||||
|
||||
return TimeOfDayValueStream(
|
||||
schedule=source.schedule,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
)
|
||||
|
||||
|
||||
def _build_adaptive_scene(source, d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import SceneValueStream
|
||||
|
||||
return SceneValueStream(
|
||||
picture_source_id=source.picture_source_id,
|
||||
scene_behavior=source.scene_behavior,
|
||||
sensitivity=source.sensitivity,
|
||||
smoothing=source.smoothing,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
live_stream_manager=d.live_stream_manager,
|
||||
)
|
||||
|
||||
|
||||
def _build_static_color(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import StaticColorValueStream
|
||||
|
||||
return StaticColorValueStream(color=source.color)
|
||||
|
||||
|
||||
def _build_animated_color(source, d: ValueStreamDeps):
|
||||
# See NEEDS_CLOCK_RUNTIME below: ``d.clock_runtime`` is pre-acquired by
|
||||
# ``ValueStreamManager._create_stream`` exclusively for this kind. Any
|
||||
# other builder that ever needs a clock should add its own deps field
|
||||
# rather than read this one.
|
||||
from ledgrab.core.processing.value_stream import AnimatedColorValueStream
|
||||
|
||||
return AnimatedColorValueStream(
|
||||
colors=source.colors,
|
||||
speed=source.speed,
|
||||
easing=source.easing,
|
||||
clock=d.clock_runtime,
|
||||
)
|
||||
|
||||
|
||||
def _build_adaptive_time_color(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import AdaptiveTimeColorValueStream
|
||||
|
||||
return AdaptiveTimeColorValueStream(schedule=source.schedule)
|
||||
|
||||
|
||||
def _build_ha_entity(source, d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import HAEntityValueStream
|
||||
|
||||
return HAEntityValueStream(
|
||||
ha_source_id=source.ha_source_id,
|
||||
entity_id=source.entity_id,
|
||||
attribute=source.attribute,
|
||||
min_ha_value=source.min_ha_value,
|
||||
max_ha_value=source.max_ha_value,
|
||||
smoothing=source.smoothing,
|
||||
ha_manager=d.ha_manager,
|
||||
)
|
||||
|
||||
|
||||
def _build_gradient_map(source, d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import GradientMapValueStream
|
||||
|
||||
return GradientMapValueStream(
|
||||
value_source_id=source.value_source_id,
|
||||
gradient_id=source.gradient_id,
|
||||
easing=source.easing,
|
||||
value_stream_manager=d.value_stream_manager,
|
||||
gradient_store=d.gradient_store,
|
||||
)
|
||||
|
||||
|
||||
def _build_css_extract(source, d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import CSSExtractValueStream
|
||||
|
||||
return CSSExtractValueStream(
|
||||
color_strip_source_id=source.color_strip_source_id,
|
||||
led_start=source.led_start,
|
||||
led_end=source.led_end,
|
||||
css_stream_manager=d.css_stream_manager,
|
||||
)
|
||||
|
||||
|
||||
def _build_system_metrics(source, _d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import SystemMetricsValueStream
|
||||
|
||||
return SystemMetricsValueStream(
|
||||
metric=source.metric,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
max_rate=source.max_rate,
|
||||
disk_path=source.disk_path,
|
||||
sensor_label=source.sensor_label,
|
||||
poll_interval=source.poll_interval,
|
||||
smoothing=source.smoothing,
|
||||
)
|
||||
|
||||
|
||||
def _build_game_event(source, d: ValueStreamDeps):
|
||||
# Late import: ``GameEventValueStream`` lives in a separate sub-package
|
||||
# to keep the game-event subsystem (which transitively pulls in the
|
||||
# game-integration adapters) off the cold-start path for installs that
|
||||
# never use game events.
|
||||
from ledgrab.core.value_sources.game_event_value_source import GameEventValueStream
|
||||
|
||||
return GameEventValueStream(
|
||||
event_type=source.event_type,
|
||||
min_game_value=source.min_game_value,
|
||||
max_game_value=source.max_game_value,
|
||||
smoothing=source.smoothing,
|
||||
default_value=source.default_value,
|
||||
timeout=source.timeout,
|
||||
event_bus=d.event_bus,
|
||||
)
|
||||
|
||||
|
||||
def _build_http(source, d: ValueStreamDeps):
|
||||
from ledgrab.core.processing.value_stream import HTTPValueStream
|
||||
|
||||
return HTTPValueStream(
|
||||
endpoint_id=source.http_endpoint_id,
|
||||
json_path=source.json_path,
|
||||
interval_s=source.interval_s,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
smoothing=source.smoothing,
|
||||
http_endpoint_store=d.http_endpoint_store,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
StreamBuilder = Callable[[Any, ValueStreamDeps], Any]
|
||||
|
||||
STREAM_BUILDERS: dict[str, StreamBuilder] = {
|
||||
"static": _build_static,
|
||||
"animated": _build_animated,
|
||||
"audio": _build_audio,
|
||||
"daylight": _build_daylight,
|
||||
"adaptive_time": _build_adaptive_time,
|
||||
"adaptive_scene": _build_adaptive_scene,
|
||||
"static_color": _build_static_color,
|
||||
"animated_color": _build_animated_color,
|
||||
"adaptive_time_color": _build_adaptive_time_color,
|
||||
"ha_entity": _build_ha_entity,
|
||||
"gradient_map": _build_gradient_map,
|
||||
"css_extract": _build_css_extract,
|
||||
"system_metrics": _build_system_metrics,
|
||||
"game_event": _build_game_event,
|
||||
"http": _build_http,
|
||||
}
|
||||
|
||||
|
||||
# ``animated_color`` is the only kind that needs a pre-acquired SyncClockRuntime
|
||||
# from the manager; the rest derive everything they need from ``source`` and
|
||||
# ``deps``. Exposing this set lets the manager perform the side-effecting
|
||||
# acquisition step exactly once, before delegating to the registry.
|
||||
NEEDS_CLOCK_RUNTIME: frozenset[str] = frozenset({"animated_color"})
|
||||
|
||||
|
||||
def build_stream(source, deps: ValueStreamDeps):
|
||||
"""Build a ValueStream for *source*.
|
||||
|
||||
Raises ``ValueError`` when no builder is registered for
|
||||
``source.source_type``. Coverage is asserted at module import, so this
|
||||
only fires for an in-flight instance whose ``source_type`` somehow
|
||||
drifted from the registered set.
|
||||
"""
|
||||
builder = STREAM_BUILDERS.get(source.source_type)
|
||||
if builder is None:
|
||||
raise ValueError(
|
||||
f"No value-stream builder for source_type {source.source_type!r} "
|
||||
f"(id={getattr(source, 'id', '?')!r})"
|
||||
)
|
||||
return builder(source, deps)
|
||||
|
||||
|
||||
def _assert_value_kind_coverage() -> None:
|
||||
"""Verify the registry and storage's ``_VALUE_SOURCE_MAP`` agree.
|
||||
|
||||
Runs at module import. Symmetric: every kind in storage must have a
|
||||
builder, and every builder must correspond to a real storage kind.
|
||||
Also asserts ``NEEDS_CLOCK_RUNTIME`` only names kinds that exist in the
|
||||
registry, so a typo there fails the boot loudly instead of silently
|
||||
leaking a ``SyncClockRuntime`` acquisition.
|
||||
"""
|
||||
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
|
||||
|
||||
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
|
||||
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||
missing = storage_kinds - builder_kinds
|
||||
extra = builder_kinds - storage_kinds
|
||||
if missing or extra:
|
||||
problems = []
|
||||
if missing:
|
||||
problems.append(f"missing builders: {sorted(missing)}")
|
||||
if extra:
|
||||
problems.append(f"unregistered kinds: {sorted(extra)}")
|
||||
raise RuntimeError(
|
||||
"value_kinds.STREAM_BUILDERS is out of sync with storage._VALUE_SOURCE_MAP: "
|
||||
+ "; ".join(problems)
|
||||
)
|
||||
|
||||
rogue_clock_kinds = NEEDS_CLOCK_RUNTIME - builder_kinds
|
||||
if rogue_clock_kinds:
|
||||
raise RuntimeError(
|
||||
"value_kinds.NEEDS_CLOCK_RUNTIME names kinds with no registered "
|
||||
f"builder: {sorted(rogue_clock_kinds)}"
|
||||
)
|
||||
|
||||
|
||||
_assert_value_kind_coverage()
|
||||
@@ -21,7 +21,9 @@ ValueStreamManager owns all running ValueStreams, keyed by
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import re
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
@@ -29,8 +31,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ledgrab.storage.base_store import EntityNotFoundError
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
# Compiled once — used by ``_extract_simple_path`` on every poll.
|
||||
_NAME_HEAD_RE = re.compile(r"^([^\[]*)")
|
||||
_INDEX_RE = re.compile(r"^\[(\d+)\]")
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ledgrab.core.audio.audio_capture import AudioCaptureManager
|
||||
from ledgrab.core.game_integration.event_bus import GameEventBus
|
||||
@@ -39,6 +46,7 @@ if TYPE_CHECKING:
|
||||
from ledgrab.core.processing.live_stream_manager import LiveStreamManager
|
||||
from ledgrab.core.processing.sync_clock_manager import SyncClockManager
|
||||
from ledgrab.storage.audio_source_store import AudioSourceStore
|
||||
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||
from ledgrab.storage.value_source import ValueSource
|
||||
from ledgrab.storage.value_source_store import ValueSourceStore
|
||||
|
||||
@@ -1017,6 +1025,211 @@ class HAEntityValueStream(ValueStream):
|
||||
logger.warning("HAEntityValueStream failed to swap HA runtime: %s", e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTTP poll
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class HTTPValueStream(ValueStream):
|
||||
"""Periodically polls an HTTPEndpoint and extracts a value via json_path.
|
||||
|
||||
Exposes two accessors:
|
||||
|
||||
- ``get_value()`` returns a normalized float in [0, 1] for use as a
|
||||
modulator (brightness, color, etc.). The raw extracted value is
|
||||
coerced to float; non-numeric values yield 0.0.
|
||||
- ``get_raw_value()`` returns the un-normalized extracted value
|
||||
(str / int / float / bool / None) for consumers that need it
|
||||
verbatim — e.g. an automation rule comparing ``"playing"``.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
endpoint_id: str,
|
||||
json_path: str,
|
||||
interval_s: int,
|
||||
min_value: float,
|
||||
max_value: float,
|
||||
smoothing: float,
|
||||
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
|
||||
) -> None:
|
||||
self._endpoint_id = endpoint_id
|
||||
self._json_path = json_path
|
||||
self._interval_s = max(1, int(interval_s))
|
||||
self._min_value = min_value
|
||||
self._max_value = max_value
|
||||
self._smoothing = smoothing
|
||||
self._http_endpoint_store = http_endpoint_store
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._raw_value: Any = None
|
||||
self._prev_normalized: Optional[float] = None
|
||||
# Kept as private attrs for internal/log diagnostics; not exposed via
|
||||
# public properties or API until a status endpoint consumes them.
|
||||
self._last_fetched_at: Optional[datetime] = None
|
||||
self._last_status_code: Optional[int] = None
|
||||
self._last_error: Optional[str] = None
|
||||
|
||||
def start(self) -> None:
|
||||
if self._task is not None:
|
||||
return
|
||||
if not self._endpoint_id or self._http_endpoint_store is None:
|
||||
return
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
# No running loop — can't poll. Construction in a sync test
|
||||
# context is fine; the stream just stays idle until started
|
||||
# from an async context.
|
||||
return
|
||||
self._task = loop.create_task(self._poll_loop())
|
||||
|
||||
def stop(self) -> None:
|
||||
task = self._task
|
||||
self._task = None
|
||||
if task is not None:
|
||||
task.cancel()
|
||||
|
||||
def get_value(self) -> float:
|
||||
raw = self._raw_value
|
||||
if raw is None:
|
||||
return self._prev_normalized if self._prev_normalized is not None else 0.0
|
||||
try:
|
||||
numeric = float(raw)
|
||||
except (TypeError, ValueError):
|
||||
return self._prev_normalized if self._prev_normalized is not None else 0.0
|
||||
|
||||
rng = self._max_value - self._min_value
|
||||
if abs(rng) < 1e-9:
|
||||
normalized = 0.5
|
||||
else:
|
||||
normalized = (numeric - self._min_value) / rng
|
||||
normalized = max(0.0, min(1.0, normalized))
|
||||
|
||||
if self._smoothing > 0.0 and self._prev_normalized is not None:
|
||||
normalized = (
|
||||
self._smoothing * self._prev_normalized + (1.0 - self._smoothing) * normalized
|
||||
)
|
||||
self._prev_normalized = normalized
|
||||
return normalized
|
||||
|
||||
def get_raw_value(self) -> Any:
|
||||
"""Return the last raw extracted value (string, int, float, etc.)."""
|
||||
return self._raw_value
|
||||
|
||||
def update_source(self, source: "ValueSource") -> None:
|
||||
from ledgrab.storage.value_source import HTTPValueSource
|
||||
|
||||
if not isinstance(source, HTTPValueSource):
|
||||
return
|
||||
self._endpoint_id = source.http_endpoint_id
|
||||
self._json_path = source.json_path
|
||||
self._interval_s = max(1, int(source.interval_s))
|
||||
self._min_value = source.min_value
|
||||
self._max_value = source.max_value
|
||||
self._smoothing = source.smoothing
|
||||
|
||||
async def _poll_loop(self) -> None:
|
||||
from ledgrab.utils.safe_source import safe_request_bounded
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
endpoint = self._http_endpoint_store.get(self._endpoint_id)
|
||||
except EntityNotFoundError:
|
||||
# The endpoint was deleted out from under us. Stop the
|
||||
# poll task so it doesn't spin forever; the next entity
|
||||
# event (or the value source being deleted) will tidy
|
||||
# the rest of the bookkeeping.
|
||||
logger.warning(
|
||||
"HTTPValueStream stopping: endpoint %s no longer exists",
|
||||
self._endpoint_id,
|
||||
)
|
||||
self._last_error = "endpoint_deleted"
|
||||
self._raw_value = None
|
||||
self._task = None
|
||||
return
|
||||
except Exception as exc:
|
||||
self._last_error = f"Endpoint lookup failed: {type(exc).__name__}"
|
||||
self._raw_value = None
|
||||
await asyncio.sleep(self._interval_s)
|
||||
continue
|
||||
|
||||
headers = endpoint.build_request_headers()
|
||||
try:
|
||||
status, body_bytes, error = await safe_request_bounded(
|
||||
endpoint.method,
|
||||
endpoint.url,
|
||||
headers=headers,
|
||||
timeout=endpoint.timeout_s,
|
||||
)
|
||||
except Exception as exc:
|
||||
# safe_request_bounded raises HTTPException on URL
|
||||
# validation failure; treat that as a recoverable poll
|
||||
# error and try again next cycle.
|
||||
self._last_status_code = None
|
||||
self._last_error = f"URL validation failed: {type(exc).__name__}"
|
||||
self._raw_value = None
|
||||
await asyncio.sleep(self._interval_s)
|
||||
continue
|
||||
|
||||
self._last_status_code = status if status else None
|
||||
self._last_error = error
|
||||
if not error and status:
|
||||
try:
|
||||
body_text = body_bytes.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
body_text = ""
|
||||
body_json: Any
|
||||
try:
|
||||
body_json = json.loads(body_text) if body_text else None
|
||||
except (ValueError, TypeError):
|
||||
body_json = None
|
||||
self._raw_value = _extract_simple_path(body_json, self._json_path, body_text)
|
||||
else:
|
||||
self._raw_value = None
|
||||
self._last_fetched_at = datetime.now()
|
||||
|
||||
await asyncio.sleep(self._interval_s)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
|
||||
|
||||
def _extract_simple_path(body_json: Any, path: str, body_text: str) -> Any:
|
||||
"""Extract a value via a dot-path (with optional ``[N]`` indices).
|
||||
|
||||
Uses module-level compiled regexes so repeated polls don't recompile.
|
||||
Returns ``None`` when the path doesn't resolve — runtime callers just
|
||||
need "the value, or nothing." Empty path returns the raw body text so
|
||||
plain-text endpoints work too.
|
||||
"""
|
||||
if not path:
|
||||
return body_text or None
|
||||
if body_json is None:
|
||||
return None
|
||||
current: Any = body_json
|
||||
for raw_segment in path.split("."):
|
||||
segment = raw_segment.strip()
|
||||
if not segment:
|
||||
continue
|
||||
name_match = _NAME_HEAD_RE.match(segment)
|
||||
name_part = name_match.group(1) if name_match else ""
|
||||
remainder = segment[len(name_part) :]
|
||||
if name_part:
|
||||
if not isinstance(current, dict) or name_part not in current:
|
||||
return None
|
||||
current = current[name_part]
|
||||
while remainder:
|
||||
idx_match = _INDEX_RE.match(remainder)
|
||||
if not idx_match:
|
||||
return None
|
||||
idx = int(idx_match.group(1))
|
||||
if not isinstance(current, list) or idx < 0 or idx >= len(current):
|
||||
return None
|
||||
current = current[idx]
|
||||
remainder = remainder[idx_match.end() :]
|
||||
return current
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gradient Map
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1547,6 +1760,7 @@ class ValueStreamManager:
|
||||
event_bus: Optional["GameEventBus"] = None,
|
||||
audio_processing_template_store=None,
|
||||
sync_clock_manager: Optional["SyncClockManager"] = None,
|
||||
http_endpoint_store: Optional["HTTPEndpointStore"] = None,
|
||||
):
|
||||
self._value_source_store = value_source_store
|
||||
self._audio_capture_manager = audio_capture_manager
|
||||
@@ -1559,6 +1773,7 @@ class ValueStreamManager:
|
||||
self._event_bus = event_bus
|
||||
self._audio_processing_template_store = audio_processing_template_store
|
||||
self._sync_clock_manager = sync_clock_manager
|
||||
self._http_endpoint_store = http_endpoint_store
|
||||
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
|
||||
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
|
||||
# Tracks which clock_id (if any) was acquired for each stream so we
|
||||
@@ -1602,6 +1817,17 @@ class ValueStreamManager:
|
||||
else:
|
||||
logger.info(f"Released ref for value stream {vs_id} (refs={refs})")
|
||||
|
||||
def peek(self, vs_id: str) -> Optional[ValueStream]:
|
||||
"""Read-only accessor: return the running ValueStream for ``vs_id``
|
||||
if one exists, else ``None``.
|
||||
|
||||
Does NOT change ref counts. Use for consumer-driven reads where the
|
||||
caller already holds a reference via :meth:`acquire` (e.g. the
|
||||
:class:`AutomationEngine` evaluating an ``HTTPPollRule`` against a
|
||||
value source it has already acquired in ``_sync_value_stream_refs``).
|
||||
"""
|
||||
return self._streams.get(vs_id)
|
||||
|
||||
def update_source(self, vs_id: str) -> None:
|
||||
"""Hot-update the shared stream for the given ValueSource."""
|
||||
try:
|
||||
@@ -1699,158 +1925,52 @@ class ValueStreamManager:
|
||||
logger.info("Released all value streams")
|
||||
|
||||
def _create_stream(self, source: "ValueSource", vs_id: Optional[str] = None) -> ValueStream:
|
||||
"""Factory: create the appropriate ValueStream for a ValueSource."""
|
||||
from ledgrab.storage.value_source import (
|
||||
AdaptiveValueSource,
|
||||
AnimatedValueSource,
|
||||
AudioValueSource,
|
||||
CSSExtractValueSource,
|
||||
DaylightValueSource,
|
||||
GameEventValueSource,
|
||||
GradientMapValueSource,
|
||||
HAEntityValueSource,
|
||||
StaticValueSource,
|
||||
StaticColorValueSource,
|
||||
AnimatedColorValueSource,
|
||||
AdaptiveTimeColorValueSource,
|
||||
SystemMetricsValueSource,
|
||||
"""Build a ValueStream for *source* via the central kind registry.
|
||||
|
||||
The 14-branch ``isinstance`` ladder this method used to host was the
|
||||
canonical example of the parallel-change smell flagged in the
|
||||
architecture audit. The actual per-kind construction now lives in
|
||||
``ledgrab.core.processing.value_kinds.STREAM_BUILDERS``, keyed by
|
||||
``source.source_type``. This method only handles the manager-side
|
||||
bookkeeping that does not fit a uniform builder signature — namely
|
||||
the optional :class:`SyncClockRuntime` acquisition for
|
||||
``animated_color`` sources, whose ``vs_id → clock_id`` mapping the
|
||||
manager owns for symmetric release at teardown.
|
||||
"""
|
||||
from ledgrab.core.processing.value_kinds import (
|
||||
NEEDS_CLOCK_RUNTIME,
|
||||
ValueStreamDeps,
|
||||
build_stream,
|
||||
)
|
||||
|
||||
if isinstance(source, StaticValueSource):
|
||||
return StaticValueStream(value=source.value)
|
||||
|
||||
if isinstance(source, AnimatedValueSource):
|
||||
return AnimatedValueStream(
|
||||
waveform=source.waveform,
|
||||
speed=source.speed,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
)
|
||||
|
||||
if isinstance(source, AudioValueSource):
|
||||
return AudioValueStream(
|
||||
audio_source_id=source.audio_source_id,
|
||||
mode=source.mode,
|
||||
sensitivity=source.sensitivity,
|
||||
smoothing=source.smoothing,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
auto_gain=source.auto_gain,
|
||||
audio_capture_manager=self._audio_capture_manager,
|
||||
audio_source_store=self._audio_source_store,
|
||||
audio_template_store=self._audio_template_store,
|
||||
audio_processing_template_store=self._audio_processing_template_store,
|
||||
)
|
||||
|
||||
if isinstance(source, DaylightValueSource):
|
||||
return DaylightValueStream(
|
||||
speed=source.speed,
|
||||
use_real_time=source.use_real_time,
|
||||
latitude=source.latitude,
|
||||
longitude=source.longitude,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
)
|
||||
|
||||
if isinstance(source, AdaptiveValueSource):
|
||||
if source.source_type == "adaptive_scene":
|
||||
return SceneValueStream(
|
||||
picture_source_id=source.picture_source_id,
|
||||
scene_behavior=source.scene_behavior,
|
||||
sensitivity=source.sensitivity,
|
||||
smoothing=source.smoothing,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
live_stream_manager=self._live_stream_manager,
|
||||
)
|
||||
return TimeOfDayValueStream(
|
||||
schedule=source.schedule,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
)
|
||||
|
||||
# Color streams
|
||||
if isinstance(source, StaticColorValueSource):
|
||||
return StaticColorValueStream(color=source.color)
|
||||
|
||||
if isinstance(source, AnimatedColorValueSource):
|
||||
clock_runtime = None
|
||||
if source.clock_id and self._sync_clock_manager:
|
||||
clock_runtime = None
|
||||
if source.source_type in NEEDS_CLOCK_RUNTIME:
|
||||
clock_id = getattr(source, "clock_id", None)
|
||||
if clock_id and self._sync_clock_manager:
|
||||
try:
|
||||
clock_runtime = self._sync_clock_manager.acquire(source.clock_id)
|
||||
clock_runtime = self._sync_clock_manager.acquire(clock_id)
|
||||
if vs_id is not None:
|
||||
self._stream_clock_ids[vs_id] = source.clock_id
|
||||
self._stream_clock_ids[vs_id] = clock_id
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Could not acquire sync clock %s for value source %s: %s",
|
||||
source.clock_id,
|
||||
clock_id,
|
||||
source.id,
|
||||
e,
|
||||
)
|
||||
return AnimatedColorValueStream(
|
||||
colors=source.colors,
|
||||
speed=source.speed,
|
||||
easing=source.easing,
|
||||
clock=clock_runtime,
|
||||
)
|
||||
|
||||
if isinstance(source, AdaptiveTimeColorValueSource):
|
||||
return AdaptiveTimeColorValueStream(schedule=source.schedule)
|
||||
|
||||
if isinstance(source, HAEntityValueSource):
|
||||
return HAEntityValueStream(
|
||||
ha_source_id=source.ha_source_id,
|
||||
entity_id=source.entity_id,
|
||||
attribute=source.attribute,
|
||||
min_ha_value=source.min_ha_value,
|
||||
max_ha_value=source.max_ha_value,
|
||||
smoothing=source.smoothing,
|
||||
ha_manager=self._ha_manager,
|
||||
)
|
||||
|
||||
if isinstance(source, GradientMapValueSource):
|
||||
return GradientMapValueStream(
|
||||
value_source_id=source.value_source_id,
|
||||
gradient_id=source.gradient_id,
|
||||
easing=source.easing,
|
||||
value_stream_manager=self,
|
||||
gradient_store=self._gradient_store,
|
||||
)
|
||||
|
||||
if isinstance(source, CSSExtractValueSource):
|
||||
return CSSExtractValueStream(
|
||||
color_strip_source_id=source.color_strip_source_id,
|
||||
led_start=source.led_start,
|
||||
led_end=source.led_end,
|
||||
css_stream_manager=self._css_stream_manager,
|
||||
)
|
||||
|
||||
if isinstance(source, SystemMetricsValueSource):
|
||||
return SystemMetricsValueStream(
|
||||
metric=source.metric,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
max_rate=source.max_rate,
|
||||
disk_path=source.disk_path,
|
||||
sensor_label=source.sensor_label,
|
||||
poll_interval=source.poll_interval,
|
||||
smoothing=source.smoothing,
|
||||
)
|
||||
|
||||
if isinstance(source, GameEventValueSource):
|
||||
from ledgrab.core.value_sources.game_event_value_source import (
|
||||
GameEventValueStream,
|
||||
)
|
||||
|
||||
return GameEventValueStream(
|
||||
event_type=source.event_type,
|
||||
min_game_value=source.min_game_value,
|
||||
max_game_value=source.max_game_value,
|
||||
smoothing=source.smoothing,
|
||||
default_value=source.default_value,
|
||||
timeout=source.timeout,
|
||||
event_bus=self._event_bus,
|
||||
)
|
||||
|
||||
# Fallback
|
||||
return StaticValueStream(value=1.0)
|
||||
deps = ValueStreamDeps(
|
||||
value_stream_manager=self,
|
||||
audio_capture_manager=self._audio_capture_manager,
|
||||
audio_source_store=self._audio_source_store,
|
||||
audio_template_store=self._audio_template_store,
|
||||
audio_processing_template_store=self._audio_processing_template_store,
|
||||
live_stream_manager=self._live_stream_manager,
|
||||
ha_manager=self._ha_manager,
|
||||
gradient_store=self._gradient_store,
|
||||
css_stream_manager=self._css_stream_manager,
|
||||
event_bus=self._event_bus,
|
||||
http_endpoint_store=self._http_endpoint_store,
|
||||
clock_runtime=clock_runtime,
|
||||
)
|
||||
return build_stream(source, deps)
|
||||
|
||||
@@ -32,6 +32,15 @@ class BaseSqliteStore(Generic[T]):
|
||||
self._items: Dict[str, T] = {}
|
||||
self._deserializer = deserializer
|
||||
self._lock = threading.RLock()
|
||||
# Apply pending JSON-blob data migrations before loading rows so the
|
||||
# in-memory cache reflects the canonical, post-migration shape. The
|
||||
# runner is idempotent across stores — it consults a dedicated
|
||||
# ``data_migrations`` audit table — so each store construction may
|
||||
# invoke it without re-running already-applied migrations.
|
||||
# Imported lazily to avoid a circular dependency at module load.
|
||||
from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner
|
||||
|
||||
MigrationRunner(db).run(ALL_MIGRATIONS)
|
||||
self._load()
|
||||
|
||||
# -- I/O -----------------------------------------------------------------
|
||||
|
||||
@@ -23,7 +23,13 @@ MAX_COMPOSITE_DEPTH = 4
|
||||
|
||||
|
||||
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
||||
"""Persistent storage for color strip sources."""
|
||||
"""Persistent storage for color strip sources.
|
||||
|
||||
JSON-blob field renames (e.g. legacy ``source_type='static'`` →
|
||||
``'single_color'``) are handled by the central
|
||||
:mod:`ledgrab.storage.data_migrations` runner, which executes once per
|
||||
database from :meth:`Database._ensure_schema`.
|
||||
"""
|
||||
|
||||
_table_name = "color_strip_sources"
|
||||
_entity_name = "Color strip source"
|
||||
|
||||
@@ -0,0 +1,219 @@
|
||||
"""Versioned data migrations for stored JSON entities.
|
||||
|
||||
Each migration is a small class with a stable ``name`` (used as the idempotency
|
||||
key) and an ``apply(db)`` method that performs the change inside a transaction.
|
||||
The ``MigrationRunner`` keeps a ``data_migrations`` table that records which
|
||||
migrations have already executed; subsequent runs are a no-op.
|
||||
|
||||
This replaces ad-hoc per-store migrations that were rewriting raw JSON via
|
||||
``str.replace`` — that approach corrupted nested fields whose values happened
|
||||
to share a substring with the renamed key, and had no transaction or audit.
|
||||
|
||||
Adding a new migration
|
||||
----------------------
|
||||
|
||||
1. Subclass :class:`DataMigration`, give it a unique ``name`` (prefix with the
|
||||
next sequence number, e.g. ``"002_..."``) and implement ``apply``.
|
||||
2. Append it to :data:`ALL_MIGRATIONS` in commit order. Never reorder existing
|
||||
entries — the runner records each by name.
|
||||
3. The first ``ColorStripStore`` (or any other store) construction triggers the
|
||||
runner; nothing else has to change.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from ledgrab.storage.database import Database, is_writes_frozen
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class DataMigration(ABC):
|
||||
"""One JSON-blob migration step.
|
||||
|
||||
Subclasses must declare a class-level ``name`` and implement ``apply``.
|
||||
Implementations must operate on the supplied ``conn`` (an already-open
|
||||
transaction). They MUST NOT commit or open nested transactions — the
|
||||
:class:`MigrationRunner` owns the transaction so the data UPDATEs and the
|
||||
audit-table INSERT are atomic.
|
||||
"""
|
||||
|
||||
name: str = ""
|
||||
|
||||
@abstractmethod
|
||||
def apply(self, conn: sqlite3.Connection) -> int:
|
||||
"""Perform the migration inside the supplied transaction.
|
||||
|
||||
Return the number of rows changed.
|
||||
"""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MigrationRecord:
|
||||
"""A row in the ``data_migrations`` audit table."""
|
||||
|
||||
name: str
|
||||
applied_at: str
|
||||
rows_changed: int
|
||||
|
||||
|
||||
class MigrationRunner:
|
||||
"""Apply pending data migrations exactly once per database.
|
||||
|
||||
Each call to :meth:`run` opens a single transaction that covers (a) the
|
||||
"is this migration already applied?" check, (b) the migration body, and
|
||||
(c) the audit INSERT. That guarantees:
|
||||
|
||||
* a partial-failure cannot leave data rewritten but unrecorded;
|
||||
* concurrent stores constructing on multiple threads cannot race each
|
||||
other into a UNIQUE-constraint crash on the audit table.
|
||||
|
||||
The runner skips silently when writes are frozen so a post-restore boot
|
||||
does not mutate the freshly-restored database before the imminent restart.
|
||||
"""
|
||||
|
||||
_TABLE = "data_migrations"
|
||||
|
||||
def __init__(self, db: Database):
|
||||
self._db = db
|
||||
self._ensure_table()
|
||||
|
||||
def _ensure_table(self) -> None:
|
||||
# CREATE TABLE is idempotent thanks to IF NOT EXISTS; running on every
|
||||
# startup is cheap. ``Database.execute`` does NOT honour the
|
||||
# ``is_writes_frozen`` guard (DDL is always permitted), so the audit
|
||||
# table reliably exists even on a frozen boot.
|
||||
self._db.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS {self._TABLE} (
|
||||
name TEXT PRIMARY KEY,
|
||||
applied_at TEXT NOT NULL,
|
||||
rows_changed INTEGER NOT NULL DEFAULT 0
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
def applied_names(self) -> set[str]:
|
||||
"""Return the set of migration names already recorded as applied."""
|
||||
cursor = self._db.execute(f"SELECT name FROM {self._TABLE}")
|
||||
return {row["name"] for row in cursor.fetchall()}
|
||||
|
||||
def run(self, migrations: list[DataMigration]) -> list[MigrationRecord]:
|
||||
"""Apply each migration in *migrations* that hasn't been recorded yet.
|
||||
|
||||
Returns the records for migrations that actually executed this call.
|
||||
Returns an empty list when writes are frozen.
|
||||
"""
|
||||
if is_writes_frozen():
|
||||
# Frozen-write databases are read-only between a restore and the
|
||||
# imminent restart. Surface this clearly because the in-memory
|
||||
# state may briefly reflect pre-migration data.
|
||||
logger.warning("Data migrations skipped: writes are frozen (restart expected).")
|
||||
return []
|
||||
|
||||
# Validate names up-front so an unnamed migration aborts before any
|
||||
# transaction work.
|
||||
for migration in migrations:
|
||||
if not migration.name:
|
||||
raise ValueError(
|
||||
f"DataMigration {type(migration).__name__} must declare a non-empty name"
|
||||
)
|
||||
|
||||
executed: list[MigrationRecord] = []
|
||||
for migration in migrations:
|
||||
# Use a separate transaction per migration so a failure rolls back
|
||||
# only the failing one and any earlier (already-recorded)
|
||||
# migrations keep their audit rows.
|
||||
with self._db.transaction() as conn:
|
||||
already_applied = conn.execute(
|
||||
f"SELECT 1 FROM {self._TABLE} WHERE name = ?",
|
||||
(migration.name,),
|
||||
).fetchone()
|
||||
if already_applied:
|
||||
continue
|
||||
logger.info("Applying data migration: %s", migration.name)
|
||||
rows_changed = int(migration.apply(conn))
|
||||
applied_at = datetime.now(timezone.utc).isoformat()
|
||||
# INSERT OR IGNORE defends against a hypothetical concurrent
|
||||
# writer that recorded the same name between the SELECT above
|
||||
# and this INSERT (the RLock serialises this in practice, but
|
||||
# the constraint is the durable contract).
|
||||
conn.execute(
|
||||
f"INSERT OR IGNORE INTO {self._TABLE} "
|
||||
f"(name, applied_at, rows_changed) VALUES (?, ?, ?)",
|
||||
(migration.name, applied_at, rows_changed),
|
||||
)
|
||||
executed.append(
|
||||
MigrationRecord(
|
||||
name=migration.name,
|
||||
applied_at=applied_at,
|
||||
rows_changed=rows_changed,
|
||||
)
|
||||
)
|
||||
if rows_changed:
|
||||
logger.warning("Migration %s rewrote %d row(s)", migration.name, rows_changed)
|
||||
|
||||
if executed:
|
||||
logger.info(
|
||||
"Applied %d data migration(s): %s",
|
||||
len(executed),
|
||||
", ".join(r.name for r in executed),
|
||||
)
|
||||
return executed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Concrete migrations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class StaticToSingleColorMigration(DataMigration):
|
||||
"""Rename the legacy ``source_type='static'`` color-strip kind to ``single_color``.
|
||||
|
||||
The pre-rename rows in ``color_strip_sources`` used ``source_type='static'``;
|
||||
the new canonical value is ``single_color``. The previous in-line string
|
||||
replace risked rewriting unrelated substrings (e.g. an animation type named
|
||||
``static_wave``). This migration parses the JSON, mutates the single field
|
||||
we care about, and writes the canonical serialization back. Runs inside
|
||||
the runner's transaction.
|
||||
"""
|
||||
|
||||
name = "001_color_strip_static_to_single_color"
|
||||
|
||||
def apply(self, conn: sqlite3.Connection) -> int:
|
||||
rows_changed = 0
|
||||
rows = conn.execute("SELECT id, data FROM [color_strip_sources]").fetchall()
|
||||
for row in rows:
|
||||
blob = row["data"]
|
||||
try:
|
||||
parsed = json.loads(blob)
|
||||
except json.JSONDecodeError as e:
|
||||
# A corrupt blob is a pre-existing problem; do not crash the
|
||||
# migration. The store's own load path will surface it.
|
||||
logger.warning(
|
||||
"Skipping corrupt color_strip_sources row %s during migration: %s",
|
||||
row["id"],
|
||||
e,
|
||||
)
|
||||
continue
|
||||
if parsed.get("source_type") != "static":
|
||||
continue
|
||||
parsed["source_type"] = "single_color"
|
||||
conn.execute(
|
||||
"UPDATE [color_strip_sources] SET data = ? WHERE id = ?",
|
||||
(json.dumps(parsed, ensure_ascii=False), row["id"]),
|
||||
)
|
||||
rows_changed += 1
|
||||
return rows_changed
|
||||
|
||||
|
||||
# Master list — ORDER MATTERS. Append new migrations; never reorder.
|
||||
ALL_MIGRATIONS: list[DataMigration] = [
|
||||
StaticToSingleColorMigration(),
|
||||
]
|
||||
@@ -0,0 +1,85 @@
|
||||
"""Regression tests for the CSS response builder dispatch.
|
||||
|
||||
These lock in the safety net introduced when the silent "fallback to a fake
|
||||
PictureCSSResponse" branch was removed:
|
||||
|
||||
* every concrete ColorStripSource subclass registered in ``_SOURCE_TYPE_MAP``
|
||||
must have a builder in ``_RESPONSE_MAP`` (verified at module import); and
|
||||
* the previously-missing ``GameEventColorStripSource`` round-trips through
|
||||
the response builder into the matching ``GameEventCSSResponse`` schema.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.api.routes.color_strip_sources import _helpers
|
||||
from ledgrab.api.schemas.color_strip_sources import GameEventCSSResponse
|
||||
from ledgrab.storage.color_strip_source import (
|
||||
ColorStripSource,
|
||||
GameEventColorStripSource,
|
||||
)
|
||||
|
||||
|
||||
def test_response_map_covers_every_registered_source_type():
|
||||
"""Every source_type in _SOURCE_TYPE_MAP has a response builder."""
|
||||
storage_kinds = set(_helpers._STORAGE_TYPE_MAP.keys())
|
||||
builder_kinds = set(_helpers._RESPONSE_MAP.keys())
|
||||
missing = storage_kinds - builder_kinds
|
||||
assert not missing, f"_RESPONSE_MAP is missing builders for: {sorted(missing)}"
|
||||
|
||||
|
||||
def test_assert_helper_raises_when_a_kind_is_unregistered(monkeypatch):
|
||||
"""Adding a kind to the storage registry without a builder is caught."""
|
||||
|
||||
class _PhantomColorStripSource(ColorStripSource):
|
||||
pass
|
||||
|
||||
monkeypatch.setitem(_helpers._STORAGE_TYPE_MAP, "phantom", _PhantomColorStripSource)
|
||||
with pytest.raises(RuntimeError, match="phantom"):
|
||||
_helpers._assert_response_map_coverage()
|
||||
|
||||
|
||||
def test_game_event_source_serialises_to_game_event_response():
|
||||
"""A GameEventColorStripSource is rendered as GameEventCSSResponse (not the
|
||||
fake PictureCSSResponse that the silent fallback used to return).
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
source = GameEventColorStripSource(
|
||||
id="css_gev0001",
|
||||
name="Counter-Strike feedback",
|
||||
source_type="game_event",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
game_integration_id="gi_cs2_main",
|
||||
event_mappings=[{"event": "hit", "effect": "flash"}],
|
||||
led_count=60,
|
||||
)
|
||||
|
||||
response = _helpers._css_to_response(source)
|
||||
|
||||
assert isinstance(response, GameEventCSSResponse)
|
||||
assert response.source_type == "game_event"
|
||||
assert response.game_integration_id == "gi_cs2_main"
|
||||
assert response.event_mappings == [{"event": "hit", "effect": "flash"}]
|
||||
assert response.led_count == 60
|
||||
|
||||
|
||||
def test_unregistered_source_type_raises_in_css_to_response():
|
||||
"""Reaching ``_css_to_response`` with an unmapped source_type raises loudly."""
|
||||
|
||||
class _UnregisteredCSS(ColorStripSource):
|
||||
pass
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
source = _UnregisteredCSS(
|
||||
id="css_xxx",
|
||||
name="rogue",
|
||||
source_type="rogue_unregistered",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
with pytest.raises(RuntimeError, match="No CSS response builder"):
|
||||
_helpers._css_to_response(source)
|
||||
@@ -0,0 +1,97 @@
|
||||
"""Tests for the unified color-strip stream-builder registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.processing import color_strip_kinds
|
||||
from ledgrab.core.processing.color_strip_kinds import (
|
||||
SHARABLE_KINDS,
|
||||
STREAM_BUILDERS,
|
||||
StreamDeps,
|
||||
build_stream,
|
||||
)
|
||||
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
|
||||
|
||||
|
||||
def test_stream_builders_partition_storage_kinds_with_sharable():
|
||||
"""STREAM_BUILDERS ∪ SHARABLE_KINDS == storage source_types."""
|
||||
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
|
||||
covered = set(STREAM_BUILDERS.keys()) | SHARABLE_KINDS
|
||||
assert (
|
||||
storage_kinds == covered
|
||||
), f"missing={storage_kinds - covered}, extra={covered - storage_kinds}"
|
||||
|
||||
|
||||
def test_sharable_kinds_list_matches_actual_sharable_property():
|
||||
"""The hand-coded SHARABLE_KINDS list mirrors the `sharable` @property values.
|
||||
|
||||
Every ColorStripSource subclass currently implements ``sharable`` as a
|
||||
literal ``return True``/``return False`` that does not touch ``self``,
|
||||
so ``fget(None)`` is a safe introspection. If a future subclass changes
|
||||
that contract — e.g. by computing sharability from instance state — this
|
||||
test will raise a clear error rather than silently misclassify the kind,
|
||||
forcing the maintainer to update both ``SHARABLE_KINDS`` and this test.
|
||||
"""
|
||||
|
||||
def _is_sharable(name: str, cls) -> bool:
|
||||
prop = getattr(cls, "sharable", None)
|
||||
if not isinstance(prop, property) or prop.fget is None:
|
||||
raise AssertionError(
|
||||
f"{cls.__name__} (source_type={name!r}) does not expose `sharable` "
|
||||
f"as a @property; SHARABLE_KINDS introspection no longer works."
|
||||
)
|
||||
try:
|
||||
return bool(prop.fget(None))
|
||||
except (AttributeError, TypeError) as e:
|
||||
raise AssertionError(
|
||||
f"{cls.__name__}.sharable now depends on `self` ({e}); update "
|
||||
"this test and SHARABLE_KINDS together."
|
||||
) from e
|
||||
|
||||
actually_sharable = {name for name, cls in _SOURCE_TYPE_MAP.items() if _is_sharable(name, cls)}
|
||||
assert actually_sharable == set(SHARABLE_KINDS), (
|
||||
"SHARABLE_KINDS drifted from the @property values: "
|
||||
f"actually_sharable={sorted(actually_sharable)}, "
|
||||
f"SHARABLE_KINDS={sorted(SHARABLE_KINDS)}"
|
||||
)
|
||||
|
||||
|
||||
def test_build_stream_raises_on_unknown_kind():
|
||||
class _FakeSource:
|
||||
source_type = "totally_unregistered_kind"
|
||||
id = "fake"
|
||||
|
||||
deps = StreamDeps(css_manager=None)
|
||||
with pytest.raises(ValueError, match="totally_unregistered_kind"):
|
||||
build_stream(_FakeSource(), deps)
|
||||
|
||||
|
||||
def test_coverage_assertion_raises_on_drift(monkeypatch):
|
||||
"""Adding a kind to storage's _SOURCE_TYPE_MAP without a builder fails fast."""
|
||||
|
||||
monkeypatch.setitem(_SOURCE_TYPE_MAP, "phantom", object)
|
||||
with pytest.raises(RuntimeError, match="phantom"):
|
||||
color_strip_kinds._assert_stream_kind_coverage()
|
||||
|
||||
|
||||
def test_legacy_static_alias_resolves_to_single_color_builder():
|
||||
"""The legacy 'static' source_type still maps to the single_color builder."""
|
||||
assert STREAM_BUILDERS["static"] is not None
|
||||
assert STREAM_BUILDERS["single_color"] is not None
|
||||
# The two route to the same loader, but builders are distinct closures —
|
||||
# call them with a tiny source stub and confirm same class.
|
||||
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
|
||||
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||
from datetime import datetime, timezone
|
||||
|
||||
src = SingleColorStripSource(
|
||||
id="t",
|
||||
name="t",
|
||||
source_type="static",
|
||||
created_at=datetime.now(timezone.utc),
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
deps = StreamDeps(css_manager=None)
|
||||
s = STREAM_BUILDERS["static"](src, deps)
|
||||
assert isinstance(s, SingleColorStripStream)
|
||||
@@ -0,0 +1,64 @@
|
||||
"""Tests for the unified value-stream-builder registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.processing import value_kinds
|
||||
from ledgrab.core.processing.value_kinds import (
|
||||
NEEDS_CLOCK_RUNTIME,
|
||||
STREAM_BUILDERS,
|
||||
ValueStreamDeps,
|
||||
build_stream,
|
||||
)
|
||||
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
|
||||
|
||||
|
||||
def test_builders_cover_every_storage_kind():
|
||||
"""STREAM_BUILDERS keys == storage._VALUE_SOURCE_MAP keys."""
|
||||
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
|
||||
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||
assert storage_kinds == builder_kinds, (
|
||||
f"missing={storage_kinds - builder_kinds}, " f"extra={builder_kinds - storage_kinds}"
|
||||
)
|
||||
|
||||
|
||||
def test_build_stream_raises_on_unknown_kind():
|
||||
class _Fake:
|
||||
source_type = "totally_unregistered"
|
||||
id = "fake"
|
||||
|
||||
deps = ValueStreamDeps(value_stream_manager=None)
|
||||
with pytest.raises(ValueError, match="totally_unregistered"):
|
||||
build_stream(_Fake(), deps)
|
||||
|
||||
|
||||
def test_coverage_assertion_raises_on_drift(monkeypatch):
|
||||
"""A kind added to storage without a builder fails the import-time check."""
|
||||
monkeypatch.setitem(_VALUE_SOURCE_MAP, "phantom_kind", object)
|
||||
with pytest.raises(RuntimeError, match="phantom_kind"):
|
||||
value_kinds._assert_value_kind_coverage()
|
||||
|
||||
|
||||
def test_needs_clock_runtime_is_subset_of_registered_kinds():
|
||||
assert NEEDS_CLOCK_RUNTIME.issubset(set(STREAM_BUILDERS.keys()))
|
||||
|
||||
|
||||
def test_static_builder_returns_static_value_stream():
|
||||
from ledgrab.core.processing.value_stream import StaticValueStream
|
||||
from ledgrab.storage.value_source import StaticValueSource
|
||||
from datetime import datetime, timezone
|
||||
|
||||
src = StaticValueSource(
|
||||
id="vs_t",
|
||||
name="t",
|
||||
source_type="static",
|
||||
created_at=datetime.now(timezone.utc),
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
value=0.42,
|
||||
)
|
||||
deps = ValueStreamDeps(value_stream_manager=None)
|
||||
stream = build_stream(src, deps)
|
||||
assert isinstance(stream, StaticValueStream)
|
||||
# Sanity: the value flowed through
|
||||
assert stream.get_value() == 0.42
|
||||
@@ -0,0 +1,90 @@
|
||||
"""Tests for ColorStripStore startup migrations."""
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_db(tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
yield db
|
||||
db.close()
|
||||
|
||||
|
||||
def _insert_legacy_row(db: Database, item_id: str, name: str, color: list[int]) -> dict:
|
||||
"""Insert a pre-rename color strip row with ``source_type='static'``."""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
blob = {
|
||||
"id": item_id,
|
||||
"name": name,
|
||||
"source_type": "static",
|
||||
"color": color,
|
||||
"animation": None,
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"icon": "",
|
||||
"icon_color": "",
|
||||
}
|
||||
db.upsert("color_strip_sources", item_id, name, blob)
|
||||
return blob
|
||||
|
||||
|
||||
def test_legacy_static_row_is_rewritten_to_single_color(tmp_db):
|
||||
"""A row with ``source_type='static'`` is rewritten on first store load."""
|
||||
_insert_legacy_row(tmp_db, "css_legacy01", "Legacy Solid", [255, 0, 0])
|
||||
|
||||
# Sanity: legacy blob really is on disk
|
||||
raw_before = tmp_db.load_all("color_strip_sources")
|
||||
assert raw_before[0]["source_type"] == "static"
|
||||
|
||||
store = ColorStripStore(tmp_db)
|
||||
|
||||
# In-memory item is a SingleColorStripSource with the canonical type
|
||||
item = store.get("css_legacy01")
|
||||
assert isinstance(item, SingleColorStripSource)
|
||||
assert item.source_type == "single_color"
|
||||
|
||||
# The on-disk row was rewritten too — no second migration on next boot
|
||||
raw_after = tmp_db.load_all("color_strip_sources")
|
||||
assert raw_after[0]["source_type"] == "single_color"
|
||||
|
||||
|
||||
def test_already_migrated_row_is_left_alone(tmp_db):
|
||||
"""Rows already using ``single_color`` are not touched."""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
blob = {
|
||||
"id": "css_new01",
|
||||
"name": "Already Migrated",
|
||||
"source_type": "single_color",
|
||||
"color": [0, 128, 255],
|
||||
"animation": None,
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"icon": "",
|
||||
"icon_color": "",
|
||||
}
|
||||
tmp_db.upsert("color_strip_sources", "css_new01", "Already Migrated", blob)
|
||||
|
||||
raw_before = tmp_db.load_all("color_strip_sources")
|
||||
before_json = json.dumps(raw_before[0], sort_keys=True)
|
||||
|
||||
store = ColorStripStore(tmp_db)
|
||||
item = store.get("css_new01")
|
||||
assert isinstance(item, SingleColorStripSource)
|
||||
|
||||
raw_after = tmp_db.load_all("color_strip_sources")
|
||||
after_json = json.dumps(raw_after[0], sort_keys=True)
|
||||
assert before_json == after_json
|
||||
@@ -0,0 +1,309 @@
|
||||
"""Tests for the versioned data-migration runner.
|
||||
|
||||
These verify the safety properties that replaced the string-replace migration:
|
||||
|
||||
* migrations parse JSON properly (no substring corruption);
|
||||
* the runner records each migration name in ``data_migrations`` so subsequent
|
||||
runs are no-ops;
|
||||
* a migration that touches no rows still gets recorded as applied;
|
||||
* a migration that raises does not get recorded (so a fixed migration can be
|
||||
re-attempted on the next boot);
|
||||
* the legacy ``static`` rename only rewrites rows whose ``source_type`` field
|
||||
actually equals ``static`` — values that merely contain the substring (e.g.
|
||||
an animation type ``static_wave``) are left untouched.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.data_migrations import (
|
||||
ALL_MIGRATIONS,
|
||||
DataMigration,
|
||||
MigrationRunner,
|
||||
StaticToSingleColorMigration,
|
||||
)
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_db(tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
yield db
|
||||
db.close()
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _insert_css(db: Database, item_id: str, blob: dict) -> None:
|
||||
db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runner mechanics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_runner_creates_audit_table(tmp_db):
|
||||
MigrationRunner(tmp_db)
|
||||
cursor = tmp_db.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'"
|
||||
)
|
||||
assert cursor.fetchone() is not None
|
||||
|
||||
|
||||
def test_runner_records_applied_migration(tmp_db):
|
||||
class _Noop(DataMigration):
|
||||
name = "test_noop"
|
||||
|
||||
def apply(self, conn):
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
records = runner.run([_Noop()])
|
||||
|
||||
assert [r.name for r in records] == ["test_noop"]
|
||||
assert "test_noop" in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_skips_already_applied_migration(tmp_db):
|
||||
calls = 0
|
||||
|
||||
class _CountedMigration(DataMigration):
|
||||
name = "test_counted"
|
||||
|
||||
def apply(self, conn):
|
||||
nonlocal calls
|
||||
calls += 1
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([_CountedMigration()])
|
||||
runner.run([_CountedMigration()])
|
||||
runner.run([_CountedMigration()])
|
||||
|
||||
assert calls == 1
|
||||
|
||||
|
||||
def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db):
|
||||
"""Even when a migration changes 0 rows, it is recorded as applied."""
|
||||
|
||||
class _NoMatch(DataMigration):
|
||||
name = "test_no_match"
|
||||
|
||||
def apply(self, conn):
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([_NoMatch()])
|
||||
|
||||
assert "test_no_match" in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_does_not_record_failed_migration(tmp_db):
|
||||
class _Boom(DataMigration):
|
||||
name = "test_boom"
|
||||
|
||||
def apply(self, conn):
|
||||
raise RuntimeError("kaboom")
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
with pytest.raises(RuntimeError, match="kaboom"):
|
||||
runner.run([_Boom()])
|
||||
|
||||
assert "test_boom" not in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_rejects_unnamed_migration(tmp_db):
|
||||
class _Unnamed(DataMigration):
|
||||
def apply(self, conn):
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
with pytest.raises(ValueError, match="non-empty name"):
|
||||
runner.run([_Unnamed()])
|
||||
|
||||
|
||||
def test_runner_rolls_back_data_change_when_migration_raises(tmp_db):
|
||||
"""A migration that raises mid-way leaves no partial UPDATEs behind."""
|
||||
blob = {
|
||||
"id": "css_partial",
|
||||
"name": "Partial",
|
||||
"source_type": "static",
|
||||
"color": [10, 20, 30],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_partial", blob)
|
||||
|
||||
class _PartialBoom(DataMigration):
|
||||
name = "test_partial_boom"
|
||||
|
||||
def apply(self, conn):
|
||||
conn.execute(
|
||||
"UPDATE color_strip_sources SET data = ? WHERE id = ?",
|
||||
(json.dumps({**blob, "source_type": "single_color"}), "css_partial"),
|
||||
)
|
||||
raise RuntimeError("explode after update")
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
with pytest.raises(RuntimeError, match="explode after update"):
|
||||
runner.run([_PartialBoom()])
|
||||
|
||||
rows = tmp_db.load_all("color_strip_sources")
|
||||
assert rows[0]["source_type"] == "static", "UPDATE was not rolled back"
|
||||
assert "test_partial_boom" not in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch):
|
||||
"""Frozen-write databases must not be mutated by the runner."""
|
||||
monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True)
|
||||
|
||||
calls = 0
|
||||
|
||||
class _Counted(DataMigration):
|
||||
name = "test_frozen"
|
||||
|
||||
def apply(self, conn):
|
||||
nonlocal calls
|
||||
calls += 1
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
records = runner.run([_Counted()])
|
||||
|
||||
assert records == []
|
||||
assert calls == 0
|
||||
assert "test_frozen" not in runner.applied_names()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Legacy static-rename specifics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_static_rename_only_touches_source_type_field(tmp_db):
|
||||
"""A row whose ``animation.type`` happens to contain ``static`` survives intact."""
|
||||
blob = {
|
||||
"id": "css_safe01",
|
||||
"name": "Has static_wave anim",
|
||||
"source_type": "gradient", # not "static" — should be untouched
|
||||
"animation": {"type": "static_wave", "speed": 1.0}, # substring trap
|
||||
"stops": [{"position": 0.0, "color": [10, 20, 30]}],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_safe01", blob)
|
||||
|
||||
# Run the migration via the runner (clears the existing audit row first
|
||||
# because the Database fixture already triggered it through any incidental
|
||||
# store creation — here we drive it directly).
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([StaticToSingleColorMigration()])
|
||||
|
||||
rows = tmp_db.load_all("color_strip_sources")
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["source_type"] == "gradient"
|
||||
assert rows[0]["animation"]["type"] == "static_wave"
|
||||
|
||||
|
||||
def test_static_rename_rewrites_only_legacy_rows(tmp_db):
|
||||
legacy = {
|
||||
"id": "css_legacy01",
|
||||
"name": "Legacy",
|
||||
"source_type": "static",
|
||||
"color": [255, 0, 0],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
current = {
|
||||
"id": "css_modern01",
|
||||
"name": "Modern",
|
||||
"source_type": "single_color",
|
||||
"color": [0, 128, 255],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_legacy01", legacy)
|
||||
_insert_css(tmp_db, "css_modern01", current)
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([StaticToSingleColorMigration()])
|
||||
|
||||
rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")}
|
||||
assert rows["css_legacy01"]["source_type"] == "single_color"
|
||||
assert rows["css_modern01"]["source_type"] == "single_color"
|
||||
|
||||
|
||||
def test_static_rename_survives_corrupt_blob(tmp_db):
|
||||
"""A row whose JSON blob is corrupt does not crash the migration."""
|
||||
# Insert a row with deliberately broken JSON via direct SQL (bypass upsert
|
||||
# which would re-serialize). We use a hand-crafted blob.
|
||||
tmp_db.execute(
|
||||
"INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)",
|
||||
("css_bad01", "Bad", "{not valid json"),
|
||||
)
|
||||
legacy = {
|
||||
"id": "css_legacy02",
|
||||
"name": "Legacy after corrupt",
|
||||
"source_type": "static",
|
||||
"color": [1, 2, 3],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_legacy02", legacy)
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([StaticToSingleColorMigration()])
|
||||
|
||||
# Legacy row still migrated, corrupt row untouched, no exception
|
||||
cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources")
|
||||
out = {row["id"]: row["data"] for row in cursor.fetchall()}
|
||||
assert out["css_bad01"] == "{not valid json"
|
||||
assert json.loads(out["css_legacy02"])["source_type"] == "single_color"
|
||||
|
||||
|
||||
def test_all_migrations_have_unique_names():
|
||||
names = [m.name for m in ALL_MIGRATIONS]
|
||||
assert len(names) == len(set(names)), f"duplicate names: {names}"
|
||||
|
||||
|
||||
def test_color_strip_store_construction_records_static_migration(tmp_db):
|
||||
"""Integration: constructing the store applies AND records the migration."""
|
||||
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||
|
||||
legacy = {
|
||||
"id": "css_int01",
|
||||
"name": "Integration legacy",
|
||||
"source_type": "static",
|
||||
"color": [1, 2, 3],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_int01", legacy)
|
||||
|
||||
store = ColorStripStore(tmp_db)
|
||||
|
||||
# In-memory shape is correct
|
||||
assert isinstance(store.get("css_int01"), SingleColorStripSource)
|
||||
# Audit row exists
|
||||
runner = MigrationRunner(tmp_db)
|
||||
assert "001_color_strip_static_to_single_color" in runner.applied_names()
|
||||
Reference in New Issue
Block a user