Add Daylight Cycle and Candlelight CSS source types

Full-stack implementation of two new color strip source types:
- Daylight: simulates day/night color cycle with real-time or speed-based mode, latitude support
- Candlelight: multi-candle fire simulation with Gaussian falloff, layered-sine flicker, warm color shift

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-10 11:07:30 +03:00
parent 954e37c2ca
commit 37c80f01af
15 changed files with 882 additions and 7 deletions

View File

@@ -0,0 +1,247 @@
"""Candlelight LED stream — realistic per-LED candle flickering.
Implements CandlelightColorStripStream which produces warm, organic
flickering across all LEDs using layered sine waves and value noise.
Each "candle" is an independent flicker source that illuminates
nearby LEDs with smooth falloff.
"""
import math
import threading
import time
from typing import Optional
import numpy as np
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.utils import get_logger
from wled_controller.utils.timer import high_resolution_timer
logger = get_logger(__name__)
# ── Simple hash-based noise ──────────────────────────────────────────
_PERM = np.arange(256, dtype=np.int32)
_rng = np.random.RandomState(seed=17)
_rng.shuffle(_PERM)
_PERM = np.concatenate([_PERM, _PERM]) # 512 entries for wrap-free indexing
def _noise1d(x: np.ndarray) -> np.ndarray:
"""Fast 1-D value noise (vectorized). Returns float32 in [0, 1]."""
xi = x.astype(np.int32) & 255
xf = x - np.floor(x)
# Smoothstep
u = xf * xf * (3.0 - 2.0 * xf)
a = _PERM[xi].astype(np.float32) / 255.0
b = _PERM[xi + 1].astype(np.float32) / 255.0
return a + u * (b - a)
class CandlelightColorStripStream(ColorStripStream):
"""Color strip stream simulating realistic candle flickering.
Each LED flickers independently with warm tones. Multiple
"candle sources" are distributed along the strip, each generating
its own flicker pattern with smooth spatial falloff.
"""
def __init__(self, source):
self._colors_lock = threading.Lock()
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
self._frame_time = 1.0 / 30
self._clock = None
self._led_count = 1
self._auto_size = True
# Scratch arrays
self._s_bright: Optional[np.ndarray] = None
self._s_noise: Optional[np.ndarray] = None
self._s_x: Optional[np.ndarray] = None
self._pool_n = 0
self._update_from_source(source)
def _update_from_source(self, source) -> None:
raw_color = getattr(source, "color", None)
self._color = raw_color if isinstance(raw_color, list) and len(raw_color) == 3 else [255, 147, 41]
self._intensity = float(getattr(source, "intensity", 1.0))
self._num_candles = max(1, int(getattr(source, "num_candles", 3)))
self._speed = float(getattr(source, "speed", 1.0))
_lc = getattr(source, "led_count", 0)
self._auto_size = not _lc
self._led_count = _lc if _lc and _lc > 0 else 1
with self._colors_lock:
self._colors: Optional[np.ndarray] = None
def configure(self, device_led_count: int) -> None:
if self._auto_size and device_led_count > 0:
new_count = max(self._led_count, device_led_count)
if new_count != self._led_count:
self._led_count = new_count
@property
def target_fps(self) -> int:
return self._fps
@property
def led_count(self) -> int:
return self._led_count
def set_capture_fps(self, fps: int) -> None:
self._fps = max(1, min(90, fps))
self._frame_time = 1.0 / self._fps
def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._animate_loop,
name="css-candlelight",
daemon=True,
)
self._thread.start()
logger.info(f"CandlelightColorStripStream started (leds={self._led_count}, candles={self._num_candles})")
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
if self._thread.is_alive():
logger.warning("CandlelightColorStripStream thread did not terminate within 5s")
self._thread = None
logger.info("CandlelightColorStripStream stopped")
def get_latest_colors(self) -> Optional[np.ndarray]:
with self._colors_lock:
return self._colors
def update_source(self, source) -> None:
from wled_controller.storage.color_strip_source import CandlelightColorStripSource
if isinstance(source, CandlelightColorStripSource):
prev_led_count = self._led_count if self._auto_size else None
self._update_from_source(source)
if prev_led_count and self._auto_size:
self._led_count = prev_led_count
logger.info("CandlelightColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
self._clock = clock
# ── Animation loop ──────────────────────────────────────────────
def _animate_loop(self) -> None:
_pool_n = 0
_buf_a = _buf_b = None
_use_a = True
try:
with high_resolution_timer():
while self._running:
wall_start = time.perf_counter()
frame_time = self._frame_time
try:
clock = self._clock
if clock:
if not clock.is_running:
time.sleep(0.1)
continue
t = clock.get_time()
speed = clock.speed * self._speed
else:
t = wall_start
speed = self._speed
n = self._led_count
if n != _pool_n:
_pool_n = n
_buf_a = np.empty((n, 3), dtype=np.uint8)
_buf_b = np.empty((n, 3), dtype=np.uint8)
self._s_bright = np.empty(n, dtype=np.float32)
self._s_noise = np.empty(n, dtype=np.float32)
self._s_x = np.arange(n, dtype=np.float32)
buf = _buf_a if _use_a else _buf_b
_use_a = not _use_a
self._render_candlelight(buf, n, t, speed)
with self._colors_lock:
self._colors = buf
except Exception as e:
logger.error(f"CandlelightColorStripStream animation error: {e}")
elapsed = time.perf_counter() - wall_start
time.sleep(max(frame_time - elapsed, 0.001))
except Exception as e:
logger.error(f"Fatal CandlelightColorStripStream loop error: {e}", exc_info=True)
finally:
self._running = False
def _render_candlelight(self, buf: np.ndarray, n: int, t: float, speed: float) -> None:
"""Render candle flickering into buf (n, 3) uint8.
Algorithm:
- Place num_candles evenly along the strip
- Each candle has independent layered-sine flicker
- Spatial falloff: LEDs near a candle are brighter
- Per-LED noise adds individual variation
- Final brightness modulates the base warm color
"""
intensity = self._intensity
num_candles = self._num_candles
base_r, base_g, base_b = self._color[0], self._color[1], self._color[2]
bright = self._s_bright
bright[:] = 0.0
# Candle positions: evenly distributed
if num_candles == 1:
positions = [n / 2.0]
else:
positions = [i * (n - 1) / (num_candles - 1) for i in range(num_candles)]
x = self._s_x[:n]
for ci, pos in enumerate(positions):
# Independent flicker for this candle: layered sines at different frequencies
# Use candle index as phase offset for independence
offset = ci * 137.5 # golden-angle offset for non-repeating
flicker = (
0.40 * math.sin(2 * math.pi * speed * t * 3.7 + offset)
+ 0.25 * math.sin(2 * math.pi * speed * t * 7.3 + offset * 0.7)
+ 0.15 * math.sin(2 * math.pi * speed * t * 13.1 + offset * 1.3)
+ 0.10 * math.sin(2 * math.pi * speed * t * 1.9 + offset * 0.3)
)
# Normalize flicker to [0.3, 1.0] range (candles never fully go dark)
candle_brightness = 0.65 + 0.35 * flicker * intensity
# Spatial falloff: Gaussian centered on candle position
# sigma proportional to strip length / num_candles
sigma = max(n / (num_candles * 2.0), 2.0)
dist = x - pos
falloff = np.exp(-0.5 * (dist * dist) / (sigma * sigma))
bright += candle_brightness * falloff
# Per-LED noise for individual variation
noise_x = x * 0.3 + t * speed * 5.0
noise = _noise1d(noise_x)
# Modulate brightness with noise (±15%)
bright *= (0.85 + 0.30 * noise)
# Clamp to [0, 1]
np.clip(bright, 0.0, 1.0, out=bright)
# Apply base color with brightness modulation
# Candles emit warmer (more red, less blue) at lower brightness
# Add slight color variation: dimmer = warmer
warm_shift = (1.0 - bright) * 0.3
r = bright * base_r
g = bright * base_g * (1.0 - warm_shift * 0.5)
b = bright * base_b * (1.0 - warm_shift)
buf[:, 0] = np.clip(r, 0, 255).astype(np.uint8)
buf[:, 1] = np.clip(g, 0, 255).astype(np.uint8)
buf[:, 2] = np.clip(b, 0, 255).astype(np.uint8)

View File

@@ -21,6 +21,8 @@ from wled_controller.core.processing.color_strip_stream import (
from wled_controller.core.processing.effect_stream import EffectColorStripStream
from wled_controller.core.processing.api_input_stream import ApiInputColorStripStream
from wled_controller.core.processing.notification_stream import NotificationColorStripStream
from wled_controller.core.processing.daylight_stream import DaylightColorStripStream
from wled_controller.core.processing.candlelight_stream import CandlelightColorStripStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -33,6 +35,8 @@ _SIMPLE_STREAM_MAP = {
"effect": EffectColorStripStream,
"api_input": ApiInputColorStripStream,
"notification": NotificationColorStripStream,
"daylight": DaylightColorStripStream,
"candlelight": CandlelightColorStripStream,
}

View File

@@ -0,0 +1,221 @@
"""Daylight cycle LED stream — simulates natural daylight color temperature.
Implements DaylightColorStripStream which produces a uniform LED color array
that transitions through dawn, daylight, sunset, and night over a continuous
24-hour cycle. Can use real wall-clock time or a configurable simulation speed.
"""
import math
import threading
import time
from typing import Optional
import numpy as np
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.utils import get_logger
from wled_controller.utils.timer import high_resolution_timer
logger = get_logger(__name__)
# ── Daylight color table ────────────────────────────────────────────────
#
# Maps hour-of-day (024) to RGB color. Interpolated linearly between
# control points. Colors approximate natural daylight color temperature
# from warm sunrise tones through cool midday to warm sunset and dim night.
#
# Format: (hour, R, G, B)
_DAYLIGHT_CURVE = [
(0.0, 10, 10, 30), # midnight — deep blue
(4.0, 10, 10, 40), # pre-dawn — dark blue
(5.5, 40, 20, 60), # first light — purple hint
(6.0, 255, 100, 30), # sunrise — warm orange
(7.0, 255, 170, 80), # early morning — golden
(8.0, 255, 220, 160), # morning — warm white
(10.0, 255, 245, 230), # mid-morning — neutral warm
(12.0, 240, 248, 255), # noon — cool white / slight blue
(14.0, 255, 250, 240), # afternoon — neutral
(16.0, 255, 230, 180), # late afternoon — warm
(17.5, 255, 180, 100), # pre-sunset — golden
(18.5, 255, 100, 40), # sunset — deep orange
(19.0, 200, 60, 40), # late sunset — red
(19.5, 100, 30, 60), # dusk — purple
(20.0, 40, 20, 60), # twilight — dark purple
(21.0, 15, 15, 45), # night — dark blue
(24.0, 10, 10, 30), # midnight (wrap)
]
# Pre-build a (1440, 3) uint8 LUT — one entry per minute of the day
_daylight_lut: Optional[np.ndarray] = None
def _get_daylight_lut() -> np.ndarray:
global _daylight_lut
if _daylight_lut is not None:
return _daylight_lut
lut = np.zeros((1440, 3), dtype=np.uint8)
for minute in range(1440):
hour = minute / 60.0
# Find surrounding control points
prev = _DAYLIGHT_CURVE[0]
nxt = _DAYLIGHT_CURVE[-1]
for i in range(len(_DAYLIGHT_CURVE) - 1):
if _DAYLIGHT_CURVE[i][0] <= hour <= _DAYLIGHT_CURVE[i + 1][0]:
prev = _DAYLIGHT_CURVE[i]
nxt = _DAYLIGHT_CURVE[i + 1]
break
span = nxt[0] - prev[0]
t = (hour - prev[0]) / span if span > 0 else 0.0
# Smooth interpolation (smoothstep)
t = t * t * (3 - 2 * t)
for ch in range(3):
lut[minute, ch] = int(prev[ch + 1] + (nxt[ch + 1] - prev[ch + 1]) * t + 0.5)
_daylight_lut = lut
return lut
class DaylightColorStripStream(ColorStripStream):
"""Color strip stream simulating a 24-hour daylight cycle.
All LEDs display the same color at any moment. The color smoothly
transitions through a pre-defined daylight curve.
"""
def __init__(self, source):
self._colors_lock = threading.Lock()
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 10 # low FPS — transitions are slow
self._frame_time = 1.0 / 10
self._clock = None
self._led_count = 1
self._auto_size = True
self._lut = _get_daylight_lut()
self._update_from_source(source)
def _update_from_source(self, source) -> None:
self._speed = float(getattr(source, "speed", 1.0))
self._use_real_time = bool(getattr(source, "use_real_time", False))
self._latitude = float(getattr(source, "latitude", 50.0))
_lc = getattr(source, "led_count", 0)
self._auto_size = not _lc
self._led_count = _lc if _lc and _lc > 0 else 1
with self._colors_lock:
self._colors: Optional[np.ndarray] = None
def configure(self, device_led_count: int) -> None:
if self._auto_size and device_led_count > 0:
new_count = max(self._led_count, device_led_count)
if new_count != self._led_count:
self._led_count = new_count
@property
def target_fps(self) -> int:
return self._fps
@property
def led_count(self) -> int:
return self._led_count
def set_capture_fps(self, fps: int) -> None:
self._fps = max(1, min(30, fps))
self._frame_time = 1.0 / self._fps
def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._animate_loop,
name="css-daylight",
daemon=True,
)
self._thread.start()
logger.info(f"DaylightColorStripStream started (leds={self._led_count})")
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
if self._thread.is_alive():
logger.warning("DaylightColorStripStream thread did not terminate within 5s")
self._thread = None
logger.info("DaylightColorStripStream stopped")
def get_latest_colors(self) -> Optional[np.ndarray]:
with self._colors_lock:
return self._colors
def update_source(self, source) -> None:
from wled_controller.storage.color_strip_source import DaylightColorStripSource
if isinstance(source, DaylightColorStripSource):
prev_led_count = self._led_count if self._auto_size else None
self._update_from_source(source)
if prev_led_count and self._auto_size:
self._led_count = prev_led_count
logger.info("DaylightColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
self._clock = clock
# ── Animation loop ──────────────────────────────────────────────
def _animate_loop(self) -> None:
_pool_n = 0
_buf_a = _buf_b = None
_use_a = True
try:
with high_resolution_timer():
while self._running:
wall_start = time.perf_counter()
frame_time = self._frame_time
try:
clock = self._clock
if clock:
if not clock.is_running:
time.sleep(0.1)
continue
t = clock.get_time()
speed = clock.speed
else:
t = wall_start
speed = self._speed
n = self._led_count
if n != _pool_n:
_pool_n = n
_buf_a = np.empty((n, 3), dtype=np.uint8)
_buf_b = np.empty((n, 3), dtype=np.uint8)
buf = _buf_a if _use_a else _buf_b
_use_a = not _use_a
if self._use_real_time:
# Use actual wall-clock time
import datetime
now = datetime.datetime.now()
minute_of_day = now.hour * 60 + now.minute + now.second / 60.0
else:
# Simulated cycle: speed=1.0 → full 24h in ~240s (4 min)
cycle_seconds = 240.0 / max(speed, 0.01)
phase = (t % cycle_seconds) / cycle_seconds # 0..1
minute_of_day = phase * 1440.0
idx = int(minute_of_day) % 1440
color = self._lut[idx]
buf[:] = color
with self._colors_lock:
self._colors = buf
except Exception as e:
logger.error(f"DaylightColorStripStream animation error: {e}")
elapsed = time.perf_counter() - wall_start
time.sleep(max(frame_time - elapsed, 0.001))
except Exception as e:
logger.error(f"Fatal DaylightColorStripStream loop error: {e}", exc_info=True)
finally:
self._running = False