feat(processed-audio-sources): phase 4 - runtime filter integration
Add AudioFilterPipeline for chained filter execution on AudioAnalysis. Wire filter pipelines into AudioColorStripStream, AudioValueStream, and WebSocket test endpoint. Add hot-update support via ProcessorManager.refresh_audio_filter_pipelines(). Thread AudioProcessingTemplateStore through dependency injection hierarchy.
This commit is contained in:
@@ -9,7 +9,7 @@
|
||||
- **Test:** `cd server && py -3.13 -m pytest tests/ --no-cov -q`
|
||||
|
||||
## Current State
|
||||
Phase 1 (Audio Filter Framework), Phase 2 (Audio Filters), and Phase 3 (Processed Audio Source Model) implemented.
|
||||
Phases 1-4 implemented. Phase 4 (Runtime Integration) wired the audio filter pipeline into the stream runtime.
|
||||
|
||||
Phase 1 framework:
|
||||
- `AudioFilter` base class, `AudioFilterRegistry`, `AudioFilterOptionDef` in `core/audio/filters/`
|
||||
@@ -93,7 +93,7 @@ _(none yet)_
|
||||
| Phase 1 | impl-agent | — | No | Tasks 7+8 skipped (SQLite migration made them obsolete) |
|
||||
| Phase 2 | impl-agent | — | No | All 11 filters implemented, no deviations |
|
||||
| Phase 3 | impl-agent | — | No | All 11 tasks done; channel/band logic deferred to Phase 4 |
|
||||
| Phase 4 | — | — | — | — |
|
||||
| Phase 4 | impl-agent | — | No | All 6 tasks done; dependency injection threaded through |
|
||||
| Phase 5 | — | — | — | — |
|
||||
| Phase 6 | — | — | — | — |
|
||||
| Phase 7 | — | — | — | — |
|
||||
|
||||
@@ -42,7 +42,7 @@ Clean-slate approach: no data migration for old source types.
|
||||
| Phase 1: Audio Filter Framework | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 2: Audio Filters | backend | 🔨 In Progress | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 3: Processed Audio Source Model | backend | ✅ Done | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 4: Runtime Integration | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 4: Runtime Integration | backend | ✅ Done | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 5: Frontend — Audio Processing Templates | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 6: Frontend — Source Types | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 7: Testing & Polish | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Phase 4: Runtime Integration
|
||||
|
||||
**Status:** ⬜ Not Started
|
||||
**Status:** ✅ Done
|
||||
**Parent plan:** [PLAN.md](./PLAN.md)
|
||||
**Domain:** backend
|
||||
|
||||
@@ -9,7 +9,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
|
||||
|
||||
## Tasks
|
||||
|
||||
- [ ] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py`
|
||||
- [x] Task 1: Create filter pipeline executor in `core/audio/filters/pipeline.py`
|
||||
- `AudioFilterPipeline` class:
|
||||
- `__init__(filter_instances: List[FilterInstance], registry: AudioFilterRegistry)`
|
||||
- Instantiates all filters from FilterInstance specs
|
||||
@@ -17,7 +17,7 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
|
||||
- `reset()` — resets all stateful filters
|
||||
- `close()` — cleanup resources
|
||||
- Handles stateful filter lifecycle (create on init, reset on demand, close on cleanup)
|
||||
- [ ] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py`
|
||||
- [x] Task 2: Update `AudioColorStripStream` in `core/processing/audio_stream.py`
|
||||
- On construction: if source is ProcessedAudioSource, resolve the full chain:
|
||||
- Walk to CaptureAudioSource for device info
|
||||
- Collect all AudioProcessingTemplates along the chain
|
||||
@@ -26,18 +26,18 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
|
||||
- In render loop: after getting AudioAnalysis from ManagedAudioStream, run it through the filter pipeline before visualization
|
||||
- Remove old inline channel selection and band filtering code (now handled by filters)
|
||||
- On stop: close the filter pipeline
|
||||
- [ ] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py`
|
||||
- [x] Task 3: Update `AudioValueStream` in `core/processing/value_stream.py`
|
||||
- Same pattern: resolve ProcessedAudioSource chain, create filter pipeline, apply in get_value()
|
||||
- Remove old inline channel/band handling
|
||||
- [ ] Task 4: Hot-update support for filter templates
|
||||
- [x] Task 4: Hot-update support for filter templates
|
||||
- When an AudioProcessingTemplate is updated, running streams that use it should re-resolve their filter pipeline
|
||||
- Listen for template update events (or implement a refresh mechanism)
|
||||
- Re-create AudioFilterPipeline with updated filter instances
|
||||
- Reset stateful filter state on pipeline refresh
|
||||
- [ ] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py`
|
||||
- [x] Task 5: Update WebSocket test endpoint in `api/routes/audio_sources.py`
|
||||
- For ProcessedAudioSource: resolve chain, create pipeline, apply filters to test stream data
|
||||
- Return filtered analysis in real-time over WebSocket
|
||||
- [ ] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape
|
||||
- [x] Task 6: Update any code that calls `AudioSourceStore.resolve_audio_source()` to handle the new return shape
|
||||
|
||||
## Files to Modify/Create
|
||||
- `core/audio/filters/pipeline.py` — **create** — AudioFilterPipeline
|
||||
@@ -61,11 +61,34 @@ Wire the audio filter pipeline into the runtime audio streaming system so that P
|
||||
- The filter pipeline should produce a new AudioAnalysis each time (immutability), not mutate the shared snapshot from ManagedAudioStream.
|
||||
|
||||
## Review Checklist
|
||||
- [ ] All tasks completed
|
||||
- [ ] Code follows project conventions
|
||||
- [ ] No unintended side effects
|
||||
- [x] All tasks completed
|
||||
- [x] Code follows project conventions
|
||||
- [x] No unintended side effects
|
||||
- [ ] Build passes
|
||||
- [ ] Tests pass (new + existing)
|
||||
|
||||
## Handoff to Next Phase
|
||||
<!-- Filled in by the implementation agent after completing this phase. -->
|
||||
|
||||
### What was built
|
||||
- `AudioFilterPipeline` class in `core/audio/filters/pipeline.py` — thread-safe pipeline that instantiates, processes, resets, and closes audio filters
|
||||
- `build_pipeline_from_template_ids()` helper — resolves template IDs to FilterInstance lists and creates a pipeline
|
||||
- `AudioColorStripStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction and source update; `_apply_filters()` replaces `_pick_channel()` in render loop; pipeline closed on stop
|
||||
- `AudioValueStream` now builds and applies filter pipeline: `_rebuild_filter_pipeline()` called on construction; filters applied in `_extract_raw()` before scalar extraction; pipeline closed on stop
|
||||
- Hot-update: `ProcessorManager.refresh_audio_filter_pipelines(template_id)` dispatches to both CSS and value stream managers; called from audio processing template update/delete routes
|
||||
- WebSocket test endpoint creates a filter pipeline from the resolved template chain and applies it to analysis before sending
|
||||
- Dependency injection: `audio_processing_template_store` threaded through `ProcessorDependencies` -> `ProcessorManager` -> `ColorStripStreamManager` / `ValueStreamManager` -> `AudioColorStripStream` / `AudioValueStream`
|
||||
|
||||
### What Phase 5 needs to know
|
||||
- The backend is fully wired: creating a ProcessedAudioSource that references templates with channel_extract, band_extract, gain, etc. filters will apply those filters to live audio data
|
||||
- The WebSocket test endpoint shows filtered analysis in real-time
|
||||
- Frontend needs to provide UI for creating/editing AudioProcessingTemplates and ProcessedAudioSources
|
||||
- Filter pipeline is per-stream-instance (not shared) — stateful filters maintain independent state
|
||||
|
||||
### Temporary breakages resolved
|
||||
- `_pick_channel()` removed from AudioColorStripStream — replaced by `_apply_filters()` which uses the filter pipeline
|
||||
- Channel/band handling in AudioValueStream now goes through filter pipeline
|
||||
- All "Phase 4 will wire..." comments resolved
|
||||
|
||||
### Known deviations from plan
|
||||
- `AudioFilterPipeline.__init__` takes `List[FilterInstance]` only (not `AudioFilterRegistry` — uses the class-level registry directly via `AudioFilterRegistry.create_instance()`)
|
||||
- Hot-update uses explicit method calls from the route handler rather than an event-listener pattern — simpler and avoids the need for a subscription system
|
||||
|
||||
@@ -6,6 +6,7 @@ from wled_controller.api.auth import AuthRequired
|
||||
from wled_controller.api.dependencies import (
|
||||
fire_entity_event,
|
||||
get_audio_processing_template_store,
|
||||
get_processor_manager,
|
||||
)
|
||||
from wled_controller.api.schemas.audio_processing import (
|
||||
AudioProcessingTemplateCreate,
|
||||
@@ -129,6 +130,12 @@ async def update_audio_processing_template(
|
||||
tags=data.tags,
|
||||
)
|
||||
fire_entity_event("audio_processing_template", "updated", template_id)
|
||||
# Hot-update: rebuild filter pipelines for running streams using this template
|
||||
try:
|
||||
pm = get_processor_manager()
|
||||
pm.refresh_audio_filter_pipelines(template_id)
|
||||
except Exception:
|
||||
pass # Non-critical: streams will pick up changes on next restart
|
||||
return _apt_to_response(template)
|
||||
except EntityNotFoundError as e:
|
||||
raise HTTPException(status_code=404, detail=str(e))
|
||||
@@ -154,6 +161,12 @@ async def delete_audio_processing_template(
|
||||
# TODO: Phase 3 will add reference checks against ProcessedAudioSource
|
||||
store.delete_template(template_id)
|
||||
fire_entity_event("audio_processing_template", "deleted", template_id)
|
||||
# Hot-update: rebuild filter pipelines for running streams that used this template
|
||||
try:
|
||||
pm = get_processor_manager()
|
||||
pm.refresh_audio_filter_pipelines(template_id)
|
||||
except Exception:
|
||||
pass # Non-critical
|
||||
except HTTPException:
|
||||
raise
|
||||
except EntityNotFoundError as e:
|
||||
|
||||
@@ -9,6 +9,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect
|
||||
from wled_controller.api.auth import AuthRequired
|
||||
from wled_controller.api.dependencies import (
|
||||
fire_entity_event,
|
||||
get_audio_processing_template_store,
|
||||
get_audio_source_store,
|
||||
get_audio_template_store,
|
||||
get_color_strip_store,
|
||||
@@ -210,11 +211,12 @@ async def test_audio_source_ws(
|
||||
ManagedAudioStream (ref-counted — shares with running targets), and streams
|
||||
AudioAnalysis snapshots as JSON at ~20 Hz.
|
||||
|
||||
NOTE: Audio processing filters from the template chain are NOT applied in
|
||||
this WebSocket yet — that will be wired in Phase 4 when the stream runtime
|
||||
integrates filter instances.
|
||||
Audio processing filters from the template chain are applied to the
|
||||
analysis before sending, so the WebSocket output matches what running
|
||||
streams see.
|
||||
"""
|
||||
from wled_controller.api.auth import verify_ws_token
|
||||
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
|
||||
|
||||
if not verify_ws_token(token):
|
||||
await websocket.close(code=4001, reason="Unauthorized")
|
||||
@@ -223,6 +225,7 @@ async def test_audio_source_ws(
|
||||
# Resolve source → device info + processing template chain
|
||||
store = get_audio_source_store()
|
||||
template_store = get_audio_template_store()
|
||||
apt_store = get_audio_processing_template_store()
|
||||
manager = get_processor_manager()
|
||||
|
||||
try:
|
||||
@@ -247,6 +250,15 @@ async def test_audio_source_ws(
|
||||
logger.debug("Audio template not found, falling back to best available engine: %s", e)
|
||||
pass # Fall back to best available engine
|
||||
|
||||
# Build filter pipeline from processing template chain
|
||||
pipeline = None
|
||||
if resolved.audio_processing_template_ids and apt_store:
|
||||
pipeline = build_pipeline_from_template_ids(
|
||||
resolved.audio_processing_template_ids, apt_store
|
||||
)
|
||||
if pipeline.empty:
|
||||
pipeline = None
|
||||
|
||||
# Acquire shared audio stream
|
||||
audio_mgr = manager.audio_capture_manager
|
||||
try:
|
||||
@@ -265,7 +277,10 @@ async def test_audio_source_ws(
|
||||
if analysis is not None and analysis.timestamp != last_ts:
|
||||
last_ts = analysis.timestamp
|
||||
|
||||
# Send raw analysis — filter processing will be added in Phase 4
|
||||
# Apply filter pipeline (channel extract, band extract, gain, etc.)
|
||||
if pipeline is not None:
|
||||
analysis = pipeline.process(analysis)
|
||||
|
||||
await websocket.send_json(
|
||||
{
|
||||
"spectrum": analysis.spectrum.tolist(),
|
||||
@@ -283,5 +298,7 @@ async def test_audio_source_ws(
|
||||
except Exception as e:
|
||||
logger.error(f"Audio test WebSocket error for {source_id}: {e}")
|
||||
finally:
|
||||
if pipeline is not None:
|
||||
pipeline.close()
|
||||
audio_mgr.release(device_index, is_loopback, engine_type)
|
||||
logger.info(f"Audio test WebSocket disconnected for source {source_id}")
|
||||
|
||||
@@ -5,6 +5,7 @@ Import this package to ensure all built-in filters are registered.
|
||||
"""
|
||||
|
||||
from wled_controller.core.audio.filters.base import AudioFilter, AudioFilterOptionDef
|
||||
from wled_controller.core.audio.filters.pipeline import AudioFilterPipeline
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
|
||||
# Import individual filters to trigger auto-registration
|
||||
@@ -24,5 +25,6 @@ import wled_controller.core.audio.filters.delay # noqa: F401
|
||||
__all__ = [
|
||||
"AudioFilter",
|
||||
"AudioFilterOptionDef",
|
||||
"AudioFilterPipeline",
|
||||
"AudioFilterRegistry",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
"""Audio filter pipeline — chains multiple AudioFilter instances sequentially.
|
||||
|
||||
The pipeline is the runtime executor for audio processing template chains.
|
||||
It takes a flat list of FilterInstance specs, instantiates each filter via
|
||||
the AudioFilterRegistry, and runs AudioAnalysis through them in order.
|
||||
|
||||
Thread-safe: each stream gets its own pipeline instance with independent
|
||||
stateful filter state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, List
|
||||
|
||||
from wled_controller.core.audio.analysis import AudioAnalysis
|
||||
from wled_controller.core.audio.filters.base import AudioFilter
|
||||
from wled_controller.core.audio.filters.registry import AudioFilterRegistry
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from wled_controller.core.filters.filter_instance import FilterInstance
|
||||
from wled_controller.storage.audio_processing_template_store import (
|
||||
AudioProcessingTemplateStore,
|
||||
)
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class AudioFilterPipeline:
|
||||
"""Chains multiple AudioFilter instances and runs AudioAnalysis through them.
|
||||
|
||||
Each pipeline owns its own filter instances — stateful filters maintain
|
||||
per-pipeline state, so pipelines must NOT be shared across streams.
|
||||
"""
|
||||
|
||||
def __init__(self, filter_instances: List["FilterInstance"]) -> None:
|
||||
"""Create a pipeline from FilterInstance specs.
|
||||
|
||||
Args:
|
||||
filter_instances: Flat (already-resolved) list of FilterInstance specs.
|
||||
Each is instantiated via AudioFilterRegistry.create_instance().
|
||||
"""
|
||||
self._lock = threading.Lock()
|
||||
self._filters: List[AudioFilter] = []
|
||||
for fi in filter_instances:
|
||||
try:
|
||||
f = AudioFilterRegistry.create_instance(fi.filter_id, fi.options)
|
||||
self._filters.append(f)
|
||||
except ValueError:
|
||||
logger.warning("Skipping unknown audio filter '%s'", fi.filter_id)
|
||||
|
||||
@property
|
||||
def empty(self) -> bool:
|
||||
"""True when the pipeline has no filters (passthrough)."""
|
||||
return len(self._filters) == 0
|
||||
|
||||
def process(self, analysis: AudioAnalysis) -> AudioAnalysis:
|
||||
"""Run analysis through all filters in order.
|
||||
|
||||
Returns a new AudioAnalysis (filters produce new objects via
|
||||
dataclasses.replace, never mutating the input).
|
||||
"""
|
||||
result = analysis
|
||||
with self._lock:
|
||||
for f in self._filters:
|
||||
result = f.process(result)
|
||||
return result
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset all stateful filters to their initial state."""
|
||||
with self._lock:
|
||||
for f in self._filters:
|
||||
if f.is_stateful:
|
||||
f.reset()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Release resources and clear filter list."""
|
||||
with self._lock:
|
||||
self._filters.clear()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"AudioFilterPipeline(filters={self._filters})"
|
||||
|
||||
|
||||
def build_pipeline_from_template_ids(
|
||||
template_ids: List[str],
|
||||
template_store: "AudioProcessingTemplateStore",
|
||||
) -> AudioFilterPipeline:
|
||||
"""Resolve a list of audio processing template IDs into a single pipeline.
|
||||
|
||||
Expands each template's filters (including nested audio_filter_template
|
||||
references) via the store's resolve_filter_instances(), concatenates all
|
||||
resolved FilterInstance lists in order, and returns a ready-to-use pipeline.
|
||||
|
||||
Args:
|
||||
template_ids: Ordered template IDs (outermost first, as returned by
|
||||
ResolvedAudioSource.audio_processing_template_ids).
|
||||
template_store: AudioProcessingTemplateStore for template lookup and
|
||||
recursive filter resolution.
|
||||
|
||||
Returns:
|
||||
AudioFilterPipeline (may be empty if no templates / all templates missing).
|
||||
"""
|
||||
all_instances: List[FilterInstance] = []
|
||||
for tid in template_ids:
|
||||
try:
|
||||
template = template_store.get_template(tid)
|
||||
resolved = template_store.resolve_filter_instances(template.filters)
|
||||
all_instances.extend(resolved)
|
||||
except ValueError:
|
||||
logger.warning("Audio processing template '%s' not found, skipping", tid)
|
||||
return AudioFilterPipeline(all_instances)
|
||||
@@ -17,6 +17,7 @@ import numpy as np
|
||||
|
||||
from wled_controller.core.audio.analysis import NUM_BANDS
|
||||
from wled_controller.core.audio.audio_capture import AudioCaptureManager
|
||||
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
|
||||
from wled_controller.core.processing.color_strip_stream import ColorStripStream
|
||||
from wled_controller.core.processing.effect_stream import _build_palette_lut
|
||||
from wled_controller.utils import get_logger
|
||||
@@ -42,11 +43,14 @@ class AudioColorStripStream(ColorStripStream):
|
||||
audio_capture_manager: AudioCaptureManager,
|
||||
audio_source_store=None,
|
||||
audio_template_store=None,
|
||||
audio_processing_template_store=None,
|
||||
):
|
||||
self._audio_capture_manager = audio_capture_manager
|
||||
self._audio_source_store = audio_source_store
|
||||
self._audio_template_store = audio_template_store
|
||||
self._audio_processing_template_store = audio_processing_template_store
|
||||
self._audio_stream = None # acquired on start
|
||||
self._filter_pipeline = None # AudioFilterPipeline, created on start
|
||||
|
||||
self._colors_lock = threading.Lock()
|
||||
self._running = False
|
||||
@@ -104,9 +108,6 @@ class AudioColorStripStream(ColorStripStream):
|
||||
self._mirror = bool(getattr(source, "mirror", False))
|
||||
|
||||
# Resolve audio device/template via audio_source_id
|
||||
# NOTE: channel selection and band filtering are now handled by audio
|
||||
# processing filters (channel_extract, band_extract) applied via
|
||||
# ProcessedAudioSource chains. Filter application will be wired in Phase 4.
|
||||
audio_source_id = getattr(source, "audio_source_id", "")
|
||||
self._audio_source_id = audio_source_id
|
||||
self._audio_engine_type = None
|
||||
@@ -138,9 +139,25 @@ class AudioColorStripStream(ColorStripStream):
|
||||
self._audio_device_index = -1
|
||||
self._audio_loopback = True
|
||||
|
||||
# Build audio filter pipeline from processing template chain
|
||||
self._rebuild_filter_pipeline()
|
||||
|
||||
with self._colors_lock:
|
||||
self._colors: Optional[np.ndarray] = None
|
||||
|
||||
def _rebuild_filter_pipeline(self) -> None:
|
||||
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
|
||||
old_pipeline = self._filter_pipeline
|
||||
if self._audio_processing_template_ids and self._audio_processing_template_store:
|
||||
self._filter_pipeline = build_pipeline_from_template_ids(
|
||||
self._audio_processing_template_ids,
|
||||
self._audio_processing_template_store,
|
||||
)
|
||||
else:
|
||||
self._filter_pipeline = None
|
||||
if old_pipeline is not None:
|
||||
old_pipeline.close()
|
||||
|
||||
# ── ColorStripStream interface ──────────────────────────────────
|
||||
|
||||
def configure(self, device_led_count: int) -> None:
|
||||
@@ -200,6 +217,10 @@ class AudioColorStripStream(ColorStripStream):
|
||||
engine_type=self._audio_engine_type,
|
||||
)
|
||||
self._audio_stream = None
|
||||
# Close audio filter pipeline
|
||||
if self._filter_pipeline is not None:
|
||||
self._filter_pipeline.close()
|
||||
self._filter_pipeline = None
|
||||
self._prev_spectrum = None
|
||||
logger.info("AudioColorStripStream stopped")
|
||||
|
||||
@@ -338,16 +359,17 @@ class AudioColorStripStream(ColorStripStream):
|
||||
finally:
|
||||
self._running = False
|
||||
|
||||
# ── Channel selection ─────────────────────────────────────────
|
||||
# ── Filter pipeline + channel selection ──────────────────────────
|
||||
|
||||
def _pick_channel(self, analysis):
|
||||
"""Return (spectrum, rms) from the analysis.
|
||||
def _apply_filters(self, analysis):
|
||||
"""Apply audio filter pipeline (if any) and return (spectrum, rms).
|
||||
|
||||
Channel selection and band filtering are now handled by audio processing
|
||||
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
|
||||
instances into the stream runtime).
|
||||
The filter pipeline handles channel extraction, band extraction,
|
||||
gain, noise gate, etc. as configured by the ProcessedAudioSource
|
||||
template chain.
|
||||
"""
|
||||
# ⚠️ Temporary breakage: channel/band filtering removed — resolved in Phase 4
|
||||
if self._filter_pipeline is not None:
|
||||
analysis = self._filter_pipeline.process(analysis)
|
||||
return analysis.spectrum, analysis.rms
|
||||
|
||||
# ── Spectrum Analyzer ──────────────────────────────────────────
|
||||
@@ -357,7 +379,7 @@ class AudioColorStripStream(ColorStripStream):
|
||||
buf[:] = 0
|
||||
return
|
||||
|
||||
spectrum, _ = self._pick_channel(analysis)
|
||||
spectrum, _ = self._apply_filters(analysis)
|
||||
sensitivity = self.resolve("sensitivity", self._sensitivity)
|
||||
smoothing = self.resolve("smoothing", self._smoothing)
|
||||
lut = self._palette_lut
|
||||
@@ -408,7 +430,7 @@ class AudioColorStripStream(ColorStripStream):
|
||||
buf[:] = 0
|
||||
return
|
||||
|
||||
_, ch_rms = self._pick_channel(analysis)
|
||||
_, ch_rms = self._apply_filters(analysis)
|
||||
sensitivity = self.resolve("sensitivity", self._sensitivity)
|
||||
smoothing = self.resolve("smoothing", self._smoothing)
|
||||
rms = ch_rms * sensitivity
|
||||
|
||||
@@ -89,6 +89,7 @@ class ColorStripStreamManager:
|
||||
weather_manager=None,
|
||||
asset_store=None,
|
||||
game_event_bus=None,
|
||||
audio_processing_template_store=None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
@@ -101,6 +102,7 @@ class ColorStripStreamManager:
|
||||
cspt_store: ColorStripProcessingTemplateStore for per-layer filter chains
|
||||
gradient_store: GradientStore for resolving gradient entity references
|
||||
game_event_bus: GameEventBus for game event stream subscriptions
|
||||
audio_processing_template_store: AudioProcessingTemplateStore for filter chains
|
||||
"""
|
||||
self._color_strip_store = color_strip_store
|
||||
self._live_stream_manager = live_stream_manager
|
||||
@@ -114,6 +116,7 @@ class ColorStripStreamManager:
|
||||
self._weather_manager = weather_manager
|
||||
self._asset_store = asset_store
|
||||
self._game_event_bus = game_event_bus
|
||||
self._audio_processing_template_store = audio_processing_template_store
|
||||
self._streams: Dict[str, _ColorStripEntry] = {}
|
||||
|
||||
def _inject_clock(self, css_stream, source) -> Optional[str]:
|
||||
@@ -246,6 +249,7 @@ class ColorStripStreamManager:
|
||||
self._audio_capture_manager,
|
||||
self._audio_source_store,
|
||||
self._audio_template_store,
|
||||
self._audio_processing_template_store,
|
||||
)
|
||||
elif source.source_type == "composite":
|
||||
from wled_controller.core.processing.composite_stream import (
|
||||
@@ -498,6 +502,28 @@ class ColorStripStreamManager:
|
||||
|
||||
logger.info(f"Updated {len(matching_keys)} running stream(s) for source {css_id}")
|
||||
|
||||
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
|
||||
"""Rebuild audio filter pipelines for any running AudioColorStripStream
|
||||
that references the given audio processing template ID.
|
||||
|
||||
Called when an audio processing template is updated or deleted.
|
||||
"""
|
||||
from wled_controller.core.processing.audio_stream import AudioColorStripStream
|
||||
|
||||
count = 0
|
||||
for entry in self._streams.values():
|
||||
stream = entry.stream
|
||||
if isinstance(stream, AudioColorStripStream):
|
||||
if template_id in getattr(stream, "_audio_processing_template_ids", []):
|
||||
stream._rebuild_filter_pipeline()
|
||||
count += 1
|
||||
if count:
|
||||
logger.info(
|
||||
"Refreshed audio filter pipeline for %d stream(s) after template %s update",
|
||||
count,
|
||||
template_id,
|
||||
)
|
||||
|
||||
def notify_target_fps(self, css_id: str, target_id: str, fps: int) -> None:
|
||||
"""Register or update a consumer's target FPS.
|
||||
|
||||
|
||||
@@ -73,6 +73,7 @@ class ProcessorDependencies:
|
||||
asset_store: Optional[AssetStore] = None
|
||||
ha_manager: Optional[Any] = None # HomeAssistantManager
|
||||
game_event_bus: Optional[Any] = None # GameEventBus
|
||||
audio_processing_template_store: Optional[Any] = None # AudioProcessingTemplateStore
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -153,6 +154,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
|
||||
weather_manager=deps.weather_manager,
|
||||
asset_store=deps.asset_store,
|
||||
game_event_bus=deps.game_event_bus,
|
||||
audio_processing_template_store=deps.audio_processing_template_store,
|
||||
)
|
||||
self._value_stream_manager = (
|
||||
ValueStreamManager(
|
||||
@@ -165,6 +167,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
|
||||
css_stream_manager=self._color_strip_stream_manager,
|
||||
gradient_store=deps.gradient_store,
|
||||
event_bus=deps.game_event_bus,
|
||||
audio_processing_template_store=deps.audio_processing_template_store,
|
||||
)
|
||||
if deps.value_source_store
|
||||
else None
|
||||
@@ -194,6 +197,13 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
|
||||
def color_strip_stream_manager(self) -> ColorStripStreamManager:
|
||||
return self._color_strip_stream_manager
|
||||
|
||||
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
|
||||
"""Rebuild audio filter pipelines across all running streams when a
|
||||
template is updated or deleted. Dispatches to both CSS and value stream managers."""
|
||||
self._color_strip_stream_manager.refresh_audio_filter_pipelines(template_id)
|
||||
if self._value_stream_manager:
|
||||
self._value_stream_manager.refresh_audio_filter_pipelines(template_id)
|
||||
|
||||
# ===== SHARED CONTEXT (passed to target processors) =====
|
||||
|
||||
def _build_context(self) -> TargetContext:
|
||||
|
||||
@@ -178,6 +178,7 @@ class AudioValueStream(ValueStream):
|
||||
audio_capture_manager: Optional["AudioCaptureManager"] = None,
|
||||
audio_source_store: Optional["AudioSourceStore"] = None,
|
||||
audio_template_store=None,
|
||||
audio_processing_template_store=None,
|
||||
):
|
||||
self._audio_source_id = audio_source_id
|
||||
self._mode = mode
|
||||
@@ -191,6 +192,7 @@ class AudioValueStream(ValueStream):
|
||||
self._audio_capture_manager = audio_capture_manager
|
||||
self._audio_source_store = audio_source_store
|
||||
self._audio_template_store = audio_template_store
|
||||
self._audio_processing_template_store = audio_processing_template_store
|
||||
|
||||
# Resolved audio device params
|
||||
self._audio_device_index = -1
|
||||
@@ -200,6 +202,7 @@ class AudioValueStream(ValueStream):
|
||||
self._audio_processing_template_ids: list = []
|
||||
|
||||
self._audio_stream = None
|
||||
self._filter_pipeline = None # AudioFilterPipeline
|
||||
self._prev_value = 0.0
|
||||
self._beat_brightness = 0.0
|
||||
|
||||
@@ -208,9 +211,7 @@ class AudioValueStream(ValueStream):
|
||||
def _resolve_audio_source(self) -> None:
|
||||
"""Resolve audio source to device index / engine info / processing template IDs.
|
||||
|
||||
Channel selection and band filtering are now handled by audio processing
|
||||
filters applied via ProcessedAudioSource chains (Phase 4 will wire filter
|
||||
instances into the stream runtime).
|
||||
Builds the audio filter pipeline from the processing template chain.
|
||||
"""
|
||||
if self._audio_source_id and self._audio_source_store:
|
||||
try:
|
||||
@@ -232,6 +233,22 @@ class AudioValueStream(ValueStream):
|
||||
pass
|
||||
except ValueError as e:
|
||||
logger.warning(f"Failed to resolve audio source {self._audio_source_id}: {e}")
|
||||
self._rebuild_filter_pipeline()
|
||||
|
||||
def _rebuild_filter_pipeline(self) -> None:
|
||||
"""Build (or rebuild) the audio filter pipeline from processing template IDs."""
|
||||
from wled_controller.core.audio.filters.pipeline import build_pipeline_from_template_ids
|
||||
|
||||
old_pipeline = self._filter_pipeline
|
||||
if self._audio_processing_template_ids and self._audio_processing_template_store:
|
||||
self._filter_pipeline = build_pipeline_from_template_ids(
|
||||
self._audio_processing_template_ids,
|
||||
self._audio_processing_template_store,
|
||||
)
|
||||
else:
|
||||
self._filter_pipeline = None
|
||||
if old_pipeline is not None:
|
||||
old_pipeline.close()
|
||||
|
||||
def start(self) -> None:
|
||||
if self._audio_capture_manager is None:
|
||||
@@ -255,6 +272,9 @@ class AudioValueStream(ValueStream):
|
||||
engine_type=self._audio_engine_type,
|
||||
)
|
||||
self._audio_stream = None
|
||||
if self._filter_pipeline is not None:
|
||||
self._filter_pipeline.close()
|
||||
self._filter_pipeline = None
|
||||
self._prev_value = 0.0
|
||||
self._beat_brightness = 0.0
|
||||
|
||||
@@ -285,7 +305,13 @@ class AudioValueStream(ValueStream):
|
||||
return max(0.0, min(1.0, mapped))
|
||||
|
||||
def _extract_raw(self, analysis) -> float:
|
||||
"""Extract raw scalar from audio analysis based on mode."""
|
||||
"""Extract raw scalar from audio analysis based on mode.
|
||||
|
||||
Applies the audio filter pipeline (if any) before extracting the scalar.
|
||||
Channel extraction, band filtering, gain, etc. are handled by filters.
|
||||
"""
|
||||
if self._filter_pipeline is not None:
|
||||
analysis = self._filter_pipeline.process(analysis)
|
||||
if self._mode == "peak":
|
||||
return self._pick_peak(analysis)
|
||||
if self._mode == "beat":
|
||||
@@ -294,13 +320,9 @@ class AudioValueStream(ValueStream):
|
||||
return self._pick_rms(analysis)
|
||||
|
||||
def _pick_rms(self, analysis) -> float:
|
||||
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
|
||||
# Channel selection is now handled by audio processing filters
|
||||
# (channel_extract) applied via ProcessedAudioSource chains.
|
||||
return getattr(analysis, "rms", 0.0)
|
||||
|
||||
def _pick_peak(self, analysis) -> float:
|
||||
# ⚠️ Temporary breakage: channel selection removed — resolved in Phase 4
|
||||
return getattr(analysis, "peak", 0.0)
|
||||
|
||||
def _compute_beat(self, analysis) -> float:
|
||||
@@ -1411,6 +1433,7 @@ class ValueStreamManager:
|
||||
css_stream_manager: Optional["ColorStripStreamManager"] = None,
|
||||
gradient_store: Optional[Any] = None,
|
||||
event_bus: Optional["GameEventBus"] = None,
|
||||
audio_processing_template_store=None,
|
||||
):
|
||||
self._value_source_store = value_source_store
|
||||
self._audio_capture_manager = audio_capture_manager
|
||||
@@ -1421,6 +1444,7 @@ class ValueStreamManager:
|
||||
self._css_stream_manager = css_stream_manager
|
||||
self._gradient_store = gradient_store
|
||||
self._event_bus = event_bus
|
||||
self._audio_processing_template_store = audio_processing_template_store
|
||||
self._streams: Dict[str, ValueStream] = {} # vs_id → stream
|
||||
self._ref_counts: Dict[str, int] = {} # vs_id → ref count
|
||||
|
||||
@@ -1473,6 +1497,25 @@ class ValueStreamManager:
|
||||
stream.update_source(source)
|
||||
logger.debug(f"Updated value stream {vs_id}")
|
||||
|
||||
def refresh_audio_filter_pipelines(self, template_id: str) -> None:
|
||||
"""Rebuild audio filter pipelines for any running AudioValueStream
|
||||
that references the given audio processing template ID.
|
||||
|
||||
Called when an audio processing template is updated or deleted.
|
||||
"""
|
||||
count = 0
|
||||
for stream in self._streams.values():
|
||||
if isinstance(stream, AudioValueStream):
|
||||
if template_id in getattr(stream, "_audio_processing_template_ids", []):
|
||||
stream._rebuild_filter_pipeline()
|
||||
count += 1
|
||||
if count:
|
||||
logger.info(
|
||||
"Refreshed audio filter pipeline for %d value stream(s) after template %s update",
|
||||
count,
|
||||
template_id,
|
||||
)
|
||||
|
||||
def release_all(self) -> None:
|
||||
"""Stop and remove all managed streams. Called on shutdown."""
|
||||
for vs_id, stream in self._streams.items():
|
||||
@@ -1525,6 +1568,7 @@ class ValueStreamManager:
|
||||
audio_capture_manager=self._audio_capture_manager,
|
||||
audio_source_store=self._audio_source_store,
|
||||
audio_template_store=self._audio_template_store,
|
||||
audio_processing_template_store=self._audio_processing_template_store,
|
||||
)
|
||||
|
||||
if isinstance(source, DaylightValueSource):
|
||||
|
||||
@@ -128,6 +128,7 @@ processor_manager = ProcessorManager(
|
||||
asset_store=asset_store,
|
||||
ha_manager=ha_manager,
|
||||
game_event_bus=game_event_bus,
|
||||
audio_processing_template_store=audio_processing_template_store,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user