Add 5 procedural LED effects, gradient presets, auto-crop min aspect ratio, static source polling optimization

New features:
- Procedural effect source type with fire, meteor, plasma, noise, and aurora algorithms
  using palette LUT system and 1D value noise generator
- 12 predefined gradient presets (rainbow, sunset, ocean, forest, fire, lava, aurora,
  ice, warm, cool, neon, pastel) selectable from a dropdown in the gradient editor
- Auto-crop filter: min aspect ratio parameter to prevent false-positive cropping
  in dark scenes on ultrawide displays

Optimization:
- Static/gradient sources without animation: stream thread sleeps 0.25s instead of
  frame_time; processor repolls at frame_time instead of 5ms (~40x fewer iterations)
- Inverted isinstance checks in routes to test for PictureColorStripSource only

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-23 01:03:16 +03:00
parent 9392741f08
commit a4083764fb
14 changed files with 1033 additions and 19 deletions

View File

@@ -37,11 +37,21 @@ class AutoCropFilter(PostprocessingFilter):
max_value=200,
step=5,
),
FilterOptionDef(
key="min_aspect_ratio",
label="Min Aspect Ratio",
option_type="float",
default=0.0,
min_value=0.0,
max_value=3.0,
step=0.01,
),
]
def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
threshold = self.options.get("threshold", 15)
min_bar_size = self.options.get("min_bar_size", 20)
min_aspect_ratio = float(self.options.get("min_aspect_ratio", 0.0))
h, w = image.shape[:2]
min_h = max(1, h // 10)
@@ -81,6 +91,22 @@ class AutoCropFilter(PostprocessingFilter):
if (w - right) < min_bar_size:
right = w
# Enforce minimum aspect ratio: if the cropped region is too narrow,
# symmetrically reduce the crop on the tighter axis.
if min_aspect_ratio > 0:
cropped_w_cur = right - left
cropped_h_cur = bottom - top
if cropped_h_cur > 0:
ratio = cropped_w_cur / cropped_h_cur
if ratio < min_aspect_ratio:
# Too narrow — widen by reducing left/right crop
needed_w = int(cropped_h_cur * min_aspect_ratio)
deficit = needed_w - cropped_w_cur
expand_left = deficit // 2
expand_right = deficit - expand_left
left = max(0, left - expand_left)
right = min(w, right + expand_right)
# Safety: don't crop if remaining content is too small
if (bottom - top) < min_h:
top, bottom = 0, h

View File

@@ -79,6 +79,16 @@ class ColorStripStream(ABC):
def led_count(self) -> int:
"""Number of LEDs this stream produces colors for."""
@property
def is_animated(self) -> bool:
"""Whether this stream is actively producing new frames.
Used by the processor to adjust polling rate: animated streams are
polled at 5 ms (SKIP_REPOLL) to stay in sync with the animation
thread, while static streams are polled at frame_time rate.
"""
return True
@property
def display_index(self) -> Optional[int]:
"""Display index of the underlying capture, or None."""
@@ -505,6 +515,11 @@ class StaticColorStripStream(ColorStripStream):
def target_fps(self) -> int:
return self._fps
@property
def is_animated(self) -> bool:
anim = self._animation
return bool(anim and anim.get("enabled"))
@property
def led_count(self) -> int:
return self._led_count
@@ -644,7 +659,8 @@ class StaticColorStripStream(ColorStripStream):
self._colors = colors
elapsed = time.perf_counter() - loop_start
time.sleep(max(frame_time - elapsed, 0.001))
sleep_target = frame_time if anim and anim.get("enabled") else 0.25
time.sleep(max(sleep_target - elapsed, 0.001))
class ColorCycleColorStripStream(ColorStripStream):
@@ -834,6 +850,11 @@ class GradientColorStripStream(ColorStripStream):
def target_fps(self) -> int:
return self._fps
@property
def is_animated(self) -> bool:
anim = self._animation
return bool(anim and anim.get("enabled"))
@property
def led_count(self) -> int:
return self._led_count
@@ -1011,4 +1032,5 @@ class GradientColorStripStream(ColorStripStream):
self._colors = colors
elapsed = time.perf_counter() - loop_start
time.sleep(max(frame_time - elapsed, 0.001))
sleep_target = frame_time if anim and anim.get("enabled") else 0.25
time.sleep(max(sleep_target - elapsed, 0.001))

View File

@@ -19,6 +19,7 @@ from wled_controller.core.processing.color_strip_stream import (
PictureColorStripStream,
StaticColorStripStream,
)
from wled_controller.core.processing.effect_stream import EffectColorStripStream
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -89,6 +90,7 @@ class ColorStripStreamManager:
from wled_controller.storage.color_strip_source import (
ColorCycleColorStripSource,
EffectColorStripSource,
GradientColorStripSource,
PictureColorStripSource,
StaticColorStripSource,
@@ -129,6 +131,17 @@ class ColorStripStreamManager:
logger.info(f"Created gradient color strip stream for source {css_id}")
return css_stream
if isinstance(source, EffectColorStripSource):
css_stream = EffectColorStripStream(source)
css_stream.start()
self._streams[css_id] = _ColorStripEntry(
stream=css_stream,
ref_count=1,
picture_source_id="",
)
logger.info(f"Created effect stream for source {css_id} (effect={source.effect_type})")
return css_stream
if not isinstance(source, PictureColorStripSource):
raise ValueError(
f"Unsupported color strip source type '{source.source_type}' for {css_id}"

View File

@@ -0,0 +1,408 @@
"""Procedural LED effect stream — fire, meteor, plasma, noise, aurora.
Implements EffectColorStripStream which produces animated LED color arrays
using purely algorithmic rendering (no capture source). Each effect is a
render method dispatched by effect_type.
Palette LUTs and a simple 1-D value-noise generator are included so that
no external dependencies are required.
"""
import math
import threading
import time
from typing import Dict, Optional
import numpy as np
from wled_controller.core.processing.color_strip_stream import ColorStripStream
from wled_controller.utils import get_logger
from wled_controller.utils.timer import high_resolution_timer
logger = get_logger(__name__)
# ── Palette LUT system ──────────────────────────────────────────────────
# Each palette is a list of (position, R, G, B) control points.
# Positions must be monotonically increasing from 0.0 to 1.0.
_PALETTE_DEFS: Dict[str, list] = {
"fire": [(0, 0, 0, 0), (0.33, 200, 24, 0), (0.66, 255, 160, 0), (1.0, 255, 255, 200)],
"ocean": [(0, 0, 0, 32), (0.33, 0, 16, 128), (0.66, 0, 128, 255), (1.0, 128, 224, 255)],
"lava": [(0, 0, 0, 0), (0.25, 128, 0, 0), (0.5, 255, 32, 0), (0.75, 255, 160, 0), (1.0, 255, 255, 128)],
"forest": [(0, 0, 16, 0), (0.33, 0, 80, 0), (0.66, 32, 160, 0), (1.0, 128, 255, 64)],
"rainbow": [(0, 255, 0, 0), (0.17, 255, 255, 0), (0.33, 0, 255, 0),
(0.5, 0, 255, 255), (0.67, 0, 0, 255), (0.83, 255, 0, 255), (1.0, 255, 0, 0)],
"aurora": [(0, 0, 16, 32), (0.2, 0, 80, 64), (0.4, 0, 200, 100),
(0.6, 64, 128, 255), (0.8, 128, 0, 200), (1.0, 0, 16, 32)],
"sunset": [(0, 32, 0, 64), (0.25, 128, 0, 128), (0.5, 255, 64, 0),
(0.75, 255, 192, 64), (1.0, 255, 255, 192)],
"ice": [(0, 0, 0, 64), (0.33, 0, 64, 192), (0.66, 128, 192, 255), (1.0, 240, 248, 255)],
}
_palette_cache: Dict[str, np.ndarray] = {}
def _build_palette_lut(name: str) -> np.ndarray:
"""Build a (256, 3) uint8 lookup table for the named palette."""
if name in _palette_cache:
return _palette_cache[name]
points = _PALETTE_DEFS.get(name, _PALETTE_DEFS["fire"])
lut = np.zeros((256, 3), dtype=np.uint8)
for i in range(256):
t = i / 255.0
# Find surrounding control points
a_idx = 0
for j in range(len(points) - 1):
if points[j + 1][0] >= t:
a_idx = j
break
else:
a_idx = len(points) - 2
a_pos, ar, ag, ab = points[a_idx]
b_pos, br, bg, bb = points[a_idx + 1]
span = b_pos - a_pos
frac = (t - a_pos) / span if span > 0 else 0.0
lut[i] = (
int(ar + (br - ar) * frac),
int(ag + (bg - ag) * frac),
int(ab + (bb - ab) * frac),
)
_palette_cache[name] = lut
return lut
# ── 1-D value noise (no external deps) ──────────────────────────────────
class _ValueNoise1D:
"""Simple 1-D value noise with smoothstep interpolation and fractal octaves."""
def __init__(self, seed: int = 42):
rng = np.random.RandomState(seed)
self._table = rng.random(512).astype(np.float32)
def noise(self, x: np.ndarray) -> np.ndarray:
"""Single-octave smooth noise for an array of float positions."""
size = len(self._table)
xi = np.floor(x).astype(np.int64)
frac = (x - xi).astype(np.float32)
t = frac * frac * (3.0 - 2.0 * frac) # smoothstep
a = self._table[xi % size]
b = self._table[(xi + 1) % size]
return a + t * (b - a)
def fbm(self, x: np.ndarray, octaves: int = 3) -> np.ndarray:
"""Fractal Brownian Motion — layered noise at decreasing amplitude."""
result = np.zeros_like(x, dtype=np.float32)
amp = 1.0
freq = 1.0
total_amp = 0.0
for _ in range(octaves):
result += amp * self.noise(x * freq)
total_amp += amp
amp *= 0.5
freq *= 2.0
return result / total_amp
# ── Effect stream ────────────────────────────────────────────────────────
# Default palette per effect type
_EFFECT_DEFAULT_PALETTE = {
"fire": "fire",
"meteor": "fire",
"plasma": "rainbow",
"noise": "rainbow",
"aurora": "aurora",
}
class EffectColorStripStream(ColorStripStream):
"""Color strip stream that runs a procedural LED effect.
Dispatches to one of five render methods based on effect_type:
fire, meteor, plasma, noise, aurora.
Uses the same lifecycle pattern as StaticColorStripStream:
background thread, double-buffered output, configure() for auto-sizing.
"""
def __init__(self, source):
self._colors_lock = threading.Lock()
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
self._noise = _ValueNoise1D(seed=42)
# Fire state — allocated lazily in render loop
self._heat: Optional[np.ndarray] = None
self._heat_n = 0
self._update_from_source(source)
def _update_from_source(self, source) -> None:
self._effect_type = getattr(source, "effect_type", "fire")
self._speed = float(getattr(source, "speed", 1.0))
self._auto_size = not source.led_count
self._led_count = source.led_count if source.led_count and source.led_count > 0 else 1
self._palette_name = getattr(source, "palette", None) or _EFFECT_DEFAULT_PALETTE.get(self._effect_type, "fire")
self._palette_lut = _build_palette_lut(self._palette_name)
color = getattr(source, "color", None)
self._color = color if isinstance(color, list) and len(color) == 3 else [255, 80, 0]
self._intensity = float(getattr(source, "intensity", 1.0))
self._scale = float(getattr(source, "scale", 1.0))
self._mirror = bool(getattr(source, "mirror", False))
with self._colors_lock:
self._colors: Optional[np.ndarray] = None
def configure(self, device_led_count: int) -> None:
if self._auto_size and device_led_count > 0:
new_count = max(self._led_count, device_led_count)
if new_count != self._led_count:
self._led_count = new_count
logger.debug(f"EffectColorStripStream auto-sized to {new_count} LEDs")
@property
def target_fps(self) -> int:
return self._fps
@property
def led_count(self) -> int:
return self._led_count
def set_capture_fps(self, fps: int) -> None:
self._fps = max(1, min(90, fps))
def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._animate_loop,
name=f"css-effect-{self._effect_type}",
daemon=True,
)
self._thread.start()
logger.info(f"EffectColorStripStream started (effect={self._effect_type}, leds={self._led_count})")
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
if self._thread.is_alive():
logger.warning("EffectColorStripStream animate thread did not terminate within 5s")
self._thread = None
self._heat = None
self._heat_n = 0
logger.info("EffectColorStripStream stopped")
def get_latest_colors(self) -> Optional[np.ndarray]:
with self._colors_lock:
return self._colors
def update_source(self, source) -> None:
from wled_controller.storage.color_strip_source import EffectColorStripSource
if isinstance(source, EffectColorStripSource):
prev_led_count = self._led_count if self._auto_size else None
self._update_from_source(source)
if prev_led_count and self._auto_size:
self._led_count = prev_led_count
logger.info("EffectColorStripStream params updated in-place")
# ── Main animation loop ──────────────────────────────────────────
def _animate_loop(self) -> None:
_pool_n = 0
_buf_a = _buf_b = None
_use_a = True
# Dispatch table
renderers = {
"fire": self._render_fire,
"meteor": self._render_meteor,
"plasma": self._render_plasma,
"noise": self._render_noise,
"aurora": self._render_aurora,
}
with high_resolution_timer():
while self._running:
loop_start = time.perf_counter()
frame_time = 1.0 / self._fps
n = self._led_count
if n != _pool_n:
_pool_n = n
_buf_a = np.empty((n, 3), dtype=np.uint8)
_buf_b = np.empty((n, 3), dtype=np.uint8)
buf = _buf_a if _use_a else _buf_b
_use_a = not _use_a
render_fn = renderers.get(self._effect_type, self._render_fire)
render_fn(buf, n, loop_start)
with self._colors_lock:
self._colors = buf
elapsed = time.perf_counter() - loop_start
time.sleep(max(frame_time - elapsed, 0.001))
# ── Fire ─────────────────────────────────────────────────────────
def _render_fire(self, buf: np.ndarray, n: int, t: float) -> None:
"""Heat-propagation fire simulation.
A 1-D heat array cools, diffuses upward, and receives random sparks
at the bottom. Heat values are mapped to the palette LUT.
"""
speed = self._speed
intensity = self._intensity
lut = self._palette_lut
# (Re)allocate heat array when LED count changes
if self._heat is None or self._heat_n != n:
self._heat = np.zeros(n, dtype=np.float32)
self._heat_n = n
heat = self._heat
# Cooling: proportional to strip length so larger strips don't go dark
cooling = 0.02 * speed * (60.0 / max(n, 1))
heat -= cooling
np.clip(heat, 0.0, 1.0, out=heat)
# Diffuse heat upward (index 0 = bottom, index n-1 = top)
if n >= 3:
# Average of neighbors, shifted upward
new_heat = np.empty_like(heat)
new_heat[0] = (heat[0] + heat[1]) * 0.5
new_heat[1:-1] = (heat[:-2] + heat[1:-1] + heat[2:]) / 3.0
new_heat[-1] = heat[-1] * 0.5
heat[:] = new_heat
# Sparks at the bottom
spark_zone = max(1, n // 8)
spark_prob = 0.3 * intensity
for i in range(spark_zone):
if np.random.random() < spark_prob:
heat[i] = min(1.0, heat[i] + 0.4 + 0.6 * np.random.random())
# Map heat to palette
indices = np.clip((heat * 255).astype(np.int32), 0, 255)
buf[:] = lut[indices]
# ── Meteor ───────────────────────────────────────────────────────
def _render_meteor(self, buf: np.ndarray, n: int, t: float) -> None:
"""Bright meteor head with exponential-decay trail."""
speed = self._speed
intensity = self._intensity
color = self._color
mirror = self._mirror
# Compute position along the strip
travel_speed = speed * 8.0 # LEDs per second
if mirror:
# Bounce: position ping-pongs 0→n-1→0
cycle = 2 * (n - 1) if n > 1 else 1
raw_pos = (t * travel_speed) % cycle
pos = raw_pos if raw_pos < n else cycle - raw_pos
else:
pos = (t * travel_speed) % n
# Tail decay factor: intensity controls how long the tail is
# Higher intensity → slower decay → longer tail
decay = 0.05 + 0.25 * (1.0 - min(1.0, intensity)) # 0.05 (long) to 0.30 (short)
# Compute brightness for each LED based on distance behind the meteor
indices = np.arange(n, dtype=np.float32)
if mirror:
dist = np.abs(indices - pos)
else:
# Signed distance in the direction of travel (behind = positive)
dist = (pos - indices) % n
brightness = np.exp(-dist * decay)
# Apply color
r, g, b = color
buf[:, 0] = np.clip(brightness * r, 0, 255).astype(np.uint8)
buf[:, 1] = np.clip(brightness * g, 0, 255).astype(np.uint8)
buf[:, 2] = np.clip(brightness * b, 0, 255).astype(np.uint8)
# Bright white-ish head (within ±1 LED of position)
head_mask = np.abs(indices - pos) < 1.5
head_brightness = np.clip(1.0 - np.abs(indices - pos), 0, 1)
buf[head_mask, 0] = np.clip(
buf[head_mask, 0].astype(np.int16) + (head_brightness[head_mask] * (255 - r)).astype(np.int16),
0, 255,
).astype(np.uint8)
buf[head_mask, 1] = np.clip(
buf[head_mask, 1].astype(np.int16) + (head_brightness[head_mask] * (255 - g)).astype(np.int16),
0, 255,
).astype(np.uint8)
buf[head_mask, 2] = np.clip(
buf[head_mask, 2].astype(np.int16) + (head_brightness[head_mask] * (255 - b)).astype(np.int16),
0, 255,
).astype(np.uint8)
# ── Plasma ───────────────────────────────────────────────────────
def _render_plasma(self, buf: np.ndarray, n: int, t: float) -> None:
"""Overlapping sine waves creating colorful plasma patterns."""
speed = self._speed
scale = self._scale
lut = self._palette_lut
phase = t * speed * 0.5
x = np.linspace(0, scale * math.pi * 2, n, dtype=np.float64)
v = (
np.sin(x + phase)
+ np.sin(x * 0.5 + phase * 1.3)
+ np.sin(x * 0.3 + phase * 0.7)
+ np.sin(x * 0.7 + phase * 1.1)
)
# Normalize from [-4, 4] to [0, 255]
indices = np.clip(((v + 4.0) * (255.0 / 8.0)).astype(np.int32), 0, 255)
buf[:] = lut[indices]
# ── Perlin Noise ─────────────────────────────────────────────────
def _render_noise(self, buf: np.ndarray, n: int, t: float) -> None:
"""Smooth scrolling fractal noise mapped to a color palette."""
speed = self._speed
scale = self._scale
lut = self._palette_lut
positions = np.arange(n, dtype=np.float32) * scale * 0.1 + t * speed * 0.5
values = self._noise.fbm(positions, octaves=3)
indices = np.clip((values * 255).astype(np.int32), 0, 255)
buf[:] = lut[indices]
# ── Aurora ───────────────────────────────────────────────────────
def _render_aurora(self, buf: np.ndarray, n: int, t: float) -> None:
"""Layered noise bands simulating aurora borealis."""
speed = self._speed
scale = self._scale
intensity = self._intensity
lut = self._palette_lut
positions = np.arange(n, dtype=np.float32) * scale * 0.08
# Three noise layers at different speeds and offsets
layer1 = self._noise.fbm(positions + t * speed * 0.2, octaves=3)
layer2 = self._noise.fbm(positions * 1.5 + t * speed * 0.35 + 100.0, octaves=3)
layer3 = self._noise.fbm(positions * 0.7 + t * speed * 0.15 + 200.0, octaves=2)
# Combine layers: layer1 drives hue, layer2 modulates brightness,
# layer3 adds slow undulation
hue = (layer1 + layer3 * 0.5) * 0.67 # 01 range for palette lookup
hue = np.clip(hue, 0.0, 1.0)
brightness = 0.3 + 0.7 * layer2 * intensity
brightness = np.clip(brightness, 0.0, 1.0)
indices = np.clip((hue * 255).astype(np.int32), 0, 255)
colors = lut[indices].astype(np.float32)
colors *= brightness[:, np.newaxis]
buf[:] = np.clip(colors, 0, 255).astype(np.uint8)

View File

@@ -116,13 +116,14 @@ class WledTargetProcessor(TargetProcessor):
self._color_strip_stream = stream
self._resolved_display_index = stream.display_index
# For auto-sized static/gradient/color_cycle streams (led_count == 0), size to device LED count
# For auto-sized static/gradient/color_cycle/effect streams (led_count == 0), size to device LED count
from wled_controller.core.processing.color_strip_stream import (
ColorCycleColorStripStream,
GradientColorStripStream,
StaticColorStripStream,
)
if isinstance(stream, (StaticColorStripStream, GradientColorStripStream, ColorCycleColorStripStream)) and device_info.led_count > 0:
from wled_controller.core.processing.effect_stream import EffectColorStripStream
if isinstance(stream, (StaticColorStripStream, GradientColorStripStream, ColorCycleColorStripStream, EffectColorStripStream)) and device_info.led_count > 0:
stream.configure(device_info.led_count)
# Notify stream manager of our target FPS so it can adjust capture rate
@@ -486,7 +487,8 @@ class WledTargetProcessor(TargetProcessor):
while send_timestamps and send_timestamps[0] < now - 1.0:
send_timestamps.popleft()
self._metrics.fps_current = len(send_timestamps)
await asyncio.sleep(SKIP_REPOLL)
repoll = SKIP_REPOLL if stream.is_animated else frame_time
await asyncio.sleep(repoll)
continue
prev_colors = colors