feat: support nesting for composite color strip sources
Lint & Test / test (push) Successful in 2m17s

Allow composite sources to reference other composite/mapped sources as
layers. Adds cycle detection (via transitive dependency graph walk),
depth limiting (MAX_COMPOSITE_DEPTH=4), and a runtime safety net in the
stream manager. Frontend layer dropdown now shows all source types
except the source being edited.

17 new tests covering cycles, depth limits, and valid nesting — all
715 tests passing.
This commit is contained in:
2026-04-12 20:41:15 +03:00
parent 4940007e54
commit cc9900d801
7 changed files with 348 additions and 30 deletions
+27 -21
View File
@@ -1,28 +1,34 @@
# Group Device Type Implementation
# Composite Nesting Support
## Phase 1: Storage Layer
- [x] Add `group_device_ids`, `group_mode` fields to Device model
- [x] Add cycle detection + led_count resolution + group reference helpers to DeviceStore
## Phase 1: Store — Cycle & Depth Validation
## Phase 2: API Schemas
- [x] Add group fields to DeviceCreate, DeviceUpdate, DeviceResponse
- [x] Add `get_transitive_dependencies()` to ColorStripStore
- [x] Add `validate_nesting()` with cycle detection + depth limit (MAX_DEPTH=4)
## Phase 3: GroupLEDClient + Provider
- [x] Create `group_client.py` — GroupLEDClient (sequence slice / independent resample)
- [x] Create `group_provider.py` — GroupDeviceProvider
- [x] Register group provider in `led_client.py`
## Phase 2: API — Validation in Create/Update
## Phase 4: Routes + Processing Pipeline
- [x] Update device routes — group-specific create/update logic, delete protection, cycle validation
- [x] Add group fields to DeviceInfo + _DEVICE_FIELD_DEFAULTS
- [x] Pass group context (device_store, group fields) to create_led_client
- [x] Call `validate_nesting()` in create handler for composite sources
- [x] Call `validate_nesting()` in update handler for composite sources
## Phase 3: Runtime — Depth Guard in Stream
- [x] Add `depth` parameter to CompositeColorStripStream
- [x] Pass depth through ColorStripStreamManager.acquire()
- [x] Cap depth at runtime to prevent runaway nesting
## Phase 4: Frontend — Allow Composites in Layer Dropdown
- [x] Remove `source_type !== 'composite'` filter (keep self-exclusion)
- [x] Update docstring in composite_stream.py
## Phase 5: Tests
- [x] Unit tests for cycle detection, led_count resolution, GroupLEDClient (20 tests, all passing)
## Phase 6: Frontend
- [x] Group device UI (child picker, mode selector, hide URL for groups)
- [x] i18n keys (en, ru, zh)
- [x] TypeScript types + API helper
- [x] Icon (layers) for group device type
- [x] CSS for group child rows
- [x] Cycle detection tests (A→B→A, self-reference)
- [x] Depth limit tests (chain exceeding MAX_DEPTH)
- [x] Valid nesting tests (A→B both composite, no cycle)
## Phase 6: Lint & Build
- [x] Ruff check passes
- [x] TypeScript build passes
- [x] All existing tests pass (715/715)
@@ -366,6 +366,20 @@ async def create_color_strip_source(
"""Create a new color strip source."""
try:
kwargs = _extract_css_kwargs(data)
# Validate nesting for composite/mapped sources before creating
if data.source_type == "composite" and kwargs.get("layers"):
child_ids = [ly.get("source_id", "") for ly in kwargs["layers"] if ly.get("source_id")]
# No parent_id yet (new source), just check depth
from wled_controller.storage.color_strip_store import MAX_COMPOSITE_DEPTH
for cid in child_ids:
depth = store.get_nesting_depth(cid)
if 1 + depth > MAX_COMPOSITE_DEPTH:
raise ValueError(
f"Nesting depth {1 + depth} exceeds maximum of {MAX_COMPOSITE_DEPTH}"
)
source = store.create_source(source_type=data.source_type, **kwargs)
fire_entity_event("color_strip_source", "created", source.id)
return _css_to_response(source)
@@ -414,6 +428,12 @@ async def update_color_strip_source(
"""Update a color strip source and hot-reload any running streams."""
try:
kwargs = _extract_css_kwargs(data)
# Validate nesting for composite sources before updating
if data.source_type == "composite" and kwargs.get("layers") is not None:
child_ids = [ly.get("source_id", "") for ly in kwargs["layers"] if ly.get("source_id")]
store.validate_nesting(source_id, child_ids)
source = store.update_source(source_id=source_id, **kwargs)
# Hot-reload running stream (no restart needed for in-place param changes)
@@ -220,7 +220,7 @@ class ColorStripStreamManager:
return composite
return css_id
def acquire(self, css_id: str, consumer_id: str = "") -> ColorStripStream:
def acquire(self, css_id: str, consumer_id: str = "", depth: int = 0) -> ColorStripStream:
"""Get or create a ColorStripStream for the given ColorStripSource.
Sharable sources (picture) are shared — keyed by css_id, ref-counted.
@@ -231,6 +231,8 @@ class ColorStripStreamManager:
css_id: ID of the ColorStripSource config
consumer_id: Unique consumer identifier (target_id) — used as
registry key for non-sharable streams.
depth: Current nesting depth (passed to composite streams for
runtime depth guarding).
Returns:
ColorStripStream instance
@@ -259,7 +261,7 @@ class ColorStripStreamManager:
)
css_stream = CompositeColorStripStream(
source, self, self._value_stream_manager, self._cspt_store
source, self, self._value_stream_manager, self._cspt_store, depth=depth
)
elif source.source_type == "mapped":
from wled_controller.core.processing.mapped_stream import MappedColorStripStream
@@ -25,10 +25,16 @@ _BLEND_DIFFERENCE = "difference"
_BLEND_EXCLUSION = "exclusion"
_MAX_RUNTIME_DEPTH = 4
class CompositeColorStripStream(ColorStripStream):
"""Blends multiple ColorStripStreams as layers with blend modes and opacity.
Each layer references an existing (non-composite) ColorStripSource.
Layers may reference any ColorStripSource, including other composites
(nesting). Cycle detection and depth limits are enforced by the store
at save time; a runtime depth guard provides a safety net.
Sub-streams are acquired from the ColorStripStreamManager so picture
sources share their existing capture pipeline.
@@ -36,11 +42,14 @@ class CompositeColorStripStream(ColorStripStream):
sub-stream's latest colors and blending bottom-to-top.
"""
def __init__(self, source, css_manager, value_stream_manager=None, cspt_store=None):
def __init__(
self, source, css_manager, value_stream_manager=None, cspt_store=None, depth: int = 0
):
import uuid as _uuid
self._source_id: str = source.id
self._instance_id: str = _uuid.uuid4().hex[:8] # unique per instance to avoid release races
self._depth: int = depth
self._layers: List[dict] = list(source.layers)
self._led_count: int = source.led_count
self._auto_size: bool = source.led_count == 0
@@ -207,6 +216,12 @@ class CompositeColorStripStream(ColorStripStream):
def _acquire_sub_streams(self) -> None:
self._sub_streams_version += 1
if self._depth >= _MAX_RUNTIME_DEPTH:
logger.warning(
f"CompositeColorStripStream {self._source_id} at depth {self._depth}"
f"skipping sub-stream acquisition (max depth {_MAX_RUNTIME_DEPTH})"
)
return
for i, layer in enumerate(self._layers):
if not layer.get("enabled", True):
continue
@@ -215,7 +230,7 @@ class CompositeColorStripStream(ColorStripStream):
continue
consumer_id = f"{self._source_id}__{self._instance_id}__layer_{i}"
try:
stream = self._css_manager.acquire(src_id, consumer_id)
stream = self._css_manager.acquire(src_id, consumer_id, depth=self._depth + 1)
if hasattr(stream, "configure") and self._led_count > 0:
# Configure with zone length if layer has a range, else full strip
layer_start = layer.get("start", 0)
@@ -1432,7 +1432,7 @@ export async function showCSSEditor(cssId: any = null, cloneData: any = null, pr
await valueSourcesCache.fetch().catch((): any[] => []);
const allCssSources: any[] = await colorStripSourcesCache.fetch().catch((): any[] => []);
compositeSetAvailableSources(allCssSources.filter(s =>
s.source_type !== 'composite' && (!cssId || s.id !== cssId)
!cssId || s.id !== cssId
));
mappedSetAvailableSources(allCssSources.filter(s =>
s.source_type !== 'mapped' && (!cssId || s.id !== cssId)
@@ -1484,7 +1484,7 @@ export async function showCSSEditor(cssId: any = null, cloneData: any = null, pr
(document.getElementById('css-editor-name') as HTMLInputElement).value = css.name;
if (css.source_type === 'composite') {
compositeSetAvailableSources(allCssSources.filter(s =>
s.source_type !== 'composite' && s.id !== css.id
s.id !== css.id
));
} else if (css.source_type === 'mapped') {
mappedSetAvailableSources(allCssSources.filter(s =>
@@ -17,6 +17,8 @@ from wled_controller.utils import get_logger
logger = get_logger(__name__)
MAX_COMPOSITE_DEPTH = 4
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
"""Persistent storage for color strip sources."""
@@ -35,8 +37,7 @@ class ColorStripStore(BaseSqliteStore[ColorStripSource]):
"""Get a color strip source by ID (alias for get())."""
return self.get(source_id)
def create_source(self, name: str, source_type: str = "picture",
**kwargs) -> ColorStripSource:
def create_source(self, name: str, source_type: str = "picture", **kwargs) -> ColorStripSource:
"""Create a new color strip source.
All type-specific parameters are passed as keyword arguments and
@@ -138,6 +139,111 @@ class ColorStripStore(BaseSqliteStore[ColorStripSource]):
break
return names
def get_transitive_dependencies(self, source_id: str) -> set[str]:
"""Return the set of all source IDs that *source_id* transitively depends on.
Walks composite layers and mapped zones to build the full dependency graph.
"""
visited: set[str] = set()
stack = [source_id]
while stack:
current = stack.pop()
if current in visited:
continue
visited.add(current)
source = self._items.get(current)
if source is None:
continue
child_ids: list[str] = []
if isinstance(source, CompositeColorStripSource):
child_ids = [
layer.get("source_id", "") for layer in source.layers if layer.get("source_id")
]
elif isinstance(source, MappedColorStripSource):
child_ids = [
zone.get("source_id", "") for zone in source.zones if zone.get("source_id")
]
elif isinstance(source, ProcessedColorStripSource):
if source.input_source_id:
child_ids = [source.input_source_id]
stack.extend(cid for cid in child_ids if cid not in visited)
visited.discard(source_id) # exclude self from dependency set
return visited
def get_nesting_depth(self, source_id: str) -> int:
"""Return the maximum nesting depth reachable from *source_id*.
Leaf sources have depth 0. A composite with only leaf layers has depth 1.
"""
cache: dict[str, int] = {}
def _depth(sid: str, path: frozenset[str]) -> int:
if sid in cache:
return cache[sid]
source = self._items.get(sid)
if source is None:
return 0
child_ids: list[str] = []
if isinstance(source, CompositeColorStripSource):
child_ids = [ly.get("source_id", "") for ly in source.layers if ly.get("source_id")]
elif isinstance(source, MappedColorStripSource):
child_ids = [z.get("source_id", "") for z in source.zones if z.get("source_id")]
elif isinstance(source, ProcessedColorStripSource):
if source.input_source_id:
child_ids = [source.input_source_id]
if not child_ids:
cache[sid] = 0
return 0
max_child = 0
for cid in child_ids:
if cid in path:
continue # skip cycles (validated separately)
max_child = max(max_child, _depth(cid, path | {cid}))
depth = 1 + max_child
cache[sid] = depth
return depth
return _depth(source_id, frozenset({source_id}))
def validate_nesting(
self,
parent_id: str,
child_source_ids: list[str],
) -> None:
"""Validate that adding *child_source_ids* as layers/zones of *parent_id* is safe.
Raises ValueError if:
- Any child is the parent itself (self-reference)
- Any child transitively depends on the parent (would create a cycle)
- The resulting nesting depth exceeds MAX_COMPOSITE_DEPTH
"""
for child_id in child_source_ids:
if not child_id:
continue
if child_id == parent_id:
child_name = getattr(self._items.get(child_id), "name", child_id)
raise ValueError(f"Cannot add '{child_name}' as its own layer (self-reference)")
child_deps = self.get_transitive_dependencies(child_id)
if parent_id in child_deps:
child_name = getattr(self._items.get(child_id), "name", child_id)
parent_name = getattr(self._items.get(parent_id), "name", parent_id)
raise ValueError(
f"Cannot add '{child_name}' as a layer of '{parent_name}': "
f"it would create a circular reference"
)
# Check depth: temporarily compute what the depth would be
max_child_depth = 0
for child_id in child_source_ids:
if not child_id:
continue
max_child_depth = max(max_child_depth, self.get_nesting_depth(child_id))
resulting_depth = 1 + max_child_depth
if resulting_depth > MAX_COMPOSITE_DEPTH:
raise ValueError(
f"Nesting depth {resulting_depth} exceeds maximum of {MAX_COMPOSITE_DEPTH}"
)
def get_processed_referencing(self, source_id: str) -> List[str]:
"""Return names of processed sources that reference a given source as input."""
names = []
+169
View File
@@ -0,0 +1,169 @@
"""Tests for composite nesting: cycle detection, depth limits, transitive dependencies."""
import pytest
from wled_controller.storage.color_strip_store import ColorStripStore, MAX_COMPOSITE_DEPTH
from wled_controller.storage.database import Database
# ── Fixtures ──────────────────────────────────────────────────────────
@pytest.fixture
def tmp_db(tmp_path):
db = Database(tmp_path / "test.db")
yield db
db.close()
@pytest.fixture
def store(tmp_db):
return ColorStripStore(tmp_db)
def _create_static(store: ColorStripStore, name: str) -> str:
"""Create a static color strip source, return its ID."""
source = store.create_source(name=name, source_type="static", colors=[[255, 0, 0]])
return source.id
def _create_composite(store: ColorStripStore, name: str, layer_ids: list[str]) -> str:
"""Create a composite source referencing the given layer IDs, return its ID."""
layers = [
{"source_id": sid, "blend_mode": "normal", "opacity": 1.0, "enabled": True}
for sid in layer_ids
]
source = store.create_source(name=name, source_type="composite", layers=layers)
return source.id
# ── Transitive Dependencies ──────────────────────────────────────────
class TestTransitiveDependencies:
def test_leaf_has_no_deps(self, store):
sid = _create_static(store, "leaf")
assert store.get_transitive_dependencies(sid) == set()
def test_composite_deps_are_its_layers(self, store):
a = _create_static(store, "A")
b = _create_static(store, "B")
comp = _create_composite(store, "comp", [a, b])
deps = store.get_transitive_dependencies(comp)
assert deps == {a, b}
def test_nested_composite_transitive_deps(self, store):
a = _create_static(store, "A")
b = _create_static(store, "B")
inner = _create_composite(store, "inner", [a])
outer = _create_composite(store, "outer", [inner, b])
deps = store.get_transitive_dependencies(outer)
assert deps == {inner, a, b}
def test_nonexistent_id_returns_empty(self, store):
assert store.get_transitive_dependencies("css_nonexist") == set()
# ── Nesting Depth ────────────────────────────────────────────────────
class TestNestingDepth:
def test_leaf_depth_is_zero(self, store):
sid = _create_static(store, "leaf")
assert store.get_nesting_depth(sid) == 0
def test_flat_composite_depth_is_one(self, store):
a = _create_static(store, "A")
comp = _create_composite(store, "comp", [a])
assert store.get_nesting_depth(comp) == 1
def test_two_level_nesting_depth(self, store):
a = _create_static(store, "A")
inner = _create_composite(store, "inner", [a])
outer = _create_composite(store, "outer", [inner])
assert store.get_nesting_depth(outer) == 2
def test_depth_uses_deepest_branch(self, store):
a = _create_static(store, "A")
b = _create_static(store, "B")
inner = _create_composite(store, "inner", [a])
outer = _create_composite(store, "outer", [inner, b])
# inner has depth 1, b has depth 0 → outer picks max (1) + 1 = 2
assert store.get_nesting_depth(outer) == 2
# ── Validate Nesting ─────────────────────────────────────────────────
class TestValidateNesting:
def test_self_reference_rejected(self, store):
a = _create_static(store, "A")
comp = _create_composite(store, "comp", [a])
with pytest.raises(ValueError, match="self-reference"):
store.validate_nesting(comp, [comp])
def test_direct_cycle_rejected(self, store):
"""A → B → A should be rejected."""
a = _create_static(store, "A")
comp_b = _create_composite(store, "B", [a])
comp_a = _create_composite(store, "A-comp", [comp_b])
# Try to add A-comp as a layer of B → would create B → A-comp → B
with pytest.raises(ValueError, match="circular reference"):
store.validate_nesting(comp_b, [comp_a])
def test_transitive_cycle_rejected(self, store):
"""A → B → C, then trying C → A should be rejected."""
leaf = _create_static(store, "leaf")
c = _create_composite(store, "C", [leaf])
b = _create_composite(store, "B", [c])
a = _create_composite(store, "A", [b])
# Try to add A as a layer of C → C → A → B → C
with pytest.raises(ValueError, match="circular reference"):
store.validate_nesting(c, [a])
def test_valid_nesting_accepted(self, store):
"""A → B where B is composite with only leaves should be fine."""
leaf = _create_static(store, "leaf")
inner = _create_composite(store, "inner", [leaf])
outer_id = _create_composite(store, "outer", [])
# Should not raise
store.validate_nesting(outer_id, [inner])
def test_depth_limit_exceeded(self, store):
"""Build a chain deeper than MAX_COMPOSITE_DEPTH and verify rejection."""
# Build chain: leaf → comp1 → comp2 → ... → compN
prev = _create_static(store, "leaf")
for i in range(MAX_COMPOSITE_DEPTH):
prev = _create_composite(store, f"comp_{i}", [prev])
# prev is now at depth MAX_COMPOSITE_DEPTH
# Trying to wrap it in one more composite should fail
wrapper = _create_composite(store, "wrapper", [])
with pytest.raises(ValueError, match="Nesting depth"):
store.validate_nesting(wrapper, [prev])
def test_depth_at_limit_accepted(self, store):
"""A chain exactly at MAX_COMPOSITE_DEPTH should be accepted."""
prev = _create_static(store, "leaf")
for i in range(MAX_COMPOSITE_DEPTH - 1):
prev = _create_composite(store, f"comp_{i}", [prev])
# prev is at depth MAX_COMPOSITE_DEPTH - 1
wrapper = _create_composite(store, "wrapper", [])
# 1 + (MAX_COMPOSITE_DEPTH - 1) = MAX_COMPOSITE_DEPTH → should pass
store.validate_nesting(wrapper, [prev])
def test_multiple_children_validates_all(self, store):
"""If one child creates a cycle, validation fails even if others are fine."""
leaf = _create_static(store, "leaf")
comp = _create_composite(store, "comp", [leaf])
with pytest.raises(ValueError, match="self-reference"):
store.validate_nesting(comp, [leaf, comp])
def test_empty_children_accepted(self, store):
"""Empty children list should always pass."""
comp = _create_composite(store, "comp", [])
store.validate_nesting(comp, [])
def test_none_children_skipped(self, store):
"""Empty-string child IDs should be skipped."""
comp = _create_composite(store, "comp", [])
store.validate_nesting(comp, ["", ""])