Fix DDP streaming and capture thread safety

- Fix WLED DDP config: use lor=0 (live overrides effects), remove
  live flag (read-only, causes issues on 0.15.x), disable PUSH flag
  which broke rendering on WLED 0.15.x
- Use dedicated single-thread executor for capture engine calls to
  fix thread-local state issues with BetterCam/MSS/DXcam
- Sync processor state on stream/template change even when stopped,
  preventing stale engine references on next start
- Add diagnostic logging for frame sends and DDP packets

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-12 01:27:01 +03:00
parent 4c21ae5178
commit a4991c884a
4 changed files with 75 additions and 28 deletions

View File

@@ -427,14 +427,29 @@ async def update_device(
picture_stream_id=update_data.picture_stream_id, picture_stream_id=update_data.picture_stream_id,
) )
# Hot-swap: If stream/template changed and device was processing, restart it # Sync processor state when stream/template changed
if (stream_changed or template_changed) and was_processing: if stream_changed or template_changed:
logger.info(f"Hot-swapping stream/template for device {device_id}") if was_processing:
try: # Hot-swap: restart with new settings
# Stop current processing logger.info(f"Hot-swapping stream/template for device {device_id}")
await manager.stop_processing(device_id) try:
await manager.stop_processing(device_id)
# Update processor with new settings manager.remove_device(device_id)
manager.add_device(
device_id=device.id,
device_url=device.url,
led_count=device.led_count,
settings=device.settings,
calibration=device.calibration,
capture_template_id=device.capture_template_id,
picture_stream_id=device.picture_stream_id,
)
await manager.start_processing(device_id)
logger.info(f"Successfully hot-swapped stream/template for device {device_id}")
except Exception as e:
logger.error(f"Error during template hot-swap: {e}")
else:
# Not processing — update processor state so next start uses new values
manager.remove_device(device_id) manager.remove_device(device_id)
manager.add_device( manager.add_device(
device_id=device.id, device_id=device.id,
@@ -446,14 +461,6 @@ async def update_device(
picture_stream_id=device.picture_stream_id, picture_stream_id=device.picture_stream_id,
) )
# Restart processing
await manager.start_processing(device_id)
logger.info(f"Successfully hot-swapped stream/template for device {device_id}")
except Exception as e:
logger.error(f"Error during template hot-swap: {e}")
# Device is stopped but updated - user can manually restart
return DeviceResponse( return DeviceResponse(
id=device.id, id=device.id,
name=device.name, name=device.name,

View File

@@ -207,9 +207,9 @@ class DDPClient:
# Split into multiple packets if needed # Split into multiple packets if needed
num_packets = (total_bytes + bytes_per_packet - 1) // bytes_per_packet num_packets = (total_bytes + bytes_per_packet - 1) // bytes_per_packet
logger.debug( logger.info(
f"Sending {len(pixels)} pixels ({total_bytes} bytes) " f"DDP: Sending {len(pixels)} pixels ({total_bytes} bytes) "
f"in {num_packets} DDP packet(s)" f"in {num_packets} packet(s) to {self.host}:{self.port}"
) )
for i in range(num_packets): for i in range(num_packets):
@@ -221,10 +221,11 @@ class DDPClient:
# Increment sequence number # Increment sequence number
self._sequence = (self._sequence + 1) % 256 self._sequence = (self._sequence + 1) % 256
# Build and send packet (set PUSH on last packet) # Build and send packet (no PUSH flag — WLED 0.15.x
# handles DDP without it; adding PUSH broke rendering)
packet = self._build_ddp_packet( packet = self._build_ddp_packet(
chunk, offset=start, chunk, offset=start,
sequence=self._sequence, push=is_last, sequence=self._sequence, push=False,
) )
self._transport.sendto(packet) self._transport.sendto(packet)

View File

@@ -1,6 +1,7 @@
"""Processing manager for coordinating screen capture and WLED updates.""" """Processing manager for coordinating screen capture and WLED updates."""
import asyncio import asyncio
import concurrent.futures
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
@@ -115,6 +116,10 @@ class ProcessorState:
static_image: Optional[np.ndarray] = None static_image: Optional[np.ndarray] = None
image_pool: Optional[ImagePool] = None image_pool: Optional[ImagePool] = None
filter_instances: Optional[List[PostprocessingFilter]] = None filter_instances: Optional[List[PostprocessingFilter]] = None
# Dedicated single-thread executor for capture engine calls.
# Capture libraries (BetterCam, MSS, DXcam) use thread-local state,
# so all calls must run on the same thread.
capture_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
class ProcessorManager: class ProcessorManager:
@@ -416,11 +421,23 @@ class ProcessorManager:
engine_type = state.resolved_engine_type or "mss" engine_type = state.resolved_engine_type or "mss"
engine_config = state.resolved_engine_config or {} engine_config = state.resolved_engine_config or {}
engine = EngineRegistry.create_engine(engine_type, engine_config) engine = EngineRegistry.create_engine(engine_type, engine_config)
engine.initialize()
# Create a dedicated single-thread executor for capture calls.
# Capture libraries use thread-local state (DXGI contexts, GDI DCs)
# so initialize + capture + cleanup must all run on the same thread.
state.capture_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"capture-{device_id}"
)
loop = asyncio.get_event_loop()
await loop.run_in_executor(state.capture_executor, engine.initialize)
state.capture_engine = engine state.capture_engine = engine
logger.info(f"Initialized capture engine for device {device_id}: {engine_type}") logger.info(f"Initialized capture engine for device {device_id}: {engine_type}")
except Exception as e: except Exception as e:
logger.error(f"Failed to initialize capture engine for device {device_id}: {e}") logger.error(f"Failed to initialize capture engine for device {device_id}: {e}")
if state.capture_executor:
state.capture_executor.shutdown(wait=False)
state.capture_executor = None
if state.wled_client: if state.wled_client:
await state.wled_client.disconnect() await state.wled_client.disconnect()
raise RuntimeError(f"Failed to initialize capture engine: {e}") raise RuntimeError(f"Failed to initialize capture engine: {e}")
@@ -476,9 +493,20 @@ class ProcessorManager:
await state.wled_client.close() await state.wled_client.close()
state.wled_client = None state.wled_client = None
# Cleanup capture engine # Cleanup capture engine on the same dedicated thread it was created on
if state.capture_engine: if state.capture_engine:
state.capture_engine.cleanup() if state.capture_executor:
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(
state.capture_executor, state.capture_engine.cleanup
)
except Exception as e:
logger.warning(f"Error cleaning up capture engine: {e}")
state.capture_executor.shutdown(wait=False)
state.capture_executor = None
else:
state.capture_engine.cleanup()
state.capture_engine = None state.capture_engine = None
# Release cached static image # Release cached static image
@@ -524,6 +552,8 @@ class ProcessorManager:
frame_time = 1.0 / target_fps frame_time = 1.0 / target_fps
fps_samples = [] fps_samples = []
loop = asyncio.get_event_loop()
capture_executor = state.capture_executor # dedicated single-thread executor
def _apply_filters(image): def _apply_filters(image):
"""Apply all postprocessing filters to the captured image.""" """Apply all postprocessing filters to the captured image."""
@@ -550,13 +580,16 @@ class ProcessorManager:
image=state.static_image.copy(), width=w, height=h, display_index=-1 image=state.static_image.copy(), width=w, height=h, display_index=-1
) )
else: else:
capture = await asyncio.to_thread( capture = await loop.run_in_executor(
capture_executor,
state.capture_engine.capture_display, state.capture_engine.capture_display,
display_index display_index
) )
# Skip processing if no new frame (screen unchanged) # Skip processing if no new frame (screen unchanged)
if capture is None: if capture is None:
if state.metrics.frames_processed == 0:
logger.info(f"Capture returned None for {device_id} (no new frame yet)")
await asyncio.sleep(frame_time) await asyncio.sleep(frame_time)
continue continue
@@ -585,6 +618,8 @@ class ProcessorManager:
# Update metrics # Update metrics
state.metrics.frames_processed += 1 state.metrics.frames_processed += 1
if state.metrics.frames_processed <= 3 or state.metrics.frames_processed % 100 == 0:
logger.info(f"Frame {state.metrics.frames_processed} sent for {device_id} ({len(led_colors)} LEDs, bri={brightness_value})")
state.metrics.last_update = datetime.utcnow() state.metrics.last_update = datetime.utcnow()
state.previous_colors = led_colors state.previous_colors = led_colors

View File

@@ -107,14 +107,18 @@ class WLEDClient:
self._ddp_client.set_buses(info.buses) self._ddp_client.set_buses(info.buses)
await self._ddp_client.connect() await self._ddp_client.connect()
# Turn on the device and disable Audio Reactive mode for DDP # Configure device for DDP streaming:
# - Turn on, set lor=0 (live data overrides effects),
# and disable Audio Reactive.
# - Do NOT set live — it's read-only and causes issues on WLED 0.15.x.
# DDP packets automatically enter realtime mode.
try: try:
await self._request("POST", "/json/state", json_data={ await self._request("POST", "/json/state", json_data={
"on": True, "on": True,
"live": True, "lor": 0,
"AudioReactive": {"on": False} "AudioReactive": {"on": False}
}) })
logger.debug("Turned on device and enabled live mode for DDP streaming") logger.info("Configured device for DDP streaming")
except Exception as e: except Exception as e:
logger.warning(f"Could not configure device for DDP: {e}") logger.warning(f"Could not configure device for DDP: {e}")