Add demo mode: virtual hardware sandbox for testing without real devices

Demo mode provides a complete sandbox environment with:
- Virtual capture engine (radial rainbow test pattern on 3 displays)
- Virtual audio engine (synthetic music-like audio on 2 devices)
- Virtual LED device provider (strip/60, matrix/256, ring/24 LEDs)
- Isolated data directory (data/demo/) with auto-seeded sample entities
- Dedicated config (config/demo_config.yaml) with pre-configured API key
- Frontend indicator (DEMO badge + dismissible banner)
- Engine filtering (only demo engines visible in demo mode)
- Separate entry point: python -m wled_controller.demo (port 8081)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-20 16:17:14 +03:00
parent 81b275979b
commit 2240471b67
36 changed files with 1548 additions and 282 deletions
@@ -15,10 +15,12 @@ from wled_controller.core.audio.analysis import (
)
from wled_controller.core.audio.wasapi_engine import WasapiEngine, WasapiCaptureStream
from wled_controller.core.audio.sounddevice_engine import SounddeviceEngine, SounddeviceCaptureStream
from wled_controller.core.audio.demo_engine import DemoAudioEngine, DemoAudioCaptureStream
# Auto-register available engines
AudioEngineRegistry.register(WasapiEngine)
AudioEngineRegistry.register(SounddeviceEngine)
AudioEngineRegistry.register(DemoAudioEngine)
__all__ = [
"AudioCaptureEngine",
@@ -34,4 +36,6 @@ __all__ = [
"WasapiCaptureStream",
"SounddeviceEngine",
"SounddeviceCaptureStream",
"DemoAudioEngine",
"DemoAudioCaptureStream",
]
@@ -0,0 +1,153 @@
"""Demo audio engine — virtual audio devices with synthetic audio data."""
import time
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.config import is_demo_mode
from wled_controller.core.audio.base import (
AudioCaptureEngine,
AudioCaptureStreamBase,
AudioDeviceInfo,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Virtual audio device definitions: (name, is_loopback, channels, samplerate)
_VIRTUAL_DEVICES = [
("Demo Microphone", False, 2, 44100.0),
("Demo System Audio", True, 2, 44100.0),
]
class DemoAudioCaptureStream(AudioCaptureStreamBase):
"""Demo audio capture stream that produces synthetic music-like audio.
Generates a mix of sine waves with slowly varying frequencies to
simulate beat-like patterns suitable for audio-reactive visualizations.
"""
def __init__(self, device_index: int, is_loopback: bool, config: Dict[str, Any]):
super().__init__(device_index, is_loopback, config)
self._channels = 2
self._sample_rate = 44100
self._chunk_size = 1024
self._phase = 0.0 # Accumulated phase in samples for continuity
@property
def channels(self) -> int:
return self._channels
@property
def sample_rate(self) -> int:
return self._sample_rate
@property
def chunk_size(self) -> int:
return self._chunk_size
def initialize(self) -> None:
if self._initialized:
return
self._phase = 0.0
self._initialized = True
logger.info(
f"Demo audio stream initialized "
f"(device={self.device_index}, loopback={self.is_loopback})"
)
def cleanup(self) -> None:
self._initialized = False
logger.info(f"Demo audio stream cleaned up (device={self.device_index})")
def read_chunk(self) -> Optional[np.ndarray]:
if not self._initialized:
return None
t_now = time.time()
n = self._chunk_size
sr = self._sample_rate
# Sample indices for this chunk (continuous across calls)
t = (self._phase + np.arange(n, dtype=np.float64)) / sr
self._phase += n
# --- Synthetic "music" signal ---
# Bass drum: ~80 Hz with slow amplitude envelope (~2 Hz beat)
bass_freq = 80.0
beat_rate = 2.0 # beats per second
bass_env = np.maximum(0.0, np.sin(2.0 * np.pi * beat_rate * t)) ** 4
bass = 0.5 * bass_env * np.sin(2.0 * np.pi * bass_freq * t)
# Mid-range tone: slowly sweeping between 300-600 Hz
mid_freq = 450.0 + 150.0 * np.sin(2.0 * np.pi * 0.1 * t_now)
mid = 0.25 * np.sin(2.0 * np.pi * mid_freq * t)
# High shimmer: ~3 kHz with faster modulation
hi_freq = 3000.0 + 500.0 * np.sin(2.0 * np.pi * 0.3 * t_now)
hi_env = 0.5 + 0.5 * np.sin(2.0 * np.pi * 4.0 * t)
hi = 0.1 * hi_env * np.sin(2.0 * np.pi * hi_freq * t)
# Mix mono signal
mono = (bass + mid + hi).astype(np.float32)
# Interleave stereo (identical L/R)
stereo = np.empty(n * self._channels, dtype=np.float32)
stereo[0::2] = mono
stereo[1::2] = mono
return stereo
class DemoAudioEngine(AudioCaptureEngine):
"""Virtual audio engine for demo mode.
Provides virtual audio devices and produces synthetic audio data
so the full audio-reactive pipeline works without real audio hardware.
"""
ENGINE_TYPE = "demo"
ENGINE_PRIORITY = 1000 # Highest priority in demo mode
@classmethod
def is_available(cls) -> bool:
return is_demo_mode()
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {
"sample_rate": 44100,
"chunk_size": 1024,
}
@classmethod
def enumerate_devices(cls) -> List[AudioDeviceInfo]:
devices = []
for idx, (name, is_loopback, channels, samplerate) in enumerate(_VIRTUAL_DEVICES):
devices.append(AudioDeviceInfo(
index=idx,
name=name,
is_input=True,
is_loopback=is_loopback,
channels=channels,
default_samplerate=samplerate,
))
logger.debug(f"Demo audio engine: {len(devices)} virtual device(s)")
return devices
@classmethod
def create_stream(
cls,
device_index: int,
is_loopback: bool,
config: Dict[str, Any],
) -> DemoAudioCaptureStream:
if device_index < 0 or device_index >= len(_VIRTUAL_DEVICES):
raise ValueError(
f"Invalid demo audio device index {device_index}. "
f"Available: 0-{len(_VIRTUAL_DEVICES) - 1}"
)
merged = {**cls.get_default_config(), **config}
return DemoAudioCaptureStream(device_index, is_loopback, merged)
@@ -3,6 +3,7 @@
from typing import Any, Dict, List, Optional, Type
from wled_controller.core.audio.base import AudioCaptureEngine, AudioCaptureStreamBase
from wled_controller.config import is_demo_mode
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -67,9 +68,13 @@ class AudioEngineRegistry:
Returns:
List of engine type identifiers that are available
"""
demo = is_demo_mode()
available = []
for engine_type, engine_class in cls._engines.items():
try:
# In demo mode, only demo engines are available
if demo and engine_type != "demo":
continue
if engine_class.is_available():
available.append(engine_type)
except Exception as e:
@@ -85,10 +90,13 @@ class AudioEngineRegistry:
Returns:
Engine type string, or None if no engines are available.
"""
demo = is_demo_mode()
best_type = None
best_priority = -1
for engine_type, engine_class in cls._engines.items():
try:
if demo and engine_type != "demo":
continue
if engine_class.is_available() and engine_class.ENGINE_PRIORITY > best_priority:
best_priority = engine_class.ENGINE_PRIORITY
best_type = engine_type
@@ -102,9 +110,13 @@ class AudioEngineRegistry:
def get_all_engines(cls) -> Dict[str, Type[AudioCaptureEngine]]:
"""Get all registered engines (available or not).
In demo mode, only demo engines are returned.
Returns:
Dictionary mapping engine type to engine class
"""
if is_demo_mode():
return {k: v for k, v in cls._engines.items() if k == "demo"}
return cls._engines.copy()
@classmethod
@@ -13,6 +13,7 @@ from wled_controller.core.capture_engines.bettercam_engine import BetterCamEngin
from wled_controller.core.capture_engines.wgc_engine import WGCEngine, WGCCaptureStream
from wled_controller.core.capture_engines.scrcpy_engine import ScrcpyEngine, ScrcpyCaptureStream
from wled_controller.core.capture_engines.camera_engine import CameraEngine, CameraCaptureStream
from wled_controller.core.capture_engines.demo_engine import DemoCaptureEngine, DemoCaptureStream
# Auto-register available engines
EngineRegistry.register(MSSEngine)
@@ -21,6 +22,7 @@ EngineRegistry.register(BetterCamEngine)
EngineRegistry.register(WGCEngine)
EngineRegistry.register(ScrcpyEngine)
EngineRegistry.register(CameraEngine)
EngineRegistry.register(DemoCaptureEngine)
__all__ = [
"CaptureEngine",
@@ -40,4 +42,6 @@ __all__ = [
"ScrcpyCaptureStream",
"CameraEngine",
"CameraCaptureStream",
"DemoCaptureEngine",
"DemoCaptureStream",
]
@@ -0,0 +1,171 @@
"""Demo capture engine — virtual displays with animated test patterns."""
import time
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.config import is_demo_mode
from wled_controller.core.capture_engines.base import (
CaptureEngine,
CaptureStream,
DisplayInfo,
ScreenCapture,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Virtual display definitions: (name, width, height, x, y, is_primary)
_VIRTUAL_DISPLAYS = [
("Demo Display 1080p", 1920, 1080, 0, 360, True),
("Demo Ultrawide", 3440, 1440, 1920, 0, False),
("Demo Portrait", 1080, 1920, 5360, 0, False),
]
class DemoCaptureStream(CaptureStream):
"""Demo capture stream producing a radial rainbow centred on the screen.
The rainbow rotates slowly over time — hue is mapped to the angle from
the screen centre, and brightness fades toward the edges.
"""
_RENDER_SCALE = 4 # render at 1/4 resolution, then upscale
def __init__(self, display_index: int, config: Dict[str, Any]):
super().__init__(display_index, config)
self._width: int = config.get("width", 1920)
self._height: int = config.get("height", 1080)
# Pre-compute at render resolution
rw = max(1, self._width // self._RENDER_SCALE)
rh = max(1, self._height // self._RENDER_SCALE)
self._rw = rw
self._rh = rh
# Coordinate grids centred at (0, 0), aspect-corrected so the
# gradient is circular even on non-square displays
aspect = self._width / max(self._height, 1)
x = np.linspace(-aspect, aspect, rw, dtype=np.float32)
y = np.linspace(-1.0, 1.0, rh, dtype=np.float32)
self._yy, self._xx = np.meshgrid(y, x, indexing="ij")
# Pre-compute angle (atan2) and radius — they don't change per frame
self._angle = np.arctan2(self._yy, self._xx) # -pi..pi
self._radius = np.sqrt(self._xx ** 2 + self._yy ** 2)
def initialize(self) -> None:
self._initialized = True
logger.info(
f"Demo capture stream initialized "
f"(display={self.display_index}, {self._width}x{self._height})"
)
def cleanup(self) -> None:
self._initialized = False
logger.info(f"Demo capture stream cleaned up (display={self.display_index})")
def capture_frame(self) -> Optional[ScreenCapture]:
if not self._initialized:
self.initialize()
t = time.time() % 1e6
# Hue = angle from centre, rotating over time
rotation = t * 0.15 # radians per second
hue = ((self._angle + rotation) / (2.0 * np.pi)) % 1.0
# Saturation: full
sat = 1.0
# Value: bright at centre, fading toward edges
max_r = float(self._radius.max()) or 1.0
val = np.clip(1.0 - 0.6 * (self._radius / max_r), 0.0, 1.0)
# Vectorised HSV → RGB (S=1 simplification)
h6 = hue * 6.0
sector = h6.astype(np.int32) % 6
frac = h6 - np.floor(h6)
q = val * (1.0 - frac)
t_ch = val * frac # "t" channel in HSV conversion
r = np.where(sector == 0, val,
np.where(sector == 1, q,
np.where(sector == 2, 0,
np.where(sector == 3, 0,
np.where(sector == 4, t_ch, val)))))
g = np.where(sector == 0, t_ch,
np.where(sector == 1, val,
np.where(sector == 2, val,
np.where(sector == 3, q,
np.where(sector == 4, 0, 0)))))
b = np.where(sector == 0, 0,
np.where(sector == 1, 0,
np.where(sector == 2, t_ch,
np.where(sector == 3, val,
np.where(sector == 4, val, q)))))
small_u8 = (np.stack([r, g, b], axis=-1) * 255.0).astype(np.uint8)
# Upscale to full resolution
if self._RENDER_SCALE > 1:
image = np.repeat(
np.repeat(small_u8, self._RENDER_SCALE, axis=0),
self._RENDER_SCALE, axis=1,
)[: self._height, : self._width]
else:
image = small_u8
return ScreenCapture(
image=image,
width=self._width,
height=self._height,
display_index=self.display_index,
)
class DemoCaptureEngine(CaptureEngine):
"""Virtual capture engine for demo mode.
Provides virtual displays and produces animated test-pattern frames
so the full capture pipeline works without real monitors.
"""
ENGINE_TYPE = "demo"
ENGINE_PRIORITY = 1000 # Highest priority in demo mode
@classmethod
def is_available(cls) -> bool:
return is_demo_mode()
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {}
@classmethod
def get_available_displays(cls) -> List[DisplayInfo]:
displays = []
for idx, (name, width, height, x, y, primary) in enumerate(_VIRTUAL_DISPLAYS):
displays.append(DisplayInfo(
index=idx,
name=name,
width=width,
height=height,
x=x,
y=y,
is_primary=primary,
refresh_rate=60,
))
logger.debug(f"Demo engine: {len(displays)} virtual display(s)")
return displays
@classmethod
def create_stream(
cls, display_index: int, config: Dict[str, Any],
) -> DemoCaptureStream:
if display_index < 0 or display_index >= len(_VIRTUAL_DISPLAYS):
raise ValueError(
f"Invalid demo display index {display_index}. "
f"Available: 0-{len(_VIRTUAL_DISPLAYS) - 1}"
)
name, width, height, *_ = _VIRTUAL_DISPLAYS[display_index]
stream_config = {**config, "width": width, "height": height}
return DemoCaptureStream(display_index, stream_config)
@@ -3,6 +3,7 @@
from typing import Any, Dict, List, Optional, Type
from wled_controller.core.capture_engines.base import CaptureEngine, CaptureStream
from wled_controller.config import is_demo_mode
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -67,9 +68,13 @@ class EngineRegistry:
Returns:
List of engine type identifiers that are available
"""
demo = is_demo_mode()
available = []
for engine_type, engine_class in cls._engines.items():
try:
# In demo mode, only demo engines are available
if demo and engine_type != "demo":
continue
if engine_class.is_available():
available.append(engine_type)
except Exception as e:
@@ -86,10 +91,13 @@ class EngineRegistry:
Returns:
Engine type string, or None if no engines are available.
"""
demo = is_demo_mode()
best_type = None
best_priority = -1
for engine_type, engine_class in cls._engines.items():
try:
if demo and engine_type != "demo":
continue
if engine_class.is_available() and engine_class.ENGINE_PRIORITY > best_priority:
best_priority = engine_class.ENGINE_PRIORITY
best_type = engine_type
@@ -103,9 +111,13 @@ class EngineRegistry:
def get_all_engines(cls) -> Dict[str, Type[CaptureEngine]]:
"""Get all registered engines (available or not).
In demo mode, only demo engines are returned.
Returns:
Dictionary mapping engine type to engine class
"""
if is_demo_mode():
return {k: v for k, v in cls._engines.items() if k == "demo"}
return cls._engines.copy()
@classmethod
@@ -0,0 +1,412 @@
"""Seed data generator for demo mode.
Populates the demo data directory with sample entities on first run,
giving new users a realistic out-of-the-box experience without needing
real hardware.
"""
import json
from datetime import datetime, timezone
from pathlib import Path
from wled_controller.config import StorageConfig
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Fixed IDs so cross-references are stable
_DEVICE_IDS = {
"strip": "device_demo0001",
"matrix": "device_demo0002",
"ring": "device_demo0003",
}
_TARGET_IDS = {
"strip": "pt_demo0001",
"matrix": "pt_demo0002",
}
_PS_IDS = {
"main": "ps_demo0001",
"secondary": "ps_demo0002",
}
_CSS_IDS = {
"gradient": "css_demo0001",
"cycle": "css_demo0002",
"picture": "css_demo0003",
"audio": "css_demo0004",
}
_AS_IDS = {
"system": "as_demo0001",
"mono": "as_demo0002",
}
_TPL_ID = "tpl_demo0001"
_SCENE_ID = "scene_demo0001"
_NOW = datetime.now(timezone.utc).isoformat()
def _write_store(path: Path, json_key: str, items: dict) -> None:
"""Write a store JSON file with version wrapper."""
path.parent.mkdir(parents=True, exist_ok=True)
data = {
"version": "1.0.0",
json_key: items,
}
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
logger.info(f"Seeded {len(items)} {json_key} -> {path}")
def _has_data(storage_config: StorageConfig) -> bool:
"""Check if any demo store file already has entities."""
for field_name in storage_config.model_fields:
value = getattr(storage_config, field_name)
if not isinstance(value, str):
continue
p = Path(value)
if p.exists() and p.stat().st_size > 20:
# File exists and is non-trivial — check if it has entities
try:
raw = json.loads(p.read_text(encoding="utf-8"))
for key, val in raw.items():
if key != "version" and isinstance(val, dict) and val:
return True
except Exception:
pass
return False
def seed_demo_data(storage_config: StorageConfig) -> None:
"""Populate demo data directory with sample entities.
Only runs when the demo data directory is empty (no existing entities).
Must be called BEFORE store constructors run so they load the seeded data.
"""
if _has_data(storage_config):
logger.info("Demo data already exists — skipping seed")
return
logger.info("Seeding demo data for first-run experience")
_seed_devices(Path(storage_config.devices_file))
_seed_capture_templates(Path(storage_config.templates_file))
_seed_output_targets(Path(storage_config.output_targets_file))
_seed_picture_sources(Path(storage_config.picture_sources_file))
_seed_color_strip_sources(Path(storage_config.color_strip_sources_file))
_seed_audio_sources(Path(storage_config.audio_sources_file))
_seed_scene_presets(Path(storage_config.scene_presets_file))
logger.info("Demo seed data complete")
# ── Devices ────────────────────────────────────────────────────────
def _seed_devices(path: Path) -> None:
devices = {
_DEVICE_IDS["strip"]: {
"id": _DEVICE_IDS["strip"],
"name": "Demo LED Strip",
"url": "demo://demo-strip",
"led_count": 60,
"enabled": True,
"device_type": "demo",
"created_at": _NOW,
"updated_at": _NOW,
},
_DEVICE_IDS["matrix"]: {
"id": _DEVICE_IDS["matrix"],
"name": "Demo LED Matrix",
"url": "demo://demo-matrix",
"led_count": 256,
"enabled": True,
"device_type": "demo",
"created_at": _NOW,
"updated_at": _NOW,
},
_DEVICE_IDS["ring"]: {
"id": _DEVICE_IDS["ring"],
"name": "Demo LED Ring",
"url": "demo://demo-ring",
"led_count": 24,
"enabled": True,
"device_type": "demo",
"created_at": _NOW,
"updated_at": _NOW,
},
}
_write_store(path, "devices", devices)
# ── Capture Templates ──────────────────────────────────────────────
def _seed_capture_templates(path: Path) -> None:
templates = {
_TPL_ID: {
"id": _TPL_ID,
"name": "Demo Capture",
"engine_type": "demo",
"engine_config": {},
"description": "Default capture template using demo engine",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
},
}
_write_store(path, "templates", templates)
# ── Output Targets ─────────────────────────────────────────────────
def _seed_output_targets(path: Path) -> None:
targets = {
_TARGET_IDS["strip"]: {
"id": _TARGET_IDS["strip"],
"name": "Strip — Gradient",
"target_type": "led",
"device_id": _DEVICE_IDS["strip"],
"color_strip_source_id": _CSS_IDS["gradient"],
"brightness_value_source_id": "",
"fps": 30,
"keepalive_interval": 1.0,
"state_check_interval": 30,
"min_brightness_threshold": 0,
"adaptive_fps": False,
"protocol": "ddp",
"description": "Demo LED strip with gradient effect",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
},
_TARGET_IDS["matrix"]: {
"id": _TARGET_IDS["matrix"],
"name": "Matrix — Screen Capture",
"target_type": "led",
"device_id": _DEVICE_IDS["matrix"],
"color_strip_source_id": _CSS_IDS["picture"],
"brightness_value_source_id": "",
"fps": 30,
"keepalive_interval": 1.0,
"state_check_interval": 30,
"min_brightness_threshold": 0,
"adaptive_fps": False,
"protocol": "ddp",
"description": "Demo LED matrix with screen capture",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
},
}
_write_store(path, "output_targets", targets)
# ── Picture Sources ────────────────────────────────────────────────
def _seed_picture_sources(path: Path) -> None:
sources = {
_PS_IDS["main"]: {
"id": _PS_IDS["main"],
"name": "Demo Display 1080p",
"stream_type": "raw",
"display_index": 0,
"capture_template_id": _TPL_ID,
"target_fps": 30,
"description": "Virtual 1920x1080 display capture",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
# Nulls for non-applicable subclass fields
"source_stream_id": None,
"postprocessing_template_id": None,
"image_source": None,
"url": None,
"loop": None,
"playback_speed": None,
"start_time": None,
"end_time": None,
"resolution_limit": None,
"clock_id": None,
},
_PS_IDS["secondary"]: {
"id": _PS_IDS["secondary"],
"name": "Demo Display 4K",
"stream_type": "raw",
"display_index": 1,
"capture_template_id": _TPL_ID,
"target_fps": 30,
"description": "Virtual 3840x2160 display capture",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
"source_stream_id": None,
"postprocessing_template_id": None,
"image_source": None,
"url": None,
"loop": None,
"playback_speed": None,
"start_time": None,
"end_time": None,
"resolution_limit": None,
"clock_id": None,
},
}
_write_store(path, "picture_sources", sources)
# ── Color Strip Sources ────────────────────────────────────────────
def _seed_color_strip_sources(path: Path) -> None:
sources = {
_CSS_IDS["gradient"]: {
"id": _CSS_IDS["gradient"],
"name": "Rainbow Gradient",
"source_type": "gradient",
"description": "Smooth rainbow gradient across all LEDs",
"clock_id": None,
"tags": ["demo"],
"stops": [
{"position": 0.0, "color": [255, 0, 0]},
{"position": 0.25, "color": [255, 255, 0]},
{"position": 0.5, "color": [0, 255, 0]},
{"position": 0.75, "color": [0, 0, 255]},
{"position": 1.0, "color": [255, 0, 255]},
],
"animation": {"enabled": True, "type": "gradient_shift", "speed": 0.5},
"created_at": _NOW,
"updated_at": _NOW,
},
_CSS_IDS["cycle"]: {
"id": _CSS_IDS["cycle"],
"name": "Warm Color Cycle",
"source_type": "color_cycle",
"description": "Smoothly cycles through warm colors",
"clock_id": None,
"tags": ["demo"],
"colors": [
[255, 60, 0],
[255, 140, 0],
[255, 200, 50],
[255, 100, 20],
],
"created_at": _NOW,
"updated_at": _NOW,
},
_CSS_IDS["picture"]: {
"id": _CSS_IDS["picture"],
"name": "Screen Capture — Main Display",
"source_type": "picture",
"description": "Captures colors from the main demo display",
"clock_id": None,
"tags": ["demo"],
"picture_source_id": _PS_IDS["main"],
"fps": 30,
"smoothing": 0.3,
"interpolation_mode": "average",
"calibration": {
"mode": "simple",
"layout": "clockwise",
"start_position": "bottom_left",
"leds_top": 28,
"leds_bottom": 28,
"leds_left": 16,
"leds_right": 16,
},
"led_count": 0,
"created_at": _NOW,
"updated_at": _NOW,
},
_CSS_IDS["audio"]: {
"id": _CSS_IDS["audio"],
"name": "Audio Spectrum",
"source_type": "audio",
"description": "Audio-reactive spectrum visualization",
"clock_id": None,
"tags": ["demo"],
"visualization_mode": "spectrum",
"audio_source_id": _AS_IDS["mono"],
"sensitivity": 1.0,
"smoothing": 0.3,
"palette": "rainbow",
"color": [0, 255, 0],
"color_peak": [255, 0, 0],
"led_count": 0,
"mirror": False,
"created_at": _NOW,
"updated_at": _NOW,
},
}
_write_store(path, "color_strip_sources", sources)
# ── Audio Sources ──────────────────────────────────────────────────
def _seed_audio_sources(path: Path) -> None:
sources = {
_AS_IDS["system"]: {
"id": _AS_IDS["system"],
"name": "Demo System Audio",
"source_type": "multichannel",
"device_index": 1,
"is_loopback": True,
"audio_template_id": None,
"description": "Virtual system audio (loopback)",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
# Forward-compat null fields
"audio_source_id": None,
"channel": None,
},
_AS_IDS["mono"]: {
"id": _AS_IDS["mono"],
"name": "Demo Audio — Mono",
"source_type": "mono",
"audio_source_id": _AS_IDS["system"],
"channel": "mono",
"description": "Mono mix of demo system audio",
"tags": ["demo"],
"created_at": _NOW,
"updated_at": _NOW,
# Forward-compat null fields
"device_index": None,
"is_loopback": None,
"audio_template_id": None,
},
}
_write_store(path, "audio_sources", sources)
# ── Scene Presets ──────────────────────────────────────────────────
def _seed_scene_presets(path: Path) -> None:
presets = {
_SCENE_ID: {
"id": _SCENE_ID,
"name": "Demo Ambient",
"description": "Activates gradient on the strip and screen capture on the matrix",
"tags": ["demo"],
"order": 0,
"targets": [
{
"target_id": _TARGET_IDS["strip"],
"running": True,
"color_strip_source_id": _CSS_IDS["gradient"],
"brightness_value_source_id": "",
"fps": 30,
},
{
"target_id": _TARGET_IDS["matrix"],
"running": True,
"color_strip_source_id": _CSS_IDS["picture"],
"brightness_value_source_id": "",
"fps": 30,
},
],
"created_at": _NOW,
"updated_at": _NOW,
},
}
_write_store(path, "scene_presets", presets)
@@ -0,0 +1,93 @@
"""Demo device provider — virtual LED devices for demo mode."""
from datetime import datetime, timezone
from typing import List
from wled_controller.config import is_demo_mode
from wled_controller.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from wled_controller.core.devices.mock_client import MockClient
# Pre-defined virtual devices: (name, led_count, ip, width, height)
_DEMO_DEVICES = [
("Demo LED Strip", 60, "demo-strip", None, None),
("Demo LED Matrix", 256, "demo-matrix", 16, 16),
("Demo LED Ring", 24, "demo-ring", None, None),
]
class DemoDeviceProvider(LEDDeviceProvider):
"""Provider for virtual demo LED devices.
Exposes three discoverable virtual devices when demo mode is active.
Uses MockClient for actual LED output (pixels are silently discarded).
"""
@property
def device_type(self) -> str:
return "demo"
@property
def capabilities(self) -> set:
return {"manual_led_count", "power_control", "brightness_control", "static_color"}
def create_client(self, url: str, **kwargs) -> LEDClient:
return MockClient(
url,
led_count=kwargs.get("led_count", 0),
send_latency_ms=kwargs.get("send_latency_ms", 0),
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
# Simulate ~2ms latency for realistic appearance
return DeviceHealth(
online=True,
latency_ms=2.0,
last_checked=datetime.now(timezone.utc),
device_name=url,
device_version="demo",
)
async def validate_device(self, url: str) -> dict:
# Look up configured LED count from demo devices
for name, led_count, ip, _w, _h in _DEMO_DEVICES:
if url == f"demo://{ip}":
return {"led_count": led_count}
# Fallback for unknown demo URLs
return {"led_count": 60}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
if not is_demo_mode():
return []
return [
DiscoveredDevice(
name=name,
url=f"demo://{ip}",
device_type="demo",
ip=ip,
mac=f"DE:MO:00:00:00:{i:02X}",
led_count=led_count,
version="demo",
)
for i, (name, led_count, ip, _w, _h) in enumerate(_DEMO_DEVICES)
]
async def get_power(self, url: str, **kwargs) -> bool:
return True
async def set_power(self, url: str, on: bool, **kwargs) -> None:
pass
async def get_brightness(self, url: str) -> int:
return 255
async def set_brightness(self, url: str, brightness: int) -> None:
pass
async def set_color(self, url: str, color, **kwargs) -> None:
pass
@@ -317,5 +317,8 @@ def _register_builtin_providers():
from wled_controller.core.devices.gamesense_provider import GameSenseDeviceProvider
register_provider(GameSenseDeviceProvider())
from wled_controller.core.devices.demo_provider import DemoDeviceProvider
register_provider(DemoDeviceProvider())
_register_builtin_providers()
@@ -78,6 +78,7 @@ class WledTargetProcessor(TargetProcessor):
self._last_preview_colors: np.ndarray | None = None
self._last_preview_brightness: int = 255
self._last_preview_data: bytes | None = None # cached full binary frame
self._preview_force_send: bool = False # flag to force immediate broadcast
# ----- Properties -----
@@ -512,6 +513,13 @@ class WledTargetProcessor(TargetProcessor):
# Send last known frame immediately so late joiners see current state
if self._last_preview_data is not None:
asyncio.ensure_future(self._send_preview_to(ws, self._last_preview_data))
elif self._last_preview_colors is not None:
data = bytes([self._last_preview_brightness]) + self._last_preview_colors.tobytes()
self._last_preview_data = data
asyncio.ensure_future(self._send_preview_to(ws, data))
# Also force the processing loop to broadcast on next iteration
# (handles edge cases where cached data is stale or None)
self._preview_force_send = True
@staticmethod
async def _send_preview_to(ws, data: bytes) -> None:
@@ -844,11 +852,24 @@ class WledTargetProcessor(TargetProcessor):
last_send_time = now
send_timestamps.append(now)
self._metrics.frames_keepalive += 1
if self._preview_clients and (now - _last_preview_broadcast) >= 0.066:
await self._broadcast_led_preview(send_colors, cur_brightness)
_last_preview_broadcast = now
self._metrics.frames_skipped += 1
self._metrics.fps_current = _fps_current_from_timestamps()
await asyncio.sleep(SKIP_REPOLL)
continue
# Force-send preview when a new client just connected
if self._preview_force_send and self._preview_clients and prev_frame_ref is not None:
self._preview_force_send = False
_force_colors = _cached_brightness(
self._fit_to_device(prev_frame_ref, _total_leds),
cur_brightness,
)
await self._broadcast_led_preview(_force_colors, cur_brightness)
_last_preview_broadcast = time.perf_counter()
if frame is prev_frame_ref and cur_brightness == _prev_brightness:
# Same frame + same brightness — keepalive or skip
if self._needs_keepalive and has_any_frame and (loop_start - last_send_time) >= keepalive_interval: