perf(processing): event-driven frame hand-off and scheduling fixes
- LiveStream: add a frame_id counter and a Condition, with a wait_for_new_frame() helper. Producers (ScreenCaptureLiveStream, ProcessedLiveStream, StaticImageLiveStream, VideoCaptureLiveStream) now call _signal_new_frame() on each new frame; consumers (PictureColorStripStream, ProcessedLiveStream) wait on the condition with frame_time as a safety timeout instead of polling + sleeping. Cuts glass-to-LED latency at matched FPS by up to one frame_time.
- ProcessedLiveStream ring buffer: 3 -> 5 slots. The previous "max 2 frames in flight" assumption ignored the multi-consumer case where several PictureColorStripStream/HA-target threads can hold the same _latest_frame reference while we wrap. 5 slots gives ~83 ms of consumer-read margin at 60 FPS.
- PictureColorStripStream advanced mode: reuse the already-fetched primary frame instead of re-acquiring its lock from _live_streams.
- _blend_u16: use cv2.addWeighted (single SIMD-fused pass) when cv2 is available; numpy fallback unchanged. Output verified bit-equal to the existing 6-pass implementation.
- FrameLimiter.wait: drop the 1 ms minimum-sleep floor. Over-budget loops no longer add an extra millisecond per iteration; the cap on achievable rate (~750 fps) is removed.
This commit is contained in:
@@ -7,6 +7,13 @@ from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
try:
|
||||
import cv2
|
||||
|
||||
_HAS_CV2 = True
|
||||
except ImportError:
|
||||
_HAS_CV2 = False
|
||||
|
||||
from ledgrab.core.capture.calibration import (
|
||||
AdvancedPixelMapper,
|
||||
CalibrationConfig,
|
||||
@@ -203,6 +210,11 @@ class PictureColorStripStream(ColorStripStream):
|
||||
def _processing_loop(self) -> None:
|
||||
"""Background thread: poll source, process, cache colors."""
|
||||
cached_frame = None
|
||||
# Track the source LiveStream's frame_id for event-driven waits.
|
||||
# When the source publishes a new frame, ``wait_for_new_frame`` wakes
|
||||
# us immediately instead of waiting for our own polling tick — cuts
|
||||
# glass-to-LED latency by up to a full frame_time at matched FPS.
|
||||
last_source_id = self._live_stream.current_frame_id
|
||||
|
||||
# Scratch buffer pool (pre-allocated, resized when LED count changes)
|
||||
_pool_n = 0
|
||||
@@ -213,8 +225,14 @@ class PictureColorStripStream(ColorStripStream):
|
||||
def _blend_u16(a, b, alpha_b, out):
|
||||
"""Blend two uint8 arrays: out = ((256-alpha_b)*a + alpha_b*b) >> 8.
|
||||
|
||||
Uses pre-allocated uint16 scratch buffers (_u16_a, _u16_b).
|
||||
Uses ``cv2.addWeighted`` (single SIMD-fused pass) when available;
|
||||
falls back to a 6-pass numpy implementation using pre-allocated
|
||||
uint16 scratch buffers (_u16_a, _u16_b).
|
||||
"""
|
||||
if _HAS_CV2:
|
||||
w_b = alpha_b / 256.0
|
||||
cv2.addWeighted(a, 1.0 - w_b, b, w_b, 0.0, dst=out)
|
||||
return
|
||||
nonlocal _u16_a, _u16_b
|
||||
np.copyto(_u16_a, a, casting="unsafe")
|
||||
np.copyto(_u16_b, b, casting="unsafe")
|
||||
@@ -229,6 +247,7 @@ class PictureColorStripStream(ColorStripStream):
|
||||
try:
|
||||
with high_resolution_timer():
|
||||
while self._running:
|
||||
loop_start = time.perf_counter()
|
||||
limiter.begin()
|
||||
frame_time = self._frame_time
|
||||
|
||||
@@ -236,10 +255,19 @@ class PictureColorStripStream(ColorStripStream):
|
||||
frame = self._live_stream.get_latest_frame()
|
||||
|
||||
if frame is None or frame is cached_frame:
|
||||
limiter.wait(frame_time)
|
||||
# Event-driven wait on the source: returns
|
||||
# immediately when a new frame lands, or after
|
||||
# frame_time as a safety timeout.
|
||||
elapsed = time.perf_counter() - loop_start
|
||||
remaining = max(frame_time - elapsed, 0.0)
|
||||
if remaining > 0:
|
||||
last_source_id = self._live_stream.wait_for_new_frame(
|
||||
last_source_id, timeout=remaining
|
||||
)
|
||||
continue
|
||||
|
||||
cached_frame = frame
|
||||
last_source_id = self._live_stream.current_frame_id
|
||||
|
||||
t0 = time.perf_counter()
|
||||
# Record the new frame in the rolling 1s window
|
||||
@@ -255,9 +283,14 @@ class PictureColorStripStream(ColorStripStream):
|
||||
mapper = self._pixel_mapper
|
||||
|
||||
if isinstance(mapper, AdvancedPixelMapper):
|
||||
# Advanced mode: gather frames from all live streams
|
||||
# Advanced mode: gather frames from all live streams.
|
||||
# Reuse the already-sampled primary frame to avoid
|
||||
# an extra lock acquisition on it.
|
||||
frames_dict = {}
|
||||
for ps_id, ls in self._live_streams.items():
|
||||
if ls is self._live_stream:
|
||||
frames_dict[ps_id] = frame
|
||||
else:
|
||||
f = ls.get_latest_frame()
|
||||
if f is not None:
|
||||
frames_dict[ps_id] = f
|
||||
|
||||
@@ -31,9 +31,17 @@ class LiveStream(ABC):
|
||||
"""Abstract base for a runtime frame source.
|
||||
|
||||
A LiveStream produces frames at some frequency. Consumers call
|
||||
get_latest_frame() to read the most recent frame (non-blocking).
|
||||
get_latest_frame() to read the most recent frame (non-blocking),
|
||||
or wait_for_new_frame() for event-driven consumption.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
# Monotonic frame counter incremented by ``_signal_new_frame``.
|
||||
# Consumers track the last value they saw to detect new frames
|
||||
# without busy-polling.
|
||||
self._frame_id: int = 0
|
||||
self._frame_cond: threading.Condition = threading.Condition()
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def target_fps(self) -> int:
|
||||
@@ -60,6 +68,35 @@ class LiveStream(ABC):
|
||||
ScreenCapture with image data (RGB), or None if no frame available yet.
|
||||
"""
|
||||
|
||||
@property
def current_frame_id(self) -> int:
    """Return the current value of the frame counter.

    The counter is advanced by one each time ``_signal_new_frame`` runs.
    """
    frame_id = self._frame_id
    return frame_id
|
||||
|
||||
def _signal_new_frame(self) -> None:
|
||||
"""Producer: increment frame ID and notify all waiting consumers."""
|
||||
with self._frame_cond:
|
||||
self._frame_id += 1
|
||||
self._frame_cond.notify_all()
|
||||
|
||||
def wait_for_new_frame(self, last_seen_id: int, timeout: float) -> int:
    """Consumer side: wait for the frame ID to move past ``last_seen_id``.

    Args:
        last_seen_id: Frame ID the caller has already handled.
        timeout: Upper bound on the wait, in seconds. Zero or a negative
            value skips the wait entirely.

    Returns:
        The frame ID observed when the call finishes. It equals
        ``last_seen_id`` when the wait timed out without a new frame.
    """
    if timeout > 0:
        cond = self._frame_cond
        with cond:
            # Only block when nothing new has been published yet; a single
            # wait is enough because the producer notifies on every bump.
            if self._frame_id == last_seen_id:
                cond.wait(timeout=timeout)
            return self._frame_id
    return self._frame_id
|
||||
|
||||
|
||||
class ScreenCaptureLiveStream(LiveStream):
|
||||
"""Live stream backed by a CaptureStream with a dedicated capture thread.
|
||||
@@ -73,6 +110,7 @@ class ScreenCaptureLiveStream(LiveStream):
|
||||
"""
|
||||
|
||||
def __init__(self, capture_stream: CaptureStream, fps: int):
|
||||
super().__init__()
|
||||
self._capture_stream = capture_stream
|
||||
self._fps = fps
|
||||
self._frame_time = 1.0 / fps if fps > 0 else 1.0
|
||||
@@ -139,6 +177,7 @@ class ScreenCaptureLiveStream(LiveStream):
|
||||
if frame is not None:
|
||||
with self._frame_lock:
|
||||
self._latest_frame = frame
|
||||
self._signal_new_frame()
|
||||
consecutive_errors = 0
|
||||
else:
|
||||
# Small sleep when no frame available to avoid CPU spinning
|
||||
@@ -181,6 +220,7 @@ class ProcessedLiveStream(LiveStream):
|
||||
source: LiveStream,
|
||||
filters: List[PostprocessingFilter],
|
||||
):
|
||||
super().__init__()
|
||||
self._source = source
|
||||
self._filters = filters
|
||||
self._image_pool = ImagePool()
|
||||
@@ -234,15 +274,22 @@ class ProcessedLiveStream(LiveStream):
|
||||
def _process_loop(self) -> None:
|
||||
"""Background thread: poll source, apply filters, cache result."""
|
||||
cached_source_frame: Optional[ScreenCapture] = None
|
||||
# Ring buffer: 3 slots guarantees consumer finished with oldest buffer.
|
||||
# At most 2 frames are in flight (one in _latest_frame, one being
|
||||
# processed by a consumer), so the 3rd slot is always safe to reuse.
|
||||
_ring: List[Optional[np.ndarray]] = [None, None, None]
|
||||
# Ring buffer: 5 slots gives a safety margin for the multi-consumer
|
||||
# case (multiple PictureColorStripStream/HA target threads can hold
|
||||
# the same _latest_frame reference while we wrap around). At 60 FPS
|
||||
# the 5-slot rotation gives any consumer ~83 ms to finish reading
|
||||
# before the slot is overwritten — well above any realistic
|
||||
# extract→map→smooth tick.
|
||||
_RING_SIZE = 5
|
||||
_ring: List[Optional[np.ndarray]] = [None] * _RING_SIZE
|
||||
_ring_idx = 0
|
||||
# Separate buffer for idle-tick source copies (not part of the ring buffer)
|
||||
_idle_src_buf: Optional[np.ndarray] = None
|
||||
fps = self.target_fps
|
||||
frame_time = 1.0 / fps if fps > 0 else 1.0
|
||||
# Track the source's frame_id so we can wait event-driven for new
|
||||
# frames instead of polling + sleeping.
|
||||
last_source_id = self._source.current_frame_id
|
||||
|
||||
try:
|
||||
with high_resolution_timer():
|
||||
@@ -278,13 +325,20 @@ class ProcessedLiveStream(LiveStream):
|
||||
)
|
||||
with self._frame_lock:
|
||||
self._latest_frame = processed
|
||||
self._signal_new_frame()
|
||||
|
||||
# Event-driven wait: blocks until source produces a
|
||||
# new frame, with frame_time as a safety timeout.
|
||||
elapsed = time.perf_counter() - loop_start
|
||||
remaining = frame_time - elapsed
|
||||
time.sleep(max(remaining, 0.001))
|
||||
remaining = max(frame_time - elapsed, 0.0)
|
||||
if remaining > 0:
|
||||
last_source_id = self._source.wait_for_new_frame(
|
||||
last_source_id, timeout=remaining
|
||||
)
|
||||
continue
|
||||
|
||||
cached_source_frame = source_frame
|
||||
last_source_id = self._source.current_frame_id
|
||||
|
||||
# Reuse ring buffer slot instead of allocating a new copy each frame
|
||||
src = source_frame.image
|
||||
@@ -293,7 +347,7 @@ class ProcessedLiveStream(LiveStream):
|
||||
if buf is None or buf.shape != (h, w, c):
|
||||
buf = np.empty((h, w, c), dtype=np.uint8)
|
||||
_ring[_ring_idx] = buf
|
||||
_ring_idx = (_ring_idx + 1) % 3
|
||||
_ring_idx = (_ring_idx + 1) % _RING_SIZE
|
||||
|
||||
np.copyto(buf, src)
|
||||
image = buf
|
||||
@@ -315,6 +369,7 @@ class ProcessedLiveStream(LiveStream):
|
||||
)
|
||||
with self._frame_lock:
|
||||
self._latest_frame = processed
|
||||
self._signal_new_frame()
|
||||
except Exception as e:
|
||||
logger.error(f"Filter processing error: {e}")
|
||||
time.sleep(0.01)
|
||||
@@ -328,9 +383,12 @@ class StaticImageLiveStream(LiveStream):
|
||||
"""Live stream that always returns the same static image."""
|
||||
|
||||
def __init__(self, image: np.ndarray):
    """Wrap a fixed image as a frame and announce it as a new frame once.

    Args:
        image: Array whose first two dimensions are taken as
            (height, width).
    """
    super().__init__()
    height, width = image.shape[:2]
    self._image = image
    self._frame = ScreenCapture(
        image=image,
        width=width,
        height=height,
        display_index=-1,
    )
    # Advance frame_id one time so a consumer waiting on it sees a "new" frame.
    self._signal_new_frame()
