From e5a6eafd09314a1254dd03aac2796ad97f267059 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Mon, 23 Feb 2026 02:31:08 +0300 Subject: [PATCH] Make count-dependent streams non-sharable, each target gets own instance Static, gradient, color cycle, and effect streams depend on LED count and were being reconfigured per-consumer when shared. Now only picture streams (expensive capture) are shared; count-dependent sources get per-consumer instances keyed by css_id:target_id. Co-Authored-By: Claude Opus 4.6 --- .../processing/color_strip_stream_manager.py | 197 +++++++++--------- .../core/processing/wled_target_processor.py | 8 +- .../storage/color_strip_source.py | 14 ++ 3 files changed, 114 insertions(+), 105 deletions(-) diff --git a/server/src/wled_controller/core/processing/color_strip_stream_manager.py b/server/src/wled_controller/core/processing/color_strip_stream_manager.py index 03aaf49..963a0c5 100644 --- a/server/src/wled_controller/core/processing/color_strip_stream_manager.py +++ b/server/src/wled_controller/core/processing/color_strip_stream_manager.py @@ -1,12 +1,11 @@ -"""Shared color strip stream management with reference counting. +"""Color strip stream management with reference counting. -ColorStripStreamManager creates PictureColorStripStream instances from -ColorStripSource configs and shares them across multiple consumers (LED targets). -When multiple targets reference the same ColorStripSource, they share a single -stream instance — processing runs once, not once per target. +PictureColorStripStreams (expensive screen capture) are shared across multiple +consumers via reference counting — processing runs once, not once per target. -Reference counting ensures streams are cleaned up when the last consumer -releases them. +Count-dependent streams (static, gradient, color cycle, effect) are NOT shared. +Each consumer gets its own instance so it can configure an independent LED count +without interfering with other targets. """ from dataclasses import dataclass @@ -24,6 +23,14 @@ from wled_controller.utils import get_logger logger = get_logger(__name__) +# source_type → stream class for non-picture (non-sharable) sources +_SIMPLE_STREAM_MAP = { + "static": StaticColorStripStream, + "gradient": GradientColorStripStream, + "color_cycle": ColorCycleColorStripStream, + "effect": EffectColorStripStream, +} + @dataclass class _ColorStripEntry: @@ -42,11 +49,11 @@ class _ColorStripEntry: class ColorStripStreamManager: - """Manages shared ColorStripStream instances with reference counting. + """Manages ColorStripStream instances with reference counting. - Multiple LED targets using the same ColorStripSource share a single - ColorStripStream. Streams are created on first acquire and cleaned up - when the last consumer releases. + Picture streams are shared — multiple consumers reuse the same instance. + Count-dependent streams (static, gradient, cycle, effect) are per-consumer, + keyed by ``{css_id}:{consumer_id}``. """ def __init__(self, color_strip_store, live_stream_manager): @@ -59,92 +66,66 @@ class ColorStripStreamManager: self._live_stream_manager = live_stream_manager self._streams: Dict[str, _ColorStripEntry] = {} - def acquire(self, css_id: str) -> ColorStripStream: + def _resolve_key(self, css_id: str, consumer_id: str) -> str: + """Resolve internal registry key for a (css_id, consumer_id) pair. + + Per-consumer entries (non-picture sources) take priority over shared. + """ + if consumer_id: + composite = f"{css_id}:{consumer_id}" + if composite in self._streams: + return composite + return css_id + + def acquire(self, css_id: str, consumer_id: str = "") -> ColorStripStream: """Get or create a ColorStripStream for the given ColorStripSource. - If a stream already exists for this css_id, increments the reference - count and returns the existing instance. - - Otherwise, loads the ColorStripSource config, acquires the underlying - LiveStream, creates a new ColorStripStream, starts it, and stores it - with ref_count=1. + Sharable sources (picture) are shared — keyed by css_id, ref-counted. + Non-sharable sources always get a fresh instance per consumer, keyed by + ``css_id:consumer_id``. Args: css_id: ID of the ColorStripSource config + consumer_id: Unique consumer identifier (target_id) — used as + registry key for non-sharable streams. Returns: - ColorStripStream instance (shared if already running) + ColorStripStream instance Raises: ValueError: If ColorStripSource not found or type unsupported RuntimeError: If stream creation/start fails """ + source = self._color_strip_store.get_source(css_id) + + # Non-sharable: always create a fresh per-consumer instance + if not source.sharable: + stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type) + if not stream_cls: + raise ValueError( + f"Unsupported color strip source type '{source.source_type}' for {css_id}" + ) + css_stream = stream_cls(source) + css_stream.start() + key = f"{css_id}:{consumer_id}" if consumer_id else css_id + self._streams[key] = _ColorStripEntry( + stream=css_stream, ref_count=1, picture_source_id="", + ) + logger.info(f"Created {source.source_type} stream {key}") + return css_stream + + # Sharable (picture): reuse existing stream if running if css_id in self._streams: entry = self._streams[css_id] entry.ref_count += 1 - logger.info( - f"Reusing color strip stream for source {css_id} " - f"(ref_count={entry.ref_count})" - ) + logger.info(f"Reusing stream {css_id} (ref_count={entry.ref_count})") return entry.stream - from wled_controller.storage.color_strip_source import ( - ColorCycleColorStripSource, - EffectColorStripSource, - GradientColorStripSource, - PictureColorStripSource, - StaticColorStripSource, - ) - - source = self._color_strip_store.get_source(css_id) - - if isinstance(source, ColorCycleColorStripSource): - css_stream = ColorCycleColorStripStream(source) - css_stream.start() - self._streams[css_id] = _ColorStripEntry( - stream=css_stream, - ref_count=1, - picture_source_id="", - ) - logger.info(f"Created color cycle stream for source {css_id}") - return css_stream - - if isinstance(source, StaticColorStripSource): - css_stream = StaticColorStripStream(source) - css_stream.start() - self._streams[css_id] = _ColorStripEntry( - stream=css_stream, - ref_count=1, - picture_source_id="", # no live stream to manage - ) - logger.info(f"Created static color strip stream for source {css_id}") - return css_stream - - if isinstance(source, GradientColorStripSource): - css_stream = GradientColorStripStream(source) - css_stream.start() - self._streams[css_id] = _ColorStripEntry( - stream=css_stream, - ref_count=1, - picture_source_id="", # no live stream to manage - ) - logger.info(f"Created gradient color strip stream for source {css_id}") - return css_stream - - if isinstance(source, EffectColorStripSource): - css_stream = EffectColorStripStream(source) - css_stream.start() - self._streams[css_id] = _ColorStripEntry( - stream=css_stream, - ref_count=1, - picture_source_id="", - ) - logger.info(f"Created effect stream for source {css_id} (effect={source.effect_type})") - return css_stream - + # Create new picture stream — needs a LiveStream from the capture pipeline + from wled_controller.storage.color_strip_source import PictureColorStripSource if not isinstance(source, PictureColorStripSource): raise ValueError( - f"Unsupported color strip source type '{source.source_type}' for {css_id}" + f"Unsupported sharable source type '{source.source_type}' for {css_id}" ) if not source.picture_source_id: @@ -170,10 +151,10 @@ class ColorStripStreamManager: picture_source_id=source.picture_source_id, ) - logger.info(f"Created picture color strip stream for source {css_id}") + logger.info(f"Created picture color strip stream {css_id}") return css_stream - def release(self, css_id: str) -> None: + def release(self, css_id: str, consumer_id: str = "") -> None: """Release a reference to a ColorStripStream. Decrements the reference count. When it reaches 0, stops the stream, @@ -181,56 +162,68 @@ class ColorStripStreamManager: Args: css_id: ID of the ColorStripSource to release + consumer_id: Consumer identifier (same as passed to acquire) """ - entry = self._streams.get(css_id) + key = self._resolve_key(css_id, consumer_id) + entry = self._streams.get(key) if not entry: - logger.warning(f"Attempted to release unknown color strip stream: {css_id}") + logger.warning(f"Attempted to release unknown color strip stream: {key}") return entry.ref_count -= 1 - logger.debug(f"Released color strip stream {css_id} (ref_count={entry.ref_count})") + logger.debug(f"Released color strip stream {key} (ref_count={entry.ref_count})") if entry.ref_count <= 0: try: entry.stream.stop() except Exception as e: - logger.error(f"Error stopping color strip stream {css_id}: {e}") + logger.error(f"Error stopping color strip stream {key}: {e}") picture_source_id = entry.picture_source_id - del self._streams[css_id] - logger.info(f"Removed color strip stream for source {css_id}") + del self._streams[key] + logger.info(f"Removed color strip stream {key}") # Release the underlying live stream (not needed for static sources) if picture_source_id: self._live_stream_manager.release(picture_source_id) def update_source(self, css_id: str, new_source) -> None: - """Hot-update processing params on a running stream. + """Hot-update processing params on all running streams for a source. - If the picture_source_id changed, only updates in-place params; - the underlying LiveStream is not swapped (requires target restart - to take full effect). + Applies to both the shared entry (picture) and per-consumer entries + (count-dependent). If the picture_source_id changed, only updates + in-place params; the underlying LiveStream is not swapped (requires + target restart to take full effect). Args: css_id: ID of the ColorStripSource new_source: Updated ColorStripSource config """ - entry = self._streams.get(css_id) - if not entry: + # Find all entries: shared key OR per-consumer keys (css_id:xxx) + matching_keys = [ + k for k in self._streams + if k == css_id or k.startswith(f"{css_id}:") + ] + if not matching_keys: return # Stream not running; config will be used on next acquire - entry.stream.update_source(new_source) + for key in matching_keys: + entry = self._streams[key] + entry.stream.update_source(new_source) # Track picture_source_id change for future reference counting from wled_controller.storage.color_strip_source import PictureColorStripSource if isinstance(new_source, PictureColorStripSource): - if new_source.picture_source_id != entry.picture_source_id: - logger.info( - f"CSS {css_id}: picture_source_id changed — " - f"restart target to use new source" - ) + for key in matching_keys: + entry = self._streams[key] + if new_source.picture_source_id != entry.picture_source_id: + logger.info( + f"CSS {css_id}: picture_source_id changed — " + f"restart target to use new source" + ) + break - logger.info(f"Updated running color strip stream {css_id}") + logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}") def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None: """Register or update a consumer's target FPS. @@ -238,7 +231,8 @@ class ColorStripStreamManager: Recalculates the capture rate for PictureColorStripStreams as max(all consumer FPS values). Non-picture streams are unaffected. """ - entry = self._streams.get(css_id) + key = self._resolve_key(css_id, target_id) + entry = self._streams.get(key) if not entry: return entry.target_fps[target_id] = fps @@ -246,7 +240,8 @@ class ColorStripStreamManager: def remove_target_fps(self, css_id: str, target_id: str) -> None: """Unregister a consumer's target FPS (e.g. on stop).""" - entry = self._streams.get(css_id) + key = self._resolve_key(css_id, target_id) + entry = self._streams.get(key) if not entry: return entry.target_fps.pop(target_id, None) diff --git a/server/src/wled_controller/core/processing/wled_target_processor.py b/server/src/wled_controller/core/processing/wled_target_processor.py index 1687190..b89b732 100644 --- a/server/src/wled_controller/core/processing/wled_target_processor.py +++ b/server/src/wled_controller/core/processing/wled_target_processor.py @@ -118,7 +118,7 @@ class WledTargetProcessor(TargetProcessor): raise RuntimeError(f"Target {self._target_id} has no color strip source assigned") try: - stream = await asyncio.to_thread(css_manager.acquire, self._color_strip_source_id) + stream = await asyncio.to_thread(css_manager.acquire, self._color_strip_source_id, self._target_id) self._color_strip_stream = stream self._resolved_display_index = stream.display_index @@ -193,7 +193,7 @@ class WledTargetProcessor(TargetProcessor): if css_manager and self._color_strip_source_id: try: css_manager.remove_target_fps(self._color_strip_source_id, self._target_id) - await asyncio.to_thread(css_manager.release, self._color_strip_source_id) + await asyncio.to_thread(css_manager.release, self._color_strip_source_id, self._target_id) except Exception as e: logger.warning(f"Error releasing color strip stream for {self._target_id}: {e}") self._color_strip_stream = None @@ -241,9 +241,9 @@ class WledTargetProcessor(TargetProcessor): old_id = self._color_strip_source_id try: - new_stream = css_manager.acquire(color_strip_source_id) + new_stream = css_manager.acquire(color_strip_source_id, self._target_id) css_manager.remove_target_fps(old_id, self._target_id) - css_manager.release(old_id) + css_manager.release(old_id, self._target_id) self._color_strip_stream = new_stream self._resolved_display_index = new_stream.display_index self._color_strip_source_id = color_strip_source_id diff --git a/server/src/wled_controller/storage/color_strip_source.py b/server/src/wled_controller/storage/color_strip_source.py index b1ec372..6a9285a 100644 --- a/server/src/wled_controller/storage/color_strip_source.py +++ b/server/src/wled_controller/storage/color_strip_source.py @@ -33,6 +33,15 @@ class ColorStripSource: updated_at: datetime description: Optional[str] = None + @property + def sharable(self) -> bool: + """Whether multiple consumers can share a single stream instance. + + Count-dependent sources (static, gradient, cycle, effect) return False + because each consumer may configure a different LED count. + """ + return False + def to_dict(self) -> dict: """Convert source to dictionary. Subclasses extend this.""" return { @@ -174,6 +183,11 @@ class PictureColorStripSource(ColorStripSource): calibration (LED positions), color correction, smoothing, FPS target. """ + @property + def sharable(self) -> bool: + """Picture streams are expensive (screen capture) and safe to share.""" + return True + picture_source_id: str = "" fps: int = 30 brightness: float = 1.0 # color correction multiplier (0.0–2.0; 1.0 = unchanged)