Add value source test modal, auto-gain, brightness always-show, shared value streams
- Add real-time value source test: WebSocket endpoint streams get_value() at ~20Hz, frontend renders scrolling time-series chart with min/max/current stats - Add auto-gain for audio value sources: rolling peak normalization with slow decay, sensitivity range increased to 0.1-20.0 - Always show brightness overlay on LED preview when brightness source is set - Refactor ValueStreamManager to shared ref-counted streams (value streams produce scalars, not LED-count-dependent, so sharing is correct) - Simplify acquire/release API: remove consumer_id parameter since streams are no longer consumer-dependent Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -162,7 +162,7 @@ class KCTargetProcessor(TargetProcessor):
|
||||
if self._brightness_vs_id and self._ctx.value_stream_manager:
|
||||
try:
|
||||
self._value_stream = self._ctx.value_stream_manager.acquire(
|
||||
self._brightness_vs_id, self._target_id
|
||||
self._brightness_vs_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to acquire value stream {self._brightness_vs_id}: {e}")
|
||||
@@ -207,7 +207,7 @@ class KCTargetProcessor(TargetProcessor):
|
||||
# Release value stream
|
||||
if self._value_stream is not None and self._ctx.value_stream_manager:
|
||||
try:
|
||||
self._ctx.value_stream_manager.release(self._brightness_vs_id, self._target_id)
|
||||
self._ctx.value_stream_manager.release(self._brightness_vs_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error releasing value stream: {e}")
|
||||
self._value_stream = None
|
||||
@@ -235,7 +235,7 @@ class KCTargetProcessor(TargetProcessor):
|
||||
# Release old stream
|
||||
if self._value_stream is not None and old_vs_id:
|
||||
try:
|
||||
vs_mgr.release(old_vs_id, self._target_id)
|
||||
vs_mgr.release(old_vs_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error releasing old value stream {old_vs_id}: {e}")
|
||||
self._value_stream = None
|
||||
@@ -243,7 +243,7 @@ class KCTargetProcessor(TargetProcessor):
|
||||
# Acquire new stream
|
||||
if vs_id:
|
||||
try:
|
||||
self._value_stream = vs_mgr.acquire(vs_id, self._target_id)
|
||||
self._value_stream = vs_mgr.acquire(vs_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to acquire value stream {vs_id}: {e}")
|
||||
self._value_stream = None
|
||||
|
||||
@@ -109,6 +109,10 @@ class ProcessorManager:
|
||||
def audio_capture_manager(self) -> AudioCaptureManager:
|
||||
return self._audio_capture_manager
|
||||
|
||||
@property
|
||||
def value_stream_manager(self) -> Optional[ValueStreamManager]:
|
||||
return self._value_stream_manager
|
||||
|
||||
@property
|
||||
def metrics_history(self) -> MetricsHistory:
|
||||
return self._metrics_history
|
||||
|
||||
@@ -158,6 +158,7 @@ class AudioValueStream(ValueStream):
|
||||
smoothing: float = 0.3,
|
||||
min_value: float = 0.0,
|
||||
max_value: float = 1.0,
|
||||
auto_gain: bool = False,
|
||||
audio_capture_manager: Optional["AudioCaptureManager"] = None,
|
||||
audio_source_store: Optional["AudioSourceStore"] = None,
|
||||
audio_template_store=None,
|
||||
@@ -168,6 +169,9 @@ class AudioValueStream(ValueStream):
|
||||
self._smoothing = smoothing
|
||||
self._min = min_value
|
||||
self._max = max_value
|
||||
self._auto_gain = auto_gain
|
||||
self._rolling_peak = 0.0 # tracks observed max raw audio value
|
||||
self._rolling_decay = 0.995 # slow decay (~5-10s adaptation)
|
||||
self._audio_capture_manager = audio_capture_manager
|
||||
self._audio_source_store = audio_source_store
|
||||
self._audio_template_store = audio_template_store
|
||||
@@ -237,6 +241,13 @@ class AudioValueStream(ValueStream):
|
||||
return self._prev_value
|
||||
|
||||
raw = self._extract_raw(analysis)
|
||||
|
||||
# Auto-gain: normalize raw against rolling observed peak
|
||||
if self._auto_gain:
|
||||
self._rolling_peak = max(raw, self._rolling_peak * self._rolling_decay)
|
||||
if self._rolling_peak > 0.001:
|
||||
raw = raw / self._rolling_peak
|
||||
|
||||
raw = min(1.0, raw * self._sensitivity)
|
||||
|
||||
# Temporal smoothing
|
||||
@@ -284,12 +295,18 @@ class AudioValueStream(ValueStream):
|
||||
return
|
||||
|
||||
old_source_id = self._audio_source_id
|
||||
old_auto_gain = self._auto_gain
|
||||
self._audio_source_id = source.audio_source_id
|
||||
self._mode = source.mode
|
||||
self._sensitivity = source.sensitivity
|
||||
self._smoothing = source.smoothing
|
||||
self._min = source.min_value
|
||||
self._max = source.max_value
|
||||
self._auto_gain = source.auto_gain
|
||||
|
||||
# Reset rolling peak when auto-gain is toggled on
|
||||
if self._auto_gain and not old_auto_gain:
|
||||
self._rolling_peak = 0.0
|
||||
|
||||
# If audio source changed, re-resolve and swap capture stream
|
||||
if source.audio_source_id != old_source_id:
|
||||
@@ -525,15 +542,13 @@ class SceneValueStream(ValueStream):
|
||||
# Manager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_key(vs_id: str, consumer_id: str) -> str:
|
||||
return f"{vs_id}:{consumer_id}"
|
||||
|
||||
|
||||
class ValueStreamManager:
|
||||
"""Owns running ValueStream instances, keyed by ``vs_id:consumer_id``.
|
||||
"""Owns running ValueStream instances, shared and ref-counted by vs_id.
|
||||
|
||||
Each consumer (target processor) gets its own stream instance —
|
||||
no sharing or ref-counting needed since streams are cheap.
|
||||
Value streams produce scalars (not LED-count-dependent), so a single
|
||||
stream instance is shared across all consumers that use the same
|
||||
ValueSource. Ref-counting ensures the stream is stopped only when
|
||||
the last consumer releases it.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -549,59 +564,66 @@ class ValueStreamManager:
|
||||
self._audio_source_store = audio_source_store
|
||||
self._live_stream_manager = live_stream_manager
|
||||
self._audio_template_store = audio_template_store
|
||||
self._streams: Dict[str, ValueStream] = {}
|
||||
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
|
||||
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
|
||||
|
||||
def acquire(self, vs_id: str, consumer_id: str) -> ValueStream:
|
||||
"""Create and start a ValueStream for the given ValueSource.
|
||||
def acquire(self, vs_id: str) -> ValueStream:
|
||||
"""Get or create a shared ValueStream for the given ValueSource.
|
||||
|
||||
Args:
|
||||
vs_id: ID of the ValueSource config
|
||||
consumer_id: Unique consumer identifier (target_id)
|
||||
|
||||
Returns:
|
||||
Running ValueStream instance
|
||||
Increments the ref count. The stream is stopped only when all
|
||||
consumers have called :meth:`release`.
|
||||
"""
|
||||
key = _make_key(vs_id, consumer_id)
|
||||
if key in self._streams:
|
||||
return self._streams[key]
|
||||
if vs_id in self._streams:
|
||||
self._ref_counts[vs_id] += 1
|
||||
logger.info(f"Shared value stream {vs_id} (refs={self._ref_counts[vs_id]})")
|
||||
return self._streams[vs_id]
|
||||
|
||||
source = self._value_source_store.get_source(vs_id)
|
||||
stream = self._create_stream(source)
|
||||
stream.start()
|
||||
self._streams[key] = stream
|
||||
logger.info(f"Acquired value stream {key} (type={source.source_type})")
|
||||
self._streams[vs_id] = stream
|
||||
self._ref_counts[vs_id] = 1
|
||||
logger.info(f"Acquired value stream {vs_id} (type={source.source_type})")
|
||||
return stream
|
||||
|
||||
def release(self, vs_id: str, consumer_id: str) -> None:
|
||||
"""Stop and remove a ValueStream."""
|
||||
key = _make_key(vs_id, consumer_id)
|
||||
stream = self._streams.pop(key, None)
|
||||
if stream:
|
||||
stream.stop()
|
||||
logger.info(f"Released value stream {key}")
|
||||
def release(self, vs_id: str) -> None:
|
||||
"""Decrement ref count; stop the stream when it reaches zero."""
|
||||
if vs_id not in self._ref_counts:
|
||||
return
|
||||
|
||||
self._ref_counts[vs_id] -= 1
|
||||
refs = self._ref_counts[vs_id]
|
||||
|
||||
if refs <= 0:
|
||||
stream = self._streams.pop(vs_id, None)
|
||||
if stream:
|
||||
stream.stop()
|
||||
del self._ref_counts[vs_id]
|
||||
logger.info(f"Released value stream {vs_id} (last ref)")
|
||||
else:
|
||||
logger.info(f"Released ref for value stream {vs_id} (refs={refs})")
|
||||
|
||||
def update_source(self, vs_id: str) -> None:
|
||||
"""Hot-update all running streams that use the given ValueSource."""
|
||||
"""Hot-update the shared stream for the given ValueSource."""
|
||||
try:
|
||||
source = self._value_source_store.get_source(vs_id)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
prefix = f"{vs_id}:"
|
||||
for key, stream in self._streams.items():
|
||||
if key.startswith(prefix):
|
||||
stream.update_source(source)
|
||||
|
||||
logger.debug(f"Updated running value streams for source {vs_id}")
|
||||
stream = self._streams.get(vs_id)
|
||||
if stream:
|
||||
stream.update_source(source)
|
||||
logger.debug(f"Updated value stream {vs_id}")
|
||||
|
||||
def release_all(self) -> None:
|
||||
"""Stop and remove all managed streams. Called on shutdown."""
|
||||
for key, stream in self._streams.items():
|
||||
for vs_id, stream in self._streams.items():
|
||||
try:
|
||||
stream.stop()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping value stream {key}: {e}")
|
||||
logger.error(f"Error stopping value stream {vs_id}: {e}")
|
||||
self._streams.clear()
|
||||
self._ref_counts.clear()
|
||||
logger.info("Released all value streams")
|
||||
|
||||
def _create_stream(self, source: "ValueSource") -> ValueStream:
|
||||
@@ -632,6 +654,7 @@ class ValueStreamManager:
|
||||
smoothing=source.smoothing,
|
||||
min_value=source.min_value,
|
||||
max_value=source.max_value,
|
||||
auto_gain=source.auto_gain,
|
||||
audio_capture_manager=self._audio_capture_manager,
|
||||
audio_source_store=self._audio_source_store,
|
||||
audio_template_store=self._audio_template_store,
|
||||
|
||||
@@ -136,7 +136,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
if self._brightness_vs_id and self._ctx.value_stream_manager:
|
||||
try:
|
||||
self._value_stream = self._ctx.value_stream_manager.acquire(
|
||||
self._brightness_vs_id, self._target_id
|
||||
self._brightness_vs_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to acquire value stream {self._brightness_vs_id}: {e}")
|
||||
@@ -190,7 +190,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
# Release value stream
|
||||
if self._value_stream is not None and self._ctx.value_stream_manager:
|
||||
try:
|
||||
self._ctx.value_stream_manager.release(self._brightness_vs_id, self._target_id)
|
||||
self._ctx.value_stream_manager.release(self._brightness_vs_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error releasing value stream: {e}")
|
||||
self._value_stream = None
|
||||
@@ -270,7 +270,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
# Release old stream
|
||||
if self._value_stream is not None and old_vs_id:
|
||||
try:
|
||||
vs_mgr.release(old_vs_id, self._target_id)
|
||||
vs_mgr.release(old_vs_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error releasing old value stream {old_vs_id}: {e}")
|
||||
self._value_stream = None
|
||||
@@ -278,7 +278,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
# Acquire new stream
|
||||
if vs_id:
|
||||
try:
|
||||
self._value_stream = vs_mgr.acquire(vs_id, self._target_id)
|
||||
self._value_stream = vs_mgr.acquire(vs_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to acquire value stream {vs_id}: {e}")
|
||||
self._value_stream = None
|
||||
|
||||
Reference in New Issue
Block a user