feat(processed-audio-sources): phase 2 - implement 11 audio filters
Add all audio filters that transform AudioAnalysis data: - Channel Extract, Band Extract (migration from old source types) - Peak Hold, Gain, Noise Gate, Envelope Follower - Spectral Smoothing, Compressor, Inverter, Beat Gate, Delay All registered via AudioFilterRegistry with option schemas.
This commit is contained in:
@@ -9,6 +9,17 @@ from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
# Import individual filters to trigger auto-registration
|
||||
import wled_controller.core.audio.filters.audio_filter_template # noqa: F401
|
||||
import wled_controller.core.audio.filters.channel_extract # noqa: F401
|
||||
import wled_controller.core.audio.filters.band_extract # noqa: F401
|
||||
import wled_controller.core.audio.filters.peak_hold # noqa: F401
|
||||
import wled_controller.core.audio.filters.gain # noqa: F401
|
||||
import wled_controller.core.audio.filters.noise_gate # noqa: F401
|
||||
import wled_controller.core.audio.filters.envelope_follower # noqa: F401
|
||||
import wled_controller.core.audio.filters.spectral_smoothing # noqa: F401
|
||||
import wled_controller.core.audio.filters.compressor # noqa: F401
|
||||
import wled_controller.core.audio.filters.inverter # noqa: F401
|
||||
import wled_controller.core.audio.filters.beat_gate # noqa: F401
|
||||
import wled_controller.core.audio.filters.delay # noqa: F401
|
||||
|
||||
__all__ = [
|
||||
"AudioFilter",
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
"""Band Extract audio filter — mask spectrum to a frequency range and recompute RMS."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from wled_controller.core.audio.analysis import AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
from wled_controller.core.audio.band_filter import apply_band_filter, compute_band_mask
|
||||
|
||||
|
||||
# Preset frequency ranges
|
||||
_PRESETS = {
|
||||
"bass": (20.0, 250.0),
|
||||
"mid": (250.0, 4000.0),
|
||||
"treble": (4000.0, 20000.0),
|
||||
}
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class BandExtractFilter(AudioFilter):
|
||||
"""Extract a frequency band from the spectrum.
|
||||
|
||||
Supports presets (bass, mid, treble) or a custom frequency range.
|
||||
Zeros out-of-band spectrum bins and recomputes RMS from in-band data.
|
||||
"""
|
||||
|
||||
filter_id = "band_extract"
|
||||
filter_name = "Band Extract"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
band = self.options["band"]
|
||||
if band == "custom":
|
||||
freq_low = self.options["freq_low"]
|
||||
freq_high = self.options["freq_high"]
|
||||
else:
|
||||
freq_low, freq_high = _PRESETS.get(band, (20.0, 20000.0))
|
||||
self._mask = compute_band_mask(freq_low, freq_high)
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="band",
|
||||
label="Band",
|
||||
option_type="select",
|
||||
default="bass",
|
||||
min_value=None,
|
||||
max_value=None,
|
||||
step=None,
|
||||
choices=[
|
||||
{"value": "bass", "label": "Bass (20-250 Hz)"},
|
||||
{"value": "mid", "label": "Mid (250-4000 Hz)"},
|
||||
{"value": "treble", "label": "Treble (4000-20000 Hz)"},
|
||||
{"value": "custom", "label": "Custom Range"},
|
||||
],
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="freq_low",
|
||||
label="Low Frequency (Hz)",
|
||||
option_type="float",
|
||||
default=20.0,
|
||||
min_value=20.0,
|
||||
max_value=20000.0,
|
||||
step=1.0,
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="freq_high",
|
||||
label="High Frequency (Hz)",
|
||||
option_type="float",
|
||||
default=20000.0,
|
||||
min_value=20.0,
|
||||
max_value=20000.0,
|
||||
step=1.0,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
filtered_spectrum, filtered_rms = apply_band_filter(
|
||||
analysis.spectrum,
|
||||
analysis.rms,
|
||||
self._mask,
|
||||
)
|
||||
filtered_left, filtered_left_rms = apply_band_filter(
|
||||
analysis.left_spectrum,
|
||||
analysis.left_rms,
|
||||
self._mask,
|
||||
)
|
||||
filtered_right, filtered_right_rms = apply_band_filter(
|
||||
analysis.right_spectrum,
|
||||
analysis.right_rms,
|
||||
self._mask,
|
||||
)
|
||||
return replace(
|
||||
analysis,
|
||||
rms=filtered_rms,
|
||||
spectrum=filtered_spectrum,
|
||||
left_rms=filtered_left_rms,
|
||||
left_spectrum=filtered_left,
|
||||
right_rms=filtered_right_rms,
|
||||
right_spectrum=filtered_right,
|
||||
)
|
||||
@@ -0,0 +1,78 @@
|
||||
"""Beat Gate audio filter — pass signal only around beat events."""
|
||||
|
||||
import time
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
_ZERO_SPECTRUM = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class BeatGateFilter(AudioFilter):
|
||||
"""Pass audio signal through only when a beat is detected.
|
||||
|
||||
When a beat is detected, the gate opens and holds for ``hold_ms``
|
||||
milliseconds, passing the signal through. Between beats (after hold
|
||||
expires), rms/peak/spectrum are zeroed out. Beat fields themselves
|
||||
always pass through unchanged.
|
||||
"""
|
||||
|
||||
filter_id = "beat_gate"
|
||||
filter_name = "Beat Gate"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._hold_ms = self.options["hold_ms"]
|
||||
self._last_beat_time: float | None = None
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self._last_beat_time = None
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="hold_ms",
|
||||
label="Hold Time (ms)",
|
||||
option_type="float",
|
||||
default=50.0,
|
||||
min_value=10.0,
|
||||
max_value=500.0,
|
||||
step=1.0,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
now = time.perf_counter()
|
||||
|
||||
# Record beat time
|
||||
if analysis.beat:
|
||||
self._last_beat_time = now
|
||||
|
||||
# Check if we're within the hold window
|
||||
if self._last_beat_time is not None:
|
||||
elapsed_ms = (now - self._last_beat_time) * 1000.0
|
||||
if elapsed_ms <= self._hold_ms:
|
||||
return analysis
|
||||
|
||||
# Gate closed — zero out levels, preserve beat fields
|
||||
return replace(
|
||||
analysis,
|
||||
rms=0.0,
|
||||
peak=0.0,
|
||||
spectrum=np.copy(_ZERO_SPECTRUM),
|
||||
left_rms=0.0,
|
||||
left_spectrum=np.copy(_ZERO_SPECTRUM),
|
||||
right_rms=0.0,
|
||||
right_spectrum=np.copy(_ZERO_SPECTRUM),
|
||||
)
|
||||
@@ -0,0 +1,70 @@
|
||||
"""Channel Extract audio filter — select mono/left/right from stereo AudioAnalysis."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class ChannelExtractFilter(AudioFilter):
|
||||
"""Select a single channel (mono mix, left, or right) from stereo AudioAnalysis.
|
||||
|
||||
When 'mono' is selected, left and right are averaged into the main fields.
|
||||
When 'left' or 'right' is selected, that channel's data replaces the main fields.
|
||||
"""
|
||||
|
||||
filter_id = "channel_extract"
|
||||
filter_name = "Channel Extract"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._channel = self.options["channel"]
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="channel",
|
||||
label="Channel",
|
||||
option_type="select",
|
||||
default="mono",
|
||||
min_value=None,
|
||||
max_value=None,
|
||||
step=None,
|
||||
choices=[
|
||||
{"value": "mono", "label": "Mono (L+R average)"},
|
||||
{"value": "left", "label": "Left"},
|
||||
{"value": "right", "label": "Right"},
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
channel = self._channel
|
||||
|
||||
if channel == "left":
|
||||
return replace(
|
||||
analysis,
|
||||
rms=analysis.left_rms,
|
||||
spectrum=np.copy(analysis.left_spectrum),
|
||||
)
|
||||
elif channel == "right":
|
||||
return replace(
|
||||
analysis,
|
||||
rms=analysis.right_rms,
|
||||
spectrum=np.copy(analysis.right_spectrum),
|
||||
)
|
||||
else:
|
||||
# mono: average left and right
|
||||
avg_rms = (analysis.left_rms + analysis.right_rms) / 2.0
|
||||
avg_spectrum = (analysis.left_spectrum + analysis.right_spectrum) / 2.0
|
||||
return replace(
|
||||
analysis,
|
||||
rms=avg_rms,
|
||||
spectrum=avg_spectrum.astype(np.float32),
|
||||
)
|
||||
@@ -0,0 +1,103 @@
|
||||
"""Compressor audio filter — reduce dynamic range above threshold."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class CompressorFilter(AudioFilter):
|
||||
"""Reduce dynamic range above a threshold.
|
||||
|
||||
For signals above ``threshold``, output is compressed:
|
||||
``output = threshold + (input - threshold) / ratio``
|
||||
|
||||
Makeup gain is applied after compression to restore overall level.
|
||||
Applied to rms, peak, and per-bin spectrum values.
|
||||
"""
|
||||
|
||||
filter_id = "compressor"
|
||||
filter_name = "Compressor"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._threshold = self.options["threshold"]
|
||||
self._ratio = self.options["ratio"]
|
||||
self._makeup_gain = self.options["makeup_gain"]
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
pass # Stateful for envelope tracking; minimal state for static compression
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="threshold",
|
||||
label="Threshold",
|
||||
option_type="float",
|
||||
default=0.5,
|
||||
min_value=0.0,
|
||||
max_value=1.0,
|
||||
step=0.01,
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="ratio",
|
||||
label="Ratio",
|
||||
option_type="float",
|
||||
default=4.0,
|
||||
min_value=1.0,
|
||||
max_value=20.0,
|
||||
step=0.1,
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="makeup_gain",
|
||||
label="Makeup Gain",
|
||||
option_type="float",
|
||||
default=1.0,
|
||||
min_value=0.0,
|
||||
max_value=2.0,
|
||||
step=0.05,
|
||||
),
|
||||
]
|
||||
|
||||
def _compress_scalar(self, value: float) -> float:
|
||||
"""Compress a single scalar value."""
|
||||
threshold = self._threshold
|
||||
if value <= threshold:
|
||||
compressed = value
|
||||
else:
|
||||
compressed = threshold + (value - threshold) / self._ratio
|
||||
return min(1.0, compressed * self._makeup_gain)
|
||||
|
||||
def _compress_spectrum(self, spectrum: np.ndarray) -> np.ndarray:
|
||||
"""Compress spectrum array element-wise."""
|
||||
threshold = self._threshold
|
||||
ratio = self._ratio
|
||||
makeup = self._makeup_gain
|
||||
|
||||
above_mask = spectrum > threshold
|
||||
result = np.copy(spectrum)
|
||||
result[above_mask] = threshold + (result[above_mask] - threshold) / ratio
|
||||
result *= makeup
|
||||
return np.clip(result, 0.0, 1.0).astype(np.float32)
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
return replace(
|
||||
analysis,
|
||||
rms=self._compress_scalar(analysis.rms),
|
||||
peak=self._compress_scalar(analysis.peak),
|
||||
spectrum=self._compress_spectrum(analysis.spectrum),
|
||||
left_rms=self._compress_scalar(analysis.left_rms),
|
||||
left_spectrum=self._compress_spectrum(analysis.left_spectrum),
|
||||
right_rms=self._compress_scalar(analysis.right_rms),
|
||||
right_spectrum=self._compress_spectrum(analysis.right_spectrum),
|
||||
)
|
||||
@@ -0,0 +1,83 @@
|
||||
"""Delay audio filter — time-shift AudioAnalysis by a configurable amount."""
|
||||
|
||||
from collections import deque
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
# Assumed update rate for sizing the ring buffer
|
||||
_UPDATE_RATE_HZ = 30
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class DelayFilter(AudioFilter):
|
||||
"""Buffer incoming AudioAnalysis snapshots and output the one from N ms ago.
|
||||
|
||||
Uses a ring buffer (deque) sized for the configured delay at ~30 Hz
|
||||
update rate. Until the buffer is full, outputs a silent AudioAnalysis.
|
||||
"""
|
||||
|
||||
filter_id = "delay"
|
||||
filter_name = "Delay"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._delay_ms = self.options["delay_ms"]
|
||||
self._buffer_size = max(1, int(self._delay_ms / 1000.0 * _UPDATE_RATE_HZ))
|
||||
self._buffer: deque[AudioAnalysis] = deque(maxlen=self._buffer_size)
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self._buffer.clear()
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="delay_ms",
|
||||
label="Delay (ms)",
|
||||
option_type="float",
|
||||
default=100.0,
|
||||
min_value=10.0,
|
||||
max_value=2000.0,
|
||||
step=10.0,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
# Take a snapshot with copied arrays to avoid reference issues
|
||||
snapshot = replace(
|
||||
analysis,
|
||||
spectrum=np.copy(analysis.spectrum),
|
||||
left_spectrum=np.copy(analysis.left_spectrum),
|
||||
right_spectrum=np.copy(analysis.right_spectrum),
|
||||
)
|
||||
|
||||
if len(self._buffer) >= self._buffer_size:
|
||||
# Buffer full — return the oldest entry (the delayed one)
|
||||
delayed = self._buffer[0]
|
||||
self._buffer.append(snapshot)
|
||||
return delayed
|
||||
else:
|
||||
# Buffer not yet full — store and output silence
|
||||
self._buffer.append(snapshot)
|
||||
return replace(
|
||||
analysis,
|
||||
rms=0.0,
|
||||
peak=0.0,
|
||||
spectrum=np.zeros(NUM_BANDS, dtype=np.float32),
|
||||
beat=False,
|
||||
beat_intensity=0.0,
|
||||
left_rms=0.0,
|
||||
left_spectrum=np.zeros(NUM_BANDS, dtype=np.float32),
|
||||
right_rms=0.0,
|
||||
right_spectrum=np.zeros(NUM_BANDS, dtype=np.float32),
|
||||
)
|
||||
@@ -0,0 +1,116 @@
|
||||
"""Envelope Follower audio filter — smooth amplitude with asymmetric attack/release."""
|
||||
|
||||
import time
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
def _time_constant_coeff(time_ms: float, dt: float) -> float:
|
||||
"""Compute exponential smoothing coefficient from time constant and delta-time.
|
||||
|
||||
Returns a value in [0, 1] where 0 = no change, 1 = instant follow.
|
||||
"""
|
||||
if time_ms <= 0.0 or dt <= 0.0:
|
||||
return 1.0
|
||||
# Time constant: the coefficient such that we reach ~63.2% in time_ms
|
||||
tau = time_ms / 1000.0
|
||||
return min(1.0, 1.0 - np.exp(-dt / tau))
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class EnvelopeFollowerFilter(AudioFilter):
|
||||
"""Smooth RMS and peak with asymmetric attack/release time constants.
|
||||
|
||||
Fast attack + slow release produces punchy transients that fade smoothly.
|
||||
Applied to rms, peak, and per-bin spectrum values.
|
||||
"""
|
||||
|
||||
filter_id = "envelope_follower"
|
||||
filter_name = "Envelope Follower"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._attack_ms = self.options["attack_ms"]
|
||||
self._release_ms = self.options["release_ms"]
|
||||
self._env_rms = 0.0
|
||||
self._env_peak = 0.0
|
||||
self._env_spectrum = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
self._env_left_rms = 0.0
|
||||
self._env_right_rms = 0.0
|
||||
self._last_time: float | None = None
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self._env_rms = 0.0
|
||||
self._env_peak = 0.0
|
||||
self._env_spectrum[:] = 0.0
|
||||
self._env_left_rms = 0.0
|
||||
self._env_right_rms = 0.0
|
||||
self._last_time = None
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="attack_ms",
|
||||
label="Attack (ms)",
|
||||
option_type="float",
|
||||
default=10.0,
|
||||
min_value=1.0,
|
||||
max_value=500.0,
|
||||
step=1.0,
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="release_ms",
|
||||
label="Release (ms)",
|
||||
option_type="float",
|
||||
default=200.0,
|
||||
min_value=10.0,
|
||||
max_value=2000.0,
|
||||
step=1.0,
|
||||
),
|
||||
]
|
||||
|
||||
def _smooth_scalar(self, current: float, env: float, dt: float) -> float:
|
||||
"""Apply asymmetric smoothing to a single scalar value."""
|
||||
if current > env:
|
||||
coeff = _time_constant_coeff(self._attack_ms, dt)
|
||||
else:
|
||||
coeff = _time_constant_coeff(self._release_ms, dt)
|
||||
return env + coeff * (current - env)
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
now = time.perf_counter()
|
||||
dt = (now - self._last_time) if self._last_time is not None else 0.0
|
||||
self._last_time = now
|
||||
|
||||
# Smooth scalars
|
||||
self._env_rms = self._smooth_scalar(analysis.rms, self._env_rms, dt)
|
||||
self._env_peak = self._smooth_scalar(analysis.peak, self._env_peak, dt)
|
||||
self._env_left_rms = self._smooth_scalar(analysis.left_rms, self._env_left_rms, dt)
|
||||
self._env_right_rms = self._smooth_scalar(analysis.right_rms, self._env_right_rms, dt)
|
||||
|
||||
# Smooth spectrum per-bin
|
||||
attack_coeff = _time_constant_coeff(self._attack_ms, dt)
|
||||
release_coeff = _time_constant_coeff(self._release_ms, dt)
|
||||
rising = analysis.spectrum > self._env_spectrum
|
||||
coeff = np.where(rising, attack_coeff, release_coeff).astype(np.float32)
|
||||
self._env_spectrum = self._env_spectrum + coeff * (analysis.spectrum - self._env_spectrum)
|
||||
|
||||
return replace(
|
||||
analysis,
|
||||
rms=self._env_rms,
|
||||
peak=self._env_peak,
|
||||
spectrum=np.copy(self._env_spectrum),
|
||||
left_rms=self._env_left_rms,
|
||||
right_rms=self._env_right_rms,
|
||||
)
|
||||
@@ -0,0 +1,56 @@
|
||||
"""Gain audio filter — multiply all levels by a configurable factor."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class GainFilter(AudioFilter):
|
||||
"""Multiply rms, peak, spectrum, and per-channel values by a factor.
|
||||
|
||||
Values are clamped to [0, 1] for rms/peak scalars.
|
||||
Spectrum bins are clamped to [0, 1] as well.
|
||||
"""
|
||||
|
||||
filter_id = "gain"
|
||||
filter_name = "Gain"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._factor = self.options["factor"]
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="factor",
|
||||
label="Gain Factor",
|
||||
option_type="float",
|
||||
default=1.0,
|
||||
min_value=0.1,
|
||||
max_value=10.0,
|
||||
step=0.1,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
factor = self._factor
|
||||
if factor == 1.0:
|
||||
return analysis
|
||||
|
||||
return replace(
|
||||
analysis,
|
||||
rms=min(1.0, analysis.rms * factor),
|
||||
peak=min(1.0, analysis.peak * factor),
|
||||
spectrum=np.clip(analysis.spectrum * factor, 0.0, 1.0).astype(np.float32),
|
||||
left_rms=min(1.0, analysis.left_rms * factor),
|
||||
left_spectrum=np.clip(analysis.left_spectrum * factor, 0.0, 1.0).astype(np.float32),
|
||||
right_rms=min(1.0, analysis.right_rms * factor),
|
||||
right_spectrum=np.clip(analysis.right_spectrum * factor, 0.0, 1.0).astype(np.float32),
|
||||
)
|
||||
@@ -0,0 +1,55 @@
|
||||
"""Inverter audio filter — invert all audio levels (1.0 - value)."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class InverterFilter(AudioFilter):
|
||||
"""Invert all audio levels: ``output = 1.0 - input``.
|
||||
|
||||
When ``invert_spectrum`` is True (default), spectrum bins are also inverted.
|
||||
Beat fields (beat, beat_intensity) are always passed through unchanged.
|
||||
"""
|
||||
|
||||
filter_id = "inverter"
|
||||
filter_name = "Inverter"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._invert_spectrum = self.options["invert_spectrum"]
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="invert_spectrum",
|
||||
label="Invert Spectrum",
|
||||
option_type="bool",
|
||||
default=True,
|
||||
min_value=None,
|
||||
max_value=None,
|
||||
step=None,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
kwargs = {
|
||||
"rms": 1.0 - analysis.rms,
|
||||
"peak": 1.0 - analysis.peak,
|
||||
"left_rms": 1.0 - analysis.left_rms,
|
||||
"right_rms": 1.0 - analysis.right_rms,
|
||||
}
|
||||
|
||||
if self._invert_spectrum:
|
||||
kwargs["spectrum"] = (1.0 - analysis.spectrum).astype(np.float32)
|
||||
kwargs["left_spectrum"] = (1.0 - analysis.left_spectrum).astype(np.float32)
|
||||
kwargs["right_spectrum"] = (1.0 - analysis.right_spectrum).astype(np.float32)
|
||||
|
||||
return replace(analysis, **kwargs)
|
||||
@@ -0,0 +1,87 @@
|
||||
"""Noise Gate audio filter — zero signal below threshold with hysteresis."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
_ZERO_SPECTRUM = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class NoiseGateFilter(AudioFilter):
|
||||
"""Zero out all audio levels when RMS falls below a threshold.
|
||||
|
||||
Hysteresis prevents rapid gate toggling: the gate opens when RMS rises
|
||||
above ``threshold`` and closes only when RMS drops below
|
||||
``threshold - hysteresis``.
|
||||
"""
|
||||
|
||||
filter_id = "noise_gate"
|
||||
filter_name = "Noise Gate"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._threshold = self.options["threshold"]
|
||||
self._hysteresis = self.options["hysteresis"]
|
||||
self._gate_open = False
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self._gate_open = False
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="threshold",
|
||||
label="Threshold",
|
||||
option_type="float",
|
||||
default=0.05,
|
||||
min_value=0.0,
|
||||
max_value=1.0,
|
||||
step=0.01,
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="hysteresis",
|
||||
label="Hysteresis",
|
||||
option_type="float",
|
||||
default=0.05,
|
||||
min_value=0.0,
|
||||
max_value=0.2,
|
||||
step=0.01,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
rms = analysis.rms
|
||||
|
||||
# Update gate state with hysteresis
|
||||
if self._gate_open:
|
||||
if rms < (self._threshold - self._hysteresis):
|
||||
self._gate_open = False
|
||||
else:
|
||||
if rms >= self._threshold:
|
||||
self._gate_open = True
|
||||
|
||||
if self._gate_open:
|
||||
return analysis
|
||||
|
||||
# Gate is closed — zero out levels, preserve beat fields and timestamp
|
||||
return replace(
|
||||
analysis,
|
||||
rms=0.0,
|
||||
peak=0.0,
|
||||
spectrum=np.copy(_ZERO_SPECTRUM),
|
||||
left_rms=0.0,
|
||||
left_spectrum=np.copy(_ZERO_SPECTRUM),
|
||||
right_rms=0.0,
|
||||
right_spectrum=np.copy(_ZERO_SPECTRUM),
|
||||
)
|
||||
@@ -0,0 +1,104 @@
|
||||
"""Peak Hold audio filter — retain peak values with configurable decay."""
|
||||
|
||||
import time
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class PeakHoldFilter(AudioFilter):
|
||||
"""Retain peak values and decay them over time.
|
||||
|
||||
For each spectrum bin (if per_bin) or for rms/peak scalars, retains the
|
||||
maximum value seen and decays it at the configured rate. Output is the
|
||||
maximum of the current value and the held (decaying) peak.
|
||||
"""
|
||||
|
||||
filter_id = "peak_hold"
|
||||
filter_name = "Peak Hold"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._decay_rate = self.options["decay_rate"] # dB/s
|
||||
self._per_bin = self.options["per_bin"]
|
||||
self._held_spectrum = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
self._held_rms = 0.0
|
||||
self._held_peak = 0.0
|
||||
self._last_time: float | None = None
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self._held_spectrum[:] = 0.0
|
||||
self._held_rms = 0.0
|
||||
self._held_peak = 0.0
|
||||
self._last_time = None
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="decay_rate",
|
||||
label="Decay Rate (dB/s)",
|
||||
option_type="float",
|
||||
default=10.0,
|
||||
min_value=0.1,
|
||||
max_value=50.0,
|
||||
step=0.1,
|
||||
),
|
||||
AudioFilterOptionDef(
|
||||
key="per_bin",
|
||||
label="Per Spectrum Bin",
|
||||
option_type="bool",
|
||||
default=True,
|
||||
min_value=None,
|
||||
max_value=None,
|
||||
step=None,
|
||||
),
|
||||
]
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
now = time.perf_counter()
|
||||
if self._last_time is not None:
|
||||
dt = now - self._last_time
|
||||
else:
|
||||
dt = 0.0
|
||||
self._last_time = now
|
||||
|
||||
# Compute linear decay factor from dB/s
|
||||
# decay_rate dB/s means the held value drops by decay_rate dB each second
|
||||
# In linear: factor = 10^(-decay_rate * dt / 20)
|
||||
decay_factor = 10.0 ** (-self._decay_rate * dt / 20.0) if dt > 0 else 1.0
|
||||
|
||||
# Decay held values
|
||||
self._held_rms *= decay_factor
|
||||
self._held_peak *= decay_factor
|
||||
|
||||
# Update held values with current maxima
|
||||
self._held_rms = max(self._held_rms, analysis.rms)
|
||||
self._held_peak = max(self._held_peak, analysis.peak)
|
||||
|
||||
new_rms = self._held_rms
|
||||
new_peak = self._held_peak
|
||||
|
||||
if self._per_bin:
|
||||
self._held_spectrum *= decay_factor
|
||||
np.maximum(self._held_spectrum, analysis.spectrum, out=self._held_spectrum)
|
||||
new_spectrum = np.copy(self._held_spectrum)
|
||||
else:
|
||||
new_spectrum = np.copy(analysis.spectrum)
|
||||
|
||||
return replace(
|
||||
analysis,
|
||||
rms=new_rms,
|
||||
peak=new_peak,
|
||||
spectrum=new_spectrum,
|
||||
)
|
||||
@@ -0,0 +1,72 @@
|
||||
"""Spectral Smoothing audio filter — exponential moving average per spectrum bin."""
|
||||
|
||||
from dataclasses import replace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS, AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
|
||||
@AudioFilterRegistry.register
|
||||
class SpectralSmoothingFilter(AudioFilter):
|
||||
"""Apply exponential moving average smoothing to each spectrum bin.
|
||||
|
||||
``smoothed[i] = factor * prev[i] + (1 - factor) * current[i]``
|
||||
|
||||
Higher factor values produce smoother (slower-responding) output.
|
||||
"""
|
||||
|
||||
filter_id = "spectral_smoothing"
|
||||
filter_name = "Spectral Smoothing"
|
||||
|
||||
def __init__(self, options: Dict[str, Any]):
|
||||
super().__init__(options)
|
||||
self._factor = self.options["factor"]
|
||||
self._prev_spectrum = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
self._prev_left = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
self._prev_right = np.zeros(NUM_BANDS, dtype=np.float32)
|
||||
|
||||
@property
|
||||
def is_stateful(self) -> bool:
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self._prev_spectrum[:] = 0.0
|
||||
self._prev_left[:] = 0.0
|
||||
self._prev_right[:] = 0.0
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls) -> List[AudioFilterOptionDef]:
|
||||
return [
|
||||
AudioFilterOptionDef(
|
||||
key="factor",
|
||||
label="Smoothing Factor",
|
||||
option_type="float",
|
||||
default=0.5,
|
||||
min_value=0.0,
|
||||
max_value=0.99,
|
||||
step=0.01,
|
||||
),
|
||||
]
|
||||
|
||||
def _smooth(self, prev: np.ndarray, current: np.ndarray) -> np.ndarray:
|
||||
"""Compute EMA and update previous state in-place, returning a copy."""
|
||||
f = self._factor
|
||||
smoothed = f * prev + (1.0 - f) * current
|
||||
np.copyto(prev, smoothed)
|
||||
return smoothed.astype(np.float32)
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
new_spectrum = self._smooth(self._prev_spectrum, analysis.spectrum)
|
||||
new_left = self._smooth(self._prev_left, analysis.left_spectrum)
|
||||
new_right = self._smooth(self._prev_right, analysis.right_spectrum)
|
||||
|
||||
return replace(
|
||||
analysis,
|
||||
spectrum=new_spectrum,
|
||||
left_spectrum=new_left,
|
||||
right_spectrum=new_right,
|
||||
)
|
||||
Reference in New Issue
Block a user