Files
ledgrab/server/tests/api/test_graph_schema.py
alexei.dolgolyov 15cfb821d3 feat(graph): duplicate a selected subgraph server-side
A secret-safe equivalent of blueprint import: the graph editor's overflow menu
gains "Duplicate selection", which deep-clones the selected value and
colour-strip sources server-side (full config preserved, never crossing the
wire) and rewires references that point within the selection — shared
dependencies (devices, HA sources, …) stay shared.

- graph_schema.remap_refs: write-twin of extract_refs (same dot/list/bindable
  grammar) that rewrites only in-selection ids; 8 unit tests.
- BaseSqliteStore.clone(): faithful deep-copy clone (no schema round-trip, so no
  field is lost), prefix-preserving fresh id; reusable by any store.
- POST /api/v1/graph/duplicate: two-pass clone-then-rewire restricted to value /
  colour-strip sources (no inline secrets), with a safety net flagging any
  unremapped reference; 7 integration tests vs real stores.
- Frontend: duplicateSubgraph (+cache invalidation), graphDuplicateSelection
  (reload + reselect the new cluster), overflow-menu item, i18n (en/ru/zh).
2026-05-29 11:45:55 +03:00

397 lines
15 KiB
Python

"""Unit tests for the pure wiring-graph engine (ledgrab.api.graph_schema).
These exercise reference extraction, topology building, dependents, cycle and
dangling-reference detection without booting the app or any store.
"""
import json
import re
from dataclasses import dataclass, field
from ledgrab.api.graph_schema import (
CONNECTION_SCHEMA,
ENTITY_KINDS,
build_topology,
detect_cycles,
extract_refs,
find_dependents,
graph_field_roots,
is_editable,
remap_refs,
schema_for_kind,
serialize_entity_for_graph,
validate_connection,
would_create_cycle,
)
# ── extract_refs ────────────────────────────────────────────────────────────
def test_extract_refs_top_level():
assert extract_refs({"device_id": "dev_1"}, "device_id") == ["dev_1"]
def test_extract_refs_empty_and_missing_are_dropped():
assert extract_refs({"device_id": ""}, "device_id") == []
assert extract_refs({}, "device_id") == []
assert extract_refs({"device_id": None}, "device_id") == []
def test_extract_refs_bindable_bound_vs_unbound():
# Bound bindable serializes as a dict.
assert extract_refs(
{"brightness": {"value": 1.0, "source_id": "vs_1"}}, "brightness.source_id"
) == ["vs_1"]
# Unbound bindable serializes as a plain number → no reference.
assert extract_refs({"brightness": 0.5}, "brightness.source_id") == []
# Bound-but-empty source_id → no reference.
assert (
extract_refs({"brightness": {"value": 1.0, "source_id": ""}}, "brightness.source_id") == []
)
def test_extract_refs_list_field():
entity = {"layers": [{"source_id": "a"}, {"source_id": ""}, {"other": "x"}]}
assert extract_refs(entity, "layers[].source_id") == ["a"]
def test_extract_refs_deep_object_then_list():
entity = {"calibration": {"lines": [{"picture_source_id": "p1"}, {"picture_source_id": "p2"}]}}
assert extract_refs(entity, "calibration.lines[].picture_source_id") == ["p1", "p2"]
def test_extract_refs_nested_object_none_is_safe():
assert extract_refs({"settings": None}, "settings.pattern_template_id") == []
assert extract_refs(
{"settings": {"pattern_template_id": "pt_1"}}, "settings.pattern_template_id"
) == ["pt_1"]
# ── remap_refs (write-twin of extract_refs) ──────────────────────────────────
def test_remap_refs_top_level():
e = {"device_id": "a"}
assert remap_refs(e, "device_id", {"a": "b"}) == 1
assert e["device_id"] == "b"
def test_remap_refs_leaves_unmapped_ids_untouched():
e = {"device_id": "external"}
assert remap_refs(e, "device_id", {"a": "b"}) == 0
assert e["device_id"] == "external" # shared dependency outside the set
def test_remap_refs_bindable_bound():
e = {"brightness": {"value": 1.0, "source_id": "a"}}
assert remap_refs(e, "brightness.source_id", {"a": "b"}) == 1
assert e["brightness"] == {"value": 1.0, "source_id": "b"}
def test_remap_refs_unbound_bindable_is_safe():
e = {"brightness": 0.5} # plain number, no binding
assert remap_refs(e, "brightness.source_id", {"a": "b"}) == 0
assert e["brightness"] == 0.5
def test_remap_refs_list_field_only_mapped_elements():
e = {"layers": [{"source_id": "a"}, {"source_id": "external"}, {"source_id": "c"}]}
assert remap_refs(e, "layers[].source_id", {"a": "b", "c": "d"}) == 2
assert [layer["source_id"] for layer in e["layers"]] == ["b", "external", "d"]
def test_remap_refs_deep_object_then_list():
e = {"calibration": {"lines": [{"picture_source_id": "p1"}, {"picture_source_id": "p2"}]}}
assert remap_refs(e, "calibration.lines[].picture_source_id", {"p1": "q1"}) == 1
assert [ln["picture_source_id"] for ln in e["calibration"]["lines"]] == ["q1", "p2"]
def test_remap_refs_missing_keys_are_safe():
assert remap_refs({}, "layers[].source_id", {"a": "b"}) == 0
assert remap_refs({"layers": None}, "layers[].source_id", {"a": "b"}) == 0
def test_remap_refs_round_trips_with_extract_refs():
e = {"layers": [{"source_id": "a"}, {"source_id": "a"}]}
remap_refs(e, "layers[].source_id", {"a": "b"})
assert extract_refs(e, "layers[].source_id") == ["b", "b"]
# ── registry consistency ─────────────────────────────────────────────────────
def test_registry_kinds_are_known():
for cf in CONNECTION_SCHEMA:
assert cf.target_kind in ENTITY_KINDS, cf.target_kind
assert cf.source_kind in ENTITY_KINDS, cf.source_kind
def test_registry_has_no_duplicate_target_field_pairs():
pairs = [(cf.target_kind, cf.field) for cf in CONNECTION_SCHEMA]
assert len(pairs) == len(set(pairs)), "duplicate (target_kind, field) in CONNECTION_SCHEMA"
def test_schema_for_kind_filters_by_referrer():
fields = {cf.field for cf in schema_for_kind("automation")}
assert fields == {"scene_preset_id", "deactivation_scene_preset_id"}
# ── build_topology ───────────────────────────────────────────────────────────
def _sample_entities():
return {
"device": [{"id": "dev_1", "name": "Strip", "device_type": "wled"}],
"picture_source": [{"id": "ps_1", "name": "Cap", "stream_type": "raw"}],
"color_strip_source": [
{"id": "css_1", "name": "CSS", "source_type": "picture", "picture_source_id": "ps_1"}
],
"output_target": [
{
"id": "ot_1",
"name": "TV",
"target_type": "led",
"device_id": "dev_1",
"color_strip_source_id": "css_1",
}
],
}
def test_build_topology_nodes_and_edges():
topo = build_topology(_sample_entities())
assert {n["id"] for n in topo["nodes"]} == {"dev_1", "ps_1", "css_1", "ot_1"}
edge_set = {(e["from"], e["to"], e["field"]) for e in topo["edges"]}
assert ("ps_1", "css_1", "picture_source_id") in edge_set
assert ("dev_1", "ot_1", "device_id") in edge_set
assert ("css_1", "ot_1", "color_strip_source_id") in edge_set
assert topo["issues"]["orphans"] == []
assert topo["issues"]["broken_refs"] == []
assert topo["issues"]["cycles"] == []
def test_build_topology_flags_orphan_and_broken_ref():
entities = _sample_entities()
entities["sync_clock"] = [{"id": "clk_1", "name": "Lonely"}] # no edges → orphan
entities["color_strip_source"][0]["audio_source_id"] = "ghost" # dangling
topo = build_topology(entities)
assert "clk_1" in topo["issues"]["orphans"]
broken = topo["issues"]["broken_refs"]
assert {"ref": "ghost", "by": "css_1", "field": "audio_source_id"} in broken
# The dangling ref must NOT appear as a real edge.
assert all(e["from"] != "ghost" for e in topo["edges"])
def test_build_topology_detects_cycle():
entities = {
"value_source": [
{"id": "vs_1", "name": "A", "source_type": "static", "value_source_id": "vs_2"},
{"id": "vs_2", "name": "B", "source_type": "static", "value_source_id": "vs_1"},
]
}
topo = build_topology(entities)
assert set(topo["issues"]["cycles"]) == {"vs_1", "vs_2"}
# ── detect_cycles ────────────────────────────────────────────────────────────
def test_detect_cycles_no_false_positive_on_diamond():
# a→b, a→c, b→d, c→d (DAG)
edges = [
{"from": "a", "to": "b"},
{"from": "a", "to": "c"},
{"from": "b", "to": "d"},
{"from": "c", "to": "d"},
]
assert detect_cycles(edges) == set()
def test_detect_cycles_marks_only_cycle_members():
# x→a→b→a (a,b in cycle; x is not)
edges = [
{"from": "x", "to": "a"},
{"from": "a", "to": "b"},
{"from": "b", "to": "a"},
]
assert detect_cycles(edges) == {"a", "b"}
# ── would_create_cycle ───────────────────────────────────────────────────────
def test_would_create_cycle_detects_back_edge_and_self():
edges = [{"from": "a", "to": "b"}] # b depends on a (a→b)
# Wiring a to consume b (edge b→a) closes the a→b→a loop.
assert would_create_cycle(edges, source_id="b", target_id="a") is True
# Wiring an unrelated pair is fine.
assert would_create_cycle(edges, source_id="a", target_id="c") is False
# Self-reference is always a cycle.
assert would_create_cycle(edges, source_id="z", target_id="z") is True
# ── find_dependents ──────────────────────────────────────────────────────────
def test_find_dependents_returns_referrers():
deps = find_dependents(_sample_entities(), "color_strip_source", "css_1")
assert deps == [
{"id": "ot_1", "kind": "output_target", "name": "TV", "field": "color_strip_source_id"}
]
def test_find_dependents_empty_when_unreferenced():
assert find_dependents(_sample_entities(), "color_strip_source", "css_unused") == []
# ── validate_connection ──────────────────────────────────────────────────────
def test_validate_connection_accepts_valid_edit():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "dev_1")
assert ok is True
assert err is None
def test_validate_connection_detach_always_ok():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "")
assert ok is True and err is None
def test_validate_connection_rejects_unknown_field():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "nope_id", "dev_1")
assert ok is False
assert "Unknown connection field" in err
def test_validate_connection_rejects_missing_source():
ok, err = validate_connection(_sample_entities(), "output_target", "ot_1", "device_id", "ghost")
assert ok is False
assert "not found" in err
def test_validate_connection_accepts_bindable_single_field():
# Single-level bindable slots (e.g. brightness) ARE editable — only list
# fields are rejected.
entities = {
"color_strip_source": [
{"id": "css_1", "name": "X", "source_type": "effect", "brightness": 0.5}
],
"value_source": [{"id": "vs_1", "name": "V", "source_type": "static"}],
}
ok, err = validate_connection(
entities, "color_strip_source", "css_1", "brightness.source_id", "vs_1"
)
assert ok is True
assert err is None
def test_validate_connection_rejects_list_field():
# List slots can't be cycle-checked without an element index → rejected.
entities = {
"color_strip_source": [
{
"id": "css_1",
"name": "X",
"source_type": "composite",
"layers": [{"source_id": "css_2"}],
},
{"id": "css_2", "name": "Y", "source_type": "picture"},
]
}
ok, err = validate_connection(
entities, "color_strip_source", "css_1", "layers[].source_id", "css_2"
)
assert ok is False
assert "not editable" in err
def test_validate_connection_rejects_color_bindable():
# Colour bindings are structurally bindable but not graph-editable (a value
# source can't drive a colour).
entities = {
"color_strip_source": [
{"id": "css_1", "name": "X", "source_type": "single_color", "color": [255, 0, 0]}
],
"value_source": [{"id": "vs_1", "name": "V", "source_type": "static"}],
}
ok, err = validate_connection(
entities, "color_strip_source", "css_1", "color.source_id", "vs_1"
)
assert ok is False
assert "not editable" in err
@dataclass
class _FakeRule:
token: str = "SUPER_SECRET_WEBHOOK_TOKEN"
@dataclass
class _FakeAutomation:
id: str = "auto_1"
name: str = "A"
enabled: bool = True
scene_preset_id: str = "sp_1"
rules: list = field(default_factory=lambda: [_FakeRule()])
# Keys that must NEVER appear in the /graph projection allowlist.
_SECRET_KEY_RE = re.compile(
r"token|password|secret|credential|api[_-]?key|_key$|username|\burl\b|\bhost\b", re.I
)
def test_projection_roots_never_expose_secrets():
# The /graph projection (graph_field_roots) is the leak boundary. Assert no
# kind's allowlist contains a secret-bearing key — locks the boundary against
# future schema drift (a new reference field whose root looks like a secret).
for kind in ENTITY_KINDS:
for root in graph_field_roots(kind):
assert not _SECRET_KEY_RE.search(
root
), f"{kind}: projected root {root!r} looks secret-bearing"
def test_serialize_entity_for_graph_drops_secrets():
# The graph projection must strip everything except id/name/type + reference
# roots — a deep asdict would otherwise leak the webhook token (a real,
# auth-equivalent secret) in the /graph response.
projected = serialize_entity_for_graph("automation", _FakeAutomation())
assert projected["id"] == "auto_1"
assert projected["name"] == "A"
assert projected["scene_preset_id"] == "sp_1" # reference root kept
assert "rules" not in projected # non-reference field dropped
assert "enabled" not in projected
assert "SUPER_SECRET_WEBHOOK_TOKEN" not in json.dumps(projected)
def test_is_editable_classifies_fields():
by_field = {(c.target_kind, c.field): c for c in CONNECTION_SCHEMA}
# Top-level reference and single-level BindableFloat → editable.
assert is_editable(by_field[("output_target", "device_id")])
assert is_editable(by_field[("color_strip_source", "brightness.source_id")])
assert is_editable(by_field[("output_target", "transition.source_id")])
# Colour binding, list slot, and double-nested → NOT editable.
assert not is_editable(by_field[("color_strip_source", "color.source_id")])
assert not is_editable(by_field[("color_strip_source", "layers[].source_id")])
assert not is_editable(by_field[("output_target", "settings.brightness.source_id")])
def test_validate_connection_rejects_cycle():
entities = {
"value_source": [
{"id": "vs_1", "name": "A", "source_type": "static", "value_source_id": ""},
{"id": "vs_2", "name": "B", "source_type": "static", "value_source_id": "vs_1"},
]
}
# vs_2 already depends on vs_1 (edge vs_1→vs_2). Wiring vs_1 to consume vs_2
# would close the loop.
ok, err = validate_connection(entities, "value_source", "vs_1", "value_source_id", "vs_2")
assert ok is False
assert "cycle" in err.lower()