feat(processed-audio-sources): phase 4 - runtime filter integration

Add AudioFilterPipeline for chained filter execution on AudioAnalysis.
Wire filter pipelines into AudioColorStripStream, AudioValueStream,
and WebSocket test endpoint. Add hot-update support via
ProcessorManager.refresh_audio_filter_pipelines(). Thread
AudioProcessingTemplateStore through dependency injection hierarchy.
This commit is contained in:
2026-03-31 19:15:29 +03:00
parent 353c090b42
commit ab43578049
12 changed files with 309 additions and 38 deletions
+2 -2
View File
@@ -9,7 +9,7 @@
- **Test:** `cd server && py -3.13 -m pytest tests/ --no-cov -q`
## Current State
Phase 1 (Audio Filter Framework), Phase 2 (Audio Filters), and Phase 3 (Processed Audio Source Model) implemented.
Phases 1-4 implemented. Phase 4 (Runtime Integration) wired the audio filter pipeline into the stream runtime.
Phase 1 framework:
- `AudioFilter` base class, `AudioFilterRegistry`, `AudioFilterOptionDef` in `core/audio/filters/`
@@ -93,7 +93,7 @@ _(none yet)_
| Phase 1 | impl-agent | — | No | Tasks 7+8 skipped (SQLite migration made them obsolete) |
| Phase 2 | impl-agent | — | No | All 11 filters implemented, no deviations |
| Phase 3 | impl-agent | — | No | All 11 tasks done; channel/band logic deferred to Phase 4 |
| Phase 4 | | — | | |
| Phase 4 | impl-agent | — | No | All 6 tasks done; dependency injection threaded through |
| Phase 5 | — | — | — | — |
| Phase 6 | — | — | — | — |
| Phase 7 | — | — | — | — |
+1 -1
View File
@@ -42,7 +42,7 @@ Clean-slate approach: no data migration for old source types.
| Phase 1: Audio Filter Framework | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ |
| Phase 2: Audio Filters | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ |
| Phase 3: Processed Audio Source Model | backend | ✅ Done | ⬜ | ⬜ | ⬜ |
| Phase 4: Runtime Integration | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 4: Runtime Integration | backend | ✅ Done | ⬜ | ⬜ | ⬜ |
| Phase 5: Frontend — Audio Processing Templates | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 6: Frontend — Source Types | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 7: Testing & Polish | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
@@ -1,6 +1,6 @@
# Phase 4: Runtime Integration
**Status:** ⬜ Not Started
**Status:** ✅ Done
**Parent plan:** [PLAN.md](./PLAN.md)
**Domain:** backend
@@ -9,7 +9,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
## Tasks
- [ ] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py`
- [x] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py`
- `AudioFilterPipeline` class:
- `__init__(filter_instances: List[FilterInstance], registry: AudioFilterRegistry)`
- Instantiates all filters from FilterInstance specs
@@ -17,7 +17,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
- `reset()` — resets all stateful filters
- `close()` — cleanup resources
- Handles stateful filter lifecycle (create on init, reset on demand, close on cleanup)
- [ ] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py`
- [x] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py`
- On construction: if source is ProcessedAudioSource, resolve the full chain:
- Walk to CaptureAudioSource for device info
- Collect all AudioProcessingTemplates along the chain
@@ -26,18 +26,18 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
- In render loop: after getting AudioAnalysis from ManagedAudioStream, run it through the filter pipeline before visualization
- Remove old inline channel selection and band filtering code (now handled by filters)
- On stop: close the filter pipeline
- [ ] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py`
- [x] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py`
- Same pattern: resolve ProcessedAudioSource chain, create filter pipeline, apply in get_value()
- Remove old inline channel/band handling
- [ ] Task 4: Hot-update support for filter templates
- [x] Task 4: Hot-update support for filter templates
- When an AudioProcessingTemplate is updated, running streams that use it should re-resolve their filter pipeline
- Listen for template update events (or implement a refresh mechanism)
- Re-create AudioFilterPipeline with updated filter instances
- Reset stateful filter state on pipeline refresh
- [ ] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py`
- [x] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py`
- For ProcessedAudioSource: resolve chain, create pipeline, apply filters to test stream data
- Return filtered analysis in real-time over WebSocket
- [ ] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape
- [x] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape
## Files to Modify/Create
- `core/audio/filters/pipeline.py`**create** — AudioFilterPipeline
@@ -61,11 +61,34 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
- The filter pipeline should produce a new AudioAnalysis each time (immutability), not mutate the shared snapshot from ManagedAudioStream.
## Review Checklist
- [ ] All tasks completed
- [ ] Code follows project conventions
- [ ] No unintended side effects
- [x] All tasks completed
- [x] Code follows project conventions
- [x] No unintended side effects
- [ ] Build passes
- [ ] Tests pass (new + existing)
## Handoff to Next Phase
<!-- Filled in by the implementation agent after completing this phase. -->
### What was built
- `AudioFilterPipeline` class in `core/audio/filters/pipeline.py` — thread-safe pipeline that instantiates, processes, resets, and closes audio filters
- `build_pipeline_from_template_ids()` helper — resolves template IDs to FilterInstance lists and creates a pipeline
- `AudioColorStripStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction and source update; `_apply_filters()` replaces `_pick_channel()` in render loop; pipeline closed on stop
- `AudioValueStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction; filters applied in `_extract_raw()` before scalar extraction; pipeline closed on stop
- Hot-update: `ProcessorManager.refresh_audio_filter_pipelines(template_id)` dispatches to both CSS and value stream managers; called from audio processing template update/delete routes
- WebSocket test endpoint creates a filter pipeline from the resolved template chain and applies it to analysis before sending
- Dependency injection: `audio_processing_template_store` threaded through `ProcessorDependencies` -> `ProcessorManager` -> `ColorStripStreamManager` / `ValueStreamManager` -> `AudioColorStripStream` / `AudioValueStream`
### What Phase 5 needs to know
- The backend is fully wired: creating a ProcessedAudioSource that references templates with channel_extract, band_extract, gain, etc. filters will apply those filters to live audio data
- The WebSocket test endpoint shows filtered analysis in real-time
- Frontend needs to provide UI for creating/editing AudioProcessingTemplates and ProcessedAudioSources
- Filter pipeline is per-stream-instance (not shared) — stateful filters maintain independent state
### Temporary breakages resolved
- `_pick_channel()` removed from AudioColorStripStream — replaced by `_apply_filters()` which uses the filter pipeline
- Channel/band handling in AudioValueStream now goes through filter pipeline
- All "Phase 4 will wire..." comments resolved
### Known deviations from plan
- `AudioFilterPipeline.__init__` takes `List[FilterInstance]` only (not `AudioFilterRegistry` — uses the class-level registry directly via `AudioFilterRegistry.create_instance()`)
- Hot-update uses explicit method calls from the route handler rather than an event-listener pattern — simpler and avoids the need for a subscription system
@@ -6,6 +6,7 @@ from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
fire_entity_event,
get_audio_processing_template_store,
get_processor_manager,
)
from wled_controller.api.schemas.audio_processing import (
AudioProcessingTemplateCreate,
@@ -129,6 +130,12 @@ async def update_audio_processing_template(
tags=data.tags,
)
fire_entity_event("audio_processing_template", "updated", template_id)
# Hot-update: rebuild filter pipelines for running streams using this template
try:
pm = get_processor_manager()
pm.refresh_audio_filter_pipelines(template_id)
except Exception:
pass # Non-critical: streams will pick up changes on next restart
return _apt_to_response(template)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
@@ -154,6 +161,12 @@ async def delete_audio_processing_template(
# TODO: Phase 3 will add reference checks against ProcessedAudioSource
store.delete_template(template_id)
fire_entity_event("audio_processing_template", "deleted", template_id)
# Hot-update: rebuild filter pipelines for running streams that used this template
try:
pm = get_processor_manager()
pm.refresh_audio_filter_pipelines(template_id)
except Exception:
pass # Non-critical
except HTTPException:
raise
except EntityNotFoundError as e:
@@ -9,6 +9,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
fire_entity_event,
get_audio_processing_template_store,
get_audio_source_store,
get_audio_template_store,
get_color_strip_store,
@@ -210,11 +211,12 @@ async def test_audio_source_ws(
ManagedAudioStream (ref-counted — shares with running targets), and streams
AudioAnalysis snapshots as JSON at ~20 Hz.
NOTE: Audio processing filters from the template chain are NOT applied in
this WebSocket yet — that will be wired in Phase 4 when the stream runtime
integrates filter instances.
Audio processing filters from the template chain are applied to the
analysis before sending, so the WebSocket output matches what running
streams see.
"""
from wled_controller.api.auth import verify_ws_token
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
if not verify_ws_token(token):
await websocket.close(code=4001, reason="Unauthorized")
@@ -223,6 +225,7 @@ async def test_audio_source_ws(
# Resolve source → device info + processing template chain
store = get_audio_source_store()
template_store = get_audio_template_store()
apt_store = get_audio_processing_template_store()
manager = get_processor_manager()
try:
@@ -247,6 +250,15 @@ async def test_audio_source_ws(
logger.debug("Audio template not found, falling back to best available engine: %s", e)
pass # Fall back to best available engine
# Build filter pipeline from processing template chain
pipeline = None
if resolved.audio_processing_template_ids and apt_store:
pipeline = build_pipeline_from_template_ids(
resolved.audio_processing_template_ids, apt_store
)
if pipeline.empty:
pipeline = None
# Acquire shared audio stream
audio_mgr = manager.audio_capture_manager
try:
@@ -265,7 +277,10 @@ async def test_audio_source_ws(
if analysis is not None and analysis.timestamp != last_ts:
last_ts = analysis.timestamp
# Send raw analysis — filter processing will be added in Phase 4
# Apply filter pipeline (channel extract, band extract, gain, etc.)
if pipeline is not None:
analysis = pipeline.process(analysis)
await websocket.send_json(
{
"spectrum": analysis.spectrum.tolist(),
@@ -283,5 +298,7 @@ async def test_audio_source_ws(
except Exception as e:
logger.error(f"Audio test WebSocket error for {source_id}: {e}")
finally:
if pipeline is not None:
pipeline.close()
audio_mgr.release(device_index, is_loopback, engine_type)
logger.info(f"Audio test WebSocket disconnected for source {source_id}")
@@ -5,6 +5,7 @@ Import this package to ensure all built-in filters are registered.
"""
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
from wled_controller.core.audio.filters.pipeline import AudioFilterPipeline
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
# Import individual filters to trigger auto-registration
@@ -24,5 +25,6 @@ import wled_controller.core.audio.filters.delay # noqa: F401
__all__ = [
"AudioFilter",
"AudioFilterOptionDef",
"AudioFilterPipeline",
"AudioFilterRegistry",
]
@@ -0,0 +1,113 @@
"""Audio filter pipeline — chains multiple AudioFilter instances sequentially.
The pipeline is the runtime executor for audio processing template chains.
It takes a flat list of FilterInstance specs, instantiates each filter via
the AudioFilterRegistry, and runs AudioAnalysis through them in order.
Thread-safe: each stream gets its own pipeline instance with independent
stateful filter state.
"""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING, List
from wled_controller.core.audio.analysis import AudioAnalysis
from wled_controller.core.audio.filters.base import AudioFilter
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
from wled_controller.utils import get_logger
if TYPE_CHECKING:
from wled_controller.core.filters.filter_instance import FilterInstance
from wled_controller.storage.audio_processing_template_store import (
AudioProcessingTemplateStore,
)
logger = get_logger(__name__)
class AudioFilterPipeline:
"""Chains multiple AudioFilter instances and runs AudioAnalysis through them.
Each pipeline owns its own filter instances — stateful filters maintain
per-pipeline state, so pipelines must NOT be shared across streams.
"""
def __init__(self, filter_instances: List["FilterInstance"]) -> None:
"""Create a pipeline from FilterInstance specs.
Args:
filter_instances: Flat (already-resolved) list of FilterInstance specs.
Each is instantiated via AudioFilterRegistry.create_instance().
"""
self._lock = threading.Lock()
self._filters: List[AudioFilter] = []
for fi in filter_instances:
try:
f = AudioFilterRegistry.create_instance(fi.filter_id, fi.options)
self._filters.append(f)
except ValueError:
logger.warning("Skipping unknown audio filter '%s'", fi.filter_id)
@property
def empty(self) -> bool:
"""True when the pipeline has no filters (passthrough)."""
return len(self._filters) == 0
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
"""Run analysis through all filters in order.
Returns a new AudioAnalysis (filters produce new objects via
dataclasses.replace, never mutating the input).
"""
result = analysis
with self._lock:
for f in self._filters:
result = f.process(result)
return result
def reset(self) -> None:
"""Reset all stateful filters to their initial state."""
with self._lock:
for f in self._filters:
if f.is_stateful:
f.reset()
def close(self) -> None:
"""Release resources and clear filter list."""
with self._lock:
self._filters.clear()
def __repr__(self) -> str:
return f"AudioFilterPipeline(filters={self._filters})"
def build_pipeline_from_template_ids(
template_ids: List[str],
template_store: "AudioProcessingTemplateStore",
) -> AudioFilterPipeline:
"""Resolve a list of audio processing template IDs into a single pipeline.
Expands each template's filters (including nested audio_filter_template
references) via the store's resolve_filter_instances(), concatenates all
resolved FilterInstance lists in order, and returns a ready-to-use pipeline.
Args:
template_ids: Ordered template IDs (outermost first, as returned by
ResolvedAudioSource.audio_processing_template_ids).
template_store: AudioProcessingTemplateStore for template lookup and
recursive filter resolution.
Returns:
AudioFilterPipeline (may be empty if no templates / all templates missing).
"""
all_instances: List[FilterInstance] = []
for tid in template_ids:
try:
template = template_store.get_template(tid)
resolved = template_store.resolve_filter_instances(template.filters)
all_instances.extend(resolved)
except ValueError:
logger.warning("Audio processing template '%s' not found, skipping", tid)
return AudioFilterPipeline(all_instances)
@@ -17,6 +17,7 @@ import numpy as np
from wled_controller.core.audio.analysis import NUM_BANDS
from wled_controller.core.audio.audio_capture import AudioCaptureManager
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.core.processing.effect_stream import _build_palette_lut
from wled_controller.utils import get_logger
@@ -42,11 +43,14 @@ class AudioColorStripStream(ColorStripStream):
audio_capture_manager: AudioCaptureManager,
audio_source_store=None,
audio_template_store=None,
audio_processing_template_store=None,
):
self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store
self._audio_processing_template_store = audio_processing_template_store
self._audio_stream = None # acquired on start
self._filter_pipeline = None # AudioFilterPipeline, created on start
self._colors_lock = threading.Lock()
self._running = False
@@ -104,9 +108,6 @@ class AudioColorStripStream(ColorStripStream):
self._mirror = bool(getattr(source, "mirror", False))
# Resolve audio device/template via audio_source_id
# NOTE: channel selection and band filtering are now handled by audio
# processing filters (channel_extract, band_extract) applied via
# ProcessedAudioSource chains. Filter application will be wired in Phase 4.
audio_source_id = getattr(source, "audio_source_id", "")
self._audio_source_id = audio_source_id
self._audio_engine_type = None
@@ -138,9 +139,25 @@ class AudioColorStripStream(ColorStripStream):
self._audio_device_index = -1
self._audio_loopback = True
# Build audio filter pipeline from processing template chain
self._rebuild_filter_pipeline()
with self._colors_lock:
self._colors: Optional[np.ndarray] = None
def _rebuild_filter_pipeline(self) -> None:
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
old_pipeline = self._filter_pipeline
if self._audio_processing_template_ids and self._audio_processing_template_store:
self._filter_pipeline = build_pipeline_from_template_ids(
self._audio_processing_template_ids,
self._audio_processing_template_store,
)
else:
self._filter_pipeline = None
if old_pipeline is not None:
old_pipeline.close()
# ── ColorStripStream interface ──────────────────────────────────
def configure(self, device_led_count: int) -> None:
@@ -200,6 +217,10 @@ class AudioColorStripStream(ColorStripStream):
engine_type=self._audio_engine_type,
)
self._audio_stream = None
# Close audio filter pipeline
if self._filter_pipeline is not None:
self._filter_pipeline.close()
self._filter_pipeline = None
self._prev_spectrum = None
logger.info("AudioColorStripStream stopped")
@@ -338,16 +359,17 @@ class AudioColorStripStream(ColorStripStream):
finally:
self._running = False
# ── Channel selection ─────────────────────────────────────────
# ── Filter pipeline + channel selection ──────────────────────────
def _pick_channel(self, analysis):
"""Return (spectrum, rms) from the analysis.
def _apply_filters(self, analysis):
"""Apply audio filter pipeline (if any) and return (spectrum, rms).
Channel selection and band filtering are now handled by audio processing
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
instances into the stream runtime).
The filter pipeline handles channel extraction, band extraction,
gain, noise gate, etc. as configured by the ProcessedAudioSource
template chain.
"""
# ⚠️ Temporary breakage: channel/band filtering removed — resolved in Phase 4
if self._filter_pipeline is not None:
analysis = self._filter_pipeline.process(analysis)
return analysis.spectrum, analysis.rms
# ── Spectrum Analyzer ──────────────────────────────────────────
@@ -357,7 +379,7 @@ class AudioColorStripStream(ColorStripStream):
buf[:] = 0
return
spectrum, _ = self._pick_channel(analysis)
spectrum, _ = self._apply_filters(analysis)
sensitivity = self.resolve("sensitivity", self._sensitivity)
smoothing = self.resolve("smoothing", self._smoothing)
lut = self._palette_lut
@@ -408,7 +430,7 @@ class AudioColorStripStream(ColorStripStream):
buf[:] = 0
return
_, ch_rms = self._pick_channel(analysis)
_, ch_rms = self._apply_filters(analysis)
sensitivity = self.resolve("sensitivity", self._sensitivity)
smoothing = self.resolve("smoothing", self._smoothing)
rms = ch_rms * sensitivity
@@ -89,6 +89,7 @@ class ColorStripStreamManager:
weather_manager=None,
asset_store=None,
game_event_bus=None,
audio_processing_template_store=None,
):
"""
Args:
@@ -101,6 +102,7 @@ class ColorStripStreamManager:
cspt_store: ColorStripProcessingTemplateStore for per-layer filter chains
gradient_store: GradientStore for resolving gradient entity references
game_event_bus: GameEventBus for game event stream subscriptions
audio_processing_template_store: AudioProcessingTemplateStore for filter chains
"""
self._color_strip_store = color_strip_store
self._live_stream_manager = live_stream_manager
@@ -114,6 +116,7 @@ class ColorStripStreamManager:
self._weather_manager = weather_manager
self._asset_store = asset_store
self._game_event_bus = game_event_bus
self._audio_processing_template_store = audio_processing_template_store
self._streams: Dict[str, _ColorStripEntry] = {}
def _inject_clock(self, css_stream, source) -> Optional[str]:
@@ -246,6 +249,7 @@ class ColorStripStreamManager:
self._audio_capture_manager,
self._audio_source_store,
self._audio_template_store,
self._audio_processing_template_store,
)
elif source.source_type == "composite":
from wled_controller.core.processing.composite_stream import (
@@ -498,6 +502,28 @@ class ColorStripStreamManager:
logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}")
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines for any running AudioColorStripStream
that references the given audio processing template ID.
Called when an audio processing template is updated or deleted.
"""
from wled_controller.core.processing.audio_stream import AudioColorStripStream
count = 0
for entry in self._streams.values():
stream = entry.stream
if isinstance(stream, AudioColorStripStream):
if template_id in getattr(stream, "_audio_processing_template_ids", []):
stream._rebuild_filter_pipeline()
count += 1
if count:
logger.info(
"Refreshed audio filter pipeline for %d stream(s) after template %s update",
count,
template_id,
)
def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None:
"""Register or update a consumer's target FPS.
@@ -73,6 +73,7 @@ class ProcessorDependencies:
asset_store: Optional[AssetStore] = None
ha_manager: Optional[Any] = None # HomeAssistantManager
game_event_bus: Optional[Any] = None # GameEventBus
audio_processing_template_store: Optional[Any] = None # AudioProcessingTemplateStore
@dataclass
@@ -153,6 +154,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
weather_manager=deps.weather_manager,
asset_store=deps.asset_store,
game_event_bus=deps.game_event_bus,
audio_processing_template_store=deps.audio_processing_template_store,
)
self._value_stream_manager = (
ValueStreamManager(
@@ -165,6 +167,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
css_stream_manager=self._color_strip_stream_manager,
gradient_store=deps.gradient_store,
event_bus=deps.game_event_bus,
audio_processing_template_store=deps.audio_processing_template_store,
)
if deps.value_source_store
else None
@@ -194,6 +197,13 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
def color_strip_stream_manager(self) -> ColorStripStreamManager:
return self._color_strip_stream_manager
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines across all running streams when a
template is updated or deleted. Dispatches to both CSS and value stream managers."""
self._color_strip_stream_manager.refresh_audio_filter_pipelines(template_id)
if self._value_stream_manager:
self._value_stream_manager.refresh_audio_filter_pipelines(template_id)
# ===== SHARED CONTEXT (passed to target processors) =====
def _build_context(self) -> TargetContext:
@@ -178,6 +178,7 @@ class AudioValueStream(ValueStream):
audio_capture_manager: Optional["AudioCaptureManager"] = None,
audio_source_store: Optional["AudioSourceStore"] = None,
audio_template_store=None,
audio_processing_template_store=None,
):
self._audio_source_id = audio_source_id
self._mode = mode
@@ -191,6 +192,7 @@ class AudioValueStream(ValueStream):
self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store
self._audio_processing_template_store = audio_processing_template_store
# Resolved audio device params
self._audio_device_index = -1
@@ -200,6 +202,7 @@ class AudioValueStream(ValueStream):
self._audio_processing_template_ids: list = []
self._audio_stream = None
self._filter_pipeline = None # AudioFilterPipeline
self._prev_value = 0.0
self._beat_brightness = 0.0
@@ -208,9 +211,7 @@ class AudioValueStream(ValueStream):
def _resolve_audio_source(self) -> None:
"""Resolve audio source to device index / engine info / processing template IDs.
Channel selection and band filtering are now handled by audio processing
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
instances into the stream runtime).
Builds the audio filter pipeline from the processing template chain.
"""
if self._audio_source_id and self._audio_source_store:
try:
@@ -232,6 +233,22 @@ class AudioValueStream(ValueStream):
pass
except ValueError as e:
logger.warning(f"Failed to resolve audio source {self._audio_source_id}: {e}")
self._rebuild_filter_pipeline()
def _rebuild_filter_pipeline(self) -> None:
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
old_pipeline = self._filter_pipeline
if self._audio_processing_template_ids and self._audio_processing_template_store:
self._filter_pipeline = build_pipeline_from_template_ids(
self._audio_processing_template_ids,
self._audio_processing_template_store,
)
else:
self._filter_pipeline = None
if old_pipeline is not None:
old_pipeline.close()
def start(self) -> None:
if self._audio_capture_manager is None:
@@ -255,6 +272,9 @@ class AudioValueStream(ValueStream):
engine_type=self._audio_engine_type,
)
self._audio_stream = None
if self._filter_pipeline is not None:
self._filter_pipeline.close()
self._filter_pipeline = None
self._prev_value = 0.0
self._beat_brightness = 0.0
@@ -285,7 +305,13 @@ class AudioValueStream(ValueStream):
return max(0.0, min(1.0, mapped))
def _extract_raw(self, analysis) -> float:
"""Extract raw scalar from audio analysis based on mode."""
"""Extract raw scalar from audio analysis based on mode.
Applies the audio filter pipeline (if any) before extracting the scalar.
Channel extraction, band filtering, gain, etc. are handled by filters.
"""
if self._filter_pipeline is not None:
analysis = self._filter_pipeline.process(analysis)
if self._mode == "peak":
return self._pick_peak(analysis)
if self._mode == "beat":
@@ -294,13 +320,9 @@ class AudioValueStream(ValueStream):
return self._pick_rms(analysis)
def _pick_rms(self, analysis) -> float:
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
# Channel selection is now handled by audio processing filters
# (channel_extract) applied via ProcessedAudioSource chains.
return getattr(analysis, "rms", 0.0)
def _pick_peak(self, analysis) -> float:
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
return getattr(analysis, "peak", 0.0)
def _compute_beat(self, analysis) -> float:
@@ -1411,6 +1433,7 @@ class ValueStreamManager:
css_stream_manager: Optional["ColorStripStreamManager"] = None,
gradient_store: Optional[Any] = None,
event_bus: Optional["GameEventBus"] = None,
audio_processing_template_store=None,
):
self._value_source_store = value_source_store
self._audio_capture_manager = audio_capture_manager
@@ -1421,6 +1444,7 @@ class ValueStreamManager:
self._css_stream_manager = css_stream_manager
self._gradient_store = gradient_store
self._event_bus = event_bus
self._audio_processing_template_store = audio_processing_template_store
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
@@ -1473,6 +1497,25 @@ class ValueStreamManager:
stream.update_source(source)
logger.debug(f"Updated value stream {vs_id}")
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
"""Rebuild audio filter pipelines for any running AudioValueStream
that references the given audio processing template ID.
Called when an audio processing template is updated or deleted.
"""
count = 0
for stream in self._streams.values():
if isinstance(stream, AudioValueStream):
if template_id in getattr(stream, "_audio_processing_template_ids", []):
stream._rebuild_filter_pipeline()
count += 1
if count:
logger.info(
"Refreshed audio filter pipeline for %d value stream(s) after template %s update",
count,
template_id,
)
def release_all(self) -> None:
"""Stop and remove all managed streams. Called on shutdown."""
for vs_id, stream in self._streams.items():
@@ -1525,6 +1568,7 @@ class ValueStreamManager:
audio_capture_manager=self._audio_capture_manager,
audio_source_store=self._audio_source_store,
audio_template_store=self._audio_template_store,
audio_processing_template_store=self._audio_processing_template_store,
)
if isinstance(source, DaylightValueSource):
+1
View File
@@ -128,6 +128,7 @@ processor_manager = ProcessorManager(
asset_store=asset_store,
ha_manager=ha_manager,
game_event_bus=game_event_bus,
audio_processing_template_store=audio_processing_template_store,
)
)