Optimize processing pipeline and fix multi-target crash
Performance optimizations across 5 phases: - Saturation filter: float32 → int32 integer math (~2-3x faster) - Frame interpolation: pre-allocated uint16 scratch buffers - Color correction: single-pass cv2.LUT instead of 3 channel lookups - DDP: numpy vectorized color reorder + pre-allocated RGBW buffer - Calibration boundaries: vectorized with np.arange + np.maximum - wled_client: vectorized pixel validation and HTTP pixel list - _fit_to_device: cached linspace arrays (now per-instance) - Diagnostic lists: bounded deque(maxlen=...) instead of unbounded list - Health checks: adaptive intervals (10s streaming, 60s idle) - Profile engine: poll interval 3s → 1s Bug fixes: - Fix deque slicing crash killing targets when multiple run in parallel (deque doesn't support [-1:] or [:5] slice syntax unlike list) - Fix numpy array boolean ambiguity in send_pixels() validation - Persist fatal processing loop errors to metrics for API visibility - Move _fit_to_device cache from class-level to instance-level to prevent cross-target cache thrashing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -828,18 +828,31 @@ class ProcessorManager:
|
||||
for p in self._processors.values()
|
||||
)
|
||||
|
||||
def _is_device_streaming(self, device_id: str) -> bool:
|
||||
"""Check if any running processor targets this device."""
|
||||
for proc in self._processors.values():
|
||||
if getattr(proc, 'device_id', None) == device_id and proc.is_running:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def _health_check_loop(self, device_id: str):
|
||||
"""Background loop that periodically checks a device."""
|
||||
"""Background loop that periodically checks a device.
|
||||
|
||||
Uses adaptive intervals: 10s for actively streaming devices,
|
||||
60s for idle devices, to balance responsiveness with overhead.
|
||||
"""
|
||||
state = self._devices.get(device_id)
|
||||
if not state:
|
||||
return
|
||||
|
||||
check_interval = DEFAULT_STATE_CHECK_INTERVAL
|
||||
ACTIVE_INTERVAL = 10 # streaming devices — faster detection
|
||||
IDLE_INTERVAL = 60 # idle devices — less overhead
|
||||
|
||||
try:
|
||||
while self._health_monitoring_active:
|
||||
await self._check_device_health(device_id)
|
||||
await asyncio.sleep(check_interval)
|
||||
interval = ACTIVE_INTERVAL if self._is_device_streaming(device_id) else IDLE_INTERVAL
|
||||
await asyncio.sleep(interval)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
|
||||
@@ -67,6 +67,11 @@ class WledTargetProcessor(TargetProcessor):
|
||||
|
||||
self._resolved_display_index: Optional[int] = None
|
||||
|
||||
# Fit-to-device linspace cache (per-instance to avoid cross-target thrash)
|
||||
self._fit_cache_key: tuple = (0, 0)
|
||||
self._fit_cache_src: Optional[np.ndarray] = None
|
||||
self._fit_cache_dst: Optional[np.ndarray] = None
|
||||
|
||||
# LED preview WebSocket clients
|
||||
self._preview_clients: list = []
|
||||
self._last_preview_colors: np.ndarray | None = None
|
||||
@@ -461,7 +466,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
self._preview_clients.append(ws)
|
||||
# Send last known frame immediately so late joiners see current state
|
||||
if self._last_preview_colors is not None:
|
||||
data = bytes([self._last_preview_brightness]) + self._last_preview_colors.astype(np.uint8).tobytes()
|
||||
data = bytes([self._last_preview_brightness]) + self._last_preview_colors.tobytes()
|
||||
asyncio.ensure_future(self._send_preview_to(ws, data))
|
||||
|
||||
@staticmethod
|
||||
@@ -484,7 +489,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
if not self._preview_clients:
|
||||
return
|
||||
|
||||
data = bytes([brightness]) + colors.astype(np.uint8).tobytes()
|
||||
data = bytes([brightness]) + colors.tobytes()
|
||||
|
||||
async def _send_safe(ws):
|
||||
try:
|
||||
@@ -501,16 +506,18 @@ class WledTargetProcessor(TargetProcessor):
|
||||
|
||||
# ----- Private: processing loop -----
|
||||
|
||||
@staticmethod
|
||||
def _fit_to_device(colors: np.ndarray, device_led_count: int) -> np.ndarray:
|
||||
def _fit_to_device(self, colors: np.ndarray, device_led_count: int) -> np.ndarray:
|
||||
"""Resample colors to match the target LED count."""
|
||||
n = len(colors)
|
||||
if n == device_led_count or device_led_count <= 0:
|
||||
return colors
|
||||
src_x = np.linspace(0, 1, n)
|
||||
dst_x = np.linspace(0, 1, device_led_count)
|
||||
key = (n, device_led_count)
|
||||
if self._fit_cache_key != key:
|
||||
self._fit_cache_src = np.linspace(0, 1, n)
|
||||
self._fit_cache_dst = np.linspace(0, 1, device_led_count)
|
||||
self._fit_cache_key = key
|
||||
result = np.column_stack([
|
||||
np.interp(dst_x, src_x, colors[:, ch]).astype(np.uint8)
|
||||
np.interp(self._fit_cache_dst, self._fit_cache_src, colors[:, ch]).astype(np.uint8)
|
||||
for ch in range(colors.shape[1])
|
||||
])
|
||||
return result
|
||||
@@ -568,9 +575,9 @@ class WledTargetProcessor(TargetProcessor):
|
||||
# --- Timing diagnostics ---
|
||||
_diag_interval = 5.0
|
||||
_diag_next_report = time.perf_counter() + _diag_interval
|
||||
_diag_sleep_jitters: list = []
|
||||
_diag_slow_iters: list = []
|
||||
_diag_iter_times: list = []
|
||||
_diag_sleep_jitters: collections.deque = collections.deque(maxlen=300)
|
||||
_diag_slow_iters: collections.deque = collections.deque(maxlen=50)
|
||||
_diag_iter_times: collections.deque = collections.deque(maxlen=300)
|
||||
_diag_device_info: Optional[DeviceInfo] = None
|
||||
_diag_device_info_age = 0
|
||||
|
||||
@@ -817,7 +824,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
iter_ms = (iter_end - loop_start) * 1000
|
||||
_diag_iter_times.append(iter_ms)
|
||||
if iter_ms > frame_time * 1500:
|
||||
if "sleep_jitter" not in [s[1] for s in _diag_slow_iters[-1:]]:
|
||||
if not _diag_slow_iters or _diag_slow_iters[-1][1] != "sleep_jitter":
|
||||
_diag_slow_iters.append((iter_ms, "slow_iter"))
|
||||
|
||||
# Periodic diagnostics report
|
||||
@@ -845,7 +852,7 @@ class WledTargetProcessor(TargetProcessor):
|
||||
logger.warning(
|
||||
f"[DIAG] {self._target_id} slow iterations: "
|
||||
f"{len(_diag_slow_iters)} in last {_diag_interval}s — "
|
||||
f"{_diag_slow_iters[:5]}"
|
||||
f"{list(_diag_slow_iters)[:5]}"
|
||||
)
|
||||
_diag_sleep_jitters.clear()
|
||||
_diag_slow_iters.clear()
|
||||
@@ -855,7 +862,9 @@ class WledTargetProcessor(TargetProcessor):
|
||||
logger.info(f"Processing loop cancelled for target {self._target_id}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error in processing loop for target {self._target_id}: {e}")
|
||||
logger.error(f"Fatal error in processing loop for target {self._target_id}: {e}", exc_info=True)
|
||||
self._metrics.last_error = f"FATAL: {e}"
|
||||
self._metrics.errors_count += 1
|
||||
self._is_running = False
|
||||
raise
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user