Improve WGC cleanup, add capture duration persistence, simplify test errors
Some checks failed
Validate / validate (push) Failing after 9s
Some checks failed
Validate / validate (push) Failing after 9s
- Add WGC engine with aggressive COM object cleanup (multiple GC passes) - Persist capture test duration to localStorage - Show only short error messages in snackbar (details in console) Note: WGC cleanup still has issues with yellow border persistence - needs further investigation Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
26
server/src/wled_controller/core/capture_engines/__init__.py
Normal file
26
server/src/wled_controller/core/capture_engines/__init__.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""Screen capture engine abstraction layer."""
|
||||
|
||||
from wled_controller.core.capture_engines.base import (
|
||||
CaptureEngine,
|
||||
DisplayInfo,
|
||||
ScreenCapture,
|
||||
)
|
||||
from wled_controller.core.capture_engines.factory import EngineRegistry
|
||||
from wled_controller.core.capture_engines.mss_engine import MSSEngine
|
||||
from wled_controller.core.capture_engines.dxcam_engine import DXcamEngine
|
||||
from wled_controller.core.capture_engines.wgc_engine import WGCEngine
|
||||
|
||||
# Auto-register available engines
|
||||
EngineRegistry.register(MSSEngine)
|
||||
EngineRegistry.register(DXcamEngine)
|
||||
EngineRegistry.register(WGCEngine)
|
||||
|
||||
__all__ = [
|
||||
"CaptureEngine",
|
||||
"DisplayInfo",
|
||||
"ScreenCapture",
|
||||
"EngineRegistry",
|
||||
"MSSEngine",
|
||||
"DXcamEngine",
|
||||
"WGCEngine",
|
||||
]
|
||||
141
server/src/wled_controller/core/capture_engines/base.py
Normal file
141
server/src/wled_controller/core/capture_engines/base.py
Normal file
@@ -0,0 +1,141 @@
|
||||
"""Base classes for screen capture engines."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclass
|
||||
class DisplayInfo:
|
||||
"""Information about a display/monitor."""
|
||||
|
||||
index: int
|
||||
name: str
|
||||
width: int
|
||||
height: int
|
||||
x: int
|
||||
y: int
|
||||
is_primary: bool
|
||||
refresh_rate: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScreenCapture:
|
||||
"""Captured screen image data."""
|
||||
|
||||
image: np.ndarray
|
||||
width: int
|
||||
height: int
|
||||
display_index: int
|
||||
|
||||
|
||||
class CaptureEngine(ABC):
|
||||
"""Abstract base class for screen capture engines.
|
||||
|
||||
All screen capture engines must implement this interface to be
|
||||
compatible with the WLED Screen Controller system.
|
||||
"""
|
||||
|
||||
ENGINE_TYPE: str = "base" # Override in subclasses
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize engine with configuration.
|
||||
|
||||
Args:
|
||||
config: Engine-specific configuration dict
|
||||
"""
|
||||
self.config = config
|
||||
self._initialized = False
|
||||
|
||||
@abstractmethod
|
||||
def initialize(self) -> None:
|
||||
"""Initialize the capture engine.
|
||||
|
||||
This method should prepare any resources needed for screen capture
|
||||
(e.g., creating capture objects, allocating buffers).
|
||||
|
||||
Raises:
|
||||
RuntimeError: If initialization fails
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def cleanup(self) -> None:
|
||||
"""Cleanup engine resources.
|
||||
|
||||
This method should release any resources allocated during
|
||||
initialization or capture operations.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_available_displays(self) -> List[DisplayInfo]:
|
||||
"""Get list of available displays.
|
||||
|
||||
Returns:
|
||||
List of DisplayInfo objects describing available displays
|
||||
|
||||
Raises:
|
||||
RuntimeError: If unable to detect displays
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def capture_display(self, display_index: int) -> ScreenCapture:
|
||||
"""Capture the specified display.
|
||||
|
||||
Args:
|
||||
display_index: Index of display to capture (0-based)
|
||||
|
||||
Returns:
|
||||
ScreenCapture object with image data as numpy array (RGB format)
|
||||
|
||||
Raises:
|
||||
ValueError: If display_index is invalid
|
||||
RuntimeError: If capture fails
|
||||
"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def is_available(cls) -> bool:
|
||||
"""Check if this engine is available on current system.
|
||||
|
||||
Returns:
|
||||
True if engine can be used on this platform
|
||||
|
||||
Examples:
|
||||
>>> MSSEngine.is_available()
|
||||
True # MSS is available on all platforms
|
||||
>>> DXcamEngine.is_available()
|
||||
True # On Windows 8.1+
|
||||
False # On Linux/macOS
|
||||
"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def get_default_config(cls) -> Dict[str, Any]:
|
||||
"""Get default configuration for this engine.
|
||||
|
||||
Returns:
|
||||
Default config dict with engine-specific options
|
||||
|
||||
Examples:
|
||||
>>> MSSEngine.get_default_config()
|
||||
{}
|
||||
>>> DXcamEngine.get_default_config()
|
||||
{'device_idx': 0, 'output_color': 'RGB', 'max_buffer_len': 64}
|
||||
"""
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry - initialize engine."""
|
||||
self.initialize()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit - cleanup engine."""
|
||||
self.cleanup()
|
||||
251
server/src/wled_controller/core/capture_engines/dxcam_engine.py
Normal file
251
server/src/wled_controller/core/capture_engines/dxcam_engine.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""DXcam-based screen capture engine (Windows only, DXGI Desktop Duplication)."""
|
||||
|
||||
import sys
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.capture_engines.base import (
|
||||
CaptureEngine,
|
||||
DisplayInfo,
|
||||
ScreenCapture,
|
||||
)
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class DXcamEngine(CaptureEngine):
|
||||
"""DXcam-based screen capture engine.
|
||||
|
||||
Uses the dxcam library which leverages DXGI Desktop Duplication API for
|
||||
ultra-fast screen capture on Windows. Offers significantly better performance
|
||||
than MSS and eliminates cursor flickering.
|
||||
|
||||
Requires: Windows 8.1+
|
||||
"""
|
||||
|
||||
ENGINE_TYPE = "dxcam"
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize DXcam engine.
|
||||
|
||||
Args:
|
||||
config: Engine configuration
|
||||
- device_idx (int): GPU index (default: 0)
|
||||
- output_idx (int|None): Monitor index (default: None = primary)
|
||||
- output_color (str): Color format "RGB" or "BGR" (default: "RGB")
|
||||
- max_buffer_len (int): Frame buffer size (default: 64)
|
||||
"""
|
||||
super().__init__(config)
|
||||
self._camera = None
|
||||
self._dxcam = None
|
||||
|
||||
def initialize(self) -> None:
|
||||
"""Initialize DXcam capture.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If DXcam not installed or initialization fails
|
||||
"""
|
||||
try:
|
||||
import dxcam
|
||||
self._dxcam = dxcam
|
||||
except ImportError:
|
||||
raise RuntimeError(
|
||||
"DXcam not installed. Install with: pip install dxcam"
|
||||
)
|
||||
|
||||
try:
|
||||
device_idx = self.config.get("device_idx", 0)
|
||||
output_idx = self.config.get("output_idx", None)
|
||||
output_color = self.config.get("output_color", "RGB")
|
||||
max_buffer_len = self.config.get("max_buffer_len", 64)
|
||||
|
||||
self._camera = self._dxcam.create(
|
||||
device_idx=device_idx,
|
||||
output_idx=output_idx,
|
||||
output_color=output_color,
|
||||
max_buffer_len=max_buffer_len,
|
||||
)
|
||||
|
||||
if not self._camera:
|
||||
raise RuntimeError("Failed to create DXcam camera instance")
|
||||
|
||||
# Start the camera to begin capturing
|
||||
self._camera.start()
|
||||
|
||||
self._initialized = True
|
||||
logger.info(
|
||||
f"DXcam engine initialized (device={device_idx}, "
|
||||
f"output={output_idx}, color={output_color})"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to initialize DXcam: {e}")
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Cleanup DXcam resources."""
|
||||
if self._camera:
|
||||
try:
|
||||
# Stop capturing before releasing
|
||||
self._camera.stop()
|
||||
self._camera.release()
|
||||
except Exception as e:
|
||||
logger.error(f"Error releasing DXcam camera: {e}")
|
||||
self._camera = None
|
||||
|
||||
self._initialized = False
|
||||
logger.info("DXcam engine cleaned up")
|
||||
|
||||
def get_available_displays(self) -> List[DisplayInfo]:
|
||||
"""Get list of available displays using DXcam.
|
||||
|
||||
Note: DXcam provides limited display enumeration. This method
|
||||
returns basic information for the configured output.
|
||||
|
||||
Returns:
|
||||
List of DisplayInfo objects
|
||||
|
||||
Raises:
|
||||
RuntimeError: If not initialized or detection fails
|
||||
"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("Engine not initialized")
|
||||
|
||||
try:
|
||||
displays = []
|
||||
|
||||
# Get output information from DXcam
|
||||
# Note: DXcam doesn't provide comprehensive display enumeration
|
||||
# We report the single configured output
|
||||
output_idx = self.config.get("output_idx", 0)
|
||||
if output_idx is None:
|
||||
output_idx = 0
|
||||
|
||||
# DXcam camera has basic output info
|
||||
if self._camera and hasattr(self._camera, "width") and hasattr(self._camera, "height"):
|
||||
display_info = DisplayInfo(
|
||||
index=output_idx,
|
||||
name=f"DXcam Display {output_idx}",
|
||||
width=self._camera.width,
|
||||
height=self._camera.height,
|
||||
x=0, # DXcam doesn't provide position info
|
||||
y=0,
|
||||
is_primary=(output_idx == 0),
|
||||
refresh_rate=60, # DXcam doesn't report refresh rate
|
||||
)
|
||||
displays.append(display_info)
|
||||
else:
|
||||
# Fallback if camera doesn't have dimensions
|
||||
display_info = DisplayInfo(
|
||||
index=output_idx,
|
||||
name=f"DXcam Display {output_idx}",
|
||||
width=1920, # Reasonable default
|
||||
height=1080,
|
||||
x=0,
|
||||
y=0,
|
||||
is_primary=(output_idx == 0),
|
||||
refresh_rate=60,
|
||||
)
|
||||
displays.append(display_info)
|
||||
|
||||
logger.debug(f"DXcam detected {len(displays)} display(s)")
|
||||
return displays
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to detect displays with DXcam: {e}")
|
||||
raise RuntimeError(f"Failed to detect displays: {e}")
|
||||
|
||||
def capture_display(self, display_index: int) -> ScreenCapture:
|
||||
"""Capture display using DXcam.
|
||||
|
||||
Args:
|
||||
display_index: Index of display to capture (0-based)
|
||||
Note: DXcam is configured for a specific output, so this
|
||||
should match the configured output_idx
|
||||
|
||||
Returns:
|
||||
ScreenCapture object with image data
|
||||
|
||||
Raises:
|
||||
RuntimeError: If not initialized
|
||||
ValueError: If display_index doesn't match configured output
|
||||
RuntimeError: If capture fails
|
||||
"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("Engine not initialized")
|
||||
|
||||
# DXcam is configured for a specific output
|
||||
configured_output = self.config.get("output_idx", 0)
|
||||
if configured_output is None:
|
||||
configured_output = 0
|
||||
|
||||
if display_index != configured_output:
|
||||
raise ValueError(
|
||||
f"DXcam engine is configured for output {configured_output}, "
|
||||
f"cannot capture display {display_index}. Create a new template "
|
||||
f"with output_idx={display_index} to capture this display."
|
||||
)
|
||||
|
||||
try:
|
||||
# Grab frame from DXcam
|
||||
frame = self._camera.grab()
|
||||
|
||||
if frame is None:
|
||||
raise RuntimeError(
|
||||
"Failed to capture frame (no new data). This can happen if "
|
||||
"the screen hasn't changed or if there's a timeout."
|
||||
)
|
||||
|
||||
# DXcam returns numpy array directly in configured color format
|
||||
logger.debug(
|
||||
f"DXcam captured display {display_index}: "
|
||||
f"{frame.shape[1]}x{frame.shape[0]}"
|
||||
)
|
||||
|
||||
return ScreenCapture(
|
||||
image=frame,
|
||||
width=frame.shape[1],
|
||||
height=frame.shape[0],
|
||||
display_index=display_index,
|
||||
)
|
||||
|
||||
except ValueError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to capture display {display_index} with DXcam: {e}")
|
||||
raise RuntimeError(f"Screen capture failed: {e}")
|
||||
|
||||
@classmethod
|
||||
def is_available(cls) -> bool:
|
||||
"""Check if DXcam is available.
|
||||
|
||||
DXcam requires Windows 8.1+ and the dxcam package.
|
||||
|
||||
Returns:
|
||||
True if dxcam is available on this system
|
||||
"""
|
||||
# Check platform
|
||||
if sys.platform != "win32":
|
||||
return False
|
||||
|
||||
# Check if dxcam is installed
|
||||
try:
|
||||
import dxcam
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls) -> Dict[str, Any]:
|
||||
"""Get default DXcam configuration.
|
||||
|
||||
Returns:
|
||||
Default config dict with DXcam options
|
||||
"""
|
||||
return {
|
||||
"device_idx": 0, # Primary GPU
|
||||
"output_idx": None, # Primary monitor (None = auto-select)
|
||||
"output_color": "RGB", # RGB color format
|
||||
"max_buffer_len": 64, # Frame buffer size
|
||||
}
|
||||
134
server/src/wled_controller/core/capture_engines/factory.py
Normal file
134
server/src/wled_controller/core/capture_engines/factory.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""Engine registry and factory for screen capture engines."""
|
||||
|
||||
from typing import Any, Dict, List, Type
|
||||
|
||||
from wled_controller.core.capture_engines.base import CaptureEngine
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class EngineRegistry:
|
||||
"""Registry for available capture engines.
|
||||
|
||||
This class maintains a registry of all capture engine implementations
|
||||
and provides factory methods for creating engine instances.
|
||||
"""
|
||||
|
||||
_engines: Dict[str, Type[CaptureEngine]] = {}
|
||||
|
||||
@classmethod
|
||||
def register(cls, engine_class: Type[CaptureEngine]):
|
||||
"""Register a capture engine.
|
||||
|
||||
Args:
|
||||
engine_class: Engine class to register (must inherit from CaptureEngine)
|
||||
|
||||
Raises:
|
||||
ValueError: If engine_class is not a subclass of CaptureEngine
|
||||
ValueError: If an engine with the same ENGINE_TYPE is already registered
|
||||
"""
|
||||
if not issubclass(engine_class, CaptureEngine):
|
||||
raise ValueError(f"{engine_class} must be a subclass of CaptureEngine")
|
||||
|
||||
engine_type = engine_class.ENGINE_TYPE
|
||||
if engine_type == "base":
|
||||
raise ValueError("Cannot register base engine type")
|
||||
|
||||
if engine_type in cls._engines:
|
||||
logger.warning(f"Engine '{engine_type}' already registered, overwriting")
|
||||
|
||||
cls._engines[engine_type] = engine_class
|
||||
logger.info(f"Registered capture engine: {engine_type}")
|
||||
|
||||
@classmethod
|
||||
def get_engine(cls, engine_type: str) -> Type[CaptureEngine]:
|
||||
"""Get engine class by type.
|
||||
|
||||
Args:
|
||||
engine_type: Engine type identifier (e.g., "mss", "dxcam")
|
||||
|
||||
Returns:
|
||||
Engine class
|
||||
|
||||
Raises:
|
||||
ValueError: If engine type not found
|
||||
"""
|
||||
if engine_type not in cls._engines:
|
||||
available = ", ".join(cls._engines.keys()) or "none"
|
||||
raise ValueError(
|
||||
f"Unknown engine type: '{engine_type}'. Available engines: {available}"
|
||||
)
|
||||
return cls._engines[engine_type]
|
||||
|
||||
@classmethod
|
||||
def get_available_engines(cls) -> List[str]:
|
||||
"""Get list of available engine types on this system.
|
||||
|
||||
Returns:
|
||||
List of engine type identifiers that are available on the current platform
|
||||
|
||||
Examples:
|
||||
>>> EngineRegistry.get_available_engines()
|
||||
['mss'] # On Linux
|
||||
['mss', 'dxcam', 'wgc'] # On Windows 10+
|
||||
"""
|
||||
available = []
|
||||
for engine_type, engine_class in cls._engines.items():
|
||||
try:
|
||||
if engine_class.is_available():
|
||||
available.append(engine_type)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error checking availability for engine '{engine_type}': {e}"
|
||||
)
|
||||
|
||||
return available
|
||||
|
||||
@classmethod
|
||||
def get_all_engines(cls) -> Dict[str, Type[CaptureEngine]]:
|
||||
"""Get all registered engines (available or not).
|
||||
|
||||
Returns:
|
||||
Dictionary mapping engine type to engine class
|
||||
"""
|
||||
return cls._engines.copy()
|
||||
|
||||
@classmethod
|
||||
def create_engine(cls, engine_type: str, config: Dict[str, Any]) -> CaptureEngine:
|
||||
"""Create engine instance with configuration.
|
||||
|
||||
Args:
|
||||
engine_type: Engine type identifier
|
||||
config: Engine-specific configuration
|
||||
|
||||
Returns:
|
||||
Initialized engine instance
|
||||
|
||||
Raises:
|
||||
ValueError: If engine type not found or not available
|
||||
RuntimeError: If engine initialization fails
|
||||
"""
|
||||
engine_class = cls.get_engine(engine_type)
|
||||
|
||||
if not engine_class.is_available():
|
||||
raise ValueError(
|
||||
f"Engine '{engine_type}' is not available on this system"
|
||||
)
|
||||
|
||||
try:
|
||||
engine = engine_class(config)
|
||||
logger.debug(f"Created engine instance: {engine_type}")
|
||||
return engine
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create engine '{engine_type}': {e}")
|
||||
raise RuntimeError(f"Failed to create engine '{engine_type}': {e}")
|
||||
|
||||
@classmethod
|
||||
def clear_registry(cls):
|
||||
"""Clear all registered engines.
|
||||
|
||||
This is primarily useful for testing.
|
||||
"""
|
||||
cls._engines.clear()
|
||||
logger.debug("Cleared engine registry")
|
||||
185
server/src/wled_controller/core/capture_engines/mss_engine.py
Normal file
185
server/src/wled_controller/core/capture_engines/mss_engine.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""MSS-based screen capture engine (cross-platform)."""
|
||||
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import mss
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.core.capture_engines.base import (
|
||||
CaptureEngine,
|
||||
DisplayInfo,
|
||||
ScreenCapture,
|
||||
)
|
||||
from wled_controller.utils import get_logger, get_monitor_names, get_monitor_refresh_rates
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class MSSEngine(CaptureEngine):
|
||||
"""MSS-based screen capture engine.
|
||||
|
||||
Uses the mss library for cross-platform screen capture support.
|
||||
Works on Windows, macOS, and Linux.
|
||||
|
||||
Note: May experience cursor flickering on some systems.
|
||||
"""
|
||||
|
||||
ENGINE_TYPE = "mss"
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize MSS engine.
|
||||
|
||||
Args:
|
||||
config: Engine configuration (currently unused for MSS)
|
||||
"""
|
||||
super().__init__(config)
|
||||
self._sct = None
|
||||
|
||||
def initialize(self) -> None:
|
||||
"""Initialize MSS capture context.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If MSS initialization fails
|
||||
"""
|
||||
try:
|
||||
self._sct = mss.mss()
|
||||
self._initialized = True
|
||||
logger.info("MSS engine initialized")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to initialize MSS: {e}")
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Cleanup MSS resources."""
|
||||
if self._sct:
|
||||
self._sct.close()
|
||||
self._sct = None
|
||||
self._initialized = False
|
||||
logger.info("MSS engine cleaned up")
|
||||
|
||||
def get_available_displays(self) -> List[DisplayInfo]:
|
||||
"""Get list of available displays using MSS.
|
||||
|
||||
Returns:
|
||||
List of DisplayInfo objects for each available monitor
|
||||
|
||||
Raises:
|
||||
RuntimeError: If not initialized or display detection fails
|
||||
"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("Engine not initialized")
|
||||
|
||||
try:
|
||||
# Get friendly monitor names (Windows only, falls back to generic names)
|
||||
monitor_names = get_monitor_names()
|
||||
|
||||
# Get monitor refresh rates (Windows only, falls back to 60Hz)
|
||||
refresh_rates = get_monitor_refresh_rates()
|
||||
|
||||
displays = []
|
||||
|
||||
# Skip the first monitor (combined virtual screen on multi-monitor setups)
|
||||
for idx, monitor in enumerate(self._sct.monitors[1:], start=0):
|
||||
# Use friendly name from WMI if available, otherwise generic name
|
||||
friendly_name = monitor_names.get(idx, f"Display {idx}")
|
||||
|
||||
# Use detected refresh rate or default to 60Hz
|
||||
refresh_rate = refresh_rates.get(idx, 60)
|
||||
|
||||
display_info = DisplayInfo(
|
||||
index=idx,
|
||||
name=friendly_name,
|
||||
width=monitor["width"],
|
||||
height=monitor["height"],
|
||||
x=monitor["left"],
|
||||
y=monitor["top"],
|
||||
is_primary=(idx == 0),
|
||||
refresh_rate=refresh_rate,
|
||||
)
|
||||
displays.append(display_info)
|
||||
|
||||
logger.debug(f"MSS detected {len(displays)} display(s)")
|
||||
return displays
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to detect displays with MSS: {e}")
|
||||
raise RuntimeError(f"Failed to detect displays: {e}")
|
||||
|
||||
def capture_display(self, display_index: int) -> ScreenCapture:
|
||||
"""Capture display using MSS.
|
||||
|
||||
Args:
|
||||
display_index: Index of display to capture (0-based)
|
||||
|
||||
Returns:
|
||||
ScreenCapture object with image data
|
||||
|
||||
Raises:
|
||||
RuntimeError: If not initialized
|
||||
ValueError: If display_index is invalid
|
||||
RuntimeError: If capture fails
|
||||
"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("Engine not initialized")
|
||||
|
||||
try:
|
||||
# mss monitors[0] is the combined screen, monitors[1+] are individual displays
|
||||
monitor_index = display_index + 1
|
||||
|
||||
if monitor_index >= len(self._sct.monitors):
|
||||
raise ValueError(
|
||||
f"Invalid display index {display_index}. "
|
||||
f"Available displays: 0-{len(self._sct.monitors) - 2}"
|
||||
)
|
||||
|
||||
monitor = self._sct.monitors[monitor_index]
|
||||
|
||||
# Capture screenshot
|
||||
screenshot = self._sct.grab(monitor)
|
||||
|
||||
# Convert to numpy array (RGB)
|
||||
img = Image.frombytes("RGB", screenshot.size, screenshot.rgb)
|
||||
img_array = np.array(img)
|
||||
|
||||
logger.debug(
|
||||
f"MSS captured display {display_index}: {monitor['width']}x{monitor['height']}"
|
||||
)
|
||||
|
||||
return ScreenCapture(
|
||||
image=img_array,
|
||||
width=monitor["width"],
|
||||
height=monitor["height"],
|
||||
display_index=display_index,
|
||||
)
|
||||
|
||||
except ValueError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to capture display {display_index} with MSS: {e}")
|
||||
raise RuntimeError(f"Screen capture failed: {e}")
|
||||
|
||||
@classmethod
|
||||
def is_available(cls) -> bool:
|
||||
"""Check if MSS is available.
|
||||
|
||||
MSS is cross-platform and should always be available.
|
||||
|
||||
Returns:
|
||||
True if mss library is available
|
||||
"""
|
||||
try:
|
||||
import mss
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls) -> Dict[str, Any]:
|
||||
"""Get default MSS configuration.
|
||||
|
||||
MSS has no configurable options.
|
||||
|
||||
Returns:
|
||||
Empty dict (MSS has no configuration)
|
||||
"""
|
||||
return {}
|
||||
348
server/src/wled_controller/core/capture_engines/wgc_engine.py
Normal file
348
server/src/wled_controller/core/capture_engines/wgc_engine.py
Normal file
@@ -0,0 +1,348 @@
|
||||
"""Windows Graphics Capture engine (Windows 10+, WGC API)."""
|
||||
|
||||
import gc
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.capture_engines.base import (
|
||||
CaptureEngine,
|
||||
DisplayInfo,
|
||||
ScreenCapture,
|
||||
)
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class WGCEngine(CaptureEngine):
|
||||
"""Windows Graphics Capture engine.
|
||||
|
||||
Uses the windows-capture library which leverages Windows.Graphics.Capture API.
|
||||
This is Microsoft's recommended modern screen capture API for Windows 10+.
|
||||
|
||||
Features:
|
||||
- Cross-GPU support (works regardless of GPU routing)
|
||||
- Hardware cursor exclusion (no cursor flickering)
|
||||
- GPU-accelerated with direct texture sharing
|
||||
- Modern, future-proof API
|
||||
|
||||
Requires: Windows 10 1803+
|
||||
"""
|
||||
|
||||
ENGINE_TYPE = "wgc"
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize WGC engine.
|
||||
|
||||
Args:
|
||||
config: Engine configuration
|
||||
- monitor_index (int): Monitor index (default: 0)
|
||||
- capture_cursor (bool): Include cursor in capture (default: False)
|
||||
- draw_border (bool): Draw border around capture (default: False)
|
||||
"""
|
||||
super().__init__(config)
|
||||
self._wgc = None
|
||||
self._capture_instance = None
|
||||
self._capture_control = None
|
||||
self._latest_frame = None
|
||||
self._frame_lock = threading.Lock()
|
||||
self._frame_event = threading.Event()
|
||||
self._closed_event = threading.Event() # Signals when capture session is closed
|
||||
|
||||
def initialize(self) -> None:
|
||||
"""Initialize WGC capture.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If windows-capture not installed or initialization fails
|
||||
"""
|
||||
try:
|
||||
import windows_capture
|
||||
self._wgc = windows_capture
|
||||
except ImportError:
|
||||
raise RuntimeError(
|
||||
"windows-capture not installed. Install with: pip install windows-capture"
|
||||
)
|
||||
|
||||
# Clear events for fresh initialization
|
||||
self._frame_event.clear()
|
||||
self._closed_event.clear()
|
||||
|
||||
try:
|
||||
monitor_index = self.config.get("monitor_index", 0)
|
||||
capture_cursor = self.config.get("capture_cursor", False)
|
||||
# Note: draw_border is not supported by WGC API on most platforms
|
||||
|
||||
# WGC uses 1-based monitor indexing (1, 2, 3...) while we use 0-based (0, 1, 2...)
|
||||
wgc_monitor_index = monitor_index + 1
|
||||
|
||||
# Create capture instance
|
||||
# Note: draw_border parameter not supported on all platforms
|
||||
self._capture_instance = self._wgc.WindowsCapture(
|
||||
cursor_capture=capture_cursor,
|
||||
monitor_index=wgc_monitor_index,
|
||||
)
|
||||
|
||||
# Define event handlers as local functions first
|
||||
def on_frame_arrived(frame, capture_control):
|
||||
"""Called when a new frame is captured."""
|
||||
try:
|
||||
logger.debug("WGC frame callback triggered")
|
||||
|
||||
# Store capture_control reference for cleanup
|
||||
if self._capture_control is None:
|
||||
self._capture_control = capture_control
|
||||
|
||||
# Get frame buffer as numpy array
|
||||
frame_buffer = frame.frame_buffer
|
||||
width = frame.width
|
||||
height = frame.height
|
||||
|
||||
# Reshape to image dimensions (height, width, channels)
|
||||
# WGC provides BGRA format
|
||||
frame_array = frame_buffer.reshape((height, width, 4))
|
||||
|
||||
# Convert BGRA to RGB
|
||||
frame_rgb = frame_array[:, :, [2, 1, 0]] # Take BGR channels
|
||||
|
||||
# Store the latest frame
|
||||
with self._frame_lock:
|
||||
self._latest_frame = frame_rgb.copy()
|
||||
self._frame_event.set()
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing WGC frame: {e}", exc_info=True)
|
||||
|
||||
def on_closed():
|
||||
"""Called when capture session is closed."""
|
||||
logger.debug("WGC capture session closed callback triggered")
|
||||
# Signal that the capture session has fully closed and resources are released
|
||||
self._closed_event.set()
|
||||
|
||||
# Set handlers directly as attributes
|
||||
self._capture_instance.frame_handler = on_frame_arrived
|
||||
self._capture_instance.closed_handler = on_closed
|
||||
|
||||
# Start capture using free-threaded mode (non-blocking)
|
||||
logger.debug("Starting WGC capture (free-threaded mode)...")
|
||||
self._capture_instance.start_free_threaded()
|
||||
|
||||
# Wait for first frame to arrive (with timeout)
|
||||
logger.debug("Waiting for first WGC frame...")
|
||||
frame_received = self._frame_event.wait(timeout=5.0)
|
||||
|
||||
if not frame_received or self._latest_frame is None:
|
||||
raise RuntimeError(
|
||||
"WGC capture started but no frames received within 5 seconds. "
|
||||
"This may indicate the capture session failed to start or "
|
||||
"the display is not actively updating."
|
||||
)
|
||||
|
||||
self._initialized = True
|
||||
logger.info(
|
||||
f"WGC engine initialized (monitor={monitor_index}, "
|
||||
f"cursor={capture_cursor})"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize WGC: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Failed to initialize WGC: {e}")
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Cleanup WGC resources."""
|
||||
# For free-threaded captures, cleanup is tricky:
|
||||
# 1. Stop capture via capture_control if available
|
||||
# 2. Explicitly delete the capture instance (releases COM objects)
|
||||
# 3. Wait for resources to be freed (give Windows time to cleanup)
|
||||
# 4. Force garbage collection multiple times to ensure COM cleanup
|
||||
|
||||
if self._capture_control:
|
||||
try:
|
||||
logger.debug("Stopping WGC capture session via capture_control...")
|
||||
self._capture_control.stop()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping WGC capture_control: {e}")
|
||||
finally:
|
||||
self._capture_control = None
|
||||
|
||||
# Explicitly delete the capture instance BEFORE waiting
|
||||
# This is critical for releasing COM objects and GPU resources
|
||||
if self._capture_instance:
|
||||
try:
|
||||
logger.debug("Explicitly deleting WGC capture instance...")
|
||||
instance = self._capture_instance
|
||||
self._capture_instance = None
|
||||
del instance
|
||||
logger.debug("WGC capture instance deleted")
|
||||
except Exception as e:
|
||||
logger.error(f"Error releasing WGC capture instance: {e}")
|
||||
self._capture_instance = None
|
||||
|
||||
# Force garbage collection multiple times to ensure COM cleanup
|
||||
# COM objects may require multiple GC passes to fully release
|
||||
logger.debug("Forcing garbage collection for COM cleanup...")
|
||||
for i in range(3):
|
||||
gc.collect()
|
||||
time.sleep(0.05) # Small delay between GC passes
|
||||
|
||||
logger.debug("Waiting for WGC resources to be fully released...")
|
||||
# Give Windows time to clean up the capture session and remove border
|
||||
time.sleep(0.3)
|
||||
|
||||
# Final GC pass
|
||||
gc.collect()
|
||||
logger.debug("Final garbage collection completed")
|
||||
|
||||
with self._frame_lock:
|
||||
self._latest_frame = None
|
||||
self._frame_event.clear()
|
||||
self._closed_event.clear()
|
||||
|
||||
self._initialized = False
|
||||
logger.info("WGC engine cleaned up")
|
||||
|
||||
def get_available_displays(self) -> List[DisplayInfo]:
|
||||
"""Get list of available displays using MSS.
|
||||
|
||||
Note: WGC doesn't provide a direct API for enumerating monitors,
|
||||
so we use MSS for display detection.
|
||||
|
||||
Returns:
|
||||
List of DisplayInfo objects
|
||||
|
||||
Raises:
|
||||
RuntimeError: If detection fails
|
||||
"""
|
||||
try:
|
||||
import mss
|
||||
|
||||
with mss.mss() as sct:
|
||||
displays = []
|
||||
# Skip monitor 0 (all monitors combined)
|
||||
for i, monitor in enumerate(sct.monitors[1:], start=0):
|
||||
displays.append(
|
||||
DisplayInfo(
|
||||
index=i,
|
||||
name=f"Monitor {i+1}",
|
||||
width=monitor["width"],
|
||||
height=monitor["height"],
|
||||
x=monitor["left"],
|
||||
y=monitor["top"],
|
||||
is_primary=(i == 0),
|
||||
refresh_rate=60,
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(f"WGC detected {len(displays)} display(s)")
|
||||
return displays
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to detect displays: {e}")
|
||||
raise RuntimeError(f"Failed to detect displays: {e}")
|
||||
|
||||
def capture_display(self, display_index: int) -> ScreenCapture:
|
||||
"""Capture display using WGC.
|
||||
|
||||
Args:
|
||||
display_index: Index of display to capture (0-based)
|
||||
|
||||
Returns:
|
||||
ScreenCapture object with image data
|
||||
|
||||
Raises:
|
||||
RuntimeError: If not initialized
|
||||
ValueError: If display_index doesn't match configured monitor
|
||||
RuntimeError: If capture fails or no frame available
|
||||
"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("Engine not initialized")
|
||||
|
||||
# WGC is configured for a specific monitor
|
||||
configured_monitor = self.config.get("monitor_index", 0)
|
||||
|
||||
if display_index != configured_monitor:
|
||||
raise ValueError(
|
||||
f"WGC engine is configured for monitor {configured_monitor}, "
|
||||
f"cannot capture display {display_index}. Create a new template "
|
||||
f"with monitor_index={display_index} to capture this display."
|
||||
)
|
||||
|
||||
try:
|
||||
# Get the latest frame
|
||||
with self._frame_lock:
|
||||
if self._latest_frame is None:
|
||||
raise RuntimeError(
|
||||
"No frame available yet. The capture may not have started or "
|
||||
"the screen hasn't updated. Wait a moment and try again."
|
||||
)
|
||||
frame = self._latest_frame.copy()
|
||||
|
||||
logger.debug(
|
||||
f"WGC captured display {display_index}: "
|
||||
f"{frame.shape[1]}x{frame.shape[0]}"
|
||||
)
|
||||
|
||||
return ScreenCapture(
|
||||
image=frame,
|
||||
width=frame.shape[1],
|
||||
height=frame.shape[0],
|
||||
display_index=display_index,
|
||||
)
|
||||
|
||||
except ValueError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to capture display {display_index} with WGC: {e}")
|
||||
raise RuntimeError(f"Screen capture failed: {e}")
|
||||
|
||||
@classmethod
|
||||
def is_available(cls) -> bool:
|
||||
"""Check if WGC is available.
|
||||
|
||||
WGC requires Windows 10 1803+ and the windows-capture package.
|
||||
|
||||
Returns:
|
||||
True if windows-capture is available on this system
|
||||
"""
|
||||
# Check platform
|
||||
if sys.platform != "win32":
|
||||
return False
|
||||
|
||||
# Check Windows version (Windows 10 1803 = version 10.0.17134)
|
||||
try:
|
||||
import platform
|
||||
version = platform.version()
|
||||
# Parse version string like "10.0.19045"
|
||||
parts = version.split(".")
|
||||
if len(parts) >= 3:
|
||||
major = int(parts[0])
|
||||
minor = int(parts[1])
|
||||
build = int(parts[2])
|
||||
# Check for Windows 10 1803+ (build 17134+)
|
||||
if major < 10 or (major == 10 and minor == 0 and build < 17134):
|
||||
return False
|
||||
except Exception:
|
||||
# If we can't parse version, assume it might work
|
||||
pass
|
||||
|
||||
# Check if windows-capture is installed
|
||||
try:
|
||||
import windows_capture
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls) -> Dict[str, Any]:
|
||||
"""Get default WGC configuration.
|
||||
|
||||
Returns:
|
||||
Default config dict with WGC options
|
||||
"""
|
||||
return {
|
||||
"monitor_index": 0, # Primary monitor
|
||||
"capture_cursor": False, # Exclude cursor (hardware exclusion)
|
||||
"draw_border": False, # Don't draw border around capture
|
||||
}
|
||||
Reference in New Issue
Block a user