Make count-dependent streams non-sharable, each target gets own instance
Static, gradient, color cycle, and effect streams depend on LED count and were being reconfigured per-consumer when shared. Now only picture streams (expensive capture) are shared; count-dependent sources get per-consumer instances keyed by css_id:target_id. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,12 +1,11 @@
|
|||||||
"""Shared color strip stream management with reference counting.
|
"""Color strip stream management with reference counting.
|
||||||
|
|
||||||
ColorStripStreamManager creates PictureColorStripStream instances from
|
PictureColorStripStreams (expensive screen capture) are shared across multiple
|
||||||
ColorStripSource configs and shares them across multiple consumers (LED targets).
|
consumers via reference counting — processing runs once, not once per target.
|
||||||
When multiple targets reference the same ColorStripSource, they share a single
|
|
||||||
stream instance — processing runs once, not once per target.
|
|
||||||
|
|
||||||
Reference counting ensures streams are cleaned up when the last consumer
|
Count-dependent streams (static, gradient, color cycle, effect) are NOT shared.
|
||||||
releases them.
|
Each consumer gets its own instance so it can configure an independent LED count
|
||||||
|
without interfering with other targets.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
@@ -24,6 +23,14 @@ from wled_controller.utils import get_logger
|
|||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
# source_type → stream class for non-picture (non-sharable) sources
|
||||||
|
_SIMPLE_STREAM_MAP = {
|
||||||
|
"static": StaticColorStripStream,
|
||||||
|
"gradient": GradientColorStripStream,
|
||||||
|
"color_cycle": ColorCycleColorStripStream,
|
||||||
|
"effect": EffectColorStripStream,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class _ColorStripEntry:
|
class _ColorStripEntry:
|
||||||
@@ -42,11 +49,11 @@ class _ColorStripEntry:
|
|||||||
|
|
||||||
|
|
||||||
class ColorStripStreamManager:
|
class ColorStripStreamManager:
|
||||||
"""Manages shared ColorStripStream instances with reference counting.
|
"""Manages ColorStripStream instances with reference counting.
|
||||||
|
|
||||||
Multiple LED targets using the same ColorStripSource share a single
|
Picture streams are shared — multiple consumers reuse the same instance.
|
||||||
ColorStripStream. Streams are created on first acquire and cleaned up
|
Count-dependent streams (static, gradient, cycle, effect) are per-consumer,
|
||||||
when the last consumer releases.
|
keyed by ``{css_id}:{consumer_id}``.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, color_strip_store, live_stream_manager):
|
def __init__(self, color_strip_store, live_stream_manager):
|
||||||
@@ -59,92 +66,66 @@ class ColorStripStreamManager:
|
|||||||
self._live_stream_manager = live_stream_manager
|
self._live_stream_manager = live_stream_manager
|
||||||
self._streams: Dict[str, _ColorStripEntry] = {}
|
self._streams: Dict[str, _ColorStripEntry] = {}
|
||||||
|
|
||||||
def acquire(self, css_id: str) -> ColorStripStream:
|
def _resolve_key(self, css_id: str, consumer_id: str) -> str:
|
||||||
|
"""Resolve internal registry key for a (css_id, consumer_id) pair.
|
||||||
|
|
||||||
|
Per-consumer entries (non-picture sources) take priority over shared.
|
||||||
|
"""
|
||||||
|
if consumer_id:
|
||||||
|
composite = f"{css_id}:{consumer_id}"
|
||||||
|
if composite in self._streams:
|
||||||
|
return composite
|
||||||
|
return css_id
|
||||||
|
|
||||||
|
def acquire(self, css_id: str, consumer_id: str = "") -> ColorStripStream:
|
||||||
"""Get or create a ColorStripStream for the given ColorStripSource.
|
"""Get or create a ColorStripStream for the given ColorStripSource.
|
||||||
|
|
||||||
If a stream already exists for this css_id, increments the reference
|
Sharable sources (picture) are shared — keyed by css_id, ref-counted.
|
||||||
count and returns the existing instance.
|
Non-sharable sources always get a fresh instance per consumer, keyed by
|
||||||
|
``css_id:consumer_id``.
|
||||||
Otherwise, loads the ColorStripSource config, acquires the underlying
|
|
||||||
LiveStream, creates a new ColorStripStream, starts it, and stores it
|
|
||||||
with ref_count=1.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
css_id: ID of the ColorStripSource config
|
css_id: ID of the ColorStripSource config
|
||||||
|
consumer_id: Unique consumer identifier (target_id) — used as
|
||||||
|
registry key for non-sharable streams.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
ColorStripStream instance (shared if already running)
|
ColorStripStream instance
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ValueError: If ColorStripSource not found or type unsupported
|
ValueError: If ColorStripSource not found or type unsupported
|
||||||
RuntimeError: If stream creation/start fails
|
RuntimeError: If stream creation/start fails
|
||||||
"""
|
"""
|
||||||
|
source = self._color_strip_store.get_source(css_id)
|
||||||
|
|
||||||
|
# Non-sharable: always create a fresh per-consumer instance
|
||||||
|
if not source.sharable:
|
||||||
|
stream_cls = _SIMPLE_STREAM_MAP.get(source.source_type)
|
||||||
|
if not stream_cls:
|
||||||
|
raise ValueError(
|
||||||
|
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
|
||||||
|
)
|
||||||
|
css_stream = stream_cls(source)
|
||||||
|
css_stream.start()
|
||||||
|
key = f"{css_id}:{consumer_id}" if consumer_id else css_id
|
||||||
|
self._streams[key] = _ColorStripEntry(
|
||||||
|
stream=css_stream, ref_count=1, picture_source_id="",
|
||||||
|
)
|
||||||
|
logger.info(f"Created {source.source_type} stream {key}")
|
||||||
|
return css_stream
|
||||||
|
|
||||||
|
# Sharable (picture): reuse existing stream if running
|
||||||
if css_id in self._streams:
|
if css_id in self._streams:
|
||||||
entry = self._streams[css_id]
|
entry = self._streams[css_id]
|
||||||
entry.ref_count += 1
|
entry.ref_count += 1
|
||||||
logger.info(
|
logger.info(f"Reusing stream {css_id} (ref_count={entry.ref_count})")
|
||||||
f"Reusing color strip stream for source {css_id} "
|
|
||||||
f"(ref_count={entry.ref_count})"
|
|
||||||
)
|
|
||||||
return entry.stream
|
return entry.stream
|
||||||
|
|
||||||
from wled_controller.storage.color_strip_source import (
|
# Create new picture stream — needs a LiveStream from the capture pipeline
|
||||||
ColorCycleColorStripSource,
|
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
||||||
EffectColorStripSource,
|
|
||||||
GradientColorStripSource,
|
|
||||||
PictureColorStripSource,
|
|
||||||
StaticColorStripSource,
|
|
||||||
)
|
|
||||||
|
|
||||||
source = self._color_strip_store.get_source(css_id)
|
|
||||||
|
|
||||||
if isinstance(source, ColorCycleColorStripSource):
|
|
||||||
css_stream = ColorCycleColorStripStream(source)
|
|
||||||
css_stream.start()
|
|
||||||
self._streams[css_id] = _ColorStripEntry(
|
|
||||||
stream=css_stream,
|
|
||||||
ref_count=1,
|
|
||||||
picture_source_id="",
|
|
||||||
)
|
|
||||||
logger.info(f"Created color cycle stream for source {css_id}")
|
|
||||||
return css_stream
|
|
||||||
|
|
||||||
if isinstance(source, StaticColorStripSource):
|
|
||||||
css_stream = StaticColorStripStream(source)
|
|
||||||
css_stream.start()
|
|
||||||
self._streams[css_id] = _ColorStripEntry(
|
|
||||||
stream=css_stream,
|
|
||||||
ref_count=1,
|
|
||||||
picture_source_id="", # no live stream to manage
|
|
||||||
)
|
|
||||||
logger.info(f"Created static color strip stream for source {css_id}")
|
|
||||||
return css_stream
|
|
||||||
|
|
||||||
if isinstance(source, GradientColorStripSource):
|
|
||||||
css_stream = GradientColorStripStream(source)
|
|
||||||
css_stream.start()
|
|
||||||
self._streams[css_id] = _ColorStripEntry(
|
|
||||||
stream=css_stream,
|
|
||||||
ref_count=1,
|
|
||||||
picture_source_id="", # no live stream to manage
|
|
||||||
)
|
|
||||||
logger.info(f"Created gradient color strip stream for source {css_id}")
|
|
||||||
return css_stream
|
|
||||||
|
|
||||||
if isinstance(source, EffectColorStripSource):
|
|
||||||
css_stream = EffectColorStripStream(source)
|
|
||||||
css_stream.start()
|
|
||||||
self._streams[css_id] = _ColorStripEntry(
|
|
||||||
stream=css_stream,
|
|
||||||
ref_count=1,
|
|
||||||
picture_source_id="",
|
|
||||||
)
|
|
||||||
logger.info(f"Created effect stream for source {css_id} (effect={source.effect_type})")
|
|
||||||
return css_stream
|
|
||||||
|
|
||||||
if not isinstance(source, PictureColorStripSource):
|
if not isinstance(source, PictureColorStripSource):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
|
f"Unsupported sharable source type '{source.source_type}' for {css_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if not source.picture_source_id:
|
if not source.picture_source_id:
|
||||||
@@ -170,10 +151,10 @@ class ColorStripStreamManager:
|
|||||||
picture_source_id=source.picture_source_id,
|
picture_source_id=source.picture_source_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Created picture color strip stream for source {css_id}")
|
logger.info(f"Created picture color strip stream {css_id}")
|
||||||
return css_stream
|
return css_stream
|
||||||
|
|
||||||
def release(self, css_id: str) -> None:
|
def release(self, css_id: str, consumer_id: str = "") -> None:
|
||||||
"""Release a reference to a ColorStripStream.
|
"""Release a reference to a ColorStripStream.
|
||||||
|
|
||||||
Decrements the reference count. When it reaches 0, stops the stream,
|
Decrements the reference count. When it reaches 0, stops the stream,
|
||||||
@@ -181,56 +162,68 @@ class ColorStripStreamManager:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
css_id: ID of the ColorStripSource to release
|
css_id: ID of the ColorStripSource to release
|
||||||
|
consumer_id: Consumer identifier (same as passed to acquire)
|
||||||
"""
|
"""
|
||||||
entry = self._streams.get(css_id)
|
key = self._resolve_key(css_id, consumer_id)
|
||||||
|
entry = self._streams.get(key)
|
||||||
if not entry:
|
if not entry:
|
||||||
logger.warning(f"Attempted to release unknown color strip stream: {css_id}")
|
logger.warning(f"Attempted to release unknown color strip stream: {key}")
|
||||||
return
|
return
|
||||||
|
|
||||||
entry.ref_count -= 1
|
entry.ref_count -= 1
|
||||||
logger.debug(f"Released color strip stream {css_id} (ref_count={entry.ref_count})")
|
logger.debug(f"Released color strip stream {key} (ref_count={entry.ref_count})")
|
||||||
|
|
||||||
if entry.ref_count <= 0:
|
if entry.ref_count <= 0:
|
||||||
try:
|
try:
|
||||||
entry.stream.stop()
|
entry.stream.stop()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error stopping color strip stream {css_id}: {e}")
|
logger.error(f"Error stopping color strip stream {key}: {e}")
|
||||||
|
|
||||||
picture_source_id = entry.picture_source_id
|
picture_source_id = entry.picture_source_id
|
||||||
del self._streams[css_id]
|
del self._streams[key]
|
||||||
logger.info(f"Removed color strip stream for source {css_id}")
|
logger.info(f"Removed color strip stream {key}")
|
||||||
|
|
||||||
# Release the underlying live stream (not needed for static sources)
|
# Release the underlying live stream (not needed for static sources)
|
||||||
if picture_source_id:
|
if picture_source_id:
|
||||||
self._live_stream_manager.release(picture_source_id)
|
self._live_stream_manager.release(picture_source_id)
|
||||||
|
|
||||||
def update_source(self, css_id: str, new_source) -> None:
|
def update_source(self, css_id: str, new_source) -> None:
|
||||||
"""Hot-update processing params on a running stream.
|
"""Hot-update processing params on all running streams for a source.
|
||||||
|
|
||||||
If the picture_source_id changed, only updates in-place params;
|
Applies to both the shared entry (picture) and per-consumer entries
|
||||||
the underlying LiveStream is not swapped (requires target restart
|
(count-dependent). If the picture_source_id changed, only updates
|
||||||
to take full effect).
|
in-place params; the underlying LiveStream is not swapped (requires
|
||||||
|
target restart to take full effect).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
css_id: ID of the ColorStripSource
|
css_id: ID of the ColorStripSource
|
||||||
new_source: Updated ColorStripSource config
|
new_source: Updated ColorStripSource config
|
||||||
"""
|
"""
|
||||||
entry = self._streams.get(css_id)
|
# Find all entries: shared key OR per-consumer keys (css_id:xxx)
|
||||||
if not entry:
|
matching_keys = [
|
||||||
|
k for k in self._streams
|
||||||
|
if k == css_id or k.startswith(f"{css_id}:")
|
||||||
|
]
|
||||||
|
if not matching_keys:
|
||||||
return # Stream not running; config will be used on next acquire
|
return # Stream not running; config will be used on next acquire
|
||||||
|
|
||||||
|
for key in matching_keys:
|
||||||
|
entry = self._streams[key]
|
||||||
entry.stream.update_source(new_source)
|
entry.stream.update_source(new_source)
|
||||||
|
|
||||||
# Track picture_source_id change for future reference counting
|
# Track picture_source_id change for future reference counting
|
||||||
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
||||||
if isinstance(new_source, PictureColorStripSource):
|
if isinstance(new_source, PictureColorStripSource):
|
||||||
|
for key in matching_keys:
|
||||||
|
entry = self._streams[key]
|
||||||
if new_source.picture_source_id != entry.picture_source_id:
|
if new_source.picture_source_id != entry.picture_source_id:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"CSS {css_id}: picture_source_id changed — "
|
f"CSS {css_id}: picture_source_id changed — "
|
||||||
f"restart target to use new source"
|
f"restart target to use new source"
|
||||||
)
|
)
|
||||||
|
break
|
||||||
|
|
||||||
logger.info(f"Updated running color strip stream {css_id}")
|
logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}")
|
||||||
|
|
||||||
def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None:
|
def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None:
|
||||||
"""Register or update a consumer's target FPS.
|
"""Register or update a consumer's target FPS.
|
||||||
@@ -238,7 +231,8 @@ class ColorStripStreamManager:
|
|||||||
Recalculates the capture rate for PictureColorStripStreams as
|
Recalculates the capture rate for PictureColorStripStreams as
|
||||||
max(all consumer FPS values). Non-picture streams are unaffected.
|
max(all consumer FPS values). Non-picture streams are unaffected.
|
||||||
"""
|
"""
|
||||||
entry = self._streams.get(css_id)
|
key = self._resolve_key(css_id, target_id)
|
||||||
|
entry = self._streams.get(key)
|
||||||
if not entry:
|
if not entry:
|
||||||
return
|
return
|
||||||
entry.target_fps[target_id] = fps
|
entry.target_fps[target_id] = fps
|
||||||
@@ -246,7 +240,8 @@ class ColorStripStreamManager:
|
|||||||
|
|
||||||
def remove_target_fps(self, css_id: str, target_id: str) -> None:
|
def remove_target_fps(self, css_id: str, target_id: str) -> None:
|
||||||
"""Unregister a consumer's target FPS (e.g. on stop)."""
|
"""Unregister a consumer's target FPS (e.g. on stop)."""
|
||||||
entry = self._streams.get(css_id)
|
key = self._resolve_key(css_id, target_id)
|
||||||
|
entry = self._streams.get(key)
|
||||||
if not entry:
|
if not entry:
|
||||||
return
|
return
|
||||||
entry.target_fps.pop(target_id, None)
|
entry.target_fps.pop(target_id, None)
|
||||||
|
|||||||
@@ -118,7 +118,7 @@ class WledTargetProcessor(TargetProcessor):
|
|||||||
raise RuntimeError(f"Target {self._target_id} has no color strip source assigned")
|
raise RuntimeError(f"Target {self._target_id} has no color strip source assigned")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
stream = await asyncio.to_thread(css_manager.acquire, self._color_strip_source_id)
|
stream = await asyncio.to_thread(css_manager.acquire, self._color_strip_source_id, self._target_id)
|
||||||
self._color_strip_stream = stream
|
self._color_strip_stream = stream
|
||||||
self._resolved_display_index = stream.display_index
|
self._resolved_display_index = stream.display_index
|
||||||
|
|
||||||
@@ -193,7 +193,7 @@ class WledTargetProcessor(TargetProcessor):
|
|||||||
if css_manager and self._color_strip_source_id:
|
if css_manager and self._color_strip_source_id:
|
||||||
try:
|
try:
|
||||||
css_manager.remove_target_fps(self._color_strip_source_id, self._target_id)
|
css_manager.remove_target_fps(self._color_strip_source_id, self._target_id)
|
||||||
await asyncio.to_thread(css_manager.release, self._color_strip_source_id)
|
await asyncio.to_thread(css_manager.release, self._color_strip_source_id, self._target_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Error releasing color strip stream for {self._target_id}: {e}")
|
logger.warning(f"Error releasing color strip stream for {self._target_id}: {e}")
|
||||||
self._color_strip_stream = None
|
self._color_strip_stream = None
|
||||||
@@ -241,9 +241,9 @@ class WledTargetProcessor(TargetProcessor):
|
|||||||
|
|
||||||
old_id = self._color_strip_source_id
|
old_id = self._color_strip_source_id
|
||||||
try:
|
try:
|
||||||
new_stream = css_manager.acquire(color_strip_source_id)
|
new_stream = css_manager.acquire(color_strip_source_id, self._target_id)
|
||||||
css_manager.remove_target_fps(old_id, self._target_id)
|
css_manager.remove_target_fps(old_id, self._target_id)
|
||||||
css_manager.release(old_id)
|
css_manager.release(old_id, self._target_id)
|
||||||
self._color_strip_stream = new_stream
|
self._color_strip_stream = new_stream
|
||||||
self._resolved_display_index = new_stream.display_index
|
self._resolved_display_index = new_stream.display_index
|
||||||
self._color_strip_source_id = color_strip_source_id
|
self._color_strip_source_id = color_strip_source_id
|
||||||
|
|||||||
@@ -33,6 +33,15 @@ class ColorStripSource:
|
|||||||
updated_at: datetime
|
updated_at: datetime
|
||||||
description: Optional[str] = None
|
description: Optional[str] = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sharable(self) -> bool:
|
||||||
|
"""Whether multiple consumers can share a single stream instance.
|
||||||
|
|
||||||
|
Count-dependent sources (static, gradient, cycle, effect) return False
|
||||||
|
because each consumer may configure a different LED count.
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> dict:
|
||||||
"""Convert source to dictionary. Subclasses extend this."""
|
"""Convert source to dictionary. Subclasses extend this."""
|
||||||
return {
|
return {
|
||||||
@@ -174,6 +183,11 @@ class PictureColorStripSource(ColorStripSource):
|
|||||||
calibration (LED positions), color correction, smoothing, FPS target.
|
calibration (LED positions), color correction, smoothing, FPS target.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sharable(self) -> bool:
|
||||||
|
"""Picture streams are expensive (screen capture) and safe to share."""
|
||||||
|
return True
|
||||||
|
|
||||||
picture_source_id: str = ""
|
picture_source_id: str = ""
|
||||||
fps: int = 30
|
fps: int = 30
|
||||||
brightness: float = 1.0 # color correction multiplier (0.0–2.0; 1.0 = unchanged)
|
brightness: float = 1.0 # color correction multiplier (0.0–2.0; 1.0 = unchanged)
|
||||||
|
|||||||
Reference in New Issue
Block a user