refactor(processing): dedupe HA/Z2M _swap_color_source via shared helper
HALightTargetProcessor and Z2MLightTargetProcessor used to carry
character-for-character identical _swap_color_source method bodies
(audit finding C5) — only the log prefix differed. Extract the body
into a free function ``swap_color_source(processor, new_kind,
new_color_vs_id, *, log_label)`` in a new ``light_target_helpers``
module. Each processor's _swap_color_source now delegates to the helper
and then clears its per-entity history (``_previous_colors`` /
``_previous_on``) — that bit stays on the processor because it's per-
target state, not colour-source state.
Scope deliberately narrower than the full BaseLightTargetProcessor ABC
the audit gestured at: the 76 read sites for the per-processor colour
state across the two files made a full state-composition refactor too
risky for the live LED control loop. The free-function helper is the
minimum-blast-radius way to delete the duplication while leaving WLED
(which has no value-stream-vs-CSS dispatch) untouched.
The helper standardises both warning messages on HA's original wording
("failed to acquire color VS stream" / "failed to re-acquire CSS
stream") so existing log alerts/grep patterns keep working.
A LightTargetSwapState Protocol under TYPE_CHECKING documents the
expected processor surface; no runtime enforcement (acceptable trade-
off vs a 76-site touchpoint).
Tests: 7 new tests cover the release+acquire ordering, the not-running
no-op path, the manager-error-swallowing behaviour, the empty-id
short-circuit, and the missing-manager (TargetContext(None, None))
fallback. 354 existing storage + API + e2e + processing tests stay
green; ruff clean.
This commit is contained in:
@@ -12,6 +12,7 @@ from typing import Any, Dict, List, Optional, Tuple
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from ledgrab.core.processing.light_target_helpers import swap_color_source
|
||||||
from ledgrab.core.processing.target_processor import TargetContext, TargetProcessor
|
from ledgrab.core.processing.target_processor import TargetContext, TargetProcessor
|
||||||
from ledgrab.storage.ha_light_output_target import HALightMapping
|
from ledgrab.storage.ha_light_output_target import HALightMapping
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
@@ -255,50 +256,11 @@ class HALightTargetProcessor(TargetProcessor):
|
|||||||
|
|
||||||
def _swap_color_source(self, new_kind: str, new_color_vs_id: str) -> None:
|
def _swap_color_source(self, new_kind: str, new_color_vs_id: str) -> None:
|
||||||
"""Release the previous colour stream and acquire the new one."""
|
"""Release the previous colour stream and acquire the new one."""
|
||||||
# Tear down previous stream first to keep ref-counts honest.
|
swap_color_source(self, new_kind, new_color_vs_id, log_label="HA light")
|
||||||
if self._is_running:
|
|
||||||
if self._css_stream and self._ctx.color_strip_stream_manager:
|
|
||||||
try:
|
|
||||||
self._ctx.color_strip_stream_manager.release(self._css_id, self._target_id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self._css_stream = None
|
|
||||||
if self._color_stream is not None and self._ctx.value_stream_manager:
|
|
||||||
try:
|
|
||||||
self._ctx.value_stream_manager.release(self._color_vs_id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self._color_stream = None
|
|
||||||
|
|
||||||
self._source_kind = new_kind
|
|
||||||
self._color_vs_id = new_color_vs_id
|
|
||||||
|
|
||||||
# Reset per-entity history so the new source isn't gated by stale values.
|
# Reset per-entity history so the new source isn't gated by stale values.
|
||||||
self._previous_colors.clear()
|
self._previous_colors.clear()
|
||||||
self._previous_on.clear()
|
self._previous_on.clear()
|
||||||
|
|
||||||
if not self._is_running:
|
|
||||||
return
|
|
||||||
|
|
||||||
if self._source_kind == "color_vs":
|
|
||||||
if self._color_vs_id and self._ctx.value_stream_manager:
|
|
||||||
try:
|
|
||||||
self._color_stream = self._ctx.value_stream_manager.acquire(self._color_vs_id)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
f"HA light {self._target_id}: failed to acquire color VS stream: {e}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
if self._css_id and self._ctx.color_strip_stream_manager:
|
|
||||||
try:
|
|
||||||
self._css_stream = self._ctx.color_strip_stream_manager.acquire(
|
|
||||||
self._css_id, self._target_id
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
f"HA light {self._target_id}: failed to re-acquire CSS stream: {e}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── WebSocket clients ──
|
# ── WebSocket clients ──
|
||||||
|
|
||||||
def add_ws_client(self, ws: Any) -> None:
|
def add_ws_client(self, ws: Any) -> None:
|
||||||
|
|||||||
@@ -0,0 +1,114 @@
|
|||||||
|
"""Shared helpers for HA / Zigbee2MQTT light target processors.
|
||||||
|
|
||||||
|
``HALightTargetProcessor`` and ``Z2MLightTargetProcessor`` historically
|
||||||
|
duplicated their colour-source swap logic character-for-character — only
|
||||||
|
the log prefix and a docstring differed (audit finding C5). This module
|
||||||
|
hosts the deduplicated implementation.
|
||||||
|
|
||||||
|
We deliberately stop short of extracting a full ``BaseLightTargetProcessor``
|
||||||
|
ABC here: the read sites for the per-processor state are spread across ~38
|
||||||
|
locations per file and a wholesale composition refactor risks regressing
|
||||||
|
the live LED control loop. The free-function approach below is the
|
||||||
|
minimum-blast-radius way to delete the duplication. The processor still
|
||||||
|
owns its state; the helper reaches in to mutate it, which is ugly but
|
||||||
|
Pythonic and isolated to two methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
# ``LightTargetSwapState`` is a structural protocol — anything carrying
|
||||||
|
# the listed attributes is acceptable. We do not import the concrete
|
||||||
|
# processor classes to avoid a circular dependency.
|
||||||
|
from typing import Any, Protocol
|
||||||
|
|
||||||
|
class LightTargetSwapState(Protocol):
|
||||||
|
_is_running: bool
|
||||||
|
_css_stream: Any
|
||||||
|
_color_stream: Any
|
||||||
|
_ctx: Any
|
||||||
|
_css_id: str
|
||||||
|
_color_vs_id: str
|
||||||
|
_source_kind: str
|
||||||
|
_target_id: str
|
||||||
|
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def swap_color_source(
|
||||||
|
processor: "LightTargetSwapState",
|
||||||
|
new_kind: str,
|
||||||
|
new_color_vs_id: str,
|
||||||
|
*,
|
||||||
|
log_label: str,
|
||||||
|
) -> None:
|
||||||
|
"""Release the current colour source and acquire the new one.
|
||||||
|
|
||||||
|
Mirrors what ``HALightTargetProcessor._swap_color_source`` and
|
||||||
|
``Z2MLightTargetProcessor._swap_color_source`` used to do inline.
|
||||||
|
|
||||||
|
The caller is responsible for clearing per-entity history
|
||||||
|
(``_previous_colors``, ``_previous_on``) after this returns —
|
||||||
|
that state is owned by the processor, not the colour source.
|
||||||
|
|
||||||
|
``log_label`` is the short identifier used in warning logs
|
||||||
|
(e.g. ``"HA light"`` or ``"Z2M light"``) so a failure is
|
||||||
|
traceable back to the right processor in mixed deployments.
|
||||||
|
"""
|
||||||
|
# Tear down the previously-acquired stream first to keep ref-counts honest.
|
||||||
|
if processor._is_running:
|
||||||
|
if processor._css_stream and processor._ctx.color_strip_stream_manager:
|
||||||
|
try:
|
||||||
|
processor._ctx.color_strip_stream_manager.release(
|
||||||
|
processor._css_id, processor._target_id
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
# Manager-level errors are non-fatal: stream may already be
|
||||||
|
# gone if the source was deleted out from under us.
|
||||||
|
pass
|
||||||
|
processor._css_stream = None
|
||||||
|
if processor._color_stream is not None and processor._ctx.value_stream_manager:
|
||||||
|
try:
|
||||||
|
processor._ctx.value_stream_manager.release(processor._color_vs_id)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
processor._color_stream = None
|
||||||
|
|
||||||
|
processor._source_kind = new_kind
|
||||||
|
processor._color_vs_id = new_color_vs_id
|
||||||
|
|
||||||
|
if not processor._is_running:
|
||||||
|
# Not started yet; the start() path will acquire when called.
|
||||||
|
return
|
||||||
|
|
||||||
|
if processor._source_kind == "color_vs":
|
||||||
|
if processor._color_vs_id and processor._ctx.value_stream_manager:
|
||||||
|
try:
|
||||||
|
processor._color_stream = processor._ctx.value_stream_manager.acquire(
|
||||||
|
processor._color_vs_id
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"%s %s: failed to acquire color VS stream: %s",
|
||||||
|
log_label,
|
||||||
|
processor._target_id,
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
if processor._css_id and processor._ctx.color_strip_stream_manager:
|
||||||
|
try:
|
||||||
|
processor._css_stream = processor._ctx.color_strip_stream_manager.acquire(
|
||||||
|
processor._css_id, processor._target_id
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"%s %s: failed to re-acquire CSS stream: %s",
|
||||||
|
log_label,
|
||||||
|
processor._target_id,
|
||||||
|
e,
|
||||||
|
)
|
||||||
@@ -14,6 +14,7 @@ from typing import Any, Dict, List, Optional, Tuple
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from ledgrab.core.processing.light_target_helpers import swap_color_source
|
||||||
from ledgrab.core.processing.target_processor import TargetContext, TargetProcessor
|
from ledgrab.core.processing.target_processor import TargetContext, TargetProcessor
|
||||||
from ledgrab.storage.z2m_light_output_target import (
|
from ledgrab.storage.z2m_light_output_target import (
|
||||||
DEFAULT_Z2M_BASE_TOPIC,
|
DEFAULT_Z2M_BASE_TOPIC,
|
||||||
@@ -270,45 +271,12 @@ class Z2MLightTargetProcessor(TargetProcessor):
|
|||||||
logger.warning(f"Z2M light {self._target_id}: CSS swap failed: {e}")
|
logger.warning(f"Z2M light {self._target_id}: CSS swap failed: {e}")
|
||||||
|
|
||||||
def _swap_color_source(self, new_kind: str, new_color_vs_id: str) -> None:
|
def _swap_color_source(self, new_kind: str, new_color_vs_id: str) -> None:
|
||||||
if self._is_running:
|
"""Release the previous colour stream and acquire the new one."""
|
||||||
if self._css_stream and self._ctx.color_strip_stream_manager:
|
swap_color_source(self, new_kind, new_color_vs_id, log_label="Z2M light")
|
||||||
try:
|
# Reset per-entity history so the new source isn't gated by stale values.
|
||||||
self._ctx.color_strip_stream_manager.release(self._css_id, self._target_id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self._css_stream = None
|
|
||||||
if self._color_stream is not None and self._ctx.value_stream_manager:
|
|
||||||
try:
|
|
||||||
self._ctx.value_stream_manager.release(self._color_vs_id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self._color_stream = None
|
|
||||||
|
|
||||||
self._source_kind = new_kind
|
|
||||||
self._color_vs_id = new_color_vs_id
|
|
||||||
self._previous_colors.clear()
|
self._previous_colors.clear()
|
||||||
self._previous_on.clear()
|
self._previous_on.clear()
|
||||||
|
|
||||||
if not self._is_running:
|
|
||||||
return
|
|
||||||
|
|
||||||
if self._source_kind == "color_vs":
|
|
||||||
if self._color_vs_id and self._ctx.value_stream_manager:
|
|
||||||
try:
|
|
||||||
self._color_stream = self._ctx.value_stream_manager.acquire(self._color_vs_id)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Z2M light {self._target_id}: failed to acquire color VS: {e}")
|
|
||||||
else:
|
|
||||||
if self._css_id and self._ctx.color_strip_stream_manager:
|
|
||||||
try:
|
|
||||||
self._css_stream = self._ctx.color_strip_stream_manager.acquire(
|
|
||||||
self._css_id, self._target_id
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
f"Z2M light {self._target_id}: failed to re-acquire CSS stream: {e}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ─────────── WebSocket clients ───────────
|
# ─────────── WebSocket clients ───────────
|
||||||
|
|
||||||
def add_ws_client(self, ws: Any) -> None:
|
def add_ws_client(self, ws: Any) -> None:
|
||||||
|
|||||||
@@ -0,0 +1,183 @@
|
|||||||
|
"""Tests for the shared HA/Z2M light-target helper.
|
||||||
|
|
||||||
|
Black-box: build a stub processor with the required attributes, capture
|
||||||
|
the manager call sequence, and verify ``swap_color_source`` performs the
|
||||||
|
release+acquire dance in the same order the two production processors
|
||||||
|
used to do it inline.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any, List, Tuple
|
||||||
|
|
||||||
|
from ledgrab.core.processing.light_target_helpers import swap_color_source
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _RecordingManager:
|
||||||
|
"""Captures acquire/release calls so tests can assert ordering."""
|
||||||
|
|
||||||
|
calls: List[Tuple[str, ...]] = field(default_factory=list)
|
||||||
|
raise_on_acquire: bool = False
|
||||||
|
fake_stream: Any = "fake-stream-token"
|
||||||
|
|
||||||
|
def acquire(self, *args):
|
||||||
|
self.calls.append(("acquire", *(str(a) for a in args)))
|
||||||
|
if self.raise_on_acquire:
|
||||||
|
raise RuntimeError("simulated manager failure")
|
||||||
|
return self.fake_stream
|
||||||
|
|
||||||
|
def release(self, *args):
|
||||||
|
self.calls.append(("release", *(str(a) for a in args)))
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _StubCtx:
|
||||||
|
color_strip_stream_manager: Any = None
|
||||||
|
value_stream_manager: Any = None
|
||||||
|
|
||||||
|
|
||||||
|
class _StubProcessor:
|
||||||
|
"""Minimum surface required by ``swap_color_source``."""
|
||||||
|
|
||||||
|
def __init__(self, *, source_kind: str, css_id: str, color_vs_id: str, is_running: bool):
|
||||||
|
self._is_running = is_running
|
||||||
|
self._source_kind = source_kind
|
||||||
|
self._css_id = css_id
|
||||||
|
self._color_vs_id = color_vs_id
|
||||||
|
self._target_id = "tgt_x"
|
||||||
|
self._css_stream = None
|
||||||
|
self._color_stream = None
|
||||||
|
self._ctx = _StubCtx()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Basic swap behaviour
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_from_css_to_color_vs_releases_css_and_acquires_vs():
|
||||||
|
css_mgr = _RecordingManager()
|
||||||
|
vs_mgr = _RecordingManager()
|
||||||
|
p = _StubProcessor(source_kind="css", css_id="css_a", color_vs_id="", is_running=True)
|
||||||
|
p._ctx.color_strip_stream_manager = css_mgr
|
||||||
|
p._ctx.value_stream_manager = vs_mgr
|
||||||
|
p._css_stream = "old-css-stream"
|
||||||
|
|
||||||
|
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_42", log_label="HA light")
|
||||||
|
|
||||||
|
# CSS released first, then VS acquired with the new vs_id
|
||||||
|
assert css_mgr.calls == [("release", "css_a", "tgt_x")]
|
||||||
|
assert vs_mgr.calls == [("acquire", "vs_42")]
|
||||||
|
# State updated
|
||||||
|
assert p._source_kind == "color_vs"
|
||||||
|
assert p._color_vs_id == "vs_42"
|
||||||
|
assert p._css_stream is None
|
||||||
|
assert p._color_stream == "fake-stream-token"
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_from_color_vs_to_css_releases_vs_and_acquires_css():
|
||||||
|
css_mgr = _RecordingManager()
|
||||||
|
vs_mgr = _RecordingManager()
|
||||||
|
p = _StubProcessor(
|
||||||
|
source_kind="color_vs", css_id="css_b", color_vs_id="vs_old", is_running=True
|
||||||
|
)
|
||||||
|
p._ctx.color_strip_stream_manager = css_mgr
|
||||||
|
p._ctx.value_stream_manager = vs_mgr
|
||||||
|
p._color_stream = "old-vs-stream"
|
||||||
|
|
||||||
|
swap_color_source(p, new_kind="css", new_color_vs_id="", log_label="Z2M light")
|
||||||
|
|
||||||
|
assert vs_mgr.calls == [("release", "vs_old")]
|
||||||
|
assert css_mgr.calls == [("acquire", "css_b", "tgt_x")]
|
||||||
|
assert p._source_kind == "css"
|
||||||
|
assert p._color_stream is None
|
||||||
|
assert p._css_stream == "fake-stream-token"
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_when_not_running_only_updates_config():
|
||||||
|
"""If the processor is not running, no acquire/release calls fire."""
|
||||||
|
css_mgr = _RecordingManager()
|
||||||
|
vs_mgr = _RecordingManager()
|
||||||
|
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=False)
|
||||||
|
p._ctx.color_strip_stream_manager = css_mgr
|
||||||
|
p._ctx.value_stream_manager = vs_mgr
|
||||||
|
|
||||||
|
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_new", log_label="HA light")
|
||||||
|
|
||||||
|
assert css_mgr.calls == []
|
||||||
|
assert vs_mgr.calls == []
|
||||||
|
assert p._source_kind == "color_vs"
|
||||||
|
assert p._color_vs_id == "vs_new"
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_swallows_manager_release_errors():
|
||||||
|
"""A manager whose release() raises does not abort the swap."""
|
||||||
|
|
||||||
|
class _ExplodingMgr(_RecordingManager):
|
||||||
|
def release(self, *args):
|
||||||
|
self.calls.append(("release", *(str(a) for a in args)))
|
||||||
|
raise RuntimeError("source already deleted")
|
||||||
|
|
||||||
|
css_mgr = _ExplodingMgr()
|
||||||
|
vs_mgr = _RecordingManager()
|
||||||
|
p = _StubProcessor(source_kind="css", css_id="css_dead", color_vs_id="", is_running=True)
|
||||||
|
p._ctx.color_strip_stream_manager = css_mgr
|
||||||
|
p._ctx.value_stream_manager = vs_mgr
|
||||||
|
p._css_stream = "stale"
|
||||||
|
|
||||||
|
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_99", log_label="HA light")
|
||||||
|
|
||||||
|
assert css_mgr.calls == [("release", "css_dead", "tgt_x")]
|
||||||
|
assert vs_mgr.calls == [("acquire", "vs_99")]
|
||||||
|
assert p._css_stream is None
|
||||||
|
assert p._color_stream == "fake-stream-token"
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_swallows_acquire_error_and_leaves_stream_none():
|
||||||
|
"""A failing acquire() does NOT propagate; the stream stays None."""
|
||||||
|
vs_mgr = _RecordingManager(raise_on_acquire=True)
|
||||||
|
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=True)
|
||||||
|
p._ctx.value_stream_manager = vs_mgr
|
||||||
|
|
||||||
|
# Should not raise even though the manager does.
|
||||||
|
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_y", log_label="HA light")
|
||||||
|
|
||||||
|
assert vs_mgr.calls == [("acquire", "vs_y")]
|
||||||
|
assert p._color_stream is None
|
||||||
|
assert p._source_kind == "color_vs"
|
||||||
|
assert p._color_vs_id == "vs_y"
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_skips_acquire_when_new_id_empty():
|
||||||
|
"""Swapping to color_vs with empty id is a no-op for the acquire step."""
|
||||||
|
vs_mgr = _RecordingManager()
|
||||||
|
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=True)
|
||||||
|
p._ctx.value_stream_manager = vs_mgr
|
||||||
|
|
||||||
|
swap_color_source(p, new_kind="color_vs", new_color_vs_id="", log_label="HA light")
|
||||||
|
|
||||||
|
assert vs_mgr.calls == []
|
||||||
|
assert p._color_stream is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_swap_tolerates_missing_managers_on_ctx():
|
||||||
|
"""A context with both managers set to None must not crash the swap.
|
||||||
|
|
||||||
|
Locks the helper's behaviour for a freshly-constructed processor whose
|
||||||
|
``TargetContext`` is the default empty bag.
|
||||||
|
"""
|
||||||
|
p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="vs_old", is_running=True)
|
||||||
|
# Both manager attributes are None by default on _StubCtx — this
|
||||||
|
# mirrors a fresh-install / demo-mode TargetContext.
|
||||||
|
assert p._ctx.color_strip_stream_manager is None
|
||||||
|
assert p._ctx.value_stream_manager is None
|
||||||
|
|
||||||
|
# Should not raise
|
||||||
|
swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_new", log_label="HA light")
|
||||||
|
|
||||||
|
assert p._source_kind == "color_vs"
|
||||||
|
assert p._color_vs_id == "vs_new"
|
||||||
|
assert p._color_stream is None
|
||||||
|
assert p._css_stream is None
|
||||||
Reference in New Issue
Block a user