Files
media-player-server/media_server/services/audio_analyzer.py
T
alexei.dolgolyov 51ec1503f4
Lint & Test / test (push) Successful in 10s
perf(visualizer): cut spectrum + track-switch CPU significantly
Frontend hot path (player.js, background.js):
- visualizer rAF: drop per-frame getComputedStyle('--accent') (cached on
  applyAccentColor), build canvas LinearGradient once per accent change
  instead of 32× per frame, batch all bars into a single beginPath/fill
- FPS-gate canvas redraw via frequencyDataVersion so 60-144 Hz monitors
  stop re-rendering identical frames produced at 30 Hz on the backend
- editorial spectrum bars: replace style.height (layout) with
  transform: scaleY (compositor-only); cache bar refs, pre-compute
  per-bar gain/range, dedup writes at 1/1000 quantization
- coalesce VU needle into the visualizer rAF; cache vuNeedle ref;
  dedup angle writes at 0.1°
- updateUI: status-payload fingerprint short-circuits the redundant
  status_update broadcasts that fire during a track change
- swapArtworkSrc: only force layout reflow when keyframe is in flight;
  drop the ?_=Date.now() cache-buster so identical artwork URLs reuse
  the decoded bitmap; mini/glow imgs only re-set src when changed
- drop the fullscreen MutationObserver — fs-bloom-art is mirrored
  directly from the artwork-swap path, eliminating the second blur paint
- updateProgress: skip text writes when the rounded second hasn't moved;
  POSITION_INTERPOLATION_MS 100 → 250
- background.js: lift resizeBackgroundCanvas out of the rAF body, cache
  step, accept new int-scaled wire format

CSS:
- spectrum bars use transform: scaleY(var(--bar-h-scale)) + transition
  on transform; will-change updated to transform
- album-art-glow and fs-bloom-art switched to small-source-blur trick
  (render at 20-25% size, scale 4-6×, lower blur radius) — visually
  equivalent, ~10-25× cheaper repaint on track change
- drop unused transition: filter on .vinyl-stage #album-art

Backend (audio_analyzer.py, websocket_manager.py):
- pre-allocate windowed and cumsum buffers; replace
  np.concatenate(([0.0], np.cumsum(...))) with cumsum[0]=0 +
  np.cumsum(out=cumsum[1:]); float32 hanning window
- RMS via np.dot(mono, mono) — no astype copy, no ** temp
- int16 wire format (scale=1000) — smaller JSON, no Python float boxing
- versioned data + threading.Event so _audio_broadcast_loop is event-
  driven (ev.wait + monotonic seq dedup) instead of polling on a timer
  with the always-false `data is _last_data` identity check

ruff clean, pytest 7 passed / 3 numpy-skipped, esbuild bundle 113.6 kB.
2026-04-25 18:05:57 +03:00

413 lines
16 KiB
Python

"""Audio spectrum analyzer service using system loopback capture."""
import logging
import math
import platform
import threading
import time
# Module-level logger shared by the lazy loaders and the analyzer class.
logger = logging.getLogger(__name__)
# Lazily imported third-party modules. Both stay None until the matching
# _load_*() helper imports them successfully, so importing this file never
# fails when numpy or soundcard is missing.
_np = None
_sc = None
def _load_numpy():
    """Import numpy on first use and cache it in the module-level ``_np``.

    Returns the numpy module, or ``None`` when the import fails (a warning
    is logged and the next call tries again).
    """
    global _np
    if _np is not None:
        return _np

    try:
        import os
        import sys

        if sys.platform == 'win32':
            # Embedded Python doesn't auto-load DLLs from numpy.libs;
            # register that directory so libopenblas can be found.
            # Best effort only: any failure here is ignored and the plain
            # import below is still attempted.
            try:
                import importlib.util

                numpy_spec = importlib.util.find_spec('numpy')
                search_paths = numpy_spec.submodule_search_locations if numpy_spec else None
                if search_paths:
                    package_dir = list(search_paths)[0]
                    dll_dir = os.path.join(os.path.dirname(package_dir), 'numpy.libs')
                    if os.path.isdir(dll_dir):
                        os.add_dll_directory(dll_dir)
            except Exception:
                pass

        import numpy
        _np = numpy
    except Exception as e:
        logger.warning("numpy unavailable - audio visualizer disabled: %s", e)
    return _np
