Optimize audio capture and render loop performance

audio_capture.py:
- Move _fft_bands from inner function to method (avoid per-frame closure)
- Pre-allocate channel split buffers and RMS scratch arrays
- Use in-place numpy ops (np.copyto, np.multiply) instead of copies
- In-place FFT smoothing instead of temp array allocation
- Cache loop-invariant values as locals
- Fix energy index to wrap-around instead of unbounded increment

audio_stream.py:
- Pre-compute interpolation arrays (band_x, led_x, full_amp, indices_buf,
  vu_gradient) once on LED count change instead of every frame
- Pre-compute VU meter base/peak float arrays in _update_from_source
- Reuse full_amp and indices_buf buffers across frames
- In-place spectrum smoothing to avoid temp allocations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-25 00:36:51 +03:00
parent 0cd8304004
commit 04ee2e5830
2 changed files with 132 additions and 68 deletions

View File

@@ -122,6 +122,14 @@ class AudioCaptureStream:
self._smooth_spectrum = np.zeros(NUM_BANDS, dtype=np.float32)
self._smooth_spectrum_left = np.zeros(NUM_BANDS, dtype=np.float32)
self._smooth_spectrum_right = np.zeros(NUM_BANDS, dtype=np.float32)
self._smoothing_alpha = 0.3 # lower = smoother
# Pre-allocated FFT scratch buffers
self._fft_windowed = np.empty(chunk_size, dtype=np.float32)
self._fft_mag = None # allocated on first use (depends on rfft output size)
# Pre-compute valid band ranges (avoid per-frame bounds checks)
self._valid_bands = None # set after first FFT when fft_mag size is known
# Per-iteration timing (written by capture thread, read by consumers)
self._last_timing: dict = {}
@@ -157,6 +165,28 @@ class AudioCaptureStream:
"""Return per-iteration timing from the capture loop (ms)."""
return dict(self._last_timing)
def _fft_bands(self, samps, buf, smooth_buf, window, bands, alpha, one_minus_alpha):
"""Compute FFT, bin into bands, normalize, and smooth."""
chunk_size = self._chunk_size
chunk = samps[:chunk_size]
if len(chunk) < chunk_size:
chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
np.multiply(chunk, window, out=self._fft_windowed)
fft_mag = np.abs(np.fft.rfft(self._fft_windowed))
fft_mag *= (1.0 / chunk_size) # in-place scale (faster than /=)
fft_len = len(fft_mag)
for b, (s, e) in enumerate(bands):
if s < fft_len and e <= fft_len:
buf[b] = float(np.mean(fft_mag[s:e]))
else:
buf[b] = 0.0
spec_max = float(np.max(buf))
if spec_max > 1e-6:
buf *= (1.0 / spec_max)
# Exponential smoothing: smooth = alpha * new + (1-alpha) * old
smooth_buf *= one_minus_alpha
smooth_buf += alpha * buf
def _capture_loop(self) -> None:
try:
import pyaudiowpatch as pyaudio
@@ -217,6 +247,26 @@ class AudioCaptureStream:
spectrum_buf_left = np.zeros(NUM_BANDS, dtype=np.float32)
spectrum_buf_right = np.zeros(NUM_BANDS, dtype=np.float32)
# Pre-allocate channel buffers for stereo splitting
chunk_samples = self._chunk_size
if channels > 1:
_left_buf = np.empty(chunk_samples, dtype=np.float32)
_right_buf = np.empty(chunk_samples, dtype=np.float32)
_mono_buf = np.empty(chunk_samples, dtype=np.float32)
else:
_left_buf = _right_buf = _mono_buf = None
# Pre-allocate scratch for RMS (avoid samples**2 temp array)
_sq_buf = np.empty(chunk_samples, dtype=np.float32)
# Snapshot loop-invariant values
window = self._window
bands = self._bands
energy_history = self._energy_history
energy_len = len(energy_history)
alpha = self._smoothing_alpha
one_minus_alpha = 1.0 - alpha
while self._running:
t_read_start = time.perf_counter()
try:
@@ -231,50 +281,45 @@ class AudioCaptureStream:
# Split channels and mix to mono
if channels > 1:
data = data.reshape(-1, channels)
left_samples = data[:, 0].copy()
right_samples = data[:, 1].copy() if channels >= 2 else left_samples.copy()
samples = data.mean(axis=1).astype(np.float32)
np.copyto(_left_buf, data[:, 0])
np.copyto(_right_buf, data[:, 1] if channels >= 2 else data[:, 0])
np.add(data[:, 0], data[:, 1] if channels >= 2 else data[:, 0], out=_mono_buf)
_mono_buf *= 0.5
samples = _mono_buf
left_samples = _left_buf
right_samples = _right_buf
else:
samples = data
left_samples = samples
right_samples = samples
# RMS and peak (mono)
rms = float(np.sqrt(np.mean(samples ** 2)))
# RMS and peak — reuse scratch buffer
np.multiply(samples, samples, out=_sq_buf[:len(samples)])
rms = float(np.sqrt(np.mean(_sq_buf[:len(samples)])))
peak = float(np.max(np.abs(samples)))
left_rms = float(np.sqrt(np.mean(left_samples ** 2)))
right_rms = float(np.sqrt(np.mean(right_samples ** 2)))
# FFT helper
alpha = 0.3 # smoothing factor (lower = smoother)
def _fft_bands(samps, buf, smooth_buf):
chunk = samps[: self._chunk_size]
if len(chunk) < self._chunk_size:
chunk = np.pad(chunk, (0, self._chunk_size - len(chunk)))
windowed = chunk * self._window
fft_mag = np.abs(np.fft.rfft(windowed))
fft_mag /= self._chunk_size
for b, (s, e) in enumerate(self._bands):
if s < len(fft_mag) and e <= len(fft_mag):
buf[b] = float(np.mean(fft_mag[s:e]))
else:
buf[b] = 0.0
spec_max = float(np.max(buf))
if spec_max > 1e-6:
buf /= spec_max
smooth_buf[:] = alpha * buf + (1.0 - alpha) * smooth_buf
if channels > 1:
np.multiply(left_samples, left_samples, out=_sq_buf)
left_rms = float(np.sqrt(np.mean(_sq_buf)))
np.multiply(right_samples, right_samples, out=_sq_buf)
right_rms = float(np.sqrt(np.mean(_sq_buf)))
else:
left_rms = rms
right_rms = rms
# Compute FFT for mono, left, right
_fft_bands(samples, spectrum_buf, self._smooth_spectrum)
_fft_bands(left_samples, spectrum_buf_left, self._smooth_spectrum_left)
_fft_bands(right_samples, spectrum_buf_right, self._smooth_spectrum_right)
self._fft_bands(samples, spectrum_buf, self._smooth_spectrum,
window, bands, alpha, one_minus_alpha)
self._fft_bands(left_samples, spectrum_buf_left, self._smooth_spectrum_left,
window, bands, alpha, one_minus_alpha)
self._fft_bands(right_samples, spectrum_buf_right, self._smooth_spectrum_right,
window, bands, alpha, one_minus_alpha)
# Beat detection — compare current energy to rolling average (mono)
energy = float(np.sum(samples ** 2))
self._energy_history[self._energy_idx % len(self._energy_history)] = energy
self._energy_idx += 1
avg_energy = float(np.mean(self._energy_history))
np.multiply(samples, samples, out=_sq_buf[:len(samples)])
energy = float(np.sum(_sq_buf[:len(samples)]))
energy_history[self._energy_idx] = energy
self._energy_idx = (self._energy_idx + 1) % energy_len
avg_energy = float(np.mean(energy_history))
beat = False
beat_intensity = 0.0