feat: support nesting for composite color strip sources
Lint & Test / test (push) Successful in 2m17s
Lint & Test / test (push) Successful in 2m17s
Allow composite sources to reference other composite/mapped sources as layers. Adds cycle detection (via transitive dependency graph walk), depth limiting (MAX_COMPOSITE_DEPTH=4), and a runtime safety net in the stream manager. Frontend layer dropdown now shows all source types except the source being edited. 17 new tests covering cycles, depth limits, and valid nesting — all 715 tests passing.
This commit is contained in:
@@ -1,28 +1,34 @@
|
|||||||
# Group Device Type Implementation
|
# Composite Nesting Support
|
||||||
|
|
||||||
## Phase 1: Storage Layer
|
## Phase 1: Store — Cycle & Depth Validation
|
||||||
- [x] Add `group_device_ids`, `group_mode` fields to Device model
|
|
||||||
- [x] Add cycle detection + led_count resolution + group reference helpers to DeviceStore
|
|
||||||
|
|
||||||
## Phase 2: API Schemas
|
- [x] Add `get_transitive_dependencies()` to ColorStripStore
|
||||||
- [x] Add group fields to DeviceCreate, DeviceUpdate, DeviceResponse
|
- [x] Add `validate_nesting()` with cycle detection + depth limit (MAX_DEPTH=4)
|
||||||
|
|
||||||
## Phase 3: GroupLEDClient + Provider
|
## Phase 2: API — Validation in Create/Update
|
||||||
- [x] Create `group_client.py` — GroupLEDClient (sequence slice / independent resample)
|
|
||||||
- [x] Create `group_provider.py` — GroupDeviceProvider
|
|
||||||
- [x] Register group provider in `led_client.py`
|
|
||||||
|
|
||||||
## Phase 4: Routes + Processing Pipeline
|
- [x] Call `validate_nesting()` in create handler for composite sources
|
||||||
- [x] Update device routes — group-specific create/update logic, delete protection, cycle validation
|
- [x] Call `validate_nesting()` in update handler for composite sources
|
||||||
- [x] Add group fields to DeviceInfo + _DEVICE_FIELD_DEFAULTS
|
|
||||||
- [x] Pass group context (device_store, group fields) to create_led_client
|
## Phase 3: Runtime — Depth Guard in Stream
|
||||||
|
|
||||||
|
- [x] Add `depth` parameter to CompositeColorStripStream
|
||||||
|
- [x] Pass depth through ColorStripStreamManager.acquire()
|
||||||
|
- [x] Cap depth at runtime to prevent runaway nesting
|
||||||
|
|
||||||
|
## Phase 4: Frontend — Allow Composites in Layer Dropdown
|
||||||
|
|
||||||
|
- [x] Remove `source_type !== 'composite'` filter (keep self-exclusion)
|
||||||
|
- [x] Update docstring in composite_stream.py
|
||||||
|
|
||||||
## Phase 5: Tests
|
## Phase 5: Tests
|
||||||
- [x] Unit tests for cycle detection, led_count resolution, GroupLEDClient (20 tests, all passing)
|
|
||||||
|
|
||||||
## Phase 6: Frontend
|
- [x] Cycle detection tests (A→B→A, self-reference)
|
||||||
- [x] Group device UI (child picker, mode selector, hide URL for groups)
|
- [x] Depth limit tests (chain exceeding MAX_DEPTH)
|
||||||
- [x] i18n keys (en, ru, zh)
|
- [x] Valid nesting tests (A→B both composite, no cycle)
|
||||||
- [x] TypeScript types + API helper
|
|
||||||
- [x] Icon (layers) for group device type
|
## Phase 6: Lint & Build
|
||||||
- [x] CSS for group child rows
|
|
||||||
|
- [x] Ruff check passes
|
||||||
|
- [x] TypeScript build passes
|
||||||
|
- [x] All existing tests pass (715/715)
|
||||||
|
|||||||
@@ -366,6 +366,20 @@ async def create_color_strip_source(
|
|||||||
"""Create a new color strip source."""
|
"""Create a new color strip source."""
|
||||||
try:
|
try:
|
||||||
kwargs = _extract_css_kwargs(data)
|
kwargs = _extract_css_kwargs(data)
|
||||||
|
|
||||||
|
# Validate nesting for composite/mapped sources before creating
|
||||||
|
if data.source_type == "composite" and kwargs.get("layers"):
|
||||||
|
child_ids = [ly.get("source_id", "") for ly in kwargs["layers"] if ly.get("source_id")]
|
||||||
|
# No parent_id yet (new source), just check depth
|
||||||
|
from wled_controller.storage.color_strip_store import MAX_COMPOSITE_DEPTH
|
||||||
|
|
||||||
|
for cid in child_ids:
|
||||||
|
depth = store.get_nesting_depth(cid)
|
||||||
|
if 1 + depth > MAX_COMPOSITE_DEPTH:
|
||||||
|
raise ValueError(
|
||||||
|
f"Nesting depth {1 + depth} exceeds maximum of {MAX_COMPOSITE_DEPTH}"
|
||||||
|
)
|
||||||
|
|
||||||
source = store.create_source(source_type=data.source_type, **kwargs)
|
source = store.create_source(source_type=data.source_type, **kwargs)
|
||||||
fire_entity_event("color_strip_source", "created", source.id)
|
fire_entity_event("color_strip_source", "created", source.id)
|
||||||
return _css_to_response(source)
|
return _css_to_response(source)
|
||||||
@@ -414,6 +428,12 @@ async def update_color_strip_source(
|
|||||||
"""Update a color strip source and hot-reload any running streams."""
|
"""Update a color strip source and hot-reload any running streams."""
|
||||||
try:
|
try:
|
||||||
kwargs = _extract_css_kwargs(data)
|
kwargs = _extract_css_kwargs(data)
|
||||||
|
|
||||||
|
# Validate nesting for composite sources before updating
|
||||||
|
if data.source_type == "composite" and kwargs.get("layers") is not None:
|
||||||
|
child_ids = [ly.get("source_id", "") for ly in kwargs["layers"] if ly.get("source_id")]
|
||||||
|
store.validate_nesting(source_id, child_ids)
|
||||||
|
|
||||||
source = store.update_source(source_id=source_id, **kwargs)
|
source = store.update_source(source_id=source_id, **kwargs)
|
||||||
|
|
||||||
# Hot-reload running stream (no restart needed for in-place param changes)
|
# Hot-reload running stream (no restart needed for in-place param changes)
|
||||||
|
|||||||
@@ -220,7 +220,7 @@ class ColorStripStreamManager:
|
|||||||
return composite
|
return composite
|
||||||
return css_id
|
return css_id
|
||||||
|
|
||||||
def acquire(self, css_id: str, consumer_id: str = "") -> ColorStripStream:
|
def acquire(self, css_id: str, consumer_id: str = "", depth: int = 0) -> ColorStripStream:
|
||||||
"""Get or create a ColorStripStream for the given ColorStripSource.
|
"""Get or create a ColorStripStream for the given ColorStripSource.
|
||||||
|
|
||||||
Sharable sources (picture) are shared — keyed by css_id, ref-counted.
|
Sharable sources (picture) are shared — keyed by css_id, ref-counted.
|
||||||
@@ -231,6 +231,8 @@ class ColorStripStreamManager:
|
|||||||
css_id: ID of the ColorStripSource config
|
css_id: ID of the ColorStripSource config
|
||||||
consumer_id: Unique consumer identifier (target_id) — used as
|
consumer_id: Unique consumer identifier (target_id) — used as
|
||||||
registry key for non-sharable streams.
|
registry key for non-sharable streams.
|
||||||
|
depth: Current nesting depth (passed to composite streams for
|
||||||
|
runtime depth guarding).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
ColorStripStream instance
|
ColorStripStream instance
|
||||||
@@ -259,7 +261,7 @@ class ColorStripStreamManager:
|
|||||||
)
|
)
|
||||||
|
|
||||||
css_stream = CompositeColorStripStream(
|
css_stream = CompositeColorStripStream(
|
||||||
source, self, self._value_stream_manager, self._cspt_store
|
source, self, self._value_stream_manager, self._cspt_store, depth=depth
|
||||||
)
|
)
|
||||||
elif source.source_type == "mapped":
|
elif source.source_type == "mapped":
|
||||||
from wled_controller.core.processing.mapped_stream import MappedColorStripStream
|
from wled_controller.core.processing.mapped_stream import MappedColorStripStream
|
||||||
|
|||||||
@@ -25,10 +25,16 @@ _BLEND_DIFFERENCE = "difference"
|
|||||||
_BLEND_EXCLUSION = "exclusion"
|
_BLEND_EXCLUSION = "exclusion"
|
||||||
|
|
||||||
|
|
||||||
|
_MAX_RUNTIME_DEPTH = 4
|
||||||
|
|
||||||
|
|
||||||
class CompositeColorStripStream(ColorStripStream):
|
class CompositeColorStripStream(ColorStripStream):
|
||||||
"""Blends multiple ColorStripStreams as layers with blend modes and opacity.
|
"""Blends multiple ColorStripStreams as layers with blend modes and opacity.
|
||||||
|
|
||||||
Each layer references an existing (non-composite) ColorStripSource.
|
Layers may reference any ColorStripSource, including other composites
|
||||||
|
(nesting). Cycle detection and depth limits are enforced by the store
|
||||||
|
at save time; a runtime depth guard provides a safety net.
|
||||||
|
|
||||||
Sub-streams are acquired from the ColorStripStreamManager so picture
|
Sub-streams are acquired from the ColorStripStreamManager so picture
|
||||||
sources share their existing capture pipeline.
|
sources share their existing capture pipeline.
|
||||||
|
|
||||||
@@ -36,11 +42,14 @@ class CompositeColorStripStream(ColorStripStream):
|
|||||||
sub-stream's latest colors and blending bottom-to-top.
|
sub-stream's latest colors and blending bottom-to-top.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, source, css_manager, value_stream_manager=None, cspt_store=None):
|
def __init__(
|
||||||
|
self, source, css_manager, value_stream_manager=None, cspt_store=None, depth: int = 0
|
||||||
|
):
|
||||||
import uuid as _uuid
|
import uuid as _uuid
|
||||||
|
|
||||||
self._source_id: str = source.id
|
self._source_id: str = source.id
|
||||||
self._instance_id: str = _uuid.uuid4().hex[:8] # unique per instance to avoid release races
|
self._instance_id: str = _uuid.uuid4().hex[:8] # unique per instance to avoid release races
|
||||||
|
self._depth: int = depth
|
||||||
self._layers: List[dict] = list(source.layers)
|
self._layers: List[dict] = list(source.layers)
|
||||||
self._led_count: int = source.led_count
|
self._led_count: int = source.led_count
|
||||||
self._auto_size: bool = source.led_count == 0
|
self._auto_size: bool = source.led_count == 0
|
||||||
@@ -207,6 +216,12 @@ class CompositeColorStripStream(ColorStripStream):
|
|||||||
|
|
||||||
def _acquire_sub_streams(self) -> None:
|
def _acquire_sub_streams(self) -> None:
|
||||||
self._sub_streams_version += 1
|
self._sub_streams_version += 1
|
||||||
|
if self._depth >= _MAX_RUNTIME_DEPTH:
|
||||||
|
logger.warning(
|
||||||
|
f"CompositeColorStripStream {self._source_id} at depth {self._depth} — "
|
||||||
|
f"skipping sub-stream acquisition (max depth {_MAX_RUNTIME_DEPTH})"
|
||||||
|
)
|
||||||
|
return
|
||||||
for i, layer in enumerate(self._layers):
|
for i, layer in enumerate(self._layers):
|
||||||
if not layer.get("enabled", True):
|
if not layer.get("enabled", True):
|
||||||
continue
|
continue
|
||||||
@@ -215,7 +230,7 @@ class CompositeColorStripStream(ColorStripStream):
|
|||||||
continue
|
continue
|
||||||
consumer_id = f"{self._source_id}__{self._instance_id}__layer_{i}"
|
consumer_id = f"{self._source_id}__{self._instance_id}__layer_{i}"
|
||||||
try:
|
try:
|
||||||
stream = self._css_manager.acquire(src_id, consumer_id)
|
stream = self._css_manager.acquire(src_id, consumer_id, depth=self._depth + 1)
|
||||||
if hasattr(stream, "configure") and self._led_count > 0:
|
if hasattr(stream, "configure") and self._led_count > 0:
|
||||||
# Configure with zone length if layer has a range, else full strip
|
# Configure with zone length if layer has a range, else full strip
|
||||||
layer_start = layer.get("start", 0)
|
layer_start = layer.get("start", 0)
|
||||||
|
|||||||
@@ -1432,7 +1432,7 @@ export async function showCSSEditor(cssId: any = null, cloneData: any = null, pr
|
|||||||
await valueSourcesCache.fetch().catch((): any[] => []);
|
await valueSourcesCache.fetch().catch((): any[] => []);
|
||||||
const allCssSources: any[] = await colorStripSourcesCache.fetch().catch((): any[] => []);
|
const allCssSources: any[] = await colorStripSourcesCache.fetch().catch((): any[] => []);
|
||||||
compositeSetAvailableSources(allCssSources.filter(s =>
|
compositeSetAvailableSources(allCssSources.filter(s =>
|
||||||
s.source_type !== 'composite' && (!cssId || s.id !== cssId)
|
!cssId || s.id !== cssId
|
||||||
));
|
));
|
||||||
mappedSetAvailableSources(allCssSources.filter(s =>
|
mappedSetAvailableSources(allCssSources.filter(s =>
|
||||||
s.source_type !== 'mapped' && (!cssId || s.id !== cssId)
|
s.source_type !== 'mapped' && (!cssId || s.id !== cssId)
|
||||||
@@ -1484,7 +1484,7 @@ export async function showCSSEditor(cssId: any = null, cloneData: any = null, pr
|
|||||||
(document.getElementById('css-editor-name') as HTMLInputElement).value = css.name;
|
(document.getElementById('css-editor-name') as HTMLInputElement).value = css.name;
|
||||||
if (css.source_type === 'composite') {
|
if (css.source_type === 'composite') {
|
||||||
compositeSetAvailableSources(allCssSources.filter(s =>
|
compositeSetAvailableSources(allCssSources.filter(s =>
|
||||||
s.source_type !== 'composite' && s.id !== css.id
|
s.id !== css.id
|
||||||
));
|
));
|
||||||
} else if (css.source_type === 'mapped') {
|
} else if (css.source_type === 'mapped') {
|
||||||
mappedSetAvailableSources(allCssSources.filter(s =>
|
mappedSetAvailableSources(allCssSources.filter(s =>
|
||||||
|
|||||||
@@ -17,6 +17,8 @@ from wled_controller.utils import get_logger
|
|||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
MAX_COMPOSITE_DEPTH = 4
|
||||||
|
|
||||||
|
|
||||||
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
||||||
"""Persistent storage for color strip sources."""
|
"""Persistent storage for color strip sources."""
|
||||||
@@ -35,8 +37,7 @@ class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
|||||||
"""Get a color strip source by ID (alias for get())."""
|
"""Get a color strip source by ID (alias for get())."""
|
||||||
return self.get(source_id)
|
return self.get(source_id)
|
||||||
|
|
||||||
def create_source(self, name: str, source_type: str = "picture",
|
def create_source(self, name: str, source_type: str = "picture", **kwargs) -> ColorStripSource:
|
||||||
**kwargs) -> ColorStripSource:
|
|
||||||
"""Create a new color strip source.
|
"""Create a new color strip source.
|
||||||
|
|
||||||
All type-specific parameters are passed as keyword arguments and
|
All type-specific parameters are passed as keyword arguments and
|
||||||
@@ -138,6 +139,111 @@ class ColorStripStore(BaseSqliteStore[ColorStripSource]):
|
|||||||
break
|
break
|
||||||
return names
|
return names
|
||||||
|
|
||||||
|
def get_transitive_dependencies(self, source_id: str) -> set[str]:
|
||||||
|
"""Return the set of all source IDs that *source_id* transitively depends on.
|
||||||
|
|
||||||
|
Walks composite layers and mapped zones to build the full dependency graph.
|
||||||
|
"""
|
||||||
|
visited: set[str] = set()
|
||||||
|
stack = [source_id]
|
||||||
|
while stack:
|
||||||
|
current = stack.pop()
|
||||||
|
if current in visited:
|
||||||
|
continue
|
||||||
|
visited.add(current)
|
||||||
|
source = self._items.get(current)
|
||||||
|
if source is None:
|
||||||
|
continue
|
||||||
|
child_ids: list[str] = []
|
||||||
|
if isinstance(source, CompositeColorStripSource):
|
||||||
|
child_ids = [
|
||||||
|
layer.get("source_id", "") for layer in source.layers if layer.get("source_id")
|
||||||
|
]
|
||||||
|
elif isinstance(source, MappedColorStripSource):
|
||||||
|
child_ids = [
|
||||||
|
zone.get("source_id", "") for zone in source.zones if zone.get("source_id")
|
||||||
|
]
|
||||||
|
elif isinstance(source, ProcessedColorStripSource):
|
||||||
|
if source.input_source_id:
|
||||||
|
child_ids = [source.input_source_id]
|
||||||
|
stack.extend(cid for cid in child_ids if cid not in visited)
|
||||||
|
visited.discard(source_id) # exclude self from dependency set
|
||||||
|
return visited
|
||||||
|
|
||||||
|
def get_nesting_depth(self, source_id: str) -> int:
|
||||||
|
"""Return the maximum nesting depth reachable from *source_id*.
|
||||||
|
|
||||||
|
Leaf sources have depth 0. A composite with only leaf layers has depth 1.
|
||||||
|
"""
|
||||||
|
cache: dict[str, int] = {}
|
||||||
|
|
||||||
|
def _depth(sid: str, path: frozenset[str]) -> int:
|
||||||
|
if sid in cache:
|
||||||
|
return cache[sid]
|
||||||
|
source = self._items.get(sid)
|
||||||
|
if source is None:
|
||||||
|
return 0
|
||||||
|
child_ids: list[str] = []
|
||||||
|
if isinstance(source, CompositeColorStripSource):
|
||||||
|
child_ids = [ly.get("source_id", "") for ly in source.layers if ly.get("source_id")]
|
||||||
|
elif isinstance(source, MappedColorStripSource):
|
||||||
|
child_ids = [z.get("source_id", "") for z in source.zones if z.get("source_id")]
|
||||||
|
elif isinstance(source, ProcessedColorStripSource):
|
||||||
|
if source.input_source_id:
|
||||||
|
child_ids = [source.input_source_id]
|
||||||
|
if not child_ids:
|
||||||
|
cache[sid] = 0
|
||||||
|
return 0
|
||||||
|
max_child = 0
|
||||||
|
for cid in child_ids:
|
||||||
|
if cid in path:
|
||||||
|
continue # skip cycles (validated separately)
|
||||||
|
max_child = max(max_child, _depth(cid, path | {cid}))
|
||||||
|
depth = 1 + max_child
|
||||||
|
cache[sid] = depth
|
||||||
|
return depth
|
||||||
|
|
||||||
|
return _depth(source_id, frozenset({source_id}))
|
||||||
|
|
||||||
|
def validate_nesting(
|
||||||
|
self,
|
||||||
|
parent_id: str,
|
||||||
|
child_source_ids: list[str],
|
||||||
|
) -> None:
|
||||||
|
"""Validate that adding *child_source_ids* as layers/zones of *parent_id* is safe.
|
||||||
|
|
||||||
|
Raises ValueError if:
|
||||||
|
- Any child is the parent itself (self-reference)
|
||||||
|
- Any child transitively depends on the parent (would create a cycle)
|
||||||
|
- The resulting nesting depth exceeds MAX_COMPOSITE_DEPTH
|
||||||
|
"""
|
||||||
|
for child_id in child_source_ids:
|
||||||
|
if not child_id:
|
||||||
|
continue
|
||||||
|
if child_id == parent_id:
|
||||||
|
child_name = getattr(self._items.get(child_id), "name", child_id)
|
||||||
|
raise ValueError(f"Cannot add '{child_name}' as its own layer (self-reference)")
|
||||||
|
child_deps = self.get_transitive_dependencies(child_id)
|
||||||
|
if parent_id in child_deps:
|
||||||
|
child_name = getattr(self._items.get(child_id), "name", child_id)
|
||||||
|
parent_name = getattr(self._items.get(parent_id), "name", parent_id)
|
||||||
|
raise ValueError(
|
||||||
|
f"Cannot add '{child_name}' as a layer of '{parent_name}': "
|
||||||
|
f"it would create a circular reference"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check depth: temporarily compute what the depth would be
|
||||||
|
max_child_depth = 0
|
||||||
|
for child_id in child_source_ids:
|
||||||
|
if not child_id:
|
||||||
|
continue
|
||||||
|
max_child_depth = max(max_child_depth, self.get_nesting_depth(child_id))
|
||||||
|
resulting_depth = 1 + max_child_depth
|
||||||
|
if resulting_depth > MAX_COMPOSITE_DEPTH:
|
||||||
|
raise ValueError(
|
||||||
|
f"Nesting depth {resulting_depth} exceeds maximum of {MAX_COMPOSITE_DEPTH}"
|
||||||
|
)
|
||||||
|
|
||||||
def get_processed_referencing(self, source_id: str) -> List[str]:
|
def get_processed_referencing(self, source_id: str) -> List[str]:
|
||||||
"""Return names of processed sources that reference a given source as input."""
|
"""Return names of processed sources that reference a given source as input."""
|
||||||
names = []
|
names = []
|
||||||
|
|||||||
@@ -0,0 +1,169 @@
|
|||||||
|
"""Tests for composite nesting: cycle detection, depth limits, transitive dependencies."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from wled_controller.storage.color_strip_store import ColorStripStore, MAX_COMPOSITE_DEPTH
|
||||||
|
from wled_controller.storage.database import Database
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fixtures ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_db(tmp_path):
|
||||||
|
db = Database(tmp_path / "test.db")
|
||||||
|
yield db
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def store(tmp_db):
|
||||||
|
return ColorStripStore(tmp_db)
|
||||||
|
|
||||||
|
|
||||||
|
def _create_static(store: ColorStripStore, name: str) -> str:
|
||||||
|
"""Create a static color strip source, return its ID."""
|
||||||
|
source = store.create_source(name=name, source_type="static", colors=[[255, 0, 0]])
|
||||||
|
return source.id
|
||||||
|
|
||||||
|
|
||||||
|
def _create_composite(store: ColorStripStore, name: str, layer_ids: list[str]) -> str:
|
||||||
|
"""Create a composite source referencing the given layer IDs, return its ID."""
|
||||||
|
layers = [
|
||||||
|
{"source_id": sid, "blend_mode": "normal", "opacity": 1.0, "enabled": True}
|
||||||
|
for sid in layer_ids
|
||||||
|
]
|
||||||
|
source = store.create_source(name=name, source_type="composite", layers=layers)
|
||||||
|
return source.id
|
||||||
|
|
||||||
|
|
||||||
|
# ── Transitive Dependencies ──────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestTransitiveDependencies:
|
||||||
|
def test_leaf_has_no_deps(self, store):
|
||||||
|
sid = _create_static(store, "leaf")
|
||||||
|
assert store.get_transitive_dependencies(sid) == set()
|
||||||
|
|
||||||
|
def test_composite_deps_are_its_layers(self, store):
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
b = _create_static(store, "B")
|
||||||
|
comp = _create_composite(store, "comp", [a, b])
|
||||||
|
deps = store.get_transitive_dependencies(comp)
|
||||||
|
assert deps == {a, b}
|
||||||
|
|
||||||
|
def test_nested_composite_transitive_deps(self, store):
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
b = _create_static(store, "B")
|
||||||
|
inner = _create_composite(store, "inner", [a])
|
||||||
|
outer = _create_composite(store, "outer", [inner, b])
|
||||||
|
deps = store.get_transitive_dependencies(outer)
|
||||||
|
assert deps == {inner, a, b}
|
||||||
|
|
||||||
|
def test_nonexistent_id_returns_empty(self, store):
|
||||||
|
assert store.get_transitive_dependencies("css_nonexist") == set()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Nesting Depth ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestNestingDepth:
|
||||||
|
def test_leaf_depth_is_zero(self, store):
|
||||||
|
sid = _create_static(store, "leaf")
|
||||||
|
assert store.get_nesting_depth(sid) == 0
|
||||||
|
|
||||||
|
def test_flat_composite_depth_is_one(self, store):
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
comp = _create_composite(store, "comp", [a])
|
||||||
|
assert store.get_nesting_depth(comp) == 1
|
||||||
|
|
||||||
|
def test_two_level_nesting_depth(self, store):
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
inner = _create_composite(store, "inner", [a])
|
||||||
|
outer = _create_composite(store, "outer", [inner])
|
||||||
|
assert store.get_nesting_depth(outer) == 2
|
||||||
|
|
||||||
|
def test_depth_uses_deepest_branch(self, store):
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
b = _create_static(store, "B")
|
||||||
|
inner = _create_composite(store, "inner", [a])
|
||||||
|
outer = _create_composite(store, "outer", [inner, b])
|
||||||
|
# inner has depth 1, b has depth 0 → outer picks max (1) + 1 = 2
|
||||||
|
assert store.get_nesting_depth(outer) == 2
|
||||||
|
|
||||||
|
|
||||||
|
# ── Validate Nesting ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestValidateNesting:
|
||||||
|
def test_self_reference_rejected(self, store):
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
comp = _create_composite(store, "comp", [a])
|
||||||
|
with pytest.raises(ValueError, match="self-reference"):
|
||||||
|
store.validate_nesting(comp, [comp])
|
||||||
|
|
||||||
|
def test_direct_cycle_rejected(self, store):
|
||||||
|
"""A → B → A should be rejected."""
|
||||||
|
a = _create_static(store, "A")
|
||||||
|
comp_b = _create_composite(store, "B", [a])
|
||||||
|
comp_a = _create_composite(store, "A-comp", [comp_b])
|
||||||
|
# Try to add A-comp as a layer of B → would create B → A-comp → B
|
||||||
|
with pytest.raises(ValueError, match="circular reference"):
|
||||||
|
store.validate_nesting(comp_b, [comp_a])
|
||||||
|
|
||||||
|
def test_transitive_cycle_rejected(self, store):
|
||||||
|
"""A → B → C, then trying C → A should be rejected."""
|
||||||
|
leaf = _create_static(store, "leaf")
|
||||||
|
c = _create_composite(store, "C", [leaf])
|
||||||
|
b = _create_composite(store, "B", [c])
|
||||||
|
a = _create_composite(store, "A", [b])
|
||||||
|
# Try to add A as a layer of C → C → A → B → C
|
||||||
|
with pytest.raises(ValueError, match="circular reference"):
|
||||||
|
store.validate_nesting(c, [a])
|
||||||
|
|
||||||
|
def test_valid_nesting_accepted(self, store):
|
||||||
|
"""A → B where B is composite with only leaves should be fine."""
|
||||||
|
leaf = _create_static(store, "leaf")
|
||||||
|
inner = _create_composite(store, "inner", [leaf])
|
||||||
|
outer_id = _create_composite(store, "outer", [])
|
||||||
|
# Should not raise
|
||||||
|
store.validate_nesting(outer_id, [inner])
|
||||||
|
|
||||||
|
def test_depth_limit_exceeded(self, store):
|
||||||
|
"""Build a chain deeper than MAX_COMPOSITE_DEPTH and verify rejection."""
|
||||||
|
# Build chain: leaf → comp1 → comp2 → ... → compN
|
||||||
|
prev = _create_static(store, "leaf")
|
||||||
|
for i in range(MAX_COMPOSITE_DEPTH):
|
||||||
|
prev = _create_composite(store, f"comp_{i}", [prev])
|
||||||
|
# prev is now at depth MAX_COMPOSITE_DEPTH
|
||||||
|
# Trying to wrap it in one more composite should fail
|
||||||
|
wrapper = _create_composite(store, "wrapper", [])
|
||||||
|
with pytest.raises(ValueError, match="Nesting depth"):
|
||||||
|
store.validate_nesting(wrapper, [prev])
|
||||||
|
|
||||||
|
def test_depth_at_limit_accepted(self, store):
|
||||||
|
"""A chain exactly at MAX_COMPOSITE_DEPTH should be accepted."""
|
||||||
|
prev = _create_static(store, "leaf")
|
||||||
|
for i in range(MAX_COMPOSITE_DEPTH - 1):
|
||||||
|
prev = _create_composite(store, f"comp_{i}", [prev])
|
||||||
|
# prev is at depth MAX_COMPOSITE_DEPTH - 1
|
||||||
|
wrapper = _create_composite(store, "wrapper", [])
|
||||||
|
# 1 + (MAX_COMPOSITE_DEPTH - 1) = MAX_COMPOSITE_DEPTH → should pass
|
||||||
|
store.validate_nesting(wrapper, [prev])
|
||||||
|
|
||||||
|
def test_multiple_children_validates_all(self, store):
|
||||||
|
"""If one child creates a cycle, validation fails even if others are fine."""
|
||||||
|
leaf = _create_static(store, "leaf")
|
||||||
|
comp = _create_composite(store, "comp", [leaf])
|
||||||
|
with pytest.raises(ValueError, match="self-reference"):
|
||||||
|
store.validate_nesting(comp, [leaf, comp])
|
||||||
|
|
||||||
|
def test_empty_children_accepted(self, store):
|
||||||
|
"""Empty children list should always pass."""
|
||||||
|
comp = _create_composite(store, "comp", [])
|
||||||
|
store.validate_nesting(comp, [])
|
||||||
|
|
||||||
|
def test_none_children_skipped(self, store):
|
||||||
|
"""Empty-string child IDs should be skipped."""
|
||||||
|
comp = _create_composite(store, "comp", [])
|
||||||
|
store.validate_nesting(comp, ["", ""])
|
||||||
Reference in New Issue
Block a user