def _load_soundcard():
    """Import the ``soundcard`` package on first use and cache it in ``_sc``.

    Returns the module, or ``None`` when the import fails (a warning is
    logged and the next call tries again).
    """
    global _sc
    if _sc is not None:
        return _sc

    try:
        import soundcard
    except Exception as e:
        logger.warning("soundcard unavailable - audio visualizer disabled: %s", e)
    else:
        _sc = soundcard
    return _sc
class AudioAnalyzer:
"""Captures system audio loopback and performs real-time FFT analysis."""
def __init__(
self,
num_bins: int = 32,
sample_rate: int = 44100,
chunk_size: int = 1024,
target_fps: int = 30,
device_name: str | None = None,
):
self.num_bins = num_bins
self.sample_rate = sample_rate
self.chunk_size = chunk_size
self.target_fps = target_fps
self.device_name = device_name
self._running = False
self._thread: threading.Thread | None = None
self._lock = threading.Lock()
self._lifecycle_lock = threading.Lock()
self._data: dict | None = None
self._current_device_name: str | None = None
# Generation counter — bumped each time _data is refreshed.
# Lets the broadcast loop dedupe without comparing dict identity
# (which is fragile because we always allocate a new dict).
self._data_seq = 0
# Threading.Event signaled when new frame data is available.
# The broadcast loop awaits this instead of polling on a timer,
# so it wakes up exactly once per produced frame.
self._data_event = threading.Event()
# Slow AGC envelope so the spectrum reflects real dynamics
# instead of being renormalized to peak=1.0 every frame.
# A loud transient (e.g. notification beep) lifts the reference
# for a few seconds afterwards; this is the price of real loudness.
self._spectrum_ref = 0.01
# Pre-compute logarithmic bin edges
self._bin_edges = self._compute_bin_edges()
def _compute_bin_edges(self) -> list[int]:
"""Compute logarithmic frequency bin boundaries for perceptual grouping."""
np = _load_numpy()
if np is None:
return []
fft_size = self.chunk_size // 2 + 1
min_freq = 20.0
max_freq = min(16000.0, self.sample_rate / 2)
edges = []
for i in range(self.num_bins + 1):
freq = min_freq * (max_freq / min_freq) ** (i / self.num_bins)
bin_idx = int(freq * self.chunk_size / self.sample_rate)
edges.append(min(bin_idx, fft_size - 1))
return edges
@property
def available(self) -> bool:
"""Whether audio capture dependencies are available."""
return _load_numpy() is not None and _load_soundcard() is not None
@property
def running(self) -> bool:
"""Whether capture is currently active."""
return self._running
def start(self) -> bool:
"""Start audio capture in a background thread. Returns False if unavailable."""
with self._lifecycle_lock:
if self._running:
return True
if not self.available:
return False
# Reset AGC envelope so a long silent gap between sessions
# doesn't make the first new transients clip at the ceiling.
self._spectrum_ref = 0.01
self._running = True
self._thread = threading.Thread(target=self._capture_loop, daemon=True)
self._thread.start()
return True
def stop(self) -> None:
"""Stop audio capture and cleanup."""
with self._lifecycle_lock:
self._running = False
# Wake any waiter so it can observe _running and exit cleanly.
self._data_event.set()
if self._thread:
self._thread.join(timeout=3.0)
self._thread = None
with self._lock:
self._data = None
self._data_event.clear()
def get_frequency_data(self) -> dict | None:
"""Return latest frequency data (thread-safe). None if not running."""
with self._lock:
return self._data
def get_frequency_data_versioned(self) -> tuple[dict | None, int]:
"""Return (data, seq) so callers can dedupe without identity tricks."""
with self._lock:
return self._data, self._data_seq
@property
def data_event(self) -> threading.Event:
"""Event signaled when a fresh frame is ready. Caller must clear()."""
return self._data_event
@staticmethod
def list_loopback_devices() -> list[dict[str, str]]:
"""List all available loopback audio devices."""
sc = _load_soundcard()
if sc is None:
return []
devices = []
try:
# COM may be needed on Windows for WASAPI
if platform.system() == "Windows":
try:
import comtypes
comtypes.CoInitializeEx(comtypes.COINIT_MULTITHREADED)
except Exception:
pass
loopback_mics = sc.all_microphones(include_loopback=True)
for mic in loopback_mics:
if mic.isloopback:
devices.append({"id": mic.id, "name": mic.name})
except Exception as e:
logger.warning("Failed to list loopback devices: %s", e)
return devices
def _find_loopback_device(self):
"""Find a loopback device for system audio capture."""
sc = _load_soundcard()
if sc is None:
return None
try:
loopback_mics = sc.all_microphones(include_loopback=True)
# If a specific device is requested, find it by name (partial match)
if self.device_name:
target = self.device_name.lower()
for mic in loopback_mics:
if mic.isloopback and target in mic.name.lower():
logger.info("Found requested loopback device: %s", mic.name)
self._current_device_name = mic.name
return mic
logger.warning("Requested device '%s' not found, falling back to default", self.device_name)
# Default: first loopback device
for mic in loopback_mics:
if mic.isloopback:
logger.info("Found loopback device: %s", mic.name)
self._current_device_name = mic.name
return mic
# Fallback: try to get default speaker's loopback
default_speaker = sc.default_speaker()
if default_speaker:
for mic in loopback_mics:
if default_speaker.name in mic.name:
logger.info("Found speaker loopback: %s", mic.name)
self._current_device_name = mic.name
return mic
except Exception as e:
logger.warning("Failed to find loopback device: %s", e)
return None
def set_device(self, device_name: str | None) -> bool:
"""Change the loopback device. Restarts capture if running. Returns True on success."""
was_running = self._running
if was_running:
self.stop()
self.device_name = device_name
self._current_device_name = None
if was_running:
return self.start()
return True
@property
def current_device(self) -> str | None:
"""Return the name of the currently active loopback device."""
return self._current_device_name
def _capture_loop(self) -> None:
"""Background thread: capture audio and compute FFT continuously."""
# Initialize COM on Windows (required for WASAPI/SoundCard)
if platform.system() == "Windows":
try:
import comtypes
comtypes.CoInitializeEx(comtypes.COINIT_MULTITHREADED)
except Exception:
try:
import ctypes
ctypes.windll.ole32.CoInitializeEx(0, 0)
except Exception as e:
logger.warning("Failed to initialize COM: %s", e)
np = _load_numpy()
sc = _load_soundcard()
if np is None or sc is None:
self._running = False
return
device = self._find_loopback_device()
if device is None:
logger.warning("No loopback audio device found - visualizer disabled")
self._running = False
return
interval = 1.0 / self.target_fps
# Float32 window — matches soundcard's typical buffer dtype and
# halves FFT memory traffic vs. the default float64.
window = np.hanning(self.chunk_size).astype(np.float32)
# Pre-compute bin edge pairs for vectorized grouping
edges = self._bin_edges
bin_starts = np.array([edges[i] for i in range(self.num_bins)], dtype=np.intp)
bin_ends = np.array([max(edges[i + 1], edges[i] + 1) for i in range(self.num_bins)], dtype=np.intp)
# Counts are constant — compute once.
bin_counts = (bin_ends - bin_starts).astype(np.float32)
# Pre-allocate working buffers so the per-frame allocator churn
# on the capture thread (which runs at target_fps Hz, hours on
# end) drops to zero copies for these arrays.
fft_size = self.chunk_size // 2 + 1
windowed = np.empty(self.chunk_size, dtype=np.float32)
cumsum = np.empty(fft_size + 1, dtype=np.float32)
cumsum[0] = 0.0
try:
with device.recorder(
samplerate=self.sample_rate,
channels=1,
blocksize=self.chunk_size,
) as recorder:
logger.info("Audio capture started on: %s", device.name)
while self._running:
t0 = time.monotonic()
try:
data = recorder.record(numframes=self.chunk_size)
except Exception as e:
logger.debug("Audio capture read error: %s", e)
time.sleep(interval)
continue
# Mono mix if needed
if data.ndim > 1:
mono = data.mean(axis=1)
else:
mono = data.ravel()
if len(mono) < self.chunk_size:
time.sleep(interval)
continue
# Apply window in-place into the pre-allocated buffer.
np.multiply(mono[:self.chunk_size], window, out=windowed)
fft_mag = np.abs(np.fft.rfft(windowed))
# Group into logarithmic bins (vectorized via cumsum).
# Write into the pre-allocated [1:] slice so cumsum[0]
# stays 0.0 and we never allocate a new array.
np.cumsum(fft_mag, out=cumsum[1:])
bins = (cumsum[bin_ends] - cumsum[bin_starts]) / bin_counts
# True loudness from time-domain RMS via single BLAS
# dot — avoids astype() and ** allocations.
mono32 = mono if mono.dtype == np.float32 else mono.astype(np.float32, copy=False)
energy = float(np.dot(mono32, mono32))
if energy > 1e-12:
rms = (energy / mono32.size) ** 0.5
db = 20.0 * math.log10(rms)
# Map -60 dB..-6 dB to 0..1 (typical music range)
level = max(0.0, min(1.0, (db + 60.0) / 54.0))
else:
level = 0.0
# Slow auto-gain: envelope follower with fast attack,
# slow release. Quiet music yields small bars; loud
# passages reach the top; the reference adapts over
# seconds instead of resetting every frame.
current_peak = float(bins.max())
if current_peak > self._spectrum_ref:
self._spectrum_ref += (current_peak - self._spectrum_ref) * 0.05
else:
self._spectrum_ref += (current_peak - self._spectrum_ref) * 0.005
ref = max(self._spectrum_ref, 1e-4)
np.divide(bins, ref, out=bins)
np.clip(bins, 0.0, 1.5, out=bins)
# Bass energy: average of first 4 bins (~20-200Hz)
bass = float(bins[:4].mean()) if self.num_bins >= 4 else 0.0
# Quantize to 0..1000 ints — same wire fidelity as
# 3-decimal floats but smaller GC churn on both ends
# (frontend smooths anyway, so quantization is
# invisible). JSON encodes ints faster than floats.
frequencies = (bins * 1000.0).astype(np.int16).tolist()
bass_i = int(bass * 1000.0)
level_i = int(level * 1000.0)
new_data = {
"frequencies": frequencies,
"bass": bass_i,
"level": level_i,
# Wire-format flag: clients that see this know
# values are 0..1000 ints, not 0..1 floats.
"scale": 1000,
}
with self._lock:
self._data = new_data
self._data_seq += 1
# Wake any broadcast loop waiting on fresh data.
self._data_event.set()
# Throttle to target FPS
elapsed = time.monotonic() - t0
if elapsed < interval:
time.sleep(interval - elapsed)
except Exception as e:
logger.error("Audio capture loop error: %s", e)
finally:
self._running = False
logger.info("Audio capture stopped")
# Process-wide analyzer instance, created lazily by get_audio_analyzer().
_analyzer: AudioAnalyzer | None = None


def get_audio_analyzer(
    num_bins: int = 32,
    sample_rate: int = 44100,
    target_fps: int = 25,
    device_name: str | None = None,
) -> AudioAnalyzer:
    """Return the shared AudioAnalyzer, building it on the first call.

    The arguments only take effect on the call that creates the instance;
    later calls return the existing analyzer and ignore them.
    """
    global _analyzer
    if _analyzer is not None:
        return _analyzer

    settings = {
        "num_bins": num_bins,
        "sample_rate": sample_rate,
        "target_fps": target_fps,
        "device_name": device_name,
    }
    _analyzer = AudioAnalyzer(**settings)
    return _analyzer