feat(processed-audio-sources): phase 4 - runtime filter integration

Add AudioFilterPipeline for chained filter execution on AudioAnalysis.
Wire filter pipelines into AudioColorStripStream, AudioValueStream,
and WebSocket test endpoint. Add hot-update support via
ProcessorManager.refresh_audio_filter_pipelines(). Thread
AudioProcessingTemplateStore through dependency injection hierarchy.
This commit is contained in:
2026-03-31 19:15:29 +03:00
parent 353c090b42
commit ab43578049
12 changed files with 309 additions and 38 deletions
+2 -2
View File
@@ -9,7 +9,7 @@
- **Test:** `cd server && py -3.13 -m pytest tests/ --no-cov -q` - **Test:** `cd server && py -3.13 -m pytest tests/ --no-cov -q`
## Current State ## Current State
Phase 1 (Audio Filter Framework), Phase 2 (Audio Filters), and Phase 3 (Processed Audio Source Model) implemented. Phases 1-4 implemented. Phase 4 (Runtime Integration) wired the audio filter pipeline into the stream runtime.
Phase 1 framework: Phase 1 framework:
- `AudioFilter` base class, `AudioFilterRegistry`, `AudioFilterOptionDef` in `core/audio/filters/` - `AudioFilter` base class, `AudioFilterRegistry`, `AudioFilterOptionDef` in `core/audio/filters/`
@@ -93,7 +93,7 @@ _(none yet)_
| Phase 1 | impl-agent | — | No | Tasks 7+8 skipped (SQLite migration made them obsolete) | | Phase 1 | impl-agent | — | No | Tasks 7+8 skipped (SQLite migration made them obsolete) |
| Phase 2 | impl-agent | — | No | All 11 filters implemented, no deviations | | Phase 2 | impl-agent | — | No | All 11 filters implemented, no deviations |
| Phase 3 | impl-agent | — | No | All 11 tasks done; channel/band logic deferred to Phase 4 | | Phase 3 | impl-agent | — | No | All 11 tasks done; channel/band logic deferred to Phase 4 |
| Phase 4 | | — | | | | Phase 4 | impl-agent | — | No | All 6 tasks done; dependency injection threaded through |
| Phase 5 | — | — | — | — | | Phase 5 | — | — | — | — |
| Phase 6 | — | — | — | — | | Phase 6 | — | — | — | — |
| Phase 7 | — | — | — | — | | Phase 7 | — | — | — | — |
+1 -1
View File
@@ -42,7 +42,7 @@ Clean-slate approach: no data migration for old source types.
| Phase 1: Audio Filter Framework | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ | | Phase 1: Audio Filter Framework | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ |
| Phase 2: Audio Filters | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ | | Phase 2: Audio Filters | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ |
| Phase 3: Processed Audio Source Model | backend | ✅ Done | ⬜ | ⬜ | ⬜ | | Phase 3: Processed Audio Source Model | backend | ✅ Done | ⬜ | ⬜ | ⬜ |
| Phase 4: Runtime Integration | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 4: Runtime Integration | backend | ✅ Done | ⬜ | ⬜ | ⬜ |
| Phase 5: Frontend — Audio Processing Templates | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 5: Frontend — Audio Processing Templates | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 6: Frontend — Source Types | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 6: Frontend — Source Types | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 7: Testing & Polish | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 7: Testing & Polish | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
@@ -1,6 +1,6 @@
# Phase 4: Runtime Integration # Phase 4: Runtime Integration
**Status:** ⬜ Not Started **Status:** ✅ Done
**Parent plan:** [PLAN.md](./PLAN.md) **Parent plan:** [PLAN.md](./PLAN.md)
**Domain:** backend **Domain:** backend
@@ -9,7 +9,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
## Tasks ## Tasks
- [ ] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py` - [x] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py`
- `AudioFilterPipeline` class: - `AudioFilterPipeline` class:
- `__init__(filter_instances: List[FilterInstance], registry: AudioFilterRegistry)` - `__init__(filter_instances: List[FilterInstance], registry: AudioFilterRegistry)`
- Instantiates all filters from FilterInstance specs - Instantiates all filters from FilterInstance specs
@@ -17,7 +17,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
- `reset()` — resets all stateful filters - `reset()` — resets all stateful filters
- `close()` — cleanup resources - `close()` — cleanup resources
- Handles stateful filter lifecycle (create on init, reset on demand, close on cleanup) - Handles stateful filter lifecycle (create on init, reset on demand, close on cleanup)
- [ ] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py` - [x] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py`
- On construction: if source is ProcessedAudioSource, resolve the full chain: - On construction: if source is ProcessedAudioSource, resolve the full chain:
- Walk to CaptureAudioSource for device info - Walk to CaptureAudioSource for device info
- Collect all AudioProcessingTemplates along the chain - Collect all AudioProcessingTemplates along the chain
@@ -26,18 +26,18 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
- In render loop: after getting AudioAnalysis from ManagedAudioStream, run it through the filter pipeline before visualization - In render loop: after getting AudioAnalysis from ManagedAudioStream, run it through the filter pipeline before visualization
- Remove old inline channel selection and band filtering code (now handled by filters) - Remove old inline channel selection and band filtering code (now handled by filters)
- On stop: close the filter pipeline - On stop: close the filter pipeline
- [ ] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py` - [x] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py`
- Same pattern: resolve ProcessedAudioSource chain, create filter pipeline, apply in get_value() - Same pattern: resolve ProcessedAudioSource chain, create filter pipeline, apply in get_value()
- Remove old inline channel/band handling - Remove old inline channel/band handling
- [ ] Task 4: Hot-update support for filter templates - [x] Task 4: Hot-update support for filter templates
- When an AudioProcessingTemplate is updated, running streams that use it should re-resolve their filter pipeline - When an AudioProcessingTemplate is updated, running streams that use it should re-resolve their filter pipeline
- Listen for template update events (or implement a refresh mechanism) - Listen for template update events (or implement a refresh mechanism)
- Re-create AudioFilterPipeline with updated filter instances - Re-create AudioFilterPipeline with updated filter instances
- Reset stateful filter state on pipeline refresh - Reset stateful filter state on pipeline refresh
- [ ] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py` - [x] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py`
- For ProcessedAudioSource: resolve chain, create pipeline, apply filters to test stream data - For ProcessedAudioSource: resolve chain, create pipeline, apply filters to test stream data
- Return filtered analysis in real-time over WebSocket - Return filtered analysis in real-time over WebSocket
- [ ] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape - [x] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape
## Files to Modify/Create ## Files to Modify/Create
- `core/audio/filters/pipeline.py`**create** — AudioFilterPipeline - `core/audio/filters/pipeline.py`**create** — AudioFilterPipeline
@@ -61,11 +61,34 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
- The filter pipeline should produce a new AudioAnalysis each time (immutability), not mutate the shared snapshot from ManagedAudioStream. - The filter pipeline should produce a new AudioAnalysis each time (immutability), not mutate the shared snapshot from ManagedAudioStream.
## Review Checklist ## Review Checklist
- [ ] All tasks completed - [x] All tasks completed
- [ ] Code follows project conventions - [x] Code follows project conventions
- [ ] No unintended side effects - [x] No unintended side effects
- [ ] Build passes - [ ] Build passes
- [ ] Tests pass (new + existing) - [ ] Tests pass (new + existing)
## Handoff to Next Phase ## Handoff to Next Phase
<!-- Filled in by the implementation agent after completing this phase. -->
### What was built
- `AudioFilterPipeline` class in `core/audio/filters/pipeline.py` — thread-safe pipeline that instantiates, processes, resets, and closes audio filters
- `build_pipeline_from_template_ids()` helper — resolves template IDs to FilterInstance lists and creates a pipeline
- `AudioColorStripStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction and source update; `_apply_filters()` replaces `_pick_channel()` in render loop; pipeline closed on stop
- `AudioValueStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction; filters applied in `_extract_raw()` before scalar extraction; pipeline closed on stop
- Hot-update: `ProcessorManager.refresh_audio_filter_pipelines(template_id)` dispatches to both CSS and value stream managers; called from audio processing template update/delete routes
- WebSocket test endpoint creates a filter pipeline from the resolved template chain and applies it to analysis before sending
- Dependency injection: `audio_processing_template_store` threaded through `ProcessorDependencies` -> `ProcessorManager` -> `ColorStripStreamManager` / `ValueStreamManager` -> `AudioColorStripStream` / `AudioValueStream`
### What Phase 5 needs to know
- The backend is fully wired: creating a ProcessedAudioSource that references templates with channel_extract, band_extract, gain, etc. filters will apply those filters to live audio data
- The WebSocket test endpoint shows filtered analysis in real-time
- Frontend needs to provide UI for creating/editing AudioProcessingTemplates and ProcessedAudioSources
- Filter pipeline is per-stream-instance (not shared) — stateful filters maintain independent state
### Temporary breakages resolved
- `_pick_channel()` removed from AudioColorStripStream — replaced by `_apply_filters()` which uses the filter pipeline
- Channel/band handling in AudioValueStream now goes through filter pipeline
- All "Phase 4 will wire..." comments resolved
### Known deviations from plan
- `AudioFilterPipeline.__init__` takes `List[FilterInstance]` only (not `AudioFilterRegistry` — uses the class-level registry directly via `AudioFilterRegistry.create_instance()`)
- Hot-update uses explicit method calls from the route handler rather than an event-listener pattern — simpler and avoids the need for a subscription system
@@ -6,6 +6,7 @@ from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import ( from wled_controller.api.dependencies import (
fire_entity_event, fire_entity_event,
get_audio_processing_template_store, get_audio_processing_template_store,
get_processor_manager,
) )
from wled_controller.api.schemas.audio_processing import ( from wled_controller.api.schemas.audio_processing import (
AudioProcessingTemplateCreate, AudioProcessingTemplateCreate,
@@ -129,6 +130,12 @@ async def update_audio_processing_template(
tags=data.tags, tags=data.tags,
) )
fire_entity_event("audio_processing_template", "updated", template_id) fire_entity_event("audio_processing_template", "updated", template_id)
# Hot-update: rebuild filter pipelines for running streams using this template
try:
pm = get_processor_manager()
pm.refresh_audio_filter_pipelines(template_id)
except Exception:
pass # Non-critical: streams will pick up changes on next restart
return _apt_to_response(template) return _apt_to_response(template)
except EntityNotFoundError as e: except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e)) raise HTTPException(status_code=404, detail=str(e))
@@ -154,6 +161,12 @@ async def delete_audio_processing_template(
# TODO: Phase 3 will add reference checks against ProcessedAudioSource # TODO: Phase 3 will add reference checks against ProcessedAudioSource
store.delete_template(template_id) store.delete_template(template_id)
fire_entity_event("audio_processing_template", "deleted", template_id) fire_entity_event("audio_processing_template", "deleted", template_id)
# Hot-update: rebuild filter pipelines for running streams that used this template
try:
pm = get_processor_manager()
pm.refresh_audio_filter_pipelines(template_id)
except Exception:
pass # Non-critical
except HTTPException: except HTTPException:
raise raise
except EntityNotFoundError as e: except EntityNotFoundError as e:
@@ -9,6 +9,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect
from wled_controller.api.auth import AuthRequired from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import ( from wled_controller.api.dependencies import (
fire_entity_event, fire_entity_event,
get_audio_processing_template_store,
get_audio_source_store, get_audio_source_store,
get_audio_template_store, get_audio_template_store,
get_color_strip_store, get_color_strip_store,
@@ -210,11 +211,12 @@ async def test_audio_source_ws(
ManagedAudioStream (ref-counted — shares with running targets), and streams ManagedAudioStream (ref-counted — shares with running targets), and streams
AudioAnalysis snapshots as JSON at ~20 Hz. AudioAnalysis snapshots as JSON at ~20 Hz.
NOTE: Audio processing filters from the template chain are NOT applied in Audio processing filters from the template chain are applied to the
this WebSocket yet — that will be wired in Phase 4 when the stream runtime analysis before sending, so the WebSocket output matches what running
integrates filter instances. streams see.
""" """
from wled_controller.api.auth import verify_ws_token from wled_controller.api.auth import verify_ws_token
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
if not verify_ws_token(token): if not verify_ws_token(token):
await websocket.close(code=4001, reason="Unauthorized") await websocket.close(code=4001, reason="Unauthorized")
@@ -223,6 +225,7 @@ async def test_audio_source_ws(
# Resolve source → device info + processing template chain # Resolve source → device info + processing template chain
store = get_audio_source_store() store = get_audio_source_store()
template_store = get_audio_template_store() template_store = get_audio_template_store()
apt_store = get_audio_processing_template_store()
manager = get_processor_manager() manager = get_processor_manager()
try: try:
@@ -247,6 +250,15 @@ async def test_audio_source_ws(
logger.debug("Audio template not found, falling back to best available engine: %s", e) logger.debug("Audio template not found, falling back to best available engine: %s", e)
pass # Fall back to best available engine pass # Fall back to best available engine
# Build filter pipeline from processing template chain
pipeline = None
if resolved.audio_processing_template_ids and apt_store:
pipeline = build_pipeline_from_template_ids(
resolved.audio_processing_template_ids, apt_store
)
if pipeline.empty:
pipeline = None
# Acquire shared audio stream # Acquire shared audio stream
audio_mgr = manager.audio_capture_manager audio_mgr = manager.audio_capture_manager
try: try:
@@ -265,7 +277,10 @@ async def test_audio_source_ws(
if analysis is not None and analysis.timestamp != last_ts: if analysis is not None and analysis.timestamp != last_ts:
last_ts = analysis.timestamp last_ts = analysis.timestamp
# Send raw analysis — filter processing will be added in Phase 4 # Apply filter pipeline (channel extract, band extract, gain, etc.)
if pipeline is not None:
analysis = pipeline.process(analysis)
await websocket.send_json( await websocket.send_json(
{ {
"spectrum": analysis.spectrum.tolist(), "spectrum": analysis.spectrum.tolist(),
@@ -283,5 +298,7 @@ async def test_audio_source_ws(
except Exception as e: except Exception as e:
logger.error(f"Audio test WebSocket error for {source_id}: {e}") logger.error(f"Audio test WebSocket error for {source_id}: {e}")
finally: finally:
if pipeline is not None:
pipeline.close()
audio_mgr.release(device_index, is_loopback, engine_type) audio_mgr.release(device_index, is_loopback, engine_type)
logger.info(f"Audio test WebSocket disconnected for source {source_id}") logger.info(f"Audio test WebSocket disconnected for source {source_id}")
@@ -5,6 +5,7 @@ Import this package to ensure all built-in filters are registered.
""" """
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
from wled_controller.core.audio.filters.pipeline import AudioFilterPipeline
from wled_controller.core.audio.filters.registry import AudioFilterRegistry from wled_controller.core.audio.filters.registry import AudioFilterRegistry
# Import individual filters to trigger auto-registration # Import individual filters to trigger auto-registration
@@ -24,5 +25,6 @@ import wled_controller.core.audio.filters.delay # noqa: F401
__all__ = [ __all__ = [
"AudioFilter", "AudioFilter",
"AudioFilterOptionDef", "AudioFilterOptionDef",
"AudioFilterPipeline",
"AudioFilterRegistry", "AudioFilterRegistry",
] ]
@@ -0,0 +1,113 @@
"""Audio filter pipeline — chains multiple AudioFilter instances sequentially.
The pipeline is the runtime executor for audio processing template chains.
It takes a flat list of FilterInstance specs, instantiates each filter via
the AudioFilterRegistry, and runs AudioAnalysis through them in order.
Thread-safe: each stream gets its own pipeline instance with independent
stateful filter state.
"""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING, List
from wled_controller.core.audio.analysis import AudioAnalysis
from wled_controller.core.audio.filters.base import AudioFilter
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
from wled_controller.utils import get_logger
if TYPE_CHECKING:
from wled_controller.core.filters.filter_instance import FilterInstance
from wled_controller.storage.audio_processing_template_store import (
AudioProcessingTemplateStore,
)
logger = get_logger(__name__)
class AudioFilterPipeline:
"""Chains multiple AudioFilter instances and runs AudioAnalysis through them.
Each pipeline owns its own filter instances — stateful filters maintain
per-pipeline state, so pipelines must NOT be shared across streams.
"""
def __init__(self, filter_instances: List["FilterInstance"]) -> None:
"""Create a pipeline from FilterInstance specs.
Args:
filter_instances: Flat (already-resolved) list of FilterInstance specs.
Each is instantiated via AudioFilterRegistry.create_instance().
"""
self._lock = threading.Lock()
self._filters: List[AudioFilter] = []
for fi in filter_instances:
try:
f = AudioFilterRegistry.create_instance(fi.filter_id, fi.options)
self._filters.append(f)
except ValueError:
logger.warning("Skipping unknown audio filter '%s'", fi.filter_id)
@property
def empty(self) -> bool:
"""True when the pipeline has no filters (passthrough)."""
return len(self._filters) == 0
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
"""Run analysis through all filters in order.
Returns a new AudioAnalysis (filters produce new objects via
dataclasses.replace, never mutating the input).
"""
result = analysis
with self._lock:
for f in self._filters:
result = f.process(result)
return result
def reset(self) -> None:
"""Reset all stateful filters to their initial state."""
with self._lock:
for f in self._filters:
if f.is_stateful:
f.reset()
def close(self) -> None:
"""Release resources and clear filter list."""
with self._lock:
self._filters.clear()
def __repr__(self) -> str:
return f"AudioFilterPipeline(filters={self._filters})"
def build_pipeline_from_template_ids(
template_ids: List[str],
template_store: "AudioProcessingTemplateStore",
) -> AudioFilterPipeline:
"""Resolve a list of audio processing template IDs into a single pipeline.
Expands each template's filters (including nested audio_filter_template
references) via the store's resolve_filter_instances(), concatenates all
resolved FilterInstance lists in order, and returns a ready-to-use pipeline.
Args:
template_ids: Ordered template IDs (outermost first, as returned by
ResolvedAudioSource.audio_processing_template_ids).
template_store: AudioProcessingTemplateStore for template lookup and
recursive filter resolution.
Returns:
AudioFilterPipeline (may be empty if no templates / all templates missing).
"""
all_instances: List[FilterInstance] = []
for tid in template_ids:
try:
template = template_store.get_template(tid)
resolved = template_store.resolve_filter_instances(template.filters)
all_instances.extend(resolved)
except ValueError:
logger.warning("Audio processing template '%s' not found, skipping", tid)
return AudioFilterPipeline(all_instances)
@@ -17,6 +17,7 @@ import numpy as np
from wled_controller.core.audio.analysis import NUM_BANDS from wled_controller.core.audio.analysis import NUM_BANDS
from wled_controller.core.audio.audio_capture import AudioCaptureManager from wled_controller.core.audio.audio_capture import AudioCaptureManager
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
from wled_controller.core.processing.color_strip_stream import ColorStripStream from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.core.processing.effect_stream import _build_palette_lut from wled_controller.core.processing.effect_stream import _build_palette_lut
from wled_controller.utils import get_logger from wled_controller.utils import get_logger
@@ -42,11 +43,14 @@ class AudioColorStripStream(ColorStripStream):
audio_capture_manager: AudioCaptureManager, audio_capture_manager: AudioCaptureManager,
audio_source_store=None, audio_source_store=None,
audio_template_store=None, audio_template_store=None,
audio_processing_template_store=None,
): ):
self._audio_capture_manager = audio_capture_manager self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store self._audio_template_store = audio_template_store
self._audio_processing_template_store = audio_processing_template_store
self._audio_stream = None # acquired on start self._audio_stream = None # acquired on start
self._filter_pipeline = None # AudioFilterPipeline, created on start
self._colors_lock = threading.Lock() self._colors_lock = threading.Lock()
self._running = False self._running = False
@@ -104,9 +108,6 @@ class AudioColorStripStream(ColorStripStream):
self._mirror = bool(getattr(source, "mirror", False)) self._mirror = bool(getattr(source, "mirror", False))
# Resolve audio device/template via audio_source_id # Resolve audio device/template via audio_source_id
# NOTE: channel selection and band filtering are now handled by audio
# processing filters (channel_extract, band_extract) applied via
# ProcessedAudioSource chains. Filter application will be wired in Phase 4.
audio_source_id = getattr(source, "audio_source_id", "") audio_source_id = getattr(source, "audio_source_id", "")
self._audio_source_id = audio_source_id self._audio_source_id = audio_source_id
self._audio_engine_type = None self._audio_engine_type = None
@@ -138,9 +139,25 @@ class AudioColorStripStream(ColorStripStream):
self._audio_device_index = -1 self._audio_device_index = -1
self._audio_loopback = True self._audio_loopback = True
# Build audio filter pipeline from processing template chain
self._rebuild_filter_pipeline()
with self._colors_lock: with self._colors_lock:
self._colors: Optional[np.ndarray] = None self._colors: Optional[np.ndarray] = None
def _rebuild_filter_pipeline(self) -> None:
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
old_pipeline = self._filter_pipeline
if self._audio_processing_template_ids and self._audio_processing_template_store:
self._filter_pipeline = build_pipeline_from_template_ids(
self._audio_processing_template_ids,
self._audio_processing_template_store,
)
else:
self._filter_pipeline = None
if old_pipeline is not None:
old_pipeline.close()
# ── ColorStripStream interface ────────────────────────────────── # ── ColorStripStream interface ──────────────────────────────────
def configure(self, device_led_count: int) -> None: def configure(self, device_led_count: int) -> None:
@@ -200,6 +217,10 @@ class AudioColorStripStream(ColorStripStream):
engine_type=self._audio_engine_type, engine_type=self._audio_engine_type,
) )
self._audio_stream = None self._audio_stream = None
# Close audio filter pipeline
if self._filter_pipeline is not None:
self._filter_pipeline.close()
self._filter_pipeline = None
self._prev_spectrum = None self._prev_spectrum = None
logger.info("AudioColorStripStream stopped") logger.info("AudioColorStripStream stopped")
@@ -338,16 +359,17 @@ class AudioColorStripStream(ColorStripStream):
finally: finally:
self._running = False self._running = False
# ── Channel selection ───────────────────────────────────────── # ── Filter pipeline + channel selection ──────────────────────────
def _pick_channel(self, analysis): def _apply_filters(self, analysis):
"""Return (spectrum, rms) from the analysis. """Apply audio filter pipeline (if any) and return (spectrum, rms).
Channel selection and band filtering are now handled by audio processing The filter pipeline handles channel extraction, band extraction,
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter gain, noise gate, etc. as configured by the ProcessedAudioSource
instances into the stream runtime). template chain.
""" """
# ⚠️ Temporary breakage: channel/band filtering removed — resolved in Phase 4 if self._filter_pipeline is not None:
analysis = self._filter_pipeline.process(analysis)
return analysis.spectrum, analysis.rms return analysis.spectrum, analysis.rms
# ── Spectrum Analyzer ────────────────────────────────────────── # ── Spectrum Analyzer ──────────────────────────────────────────
@@ -357,7 +379,7 @@ class AudioColorStripStream(ColorStripStream):
buf[:] = 0 buf[:] = 0
return return
spectrum, _ = self._pick_channel(analysis) spectrum, _ = self._apply_filters(analysis)
sensitivity = self.resolve("sensitivity", self._sensitivity) sensitivity = self.resolve("sensitivity", self._sensitivity)
smoothing = self.resolve("smoothing", self._smoothing) smoothing = self.resolve("smoothing", self._smoothing)
lut = self._palette_lut lut = self._palette_lut
@@ -408,7 +430,7 @@ class AudioColorStripStream(ColorStripStream):
buf[:] = 0 buf[:] = 0
return return
_, ch_rms = self._pick_channel(analysis) _, ch_rms = self._apply_filters(analysis)
sensitivity = self.resolve("sensitivity", self._sensitivity) sensitivity = self.resolve("sensitivity", self._sensitivity)
smoothing = self.resolve("smoothing", self._smoothing) smoothing = self.resolve("smoothing", self._smoothing)
rms = ch_rms * sensitivity rms = ch_rms * sensitivity
@@ -89,6 +89,7 @@ class ColorStripStreamManager:
weather_manager=None, weather_manager=None,
asset_store=None, asset_store=None,
game_event_bus=None, game_event_bus=None,
audio_processing_template_store=None,
): ):
""" """
Args: Args:
@@ -101,6 +102,7 @@ class ColorStripStreamManager:
cspt_store: ColorStripProcessingTemplateStore for per-layer filter chains cspt_store: ColorStripProcessingTemplateStore for per-layer filter chains
gradient_store: GradientStore for resolving gradient entity references gradient_store: GradientStore for resolving gradient entity references
game_event_bus: GameEventBus for game event stream subscriptions game_event_bus: GameEventBus for game event stream subscriptions
audio_processing_template_store: AudioProcessingTemplateStore for filter chains
""" """
self._color_strip_store = color_strip_store self._color_strip_store = color_strip_store
self._live_stream_manager = live_stream_manager self._live_stream_manager = live_stream_manager
@@ -114,6 +116,7 @@ class ColorStripStreamManager:
self._weather_manager = weather_manager self._weather_manager = weather_manager
self._asset_store = asset_store self._asset_store = asset_store
self._game_event_bus = game_event_bus self._game_event_bus = game_event_bus
self._audio_processing_template_store = audio_processing_template_store
self._streams: Dict[str, _ColorStripEntry] = {} self._streams: Dict[str, _ColorStripEntry] = {}
def _inject_clock(self, css_stream, source) -> Optional[str]: def _inject_clock(self, css_stream, source) -> Optional[str]:
@@ -246,6 +249,7 @@ class ColorStripStreamManager:
self._audio_capture_manager, self._audio_capture_manager,
self._audio_source_store, self._audio_source_store,
self._audio_template_store, self._audio_template_store,
self._audio_processing_template_store,
) )
elif source.source_type == "composite": elif source.source_type == "composite":
from wled_controller.core.processing.composite_stream import ( from wled_controller.core.processing.composite_stream import (
@@ -498,6 +502,28 @@ class ColorStripStreamManager:
logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}") logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}")
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines for any running AudioColorStripStream
that references the given audio processing template ID.
Called when an audio processing template is updated or deleted.
"""
from wled_controller.core.processing.audio_stream import AudioColorStripStream
count = 0
for entry in self._streams.values():
stream = entry.stream
if isinstance(stream, AudioColorStripStream):
if template_id in getattr(stream, "_audio_processing_template_ids", []):
stream._rebuild_filter_pipeline()
count += 1
if count:
logger.info(
"Refreshed audio filter pipeline for %d stream(s) after template %s update",
count,
template_id,
)
def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None: def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None:
"""Register or update a consumer's target FPS. """Register or update a consumer's target FPS.
@@ -73,6 +73,7 @@ class ProcessorDependencies:
asset_store: Optional[AssetStore] = None asset_store: Optional[AssetStore] = None
ha_manager: Optional[Any] = None # HomeAssistantManager ha_manager: Optional[Any] = None # HomeAssistantManager
game_event_bus: Optional[Any] = None # GameEventBus game_event_bus: Optional[Any] = None # GameEventBus
audio_processing_template_store: Optional[Any] = None # AudioProcessingTemplateStore
@dataclass @dataclass
@@ -153,6 +154,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
weather_manager=deps.weather_manager, weather_manager=deps.weather_manager,
asset_store=deps.asset_store, asset_store=deps.asset_store,
game_event_bus=deps.game_event_bus, game_event_bus=deps.game_event_bus,
audio_processing_template_store=deps.audio_processing_template_store,
) )
self._value_stream_manager = ( self._value_stream_manager = (
ValueStreamManager( ValueStreamManager(
@@ -165,6 +167,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
css_stream_manager=self._color_strip_stream_manager, css_stream_manager=self._color_strip_stream_manager,
gradient_store=deps.gradient_store, gradient_store=deps.gradient_store,
event_bus=deps.game_event_bus, event_bus=deps.game_event_bus,
audio_processing_template_store=deps.audio_processing_template_store,
) )
if deps.value_source_store if deps.value_source_store
else None else None
@@ -194,6 +197,13 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
def color_strip_stream_manager(self) -> ColorStripStreamManager: def color_strip_stream_manager(self) -> ColorStripStreamManager:
return self._color_strip_stream_manager return self._color_strip_stream_manager
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines across all running streams when a
template is updated or deleted. Dispatches to both CSS and value stream managers."""
self._color_strip_stream_manager.refresh_audio_filter_pipelines(template_id)
if self._value_stream_manager:
self._value_stream_manager.refresh_audio_filter_pipelines(template_id)
# ===== SHARED CONTEXT (passed to target processors) ===== # ===== SHARED CONTEXT (passed to target processors) =====
def _build_context(self) -> TargetContext: def _build_context(self) -> TargetContext:
@@ -178,6 +178,7 @@ class AudioValueStream(ValueStream):
audio_capture_manager: Optional["AudioCaptureManager"] = None, audio_capture_manager: Optional["AudioCaptureManager"] = None,
audio_source_store: Optional["AudioSourceStore"] = None, audio_source_store: Optional["AudioSourceStore"] = None,
audio_template_store=None, audio_template_store=None,
audio_processing_template_store=None,
): ):
self._audio_source_id = audio_source_id self._audio_source_id = audio_source_id
self._mode = mode self._mode = mode
@@ -191,6 +192,7 @@ class AudioValueStream(ValueStream):
self._audio_capture_manager = audio_capture_manager self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store self._audio_template_store = audio_template_store
self._audio_processing_template_store = audio_processing_template_store
# Resolved audio device params # Resolved audio device params
self._audio_device_index = -1 self._audio_device_index = -1
@@ -200,6 +202,7 @@ class AudioValueStream(ValueStream):
self._audio_processing_template_ids: list = [] self._audio_processing_template_ids: list = []
self._audio_stream = None self._audio_stream = None
self._filter_pipeline = None # AudioFilterPipeline
self._prev_value = 0.0 self._prev_value = 0.0
self._beat_brightness = 0.0 self._beat_brightness = 0.0
@@ -208,9 +211,7 @@ class AudioValueStream(ValueStream):
def _resolve_audio_source(self) -> None: def _resolve_audio_source(self) -> None:
"""Resolve audio source to device index / engine info / processing template IDs. """Resolve audio source to device index / engine info / processing template IDs.
Channel selection and band filtering are now handled by audio processing Builds the audio filter pipeline from the processing template chain.
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
instances into the stream runtime).
""" """
if self._audio_source_id and self._audio_source_store: if self._audio_source_id and self._audio_source_store:
try: try:
@@ -232,6 +233,22 @@ class AudioValueStream(ValueStream):
pass pass
except ValueError as e: except ValueError as e:
logger.warning(f"Failed to resolve audio source {self._audio_source_id}: {e}") logger.warning(f"Failed to resolve audio source {self._audio_source_id}: {e}")
self._rebuild_filter_pipeline()
def _rebuild_filter_pipeline(self) -> None:
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
old_pipeline = self._filter_pipeline
if self._audio_processing_template_ids and self._audio_processing_template_store:
self._filter_pipeline = build_pipeline_from_template_ids(
self._audio_processing_template_ids,
self._audio_processing_template_store,
)
else:
self._filter_pipeline = None
if old_pipeline is not None:
old_pipeline.close()
def start(self) -> None: def start(self) -> None:
if self._audio_capture_manager is None: if self._audio_capture_manager is None:
@@ -255,6 +272,9 @@ class AudioValueStream(ValueStream):
engine_type=self._audio_engine_type, engine_type=self._audio_engine_type,
) )
self._audio_stream = None self._audio_stream = None
if self._filter_pipeline is not None:
self._filter_pipeline.close()
self._filter_pipeline = None
self._prev_value = 0.0 self._prev_value = 0.0
self._beat_brightness = 0.0 self._beat_brightness = 0.0
@@ -285,7 +305,13 @@ class AudioValueStream(ValueStream):
return max(0.0, min(1.0, mapped)) return max(0.0, min(1.0, mapped))
def _extract_raw(self, analysis) -> float: def _extract_raw(self, analysis) -> float:
"""Extract raw scalar from audio analysis based on mode.""" """Extract raw scalar from audio analysis based on mode.
Applies the audio filter pipeline (if any) before extracting the scalar.
Channel extraction, band filtering, gain, etc. are handled by filters.
"""
if self._filter_pipeline is not None:
analysis = self._filter_pipeline.process(analysis)
if self._mode == "peak": if self._mode == "peak":
return self._pick_peak(analysis) return self._pick_peak(analysis)
if self._mode == "beat": if self._mode == "beat":
@@ -294,13 +320,9 @@ class AudioValueStream(ValueStream):
return self._pick_rms(analysis) return self._pick_rms(analysis)
def _pick_rms(self, analysis) -> float: def _pick_rms(self, analysis) -> float:
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
# Channel selection is now handled by audio processing filters
# (channel_extract) applied via ProcessedAudioSource chains.
return getattr(analysis, "rms", 0.0) return getattr(analysis, "rms", 0.0)
def _pick_peak(self, analysis) -> float: def _pick_peak(self, analysis) -> float:
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
return getattr(analysis, "peak", 0.0) return getattr(analysis, "peak", 0.0)
def _compute_beat(self, analysis) -> float: def _compute_beat(self, analysis) -> float:
@@ -1411,6 +1433,7 @@ class ValueStreamManager:
css_stream_manager: Optional["ColorStripStreamManager"] = None, css_stream_manager: Optional["ColorStripStreamManager"] = None,
gradient_store: Optional[Any] = None, gradient_store: Optional[Any] = None,
event_bus: Optional["GameEventBus"] = None, event_bus: Optional["GameEventBus"] = None,
audio_processing_template_store=None,
): ):
self._value_source_store = value_source_store self._value_source_store = value_source_store
self._audio_capture_manager = audio_capture_manager self._audio_capture_manager = audio_capture_manager
@@ -1421,6 +1444,7 @@ class ValueStreamManager:
self._css_stream_manager = css_stream_manager self._css_stream_manager = css_stream_manager
self._gradient_store = gradient_store self._gradient_store = gradient_store
self._event_bus = event_bus self._event_bus = event_bus
self._audio_processing_template_store = audio_processing_template_store
self._streams: Dict[str, ValueStream] = {} # vs_id → stream self._streams: Dict[str, ValueStream] = {} # vs_id → stream
self._ref_counts: Dict[str, int] = {} # vs_id → ref count self._ref_counts: Dict[str, int] = {} # vs_id → ref count
@@ -1473,6 +1497,25 @@ class ValueStreamManager:
stream.update_source(source) stream.update_source(source)
logger.debug(f"Updated value stream {vs_id}") logger.debug(f"Updated value stream {vs_id}")
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines for any running AudioValueStream
that references the given audio processing template ID.
Called when an audio processing template is updated or deleted.
"""
count = 0
for stream in self._streams.values():
if isinstance(stream, AudioValueStream):
if template_id in getattr(stream, "_audio_processing_template_ids", []):
stream._rebuild_filter_pipeline()
count += 1
if count:
logger.info(
"Refreshed audio filter pipeline for %d value stream(s) after template %s update",
count,
template_id,
)
def release_all(self) -> None: def release_all(self) -> None:
"""Stop and remove all managed streams. Called on shutdown.""" """Stop and remove all managed streams. Called on shutdown."""
for vs_id, stream in self._streams.items(): for vs_id, stream in self._streams.items():
@@ -1525,6 +1568,7 @@ class ValueStreamManager:
audio_capture_manager=self._audio_capture_manager, audio_capture_manager=self._audio_capture_manager,
audio_source_store=self._audio_source_store, audio_source_store=self._audio_source_store,
audio_template_store=self._audio_template_store, audio_template_store=self._audio_template_store,
audio_processing_template_store=self._audio_processing_template_store,
) )
if isinstance(source, DaylightValueSource): if isinstance(source, DaylightValueSource):
+1
View File
@@ -128,6 +128,7 @@ processor_manager = ProcessorManager(
asset_store=asset_store, asset_store=asset_store,
ha_manager=ha_manager, ha_manager=ha_manager,
game_event_bus=game_event_bus, game_event_bus=game_event_bus,
audio_processing_template_store=audio_processing_template_store,
) )
) )