From ab43578049b16f2c0c8a143bce6305cedf10e223 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Tue, 31 Mar 2026 19:15:29 +0300 Subject: [PATCH] feat(processed-audio-sources): phase 4 - runtime filter integration Add AudioFilterPipeline for chained filter execution on AudioAnalysis. Wire filter pipelines into AudioColorStripStream, AudioValueStream, and WebSocket test endpoint. Add hot-update support via ProcessorManager.refresh_audio_filter_pipelines(). Thread AudioProcessingTemplateStore through dependency injection hierarchy. --- plans/processed-audio-sources/CONTEXT.md | 4 +- plans/processed-audio-sources/PLAN.md | 2 +- .../phase-4-runtime-integration.md | 45 +++++-- .../api/routes/audio_processing_templates.py | 13 ++ .../api/routes/audio_sources.py | 25 +++- .../core/audio/filters/__init__.py | 2 + .../core/audio/filters/pipeline.py | 113 ++++++++++++++++++ .../core/processing/audio_stream.py | 46 +++++-- .../processing/color_strip_stream_manager.py | 26 ++++ .../core/processing/processor_manager.py | 10 ++ .../core/processing/value_stream.py | 60 ++++++++-- server/src/wled_controller/main.py | 1 + 12 files changed, 309 insertions(+), 38 deletions(-) create mode 100644 server/src/wled_controller/core/audio/filters/pipeline.py diff --git a/plans/processed-audio-sources/CONTEXT.md b/plans/processed-audio-sources/CONTEXT.md index da84bd8..15e19d8 100644 --- a/plans/processed-audio-sources/CONTEXT.md +++ b/plans/processed-audio-sources/CONTEXT.md @@ -9,7 +9,7 @@ - **Test:** `cd server && py -3.13 -m pytest tests/ --no-cov -q` ## Current State -Phase 1 (Audio Filter Framework), Phase 2 (Audio Filters), and Phase 3 (Processed Audio Source Model) implemented. +Phases 1-4 implemented. Phase 4 (Runtime Integration) wired the audio filter pipeline into the stream runtime. Phase 1 framework: - `AudioFilter` base class, `AudioFilterRegistry`, `AudioFilterOptionDef` in `core/audio/filters/` @@ -93,7 +93,7 @@ _(none yet)_ | Phase 1 | impl-agent | — | No | Tasks 7+8 skipped (SQLite migration made them obsolete) | | Phase 2 | impl-agent | — | No | All 11 filters implemented, no deviations | | Phase 3 | impl-agent | — | No | All 11 tasks done; channel/band logic deferred to Phase 4 | -| Phase 4 | — | — | — | — | +| Phase 4 | impl-agent | — | No | All 6 tasks done; dependency injection threaded through | | Phase 5 | — | — | — | — | | Phase 6 | — | — | — | — | | Phase 7 | — | — | — | — | diff --git a/plans/processed-audio-sources/PLAN.md b/plans/processed-audio-sources/PLAN.md index 25cb20d..76696fc 100644 --- a/plans/processed-audio-sources/PLAN.md +++ b/plans/processed-audio-sources/PLAN.md @@ -42,7 +42,7 @@ Clean-slate approach: no data migration for old source types. | Phase 1: Audio Filter Framework | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ | | Phase 2: Audio Filters | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ | | Phase 3: Processed Audio Source Model | backend | ✅ Done | ⬜ | ⬜ | ⬜ | -| Phase 4: Runtime Integration | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | +| Phase 4: Runtime Integration | backend | ✅ Done | ⬜ | ⬜ | ⬜ | | Phase 5: Frontend — Audio Processing Templates | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 6: Frontend — Source Types | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 7: Testing & Polish | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | diff --git a/plans/processed-audio-sources/phase-4-runtime-integration.md b/plans/processed-audio-sources/phase-4-runtime-integration.md index 5da9407..2fc2d07 100644 --- a/plans/processed-audio-sources/phase-4-runtime-integration.md +++ b/plans/processed-audio-sources/phase-4-runtime-integration.md @@ -1,6 +1,6 @@ # Phase 4: Runtime Integration -**Status:** ⬜ Not Started +**Status:** ✅ Done **Parent plan:** [PLAN.md](./PLAN.md) **Domain:** backend @@ -9,7 +9,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P ## Tasks -- [ ] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py` +- [x] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py` - `AudioFilterPipeline` class: - `__init__(filter_instances: List[FilterInstance], registry: AudioFilterRegistry)` - Instantiates all filters from FilterInstance specs @@ -17,7 +17,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P - `reset()` — resets all stateful filters - `close()` — cleanup resources - Handles stateful filter lifecycle (create on init, reset on demand, close on cleanup) -- [ ] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py` +- [x] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py` - On construction: if source is ProcessedAudioSource, resolve the full chain: - Walk to CaptureAudioSource for device info - Collect all AudioProcessingTemplates along the chain @@ -26,18 +26,18 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P - In render loop: after getting AudioAnalysis from ManagedAudioStream, run it through the filter pipeline before visualization - Remove old inline channel selection and band filtering code (now handled by filters) - On stop: close the filter pipeline -- [ ] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py` +- [x] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py` - Same pattern: resolve ProcessedAudioSource chain, create filter pipeline, apply in get_value() - Remove old inline channel/band handling -- [ ] Task 4: Hot-update support for filter templates +- [x] Task 4: Hot-update support for filter templates - When an AudioProcessingTemplate is updated, running streams that use it should re-resolve their filter pipeline - Listen for template update events (or implement a refresh mechanism) - Re-create AudioFilterPipeline with updated filter instances - Reset stateful filter state on pipeline refresh -- [ ] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py` +- [x] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py` - For ProcessedAudioSource: resolve chain, create pipeline, apply filters to test stream data - Return filtered analysis in real-time over WebSocket -- [ ] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape +- [x] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape ## Files to Modify/Create - `core/audio/filters/pipeline.py` — **create** — AudioFilterPipeline @@ -61,11 +61,34 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P - The filter pipeline should produce a new AudioAnalysis each time (immutability), not mutate the shared snapshot from ManagedAudioStream. ## Review Checklist -- [ ] All tasks completed -- [ ] Code follows project conventions -- [ ] No unintended side effects +- [x] All tasks completed +- [x] Code follows project conventions +- [x] No unintended side effects - [ ] Build passes - [ ] Tests pass (new + existing) ## Handoff to Next Phase - + +### What was built +- `AudioFilterPipeline` class in `core/audio/filters/pipeline.py` — thread-safe pipeline that instantiates, processes, resets, and closes audio filters +- `build_pipeline_from_template_ids()` helper — resolves template IDs to FilterInstance lists and creates a pipeline +- `AudioColorStripStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction and source update; `_apply_filters()` replaces `_pick_channel()` in render loop; pipeline closed on stop +- `AudioValueStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction; filters applied in `_extract_raw()` before scalar extraction; pipeline closed on stop +- Hot-update: `ProcessorManager.refresh_audio_filter_pipelines(template_id)` dispatches to both CSS and value stream managers; called from audio processing template update/delete routes +- WebSocket test endpoint creates a filter pipeline from the resolved template chain and applies it to analysis before sending +- Dependency injection: `audio_processing_template_store` threaded through `ProcessorDependencies` -> `ProcessorManager` -> `ColorStripStreamManager` / `ValueStreamManager` -> `AudioColorStripStream` / `AudioValueStream` + +### What Phase 5 needs to know +- The backend is fully wired: creating a ProcessedAudioSource that references templates with channel_extract, band_extract, gain, etc. filters will apply those filters to live audio data +- The WebSocket test endpoint shows filtered analysis in real-time +- Frontend needs to provide UI for creating/editing AudioProcessingTemplates and ProcessedAudioSources +- Filter pipeline is per-stream-instance (not shared) — stateful filters maintain independent state + +### Temporary breakages resolved +- `_pick_channel()` removed from AudioColorStripStream — replaced by `_apply_filters()` which uses the filter pipeline +- Channel/band handling in AudioValueStream now goes through filter pipeline +- All "Phase 4 will wire..." comments resolved + +### Known deviations from plan +- `AudioFilterPipeline.__init__` takes `List[FilterInstance]` only (not `AudioFilterRegistry` — uses the class-level registry directly via `AudioFilterRegistry.create_instance()`) +- Hot-update uses explicit method calls from the route handler rather than an event-listener pattern — simpler and avoids the need for a subscription system diff --git a/server/src/wled_controller/api/routes/audio_processing_templates.py b/server/src/wled_controller/api/routes/audio_processing_templates.py index bbba419..ec60cb3 100644 --- a/server/src/wled_controller/api/routes/audio_processing_templates.py +++ b/server/src/wled_controller/api/routes/audio_processing_templates.py @@ -6,6 +6,7 @@ from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import ( fire_entity_event, get_audio_processing_template_store, + get_processor_manager, ) from wled_controller.api.schemas.audio_processing import ( AudioProcessingTemplateCreate, @@ -129,6 +130,12 @@ async def update_audio_processing_template( tags=data.tags, ) fire_entity_event("audio_processing_template", "updated", template_id) + # Hot-update: rebuild filter pipelines for running streams using this template + try: + pm = get_processor_manager() + pm.refresh_audio_filter_pipelines(template_id) + except Exception: + pass # Non-critical: streams will pick up changes on next restart return _apt_to_response(template) except EntityNotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) @@ -154,6 +161,12 @@ async def delete_audio_processing_template( # TODO: Phase 3 will add reference checks against ProcessedAudioSource store.delete_template(template_id) fire_entity_event("audio_processing_template", "deleted", template_id) + # Hot-update: rebuild filter pipelines for running streams that used this template + try: + pm = get_processor_manager() + pm.refresh_audio_filter_pipelines(template_id) + except Exception: + pass # Non-critical except HTTPException: raise except EntityNotFoundError as e: diff --git a/server/src/wled_controller/api/routes/audio_sources.py b/server/src/wled_controller/api/routes/audio_sources.py index 0e779a5..cddb2cc 100644 --- a/server/src/wled_controller/api/routes/audio_sources.py +++ b/server/src/wled_controller/api/routes/audio_sources.py @@ -9,6 +9,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import ( fire_entity_event, + get_audio_processing_template_store, get_audio_source_store, get_audio_template_store, get_color_strip_store, @@ -210,11 +211,12 @@ async def test_audio_source_ws( ManagedAudioStream (ref-counted — shares with running targets), and streams AudioAnalysis snapshots as JSON at ~20 Hz. - NOTE: Audio processing filters from the template chain are NOT applied in - this WebSocket yet — that will be wired in Phase 4 when the stream runtime - integrates filter instances. + Audio processing filters from the template chain are applied to the + analysis before sending, so the WebSocket output matches what running + streams see. """ from wled_controller.api.auth import verify_ws_token + from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids if not verify_ws_token(token): await websocket.close(code=4001, reason="Unauthorized") @@ -223,6 +225,7 @@ async def test_audio_source_ws( # Resolve source → device info + processing template chain store = get_audio_source_store() template_store = get_audio_template_store() + apt_store = get_audio_processing_template_store() manager = get_processor_manager() try: @@ -247,6 +250,15 @@ async def test_audio_source_ws( logger.debug("Audio template not found, falling back to best available engine: %s", e) pass # Fall back to best available engine + # Build filter pipeline from processing template chain + pipeline = None + if resolved.audio_processing_template_ids and apt_store: + pipeline = build_pipeline_from_template_ids( + resolved.audio_processing_template_ids, apt_store + ) + if pipeline.empty: + pipeline = None + # Acquire shared audio stream audio_mgr = manager.audio_capture_manager try: @@ -265,7 +277,10 @@ async def test_audio_source_ws( if analysis is not None and analysis.timestamp != last_ts: last_ts = analysis.timestamp - # Send raw analysis — filter processing will be added in Phase 4 + # Apply filter pipeline (channel extract, band extract, gain, etc.) + if pipeline is not None: + analysis = pipeline.process(analysis) + await websocket.send_json( { "spectrum": analysis.spectrum.tolist(), @@ -283,5 +298,7 @@ async def test_audio_source_ws( except Exception as e: logger.error(f"Audio test WebSocket error for {source_id}: {e}") finally: + if pipeline is not None: + pipeline.close() audio_mgr.release(device_index, is_loopback, engine_type) logger.info(f"Audio test WebSocket disconnected for source {source_id}") diff --git a/server/src/wled_controller/core/audio/filters/__init__.py b/server/src/wled_controller/core/audio/filters/__init__.py index 456d997..ca28a6e 100644 --- a/server/src/wled_controller/core/audio/filters/__init__.py +++ b/server/src/wled_controller/core/audio/filters/__init__.py @@ -5,6 +5,7 @@ Import this package to ensure all built-in filters are registered. """ from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef +from wled_controller.core.audio.filters.pipeline import AudioFilterPipeline from wled_controller.core.audio.filters.registry import AudioFilterRegistry # Import individual filters to trigger auto-registration @@ -24,5 +25,6 @@ import wled_controller.core.audio.filters.delay # noqa: F401 __all__ = [ "AudioFilter", "AudioFilterOptionDef", + "AudioFilterPipeline", "AudioFilterRegistry", ] diff --git a/server/src/wled_controller/core/audio/filters/pipeline.py b/server/src/wled_controller/core/audio/filters/pipeline.py new file mode 100644 index 0000000..333c8ad --- /dev/null +++ b/server/src/wled_controller/core/audio/filters/pipeline.py @@ -0,0 +1,113 @@ +"""Audio filter pipeline — chains multiple AudioFilter instances sequentially. + +The pipeline is the runtime executor for audio processing template chains. +It takes a flat list of FilterInstance specs, instantiates each filter via +the AudioFilterRegistry, and runs AudioAnalysis through them in order. + +Thread-safe: each stream gets its own pipeline instance with independent +stateful filter state. +""" + +from __future__ import annotations + +import threading +from typing import TYPE_CHECKING, List + +from wled_controller.core.audio.analysis import AudioAnalysis +from wled_controller.core.audio.filters.base import AudioFilter +from wled_controller.core.audio.filters.registry import AudioFilterRegistry +from wled_controller.utils import get_logger + +if TYPE_CHECKING: + from wled_controller.core.filters.filter_instance import FilterInstance + from wled_controller.storage.audio_processing_template_store import ( + AudioProcessingTemplateStore, + ) + +logger = get_logger(__name__) + + +class AudioFilterPipeline: + """Chains multiple AudioFilter instances and runs AudioAnalysis through them. + + Each pipeline owns its own filter instances — stateful filters maintain + per-pipeline state, so pipelines must NOT be shared across streams. + """ + + def __init__(self, filter_instances: List["FilterInstance"]) -> None: + """Create a pipeline from FilterInstance specs. + + Args: + filter_instances: Flat (already-resolved) list of FilterInstance specs. + Each is instantiated via AudioFilterRegistry.create_instance(). + """ + self._lock = threading.Lock() + self._filters: List[AudioFilter] = [] + for fi in filter_instances: + try: + f = AudioFilterRegistry.create_instance(fi.filter_id, fi.options) + self._filters.append(f) + except ValueError: + logger.warning("Skipping unknown audio filter '%s'", fi.filter_id) + + @property + def empty(self) -> bool: + """True when the pipeline has no filters (passthrough).""" + return len(self._filters) == 0 + + def process(self, analysis: AudioAnalysis) -> AudioAnalysis: + """Run analysis through all filters in order. + + Returns a new AudioAnalysis (filters produce new objects via + dataclasses.replace, never mutating the input). + """ + result = analysis + with self._lock: + for f in self._filters: + result = f.process(result) + return result + + def reset(self) -> None: + """Reset all stateful filters to their initial state.""" + with self._lock: + for f in self._filters: + if f.is_stateful: + f.reset() + + def close(self) -> None: + """Release resources and clear filter list.""" + with self._lock: + self._filters.clear() + + def __repr__(self) -> str: + return f"AudioFilterPipeline(filters={self._filters})" + + +def build_pipeline_from_template_ids( + template_ids: List[str], + template_store: "AudioProcessingTemplateStore", +) -> AudioFilterPipeline: + """Resolve a list of audio processing template IDs into a single pipeline. + + Expands each template's filters (including nested audio_filter_template + references) via the store's resolve_filter_instances(), concatenates all + resolved FilterInstance lists in order, and returns a ready-to-use pipeline. + + Args: + template_ids: Ordered template IDs (outermost first, as returned by + ResolvedAudioSource.audio_processing_template_ids). + template_store: AudioProcessingTemplateStore for template lookup and + recursive filter resolution. + + Returns: + AudioFilterPipeline (may be empty if no templates / all templates missing). + """ + all_instances: List[FilterInstance] = [] + for tid in template_ids: + try: + template = template_store.get_template(tid) + resolved = template_store.resolve_filter_instances(template.filters) + all_instances.extend(resolved) + except ValueError: + logger.warning("Audio processing template '%s' not found, skipping", tid) + return AudioFilterPipeline(all_instances) diff --git a/server/src/wled_controller/core/processing/audio_stream.py b/server/src/wled_controller/core/processing/audio_stream.py index b410051..cd3e567 100644 --- a/server/src/wled_controller/core/processing/audio_stream.py +++ b/server/src/wled_controller/core/processing/audio_stream.py @@ -17,6 +17,7 @@ import numpy as np from wled_controller.core.audio.analysis import NUM_BANDS from wled_controller.core.audio.audio_capture import AudioCaptureManager +from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids from wled_controller.core.processing.color_strip_stream import ColorStripStream from wled_controller.core.processing.effect_stream import _build_palette_lut from wled_controller.utils import get_logger @@ -42,11 +43,14 @@ class AudioColorStripStream(ColorStripStream): audio_capture_manager: AudioCaptureManager, audio_source_store=None, audio_template_store=None, + audio_processing_template_store=None, ): self._audio_capture_manager = audio_capture_manager self._audio_source_store = audio_source_store self._audio_template_store = audio_template_store + self._audio_processing_template_store = audio_processing_template_store self._audio_stream = None # acquired on start + self._filter_pipeline = None # AudioFilterPipeline, created on start self._colors_lock = threading.Lock() self._running = False @@ -104,9 +108,6 @@ class AudioColorStripStream(ColorStripStream): self._mirror = bool(getattr(source, "mirror", False)) # Resolve audio device/template via audio_source_id - # NOTE: channel selection and band filtering are now handled by audio - # processing filters (channel_extract, band_extract) applied via - # ProcessedAudioSource chains. Filter application will be wired in Phase 4. audio_source_id = getattr(source, "audio_source_id", "") self._audio_source_id = audio_source_id self._audio_engine_type = None @@ -138,9 +139,25 @@ class AudioColorStripStream(ColorStripStream): self._audio_device_index = -1 self._audio_loopback = True + # Build audio filter pipeline from processing template chain + self._rebuild_filter_pipeline() + with self._colors_lock: self._colors: Optional[np.ndarray] = None + def _rebuild_filter_pipeline(self) -> None: + """Build (or rebuild) the audio filter pipeline from processing template IDs.""" + old_pipeline = self._filter_pipeline + if self._audio_processing_template_ids and self._audio_processing_template_store: + self._filter_pipeline = build_pipeline_from_template_ids( + self._audio_processing_template_ids, + self._audio_processing_template_store, + ) + else: + self._filter_pipeline = None + if old_pipeline is not None: + old_pipeline.close() + # ── ColorStripStream interface ────────────────────────────────── def configure(self, device_led_count: int) -> None: @@ -200,6 +217,10 @@ class AudioColorStripStream(ColorStripStream): engine_type=self._audio_engine_type, ) self._audio_stream = None + # Close audio filter pipeline + if self._filter_pipeline is not None: + self._filter_pipeline.close() + self._filter_pipeline = None self._prev_spectrum = None logger.info("AudioColorStripStream stopped") @@ -338,16 +359,17 @@ class AudioColorStripStream(ColorStripStream): finally: self._running = False - # ── Channel selection ───────────────────────────────────────── + # ── Filter pipeline + channel selection ────────────────────────── - def _pick_channel(self, analysis): - """Return (spectrum, rms) from the analysis. + def _apply_filters(self, analysis): + """Apply audio filter pipeline (if any) and return (spectrum, rms). - Channel selection and band filtering are now handled by audio processing - filters applied via ProcessedAudioSource chains (Phase 4 will wire filter - instances into the stream runtime). + The filter pipeline handles channel extraction, band extraction, + gain, noise gate, etc. as configured by the ProcessedAudioSource + template chain. """ - # ⚠️ Temporary breakage: channel/band filtering removed — resolved in Phase 4 + if self._filter_pipeline is not None: + analysis = self._filter_pipeline.process(analysis) return analysis.spectrum, analysis.rms # ── Spectrum Analyzer ────────────────────────────────────────── @@ -357,7 +379,7 @@ class AudioColorStripStream(ColorStripStream): buf[:] = 0 return - spectrum, _ = self._pick_channel(analysis) + spectrum, _ = self._apply_filters(analysis) sensitivity = self.resolve("sensitivity", self._sensitivity) smoothing = self.resolve("smoothing", self._smoothing) lut = self._palette_lut @@ -408,7 +430,7 @@ class AudioColorStripStream(ColorStripStream): buf[:] = 0 return - _, ch_rms = self._pick_channel(analysis) + _, ch_rms = self._apply_filters(analysis) sensitivity = self.resolve("sensitivity", self._sensitivity) smoothing = self.resolve("smoothing", self._smoothing) rms = ch_rms * sensitivity diff --git a/server/src/wled_controller/core/processing/color_strip_stream_manager.py b/server/src/wled_controller/core/processing/color_strip_stream_manager.py index 11d43b2..79180eb 100644 --- a/server/src/wled_controller/core/processing/color_strip_stream_manager.py +++ b/server/src/wled_controller/core/processing/color_strip_stream_manager.py @@ -89,6 +89,7 @@ class ColorStripStreamManager: weather_manager=None, asset_store=None, game_event_bus=None, + audio_processing_template_store=None, ): """ Args: @@ -101,6 +102,7 @@ class ColorStripStreamManager: cspt_store: ColorStripProcessingTemplateStore for per-layer filter chains gradient_store: GradientStore for resolving gradient entity references game_event_bus: GameEventBus for game event stream subscriptions + audio_processing_template_store: AudioProcessingTemplateStore for filter chains """ self._color_strip_store = color_strip_store self._live_stream_manager = live_stream_manager @@ -114,6 +116,7 @@ class ColorStripStreamManager: self._weather_manager = weather_manager self._asset_store = asset_store self._game_event_bus = game_event_bus + self._audio_processing_template_store = audio_processing_template_store self._streams: Dict[str, _ColorStripEntry] = {} def _inject_clock(self, css_stream, source) -> Optional[str]: @@ -246,6 +249,7 @@ class ColorStripStreamManager: self._audio_capture_manager, self._audio_source_store, self._audio_template_store, + self._audio_processing_template_store, ) elif source.source_type == "composite": from wled_controller.core.processing.composite_stream import ( @@ -498,6 +502,28 @@ class ColorStripStreamManager: logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}") + def refresh_audio_filter_pipelines(self, template_id: str) -> None: + """Rebuild audio filter pipelines for any running AudioColorStripStream + that references the given audio processing template ID. + + Called when an audio processing template is updated or deleted. + """ + from wled_controller.core.processing.audio_stream import AudioColorStripStream + + count = 0 + for entry in self._streams.values(): + stream = entry.stream + if isinstance(stream, AudioColorStripStream): + if template_id in getattr(stream, "_audio_processing_template_ids", []): + stream._rebuild_filter_pipeline() + count += 1 + if count: + logger.info( + "Refreshed audio filter pipeline for %d stream(s) after template %s update", + count, + template_id, + ) + def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None: """Register or update a consumer's target FPS. diff --git a/server/src/wled_controller/core/processing/processor_manager.py b/server/src/wled_controller/core/processing/processor_manager.py index d90591b..1bf7e5c 100644 --- a/server/src/wled_controller/core/processing/processor_manager.py +++ b/server/src/wled_controller/core/processing/processor_manager.py @@ -73,6 +73,7 @@ class ProcessorDependencies: asset_store: Optional[AssetStore] = None ha_manager: Optional[Any] = None # HomeAssistantManager game_event_bus: Optional[Any] = None # GameEventBus + audio_processing_template_store: Optional[Any] = None # AudioProcessingTemplateStore @dataclass @@ -153,6 +154,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin) weather_manager=deps.weather_manager, asset_store=deps.asset_store, game_event_bus=deps.game_event_bus, + audio_processing_template_store=deps.audio_processing_template_store, ) self._value_stream_manager = ( ValueStreamManager( @@ -165,6 +167,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin) css_stream_manager=self._color_strip_stream_manager, gradient_store=deps.gradient_store, event_bus=deps.game_event_bus, + audio_processing_template_store=deps.audio_processing_template_store, ) if deps.value_source_store else None @@ -194,6 +197,13 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin) def color_strip_stream_manager(self) -> ColorStripStreamManager: return self._color_strip_stream_manager + def refresh_audio_filter_pipelines(self, template_id: str) -> None: + """Rebuild audio filter pipelines across all running streams when a + template is updated or deleted. Dispatches to both CSS and value stream managers.""" + self._color_strip_stream_manager.refresh_audio_filter_pipelines(template_id) + if self._value_stream_manager: + self._value_stream_manager.refresh_audio_filter_pipelines(template_id) + # ===== SHARED CONTEXT (passed to target processors) ===== def _build_context(self) -> TargetContext: diff --git a/server/src/wled_controller/core/processing/value_stream.py b/server/src/wled_controller/core/processing/value_stream.py index 279b07b..05ee64d 100644 --- a/server/src/wled_controller/core/processing/value_stream.py +++ b/server/src/wled_controller/core/processing/value_stream.py @@ -178,6 +178,7 @@ class AudioValueStream(ValueStream): audio_capture_manager: Optional["AudioCaptureManager"] = None, audio_source_store: Optional["AudioSourceStore"] = None, audio_template_store=None, + audio_processing_template_store=None, ): self._audio_source_id = audio_source_id self._mode = mode @@ -191,6 +192,7 @@ class AudioValueStream(ValueStream): self._audio_capture_manager = audio_capture_manager self._audio_source_store = audio_source_store self._audio_template_store = audio_template_store + self._audio_processing_template_store = audio_processing_template_store # Resolved audio device params self._audio_device_index = -1 @@ -200,6 +202,7 @@ class AudioValueStream(ValueStream): self._audio_processing_template_ids: list = [] self._audio_stream = None + self._filter_pipeline = None # AudioFilterPipeline self._prev_value = 0.0 self._beat_brightness = 0.0 @@ -208,9 +211,7 @@ class AudioValueStream(ValueStream): def _resolve_audio_source(self) -> None: """Resolve audio source to device index / engine info / processing template IDs. - Channel selection and band filtering are now handled by audio processing - filters applied via ProcessedAudioSource chains (Phase 4 will wire filter - instances into the stream runtime). + Builds the audio filter pipeline from the processing template chain. """ if self._audio_source_id and self._audio_source_store: try: @@ -232,6 +233,22 @@ class AudioValueStream(ValueStream): pass except ValueError as e: logger.warning(f"Failed to resolve audio source {self._audio_source_id}: {e}") + self._rebuild_filter_pipeline() + + def _rebuild_filter_pipeline(self) -> None: + """Build (or rebuild) the audio filter pipeline from processing template IDs.""" + from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids + + old_pipeline = self._filter_pipeline + if self._audio_processing_template_ids and self._audio_processing_template_store: + self._filter_pipeline = build_pipeline_from_template_ids( + self._audio_processing_template_ids, + self._audio_processing_template_store, + ) + else: + self._filter_pipeline = None + if old_pipeline is not None: + old_pipeline.close() def start(self) -> None: if self._audio_capture_manager is None: @@ -255,6 +272,9 @@ class AudioValueStream(ValueStream): engine_type=self._audio_engine_type, ) self._audio_stream = None + if self._filter_pipeline is not None: + self._filter_pipeline.close() + self._filter_pipeline = None self._prev_value = 0.0 self._beat_brightness = 0.0 @@ -285,7 +305,13 @@ class AudioValueStream(ValueStream): return max(0.0, min(1.0, mapped)) def _extract_raw(self, analysis) -> float: - """Extract raw scalar from audio analysis based on mode.""" + """Extract raw scalar from audio analysis based on mode. + + Applies the audio filter pipeline (if any) before extracting the scalar. + Channel extraction, band filtering, gain, etc. are handled by filters. + """ + if self._filter_pipeline is not None: + analysis = self._filter_pipeline.process(analysis) if self._mode == "peak": return self._pick_peak(analysis) if self._mode == "beat": @@ -294,13 +320,9 @@ class AudioValueStream(ValueStream): return self._pick_rms(analysis) def _pick_rms(self, analysis) -> float: - # ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4 - # Channel selection is now handled by audio processing filters - # (channel_extract) applied via ProcessedAudioSource chains. return getattr(analysis, "rms", 0.0) def _pick_peak(self, analysis) -> float: - # ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4 return getattr(analysis, "peak", 0.0) def _compute_beat(self, analysis) -> float: @@ -1411,6 +1433,7 @@ class ValueStreamManager: css_stream_manager: Optional["ColorStripStreamManager"] = None, gradient_store: Optional[Any] = None, event_bus: Optional["GameEventBus"] = None, + audio_processing_template_store=None, ): self._value_source_store = value_source_store self._audio_capture_manager = audio_capture_manager @@ -1421,6 +1444,7 @@ class ValueStreamManager: self._css_stream_manager = css_stream_manager self._gradient_store = gradient_store self._event_bus = event_bus + self._audio_processing_template_store = audio_processing_template_store self._streams: Dict[str, ValueStream] = {} # vs_id → stream self._ref_counts: Dict[str, int] = {} # vs_id → ref count @@ -1473,6 +1497,25 @@ class ValueStreamManager: stream.update_source(source) logger.debug(f"Updated value stream {vs_id}") + def refresh_audio_filter_pipelines(self, template_id: str) -> None: + """Rebuild audio filter pipelines for any running AudioValueStream + that references the given audio processing template ID. + + Called when an audio processing template is updated or deleted. + """ + count = 0 + for stream in self._streams.values(): + if isinstance(stream, AudioValueStream): + if template_id in getattr(stream, "_audio_processing_template_ids", []): + stream._rebuild_filter_pipeline() + count += 1 + if count: + logger.info( + "Refreshed audio filter pipeline for %d value stream(s) after template %s update", + count, + template_id, + ) + def release_all(self) -> None: """Stop and remove all managed streams. Called on shutdown.""" for vs_id, stream in self._streams.items(): @@ -1525,6 +1568,7 @@ class ValueStreamManager: audio_capture_manager=self._audio_capture_manager, audio_source_store=self._audio_source_store, audio_template_store=self._audio_template_store, + audio_processing_template_store=self._audio_processing_template_store, ) if isinstance(source, DaylightValueSource): diff --git a/server/src/wled_controller/main.py b/server/src/wled_controller/main.py index 2310fe4..6ac0196 100644 --- a/server/src/wled_controller/main.py +++ b/server/src/wled_controller/main.py @@ -128,6 +128,7 @@ processor_manager = ProcessorManager( asset_store=asset_store, ha_manager=ha_manager, game_event_bus=game_event_bus, + audio_processing_template_store=audio_processing_template_store, ) )