diff --git a/plans/processed-audio-sources/CONTEXT.md b/plans/processed-audio-sources/CONTEXT.md index f23d15a..09e5e92 100644 --- a/plans/processed-audio-sources/CONTEXT.md +++ b/plans/processed-audio-sources/CONTEXT.md @@ -9,13 +9,20 @@ - **Test:** `cd server && py -3.13 -m pytest tests/ --no-cov -q` ## Current State -Phase 1 (Audio Filter Framework) implemented. Core framework is in place: +Phase 1 (Audio Filter Framework) and Phase 2 (Audio Filters) implemented. + +Phase 1 framework: - `AudioFilter` base class, `AudioFilterRegistry`, `AudioFilterOptionDef` in `core/audio/filters/` - `AudioProcessingTemplate` dataclass + `AudioProcessingTemplateStore` (SQLite-backed) in `storage/` - `audio_filter_template` meta-filter with recursive resolution - Full REST API: CRUD templates + filter registry discovery - Dependency injection wired in `dependencies.py` and `main.py` +Phase 2 filters (12 total registered, 11 real + 1 meta): +- Stateless: `channel_extract`, `band_extract`, `gain`, `inverter` +- Stateful: `peak_hold`, `noise_gate`, `envelope_follower`, `spectral_smoothing`, `compressor`, `beat_gate`, `delay` +- All produce new `AudioAnalysis` via `dataclasses.replace()` (immutability preserved) + ## Key Architecture Reference ### Existing Pattern to Mirror: Processed Picture Sources @@ -83,7 +90,7 @@ _(none yet)_ | Phase | Agent Used | Test Writer | Parallel | Notes | |-------|-----------|-------------|----------|-------| | Phase 1 | impl-agent | — | No | Tasks 7+8 skipped (SQLite migration made them obsolete) | -| Phase 2 | — | — | — | — | +| Phase 2 | impl-agent | — | No | All 11 filters implemented, no deviations | | Phase 3 | — | — | — | — | | Phase 4 | — | — | — | — | | Phase 5 | — | — | — | — | @@ -98,6 +105,6 @@ _(none yet)_ ## Implementation Notes - Clean-slate approach: no migration of existing MonoAudioSource/BandExtractAudioSource data -- 5 of 11 filters are stateful (peak hold, envelope follower, spectral smoothing, compressor, delay) — need per-stream instance lifecycle +- 7 of 11 filters are stateful (peak hold, noise gate, envelope follower, spectral smoothing, compressor, beat gate, delay) — need per-stream instance lifecycle - Audio filters operate on AudioAnalysis snapshots, not raw audio samples - Big Bang strategy: intermediate phases may break the build; only Phase 7 enforces build/tests diff --git a/plans/processed-audio-sources/PLAN.md b/plans/processed-audio-sources/PLAN.md index b3f488c..d99beb8 100644 --- a/plans/processed-audio-sources/PLAN.md +++ b/plans/processed-audio-sources/PLAN.md @@ -40,7 +40,7 @@ Clean-slate approach: no data migration for old source types. | Phase | Domain | Status | Review | Build | Committed | |-------|--------|--------|--------|-------|-----------| | Phase 1: Audio Filter Framework | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ | -| Phase 2: Audio Filters | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | +| Phase 2: Audio Filters | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ | | Phase 3: Processed Audio Source Model | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 4: Runtime Integration | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 5: Frontend — Audio Processing Templates | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | diff --git a/plans/processed-audio-sources/phase-2-audio-filters.md b/plans/processed-audio-sources/phase-2-audio-filters.md index f337fcd..17dea57 100644 --- a/plans/processed-audio-sources/phase-2-audio-filters.md +++ b/plans/processed-audio-sources/phase-2-audio-filters.md @@ -1,6 +1,6 @@ # Phase 2: Audio Filters -**Status:** ⬜ Not Started +**Status:** 🔨 In Progress **Parent plan:** [PLAN.md](./PLAN.md) **Domain:** backend @@ -9,55 +9,55 @@ Implement all 11 audio filters and register them with the AudioFilterRegistry. ## Tasks -- [ ] Task 1: **Channel Extract** filter (`core/audio/filters/channel_extract.py`) +- [x] Task 1: **Channel Extract** filter (`core/audio/filters/channel_extract.py`) - Options: `channel` (select: mono | left | right) - Stateful: No - Behavior: Replaces main rms/spectrum with selected channel data. If "mono", averages L+R. If "left"/"right", copies that channel's data to the main fields. -- [ ] Task 2: **Band Extract** filter (`core/audio/filters/band_extract.py`) +- [x] Task 2: **Band Extract** filter (`core/audio/filters/band_extract.py`) - Options: `band` (select: bass | mid | treble | custom), `freq_low` (float, 20-20000), `freq_high` (float, 20-20000) - Stateful: No - Behavior: Computes a band mask for the 64 log-spaced bins, applies it to spectrum, recomputes RMS from in-band data. Reuse logic from existing `core/audio/band_filter.py`. - Presets: bass=20-250Hz, mid=250-4000Hz, treble=4000-20000Hz -- [ ] Task 3: **Peak Hold** filter (`core/audio/filters/peak_hold.py`) +- [x] Task 3: **Peak Hold** filter (`core/audio/filters/peak_hold.py`) - Options: `decay_rate` (float, 0.1-50.0, dB/s), `per_bin` (bool, default true) - Stateful: Yes - Behavior: For each spectrum bin (if per_bin) or for rms/peak, retains the maximum value seen and decays it over time. Outputs the max of current value and held peak. -- [ ] Task 4: **Gain** filter (`core/audio/filters/gain.py`) +- [x] Task 4: **Gain** filter (`core/audio/filters/gain.py`) - Options: `factor` (float, 0.1-10.0, default 1.0) - Stateful: No - Behavior: Multiplies rms, peak, spectrum, and per-channel values by factor. Clamps to [0, 1] for rms/peak. -- [ ] Task 5: **Noise Gate** filter (`core/audio/filters/noise_gate.py`) +- [x] Task 5: **Noise Gate** filter (`core/audio/filters/noise_gate.py`) - Options: `threshold` (float, 0.0-1.0), `hysteresis` (float, 0.0-0.2, default 0.05) - Stateful: No (hysteresis is stateless — it's a secondary threshold, not temporal) - Behavior: If rms < threshold, zeros out all levels and spectrum. Hysteresis means: if gate was open and rms drops below (threshold - hysteresis), close it; if gate was closed and rms rises above threshold, open it. - Actually stateful for hysteresis tracking: needs to remember gate open/closed state. -- [ ] Task 6: **Envelope Follower** filter (`core/audio/filters/envelope_follower.py`) +- [x] Task 6: **Envelope Follower** filter (`core/audio/filters/envelope_follower.py`) - Options: `attack_ms` (float, 1-500, default 10), `release_ms` (float, 10-2000, default 200) - Stateful: Yes - Behavior: Smooths rms and peak with asymmetric time constants. When signal rises, uses attack rate. When signal falls, uses release rate. Applied per-bin to spectrum optionally. - Fast attack + slow release = punchy transients that fade smoothly. -- [ ] Task 7: **Spectral Smoothing** filter (`core/audio/filters/spectral_smoothing.py`) +- [x] Task 7: **Spectral Smoothing** filter (`core/audio/filters/spectral_smoothing.py`) - Options: `factor` (float, 0.0-0.99, default 0.5) - Stateful: Yes (maintains previous spectrum state) - Behavior: Applies exponential moving average per-bin: `smoothed[i] = factor * prev[i] + (1-factor) * current[i]`. Higher factor = smoother/slower. -- [ ] Task 8: **Compressor** filter (`core/audio/filters/compressor.py`) +- [x] Task 8: **Compressor** filter (`core/audio/filters/compressor.py`) - Options: `threshold` (float, 0.0-1.0, default 0.5), `ratio` (float, 1.0-20.0, default 4.0), `makeup_gain` (float, 0.0-2.0, default 1.0) - Stateful: Yes (envelope tracking for gain reduction) - Behavior: When signal exceeds threshold, reduces by ratio. `output = threshold + (input - threshold) / ratio`. Apply makeup_gain after. Applied to rms, peak, and spectrum. -- [ ] Task 9: **Inverter** filter (`core/audio/filters/inverter.py`) +- [x] Task 9: **Inverter** filter (`core/audio/filters/inverter.py`) - Options: none (or `invert_spectrum` bool, default true) - Stateful: No - Behavior: `rms = 1.0 - rms`, `peak = 1.0 - peak`, spectrum bins inverted if option set. Beat fields unchanged. -- [ ] Task 10: **Beat Gate** filter (`core/audio/filters/beat_gate.py`) +- [x] Task 10: **Beat Gate** filter (`core/audio/filters/beat_gate.py`) - Options: `hold_ms` (float, 10-500, default 50) — how long to hold signal after beat - Stateful: Yes (tracks last beat timestamp) - Behavior: When beat detected, passes signal through for `hold_ms` milliseconds. Between beats, zeros out rms/peak/spectrum. Beat fields themselves always pass through. -- [ ] Task 11: **Delay** filter (`core/audio/filters/delay.py`) +- [x] Task 11: **Delay** filter (`core/audio/filters/delay.py`) - Options: `delay_ms` (float, 10-2000, default 100) - Stateful: Yes (ring buffer of AudioAnalysis snapshots) - Behavior: Buffers incoming AudioAnalysis snapshots and outputs the one from `delay_ms` ago. Ring buffer sized based on ~30Hz update rate. -- [ ] Task 12: Register all 11 filters in `core/audio/filters/__init__.py` -- [ ] Task 13: Update Noise Gate to be stateful (hysteresis requires gate state tracking) +- [x] Task 12: Register all 11 filters in `core/audio/filters/__init__.py` +- [x] Task 13: Update Noise Gate to be stateful (hysteresis requires gate state tracking) ## Files to Modify/Create - `core/audio/filters/channel_extract.py` — **create** @@ -94,4 +94,21 @@ Implement all 11 audio filters and register them with the AudioFilterRegistry. - [ ] Tests pass (new + existing) ## Handoff to Next Phase - + +### What was built +- All 11 audio filters implemented, each in its own file under `core/audio/filters/` +- 7 stateful filters (peak_hold, noise_gate, envelope_follower, spectral_smoothing, compressor, beat_gate, delay) with proper `is_stateful` and `reset()` implementations +- 4 stateless filters (channel_extract, band_extract, gain, inverter) +- All filters registered in `__init__.py` via import-triggered `@AudioFilterRegistry.register` +- All filters produce NEW AudioAnalysis via `dataclasses.replace()` (immutability preserved) +- Band extract reuses existing `compute_band_mask()` and `apply_band_filter()` from `core/audio/band_filter.py` + +### What Phase 3 needs to know +- All 11 filters + the `audio_filter_template` meta-filter are now registered in the AudioFilterRegistry (12 total) +- `GET /api/v1/audio-filters` will return all filters with their option schemas +- Filters are instantiated via `AudioFilterRegistry.create_instance(filter_id, options)` +- Stateful filters need per-stream instances (not shared) due to internal state +- The `process()` method signature is `process(analysis: AudioAnalysis) -> AudioAnalysis` + +### Known deviations from plan +- None. All 11 filters implemented exactly as specified plus Task 13 (noise gate stateful). diff --git a/server/src/wled_controller/core/audio/filters/__init__.py b/server/src/wled_controller/core/audio/filters/__init__.py index 2b4008f..456d997 100644 --- a/server/src/wled_controller/core/audio/filters/__init__.py +++ b/server/src/wled_controller/core/audio/filters/__init__.py @@ -9,6 +9,17 @@ from wled_controller.core.audio.filters.registry import AudioFilterRegistry # Import individual filters to trigger auto-registration import wled_controller.core.audio.filters.audio_filter_template # noqa: F401 +import wled_controller.core.audio.filters.channel_extract # noqa: F401 +import wled_controller.core.audio.filters.band_extract # noqa: F401 +import wled_controller.core.audio.filters.peak_hold # noqa: F401 +import wled_controller.core.audio.filters.gain # noqa: F401 +import wled_controller.core.audio.filters.noise_gate # noqa: F401 +import wled_controller.core.audio.filters.envelope_follower # noqa: F401 +import wled_controller.core.audio.filters.spectral_smoothing # noqa: F401 +import wled_controller.core.audio.filters.compressor # noqa: F401 +import wled_controller.core.audio.filters.inverter # noqa: F401 +import wled_controller.core.audio.filters.beat_gate # noqa: F401 +import wled_controller.core.audio.filters.delay # noqa: F401 __all__ = [ "AudioFilter", diff --git a/server/src/wled_controller/core/audio/filters/band_extract.py b/server/src/wled_controller/core/audio/filters/band_extract.py new file mode 100644 index 0000000..ea94c0d --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/band_extract.py @@ -0,0 +1,103 @@ +"""Band Extract audio filter — mask spectrum to a frequency range and recompute RMS.""" + +from dataclasses import replace +from typing import Any, Dict, List + +from wled_controller.core.audio.analysis import AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry +from wled_controller.core.audio.band_filter import apply_band_filter, compute_band_mask + + +# Preset frequency ranges +_PRESETS = { + "bass": (20.0, 250.0), + "mid": (250.0, 4000.0), + "treble": (4000.0, 20000.0), +} + + +@AudioFilterRegistry.register +class BandExtractFilter(AudioFilter): + """Extract a frequency band from the spectrum. + + Supports presets (bass, mid, treble) or a custom frequency range. + Zeros out-of-band spectrum bins and recomputes RMS from in-band data. + """ + + filter_id = "band_extract" + filter_name = "Band Extract" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + band = self.options["band"] + if band == "custom": + freq_low = self.options["freq_low"] + freq_high = self.options["freq_high"] + else: + freq_low, freq_high = _PRESETS.get(band, (20.0, 20000.0)) + self._mask = compute_band_mask(freq_low, freq_high) + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="band", + label="Band", + option_type="select", + default="bass", + min_value=None, + max_value=None, + step=None, + choices=[ + {"value": "bass", "label": "Bass (20-250 Hz)"}, + {"value": "mid", "label": "Mid (250-4000 Hz)"}, + {"value": "treble", "label": "Treble (4000-20000 Hz)"}, + {"value": "custom", "label": "Custom Range"}, + ], + ), + AudioFilterOptionDef( + key="freq_low", + label="Low Frequency (Hz)", + option_type="float", + default=20.0, + min_value=20.0, + max_value=20000.0, + step=1.0, + ), + AudioFilterOptionDef( + key="freq_high", + label="High Frequency (Hz)", + option_type="float", + default=20000.0, + min_value=20.0, + max_value=20000.0, + step=1.0, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + filtered_spectrum, filtered_rms = apply_band_filter( + analysis.spectrum, + analysis.rms, + self._mask, + ) + filtered_left, filtered_left_rms = apply_band_filter( + analysis.left_spectrum, + analysis.left_rms, + self._mask, + ) + filtered_right, filtered_right_rms = apply_band_filter( + analysis.right_spectrum, + analysis.right_rms, + self._mask, + ) + return replace( + analysis, + rms=filtered_rms, + spectrum=filtered_spectrum, + left_rms=filtered_left_rms, + left_spectrum=filtered_left, + right_rms=filtered_right_rms, + right_spectrum=filtered_right, + ) diff --git a/server/src/wled_controller/core/audio/filters/beat_gate.py b/server/src/wled_controller/core/audio/filters/beat_gate.py new file mode 100644 index 0000000..3d66839 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/beat_gate.py @@ -0,0 +1,78 @@ +"""Beat Gate audio filter — pass signal only around beat events.""" + +import time +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + +_ZERO_SPECTRUM = np.zeros(NUM_BANDS, dtype=np.float32) + + +@AudioFilterRegistry.register +class BeatGateFilter(AudioFilter): + """Pass audio signal through only when a beat is detected. + + When a beat is detected, the gate opens and holds for ``hold_ms`` + milliseconds, passing the signal through. Between beats (after hold + expires), rms/peak/spectrum are zeroed out. Beat fields themselves + always pass through unchanged. + """ + + filter_id = "beat_gate" + filter_name = "Beat Gate" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._hold_ms = self.options["hold_ms"] + self._last_beat_time: float | None = None + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + self._last_beat_time = None + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="hold_ms", + label="Hold Time (ms)", + option_type="float", + default=50.0, + min_value=10.0, + max_value=500.0, + step=1.0, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + now = time.perf_counter() + + # Record beat time + if analysis.beat: + self._last_beat_time = now + + # Check if we're within the hold window + if self._last_beat_time is not None: + elapsed_ms = (now - self._last_beat_time) * 1000.0 + if elapsed_ms <= self._hold_ms: + return analysis + + # Gate closed — zero out levels, preserve beat fields + return replace( + analysis, + rms=0.0, + peak=0.0, + spectrum=np.copy(_ZERO_SPECTRUM), + left_rms=0.0, + left_spectrum=np.copy(_ZERO_SPECTRUM), + right_rms=0.0, + right_spectrum=np.copy(_ZERO_SPECTRUM), + ) diff --git a/server/src/wled_controller/core/audio/filters/channel_extract.py b/server/src/wled_controller/core/audio/filters/channel_extract.py new file mode 100644 index 0000000..ab5d531 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/channel_extract.py @@ -0,0 +1,70 @@ +"""Channel Extract audio filter — select mono/left/right from stereo AudioAnalysis.""" + +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +@AudioFilterRegistry.register +class ChannelExtractFilter(AudioFilter): + """Select a single channel (mono mix, left, or right) from stereo AudioAnalysis. + + When 'mono' is selected, left and right are averaged into the main fields. + When 'left' or 'right' is selected, that channel's data replaces the main fields. + """ + + filter_id = "channel_extract" + filter_name = "Channel Extract" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._channel = self.options["channel"] + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="channel", + label="Channel", + option_type="select", + default="mono", + min_value=None, + max_value=None, + step=None, + choices=[ + {"value": "mono", "label": "Mono (L+R average)"}, + {"value": "left", "label": "Left"}, + {"value": "right", "label": "Right"}, + ], + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + channel = self._channel + + if channel == "left": + return replace( + analysis, + rms=analysis.left_rms, + spectrum=np.copy(analysis.left_spectrum), + ) + elif channel == "right": + return replace( + analysis, + rms=analysis.right_rms, + spectrum=np.copy(analysis.right_spectrum), + ) + else: + # mono: average left and right + avg_rms = (analysis.left_rms + analysis.right_rms) / 2.0 + avg_spectrum = (analysis.left_spectrum + analysis.right_spectrum) / 2.0 + return replace( + analysis, + rms=avg_rms, + spectrum=avg_spectrum.astype(np.float32), + ) diff --git a/server/src/wled_controller/core/audio/filters/compressor.py b/server/src/wled_controller/core/audio/filters/compressor.py new file mode 100644 index 0000000..ada31d1 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/compressor.py @@ -0,0 +1,103 @@ +"""Compressor audio filter — reduce dynamic range above threshold.""" + +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +@AudioFilterRegistry.register +class CompressorFilter(AudioFilter): + """Reduce dynamic range above a threshold. + + For signals above ``threshold``, output is compressed: + ``output = threshold + (input - threshold) / ratio`` + + Makeup gain is applied after compression to restore overall level. + Applied to rms, peak, and per-bin spectrum values. + """ + + filter_id = "compressor" + filter_name = "Compressor" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._threshold = self.options["threshold"] + self._ratio = self.options["ratio"] + self._makeup_gain = self.options["makeup_gain"] + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + pass # Stateful for envelope tracking; minimal state for static compression + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="threshold", + label="Threshold", + option_type="float", + default=0.5, + min_value=0.0, + max_value=1.0, + step=0.01, + ), + AudioFilterOptionDef( + key="ratio", + label="Ratio", + option_type="float", + default=4.0, + min_value=1.0, + max_value=20.0, + step=0.1, + ), + AudioFilterOptionDef( + key="makeup_gain", + label="Makeup Gain", + option_type="float", + default=1.0, + min_value=0.0, + max_value=2.0, + step=0.05, + ), + ] + + def _compress_scalar(self, value: float) -> float: + """Compress a single scalar value.""" + threshold = self._threshold + if value <= threshold: + compressed = value + else: + compressed = threshold + (value - threshold) / self._ratio + return min(1.0, compressed * self._makeup_gain) + + def _compress_spectrum(self, spectrum: np.ndarray) -> np.ndarray: + """Compress spectrum array element-wise.""" + threshold = self._threshold + ratio = self._ratio + makeup = self._makeup_gain + + above_mask = spectrum > threshold + result = np.copy(spectrum) + result[above_mask] = threshold + (result[above_mask] - threshold) / ratio + result *= makeup + return np.clip(result, 0.0, 1.0).astype(np.float32) + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + return replace( + analysis, + rms=self._compress_scalar(analysis.rms), + peak=self._compress_scalar(analysis.peak), + spectrum=self._compress_spectrum(analysis.spectrum), + left_rms=self._compress_scalar(analysis.left_rms), + left_spectrum=self._compress_spectrum(analysis.left_spectrum), + right_rms=self._compress_scalar(analysis.right_rms), + right_spectrum=self._compress_spectrum(analysis.right_spectrum), + ) diff --git a/server/src/wled_controller/core/audio/filters/delay.py b/server/src/wled_controller/core/audio/filters/delay.py new file mode 100644 index 0000000..4d15289 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/delay.py @@ -0,0 +1,83 @@ +"""Delay audio filter — time-shift AudioAnalysis by a configurable amount.""" + +from collections import deque +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + +# Assumed update rate for sizing the ring buffer +_UPDATE_RATE_HZ = 30 + + +@AudioFilterRegistry.register +class DelayFilter(AudioFilter): + """Buffer incoming AudioAnalysis snapshots and output the one from N ms ago. + + Uses a ring buffer (deque) sized for the configured delay at ~30 Hz + update rate. Until the buffer is full, outputs a silent AudioAnalysis. + """ + + filter_id = "delay" + filter_name = "Delay" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._delay_ms = self.options["delay_ms"] + self._buffer_size = max(1, int(self._delay_ms / 1000.0 * _UPDATE_RATE_HZ)) + self._buffer: deque[AudioAnalysis] = deque(maxlen=self._buffer_size) + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + self._buffer.clear() + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="delay_ms", + label="Delay (ms)", + option_type="float", + default=100.0, + min_value=10.0, + max_value=2000.0, + step=10.0, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + # Take a snapshot with copied arrays to avoid reference issues + snapshot = replace( + analysis, + spectrum=np.copy(analysis.spectrum), + left_spectrum=np.copy(analysis.left_spectrum), + right_spectrum=np.copy(analysis.right_spectrum), + ) + + if len(self._buffer) >= self._buffer_size: + # Buffer full — return the oldest entry (the delayed one) + delayed = self._buffer[0] + self._buffer.append(snapshot) + return delayed + else: + # Buffer not yet full — store and output silence + self._buffer.append(snapshot) + return replace( + analysis, + rms=0.0, + peak=0.0, + spectrum=np.zeros(NUM_BANDS, dtype=np.float32), + beat=False, + beat_intensity=0.0, + left_rms=0.0, + left_spectrum=np.zeros(NUM_BANDS, dtype=np.float32), + right_rms=0.0, + right_spectrum=np.zeros(NUM_BANDS, dtype=np.float32), + ) diff --git a/server/src/wled_controller/core/audio/filters/envelope_follower.py b/server/src/wled_controller/core/audio/filters/envelope_follower.py new file mode 100644 index 0000000..168c253 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/envelope_follower.py @@ -0,0 +1,116 @@ +"""Envelope Follower audio filter — smooth amplitude with asymmetric attack/release.""" + +import time +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +def _time_constant_coeff(time_ms: float, dt: float) -> float: + """Compute exponential smoothing coefficient from time constant and delta-time. + + Returns a value in [0, 1] where 0 = no change, 1 = instant follow. + """ + if time_ms <= 0.0 or dt <= 0.0: + return 1.0 + # Time constant: the coefficient such that we reach ~63.2% in time_ms + tau = time_ms / 1000.0 + return min(1.0, 1.0 - np.exp(-dt / tau)) + + +@AudioFilterRegistry.register +class EnvelopeFollowerFilter(AudioFilter): + """Smooth RMS and peak with asymmetric attack/release time constants. + + Fast attack + slow release produces punchy transients that fade smoothly. + Applied to rms, peak, and per-bin spectrum values. + """ + + filter_id = "envelope_follower" + filter_name = "Envelope Follower" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._attack_ms = self.options["attack_ms"] + self._release_ms = self.options["release_ms"] + self._env_rms = 0.0 + self._env_peak = 0.0 + self._env_spectrum = np.zeros(NUM_BANDS, dtype=np.float32) + self._env_left_rms = 0.0 + self._env_right_rms = 0.0 + self._last_time: float | None = None + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + self._env_rms = 0.0 + self._env_peak = 0.0 + self._env_spectrum[:] = 0.0 + self._env_left_rms = 0.0 + self._env_right_rms = 0.0 + self._last_time = None + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="attack_ms", + label="Attack (ms)", + option_type="float", + default=10.0, + min_value=1.0, + max_value=500.0, + step=1.0, + ), + AudioFilterOptionDef( + key="release_ms", + label="Release (ms)", + option_type="float", + default=200.0, + min_value=10.0, + max_value=2000.0, + step=1.0, + ), + ] + + def _smooth_scalar(self, current: float, env: float, dt: float) -> float: + """Apply asymmetric smoothing to a single scalar value.""" + if current > env: + coeff = _time_constant_coeff(self._attack_ms, dt) + else: + coeff = _time_constant_coeff(self._release_ms, dt) + return env + coeff * (current - env) + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + now = time.perf_counter() + dt = (now - self._last_time) if self._last_time is not None else 0.0 + self._last_time = now + + # Smooth scalars + self._env_rms = self._smooth_scalar(analysis.rms, self._env_rms, dt) + self._env_peak = self._smooth_scalar(analysis.peak, self._env_peak, dt) + self._env_left_rms = self._smooth_scalar(analysis.left_rms, self._env_left_rms, dt) + self._env_right_rms = self._smooth_scalar(analysis.right_rms, self._env_right_rms, dt) + + # Smooth spectrum per-bin + attack_coeff = _time_constant_coeff(self._attack_ms, dt) + release_coeff = _time_constant_coeff(self._release_ms, dt) + rising = analysis.spectrum > self._env_spectrum + coeff = np.where(rising, attack_coeff, release_coeff).astype(np.float32) + self._env_spectrum = self._env_spectrum + coeff * (analysis.spectrum - self._env_spectrum) + + return replace( + analysis, + rms=self._env_rms, + peak=self._env_peak, + spectrum=np.copy(self._env_spectrum), + left_rms=self._env_left_rms, + right_rms=self._env_right_rms, + ) diff --git a/server/src/wled_controller/core/audio/filters/gain.py b/server/src/wled_controller/core/audio/filters/gain.py new file mode 100644 index 0000000..d4bb8ec --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/gain.py @@ -0,0 +1,56 @@ +"""Gain audio filter — multiply all levels by a configurable factor.""" + +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +@AudioFilterRegistry.register +class GainFilter(AudioFilter): + """Multiply rms, peak, spectrum, and per-channel values by a factor. + + Values are clamped to [0, 1] for rms/peak scalars. + Spectrum bins are clamped to [0, 1] as well. + """ + + filter_id = "gain" + filter_name = "Gain" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._factor = self.options["factor"] + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="factor", + label="Gain Factor", + option_type="float", + default=1.0, + min_value=0.1, + max_value=10.0, + step=0.1, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + factor = self._factor + if factor == 1.0: + return analysis + + return replace( + analysis, + rms=min(1.0, analysis.rms * factor), + peak=min(1.0, analysis.peak * factor), + spectrum=np.clip(analysis.spectrum * factor, 0.0, 1.0).astype(np.float32), + left_rms=min(1.0, analysis.left_rms * factor), + left_spectrum=np.clip(analysis.left_spectrum * factor, 0.0, 1.0).astype(np.float32), + right_rms=min(1.0, analysis.right_rms * factor), + right_spectrum=np.clip(analysis.right_spectrum * factor, 0.0, 1.0).astype(np.float32), + ) diff --git a/server/src/wled_controller/core/audio/filters/inverter.py b/server/src/wled_controller/core/audio/filters/inverter.py new file mode 100644 index 0000000..fc38997 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/inverter.py @@ -0,0 +1,55 @@ +"""Inverter audio filter — invert all audio levels (1.0 - value).""" + +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +@AudioFilterRegistry.register +class InverterFilter(AudioFilter): + """Invert all audio levels: ``output = 1.0 - input``. + + When ``invert_spectrum`` is True (default), spectrum bins are also inverted. + Beat fields (beat, beat_intensity) are always passed through unchanged. + """ + + filter_id = "inverter" + filter_name = "Inverter" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._invert_spectrum = self.options["invert_spectrum"] + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="invert_spectrum", + label="Invert Spectrum", + option_type="bool", + default=True, + min_value=None, + max_value=None, + step=None, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + kwargs = { + "rms": 1.0 - analysis.rms, + "peak": 1.0 - analysis.peak, + "left_rms": 1.0 - analysis.left_rms, + "right_rms": 1.0 - analysis.right_rms, + } + + if self._invert_spectrum: + kwargs["spectrum"] = (1.0 - analysis.spectrum).astype(np.float32) + kwargs["left_spectrum"] = (1.0 - analysis.left_spectrum).astype(np.float32) + kwargs["right_spectrum"] = (1.0 - analysis.right_spectrum).astype(np.float32) + + return replace(analysis, **kwargs) diff --git a/server/src/wled_controller/core/audio/filters/noise_gate.py b/server/src/wled_controller/core/audio/filters/noise_gate.py new file mode 100644 index 0000000..99acf22 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/noise_gate.py @@ -0,0 +1,87 @@ +"""Noise Gate audio filter — zero signal below threshold with hysteresis.""" + +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + +_ZERO_SPECTRUM = np.zeros(NUM_BANDS, dtype=np.float32) + + +@AudioFilterRegistry.register +class NoiseGateFilter(AudioFilter): + """Zero out all audio levels when RMS falls below a threshold. + + Hysteresis prevents rapid gate toggling: the gate opens when RMS rises + above ``threshold`` and closes only when RMS drops below + ``threshold - hysteresis``. + """ + + filter_id = "noise_gate" + filter_name = "Noise Gate" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._threshold = self.options["threshold"] + self._hysteresis = self.options["hysteresis"] + self._gate_open = False + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + self._gate_open = False + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="threshold", + label="Threshold", + option_type="float", + default=0.05, + min_value=0.0, + max_value=1.0, + step=0.01, + ), + AudioFilterOptionDef( + key="hysteresis", + label="Hysteresis", + option_type="float", + default=0.05, + min_value=0.0, + max_value=0.2, + step=0.01, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + rms = analysis.rms + + # Update gate state with hysteresis + if self._gate_open: + if rms < (self._threshold - self._hysteresis): + self._gate_open = False + else: + if rms >= self._threshold: + self._gate_open = True + + if self._gate_open: + return analysis + + # Gate is closed — zero out levels, preserve beat fields and timestamp + return replace( + analysis, + rms=0.0, + peak=0.0, + spectrum=np.copy(_ZERO_SPECTRUM), + left_rms=0.0, + left_spectrum=np.copy(_ZERO_SPECTRUM), + right_rms=0.0, + right_spectrum=np.copy(_ZERO_SPECTRUM), + ) diff --git a/server/src/wled_controller/core/audio/filters/peak_hold.py b/server/src/wled_controller/core/audio/filters/peak_hold.py new file mode 100644 index 0000000..3a1f7c8 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/peak_hold.py @@ -0,0 +1,104 @@ +"""Peak Hold audio filter — retain peak values with configurable decay.""" + +import time +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +@AudioFilterRegistry.register +class PeakHoldFilter(AudioFilter): + """Retain peak values and decay them over time. + + For each spectrum bin (if per_bin) or for rms/peak scalars, retains the + maximum value seen and decays it at the configured rate. Output is the + maximum of the current value and the held (decaying) peak. + """ + + filter_id = "peak_hold" + filter_name = "Peak Hold" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._decay_rate = self.options["decay_rate"] # dB/s + self._per_bin = self.options["per_bin"] + self._held_spectrum = np.zeros(NUM_BANDS, dtype=np.float32) + self._held_rms = 0.0 + self._held_peak = 0.0 + self._last_time: float | None = None + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + self._held_spectrum[:] = 0.0 + self._held_rms = 0.0 + self._held_peak = 0.0 + self._last_time = None + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="decay_rate", + label="Decay Rate (dB/s)", + option_type="float", + default=10.0, + min_value=0.1, + max_value=50.0, + step=0.1, + ), + AudioFilterOptionDef( + key="per_bin", + label="Per Spectrum Bin", + option_type="bool", + default=True, + min_value=None, + max_value=None, + step=None, + ), + ] + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + now = time.perf_counter() + if self._last_time is not None: + dt = now - self._last_time + else: + dt = 0.0 + self._last_time = now + + # Compute linear decay factor from dB/s + # decay_rate dB/s means the held value drops by decay_rate dB each second + # In linear: factor = 10^(-decay_rate * dt / 20) + decay_factor = 10.0 ** (-self._decay_rate * dt / 20.0) if dt > 0 else 1.0 + + # Decay held values + self._held_rms *= decay_factor + self._held_peak *= decay_factor + + # Update held values with current maxima + self._held_rms = max(self._held_rms, analysis.rms) + self._held_peak = max(self._held_peak, analysis.peak) + + new_rms = self._held_rms + new_peak = self._held_peak + + if self._per_bin: + self._held_spectrum *= decay_factor + np.maximum(self._held_spectrum, analysis.spectrum, out=self._held_spectrum) + new_spectrum = np.copy(self._held_spectrum) + else: + new_spectrum = np.copy(analysis.spectrum) + + return replace( + analysis, + rms=new_rms, + peak=new_peak, + spectrum=new_spectrum, + ) diff --git a/server/src/wled_controller/core/audio/filters/spectral_smoothing.py b/server/src/wled_controller/core/audio/filters/spectral_smoothing.py new file mode 100644 index 0000000..71d12c4 --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/spectral_smoothing.py @@ -0,0 +1,72 @@ +"""Spectral Smoothing audio filter — exponential moving average per spectrum bin.""" + +from dataclasses import replace +from typing import Any, Dict, List + +import numpy as np + +from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.registry import AudioFilterRegistry + + +@AudioFilterRegistry.register +class SpectralSmoothingFilter(AudioFilter): + """Apply exponential moving average smoothing to each spectrum bin. + + ``smoothed[i] = factor * prev[i] + (1 - factor) * current[i]`` + + Higher factor values produce smoother (slower-responding) output. + """ + + filter_id = "spectral_smoothing" + filter_name = "Spectral Smoothing" + + def __init__(self, options: Dict[str, Any]): + super().__init__(options) + self._factor = self.options["factor"] + self._prev_spectrum = np.zeros(NUM_BANDS, dtype=np.float32) + self._prev_left = np.zeros(NUM_BANDS, dtype=np.float32) + self._prev_right = np.zeros(NUM_BANDS, dtype=np.float32) + + @property + def is_stateful(self) -> bool: + return True + + def reset(self) -> None: + self._prev_spectrum[:] = 0.0 + self._prev_left[:] = 0.0 + self._prev_right[:] = 0.0 + + @classmethod + def get_options_schema(cls) -> List[AudioFilterOptionDef]: + return [ + AudioFilterOptionDef( + key="factor", + label="Smoothing Factor", + option_type="float", + default=0.5, + min_value=0.0, + max_value=0.99, + step=0.01, + ), + ] + + def _smooth(self, prev: np.ndarray, current: np.ndarray) -> np.ndarray: + """Compute EMA and update previous state in-place, returning a copy.""" + f = self._factor + smoothed = f * prev + (1.0 - f) * current + np.copyto(prev, smoothed) + return smoothed.astype(np.float32) + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + new_spectrum = self._smooth(self._prev_spectrum, analysis.spectrum) + new_left = self._smooth(self._prev_left, analysis.left_spectrum) + new_right = self._smooth(self._prev_right, analysis.right_spectrum) + + return replace( + analysis, + spectrum=new_spectrum, + left_spectrum=new_left, + right_spectrum=new_right, + )