perf(processing): event-driven frame hand-off and scheduling fixes

- LiveStream: add frame_id counter + Condition with wait_for_new_frame()
  helper. Producers (ScreenCaptureLiveStream, ProcessedLiveStream,
  StaticImageLiveStream, VideoCaptureLiveStream) now signal_new_frame()
  on each new frame; consumers (PictureColorStripStream, ProcessedLive
  Stream) wait on the event with frame_time as a safety timeout
  instead of polling + sleeping. Cuts glass-to-LED latency at matched
  FPS by up to one frame_time.
- ProcessedLiveStream ring buffer: 3 -> 5 slots. The previous "max 2
  frames in flight" assumption ignored the multi-consumer case where
  several PictureColorStripStream/HA-target threads can hold the same
  _latest_frame reference while we wrap. 5 slots gives ~83 ms of
  consumer-read margin at 60 FPS.
- PictureColorStripStream advanced mode: reuse the already-fetched
  primary frame instead of re-acquiring its lock from _live_streams.
- _blend_u16: use cv2.addWeighted (single SIMD-fused pass) when cv2
  is available; numpy fallback unchanged. Output verified bit-equal
  to the existing 6-pass implementation.
- FrameLimiter.wait: drop the 1 ms minimum-sleep floor. Over-budget
  loops no longer add an extra ms per iteration; the cap on achievable
  rate (~750 fps) is removed.
