feat: aggregated snapshot + wiring-graph APIs, MQTT device brokers

Backend
- snapshot: GET /api/v1/snapshot aggregates targets, devices, sources,
  presets and system into one payload for the HA coordinator, collapsing
  the prior ~2N+M request fan-out; per-section ?include= gating.
- graph: GET /api/v1/graph{,/schema,/dependents} backed by a pure,
  unit-tested graph_schema engine — one authoritative connectable-field
  registry so the editor no longer hard-codes topology in two places.
- devices: thread mqtt_source_id through DeviceCreate/Update/Response and
  the routes for multi-broker MQTT; shared validate_mqtt_source_exists
  (_mqtt_validation.py) reused by device + output-target routes; stop
  update_device masking intentional 4xx as 500.
- shutdown: bound uvicorn graceful-shutdown via GRACEFUL_SHUTDOWN_TIMEOUT
  (shared by __main__, android_entry, demo) so a lingering events WebSocket
  can't strand LED targets or block process exit.
- access log: structured _access_log middleware attributing each request to
  its authenticated token label (never the secret); uvicorn access_log off.

Frontend
- graph editor: generic schema-driven port/edge rendering, layout and
  connection handling; service-worker refresh.
- device modals: MQTT broker EntitySelect for device_type=mqtt in add-device
  and settings, wired into load/save/validate/dirty-check/clone.
- i18n: en/ru/zh keys.

Tests: graph routes + schema, snapshot routes, access log, mqtt_source_id
device regressions, bounded-shutdown entrypoint. 1614 passed.
This commit is contained in:
2026-05-28 22:51:04 +03:00
parent b83a72e63f
commit a5effba553
37 changed files with 3068 additions and 145 deletions
+103 -1
View File
@@ -47,6 +47,13 @@ def output_target_store(_route_db):
return OutputTargetStore(_route_db)
@pytest.fixture
def mqtt_source_store(_route_db):
from ledgrab.storage.mqtt_source_store import MQTTSourceStore
return MQTTSourceStore(_route_db)
@pytest.fixture
def processor_manager():
"""A mock ProcessorManager — avoids real hardware."""
@@ -60,7 +67,7 @@ def processor_manager():
@pytest.fixture
def client(device_store, output_target_store, processor_manager):
def client(device_store, output_target_store, processor_manager, mqtt_source_store):
app = _make_app()
# Override auth to always pass
@@ -72,6 +79,7 @@ def client(device_store, output_target_store, processor_manager):
app.dependency_overrides[deps.get_device_store] = lambda: device_store
app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store
app.dependency_overrides[deps.get_processor_manager] = lambda: processor_manager
app.dependency_overrides[deps.get_mqtt_store] = lambda: mqtt_source_store
return TestClient(app, raise_server_exceptions=False)
@@ -428,6 +436,100 @@ class TestWLEDSchemeInference:
assert device_store.get_device(existing.id).url == "http://10.0.0.5"
class TestMqttSourceId:
"""Regression coverage for the device ``mqtt_source_id`` field.
The store + ``device_config.MQTTConfig`` already carried the field, but
the API schema/route layer dropped it (DeviceCreate/Update/Response never
declared it, and the route never threaded it). These pin the create +
update round-trip and the referenced-source validation so it can't
silently regress.
"""
@pytest.fixture
def _stub_mqtt_validate(self, monkeypatch):
async def fake_validate(self, url): # noqa: ARG001 — provider self
return {"led_count": 60}
from ledgrab.core.devices.mqtt_provider import MQTTDeviceProvider
monkeypatch.setattr(MQTTDeviceProvider, "validate_device", fake_validate)
return fake_validate
def test_create_mqtt_device_persists_source_id(
self, client, device_store, mqtt_source_store, _stub_mqtt_validate
):
src = mqtt_source_store.create_source(name="Broker A", broker_host="192.168.1.10")
resp = client.post(
"/api/v1/devices",
json={
"name": "Living Room MQTT",
"device_type": "mqtt",
"url": "mqtt://ledgrab/device/living-room",
"led_count": 60,
"mqtt_source_id": src.id,
},
)
assert resp.status_code == 201, resp.text
body = resp.json()
assert body["mqtt_source_id"] == src.id
assert device_store.get_device(body["id"]).mqtt_source_id == src.id
def test_create_mqtt_device_rejects_unknown_source(self, client, _stub_mqtt_validate):
resp = client.post(
"/api/v1/devices",
json={
"name": "Bad Broker Ref",
"device_type": "mqtt",
"url": "mqtt://ledgrab/device/x",
"led_count": 60,
"mqtt_source_id": "mqs_doesnotexist",
},
)
assert resp.status_code == 422, resp.text
assert "not found" in resp.json()["detail"]
def test_update_device_sets_mqtt_source_id(self, client, device_store, mqtt_source_store):
src = mqtt_source_store.create_source(name="Broker B", broker_host="10.0.0.2")
dev = device_store.create_device(
name="MQTT dev",
url="mqtt://ledgrab/device/a",
led_count=10,
device_type="mqtt",
)
resp = client.put(f"/api/v1/devices/{dev.id}", json={"mqtt_source_id": src.id})
assert resp.status_code == 200, resp.text
assert resp.json()["mqtt_source_id"] == src.id
assert device_store.get_device(dev.id).mqtt_source_id == src.id
def test_update_device_can_clear_mqtt_source(self, client, device_store, mqtt_source_store):
"""An empty string unsets the broker (back to 'first available'). The
store's None-means-skip rule means '' is a real value that persists."""
src = mqtt_source_store.create_source(name="Broker C", broker_host="10.0.0.3")
dev = device_store.create_device(
name="MQTT dev3",
url="mqtt://ledgrab/device/c",
led_count=10,
device_type="mqtt",
mqtt_source_id=src.id,
)
resp = client.put(f"/api/v1/devices/{dev.id}", json={"mqtt_source_id": ""})
assert resp.status_code == 200, resp.text
assert resp.json()["mqtt_source_id"] == ""
assert device_store.get_device(dev.id).mqtt_source_id == ""
def test_update_device_rejects_unknown_mqtt_source(self, client, device_store):
dev = device_store.create_device(
name="MQTT dev2",
url="mqtt://ledgrab/device/b",
led_count=10,
device_type="mqtt",
)
resp = client.put(f"/api/v1/devices/{dev.id}", json={"mqtt_source_id": "mqs_nope"})
assert resp.status_code == 422, resp.text
assert "not found" in resp.json()["detail"]
class TestPairThenCreateFlow:
"""End-to-end coverage: pair, then persist; assert the token is
encrypted at rest and decrypted in to_config(), and that the API
@@ -0,0 +1,143 @@
"""Endpoint tests for the wiring-graph router (/api/v1/graph*).
The graph router resolves stores via the ``dependencies`` getters *directly*
(not FastAPI ``Depends``), so these tests populate the ``deps._deps`` registry
rather than using ``app.dependency_overrides``. Auth stays real (conftest pins a
test API key) so the rejection path is covered too.
"""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.routes.graph import router
_AUTH = {"Authorization": "Bearer test-api-key-12345"}
class _Ent:
"""Minimal stand-in for a storage model exposing ``to_dict``."""
def __init__(self, data: dict):
self._data = data
def to_dict(self) -> dict:
return self._data
def _store(*entities: dict):
class _S:
def get_all(self):
return [_Ent(e) for e in entities]
return _S()
@pytest.fixture
def client(test_config, monkeypatch):
import ledgrab.config as config_mod
monkeypatch.setattr(config_mod, "config", test_config)
# Populate only the kinds under test; the rest resolve to RuntimeError and
# are gracefully treated as empty by the route's _gather_entities.
monkeypatch.setattr(
deps,
"_deps",
{
"device_store": _store({"id": "dev_1", "name": "Strip", "device_type": "wled"}),
"picture_source_store": _store({"id": "ps_1", "name": "Cap", "stream_type": "raw"}),
"color_strip_store": _store(
{
"id": "css_1",
"name": "CSS",
"source_type": "picture",
"picture_source_id": "ps_1",
}
),
"output_target_store": _store(
{
"id": "ot_1",
"name": "TV",
"target_type": "led",
"device_id": "dev_1",
"color_strip_source_id": "css_1",
}
),
},
)
app = FastAPI()
app.include_router(router)
return TestClient(app, raise_server_exceptions=False)
def test_schema_endpoint_returns_registry(client):
resp = client.get("/api/v1/graph/schema", headers=_AUTH)
assert resp.status_code == 200
data = resp.json()
assert "device" in data["kinds"]
assert any(
c["target_kind"] == "output_target" and c["field"] == "device_id"
for c in data["connections"]
)
# Bindable + nested flags must be carried through.
assert any(c["bindable"] and c["nested"] for c in data["connections"])
def test_schema_endpoint_requires_auth(client):
assert client.get("/api/v1/graph/schema").status_code in (401, 403)
def test_graph_endpoint_builds_topology(client):
data = client.get("/api/v1/graph", headers=_AUTH).json()
assert {n["id"] for n in data["nodes"]} == {"dev_1", "ps_1", "css_1", "ot_1"}
edges = {(e["from"], e["to"]) for e in data["edges"]}
assert ("dev_1", "ot_1") in edges
assert ("css_1", "ot_1") in edges
assert ("ps_1", "css_1") in edges
assert data["issues"]["broken_refs"] == []
def test_dependents_endpoint(client):
data = client.get("/api/v1/graph/dependents/color_strip_source/css_1", headers=_AUTH).json()
assert data["dependents"] == [
{"id": "ot_1", "kind": "output_target", "name": "TV", "field": "color_strip_source_id"}
]
def test_dependents_endpoint_rejects_unknown_kind(client):
resp = client.get("/api/v1/graph/dependents/bogus/x", headers=_AUTH)
assert resp.status_code == 404
def test_validate_connection_endpoint_accepts_valid(client):
resp = client.post(
"/api/v1/graph/validate-connection",
json={
"target_kind": "output_target",
"target_id": "ot_1",
"field": "color_strip_source_id",
"source_id": "css_1",
},
headers=_AUTH,
)
assert resp.status_code == 200
assert resp.json() == {"ok": True, "error": None}
def test_validate_connection_endpoint_rejects_missing_source(client):
resp = client.post(
"/api/v1/graph/validate-connection",
json={
"target_kind": "output_target",
"target_id": "ot_1",
"field": "color_strip_source_id",
"source_id": "ghost",
},
headers=_AUTH,
)
data = resp.json()
assert data["ok"] is False
assert "not found" in data["error"]
@@ -0,0 +1,225 @@
"""Tests for the aggregated /api/v1/snapshot endpoint.
The snapshot collapses the integration's per-target/per-device poll fan-out
into one response. These tests build a minimal app with the snapshot router and
override the store/manager getters, mirroring tests/api/routes/test_devices_routes.py.
Auth is left real (the conftest patches a test API key) so the rejection path is
also covered. System metrics + health run for real — they read module-level
providers and the patched config, no lifespan needed.
"""
import types
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.routes import devices as devices_mod
from ledgrab.api.routes.devices import resolve_device_brightness
from ledgrab.api.routes.snapshot import SNAPSHOT_SECTIONS, _resolve_sections, router
from ledgrab.core.processing.processor_manager import ProcessorManager
from ledgrab.core.update.update_service import UpdateService
_AUTH = {"Authorization": "Bearer test-api-key-12345"}
_TOP_LEVEL_KEYS = (
"targets",
"target_states",
"target_metrics",
"devices",
"device_brightness",
"css_sources",
"value_sources",
"scene_presets",
"sync_clocks",
"system",
)
@pytest.fixture
def client(test_config, monkeypatch):
# Pin the global config (with its test API key) so auth is deterministic
# regardless of test ordering — other suites mutate the config singleton.
import ledgrab.config as config_mod
monkeypatch.setattr(config_mod, "config", test_config)
target_store = MagicMock()
target_store.get_all_targets.return_value = []
device_store = MagicMock()
device_store.get_all_devices.return_value = []
css_store = MagicMock()
css_store.get_all_sources.return_value = []
value_store = MagicMock()
value_store.get_all_sources.return_value = []
preset_store = MagicMock()
preset_store.get_all_presets.return_value = []
clock_store = MagicMock()
clock_store.get_all_clocks.return_value = []
clock_manager = MagicMock()
manager = MagicMock(spec=ProcessorManager)
manager.get_all_target_states.return_value = {}
manager.get_all_target_metrics.return_value = {}
update_service = MagicMock(spec=UpdateService)
update_service.get_status.return_value = {"has_update": False, "current_version": "test"}
app = FastAPI()
app.include_router(router)
app.dependency_overrides[deps.get_output_target_store] = lambda: target_store
app.dependency_overrides[deps.get_device_store] = lambda: device_store
app.dependency_overrides[deps.get_color_strip_store] = lambda: css_store
app.dependency_overrides[deps.get_value_source_store] = lambda: value_store
app.dependency_overrides[deps.get_scene_preset_store] = lambda: preset_store
app.dependency_overrides[deps.get_sync_clock_store] = lambda: clock_store
app.dependency_overrides[deps.get_sync_clock_manager] = lambda: clock_manager
app.dependency_overrides[deps.get_processor_manager] = lambda: manager
app.dependency_overrides[deps.get_update_service] = lambda: update_service
return TestClient(app, raise_server_exceptions=False)
def test_snapshot_returns_all_sections(client):
resp = client.get("/api/v1/snapshot", headers=_AUTH)
assert resp.status_code == 200
data = resp.json()
for key in _TOP_LEVEL_KEYS:
assert key in data, f"snapshot missing top-level key: {key}"
for list_key in (
"targets",
"devices",
"css_sources",
"value_sources",
"scene_presets",
"sync_clocks",
):
assert data[list_key] == []
for dict_key in ("target_states", "target_metrics", "device_brightness"):
assert data[dict_key] == {}
def test_snapshot_system_block_has_health_version(client):
data = client.get("/api/v1/snapshot", headers=_AUTH).json()
system = data["system"]
assert {"performance", "health", "update"}.issubset(system)
# health drives the coordinator's version + boot-time derivation
assert system["health"]["version"]
assert "uptime_seconds" in system["health"]
def test_snapshot_requires_auth(client):
resp = client.get("/api/v1/snapshot")
assert resp.status_code in (401, 403)
def test_snapshot_include_filters_to_requested_sections(client):
resp = client.get("/api/v1/snapshot", params={"include": "devices,system"}, headers=_AUTH)
assert resp.status_code == 200
# Only requested sections are present — excluded ones are omitted entirely.
assert set(resp.json().keys()) == {"devices", "system"}
def test_snapshot_include_rejects_unknown_section(client):
resp = client.get("/api/v1/snapshot", params={"include": "devices,bogus"}, headers=_AUTH)
assert resp.status_code == 422
assert "bogus" in resp.json()["detail"]
# ---------------------------------------------------------------------------
# _resolve_sections — query-param parsing edge cases
# ---------------------------------------------------------------------------
def test_resolve_sections_defaults_to_all_when_empty():
assert _resolve_sections(None) == frozenset(SNAPSHOT_SECTIONS)
assert _resolve_sections("") == frozenset(SNAPSHOT_SECTIONS)
def test_resolve_sections_strips_whitespace_and_dedupes():
assert _resolve_sections("devices, system ,devices") == frozenset({"devices", "system"})
def test_resolve_sections_ignores_empty_segments():
assert _resolve_sections("devices,,system,") == frozenset({"devices", "system"})
def test_resolve_sections_is_case_sensitive():
with pytest.raises(HTTPException) as exc:
_resolve_sections("Devices")
assert exc.value.status_code == 422
# ---------------------------------------------------------------------------
# resolve_device_brightness — cached / cold-fetch / graceful-degrade paths
# ---------------------------------------------------------------------------
def _fake_device(**kw):
return types.SimpleNamespace(
id=kw.get("id", "d1"),
device_type=kw.get("device_type", "wled"),
url=kw.get("url", "http://x"),
software_brightness=kw.get("software_brightness", 42),
)
async def test_brightness_none_without_capability(monkeypatch):
monkeypatch.setattr(devices_mod, "get_device_capabilities", lambda _dt: set())
manager = MagicMock()
assert await resolve_device_brightness(_fake_device(), manager) is None
manager.find_device_state.assert_not_called()
async def test_brightness_returns_cached(monkeypatch):
monkeypatch.setattr(devices_mod, "get_device_capabilities", lambda _dt: {"brightness_control"})
manager = MagicMock()
manager.find_device_state.return_value = types.SimpleNamespace(hardware_brightness=128)
assert await resolve_device_brightness(_fake_device(), manager) == 128
async def test_brightness_active_fetch_and_caches_on_cold_cache(monkeypatch):
monkeypatch.setattr(devices_mod, "get_device_capabilities", lambda _dt: {"brightness_control"})
provider = MagicMock()
provider.get_brightness = AsyncMock(return_value=200)
monkeypatch.setattr(devices_mod, "get_provider", lambda _dt: provider)
ds = types.SimpleNamespace(hardware_brightness=None)
manager = MagicMock()
manager.find_device_state.return_value = ds
assert await resolve_device_brightness(_fake_device(), manager) == 200
assert ds.hardware_brightness == 200 # cached so the next poll is I/O-free
async def test_brightness_degrades_to_none_on_provider_error(monkeypatch):
monkeypatch.setattr(devices_mod, "get_device_capabilities", lambda _dt: {"brightness_control"})
provider = MagicMock()
provider.get_brightness = AsyncMock(side_effect=OSError("unreachable"))
monkeypatch.setattr(devices_mod, "get_provider", lambda _dt: provider)
manager = MagicMock()
manager.find_device_state.return_value = types.SimpleNamespace(hardware_brightness=None)
# A single unreachable device must not raise — it degrades to None.
assert await resolve_device_brightness(_fake_device(), manager) is None
async def test_brightness_falls_back_to_software_when_unsupported(monkeypatch):
monkeypatch.setattr(devices_mod, "get_device_capabilities", lambda _dt: {"brightness_control"})
provider = MagicMock()
provider.get_brightness = AsyncMock(side_effect=NotImplementedError)
monkeypatch.setattr(devices_mod, "get_provider", lambda _dt: provider)
manager = MagicMock()
manager.find_device_state.return_value = types.SimpleNamespace(hardware_brightness=None)
assert await resolve_device_brightness(_fake_device(software_brightness=42), manager) == 42
+266
View File
@@ -0,0 +1,266 @@
"""Unit tests for the pure wiring-graph engine (ledgrab.api.graph_schema).
These exercise reference extraction, topology building, dependents, cycle and
dangling-reference detection without booting the app or any store.
"""
from ledgrab.api.graph_schema import (
CONNECTION_SCHEMA,
ENTITY_KINDS,
build_topology,
detect_cycles,
extract_refs,
find_dependents,
schema_for_kind,
validate_connection,
would_create_cycle,
)
# ── extract_refs ────────────────────────────────────────────────────────────
def test_extract_refs_top_level():
assert extract_refs({"device_id": "dev_1"}, "device_id") == ["dev_1"]
def test_extract_refs_empty_and_missing_are_dropped():
assert extract_refs({"device_id": ""}, "device_id") == []
assert extract_refs({}, "device_id") == []
assert extract_refs({"device_id": None}, "device_id") == []
def test_extract_refs_bindable_bound_vs_unbound():
# Bound bindable serializes as a dict.
assert extract_refs(
{"brightness": {"value": 1.0, "source_id": "vs_1"}}, "brightness.source_id"
) == ["vs_1"]
# Unbound bindable serializes as a plain number → no reference.
assert extract_refs({"brightness": 0.5}, "brightness.source_id") == []
# Bound-but-empty source_id → no reference.
assert (
extract_refs({"brightness": {"value": 1.0, "source_id": ""}}, "brightness.source_id") == []
)
def test_extract_refs_list_field():
entity = {"layers": [{"source_id": "a"}, {"source_id": ""}, {"other": "x"}]}
assert extract_refs(entity, "layers[].source_id") == ["a"]
def test_extract_refs_deep_object_then_list():
entity = {"calibration": {"lines": [{"picture_source_id": "p1"}, {"picture_source_id": "p2"}]}}
assert extract_refs(entity, "calibration.lines[].picture_source_id") == ["p1", "p2"]
def test_extract_refs_nested_object_none_is_safe():
assert extract_refs({"settings": None}, "settings.pattern_template_id") == []
assert extract_refs(
{"settings": {"pattern_template_id": "pt_1"}}, "settings.pattern_template_id"
) == ["pt_1"]
# ── registry consistency ─────────────────────────────────────────────────────
def test_registry_kinds_are_known():
for cf in CONNECTION_SCHEMA:
assert cf.target_kind in ENTITY_KINDS, cf.target_kind
assert cf.source_kind in ENTITY_KINDS, cf.source_kind
def test_registry_has_no_duplicate_target_field_pairs():
pairs = [(cf.target_kind, cf.field) for cf in CONNECTION_SCHEMA]
assert len(pairs) == len(set(pairs)), "duplicate (target_kind, field) in CONNECTION_SCHEMA"
def test_schema_for_kind_filters_by_referrer():
fields = {cf.field for cf in schema_for_kind("automation")}
assert fields == {"scene_preset_id", "deactivation_scene_preset_id"}
# ── build_topology ───────────────────────────────────────────────────────────
def _sample_entities():
return {
"device": [{"id": "dev_1", "name": "Strip", "device_type": "wled"}],
"picture_source": [{"id": "ps_1", "name": "Cap", "stream_type": "raw"}],
"color_strip_source": [
{"id": "css_1", "name": "CSS", "source_type": "picture", "picture_source_id": "ps_1"}
],
"output_target": [
{
"id": "ot_1",
"name": "TV",
"target_type": "led",
"device_id": "dev_1",
"color_strip_source_id": "css_1",
}
],
}
def test_build_topology_nodes_and_edges():
topo = build_topology(_sample_entities())
assert {n["id"] for n in topo["nodes"]} == {"dev_1", "ps_1", "css_1", "ot_1"}
edge_set = {(e["from"], e["to"], e["field"]) for e in topo["edges"]}
assert ("ps_1", "css_1", "picture_source_id") in edge_set
assert ("dev_1", "ot_1", "device_id") in edge_set
assert ("css_1", "ot_1", "color_strip_source_id") in edge_set
assert topo["issues"]["orphans"] == []
assert topo["issues"]["broken_refs"] == []
assert topo["issues"]["cycles"] == []
def test_build_topology_flags_orphan_and_broken_ref():
entities = _sample_entities()
entities["sync_clock"] = [{"id": "clk_1", "name": "Lonely"}] # no edges → orphan
entities["color_strip_source"][0]["audio_source_id"] = "ghost" # dangling
topo = build_topology(entities)
assert "clk_1" in topo["issues"]["orphans"]
broken = topo["issues"]["broken_refs"]
assert {"ref": "ghost", "by": "css_1", "field": "audio_source_id"} in broken
# The dangling ref must NOT appear as a real edge.
assert all(e["from"] != "ghost" for e in topo["edges"])
def test_build_topology_detects_cycle():
entities = {
"value_source": [
{"id": "vs_1", "name": "A", "source_type": "static", "value_source_id": "vs_2"},
{"id": "vs_2", "name": "B", "source_type": "static", "value_source_id": "vs_1"},
]
}
topo = build_topology(entities)
assert set(topo["issues"]["cycles"]) == {"vs_1", "vs_2"}
# ── detect_cycles ────────────────────────────────────────────────────────────
def test_detect_cycles_no_false_positive_on_diamond():
# a→b, a→c, b→d, c→d (DAG)
edges = [
{"from": "a", "to": "b"},
{"from": "a", "to": "c"},
{"from": "b", "to": "d"},
{"from": "c", "to": "d"},
]
assert detect_cycles(edges) == set()
def test_detect_cycles_marks_only_cycle_members():
# x→a→b→a (a,b in cycle; x is not)
edges = [
{"from": "x", "to": "a"},
{"from": "a", "to": "b"},
{"from": "b", "to": "a"},
]
assert detect_cycles(edges) == {"a", "b"}
# ── would_create_cycle ───────────────────────────────────────────────────────
def test_would_create_cycle_detects_back_edge_and_self():
edges = [{"from": "a", "to": "b"}] # b depends on a (a→b)
# Wiring a to consume b (edge b→a) closes the a→b→a loop.
assert would_create_cycle(edges, source_id="b", target_id="a") is True
# Wiring an unrelated pair is fine.
assert would_create_cycle(edges, source_id="a", target_id="c") is False
# Self-reference is always a cycle.
assert would_create_cycle(edges, source_id="z", target_id="z") is True
# ── find_dependents ──────────────────────────────────────────────────────────
def test_find_dependents_returns_referrers():
deps = find_dependents(_sample_entities(), "color_strip_source", "css_1")
assert deps == [
{"id": "ot_1", "kind": "output_target", "name": "TV", "field": "color_strip_source_id"}
]
def test_find_dependents_empty_when_unreferenced():
assert find_dependents(_sample_entities(), "color_strip_source", "css_unused") == []
# ── validate_connection ──────────────────────────────────────────────────────
def test_validate_connection_accepts_valid_edit():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "dev_1")
assert ok is True
assert err is None
def test_validate_connection_detach_always_ok():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "")
assert ok is True and err is None
def test_validate_connection_rejects_unknown_field():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "nope_id", "dev_1")
assert ok is False
assert "Unknown connection field" in err
def test_validate_connection_rejects_missing_source():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "ghost")
assert ok is False
assert "not found" in err
def test_validate_connection_accepts_bindable_single_field():
# Single-level bindable slots (e.g. brightness) ARE editable — only list
# fields are rejected.
entities = {
"color_strip_source": [
{"id": "css_1", "name": "X", "source_type": "effect", "brightness": 0.5}
],
"value_source": [{"id": "vs_1", "name": "V", "source_type": "static"}],
}
ok, err = validate_connection(
entities, "color_strip_source", "css_1", "brightness.source_id", "vs_1"
)
assert ok is True
assert err is None
def test_validate_connection_rejects_list_field():
# List slots can't be cycle-checked without an element index → rejected.
entities = {
"color_strip_source": [
{
"id": "css_1",
"name": "X",
"source_type": "composite",
"layers": [{"source_id": "css_2"}],
},
{"id": "css_2", "name": "Y", "source_type": "picture"},
]
}
ok, err = validate_connection(
entities, "color_strip_source", "css_1", "layers[].source_id", "css_2"
)
assert ok is False
assert "List connection" in err
def test_validate_connection_rejects_cycle():
entities = {
"value_source": [
{"id": "vs_1", "name": "A", "source_type": "static", "value_source_id": ""},
{"id": "vs_2", "name": "B", "source_type": "static", "value_source_id": "vs_1"},
]
}
# vs_2 already depends on vs_1 (edge vs_1→vs_2). Wiring vs_1 to consume vs_2
# would close the loop.
ok, err = validate_connection(entities, "value_source", "vs_1", "value_source_id", "vs_2")
assert ok is False
assert "cycle" in err.lower()