Add sync clock entity for synchronized animation timing

Introduces Synchronization Clocks — shared, controllable time bases
that CSS sources can optionally reference for synchronized animation.

Backend:
- New SyncClock dataclass, JSON store, Pydantic schemas, REST API
- Runtime clock with thread-safe pause/resume/reset and speed control
- Ref-counted runtime pool with eager creation for API control
- clock_id field on all ColorStripSource types
- Stream integration: clock time/speed replaces source-local values
- Paused clock skips rendering (saves CPU + stops frame pushes)
- Included in backup/restore via STORE_MAP

Frontend:
- Sync Clocks tab in Streams section with cards and controls
- Clock dropdown in CSS editor (hidden speed slider when clock set)
- Clock crosslink badge on CSS source cards (replaces speed badge)
- Targets tab uses DataCache for picture/audio sources and sync clocks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 21:46:55 +03:00
parent 52ee4bdeb6
commit aa1e4a6afc
32 changed files with 1255 additions and 58 deletions

View File

@@ -567,6 +567,7 @@ class StaticColorStripStream(ColorStripStream):
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
self._clock = None # optional SyncClockRuntime
self._update_from_source(source)
def _update_from_source(self, source) -> None:
@@ -651,6 +652,10 @@ class StaticColorStripStream(ColorStripStream):
self._rebuild_colors()
logger.info("StaticColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
"""Set or clear the sync clock runtime. Thread-safe (read atomically by loop)."""
self._clock = clock
def _animate_loop(self) -> None:
"""Background thread: compute animated colors at target fps when animation is active.
@@ -666,14 +671,22 @@ class StaticColorStripStream(ColorStripStream):
try:
with high_resolution_timer():
while self._running:
loop_start = time.perf_counter()
wall_start = time.perf_counter()
frame_time = 1.0 / self._fps
try:
anim = self._animation
if anim and anim.get("enabled"):
speed = float(anim.get("speed", 1.0))
clock = self._clock
if clock:
if not clock.is_running:
time.sleep(0.1)
continue
speed = clock.speed
t = clock.get_time()
else:
speed = float(anim.get("speed", 1.0))
t = wall_start
atype = anim.get("type", "breathing")
t = loop_start
n = self._led_count
if n != _pool_n:
@@ -748,7 +761,7 @@ class StaticColorStripStream(ColorStripStream):
except Exception as e:
logger.error(f"StaticColorStripStream animation error: {e}")
elapsed = time.perf_counter() - loop_start
elapsed = time.perf_counter() - wall_start
sleep_target = frame_time if anim and anim.get("enabled") else 0.25
time.sleep(max(sleep_target - elapsed, 0.001))
except Exception as e:
@@ -773,6 +786,7 @@ class ColorCycleColorStripStream(ColorStripStream):
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
self._clock = None # optional SyncClockRuntime
self._update_from_source(source)
def _update_from_source(self, source) -> None:
@@ -850,6 +864,10 @@ class ColorCycleColorStripStream(ColorStripStream):
self._rebuild_colors()
logger.info("ColorCycleColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
"""Set or clear the sync clock runtime. Thread-safe (read atomically by loop)."""
self._clock = clock
def _animate_loop(self) -> None:
"""Background thread: interpolate between colors at target fps.
@@ -862,11 +880,20 @@ class ColorCycleColorStripStream(ColorStripStream):
try:
with high_resolution_timer():
while self._running:
loop_start = time.perf_counter()
wall_start = time.perf_counter()
frame_time = 1.0 / self._fps
try:
color_list = self._color_list
speed = self._cycle_speed
clock = self._clock
if clock:
if not clock.is_running:
time.sleep(0.1)
continue
speed = clock.speed
t = clock.get_time()
else:
speed = self._cycle_speed
t = wall_start
n = self._led_count
num = len(color_list)
if num >= 2:
@@ -879,7 +906,7 @@ class ColorCycleColorStripStream(ColorStripStream):
_use_a = not _use_a
# 0.05 factor → one full cycle every 20s at speed=1.0
cycle_pos = (speed * loop_start * 0.05) % 1.0
cycle_pos = (speed * t * 0.05) % 1.0
seg = cycle_pos * num
idx = int(seg) % num
t_i = seg - int(seg)
@@ -894,7 +921,7 @@ class ColorCycleColorStripStream(ColorStripStream):
self._colors = buf
except Exception as e:
logger.error(f"ColorCycleColorStripStream animation error: {e}")
elapsed = time.perf_counter() - loop_start
elapsed = time.perf_counter() - wall_start
time.sleep(max(frame_time - elapsed, 0.001))
except Exception as e:
logger.error(f"Fatal ColorCycleColorStripStream loop error: {e}", exc_info=True)
@@ -919,6 +946,7 @@ class GradientColorStripStream(ColorStripStream):
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
self._clock = None # optional SyncClockRuntime
self._update_from_source(source)
def _update_from_source(self, source) -> None:
@@ -1002,6 +1030,10 @@ class GradientColorStripStream(ColorStripStream):
self._rebuild_colors()
logger.info("GradientColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
"""Set or clear the sync clock runtime. Thread-safe (read atomically by loop)."""
self._clock = clock
def _animate_loop(self) -> None:
"""Background thread: apply animation effects at target fps when animation is active.
@@ -1022,14 +1054,22 @@ class GradientColorStripStream(ColorStripStream):
try:
with high_resolution_timer():
while self._running:
loop_start = time.perf_counter()
wall_start = time.perf_counter()
frame_time = 1.0 / self._fps
try:
anim = self._animation
if anim and anim.get("enabled"):
speed = float(anim.get("speed", 1.0))
clock = self._clock
if clock:
if not clock.is_running:
time.sleep(0.1)
continue
speed = clock.speed
t = clock.get_time()
else:
speed = float(anim.get("speed", 1.0))
t = wall_start
atype = anim.get("type", "breathing")
t = loop_start
n = self._led_count
stops = self._stops
colors = None
@@ -1147,7 +1187,7 @@ class GradientColorStripStream(ColorStripStream):
except Exception as e:
logger.error(f"GradientColorStripStream animation error: {e}")
elapsed = time.perf_counter() - loop_start
elapsed = time.perf_counter() - wall_start
sleep_target = frame_time if anim and anim.get("enabled") else 0.25
time.sleep(max(sleep_target - elapsed, 0.001))
except Exception as e:

View File

@@ -58,21 +58,46 @@ class ColorStripStreamManager:
keyed by ``{css_id}:{consumer_id}``.
"""
def __init__(self, color_strip_store, live_stream_manager, audio_capture_manager=None, audio_source_store=None, audio_template_store=None):
def __init__(self, color_strip_store, live_stream_manager, audio_capture_manager=None, audio_source_store=None, audio_template_store=None, sync_clock_manager=None):
"""
Args:
color_strip_store: ColorStripStore for resolving source configs
live_stream_manager: LiveStreamManager for acquiring picture streams
audio_capture_manager: AudioCaptureManager for audio-reactive sources
audio_source_store: AudioSourceStore for resolving audio source chains
sync_clock_manager: SyncClockManager for acquiring clock runtimes
"""
self._color_strip_store = color_strip_store
self._live_stream_manager = live_stream_manager
self._audio_capture_manager = audio_capture_manager
self._audio_source_store = audio_source_store
self._audio_template_store = audio_template_store
self._sync_clock_manager = sync_clock_manager
self._streams: Dict[str, _ColorStripEntry] = {}
def _inject_clock(self, css_stream, source) -> None:
"""Inject a SyncClockRuntime into the stream if source has clock_id."""
clock_id = getattr(source, "clock_id", None)
if clock_id and self._sync_clock_manager and hasattr(css_stream, "set_clock"):
try:
clock_rt = self._sync_clock_manager.acquire(clock_id)
css_stream.set_clock(clock_rt)
logger.debug(f"Injected clock {clock_id} into stream for {source.id}")
except Exception as e:
logger.warning(f"Could not inject clock {clock_id}: {e}")
def _release_clock(self, source_id: str, stream) -> None:
"""Release the clock runtime acquired for a stream."""
if not self._sync_clock_manager:
return
try:
source = self._color_strip_store.get_source(source_id)
clock_id = getattr(source, "clock_id", None)
if clock_id:
self._sync_clock_manager.release(clock_id)
except Exception:
pass # source may have been deleted already
def _resolve_key(self, css_id: str, consumer_id: str) -> str:
"""Resolve internal registry key for a (css_id, consumer_id) pair.
@@ -123,6 +148,8 @@ class ColorStripStreamManager:
f"Unsupported color strip source type '{source.source_type}' for {css_id}"
)
css_stream = stream_cls(source)
# Inject sync clock runtime if source references a clock
self._inject_clock(css_stream, source)
css_stream.start()
key = f"{css_id}:{consumer_id}" if consumer_id else css_id
self._streams[key] = _ColorStripEntry(
@@ -196,6 +223,10 @@ class ColorStripStreamManager:
except Exception as e:
logger.error(f"Error stopping color strip stream {key}: {e}")
# Release clock runtime if acquired
source_id = key.split(":")[0] if ":" in key else key
self._release_clock(source_id, entry.stream)
picture_source_id = entry.picture_source_id
del self._streams[key]
logger.info(f"Removed color strip stream {key}")
@@ -227,6 +258,25 @@ class ColorStripStreamManager:
for key in matching_keys:
entry = self._streams[key]
entry.stream.update_source(new_source)
# Hot-swap clock if clock_id changed
if hasattr(entry.stream, "set_clock") and self._sync_clock_manager:
new_clock_id = getattr(new_source, "clock_id", None)
old_clock = getattr(entry.stream, "_clock", None)
if new_clock_id:
try:
clock_rt = self._sync_clock_manager.acquire(new_clock_id)
entry.stream.set_clock(clock_rt)
# Release old clock if different
if old_clock:
# Find the old clock_id (best-effort)
source_id = key.split(":")[0] if ":" in key else key
self._release_clock(source_id, entry.stream)
except Exception as e:
logger.warning(f"Could not hot-swap clock {new_clock_id}: {e}")
elif old_clock:
entry.stream.set_clock(None)
source_id = key.split(":")[0] if ":" in key else key
self._release_clock(source_id, entry.stream)
# Track picture_source_id change for future reference counting
from wled_controller.storage.color_strip_source import PictureColorStripSource

View File

@@ -182,6 +182,8 @@ class EffectColorStripStream(ColorStripStream):
self._running = False
self._thread: Optional[threading.Thread] = None
self._fps = 30
self._clock = None # optional SyncClockRuntime
self._effective_speed = 1.0 # resolved speed (from clock or source)
self._noise = _ValueNoise1D(seed=42)
# Fire state — allocated lazily in render loop
self._heat: Optional[np.ndarray] = None
@@ -268,6 +270,10 @@ class EffectColorStripStream(ColorStripStream):
self._led_count = prev_led_count
logger.info("EffectColorStripStream params updated in-place")
def set_clock(self, clock) -> None:
"""Set or clear the sync clock runtime. Thread-safe (read atomically by loop)."""
self._clock = clock
# ── Main animation loop ──────────────────────────────────────────
def _animate_loop(self) -> None:
@@ -287,9 +293,22 @@ class EffectColorStripStream(ColorStripStream):
try:
with high_resolution_timer():
while self._running:
loop_start = time.perf_counter()
wall_start = time.perf_counter()
frame_time = 1.0 / self._fps
try:
# Resolve animation time and speed from clock or local
clock = self._clock
if clock:
if not clock.is_running:
# Clock paused — output frozen, sleep and skip render
time.sleep(0.1)
continue
anim_time = clock.get_time()
self._effective_speed = clock.speed
else:
anim_time = wall_start
self._effective_speed = self._speed
n = self._led_count
if n != _pool_n:
_pool_n = n
@@ -310,14 +329,14 @@ class EffectColorStripStream(ColorStripStream):
_use_a = not _use_a
render_fn = renderers.get(self._effect_type, self._render_fire)
render_fn(buf, n, loop_start)
render_fn(buf, n, anim_time)
with self._colors_lock:
self._colors = buf
except Exception as e:
logger.error(f"EffectColorStripStream render error: {e}")
elapsed = time.perf_counter() - loop_start
elapsed = time.perf_counter() - wall_start
time.sleep(max(frame_time - elapsed, 0.001))
except Exception as e:
logger.error(f"Fatal EffectColorStripStream loop error: {e}", exc_info=True)
@@ -332,7 +351,7 @@ class EffectColorStripStream(ColorStripStream):
A 1-D heat array cools, diffuses upward, and receives random sparks
at the bottom. Heat values are mapped to the palette LUT.
"""
speed = self._speed
speed = self._effective_speed
intensity = self._intensity
lut = self._palette_lut
@@ -378,7 +397,7 @@ class EffectColorStripStream(ColorStripStream):
def _render_meteor(self, buf: np.ndarray, n: int, t: float) -> None:
"""Bright meteor head with exponential-decay trail."""
speed = self._speed
speed = self._effective_speed
intensity = self._intensity
color = self._color
mirror = self._mirror
@@ -443,7 +462,7 @@ class EffectColorStripStream(ColorStripStream):
def _render_plasma(self, buf: np.ndarray, n: int, t: float) -> None:
"""Overlapping sine waves creating colorful plasma patterns."""
speed = self._speed
speed = self._effective_speed
scale = self._scale
lut = self._palette_lut
@@ -470,7 +489,7 @@ class EffectColorStripStream(ColorStripStream):
def _render_noise(self, buf: np.ndarray, n: int, t: float) -> None:
"""Smooth scrolling fractal noise mapped to a color palette."""
speed = self._speed
speed = self._effective_speed
scale = self._scale
lut = self._palette_lut
@@ -488,7 +507,7 @@ class EffectColorStripStream(ColorStripStream):
def _render_aurora(self, buf: np.ndarray, n: int, t: float) -> None:
"""Layered noise bands simulating aurora borealis."""
speed = self._speed
speed = self._effective_speed
scale = self._scale
intensity = self._intensity
lut = self._palette_lut

View File

@@ -69,7 +69,7 @@ class ProcessorManager:
Targets are registered for processing via polymorphic TargetProcessor subclasses.
"""
def __init__(self, picture_source_store=None, capture_template_store=None, pp_template_store=None, pattern_template_store=None, device_store=None, color_strip_store=None, audio_source_store=None, value_source_store=None, audio_template_store=None):
def __init__(self, picture_source_store=None, capture_template_store=None, pp_template_store=None, pattern_template_store=None, device_store=None, color_strip_store=None, audio_source_store=None, value_source_store=None, audio_template_store=None, sync_clock_manager=None):
"""Initialize processor manager."""
self._devices: Dict[str, DeviceState] = {}
self._processors: Dict[str, TargetProcessor] = {}
@@ -89,12 +89,14 @@ class ProcessorManager:
picture_source_store, capture_template_store, pp_template_store
)
self._audio_capture_manager = AudioCaptureManager()
self._sync_clock_manager = sync_clock_manager
self._color_strip_stream_manager = ColorStripStreamManager(
color_strip_store=color_strip_store,
live_stream_manager=self._live_stream_manager,
audio_capture_manager=self._audio_capture_manager,
audio_source_store=audio_source_store,
audio_template_store=audio_template_store,
sync_clock_manager=sync_clock_manager,
)
self._value_stream_manager = ValueStreamManager(
value_source_store=value_source_store,

View File

@@ -0,0 +1,99 @@
"""Ref-counted pool of SyncClockRuntime instances.
Runtimes are created lazily when a stream first acquires a clock and
destroyed when the last consumer releases it.
"""
from typing import Dict, Optional
from wled_controller.core.processing.sync_clock_runtime import SyncClockRuntime
from wled_controller.storage.sync_clock_store import SyncClockStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class SyncClockManager:
"""Manages SyncClockRuntime instances with reference counting."""
def __init__(self, store: SyncClockStore) -> None:
self._store = store
self._runtimes: Dict[str, SyncClockRuntime] = {}
self._ref_counts: Dict[str, int] = {}
# ── Acquire / release (used by stream manager) ────────────────
def acquire(self, clock_id: str) -> SyncClockRuntime:
"""Get or create a runtime for *clock_id* (ref-counted)."""
if clock_id in self._runtimes:
self._ref_counts[clock_id] += 1
logger.debug(f"SyncClock {clock_id} ref++ → {self._ref_counts[clock_id]}")
return self._runtimes[clock_id]
clock_cfg = self._store.get_clock(clock_id) # raises ValueError if missing
rt = SyncClockRuntime(speed=clock_cfg.speed)
self._runtimes[clock_id] = rt
self._ref_counts[clock_id] = 1
logger.info(f"SyncClock runtime created: {clock_id} (speed={clock_cfg.speed})")
return rt
def release(self, clock_id: str) -> None:
"""Decrement ref count; destroy runtime when it reaches zero."""
if clock_id not in self._ref_counts:
return
self._ref_counts[clock_id] -= 1
logger.debug(f"SyncClock {clock_id} ref-- → {self._ref_counts[clock_id]}")
if self._ref_counts[clock_id] <= 0:
del self._runtimes[clock_id]
del self._ref_counts[clock_id]
logger.info(f"SyncClock runtime destroyed: {clock_id}")
def release_all_for(self, clock_id: str) -> None:
"""Force-release all references to *clock_id* (used on delete)."""
self._runtimes.pop(clock_id, None)
self._ref_counts.pop(clock_id, None)
def release_all(self) -> None:
"""Destroy all runtimes (shutdown)."""
self._runtimes.clear()
self._ref_counts.clear()
# ── Lookup (no ref counting) ──────────────────────────────────
def get_runtime(self, clock_id: str) -> Optional[SyncClockRuntime]:
"""Return an existing runtime or *None* (does not create one)."""
return self._runtimes.get(clock_id)
def _ensure_runtime(self, clock_id: str) -> SyncClockRuntime:
"""Return existing runtime or create a zero-ref one for API control."""
rt = self._runtimes.get(clock_id)
if rt:
return rt
clock_cfg = self._store.get_clock(clock_id)
rt = SyncClockRuntime(speed=clock_cfg.speed)
self._runtimes[clock_id] = rt
self._ref_counts[clock_id] = 0
logger.info(f"SyncClock runtime created (API): {clock_id} (speed={clock_cfg.speed})")
return rt
# ── Delegated control ─────────────────────────────────────────
def update_speed(self, clock_id: str, speed: float) -> None:
rt = self._ensure_runtime(clock_id)
rt.speed = speed
logger.info(f"SyncClock {clock_id} speed → {speed}")
def pause(self, clock_id: str) -> None:
rt = self._ensure_runtime(clock_id)
rt.pause()
logger.info(f"SyncClock {clock_id} paused")
def resume(self, clock_id: str) -> None:
rt = self._ensure_runtime(clock_id)
rt.resume()
logger.info(f"SyncClock {clock_id} resumed")
def reset(self, clock_id: str) -> None:
rt = self._ensure_runtime(clock_id)
rt.reset()
logger.info(f"SyncClock {clock_id} reset")

View File

@@ -0,0 +1,72 @@
"""Thread-safe synchronization clock runtime.
Provides a pause-aware elapsed-time counter and a shared speed multiplier
that animation streams read every frame.
"""
import threading
import time
class SyncClockRuntime:
"""In-memory clock instance used by animation streams for synchronized timing.
``get_time()`` returns pause-aware **real** elapsed seconds (NOT
speed-scaled). Streams combine this with ``speed`` themselves so that
per-frame physics effects (fire cooling, sparkle density) also scale
with clock speed uniformly.
"""
__slots__ = ("_speed", "_running", "_epoch", "_offset", "_lock")
def __init__(self, speed: float = 1.0) -> None:
self._speed: float = speed
self._running: bool = True
self._epoch: float = time.perf_counter()
self._offset: float = 0.0
self._lock = threading.Lock()
# ── Speed ──────────────────────────────────────────────────────
@property
def speed(self) -> float:
"""Current speed multiplier (lock-free read; CPython float is atomic)."""
return self._speed
@speed.setter
def speed(self, value: float) -> None:
self._speed = value
# ── Time ───────────────────────────────────────────────────────
def get_time(self) -> float:
"""Pause-aware elapsed seconds since creation/last reset.
Returns *real* (wall-clock) elapsed time, not speed-scaled.
"""
if not self._running:
return self._offset
return self._offset + (time.perf_counter() - self._epoch)
# ── Control ────────────────────────────────────────────────────
def pause(self) -> None:
with self._lock:
if self._running:
self._offset += time.perf_counter() - self._epoch
self._running = False
def resume(self) -> None:
with self._lock:
if not self._running:
self._epoch = time.perf_counter()
self._running = True
def reset(self) -> None:
with self._lock:
self._offset = 0.0
self._epoch = time.perf_counter()
@property
def is_running(self) -> bool:
return self._running