feat(scenes): scene playlists with timed auto-cycling

Add ordered, timed sequences of scene presets that auto-cycle — activating
each preset and holding it for its dwell duration before advancing.

Backend:
- ScenePlaylist / PlaylistItem models + SQLite store (new scene_playlists table)
- PlaylistEngine: cycles ONE playlist at a time (starting one stops any other),
  loop/shuffle, re-reads the playlist each cycle so edits/deletes apply at the
  boundary, skips missing presets, guards against busy-loops; reuses the shared
  apply_scene_state path used by scene presets and automations
- REST API: CRUD + /start, /stop, /state with scene-preset reference validation
- Constructed in the app lifespan with a bounded stop on shutdown

Frontend:
- New "Playlists" sub-tab in the Automations tab with start/stop controls and a
  running indicator; editor modal with ordered scene rows (reorder + per-item
  duration), loop/shuffle toggles, and tags
- Live refresh via the playlist_state_changed WebSocket event
- i18n in en/ru/zh

Tests: new unit + API coverage for the store/model, engine (cycling,
single-active exclusivity, missing-preset skip, shuffle, and the
playlist_state_changed event contract), and routes. Full suite green;
ruff and tsc clean.
This commit is contained in:
2026-06-08 13:48:43 +03:00
parent ca59546711
commit f71e10ee06
27 changed files with 2739 additions and 6 deletions
@@ -0,0 +1,296 @@
"""Tests for scene-playlist CRUD + cycling-control routes.
The PlaylistEngine is replaced with a lightweight fake so the route layer is
tested without driving the real asyncio cycling loop.
"""
from datetime import datetime, timezone
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.routes.scene_playlists import router
from ledgrab.core.scenes.playlist_engine import PlaylistError
from ledgrab.storage.scene_playlist_store import ScenePlaylistStore
from ledgrab.storage.scene_preset import ScenePreset
from ledgrab.storage.scene_preset_store import ScenePresetStore
class FakePlaylistEngine:
"""Minimal stand-in matching the methods the routes call."""
def __init__(self, store):
self._store = store
self._running_id = None
self.start_calls = []
self.stop_calls = 0
def get_running_playlist_id(self):
return self._running_id
def get_state(self):
if self._running_id is None:
return {
"is_running": False,
"playlist_id": None,
"playlist_name": None,
"current_index": 0,
"item_count": 0,
"current_preset_id": None,
"started_at": None,
"step_started_at": None,
"step_duration": 0.0,
}
return {
"is_running": True,
"playlist_id": self._running_id,
"playlist_name": "running",
"current_index": 0,
"item_count": 1,
"current_preset_id": "scene_a",
"started_at": datetime.now(timezone.utc).isoformat(),
"step_started_at": datetime.now(timezone.utc).isoformat(),
"step_duration": 30.0,
}
async def start_playlist(self, playlist_id):
self.start_calls.append(playlist_id)
pl = self._store.get_playlist(playlist_id)
if not pl.items:
raise PlaylistError("empty")
self._running_id = playlist_id
async def stop(self):
self.stop_calls += 1
self._running_id = None
async def stop_if_running(self, playlist_id):
if self._running_id == playlist_id:
self._running_id = None
@pytest.fixture
def _route_db(tmp_path):
from ledgrab.storage.database import Database
db = Database(tmp_path / "test.db")
yield db
db.close()
@pytest.fixture
def preset_store(_route_db) -> ScenePresetStore:
store = ScenePresetStore(_route_db)
now = datetime.now(timezone.utc)
for sid, name in [("scene_a", "A"), ("scene_b", "B")]:
store.create_preset(
ScenePreset(id=sid, name=name, targets=[], created_at=now, updated_at=now)
)
return store
@pytest.fixture
def playlist_store(_route_db) -> ScenePlaylistStore:
return ScenePlaylistStore(_route_db)
@pytest.fixture
def fake_engine(playlist_store):
return FakePlaylistEngine(playlist_store)
@pytest.fixture
def client(playlist_store, preset_store, fake_engine):
app = FastAPI()
app.include_router(router)
from ledgrab.api.auth import verify_api_key
app.dependency_overrides[verify_api_key] = lambda: "test-user"
app.dependency_overrides[deps.get_scene_playlist_store] = lambda: playlist_store
app.dependency_overrides[deps.get_scene_preset_store] = lambda: preset_store
app.dependency_overrides[deps.get_playlist_engine] = lambda: fake_engine
# Routes fire entity events through the processor manager; give it a stub.
deps._deps["processor_manager"] = None
return TestClient(app, raise_server_exceptions=False)
def _create(client, name="P1", items=None, **extra):
body = {"name": name, "items": items if items is not None else [], **extra}
return client.post("/api/v1/scene-playlists", json=body)
# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------
class TestCreate:
def test_create_minimal(self, client):
resp = _create(client, "Empty OK")
assert resp.status_code == 201
data = resp.json()
assert data["name"] == "Empty OK"
assert data["loop"] is True
assert data["is_running"] is False
assert data["id"].startswith("playlist_")
def test_create_with_items(self, client):
resp = _create(
client,
"With items",
items=[
{"scene_preset_id": "scene_a", "duration_seconds": 15},
{"scene_preset_id": "scene_b", "duration_seconds": 45},
],
loop=False,
shuffle=True,
)
assert resp.status_code == 201
data = resp.json()
assert len(data["items"]) == 2
assert data["loop"] is False
assert data["shuffle"] is True
def test_create_rejects_unknown_preset(self, client):
resp = _create(
client, "Bad ref", items=[{"scene_preset_id": "ghost", "duration_seconds": 10}]
)
assert resp.status_code == 400
assert "ghost" in resp.json()["detail"]
def test_create_rejects_below_min_duration(self, client):
resp = _create(
client, "Too fast", items=[{"scene_preset_id": "scene_a", "duration_seconds": 0}]
)
# Pydantic ge=MIN validation → 422
assert resp.status_code == 422
def test_create_duplicate_name(self, client):
_create(client, "Dup")
resp = _create(client, "Dup")
assert resp.status_code == 400
class TestList:
def test_list_empty_includes_idle_state(self, client):
resp = client.get("/api/v1/scene-playlists")
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 0
assert data["state"]["is_running"] is False
def test_list_marks_running(self, client, fake_engine):
pid = _create(
client, "Runner", items=[{"scene_preset_id": "scene_a", "duration_seconds": 10}]
).json()["id"]
fake_engine._running_id = pid
data = client.get("/api/v1/scene-playlists").json()
assert data["count"] == 1
assert data["playlists"][0]["is_running"] is True
assert data["state"]["is_running"] is True
class TestGet:
def test_get_existing(self, client):
pid = _create(client, "Find me").json()["id"]
resp = client.get(f"/api/v1/scene-playlists/{pid}")
assert resp.status_code == 200
assert resp.json()["name"] == "Find me"
def test_get_missing(self, client):
resp = client.get("/api/v1/scene-playlists/nope")
assert resp.status_code == 404
def test_state_route_not_shadowed_by_id(self, client):
# The literal /state path must resolve to the state endpoint.
resp = client.get("/api/v1/scene-playlists/state")
assert resp.status_code == 200
assert "is_running" in resp.json()
class TestUpdate:
def test_update_fields(self, client):
pid = _create(client, "Edit").json()["id"]
resp = client.put(
f"/api/v1/scene-playlists/{pid}",
json={
"name": "Edited",
"loop": False,
"items": [{"scene_preset_id": "scene_b", "duration_seconds": 20}],
},
)
assert resp.status_code == 200
data = resp.json()
assert data["name"] == "Edited"
assert data["loop"] is False
assert data["items"][0]["scene_preset_id"] == "scene_b"
def test_update_rejects_unknown_preset(self, client):
pid = _create(client, "Edit2").json()["id"]
resp = client.put(
f"/api/v1/scene-playlists/{pid}",
json={"items": [{"scene_preset_id": "ghost", "duration_seconds": 10}]},
)
assert resp.status_code == 400
def test_update_missing(self, client):
resp = client.put("/api/v1/scene-playlists/nope", json={"name": "x"})
assert resp.status_code == 404
class TestDelete:
def test_delete(self, client):
pid = _create(client, "Goner").json()["id"]
resp = client.delete(f"/api/v1/scene-playlists/{pid}")
assert resp.status_code == 204
assert client.get(f"/api/v1/scene-playlists/{pid}").status_code == 404
def test_delete_stops_if_running(self, client, fake_engine):
pid = _create(
client, "Running goner", items=[{"scene_preset_id": "scene_a", "duration_seconds": 10}]
).json()["id"]
fake_engine._running_id = pid
client.delete(f"/api/v1/scene-playlists/{pid}")
assert fake_engine.get_running_playlist_id() is None
def test_delete_missing(self, client):
assert client.delete("/api/v1/scene-playlists/nope").status_code == 404
# ---------------------------------------------------------------------------
# Cycling control
# ---------------------------------------------------------------------------
class TestControl:
def test_start(self, client, fake_engine):
pid = _create(
client, "Go", items=[{"scene_preset_id": "scene_a", "duration_seconds": 10}]
).json()["id"]
resp = client.post(f"/api/v1/scene-playlists/{pid}/start")
assert resp.status_code == 200
assert resp.json()["is_running"] is True
assert fake_engine.start_calls == [pid]
def test_start_missing_playlist(self, client):
resp = client.post("/api/v1/scene-playlists/nope/start")
assert resp.status_code == 404
def test_start_empty_playlist_400(self, client):
pid = _create(client, "EmptyGo").json()["id"]
resp = client.post(f"/api/v1/scene-playlists/{pid}/start")
assert resp.status_code == 400
def test_stop(self, client, fake_engine):
pid = _create(
client, "Stoppable", items=[{"scene_preset_id": "scene_a", "duration_seconds": 10}]
).json()["id"]
fake_engine._running_id = pid
resp = client.post("/api/v1/scene-playlists/stop")
assert resp.status_code == 200
assert resp.json()["is_running"] is False
assert fake_engine.stop_calls == 1
+317
View File
@@ -0,0 +1,317 @@
"""Tests for PlaylistEngine — the scene-playlist auto-cycling runtime.
The engine dwells on each scene for ``MIN_DURATION_SECONDS`` (1s) at minimum,
so these tests patch ``asyncio.sleep`` to a tiny real delay (``fast_sleep``)
to keep the cycling deterministic and fast. A captured ``_REAL_SLEEP`` is used
for the test's own real-time waits so they aren't shortened by the patch.
"""
import asyncio
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from ledgrab.core.scenes.playlist_engine import PlaylistEngine, PlaylistError
from ledgrab.storage.scene_playlist import PlaylistItem, ScenePlaylist
from ledgrab.storage.scene_playlist_store import ScenePlaylistStore
from ledgrab.storage.scene_preset import ScenePreset
from ledgrab.storage.scene_preset_store import ScenePresetStore
_REAL_SLEEP = asyncio.sleep
@pytest.fixture
def fast_sleep():
"""Patch the engine's dwell sleep to a tiny real delay."""
async def _fast(_duration):
await _REAL_SLEEP(0.005)
with patch("asyncio.sleep", _fast):
yield
@pytest.fixture
def preset_store(tmp_db) -> ScenePresetStore:
store = ScenePresetStore(tmp_db)
for sid, name in [("scene_a", "A"), ("scene_b", "B"), ("scene_c", "C")]:
now = datetime.now(timezone.utc)
store.create_preset(
ScenePreset(id=sid, name=name, targets=[], created_at=now, updated_at=now)
)
return store
@pytest.fixture
def playlist_store(tmp_db) -> ScenePlaylistStore:
return ScenePlaylistStore(tmp_db)
@pytest.fixture
def applied():
"""A list that records the preset ids the engine applies, in order."""
return []
@pytest.fixture
def engine(playlist_store, preset_store, applied):
manager = MagicMock()
manager.fire_event = MagicMock()
target_store = MagicMock()
eng = PlaylistEngine(
playlist_store=playlist_store,
scene_preset_store=preset_store,
target_store=target_store,
processor_manager=manager,
)
async def _fake_apply(preset, _ts, _mgr):
applied.append(preset.id)
return ("activated", [])
# apply_scene_state is imported lazily inside the engine, so patch it at
# its definition site with a plain async function (unambiguous awaiting).
patcher = patch(
"ledgrab.core.scenes.scene_activator.apply_scene_state",
new=_fake_apply,
)
patcher.start()
eng._apply_patcher = patcher # keep a handle for teardown
yield eng
patcher.stop()
def _make_playlist(store, name, item_specs, **kwargs) -> ScenePlaylist:
import uuid
items = [PlaylistItem(sid, dur) for sid, dur in item_specs]
pl = ScenePlaylist(
id=f"playlist_{uuid.uuid4().hex[:8]}",
name=name,
items=items,
order=store.count(),
**kwargs,
)
return store.create_playlist(pl)
async def _drain(engine, timeout=2.0):
"""Await the engine's current cycling task (for non-loop playlists)."""
task = engine._task
if task is not None:
await asyncio.wait_for(asyncio.shield(task), timeout=timeout)
# ---------------------------------------------------------------------------
# Start validation
# ---------------------------------------------------------------------------
class TestStartValidation:
async def test_start_unknown_raises(self, engine):
with pytest.raises(PlaylistError):
await engine.start_playlist("missing")
async def test_start_empty_playlist_raises(self, engine, playlist_store):
pl = _make_playlist(playlist_store, "Empty", [])
with pytest.raises(PlaylistError):
await engine.start_playlist(pl.id)
assert engine.is_running() is False
async def test_initial_state_set_on_start(self, engine, playlist_store, fast_sleep):
pl = _make_playlist(playlist_store, "Loopy", [("scene_a", 50), ("scene_b", 50)], loop=True)
state = await engine.start_playlist(pl.id)
try:
assert state.current_index == 0
assert state.current_preset_id == "scene_a"
s = engine.get_state()
assert s["is_running"] is True
assert s["playlist_id"] == pl.id
assert s["item_count"] == 2
finally:
await engine.stop()
# ---------------------------------------------------------------------------
# Cycling behaviour
# ---------------------------------------------------------------------------
class TestCycling:
async def test_non_loop_applies_all_in_order_then_idle(
self, engine, playlist_store, applied, fast_sleep
):
pl = _make_playlist(
playlist_store,
"Once",
[("scene_a", 50), ("scene_b", 50), ("scene_c", 50)],
loop=False,
)
await engine.start_playlist(pl.id)
await _drain(engine)
assert applied == ["scene_a", "scene_b", "scene_c"]
assert engine.is_running() is False
assert engine.get_state()["is_running"] is False
async def test_loop_keeps_cycling_until_stopped(
self, engine, playlist_store, applied, fast_sleep
):
pl = _make_playlist(
playlist_store, "Forever", [("scene_a", 50), ("scene_b", 50)], loop=True
)
await engine.start_playlist(pl.id)
await _REAL_SLEEP(0.05)
assert engine.is_running() is True
await engine.stop()
assert engine.is_running() is False
# Looped at least past the first pass.
assert len(applied) >= 3
assert applied[0] == "scene_a"
async def test_missing_preset_is_skipped(self, engine, playlist_store, applied, fast_sleep):
pl = _make_playlist(
playlist_store,
"Mixed",
[("scene_a", 50), ("ghost", 50), ("scene_b", 50)],
loop=False,
)
await engine.start_playlist(pl.id)
await _drain(engine)
assert applied == ["scene_a", "scene_b"]
async def test_all_missing_with_loop_stops(self, engine, playlist_store, applied):
pl = _make_playlist(playlist_store, "Dead", [("ghost1", 50), ("ghost2", 50)], loop=True)
await engine.start_playlist(pl.id)
await _drain(engine) # guard should break the loop immediately
assert applied == []
assert engine.is_running() is False
async def test_shuffle_uses_random_order(self, engine, playlist_store, applied, fast_sleep):
pl = _make_playlist(
playlist_store,
"Shuffled",
[("scene_a", 50), ("scene_b", 50), ("scene_c", 50)],
loop=False,
shuffle=True,
)
def _reverse(seq):
seq.reverse()
with patch("ledgrab.core.scenes.playlist_engine.random.shuffle", side_effect=_reverse):
await engine.start_playlist(pl.id)
await _drain(engine)
assert applied == ["scene_c", "scene_b", "scene_a"]
# ---------------------------------------------------------------------------
# Single-playlist exclusivity + stop helpers
# ---------------------------------------------------------------------------
class TestExclusivityAndStop:
async def test_starting_second_playlist_replaces_first(
self, engine, playlist_store, fast_sleep
):
a = _make_playlist(playlist_store, "A", [("scene_a", 50)], loop=True)
b = _make_playlist(playlist_store, "B", [("scene_b", 50)], loop=True)
await engine.start_playlist(a.id)
await engine.start_playlist(b.id)
try:
assert engine.get_running_playlist_id() == b.id
finally:
await engine.stop()
async def test_stop_when_idle_is_noop(self, engine):
await engine.stop() # should not raise
assert engine.is_running() is False
async def test_stop_if_running_only_matching(self, engine, playlist_store, fast_sleep):
a = _make_playlist(playlist_store, "A", [("scene_a", 50)], loop=True)
await engine.start_playlist(a.id)
await engine.stop_if_running("some-other-id")
assert engine.is_running() is True
await engine.stop_if_running(a.id)
assert engine.is_running() is False
async def test_get_state_idle_shape(self, engine):
s = engine.get_state()
assert s["is_running"] is False
assert s["playlist_id"] is None
assert s["current_index"] == 0
# ---------------------------------------------------------------------------
# Event firing — the playlist_state_changed contract the frontend WS layer
# (events-ws.ts allowlist + scene-playlists.ts listener) depends on. A dropped
# event here would silently freeze the UI's running indicator, yet the
# is_running()/ordering assertions above would stay green — so assert the
# fire_event payloads directly.
# ---------------------------------------------------------------------------
def _fired_events(engine) -> list:
"""All payloads passed to processor_manager.fire_event, in order."""
return [c.args[0] for c in engine._manager.fire_event.call_args_list]
def _fired_actions(engine) -> list:
return [e.get("action") for e in _fired_events(engine)]
class TestEvents:
async def test_start_fires_started_event(self, engine, playlist_store, fast_sleep):
pl = _make_playlist(playlist_store, "Started", [("scene_a", 50)], loop=True)
await engine.start_playlist(pl.id)
try:
started = [e for e in _fired_events(engine) if e.get("action") == "started"]
assert len(started) == 1
assert started[0]["type"] == "playlist_state_changed"
assert started[0]["playlist_id"] == pl.id
finally:
await engine.stop()
async def test_non_loop_completion_fires_final_stopped(
self, engine, playlist_store, fast_sleep
):
pl = _make_playlist(
playlist_store, "Finishes", [("scene_a", 50), ("scene_b", 50)], loop=False
)
await engine.start_playlist(pl.id)
await _drain(engine)
actions = _fired_actions(engine)
assert actions[0] == "started"
assert actions[-1] == "stopped"
# The natural-completion 'stopped' must carry the playlist id even
# though _run clears _state to None before firing (ended_id capture).
stopped = [e for e in _fired_events(engine) if e.get("action") == "stopped"]
assert stopped and stopped[-1]["playlist_id"] == pl.id
async def test_advanced_fires_per_applied_item_only(self, engine, playlist_store, fast_sleep):
pl = _make_playlist(
playlist_store,
"Mixed",
[("scene_a", 50), ("ghost", 50), ("scene_b", 50)],
loop=False,
)
await engine.start_playlist(pl.id)
await _drain(engine)
advanced = [e for e in _fired_events(engine) if e.get("action") == "advanced"]
# Only the two real presets advance; the missing one fires nothing.
assert [e["preset_id"] for e in advanced] == ["scene_a", "scene_b"]
assert [e["index"] for e in advanced] == [0, 2]
async def test_explicit_stop_fires_stopped(self, engine, playlist_store, fast_sleep):
pl = _make_playlist(playlist_store, "Stopper", [("scene_a", 50)], loop=True)
await engine.start_playlist(pl.id)
await engine.stop()
stopped = [e for e in _fired_events(engine) if e.get("action") == "stopped"]
assert stopped and stopped[-1]["playlist_id"] == pl.id
async def test_stop_when_idle_fires_nothing(self, engine):
await engine.stop()
assert _fired_actions(engine) == []
@@ -0,0 +1,189 @@
"""Tests for ScenePlaylist model + ScenePlaylistStore."""
import pytest
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.storage.scene_playlist import (
DEFAULT_DURATION_SECONDS,
MAX_DURATION_SECONDS,
MIN_DURATION_SECONDS,
PlaylistItem,
ScenePlaylist,
clamp_duration,
)
from ledgrab.storage.scene_playlist_store import ScenePlaylistStore
@pytest.fixture
def store(tmp_db) -> ScenePlaylistStore:
return ScenePlaylistStore(tmp_db)
def _make_playlist(store: ScenePlaylistStore, name="My Playlist", **kwargs) -> ScenePlaylist:
import uuid
items = kwargs.pop("items", [PlaylistItem("scene_aaa", 10.0)])
pl = ScenePlaylist(
id=f"playlist_{uuid.uuid4().hex[:8]}",
name=name,
items=items,
order=store.count(),
**kwargs,
)
return store.create_playlist(pl)
# ---------------------------------------------------------------------------
# clamp_duration
# ---------------------------------------------------------------------------
class TestClampDuration:
def test_within_range_unchanged(self):
assert clamp_duration(42.5) == 42.5
def test_below_floor_clamped(self):
assert clamp_duration(0) == MIN_DURATION_SECONDS
assert clamp_duration(-5) == MIN_DURATION_SECONDS
def test_above_ceiling_clamped(self):
assert clamp_duration(MAX_DURATION_SECONDS + 1000) == MAX_DURATION_SECONDS
def test_non_numeric_returns_default(self):
assert clamp_duration("oops") == DEFAULT_DURATION_SECONDS
assert clamp_duration(None) == DEFAULT_DURATION_SECONDS
# ---------------------------------------------------------------------------
# Model serialization round-trip
# ---------------------------------------------------------------------------
class TestModelSerialization:
def test_round_trip_preserves_fields(self):
pl = ScenePlaylist(
id="playlist_1",
name="Cycle",
description="desc",
items=[PlaylistItem("a", 5.0), PlaylistItem("b", 15.0)],
loop=False,
shuffle=True,
tags=["movie"],
icon="play",
icon_color="#fff",
order=3,
)
restored = ScenePlaylist.from_dict(pl.to_dict())
assert restored.id == "playlist_1"
assert restored.name == "Cycle"
assert restored.description == "desc"
assert restored.loop is False
assert restored.shuffle is True
assert restored.tags == ["movie"]
assert restored.icon == "play"
assert restored.icon_color == "#fff"
assert restored.order == 3
assert [(i.scene_preset_id, i.duration_seconds) for i in restored.items] == [
("a", 5.0),
("b", 15.0),
]
def test_from_dict_clamps_bad_duration(self):
data = {
"id": "playlist_x",
"name": "X",
"items": [{"scene_preset_id": "a", "duration_seconds": 0}],
}
restored = ScenePlaylist.from_dict(data)
assert restored.items[0].duration_seconds == MIN_DURATION_SECONDS
def test_to_dict_omits_empty_icon(self):
pl = ScenePlaylist(id="p", name="n")
d = pl.to_dict()
assert "icon" not in d
assert "icon_color" not in d
assert d["loop"] is True
assert d["shuffle"] is False
# ---------------------------------------------------------------------------
# Store CRUD
# ---------------------------------------------------------------------------
class TestStoreCrud:
def test_create_and_get(self, store):
pl = _make_playlist(store, "First")
fetched = store.get_playlist(pl.id)
assert fetched.name == "First"
def test_create_duplicate_name_rejected(self, store):
_make_playlist(store, "Dup")
with pytest.raises(ValueError):
_make_playlist(store, "Dup")
def test_create_empty_name_rejected(self, store):
with pytest.raises(ValueError):
_make_playlist(store, " ")
def test_get_missing_raises(self, store):
with pytest.raises(EntityNotFoundError):
store.get_playlist("nope")
def test_get_all_sorted_by_order(self, store):
_make_playlist(store, "A") # order 0
_make_playlist(store, "B") # order 1
_make_playlist(store, "C") # order 2
names = [p.name for p in store.get_all_playlists()]
assert names == ["A", "B", "C"]
def test_update_fields(self, store):
pl = _make_playlist(store, "Edit me")
updated = store.update_playlist(
pl.id,
name="Edited",
loop=False,
shuffle=True,
items=[PlaylistItem("x", 20.0)],
tags=["t"],
)
assert updated.name == "Edited"
assert updated.loop is False
assert updated.shuffle is True
assert updated.items[0].scene_preset_id == "x"
assert updated.tags == ["t"]
def test_update_duplicate_name_rejected(self, store):
_make_playlist(store, "Taken")
pl = _make_playlist(store, "Other")
with pytest.raises(ValueError):
store.update_playlist(pl.id, name="Taken")
def test_update_missing_raises(self, store):
with pytest.raises(EntityNotFoundError):
store.update_playlist("nope", name="x")
def test_delete(self, store):
pl = _make_playlist(store, "Goner")
store.delete_playlist(pl.id)
assert store.count() == 0
with pytest.raises(EntityNotFoundError):
store.get_playlist(pl.id)
def test_persistence_across_reload(self, tmp_db):
store1 = ScenePlaylistStore(tmp_db)
_make_playlist(store1, "Persisted", items=[PlaylistItem("s1", 12.0)])
# New store instance over the same DB reloads from SQLite.
store2 = ScenePlaylistStore(tmp_db)
all_pls = store2.get_all_playlists()
assert len(all_pls) == 1
assert all_pls[0].name == "Persisted"
assert all_pls[0].items[0].duration_seconds == 12.0
def test_clone_supported(self, store):
pl = _make_playlist(store, "Original")
clone = store.clone(pl.id, "Copy")
assert clone.name == "Copy"
assert clone.id != pl.id
assert clone.id.startswith("playlist_")
assert store.count() == 2