Add audio capture engine template system with multi-backend support

Introduces an engine+template abstraction for audio capture, mirroring the
existing screen capture engine pattern. This enables multiple audio backends
(WASAPI for Windows, sounddevice for cross-platform) with per-source
engine configuration via reusable templates.

Backend:
- AudioCaptureEngine ABC with WasapiEngine and SounddeviceEngine implementations
- AudioEngineRegistry for engine discovery and factory creation
- AudioAnalyzer class decouples FFT/RMS/beat analysis from engine-specific capture
- ManagedAudioStream wraps engine stream + analyzer in background thread
- AudioCaptureTemplate model and AudioTemplateStore with JSON CRUD
- AudioCaptureManager keyed by (engine_type, device_index, is_loopback)
- Auto-migration: default template created on startup, assigned to existing sources
- Full REST API: CRUD for audio templates + engine listing with availability flags
- audio_template_id added to MultichannelAudioSource model and API schemas

Frontend:
- Audio template cards in Streams > Audio tab with engine badge and config details
- Audio template editor modal with engine selector and dynamic config fields
- Audio template dropdown in multichannel audio source editor
- Template name crosslink badge on multichannel audio source cards
- Confirm modal z-index fix (always stacks above editor modals)
- i18n keys for EN and RU

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 13:55:46 +03:00
parent cbbaa852ed
commit bae2166bc2
35 changed files with 2163 additions and 402 deletions

View File

@@ -0,0 +1,37 @@
"""Audio capture engine abstraction layer."""
from wled_controller.core.audio.base import (
    AudioCaptureEngine,
    AudioCaptureStreamBase,
    AudioDeviceInfo,
)
from wled_controller.core.audio.factory import AudioEngineRegistry
from wled_controller.core.audio.analysis import (
    AudioAnalysis,
    AudioAnalyzer,
    NUM_BANDS,
    DEFAULT_SAMPLE_RATE,
    DEFAULT_CHUNK_SIZE,
)
from wled_controller.core.audio.wasapi_engine import WasapiEngine, WasapiCaptureStream
from wled_controller.core.audio.sounddevice_engine import SounddeviceEngine, SounddeviceCaptureStream

# Auto-register available engines
# NOTE: registration happens at package-import time, so importing this
# package is enough to make both engines discoverable via the registry.
AudioEngineRegistry.register(WasapiEngine)
AudioEngineRegistry.register(SounddeviceEngine)

# Public API of the audio package.
__all__ = [
    "AudioCaptureEngine",
    "AudioCaptureStreamBase",
    "AudioDeviceInfo",
    "AudioEngineRegistry",
    "AudioAnalysis",
    "AudioAnalyzer",
    "NUM_BANDS",
    "DEFAULT_SAMPLE_RATE",
    "DEFAULT_CHUNK_SIZE",
    "WasapiEngine",
    "WasapiCaptureStream",
    "SounddeviceEngine",
    "SounddeviceCaptureStream",
]

View File

@@ -0,0 +1,217 @@
"""Shared audio analysis — FFT spectrum, RMS, beat detection.
Engines provide raw audio chunks; AudioAnalyzer processes them into
AudioAnalysis snapshots consumed by visualization streams.
"""
import math
import time
from dataclasses import dataclass, field
from typing import List, Tuple
import numpy as np
# Number of logarithmic frequency bands for spectrum analysis
NUM_BANDS = 64
# Audio defaults — used when the engine does not dictate its own values
DEFAULT_SAMPLE_RATE = 44100
DEFAULT_CHUNK_SIZE = 2048  # ~46 ms at 44100 Hz
@dataclass
class AudioAnalysis:
    """Snapshot of audio analysis results.

    Written by the capture thread, read by visualization streams.
    Mono fields contain the mixed-down signal (all channels averaged).
    Per-channel fields (left/right) are populated when the source is stereo+.
    For mono sources, left/right are copies of the mono data.
    """
    # time.perf_counter() value taken when the chunk was analyzed
    # (monotonic seconds — only useful for relative timing)
    timestamp: float = 0.0
    # Mono (mixed) — backward-compatible fields
    rms: float = 0.0   # root-mean-square level of the mixed signal
    peak: float = 0.0  # absolute peak sample value of the mixed signal
    # NUM_BANDS log-spaced band magnitudes, normalized to peak band = 1.0
    spectrum: np.ndarray = field(default_factory=lambda: np.zeros(NUM_BANDS, dtype=np.float32))
    beat: bool = False           # True when an energy spike crossed the beat threshold
    beat_intensity: float = 0.0  # 0..1 strength of the detected beat
    # Per-channel
    left_rms: float = 0.0
    left_spectrum: np.ndarray = field(default_factory=lambda: np.zeros(NUM_BANDS, dtype=np.float32))
    right_rms: float = 0.0
    right_spectrum: np.ndarray = field(default_factory=lambda: np.zeros(NUM_BANDS, dtype=np.float32))
