refactor(storage,processing): kind registries + versioned data migrations
Two CRITICAL data-safety bugs from the architecture audit and the two
worst parallel-change problems are fixed in one coherent pass.
Audit findings addressed:
- C2 silent CSS response fallback. The previous _RESPONSE_MAP fell
through to a fabricated PictureCSSResponse whenever a source
class lacked an entry; in particular game_event sources were
silently mis-shaped. Now: GameEventCSSResponse/Create/Update
schemas exist, _RESPONSE_MAP is re-keyed by source_type string,
an import-time _assert_response_map_coverage() requires symmetric
agreement with storage._SOURCE_TYPE_MAP, and the runtime path
raises instead of fabricating a response.
- C11 string-replace JSON migration. ColorStripStore used
blob.replace('"source_type": "static"', '"source_type":
"single_color"') which can corrupt unrelated substrings (e.g.
an animation type named "static_wave") and provides no audit,
no transaction, no idempotency. Replaced with
storage.data_migrations.MigrationRunner backed by a
data_migrations audit table. Each migration runs inside one
db.transaction() that covers the applied-check, the apply(),
and the audit-INSERT — partial failures roll back atomically.
StaticToSingleColorMigration parses each row with json.loads
and mutates only the source_type field. Frozen-write databases
skip with a warning.
- C3+C4 color-strip stream dispatch. The 7-branch elif in
ColorStripStreamManager.acquire() and the duplicate one in
ws_stream._create_stream() now share a single STREAM_BUILDERS
registry in core.processing.color_strip_kinds, keyed by
source.source_type. Both call sites populate a StreamDeps bag
and delegate to build_stream(). _assert_stream_kind_coverage()
asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS
partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path
wraps each FastAPI-DI getter in _safe() so non-audio previews
no longer crash when audio/CSPT stores are not wired.
- C6+C7 value stream dispatch. The 14-branch isinstance ladder in
ValueStreamManager._create_stream and its silent
StaticValueStream(value=1.0) fallback are replaced by
core.processing.value_kinds.STREAM_BUILDERS, keyed by
source_type string (so AdaptiveValueSource's adaptive_time and
adaptive_scene route to different builders correctly). The
manager retains only the SyncClockRuntime pre-acquisition step
for animated_color (kinds needing this are listed explicitly
in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a
separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the
registry.
Bundled in: the static->single_color rename plus the HTTPValueStream
/ http_endpoint introduction that were already in flight on this
branch share these files; the registry refactor naturally absorbs
both via the new "single_color" / "static" alias entries and the
_build_http builder.
Tests: 26 new tests cover response-map coverage drift, migration
runner audit-table mechanics + transactional rollback +
frozen-write skip, and the two stream-builder registries. 343
existing storage / API / e2e tests stay green. Ruff clean.
This commit is contained in:
@@ -0,0 +1,85 @@
|
||||
"""Regression tests for the CSS response builder dispatch.
|
||||
|
||||
These lock in the safety net introduced when the silent "fallback to a fake
|
||||
PictureCSSResponse" branch was removed:
|
||||
|
||||
* every concrete ColorStripSource subclass registered in ``_SOURCE_TYPE_MAP``
|
||||
must have a builder in ``_RESPONSE_MAP`` (verified at module import); and
|
||||
* the previously-missing ``GameEventColorStripSource`` round-trips through
|
||||
the response builder into the matching ``GameEventCSSResponse`` schema.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.api.routes.color_strip_sources import _helpers
|
||||
from ledgrab.api.schemas.color_strip_sources import GameEventCSSResponse
|
||||
from ledgrab.storage.color_strip_source import (
|
||||
ColorStripSource,
|
||||
GameEventColorStripSource,
|
||||
)
|
||||
|
||||
|
||||
def test_response_map_covers_every_registered_source_type():
|
||||
"""Every source_type in _SOURCE_TYPE_MAP has a response builder."""
|
||||
storage_kinds = set(_helpers._STORAGE_TYPE_MAP.keys())
|
||||
builder_kinds = set(_helpers._RESPONSE_MAP.keys())
|
||||
missing = storage_kinds - builder_kinds
|
||||
assert not missing, f"_RESPONSE_MAP is missing builders for: {sorted(missing)}"
|
||||
|
||||
|
||||
def test_assert_helper_raises_when_a_kind_is_unregistered(monkeypatch):
|
||||
"""Adding a kind to the storage registry without a builder is caught."""
|
||||
|
||||
class _PhantomColorStripSource(ColorStripSource):
|
||||
pass
|
||||
|
||||
monkeypatch.setitem(_helpers._STORAGE_TYPE_MAP, "phantom", _PhantomColorStripSource)
|
||||
with pytest.raises(RuntimeError, match="phantom"):
|
||||
_helpers._assert_response_map_coverage()
|
||||
|
||||
|
||||
def test_game_event_source_serialises_to_game_event_response():
|
||||
"""A GameEventColorStripSource is rendered as GameEventCSSResponse (not the
|
||||
fake PictureCSSResponse that the silent fallback used to return).
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
source = GameEventColorStripSource(
|
||||
id="css_gev0001",
|
||||
name="Counter-Strike feedback",
|
||||
source_type="game_event",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
game_integration_id="gi_cs2_main",
|
||||
event_mappings=[{"event": "hit", "effect": "flash"}],
|
||||
led_count=60,
|
||||
)
|
||||
|
||||
response = _helpers._css_to_response(source)
|
||||
|
||||
assert isinstance(response, GameEventCSSResponse)
|
||||
assert response.source_type == "game_event"
|
||||
assert response.game_integration_id == "gi_cs2_main"
|
||||
assert response.event_mappings == [{"event": "hit", "effect": "flash"}]
|
||||
assert response.led_count == 60
|
||||
|
||||
|
||||
def test_unregistered_source_type_raises_in_css_to_response():
|
||||
"""Reaching ``_css_to_response`` with an unmapped source_type raises loudly."""
|
||||
|
||||
class _UnregisteredCSS(ColorStripSource):
|
||||
pass
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
source = _UnregisteredCSS(
|
||||
id="css_xxx",
|
||||
name="rogue",
|
||||
source_type="rogue_unregistered",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
with pytest.raises(RuntimeError, match="No CSS response builder"):
|
||||
_helpers._css_to_response(source)
|
||||
@@ -0,0 +1,97 @@
|
||||
"""Tests for the unified color-strip stream-builder registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.processing import color_strip_kinds
|
||||
from ledgrab.core.processing.color_strip_kinds import (
|
||||
SHARABLE_KINDS,
|
||||
STREAM_BUILDERS,
|
||||
StreamDeps,
|
||||
build_stream,
|
||||
)
|
||||
from ledgrab.storage.color_strip_source import _SOURCE_TYPE_MAP
|
||||
|
||||
|
||||
def test_stream_builders_partition_storage_kinds_with_sharable():
|
||||
"""STREAM_BUILDERS ∪ SHARABLE_KINDS == storage source_types."""
|
||||
storage_kinds = set(_SOURCE_TYPE_MAP.keys())
|
||||
covered = set(STREAM_BUILDERS.keys()) | SHARABLE_KINDS
|
||||
assert (
|
||||
storage_kinds == covered
|
||||
), f"missing={storage_kinds - covered}, extra={covered - storage_kinds}"
|
||||
|
||||
|
||||
def test_sharable_kinds_list_matches_actual_sharable_property():
|
||||
"""The hand-coded SHARABLE_KINDS list mirrors the `sharable` @property values.
|
||||
|
||||
Every ColorStripSource subclass currently implements ``sharable`` as a
|
||||
literal ``return True``/``return False`` that does not touch ``self``,
|
||||
so ``fget(None)`` is a safe introspection. If a future subclass changes
|
||||
that contract — e.g. by computing sharability from instance state — this
|
||||
test will raise a clear error rather than silently misclassify the kind,
|
||||
forcing the maintainer to update both ``SHARABLE_KINDS`` and this test.
|
||||
"""
|
||||
|
||||
def _is_sharable(name: str, cls) -> bool:
|
||||
prop = getattr(cls, "sharable", None)
|
||||
if not isinstance(prop, property) or prop.fget is None:
|
||||
raise AssertionError(
|
||||
f"{cls.__name__} (source_type={name!r}) does not expose `sharable` "
|
||||
f"as a @property; SHARABLE_KINDS introspection no longer works."
|
||||
)
|
||||
try:
|
||||
return bool(prop.fget(None))
|
||||
except (AttributeError, TypeError) as e:
|
||||
raise AssertionError(
|
||||
f"{cls.__name__}.sharable now depends on `self` ({e}); update "
|
||||
"this test and SHARABLE_KINDS together."
|
||||
) from e
|
||||
|
||||
actually_sharable = {name for name, cls in _SOURCE_TYPE_MAP.items() if _is_sharable(name, cls)}
|
||||
assert actually_sharable == set(SHARABLE_KINDS), (
|
||||
"SHARABLE_KINDS drifted from the @property values: "
|
||||
f"actually_sharable={sorted(actually_sharable)}, "
|
||||
f"SHARABLE_KINDS={sorted(SHARABLE_KINDS)}"
|
||||
)
|
||||
|
||||
|
||||
def test_build_stream_raises_on_unknown_kind():
|
||||
class _FakeSource:
|
||||
source_type = "totally_unregistered_kind"
|
||||
id = "fake"
|
||||
|
||||
deps = StreamDeps(css_manager=None)
|
||||
with pytest.raises(ValueError, match="totally_unregistered_kind"):
|
||||
build_stream(_FakeSource(), deps)
|
||||
|
||||
|
||||
def test_coverage_assertion_raises_on_drift(monkeypatch):
|
||||
"""Adding a kind to storage's _SOURCE_TYPE_MAP without a builder fails fast."""
|
||||
|
||||
monkeypatch.setitem(_SOURCE_TYPE_MAP, "phantom", object)
|
||||
with pytest.raises(RuntimeError, match="phantom"):
|
||||
color_strip_kinds._assert_stream_kind_coverage()
|
||||
|
||||
|
||||
def test_legacy_static_alias_resolves_to_single_color_builder():
|
||||
"""The legacy 'static' source_type still maps to the single_color builder."""
|
||||
assert STREAM_BUILDERS["static"] is not None
|
||||
assert STREAM_BUILDERS["single_color"] is not None
|
||||
# The two route to the same loader, but builders are distinct closures —
|
||||
# call them with a tiny source stub and confirm same class.
|
||||
from ledgrab.core.processing.color_strip_stream import SingleColorStripStream
|
||||
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||
from datetime import datetime, timezone
|
||||
|
||||
src = SingleColorStripSource(
|
||||
id="t",
|
||||
name="t",
|
||||
source_type="static",
|
||||
created_at=datetime.now(timezone.utc),
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
deps = StreamDeps(css_manager=None)
|
||||
s = STREAM_BUILDERS["static"](src, deps)
|
||||
assert isinstance(s, SingleColorStripStream)
|
||||
@@ -0,0 +1,64 @@
|
||||
"""Tests for the unified value-stream-builder registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.core.processing import value_kinds
|
||||
from ledgrab.core.processing.value_kinds import (
|
||||
NEEDS_CLOCK_RUNTIME,
|
||||
STREAM_BUILDERS,
|
||||
ValueStreamDeps,
|
||||
build_stream,
|
||||
)
|
||||
from ledgrab.storage.value_source import _VALUE_SOURCE_MAP
|
||||
|
||||
|
||||
def test_builders_cover_every_storage_kind():
|
||||
"""STREAM_BUILDERS keys == storage._VALUE_SOURCE_MAP keys."""
|
||||
storage_kinds = set(_VALUE_SOURCE_MAP.keys())
|
||||
builder_kinds = set(STREAM_BUILDERS.keys())
|
||||
assert storage_kinds == builder_kinds, (
|
||||
f"missing={storage_kinds - builder_kinds}, " f"extra={builder_kinds - storage_kinds}"
|
||||
)
|
||||
|
||||
|
||||
def test_build_stream_raises_on_unknown_kind():
|
||||
class _Fake:
|
||||
source_type = "totally_unregistered"
|
||||
id = "fake"
|
||||
|
||||
deps = ValueStreamDeps(value_stream_manager=None)
|
||||
with pytest.raises(ValueError, match="totally_unregistered"):
|
||||
build_stream(_Fake(), deps)
|
||||
|
||||
|
||||
def test_coverage_assertion_raises_on_drift(monkeypatch):
|
||||
"""A kind added to storage without a builder fails the import-time check."""
|
||||
monkeypatch.setitem(_VALUE_SOURCE_MAP, "phantom_kind", object)
|
||||
with pytest.raises(RuntimeError, match="phantom_kind"):
|
||||
value_kinds._assert_value_kind_coverage()
|
||||
|
||||
|
||||
def test_needs_clock_runtime_is_subset_of_registered_kinds():
|
||||
assert NEEDS_CLOCK_RUNTIME.issubset(set(STREAM_BUILDERS.keys()))
|
||||
|
||||
|
||||
def test_static_builder_returns_static_value_stream():
|
||||
from ledgrab.core.processing.value_stream import StaticValueStream
|
||||
from ledgrab.storage.value_source import StaticValueSource
|
||||
from datetime import datetime, timezone
|
||||
|
||||
src = StaticValueSource(
|
||||
id="vs_t",
|
||||
name="t",
|
||||
source_type="static",
|
||||
created_at=datetime.now(timezone.utc),
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
value=0.42,
|
||||
)
|
||||
deps = ValueStreamDeps(value_stream_manager=None)
|
||||
stream = build_stream(src, deps)
|
||||
assert isinstance(stream, StaticValueStream)
|
||||
# Sanity: the value flowed through
|
||||
assert stream.get_value() == 0.42
|
||||
@@ -0,0 +1,90 @@
|
||||
"""Tests for ColorStripStore startup migrations."""
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_db(tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
yield db
|
||||
db.close()
|
||||
|
||||
|
||||
def _insert_legacy_row(db: Database, item_id: str, name: str, color: list[int]) -> dict:
|
||||
"""Insert a pre-rename color strip row with ``source_type='static'``."""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
blob = {
|
||||
"id": item_id,
|
||||
"name": name,
|
||||
"source_type": "static",
|
||||
"color": color,
|
||||
"animation": None,
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"icon": "",
|
||||
"icon_color": "",
|
||||
}
|
||||
db.upsert("color_strip_sources", item_id, name, blob)
|
||||
return blob
|
||||
|
||||
|
||||
def test_legacy_static_row_is_rewritten_to_single_color(tmp_db):
|
||||
"""A row with ``source_type='static'`` is rewritten on first store load."""
|
||||
_insert_legacy_row(tmp_db, "css_legacy01", "Legacy Solid", [255, 0, 0])
|
||||
|
||||
# Sanity: legacy blob really is on disk
|
||||
raw_before = tmp_db.load_all("color_strip_sources")
|
||||
assert raw_before[0]["source_type"] == "static"
|
||||
|
||||
store = ColorStripStore(tmp_db)
|
||||
|
||||
# In-memory item is a SingleColorStripSource with the canonical type
|
||||
item = store.get("css_legacy01")
|
||||
assert isinstance(item, SingleColorStripSource)
|
||||
assert item.source_type == "single_color"
|
||||
|
||||
# The on-disk row was rewritten too — no second migration on next boot
|
||||
raw_after = tmp_db.load_all("color_strip_sources")
|
||||
assert raw_after[0]["source_type"] == "single_color"
|
||||
|
||||
|
||||
def test_already_migrated_row_is_left_alone(tmp_db):
|
||||
"""Rows already using ``single_color`` are not touched."""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
blob = {
|
||||
"id": "css_new01",
|
||||
"name": "Already Migrated",
|
||||
"source_type": "single_color",
|
||||
"color": [0, 128, 255],
|
||||
"animation": None,
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"icon": "",
|
||||
"icon_color": "",
|
||||
}
|
||||
tmp_db.upsert("color_strip_sources", "css_new01", "Already Migrated", blob)
|
||||
|
||||
raw_before = tmp_db.load_all("color_strip_sources")
|
||||
before_json = json.dumps(raw_before[0], sort_keys=True)
|
||||
|
||||
store = ColorStripStore(tmp_db)
|
||||
item = store.get("css_new01")
|
||||
assert isinstance(item, SingleColorStripSource)
|
||||
|
||||
raw_after = tmp_db.load_all("color_strip_sources")
|
||||
after_json = json.dumps(raw_after[0], sort_keys=True)
|
||||
assert before_json == after_json
|
||||
@@ -0,0 +1,309 @@
|
||||
"""Tests for the versioned data-migration runner.
|
||||
|
||||
These verify the safety properties that replaced the string-replace migration:
|
||||
|
||||
* migrations parse JSON properly (no substring corruption);
|
||||
* the runner records each migration name in ``data_migrations`` so subsequent
|
||||
runs are no-ops;
|
||||
* a migration that touches no rows still gets recorded as applied;
|
||||
* a migration that raises does not get recorded (so a fixed migration can be
|
||||
re-attempted on the next boot);
|
||||
* the legacy ``static`` rename only rewrites rows whose ``source_type`` field
|
||||
actually equals ``static`` — values that merely contain the substring (e.g.
|
||||
an animation type ``static_wave``) are left untouched.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.data_migrations import (
|
||||
ALL_MIGRATIONS,
|
||||
DataMigration,
|
||||
MigrationRunner,
|
||||
StaticToSingleColorMigration,
|
||||
)
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_db(tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
yield db
|
||||
db.close()
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _insert_css(db: Database, item_id: str, blob: dict) -> None:
|
||||
db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runner mechanics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_runner_creates_audit_table(tmp_db):
|
||||
MigrationRunner(tmp_db)
|
||||
cursor = tmp_db.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'"
|
||||
)
|
||||
assert cursor.fetchone() is not None
|
||||
|
||||
|
||||
def test_runner_records_applied_migration(tmp_db):
|
||||
class _Noop(DataMigration):
|
||||
name = "test_noop"
|
||||
|
||||
def apply(self, conn):
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
records = runner.run([_Noop()])
|
||||
|
||||
assert [r.name for r in records] == ["test_noop"]
|
||||
assert "test_noop" in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_skips_already_applied_migration(tmp_db):
|
||||
calls = 0
|
||||
|
||||
class _CountedMigration(DataMigration):
|
||||
name = "test_counted"
|
||||
|
||||
def apply(self, conn):
|
||||
nonlocal calls
|
||||
calls += 1
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([_CountedMigration()])
|
||||
runner.run([_CountedMigration()])
|
||||
runner.run([_CountedMigration()])
|
||||
|
||||
assert calls == 1
|
||||
|
||||
|
||||
def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db):
|
||||
"""Even when a migration changes 0 rows, it is recorded as applied."""
|
||||
|
||||
class _NoMatch(DataMigration):
|
||||
name = "test_no_match"
|
||||
|
||||
def apply(self, conn):
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([_NoMatch()])
|
||||
|
||||
assert "test_no_match" in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_does_not_record_failed_migration(tmp_db):
|
||||
class _Boom(DataMigration):
|
||||
name = "test_boom"
|
||||
|
||||
def apply(self, conn):
|
||||
raise RuntimeError("kaboom")
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
with pytest.raises(RuntimeError, match="kaboom"):
|
||||
runner.run([_Boom()])
|
||||
|
||||
assert "test_boom" not in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_rejects_unnamed_migration(tmp_db):
|
||||
class _Unnamed(DataMigration):
|
||||
def apply(self, conn):
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
with pytest.raises(ValueError, match="non-empty name"):
|
||||
runner.run([_Unnamed()])
|
||||
|
||||
|
||||
def test_runner_rolls_back_data_change_when_migration_raises(tmp_db):
|
||||
"""A migration that raises mid-way leaves no partial UPDATEs behind."""
|
||||
blob = {
|
||||
"id": "css_partial",
|
||||
"name": "Partial",
|
||||
"source_type": "static",
|
||||
"color": [10, 20, 30],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_partial", blob)
|
||||
|
||||
class _PartialBoom(DataMigration):
|
||||
name = "test_partial_boom"
|
||||
|
||||
def apply(self, conn):
|
||||
conn.execute(
|
||||
"UPDATE color_strip_sources SET data = ? WHERE id = ?",
|
||||
(json.dumps({**blob, "source_type": "single_color"}), "css_partial"),
|
||||
)
|
||||
raise RuntimeError("explode after update")
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
with pytest.raises(RuntimeError, match="explode after update"):
|
||||
runner.run([_PartialBoom()])
|
||||
|
||||
rows = tmp_db.load_all("color_strip_sources")
|
||||
assert rows[0]["source_type"] == "static", "UPDATE was not rolled back"
|
||||
assert "test_partial_boom" not in runner.applied_names()
|
||||
|
||||
|
||||
def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch):
|
||||
"""Frozen-write databases must not be mutated by the runner."""
|
||||
monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True)
|
||||
|
||||
calls = 0
|
||||
|
||||
class _Counted(DataMigration):
|
||||
name = "test_frozen"
|
||||
|
||||
def apply(self, conn):
|
||||
nonlocal calls
|
||||
calls += 1
|
||||
return 0
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
records = runner.run([_Counted()])
|
||||
|
||||
assert records == []
|
||||
assert calls == 0
|
||||
assert "test_frozen" not in runner.applied_names()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Legacy static-rename specifics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_static_rename_only_touches_source_type_field(tmp_db):
|
||||
"""A row whose ``animation.type`` happens to contain ``static`` survives intact."""
|
||||
blob = {
|
||||
"id": "css_safe01",
|
||||
"name": "Has static_wave anim",
|
||||
"source_type": "gradient", # not "static" — should be untouched
|
||||
"animation": {"type": "static_wave", "speed": 1.0}, # substring trap
|
||||
"stops": [{"position": 0.0, "color": [10, 20, 30]}],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_safe01", blob)
|
||||
|
||||
# Run the migration via the runner (clears the existing audit row first
|
||||
# because the Database fixture already triggered it through any incidental
|
||||
# store creation — here we drive it directly).
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([StaticToSingleColorMigration()])
|
||||
|
||||
rows = tmp_db.load_all("color_strip_sources")
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["source_type"] == "gradient"
|
||||
assert rows[0]["animation"]["type"] == "static_wave"
|
||||
|
||||
|
||||
def test_static_rename_rewrites_only_legacy_rows(tmp_db):
|
||||
legacy = {
|
||||
"id": "css_legacy01",
|
||||
"name": "Legacy",
|
||||
"source_type": "static",
|
||||
"color": [255, 0, 0],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
current = {
|
||||
"id": "css_modern01",
|
||||
"name": "Modern",
|
||||
"source_type": "single_color",
|
||||
"color": [0, 128, 255],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_legacy01", legacy)
|
||||
_insert_css(tmp_db, "css_modern01", current)
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([StaticToSingleColorMigration()])
|
||||
|
||||
rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")}
|
||||
assert rows["css_legacy01"]["source_type"] == "single_color"
|
||||
assert rows["css_modern01"]["source_type"] == "single_color"
|
||||
|
||||
|
||||
def test_static_rename_survives_corrupt_blob(tmp_db):
|
||||
"""A row whose JSON blob is corrupt does not crash the migration."""
|
||||
# Insert a row with deliberately broken JSON via direct SQL (bypass upsert
|
||||
# which would re-serialize). We use a hand-crafted blob.
|
||||
tmp_db.execute(
|
||||
"INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)",
|
||||
("css_bad01", "Bad", "{not valid json"),
|
||||
)
|
||||
legacy = {
|
||||
"id": "css_legacy02",
|
||||
"name": "Legacy after corrupt",
|
||||
"source_type": "static",
|
||||
"color": [1, 2, 3],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_legacy02", legacy)
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
runner.run([StaticToSingleColorMigration()])
|
||||
|
||||
# Legacy row still migrated, corrupt row untouched, no exception
|
||||
cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources")
|
||||
out = {row["id"]: row["data"] for row in cursor.fetchall()}
|
||||
assert out["css_bad01"] == "{not valid json"
|
||||
assert json.loads(out["css_legacy02"])["source_type"] == "single_color"
|
||||
|
||||
|
||||
def test_all_migrations_have_unique_names():
|
||||
names = [m.name for m in ALL_MIGRATIONS]
|
||||
assert len(names) == len(set(names)), f"duplicate names: {names}"
|
||||
|
||||
|
||||
def test_color_strip_store_construction_records_static_migration(tmp_db):
|
||||
"""Integration: constructing the store applies AND records the migration."""
|
||||
from ledgrab.storage.color_strip_source import SingleColorStripSource
|
||||
from ledgrab.storage.color_strip_store import ColorStripStore
|
||||
|
||||
legacy = {
|
||||
"id": "css_int01",
|
||||
"name": "Integration legacy",
|
||||
"source_type": "static",
|
||||
"color": [1, 2, 3],
|
||||
"tags": [],
|
||||
"led_count": 30,
|
||||
"created_at": _now(),
|
||||
"updated_at": _now(),
|
||||
}
|
||||
_insert_css(tmp_db, "css_int01", legacy)
|
||||
|
||||
store = ColorStripStore(tmp_db)
|
||||
|
||||
# In-memory shape is correct
|
||||
assert isinstance(store.get("css_int01"), SingleColorStripSource)
|
||||
# Audit row exists
|
||||
runner = MigrationRunner(tmp_db)
|
||||
assert "001_color_strip_static_to_single_color" in runner.applied_names()
|
||||
Reference in New Issue
Block a user