|
||||
|
||||
@property
|
||||
def target_fps(self) -> int:
|
||||
|
||||
@@ -144,6 +144,7 @@ class VideoCaptureLiveStream(LiveStream):
|
||||
target_fps: int = 30,
|
||||
):
|
||||
_require_cv2()
|
||||
super().__init__()
|
||||
self._original_url = url
|
||||
self._resolved_url: Optional[str] = None
|
||||
self._loop = loop
|
||||
@@ -348,6 +349,7 @@ class VideoCaptureLiveStream(LiveStream):
|
||||
sc = ScreenCapture(image=buf, width=w, height=h, display_index=-1)
|
||||
with self._frame_lock:
|
||||
self._latest_frame = sc
|
||||
self._signal_new_frame()
|
||||
|
||||
except Exception as e:
|
||||
consecutive_errors += 1
|
||||
|
||||
@@ -6,9 +6,9 @@ import time
|
||||
class FrameLimiter:
|
||||
"""Sleep helper to pace a streaming loop at a target FPS.
|
||||
|
||||
Tracks per-frame timing and exposes simple stats. The minimum sleep is
|
||||
clamped to 1ms (matches the historical ``max(..., 0.001)`` guard) so a
|
||||
bound is always present even when the loop is over budget.
|
||||
Tracks per-frame timing and exposes simple stats. When the loop is over
|
||||
budget ``wait()`` returns immediately rather than sleeping, so a slow
|
||||
iteration doesn't compound additional latency.
|
||||
"""
|
||||
|
||||
def __init__(self, target_fps: float):
|
||||
@@ -39,17 +39,19 @@ class FrameLimiter:
|
||||
def wait(self, frame_time: float | None = None) -> None:
|
||||
"""Sleep until the configured frame interval elapses since ``begin()``.
|
||||
|
||||
Sleeps at least 1ms to ensure cooperative yielding even when the loop
|
||||
is already over budget. Pass ``frame_time`` to override the
|
||||
per-iteration target (useful when callers vary the sleep target
|
||||
based on runtime state).
|
||||
Returns immediately when the loop is already over budget — does NOT
|
||||
impose a 1 ms floor, which would cap achievable rate at ~750 fps and
|
||||
add jitter to over-budget iterations. Pass ``frame_time`` to override
|
||||
the per-iteration target.
|
||||
"""
|
||||
ft = self._frame_time if frame_time is None else frame_time
|
||||
elapsed = time.perf_counter() - self._loop_start
|
||||
self._last_dt = elapsed
|
||||
self._sum_dt += elapsed
|
||||
self._count += 1
|
||||
time.sleep(max(ft - elapsed, 0.001))
|
||||
remaining = ft - elapsed
|
||||
if remaining > 0:
|
||||
time.sleep(remaining)
|
||||
|
||||
@property
|
||||
def stats(self) -> dict:
|
||||
|
||||
Reference in New Issue
Block a user