This commit is contained in:
2026-05-12 15:06:11 +03:00
parent f184ef0afb
commit ee4fa81376
4 changed files with 117 additions and 22 deletions
@@ -7,6 +7,13 @@ from typing import Optional
import numpy as np
try:
import cv2
_HAS_CV2 = True
except ImportError:
_HAS_CV2 = False
from ledgrab.core.capture.calibration import (
AdvancedPixelMapper,
CalibrationConfig,
@@ -203,6 +210,11 @@ class PictureColorStripStream(ColorStripStream):
def _processing_loop(self) -> None:
"""Background thread: poll source, process, cache colors."""
cached_frame = None
# Track the source LiveStream's frame_id for event-driven waits.
# When the source publishes a new frame, ``wait_for_new_frame`` wakes
# us immediately instead of waiting for our own polling tick — cuts
# glass-to-LED latency by up to a full frame_time at matched FPS.
last_source_id = self._live_stream.current_frame_id
# Scratch buffer pool (pre-allocated, resized when LED count changes)
_pool_n = 0
@@ -213,8 +225,14 @@ class PictureColorStripStream(ColorStripStream):
def _blend_u16(a, b, alpha_b, out):
"""Blend two uint8 arrays: out = ((256-alpha_b)*a + alpha_b*b) >> 8.
Uses pre-allocated uint16 scratch buffers (_u16_a, _u16_b).
Uses ``cv2.addWeighted`` (single SIMD-fused pass) when available;
falls back to a 6-pass numpy implementation using pre-allocated
uint16 scratch buffers (_u16_a, _u16_b).
"""
if _HAS_CV2:
w_b = alpha_b / 256.0
cv2.addWeighted(a, 1.0 - w_b, b, w_b, 0.0, dst=out)
return
nonlocal _u16_a, _u16_b
np.copyto(_u16_a, a, casting="unsafe")
np.copyto(_u16_b, b, casting="unsafe")
@@ -229,6 +247,7 @@ class PictureColorStripStream(ColorStripStream):
try:
with high_resolution_timer():
while self._running:
loop_start = time.perf_counter()
limiter.begin()
frame_time = self._frame_time
@@ -236,10 +255,19 @@ class PictureColorStripStream(ColorStripStream):
frame = self._live_stream.get_latest_frame()
if frame is None or frame is cached_frame:
limiter.wait(frame_time)
# Event-driven wait on the source: returns
# immediately when a new frame lands, or after
# frame_time as a safety timeout.
elapsed = time.perf_counter() - loop_start
remaining = max(frame_time - elapsed, 0.0)
if remaining > 0:
last_source_id = self._live_stream.wait_for_new_frame(
last_source_id, timeout=remaining
)
continue
cached_frame = frame
last_source_id = self._live_stream.current_frame_id
t0 = time.perf_counter()
# Record the new frame in the rolling 1s window
@@ -255,9 +283,14 @@ class PictureColorStripStream(ColorStripStream):
mapper = self._pixel_mapper
if isinstance(mapper, AdvancedPixelMapper):
# Advanced mode: gather frames from all live streams
# Advanced mode: gather frames from all live streams.
# Reuse the already-sampled primary frame to avoid
# an extra lock acquisition on it.
frames_dict = {}
for ps_id, ls in self._live_streams.items():
if ls is self._live_stream:
frames_dict[ps_id] = frame
else:
f = ls.get_latest_frame()
if f is not None:
frames_dict[ps_id] = f
@@ -31,9 +31,17 @@ class LiveStream(ABC):
"""Abstract base for a runtime frame source.
A LiveStream produces frames at some frequency. Consumers call
get_latest_frame() to read the most recent frame (non-blocking).
get_latest_frame() to read the most recent frame (non-blocking),
or wait_for_new_frame() for event-driven consumption.
"""
def __init__(self) -> None:
# Monotonic frame counter incremented by ``_signal_new_frame``.
# Consumers track the last value they saw to detect new frames
# without busy-polling.
self._frame_id: int = 0
self._frame_cond: threading.Condition = threading.Condition()
@property
@abstractmethod
def target_fps(self) -> int:
@@ -60,6 +68,35 @@ class LiveStream(ABC):
ScreenCapture with image data (RGB), or None if no frame available yet.
"""
@property
def current_frame_id(self) -> int:
"""Current monotonic frame ID. Increments on each new frame."""
return self._frame_id
def _signal_new_frame(self) -> None:
"""Producer: increment frame ID and notify all waiting consumers."""
with self._frame_cond:
self._frame_id += 1
self._frame_cond.notify_all()
def wait_for_new_frame(self, last_seen_id: int, timeout: float) -> int:
"""Consumer: block until a frame newer than ``last_seen_id`` arrives.
Args:
last_seen_id: Last frame ID the caller has already processed.
timeout: Maximum seconds to wait. ``0`` returns immediately.
Returns:
Current ``frame_id`` (may equal ``last_seen_id`` if timed out).
"""
if timeout <= 0:
return self._frame_id
with self._frame_cond:
if self._frame_id != last_seen_id:
return self._frame_id
self._frame_cond.wait(timeout=timeout)
return self._frame_id
class ScreenCaptureLiveStream(LiveStream):
"""Live stream backed by a CaptureStream with a dedicated capture thread.
@@ -73,6 +110,7 @@ class ScreenCaptureLiveStream(LiveStream):
"""
def __init__(self, capture_stream: CaptureStream, fps: int):
super().__init__()
self._capture_stream = capture_stream
self._fps = fps
self._frame_time = 1.0 / fps if fps > 0 else 1.0
@@ -139,6 +177,7 @@ class ScreenCaptureLiveStream(LiveStream):
if frame is not None:
with self._frame_lock:
self._latest_frame = frame
self._signal_new_frame()
consecutive_errors = 0
else:
# Small sleep when no frame available to avoid CPU spinning
@@ -181,6 +220,7 @@ class ProcessedLiveStream(LiveStream):
source: LiveStream,
filters: List[PostprocessingFilter],
):
super().__init__()
self._source = source
self._filters = filters
self._image_pool = ImagePool()
@@ -234,15 +274,22 @@ class ProcessedLiveStream(LiveStream):
def _process_loop(self) -> None:
"""Background thread: poll source, apply filters, cache result."""
cached_source_frame: Optional[ScreenCapture] = None
# Ring buffer: 3 slots guarantees consumer finished with oldest buffer.
# At most 2 frames are in flight (one in _latest_frame, one being
# processed by a consumer), so the 3rd slot is always safe to reuse.
_ring: List[Optional[np.ndarray]] = [None, None, None]
# Ring buffer: 5 slots gives a safety margin for the multi-consumer
# case (multiple PictureColorStripStream/HA target threads can hold
# the same _latest_frame reference while we wrap around). At 60 FPS
# the 5-slot rotation gives any consumer ~83 ms to finish reading
# before the slot is overwritten — well above any realistic
# extract→map→smooth tick.
_RING_SIZE = 5
_ring: List[Optional[np.ndarray]] = [None] * _RING_SIZE
_ring_idx = 0
# Separate buffer for idle-tick source copies (not part of the ring buffer)
_idle_src_buf: Optional[np.ndarray] = None
fps = self.target_fps
frame_time = 1.0 / fps if fps > 0 else 1.0
# Track the source's frame_id so we can wait event-driven for new
# frames instead of polling + sleeping.
last_source_id = self._source.current_frame_id
try:
with high_resolution_timer():
@@ -278,13 +325,20 @@ class ProcessedLiveStream(LiveStream):
)
with self._frame_lock:
self._latest_frame = processed
self._signal_new_frame()
# Event-driven wait: blocks until source produces a
# new frame, with frame_time as a safety timeout.
elapsed = time.perf_counter() - loop_start
remaining = frame_time - elapsed
time.sleep(max(remaining, 0.001))
remaining = max(frame_time - elapsed, 0.0)
if remaining > 0:
last_source_id = self._source.wait_for_new_frame(
last_source_id, timeout=remaining
)
continue
cached_source_frame = source_frame
last_source_id = self._source.current_frame_id
# Reuse ring buffer slot instead of allocating a new copy each frame
src = source_frame.image
@@ -293,7 +347,7 @@ class ProcessedLiveStream(LiveStream):
if buf is None or buf.shape != (h, w, c):
buf = np.empty((h, w, c), dtype=np.uint8)
_ring[_ring_idx] = buf
_ring_idx = (_ring_idx + 1) % 3
_ring_idx = (_ring_idx + 1) % _RING_SIZE
np.copyto(buf, src)
image = buf
@@ -315,6 +369,7 @@ class ProcessedLiveStream(LiveStream):
)
with self._frame_lock:
self._latest_frame = processed
self._signal_new_frame()
except Exception as e:
logger.error(f"Filter processing error: {e}")
time.sleep(0.01)
@@ -328,9 +383,12 @@ class StaticImageLiveStream(LiveStream):
"""Live stream that always returns the same static image."""
def __init__(self, image: np.ndarray):
super().__init__()
self._image = image
h, w = image.shape[:2]
self._frame = ScreenCapture(image=image, width=w, height=h, display_index=-1)
# Bump frame_id once so consumers waiting on it see a "new" frame.
self._signal_new_frame()
@property
def target_fps(self) -> int:
@@ -144,6 +144,7 @@ class VideoCaptureLiveStream(LiveStream):
target_fps: int = 30,
):
_require_cv2()
super().__init__()
self._original_url = url
self._resolved_url: Optional[str] = None
self._loop = loop
@@ -348,6 +349,7 @@ class VideoCaptureLiveStream(LiveStream):
sc = ScreenCapture(image=buf, width=w, height=h, display_index=-1)
with self._frame_lock:
self._latest_frame = sc
self._signal_new_frame()
except Exception as e:
consecutive_errors += 1
+10 -8
View File
@@ -6,9 +6,9 @@ import time
class FrameLimiter:
"""Sleep helper to pace a streaming loop at a target FPS.
Tracks per-frame timing and exposes simple stats. The minimum sleep is
clamped to 1ms (matches the historical ``max(..., 0.001)`` guard) so a
bound is always present even when the loop is over budget.
Tracks per-frame timing and exposes simple stats. When the loop is over
budget ``wait()`` returns immediately rather than sleeping, so a slow
iteration doesn't compound additional latency.
"""
def __init__(self, target_fps: float):
@@ -39,17 +39,19 @@ class FrameLimiter:
def wait(self, frame_time: float | None = None) -> None:
"""Sleep until the configured frame interval elapses since ``begin()``.
Sleeps at least 1ms to ensure cooperative yielding even when the loop
is already over budget. Pass ``frame_time`` to override the
per-iteration target (useful when callers vary the sleep target
based on runtime state).
Returns immediately when the loop is already over budget — does NOT
impose a 1 ms floor, which would cap achievable rate at ~750 fps and
add jitter to over-budget iterations. Pass ``frame_time`` to override
the per-iteration target.
"""
ft = self._frame_time if frame_time is None else frame_time
elapsed = time.perf_counter() - self._loop_start
self._last_dt = elapsed
self._sum_dt += elapsed
self._count += 1
time.sleep(max(ft - elapsed, 0.001))
remaining = ft - elapsed
if remaining > 0:
time.sleep(remaining)
@property
def stats(self) -> dict: