From a4991c884aeee76a7897ad45d77d5493314fd192 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Thu, 12 Feb 2026 01:27:01 +0300 Subject: [PATCH] Fix DDP streaming and capture thread safety - Fix WLED DDP config: use lor=0 (live overrides effects), remove live flag (read-only, causes issues on 0.15.x), disable PUSH flag which broke rendering on WLED 0.15.x - Use dedicated single-thread executor for capture engine calls to fix thread-local state issues with BetterCam/MSS/DXcam - Sync processor state on stream/template change even when stopped, preventing stale engine references on next start - Add diagnostic logging for frame sends and DDP packets Co-Authored-By: Claude Opus 4.6 --- server/src/wled_controller/api/routes.py | 39 ++++++++++------- server/src/wled_controller/core/ddp_client.py | 11 ++--- .../wled_controller/core/processor_manager.py | 43 +++++++++++++++++-- .../src/wled_controller/core/wled_client.py | 10 +++-- 4 files changed, 75 insertions(+), 28 deletions(-) diff --git a/server/src/wled_controller/api/routes.py b/server/src/wled_controller/api/routes.py index 467c275..05347c8 100644 --- a/server/src/wled_controller/api/routes.py +++ b/server/src/wled_controller/api/routes.py @@ -427,14 +427,29 @@ async def update_device( picture_stream_id=update_data.picture_stream_id, ) - # Hot-swap: If stream/template changed and device was processing, restart it - if (stream_changed or template_changed) and was_processing: - logger.info(f"Hot-swapping stream/template for device {device_id}") - try: - # Stop current processing - await manager.stop_processing(device_id) - - # Update processor with new settings + # Sync processor state when stream/template changed + if stream_changed or template_changed: + if was_processing: + # Hot-swap: restart with new settings + logger.info(f"Hot-swapping stream/template for device {device_id}") + try: + await manager.stop_processing(device_id) + manager.remove_device(device_id) + manager.add_device( + device_id=device.id, + device_url=device.url, + led_count=device.led_count, + settings=device.settings, + calibration=device.calibration, + capture_template_id=device.capture_template_id, + picture_stream_id=device.picture_stream_id, + ) + await manager.start_processing(device_id) + logger.info(f"Successfully hot-swapped stream/template for device {device_id}") + except Exception as e: + logger.error(f"Error during template hot-swap: {e}") + else: + # Not processing — update processor state so next start uses new values manager.remove_device(device_id) manager.add_device( device_id=device.id, @@ -446,14 +461,6 @@ async def update_device( picture_stream_id=device.picture_stream_id, ) - # Restart processing - await manager.start_processing(device_id) - logger.info(f"Successfully hot-swapped stream/template for device {device_id}") - - except Exception as e: - logger.error(f"Error during template hot-swap: {e}") - # Device is stopped but updated - user can manually restart - return DeviceResponse( id=device.id, name=device.name, diff --git a/server/src/wled_controller/core/ddp_client.py b/server/src/wled_controller/core/ddp_client.py index 084bdf7..7b0e383 100644 --- a/server/src/wled_controller/core/ddp_client.py +++ b/server/src/wled_controller/core/ddp_client.py @@ -207,9 +207,9 @@ class DDPClient: # Split into multiple packets if needed num_packets = (total_bytes + bytes_per_packet - 1) // bytes_per_packet - logger.debug( - f"Sending {len(pixels)} pixels ({total_bytes} bytes) " - f"in {num_packets} DDP packet(s)" + logger.info( + f"DDP: Sending {len(pixels)} pixels ({total_bytes} bytes) " + f"in {num_packets} packet(s) to {self.host}:{self.port}" ) for i in range(num_packets): @@ -221,10 +221,11 @@ class DDPClient: # Increment sequence number self._sequence = (self._sequence + 1) % 256 - # Build and send packet (set PUSH on last packet) + # Build and send packet (no PUSH flag — WLED 0.15.x + # handles DDP without it; adding PUSH broke rendering) packet = self._build_ddp_packet( chunk, offset=start, - sequence=self._sequence, push=is_last, + sequence=self._sequence, push=False, ) self._transport.sendto(packet) diff --git a/server/src/wled_controller/core/processor_manager.py b/server/src/wled_controller/core/processor_manager.py index 28c8345..c962540 100644 --- a/server/src/wled_controller/core/processor_manager.py +++ b/server/src/wled_controller/core/processor_manager.py @@ -1,6 +1,7 @@ """Processing manager for coordinating screen capture and WLED updates.""" import asyncio +import concurrent.futures import time from dataclasses import dataclass, field from datetime import datetime @@ -115,6 +116,10 @@ class ProcessorState: static_image: Optional[np.ndarray] = None image_pool: Optional[ImagePool] = None filter_instances: Optional[List[PostprocessingFilter]] = None + # Dedicated single-thread executor for capture engine calls. + # Capture libraries (BetterCam, MSS, DXcam) use thread-local state, + # so all calls must run on the same thread. + capture_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None class ProcessorManager: @@ -416,11 +421,23 @@ class ProcessorManager: engine_type = state.resolved_engine_type or "mss" engine_config = state.resolved_engine_config or {} engine = EngineRegistry.create_engine(engine_type, engine_config) - engine.initialize() + + # Create a dedicated single-thread executor for capture calls. + # Capture libraries use thread-local state (DXGI contexts, GDI DCs) + # so initialize + capture + cleanup must all run on the same thread. + state.capture_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=1, thread_name_prefix=f"capture-{device_id}" + ) + loop = asyncio.get_event_loop() + await loop.run_in_executor(state.capture_executor, engine.initialize) + state.capture_engine = engine logger.info(f"Initialized capture engine for device {device_id}: {engine_type}") except Exception as e: logger.error(f"Failed to initialize capture engine for device {device_id}: {e}") + if state.capture_executor: + state.capture_executor.shutdown(wait=False) + state.capture_executor = None if state.wled_client: await state.wled_client.disconnect() raise RuntimeError(f"Failed to initialize capture engine: {e}") @@ -476,9 +493,20 @@ class ProcessorManager: await state.wled_client.close() state.wled_client = None - # Cleanup capture engine + # Cleanup capture engine on the same dedicated thread it was created on if state.capture_engine: - state.capture_engine.cleanup() + if state.capture_executor: + loop = asyncio.get_event_loop() + try: + await loop.run_in_executor( + state.capture_executor, state.capture_engine.cleanup + ) + except Exception as e: + logger.warning(f"Error cleaning up capture engine: {e}") + state.capture_executor.shutdown(wait=False) + state.capture_executor = None + else: + state.capture_engine.cleanup() state.capture_engine = None # Release cached static image @@ -524,6 +552,8 @@ class ProcessorManager: frame_time = 1.0 / target_fps fps_samples = [] + loop = asyncio.get_event_loop() + capture_executor = state.capture_executor # dedicated single-thread executor def _apply_filters(image): """Apply all postprocessing filters to the captured image.""" @@ -550,13 +580,16 @@ class ProcessorManager: image=state.static_image.copy(), width=w, height=h, display_index=-1 ) else: - capture = await asyncio.to_thread( + capture = await loop.run_in_executor( + capture_executor, state.capture_engine.capture_display, display_index ) # Skip processing if no new frame (screen unchanged) if capture is None: + if state.metrics.frames_processed == 0: + logger.info(f"Capture returned None for {device_id} (no new frame yet)") await asyncio.sleep(frame_time) continue @@ -585,6 +618,8 @@ class ProcessorManager: # Update metrics state.metrics.frames_processed += 1 + if state.metrics.frames_processed <= 3 or state.metrics.frames_processed % 100 == 0: + logger.info(f"Frame {state.metrics.frames_processed} sent for {device_id} ({len(led_colors)} LEDs, bri={brightness_value})") state.metrics.last_update = datetime.utcnow() state.previous_colors = led_colors diff --git a/server/src/wled_controller/core/wled_client.py b/server/src/wled_controller/core/wled_client.py index e1dedfc..8382182 100644 --- a/server/src/wled_controller/core/wled_client.py +++ b/server/src/wled_controller/core/wled_client.py @@ -107,14 +107,18 @@ class WLEDClient: self._ddp_client.set_buses(info.buses) await self._ddp_client.connect() - # Turn on the device and disable Audio Reactive mode for DDP + # Configure device for DDP streaming: + # - Turn on, set lor=0 (live data overrides effects), + # and disable Audio Reactive. + # - Do NOT set live — it's read-only and causes issues on WLED 0.15.x. + # DDP packets automatically enter realtime mode. try: await self._request("POST", "/json/state", json_data={ "on": True, - "live": True, + "lor": 0, "AudioReactive": {"on": False} }) - logger.debug("Turned on device and enabled live mode for DDP streaming") + logger.info("Configured device for DDP streaming") except Exception as e: logger.warning(f"Could not configure device for DDP: {e}")