diff --git a/server/src/ledgrab/core/processing/color_strip/picture.py b/server/src/ledgrab/core/processing/color_strip/picture.py index 72f4943..c914781 100644 --- a/server/src/ledgrab/core/processing/color_strip/picture.py +++ b/server/src/ledgrab/core/processing/color_strip/picture.py @@ -7,6 +7,13 @@ from typing import Optional import numpy as np +try: + import cv2 + + _HAS_CV2 = True +except ImportError: + _HAS_CV2 = False + from ledgrab.core.capture.calibration import ( AdvancedPixelMapper, CalibrationConfig, @@ -203,6 +210,11 @@ class PictureColorStripStream(ColorStripStream): def _processing_loop(self) -> None: """Background thread: poll source, process, cache colors.""" cached_frame = None + # Track the source LiveStream's frame_id for event-driven waits. + # When the source publishes a new frame, ``wait_for_new_frame`` wakes + # us immediately instead of waiting for our own polling tick — cuts + # glass-to-LED latency by up to a full frame_time at matched FPS. + last_source_id = self._live_stream.current_frame_id # Scratch buffer pool (pre-allocated, resized when LED count changes) _pool_n = 0 @@ -213,8 +225,14 @@ class PictureColorStripStream(ColorStripStream): def _blend_u16(a, b, alpha_b, out): """Blend two uint8 arrays: out = ((256-alpha_b)*a + alpha_b*b) >> 8. - Uses pre-allocated uint16 scratch buffers (_u16_a, _u16_b). + Uses ``cv2.addWeighted`` (single SIMD-fused pass) when available; + falls back to a 6-pass numpy implementation using pre-allocated + uint16 scratch buffers (_u16_a, _u16_b). """ + if _HAS_CV2: + w_b = alpha_b / 256.0 + cv2.addWeighted(a, 1.0 - w_b, b, w_b, 0.0, dst=out) + return nonlocal _u16_a, _u16_b np.copyto(_u16_a, a, casting="unsafe") np.copyto(_u16_b, b, casting="unsafe") @@ -229,6 +247,7 @@ class PictureColorStripStream(ColorStripStream): try: with high_resolution_timer(): while self._running: + loop_start = time.perf_counter() limiter.begin() frame_time = self._frame_time @@ -236,10 +255,19 @@ class PictureColorStripStream(ColorStripStream): frame = self._live_stream.get_latest_frame() if frame is None or frame is cached_frame: - limiter.wait(frame_time) + # Event-driven wait on the source: returns + # immediately when a new frame lands, or after + # frame_time as a safety timeout. + elapsed = time.perf_counter() - loop_start + remaining = max(frame_time - elapsed, 0.0) + if remaining > 0: + last_source_id = self._live_stream.wait_for_new_frame( + last_source_id, timeout=remaining + ) continue cached_frame = frame + last_source_id = self._live_stream.current_frame_id t0 = time.perf_counter() # Record the new frame in the rolling 1s window @@ -255,12 +283,17 @@ class PictureColorStripStream(ColorStripStream): mapper = self._pixel_mapper if isinstance(mapper, AdvancedPixelMapper): - # Advanced mode: gather frames from all live streams + # Advanced mode: gather frames from all live streams. + # Reuse the already-sampled primary frame to avoid + # an extra lock acquisition on it. frames_dict = {} for ps_id, ls in self._live_streams.items(): - f = ls.get_latest_frame() - if f is not None: - frames_dict[ps_id] = f + if ls is self._live_stream: + frames_dict[ps_id] = frame + else: + f = ls.get_latest_frame() + if f is not None: + frames_dict[ps_id] = f t1 = time.perf_counter() led_colors = mapper.map_lines_to_leds(frames_dict) else: diff --git a/server/src/ledgrab/core/processing/live_stream.py b/server/src/ledgrab/core/processing/live_stream.py index 4335998..64f1365 100644 --- a/server/src/ledgrab/core/processing/live_stream.py +++ b/server/src/ledgrab/core/processing/live_stream.py @@ -31,9 +31,17 @@ class LiveStream(ABC): """Abstract base for a runtime frame source. A LiveStream produces frames at some frequency. Consumers call - get_latest_frame() to read the most recent frame (non-blocking). + get_latest_frame() to read the most recent frame (non-blocking), + or wait_for_new_frame() for event-driven consumption. """ + def __init__(self) -> None: + # Monotonic frame counter incremented by ``_signal_new_frame``. + # Consumers track the last value they saw to detect new frames + # without busy-polling. + self._frame_id: int = 0 + self._frame_cond: threading.Condition = threading.Condition() + @property @abstractmethod def target_fps(self) -> int: @@ -60,6 +68,35 @@ class LiveStream(ABC): ScreenCapture with image data (RGB), or None if no frame available yet. """ + @property + def current_frame_id(self) -> int: + """Current monotonic frame ID. Increments on each new frame.""" + return self._frame_id + + def _signal_new_frame(self) -> None: + """Producer: increment frame ID and notify all waiting consumers.""" + with self._frame_cond: + self._frame_id += 1 + self._frame_cond.notify_all() + + def wait_for_new_frame(self, last_seen_id: int, timeout: float) -> int: + """Consumer: block until a frame newer than ``last_seen_id`` arrives. + + Args: + last_seen_id: Last frame ID the caller has already processed. + timeout: Maximum seconds to wait. ``0`` returns immediately. + + Returns: + Current ``frame_id`` (may equal ``last_seen_id`` if timed out). + """ + if timeout <= 0: + return self._frame_id + with self._frame_cond: + if self._frame_id != last_seen_id: + return self._frame_id + self._frame_cond.wait(timeout=timeout) + return self._frame_id + class ScreenCaptureLiveStream(LiveStream): """Live stream backed by a CaptureStream with a dedicated capture thread. @@ -73,6 +110,7 @@ class ScreenCaptureLiveStream(LiveStream): """ def __init__(self, capture_stream: CaptureStream, fps: int): + super().__init__() self._capture_stream = capture_stream self._fps = fps self._frame_time = 1.0 / fps if fps > 0 else 1.0 @@ -139,6 +177,7 @@ class ScreenCaptureLiveStream(LiveStream): if frame is not None: with self._frame_lock: self._latest_frame = frame + self._signal_new_frame() consecutive_errors = 0 else: # Small sleep when no frame available to avoid CPU spinning @@ -181,6 +220,7 @@ class ProcessedLiveStream(LiveStream): source: LiveStream, filters: List[PostprocessingFilter], ): + super().__init__() self._source = source self._filters = filters self._image_pool = ImagePool() @@ -234,15 +274,22 @@ class ProcessedLiveStream(LiveStream): def _process_loop(self) -> None: """Background thread: poll source, apply filters, cache result.""" cached_source_frame: Optional[ScreenCapture] = None - # Ring buffer: 3 slots guarantees consumer finished with oldest buffer. - # At most 2 frames are in flight (one in _latest_frame, one being - # processed by a consumer), so the 3rd slot is always safe to reuse. - _ring: List[Optional[np.ndarray]] = [None, None, None] + # Ring buffer: 5 slots gives a safety margin for the multi-consumer + # case (multiple PictureColorStripStream/HA target threads can hold + # the same _latest_frame reference while we wrap around). At 60 FPS + # the 5-slot rotation gives any consumer ~83 ms to finish reading + # before the slot is overwritten — well above any realistic + # extract→map→smooth tick. + _RING_SIZE = 5 + _ring: List[Optional[np.ndarray]] = [None] * _RING_SIZE _ring_idx = 0 # Separate buffer for idle-tick source copies (not part of the ring buffer) _idle_src_buf: Optional[np.ndarray] = None fps = self.target_fps frame_time = 1.0 / fps if fps > 0 else 1.0 + # Track the source's frame_id so we can wait event-driven for new + # frames instead of polling + sleeping. + last_source_id = self._source.current_frame_id try: with high_resolution_timer(): @@ -278,13 +325,20 @@ class ProcessedLiveStream(LiveStream): ) with self._frame_lock: self._latest_frame = processed + self._signal_new_frame() + # Event-driven wait: blocks until source produces a + # new frame, with frame_time as a safety timeout. elapsed = time.perf_counter() - loop_start - remaining = frame_time - elapsed - time.sleep(max(remaining, 0.001)) + remaining = max(frame_time - elapsed, 0.0) + if remaining > 0: + last_source_id = self._source.wait_for_new_frame( + last_source_id, timeout=remaining + ) continue cached_source_frame = source_frame + last_source_id = self._source.current_frame_id # Reuse ring buffer slot instead of allocating a new copy each frame src = source_frame.image @@ -293,7 +347,7 @@ class ProcessedLiveStream(LiveStream): if buf is None or buf.shape != (h, w, c): buf = np.empty((h, w, c), dtype=np.uint8) _ring[_ring_idx] = buf - _ring_idx = (_ring_idx + 1) % 3 + _ring_idx = (_ring_idx + 1) % _RING_SIZE np.copyto(buf, src) image = buf @@ -315,6 +369,7 @@ class ProcessedLiveStream(LiveStream): ) with self._frame_lock: self._latest_frame = processed + self._signal_new_frame() except Exception as e: logger.error(f"Filter processing error: {e}") time.sleep(0.01) @@ -328,9 +383,12 @@ class StaticImageLiveStream(LiveStream): """Live stream that always returns the same static image.""" def __init__(self, image: np.ndarray): + super().__init__() self._image = image h, w = image.shape[:2] self._frame = ScreenCapture(image=image, width=w, height=h, display_index=-1) + # Bump frame_id once so consumers waiting on it see a "new" frame. + self._signal_new_frame() @property def target_fps(self) -> int: diff --git a/server/src/ledgrab/core/processing/video_stream.py b/server/src/ledgrab/core/processing/video_stream.py index a0ca16a..c9b7100 100644 --- a/server/src/ledgrab/core/processing/video_stream.py +++ b/server/src/ledgrab/core/processing/video_stream.py @@ -144,6 +144,7 @@ class VideoCaptureLiveStream(LiveStream): target_fps: int = 30, ): _require_cv2() + super().__init__() self._original_url = url self._resolved_url: Optional[str] = None self._loop = loop @@ -348,6 +349,7 @@ class VideoCaptureLiveStream(LiveStream): sc = ScreenCapture(image=buf, width=w, height=h, display_index=-1) with self._frame_lock: self._latest_frame = sc + self._signal_new_frame() except Exception as e: consecutive_errors += 1 diff --git a/server/src/ledgrab/utils/frame_limiter.py b/server/src/ledgrab/utils/frame_limiter.py index 26e4867..4ef30a0 100644 --- a/server/src/ledgrab/utils/frame_limiter.py +++ b/server/src/ledgrab/utils/frame_limiter.py @@ -6,9 +6,9 @@ import time class FrameLimiter: """Sleep helper to pace a streaming loop at a target FPS. - Tracks per-frame timing and exposes simple stats. The minimum sleep is - clamped to 1ms (matches the historical ``max(..., 0.001)`` guard) so a - bound is always present even when the loop is over budget. + Tracks per-frame timing and exposes simple stats. When the loop is over + budget ``wait()`` returns immediately rather than sleeping, so a slow + iteration doesn't compound additional latency. """ def __init__(self, target_fps: float): @@ -39,17 +39,19 @@ class FrameLimiter: def wait(self, frame_time: float | None = None) -> None: """Sleep until the configured frame interval elapses since ``begin()``. - Sleeps at least 1ms to ensure cooperative yielding even when the loop - is already over budget. Pass ``frame_time`` to override the - per-iteration target (useful when callers vary the sleep target - based on runtime state). + Returns immediately when the loop is already over budget — does NOT + impose a 1 ms floor, which would cap achievable rate at ~750 fps and + add jitter to over-budget iterations. Pass ``frame_time`` to override + the per-iteration target. """ ft = self._frame_time if frame_time is None else frame_time elapsed = time.perf_counter() - self._loop_start self._last_dt = elapsed self._sum_dt += elapsed self._count += 1 - time.sleep(max(ft - elapsed, 0.001)) + remaining = ft - elapsed + if remaining > 0: + time.sleep(remaining) @property def stats(self) -> dict: