feat(processed-audio-sources): phase 4 - runtime filter integration

Add AudioFilterPipeline for chained filter execution on AudioAnalysis.
Wire filter pipelines into AudioColorStripStream, AudioValueStream,
and WebSocket test endpoint. Add hot-update support via
ProcessorManager.refresh_audio_filter_pipelines(). Thread
AudioProcessingTemplateStore through dependency injection hierarchy.
This commit is contained in:
2026-03-31 19:15:29 +03:00
parent 353c090b42
commit ab43578049
12 changed files with 309 additions and 38 deletions
@@ -6,6 +6,7 @@ from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
fire_entity_event,
get_audio_processing_template_store,
get_processor_manager,
)
from wled_controller.api.schemas.audio_processing import (
AudioProcessingTemplateCreate,
@@ -129,6 +130,12 @@ async def update_audio_processing_template(
tags=data.tags,
)
fire_entity_event("audio_processing_template", "updated", template_id)
# Hot-update: rebuild filter pipelines for running streams using this template
try:
pm = get_processor_manager()
pm.refresh_audio_filter_pipelines(template_id)
except Exception:
pass # Non-critical: streams will pick up changes on next restart
return _apt_to_response(template)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
@@ -154,6 +161,12 @@ async def delete_audio_processing_template(
# TODO: Phase 3 will add reference checks against ProcessedAudioSource
store.delete_template(template_id)
fire_entity_event("audio_processing_template", "deleted", template_id)
# Hot-update: rebuild filter pipelines for running streams that used this template
try:
pm = get_processor_manager()
pm.refresh_audio_filter_pipelines(template_id)
except Exception:
pass # Non-critical
except HTTPException:
raise
except EntityNotFoundError as e:
@@ -9,6 +9,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
fire_entity_event,
get_audio_processing_template_store,
get_audio_source_store,
get_audio_template_store,
get_color_strip_store,
@@ -210,11 +211,12 @@ async def test_audio_source_ws(
ManagedAudioStream (ref-counted — shares with running targets), and streams
AudioAnalysis snapshots as JSON at ~20 Hz.
NOTE: Audio processing filters from the template chain are NOT applied in
this WebSocket yet — that will be wired in Phase 4 when the stream runtime
integrates filter instances.
Audio processing filters from the template chain are applied to the
analysis before sending, so the WebSocket output matches what running
streams see.
"""
from wled_controller.api.auth import verify_ws_token
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
if not verify_ws_token(token):
await websocket.close(code=4001, reason="Unauthorized")
@@ -223,6 +225,7 @@ async def test_audio_source_ws(
# Resolve source → device info + processing template chain
store = get_audio_source_store()
template_store = get_audio_template_store()
apt_store = get_audio_processing_template_store()
manager = get_processor_manager()
try:
@@ -247,6 +250,15 @@ async def test_audio_source_ws(
logger.debug("Audio template not found, falling back to best available engine: %s", e)
pass # Fall back to best available engine
# Build filter pipeline from processing template chain
pipeline = None
if resolved.audio_processing_template_ids and apt_store:
pipeline = build_pipeline_from_template_ids(
resolved.audio_processing_template_ids, apt_store
)
if pipeline.empty:
pipeline = None
# Acquire shared audio stream
audio_mgr = manager.audio_capture_manager
try:
@@ -265,7 +277,10 @@ async def test_audio_source_ws(
if analysis is not None and analysis.timestamp != last_ts:
last_ts = analysis.timestamp
# Send raw analysis — filter processing will be added in Phase 4
# Apply filter pipeline (channel extract, band extract, gain, etc.)
if pipeline is not None:
analysis = pipeline.process(analysis)
await websocket.send_json(
{
"spectrum": analysis.spectrum.tolist(),
@@ -283,5 +298,7 @@ async def test_audio_source_ws(
except Exception as e:
logger.error(f"Audio test WebSocket error for {source_id}: {e}")
finally:
if pipeline is not None:
pipeline.close()
audio_mgr.release(device_index, is_loopback, engine_type)
logger.info(f"Audio test WebSocket disconnected for source {source_id}")
@@ -5,6 +5,7 @@ Import this package to ensure all built-in filters are registered.
"""
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
from wled_controller.core.audio.filters.pipeline import AudioFilterPipeline
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
# Import individual filters to trigger auto-registration
@@ -24,5 +25,6 @@ import wled_controller.core.audio.filters.delay # noqa: F401
__all__ = [
"AudioFilter",
"AudioFilterOptionDef",
"AudioFilterPipeline",
"AudioFilterRegistry",
]
@@ -0,0 +1,113 @@
"""Audio filter pipeline — chains multiple AudioFilter instances sequentially.
The pipeline is the runtime executor for audio processing template chains.
It takes a flat list of FilterInstance specs, instantiates each filter via
the AudioFilterRegistry, and runs AudioAnalysis through them in order.
Thread-safe: each stream gets its own pipeline instance with independent
stateful filter state.
"""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING, List
from wled_controller.core.audio.analysis import AudioAnalysis
from wled_controller.core.audio.filters.base import AudioFilter
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
from wled_controller.utils import get_logger
if TYPE_CHECKING:
from wled_controller.core.filters.filter_instance import FilterInstance
from wled_controller.storage.audio_processing_template_store import (
AudioProcessingTemplateStore,
)
logger = get_logger(__name__)
class AudioFilterPipeline:
"""Chains multiple AudioFilter instances and runs AudioAnalysis through them.
Each pipeline owns its own filter instances — stateful filters maintain
per-pipeline state, so pipelines must NOT be shared across streams.
"""
def __init__(self, filter_instances: List["FilterInstance"]) -> None:
"""Create a pipeline from FilterInstance specs.
Args:
filter_instances: Flat (already-resolved) list of FilterInstance specs.
Each is instantiated via AudioFilterRegistry.create_instance().
"""
self._lock = threading.Lock()
self._filters: List[AudioFilter] = []
for fi in filter_instances:
try:
f = AudioFilterRegistry.create_instance(fi.filter_id, fi.options)
self._filters.append(f)
except ValueError:
logger.warning("Skipping unknown audio filter '%s'", fi.filter_id)
@property
def empty(self) -> bool:
"""True when the pipeline has no filters (passthrough)."""
return len(self._filters) == 0
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
"""Run analysis through all filters in order.
Returns a new AudioAnalysis (filters produce new objects via
dataclasses.replace, never mutating the input).
"""
result = analysis
with self._lock:
for f in self._filters:
result = f.process(result)
return result
def reset(self) -> None:
"""Reset all stateful filters to their initial state."""
with self._lock:
for f in self._filters:
if f.is_stateful:
f.reset()
def close(self) -> None:
"""Release resources and clear filter list."""
with self._lock:
self._filters.clear()
def __repr__(self) -> str:
return f"AudioFilterPipeline(filters={self._filters})"
def build_pipeline_from_template_ids(
template_ids: List[str],
template_store: "AudioProcessingTemplateStore",
) -> AudioFilterPipeline:
"""Resolve a list of audio processing template IDs into a single pipeline.
Expands each template's filters (including nested audio_filter_template
references) via the store's resolve_filter_instances(), concatenates all
resolved FilterInstance lists in order, and returns a ready-to-use pipeline.
Args:
template_ids: Ordered template IDs (outermost first, as returned by
ResolvedAudioSource.audio_processing_template_ids).
template_store: AudioProcessingTemplateStore for template lookup and
recursive filter resolution.
Returns:
AudioFilterPipeline (may be empty if no templates / all templates missing).
"""
all_instances: List[FilterInstance] = []
for tid in template_ids:
try:
template = template_store.get_template(tid)
resolved = template_store.resolve_filter_instances(template.filters)
all_instances.extend(resolved)
except ValueError:
logger.warning("Audio processing template '%s' not found, skipping", tid)
return AudioFilterPipeline(all_instances)
@@ -17,6 +17,7 @@ import numpy as np
from wled_controller.core.audio.analysis import NUM_BANDS
from wled_controller.core.audio.audio_capture import AudioCaptureManager
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.core.processing.effect_stream import _build_palette_lut
from wled_controller.utils import get_logger
@@ -42,11 +43,14 @@ class AudioColorStripStream(ColorStripStream):
audio_capture_manager: AudioCaptureManager,
audio_source_store=None,
audio_template_store=None,
audio_processing_template_store=None,
):
self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store
self._audio_processing_template_store = audio_processing_template_store
self._audio_stream = None # acquired on start
self._filter_pipeline = None # AudioFilterPipeline, created on start
self._colors_lock = threading.Lock()
self._running = False
@@ -104,9 +108,6 @@ class AudioColorStripStream(ColorStripStream):
self._mirror = bool(getattr(source, "mirror", False))
# Resolve audio device/template via audio_source_id
# NOTE: channel selection and band filtering are now handled by audio
# processing filters (channel_extract, band_extract) applied via
# ProcessedAudioSource chains. Filter application will be wired in Phase 4.
audio_source_id = getattr(source, "audio_source_id", "")
self._audio_source_id = audio_source_id
self._audio_engine_type = None
@@ -138,9 +139,25 @@ class AudioColorStripStream(ColorStripStream):
self._audio_device_index = -1
self._audio_loopback = True
# Build audio filter pipeline from processing template chain
self._rebuild_filter_pipeline()
with self._colors_lock:
self._colors: Optional[np.ndarray] = None
def _rebuild_filter_pipeline(self) -> None:
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
old_pipeline = self._filter_pipeline
if self._audio_processing_template_ids and self._audio_processing_template_store:
self._filter_pipeline = build_pipeline_from_template_ids(
self._audio_processing_template_ids,
self._audio_processing_template_store,
)
else:
self._filter_pipeline = None
if old_pipeline is not None:
old_pipeline.close()
# ── ColorStripStream interface ──────────────────────────────────
def configure(self, device_led_count: int) -> None:
@@ -200,6 +217,10 @@ class AudioColorStripStream(ColorStripStream):
engine_type=self._audio_engine_type,
)
self._audio_stream = None
# Close audio filter pipeline
if self._filter_pipeline is not None:
self._filter_pipeline.close()
self._filter_pipeline = None
self._prev_spectrum = None
logger.info("AudioColorStripStream stopped")
@@ -338,16 +359,17 @@ class AudioColorStripStream(ColorStripStream):
finally:
self._running = False
# ── Channel selection ─────────────────────────────────────────
# ── Filter pipeline + channel selection ──────────────────────────
def _pick_channel(self, analysis):
"""Return (spectrum, rms) from the analysis.
def _apply_filters(self, analysis):
"""Apply audio filter pipeline (if any) and return (spectrum, rms).
Channel selection and band filtering are now handled by audio processing
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
instances into the stream runtime).
The filter pipeline handles channel extraction, band extraction,
gain, noise gate, etc. as configured by the ProcessedAudioSource
template chain.
"""
# ⚠️ Temporary breakage: channel/band filtering removed — resolved in Phase 4
if self._filter_pipeline is not None:
analysis = self._filter_pipeline.process(analysis)
return analysis.spectrum, analysis.rms
# ── Spectrum Analyzer ──────────────────────────────────────────
@@ -357,7 +379,7 @@ class AudioColorStripStream(ColorStripStream):
buf[:] = 0
return
spectrum, _ = self._pick_channel(analysis)
spectrum, _ = self._apply_filters(analysis)
sensitivity = self.resolve("sensitivity", self._sensitivity)
smoothing = self.resolve("smoothing", self._smoothing)
lut = self._palette_lut
@@ -408,7 +430,7 @@ class AudioColorStripStream(ColorStripStream):
buf[:] = 0
return
_, ch_rms = self._pick_channel(analysis)
_, ch_rms = self._apply_filters(analysis)
sensitivity = self.resolve("sensitivity", self._sensitivity)
smoothing = self.resolve("smoothing", self._smoothing)
rms = ch_rms * sensitivity
@@ -89,6 +89,7 @@ class ColorStripStreamManager:
weather_manager=None,
asset_store=None,
game_event_bus=None,
audio_processing_template_store=None,
):
"""
Args:
@@ -101,6 +102,7 @@ class ColorStripStreamManager:
cspt_store: ColorStripProcessingTemplateStore for per-layer filter chains
gradient_store: GradientStore for resolving gradient entity references
game_event_bus: GameEventBus for game event stream subscriptions
audio_processing_template_store: AudioProcessingTemplateStore for filter chains
"""
self._color_strip_store = color_strip_store
self._live_stream_manager = live_stream_manager
@@ -114,6 +116,7 @@ class ColorStripStreamManager:
self._weather_manager = weather_manager
self._asset_store = asset_store
self._game_event_bus = game_event_bus
self._audio_processing_template_store = audio_processing_template_store
self._streams: Dict[str, _ColorStripEntry] = {}
def _inject_clock(self, css_stream, source) -> Optional[str]:
@@ -246,6 +249,7 @@ class ColorStripStreamManager:
self._audio_capture_manager,
self._audio_source_store,
self._audio_template_store,
self._audio_processing_template_store,
)
elif source.source_type == "composite":
from wled_controller.core.processing.composite_stream import (
@@ -498,6 +502,28 @@ class ColorStripStreamManager:
logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}")
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines for any running AudioColorStripStream
that references the given audio processing template ID.
Called when an audio processing template is updated or deleted.
"""
from wled_controller.core.processing.audio_stream import AudioColorStripStream
count = 0
for entry in self._streams.values():
stream = entry.stream
if isinstance(stream, AudioColorStripStream):
if template_id in getattr(stream, "_audio_processing_template_ids", []):
stream._rebuild_filter_pipeline()
count += 1
if count:
logger.info(
"Refreshed audio filter pipeline for %d stream(s) after template %s update",
count,
template_id,
)
def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None:
"""Register or update a consumer's target FPS.
@@ -73,6 +73,7 @@ class ProcessorDependencies:
asset_store: Optional[AssetStore] = None
ha_manager: Optional[Any] = None # HomeAssistantManager
game_event_bus: Optional[Any] = None # GameEventBus
audio_processing_template_store: Optional[Any] = None # AudioProcessingTemplateStore
@dataclass
@@ -153,6 +154,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
weather_manager=deps.weather_manager,
asset_store=deps.asset_store,
game_event_bus=deps.game_event_bus,
audio_processing_template_store=deps.audio_processing_template_store,
)
self._value_stream_manager = (
ValueStreamManager(
@@ -165,6 +167,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
css_stream_manager=self._color_strip_stream_manager,
gradient_store=deps.gradient_store,
event_bus=deps.game_event_bus,
audio_processing_template_store=deps.audio_processing_template_store,
)
if deps.value_source_store
else None
@@ -194,6 +197,13 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
def color_strip_stream_manager(self) -> ColorStripStreamManager:
return self._color_strip_stream_manager
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines across all running streams when a
template is updated or deleted. Dispatches to both CSS and value stream managers."""
self._color_strip_stream_manager.refresh_audio_filter_pipelines(template_id)
if self._value_stream_manager:
self._value_stream_manager.refresh_audio_filter_pipelines(template_id)
# ===== SHARED CONTEXT (passed to target processors) =====
def _build_context(self) -> TargetContext:
@@ -178,6 +178,7 @@ class AudioValueStream(ValueStream):
audio_capture_manager: Optional["AudioCaptureManager"] = None,
audio_source_store: Optional["AudioSourceStore"] = None,
audio_template_store=None,
audio_processing_template_store=None,
):
self._audio_source_id = audio_source_id
self._mode = mode
@@ -191,6 +192,7 @@ class AudioValueStream(ValueStream):
self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store
self._audio_processing_template_store = audio_processing_template_store
# Resolved audio device params
self._audio_device_index = -1
@@ -200,6 +202,7 @@ class AudioValueStream(ValueStream):
self._audio_processing_template_ids: list = []
self._audio_stream = None
self._filter_pipeline = None # AudioFilterPipeline
self._prev_value = 0.0
self._beat_brightness = 0.0
@@ -208,9 +211,7 @@ class AudioValueStream(ValueStream):
def _resolve_audio_source(self) -> None:
"""Resolve audio source to device index / engine info / processing template IDs.
Channel selection and band filtering are now handled by audio processing
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
instances into the stream runtime).
Builds the audio filter pipeline from the processing template chain.
"""
if self._audio_source_id and self._audio_source_store:
try:
@@ -232,6 +233,22 @@ class AudioValueStream(ValueStream):
pass
except ValueError as e:
logger.warning(f"Failed to resolve audio source {self._audio_source_id}: {e}")
self._rebuild_filter_pipeline()
def _rebuild_filter_pipeline(self) -> None:
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
old_pipeline = self._filter_pipeline
if self._audio_processing_template_ids and self._audio_processing_template_store:
self._filter_pipeline = build_pipeline_from_template_ids(
self._audio_processing_template_ids,
self._audio_processing_template_store,
)
else:
self._filter_pipeline = None
if old_pipeline is not None:
old_pipeline.close()
def start(self) -> None:
if self._audio_capture_manager is None:
@@ -255,6 +272,9 @@ class AudioValueStream(ValueStream):
engine_type=self._audio_engine_type,
)
self._audio_stream = None
if self._filter_pipeline is not None:
self._filter_pipeline.close()
self._filter_pipeline = None
self._prev_value = 0.0
self._beat_brightness = 0.0
@@ -285,7 +305,13 @@ class AudioValueStream(ValueStream):
return max(0.0, min(1.0, mapped))
def _extract_raw(self, analysis) -> float:
"""Extract raw scalar from audio analysis based on mode."""
"""Extract raw scalar from audio analysis based on mode.
Applies the audio filter pipeline (if any) before extracting the scalar.
Channel extraction, band filtering, gain, etc. are handled by filters.
"""
if self._filter_pipeline is not None:
analysis = self._filter_pipeline.process(analysis)
if self._mode == "peak":
return self._pick_peak(analysis)
if self._mode == "beat":
@@ -294,13 +320,9 @@ class AudioValueStream(ValueStream):
return self._pick_rms(analysis)
def _pick_rms(self, analysis) -> float:
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
# Channel selection is now handled by audio processing filters
# (channel_extract) applied via ProcessedAudioSource chains.
return getattr(analysis, "rms", 0.0)
def _pick_peak(self, analysis) -> float:
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
return getattr(analysis, "peak", 0.0)
def _compute_beat(self, analysis) -> float:
@@ -1411,6 +1433,7 @@ class ValueStreamManager:
css_stream_manager: Optional["ColorStripStreamManager"] = None,
gradient_store: Optional[Any] = None,
event_bus: Optional["GameEventBus"] = None,
audio_processing_template_store=None,
):
self._value_source_store = value_source_store
self._audio_capture_manager = audio_capture_manager
@@ -1421,6 +1444,7 @@ class ValueStreamManager:
self._css_stream_manager = css_stream_manager
self._gradient_store = gradient_store
self._event_bus = event_bus
self._audio_processing_template_store = audio_processing_template_store
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
@@ -1473,6 +1497,25 @@ class ValueStreamManager:
stream.update_source(source)
logger.debug(f"Updated value stream {vs_id}")
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines for any running AudioValueStream
that references the given audio processing template ID.
Called when an audio processing template is updated or deleted.
"""
count = 0
for stream in self._streams.values():
if isinstance(stream, AudioValueStream):
if template_id in getattr(stream, "_audio_processing_template_ids", []):
stream._rebuild_filter_pipeline()
count += 1
if count:
logger.info(
"Refreshed audio filter pipeline for %d value stream(s) after template %s update",
count,
template_id,
)
def release_all(self) -> None:
"""Stop and remove all managed streams. Called on shutdown."""
for vs_id, stream in self._streams.items():
@@ -1525,6 +1568,7 @@ class ValueStreamManager:
audio_capture_manager=self._audio_capture_manager,
audio_source_store=self._audio_source_store,
audio_template_store=self._audio_template_store,
audio_processing_template_store=self._audio_processing_template_store,
)
if isinstance(source, DaylightValueSource):
+1
View File
@@ -128,6 +128,7 @@ processor_manager = ProcessorManager(
asset_store=asset_store,
ha_manager=ha_manager,
game_event_bus=game_event_bus,
audio_processing_template_store=audio_processing_template_store,
)
)