Files
ledgrab/server/src/ledgrab/api/routes/_preview_helpers.py
T
alexei.dolgolyov 888f8fd16e refactor(types): PEP-604 union sweep + UP007/UP045 enforcement
ruff --select UP007,UP045 --fix converted ~1760 sites across the
backend: `Optional[T]` → `T | None`, `Union[X, Y]` → `X | Y`. The
remaining module-level alias targets that ruff conservatively skips
(BindableFloatInput, ColorList, DeviceConfig) were converted by hand
earlier in the pass. black -formatted the result so the wider unions
fit cleanly under the 100-char line budget.

pyproject.toml now sets [tool.ruff.lint] extend-select = ["UP007",
"UP045"] so future legacy imports fire CI on every push. The
pre-commit ruff hook was bumped from v0.8.0 -> v0.15.12 to recognise
UP045 (split off from UP007 in v0.13).
2026-05-23 01:21:44 +03:00

215 lines
7.3 KiB
Python

"""Shared helpers for WebSocket-based capture preview endpoints."""
import asyncio
import threading
import time
from typing import Callable
import numpy as np
from starlette.websockets import WebSocket
from ledgrab.core.filters import FilterRegistry, ImagePool
from ledgrab.utils import get_logger
from ledgrab.utils.image_codec import (
encode_jpeg,
encode_jpeg_data_uri,
resize_down,
thumbnail,
)
logger = get_logger(__name__)
PREVIEW_INTERVAL = 0.1 # seconds between intermediate thumbnail sends
PREVIEW_MAX_WIDTH = 640 # px for intermediate thumbnails
FINAL_THUMBNAIL_WIDTH = 640 # px for the final thumbnail
FINAL_JPEG_QUALITY = 90
PREVIEW_JPEG_QUALITY = 70
def _encode_jpeg(image: np.ndarray, quality: int = 85) -> str:
"""Encode a numpy RGB image as a JPEG base64 data URI."""
return encode_jpeg_data_uri(image, quality)
def encode_preview_frame(image: np.ndarray, max_width: int = None, quality: int = 80) -> bytes:
"""Encode a numpy RGB image to JPEG bytes, optionally downscaling."""
if max_width:
image = resize_down(image, max_width)
return encode_jpeg(image, quality)
def _make_thumbnail(image: np.ndarray, max_width: int) -> np.ndarray:
"""Create a thumbnail copy of the image, preserving aspect ratio."""
return thumbnail(image, max_width)
def _apply_pp_filters(image: np.ndarray, flat_filters: list) -> np.ndarray:
"""Apply postprocessing filter instances to a numpy image."""
if not flat_filters:
return image
pool = ImagePool()
arr = image
for fi in flat_filters:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(arr, pool)
if result is not None:
arr = result
return arr
async def stream_capture_test(
websocket: WebSocket,
engine_factory: Callable,
duration: float,
pp_filters: list | None = None,
preview_width: int | None = None,
) -> None:
"""Run a capture test, streaming intermediate thumbnails and a final full-res frame.
The engine is created and used entirely within a background thread to avoid
thread-affinity issues (e.g. MSS uses thread-local state).
Args:
websocket: Accepted WebSocket connection.
engine_factory: Zero-arg callable that returns an initialized engine stream
(with .capture_frame() and .cleanup() methods). Called inside the
capture thread so thread-local resources work correctly.
duration: Test duration in seconds.
pp_filters: Optional list of resolved filter instances to apply to frames.
"""
thumb_width = preview_width or PREVIEW_MAX_WIDTH
# Shared state between capture thread and async loop
latest_frame = None # numpy RGB array
frame_count = 0
total_capture_time = 0.0
stop_event = threading.Event()
done_event = threading.Event()
init_error = None # set if engine_factory fails
def _capture_loop():
nonlocal latest_frame, frame_count, total_capture_time, init_error
stream = None
try:
stream = engine_factory()
start = time.perf_counter()
end = start + duration
while time.perf_counter() < end and not stop_event.is_set():
t0 = time.perf_counter()
capture = stream.capture_frame()
t1 = time.perf_counter()
if capture is None:
time.sleep(0.005)
continue
total_capture_time += t1 - t0
frame_count += 1
if isinstance(capture.image, np.ndarray):
latest_frame = capture.image
else:
latest_frame = capture.image
except Exception as e:
init_error = str(e)
logger.error(f"Capture thread error: {e}")
finally:
if stream:
try:
stream.cleanup()
except Exception as e:
logger.debug("Capture stream cleanup error: %s", e)
pass
done_event.set()
# Start capture in background thread
loop = asyncio.get_running_loop()
capture_future = loop.run_in_executor(None, _capture_loop)
start_time = time.perf_counter()
last_sent_frame = None
try:
# Stream intermediate previews
while not done_event.is_set():
await asyncio.sleep(PREVIEW_INTERVAL)
# Check for init error
if init_error:
stop_event.set()
await capture_future
await websocket.send_json({"type": "error", "detail": init_error})
return
frame = latest_frame
if frame is not None and frame is not last_sent_frame:
last_sent_frame = frame
elapsed = time.perf_counter() - start_time
fc = frame_count
tc = total_capture_time
# Encode preview thumbnail (small + fast)
thumb = _make_thumbnail(frame, thumb_width)
if pp_filters:
thumb = _apply_pp_filters(thumb, pp_filters)
thumb_uri = _encode_jpeg(thumb, PREVIEW_JPEG_QUALITY)
fps = fc / elapsed if elapsed > 0 else 0
avg_ms = (tc / fc * 1000) if fc > 0 else 0
await websocket.send_json(
{
"type": "frame",
"thumbnail": thumb_uri,
"frame_count": fc,
"elapsed_s": round(elapsed, 2),
"fps": round(fps, 1),
"avg_capture_ms": round(avg_ms, 1),
}
)
# Wait for capture thread to fully finish
await capture_future
# Check for errors
if init_error:
await websocket.send_json({"type": "error", "detail": init_error})
return
# Send final result
final_frame = latest_frame
if final_frame is None:
await websocket.send_json({"type": "error", "detail": "No frames captured"})
return
elapsed = time.perf_counter() - start_time
fc = frame_count
tc = total_capture_time
fps = fc / elapsed if elapsed > 0 else 0
avg_ms = (tc / fc * 1000) if fc > 0 else 0
# Apply PP filters to final images
if pp_filters:
final_frame = _apply_pp_filters(final_frame, pp_filters)
h, w = final_frame.shape[:2]
full_uri = _encode_jpeg(final_frame, FINAL_JPEG_QUALITY)
thumb = _make_thumbnail(final_frame, FINAL_THUMBNAIL_WIDTH)
thumb_uri = _encode_jpeg(thumb, 85)
await websocket.send_json(
{
"type": "result",
"full_image": full_uri,
"thumbnail": thumb_uri,
"width": w,
"height": h,
"frame_count": fc,
"elapsed_s": round(elapsed, 2),
"fps": round(fps, 1),
"avg_capture_ms": round(avg_ms, 1),
}
)
except Exception as e:
# WebSocket disconnect or send error — signal capture thread to stop
logger.debug("Capture preview WS error, stopping capture thread: %s", e)
stop_event.set()
await capture_future
raise