Add audio-reactive color strip sources, improve delete error messages

Add new "audio" color strip source type with three visualization modes
(spectrum analyzer, beat pulse, VU meter) supporting WASAPI loopback and
microphone input via PyAudioWPatch. Includes shared audio capture with
ref counting, real-time FFT spectrum analysis, and beat detection.

Improve all referential integrity 409 error messages across delete
endpoints to include specific names of referencing entities instead of
generic "one or more" messages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-23 11:56:54 +03:00
parent 2657f46e5d
commit bbd2ac9910
24 changed files with 1247 additions and 86 deletions

View File

@@ -0,0 +1,300 @@
"""Audio-reactive LED color strip stream.
Implements AudioColorStripStream which produces LED color arrays driven by
real-time audio analysis (spectrum, beat detection, RMS levels).
Three visualization modes:
spectrum — FFT frequency bars mapped across LEDs with palette coloring
beat_pulse — full-strip flash on beat detection with exponential decay
vu_meter — volume level fills LEDs like a progress bar
"""
import threading
import time
from typing import Optional
import numpy as np
from wled_controller.core.audio.audio_capture import AudioCaptureManager
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.core.processing.effect_stream import _build_palette_lut
from wled_controller.utils import get_logger
from wled_controller.utils.timer import high_resolution_timer
logger = get_logger(__name__)
class AudioColorStripStream(ColorStripStream):
    """Color strip stream driven by audio analysis.

    Dispatches to one of three render methods based on ``visualization_mode``:

    * ``spectrum``   — FFT frequency bars mapped across LEDs with palette coloring
    * ``beat_pulse`` — full-strip flash on beat detection with exponential decay
    * ``vu_meter``   — volume level fills LEDs like a progress bar

    Polls the shared audio capture stream for the latest analysis data
    (spectrum / RMS / beat flags) and renders an ``(n, 3)`` uint8 color array.
    Uses the same lifecycle pattern as EffectColorStripStream: background
    daemon thread, double-buffered output, configure() for auto-sizing.
    """

    def __init__(self, source, audio_capture_manager: AudioCaptureManager):
        """
        Args:
            source: audio color strip source config; attributes are read
                defensively with getattr so partial configs still work.
            audio_capture_manager: shared, ref-counted manager from which the
                underlying audio capture stream is acquired on start().
        """
        self._audio_capture_manager = audio_capture_manager
        self._audio_stream = None  # acquired on start()
        self._colors_lock = threading.Lock()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._fps = 30
        # Beat-pulse persistent state: current flash brightness in [0, 1]
        self._pulse_brightness = 0.0
        # Previous frame's spectrum / RMS for temporal smoothing
        self._prev_spectrum: Optional[np.ndarray] = None
        self._prev_rms = 0.0
        self._update_from_source(source)

    def _update_from_source(self, source) -> None:
        """Copy all tunable parameters from the source config object."""
        self._visualization_mode = getattr(source, "visualization_mode", "spectrum")
        self._audio_device_index = getattr(source, "audio_device_index", -1)
        self._audio_loopback = bool(getattr(source, "audio_loopback", True))
        self._sensitivity = float(getattr(source, "sensitivity", 1.0))
        self._smoothing = float(getattr(source, "smoothing", 0.3))
        self._palette_name = getattr(source, "palette", "rainbow")
        self._palette_lut = _build_palette_lut(self._palette_name)
        color = getattr(source, "color", None)
        self._color = color if isinstance(color, list) and len(color) == 3 else [0, 255, 0]
        color_peak = getattr(source, "color_peak", None)
        self._color_peak = (
            color_peak if isinstance(color_peak, list) and len(color_peak) == 3 else [255, 0, 0]
        )
        # led_count of 0/None means "auto-size to the consuming device"
        self._auto_size = not source.led_count
        self._led_count = source.led_count if source.led_count and source.led_count > 0 else 1
        self._mirror = bool(getattr(source, "mirror", False))
        with self._colors_lock:
            # Invalidate the published frame; the render loop repopulates it.
            self._colors: Optional[np.ndarray] = None

    # ── ColorStripStream interface ──────────────────────────────────

    def configure(self, device_led_count: int) -> None:
        """Auto-size the strip to the largest consuming device (grow only)."""
        if self._auto_size and device_led_count > 0:
            new_count = max(self._led_count, device_led_count)
            if new_count != self._led_count:
                self._led_count = new_count
                logger.debug(f"AudioColorStripStream auto-sized to {new_count} LEDs")

    @property
    def target_fps(self) -> int:
        return self._fps

    @property
    def led_count(self) -> int:
        return self._led_count

    def set_capture_fps(self, fps: int) -> None:
        # Clamp to a sane range to protect the render loop's sleep math.
        self._fps = max(1, min(90, fps))

    def start(self) -> None:
        """Acquire the shared audio capture and launch the render thread."""
        if self._running:
            return
        # Acquire shared (ref-counted) audio capture stream
        self._audio_stream = self._audio_capture_manager.acquire(
            self._audio_device_index, self._audio_loopback
        )
        self._running = True
        self._thread = threading.Thread(
            target=self._animate_loop,
            name=f"css-audio-{self._visualization_mode}",
            daemon=True,
        )
        self._thread.start()
        logger.info(
            f"AudioColorStripStream started (viz={self._visualization_mode}, "
            f"device={self._audio_device_index}, loopback={self._audio_loopback})"
        )

    def stop(self) -> None:
        """Stop the render thread and release the shared audio capture."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=5.0)
            if self._thread.is_alive():
                logger.warning("AudioColorStripStream thread did not terminate within 5s")
            self._thread = None
        # Release shared audio capture (decrements the manager's ref count)
        if self._audio_stream is not None:
            self._audio_capture_manager.release(self._audio_device_index, self._audio_loopback)
            self._audio_stream = None
        self._prev_spectrum = None
        logger.info("AudioColorStripStream stopped")

    def get_latest_colors(self) -> Optional[np.ndarray]:
        """Return the most recently published (n, 3) uint8 frame, or None."""
        with self._colors_lock:
            return self._colors

    def update_source(self, source) -> None:
        """Apply a changed source config in-place, swapping audio device if needed."""
        from wled_controller.storage.color_strip_source import AudioColorStripSource
        if isinstance(source, AudioColorStripSource):
            old_device = self._audio_device_index
            old_loopback = self._audio_loopback
            # Preserve a previously auto-grown LED count across updates so the
            # strip doesn't shrink back to 1 until configure() runs again.
            prev_led_count = self._led_count if self._auto_size else None
            self._update_from_source(source)
            if prev_led_count and self._auto_size:
                self._led_count = prev_led_count
            # If the audio device changed while running, swap capture streams
            if self._running and (
                self._audio_device_index != old_device or self._audio_loopback != old_loopback
            ):
                self._audio_capture_manager.release(old_device, old_loopback)
                self._audio_stream = self._audio_capture_manager.acquire(
                    self._audio_device_index, self._audio_loopback
                )
                logger.info(
                    "AudioColorStripStream swapped audio device: "
                    f"{old_device}:{old_loopback} -> "
                    f"{self._audio_device_index}:{self._audio_loopback}"
                )
            logger.info("AudioColorStripStream params updated in-place")

    # ── Main animation loop ─────────────────────────────────────────

    def _animate_loop(self) -> None:
        """Render-thread body: poll analysis, render a frame, publish, sleep.

        Output is double-buffered: the consumer always holds the buffer that
        was published last frame while we write into the other one.
        """
        _pool_n = 0
        _buf_a = _buf_b = None
        _use_a = True
        renderers = {
            "spectrum": self._render_spectrum,
            "beat_pulse": self._render_beat_pulse,
            "vu_meter": self._render_vu_meter,
        }
        with high_resolution_timer():
            while self._running:
                loop_start = time.perf_counter()
                frame_time = 1.0 / self._fps  # re-read: fps may change live
                n = self._led_count
                # Rebuild scratch buffers when the LED count changes
                if n != _pool_n:
                    _pool_n = n
                    _buf_a = np.zeros((n, 3), dtype=np.uint8)
                    _buf_b = np.zeros((n, 3), dtype=np.uint8)
                buf = _buf_a if _use_a else _buf_b
                _use_a = not _use_a
                try:
                    # Get latest audio analysis (None while capture warms up)
                    analysis = None
                    if self._audio_stream is not None:
                        analysis = self._audio_stream.get_latest_analysis()
                    render_fn = renderers.get(self._visualization_mode, self._render_spectrum)
                    render_fn(buf, n, analysis)
                except Exception:
                    # Keep the daemon thread alive on transient capture/render
                    # errors; publish a dark frame instead of dying silently.
                    logger.exception("AudioColorStripStream render error")
                    buf[:] = 0
                with self._colors_lock:
                    self._colors = buf
                elapsed = time.perf_counter() - loop_start
                time.sleep(max(frame_time - elapsed, 0.001))

    # ── Spectrum Analyzer ──────────────────────────────────────────

    def _render_spectrum(self, buf: np.ndarray, n: int, analysis) -> None:
        """Map FFT bands across the strip; brightness follows band amplitude."""
        if analysis is None:
            buf[:] = 0
            return
        spectrum = analysis.spectrum
        sensitivity = self._sensitivity
        smoothing = self._smoothing
        lut = self._palette_lut
        num_bands = len(spectrum)
        band_x = np.arange(num_bands, dtype=np.float32)
        if self._mirror:
            # Render half the strip and reflect it around the center.
            half = (n + 1) // 2
            led_x = np.linspace(0, num_bands - 1, half)
            amplitudes = np.interp(led_x, band_x, spectrum)
            amplitudes *= sensitivity
            np.clip(amplitudes, 0.0, 1.0, out=amplitudes)
            # Temporal smoothing (skipped if mirror mode / size just changed)
            if self._prev_spectrum is not None and len(self._prev_spectrum) == half:
                amplitudes[:] = smoothing * self._prev_spectrum + (1.0 - smoothing) * amplitudes
            self._prev_spectrum = amplitudes.copy()
            # Mirror: center = bass, edges = treble
            full_amp = np.empty(n, dtype=np.float32)
            full_amp[:half] = amplitudes[::-1]
            full_amp[half:] = amplitudes[: n - half]
        else:
            led_x = np.linspace(0, num_bands - 1, n)
            amplitudes = np.interp(led_x, band_x, spectrum)
            amplitudes *= sensitivity
            np.clip(amplitudes, 0.0, 1.0, out=amplitudes)
            # Temporal smoothing (skipped if LED count just changed)
            if self._prev_spectrum is not None and len(self._prev_spectrum) == n:
                amplitudes[:] = smoothing * self._prev_spectrum + (1.0 - smoothing) * amplitudes
            self._prev_spectrum = amplitudes.copy()
            full_amp = amplitudes
        # Map amplitude → palette index → color
        indices = (full_amp * 255).astype(np.int32)
        np.clip(indices, 0, 255, out=indices)
        colors = lut[indices]  # (n, 3) uint8
        # Scale brightness by amplitude per channel
        for ch in range(3):
            buf[:, ch] = (colors[:, ch].astype(np.float32) * full_amp).astype(np.uint8)

    # ── VU Meter ───────────────────────────────────────────────────

    def _render_vu_meter(self, buf: np.ndarray, n: int, analysis) -> None:
        """Fill LEDs like a progress bar; gradient from base to peak color."""
        if analysis is None:
            buf[:] = 0
            return
        rms = analysis.rms * self._sensitivity
        # Temporal smoothing on RMS to reduce flicker
        rms = self._smoothing * self._prev_rms + (1.0 - self._smoothing) * rms
        self._prev_rms = rms
        rms = min(1.0, rms)
        fill_count = int(rms * n)
        buf[:] = 0
        if fill_count > 0:
            base = np.array(self._color, dtype=np.float32)
            peak = np.array(self._color_peak, dtype=np.float32)
            # Gradient positions are fixed over the full strip so colors don't
            # shift as the fill level changes — only the visible prefix grows.
            t = np.linspace(0, 1, n, dtype=np.float32)[:fill_count]
            for ch in range(3):
                buf[:fill_count, ch] = np.clip(
                    base[ch] + (peak[ch] - base[ch]) * t, 0, 255
                ).astype(np.uint8)

    # ── Beat Pulse ─────────────────────────────────────────────────

    def _render_beat_pulse(self, buf: np.ndarray, n: int, analysis) -> None:
        """Flash the whole strip on a beat, then decay exponentially."""
        if analysis is None:
            buf[:] = 0
            return
        # On beat: flash to full brightness
        if analysis.beat:
            self._pulse_brightness = 1.0
        else:
            # Linear per-frame decay — higher sensitivity decays slower
            decay_rate = 0.05 + 0.15 * (1.0 / max(self._sensitivity, 0.1))
            self._pulse_brightness = max(0.0, self._pulse_brightness - decay_rate)
        brightness = self._pulse_brightness
        if brightness < 0.01:
            buf[:] = 0
            return
        # Pick the color from the palette based on beat intensity
        palette_idx = int(analysis.beat_intensity * 255)
        palette_idx = max(0, min(255, palette_idx))
        base_color = self._palette_lut[palette_idx]
        # Fill all LEDs with the pulsing color
        buf[:, 0] = int(base_color[0] * brightness)
        buf[:, 1] = int(base_color[1] * brightness)
        buf[:, 2] = int(base_color[2] * brightness)

View File

@@ -56,14 +56,16 @@ class ColorStripStreamManager:
keyed by ``{css_id}:{consumer_id}``.
"""
def __init__(self, color_strip_store, live_stream_manager):
def __init__(self, color_strip_store, live_stream_manager, audio_capture_manager=None):
"""
Args:
color_strip_store: ColorStripStore for resolving source configs
live_stream_manager: LiveStreamManager for acquiring picture streams
audio_capture_manager: AudioCaptureManager for audio-reactive sources
"""
self._color_strip_store = color_strip_store
self._live_stream_manager = live_stream_manager
self._audio_capture_manager = audio_capture_manager
self._streams: Dict[str, _ColorStripEntry] = {}
def _resolve_key(self, css_id: str, consumer_id: str) -> str:
@@ -100,7 +102,10 @@ class ColorStripStreamManager:
# Non-sharable: always create a fresh per-consumer instance
if not source.sharable:
if source.source_type == "composite":
if source.source_type == "audio":
from wled_controller.core.processing.audio_stream import AudioColorStripStream
css_stream = AudioColorStripStream(source, self._audio_capture_manager)
elif source.source_type == "composite":
from wled_controller.core.processing.composite_stream import CompositeColorStripStream
css_stream = CompositeColorStripStream(source, self)
else:

View File

@@ -13,6 +13,7 @@ from wled_controller.core.devices.led_client import (
create_led_client,
get_provider,
)
from wled_controller.core.audio.audio_capture import AudioCaptureManager
from wled_controller.core.processing.live_stream_manager import LiveStreamManager
from wled_controller.core.processing.color_strip_stream_manager import ColorStripStreamManager
from wled_controller.core.capture.screen_overlay import OverlayManager
@@ -79,9 +80,11 @@ class ProcessorManager:
self._live_stream_manager = LiveStreamManager(
picture_source_store, capture_template_store, pp_template_store
)
self._audio_capture_manager = AudioCaptureManager()
self._color_strip_stream_manager = ColorStripStreamManager(
color_strip_store=color_strip_store,
live_stream_manager=self._live_stream_manager,
audio_capture_manager=self._audio_capture_manager,
)
self._overlay_manager = OverlayManager()
self._event_queues: List[asyncio.Queue] = []