Add API Input color strip source type with REST and WebSocket push

New source_type "api_input" allows external clients to push raw LED
color arrays ([R,G,B] per LED) via REST POST or WebSocket. Includes
configurable fallback color and timeout for automatic revert when no
data is received. Stream auto-sizes LED count from the target device.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-24 17:07:47 +03:00
parent 1e4a7a067f
commit 67a15776b2
10 changed files with 512 additions and 10 deletions

View File

@@ -0,0 +1,169 @@
"""API Input color strip stream — receives raw LED colors from external clients.
External clients push [R,G,B] arrays via REST POST or WebSocket. The stream
buffers the latest frame and serves it to targets. When no data has been
received within `timeout` seconds, LEDs revert to `fallback_color`.
Thread-safe: push_colors() can be called from any thread (REST handler,
WebSocket handler) while get_latest_colors() is called from the target
processor thread.
"""
import threading
import time
from typing import Optional
import numpy as np
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ApiInputColorStripStream(ColorStripStream):
"""Color strip stream backed by externally-pushed LED color data.
Holds a thread-safe np.ndarray buffer. External clients push colors via
push_colors(). A background thread checks for timeout and reverts to
fallback_color when no data arrives within the configured timeout window.
"""
def __init__(self, source):
"""
Args:
source: ApiInputColorStripSource config
"""
self._lock = threading.Lock()
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
# Parse config
fallback = source.fallback_color
self._fallback_color = fallback if isinstance(fallback, list) and len(fallback) == 3 else [0, 0, 0]
self._timeout = max(0.0, source.timeout if source.timeout else 5.0)
self._auto_size = not source.led_count
self._led_count = source.led_count if source.led_count and source.led_count > 0 else 1
# Build initial fallback buffer
self._fallback_array = self._build_fallback(self._led_count)
self._colors = self._fallback_array.copy()
self._last_push_time: float = 0.0
self._timed_out = True # Start in timed-out state
def _build_fallback(self, led_count: int) -> np.ndarray:
"""Build a (led_count, 3) uint8 array filled with fallback_color."""
return np.tile(
np.array(self._fallback_color, dtype=np.uint8),
(led_count, 1),
)
def push_colors(self, colors: np.ndarray) -> None:
"""Push a new frame of LED colors.
Thread-safe. The array is truncated or zero-padded to match led_count.
Args:
colors: np.ndarray shape (N, 3) uint8
"""
with self._lock:
n = len(colors)
if n == self._led_count:
self._colors = colors.astype(np.uint8)
elif n > self._led_count:
self._colors = colors[:self._led_count].astype(np.uint8)
else:
# Zero-pad to led_count
padded = np.zeros((self._led_count, 3), dtype=np.uint8)
padded[:n] = colors[:n]
self._colors = padded
self._last_push_time = time.monotonic()
self._timed_out = False
def configure(self, device_led_count: int) -> None:
"""Set LED count from the target device (called on target start).
Only takes effect when led_count was 0 (auto-size).
"""
if self._auto_size and device_led_count > 0 and device_led_count != self._led_count:
with self._lock:
self._led_count = device_led_count
self._fallback_array = self._build_fallback(device_led_count)
self._colors = self._fallback_array.copy()
self._timed_out = True
logger.debug(f"ApiInputColorStripStream auto-sized to {device_led_count} LEDs")
@property
def target_fps(self) -> int:
return self._fps
@property
def is_animated(self) -> bool:
return True # Always poll — external data can arrive at any time
@property
def led_count(self) -> int:
return self._led_count
def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._timeout_loop,
name="css-api-input-timeout",
daemon=True,
)
self._thread.start()
logger.info(f"ApiInputColorStripStream started (leds={self._led_count})")
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
if self._thread.is_alive():
logger.warning("ApiInputColorStripStream timeout thread did not terminate within 5s")
self._thread = None
logger.info("ApiInputColorStripStream stopped")
def get_latest_colors(self) -> Optional[np.ndarray]:
with self._lock:
return self._colors
def update_source(self, source) -> None:
"""Hot-update fallback_color and timeout from updated source config."""
from wled_controller.storage.color_strip_source import ApiInputColorStripSource
if isinstance(source, ApiInputColorStripSource):
fallback = source.fallback_color
self._fallback_color = fallback if isinstance(fallback, list) and len(fallback) == 3 else [0, 0, 0]
self._timeout = max(0.0, source.timeout if source.timeout else 5.0)
prev_led_count = self._led_count if self._auto_size else None
self._auto_size = not source.led_count
with self._lock:
self._fallback_array = self._build_fallback(self._led_count)
if self._timed_out:
self._colors = self._fallback_array.copy()
# Preserve runtime LED count across updates if auto-sized
if prev_led_count and self._auto_size:
self._led_count = prev_led_count
with self._lock:
self._fallback_array = self._build_fallback(self._led_count)
if self._timed_out:
self._colors = self._fallback_array.copy()
logger.info("ApiInputColorStripStream params updated in-place")
def _timeout_loop(self) -> None:
"""Background thread that reverts to fallback on timeout."""
while self._running:
time.sleep(0.5)
if self._timeout <= 0:
continue
if self._timed_out:
continue
elapsed = time.monotonic() - self._last_push_time
if elapsed >= self._timeout:
with self._lock:
self._colors = self._fallback_array.copy()
self._timed_out = True
logger.debug("ApiInputColorStripStream timed out, reverted to fallback")

View File

@@ -19,6 +19,7 @@ from wled_controller.core.processing.color_strip_stream import (
StaticColorStripStream,
)
from wled_controller.core.processing.effect_stream import EffectColorStripStream
from wled_controller.core.processing.api_input_stream import ApiInputColorStripStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -29,6 +30,7 @@ _SIMPLE_STREAM_MAP = {
"gradient": GradientColorStripStream,
"color_cycle": ColorCycleColorStripStream,
"effect": EffectColorStripStream,
"api_input": ApiInputColorStripStream,
}
@@ -271,6 +273,19 @@ class ColorStripStreamManager:
new_fps = 30 # default when no consumers
entry.stream.set_capture_fps(new_fps)
def get_streams_by_source_id(self, css_id: str) -> list:
"""Return all running stream instances for a given source ID.
Checks both the shared key (css_id) and per-consumer keys (css_id:xxx).
Used by the REST/WS push handlers to broadcast pushed colors to all
consumers of an api_input source.
"""
streams = []
for key, entry in self._streams.items():
if key == css_id or key.startswith(f"{css_id}:"):
streams.append(entry.stream)
return streams
def release_all(self) -> None:
"""Stop and remove all managed color strip streams. Called on shutdown."""
css_ids = list(self._streams.keys())