Add frame interpolation postprocessing filter + KC hot-settings

Frame interpolation filter (frame_interpolation):
- New PostprocessingFilter with supports_idle_frames = True
- Backward-blend algorithm: blends frame N-1 → N over one capture
  interval, producing smooth output on idle ticks at ≤1 frame of lag
- Detects new vs idle frames via cheap 64-byte signature comparison
- No options; registered alongside other built-in filters

ProcessedLiveStream idle-tick support:
- Detects supports_idle_frames filters at construction (_has_idle_filters)
- target_fps returns 2× source rate when idle filters are present
- _process_loop runs at 2× rate; idle ticks copy cached source frame
  and run full filter chain, publishing result only when a filter
  returned actual interpolated output (not a pass-through)
- Pass-through idle ticks leave _latest_frame unchanged so consumers
  correctly deduplicate via object identity

KC target hot-settings:
- brightness, smoothing, interpolation_mode now read from self._settings
  each frame instead of captured as stale locals at loop startup
- Changes take effect within one frame without stop/restart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-20 21:01:45 +03:00
parent 55e25b8860
commit 872949a7e1
5 changed files with 142 additions and 11 deletions

View File

@@ -18,6 +18,7 @@ import wled_controller.core.filters.pixelate # noqa: F401
import wled_controller.core.filters.auto_crop # noqa: F401
import wled_controller.core.filters.flip # noqa: F401
import wled_controller.core.filters.color_correction # noqa: F401
import wled_controller.core.filters.frame_interpolation # noqa: F401
__all__ = [
"FilterOptionDef",

View File

@@ -41,6 +41,7 @@ class PostprocessingFilter(ABC):
filter_id: str = ""
filter_name: str = ""
supports_idle_frames: bool = False
def __init__(self, options: Dict[str, Any]):
"""Initialize filter with validated options."""

View File

@@ -0,0 +1,89 @@
"""Frame interpolation postprocessing filter."""
import time
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
from wled_controller.core.filters.image_pool import ImagePool
from wled_controller.core.filters.registry import FilterRegistry
@FilterRegistry.register
class FrameInterpolationFilter(PostprocessingFilter):
"""Blends consecutive frames to fill idle loop ticks with smooth output.
When ProcessedLiveStream runs at 2× source capture rate, idle ticks
(where no new source frame has arrived) receive an interpolated blend of
the two most recently captured frames instead of a duplicate.
Algorithm (backward blend):
- When frame B arrives at T_b: start blending A → B over the interval
(T_b - T_a), the duration between the two most recent source frames.
- At time T: alpha = clamp((T - T_b) / (T_b - T_a), 0, 1)
- output = (1 - alpha)*A + alpha*B
New vs idle frames are detected by comparing a cheap 64-byte signature of
the filter input. Since preceding filters are deterministic and the source
image is unchanged between idle ticks, the signature is stable.
Introduces at most one capture-interval of lag (≤33 ms at 30 fps) —
imperceptible for ambient lighting use cases.
"""
filter_id = "frame_interpolation"
filter_name = "Frame Interpolation"
supports_idle_frames = True
def __init__(self, options: Dict[str, Any]):
super().__init__(options)
self._frame_a: Optional[np.ndarray] = None # frame N-1
self._frame_b: Optional[np.ndarray] = None # frame N (latest source)
self._time_a: float = 0.0
self._time_b: float = 0.0
self._sig_b: Optional[bytes] = None # 64-byte signature of frame_b input
@classmethod
def get_options_schema(cls) -> List[FilterOptionDef]:
return []
def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
"""Return interpolated blend on idle ticks; update state on new source frames.
Returns:
None — image passes through unchanged (no blend needed).
ndarray — blended output acquired from image_pool.
"""
now = time.perf_counter()
# Detect new vs idle frame via cheap 64-byte signature
sig = bytes(image.ravel()[:64])
if sig != self._sig_b:
# New source frame — shift A ← B, B ← current
self._frame_a = self._frame_b
self._time_a = self._time_b
self._frame_b = image.copy()
self._time_b = now
self._sig_b = sig
# Need at least two frames and a positive interval to interpolate
if self._frame_a is None or self._time_b <= self._time_a:
return None # pass through unchanged
interval = self._time_b - self._time_a
alpha = min(1.0, (now - self._time_b) / interval)
if alpha >= 1.0:
return None # fully transitioned to B, no blend needed
# Blend: output = (1 - alpha)*A + alpha*B (integer fast path)
alpha_i = int(alpha * 256)
h, w, c = image.shape
out = image_pool.acquire(h, w, c)
blended = (
(256 - alpha_i) * self._frame_a.astype(np.uint16)
+ alpha_i * image.astype(np.uint16)
) >> 8
np.copyto(out, blended, casting="unsafe")
return out

View File

@@ -258,19 +258,14 @@ class KCTargetProcessor(TargetProcessor):
async def _processing_loop(self) -> None:
"""Main processing loop for key-colors extraction."""
settings = self._settings
target_fps = self._settings.fps
target_fps = settings.fps
smoothing = settings.smoothing
brightness = settings.brightness
# Select color calculation function
# Lookup table for interpolation mode → function (used per-frame from live settings)
calc_fns = {
"average": calculate_average_color,
"median": calculate_median_color,
"dominant": calculate_dominant_color,
}
calc_fn = calc_fns.get(settings.interpolation_mode, calculate_average_color)
frame_time = 1.0 / target_fps
fps_samples: collections.deque = collections.deque(maxlen=10)
@@ -331,11 +326,16 @@ class KCTargetProcessor(TargetProcessor):
continue
prev_capture = capture
# Read settings fresh each frame so hot updates (brightness,
# smoothing, interpolation_mode) take effect without restart.
s = self._settings
calc_fn = calc_fns.get(s.interpolation_mode, calculate_average_color)
# CPU-bound work in thread pool
colors, colors_arr, frame_timing = await asyncio.to_thread(
_process_kc_frame,
capture, rect_names, rect_bounds, calc_fn,
prev_colors_arr, smoothing, brightness,
prev_colors_arr, s.smoothing, s.brightness,
)
prev_colors_arr = colors_arr

View File

@@ -168,10 +168,17 @@ class ProcessedLiveStream(LiveStream):
self._frame_lock = threading.Lock()
self._running = False
self._thread: Optional[threading.Thread] = None
# True when at least one filter requests idle-tick processing (e.g. frame interpolation).
# The processing loop then runs at 2× source rate and runs the full filter chain on idle
# ticks so those filters can produce interpolated output.
self._has_idle_filters: bool = any(
getattr(f, "supports_idle_frames", False) for f in filters
)
@property
def target_fps(self) -> int:
return self._source.target_fps
base = self._source.target_fps
return base * 2 if self._has_idle_filters else base
@property
def display_index(self) -> Optional[int]:
@@ -212,14 +219,47 @@ class ProcessedLiveStream(LiveStream):
# processed by a consumer), so the 3rd slot is always safe to reuse.
_ring: List[Optional[np.ndarray]] = [None, None, None]
_ring_idx = 0
frame_time = 1.0 / self._source.target_fps if self._source.target_fps > 0 else 1.0
# Separate buffer for idle-tick source copies (not part of the ring buffer)
_idle_src_buf: Optional[np.ndarray] = None
fps = self.target_fps
frame_time = 1.0 / fps if fps > 0 else 1.0
while self._running:
loop_start = time.time()
source_frame = self._source.get_latest_frame()
if source_frame is None or source_frame is cached_source_frame:
# Sleep until next frame is expected
# Idle tick — run filter chain when any filter requests idle processing
if self._has_idle_filters and cached_source_frame is not None:
src = cached_source_frame.image
h, w, c = src.shape
if _idle_src_buf is None or _idle_src_buf.shape != (h, w, c):
_idle_src_buf = np.empty((h, w, c), dtype=np.uint8)
np.copyto(_idle_src_buf, src)
idle_image = _idle_src_buf
for f in self._filters:
result = f.process_image(idle_image, self._image_pool)
if result is not None:
if idle_image is not _idle_src_buf:
self._image_pool.release(idle_image)
idle_image = result
# Only publish a new frame when the filter chain produced actual
# interpolated output (idle_image advanced past the input buffer).
# If every filter passed through, idle_image is still _idle_src_buf —
# leave _latest_frame unchanged so consumers that rely on object
# identity for deduplication correctly detect no new content.
if idle_image is not _idle_src_buf:
processed = ScreenCapture(
image=idle_image,
width=cached_source_frame.width,
height=cached_source_frame.height,
display_index=cached_source_frame.display_index,
)
with self._frame_lock:
self._latest_frame = processed
elapsed = time.time() - loop_start
remaining = frame_time - elapsed
time.sleep(max(remaining, 0.001))