Implement WGC multi-monitor simultaneous capture support
Some checks failed
Validate / validate (push) Failing after 9s

- Refactored WGC engine to maintain separate capture instances per monitor
- Each monitor gets dedicated instance, control, frame storage, and events
- Supports simultaneous capture from multiple monitors using same template
- Fixed template test endpoint to avoid redundant monitor 0 initialization
- Removed monitor_index from WGC template configuration (monitor-agnostic)

This enables using the same WGC template for multiple devices capturing
from different monitors without conflicts or unexpected borders.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-09 19:12:21 +03:00
parent c371e07e81
commit 5ce4dba925
2 changed files with 533 additions and 96 deletions

View File

@@ -1,10 +1,15 @@
"""API routes and endpoints."""
import base64
import io
import sys
import time
from datetime import datetime
from typing import List
from typing import List, Dict, Any
import httpx
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends
from wled_controller import __version__
@@ -24,6 +29,17 @@ from wled_controller.api.schemas import (
CalibrationTestModeResponse,
ProcessingState,
MetricsResponse,
TemplateCreate,
TemplateUpdate,
TemplateResponse,
TemplateListResponse,
EngineInfo,
EngineListResponse,
TemplateTestRequest,
TemplateTestResponse,
CaptureImage,
BorderExtraction,
PerformanceMetrics,
)
from wled_controller.config import get_config
from wled_controller.core.processor_manager import ProcessorManager, ProcessingSettings
@@ -32,6 +48,8 @@ from wled_controller.core.calibration import (
calibration_to_dict,
)
from wled_controller.storage import DeviceStore
from wled_controller.storage.template_store import TemplateStore
from wled_controller.core.capture_engines import EngineRegistry
from wled_controller.utils import get_logger
from wled_controller.core.screen_capture import get_available_displays
@@ -41,6 +59,7 @@ router = APIRouter()
# Global instances (initialized in main.py)
_device_store: DeviceStore | None = None
_template_store: TemplateStore | None = None
_processor_manager: ProcessorManager | None = None
@@ -51,6 +70,13 @@ def get_device_store() -> DeviceStore:
return _device_store
def get_template_store() -> TemplateStore:
"""Get template store dependency."""
if _template_store is None:
raise RuntimeError("Template store not initialized")
return _template_store
def get_processor_manager() -> ProcessorManager:
"""Get processor manager dependency."""
if _processor_manager is None:
@@ -58,10 +84,15 @@ def get_processor_manager() -> ProcessorManager:
return _processor_manager
def init_dependencies(device_store: DeviceStore, processor_manager: ProcessorManager):
def init_dependencies(
device_store: DeviceStore,
template_store: TemplateStore,
processor_manager: ProcessorManager,
):
"""Initialize global dependencies."""
global _device_store, _processor_manager
global _device_store, _template_store, _processor_manager
_device_store = device_store
_template_store = template_store
_processor_manager = processor_manager
@@ -214,6 +245,7 @@ async def create_device(
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -248,6 +280,7 @@ async def list_devices(
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -291,6 +324,7 @@ async def get_device(
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -302,16 +336,53 @@ async def update_device(
update_data: DeviceUpdate,
_auth: AuthRequired,
store: DeviceStore = Depends(get_device_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Update device information."""
try:
# Check if template changed and device is processing (for hot-swap)
old_device = store.get_device(device_id)
template_changed = (
update_data.capture_template_id is not None
and update_data.capture_template_id != old_device.capture_template_id
)
was_processing = manager.is_processing(device_id)
# Update device
device = store.update_device(
device_id=device_id,
name=update_data.name,
url=update_data.url,
enabled=update_data.enabled,
capture_template_id=update_data.capture_template_id,
)
# Hot-swap: If template changed and device was processing, restart it
if template_changed and was_processing:
logger.info(f"Hot-swapping template for device {device_id}")
try:
# Stop current processing
await manager.stop_processing(device_id)
# Update processor with new template
manager.remove_device(device_id)
manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
capture_template_id=device.capture_template_id,
)
# Restart processing
await manager.start_processing(device_id)
logger.info(f"Successfully hot-swapped template for device {device_id}")
except Exception as e:
logger.error(f"Error during template hot-swap: {e}")
# Device is stopped but updated - user can manually restart
return DeviceResponse(
id=device.id,
name=device.name,
@@ -327,6 +398,7 @@ async def update_device(
state_check_interval=device.settings.state_check_interval,
),
calibration=CalibrationSchema(**calibration_to_dict(device.calibration)),
capture_template_id=device.capture_template_id,
created_at=device.created_at,
updated_at=device.updated_at,
)
@@ -626,3 +698,322 @@ async def get_metrics(
except Exception as e:
logger.error(f"Failed to get metrics: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== CAPTURE TEMPLATE ENDPOINTS =====
@router.get("/api/v1/capture-templates", response_model=TemplateListResponse, tags=["Templates"])
async def list_templates(
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""List all capture templates."""
try:
templates = template_store.get_all_templates()
template_responses = [
TemplateResponse(
id=t.id,
name=t.name,
engine_type=t.engine_type,
engine_config=t.engine_config,
is_default=t.is_default,
created_at=t.created_at,
updated_at=t.updated_at,
description=t.description,
)
for t in templates
]
return TemplateListResponse(
templates=template_responses,
count=len(template_responses),
)
except Exception as e:
logger.error(f"Failed to list templates: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates", response_model=TemplateResponse, tags=["Templates"], status_code=201)
async def create_template(
template_data: TemplateCreate,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Create a new capture template."""
try:
template = template_store.create_template(
name=template_data.name,
engine_type=template_data.engine_type,
engine_config=template_data.engine_config,
description=template_data.description,
)
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
is_default=template.is_default,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to create template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def get_template(
template_id: str,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Get template by ID."""
template = template_store.get_template(template_id)
if not template:
raise HTTPException(status_code=404, detail=f"Template {template_id} not found")
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
is_default=template.is_default,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
@router.put("/api/v1/capture-templates/{template_id}", response_model=TemplateResponse, tags=["Templates"])
async def update_template(
template_id: str,
update_data: TemplateUpdate,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
):
"""Update a template."""
try:
template = template_store.update_template(
template_id=template_id,
name=update_data.name,
engine_config=update_data.engine_config,
description=update_data.description,
)
return TemplateResponse(
id=template.id,
name=template.name,
engine_type=template.engine_type,
engine_config=template.engine_config,
is_default=template.is_default,
created_at=template.created_at,
updated_at=template.updated_at,
description=template.description,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to update template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/capture-templates/{template_id}", status_code=204, tags=["Templates"])
async def delete_template(
template_id: str,
_auth: AuthRequired,
template_store: TemplateStore = Depends(get_template_store),
device_store: DeviceStore = Depends(get_device_store),
):
"""Delete a template.
Validates that no devices are currently using this template before deletion.
"""
try:
# Check if any devices are using this template
devices_using_template = []
for device in device_store.get_all_devices():
if device.capture_template_id == template_id:
devices_using_template.append(device.name)
if devices_using_template:
device_list = ", ".join(devices_using_template)
raise HTTPException(
status_code=409,
detail=f"Cannot delete template: it is currently assigned to the following device(s): {device_list}. "
f"Please reassign these devices to a different template before deleting."
)
# Proceed with deletion
template_store.delete_template(template_id)
except HTTPException:
raise # Re-raise HTTP exceptions as-is
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to delete template: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/capture-engines", response_model=EngineListResponse, tags=["Templates"])
async def list_engines(_auth: AuthRequired):
"""List available capture engines on this system.
Returns all registered engines that are available on the current platform.
"""
try:
available_engine_types = EngineRegistry.get_available_engines()
engines = []
for engine_type in available_engine_types:
engine_class = EngineRegistry.get_engine(engine_type)
engines.append(
EngineInfo(
type=engine_type,
name=engine_type.upper(),
default_config=engine_class.get_default_config(),
available=True,
)
)
return EngineListResponse(engines=engines, count=len(engines))
except Exception as e:
logger.error(f"Failed to list engines: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/capture-templates/test", response_model=TemplateTestResponse, tags=["Templates"])
async def test_template(
test_request: TemplateTestRequest,
_auth: AuthRequired,
processor_manager: ProcessorManager = Depends(get_processor_manager),
device_store: DeviceStore = Depends(get_device_store),
):
"""Test a capture template configuration.
Temporarily instantiates an engine with the provided configuration,
captures frames for the specified duration, and returns actual FPS metrics.
"""
engine = None
try:
# Validate engine type
if test_request.engine_type not in EngineRegistry.get_available_engines():
raise HTTPException(
status_code=400,
detail=f"Engine '{test_request.engine_type}' is not available on this system"
)
# Check if display is already being captured
locked_device_id = processor_manager.get_display_lock_info(test_request.display_index)
if locked_device_id:
# Get device info for better error message
try:
device = device_store.get_device(locked_device_id)
device_name = device.name
except Exception:
device_name = locked_device_id
raise HTTPException(
status_code=409,
detail=(
f"Display {test_request.display_index} is currently being captured by device "
f"'{device_name}'. Please stop the device processing before testing this template."
)
)
# Create engine (initialization happens on first capture)
engine = EngineRegistry.create_engine(test_request.engine_type, test_request.engine_config)
# Run sustained capture test
logger.info(f"Starting {test_request.capture_duration}s capture test with {test_request.engine_type}")
frame_count = 0
total_capture_time = 0.0
last_frame = None
start_time = time.perf_counter()
end_time = start_time + test_request.capture_duration
while time.perf_counter() < end_time:
capture_start = time.perf_counter()
screen_capture = engine.capture_display(test_request.display_index)
capture_elapsed = time.perf_counter() - capture_start
total_capture_time += capture_elapsed
frame_count += 1
last_frame = screen_capture
actual_duration = time.perf_counter() - start_time
logger.info(f"Captured {frame_count} frames in {actual_duration:.2f}s")
# Use the last captured frame for preview
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert numpy array to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
raise ValueError("Unexpected image format from engine")
# Create thumbnail (640px wide, maintain aspect ratio)
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
# Encode full capture thumbnail as JPEG
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
full_capture_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
full_capture_data_uri = f"data:image/jpeg;base64,{full_capture_b64}"
# Calculate metrics
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
return TemplateTestResponse(
full_capture=CaptureImage(
image=full_capture_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
),
border_extraction=None,
performance=PerformanceMetrics(
capture_duration_s=actual_duration,
frame_count=frame_count,
actual_fps=actual_fps,
avg_capture_time_ms=avg_capture_time_ms,
),
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=f"Engine error: {str(e)}")
except Exception as e:
logger.error(f"Failed to test template: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
finally:
# Always cleanup engine
if engine:
try:
engine.cleanup()
except Exception as e:
logger.error(f"Error cleaning up test engine: {e}")

View File

@@ -40,57 +40,70 @@ class WGCEngine(CaptureEngine):
Args:
config: Engine configuration
- monitor_index (int): Monitor index (default: 0)
- capture_cursor (bool): Include cursor in capture (default: False)
- draw_border (bool): Draw border around capture (default: False)
Note: monitor_index is NOT in config - WGC maintains separate instances per monitor
to support simultaneous capture from multiple monitors.
"""
super().__init__(config)
self._wgc = None
self._capture_instance = None
self._capture_control = None
self._latest_frame = None
# Per-monitor capture instances: {monitor_index: (instance, control, frame, frame_event)}
self._monitor_captures = {}
self._frame_lock = threading.Lock()
self._frame_event = threading.Event()
self._closed_event = threading.Event() # Signals when capture session is closed
def initialize(self) -> None:
"""Initialize WGC capture.
def initialize(self, monitor_index: int = 0) -> None:
"""Initialize WGC capture for a specific monitor.
Maintains separate capture instances per monitor to support simultaneous
capture from multiple monitors.
Args:
monitor_index: Monitor index to capture (0-based)
Raises:
RuntimeError: If windows-capture not installed or initialization fails
"""
try:
import windows_capture
self._wgc = windows_capture
except ImportError:
raise RuntimeError(
"windows-capture not installed. Install with: pip install windows-capture"
)
# Import windows_capture if not already imported
if self._wgc is None:
try:
import windows_capture
self._wgc = windows_capture
except ImportError:
raise RuntimeError(
"windows-capture not installed. Install with: pip install windows-capture"
)
# Clear events for fresh initialization
self._frame_event.clear()
self._closed_event.clear()
# Skip if already initialized for this monitor
if monitor_index in self._monitor_captures:
logger.debug(f"WGC already initialized for monitor {monitor_index}")
return
try:
monitor_index = self.config.get("monitor_index", 0)
capture_cursor = self.config.get("capture_cursor", False)
# Note: draw_border is not supported by WGC API on most platforms
# WGC uses 1-based monitor indexing (1, 2, 3...) while we use 0-based (0, 1, 2...)
wgc_monitor_index = monitor_index + 1
# Create per-monitor events and storage
frame_event = threading.Event()
closed_event = threading.Event()
latest_frame = None
# Create capture instance
# Note: draw_border parameter not supported on all platforms
self._capture_instance = self._wgc.WindowsCapture(
capture_instance = self._wgc.WindowsCapture(
cursor_capture=capture_cursor,
monitor_index=wgc_monitor_index,
)
# Define event handlers as local functions first
# Define event handlers as local functions that capture monitor_index
def on_frame_arrived(frame, capture_control):
"""Called when a new frame is captured."""
nonlocal latest_frame
try:
logger.debug("WGC frame callback triggered")
logger.debug(f"WGC frame callback triggered for monitor {monitor_index}")
# Get frame buffer as numpy array
frame_buffer = frame.frame_buffer
@@ -104,97 +117,128 @@ class WGCEngine(CaptureEngine):
# Convert BGRA to RGB
frame_rgb = frame_array[:, :, [2, 1, 0]] # Take BGR channels
# Store the latest frame
# Store the latest frame for this monitor
with self._frame_lock:
self._latest_frame = frame_rgb.copy()
self._frame_event.set()
if monitor_index in self._monitor_captures:
self._monitor_captures[monitor_index]['latest_frame'] = frame_rgb.copy()
self._monitor_captures[monitor_index]['frame_event'].set()
except Exception as e:
logger.error(f"Error processing WGC frame: {e}", exc_info=True)
logger.error(f"Error processing WGC frame for monitor {monitor_index}: {e}", exc_info=True)
def on_closed():
"""Called when capture session is closed."""
logger.debug("WGC capture session closed callback triggered")
logger.debug(f"WGC capture session closed for monitor {monitor_index}")
# Signal that the capture session has fully closed and resources are released
self._closed_event.set()
with self._frame_lock:
if monitor_index in self._monitor_captures:
self._monitor_captures[monitor_index]['closed_event'].set()
# Set handlers directly as attributes
self._capture_instance.frame_handler = on_frame_arrived
self._capture_instance.closed_handler = on_closed
capture_instance.frame_handler = on_frame_arrived
capture_instance.closed_handler = on_closed
# Start capture using free-threaded mode (non-blocking)
# IMPORTANT: start_free_threaded() returns a CaptureControl object for cleanup
logger.debug("Starting WGC capture (free-threaded mode)...")
self._capture_control = self._capture_instance.start_free_threaded()
logger.debug(f"Starting WGC capture for monitor {monitor_index} (free-threaded mode)...")
capture_control = capture_instance.start_free_threaded()
# Store all per-monitor data
self._monitor_captures[monitor_index] = {
'instance': capture_instance,
'control': capture_control,
'latest_frame': None,
'frame_event': frame_event,
'closed_event': closed_event,
}
# Wait for first frame to arrive (with timeout)
logger.debug("Waiting for first WGC frame...")
frame_received = self._frame_event.wait(timeout=5.0)
logger.debug(f"Waiting for first WGC frame from monitor {monitor_index}...")
frame_received = frame_event.wait(timeout=5.0)
if not frame_received or self._latest_frame is None:
if not frame_received or self._monitor_captures[monitor_index]['latest_frame'] is None:
# Cleanup on failure
with self._frame_lock:
if monitor_index in self._monitor_captures:
del self._monitor_captures[monitor_index]
raise RuntimeError(
"WGC capture started but no frames received within 5 seconds. "
f"WGC capture started for monitor {monitor_index} but no frames received within 5 seconds. "
"This may indicate the capture session failed to start or "
"the display is not actively updating."
)
self._initialized = True
logger.info(
f"WGC engine initialized (monitor={monitor_index}, "
f"cursor={capture_cursor})"
)
except Exception as e:
logger.error(f"Failed to initialize WGC: {e}", exc_info=True)
raise RuntimeError(f"Failed to initialize WGC: {e}")
logger.error(f"Failed to initialize WGC for monitor {monitor_index}: {e}", exc_info=True)
raise RuntimeError(f"Failed to initialize WGC for monitor {monitor_index}: {e}")
def cleanup(self) -> None:
"""Cleanup WGC resources."""
"""Cleanup WGC resources for all monitors."""
# Proper cleanup for free-threaded captures:
# 1. Stop capture via CaptureControl.stop() (signals thread to stop)
# 2. Wait for thread to finish using CaptureControl.wait() (blocks until done)
# 3. Delete capture instance (releases COM objects)
# 4. Force garbage collection (ensures COM cleanup)
if self._capture_control:
try:
logger.debug("Stopping WGC capture thread...")
self._capture_control.stop()
with self._frame_lock:
monitors_to_cleanup = list(self._monitor_captures.keys())
logger.debug("Waiting for WGC capture thread to finish...")
# This will block until the capture thread actually finishes
# This is the CORRECT way to wait for cleanup (not a timeout!)
self._capture_control.wait()
logger.debug("WGC capture thread finished successfully")
except Exception as e:
logger.error(f"Error during WGC capture control cleanup: {e}", exc_info=True)
finally:
self._capture_control = None
for monitor_index in monitors_to_cleanup:
logger.debug(f"Cleaning up WGC resources for monitor {monitor_index}...")
# Now that the thread has stopped, delete the capture instance
if self._capture_instance:
try:
logger.debug("Deleting WGC capture instance...")
instance = self._capture_instance
self._capture_instance = None
del instance
logger.debug("WGC capture instance deleted")
except Exception as e:
logger.error(f"Error deleting WGC capture instance: {e}", exc_info=True)
self._capture_instance = None
with self._frame_lock:
if monitor_index not in self._monitor_captures:
continue
monitor_data = self._monitor_captures[monitor_index]
# Stop and wait for capture thread
capture_control = monitor_data.get('control')
if capture_control:
try:
logger.debug(f"Stopping WGC capture thread for monitor {monitor_index}...")
capture_control.stop()
logger.debug(f"Waiting for WGC capture thread to finish (monitor {monitor_index})...")
# This will block until the capture thread actually finishes
capture_control.wait()
logger.debug(f"WGC capture thread finished successfully for monitor {monitor_index}")
except Exception as e:
logger.error(f"Error during WGC capture control cleanup for monitor {monitor_index}: {e}", exc_info=True)
# Delete capture instance
capture_instance = monitor_data.get('instance')
if capture_instance:
try:
logger.debug(f"Deleting WGC capture instance for monitor {monitor_index}...")
del capture_instance
logger.debug(f"WGC capture instance deleted for monitor {monitor_index}")
except Exception as e:
logger.error(f"Error deleting WGC capture instance for monitor {monitor_index}: {e}", exc_info=True)
# Clear events
frame_event = monitor_data.get('frame_event')
if frame_event:
frame_event.clear()
closed_event = monitor_data.get('closed_event')
if closed_event:
closed_event.clear()
# Remove from dictionary
with self._frame_lock:
if monitor_index in self._monitor_captures:
del self._monitor_captures[monitor_index]
logger.info(f"WGC engine cleaned up for monitor {monitor_index}")
# Force garbage collection to release COM objects
logger.debug("Running garbage collection for COM cleanup...")
gc.collect()
logger.debug("Garbage collection completed")
with self._frame_lock:
self._latest_frame = None
self._frame_event.clear()
self._closed_event.clear()
self._initialized = False
logger.info("WGC engine cleaned up")
def get_available_displays(self) -> List[DisplayInfo]:
"""Get list of available displays using MSS.
@@ -237,6 +281,9 @@ class WGCEngine(CaptureEngine):
def capture_display(self, display_index: int) -> ScreenCapture:
"""Capture display using WGC.
WGC dynamically initializes for the requested display if needed.
Supports simultaneous capture from multiple monitors.
Args:
display_index: Index of display to capture (0-based)
@@ -244,32 +291,29 @@ class WGCEngine(CaptureEngine):
ScreenCapture object with image data
Raises:
RuntimeError: If not initialized
ValueError: If display_index doesn't match configured monitor
RuntimeError: If capture fails or no frame available
RuntimeError: If initialization or capture fails
"""
if not self._initialized:
raise RuntimeError("Engine not initialized")
# WGC is configured for a specific monitor
configured_monitor = self.config.get("monitor_index", 0)
if display_index != configured_monitor:
raise ValueError(
f"WGC engine is configured for monitor {configured_monitor}, "
f"cannot capture display {display_index}. Create a new template "
f"with monitor_index={display_index} to capture this display."
)
# Initialize for this monitor if not already initialized
self.initialize(display_index)
try:
# Get the latest frame
# Get the latest frame for this monitor
with self._frame_lock:
if self._latest_frame is None:
if display_index not in self._monitor_captures:
raise RuntimeError(
"No frame available yet. The capture may not have started or "
"the screen hasn't updated. Wait a moment and try again."
f"Monitor {display_index} not initialized. This should not happen."
)
frame = self._latest_frame.copy()
monitor_data = self._monitor_captures[display_index]
latest_frame = monitor_data.get('latest_frame')
if latest_frame is None:
raise RuntimeError(
f"No frame available yet for monitor {display_index}. "
"The capture may not have started or the screen hasn't updated. "
"Wait a moment and try again."
)
frame = latest_frame.copy()
logger.debug(
f"WGC captured display {display_index}: "
@@ -330,11 +374,13 @@ class WGCEngine(CaptureEngine):
def get_default_config(cls) -> Dict[str, Any]:
"""Get default WGC configuration.
Note: monitor_index is NOT in config - WGC dynamically initializes
for the requested monitor at capture time.
Returns:
Default config dict with WGC options
"""
return {
"monitor_index": 0, # Primary monitor
"capture_cursor": False, # Exclude cursor (hardware exclusion)
"draw_border": False, # Don't draw border around capture
}