perf(processing): event-driven frame hand-off and scheduling fixes
- LiveStream: add frame_id counter + Condition with wait_for_new_frame() helper. Producers (ScreenCaptureLiveStream, ProcessedLiveStream, StaticImageLiveStream, VideoCaptureLiveStream) now signal_new_frame() on each new frame; consumers (PictureColorStripStream, ProcessedLive Stream) wait on the event with frame_time as a safety timeout instead of polling + sleeping. Cuts glass-to-LED latency at matched FPS by up to one frame_time. - ProcessedLiveStream ring buffer: 3 -> 5 slots. The previous "max 2 frames in flight" assumption ignored the multi-consumer case where several PictureColorStripStream/HA-target threads can hold the same _latest_frame reference while we wrap. 5 slots gives ~83 ms of consumer-read margin at 60 FPS. - PictureColorStripStream advanced mode: reuse the already-fetched primary frame instead of re-acquiring its lock from _live_streams. - _blend_u16: use cv2.addWeighted (single SIMD-fused pass) when cv2 is available; numpy fallback unchanged. Output verified bit-equal to the existing 6-pass implementation. - FrameLimiter.wait: drop the 1 ms minimum-sleep floor. Over-budget loops no longer add an extra ms per iteration; the cap on achievable rate (~750 fps) is removed.
This commit is contained in:
@@ -7,6 +7,13 @@ from typing import Optional
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
try:
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
_HAS_CV2 = True
|
||||||
|
except ImportError:
|
||||||
|
_HAS_CV2 = False
|
||||||
|
|
||||||
from ledgrab.core.capture.calibration import (
|
from ledgrab.core.capture.calibration import (
|
||||||
AdvancedPixelMapper,
|
AdvancedPixelMapper,
|
||||||
CalibrationConfig,
|
CalibrationConfig,
|
||||||
@@ -203,6 +210,11 @@ class PictureColorStripStream(ColorStripStream):
|
|||||||
def _processing_loop(self) -> None:
|
def _processing_loop(self) -> None:
|
||||||
"""Background thread: poll source, process, cache colors."""
|
"""Background thread: poll source, process, cache colors."""
|
||||||
cached_frame = None
|
cached_frame = None
|
||||||
|
# Track the source LiveStream's frame_id for event-driven waits.
|
||||||
|
# When the source publishes a new frame, ``wait_for_new_frame`` wakes
|
||||||
|
# us immediately instead of waiting for our own polling tick — cuts
|
||||||
|
# glass-to-LED latency by up to a full frame_time at matched FPS.
|
||||||
|
last_source_id = self._live_stream.current_frame_id
|
||||||
|
|
||||||
# Scratch buffer pool (pre-allocated, resized when LED count changes)
|
# Scratch buffer pool (pre-allocated, resized when LED count changes)
|
||||||
_pool_n = 0
|
_pool_n = 0
|
||||||
@@ -213,8 +225,14 @@ class PictureColorStripStream(ColorStripStream):
|
|||||||
def _blend_u16(a, b, alpha_b, out):
|
def _blend_u16(a, b, alpha_b, out):
|
||||||
"""Blend two uint8 arrays: out = ((256-alpha_b)*a + alpha_b*b) >> 8.
|
"""Blend two uint8 arrays: out = ((256-alpha_b)*a + alpha_b*b) >> 8.
|
||||||
|
|
||||||
Uses pre-allocated uint16 scratch buffers (_u16_a, _u16_b).
|
Uses ``cv2.addWeighted`` (single SIMD-fused pass) when available;
|
||||||
|
falls back to a 6-pass numpy implementation using pre-allocated
|
||||||
|
uint16 scratch buffers (_u16_a, _u16_b).
|
||||||
"""
|
"""
|
||||||
|
if _HAS_CV2:
|
||||||
|
w_b = alpha_b / 256.0
|
||||||
|
cv2.addWeighted(a, 1.0 - w_b, b, w_b, 0.0, dst=out)
|
||||||
|
return
|
||||||
nonlocal _u16_a, _u16_b
|
nonlocal _u16_a, _u16_b
|
||||||
np.copyto(_u16_a, a, casting="unsafe")
|
np.copyto(_u16_a, a, casting="unsafe")
|
||||||
np.copyto(_u16_b, b, casting="unsafe")
|
np.copyto(_u16_b, b, casting="unsafe")
|
||||||
@@ -229,6 +247,7 @@ class PictureColorStripStream(ColorStripStream):
|
|||||||
try:
|
try:
|
||||||
with high_resolution_timer():
|
with high_resolution_timer():
|
||||||
while self._running:
|
while self._running:
|
||||||
|
loop_start = time.perf_counter()
|
||||||
limiter.begin()
|
limiter.begin()
|
||||||
frame_time = self._frame_time
|
frame_time = self._frame_time
|
||||||
|
|
||||||
@@ -236,10 +255,19 @@ class PictureColorStripStream(ColorStripStream):
|
|||||||
frame = self._live_stream.get_latest_frame()
|
frame = self._live_stream.get_latest_frame()
|
||||||
|
|
||||||
if frame is None or frame is cached_frame:
|
if frame is None or frame is cached_frame:
|
||||||
limiter.wait(frame_time)
|
# Event-driven wait on the source: returns
|
||||||
|
# immediately when a new frame lands, or after
|
||||||
|
# frame_time as a safety timeout.
|
||||||
|
elapsed = time.perf_counter() - loop_start
|
||||||
|
remaining = max(frame_time - elapsed, 0.0)
|
||||||
|
if remaining > 0:
|
||||||
|
last_source_id = self._live_stream.wait_for_new_frame(
|
||||||
|
last_source_id, timeout=remaining
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
cached_frame = frame
|
cached_frame = frame
|
||||||
|
last_source_id = self._live_stream.current_frame_id
|
||||||
|
|
||||||
t0 = time.perf_counter()
|
t0 = time.perf_counter()
|
||||||
# Record the new frame in the rolling 1s window
|
# Record the new frame in the rolling 1s window
|
||||||
@@ -255,12 +283,17 @@ class PictureColorStripStream(ColorStripStream):
|
|||||||
mapper = self._pixel_mapper
|
mapper = self._pixel_mapper
|
||||||
|
|
||||||
if isinstance(mapper, AdvancedPixelMapper):
|
if isinstance(mapper, AdvancedPixelMapper):
|
||||||
# Advanced mode: gather frames from all live streams
|
# Advanced mode: gather frames from all live streams.
|
||||||
|
# Reuse the already-sampled primary frame to avoid
|
||||||
|
# an extra lock acquisition on it.
|
||||||
frames_dict = {}
|
frames_dict = {}
|
||||||
for ps_id, ls in self._live_streams.items():
|
for ps_id, ls in self._live_streams.items():
|
||||||
f = ls.get_latest_frame()
|
if ls is self._live_stream:
|
||||||
if f is not None:
|
frames_dict[ps_id] = frame
|
||||||
frames_dict[ps_id] = f
|
else:
|
||||||
|
f = ls.get_latest_frame()
|
||||||
|
if f is not None:
|
||||||
|
frames_dict[ps_id] = f
|
||||||
t1 = time.perf_counter()
|
t1 = time.perf_counter()
|
||||||
led_colors = mapper.map_lines_to_leds(frames_dict)
|
led_colors = mapper.map_lines_to_leds(frames_dict)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -31,9 +31,17 @@ class LiveStream(ABC):
|
|||||||
"""Abstract base for a runtime frame source.
|
"""Abstract base for a runtime frame source.
|
||||||
|
|
||||||
A LiveStream produces frames at some frequency. Consumers call
|
A LiveStream produces frames at some frequency. Consumers call
|
||||||
get_latest_frame() to read the most recent frame (non-blocking).
|
get_latest_frame() to read the most recent frame (non-blocking),
|
||||||
|
or wait_for_new_frame() for event-driven consumption.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
# Monotonic frame counter incremented by ``_signal_new_frame``.
|
||||||
|
# Consumers track the last value they saw to detect new frames
|
||||||
|
# without busy-polling.
|
||||||
|
self._frame_id: int = 0
|
||||||
|
self._frame_cond: threading.Condition = threading.Condition()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def target_fps(self) -> int:
|
def target_fps(self) -> int:
|
||||||
@@ -60,6 +68,35 @@ class LiveStream(ABC):
|
|||||||
ScreenCapture with image data (RGB), or None if no frame available yet.
|
ScreenCapture with image data (RGB), or None if no frame available yet.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_frame_id(self) -> int:
|
||||||
|
"""Current monotonic frame ID. Increments on each new frame."""
|
||||||
|
return self._frame_id
|
||||||
|
|
||||||
|
def _signal_new_frame(self) -> None:
|
||||||
|
"""Producer: increment frame ID and notify all waiting consumers."""
|
||||||
|
with self._frame_cond:
|
||||||
|
self._frame_id += 1
|
||||||
|
self._frame_cond.notify_all()
|
||||||
|
|
||||||
|
def wait_for_new_frame(self, last_seen_id: int, timeout: float) -> int:
|
||||||
|
"""Consumer: block until a frame newer than ``last_seen_id`` arrives.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
last_seen_id: Last frame ID the caller has already processed.
|
||||||
|
timeout: Maximum seconds to wait. ``0`` returns immediately.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current ``frame_id`` (may equal ``last_seen_id`` if timed out).
|
||||||
|
"""
|
||||||
|
if timeout <= 0:
|
||||||
|
return self._frame_id
|
||||||
|
with self._frame_cond:
|
||||||
|
if self._frame_id != last_seen_id:
|
||||||
|
return self._frame_id
|
||||||
|
self._frame_cond.wait(timeout=timeout)
|
||||||
|
return self._frame_id
|
||||||
|
|
||||||
|
|
||||||
class ScreenCaptureLiveStream(LiveStream):
|
class ScreenCaptureLiveStream(LiveStream):
|
||||||
"""Live stream backed by a CaptureStream with a dedicated capture thread.
|
"""Live stream backed by a CaptureStream with a dedicated capture thread.
|
||||||
@@ -73,6 +110,7 @@ class ScreenCaptureLiveStream(LiveStream):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, capture_stream: CaptureStream, fps: int):
|
def __init__(self, capture_stream: CaptureStream, fps: int):
|
||||||
|
super().__init__()
|
||||||
self._capture_stream = capture_stream
|
self._capture_stream = capture_stream
|
||||||
self._fps = fps
|
self._fps = fps
|
||||||
self._frame_time = 1.0 / fps if fps > 0 else 1.0
|
self._frame_time = 1.0 / fps if fps > 0 else 1.0
|
||||||
@@ -139,6 +177,7 @@ class ScreenCaptureLiveStream(LiveStream):
|
|||||||
if frame is not None:
|
if frame is not None:
|
||||||
with self._frame_lock:
|
with self._frame_lock:
|
||||||
self._latest_frame = frame
|
self._latest_frame = frame
|
||||||
|
self._signal_new_frame()
|
||||||
consecutive_errors = 0
|
consecutive_errors = 0
|
||||||
else:
|
else:
|
||||||
# Small sleep when no frame available to avoid CPU spinning
|
# Small sleep when no frame available to avoid CPU spinning
|
||||||
@@ -181,6 +220,7 @@ class ProcessedLiveStream(LiveStream):
|
|||||||
source: LiveStream,
|
source: LiveStream,
|
||||||
filters: List[PostprocessingFilter],
|
filters: List[PostprocessingFilter],
|
||||||
):
|
):
|
||||||
|
super().__init__()
|
||||||
self._source = source
|
self._source = source
|
||||||
self._filters = filters
|
self._filters = filters
|
||||||
self._image_pool = ImagePool()
|
self._image_pool = ImagePool()
|
||||||
@@ -234,15 +274,22 @@ class ProcessedLiveStream(LiveStream):
|
|||||||
def _process_loop(self) -> None:
|
def _process_loop(self) -> None:
|
||||||
"""Background thread: poll source, apply filters, cache result."""
|
"""Background thread: poll source, apply filters, cache result."""
|
||||||
cached_source_frame: Optional[ScreenCapture] = None
|
cached_source_frame: Optional[ScreenCapture] = None
|
||||||
# Ring buffer: 3 slots guarantees consumer finished with oldest buffer.
|
# Ring buffer: 5 slots gives a safety margin for the multi-consumer
|
||||||
# At most 2 frames are in flight (one in _latest_frame, one being
|
# case (multiple PictureColorStripStream/HA target threads can hold
|
||||||
# processed by a consumer), so the 3rd slot is always safe to reuse.
|
# the same _latest_frame reference while we wrap around). At 60 FPS
|
||||||
_ring: List[Optional[np.ndarray]] = [None, None, None]
|
# the 5-slot rotation gives any consumer ~83 ms to finish reading
|
||||||
|
# before the slot is overwritten — well above any realistic
|
||||||
|
# extract→map→smooth tick.
|
||||||
|
_RING_SIZE = 5
|
||||||
|
_ring: List[Optional[np.ndarray]] = [None] * _RING_SIZE
|
||||||
_ring_idx = 0
|
_ring_idx = 0
|
||||||
# Separate buffer for idle-tick source copies (not part of the ring buffer)
|
# Separate buffer for idle-tick source copies (not part of the ring buffer)
|
||||||
_idle_src_buf: Optional[np.ndarray] = None
|
_idle_src_buf: Optional[np.ndarray] = None
|
||||||
fps = self.target_fps
|
fps = self.target_fps
|
||||||
frame_time = 1.0 / fps if fps > 0 else 1.0
|
frame_time = 1.0 / fps if fps > 0 else 1.0
|
||||||
|
# Track the source's frame_id so we can wait event-driven for new
|
||||||
|
# frames instead of polling + sleeping.
|
||||||
|
last_source_id = self._source.current_frame_id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with high_resolution_timer():
|
with high_resolution_timer():
|
||||||
@@ -278,13 +325,20 @@ class ProcessedLiveStream(LiveStream):
|
|||||||
)
|
)
|
||||||
with self._frame_lock:
|
with self._frame_lock:
|
||||||
self._latest_frame = processed
|
self._latest_frame = processed
|
||||||
|
self._signal_new_frame()
|
||||||
|
|
||||||
|
# Event-driven wait: blocks until source produces a
|
||||||
|
# new frame, with frame_time as a safety timeout.
|
||||||
elapsed = time.perf_counter() - loop_start
|
elapsed = time.perf_counter() - loop_start
|
||||||
remaining = frame_time - elapsed
|
remaining = max(frame_time - elapsed, 0.0)
|
||||||
time.sleep(max(remaining, 0.001))
|
if remaining > 0:
|
||||||
|
last_source_id = self._source.wait_for_new_frame(
|
||||||
|
last_source_id, timeout=remaining
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
cached_source_frame = source_frame
|
cached_source_frame = source_frame
|
||||||
|
last_source_id = self._source.current_frame_id
|
||||||
|
|
||||||
# Reuse ring buffer slot instead of allocating a new copy each frame
|
# Reuse ring buffer slot instead of allocating a new copy each frame
|
||||||
src = source_frame.image
|
src = source_frame.image
|
||||||
@@ -293,7 +347,7 @@ class ProcessedLiveStream(LiveStream):
|
|||||||
if buf is None or buf.shape != (h, w, c):
|
if buf is None or buf.shape != (h, w, c):
|
||||||
buf = np.empty((h, w, c), dtype=np.uint8)
|
buf = np.empty((h, w, c), dtype=np.uint8)
|
||||||
_ring[_ring_idx] = buf
|
_ring[_ring_idx] = buf
|
||||||
_ring_idx = (_ring_idx + 1) % 3
|
_ring_idx = (_ring_idx + 1) % _RING_SIZE
|
||||||
|
|
||||||
np.copyto(buf, src)
|
np.copyto(buf, src)
|
||||||
image = buf
|
image = buf
|
||||||
@@ -315,6 +369,7 @@ class ProcessedLiveStream(LiveStream):
|
|||||||
)
|
)
|
||||||
with self._frame_lock:
|
with self._frame_lock:
|
||||||
self._latest_frame = processed
|
self._latest_frame = processed
|
||||||
|
self._signal_new_frame()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Filter processing error: {e}")
|
logger.error(f"Filter processing error: {e}")
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
@@ -328,9 +383,12 @@ class StaticImageLiveStream(LiveStream):
|
|||||||
"""Live stream that always returns the same static image."""
|
"""Live stream that always returns the same static image."""
|
||||||
|
|
||||||
def __init__(self, image: np.ndarray):
|
def __init__(self, image: np.ndarray):
|
||||||
|
super().__init__()
|
||||||
self._image = image
|
self._image = image
|
||||||
h, w = image.shape[:2]
|
h, w = image.shape[:2]
|
||||||
self._frame = ScreenCapture(image=image, width=w, height=h, display_index=-1)
|
self._frame = ScreenCapture(image=image, width=w, height=h, display_index=-1)
|
||||||
|
# Bump frame_id once so consumers waiting on it see a "new" frame.
|
||||||
|
self._signal_new_frame()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def target_fps(self) -> int:
|
def target_fps(self) -> int:
|
||||||
|
|||||||
@@ -144,6 +144,7 @@ class VideoCaptureLiveStream(LiveStream):
|
|||||||
target_fps: int = 30,
|
target_fps: int = 30,
|
||||||
):
|
):
|
||||||
_require_cv2()
|
_require_cv2()
|
||||||
|
super().__init__()
|
||||||
self._original_url = url
|
self._original_url = url
|
||||||
self._resolved_url: Optional[str] = None
|
self._resolved_url: Optional[str] = None
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
@@ -348,6 +349,7 @@ class VideoCaptureLiveStream(LiveStream):
|
|||||||
sc = ScreenCapture(image=buf, width=w, height=h, display_index=-1)
|
sc = ScreenCapture(image=buf, width=w, height=h, display_index=-1)
|
||||||
with self._frame_lock:
|
with self._frame_lock:
|
||||||
self._latest_frame = sc
|
self._latest_frame = sc
|
||||||
|
self._signal_new_frame()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
consecutive_errors += 1
|
consecutive_errors += 1
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ import time
|
|||||||
class FrameLimiter:
|
class FrameLimiter:
|
||||||
"""Sleep helper to pace a streaming loop at a target FPS.
|
"""Sleep helper to pace a streaming loop at a target FPS.
|
||||||
|
|
||||||
Tracks per-frame timing and exposes simple stats. The minimum sleep is
|
Tracks per-frame timing and exposes simple stats. When the loop is over
|
||||||
clamped to 1ms (matches the historical ``max(..., 0.001)`` guard) so a
|
budget ``wait()`` returns immediately rather than sleeping, so a slow
|
||||||
bound is always present even when the loop is over budget.
|
iteration doesn't compound additional latency.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, target_fps: float):
|
def __init__(self, target_fps: float):
|
||||||
@@ -39,17 +39,19 @@ class FrameLimiter:
|
|||||||
def wait(self, frame_time: float | None = None) -> None:
|
def wait(self, frame_time: float | None = None) -> None:
|
||||||
"""Sleep until the configured frame interval elapses since ``begin()``.
|
"""Sleep until the configured frame interval elapses since ``begin()``.
|
||||||
|
|
||||||
Sleeps at least 1ms to ensure cooperative yielding even when the loop
|
Returns immediately when the loop is already over budget — does NOT
|
||||||
is already over budget. Pass ``frame_time`` to override the
|
impose a 1 ms floor, which would cap achievable rate at ~750 fps and
|
||||||
per-iteration target (useful when callers vary the sleep target
|
add jitter to over-budget iterations. Pass ``frame_time`` to override
|
||||||
based on runtime state).
|
the per-iteration target.
|
||||||
"""
|
"""
|
||||||
ft = self._frame_time if frame_time is None else frame_time
|
ft = self._frame_time if frame_time is None else frame_time
|
||||||
elapsed = time.perf_counter() - self._loop_start
|
elapsed = time.perf_counter() - self._loop_start
|
||||||
self._last_dt = elapsed
|
self._last_dt = elapsed
|
||||||
self._sum_dt += elapsed
|
self._sum_dt += elapsed
|
||||||
self._count += 1
|
self._count += 1
|
||||||
time.sleep(max(ft - elapsed, 0.001))
|
remaining = ft - elapsed
|
||||||
|
if remaining > 0:
|
||||||
|
time.sleep(remaining)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def stats(self) -> dict:
|
def stats(self) -> dict:
|
||||||
|
|||||||
Reference in New Issue
Block a user