"""Unit tests for the pure wiring-graph engine (ledgrab.api.graph_schema). These exercise reference extraction, topology building, dependents, cycle and dangling-reference detection without booting the app or any store. """ import json import re from dataclasses import dataclass, field from ledgrab.api.graph_schema import ( CONNECTION_SCHEMA, ENTITY_KINDS, build_topology, detect_cycles, extract_refs, find_dependents, graph_field_roots, is_editable, schema_for_kind, serialize_entity_for_graph, validate_connection, would_create_cycle, ) # ── extract_refs ──────────────────────────────────────────────────────────── def test_extract_refs_top_level(): assert extract_refs({"device_id": "dev_1"}, "device_id") == ["dev_1"] def test_extract_refs_empty_and_missing_are_dropped(): assert extract_refs({"device_id": ""}, "device_id") == [] assert extract_refs({}, "device_id") == [] assert extract_refs({"device_id": None}, "device_id") == [] def test_extract_refs_bindable_bound_vs_unbound(): # Bound bindable serializes as a dict. assert extract_refs( {"brightness": {"value": 1.0, "source_id": "vs_1"}}, "brightness.source_id" ) == ["vs_1"] # Unbound bindable serializes as a plain number → no reference. assert extract_refs({"brightness": 0.5}, "brightness.source_id") == [] # Bound-but-empty source_id → no reference. assert ( extract_refs({"brightness": {"value": 1.0, "source_id": ""}}, "brightness.source_id") == [] ) def test_extract_refs_list_field(): entity = {"layers": [{"source_id": "a"}, {"source_id": ""}, {"other": "x"}]} assert extract_refs(entity, "layers[].source_id") == ["a"] def test_extract_refs_deep_object_then_list(): entity = {"calibration": {"lines": [{"picture_source_id": "p1"}, {"picture_source_id": "p2"}]}} assert extract_refs(entity, "calibration.lines[].picture_source_id") == ["p1", "p2"] def test_extract_refs_nested_object_none_is_safe(): assert extract_refs({"settings": None}, "settings.pattern_template_id") == [] assert extract_refs( {"settings": {"pattern_template_id": "pt_1"}}, "settings.pattern_template_id" ) == ["pt_1"] # ── registry consistency ───────────────────────────────────────────────────── def test_registry_kinds_are_known(): for cf in CONNECTION_SCHEMA: assert cf.target_kind in ENTITY_KINDS, cf.target_kind assert cf.source_kind in ENTITY_KINDS, cf.source_kind def test_registry_has_no_duplicate_target_field_pairs(): pairs = [(cf.target_kind, cf.field) for cf in CONNECTION_SCHEMA] assert len(pairs) == len(set(pairs)), "duplicate (target_kind, field) in CONNECTION_SCHEMA" def test_schema_for_kind_filters_by_referrer(): fields = {cf.field for cf in schema_for_kind("automation")} assert fields == {"scene_preset_id", "deactivation_scene_preset_id"} # ── build_topology ─────────────────────────────────────────────────────────── def _sample_entities(): return { "device": [{"id": "dev_1", "name": "Strip", "device_type": "wled"}], "picture_source": [{"id": "ps_1", "name": "Cap", "stream_type": "raw"}], "color_strip_source": [ {"id": "css_1", "name": "CSS", "source_type": "picture", "picture_source_id": "ps_1"} ], "output_target": [ { "id": "ot_1", "name": "TV", "target_type": "led", "device_id": "dev_1", "color_strip_source_id": "css_1", } ], } def test_build_topology_nodes_and_edges(): topo = build_topology(_sample_entities()) assert {n["id"] for n in topo["nodes"]} == {"dev_1", "ps_1", "css_1", "ot_1"} edge_set = {(e["from"], e["to"], e["field"]) for e in topo["edges"]} assert ("ps_1", "css_1", "picture_source_id") in edge_set assert ("dev_1", "ot_1", "device_id") in edge_set assert ("css_1", "ot_1", "color_strip_source_id") in edge_set assert topo["issues"]["orphans"] == [] assert topo["issues"]["broken_refs"] == [] assert topo["issues"]["cycles"] == [] def test_build_topology_flags_orphan_and_broken_ref(): entities = _sample_entities() entities["sync_clock"] = [{"id": "clk_1", "name": "Lonely"}] # no edges → orphan entities["color_strip_source"][0]["audio_source_id"] = "ghost" # dangling topo = build_topology(entities) assert "clk_1" in topo["issues"]["orphans"] broken = topo["issues"]["broken_refs"] assert {"ref": "ghost", "by": "css_1", "field": "audio_source_id"} in broken # The dangling ref must NOT appear as a real edge. assert all(e["from"] != "ghost" for e in topo["edges"]) def test_build_topology_detects_cycle(): entities = { "value_source": [ {"id": "vs_1", "name": "A", "source_type": "static", "value_source_id": "vs_2"}, {"id": "vs_2", "name": "B", "source_type": "static", "value_source_id": "vs_1"}, ] } topo = build_topology(entities) assert set(topo["issues"]["cycles"]) == {"vs_1", "vs_2"} # ── detect_cycles ──────────────────────────────────────────────────────────── def test_detect_cycles_no_false_positive_on_diamond(): # a→b, a→c, b→d, c→d (DAG) edges = [ {"from": "a", "to": "b"}, {"from": "a", "to": "c"}, {"from": "b", "to": "d"}, {"from": "c", "to": "d"}, ] assert detect_cycles(edges) == set() def test_detect_cycles_marks_only_cycle_members(): # x→a→b→a (a,b in cycle; x is not) edges = [ {"from": "x", "to": "a"}, {"from": "a", "to": "b"}, {"from": "b", "to": "a"}, ] assert detect_cycles(edges) == {"a", "b"} # ── would_create_cycle ─────────────────────────────────────────────────────── def test_would_create_cycle_detects_back_edge_and_self(): edges = [{"from": "a", "to": "b"}] # b depends on a (a→b) # Wiring a to consume b (edge b→a) closes the a→b→a loop. assert would_create_cycle(edges, source_id="b", target_id="a") is True # Wiring an unrelated pair is fine. assert would_create_cycle(edges, source_id="a", target_id="c") is False # Self-reference is always a cycle. assert would_create_cycle(edges, source_id="z", target_id="z") is True # ── find_dependents ────────────────────────────────────────────────────────── def test_find_dependents_returns_referrers(): deps = find_dependents(_sample_entities(), "color_strip_source", "css_1") assert deps == [ {"id": "ot_1", "kind": "output_target", "name": "TV", "field": "color_strip_source_id"} ] def test_find_dependents_empty_when_unreferenced(): assert find_dependents(_sample_entities(), "color_strip_source", "css_unused") == [] # ── validate_connection ────────────────────────────────────────────────────── def test_validate_connection_accepts_valid_edit(): ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "dev_1") assert ok is True assert err is None def test_validate_connection_detach_always_ok(): ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "") assert ok is True and err is None def test_validate_connection_rejects_unknown_field(): ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "nope_id", "dev_1") assert ok is False assert "Unknown connection field" in err def test_validate_connection_rejects_missing_source(): ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "ghost") assert ok is False assert "not found" in err def test_validate_connection_accepts_bindable_single_field(): # Single-level bindable slots (e.g. brightness) ARE editable — only list # fields are rejected. entities = { "color_strip_source": [ {"id": "css_1", "name": "X", "source_type": "effect", "brightness": 0.5} ], "value_source": [{"id": "vs_1", "name": "V", "source_type": "static"}], } ok, err = validate_connection( entities, "color_strip_source", "css_1", "brightness.source_id", "vs_1" ) assert ok is True assert err is None def test_validate_connection_rejects_list_field(): # List slots can't be cycle-checked without an element index → rejected. entities = { "color_strip_source": [ { "id": "css_1", "name": "X", "source_type": "composite", "layers": [{"source_id": "css_2"}], }, {"id": "css_2", "name": "Y", "source_type": "picture"}, ] } ok, err = validate_connection( entities, "color_strip_source", "css_1", "layers[].source_id", "css_2" ) assert ok is False assert "not editable" in err def test_validate_connection_rejects_color_bindable(): # Colour bindings are structurally bindable but not graph-editable (a value # source can't drive a colour). entities = { "color_strip_source": [ {"id": "css_1", "name": "X", "source_type": "single_color", "color": [255, 0, 0]} ], "value_source": [{"id": "vs_1", "name": "V", "source_type": "static"}], } ok, err = validate_connection( entities, "color_strip_source", "css_1", "color.source_id", "vs_1" ) assert ok is False assert "not editable" in err @dataclass class _FakeRule: token: str = "SUPER_SECRET_WEBHOOK_TOKEN" @dataclass class _FakeAutomation: id: str = "auto_1" name: str = "A" enabled: bool = True scene_preset_id: str = "sp_1" rules: list = field(default_factory=lambda: [_FakeRule()]) # Keys that must NEVER appear in the /graph projection allowlist. _SECRET_KEY_RE = re.compile( r"token|password|secret|credential|api[_-]?key|_key$|username|\burl\b|\bhost\b", re.I ) def test_projection_roots_never_expose_secrets(): # The /graph projection (graph_field_roots) is the leak boundary. Assert no # kind's allowlist contains a secret-bearing key — locks the boundary against # future schema drift (a new reference field whose root looks like a secret). for kind in ENTITY_KINDS: for root in graph_field_roots(kind): assert not _SECRET_KEY_RE.search( root ), f"{kind}: projected root {root!r} looks secret-bearing" def test_serialize_entity_for_graph_drops_secrets(): # The graph projection must strip everything except id/name/type + reference # roots — a deep asdict would otherwise leak the webhook token (a real, # auth-equivalent secret) in the /graph response. projected = serialize_entity_for_graph("automation", _FakeAutomation()) assert projected["id"] == "auto_1" assert projected["name"] == "A" assert projected["scene_preset_id"] == "sp_1" # reference root kept assert "rules" not in projected # non-reference field dropped assert "enabled" not in projected assert "SUPER_SECRET_WEBHOOK_TOKEN" not in json.dumps(projected) def test_is_editable_classifies_fields(): by_field = {(c.target_kind, c.field): c for c in CONNECTION_SCHEMA} # Top-level reference and single-level BindableFloat → editable. assert is_editable(by_field[("output_target", "device_id")]) assert is_editable(by_field[("color_strip_source", "brightness.source_id")]) assert is_editable(by_field[("output_target", "transition.source_id")]) # Colour binding, list slot, and double-nested → NOT editable. assert not is_editable(by_field[("color_strip_source", "color.source_id")]) assert not is_editable(by_field[("color_strip_source", "layers[].source_id")]) assert not is_editable(by_field[("output_target", "settings.brightness.source_id")]) def test_validate_connection_rejects_cycle(): entities = { "value_source": [ {"id": "vs_1", "name": "A", "source_type": "static", "value_source_id": ""}, {"id": "vs_2", "name": "B", "source_type": "static", "value_source_id": "vs_1"}, ] } # vs_2 already depends on vs_1 (edge vs_1→vs_2). Wiring vs_1 to consume vs_2 # would close the loop. ok, err = validate_connection(entities, "value_source", "vs_1", "value_source_id", "vs_2") assert ok is False assert "cycle" in err.lower()