Files
ledgrab/server/tests/storage/test_data_migrations.py
alexei.dolgolyov 563cbac88c refactor(storage,processing): kind registries + versioned data migrations
Two CRITICAL data-safety bugs from the architecture audit and the two
worst parallel-change problems are fixed in one coherent pass.

Audit findings addressed:

- C2  silent CSS response fallback. The previous _RESPONSE_MAP fell
      through to a fabricated PictureCSSResponse whenever a source
      class lacked an entry; in particular game_event sources were
      silently mis-shaped. Now: GameEventCSSResponse/Create/Update
      schemas exist, _RESPONSE_MAP is re-keyed by source_type string,
      an import-time _assert_response_map_coverage() requires symmetric
      agreement with storage._SOURCE_TYPE_MAP, and the runtime path
      raises instead of fabricating a response.

- C11 string-replace JSON migration. ColorStripStore used
      blob.replace('"source_type": "static"', '"source_type":
      "single_color"') which can corrupt unrelated substrings (e.g.
      an animation type named "static_wave") and provides no audit,
      no transaction, no idempotency. Replaced with
      storage.data_migrations.MigrationRunner backed by a
      data_migrations audit table. Each migration runs inside one
      db.transaction() that covers the applied-check, the apply(),
      and the audit-INSERT — partial failures roll back atomically.
      StaticToSingleColorMigration parses each row with json.loads
      and mutates only the source_type field. Frozen-write databases
      skip with a warning.

- C3+C4 color-strip stream dispatch. The 7-branch elif in
      ColorStripStreamManager.acquire() and the duplicate one in
      ws_stream._create_stream() now share a single STREAM_BUILDERS
      registry in core.processing.color_strip_kinds, keyed by
      source.source_type. Both call sites populate a StreamDeps bag
      and delegate to build_stream(). _assert_stream_kind_coverage()
      asserts at import that STREAM_BUILDERS plus SHARABLE_KINDS
      partitions storage._SOURCE_TYPE_MAP. ws_stream's preview path
      wraps each FastAPI-DI getter in _safe() so non-audio previews
      no longer crash when audio/CSPT stores are not wired.

- C6+C7 value stream dispatch. The 14-branch isinstance ladder in
      ValueStreamManager._create_stream and its silent
      StaticValueStream(value=1.0) fallback are replaced by
      core.processing.value_kinds.STREAM_BUILDERS, keyed by
      source_type string (so AdaptiveValueSource's adaptive_time and
      adaptive_scene route to different builders correctly). The
      manager retains only the SyncClockRuntime pre-acquisition step
      for animated_color (kinds needing this are listed explicitly
      in NEEDS_CLOCK_RUNTIME). Symmetric coverage assertion plus a
      separate assertion that NEEDS_CLOCK_RUNTIME is a subset of the
      registry.

Bundled in: the static->single_color rename plus the HTTPValueStream
/ http_endpoint introduction that were already in flight on this
branch share these files; the registry refactor naturally absorbs
both via the new "single_color" / "static" alias entries and the
_build_http builder.

Tests: 26 new tests cover response-map coverage drift, migration
runner audit-table mechanics + transactional rollback +
frozen-write skip, and the two stream-builder registries. 343
existing storage / API / e2e tests stay green. Ruff clean.
2026-05-22 22:45:28 +03:00

310 lines
9.4 KiB
Python

