From 73947eb6cb5569e394a6079347ccb468b904a306 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Tue, 24 Mar 2026 14:51:27 +0300 Subject: [PATCH] refactor: replace type-dispatch if/elif chains with registry patterns and handler maps Backend: add registry dicts (_CONDITION_MAP, _VALUE_SOURCE_MAP, _PICTURE_SOURCE_MAP) and per-subclass from_dict() methods to eliminate ~300 lines of if/elif in factory functions. Convert automation engine dispatch (condition eval, match_mode, match_type, deactivation_mode) to dict-based lookup. Frontend: extract CSS_CARD_RENDERERS, CSS_SECTION_MAP, CSS_TYPE_SETUP, CONDITION_PILL_RENDERERS, and PICTURE_SOURCE_CARD_RENDERERS handler maps to replace scattered type-check chains in color-strips.ts, automations.ts, and streams.ts. --- .../wled_controller/api/routes/automations.py | 43 +- .../core/automations/automation_engine.py | 145 ++++--- .../static/js/features/automations.ts | 54 ++- .../static/js/features/color-strips.ts | 249 ++++++----- .../static/js/features/streams.ts | 91 ++-- .../src/wled_controller/storage/automation.py | 37 +- .../storage/color_strip_source.py | 405 +++++++++--------- .../wled_controller/storage/picture_source.py | 138 +++--- .../wled_controller/storage/value_source.py | 186 ++++---- 9 files changed, 714 insertions(+), 634 deletions(-) diff --git a/server/src/wled_controller/api/routes/automations.py b/server/src/wled_controller/api/routes/automations.py index c9bfcac..5a36d68 100644 --- a/server/src/wled_controller/api/routes/automations.py +++ b/server/src/wled_controller/api/routes/automations.py @@ -42,40 +42,37 @@ router = APIRouter() # ===== Helpers ===== def _condition_from_schema(s: ConditionSchema) -> Condition: - if s.condition_type == "always": - return AlwaysCondition() - if s.condition_type == "application": - return ApplicationCondition( + _SCHEMA_TO_CONDITION = { + "always": lambda: AlwaysCondition(), + "application": lambda: ApplicationCondition( apps=s.apps or [], match_type=s.match_type or "running", - ) - if s.condition_type == "time_of_day": - return TimeOfDayCondition( + ), + "time_of_day": lambda: TimeOfDayCondition( start_time=s.start_time or "00:00", end_time=s.end_time or "23:59", - ) - if s.condition_type == "system_idle": - return SystemIdleCondition( + ), + "system_idle": lambda: SystemIdleCondition( idle_minutes=s.idle_minutes if s.idle_minutes is not None else 5, when_idle=s.when_idle if s.when_idle is not None else True, - ) - if s.condition_type == "display_state": - return DisplayStateCondition( + ), + "display_state": lambda: DisplayStateCondition( state=s.state or "on", - ) - if s.condition_type == "mqtt": - return MQTTCondition( + ), + "mqtt": lambda: MQTTCondition( topic=s.topic or "", payload=s.payload or "", match_mode=s.match_mode or "exact", - ) - if s.condition_type == "webhook": - return WebhookCondition( + ), + "webhook": lambda: WebhookCondition( token=s.token or secrets.token_hex(16), - ) - if s.condition_type == "startup": - return StartupCondition() - raise ValueError(f"Unknown condition type: {s.condition_type}") + ), + "startup": lambda: StartupCondition(), + } + factory = _SCHEMA_TO_CONDITION.get(s.condition_type) + if factory is None: + raise ValueError(f"Unknown condition type: {s.condition_type}") + return factory() def _condition_to_schema(c: Condition) -> ConditionSchema: diff --git a/server/src/wled_controller/core/automations/automation_engine.py b/server/src/wled_controller/core/automations/automation_engine.py index 009bcc2..049d1e4 100644 --- a/server/src/wled_controller/core/automations/automation_engine.py +++ b/server/src/wled_controller/core/automations/automation_engine.py @@ -205,21 +205,20 @@ class AutomationEngine: fullscreen_procs: Set[str], idle_seconds: Optional[float], display_state: Optional[str], ) -> bool: - if isinstance(condition, (AlwaysCondition, StartupCondition)): - return True - if isinstance(condition, ApplicationCondition): - return self._evaluate_app_condition(condition, running_procs, topmost_proc, topmost_fullscreen, fullscreen_procs) - if isinstance(condition, TimeOfDayCondition): - return self._evaluate_time_of_day(condition) - if isinstance(condition, SystemIdleCondition): - return self._evaluate_idle(condition, idle_seconds) - if isinstance(condition, DisplayStateCondition): - return self._evaluate_display_state(condition, display_state) - if isinstance(condition, MQTTCondition): - return self._evaluate_mqtt(condition) - if isinstance(condition, WebhookCondition): - return self._webhook_states.get(condition.token, False) - return False + dispatch = { + AlwaysCondition: lambda c: True, + StartupCondition: lambda c: True, + ApplicationCondition: lambda c: self._evaluate_app_condition(c, running_procs, topmost_proc, topmost_fullscreen, fullscreen_procs), + TimeOfDayCondition: lambda c: self._evaluate_time_of_day(c), + SystemIdleCondition: lambda c: self._evaluate_idle(c, idle_seconds), + DisplayStateCondition: lambda c: self._evaluate_display_state(c, display_state), + MQTTCondition: lambda c: self._evaluate_mqtt(c), + WebhookCondition: lambda c: self._webhook_states.get(c.token, False), + } + handler = dispatch.get(type(condition)) + if handler is None: + return False + return handler(condition) @staticmethod def _evaluate_time_of_day(condition: TimeOfDayCondition) -> bool: @@ -253,16 +252,18 @@ class AutomationEngine: value = self._mqtt_service.get_last_value(condition.topic) if value is None: return False - if condition.match_mode == "exact": - return value == condition.payload - if condition.match_mode == "contains": - return condition.payload in value - if condition.match_mode == "regex": - try: - return bool(re.search(condition.payload, value)) - except re.error: - return False - return False + matchers = { + "exact": lambda: value == condition.payload, + "contains": lambda: condition.payload in value, + "regex": lambda: bool(re.search(condition.payload, value)), + } + matcher = matchers.get(condition.match_mode) + if matcher is None: + return False + try: + return matcher() + except re.error: + return False def _evaluate_app_condition( self, @@ -277,19 +278,21 @@ class AutomationEngine: apps_lower = [a.lower() for a in condition.apps] - if condition.match_type == "fullscreen": - return any(app in fullscreen_procs for app in apps_lower) - - if condition.match_type == "topmost_fullscreen": - if topmost_proc is None or not topmost_fullscreen: - return False - return any(app == topmost_proc for app in apps_lower) - - if condition.match_type == "topmost": - if topmost_proc is None: - return False - return any(app == topmost_proc for app in apps_lower) - + match_handlers = { + "fullscreen": lambda: any(app in fullscreen_procs for app in apps_lower), + "topmost_fullscreen": lambda: ( + topmost_proc is not None + and topmost_fullscreen + and any(app == topmost_proc for app in apps_lower) + ), + "topmost": lambda: ( + topmost_proc is not None + and any(app == topmost_proc for app in apps_lower) + ), + } + handler = match_handlers.get(condition.match_type) + if handler is not None: + return handler() # Default: "running" return any(app in running_procs for app in apps_lower) @@ -352,38 +355,10 @@ class AutomationEngine: deactivation_mode = automation.deactivation_mode if automation else "none" if deactivation_mode == "revert": - snapshot = self._pre_activation_snapshots.pop(automation_id, None) - if snapshot and self._target_store: - from wled_controller.core.scenes.scene_activator import apply_scene_state - status, errors = await apply_scene_state( - snapshot, self._target_store, self._manager, - ) - if errors: - logger.warning(f"Automation {automation_id} revert errors: {errors}") - else: - logger.info(f"Automation {automation_id} deactivated (reverted to previous state)") - else: - logger.warning(f"Automation {automation_id}: no snapshot available for revert") - + await self._deactivate_revert(automation_id) elif deactivation_mode == "fallback_scene": - fallback_id = automation.deactivation_scene_preset_id if automation else None - if fallback_id and self._scene_preset_store and self._target_store: - try: - fallback = self._scene_preset_store.get_preset(fallback_id) - from wled_controller.core.scenes.scene_activator import apply_scene_state - status, errors = await apply_scene_state( - fallback, self._target_store, self._manager, - ) - if errors: - logger.warning(f"Automation {automation_id} fallback errors: {errors}") - else: - logger.info(f"Automation {automation_id} deactivated (fallback scene '{fallback.name}' applied)") - except ValueError: - logger.warning(f"Automation {automation_id}: fallback scene {fallback_id} not found") - else: - logger.info(f"Automation {automation_id} deactivated (no fallback scene configured)") + await self._deactivate_fallback(automation_id, automation) else: - # "none" mode — just clear active state logger.info(f"Automation {automation_id} deactivated") self._last_deactivated[automation_id] = datetime.now(timezone.utc) @@ -391,6 +366,40 @@ class AutomationEngine: # Clean up any leftover snapshot self._pre_activation_snapshots.pop(automation_id, None) + async def _deactivate_revert(self, automation_id: str) -> None: + """Revert to pre-activation snapshot.""" + snapshot = self._pre_activation_snapshots.pop(automation_id, None) + if snapshot and self._target_store: + from wled_controller.core.scenes.scene_activator import apply_scene_state + status, errors = await apply_scene_state( + snapshot, self._target_store, self._manager, + ) + if errors: + logger.warning(f"Automation {automation_id} revert errors: {errors}") + else: + logger.info(f"Automation {automation_id} deactivated (reverted to previous state)") + else: + logger.warning(f"Automation {automation_id}: no snapshot available for revert") + + async def _deactivate_fallback(self, automation_id: str, automation) -> None: + """Activate fallback scene on deactivation.""" + fallback_id = automation.deactivation_scene_preset_id if automation else None + if fallback_id and self._scene_preset_store and self._target_store: + try: + fallback = self._scene_preset_store.get_preset(fallback_id) + from wled_controller.core.scenes.scene_activator import apply_scene_state + status, errors = await apply_scene_state( + fallback, self._target_store, self._manager, + ) + if errors: + logger.warning(f"Automation {automation_id} fallback errors: {errors}") + else: + logger.info(f"Automation {automation_id} deactivated (fallback scene '{fallback.name}' applied)") + except ValueError: + logger.warning(f"Automation {automation_id}: fallback scene {fallback_id} not found") + else: + logger.info(f"Automation {automation_id} deactivated (no fallback scene configured)") + def _fire_event(self, automation_id: str, action: str) -> None: try: self._manager.fire_event({ diff --git a/server/src/wled_controller/static/js/features/automations.ts b/server/src/wled_controller/static/js/features/automations.ts index daf324e..40158b3 100644 --- a/server/src/wled_controller/static/js/features/automations.ts +++ b/server/src/wled_controller/static/js/features/automations.ts @@ -206,6 +206,29 @@ function renderAutomations(automations: any, sceneMap: any) { } } +type ConditionPillRenderer = (c: any) => string; + +const CONDITION_PILL_RENDERERS: Record = { + always: (c) => `${ICON_OK} ${t('automations.condition.always')}`, + startup: (c) => `${ICON_START} ${t('automations.condition.startup')}`, + application: (c) => { + const apps = (c.apps || []).join(', '); + const matchLabel = t('automations.condition.application.match_type.' + (c.match_type || 'running')); + return `${t('automations.condition.application')}: ${apps} (${matchLabel})`; + }, + time_of_day: (c) => `${ICON_CLOCK} ${t('automations.condition.time_of_day')}: ${c.start_time || '00:00'} – ${c.end_time || '23:59'}`, + system_idle: (c) => { + const mode = c.when_idle !== false ? t('automations.condition.system_idle.when_idle') : t('automations.condition.system_idle.when_active'); + return `${ICON_TIMER} ${c.idle_minutes || 5}m (${mode})`; + }, + display_state: (c) => { + const stateLabel = t('automations.condition.display_state.' + (c.state || 'on')); + return `${ICON_MONITOR} ${t('automations.condition.display_state')}: ${stateLabel}`; + }, + mqtt: (c) => `${ICON_RADIO} ${t('automations.condition.mqtt')}: ${escapeHtml(c.topic || '')} = ${escapeHtml(c.payload || '*')}`, + webhook: (c) => `🔗 ${t('automations.condition.webhook')}`, +}; + function createAutomationCard(automation: Automation, sceneMap = new Map()) { const statusClass = !automation.enabled ? 'disabled' : automation.is_active ? 'active' : 'inactive'; const statusText = !automation.enabled ? t('automations.status.disabled') : automation.is_active ? t('automations.status.active') : t('automations.status.inactive'); @@ -215,35 +238,8 @@ function createAutomationCard(automation: Automation, sceneMap = new Map()) { condPills = `${t('automations.conditions.empty')}`; } else { const parts = automation.conditions.map(c => { - if (c.condition_type === 'always') { - return `${ICON_OK} ${t('automations.condition.always')}`; - } - if (c.condition_type === 'startup') { - return `${ICON_START} ${t('automations.condition.startup')}`; - } - if (c.condition_type === 'application') { - const apps = (c.apps || []).join(', '); - const matchLabel = t('automations.condition.application.match_type.' + (c.match_type || 'running')); - return `${t('automations.condition.application')}: ${apps} (${matchLabel})`; - } - if (c.condition_type === 'time_of_day') { - return `${ICON_CLOCK} ${t('automations.condition.time_of_day')}: ${c.start_time || '00:00'} – ${c.end_time || '23:59'}`; - } - if (c.condition_type === 'system_idle') { - const mode = c.when_idle !== false ? t('automations.condition.system_idle.when_idle') : t('automations.condition.system_idle.when_active'); - return `${ICON_TIMER} ${c.idle_minutes || 5}m (${mode})`; - } - if (c.condition_type === 'display_state') { - const stateLabel = t('automations.condition.display_state.' + (c.state || 'on')); - return `${ICON_MONITOR} ${t('automations.condition.display_state')}: ${stateLabel}`; - } - if (c.condition_type === 'mqtt') { - return `${ICON_RADIO} ${t('automations.condition.mqtt')}: ${escapeHtml(c.topic || '')} = ${escapeHtml(c.payload || '*')}`; - } - if (c.condition_type === 'webhook') { - return `🔗 ${t('automations.condition.webhook')}`; - } - return `${c.condition_type}`; + const renderer = CONDITION_PILL_RENDERERS[c.condition_type]; + return renderer ? renderer(c) : `${c.condition_type}`; }); const logicLabel = automation.condition_logic === 'and' ? t('automations.logic.and') : t('automations.logic.or'); condPills = parts.join(`${logicLabel}`); diff --git a/server/src/wled_controller/static/js/features/color-strips.ts b/server/src/wled_controller/static/js/features/color-strips.ts index b559867..33af6ff 100644 --- a/server/src/wled_controller/static/js/features/color-strips.ts +++ b/server/src/wled_controller/static/js/features/color-strips.ts @@ -158,46 +158,62 @@ function _ensureCSSTypeIconSelect() { /* ── Type-switch helper ───────────────────────────────────────── */ +const CSS_SECTION_MAP: Record = { + 'picture': 'css-editor-picture-section', + 'picture_advanced': 'css-editor-picture-section', + 'static': 'css-editor-static-section', + 'color_cycle': 'css-editor-color-cycle-section', + 'gradient': 'css-editor-gradient-section', + 'effect': 'css-editor-effect-section', + 'composite': 'css-editor-composite-section', + 'mapped': 'css-editor-mapped-section', + 'audio': 'css-editor-audio-section', + 'api_input': 'css-editor-api-input-section', + 'notification': 'css-editor-notification-section', + 'daylight': 'css-editor-daylight-section', + 'candlelight': 'css-editor-candlelight-section', + 'processed': 'css-editor-processed-section', +}; + +const CSS_ALL_SECTION_IDS = [...new Set(Object.values(CSS_SECTION_MAP))]; + +const CSS_TYPE_SETUP: Record void> = { + processed: () => _populateProcessedSelectors(), + effect: () => { _ensureEffectTypeIconSelect(); _ensureEffectPaletteIconSelect(); onEffectTypeChange(); }, + audio: () => { _ensureAudioVizIconSelect(); _ensureAudioPaletteIconSelect(); onAudioVizChange(); _loadAudioSources(); }, + gradient: () => { _ensureGradientPresetIconSelect(); _ensureGradientEasingIconSelect(); requestAnimationFrame(() => gradientRenderAll()); }, + notification: () => { ensureNotificationEffectIconSelect(); ensureNotificationFilterModeIconSelect(); }, + candlelight: () => _ensureCandleTypeIconSelect(), + composite: () => compositeRenderList(), + mapped: () => _mappedRenderList(), +}; + export function onCSSTypeChange() { const type = (document.getElementById('css-editor-type') as HTMLInputElement).value; // Sync icon-select trigger display if (_cssTypeIconSelect) _cssTypeIconSelect.setValue(type); - const isPictureType = type === 'picture' || type === 'picture_advanced'; - (document.getElementById('css-editor-picture-section') as HTMLElement).style.display = isPictureType ? '' : 'none'; + + // Hide all type-specific sections, then show the active one + CSS_ALL_SECTION_IDS.forEach(id => { + const el = document.getElementById(id); + if (el) el.style.display = 'none'; + }); + const activeSection = CSS_SECTION_MAP[type]; + if (activeSection) { + const el = document.getElementById(activeSection); + if (el) el.style.display = ''; + } + // Hide picture source dropdown for advanced (sources are per-line in calibration) const psGroup = document.getElementById('css-editor-picture-source-group') as HTMLElement | null; if (psGroup) psGroup.style.display = (type === 'picture') ? '' : 'none'; - (document.getElementById('css-editor-static-section') as HTMLElement).style.display = type === 'static' ? '' : 'none'; - (document.getElementById('css-editor-color-cycle-section') as HTMLElement).style.display = type === 'color_cycle' ? '' : 'none'; - (document.getElementById('css-editor-gradient-section') as HTMLElement).style.display = type === 'gradient' ? '' : 'none'; - (document.getElementById('css-editor-effect-section') as HTMLElement).style.display = type === 'effect' ? '' : 'none'; - (document.getElementById('css-editor-composite-section') as HTMLElement).style.display = type === 'composite' ? '' : 'none'; - (document.getElementById('css-editor-mapped-section') as HTMLElement).style.display = type === 'mapped' ? '' : 'none'; - (document.getElementById('css-editor-audio-section') as HTMLElement).style.display = type === 'audio' ? '' : 'none'; - (document.getElementById('css-editor-api-input-section') as HTMLElement).style.display = type === 'api_input' ? '' : 'none'; - (document.getElementById('css-editor-notification-section') as HTMLElement).style.display = type === 'notification' ? '' : 'none'; - (document.getElementById('css-editor-daylight-section') as HTMLElement).style.display = type === 'daylight' ? '' : 'none'; - (document.getElementById('css-editor-candlelight-section') as HTMLElement).style.display = type === 'candlelight' ? '' : 'none'; - (document.getElementById('css-editor-processed-section') as HTMLElement).style.display = type === 'processed' ? '' : 'none'; + const isPictureType = type === 'picture' || type === 'picture_advanced'; if (isPictureType) _ensureInterpolationIconSelect(); - if (type === 'processed') _populateProcessedSelectors(); - if (type === 'effect') { - _ensureEffectTypeIconSelect(); - _ensureEffectPaletteIconSelect(); - onEffectTypeChange(); - } - if (type === 'audio') { - _ensureAudioVizIconSelect(); - _ensureAudioPaletteIconSelect(); - onAudioVizChange(); - } - if (type === 'gradient') { _ensureGradientPresetIconSelect(); _ensureGradientEasingIconSelect(); } - if (type === 'notification') { - ensureNotificationEffectIconSelect(); - ensureNotificationFilterModeIconSelect(); - } - if (type === 'candlelight') _ensureCandleTypeIconSelect(); + + // Run type-specific setup + const setupFn = CSS_TYPE_SETUP[type]; + if (setupFn) setupFn(); // Animation section — shown for static/gradient only const animSection = document.getElementById('css-editor-animation-section') as HTMLElement; @@ -226,16 +242,6 @@ export function onCSSTypeChange() { (document.getElementById('css-editor-clock-group') as HTMLElement).style.display = clockTypes.includes(type) ? '' : 'none'; if (clockTypes.includes(type)) _populateClockDropdown(); - if (type === 'audio') { - _loadAudioSources(); - } else if (type === 'composite') { - compositeRenderList(); - } else if (type === 'mapped') { - _mappedRenderList(); - } else if (type === 'gradient') { - requestAnimationFrame(() => gradientRenderAll()); - } - _autoGenerateCSSName(); } @@ -912,56 +918,44 @@ function _resetAudioState() { /* ── Card ─────────────────────────────────────────────────────── */ -export function createColorStripCard(source: ColorStripSource, pictureSourceMap: Record, audioSourceMap: Record) { - const isStatic = source.source_type === 'static'; - const isGradient = source.source_type === 'gradient'; - const isColorCycle = source.source_type === 'color_cycle'; - const isEffect = source.source_type === 'effect'; - const isComposite = source.source_type === 'composite'; - const isMapped = source.source_type === 'mapped'; - const isAudio = source.source_type === 'audio'; - const isApiInput = source.source_type === 'api_input'; - const isNotification = source.source_type === 'notification'; - const isPictureAdvanced = source.source_type === 'picture_advanced'; +type CardPropsRenderer = (source: ColorStripSource, opts: { + clockBadge: string; + animBadge: string; + audioSourceMap: Record; + pictureSourceMap: Record; +}) => string; - // Clock crosslink badge (replaces speed badge when clock is assigned) - const clockObj = source.clock_id ? _cachedSyncClocks.find(c => c.id === source.clock_id) : null; - const clockBadge = clockObj - ? `${ICON_CLOCK} ${escapeHtml(clockObj.name)}` - : source.clock_id ? `${ICON_CLOCK} ${source.clock_id}` : ''; +const NON_PICTURE_TYPES = new Set([ + 'static', 'gradient', 'color_cycle', 'effect', 'composite', 'mapped', + 'audio', 'api_input', 'notification', 'daylight', 'candlelight', 'processed', +]); - const anim = (isStatic || isGradient) && source.animation && source.animation.enabled ? source.animation : null; - const animBadge = anim - ? `${ICON_SPARKLES} ${t('color_strip.animation.type.' + anim.type) || anim.type}` - : ''; - - let propsHtml; - if (isStatic) { +const CSS_CARD_RENDERERS: Record = { + static: (source, { clockBadge, animBadge }) => { const hexColor = rgbArrayToHex(source.color!); - propsHtml = ` + return ` ${hexColor.toUpperCase()} ${animBadge} ${clockBadge} `; - } else if (isColorCycle) { + }, + color_cycle: (source, { clockBadge }) => { const colors = source.colors || []; const swatches = colors.slice(0, 8).map((c: any) => `` ).join(''); - propsHtml = ` + return ` ${swatches} ${clockBadge} `; - } else if (isGradient) { + }, + gradient: (source, { clockBadge, animBadge }) => { const stops = source.stops || []; const sortedStops = [...stops].sort((a, b) => a.position - b.position); let cssGradient = ''; if (sortedStops.length >= 2) { - // Build CSS stops that mirror the interpolation algorithm: - // for each stop emit its primary color, then immediately emit color_right - // at the same position to produce a hard edge (bidirectional stop). const parts: string[] = []; sortedStops.forEach(s => { const pct = Math.round(s.position * 100); @@ -970,39 +964,43 @@ export function createColorStripCard(source: ColorStripSource, pictureSourceMap: }); cssGradient = `linear-gradient(to right, ${parts.join(', ')})`; } - propsHtml = ` + return ` ${cssGradient ? `` : ''} ${ICON_PALETTE} ${stops.length} ${t('color_strip.gradient.stops_count')} ${animBadge} ${clockBadge} `; - } else if (isEffect) { + }, + effect: (source, { clockBadge }) => { const effectLabel = t('color_strip.effect.' + (source.effect_type || 'fire')) || source.effect_type || 'fire'; const paletteLabel = source.palette ? (t('color_strip.palette.' + source.palette) || source.palette) : ''; - propsHtml = ` + return ` ${ICON_FPS} ${escapeHtml(effectLabel)} ${paletteLabel ? `${ICON_PALETTE} ${escapeHtml(paletteLabel)}` : ''} ${clockBadge} `; - } else if (isComposite) { + }, + composite: (source) => { const layerCount = (source.layers || []).length; const enabledCount = (source.layers || []).filter((l: any) => l.enabled !== false).length; - propsHtml = ` + return ` ${ICON_LINK} ${enabledCount}/${layerCount} ${t('color_strip.composite.layers_count')} ${source.led_count ? `${ICON_LED} ${source.led_count}` : ''} `; - } else if (isMapped) { + }, + mapped: (source) => { const zoneCount = (source.zones || []).length; - propsHtml = ` + return ` ${ICON_MAP_PIN} ${zoneCount} ${t('color_strip.mapped.zones_count')} ${source.led_count ? `${ICON_LED} ${source.led_count}` : ''} `; - } else if (isAudio) { + }, + audio: (source, { audioSourceMap }) => { const vizLabel = t('color_strip.audio.viz.' + (source.visualization_mode || 'spectrum')) || source.visualization_mode || 'spectrum'; const vizMode = source.visualization_mode || 'spectrum'; const showPalette = (vizMode === 'spectrum' || vizMode === 'beat_pulse') && source.palette; const audioPaletteLabel = showPalette ? (t('color_strip.palette.' + source.palette) || source.palette) : ''; - propsHtml = ` + return ` ${ICON_MUSIC} ${escapeHtml(vizLabel)} ${audioPaletteLabel ? `${ICON_PALETTE} ${escapeHtml(audioPaletteLabel)}` : ''} ${source.audio_source_id ? (() => { @@ -1013,21 +1011,23 @@ export function createColorStripCard(source: ColorStripSource, pictureSourceMap: })() : ''} ${source.mirror ? `🪞` : ''} `; - } else if (isApiInput) { + }, + api_input: (source) => { const fbColor = rgbArrayToHex(source.fallback_color || [0, 0, 0]); const timeoutVal = (source.timeout ?? 5.0).toFixed(1); - propsHtml = ` + return ` ${fbColor.toUpperCase()} ${ICON_TIMER} ${timeoutVal}s `; - } else if (isNotification) { + }, + notification: (source) => { const effectLabel = t('color_strip.notification.effect.' + (source.notification_effect || 'flash')) || source.notification_effect || 'flash'; const durationVal = source.duration_ms || 1500; const defColor = source.default_color || '#FFFFFF'; const appCount = source.app_colors ? Object.keys(source.app_colors).length : 0; - propsHtml = ` + return ` ${ICON_BELL} ${escapeHtml(effectLabel)} ${ICON_TIMER} ${durationVal}ms @@ -1035,73 +1035,96 @@ export function createColorStripCard(source: ColorStripSource, pictureSourceMap: ${appCount > 0 ? `${ICON_PALETTE} ${appCount} ${t('color_strip.notification.app_count')}` : ''} `; - } else if (source.source_type === 'daylight') { + }, + daylight: (source, { clockBadge }) => { const useRealTime = source.use_real_time; const speedVal = (source.speed ?? 1.0).toFixed(1); - propsHtml = ` + return ` ${useRealTime ? '🕐 ' + t('color_strip.daylight.real_time') : '⏩ ' + speedVal + 'x'} ${clockBadge} `; - } else if (source.source_type === 'candlelight') { + }, + candlelight: (source, { clockBadge }) => { const hexColor = rgbArrayToHex(source.color || [255, 147, 41]); const numCandles = source.num_candles ?? 3; - propsHtml = ` + return ` ${hexColor.toUpperCase()} ${numCandles} ${t('color_strip.candlelight.num_candles')} ${clockBadge} `; - } else if (source.source_type === 'processed') { + }, + processed: (source) => { const inputSrc = ((colorStripSourcesCache.data || []) as any[]).find(s => s.id === source.input_source_id); const inputName = inputSrc?.name || source.input_source_id || '—'; const tplName = source.processing_template_id ? (_cachedCSPTemplates.find(t => t.id === source.processing_template_id)?.name || source.processing_template_id) : '—'; - propsHtml = ` + return ` ${ICON_LINK_SOURCE} ${escapeHtml(inputName)} ${ICON_SPARKLES} ${escapeHtml(tplName)} `; - } else if (isPictureAdvanced) { + }, + picture_advanced: (source, { pictureSourceMap }) => { const cal = source.calibration ?? {} as Partial; const lines = cal.lines || []; const totalLeds = lines.reduce((s: any, l: any) => s + (l.led_count || 0), 0); const ledCount = (source.led_count > 0) ? source.led_count : totalLeds; - // Collect unique picture source names const psIds: any[] = [...new Set(lines.map((l: any) => l.picture_source_id).filter(Boolean))]; const psNames = psIds.map((id: any) => { const ps = pictureSourceMap && pictureSourceMap[id]; return ps ? ps.name : id; }); - propsHtml = ` + return ` ${ICON_MAP_PIN} ${lines.length} ${t('calibration.advanced.lines_title').toLowerCase()} ${ledCount ? `${ICON_LED} ${ledCount}` : ''} ${psNames.length ? `${ICON_LINK_SOURCE} ${escapeHtml(psNames.join(', '))}` : ''} `; - } else { - const ps = pictureSourceMap && pictureSourceMap[source.picture_source_id!]; - const srcName = ps ? ps.name : source.picture_source_id || '—'; - const cal = source.calibration ?? {} as Partial; - const calLeds = (cal.leds_top || 0) + (cal.leds_right || 0) + (cal.leds_bottom || 0) + (cal.leds_left || 0); - const ledCount = (source.led_count > 0) ? source.led_count : calLeds; - let psSubTab = 'raw', psSection = 'raw-streams'; - if (ps) { - if (ps.stream_type === 'static_image') { psSubTab = 'static_image'; psSection = 'static-streams'; } - else if (ps.stream_type === 'processed') { psSubTab = 'processed'; psSection = 'proc-streams'; } - } - propsHtml = ` - ${ICON_LINK_SOURCE} ${escapeHtml(srcName)} - ${ledCount ? `${ICON_LED} ${ledCount}` : ''} - `; + }, +}; + +/** Fallback renderer for picture-type sources (plain picture). */ +function _renderPictureCardProps(source: ColorStripSource, pictureSourceMap: Record): string { + const ps = pictureSourceMap && pictureSourceMap[source.picture_source_id!]; + const srcName = ps ? ps.name : source.picture_source_id || '—'; + const cal = source.calibration ?? {} as Partial; + const calLeds = (cal.leds_top || 0) + (cal.leds_right || 0) + (cal.leds_bottom || 0) + (cal.leds_left || 0); + const ledCount = (source.led_count > 0) ? source.led_count : calLeds; + let psSubTab = 'raw', psSection = 'raw-streams'; + if (ps) { + if (ps.stream_type === 'static_image') { psSubTab = 'static_image'; psSection = 'static-streams'; } + else if (ps.stream_type === 'processed') { psSubTab = 'processed'; psSection = 'proc-streams'; } } + return ` + ${ICON_LINK_SOURCE} ${escapeHtml(srcName)} + ${ledCount ? `${ICON_LED} ${ledCount}` : ''} + `; +} + +export function createColorStripCard(source: ColorStripSource, pictureSourceMap: Record, audioSourceMap: Record) { + // Clock crosslink badge (replaces speed badge when clock is assigned) + const clockObj = source.clock_id ? _cachedSyncClocks.find(c => c.id === source.clock_id) : null; + const clockBadge = clockObj + ? `${ICON_CLOCK} ${escapeHtml(clockObj.name)}` + : source.clock_id ? `${ICON_CLOCK} ${source.clock_id}` : ''; + + const isAnimatable = source.source_type === 'static' || source.source_type === 'gradient'; + const anim = isAnimatable && source.animation && source.animation.enabled ? source.animation : null; + const animBadge = anim + ? `${ICON_SPARKLES} ${t('color_strip.animation.type.' + anim.type) || anim.type}` + : ''; + + const renderer = CSS_CARD_RENDERERS[source.source_type]; + const propsHtml = renderer + ? renderer(source, { clockBadge, animBadge, audioSourceMap, pictureSourceMap }) + : _renderPictureCardProps(source, pictureSourceMap); const icon = getColorStripIcon(source.source_type); - const isDaylight = source.source_type === 'daylight'; - const isCandlelight = source.source_type === 'candlelight'; - const isProcessed = source.source_type === 'processed'; - const isPictureKind = (!isStatic && !isGradient && !isColorCycle && !isEffect && !isComposite && !isMapped && !isAudio && !isApiInput && !isNotification && !isDaylight && !isCandlelight && !isProcessed); + const isNotification = source.source_type === 'notification'; + const isPictureKind = !NON_PICTURE_TYPES.has(source.source_type); const calibrationBtn = isPictureKind - ? `` + ? `` : ''; const overlayBtn = isPictureKind ? `` diff --git a/server/src/wled_controller/static/js/features/streams.ts b/server/src/wled_controller/static/js/features/streams.ts index 1cfaa60..5a910db 100644 --- a/server/src/wled_controller/static/js/features/streams.ts +++ b/server/src/wled_controller/static/js/features/streams.ts @@ -276,6 +276,54 @@ const _streamSectionMap = { sync: [csSyncClocks], }; +type StreamCardRenderer = (stream: any) => string; + +const PICTURE_SOURCE_CARD_RENDERERS: Record = { + raw: (stream) => { + let capTmplName = ''; + if (stream.capture_template_id) { + const capTmpl = _cachedCaptureTemplates.find(t => t.id === stream.capture_template_id); + if (capTmpl) capTmplName = escapeHtml(capTmpl.name); + } + return `
+ ${ICON_MONITOR} ${stream.display_index ?? 0} + ${ICON_FPS} ${stream.target_fps ?? 30} + ${capTmplName ? `${ICON_TEMPLATE} ${capTmplName}` : ''} +
`; + }, + processed: (stream) => { + const sourceStream = _cachedStreams.find(s => s.id === stream.source_stream_id); + const sourceName = sourceStream ? escapeHtml(sourceStream.name) : (stream.source_stream_id || '-'); + const sourceSubTab = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static_image' : 'raw') : 'raw'; + const sourceSection = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static-streams' : 'raw-streams') : 'raw-streams'; + let ppTmplName = ''; + if (stream.postprocessing_template_id) { + const ppTmpl = _cachedPPTemplates.find(p => p.id === stream.postprocessing_template_id); + if (ppTmpl) ppTmplName = escapeHtml(ppTmpl.name); + } + return `
+ ${ICON_LINK_SOURCE} ${sourceName} + ${ppTmplName ? `${ICON_TEMPLATE} ${ppTmplName}` : ''} +
`; + }, + static_image: (stream) => { + const src = stream.image_source || ''; + return `
+ ${ICON_WEB} ${escapeHtml(src)} +
`; + }, + video: (stream) => { + const url = stream.url || ''; + const shortUrl = url.length > 40 ? url.slice(0, 37) + '...' : url; + return `
+ ${ICON_WEB} ${escapeHtml(shortUrl)} + ${ICON_FPS} ${stream.target_fps ?? 30} + ${stream.loop !== false ? `` : ''} + ${stream.playback_speed && stream.playback_speed !== 1.0 ? `${stream.playback_speed}×` : ''} +
`; + }, +}; + function renderPictureSourcesList(streams: any) { const container = document.getElementById('streams-list')!; const activeTab = localStorage.getItem('activeStreamTab') || 'raw'; @@ -283,47 +331,8 @@ function renderPictureSourcesList(streams: any) { const renderStreamCard = (stream: any) => { const typeIcon = getPictureSourceIcon(stream.stream_type); - let detailsHtml = ''; - if (stream.stream_type === 'raw') { - let capTmplName = ''; - if (stream.capture_template_id) { - const capTmpl = _cachedCaptureTemplates.find(t => t.id === stream.capture_template_id); - if (capTmpl) capTmplName = escapeHtml(capTmpl.name); - } - detailsHtml = `
- ${ICON_MONITOR} ${stream.display_index ?? 0} - ${ICON_FPS} ${stream.target_fps ?? 30} - ${capTmplName ? `${ICON_TEMPLATE} ${capTmplName}` : ''} -
`; - } else if (stream.stream_type === 'processed') { - const sourceStream = _cachedStreams.find(s => s.id === stream.source_stream_id); - const sourceName = sourceStream ? escapeHtml(sourceStream.name) : (stream.source_stream_id || '-'); - const sourceSubTab = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static_image' : 'raw') : 'raw'; - const sourceSection = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static-streams' : 'raw-streams') : 'raw-streams'; - let ppTmplName = ''; - if (stream.postprocessing_template_id) { - const ppTmpl = _cachedPPTemplates.find(p => p.id === stream.postprocessing_template_id); - if (ppTmpl) ppTmplName = escapeHtml(ppTmpl.name); - } - detailsHtml = `
- ${ICON_LINK_SOURCE} ${sourceName} - ${ppTmplName ? `${ICON_TEMPLATE} ${ppTmplName}` : ''} -
`; - } else if (stream.stream_type === 'static_image') { - const src = stream.image_source || ''; - detailsHtml = `
- ${ICON_WEB} ${escapeHtml(src)} -
`; - } else if (stream.stream_type === 'video') { - const url = stream.url || ''; - const shortUrl = url.length > 40 ? url.slice(0, 37) + '...' : url; - detailsHtml = `
- ${ICON_WEB} ${escapeHtml(shortUrl)} - ${ICON_FPS} ${stream.target_fps ?? 30} - ${stream.loop !== false ? `` : ''} - ${stream.playback_speed && stream.playback_speed !== 1.0 ? `${stream.playback_speed}×` : ''} -
`; - } + const renderer = PICTURE_SOURCE_CARD_RENDERERS[stream.stream_type]; + const detailsHtml = renderer ? renderer(stream) : ''; return wrapCard({ type: 'template-card', diff --git a/server/src/wled_controller/storage/automation.py b/server/src/wled_controller/storage/automation.py index 86cf4f6..368287c 100644 --- a/server/src/wled_controller/storage/automation.py +++ b/server/src/wled_controller/storage/automation.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import List, Optional +from typing import Dict, List, Optional, Type @dataclass @@ -16,25 +16,12 @@ class Condition: @classmethod def from_dict(cls, data: dict) -> "Condition": - """Factory: dispatch to the correct subclass.""" + """Factory: dispatch to the correct subclass via registry.""" ct = data.get("condition_type", "") - if ct == "always": - return AlwaysCondition.from_dict(data) - if ct == "application": - return ApplicationCondition.from_dict(data) - if ct == "time_of_day": - return TimeOfDayCondition.from_dict(data) - if ct == "system_idle": - return SystemIdleCondition.from_dict(data) - if ct == "display_state": - return DisplayStateCondition.from_dict(data) - if ct == "mqtt": - return MQTTCondition.from_dict(data) - if ct == "webhook": - return WebhookCondition.from_dict(data) - if ct == "startup": - return StartupCondition.from_dict(data) - raise ValueError(f"Unknown condition type: {ct}") + subcls = _CONDITION_MAP.get(ct) + if subcls is None: + raise ValueError(f"Unknown condition type: {ct}") + return subcls.from_dict(data) @dataclass @@ -190,6 +177,18 @@ class StartupCondition(Condition): return cls() +_CONDITION_MAP: Dict[str, Type[Condition]] = { + "always": AlwaysCondition, + "application": ApplicationCondition, + "time_of_day": TimeOfDayCondition, + "system_idle": SystemIdleCondition, + "display_state": DisplayStateCondition, + "mqtt": MQTTCondition, + "webhook": WebhookCondition, + "startup": StartupCondition, +} + + @dataclass class Automation: """Automation that activates a scene preset based on conditions.""" diff --git a/server/src/wled_controller/storage/color_strip_source.py b/server/src/wled_controller/storage/color_strip_source.py index cc78ad7..9e57e76 100644 --- a/server/src/wled_controller/storage/color_strip_source.py +++ b/server/src/wled_controller/storage/color_strip_source.py @@ -104,216 +104,53 @@ class ColorStripSource: @staticmethod def from_dict(data: dict) -> "ColorStripSource": """Factory: dispatch to the correct subclass based on source_type.""" - source_type: str = data.get("source_type", "picture") or "picture" - sid: str = data["id"] - name: str = data["name"] - description: str | None = data.get("description") + source_type = data.get("source_type", "picture") or "picture" + subcls = _SOURCE_TYPE_MAP.get(source_type, PictureColorStripSource) + return subcls.from_dict(data) - clock_id: str | None = data.get("clock_id") - tags: list = data.get("tags", []) - raw_created = data.get("created_at") - created_at: datetime = ( - datetime.fromisoformat(raw_created) - if isinstance(raw_created, str) - else raw_created if isinstance(raw_created, datetime) - else datetime.now(timezone.utc) - ) - raw_updated = data.get("updated_at") - updated_at: datetime = ( - datetime.fromisoformat(raw_updated) - if isinstance(raw_updated, str) - else raw_updated if isinstance(raw_updated, datetime) - else datetime.now(timezone.utc) - ) +def _parse_css_common(data: dict) -> dict: + """Extract fields common to all ColorStripSource types.""" + raw_created = data.get("created_at") + created_at = ( + datetime.fromisoformat(raw_created) + if isinstance(raw_created, str) + else raw_created if isinstance(raw_created, datetime) + else datetime.now(timezone.utc) + ) + raw_updated = data.get("updated_at") + updated_at = ( + datetime.fromisoformat(raw_updated) + if isinstance(raw_updated, str) + else raw_updated if isinstance(raw_updated, datetime) + else datetime.now(timezone.utc) + ) + return dict( + id=data["id"], + name=data["name"], + description=data.get("description"), + clock_id=data.get("clock_id"), + tags=data.get("tags", []), + created_at=created_at, + updated_at=updated_at, + ) - calibration_data = data.get("calibration") - calibration = ( - calibration_from_dict(calibration_data) - if calibration_data - else CalibrationConfig(layout="clockwise", start_position="bottom_left") - ) - if source_type == "static": - raw_color = data.get("color") - color = ( - raw_color if isinstance(raw_color, list) and len(raw_color) == 3 - else [255, 255, 255] - ) - return StaticColorStripSource( - id=sid, name=name, source_type="static", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, color=color, - animation=data.get("animation"), - ) - - if source_type == "gradient": - raw_stops = data.get("stops") - stops = raw_stops if isinstance(raw_stops, list) else [] - return GradientColorStripSource( - id=sid, name=name, source_type="gradient", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, stops=stops, - animation=data.get("animation"), - easing=data.get("easing") or "linear", - gradient_id=data.get("gradient_id"), - ) - - if source_type == "color_cycle": - raw_colors = data.get("colors") - colors = raw_colors if isinstance(raw_colors, list) else [] - return ColorCycleColorStripSource( - id=sid, name=name, source_type="color_cycle", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, colors=colors, - ) - - if source_type == "composite": - return CompositeColorStripSource( - id=sid, name=name, source_type="composite", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, layers=data.get("layers") or [], - led_count=data.get("led_count") or 0, - ) - - if source_type == "mapped": - return MappedColorStripSource( - id=sid, name=name, source_type="mapped", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, zones=data.get("zones") or [], - led_count=data.get("led_count") or 0, - ) - - if source_type == "audio": - raw_color = data.get("color") - color = raw_color if isinstance(raw_color, list) and len(raw_color) == 3 else [0, 255, 0] - raw_peak = data.get("color_peak") - color_peak = raw_peak if isinstance(raw_peak, list) and len(raw_peak) == 3 else [255, 0, 0] - return AudioColorStripSource( - id=sid, name=name, source_type="audio", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, visualization_mode=data.get("visualization_mode") or "spectrum", - audio_source_id=data.get("audio_source_id") or "", - sensitivity=float(data.get("sensitivity") or 1.0), - smoothing=float(data.get("smoothing") or 0.3), - palette=data.get("palette") or "rainbow", - gradient_id=data.get("gradient_id"), - color=color, - color_peak=color_peak, - led_count=data.get("led_count") or 0, - mirror=bool(data.get("mirror", False)), - ) - - if source_type == "effect": - raw_color = data.get("color") - color = ( - raw_color if isinstance(raw_color, list) and len(raw_color) == 3 - else [255, 80, 0] - ) - return EffectColorStripSource( - id=sid, name=name, source_type="effect", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, effect_type=data.get("effect_type") or "fire", - palette=data.get("palette") or "fire", - gradient_id=data.get("gradient_id"), - color=color, - intensity=float(data.get("intensity") or 1.0), - scale=float(data.get("scale") or 1.0), - mirror=bool(data.get("mirror", False)), - custom_palette=data.get("custom_palette"), - ) - - if source_type == "api_input": - raw_fallback = data.get("fallback_color") - fallback_color = ( - raw_fallback if isinstance(raw_fallback, list) and len(raw_fallback) == 3 - else [0, 0, 0] - ) - return ApiInputColorStripSource( - id=sid, name=name, source_type="api_input", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, - fallback_color=fallback_color, - timeout=float(data.get("timeout") or 5.0), - ) - - elif source_type == "notification": - raw_app_colors = data.get("app_colors") - app_colors = raw_app_colors if isinstance(raw_app_colors, dict) else {} - raw_app_filter_list = data.get("app_filter_list") - app_filter_list = raw_app_filter_list if isinstance(raw_app_filter_list, list) else [] - return NotificationColorStripSource( - id=sid, name=name, source_type="notification", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, - notification_effect=data.get("notification_effect") or "flash", - duration_ms=int(data.get("duration_ms") or 1500), - default_color=data.get("default_color") or "#FFFFFF", - app_colors=app_colors, - app_filter_mode=data.get("app_filter_mode") or "off", - app_filter_list=app_filter_list, - os_listener=bool(data.get("os_listener", False)), - ) - - if source_type == "daylight": - return DaylightColorStripSource( - id=sid, name=name, source_type="daylight", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, - speed=float(data.get("speed") or 1.0), - use_real_time=bool(data.get("use_real_time", False)), - latitude=float(data.get("latitude") or 50.0), - ) - - if source_type == "processed": - return ProcessedColorStripSource( - id=sid, name=name, source_type="processed", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, - input_source_id=data.get("input_source_id") or "", - processing_template_id=data.get("processing_template_id") or "", - ) - - if source_type == "candlelight": - raw_color = data.get("color") - color = ( - raw_color if isinstance(raw_color, list) and len(raw_color) == 3 - else [255, 147, 41] - ) - return CandlelightColorStripSource( - id=sid, name=name, source_type="candlelight", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, tags=tags, - color=color, - intensity=float(data.get("intensity") or 1.0), - num_candles=int(data.get("num_candles") or 3), - speed=float(data.get("speed") or 1.0), - ) - - # Shared picture-type field extraction - _picture_kwargs = dict( - tags=tags, - fps=data.get("fps") or 30, - smoothing=data["smoothing"] if data.get("smoothing") is not None else 0.3, - interpolation_mode=data.get("interpolation_mode") or "average", - calibration=calibration, - led_count=data.get("led_count") or 0, - ) - - if source_type == "picture_advanced": - return AdvancedPictureColorStripSource( - id=sid, name=name, source_type="picture_advanced", - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, **_picture_kwargs, - ) - - # Default: "picture" type (simple 4-edge calibration) - return PictureColorStripSource( - id=sid, name=name, source_type=source_type, - created_at=created_at, updated_at=updated_at, description=description, - clock_id=clock_id, picture_source_id=data.get("picture_source_id") or "", - **_picture_kwargs, - ) +def _parse_picture_fields(data: dict) -> dict: + """Extract fields shared by picture-type color strip sources.""" + calibration_data = data.get("calibration") + calibration = ( + calibration_from_dict(calibration_data) + if calibration_data + else CalibrationConfig(layout="clockwise", start_position="bottom_left") + ) + return dict( + fps=data.get("fps") or 30, + smoothing=data["smoothing"] if data.get("smoothing") is not None else 0.3, + interpolation_mode=data.get("interpolation_mode") or "average", + calibration=calibration, + led_count=data.get("led_count") or 0, + ) def _picture_base_to_dict(source, d: dict) -> dict: @@ -367,6 +204,17 @@ class PictureColorStripSource(ColorStripSource): d["picture_source_id"] = self.picture_source_id return _picture_base_to_dict(self, d) + @classmethod + def from_dict(cls, data: dict) -> "PictureColorStripSource": + common = _parse_css_common(data) + pic = _parse_picture_fields(data) + return cls( + **common, + source_type=data.get("source_type", "picture") or "picture", + picture_source_id=data.get("picture_source_id") or "", + **pic, + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -419,6 +267,12 @@ class AdvancedPictureColorStripSource(ColorStripSource): d = super().to_dict() return _picture_base_to_dict(self, d) + @classmethod + def from_dict(cls, data: dict) -> "AdvancedPictureColorStripSource": + common = _parse_css_common(data) + pic = _parse_picture_fields(data) + return cls(**common, source_type="picture_advanced", **pic) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -459,6 +313,15 @@ class StaticColorStripSource(ColorStripSource): d["animation"] = self.animation return d + @classmethod + def from_dict(cls, data: dict) -> "StaticColorStripSource": + common = _parse_css_common(data) + color = _validate_rgb(data.get("color"), [255, 255, 255]) + return cls( + **common, source_type="static", + color=color, animation=data.get("animation"), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -509,6 +372,18 @@ class GradientColorStripSource(ColorStripSource): d["gradient_id"] = self.gradient_id return d + @classmethod + def from_dict(cls, data: dict) -> "GradientColorStripSource": + common = _parse_css_common(data) + raw_stops = data.get("stops") + return cls( + **common, source_type="gradient", + stops=raw_stops if isinstance(raw_stops, list) else [], + animation=data.get("animation"), + easing=data.get("easing") or "linear", + gradient_id=data.get("gradient_id"), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -559,6 +434,15 @@ class ColorCycleColorStripSource(ColorStripSource): d["colors"] = [list(c) for c in self.colors] return d + @classmethod + def from_dict(cls, data: dict) -> "ColorCycleColorStripSource": + common = _parse_css_common(data) + raw_colors = data.get("colors") + return cls( + **common, source_type="color_cycle", + colors=raw_colors if isinstance(raw_colors, list) else [], + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -611,6 +495,22 @@ class EffectColorStripSource(ColorStripSource): d["custom_palette"] = self.custom_palette return d + @classmethod + def from_dict(cls, data: dict) -> "EffectColorStripSource": + common = _parse_css_common(data) + color = _validate_rgb(data.get("color"), [255, 80, 0]) + return cls( + **common, source_type="effect", + effect_type=data.get("effect_type") or "fire", + palette=data.get("palette") or "fire", + gradient_id=data.get("gradient_id"), + color=color, + intensity=float(data.get("intensity") or 1.0), + scale=float(data.get("scale") or 1.0), + mirror=bool(data.get("mirror", False)), + custom_palette=data.get("custom_palette"), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -687,6 +587,24 @@ class AudioColorStripSource(ColorStripSource): d["mirror"] = self.mirror return d + @classmethod + def from_dict(cls, data: dict) -> "AudioColorStripSource": + common = _parse_css_common(data) + color = _validate_rgb(data.get("color"), [0, 255, 0]) + color_peak = _validate_rgb(data.get("color_peak"), [255, 0, 0]) + return cls( + **common, source_type="audio", + visualization_mode=data.get("visualization_mode") or "spectrum", + audio_source_id=data.get("audio_source_id") or "", + sensitivity=float(data.get("sensitivity") or 1.0), + smoothing=float(data.get("smoothing") or 0.3), + palette=data.get("palette") or "rainbow", + gradient_id=data.get("gradient_id"), + color=color, color_peak=color_peak, + led_count=data.get("led_count") or 0, + mirror=bool(data.get("mirror", False)), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -756,6 +674,15 @@ class CompositeColorStripSource(ColorStripSource): d["led_count"] = self.led_count return d + @classmethod + def from_dict(cls, data: dict) -> "CompositeColorStripSource": + common = _parse_css_common(data) + return cls( + **common, source_type="composite", + layers=data.get("layers") or [], + led_count=data.get("led_count") or 0, + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -797,6 +724,15 @@ class MappedColorStripSource(ColorStripSource): d["led_count"] = self.led_count return d + @classmethod + def from_dict(cls, data: dict) -> "MappedColorStripSource": + common = _parse_css_common(data) + return cls( + **common, source_type="mapped", + zones=data.get("zones") or [], + led_count=data.get("led_count") or 0, + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -837,6 +773,16 @@ class ApiInputColorStripSource(ColorStripSource): d["timeout"] = self.timeout return d + @classmethod + def from_dict(cls, data: dict) -> "ApiInputColorStripSource": + common = _parse_css_common(data) + fallback_color = _validate_rgb(data.get("fallback_color"), [0, 0, 0]) + return cls( + **common, source_type="api_input", + fallback_color=fallback_color, + timeout=float(data.get("timeout") or 5.0), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -890,6 +836,22 @@ class NotificationColorStripSource(ColorStripSource): d["os_listener"] = self.os_listener return d + @classmethod + def from_dict(cls, data: dict) -> "NotificationColorStripSource": + common = _parse_css_common(data) + raw_app_colors = data.get("app_colors") + raw_app_filter_list = data.get("app_filter_list") + return cls( + **common, source_type="notification", + notification_effect=data.get("notification_effect") or "flash", + duration_ms=int(data.get("duration_ms") or 1500), + default_color=data.get("default_color") or "#FFFFFF", + app_colors=raw_app_colors if isinstance(raw_app_colors, dict) else {}, + app_filter_mode=data.get("app_filter_mode") or "off", + app_filter_list=raw_app_filter_list if isinstance(raw_app_filter_list, list) else [], + os_listener=bool(data.get("os_listener", False)), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -957,6 +919,16 @@ class DaylightColorStripSource(ColorStripSource): d["longitude"] = self.longitude return d + @classmethod + def from_dict(cls, data: dict) -> "DaylightColorStripSource": + common = _parse_css_common(data) + return cls( + **common, source_type="daylight", + speed=float(data.get("speed") or 1.0), + use_real_time=bool(data.get("use_real_time", False)), + latitude=float(data.get("latitude") or 50.0), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -1010,6 +982,18 @@ class CandlelightColorStripSource(ColorStripSource): d["candle_type"] = self.candle_type return d + @classmethod + def from_dict(cls, data: dict) -> "CandlelightColorStripSource": + common = _parse_css_common(data) + color = _validate_rgb(data.get("color"), [255, 147, 41]) + return cls( + **common, source_type="candlelight", + color=color, + intensity=float(data.get("intensity") or 1.0), + num_candles=int(data.get("num_candles") or 3), + speed=float(data.get("speed") or 1.0), + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, @@ -1066,6 +1050,15 @@ class ProcessedColorStripSource(ColorStripSource): d["processing_template_id"] = self.processing_template_id return d + @classmethod + def from_dict(cls, data: dict) -> "ProcessedColorStripSource": + common = _parse_css_common(data) + return cls( + **common, source_type="processed", + input_source_id=data.get("input_source_id") or "", + processing_template_id=data.get("processing_template_id") or "", + ) + @classmethod def create_from_kwargs(cls, *, id: str, name: str, source_type: str, created_at: datetime, updated_at: datetime, diff --git a/server/src/wled_controller/storage/picture_source.py b/server/src/wled_controller/storage/picture_source.py index 21fa567..ea7d46d 100644 --- a/server/src/wled_controller/storage/picture_source.py +++ b/server/src/wled_controller/storage/picture_source.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import List, Optional +from typing import Dict, List, Optional, Type @dataclass @@ -54,61 +54,35 @@ class PictureSource: @staticmethod def from_dict(data: dict) -> "PictureSource": """Factory: dispatch to the correct subclass based on stream_type.""" - stream_type: str = data.get("stream_type", "raw") or "raw" - sid: str = data["id"] - name: str = data["name"] - description: str | None = data.get("description") - tags: list = data.get("tags", []) + stream_type = data.get("stream_type", "raw") or "raw" + subcls = _PICTURE_SOURCE_MAP.get(stream_type, ScreenCapturePictureSource) + return subcls.from_dict(data) - raw_created = data.get("created_at") - created_at: datetime = ( - datetime.fromisoformat(raw_created) - if isinstance(raw_created, str) - else raw_created if isinstance(raw_created, datetime) - else datetime.now(timezone.utc) - ) - raw_updated = data.get("updated_at") - updated_at: datetime = ( - datetime.fromisoformat(raw_updated) - if isinstance(raw_updated, str) - else raw_updated if isinstance(raw_updated, datetime) - else datetime.now(timezone.utc) - ) - if stream_type == "processed": - return ProcessedPictureSource( - id=sid, name=name, stream_type=stream_type, - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - source_stream_id=data.get("source_stream_id") or "", - postprocessing_template_id=data.get("postprocessing_template_id") or "", - ) - elif stream_type == "static_image": - return StaticImagePictureSource( - id=sid, name=name, stream_type=stream_type, - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - image_source=data.get("image_source") or "", - ) - elif stream_type == "video": - return VideoCaptureSource( - id=sid, name=name, stream_type=stream_type, - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - url=data.get("url") or "", - loop=data.get("loop", True), - playback_speed=data.get("playback_speed", 1.0), - start_time=data.get("start_time"), - end_time=data.get("end_time"), - resolution_limit=data.get("resolution_limit"), - clock_id=data.get("clock_id"), - target_fps=data.get("target_fps") or 30, - ) - else: - return ScreenCapturePictureSource( - id=sid, name=name, stream_type=stream_type, - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - display_index=data.get("display_index") or 0, - capture_template_id=data.get("capture_template_id") or "", - target_fps=data.get("target_fps") or 30, - ) +def _parse_common_fields(data: dict) -> dict: + """Extract common fields shared by all picture source types.""" + raw_created = data.get("created_at") + created_at = ( + datetime.fromisoformat(raw_created) + if isinstance(raw_created, str) + else raw_created if isinstance(raw_created, datetime) + else datetime.now(timezone.utc) + ) + raw_updated = data.get("updated_at") + updated_at = ( + datetime.fromisoformat(raw_updated) + if isinstance(raw_updated, str) + else raw_updated if isinstance(raw_updated, datetime) + else datetime.now(timezone.utc) + ) + return dict( + id=data["id"], + name=data["name"], + description=data.get("description"), + tags=data.get("tags", []), + created_at=created_at, + updated_at=updated_at, + ) @dataclass @@ -126,6 +100,17 @@ class ScreenCapturePictureSource(PictureSource): d["target_fps"] = self.target_fps return d + @classmethod + def from_dict(cls, data: dict) -> "ScreenCapturePictureSource": + common = _parse_common_fields(data) + return cls( + **common, + stream_type=data.get("stream_type", "raw") or "raw", + display_index=data.get("display_index") or 0, + capture_template_id=data.get("capture_template_id") or "", + target_fps=data.get("target_fps") or 30, + ) + @dataclass class ProcessedPictureSource(PictureSource): @@ -140,6 +125,16 @@ class ProcessedPictureSource(PictureSource): d["postprocessing_template_id"] = self.postprocessing_template_id return d + @classmethod + def from_dict(cls, data: dict) -> "ProcessedPictureSource": + common = _parse_common_fields(data) + return cls( + **common, + stream_type="processed", + source_stream_id=data.get("source_stream_id") or "", + postprocessing_template_id=data.get("postprocessing_template_id") or "", + ) + @dataclass class StaticImagePictureSource(PictureSource): @@ -152,6 +147,15 @@ class StaticImagePictureSource(PictureSource): d["image_source"] = self.image_source return d + @classmethod + def from_dict(cls, data: dict) -> "StaticImagePictureSource": + common = _parse_common_fields(data) + return cls( + **common, + stream_type="static_image", + image_source=data.get("image_source") or "", + ) + @dataclass class VideoCaptureSource(PictureSource): @@ -177,3 +181,29 @@ class VideoCaptureSource(PictureSource): d["clock_id"] = self.clock_id d["target_fps"] = self.target_fps return d + + @classmethod + def from_dict(cls, data: dict) -> "VideoCaptureSource": + common = _parse_common_fields(data) + return cls( + **common, + stream_type="video", + url=data.get("url") or "", + loop=data.get("loop", True), + playback_speed=data.get("playback_speed", 1.0), + start_time=data.get("start_time"), + end_time=data.get("end_time"), + resolution_limit=data.get("resolution_limit"), + clock_id=data.get("clock_id"), + target_fps=data.get("target_fps") or 30, + ) + + +# -- Source type registry -- +# Maps stream_type string to its subclass for factory dispatch. +_PICTURE_SOURCE_MAP: Dict[str, Type[PictureSource]] = { + "raw": ScreenCapturePictureSource, + "processed": ProcessedPictureSource, + "static_image": StaticImagePictureSource, + "video": VideoCaptureSource, +} diff --git a/server/src/wled_controller/storage/value_source.py b/server/src/wled_controller/storage/value_source.py index 32ac705..edbc22f 100644 --- a/server/src/wled_controller/storage/value_source.py +++ b/server/src/wled_controller/storage/value_source.py @@ -13,7 +13,7 @@ parameters like brightness. Six types: from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import List, Optional +from typing import Dict, List, Optional, Type @dataclass @@ -59,88 +59,35 @@ class ValueSource: @staticmethod def from_dict(data: dict) -> "ValueSource": """Factory: dispatch to the correct subclass based on source_type.""" - source_type: str = data.get("source_type", "static") or "static" - sid: str = data["id"] - name: str = data["name"] - description: str | None = data.get("description") - tags: list = data.get("tags", []) + source_type = data.get("source_type", "static") or "static" + subcls = _VALUE_SOURCE_MAP.get(source_type, StaticValueSource) + return subcls.from_dict(data) - raw_created = data.get("created_at") - created_at: datetime = ( - datetime.fromisoformat(raw_created) - if isinstance(raw_created, str) - else raw_created if isinstance(raw_created, datetime) - else datetime.now(timezone.utc) - ) - raw_updated = data.get("updated_at") - updated_at: datetime = ( - datetime.fromisoformat(raw_updated) - if isinstance(raw_updated, str) - else raw_updated if isinstance(raw_updated, datetime) - else datetime.now(timezone.utc) - ) - if source_type == "animated": - return AnimatedValueSource( - id=sid, name=name, source_type="animated", - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - waveform=data.get("waveform") or "sine", - speed=float(data.get("speed") or 10.0), - min_value=float(data.get("min_value") or 0.0), - max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, - ) - - if source_type == "audio": - return AudioValueSource( - id=sid, name=name, source_type="audio", - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - audio_source_id=data.get("audio_source_id") or "", - mode=data.get("mode") or "rms", - sensitivity=float(data.get("sensitivity") or 1.0), - smoothing=float(data.get("smoothing") or 0.3), - min_value=float(data.get("min_value") or 0.0), - max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, - auto_gain=bool(data.get("auto_gain", False)), - ) - - if source_type == "adaptive_time": - return AdaptiveValueSource( - id=sid, name=name, source_type="adaptive_time", - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - schedule=data.get("schedule") or [], - min_value=float(data.get("min_value") or 0.0), - max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, - ) - - if source_type == "adaptive_scene": - return AdaptiveValueSource( - id=sid, name=name, source_type="adaptive_scene", - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - picture_source_id=data.get("picture_source_id") or "", - scene_behavior=data.get("scene_behavior") or "complement", - sensitivity=float(data.get("sensitivity") or 1.0), - smoothing=float(data.get("smoothing") or 0.3), - min_value=float(data.get("min_value") or 0.0), - max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, - ) - - if source_type == "daylight": - return DaylightValueSource( - id=sid, name=name, source_type="daylight", - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - speed=float(data.get("speed") or 1.0), - use_real_time=bool(data.get("use_real_time", False)), - latitude=float(data.get("latitude") or 50.0), - min_value=float(data.get("min_value") or 0.0), - max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, - ) - - # Default: "static" type - return StaticValueSource( - id=sid, name=name, source_type="static", - created_at=created_at, updated_at=updated_at, description=description, tags=tags, - value=float(data["value"]) if data.get("value") is not None else 1.0, - ) +def _parse_common_fields(data: dict) -> dict: + """Extract common fields shared by all value source types.""" + raw_created = data.get("created_at") + created_at = ( + datetime.fromisoformat(raw_created) + if isinstance(raw_created, str) + else raw_created if isinstance(raw_created, datetime) + else datetime.now(timezone.utc) + ) + raw_updated = data.get("updated_at") + updated_at = ( + datetime.fromisoformat(raw_updated) + if isinstance(raw_updated, str) + else raw_updated if isinstance(raw_updated, datetime) + else datetime.now(timezone.utc) + ) + return dict( + id=data["id"], + name=data["name"], + description=data.get("description"), + tags=data.get("tags", []), + created_at=created_at, + updated_at=updated_at, + ) @dataclass @@ -157,6 +104,15 @@ class StaticValueSource(ValueSource): d["value"] = self.value return d + @classmethod + def from_dict(cls, data: dict) -> "StaticValueSource": + common = _parse_common_fields(data) + return cls( + **common, + source_type="static", + value=float(data["value"]) if data.get("value") is not None else 1.0, + ) + @dataclass class AnimatedValueSource(ValueSource): @@ -179,6 +135,18 @@ class AnimatedValueSource(ValueSource): d["max_value"] = self.max_value return d + @classmethod + def from_dict(cls, data: dict) -> "AnimatedValueSource": + common = _parse_common_fields(data) + return cls( + **common, + source_type="animated", + waveform=data.get("waveform") or "sine", + speed=float(data.get("speed") or 10.0), + min_value=float(data.get("min_value") or 0.0), + max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, + ) + @dataclass class AudioValueSource(ValueSource): @@ -207,6 +175,21 @@ class AudioValueSource(ValueSource): d["auto_gain"] = self.auto_gain return d + @classmethod + def from_dict(cls, data: dict) -> "AudioValueSource": + common = _parse_common_fields(data) + return cls( + **common, + source_type="audio", + audio_source_id=data.get("audio_source_id") or "", + mode=data.get("mode") or "rms", + sensitivity=float(data.get("sensitivity") or 1.0), + smoothing=float(data.get("smoothing") or 0.3), + min_value=float(data.get("min_value") or 0.0), + max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, + auto_gain=bool(data.get("auto_gain", False)), + ) + @dataclass class AdaptiveValueSource(ValueSource): @@ -236,6 +219,22 @@ class AdaptiveValueSource(ValueSource): d["max_value"] = self.max_value return d + @classmethod + def from_dict(cls, data: dict) -> "AdaptiveValueSource": + common = _parse_common_fields(data) + source_type = data.get("source_type", "adaptive_time") + return cls( + **common, + source_type=source_type, + schedule=data.get("schedule") or [], + picture_source_id=data.get("picture_source_id") or "", + scene_behavior=data.get("scene_behavior") or "complement", + sensitivity=float(data.get("sensitivity") or 1.0), + smoothing=float(data.get("smoothing") or 0.3), + min_value=float(data.get("min_value") or 0.0), + max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, + ) + @dataclass class DaylightValueSource(ValueSource): @@ -259,3 +258,28 @@ class DaylightValueSource(ValueSource): d["min_value"] = self.min_value d["max_value"] = self.max_value return d + + @classmethod + def from_dict(cls, data: dict) -> "DaylightValueSource": + common = _parse_common_fields(data) + return cls( + **common, + source_type="daylight", + speed=float(data.get("speed") or 1.0), + use_real_time=bool(data.get("use_real_time", False)), + latitude=float(data.get("latitude") or 50.0), + min_value=float(data.get("min_value") or 0.0), + max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0, + ) + + +# -- Source type registry -- +# Maps source_type string to its subclass for factory dispatch. +_VALUE_SOURCE_MAP: Dict[str, Type[ValueSource]] = { + "static": StaticValueSource, + "animated": AnimatedValueSource, + "audio": AudioValueSource, + "adaptive_time": AdaptiveValueSource, + "adaptive_scene": AdaptiveValueSource, + "daylight": DaylightValueSource, +}