Add auto-restart for crashed processing loops, remove sync clock badge

- Auto-restart: ProcessorManager detects fatal task crashes via done
  callback and restarts with exponential backoff (2s-30s, max 5 attempts
  in 5 min window). Manual stop disables auto-restart. Restart state
  exposed in target state API and via WebSocket events.
- Remove "Running"/"Paused" badge label from sync clock dashboard cards
  (pause/play button already conveys state).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-10 01:53:04 +03:00
parent 30fa107ef7
commit 954e37c2ca
3 changed files with 171 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
"""Processing manager — thin orchestrator for devices and target processors."""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
@@ -33,6 +34,22 @@ logger = get_logger(__name__)
DEFAULT_STATE_CHECK_INTERVAL = 30 # seconds between health checks
# Auto-restart constants
_RESTART_MAX_ATTEMPTS = 5 # max restarts within the window
_RESTART_WINDOW_SEC = 300 # 5 minutes — reset counter after stable period
_RESTART_BACKOFF_BASE = 2.0 # initial backoff seconds
_RESTART_BACKOFF_MAX = 30.0 # cap backoff at 30s
@dataclass
class _RestartState:
"""Per-target auto-restart tracking."""
attempts: int = 0
first_crash_time: float = 0.0
last_crash_time: float = 0.0
restart_task: Optional[asyncio.Task] = None
enabled: bool = True # disabled on manual stop
@dataclass
class DeviceState:
@@ -108,6 +125,7 @@ class ProcessorManager:
self._overlay_manager = OverlayManager()
self._event_queues: List[asyncio.Queue] = []
self._metrics_history = MetricsHistory(self)
self._restart_states: Dict[str, _RestartState] = {}
logger.info("Processor manager initialized")
@property
@@ -381,6 +399,10 @@ class ProcessorManager:
proc = self._processors[target_id]
if proc.is_running:
raise RuntimeError(f"Cannot remove target {target_id} while processing")
# Clean up restart state
rs = self._restart_states.pop(target_id, None)
if rs and rs.restart_task and not rs.restart_task.done():
rs.restart_task.cancel()
del self._processors[target_id]
logger.info(f"Unregistered target {target_id}")
@@ -444,6 +466,29 @@ class ProcessorManager:
await proc.start()
# Enable auto-restart and attach crash callback
rs = self._restart_states.get(target_id)
if rs:
# Cancel any pending restart task (e.g. if manually restarted before backoff expired)
if rs.restart_task and not rs.restart_task.done():
rs.restart_task.cancel()
rs.enabled = True
else:
rs = _RestartState()
self._restart_states[target_id] = rs
# Reset restart counter if previous crashes were long ago (stable period)
now = time.monotonic()
if rs.first_crash_time and (now - rs.first_crash_time) > _RESTART_WINDOW_SEC:
rs.attempts = 0
rs.first_crash_time = 0.0
# Attach done callback to detect crashes
if proc._task is not None:
proc._task.add_done_callback(
lambda task, tid=target_id: self._on_task_done(tid, task)
)
async def stop_processing(self, target_id: str):
"""Stop processing for a target (any type).
@@ -451,6 +496,14 @@ class ProcessorManager:
and no other targets are still actively processing on it, the device
is restored to its idle state (static color or pre-streaming snapshot).
"""
# Disable auto-restart before stopping (manual stop = intentional)
rs = self._restart_states.get(target_id)
if rs:
rs.enabled = False
if rs.restart_task and not rs.restart_task.done():
rs.restart_task.cancel()
rs.restart_task = None
proc = self._get_processor(target_id)
await proc.stop()
@@ -462,10 +515,17 @@ class ProcessorManager:
"""Get current processing state for a target (any type).
For WLED targets, device health info is merged in.
Auto-restart state is always included.
"""
proc = self._get_processor(target_id)
state = proc.get_state()
# Include auto-restart info
rs = self._restart_states.get(target_id)
if rs and rs.attempts > 0:
state["auto_restart_attempts"] = rs.attempts
state["auto_restart_exhausted"] = rs.attempts > _RESTART_MAX_ATTEMPTS
# Merge device health for device-aware targets
if proc.device_id is not None and proc.device_id in self._devices:
h = self._devices[proc.device_id].health
@@ -761,6 +821,109 @@ class ProcessorManager:
else:
logger.info(f"Auto-restore: {ds.device_type} device {device_id} dark (closed by processor)")
# ===== AUTO-RESTART =====
def _on_task_done(self, target_id: str, task: asyncio.Task) -> None:
"""Task done callback — detects crashes and schedules auto-restart."""
# Ignore graceful cancellation (manual stop)
if task.cancelled():
return
exc = task.exception()
if exc is None:
return # Clean exit (shouldn't happen, but harmless)
rs = self._restart_states.get(target_id)
if not rs or not rs.enabled:
return # Auto-restart disabled (manual stop was called)
now = time.monotonic()
# Reset counter if previous crash window expired
if rs.first_crash_time and (now - rs.first_crash_time) > _RESTART_WINDOW_SEC:
rs.attempts = 0
rs.first_crash_time = 0.0
rs.attempts += 1
rs.last_crash_time = now
if not rs.first_crash_time:
rs.first_crash_time = now
if rs.attempts > _RESTART_MAX_ATTEMPTS:
logger.error(
f"[AUTO-RESTART] Target {target_id} crashed {rs.attempts} times "
f"in {now - rs.first_crash_time:.0f}s — giving up"
)
self._fire_event({
"type": "state_change",
"target_id": target_id,
"processing": False,
"crashed": True,
"auto_restart_exhausted": True,
})
return
backoff = min(
_RESTART_BACKOFF_BASE * (2 ** (rs.attempts - 1)),
_RESTART_BACKOFF_MAX,
)
logger.warning(
f"[AUTO-RESTART] Target {target_id} crashed (attempt {rs.attempts}/"
f"{_RESTART_MAX_ATTEMPTS}), restarting in {backoff:.1f}s"
)
self._fire_event({
"type": "state_change",
"target_id": target_id,
"processing": False,
"crashed": True,
"auto_restart_in": backoff,
"auto_restart_attempt": rs.attempts,
})
# Schedule the restart (runs in the event loop)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
logger.error(f"[AUTO-RESTART] No running event loop for {target_id}")
return
rs.restart_task = loop.create_task(self._auto_restart(target_id, backoff))
async def _auto_restart(self, target_id: str, delay: float) -> None:
"""Wait for backoff delay, then restart the target processor."""
try:
await asyncio.sleep(delay)
except asyncio.CancelledError:
logger.info(f"[AUTO-RESTART] Restart cancelled for {target_id}")
return
rs = self._restart_states.get(target_id)
if not rs or not rs.enabled:
logger.info(f"[AUTO-RESTART] Restart aborted for {target_id} (disabled)")
return
proc = self._processors.get(target_id)
if proc is None:
logger.warning(f"[AUTO-RESTART] Target {target_id} no longer registered")
return
if proc.is_running:
logger.info(f"[AUTO-RESTART] Target {target_id} already running, skipping")
return
logger.info(f"[AUTO-RESTART] Restarting target {target_id} (attempt {rs.attempts})")
try:
await self.start_processing(target_id)
except Exception as e:
logger.error(f"[AUTO-RESTART] Failed to restart {target_id}: {e}")
self._fire_event({
"type": "state_change",
"target_id": target_id,
"processing": False,
"crashed": True,
"auto_restart_error": str(e),
})
# ===== LIFECYCLE =====
async def stop_all(self):
@@ -768,6 +931,12 @@ class ProcessorManager:
await self._metrics_history.stop()
await self.stop_health_monitoring()
# Cancel all pending auto-restart tasks
for rs in self._restart_states.values():
rs.enabled = False
if rs.restart_task and not rs.restart_task.done():
rs.restart_task.cancel()
# Stop all processors
for target_id, proc in list(self._processors.items()):
if proc.is_running: