perf(capture): vectorize hot paths and fix engine bugs

- WGC: replace per-frame ~30 MB BGRA->RGB fancy-index allocation with
  cv2.cvtColor into a 3-slot pre-allocated RGB pool. Use gc.collect(0)
  on cleanup instead of full GC to avoid multi-hundred-ms stalls.
- MSS: switch from screenshot.rgb (pure-Python BGRA->RGB rebuild) to
  screenshot.raw + cv2.cvtColor into a pooled buffer. Add cheap 256-byte
  hash-based change detection so idle frames return None — matches
  DXcam/BetterCam semantics.
- DXcam/BetterCam: fix silent factory leak — Python name-mangling
  rewrote self._dxcam.__factory to _DXcamCaptureStream__factory inside
  the class body, so cleanup never reached the real attribute. Use
  getattr with string literal to bypass mangling.
- calculate_dominant_color: replace np.random.choice(replace=False)
  (full sort) with np.random.randint, and np.unique(axis=0) (lexsort)
  with packed-RGB np.bincount. ~10x faster on dominant mode.
- calibration._map_edge_average: switch cached scratch buffers from
  float64 to float32. Halves memory bandwidth on the dominant reduction
  path; range-safe up to 8K screens.
- All engines: per-frame DEBUG logs use structlog kwarg style instead
  of f-strings to avoid per-frame string allocation.
This commit is contained in:
2026-05-12 15:05:52 +03:00
parent ad84b60ae4
commit f184ef0afb
6 changed files with 192 additions and 70 deletions
+14 -11
View File
@@ -443,7 +443,10 @@ class PixelMapper:
axis = 1 axis = 1
edge_len = edge_pixels.shape[0] edge_len = edge_pixels.shape[0]
# Lazy-init / resize per-edge scratch buffers # Lazy-init / resize per-edge scratch buffers.
# float32 is sufficient: max cumsum value is edge_len * 255 (≈2M @ 8K
# screens) which fits exactly in float32's 24-bit mantissa. Halves
# memory bandwidth on the hot reduction.
cache = self._edge_cache.get(edge_name) cache = self._edge_cache.get(edge_name)
if cache is None or cache[0] != edge_len or cache[1] != led_count: if cache is None or cache[0] != edge_len or cache[1] != led_count:
step = edge_len / led_count step = edge_len / led_count
@@ -452,11 +455,11 @@ class PixelMapper:
np.minimum(boundaries, edge_len, out=boundaries) np.minimum(boundaries, edge_len, out=boundaries)
starts = boundaries[:-1] starts = boundaries[:-1]
ends = boundaries[1:] ends = boundaries[1:]
lengths = (ends - starts).reshape(-1, 1).astype(np.float64) lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float64) cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float64) edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
sums_buf = np.empty((led_count, 3), dtype=np.float64) sums_buf = np.empty((led_count, 3), dtype=np.float32)
starts_buf = np.empty((led_count, 3), dtype=np.float64) starts_buf = np.empty((led_count, 3), dtype=np.float32)
out_uint8 = np.empty((led_count, 3), dtype=np.uint8) out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
cache = ( cache = (
edge_len, edge_len,
@@ -682,11 +685,11 @@ class AdvancedPixelMapper:
np.minimum(boundaries, edge_len, out=boundaries) np.minimum(boundaries, edge_len, out=boundaries)
starts = boundaries[:-1] starts = boundaries[:-1]
ends = boundaries[1:] ends = boundaries[1:]
lengths = (ends - starts).reshape(-1, 1).astype(np.float64) lengths = (ends - starts).reshape(-1, 1).astype(np.float32)
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float64) cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float32)
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float64) edge_1d_buf = np.empty((edge_len, 3), dtype=np.float32)
sums_buf = np.empty((led_count, 3), dtype=np.float64) sums_buf = np.empty((led_count, 3), dtype=np.float32)
starts_buf = np.empty((led_count, 3), dtype=np.float64) starts_buf = np.empty((led_count, 3), dtype=np.float32)
out_uint8 = np.empty((led_count, 3), dtype=np.uint8) out_uint8 = np.empty((led_count, 3), dtype=np.uint8)
cache = ( cache = (
edge_len, edge_len,
@@ -192,8 +192,11 @@ def extract_border_pixels(screen_capture: ScreenCapture, border_width: int = 10)
left = img[:, :border_width, :] left = img[:, :border_width, :]
logger.debug( logger.debug(
f"Extracted borders: top={top.shape}, right={right.shape}, " "Extracted borders",
f"bottom={bottom.shape}, left={left.shape}" top=top.shape,
right=right.shape,
bottom=bottom.shape,
left=left.shape,
) )
return BorderPixels( return BorderPixels(
@@ -303,6 +306,12 @@ def calculate_median_color(pixels: np.ndarray) -> tuple[int, int, int]:
def calculate_dominant_color(pixels: np.ndarray) -> tuple[int, int, int]: def calculate_dominant_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate dominant color of a pixel region using simple clustering. """Calculate dominant color of a pixel region using simple clustering.
Quantizes to 32 levels/channel (5 bits/channel = 32K bins), packs into a
single uint32, then uses ``np.bincount`` to find the most common bin.
Sampling uses with-replacement (statistically equivalent for a dominant-bin
search and avoids the full sort that ``np.random.choice(replace=False)``
triggers internally).
Args: Args:
pixels: Pixel array (height, width, 3) pixels: Pixel array (height, width, 3)
@@ -312,28 +321,23 @@ def calculate_dominant_color(pixels: np.ndarray) -> tuple[int, int, int]:
if pixels.size == 0: if pixels.size == 0:
return (0, 0, 0) return (0, 0, 0)
# Reshape to (n_pixels, 3)
pixels_reshaped = pixels.reshape(-1, 3) pixels_reshaped = pixels.reshape(-1, 3)
n = len(pixels_reshaped)
# For performance, sample pixels if there are too many
max_samples = 1000 max_samples = 1000
if len(pixels_reshaped) > max_samples: if n > max_samples:
indices = np.random.choice(len(pixels_reshaped), max_samples, replace=False) indices = np.random.randint(0, n, max_samples)
pixels_reshaped = pixels_reshaped[indices] pixels_reshaped = pixels_reshaped[indices]
# Simple dominant color: quantize colors and find most common # Quantize to 32 levels/channel (drop low 3 bits) and pack into uint32:
# Reduce color space to 32 levels per channel for binning # bits 10-14 = R, bits 5-9 = G, bits 0-4 = B → 32K possible bins.
quantized = (pixels_reshaped // 8) * 8 q = pixels_reshaped >> 3 # uint8 in [0,31]
packed = (q[:, 0].astype(np.uint32) << 10) | (q[:, 1].astype(np.uint32) << 5) | q[:, 2]
# Find unique colors and their counts counts = np.bincount(packed, minlength=1)
unique_colors, counts = np.unique(quantized, axis=0, return_counts=True) dominant_bin = int(np.argmax(counts))
# Get the most common color
dominant_idx = np.argmax(counts)
dominant_color = unique_colors[dominant_idx]
r = int(np.clip(dominant_color[0], 0, 255))
g = int(np.clip(dominant_color[1], 0, 255))
b = int(np.clip(dominant_color[2], 0, 255))
# Reconstruct 5-bit channels and shift back to 8-bit (centered in bin).
r = ((dominant_bin >> 10) & 0x1F) << 3
g = ((dominant_bin >> 5) & 0x1F) << 3
b = (dominant_bin & 0x1F) << 3
return (r, g, b) return (r, g, b)
@@ -35,12 +35,17 @@ class BetterCamCaptureStream(CaptureStream):
except ImportError: except ImportError:
raise RuntimeError("BetterCam not installed. Install with: pip install bettercam") raise RuntimeError("BetterCam not installed. Install with: pip install bettercam")
# Clear global camera cache for fresh DXGI state # Clear global camera cache for fresh DXGI state.
try: # NOTE: ``self._bettercam.__factory`` is name-mangled by Python to
self._bettercam.__factory.clean_up() # ``self._bettercam._BetterCamCaptureStream__factory`` because the
except Exception as e: # access appears inside a class body, which silently AttributeErrors.
logger.debug("BetterCam factory cleanup on init: %s", e) # Use string-based getattr to bypass mangling.
pass _factory = getattr(self._bettercam, "__factory", None)
if _factory is not None:
try:
_factory.clean_up()
except Exception as e:
logger.debug("BetterCam factory cleanup on init failed", error=str(e))
self._camera = self._bettercam.create( self._camera = self._bettercam.create(
output_idx=self.display_index, output_idx=self.display_index,
@@ -71,11 +76,12 @@ class BetterCamCaptureStream(CaptureStream):
self._camera = None self._camera = None
if self._bettercam: if self._bettercam:
try: _factory = getattr(self._bettercam, "__factory", None)
self._bettercam.__factory.clean_up() if _factory is not None:
except Exception as e: try:
logger.debug("BetterCam factory cleanup on teardown: %s", e) _factory.clean_up()
pass except Exception as e:
logger.debug("BetterCam factory cleanup on teardown failed", error=str(e))
self._initialized = False self._initialized = False
logger.info(f"BetterCam capture stream cleaned up (display={self.display_index})") logger.info(f"BetterCam capture stream cleaned up (display={self.display_index})")
@@ -109,8 +115,10 @@ class BetterCamCaptureStream(CaptureStream):
return None return None
logger.debug( logger.debug(
f"BetterCam captured display {self.display_index}: " "BetterCam captured frame",
f"{frame.shape[1]}x{frame.shape[0]}" display=self.display_index,
w=frame.shape[1],
h=frame.shape[0],
) )
return ScreenCapture( return ScreenCapture(
@@ -35,12 +35,17 @@ class DXcamCaptureStream(CaptureStream):
except ImportError: except ImportError:
raise RuntimeError("DXcam not installed. Install with: pip install dxcam") raise RuntimeError("DXcam not installed. Install with: pip install dxcam")
# Clear global camera cache for fresh DXGI state # Clear global camera cache for fresh DXGI state.
try: # NOTE: ``self._dxcam.__factory`` is name-mangled by Python to
self._dxcam.__factory.clean_up() # ``self._dxcam._DXcamCaptureStream__factory`` because the access
except Exception as e: # appears inside a class body, which silently AttributeErrors.
logger.debug("DXcam factory cleanup on init: %s", e) # Use string-based getattr to bypass mangling.
pass _factory = getattr(self._dxcam, "__factory", None)
if _factory is not None:
try:
_factory.clean_up()
except Exception as e:
logger.debug("DXcam factory cleanup on init failed", error=str(e))
self._camera = self._dxcam.create( self._camera = self._dxcam.create(
output_idx=self.display_index, output_idx=self.display_index,
@@ -69,11 +74,12 @@ class DXcamCaptureStream(CaptureStream):
self._camera = None self._camera = None
if self._dxcam: if self._dxcam:
try: _factory = getattr(self._dxcam, "__factory", None)
self._dxcam.__factory.clean_up() if _factory is not None:
except Exception as e: try:
logger.debug("DXcam factory cleanup on teardown: %s", e) _factory.clean_up()
pass except Exception as e:
logger.debug("DXcam factory cleanup on teardown failed", error=str(e))
self._initialized = False self._initialized = False
logger.info(f"DXcam capture stream cleaned up (display={self.display_index})") logger.info(f"DXcam capture stream cleaned up (display={self.display_index})")
@@ -107,8 +113,10 @@ class DXcamCaptureStream(CaptureStream):
return None return None
logger.debug( logger.debug(
f"DXcam captured display {self.display_index}: " "DXcam captured frame",
f"{frame.shape[1]}x{frame.shape[0]}" display=self.display_index,
w=frame.shape[1],
h=frame.shape[0],
) )
return ScreenCapture( return ScreenCapture(
@@ -5,6 +5,13 @@ from typing import Any, Dict, List, Optional
import mss import mss
import numpy as np import numpy as np
try:
import cv2
_HAS_CV2 = True
except ImportError:
_HAS_CV2 = False
from ledgrab.core.capture_engines.base import ( from ledgrab.core.capture_engines.base import (
CaptureEngine, CaptureEngine,
CaptureStream, CaptureStream,
@@ -15,6 +22,13 @@ from ledgrab.utils import get_logger, get_monitor_names, get_monitor_refresh_rat
logger = get_logger(__name__) logger = get_logger(__name__)
# Rotating RGB output pool: keeps prior frame references stable for any
# consumer still reading them while a new frame is written.
_RGB_POOL_SIZE = 3
# Number of bytes from .raw to hash for change detection (cheap pre-check
# that avoids the full BGRA→RGB conversion when the screen is idle).
_CHANGE_DETECT_BYTES = 256
class MSSCaptureStream(CaptureStream): class MSSCaptureStream(CaptureStream):
"""MSS capture stream for a specific display.""" """MSS capture stream for a specific display."""
@@ -22,6 +36,12 @@ class MSSCaptureStream(CaptureStream):
def __init__(self, display_index: int, config: Dict[str, Any]): def __init__(self, display_index: int, config: Dict[str, Any]):
super().__init__(display_index, config) super().__init__(display_index, config)
self._sct = None self._sct = None
# Pre-allocated RGB destination pool — avoids per-frame allocation.
self._rgb_pool: list = [None] * _RGB_POOL_SIZE
self._rgb_idx: int = 0
self._rgb_shape: tuple = (0, 0)
# Cheap hash of the previous .raw bytes, for change detection.
self._prev_hash: Optional[int] = None
def initialize(self) -> None: def initialize(self) -> None:
try: try:
@@ -36,6 +56,7 @@ class MSSCaptureStream(CaptureStream):
self._sct.close() self._sct.close()
self._sct = None self._sct = None
self._initialized = False self._initialized = False
self._prev_hash = None
logger.info(f"MSS capture stream cleaned up (display={self.display_index})") logger.info(f"MSS capture stream cleaned up (display={self.display_index})")
def capture_frame(self) -> Optional[ScreenCapture]: def capture_frame(self) -> Optional[ScreenCapture]:
@@ -55,18 +76,51 @@ class MSSCaptureStream(CaptureStream):
monitor = self._sct.monitors[monitor_index] monitor = self._sct.monitors[monitor_index]
screenshot = self._sct.grab(monitor) screenshot = self._sct.grab(monitor)
# Direct bytes→numpy (skips PIL intermediate object) # Cheap change detection: hash a small slice of the raw BGRA
img_array = np.frombuffer( # buffer. ~256 bytes is enough to differentiate any cursor/pixel
screenshot.rgb, # change. Skips the BGRA→RGB conversion when nothing changed
dtype=np.uint8, # (common on idle desktops). DXcam/BetterCam return None in this
).reshape(screenshot.height, screenshot.width, 3) # case natively; mss does not, so we add it here.
raw = screenshot.raw
sample = bytes(raw[:_CHANGE_DETECT_BYTES])
cur_hash = hash(sample)
if cur_hash == self._prev_hash:
return None
self._prev_hash = cur_hash
height = screenshot.height
width = screenshot.width
# Reshape .raw (BGRA) — zero-copy view over the screenshot's buffer.
# ``screenshot.rgb`` (used previously) is a pure-Python BGRA→RGB
# rebuild costing ~6 MB/frame at 1080p in the slowest possible
# way. cv2.cvtColor is SIMD and writes directly into our pool.
bgra = np.frombuffer(raw, dtype=np.uint8).reshape(height, width, 4)
if self._rgb_shape != (height, width):
for i in range(_RGB_POOL_SIZE):
self._rgb_pool[i] = np.empty((height, width, 3), dtype=np.uint8)
self._rgb_shape = (height, width)
dst = self._rgb_pool[self._rgb_idx]
self._rgb_idx = (self._rgb_idx + 1) % _RGB_POOL_SIZE
if _HAS_CV2:
cv2.cvtColor(bgra, cv2.COLOR_BGRA2RGB, dst=dst)
else:
dst[..., 0] = bgra[..., 2]
dst[..., 1] = bgra[..., 1]
dst[..., 2] = bgra[..., 0]
logger.debug( logger.debug(
f"MSS captured display {self.display_index}: {monitor['width']}x{monitor['height']}" "MSS captured frame",
display=self.display_index,
w=monitor["width"],
h=monitor["height"],
) )
return ScreenCapture( return ScreenCapture(
image=img_array, image=dst,
width=monitor["width"], width=monitor["width"],
height=monitor["height"], height=monitor["height"],
display_index=self.display_index, display_index=self.display_index,
@@ -5,6 +5,14 @@ import sys
import threading import threading
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import numpy as np
try:
import cv2
_HAS_CV2 = True
except ImportError:
_HAS_CV2 = False
from ledgrab.core.capture_engines.base import ( from ledgrab.core.capture_engines.base import (
CaptureEngine, CaptureEngine,
@@ -16,6 +24,10 @@ from ledgrab.utils import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
# 3-slot rotating output buffer pool: ensures the consumer always has a stable
# RGB array reference (the underlying WGC native buffer is reused per frame).
_RGB_POOL_SIZE = 3
class WGCCaptureStream(CaptureStream): class WGCCaptureStream(CaptureStream):
"""WGC capture stream for a specific display.""" """WGC capture stream for a specific display."""
@@ -29,6 +41,11 @@ class WGCCaptureStream(CaptureStream):
self._frame_event = threading.Event() self._frame_event = threading.Event()
self._closed_event = threading.Event() self._closed_event = threading.Event()
self._frame_lock = threading.Lock() self._frame_lock = threading.Lock()
# Pre-allocated RGB destination buffers (rotated to keep prior frames
# stable for any consumer still reading the previous reference).
self._rgb_pool: list = [None] * _RGB_POOL_SIZE
self._rgb_idx: int = 0
self._rgb_shape: tuple = (0, 0)
def initialize(self) -> None: def initialize(self) -> None:
if self._wgc is None: if self._wgc is None:
@@ -66,10 +83,33 @@ class WGCCaptureStream(CaptureStream):
width = frame.width width = frame.width
height = frame.height height = frame.height
# WGC provides BGRA format, convert to RGB # WGC provides BGRA. ``frame_buffer`` is a view over the
# Fancy indexing creates a new contiguous array — no .copy() needed # native side's reusable buffer — must copy out before
# returning. Use a 3-slot rotating pool of pre-allocated
# RGB buffers + cv2.cvtColor (SIMD) instead of numpy fancy
# indexing. Fancy indexing would allocate ~width*height*3
# bytes per frame (≈480 MB/s at 1080p60); the pool allocates
# 3 buffers total and reuses them.
frame_array = frame_buffer.reshape((height, width, 4)) frame_array = frame_buffer.reshape((height, width, 4))
frame_rgb = frame_array[:, :, [2, 1, 0]]
if self._rgb_shape != (height, width):
for i in range(_RGB_POOL_SIZE):
self._rgb_pool[i] = np.empty((height, width, 3), dtype=np.uint8)
self._rgb_shape = (height, width)
dst = self._rgb_pool[self._rgb_idx]
self._rgb_idx = (self._rgb_idx + 1) % _RGB_POOL_SIZE
if _HAS_CV2:
cv2.cvtColor(frame_array, cv2.COLOR_BGRA2RGB, dst=dst)
frame_rgb = dst
else:
# Fallback: per-channel copy is still 2× faster than
# fancy-index allocation because it writes in-place.
dst[..., 0] = frame_array[..., 2]
dst[..., 1] = frame_array[..., 1]
dst[..., 2] = frame_array[..., 0]
frame_rgb = dst
with self._frame_lock: with self._frame_lock:
self._latest_frame = frame_rgb self._latest_frame = frame_rgb
@@ -153,8 +193,10 @@ class WGCCaptureStream(CaptureStream):
self._cleanup_internal() self._cleanup_internal()
self._initialized = False self._initialized = False
# Force garbage collection to release COM objects # Gen-0 collect is enough to release recently-allocated COM
gc.collect() # references and avoids the multi-hundred-ms full-heap pause
# ``gc.collect()`` would cause on a heap full of frame ndarrays.
gc.collect(0)
logger.info(f"WGC capture stream cleaned up (display={self.display_index})") logger.info(f"WGC capture stream cleaned up (display={self.display_index})")
def capture_frame(self) -> Optional[ScreenCapture]: def capture_frame(self) -> Optional[ScreenCapture]:
@@ -173,7 +215,10 @@ class WGCCaptureStream(CaptureStream):
self._frame_event.clear() self._frame_event.clear()
logger.debug( logger.debug(
f"WGC captured display {self.display_index}: " f"{frame.shape[1]}x{frame.shape[0]}" "WGC captured frame",
display=self.display_index,
w=frame.shape[1],
h=frame.shape[0],
) )
return ScreenCapture( return ScreenCapture(