Rework API input CSS: segments, remove led_count, HAOS light, test preview

API Input CSS rework:
- Remove led_count field from ApiInputColorStripSource (always auto-sizes)
- Add segment-based payload: solid, per_pixel, gradient modes
- Segments applied in order (last wins on overlap), auto-grow buffer
- Backward compatible: legacy {"colors": [...]} still works
- Pydantic validation: mode-specific field requirements

Test preview:
- Enable test preview button on api_input cards
- Hide LED/FPS controls for api_input (sender controls those)
- Show input source selector for all CSS tests (preselected)
- FPS sparkline chart using shared createFpsSparkline (same as target cards)
- Server only sends frames when push_generation changes (no idle frames)

HAOS integration:
- New light.py: ApiInputLight entity per api_input source (RGB + brightness)
- turn_on pushes solid segment, turn_off pushes fallback color
- Register wled_screen_controller.set_leds service for arbitrary segments
- New services.yaml with field definitions
- Coordinator: push_colors() and push_segments() methods
- Platform.LIGHT added to platforms list

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-17 14:47:42 +03:00
parent 823cb90d2d
commit 8a6ffca446
25 changed files with 1085 additions and 326 deletions
@@ -4,9 +4,9 @@ External clients push [R,G,B] arrays via REST POST or WebSocket. The stream
buffers the latest frame and serves it to targets. When no data has been
received within `timeout` seconds, LEDs revert to `fallback_color`.
Thread-safe: push_colors() can be called from any thread (REST handler,
WebSocket handler) while get_latest_colors() is called from the target
processor thread.
Thread-safe: push_colors() / push_segments() can be called from any thread
(REST handler, WebSocket handler) while get_latest_colors() is called from
the target processor thread.
"""
import threading
@@ -20,13 +20,16 @@ from wled_controller.utils import get_logger
logger = get_logger(__name__)
_DEFAULT_LED_COUNT = 150
class ApiInputColorStripStream(ColorStripStream):
"""Color strip stream backed by externally-pushed LED color data.
Holds a thread-safe np.ndarray buffer. External clients push colors via
push_colors(). A background thread checks for timeout and reverts to
fallback_color when no data arrives within the configured timeout window.
push_colors() or push_segments(). A background thread checks for timeout
and reverts to fallback_color when no data arrives within the configured
timeout window.
"""
def __init__(self, source):
@@ -43,14 +46,14 @@ class ApiInputColorStripStream(ColorStripStream):
fallback = source.fallback_color
self._fallback_color = fallback if isinstance(fallback, list) and len(fallback) == 3 else [0, 0, 0]
self._timeout = max(0.0, source.timeout if source.timeout else 5.0)
self._auto_size = not source.led_count
self._led_count = source.led_count if source.led_count and source.led_count > 0 else 1
self._led_count = _DEFAULT_LED_COUNT
# Build initial fallback buffer
self._fallback_array = self._build_fallback(self._led_count)
self._colors = self._fallback_array.copy()
self._last_push_time: float = 0.0
self._timed_out = True # Start in timed-out state
self._push_generation: int = 0 # Incremented on each push; used by test WS
def _build_fallback(self, led_count: int) -> np.ndarray:
"""Build a (led_count, 3) uint8 array filled with fallback_color."""
@@ -59,40 +62,124 @@ class ApiInputColorStripStream(ColorStripStream):
(led_count, 1),
)
def _ensure_capacity(self, required: int) -> None:
"""Grow the buffer to at least `required` LEDs (must be called under lock)."""
if required > self._led_count:
self._led_count = required
self._fallback_array = self._build_fallback(self._led_count)
# Preserve existing data if not timed out
if not self._timed_out:
new_buf = self._fallback_array.copy()
old_len = min(len(self._colors), required)
new_buf[:old_len] = self._colors[:old_len]
self._colors = new_buf
else:
self._colors = self._fallback_array.copy()
logger.debug(f"ApiInputColorStripStream buffer grown to {required} LEDs")
def push_colors(self, colors: np.ndarray) -> None:
"""Push a new frame of LED colors.
Thread-safe. The array is truncated or zero-padded to match led_count.
Thread-safe. Auto-grows the buffer if the incoming array is larger
than the current buffer; otherwise truncates or zero-pads.
Args:
colors: np.ndarray shape (N, 3) uint8
"""
with self._lock:
n = len(colors)
# Auto-grow if incoming data is larger
if n > self._led_count:
self._ensure_capacity(n)
if n == self._led_count:
self._colors = colors.astype(np.uint8)
elif n > self._led_count:
self._colors = colors[:self._led_count].astype(np.uint8)
else:
elif n < self._led_count:
# Zero-pad to led_count
padded = np.zeros((self._led_count, 3), dtype=np.uint8)
padded[:n] = colors[:n]
self._colors = padded
self._last_push_time = time.monotonic()
self._push_generation += 1
self._timed_out = False
def push_segments(self, segments: list) -> None:
"""Apply segment-based color updates to the buffer.
Each segment defines a range and fill mode. Segments are applied in
order (last wins on overlap). The buffer is auto-grown if needed.
Args:
segments: list of dicts with keys:
start (int) starting LED index
length (int) number of LEDs in segment
mode (str) "solid" | "per_pixel" | "gradient"
color (list) [R,G,B] for solid mode
colors (list) [[R,G,B], ...] for per_pixel/gradient
"""
# Compute required buffer size from all segments
max_index = max(seg["start"] + seg["length"] for seg in segments)
with self._lock:
# Auto-grow buffer if needed
if max_index > self._led_count:
self._ensure_capacity(max_index)
# Start from current buffer (or fallback if timed out)
if self._timed_out:
buf = self._fallback_array.copy()
else:
buf = self._colors.copy()
for seg in segments:
start = seg["start"]
length = seg["length"]
mode = seg["mode"]
end = start + length
if mode == "solid":
color = np.array(seg["color"], dtype=np.uint8)
buf[start:end] = color
elif mode == "per_pixel":
colors = np.array(seg["colors"], dtype=np.uint8)
available = len(colors)
if available >= length:
buf[start:end] = colors[:length]
else:
# Pad with zeros if fewer colors than length
buf[start:start + available] = colors
buf[start + available:end] = 0
elif mode == "gradient":
stops = np.array(seg["colors"], dtype=np.float32)
num_stops = len(stops)
# Positions of stops evenly spaced 0..length-1
stop_positions = np.linspace(0, length - 1, num_stops)
pixel_positions = np.arange(length, dtype=np.float32)
for ch in range(3):
buf[start:end, ch] = np.interp(
pixel_positions,
stop_positions,
stops[:, ch],
).astype(np.uint8)
self._colors = buf
self._last_push_time = time.monotonic()
self._push_generation += 1
self._timed_out = False
def configure(self, device_led_count: int) -> None:
"""Set LED count from the target device (called on target start).
Only takes effect when led_count was 0 (auto-size).
Always resizes the buffer to the device LED count.
"""
if self._auto_size and device_led_count > 0 and device_led_count != self._led_count:
if device_led_count > 0 and device_led_count != self._led_count:
with self._lock:
self._led_count = device_led_count
self._fallback_array = self._build_fallback(device_led_count)
self._colors = self._fallback_array.copy()
self._timed_out = True
logger.debug(f"ApiInputColorStripStream auto-sized to {device_led_count} LEDs")
logger.debug(f"ApiInputColorStripStream configured to {device_led_count} LEDs")
@property
def target_fps(self) -> int:
@@ -131,6 +218,11 @@ class ApiInputColorStripStream(ColorStripStream):
with self._lock:
return self._colors
@property
def push_generation(self) -> int:
"""Monotonically increasing counter, bumped on each push_colors/push_segments."""
return self._push_generation
def update_source(self, source) -> None:
"""Hot-update fallback_color and timeout from updated source config."""
from wled_controller.storage.color_strip_source import ApiInputColorStripSource
@@ -138,19 +230,10 @@ class ApiInputColorStripStream(ColorStripStream):
fallback = source.fallback_color
self._fallback_color = fallback if isinstance(fallback, list) and len(fallback) == 3 else [0, 0, 0]
self._timeout = max(0.0, source.timeout if source.timeout else 5.0)
prev_led_count = self._led_count if self._auto_size else None
self._auto_size = not source.led_count
with self._lock:
self._fallback_array = self._build_fallback(self._led_count)
if self._timed_out:
self._colors = self._fallback_array.copy()
# Preserve runtime LED count across updates if auto-sized
if prev_led_count and self._auto_size:
self._led_count = prev_led_count
with self._lock:
self._fallback_array = self._build_fallback(self._led_count)
if self._timed_out:
self._colors = self._fallback_array.copy()
logger.info("ApiInputColorStripStream params updated in-place")
def _timeout_loop(self) -> None: