refactor(processing): dedupe HA/Z2M _swap_color_source via shared helper

HALightTargetProcessor and Z2MLightTargetProcessor used to carry
character-for-character identical _swap_color_source method bodies
(audit finding C5) — only the log prefix differed. Extract the body
into a free function ``swap_color_source(processor, new_kind,
new_color_vs_id, *, log_label)`` in a new ``light_target_helpers``
module. Each processor's _swap_color_source now delegates to the helper
and then clears its per-entity history (``_previous_colors`` /
``_previous_on``) — that bit stays on the processor because it's per-
target state, not colour-source state.

Scope deliberately narrower than the full BaseLightTargetProcessor ABC
the audit gestured at: the 76 read sites for the per-processor colour
state across the two files made a full state-composition refactor too
risky for the live LED control loop. The free-function helper is the
minimum-blast-radius way to delete the duplication while leaving WLED
(which has no value-stream-vs-CSS dispatch) untouched.

The helper standardises both warning messages on HA's original wording
("failed to acquire color VS stream" / "failed to re-acquire CSS
stream") so existing log alerts/grep patterns keep working.

A LightTargetSwapState Protocol under TYPE_CHECKING documents the
expected processor surface; no runtime enforcement (acceptable trade-
off vs a 76-site touchpoint).

Tests: 7 new tests cover the release+acquire ordering, the not-running
no-op path, the manager-error-swallowing behaviour, the empty-id
short-circuit, and the missing-manager (TargetContext(None, None))
fallback. 354 existing storage + API + e2e + processing tests stay
green; ruff clean.
This commit is contained in:
2026-05-22 22:54:14 +03:00
parent 563cbac88c
commit 29bdacf69a
4 changed files with 303 additions and 76 deletions
@@ -0,0 +1,183 @@
"""Tests for the shared HA/Z2M light-target helper.
Black-box: build a stub processor with the required attributes, capture
the manager call sequence, and verify ``swap_color_source`` performs the
release+acquire dance in the same order the two production processors
used to do it inline.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, List, Tuple
from ledgrab.core.processing.light_target_helpers import swap_color_source
@dataclass
class _RecordingManager:
"""Captures acquire/release calls so tests can assert ordering."""
calls: List[Tuple[str, ...]] = field(default_factory=list)
raise_on_acquire: bool = False
fake_stream: Any = "fake-stream-token"
def acquire(self, *args):
self.calls.append(("acquire", *(str(a) for a in args)))
if self.raise_on_acquire:
raise RuntimeError("simulated manager failure")
return self.fake_stream
def release(self, *args):
self.calls.append(("release", *(str(a) for a in args)))
@dataclass
class _StubCtx:
color_strip_stream_manager: Any = None
value_stream_manager: Any = None
class _StubProcessor:
"""Minimum surface required by ``swap_color_source``."""
def __init__(self, *, source_kind: str, css_id: str, color_vs_id: str, is_running: bool):
self._is_running = is_running
self._source_kind = source_kind
self._css_id = css_id
self._color_vs_id = color_vs_id
self._target_id = "tgt_x"
self._css_stream = None
self._color_stream = None
self._ctx = _StubCtx()
# ---------------------------------------------------------------------------
# Basic swap behaviour
# ---------------------------------------------------------------------------
def test_swap_from_css_to_color_vs_releases_css_and_acquires_vs():
css_mgr = _RecordingManager()
vs_mgr = _RecordingManager()
p = _StubProcessor(source_kind="css", css_id="css_a", color_vs_id="", is_running=True)
p._ctx.color_strip_stream_manager = css_mgr
p._ctx.value_stream_manager = vs_mgr
p._css_stream = "old-css-stream"
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_42", log_label="HA light")
# CSS released first, then VS acquired with the new vs_id
assert css_mgr.calls == [("release", "css_a", "tgt_x")]
assert vs_mgr.calls == [("acquire", "vs_42")]
# State updated
assert p._source_kind == "color_vs"
assert p._color_vs_id == "vs_42"
assert p._css_stream is None
assert p._color_stream == "fake-stream-token"
def test_swap_from_color_vs_to_css_releases_vs_and_acquires_css():
css_mgr = _RecordingManager()
vs_mgr = _RecordingManager()
p = _StubProcessor(
source_kind="color_vs", css_id="css_b", color_vs_id="vs_old", is_running=True
)
p._ctx.color_strip_stream_manager = css_mgr
p._ctx.value_stream_manager = vs_mgr
p._color_stream = "old-vs-stream"
swap_color_source(p, new_kind="css", new_color_vs_id="", log_label="Z2M light")
assert vs_mgr.calls == [("release", "vs_old")]
assert css_mgr.calls == [("acquire", "css_b", "tgt_x")]
assert p._source_kind == "css"
assert p._color_stream is None
assert p._css_stream == "fake-stream-token"
def test_swap_when_not_running_only_updates_config():
"""If the processor is not running, no acquire/release calls fire."""
css_mgr = _RecordingManager()
vs_mgr = _RecordingManager()
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=False)
p._ctx.color_strip_stream_manager = css_mgr
p._ctx.value_stream_manager = vs_mgr
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_new", log_label="HA light")
assert css_mgr.calls == []
assert vs_mgr.calls == []
assert p._source_kind == "color_vs"
assert p._color_vs_id == "vs_new"
def test_swap_swallows_manager_release_errors():
"""A manager whose release() raises does not abort the swap."""
class _ExplodingMgr(_RecordingManager):
def release(self, *args):
self.calls.append(("release", *(str(a) for a in args)))
raise RuntimeError("source already deleted")
css_mgr = _ExplodingMgr()
vs_mgr = _RecordingManager()
p = _StubProcessor(source_kind="css", css_id="css_dead", color_vs_id="", is_running=True)
p._ctx.color_strip_stream_manager = css_mgr
p._ctx.value_stream_manager = vs_mgr
p._css_stream = "stale"
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_99", log_label="HA light")
assert css_mgr.calls == [("release", "css_dead", "tgt_x")]
assert vs_mgr.calls == [("acquire", "vs_99")]
assert p._css_stream is None
assert p._color_stream == "fake-stream-token"
def test_swap_swallows_acquire_error_and_leaves_stream_none():
"""A failing acquire() does NOT propagate; the stream stays None."""
vs_mgr = _RecordingManager(raise_on_acquire=True)
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=True)
p._ctx.value_stream_manager = vs_mgr
# Should not raise even though the manager does.
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_y", log_label="HA light")
assert vs_mgr.calls == [("acquire", "vs_y")]
assert p._color_stream is None
assert p._source_kind == "color_vs"
assert p._color_vs_id == "vs_y"
def test_swap_skips_acquire_when_new_id_empty():
"""Swapping to color_vs with empty id is a no-op for the acquire step."""
vs_mgr = _RecordingManager()
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=True)
p._ctx.value_stream_manager = vs_mgr
swap_color_source(p, new_kind="color_vs", new_color_vs_id="", log_label="HA light")
assert vs_mgr.calls == []
assert p._color_stream is None
def test_swap_tolerates_missing_managers_on_ctx():
"""A context with both managers set to None must not crash the swap.
Locks the helper's behaviour for a freshly-constructed processor whose
``TargetContext`` is the default empty bag.
"""
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="vs_old", is_running=True)
# Both manager attributes are None by default on _StubCtx — this
# mirrors a fresh-install / demo-mode TargetContext.
assert p._ctx.color_strip_stream_manager is None
assert p._ctx.value_stream_manager is None
# Should not raise
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_new", log_label="HA light")
assert p._source_kind == "color_vs"
assert p._color_vs_id == "vs_new"
assert p._color_stream is None
assert p._css_stream is None