Add notification reactive color strip source with webhook trigger

New source_type "notification" fires one-shot visual effects (flash, pulse, sweep)
triggered via POST webhook. Designed as a composite layer for overlay on persistent
sources. Includes app color mapping, whitelist/blacklist filtering, and auto-sizing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-08 21:10:32 +03:00
parent 80b48e3618
commit de04872fdc
16 changed files with 932 additions and 6 deletions

View File

@@ -20,6 +20,7 @@ from wled_controller.core.processing.color_strip_stream import (
)
from wled_controller.core.processing.effect_stream import EffectColorStripStream
from wled_controller.core.processing.api_input_stream import ApiInputColorStripStream
from wled_controller.core.processing.notification_stream import NotificationColorStripStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -31,6 +32,7 @@ _SIMPLE_STREAM_MAP = {
"color_cycle": ColorCycleColorStripStream,
"effect": EffectColorStripStream,
"api_input": ApiInputColorStripStream,
"notification": NotificationColorStripStream,
}

View File

@@ -0,0 +1,290 @@
"""Notification color strip stream — fires one-shot visual alerts.
Renders brief animated LED effects (flash, pulse, sweep) triggered via the
fire() method. When idle, outputs black. Thread-safe: fire() can be called
from any thread (REST handler) while get_latest_colors() is called from the
target processor thread.
Uses a background render loop at 30 FPS with double-buffered output.
"""
import collections
import math
import threading
import time
from typing import Optional
import numpy as np
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def _hex_to_rgb(hex_str: str) -> tuple:
"""Convert a '#RRGGBB' hex string to an (R, G, B) tuple.
Returns (255, 255, 255) for invalid input.
"""
try:
h = hex_str.lstrip("#")
if len(h) != 6:
return (255, 255, 255)
return (int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16))
except (ValueError, TypeError):
return (255, 255, 255)
class NotificationColorStripStream(ColorStripStream):
"""Color strip stream that fires one-shot visual alerts.
Supports three notification effects:
- flash: linear fade from full brightness to zero
- pulse: smooth bell curve (sin)
- sweep: fill LEDs left-to-right, then fade out
Uses collections.deque for thread-safe event passing and threading.Lock
for the output color buffer.
"""
def __init__(self, source):
self._colors_lock = threading.Lock()
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
# Event queue: deque of (color_rgb_tuple, start_time)
self._event_queue: collections.deque = collections.deque(maxlen=16)
# Active effect state
self._active_effect: Optional[dict] = None # {"color": (r,g,b), "start": float}
self._update_from_source(source)
def _update_from_source(self, source) -> None:
"""Parse config from source dataclass."""
self._notification_effect = getattr(source, "notification_effect", "flash")
self._duration_ms = max(100, int(getattr(source, "duration_ms", 1500)))
self._default_color = getattr(source, "default_color", "#FFFFFF")
self._app_colors = dict(getattr(source, "app_colors", {}))
self._app_filter_mode = getattr(source, "app_filter_mode", "off")
self._app_filter_list = list(getattr(source, "app_filter_list", []))
self._auto_size = not source.led_count
self._led_count = source.led_count if source.led_count and source.led_count > 0 else 1
with self._colors_lock:
self._colors: Optional[np.ndarray] = np.zeros((self._led_count, 3), dtype=np.uint8)
def fire(self, app_name: str = None, color_override: str = None) -> bool:
"""Trigger a notification effect. Thread-safe.
Args:
app_name: Optional app name for color lookup and filtering.
color_override: Optional hex color override (#RRGGBB).
Returns:
True if the notification was accepted, False if filtered out.
"""
# Check app filter
if app_name and self._app_filter_mode != "off":
in_list = app_name in self._app_filter_list
if self._app_filter_mode == "whitelist" and not in_list:
return False
if self._app_filter_mode == "blacklist" and in_list:
return False
# Resolve color: override > app_colors[app_name] > default_color
if color_override:
color = _hex_to_rgb(color_override)
elif app_name and app_name in self._app_colors:
color = _hex_to_rgb(self._app_colors[app_name])
else:
color = _hex_to_rgb(self._default_color)
# Push event to queue (thread-safe deque.append)
self._event_queue.append({"color": color, "start": time.monotonic()})
return True
def configure(self, device_led_count: int) -> None:
"""Set LED count from the target device (called on target start)."""
if self._auto_size and device_led_count > 0:
new_count = max(self._led_count, device_led_count)
if new_count != self._led_count:
self._led_count = new_count
with self._colors_lock:
self._colors = np.zeros((new_count, 3), dtype=np.uint8)
logger.debug(f"NotificationColorStripStream auto-sized to {new_count} LEDs")
@property
def target_fps(self) -> int:
return self._fps
@property
def is_animated(self) -> bool:
return True
@property
def led_count(self) -> int:
return self._led_count
def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._render_loop,
name="css-notification",
daemon=True,
)
self._thread.start()
logger.info(f"NotificationColorStripStream started (leds={self._led_count})")
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
if self._thread.is_alive():
logger.warning("NotificationColorStripStream render thread did not terminate within 5s")
self._thread = None
logger.info("NotificationColorStripStream stopped")
def get_latest_colors(self) -> Optional[np.ndarray]:
with self._colors_lock:
return self._colors
def update_source(self, source) -> None:
"""Hot-update config from updated source."""
from wled_controller.storage.color_strip_source import NotificationColorStripSource
if isinstance(source, NotificationColorStripSource):
prev_led_count = self._led_count if self._auto_size else None
self._update_from_source(source)
if prev_led_count and self._auto_size:
self._led_count = prev_led_count
with self._colors_lock:
self._colors = np.zeros((self._led_count, 3), dtype=np.uint8)
logger.info("NotificationColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
"""Set or clear the sync clock runtime (not used, but required for interface)."""
pass # Notification stream does not use sync clocks
# ── Render loop ──────────────────────────────────────────────────
def _render_loop(self) -> None:
"""Background thread rendering at 30 FPS."""
_pool_n = 0
_buf_a = _buf_b = None
_use_a = True
try:
while self._running:
wall_start = time.perf_counter()
frame_time = 1.0 / self._fps
try:
# Check for new events
while self._event_queue:
try:
event = self._event_queue.popleft()
self._active_effect = event
except IndexError:
break
n = self._led_count
# Reallocate buffers if LED count changed
if n != _pool_n:
_pool_n = n
_buf_a = np.zeros((n, 3), dtype=np.uint8)
_buf_b = np.zeros((n, 3), dtype=np.uint8)
buf = _buf_a if _use_a else _buf_b
_use_a = not _use_a
if self._active_effect is not None:
color = self._active_effect["color"]
start_time = self._active_effect["start"]
elapsed_ms = (time.monotonic() - start_time) * 1000.0
duration_ms = self._duration_ms
progress = min(elapsed_ms / duration_ms, 1.0)
if progress >= 1.0:
# Effect complete, output black
self._active_effect = None
buf[:] = 0
else:
self._render_effect(buf, n, color, progress)
else:
# Idle: output black
buf[:] = 0
with self._colors_lock:
self._colors = buf
except Exception as e:
logger.error(f"NotificationColorStripStream render error: {e}")
elapsed = time.perf_counter() - wall_start
time.sleep(max(frame_time - elapsed, 0.001))
except Exception as e:
logger.error(f"Fatal NotificationColorStripStream loop error: {e}", exc_info=True)
finally:
self._running = False
def _render_effect(self, buf: np.ndarray, n: int, color: tuple, progress: float) -> None:
"""Dispatch to the appropriate effect renderer."""
effect = self._notification_effect
if effect == "pulse":
self._render_pulse(buf, n, color, progress)
elif effect == "sweep":
self._render_sweep(buf, n, color, progress)
else:
# Default: flash
self._render_flash(buf, n, color, progress)
def _render_flash(self, buf: np.ndarray, n: int, color: tuple, progress: float) -> None:
"""Flash effect: linear fade from full brightness to zero."""
brightness = max(0.0, 1.0 - progress)
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
buf[:, 0] = r
buf[:, 1] = g
buf[:, 2] = b
def _render_pulse(self, buf: np.ndarray, n: int, color: tuple, progress: float) -> None:
"""Pulse effect: smooth bell curve (sin)."""
brightness = math.sin(progress * math.pi)
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
buf[:, 0] = r
buf[:, 1] = g
buf[:, 2] = b
def _render_sweep(self, buf: np.ndarray, n: int, color: tuple, progress: float) -> None:
"""Sweep effect: fill LEDs left-to-right, then fade all.
First half: progressively fill LEDs from left to right.
Second half: fade all filled LEDs from full to zero.
"""
if progress < 0.5:
# Fill phase: progress 0→0.5 maps to fill_pos 0→n
fill_progress = progress * 2.0
fill_pos = int(fill_progress * n)
buf[:] = 0
if fill_pos > 0:
buf[:fill_pos, 0] = color[0]
buf[:fill_pos, 1] = color[1]
buf[:fill_pos, 2] = color[2]
else:
# Fade phase: progress 0.5→1.0 maps to brightness 1→0
fade_progress = (progress - 0.5) * 2.0
brightness = max(0.0, 1.0 - fade_progress)
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
buf[:, 0] = r
buf[:, 1] = g
buf[:, 2] = b