Add audio sources as first-class entities, add mapped CSS type, simplify target editor for mapped sources

- Audio sources moved to separate tab with dedicated CRUD API, store, and editor modal
- New "mapped" color strip source type: assigns different CSS sources to distinct LED sub-ranges (zones)
- Mapped stream runtime with per-zone sub-streams, auto-sizing, hot-update support
- Target editor auto-collapses segments UI when mapped CSS is selected
- Delete protection for CSS sources referenced by mapped zones
- Compact header/footer layout

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-23 23:35:58 +03:00
parent 199039326b
commit 9efb08acb6
28 changed files with 1729 additions and 153 deletions

View File

@@ -0,0 +1,113 @@
"""Audio source data model with inheritance-based source types.
An AudioSource represents a reusable audio input configuration:
MultichannelAudioSource — wraps a physical audio device (index + loopback flag)
MonoAudioSource — extracts a single channel from a multichannel source
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class AudioSource:
"""Base class for audio source configurations."""
id: str
name: str
source_type: str # "multichannel" | "mono"
created_at: datetime
updated_at: datetime
description: Optional[str] = None
def to_dict(self) -> dict:
"""Convert source to dictionary. Subclasses extend this."""
return {
"id": self.id,
"name": self.name,
"source_type": self.source_type,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
# Subclass fields default to None for forward compat
"device_index": None,
"is_loopback": None,
"audio_source_id": None,
"channel": None,
}
@staticmethod
def from_dict(data: dict) -> "AudioSource":
"""Factory: dispatch to the correct subclass based on source_type."""
source_type: str = data.get("source_type", "multichannel") or "multichannel"
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.utcnow()
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.utcnow()
)
if source_type == "mono":
return MonoAudioSource(
id=sid, name=name, source_type="mono",
created_at=created_at, updated_at=updated_at, description=description,
audio_source_id=data.get("audio_source_id") or "",
channel=data.get("channel") or "mono",
)
# Default: multichannel
return MultichannelAudioSource(
id=sid, name=name, source_type="multichannel",
created_at=created_at, updated_at=updated_at, description=description,
device_index=int(data.get("device_index", -1)),
is_loopback=bool(data.get("is_loopback", True)),
)
@dataclass
class MultichannelAudioSource(AudioSource):
"""Audio source wrapping a physical audio device.
Captures all channels from the device. For WASAPI loopback devices
(system audio output), set is_loopback=True.
"""
device_index: int = -1 # -1 = default device
is_loopback: bool = True # True = WASAPI loopback (system audio)
def to_dict(self) -> dict:
d = super().to_dict()
d["device_index"] = self.device_index
d["is_loopback"] = self.is_loopback
return d
@dataclass
class MonoAudioSource(AudioSource):
"""Audio source that extracts a single channel from a multichannel source.
References a MultichannelAudioSource and selects which channel to use:
mono (L+R mix), left, or right.
"""
audio_source_id: str = "" # references a MultichannelAudioSource
channel: str = "mono" # mono | left | right
def to_dict(self) -> dict:
d = super().to_dict()
d["audio_source_id"] = self.audio_source_id
d["channel"] = self.channel
return d

View File

@@ -0,0 +1,324 @@
"""Audio source storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from wled_controller.storage.audio_source import (
AudioSource,
MonoAudioSource,
MultichannelAudioSource,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AudioSourceStore:
"""Persistent storage for audio sources."""
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._sources: Dict[str, AudioSource] = {}
self._load()
def _load(self) -> None:
if not self.file_path.exists():
logger.info("Audio source store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
sources_data = data.get("audio_sources", {})
loaded = 0
for source_id, source_dict in sources_data.items():
try:
source = AudioSource.from_dict(source_dict)
self._sources[source_id] = source
loaded += 1
except Exception as e:
logger.error(
f"Failed to load audio source {source_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} audio sources from storage")
except Exception as e:
logger.error(f"Failed to load audio sources from {self.file_path}: {e}")
raise
logger.info(f"Audio source store initialized with {len(self._sources)} sources")
def _save(self) -> None:
try:
self.file_path.parent.mkdir(parents=True, exist_ok=True)
sources_dict = {
sid: source.to_dict()
for sid, source in self._sources.items()
}
data = {
"version": "1.0.0",
"audio_sources": sources_dict,
}
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Failed to save audio sources to {self.file_path}: {e}")
raise
# ── CRUD ─────────────────────────────────────────────────────────
def get_all_sources(self) -> List[AudioSource]:
return list(self._sources.values())
def get_mono_sources(self) -> List[MonoAudioSource]:
"""Return only mono audio sources (for CSS dropdown)."""
return [s for s in self._sources.values() if isinstance(s, MonoAudioSource)]
def get_source(self, source_id: str) -> AudioSource:
if source_id not in self._sources:
raise ValueError(f"Audio source not found: {source_id}")
return self._sources[source_id]
def create_source(
self,
name: str,
source_type: str,
device_index: Optional[int] = None,
is_loopback: Optional[bool] = None,
audio_source_id: Optional[str] = None,
channel: Optional[str] = None,
description: Optional[str] = None,
) -> AudioSource:
if not name or not name.strip():
raise ValueError("Name is required")
if source_type not in ("multichannel", "mono"):
raise ValueError(f"Invalid source type: {source_type}")
for source in self._sources.values():
if source.name == name:
raise ValueError(f"Audio source with name '{name}' already exists")
sid = f"as_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
if source_type == "mono":
if not audio_source_id:
raise ValueError("Mono sources require audio_source_id")
# Validate parent exists and is multichannel
parent = self._sources.get(audio_source_id)
if not parent:
raise ValueError(f"Parent audio source not found: {audio_source_id}")
if not isinstance(parent, MultichannelAudioSource):
raise ValueError("Mono sources must reference a multichannel source")
source = MonoAudioSource(
id=sid, name=name, source_type="mono",
created_at=now, updated_at=now, description=description,
audio_source_id=audio_source_id,
channel=channel or "mono",
)
else:
source = MultichannelAudioSource(
id=sid, name=name, source_type="multichannel",
created_at=now, updated_at=now, description=description,
device_index=device_index if device_index is not None else -1,
is_loopback=bool(is_loopback) if is_loopback is not None else True,
)
self._sources[sid] = source
self._save()
logger.info(f"Created audio source: {name} ({sid}, type={source_type})")
return source
def update_source(
self,
source_id: str,
name: Optional[str] = None,
device_index: Optional[int] = None,
is_loopback: Optional[bool] = None,
audio_source_id: Optional[str] = None,
channel: Optional[str] = None,
description: Optional[str] = None,
) -> AudioSource:
if source_id not in self._sources:
raise ValueError(f"Audio source not found: {source_id}")
source = self._sources[source_id]
if name is not None:
for other in self._sources.values():
if other.id != source_id and other.name == name:
raise ValueError(f"Audio source with name '{name}' already exists")
source.name = name
if description is not None:
source.description = description
if isinstance(source, MultichannelAudioSource):
if device_index is not None:
source.device_index = device_index
if is_loopback is not None:
source.is_loopback = bool(is_loopback)
elif isinstance(source, MonoAudioSource):
if audio_source_id is not None:
parent = self._sources.get(audio_source_id)
if not parent:
raise ValueError(f"Parent audio source not found: {audio_source_id}")
if not isinstance(parent, MultichannelAudioSource):
raise ValueError("Mono sources must reference a multichannel source")
source.audio_source_id = audio_source_id
if channel is not None:
source.channel = channel
source.updated_at = datetime.utcnow()
self._save()
logger.info(f"Updated audio source: {source_id}")
return source
def delete_source(self, source_id: str) -> None:
if source_id not in self._sources:
raise ValueError(f"Audio source not found: {source_id}")
source = self._sources[source_id]
# Prevent deleting multichannel sources referenced by mono sources
if isinstance(source, MultichannelAudioSource):
for other in self._sources.values():
if isinstance(other, MonoAudioSource) and other.audio_source_id == source_id:
raise ValueError(
f"Cannot delete '{source.name}': referenced by mono source '{other.name}'"
)
del self._sources[source_id]
self._save()
logger.info(f"Deleted audio source: {source_id}")
# ── Resolution ───────────────────────────────────────────────────
def resolve_mono_source(self, mono_id: str) -> Tuple[int, bool, str]:
"""Resolve a mono audio source to (device_index, is_loopback, channel).
Follows the reference chain: mono → multichannel.
Raises:
ValueError: If source not found or chain is broken
"""
mono = self.get_source(mono_id)
if not isinstance(mono, MonoAudioSource):
raise ValueError(f"Audio source {mono_id} is not a mono source")
parent = self.get_source(mono.audio_source_id)
if not isinstance(parent, MultichannelAudioSource):
raise ValueError(
f"Mono source {mono_id} references non-multichannel source {mono.audio_source_id}"
)
return parent.device_index, parent.is_loopback, mono.channel
# ── Migration ────────────────────────────────────────────────────
def migrate_from_css(self, color_strip_store) -> None:
"""One-time migration: extract audio config from existing CSS entities.
For each AudioColorStripSource that has old-style embedded audio fields
(audio_device_index, audio_loopback, audio_channel) but no audio_source_id:
1. Create a MultichannelAudioSource if one with matching config doesn't exist
2. Create a MonoAudioSource referencing it
3. Set audio_source_id on the CSS entity
4. Save both stores
"""
from wled_controller.storage.color_strip_source import AudioColorStripSource
migrated = 0
multichannel_cache: Dict[Tuple[int, bool], str] = {} # (dev, loopback) → id
# Index existing multichannel sources for dedup
for source in self._sources.values():
if isinstance(source, MultichannelAudioSource):
key = (source.device_index, source.is_loopback)
multichannel_cache[key] = source.id
for css in color_strip_store.get_all_sources():
if not isinstance(css, AudioColorStripSource):
continue
# Skip if already migrated
if getattr(css, "audio_source_id", None):
continue
# Skip if no old fields present
if not hasattr(css, "audio_device_index"):
continue
dev_idx = getattr(css, "audio_device_index", -1)
loopback = bool(getattr(css, "audio_loopback", True))
channel = getattr(css, "audio_channel", "mono") or "mono"
# Find or create multichannel source
mc_key = (dev_idx, loopback)
if mc_key in multichannel_cache:
mc_id = multichannel_cache[mc_key]
else:
device_label = "Loopback" if loopback else "Input"
mc_name = f"Audio Device {dev_idx} ({device_label})"
# Ensure unique name
suffix = 2
base_name = mc_name
while any(s.name == mc_name for s in self._sources.values()):
mc_name = f"{base_name} #{suffix}"
suffix += 1
mc_id = f"as_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
mc_source = MultichannelAudioSource(
id=mc_id, name=mc_name, source_type="multichannel",
created_at=now, updated_at=now,
device_index=dev_idx, is_loopback=loopback,
)
self._sources[mc_id] = mc_source
multichannel_cache[mc_key] = mc_id
logger.info(f"Migration: created multichannel source '{mc_name}' ({mc_id})")
# Create mono source
channel_label = {"mono": "Mono", "left": "Left", "right": "Right"}.get(channel, channel)
mono_name = f"{css.name} - {channel_label}"
# Ensure unique name
suffix = 2
base_name = mono_name
while any(s.name == mono_name for s in self._sources.values()):
mono_name = f"{base_name} #{suffix}"
suffix += 1
mono_id = f"as_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
mono_source = MonoAudioSource(
id=mono_id, name=mono_name, source_type="mono",
created_at=now, updated_at=now,
audio_source_id=mc_id, channel=channel,
)
self._sources[mono_id] = mono_source
logger.info(f"Migration: created mono source '{mono_name}' ({mono_id})")
# Update CSS entity
css.audio_source_id = mono_id
migrated += 1
if migrated > 0:
self._save()
color_strip_store._save()
logger.info(f"Migration complete: migrated {migrated} audio CSS entities")
else:
logger.debug("No audio CSS entities needed migration")

View File

@@ -73,12 +73,11 @@ class ColorStripSource:
"scale": None,
"mirror": None,
"layers": None,
"zones": None,
"visualization_mode": None,
"audio_device_index": None,
"audio_loopback": None,
"audio_source_id": None,
"sensitivity": None,
"color_peak": None,
"audio_channel": None,
}
@staticmethod
@@ -155,6 +154,14 @@ class ColorStripSource:
led_count=data.get("led_count") or 0,
)
if source_type == "mapped":
return MappedColorStripSource(
id=sid, name=name, source_type="mapped",
created_at=created_at, updated_at=updated_at, description=description,
zones=data.get("zones") or [],
led_count=data.get("led_count") or 0,
)
if source_type == "audio":
raw_color = data.get("color")
color = raw_color if isinstance(raw_color, list) and len(raw_color) == 3 else [0, 255, 0]
@@ -164,9 +171,7 @@ class ColorStripSource:
id=sid, name=name, source_type="audio",
created_at=created_at, updated_at=updated_at, description=description,
visualization_mode=data.get("visualization_mode") or "spectrum",
audio_device_index=int(data.get("audio_device_index", -1)),
audio_loopback=bool(data.get("audio_loopback", True)),
audio_channel=data.get("audio_channel") or "mono",
audio_source_id=data.get("audio_source_id") or "",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
palette=data.get("palette") or "rainbow",
@@ -366,9 +371,7 @@ class AudioColorStripSource(ColorStripSource):
"""
visualization_mode: str = "spectrum" # spectrum | beat_pulse | vu_meter
audio_device_index: int = -1 # -1 = default input device
audio_loopback: bool = True # True = WASAPI loopback (system audio)
audio_channel: str = "mono" # mono | left | right
audio_source_id: str = "" # references a MonoAudioSource
sensitivity: float = 1.0 # gain multiplier (0.15.0)
smoothing: float = 0.3 # temporal smoothing (0.01.0)
palette: str = "rainbow" # named color palette
@@ -380,9 +383,7 @@ class AudioColorStripSource(ColorStripSource):
def to_dict(self) -> dict:
d = super().to_dict()
d["visualization_mode"] = self.visualization_mode
d["audio_device_index"] = self.audio_device_index
d["audio_loopback"] = self.audio_loopback
d["audio_channel"] = self.audio_channel
d["audio_source_id"] = self.audio_source_id
d["sensitivity"] = self.sensitivity
d["smoothing"] = self.smoothing
d["palette"] = self.palette
@@ -411,3 +412,24 @@ class CompositeColorStripSource(ColorStripSource):
d["layers"] = [dict(layer) for layer in self.layers]
d["led_count"] = self.led_count
return d
@dataclass
class MappedColorStripSource(ColorStripSource):
"""Color strip source that maps different sources to different LED ranges.
Each zone assigns a sub-range of LEDs to a different color strip source.
Zones are placed side-by-side (spatial multiplexing) rather than blended.
Gaps between zones stay black. LED count auto-sizes from the connected
device when led_count == 0.
"""
# Each zone: {"source_id": str, "start": int, "end": int, "reverse": bool}
zones: list = field(default_factory=list)
led_count: int = 0 # 0 = use device LED count
def to_dict(self) -> dict:
d = super().to_dict()
d["zones"] = [dict(z) for z in self.zones]
d["led_count"] = self.led_count
return d

View File

@@ -14,6 +14,7 @@ from wled_controller.storage.color_strip_source import (
CompositeColorStripSource,
EffectColorStripSource,
GradientColorStripSource,
MappedColorStripSource,
PictureColorStripSource,
StaticColorStripSource,
)
@@ -119,10 +120,9 @@ class ColorStripStore:
scale: float = 1.0,
mirror: bool = False,
layers: Optional[list] = None,
zones: Optional[list] = None,
visualization_mode: str = "spectrum",
audio_device_index: int = -1,
audio_loopback: bool = True,
audio_channel: str = "mono",
audio_source_id: str = "",
sensitivity: float = 1.0,
color_peak: Optional[list] = None,
) -> ColorStripSource:
@@ -214,9 +214,7 @@ class ColorStripStore:
updated_at=now,
description=description,
visualization_mode=visualization_mode or "spectrum",
audio_device_index=audio_device_index if audio_device_index is not None else -1,
audio_loopback=bool(audio_loopback),
audio_channel=audio_channel or "mono",
audio_source_id=audio_source_id or "",
sensitivity=float(sensitivity) if sensitivity else 1.0,
smoothing=float(smoothing) if smoothing else 0.3,
palette=palette or "rainbow",
@@ -236,6 +234,17 @@ class ColorStripStore:
layers=layers if isinstance(layers, list) else [],
led_count=led_count,
)
elif source_type == "mapped":
source = MappedColorStripSource(
id=source_id,
name=name,
source_type="mapped",
created_at=now,
updated_at=now,
description=description,
zones=zones if isinstance(zones, list) else [],
led_count=led_count,
)
else:
if calibration is None:
calibration = CalibrationConfig(layout="clockwise", start_position="bottom_left")
@@ -291,10 +300,9 @@ class ColorStripStore:
scale: Optional[float] = None,
mirror: Optional[bool] = None,
layers: Optional[list] = None,
zones: Optional[list] = None,
visualization_mode: Optional[str] = None,
audio_device_index: Optional[int] = None,
audio_loopback: Optional[bool] = None,
audio_channel: Optional[str] = None,
audio_source_id: Optional[str] = None,
sensitivity: Optional[float] = None,
color_peak: Optional[list] = None,
) -> ColorStripSource:
@@ -380,12 +388,8 @@ class ColorStripStore:
elif isinstance(source, AudioColorStripSource):
if visualization_mode is not None:
source.visualization_mode = visualization_mode
if audio_device_index is not None:
source.audio_device_index = audio_device_index
if audio_loopback is not None:
source.audio_loopback = bool(audio_loopback)
if audio_channel is not None:
source.audio_channel = audio_channel
if audio_source_id is not None:
source.audio_source_id = audio_source_id
if sensitivity is not None:
source.sensitivity = float(sensitivity)
if smoothing is not None:
@@ -405,6 +409,11 @@ class ColorStripStore:
source.layers = layers
if led_count is not None:
source.led_count = led_count
elif isinstance(source, MappedColorStripSource):
if zones is not None and isinstance(zones, list):
source.zones = zones
if led_count is not None:
source.led_count = led_count
source.updated_at = datetime.utcnow()
self._save()
@@ -436,3 +445,14 @@ class ColorStripStore:
names.append(source.name)
break
return names
def get_mapped_referencing(self, source_id: str) -> List[str]:
"""Return names of mapped sources that reference a given source as a zone."""
names = []
for source in self._sources.values():
if isinstance(source, MappedColorStripSource):
for zone in source.zones:
if zone.get("source_id") == source_id:
names.append(source.name)
break
return names