Add camera/webcam capture engine with engine-aware display picker

- New CameraEngine using OpenCV VideoCapture for webcam capture
- HAS_OWN_DISPLAYS class attribute on CaptureEngine base to distinguish
  engines with their own device lists from desktop monitor engines
- Display picker renders device list for cameras/scrcpy, spatial layout
  for desktop monitors
- Engine-aware display label formatting (camera name vs monitor index)
- Stream modal properly loads engine-specific displays on template change,
  edit, and clone
- Camera backend config rendered as dropdown (auto/dshow/msmf/v4l2)
- Remove offline label from device cards (healthcheck indicator suffices)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 12:46:28 +03:00
parent b9ec509f56
commit 8fe9c6489b
14 changed files with 405 additions and 42 deletions

View File

@@ -12,6 +12,7 @@ from wled_controller.core.capture_engines.dxcam_engine import DXcamEngine, DXcam
from wled_controller.core.capture_engines.bettercam_engine import BetterCamEngine, BetterCamCaptureStream
from wled_controller.core.capture_engines.wgc_engine import WGCEngine, WGCCaptureStream
from wled_controller.core.capture_engines.scrcpy_engine import ScrcpyEngine, ScrcpyCaptureStream
from wled_controller.core.capture_engines.camera_engine import CameraEngine, CameraCaptureStream
# Auto-register available engines
EngineRegistry.register(MSSEngine)
@@ -19,6 +20,7 @@ EngineRegistry.register(DXcamEngine)
EngineRegistry.register(BetterCamEngine)
EngineRegistry.register(WGCEngine)
EngineRegistry.register(ScrcpyEngine)
EngineRegistry.register(CameraEngine)
__all__ = [
"CaptureEngine",
@@ -36,4 +38,6 @@ __all__ = [
"WGCCaptureStream",
"ScrcpyEngine",
"ScrcpyCaptureStream",
"CameraEngine",
"CameraCaptureStream",
]

View File

@@ -103,6 +103,7 @@ class CaptureEngine(ABC):
ENGINE_TYPE: str = "base"
ENGINE_PRIORITY: int = 0
HAS_OWN_DISPLAYS: bool = False # True for engines with non-desktop device lists
@classmethod
@abstractmethod

View File

@@ -0,0 +1,276 @@
"""Camera/webcam capture engine using OpenCV.
Captures frames from USB/integrated webcams via ``cv2.VideoCapture``.
Each camera is exposed as a "display" through the standard
``CaptureEngine`` / ``CaptureStream`` architecture.
Prerequisites (optional dependency):
pip install opencv-python-headless>=4.8.0
"""
import platform
import sys
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.core.capture_engines.base import (
CaptureEngine,
CaptureStream,
DisplayInfo,
ScreenCapture,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
_MAX_CAMERA_INDEX = 10 # probe indices 0..9
_CV2_BACKENDS = {
"auto": None,
"dshow": 700, # cv2.CAP_DSHOW
"msmf": 1400, # cv2.CAP_MSMF
"v4l2": 200, # cv2.CAP_V4L2
}
def _get_default_backend():
"""Return the best default backend for the current platform."""
if sys.platform == "win32":
return "dshow" # faster open than MSMF on Windows
return "auto"
def _cv2_backend_id(backend_name: str) -> Optional[int]:
"""Convert a backend name string to cv2 API preference constant."""
return _CV2_BACKENDS.get(backend_name)
def _get_camera_friendly_names() -> Dict[int, str]:
"""Get friendly names for cameras from OS.
On Windows, queries WMI for PnP camera devices.
Returns a dict mapping sequential index → friendly name.
"""
if platform.system() != "Windows":
return {}
try:
import wmi
c = wmi.WMI()
cameras = c.query(
"SELECT Name FROM Win32_PnPEntity WHERE PNPClass = 'Camera'"
)
return {i: cam.Name for i, cam in enumerate(cameras)}
except Exception as e:
logger.debug(f"WMI camera enumeration failed: {e}")
return {}
def _enumerate_cameras(backend_name: str = "auto") -> List[Dict[str, Any]]:
"""Probe camera indices and return metadata for each available camera.
Returns a list of dicts: {cv2_index, name, width, height, fps}.
"""
try:
import cv2
except ImportError:
return []
backend_id = _cv2_backend_id(backend_name)
friendly_names = _get_camera_friendly_names()
cameras: List[Dict[str, Any]] = []
sequential_idx = 0
for i in range(_MAX_CAMERA_INDEX):
if backend_id is not None:
cap = cv2.VideoCapture(i, backend_id)
else:
cap = cv2.VideoCapture(i)
if not cap.isOpened():
cap.release()
continue
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
name = friendly_names.get(sequential_idx, f"Camera {sequential_idx}")
cap.release()
cameras.append({
"cv2_index": i,
"name": name,
"width": width,
"height": height,
"fps": fps,
})
sequential_idx += 1
return cameras
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class CameraCaptureStream(CaptureStream):
"""OpenCV-based webcam capture stream.
Synchronous capture like MSSEngine — ``capture_frame()`` grabs
one frame per call. OpenCV's internal buffering keeps latency low.
"""
def __init__(self, display_index: int, config: Dict[str, Any]):
super().__init__(display_index, config)
self._cap = None
def initialize(self) -> None:
if self._initialized:
return
import cv2
backend_name = self.config.get("camera_backend", _get_default_backend())
# Enumerate to resolve display_index → actual cv2 camera index
cameras = _enumerate_cameras(backend_name)
if not cameras:
raise RuntimeError(
"No cameras found. Ensure a webcam is connected."
)
if self.display_index >= len(cameras):
raise RuntimeError(
f"Camera index {self.display_index} out of range "
f"(found {len(cameras)} camera(s))"
)
camera = cameras[self.display_index]
cv2_index = camera["cv2_index"]
# Open the camera
backend_id = _cv2_backend_id(backend_name)
if backend_id is not None:
self._cap = cv2.VideoCapture(cv2_index, backend_id)
else:
self._cap = cv2.VideoCapture(cv2_index)
if not self._cap.isOpened():
raise RuntimeError(
f"Failed to open camera {self.display_index} "
f"(cv2 index {cv2_index})"
)
# Apply optional resolution override
res_w = self.config.get("resolution_width", 0)
res_h = self.config.get("resolution_height", 0)
if res_w > 0 and res_h > 0:
self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, res_w)
self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, res_h)
# Test read
ret, frame = self._cap.read()
if not ret or frame is None:
self._cap.release()
self._cap = None
raise RuntimeError(
f"Camera {self.display_index} opened but test read failed"
)
h, w = frame.shape[:2]
self._initialized = True
logger.info(
f"Camera capture stream initialized "
f"(camera={camera['name']}, cv2_idx={cv2_index}, {w}x{h})"
)
def capture_frame(self) -> Optional[ScreenCapture]:
if not self._initialized:
self.initialize()
import cv2
ret, frame = self._cap.read()
if not ret or frame is None:
return None
# OpenCV captures BGR; convert to RGB
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w = rgb.shape[:2]
return ScreenCapture(
image=rgb,
width=w,
height=h,
display_index=self.display_index,
)
def cleanup(self) -> None:
if self._cap is not None:
self._cap.release()
self._cap = None
self._initialized = False
logger.info(f"Camera capture stream cleaned up (display={self.display_index})")
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class CameraEngine(CaptureEngine):
"""Camera/webcam capture engine using OpenCV.
Captures frames from USB or integrated webcams.
Each connected camera appears as a selectable "display".
Prerequisites:
pip install opencv-python-headless>=4.8.0
"""
ENGINE_TYPE = "camera"
ENGINE_PRIORITY = 0
HAS_OWN_DISPLAYS = True
@classmethod
def is_available(cls) -> bool:
try:
import cv2 # noqa: F401
return True
except ImportError:
return False
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {
"camera_backend": _get_default_backend(),
"resolution_width": 0,
"resolution_height": 0,
}
@classmethod
def get_available_displays(cls) -> List[DisplayInfo]:
backend = _get_default_backend()
cameras = _enumerate_cameras(backend)
displays = []
for idx, cam in enumerate(cameras):
displays.append(DisplayInfo(
index=idx,
name=cam["name"],
width=cam["width"],
height=cam["height"],
x=idx * 500,
y=0,
is_primary=(idx == 0),
refresh_rate=int(cam["fps"]),
))
logger.debug(f"Camera engine detected {len(displays)} camera(s)")
return displays
@classmethod
def create_stream(
cls, display_index: int, config: Dict[str, Any]
) -> CameraCaptureStream:
return CameraCaptureStream(display_index, config)

View File

@@ -304,6 +304,7 @@ class ScrcpyEngine(CaptureEngine):
ENGINE_TYPE = "scrcpy"
ENGINE_PRIORITY = 5
HAS_OWN_DISPLAYS = True
@classmethod
def is_available(cls) -> bool: