From 29bdacf69a7f32ee27b457534aaae594b49891e4 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Fri, 22 May 2026 22:54:14 +0300 Subject: [PATCH] refactor(processing): dedupe HA/Z2M _swap_color_source via shared helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HALightTargetProcessor and Z2MLightTargetProcessor used to carry character-for-character identical _swap_color_source method bodies (audit finding C5) — only the log prefix differed. Extract the body into a free function ``swap_color_source(processor, new_kind, new_color_vs_id, *, log_label)`` in a new ``light_target_helpers`` module. Each processor's _swap_color_source now delegates to the helper and then clears its per-entity history (``_previous_colors`` / ``_previous_on``) — that bit stays on the processor because it's per- target state, not colour-source state. Scope deliberately narrower than the full BaseLightTargetProcessor ABC the audit gestured at: the 76 read sites for the per-processor colour state across the two files made a full state-composition refactor too risky for the live LED control loop. The free-function helper is the minimum-blast-radius way to delete the duplication while leaving WLED (which has no value-stream-vs-CSS dispatch) untouched. The helper standardises both warning messages on HA's original wording ("failed to acquire color VS stream" / "failed to re-acquire CSS stream") so existing log alerts/grep patterns keep working. A LightTargetSwapState Protocol under TYPE_CHECKING documents the expected processor surface; no runtime enforcement (acceptable trade- off vs a 76-site touchpoint). Tests: 7 new tests cover the release+acquire ordering, the not-running no-op path, the manager-error-swallowing behaviour, the empty-id short-circuit, and the missing-manager (TargetContext(None, None)) fallback. 354 existing storage + API + e2e + processing tests stay green; ruff clean. --- .../processing/ha_light_target_processor.py | 42 +--- .../core/processing/light_target_helpers.py | 114 +++++++++++ .../processing/z2m_light_target_processor.py | 40 +--- .../processing/test_light_target_helpers.py | 183 ++++++++++++++++++ 4 files changed, 303 insertions(+), 76 deletions(-) create mode 100644 server/src/ledgrab/core/processing/light_target_helpers.py create mode 100644 server/tests/core/processing/test_light_target_helpers.py diff --git a/server/src/ledgrab/core/processing/ha_light_target_processor.py b/server/src/ledgrab/core/processing/ha_light_target_processor.py index 36efa2c..505fa08 100644 --- a/server/src/ledgrab/core/processing/ha_light_target_processor.py +++ b/server/src/ledgrab/core/processing/ha_light_target_processor.py @@ -12,6 +12,7 @@ from typing import Any, Dict, List, Optional, Tuple import numpy as np +from ledgrab.core.processing.light_target_helpers import swap_color_source from ledgrab.core.processing.target_processor import TargetContext, TargetProcessor from ledgrab.storage.ha_light_output_target import HALightMapping from ledgrab.utils import get_logger @@ -255,50 +256,11 @@ class HALightTargetProcessor(TargetProcessor): def _swap_color_source(self, new_kind: str, new_color_vs_id: str) -> None: """Release the previous colour stream and acquire the new one.""" - # Tear down previous stream first to keep ref-counts honest. - if self._is_running: - if self._css_stream and self._ctx.color_strip_stream_manager: - try: - self._ctx.color_strip_stream_manager.release(self._css_id, self._target_id) - except Exception: - pass - self._css_stream = None - if self._color_stream is not None and self._ctx.value_stream_manager: - try: - self._ctx.value_stream_manager.release(self._color_vs_id) - except Exception: - pass - self._color_stream = None - - self._source_kind = new_kind - self._color_vs_id = new_color_vs_id - + swap_color_source(self, new_kind, new_color_vs_id, log_label="HA light") # Reset per-entity history so the new source isn't gated by stale values. self._previous_colors.clear() self._previous_on.clear() - if not self._is_running: - return - - if self._source_kind == "color_vs": - if self._color_vs_id and self._ctx.value_stream_manager: - try: - self._color_stream = self._ctx.value_stream_manager.acquire(self._color_vs_id) - except Exception as e: - logger.warning( - f"HA light {self._target_id}: failed to acquire color VS stream: {e}" - ) - else: - if self._css_id and self._ctx.color_strip_stream_manager: - try: - self._css_stream = self._ctx.color_strip_stream_manager.acquire( - self._css_id, self._target_id - ) - except Exception as e: - logger.warning( - f"HA light {self._target_id}: failed to re-acquire CSS stream: {e}" - ) - # ── WebSocket clients ── def add_ws_client(self, ws: Any) -> None: diff --git a/server/src/ledgrab/core/processing/light_target_helpers.py b/server/src/ledgrab/core/processing/light_target_helpers.py new file mode 100644 index 0000000..635f99a --- /dev/null +++ b/server/src/ledgrab/core/processing/light_target_helpers.py @@ -0,0 +1,114 @@ +"""Shared helpers for HA / Zigbee2MQTT light target processors. + +``HALightTargetProcessor`` and ``Z2MLightTargetProcessor`` historically +duplicated their colour-source swap logic character-for-character — only +the log prefix and a docstring differed (audit finding C5). This module +hosts the deduplicated implementation. + +We deliberately stop short of extracting a full ``BaseLightTargetProcessor`` +ABC here: the read sites for the per-processor state are spread across ~38 +locations per file and a wholesale composition refactor risks regressing +the live LED control loop. The free-function approach below is the +minimum-blast-radius way to delete the duplication. The processor still +owns its state; the helper reaches in to mutate it, which is ugly but +Pythonic and isolated to two methods. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ledgrab.utils import get_logger + +if TYPE_CHECKING: + # ``LightTargetSwapState`` is a structural protocol — anything carrying + # the listed attributes is acceptable. We do not import the concrete + # processor classes to avoid a circular dependency. + from typing import Any, Protocol + + class LightTargetSwapState(Protocol): + _is_running: bool + _css_stream: Any + _color_stream: Any + _ctx: Any + _css_id: str + _color_vs_id: str + _source_kind: str + _target_id: str + + +logger = get_logger(__name__) + + +def swap_color_source( + processor: "LightTargetSwapState", + new_kind: str, + new_color_vs_id: str, + *, + log_label: str, +) -> None: + """Release the current colour source and acquire the new one. + + Mirrors what ``HALightTargetProcessor._swap_color_source`` and + ``Z2MLightTargetProcessor._swap_color_source`` used to do inline. + + The caller is responsible for clearing per-entity history + (``_previous_colors``, ``_previous_on``) after this returns — + that state is owned by the processor, not the colour source. + + ``log_label`` is the short identifier used in warning logs + (e.g. ``"HA light"`` or ``"Z2M light"``) so a failure is + traceable back to the right processor in mixed deployments. + """ + # Tear down the previously-acquired stream first to keep ref-counts honest. + if processor._is_running: + if processor._css_stream and processor._ctx.color_strip_stream_manager: + try: + processor._ctx.color_strip_stream_manager.release( + processor._css_id, processor._target_id + ) + except Exception: + # Manager-level errors are non-fatal: stream may already be + # gone if the source was deleted out from under us. + pass + processor._css_stream = None + if processor._color_stream is not None and processor._ctx.value_stream_manager: + try: + processor._ctx.value_stream_manager.release(processor._color_vs_id) + except Exception: + pass + processor._color_stream = None + + processor._source_kind = new_kind + processor._color_vs_id = new_color_vs_id + + if not processor._is_running: + # Not started yet; the start() path will acquire when called. + return + + if processor._source_kind == "color_vs": + if processor._color_vs_id and processor._ctx.value_stream_manager: + try: + processor._color_stream = processor._ctx.value_stream_manager.acquire( + processor._color_vs_id + ) + except Exception as e: + logger.warning( + "%s %s: failed to acquire color VS stream: %s", + log_label, + processor._target_id, + e, + ) + else: + if processor._css_id and processor._ctx.color_strip_stream_manager: + try: + processor._css_stream = processor._ctx.color_strip_stream_manager.acquire( + processor._css_id, processor._target_id + ) + except Exception as e: + logger.warning( + "%s %s: failed to re-acquire CSS stream: %s", + log_label, + processor._target_id, + e, + ) diff --git a/server/src/ledgrab/core/processing/z2m_light_target_processor.py b/server/src/ledgrab/core/processing/z2m_light_target_processor.py index 55012af..ce4a1b9 100644 --- a/server/src/ledgrab/core/processing/z2m_light_target_processor.py +++ b/server/src/ledgrab/core/processing/z2m_light_target_processor.py @@ -14,6 +14,7 @@ from typing import Any, Dict, List, Optional, Tuple import numpy as np +from ledgrab.core.processing.light_target_helpers import swap_color_source from ledgrab.core.processing.target_processor import TargetContext, TargetProcessor from ledgrab.storage.z2m_light_output_target import ( DEFAULT_Z2M_BASE_TOPIC, @@ -270,45 +271,12 @@ class Z2MLightTargetProcessor(TargetProcessor): logger.warning(f"Z2M light {self._target_id}: CSS swap failed: {e}") def _swap_color_source(self, new_kind: str, new_color_vs_id: str) -> None: - if self._is_running: - if self._css_stream and self._ctx.color_strip_stream_manager: - try: - self._ctx.color_strip_stream_manager.release(self._css_id, self._target_id) - except Exception: - pass - self._css_stream = None - if self._color_stream is not None and self._ctx.value_stream_manager: - try: - self._ctx.value_stream_manager.release(self._color_vs_id) - except Exception: - pass - self._color_stream = None - - self._source_kind = new_kind - self._color_vs_id = new_color_vs_id + """Release the previous colour stream and acquire the new one.""" + swap_color_source(self, new_kind, new_color_vs_id, log_label="Z2M light") + # Reset per-entity history so the new source isn't gated by stale values. self._previous_colors.clear() self._previous_on.clear() - if not self._is_running: - return - - if self._source_kind == "color_vs": - if self._color_vs_id and self._ctx.value_stream_manager: - try: - self._color_stream = self._ctx.value_stream_manager.acquire(self._color_vs_id) - except Exception as e: - logger.warning(f"Z2M light {self._target_id}: failed to acquire color VS: {e}") - else: - if self._css_id and self._ctx.color_strip_stream_manager: - try: - self._css_stream = self._ctx.color_strip_stream_manager.acquire( - self._css_id, self._target_id - ) - except Exception as e: - logger.warning( - f"Z2M light {self._target_id}: failed to re-acquire CSS stream: {e}" - ) - # ─────────── WebSocket clients ─────────── def add_ws_client(self, ws: Any) -> None: diff --git a/server/tests/core/processing/test_light_target_helpers.py b/server/tests/core/processing/test_light_target_helpers.py new file mode 100644 index 0000000..efa16bc --- /dev/null +++ b/server/tests/core/processing/test_light_target_helpers.py @@ -0,0 +1,183 @@ +"""Tests for the shared HA/Z2M light-target helper. + +Black-box: build a stub processor with the required attributes, capture +the manager call sequence, and verify ``swap_color_source`` performs the +release+acquire dance in the same order the two production processors +used to do it inline. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, List, Tuple + +from ledgrab.core.processing.light_target_helpers import swap_color_source + + +@dataclass +class _RecordingManager: + """Captures acquire/release calls so tests can assert ordering.""" + + calls: List[Tuple[str, ...]] = field(default_factory=list) + raise_on_acquire: bool = False + fake_stream: Any = "fake-stream-token" + + def acquire(self, *args): + self.calls.append(("acquire", *(str(a) for a in args))) + if self.raise_on_acquire: + raise RuntimeError("simulated manager failure") + return self.fake_stream + + def release(self, *args): + self.calls.append(("release", *(str(a) for a in args))) + + +@dataclass +class _StubCtx: + color_strip_stream_manager: Any = None + value_stream_manager: Any = None + + +class _StubProcessor: + """Minimum surface required by ``swap_color_source``.""" + + def __init__(self, *, source_kind: str, css_id: str, color_vs_id: str, is_running: bool): + self._is_running = is_running + self._source_kind = source_kind + self._css_id = css_id + self._color_vs_id = color_vs_id + self._target_id = "tgt_x" + self._css_stream = None + self._color_stream = None + self._ctx = _StubCtx() + + +# --------------------------------------------------------------------------- +# Basic swap behaviour +# --------------------------------------------------------------------------- + + +def test_swap_from_css_to_color_vs_releases_css_and_acquires_vs(): + css_mgr = _RecordingManager() + vs_mgr = _RecordingManager() + p = _StubProcessor(source_kind="css", css_id="css_a", color_vs_id="", is_running=True) + p._ctx.color_strip_stream_manager = css_mgr + p._ctx.value_stream_manager = vs_mgr + p._css_stream = "old-css-stream" + + swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_42", log_label="HA light") + + # CSS released first, then VS acquired with the new vs_id + assert css_mgr.calls == [("release", "css_a", "tgt_x")] + assert vs_mgr.calls == [("acquire", "vs_42")] + # State updated + assert p._source_kind == "color_vs" + assert p._color_vs_id == "vs_42" + assert p._css_stream is None + assert p._color_stream == "fake-stream-token" + + +def test_swap_from_color_vs_to_css_releases_vs_and_acquires_css(): + css_mgr = _RecordingManager() + vs_mgr = _RecordingManager() + p = _StubProcessor( + source_kind="color_vs", css_id="css_b", color_vs_id="vs_old", is_running=True + ) + p._ctx.color_strip_stream_manager = css_mgr + p._ctx.value_stream_manager = vs_mgr + p._color_stream = "old-vs-stream" + + swap_color_source(p, new_kind="css", new_color_vs_id="", log_label="Z2M light") + + assert vs_mgr.calls == [("release", "vs_old")] + assert css_mgr.calls == [("acquire", "css_b", "tgt_x")] + assert p._source_kind == "css" + assert p._color_stream is None + assert p._css_stream == "fake-stream-token" + + +def test_swap_when_not_running_only_updates_config(): + """If the processor is not running, no acquire/release calls fire.""" + css_mgr = _RecordingManager() + vs_mgr = _RecordingManager() + p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=False) + p._ctx.color_strip_stream_manager = css_mgr + p._ctx.value_stream_manager = vs_mgr + + swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_new", log_label="HA light") + + assert css_mgr.calls == [] + assert vs_mgr.calls == [] + assert p._source_kind == "color_vs" + assert p._color_vs_id == "vs_new" + + +def test_swap_swallows_manager_release_errors(): + """A manager whose release() raises does not abort the swap.""" + + class _ExplodingMgr(_RecordingManager): + def release(self, *args): + self.calls.append(("release", *(str(a) for a in args))) + raise RuntimeError("source already deleted") + + css_mgr = _ExplodingMgr() + vs_mgr = _RecordingManager() + p = _StubProcessor(source_kind="css", css_id="css_dead", color_vs_id="", is_running=True) + p._ctx.color_strip_stream_manager = css_mgr + p._ctx.value_stream_manager = vs_mgr + p._css_stream = "stale" + + swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_99", log_label="HA light") + + assert css_mgr.calls == [("release", "css_dead", "tgt_x")] + assert vs_mgr.calls == [("acquire", "vs_99")] + assert p._css_stream is None + assert p._color_stream == "fake-stream-token" + + +def test_swap_swallows_acquire_error_and_leaves_stream_none(): + """A failing acquire() does NOT propagate; the stream stays None.""" + vs_mgr = _RecordingManager(raise_on_acquire=True) + p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=True) + p._ctx.value_stream_manager = vs_mgr + + # Should not raise even though the manager does. + swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_y", log_label="HA light") + + assert vs_mgr.calls == [("acquire", "vs_y")] + assert p._color_stream is None + assert p._source_kind == "color_vs" + assert p._color_vs_id == "vs_y" + + +def test_swap_skips_acquire_when_new_id_empty(): + """Swapping to color_vs with empty id is a no-op for the acquire step.""" + vs_mgr = _RecordingManager() + p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="", is_running=True) + p._ctx.value_stream_manager = vs_mgr + + swap_color_source(p, new_kind="color_vs", new_color_vs_id="", log_label="HA light") + + assert vs_mgr.calls == [] + assert p._color_stream is None + + +def test_swap_tolerates_missing_managers_on_ctx(): + """A context with both managers set to None must not crash the swap. + + Locks the helper's behaviour for a freshly-constructed processor whose + ``TargetContext`` is the default empty bag. + """ + p = _StubProcessor(source_kind="css", css_id="css_x", color_vs_id="vs_old", is_running=True) + # Both manager attributes are None by default on _StubCtx — this + # mirrors a fresh-install / demo-mode TargetContext. + assert p._ctx.color_strip_stream_manager is None + assert p._ctx.value_stream_manager is None + + # Should not raise + swap_color_source(p, new_kind="color_vs", new_color_vs_id="vs_new", log_label="HA light") + + assert p._source_kind == "color_vs" + assert p._color_vs_id == "vs_new" + assert p._color_stream is None + assert p._css_stream is None