"""Tests for the versioned data-migration runner.
These verify the safety properties that replaced the string-replace migration:
* migrations parse JSON properly (no substring corruption);
* the runner records each migration name in ``data_migrations`` so subsequent
runs are no-ops;
* a migration that touches no rows still gets recorded as applied;
* a migration that raises does not get recorded (so a fixed migration can be
re-attempted on the next boot);
* the legacy ``static`` rename only rewrites rows whose ``source_type`` field
actually equals ``static`` — values that merely contain the substring (e.g.
an animation type ``static_wave``) are left untouched.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
import pytest
from ledgrab.storage.data_migrations import (
ALL_MIGRATIONS,
DataMigration,
MigrationRunner,
StaticToSingleColorMigration,
)
from ledgrab.storage.database import Database
@pytest.fixture
def tmp_db(tmp_path):
db = Database(tmp_path / "test.db")
yield db
db.close()
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _insert_css(db: Database, item_id: str, blob: dict) -> None:
db.upsert("color_strip_sources", item_id, blob.get("name", ""), blob)
# ---------------------------------------------------------------------------
# Runner mechanics
# ---------------------------------------------------------------------------
def test_runner_creates_audit_table(tmp_db):
MigrationRunner(tmp_db)
cursor = tmp_db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='data_migrations'"
)
assert cursor.fetchone() is not None
def test_runner_records_applied_migration(tmp_db):
class _Noop(DataMigration):
name = "test_noop"
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
records = runner.run([_Noop()])
assert [r.name for r in records] == ["test_noop"]
assert "test_noop" in runner.applied_names()
def test_runner_skips_already_applied_migration(tmp_db):
calls = 0
class _CountedMigration(DataMigration):
name = "test_counted"
def apply(self, conn):
nonlocal calls
calls += 1
return 0
runner = MigrationRunner(tmp_db)
runner.run([_CountedMigration()])
runner.run([_CountedMigration()])
runner.run([_CountedMigration()])
assert calls == 1
def test_runner_records_zero_row_migration_so_it_does_not_repeat(tmp_db):
"""Even when a migration changes 0 rows, it is recorded as applied."""
class _NoMatch(DataMigration):
name = "test_no_match"
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
runner.run([_NoMatch()])
assert "test_no_match" in runner.applied_names()
def test_runner_does_not_record_failed_migration(tmp_db):
class _Boom(DataMigration):
name = "test_boom"
def apply(self, conn):
raise RuntimeError("kaboom")
runner = MigrationRunner(tmp_db)
with pytest.raises(RuntimeError, match="kaboom"):
runner.run([_Boom()])
assert "test_boom" not in runner.applied_names()
def test_runner_rejects_unnamed_migration(tmp_db):
class _Unnamed(DataMigration):
def apply(self, conn):
return 0
runner = MigrationRunner(tmp_db)
with pytest.raises(ValueError, match="non-empty name"):
runner.run([_Unnamed()])
def test_runner_rolls_back_data_change_when_migration_raises(tmp_db):
"""A migration that raises mid-way leaves no partial UPDATEs behind."""
blob = {
"id": "css_partial",
"name": "Partial",
"source_type": "static",
"color": [10, 20, 30],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_partial", blob)
class _PartialBoom(DataMigration):
name = "test_partial_boom"
def apply(self, conn):
conn.execute(
"UPDATE color_strip_sources SET data = ? WHERE id = ?",
(json.dumps({**blob, "source_type": "single_color"}), "css_partial"),
)
raise RuntimeError("explode after update")
runner = MigrationRunner(tmp_db)
with pytest.raises(RuntimeError, match="explode after update"):
runner.run([_PartialBoom()])
rows = tmp_db.load_all("color_strip_sources")
assert rows[0]["source_type"] == "static", "UPDATE was not rolled back"
assert "test_partial_boom" not in runner.applied_names()
def test_runner_skips_when_writes_are_frozen(tmp_db, monkeypatch):
"""Frozen-write databases must not be mutated by the runner."""
monkeypatch.setattr("ledgrab.storage.data_migrations.is_writes_frozen", lambda: True)
calls = 0
class _Counted(DataMigration):
name = "test_frozen"
def apply(self, conn):
nonlocal calls
calls += 1
return 0
runner = MigrationRunner(tmp_db)
records = runner.run([_Counted()])
assert records == []
assert calls == 0
assert "test_frozen" not in runner.applied_names()
# ---------------------------------------------------------------------------
# Legacy static-rename specifics
# ---------------------------------------------------------------------------
def test_static_rename_only_touches_source_type_field(tmp_db):
"""A row whose ``animation.type`` happens to contain ``static`` survives intact."""
blob = {
"id": "css_safe01",
"name": "Has static_wave anim",
"source_type": "gradient", # not "static" — should be untouched
"animation": {"type": "static_wave", "speed": 1.0}, # substring trap
"stops": [{"position": 0.0, "color": [10, 20, 30]}],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_safe01", blob)
# Run the migration via the runner (clears the existing audit row first
# because the Database fixture already triggered it through any incidental
# store creation — here we drive it directly).
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
rows = tmp_db.load_all("color_strip_sources")
assert len(rows) == 1
assert rows[0]["source_type"] == "gradient"
assert rows[0]["animation"]["type"] == "static_wave"
def test_static_rename_rewrites_only_legacy_rows(tmp_db):
legacy = {
"id": "css_legacy01",
"name": "Legacy",
"source_type": "static",
"color": [255, 0, 0],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
current = {
"id": "css_modern01",
"name": "Modern",
"source_type": "single_color",
"color": [0, 128, 255],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_legacy01", legacy)
_insert_css(tmp_db, "css_modern01", current)
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
rows = {r["id"]: r for r in tmp_db.load_all("color_strip_sources")}
assert rows["css_legacy01"]["source_type"] == "single_color"
assert rows["css_modern01"]["source_type"] == "single_color"
def test_static_rename_survives_corrupt_blob(tmp_db):
"""A row whose JSON blob is corrupt does not crash the migration."""
# Insert a row with deliberately broken JSON via direct SQL (bypass upsert
# which would re-serialize). We use a hand-crafted blob.
tmp_db.execute(
"INSERT INTO color_strip_sources (id, name, data) VALUES (?, ?, ?)",
("css_bad01", "Bad", "{not valid json"),
)
legacy = {
"id": "css_legacy02",
"name": "Legacy after corrupt",
"source_type": "static",
"color": [1, 2, 3],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_legacy02", legacy)
runner = MigrationRunner(tmp_db)
runner.run([StaticToSingleColorMigration()])
# Legacy row still migrated, corrupt row untouched, no exception
cursor = tmp_db.execute("SELECT id, data FROM color_strip_sources")
out = {row["id"]: row["data"] for row in cursor.fetchall()}
assert out["css_bad01"] == "{not valid json"
assert json.loads(out["css_legacy02"])["source_type"] == "single_color"
def test_all_migrations_have_unique_names():
names = [m.name for m in ALL_MIGRATIONS]
assert len(names) == len(set(names)), f"duplicate names: {names}"
def test_color_strip_store_construction_records_static_migration(tmp_db):
"""Integration: constructing the store applies AND records the migration."""
from ledgrab.storage.color_strip_source import SingleColorStripSource
from ledgrab.storage.color_strip_store import ColorStripStore
legacy = {
"id": "css_int01",
"name": "Integration legacy",
"source_type": "static",
"color": [1, 2, 3],
"tags": [],
"led_count": 30,
"created_at": _now(),
"updated_at": _now(),
}
_insert_css(tmp_db, "css_int01", legacy)
store = ColorStripStore(tmp_db)
# In-memory shape is correct
assert isinstance(store.get("css_int01"), SingleColorStripSource)
# Audit row exists
runner = MigrationRunner(tmp_db)
assert "001_color_strip_static_to_single_color" in runner.applied_names()