def _build_log_bands(num_bands: int, fft_size: int, sample_rate: int) -> List[Tuple[int, int]]:
"""Build logarithmically-spaced frequency band boundaries for FFT bins.
Returns list of (start_bin, end_bin) pairs.
"""
nyquist = sample_rate / 2
min_freq = 20.0
max_freq = min(nyquist, 20000.0)
log_min = math.log10(min_freq)
log_max = math.log10(max_freq)
freqs = np.logspace(log_min, log_max, num_bands + 1)
bin_width = sample_rate / fft_size
bands = []
for i in range(num_bands):
start_bin = max(1, int(freqs[i] / bin_width))
end_bin = max(start_bin + 1, int(freqs[i + 1] / bin_width))
end_bin = min(end_bin, fft_size // 2)
bands.append((start_bin, end_bin))
return bands
class AudioAnalyzer:
    """Stateful audio analyzer — call analyze() per raw chunk.

    Maintains smoothing buffers, energy history for beat detection,
    and pre-allocated FFT scratch buffers. Thread-safe only if a single
    thread calls analyze() (the capture thread).
    """

    def __init__(self, sample_rate: int = DEFAULT_SAMPLE_RATE, chunk_size: int = DEFAULT_CHUNK_SIZE):
        self._sample_rate = sample_rate
        self._chunk_size = chunk_size
        # FFT helpers
        self._window = np.hanning(chunk_size).astype(np.float32)
        self._bands = _build_log_bands(NUM_BANDS, chunk_size, sample_rate)
        # Beat detection state
        self._energy_history: np.ndarray = np.zeros(43, dtype=np.float64)  # ~1s at 44100/2048
        self._energy_idx = 0
        # Smoothed spectrum (exponential decay); updated in place by _fft_bands
        self._smooth_spectrum = np.zeros(NUM_BANDS, dtype=np.float32)
        self._smooth_spectrum_left = np.zeros(NUM_BANDS, dtype=np.float32)
        self._smooth_spectrum_right = np.zeros(NUM_BANDS, dtype=np.float32)
        self._smoothing_alpha = 0.3  # lower = smoother / slower spectrum response
        # Pre-allocated scratch buffers — reused every analyze() call to
        # avoid per-chunk heap allocations on the capture thread
        self._fft_windowed = np.empty(chunk_size, dtype=np.float32)
        self._spectrum_buf = np.zeros(NUM_BANDS, dtype=np.float32)
        self._spectrum_buf_left = np.zeros(NUM_BANDS, dtype=np.float32)
        self._spectrum_buf_right = np.zeros(NUM_BANDS, dtype=np.float32)
        # Shared square scratch — serially reused for mono/left/right RMS
        self._sq_buf = np.empty(chunk_size, dtype=np.float32)
        # Pre-allocated channel buffers for stereo
        self._left_buf = np.empty(chunk_size, dtype=np.float32)
        self._right_buf = np.empty(chunk_size, dtype=np.float32)
        self._mono_buf = np.empty(chunk_size, dtype=np.float32)

    @property
    def sample_rate(self) -> int:
        """Sample rate currently used for the band layout."""
        return self._sample_rate

    @sample_rate.setter
    def sample_rate(self, value: int):
        # Rebuild log-band boundaries only when the rate actually changes.
        if value != self._sample_rate:
            self._sample_rate = value
            self._bands = _build_log_bands(NUM_BANDS, self._chunk_size, value)

    def analyze(self, raw_data: np.ndarray, channels: int) -> AudioAnalysis:
        """Analyze a raw audio chunk and return an AudioAnalysis snapshot.

        Args:
            raw_data: 1-D float32 array of interleaved samples (length = chunk_size * channels)
            channels: Number of audio channels

        Returns:
            AudioAnalysis with spectrum, RMS, beat, etc.
        """
        chunk_size = self._chunk_size
        alpha = self._smoothing_alpha
        one_minus_alpha = 1.0 - alpha
        # Split channels and mix to mono
        if channels > 1:
            data = raw_data.reshape(-1, channels)
            np.copyto(self._left_buf[:len(data)], data[:, 0])
            # With >2 channels only the first two are analyzed per-side.
            right_col = data[:, 1] if channels >= 2 else data[:, 0]
            np.copyto(self._right_buf[:len(data)], right_col)
            # Mono mix = average of the two analyzed channels.
            np.add(data[:, 0], right_col, out=self._mono_buf[:len(data)])
            self._mono_buf[:len(data)] *= 0.5
            samples = self._mono_buf[:len(data)]
            left_samples = self._left_buf[:len(data)]
            right_samples = self._right_buf[:len(data)]
        else:
            # Mono source: left/right mirror the mono signal.
            samples = raw_data
            left_samples = samples
            right_samples = samples
        # RMS and peak — squares go through the shared scratch buffer
        n = len(samples)
        np.multiply(samples, samples, out=self._sq_buf[:n])
        rms = float(np.sqrt(np.mean(self._sq_buf[:n])))
        peak = float(np.max(np.abs(samples)))
        if channels > 1:
            np.multiply(left_samples, left_samples, out=self._sq_buf[:n])
            left_rms = float(np.sqrt(np.mean(self._sq_buf[:n])))
            np.multiply(right_samples, right_samples, out=self._sq_buf[:n])
            right_rms = float(np.sqrt(np.mean(self._sq_buf[:n])))
        else:
            left_rms = rms
            right_rms = rms
        # FFT for mono, left, right — results accumulate into _smooth_* buffers
        self._fft_bands(samples, self._spectrum_buf, self._smooth_spectrum,
                        alpha, one_minus_alpha)
        self._fft_bands(left_samples, self._spectrum_buf_left, self._smooth_spectrum_left,
                        alpha, one_minus_alpha)
        self._fft_bands(right_samples, self._spectrum_buf_right, self._smooth_spectrum_right,
                        alpha, one_minus_alpha)
        # Beat detection — compare current energy to rolling average (mono)
        np.multiply(samples, samples, out=self._sq_buf[:n])
        energy = float(np.sum(self._sq_buf[:n]))
        self._energy_history[self._energy_idx] = energy
        self._energy_idx = (self._energy_idx + 1) % len(self._energy_history)
        avg_energy = float(np.mean(self._energy_history))
        beat = False
        beat_intensity = 0.0
        if avg_energy > 1e-8:  # ignore near-silence to avoid division noise
            ratio = energy / avg_energy
            if ratio > 1.5:  # energy spike threshold for declaring a beat
                beat = True
                beat_intensity = min(1.0, (ratio - 1.0) / 2.0)
        return AudioAnalysis(
            timestamp=time.perf_counter(),
            rms=rms,
            peak=peak,
            # .copy() so readers never observe in-place smoothing updates
            spectrum=self._smooth_spectrum.copy(),
            beat=beat,
            beat_intensity=beat_intensity,
            left_rms=left_rms,
            left_spectrum=self._smooth_spectrum_left.copy(),
            right_rms=right_rms,
            right_spectrum=self._smooth_spectrum_right.copy(),
        )

    def _fft_bands(self, samps, buf, smooth_buf, alpha, one_minus_alpha):
        """Compute FFT, bin into bands, normalize, and smooth.

        Args:
            samps: Time-domain samples; truncated or zero-padded to chunk_size.
            buf: Scratch array receiving raw per-band magnitudes.
            smooth_buf: Running exponentially-smoothed spectrum (updated in place).
            alpha: Smoothing weight applied to the new frame.
            one_minus_alpha: Precomputed 1 - alpha.
        """
        chunk_size = self._chunk_size
        chunk = samps[:chunk_size]
        if len(chunk) < chunk_size:
            chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
        np.multiply(chunk, self._window, out=self._fft_windowed)
        fft_mag = np.abs(np.fft.rfft(self._fft_windowed))
        fft_mag *= (1.0 / chunk_size)  # in-place scale (faster than /=)
        fft_len = len(fft_mag)
        for b, (s, e) in enumerate(self._bands):
            if s < fft_len and e <= fft_len:
                buf[b] = float(np.mean(fft_mag[s:e]))
            else:
                buf[b] = 0.0
        # Normalize so the loudest band reads 1.0; skip near-silent frames
        spec_max = float(np.max(buf))
        if spec_max > 1e-6:
            buf *= (1.0 / spec_max)
        # Exponential smoothing: smooth = alpha * new + (1 - alpha) * old
        smooth_buf *= one_minus_alpha
        smooth_buf += alpha * buf

View File

@@ -1,137 +1,66 @@
"""Audio capture service — shared audio analysis with ref counting.
Provides real-time FFT spectrum, RMS level, and beat detection from
system audio (WASAPI loopback) or microphone/line-in. Multiple
AudioColorStripStreams sharing the same device reuse a single capture
thread via AudioCaptureManager.
system audio or microphone/line-in. Multiple AudioColorStripStreams
sharing the same device reuse a single capture thread via
AudioCaptureManager.
Uses PyAudioWPatch for WASAPI loopback support on Windows.
Engine-agnostic: uses AudioEngineRegistry to create the underlying
capture stream (WASAPI, sounddevice, etc.).
"""
import math
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import numpy as np
from typing import Any, Dict, List, Optional, Tuple
from wled_controller.core.audio.analysis import (
AudioAnalysis,
AudioAnalyzer,
DEFAULT_CHUNK_SIZE,
DEFAULT_SAMPLE_RATE,
)
from wled_controller.core.audio.base import AudioCaptureStreamBase
from wled_controller.core.audio.factory import AudioEngineRegistry
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Number of logarithmic frequency bands for spectrum analysis
NUM_BANDS = 64
# Audio defaults
DEFAULT_SAMPLE_RATE = 44100
DEFAULT_CHUNK_SIZE = 2048 # ~46 ms at 44100 Hz
# Re-export for backward compatibility
__all__ = [
"AudioAnalysis",
"ManagedAudioStream",
"AudioCaptureManager",
]
# ---------------------------------------------------------------------------
# AudioAnalysis — thread-safe snapshot of latest analysis results
# ManagedAudioStream — wraps engine stream + analyzer in background thread
# ---------------------------------------------------------------------------
@dataclass
class AudioAnalysis:
"""Snapshot of audio analysis results.
class ManagedAudioStream:
"""Wraps an AudioCaptureStreamBase + AudioAnalyzer in a background thread.
Written by the capture thread, read by visualization streams.
Mono fields contain the mixed-down signal (all channels averaged).
Per-channel fields (left/right) are populated when the source is stereo+.
For mono sources, left/right are copies of the mono data.
"""
timestamp: float = 0.0
# Mono (mixed) — backward-compatible fields
rms: float = 0.0
peak: float = 0.0
spectrum: np.ndarray = field(default_factory=lambda: np.zeros(NUM_BANDS, dtype=np.float32))
beat: bool = False
beat_intensity: float = 0.0
# Per-channel
left_rms: float = 0.0
left_spectrum: np.ndarray = field(default_factory=lambda: np.zeros(NUM_BANDS, dtype=np.float32))
right_rms: float = 0.0
right_spectrum: np.ndarray = field(default_factory=lambda: np.zeros(NUM_BANDS, dtype=np.float32))
# ---------------------------------------------------------------------------
# AudioCaptureStream — one per unique audio device
# ---------------------------------------------------------------------------
def _build_log_bands(num_bands: int, fft_size: int, sample_rate: int) -> List[Tuple[int, int]]:
"""Build logarithmically-spaced frequency band boundaries for FFT bins.
Returns list of (start_bin, end_bin) pairs.
"""
nyquist = sample_rate / 2
# Map bands to log-spaced frequencies from 20 Hz to Nyquist
min_freq = 20.0
max_freq = min(nyquist, 20000.0)
log_min = math.log10(min_freq)
log_max = math.log10(max_freq)
freqs = np.logspace(log_min, log_max, num_bands + 1)
bin_width = sample_rate / fft_size
bands = []
for i in range(num_bands):
start_bin = max(1, int(freqs[i] / bin_width))
end_bin = max(start_bin + 1, int(freqs[i + 1] / bin_width))
# Clamp to FFT range
end_bin = min(end_bin, fft_size // 2)
bands.append((start_bin, end_bin))
return bands
class AudioCaptureStream:
"""Captures audio from a single device and provides real-time analysis.
Runs a background thread that reads audio chunks, computes FFT, RMS,
and beat detection. Consumers read the latest analysis via
``get_latest_analysis()`` (thread-safe).
Public API is the same as the old AudioCaptureStream:
start(), stop(), get_latest_analysis(), get_last_timing().
"""
def __init__(
self,
engine_type: str,
device_index: int,
is_loopback: bool,
sample_rate: int = DEFAULT_SAMPLE_RATE,
chunk_size: int = DEFAULT_CHUNK_SIZE,
engine_config: Optional[Dict[str, Any]] = None,
):
self._engine_type = engine_type
self._device_index = device_index
self._is_loopback = is_loopback
self._sample_rate = sample_rate
self._chunk_size = chunk_size
self._engine_config = engine_config or {}
self._running = False
self._thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
self._latest: Optional[AudioAnalysis] = None
# Pre-allocated FFT helpers
self._window = np.hanning(chunk_size).astype(np.float32)
self._bands = _build_log_bands(NUM_BANDS, chunk_size, sample_rate)
# Beat detection state
self._energy_history: np.ndarray = np.zeros(43, dtype=np.float64) # ~1s at 44100/2048
self._energy_idx = 0
# Smoothed spectrum (exponential decay between frames)
self._smooth_spectrum = np.zeros(NUM_BANDS, dtype=np.float32)
self._smooth_spectrum_left = np.zeros(NUM_BANDS, dtype=np.float32)
self._smooth_spectrum_right = np.zeros(NUM_BANDS, dtype=np.float32)
self._smoothing_alpha = 0.3 # lower = smoother
# Pre-allocated FFT scratch buffers
self._fft_windowed = np.empty(chunk_size, dtype=np.float32)
self._fft_mag = None # allocated on first use (depends on rfft output size)
# Pre-compute valid band ranges (avoid per-frame bounds checks)
self._valid_bands = None # set after first FFT when fft_mag size is known
# Per-iteration timing (written by capture thread, read by consumers)
self._last_timing: dict = {}
def start(self) -> None:
@@ -140,12 +69,13 @@ class AudioCaptureStream:
self._running = True
self._thread = threading.Thread(
target=self._capture_loop, daemon=True,
name=f"AudioCapture-{self._device_index}-{'lb' if self._is_loopback else 'in'}",
name=f"AudioCapture-{self._engine_type}-{self._device_index}-"
f"{'lb' if self._is_loopback else 'in'}",
)
self._thread.start()
logger.info(
f"AudioCaptureStream started: device={self._device_index} "
f"loopback={self._is_loopback} sr={self._sample_rate} chunk={self._chunk_size}"
f"ManagedAudioStream started: engine={self._engine_type} "
f"device={self._device_index} loopback={self._is_loopback}"
)
def stop(self) -> None:
@@ -155,179 +85,48 @@ class AudioCaptureStream:
self._thread = None
with self._lock:
self._latest = None
logger.info(f"AudioCaptureStream stopped: device={self._device_index}")
logger.info(
f"ManagedAudioStream stopped: engine={self._engine_type} "
f"device={self._device_index}"
)
def get_latest_analysis(self) -> Optional[AudioAnalysis]:
with self._lock:
return self._latest
def get_last_timing(self) -> dict:
"""Return per-iteration timing from the capture loop (ms)."""
return dict(self._last_timing)
def _fft_bands(self, samps, buf, smooth_buf, window, bands, alpha, one_minus_alpha):
"""Compute FFT, bin into bands, normalize, and smooth."""
chunk_size = self._chunk_size
chunk = samps[:chunk_size]
if len(chunk) < chunk_size:
chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
np.multiply(chunk, window, out=self._fft_windowed)
fft_mag = np.abs(np.fft.rfft(self._fft_windowed))
fft_mag *= (1.0 / chunk_size) # in-place scale (faster than /=)
fft_len = len(fft_mag)
for b, (s, e) in enumerate(bands):
if s < fft_len and e <= fft_len:
buf[b] = float(np.mean(fft_mag[s:e]))
else:
buf[b] = 0.0
spec_max = float(np.max(buf))
if spec_max > 1e-6:
buf *= (1.0 / spec_max)
# Exponential smoothing: smooth = alpha * new + (1-alpha) * old
smooth_buf *= one_minus_alpha
smooth_buf += alpha * buf
def _capture_loop(self) -> None:
stream: Optional[AudioCaptureStreamBase] = None
try:
import pyaudiowpatch as pyaudio
except ImportError:
logger.error("PyAudioWPatch is not installed — audio capture unavailable")
self._running = False
return
pa = None
stream = None
try:
pa = pyaudio.PyAudio()
if self._is_loopback:
# Loopback capture: find the loopback device for the output device
loopback_device = self._find_loopback_device(pa, self._device_index)
if loopback_device is None:
logger.error(
f"No loopback device found for output device {self._device_index}"
)
self._running = False
return
device_idx = loopback_device["index"]
channels = loopback_device["maxInputChannels"]
sample_rate = int(loopback_device["defaultSampleRate"])
else:
# Regular input device
device_idx = self._device_index if self._device_index >= 0 else None
if device_idx is not None:
dev_info = pa.get_device_info_by_index(device_idx)
channels = max(1, dev_info["maxInputChannels"])
sample_rate = int(dev_info["defaultSampleRate"])
else:
channels = 1
sample_rate = self._sample_rate
# Update FFT helpers if sample rate changed
if sample_rate != self._sample_rate:
self._sample_rate = sample_rate
self._bands = _build_log_bands(NUM_BANDS, self._chunk_size, sample_rate)
stream = pa.open(
format=pyaudio.paFloat32,
channels=channels,
rate=sample_rate,
input=True,
input_device_index=device_idx,
frames_per_buffer=self._chunk_size,
stream = AudioEngineRegistry.create_stream(
self._engine_type, self._device_index,
self._is_loopback, self._engine_config,
)
stream.initialize()
sample_rate = stream.sample_rate
chunk_size = stream.chunk_size
channels = stream.channels
analyzer = AudioAnalyzer(sample_rate=sample_rate, chunk_size=chunk_size)
logger.info(
f"Audio stream opened: device={device_idx} loopback={self._is_loopback} "
f"Audio stream opened: engine={self._engine_type} "
f"device={self._device_index} loopback={self._is_loopback} "
f"channels={channels} sr={sample_rate}"
)
spectrum_buf = np.zeros(NUM_BANDS, dtype=np.float32)
spectrum_buf_left = np.zeros(NUM_BANDS, dtype=np.float32)
spectrum_buf_right = np.zeros(NUM_BANDS, dtype=np.float32)
# Pre-allocate channel buffers for stereo splitting
chunk_samples = self._chunk_size
if channels > 1:
_left_buf = np.empty(chunk_samples, dtype=np.float32)
_right_buf = np.empty(chunk_samples, dtype=np.float32)
_mono_buf = np.empty(chunk_samples, dtype=np.float32)
else:
_left_buf = _right_buf = _mono_buf = None
# Pre-allocate scratch for RMS (avoid samples**2 temp array)
_sq_buf = np.empty(chunk_samples, dtype=np.float32)
# Snapshot loop-invariant values
window = self._window
bands = self._bands
energy_history = self._energy_history
energy_len = len(energy_history)
alpha = self._smoothing_alpha
one_minus_alpha = 1.0 - alpha
while self._running:
t_read_start = time.perf_counter()
try:
raw_data = stream.read(self._chunk_size, exception_on_overflow=False)
data = np.frombuffer(raw_data, dtype=np.float32)
except Exception as e:
logger.warning(f"Audio read error: {e}")
raw_data = stream.read_chunk()
if raw_data is None:
time.sleep(0.05)
continue
t_read_end = time.perf_counter()
# Split channels and mix to mono
if channels > 1:
data = data.reshape(-1, channels)
np.copyto(_left_buf, data[:, 0])
np.copyto(_right_buf, data[:, 1] if channels >= 2 else data[:, 0])
np.add(data[:, 0], data[:, 1] if channels >= 2 else data[:, 0], out=_mono_buf)
_mono_buf *= 0.5
samples = _mono_buf
left_samples = _left_buf
right_samples = _right_buf
else:
samples = data
left_samples = samples
right_samples = samples
# RMS and peak — reuse scratch buffer
np.multiply(samples, samples, out=_sq_buf[:len(samples)])
rms = float(np.sqrt(np.mean(_sq_buf[:len(samples)])))
peak = float(np.max(np.abs(samples)))
if channels > 1:
np.multiply(left_samples, left_samples, out=_sq_buf)
left_rms = float(np.sqrt(np.mean(_sq_buf)))
np.multiply(right_samples, right_samples, out=_sq_buf)
right_rms = float(np.sqrt(np.mean(_sq_buf)))
else:
left_rms = rms
right_rms = rms
# Compute FFT for mono, left, right
self._fft_bands(samples, spectrum_buf, self._smooth_spectrum,
window, bands, alpha, one_minus_alpha)
self._fft_bands(left_samples, spectrum_buf_left, self._smooth_spectrum_left,
window, bands, alpha, one_minus_alpha)
self._fft_bands(right_samples, spectrum_buf_right, self._smooth_spectrum_right,
window, bands, alpha, one_minus_alpha)
# Beat detection — compare current energy to rolling average (mono)
np.multiply(samples, samples, out=_sq_buf[:len(samples)])
energy = float(np.sum(_sq_buf[:len(samples)]))
energy_history[self._energy_idx] = energy
self._energy_idx = (self._energy_idx + 1) % energy_len
avg_energy = float(np.mean(energy_history))
beat = False
beat_intensity = 0.0
if avg_energy > 1e-8:
ratio = energy / avg_energy
if ratio > 1.5:
beat = True
beat_intensity = min(1.0, (ratio - 1.0) / 2.0)
analysis = analyzer.analyze(raw_data, channels)
t_fft_end = time.perf_counter()
self._last_timing = {
@@ -335,66 +134,22 @@ class AudioCaptureStream:
"fft_ms": (t_fft_end - t_read_end) * 1000,
}
analysis = AudioAnalysis(
timestamp=time.perf_counter(),
rms=rms,
peak=peak,
spectrum=self._smooth_spectrum.copy(),
beat=beat,
beat_intensity=beat_intensity,
left_rms=left_rms,
left_spectrum=self._smooth_spectrum_left.copy(),
right_rms=right_rms,
right_spectrum=self._smooth_spectrum_right.copy(),
)
with self._lock:
self._latest = analysis
except Exception as e:
logger.error(f"AudioCaptureStream fatal error: {e}", exc_info=True)
logger.error(f"ManagedAudioStream fatal error: {e}", exc_info=True)
finally:
if stream is not None:
try:
stream.stop_stream()
stream.close()
except Exception:
pass
if pa is not None:
try:
pa.terminate()
stream.cleanup()
except Exception:
pass
self._running = False
logger.info(f"AudioCaptureStream loop ended: device={self._device_index}")
@staticmethod
def _find_loopback_device(pa, output_device_index: int) -> Optional[dict]:
"""Find the PyAudioWPatch loopback device for a given output device.
PyAudioWPatch exposes virtual loopback input devices for each WASAPI
output device. We match by name via ``get_loopback_device_info_generator()``.
"""
try:
first_loopback = None
for loopback in pa.get_loopback_device_info_generator():
if first_loopback is None:
first_loopback = loopback
# Default (-1): return first loopback device (typically default speakers)
if output_device_index < 0:
return loopback
# Match by output device name contained in loopback device name
target_info = pa.get_device_info_by_index(output_device_index)
if target_info["name"] in loopback["name"]:
return loopback
# No exact match — return first available loopback
return first_loopback
except Exception as e:
logger.error(f"Error finding loopback device: {e}")
return None
logger.info(
f"ManagedAudioStream loop ended: engine={self._engine_type} "
f"device={self._device_index}"
)
# ---------------------------------------------------------------------------
@@ -402,23 +157,43 @@ class AudioCaptureStream:
# ---------------------------------------------------------------------------
class AudioCaptureManager:
"""Manages shared AudioCaptureStream instances with reference counting.
"""Manages shared ManagedAudioStream instances with reference counting.
Multiple AudioColorStripStreams using the same audio device share a
single capture thread.
single capture thread. Key: (engine_type, device_index, is_loopback).
"""
def __init__(self):
self._streams: Dict[Tuple[int, bool], Tuple[AudioCaptureStream, int]] = {}
self._streams: Dict[
Tuple[str, int, bool],
Tuple[ManagedAudioStream, int],
] = {}
self._lock = threading.Lock()
def acquire(self, device_index: int, is_loopback: bool) -> AudioCaptureStream:
"""Get or create an AudioCaptureStream for the given device.
def acquire(
self,
device_index: int,
is_loopback: bool,
engine_type: Optional[str] = None,
engine_config: Optional[Dict[str, Any]] = None,
) -> ManagedAudioStream:
"""Get or create a ManagedAudioStream for the given device.
Args:
device_index: Audio device index
is_loopback: Whether to capture loopback audio
engine_type: Engine type (falls back to best available if None)
engine_config: Engine-specific configuration
Returns:
Shared AudioCaptureStream instance.
Shared ManagedAudioStream instance.
"""
key = (device_index, is_loopback)
if engine_type is None:
engine_type = AudioEngineRegistry.get_best_available_engine()
if engine_type is None:
raise RuntimeError("No audio capture engines available")
key = (engine_type, device_index, is_loopback)
with self._lock:
if key in self._streams:
stream, ref_count = self._streams[key]
@@ -426,15 +201,27 @@ class AudioCaptureManager:
logger.info(f"Reusing audio capture {key} (ref_count={ref_count + 1})")
return stream
stream = AudioCaptureStream(device_index, is_loopback)
stream = ManagedAudioStream(
engine_type, device_index, is_loopback, engine_config,
)
stream.start()
self._streams[key] = (stream, 1)
logger.info(f"Created audio capture {key}")
return stream
def release(self, device_index: int, is_loopback: bool) -> None:
"""Release a reference to an AudioCaptureStream."""
key = (device_index, is_loopback)
def release(
self,
device_index: int,
is_loopback: bool,
engine_type: Optional[str] = None,
) -> None:
"""Release a reference to a ManagedAudioStream."""
if engine_type is None:
engine_type = AudioEngineRegistry.get_best_available_engine()
if engine_type is None:
return
key = (engine_type, device_index, is_loopback)
with self._lock:
if key not in self._streams:
logger.warning(f"Attempted to release unknown audio capture: {key}")
@@ -463,61 +250,25 @@ class AudioCaptureManager:
@staticmethod
def enumerate_devices() -> List[dict]:
"""List available audio devices for the frontend dropdown.
"""List available audio devices from all registered engines.
Returns list of dicts with device info. Output devices with WASAPI
hostapi are marked as loopback candidates.
Returns list of dicts with device info, each tagged with engine_type.
"""
try:
import pyaudiowpatch as pyaudio
except ImportError:
logger.warning("PyAudioWPatch not installed — no audio devices available")
return []
pa = None
try:
pa = pyaudio.PyAudio()
wasapi_info = pa.get_host_api_info_by_type(pyaudio.paWASAPI)
wasapi_idx = wasapi_info["index"]
result = []
device_count = pa.get_device_count()
for i in range(device_count):
dev = pa.get_device_info_by_index(i)
if dev["hostApi"] != wasapi_idx:
result = []
for engine_type, engine_class in AudioEngineRegistry.get_all_engines().items():
try:
if not engine_class.is_available():
continue
is_input = dev["maxInputChannels"] > 0
is_output = dev["maxOutputChannels"] > 0
if is_input:
for dev in engine_class.enumerate_devices():
result.append({
"index": i,
"name": dev["name"],
"is_input": True,
"is_loopback": False,
"channels": dev["maxInputChannels"],
"default_samplerate": dev["defaultSampleRate"],
"index": dev.index,
"name": dev.name,
"is_input": dev.is_input,
"is_loopback": dev.is_loopback,
"channels": dev.channels,
"default_samplerate": dev.default_samplerate,
"engine_type": engine_type,
})
if is_output:
result.append({
"index": i,
"name": f"{dev['name']} [Loopback]",
"is_input": False,
"is_loopback": True,
"channels": dev["maxOutputChannels"],
"default_samplerate": dev["defaultSampleRate"],
})
return result
except Exception as e:
logger.error(f"Failed to enumerate audio devices: {e}", exc_info=True)
return []
finally:
if pa is not None:
try:
pa.terminate()
except Exception:
pass
except Exception as e:
logger.error(f"Error enumerating devices for engine '{engine_type}': {e}")
return result

View File

@@ -0,0 +1,165 @@
"""Base classes for audio capture engines."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import numpy as np
@dataclass
class AudioDeviceInfo:
    """Information about an audio device, as reported by an engine's
    enumerate_devices()."""
    # Engine-specific device index (meaning depends on the backend)
    index: int
    # Human-readable device name for UI display
    name: str
    # True if the device can be opened for capture (mic/line-in)
    is_input: bool
    # True if this entry captures an output device's playback (loopback)
    is_loopback: bool
    # Number of channels the device exposes
    channels: int
    # Default sample rate in Hz as reported by the backend
    default_samplerate: float
class AudioCaptureStreamBase(ABC):
    """Abstract capture session bound to one audio device.

    A stream owns the device-specific resources of a single capture
    session and yields raw interleaved samples through read_chunk().
    Instances come from AudioCaptureEngine.create_stream() and follow
    an explicit lifecycle:

        stream = engine.create_stream(device_index, is_loopback, config)
        stream.initialize()
        chunk = stream.read_chunk()
        stream.cleanup()

    The class is also a context manager, pairing initialize() with
    cleanup() automatically:

        with engine.create_stream(device_index, is_loopback, config) as stream:
            chunk = stream.read_chunk()
    """

    def __init__(
        self,
        device_index: int,
        is_loopback: bool,
        config: Dict[str, Any],
    ):
        # Device identity plus engine-specific options for this session.
        self.device_index = device_index
        self.is_loopback = is_loopback
        self.config = config
        # Lifecycle flag — a fresh stream starts uninitialized.
        self._initialized = False

    # --- Stream geometry --------------------------------------------------

    @property
    @abstractmethod
    def channels(self) -> int:
        """Number of audio channels in the stream."""

    @property
    @abstractmethod
    def sample_rate(self) -> int:
        """Sample rate of the audio stream."""

    @property
    @abstractmethod
    def chunk_size(self) -> int:
        """Number of frames returned per read_chunk() call."""

    # --- Lifecycle --------------------------------------------------------

    @abstractmethod
    def initialize(self) -> None:
        """Acquire audio capture resources.

        Raises:
            RuntimeError: If initialization fails
        """

    @abstractmethod
    def cleanup(self) -> None:
        """Release all audio capture resources."""

    @abstractmethod
    def read_chunk(self) -> Optional[np.ndarray]:
        """Read one chunk of raw audio data.

        Returns:
            1-D float32 ndarray of interleaved samples
            (length = chunk_size * channels), or None if no data available.

        Raises:
            RuntimeError: If read fails
        """

    def __enter__(self):
        """Context manager entry: initialize and hand back the stream."""
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always release resources."""
        self.cleanup()
class AudioCaptureEngine(ABC):
    """Abstract base class for audio capture engines.

    An engine is a stateless factory describing one capture technology:
    it reports its availability on the current system, enumerates the
    devices it can open, supplies a default configuration, and builds
    AudioCaptureStreamBase sessions.  Every method is a classmethod, so
    engines are never instantiated.
    """

    # Unique identifier; concrete engines must replace the "base" placeholder.
    ENGINE_TYPE: str = "base"
    # Relative preference among engines — presumably higher is preferred
    # when one is auto-selected; see AudioEngineRegistry.
    ENGINE_PRIORITY: int = 0

    @classmethod
    @abstractmethod
    def is_available(cls) -> bool:
        """Check if this engine is available on the current system."""

    @classmethod
    @abstractmethod
    def get_default_config(cls) -> Dict[str, Any]:
        """Get default configuration for this engine."""

    @classmethod
    @abstractmethod
    def enumerate_devices(cls) -> List[AudioDeviceInfo]:
        """Get list of available audio devices.

        Returns:
            List of AudioDeviceInfo objects

        Raises:
            RuntimeError: If unable to detect devices
        """

    @classmethod
    @abstractmethod
    def create_stream(
        cls,
        device_index: int,
        is_loopback: bool,
        config: Dict[str, Any],
    ) -> AudioCaptureStreamBase:
        """Create a capture stream for the specified device.

        Args:
            device_index: Index of audio device
            is_loopback: Whether to capture loopback audio
            config: Engine-specific configuration dict

        Returns:
            Uninitialized AudioCaptureStreamBase. Caller must call
            initialize() or use as context manager.
        """

View File

@@ -0,0 +1,156 @@
"""Engine registry and factory for audio capture engines."""
from typing import Any, Dict, List, Optional, Type
from wled_controller.core.audio.base import AudioCaptureEngine, AudioCaptureStreamBase
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AudioEngineRegistry:
    """Registry for available audio capture engines.

    Maintains a registry of all audio engine implementations
    and provides factory methods for creating capture streams.
    """

    # Maps ENGINE_TYPE string -> engine class; class-level, shared process-wide.
    _engines: Dict[str, Type[AudioCaptureEngine]] = {}

    @classmethod
    def register(cls, engine_class: Type[AudioCaptureEngine]) -> None:
        """Register an audio capture engine.

        Args:
            engine_class: Engine class to register (must inherit from AudioCaptureEngine)

        Raises:
            ValueError: If engine_class is not a subclass of AudioCaptureEngine,
                or if it still uses the reserved "base" engine type.
        """
        if not issubclass(engine_class, AudioCaptureEngine):
            raise ValueError(f"{engine_class} must be a subclass of AudioCaptureEngine")
        engine_type = engine_class.ENGINE_TYPE
        if engine_type == "base":
            raise ValueError("Cannot register base engine type")
        if engine_type in cls._engines:
            # Overwriting is allowed (e.g. re-registration on reload) but noisy.
            logger.warning(f"Audio engine '{engine_type}' already registered, overwriting")
        cls._engines[engine_type] = engine_class
        logger.info(f"Registered audio engine: {engine_type}")

    @classmethod
    def get_engine(cls, engine_type: str) -> Type[AudioCaptureEngine]:
        """Get engine class by type.

        Args:
            engine_type: Engine type identifier (e.g., "wasapi", "sounddevice")

        Returns:
            Engine class

        Raises:
            ValueError: If engine type not found
        """
        if engine_type not in cls._engines:
            available = ", ".join(cls._engines.keys()) or "none"
            raise ValueError(
                f"Unknown audio engine type: '{engine_type}'. Available engines: {available}"
            )
        return cls._engines[engine_type]

    @classmethod
    def get_available_engines(cls) -> List[str]:
        """Get list of available engine types on this system.

        Returns:
            List of engine type identifiers that are available
        """
        available = []
        for engine_type, engine_class in cls._engines.items():
            try:
                if engine_class.is_available():
                    available.append(engine_type)
            except Exception as e:
                # A broken availability probe must not hide the other engines.
                logger.error(
                    f"Error checking availability for audio engine '{engine_type}': {e}"
                )
        return available

    @classmethod
    def get_best_available_engine(cls) -> Optional[str]:
        """Get the highest-priority available engine type.

        Returns:
            Engine type string, or None if no engines are available.
        """
        best_type = None
        best_priority = -1
        for engine_type, engine_class in cls._engines.items():
            try:
                if engine_class.is_available() and engine_class.ENGINE_PRIORITY > best_priority:
                    best_priority = engine_class.ENGINE_PRIORITY
                    best_type = engine_type
            except Exception as e:
                logger.error(
                    f"Error checking availability for audio engine '{engine_type}': {e}"
                )
        return best_type

    @classmethod
    def get_all_engines(cls) -> Dict[str, Type[AudioCaptureEngine]]:
        """Get all registered engines (available or not).

        Returns:
            Dictionary mapping engine type to engine class (a shallow copy,
            so callers cannot mutate the registry).
        """
        return cls._engines.copy()

    @classmethod
    def create_stream(
        cls,
        engine_type: str,
        device_index: int,
        is_loopback: bool,
        config: Dict[str, Any],
    ) -> AudioCaptureStreamBase:
        """Create an AudioCaptureStreamBase for the specified engine and device.

        Args:
            engine_type: Engine type identifier
            device_index: Audio device index
            is_loopback: Whether to capture loopback audio
            config: Engine-specific configuration

        Returns:
            Uninitialized AudioCaptureStreamBase instance

        Raises:
            ValueError: If engine type not found or not available
            RuntimeError: If the engine fails to create the stream
        """
        engine_class = cls.get_engine(engine_type)
        if not engine_class.is_available():
            raise ValueError(
                f"Audio engine '{engine_type}' is not available on this system"
            )
        try:
            stream = engine_class.create_stream(device_index, is_loopback, config)
            logger.debug(
                f"Created audio stream: {engine_type} "
                f"(device={device_index}, loopback={is_loopback})"
            )
            return stream
        except Exception as e:
            logger.error(f"Failed to create stream for audio engine '{engine_type}': {e}")
            # Chain the original exception so the root cause and its
            # traceback survive the wrapping (the original raise dropped it).
            raise RuntimeError(
                f"Failed to create stream for audio engine '{engine_type}': {e}"
            ) from e

    @classmethod
    def clear_registry(cls) -> None:
        """Clear all registered engines (for testing)."""
        cls._engines.clear()
        logger.debug("Cleared audio engine registry")

View File

@@ -0,0 +1,159 @@
"""Sounddevice audio capture engine (cross-platform, via PortAudio)."""
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.core.audio.base import (
AudioCaptureEngine,
AudioCaptureStreamBase,
AudioDeviceInfo,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class SounddeviceCaptureStream(AudioCaptureStreamBase):
    """Audio capture stream using sounddevice (PortAudio)."""

    def __init__(self, device_index: int, is_loopback: bool, config: Dict[str, Any]):
        """Create an uninitialized stream; call initialize() to open the device.

        Args:
            device_index: PortAudio device index; a negative value selects
                the system default input device.
            is_loopback: Informational flag — on PulseAudio/PipeWire,
                loopback capture goes through regular "monitor" input
                devices, so no special handling is needed here.
            config: Engine config. Recognized keys: "channels" (default 2),
                "sample_rate" (default 44100), "chunk_size" (default 2048).
        """
        super().__init__(device_index, is_loopback, config)
        self._sd_stream = None
        self._channels = config.get("channels", 2)
        self._sample_rate = config.get("sample_rate", 44100)
        self._chunk_size = config.get("chunk_size", 2048)

    @property
    def channels(self) -> int:
        # May be reduced by initialize() to the device's max input channels.
        return self._channels

    @property
    def sample_rate(self) -> int:
        # Replaced by the device's default sample rate in initialize() when
        # a specific device is selected.
        return self._sample_rate

    @property
    def chunk_size(self) -> int:
        return self._chunk_size

    def initialize(self) -> None:
        """Open and start the PortAudio input stream. Idempotent.

        Raises:
            RuntimeError: If sounddevice is not installed, or the selected
                device has no input channels.
        """
        if self._initialized:
            return
        try:
            import sounddevice as sd
        except ImportError:
            raise RuntimeError("sounddevice is not installed — sounddevice engine unavailable")
        # Resolve device: None lets PortAudio pick the default input device.
        device_id = self.device_index if self.device_index >= 0 else None
        if device_id is not None:
            dev_info = sd.query_devices(device_id)
            # Clamp requested channels to what the device supports, and use
            # the device's native sample rate to avoid open failures.
            self._channels = min(self._channels, int(dev_info["max_input_channels"]))
            if self._channels < 1:
                raise RuntimeError(
                    f"Device {device_id} ({dev_info['name']}) has no input channels"
                )
            self._sample_rate = int(dev_info["default_samplerate"])
        self._sd_stream = sd.InputStream(
            device=device_id,
            channels=self._channels,
            samplerate=self._sample_rate,
            blocksize=self._chunk_size,
            dtype="float32",
        )
        self._sd_stream.start()
        self._initialized = True
        logger.info(
            f"sounddevice stream opened: device={device_id} loopback={self.is_loopback} "
            f"channels={self._channels} sr={self._sample_rate}"
        )

    def cleanup(self) -> None:
        """Stop and close the stream; safe to call multiple times."""
        if self._sd_stream is not None:
            try:
                self._sd_stream.stop()
                self._sd_stream.close()
            except Exception:
                # Best-effort shutdown — the stream may already be dead.
                pass
            self._sd_stream = None
        self._initialized = False

    def read_chunk(self) -> Optional[np.ndarray]:
        """Read one chunk of audio.

        Returns:
            1-D float32 ndarray of interleaved samples, or None if the
            stream is not open or the read failed.
        """
        if self._sd_stream is None:
            return None
        try:
            # sd.InputStream.read() returns (data, overflowed); data has
            # shape (chunk_size, channels) and is already float32 (dtype
            # was fixed at open time).
            data, _ = self._sd_stream.read(self._chunk_size)
            # astype(copy=False) is a no-op for float32 input and
            # reshape(-1) yields an interleaved 1-D view — this avoids the
            # two redundant copies that flatten()+astype() performed.
            return data.astype(np.float32, copy=False).reshape(-1)
        except Exception as e:
            logger.warning(f"sounddevice read error: {e}")
            return None
class SounddeviceEngine(AudioCaptureEngine):
    """Sounddevice (PortAudio) audio capture engine — cross-platform."""

    ENGINE_TYPE = "sounddevice"
    ENGINE_PRIORITY = 5

    @classmethod
    def is_available(cls) -> bool:
        """Return True when the sounddevice package can be imported."""
        try:
            import sounddevice  # noqa: F401
        except ImportError:
            return False
        return True

    @classmethod
    def get_default_config(cls) -> Dict[str, Any]:
        """Default engine configuration: CD-quality rate, 2048-sample chunks."""
        return {
            "sample_rate": 44100,
            "chunk_size": 2048,
        }

    @classmethod
    def enumerate_devices(cls) -> List[AudioDeviceInfo]:
        """List every PortAudio device that exposes at least one input channel."""
        try:
            import sounddevice as sd
        except ImportError:
            return []
        try:
            infos: List[AudioDeviceInfo] = []
            for idx, dev in enumerate(sd.query_devices()):
                input_channels = int(dev["max_input_channels"])
                if input_channels < 1:
                    # Output-only device — not capturable via this engine.
                    continue
                device_name = dev["name"]
                infos.append(AudioDeviceInfo(
                    index=idx,
                    name=device_name,
                    is_input=True,
                    # PulseAudio/PipeWire expose loopback capture as
                    # "monitor" input sources.
                    is_loopback="monitor" in device_name.lower(),
                    channels=input_channels,
                    default_samplerate=dev["default_samplerate"],
                ))
            return infos
        except Exception as e:
            logger.error(f"Failed to enumerate sounddevice devices: {e}", exc_info=True)
            return []

    @classmethod
    def create_stream(
        cls,
        device_index: int,
        is_loopback: bool,
        config: Dict[str, Any],
    ) -> SounddeviceCaptureStream:
        """Build an uninitialized stream with defaults overlaid by config."""
        effective = dict(cls.get_default_config())
        effective.update(config)
        return SounddeviceCaptureStream(device_index, is_loopback, effective)

View File

@@ -0,0 +1,215 @@
"""WASAPI audio capture engine (Windows only, via PyAudioWPatch)."""
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.core.audio.base import (
AudioCaptureEngine,
AudioCaptureStreamBase,
AudioDeviceInfo,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class WasapiCaptureStream(AudioCaptureStreamBase):
    """Audio capture stream using PyAudioWPatch (WASAPI)."""

    def __init__(self, device_index: int, is_loopback: bool, config: Dict[str, Any]):
        """Create an uninitialized stream; call initialize() to open the device.

        Args:
            device_index: WASAPI device index; a negative value selects the
                default device (or, for loopback, the first loopback device).
            is_loopback: If True, capture the output device's loopback mix.
            config: Engine config. Recognized keys: "channels" (default 2),
                "sample_rate" (default 44100), "chunk_size" (default 2048).
        """
        super().__init__(device_index, is_loopback, config)
        self._pa = None
        self._stream = None
        self._channels = config.get("channels", 2)
        self._sample_rate = config.get("sample_rate", 44100)
        self._chunk_size = config.get("chunk_size", 2048)

    @property
    def channels(self) -> int:
        # May be replaced in initialize() with the actual device channel count.
        return self._channels

    @property
    def sample_rate(self) -> int:
        # May be replaced in initialize() with the device's default rate.
        return self._sample_rate

    @property
    def chunk_size(self) -> int:
        return self._chunk_size

    def initialize(self) -> None:
        """Open the WASAPI capture stream. Idempotent.

        Raises:
            RuntimeError: If PyAudioWPatch is not installed, or no loopback
                device can be found for the requested output device.
        """
        if self._initialized:
            return
        try:
            import pyaudiowpatch as pyaudio
        except ImportError:
            raise RuntimeError("PyAudioWPatch is not installed — WASAPI engine unavailable")
        self._pa = pyaudio.PyAudio()
        try:
            if self.is_loopback:
                loopback_device = self._find_loopback_device(self._pa, self.device_index)
                if loopback_device is None:
                    raise RuntimeError(
                        f"No loopback device found for output device {self.device_index}"
                    )
                device_idx = loopback_device["index"]
                self._channels = loopback_device["maxInputChannels"]
                self._sample_rate = int(loopback_device["defaultSampleRate"])
            else:
                device_idx = self.device_index if self.device_index >= 0 else None
                if device_idx is not None:
                    dev_info = self._pa.get_device_info_by_index(device_idx)
                    self._channels = max(1, dev_info["maxInputChannels"])
                    self._sample_rate = int(dev_info["defaultSampleRate"])
            self._stream = self._pa.open(
                format=pyaudio.paFloat32,
                channels=self._channels,
                rate=self._sample_rate,
                input=True,
                input_device_index=device_idx,
                frames_per_buffer=self._chunk_size,
            )
        except Exception:
            # Don't leak the PyAudio instance when device resolution or
            # pa.open() fails (the original code only cleaned up on the
            # missing-loopback path, leaking on open errors).
            self._pa.terminate()
            self._pa = None
            raise
        self._initialized = True
        logger.info(
            f"WASAPI stream opened: device={device_idx} loopback={self.is_loopback} "
            f"channels={self._channels} sr={self._sample_rate}"
        )

    def cleanup(self) -> None:
        """Close the stream and terminate PyAudio; safe to call multiple times."""
        if self._stream is not None:
            try:
                self._stream.stop_stream()
                self._stream.close()
            except Exception:
                # Best-effort shutdown — the stream may already be dead.
                pass
            self._stream = None
        if self._pa is not None:
            try:
                self._pa.terminate()
            except Exception:
                pass
            self._pa = None
        self._initialized = False

    def read_chunk(self) -> Optional[np.ndarray]:
        """Read one chunk of audio.

        Returns:
            1-D float32 ndarray of interleaved samples, or None if the
            stream is not open or the read failed.
        """
        if self._stream is None:
            return None
        try:
            raw_data = self._stream.read(self._chunk_size, exception_on_overflow=False)
            return np.frombuffer(raw_data, dtype=np.float32)
        except Exception as e:
            logger.warning(f"WASAPI read error: {e}")
            return None

    @staticmethod
    def _find_loopback_device(pa, output_device_index: int) -> Optional[dict]:
        """Find the PyAudioWPatch loopback device for a given output device.

        Args:
            pa: Active pyaudiowpatch.PyAudio instance.
            output_device_index: Output device whose loopback pair is
                wanted; negative means "any" (first loopback found).

        Returns:
            The matching loopback device info dict, the first loopback
            device as a fallback, or None on error / when none exist.
        """
        try:
            # The target device's name is loop-invariant — look it up once
            # (the original queried it on every loopback candidate).
            target_name = None
            if output_device_index >= 0:
                target_name = pa.get_device_info_by_index(output_device_index)["name"]
            first_loopback = None
            for loopback in pa.get_loopback_device_info_generator():
                if first_loopback is None:
                    first_loopback = loopback
                # Match by name containment: loopback devices are named
                # after their output device.
                if target_name is None or target_name in loopback["name"]:
                    return loopback
            return first_loopback
        except Exception as e:
            logger.error(f"Error finding loopback device: {e}")
            return None
class WasapiEngine(AudioCaptureEngine):
    """WASAPI audio capture engine (Windows only)."""

    ENGINE_TYPE = "wasapi"
    ENGINE_PRIORITY = 10

    @classmethod
    def is_available(cls) -> bool:
        """Return True when PyAudioWPatch can be imported."""
        try:
            import pyaudiowpatch  # noqa: F401
        except ImportError:
            return False
        return True

    @classmethod
    def get_default_config(cls) -> Dict[str, Any]:
        """Default engine configuration: CD-quality rate, 2048-sample chunks."""
        return {
            "sample_rate": 44100,
            "chunk_size": 2048,
        }

    @classmethod
    def enumerate_devices(cls) -> List[AudioDeviceInfo]:
        """List WASAPI devices; outputs are also surfaced as [Loopback] entries."""
        try:
            import pyaudiowpatch as pyaudio
        except ImportError:
            return []
        pa = None
        try:
            pa = pyaudio.PyAudio()
            wasapi_host_index = pa.get_host_api_info_by_type(pyaudio.paWASAPI)["index"]
            devices: List[AudioDeviceInfo] = []
            for dev_index in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(dev_index)
                # Skip devices belonging to other host APIs (MME, DirectSound, ...).
                if info["hostApi"] != wasapi_host_index:
                    continue
                if info["maxInputChannels"] > 0:
                    devices.append(AudioDeviceInfo(
                        index=dev_index,
                        name=info["name"],
                        is_input=True,
                        is_loopback=False,
                        channels=info["maxInputChannels"],
                        default_samplerate=info["defaultSampleRate"],
                    ))
                if info["maxOutputChannels"] > 0:
                    # Every output device is capturable via WASAPI loopback,
                    # so expose a second entry for it.
                    devices.append(AudioDeviceInfo(
                        index=dev_index,
                        name=f"{info['name']} [Loopback]",
                        is_input=False,
                        is_loopback=True,
                        channels=info["maxOutputChannels"],
                        default_samplerate=info["defaultSampleRate"],
                    ))
            return devices
        except Exception as e:
            logger.error(f"Failed to enumerate WASAPI devices: {e}", exc_info=True)
            return []
        finally:
            if pa is not None:
                try:
                    pa.terminate()
                except Exception:
                    pass

    @classmethod
    def create_stream(
        cls,
        device_index: int,
        is_loopback: bool,
        config: Dict[str, Any],
    ) -> WasapiCaptureStream:
        """Build an uninitialized stream with defaults overlaid by config."""
        effective = dict(cls.get_default_config())
        effective.update(config)
        return WasapiCaptureStream(device_index, is_loopback, effective)