refactor: replace type-dispatch if/elif chains with registry patterns and handler maps
Some checks failed
Lint & Test / test (push) Failing after 30s

Backend: add registry dicts (_CONDITION_MAP, _VALUE_SOURCE_MAP, _PICTURE_SOURCE_MAP)
and per-subclass from_dict() methods to eliminate ~300 lines of if/elif in factory
functions. Convert automation engine dispatch (condition eval, match_mode, match_type,
deactivation_mode) to dict-based lookup.

Frontend: extract CSS_CARD_RENDERERS, CSS_SECTION_MAP, CSS_TYPE_SETUP,
CONDITION_PILL_RENDERERS, and PICTURE_SOURCE_CARD_RENDERERS handler maps to replace
scattered type-check chains in color-strips.ts, automations.ts, and streams.ts.
This commit is contained in:
2026-03-24 14:51:27 +03:00
parent b63944bb34
commit 73947eb6cb
9 changed files with 714 additions and 634 deletions

View File

@@ -42,40 +42,37 @@ router = APIRouter()
# ===== Helpers =====
def _condition_from_schema(s: ConditionSchema) -> Condition:
if s.condition_type == "always":
return AlwaysCondition()
if s.condition_type == "application":
return ApplicationCondition(
_SCHEMA_TO_CONDITION = {
"always": lambda: AlwaysCondition(),
"application": lambda: ApplicationCondition(
apps=s.apps or [],
match_type=s.match_type or "running",
)
if s.condition_type == "time_of_day":
return TimeOfDayCondition(
),
"time_of_day": lambda: TimeOfDayCondition(
start_time=s.start_time or "00:00",
end_time=s.end_time or "23:59",
)
if s.condition_type == "system_idle":
return SystemIdleCondition(
),
"system_idle": lambda: SystemIdleCondition(
idle_minutes=s.idle_minutes if s.idle_minutes is not None else 5,
when_idle=s.when_idle if s.when_idle is not None else True,
)
if s.condition_type == "display_state":
return DisplayStateCondition(
),
"display_state": lambda: DisplayStateCondition(
state=s.state or "on",
)
if s.condition_type == "mqtt":
return MQTTCondition(
),
"mqtt": lambda: MQTTCondition(
topic=s.topic or "",
payload=s.payload or "",
match_mode=s.match_mode or "exact",
)
if s.condition_type == "webhook":
return WebhookCondition(
),
"webhook": lambda: WebhookCondition(
token=s.token or secrets.token_hex(16),
)
if s.condition_type == "startup":
return StartupCondition()
raise ValueError(f"Unknown condition type: {s.condition_type}")
),
"startup": lambda: StartupCondition(),
}
factory = _SCHEMA_TO_CONDITION.get(s.condition_type)
if factory is None:
raise ValueError(f"Unknown condition type: {s.condition_type}")
return factory()
def _condition_to_schema(c: Condition) -> ConditionSchema:

View File

@@ -205,21 +205,20 @@ class AutomationEngine:
fullscreen_procs: Set[str],
idle_seconds: Optional[float], display_state: Optional[str],
) -> bool:
if isinstance(condition, (AlwaysCondition, StartupCondition)):
return True
if isinstance(condition, ApplicationCondition):
return self._evaluate_app_condition(condition, running_procs, topmost_proc, topmost_fullscreen, fullscreen_procs)
if isinstance(condition, TimeOfDayCondition):
return self._evaluate_time_of_day(condition)
if isinstance(condition, SystemIdleCondition):
return self._evaluate_idle(condition, idle_seconds)
if isinstance(condition, DisplayStateCondition):
return self._evaluate_display_state(condition, display_state)
if isinstance(condition, MQTTCondition):
return self._evaluate_mqtt(condition)
if isinstance(condition, WebhookCondition):
return self._webhook_states.get(condition.token, False)
return False
dispatch = {
AlwaysCondition: lambda c: True,
StartupCondition: lambda c: True,
ApplicationCondition: lambda c: self._evaluate_app_condition(c, running_procs, topmost_proc, topmost_fullscreen, fullscreen_procs),
TimeOfDayCondition: lambda c: self._evaluate_time_of_day(c),
SystemIdleCondition: lambda c: self._evaluate_idle(c, idle_seconds),
DisplayStateCondition: lambda c: self._evaluate_display_state(c, display_state),
MQTTCondition: lambda c: self._evaluate_mqtt(c),
WebhookCondition: lambda c: self._webhook_states.get(c.token, False),
}
handler = dispatch.get(type(condition))
if handler is None:
return False
return handler(condition)
@staticmethod
def _evaluate_time_of_day(condition: TimeOfDayCondition) -> bool:
@@ -253,16 +252,18 @@ class AutomationEngine:
value = self._mqtt_service.get_last_value(condition.topic)
if value is None:
return False
if condition.match_mode == "exact":
return value == condition.payload
if condition.match_mode == "contains":
return condition.payload in value
if condition.match_mode == "regex":
try:
return bool(re.search(condition.payload, value))
except re.error:
return False
return False
matchers = {
"exact": lambda: value == condition.payload,
"contains": lambda: condition.payload in value,
"regex": lambda: bool(re.search(condition.payload, value)),
}
matcher = matchers.get(condition.match_mode)
if matcher is None:
return False
try:
return matcher()
except re.error:
return False
def _evaluate_app_condition(
self,
@@ -277,19 +278,21 @@ class AutomationEngine:
apps_lower = [a.lower() for a in condition.apps]
if condition.match_type == "fullscreen":
return any(app in fullscreen_procs for app in apps_lower)
if condition.match_type == "topmost_fullscreen":
if topmost_proc is None or not topmost_fullscreen:
return False
return any(app == topmost_proc for app in apps_lower)
if condition.match_type == "topmost":
if topmost_proc is None:
return False
return any(app == topmost_proc for app in apps_lower)
match_handlers = {
"fullscreen": lambda: any(app in fullscreen_procs for app in apps_lower),
"topmost_fullscreen": lambda: (
topmost_proc is not None
and topmost_fullscreen
and any(app == topmost_proc for app in apps_lower)
),
"topmost": lambda: (
topmost_proc is not None
and any(app == topmost_proc for app in apps_lower)
),
}
handler = match_handlers.get(condition.match_type)
if handler is not None:
return handler()
# Default: "running"
return any(app in running_procs for app in apps_lower)
@@ -352,38 +355,10 @@ class AutomationEngine:
deactivation_mode = automation.deactivation_mode if automation else "none"
if deactivation_mode == "revert":
snapshot = self._pre_activation_snapshots.pop(automation_id, None)
if snapshot and self._target_store:
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
snapshot, self._target_store, self._manager,
)
if errors:
logger.warning(f"Automation {automation_id} revert errors: {errors}")
else:
logger.info(f"Automation {automation_id} deactivated (reverted to previous state)")
else:
logger.warning(f"Automation {automation_id}: no snapshot available for revert")
await self._deactivate_revert(automation_id)
elif deactivation_mode == "fallback_scene":
fallback_id = automation.deactivation_scene_preset_id if automation else None
if fallback_id and self._scene_preset_store and self._target_store:
try:
fallback = self._scene_preset_store.get_preset(fallback_id)
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
fallback, self._target_store, self._manager,
)
if errors:
logger.warning(f"Automation {automation_id} fallback errors: {errors}")
else:
logger.info(f"Automation {automation_id} deactivated (fallback scene '{fallback.name}' applied)")
except ValueError:
logger.warning(f"Automation {automation_id}: fallback scene {fallback_id} not found")
else:
logger.info(f"Automation {automation_id} deactivated (no fallback scene configured)")
await self._deactivate_fallback(automation_id, automation)
else:
# "none" mode — just clear active state
logger.info(f"Automation {automation_id} deactivated")
self._last_deactivated[automation_id] = datetime.now(timezone.utc)
@@ -391,6 +366,40 @@ class AutomationEngine:
# Clean up any leftover snapshot
self._pre_activation_snapshots.pop(automation_id, None)
async def _deactivate_revert(self, automation_id: str) -> None:
"""Revert to pre-activation snapshot."""
snapshot = self._pre_activation_snapshots.pop(automation_id, None)
if snapshot and self._target_store:
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
snapshot, self._target_store, self._manager,
)
if errors:
logger.warning(f"Automation {automation_id} revert errors: {errors}")
else:
logger.info(f"Automation {automation_id} deactivated (reverted to previous state)")
else:
logger.warning(f"Automation {automation_id}: no snapshot available for revert")
async def _deactivate_fallback(self, automation_id: str, automation) -> None:
"""Activate fallback scene on deactivation."""
fallback_id = automation.deactivation_scene_preset_id if automation else None
if fallback_id and self._scene_preset_store and self._target_store:
try:
fallback = self._scene_preset_store.get_preset(fallback_id)
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
fallback, self._target_store, self._manager,
)
if errors:
logger.warning(f"Automation {automation_id} fallback errors: {errors}")
else:
logger.info(f"Automation {automation_id} deactivated (fallback scene '{fallback.name}' applied)")
except ValueError:
logger.warning(f"Automation {automation_id}: fallback scene {fallback_id} not found")
else:
logger.info(f"Automation {automation_id} deactivated (no fallback scene configured)")
def _fire_event(self, automation_id: str, action: str) -> None:
try:
self._manager.fire_event({

View File

@@ -206,6 +206,29 @@ function renderAutomations(automations: any, sceneMap: any) {
}
}
type ConditionPillRenderer = (c: any) => string;
const CONDITION_PILL_RENDERERS: Record<string, ConditionPillRenderer> = {
always: (c) => `<span class="stream-card-prop">${ICON_OK} ${t('automations.condition.always')}</span>`,
startup: (c) => `<span class="stream-card-prop">${ICON_START} ${t('automations.condition.startup')}</span>`,
application: (c) => {
const apps = (c.apps || []).join(', ');
const matchLabel = t('automations.condition.application.match_type.' + (c.match_type || 'running'));
return `<span class="stream-card-prop stream-card-prop-full">${t('automations.condition.application')}: ${apps} (${matchLabel})</span>`;
},
time_of_day: (c) => `<span class="stream-card-prop">${ICON_CLOCK} ${t('automations.condition.time_of_day')}: ${c.start_time || '00:00'} ${c.end_time || '23:59'}</span>`,
system_idle: (c) => {
const mode = c.when_idle !== false ? t('automations.condition.system_idle.when_idle') : t('automations.condition.system_idle.when_active');
return `<span class="stream-card-prop">${ICON_TIMER} ${c.idle_minutes || 5}m (${mode})</span>`;
},
display_state: (c) => {
const stateLabel = t('automations.condition.display_state.' + (c.state || 'on'));
return `<span class="stream-card-prop">${ICON_MONITOR} ${t('automations.condition.display_state')}: ${stateLabel}</span>`;
},
mqtt: (c) => `<span class="stream-card-prop stream-card-prop-full">${ICON_RADIO} ${t('automations.condition.mqtt')}: ${escapeHtml(c.topic || '')} = ${escapeHtml(c.payload || '*')}</span>`,
webhook: (c) => `<span class="stream-card-prop">&#x1F517; ${t('automations.condition.webhook')}</span>`,
};
function createAutomationCard(automation: Automation, sceneMap = new Map()) {
const statusClass = !automation.enabled ? 'disabled' : automation.is_active ? 'active' : 'inactive';
const statusText = !automation.enabled ? t('automations.status.disabled') : automation.is_active ? t('automations.status.active') : t('automations.status.inactive');
@@ -215,35 +238,8 @@ function createAutomationCard(automation: Automation, sceneMap = new Map()) {
condPills = `<span class="stream-card-prop">${t('automations.conditions.empty')}</span>`;
} else {
const parts = automation.conditions.map(c => {
if (c.condition_type === 'always') {
return `<span class="stream-card-prop">${ICON_OK} ${t('automations.condition.always')}</span>`;
}
if (c.condition_type === 'startup') {
return `<span class="stream-card-prop">${ICON_START} ${t('automations.condition.startup')}</span>`;
}
if (c.condition_type === 'application') {
const apps = (c.apps || []).join(', ');
const matchLabel = t('automations.condition.application.match_type.' + (c.match_type || 'running'));
return `<span class="stream-card-prop stream-card-prop-full">${t('automations.condition.application')}: ${apps} (${matchLabel})</span>`;
}
if (c.condition_type === 'time_of_day') {
return `<span class="stream-card-prop">${ICON_CLOCK} ${t('automations.condition.time_of_day')}: ${c.start_time || '00:00'} ${c.end_time || '23:59'}</span>`;
}
if (c.condition_type === 'system_idle') {
const mode = c.when_idle !== false ? t('automations.condition.system_idle.when_idle') : t('automations.condition.system_idle.when_active');
return `<span class="stream-card-prop">${ICON_TIMER} ${c.idle_minutes || 5}m (${mode})</span>`;
}
if (c.condition_type === 'display_state') {
const stateLabel = t('automations.condition.display_state.' + (c.state || 'on'));
return `<span class="stream-card-prop">${ICON_MONITOR} ${t('automations.condition.display_state')}: ${stateLabel}</span>`;
}
if (c.condition_type === 'mqtt') {
return `<span class="stream-card-prop stream-card-prop-full">${ICON_RADIO} ${t('automations.condition.mqtt')}: ${escapeHtml(c.topic || '')} = ${escapeHtml(c.payload || '*')}</span>`;
}
if (c.condition_type === 'webhook') {
return `<span class="stream-card-prop">&#x1F517; ${t('automations.condition.webhook')}</span>`;
}
return `<span class="stream-card-prop">${c.condition_type}</span>`;
const renderer = CONDITION_PILL_RENDERERS[c.condition_type];
return renderer ? renderer(c) : `<span class="stream-card-prop">${c.condition_type}</span>`;
});
const logicLabel = automation.condition_logic === 'and' ? t('automations.logic.and') : t('automations.logic.or');
condPills = parts.join(`<span class="automation-logic-label">${logicLabel}</span>`);

View File

@@ -158,46 +158,62 @@ function _ensureCSSTypeIconSelect() {
/* ── Type-switch helper ───────────────────────────────────────── */
const CSS_SECTION_MAP: Record<string, string> = {
'picture': 'css-editor-picture-section',
'picture_advanced': 'css-editor-picture-section',
'static': 'css-editor-static-section',
'color_cycle': 'css-editor-color-cycle-section',
'gradient': 'css-editor-gradient-section',
'effect': 'css-editor-effect-section',
'composite': 'css-editor-composite-section',
'mapped': 'css-editor-mapped-section',
'audio': 'css-editor-audio-section',
'api_input': 'css-editor-api-input-section',
'notification': 'css-editor-notification-section',
'daylight': 'css-editor-daylight-section',
'candlelight': 'css-editor-candlelight-section',
'processed': 'css-editor-processed-section',
};
const CSS_ALL_SECTION_IDS = [...new Set(Object.values(CSS_SECTION_MAP))];
const CSS_TYPE_SETUP: Record<string, () => void> = {
processed: () => _populateProcessedSelectors(),
effect: () => { _ensureEffectTypeIconSelect(); _ensureEffectPaletteIconSelect(); onEffectTypeChange(); },
audio: () => { _ensureAudioVizIconSelect(); _ensureAudioPaletteIconSelect(); onAudioVizChange(); _loadAudioSources(); },
gradient: () => { _ensureGradientPresetIconSelect(); _ensureGradientEasingIconSelect(); requestAnimationFrame(() => gradientRenderAll()); },
notification: () => { ensureNotificationEffectIconSelect(); ensureNotificationFilterModeIconSelect(); },
candlelight: () => _ensureCandleTypeIconSelect(),
composite: () => compositeRenderList(),
mapped: () => _mappedRenderList(),
};
export function onCSSTypeChange() {
const type = (document.getElementById('css-editor-type') as HTMLInputElement).value;
// Sync icon-select trigger display
if (_cssTypeIconSelect) _cssTypeIconSelect.setValue(type);
const isPictureType = type === 'picture' || type === 'picture_advanced';
(document.getElementById('css-editor-picture-section') as HTMLElement).style.display = isPictureType ? '' : 'none';
// Hide all type-specific sections, then show the active one
CSS_ALL_SECTION_IDS.forEach(id => {
const el = document.getElementById(id);
if (el) el.style.display = 'none';
});
const activeSection = CSS_SECTION_MAP[type];
if (activeSection) {
const el = document.getElementById(activeSection);
if (el) el.style.display = '';
}
// Hide picture source dropdown for advanced (sources are per-line in calibration)
const psGroup = document.getElementById('css-editor-picture-source-group') as HTMLElement | null;
if (psGroup) psGroup.style.display = (type === 'picture') ? '' : 'none';
(document.getElementById('css-editor-static-section') as HTMLElement).style.display = type === 'static' ? '' : 'none';
(document.getElementById('css-editor-color-cycle-section') as HTMLElement).style.display = type === 'color_cycle' ? '' : 'none';
(document.getElementById('css-editor-gradient-section') as HTMLElement).style.display = type === 'gradient' ? '' : 'none';
(document.getElementById('css-editor-effect-section') as HTMLElement).style.display = type === 'effect' ? '' : 'none';
(document.getElementById('css-editor-composite-section') as HTMLElement).style.display = type === 'composite' ? '' : 'none';
(document.getElementById('css-editor-mapped-section') as HTMLElement).style.display = type === 'mapped' ? '' : 'none';
(document.getElementById('css-editor-audio-section') as HTMLElement).style.display = type === 'audio' ? '' : 'none';
(document.getElementById('css-editor-api-input-section') as HTMLElement).style.display = type === 'api_input' ? '' : 'none';
(document.getElementById('css-editor-notification-section') as HTMLElement).style.display = type === 'notification' ? '' : 'none';
(document.getElementById('css-editor-daylight-section') as HTMLElement).style.display = type === 'daylight' ? '' : 'none';
(document.getElementById('css-editor-candlelight-section') as HTMLElement).style.display = type === 'candlelight' ? '' : 'none';
(document.getElementById('css-editor-processed-section') as HTMLElement).style.display = type === 'processed' ? '' : 'none';
const isPictureType = type === 'picture' || type === 'picture_advanced';
if (isPictureType) _ensureInterpolationIconSelect();
if (type === 'processed') _populateProcessedSelectors();
if (type === 'effect') {
_ensureEffectTypeIconSelect();
_ensureEffectPaletteIconSelect();
onEffectTypeChange();
}
if (type === 'audio') {
_ensureAudioVizIconSelect();
_ensureAudioPaletteIconSelect();
onAudioVizChange();
}
if (type === 'gradient') { _ensureGradientPresetIconSelect(); _ensureGradientEasingIconSelect(); }
if (type === 'notification') {
ensureNotificationEffectIconSelect();
ensureNotificationFilterModeIconSelect();
}
if (type === 'candlelight') _ensureCandleTypeIconSelect();
// Run type-specific setup
const setupFn = CSS_TYPE_SETUP[type];
if (setupFn) setupFn();
// Animation section — shown for static/gradient only
const animSection = document.getElementById('css-editor-animation-section') as HTMLElement;
@@ -226,16 +242,6 @@ export function onCSSTypeChange() {
(document.getElementById('css-editor-clock-group') as HTMLElement).style.display = clockTypes.includes(type) ? '' : 'none';
if (clockTypes.includes(type)) _populateClockDropdown();
if (type === 'audio') {
_loadAudioSources();
} else if (type === 'composite') {
compositeRenderList();
} else if (type === 'mapped') {
_mappedRenderList();
} else if (type === 'gradient') {
requestAnimationFrame(() => gradientRenderAll());
}
_autoGenerateCSSName();
}
@@ -912,56 +918,44 @@ function _resetAudioState() {
/* ── Card ─────────────────────────────────────────────────────── */
export function createColorStripCard(source: ColorStripSource, pictureSourceMap: Record<string, any>, audioSourceMap: Record<string, any>) {
const isStatic = source.source_type === 'static';
const isGradient = source.source_type === 'gradient';
const isColorCycle = source.source_type === 'color_cycle';
const isEffect = source.source_type === 'effect';
const isComposite = source.source_type === 'composite';
const isMapped = source.source_type === 'mapped';
const isAudio = source.source_type === 'audio';
const isApiInput = source.source_type === 'api_input';
const isNotification = source.source_type === 'notification';
const isPictureAdvanced = source.source_type === 'picture_advanced';
type CardPropsRenderer = (source: ColorStripSource, opts: {
clockBadge: string;
animBadge: string;
audioSourceMap: Record<string, any>;
pictureSourceMap: Record<string, any>;
}) => string;
// Clock crosslink badge (replaces speed badge when clock is assigned)
const clockObj = source.clock_id ? _cachedSyncClocks.find(c => c.id === source.clock_id) : null;
const clockBadge = clockObj
? `<span class="stream-card-prop stream-card-link" title="${t('color_strip.clock')}" onclick="event.stopPropagation(); navigateToCard('streams','sync','sync-clocks','data-id','${source.clock_id}')">${ICON_CLOCK} ${escapeHtml(clockObj.name)}</span>`
: source.clock_id ? `<span class="stream-card-prop">${ICON_CLOCK} ${source.clock_id}</span>` : '';
const NON_PICTURE_TYPES = new Set([
'static', 'gradient', 'color_cycle', 'effect', 'composite', 'mapped',
'audio', 'api_input', 'notification', 'daylight', 'candlelight', 'processed',
]);
const anim = (isStatic || isGradient) && source.animation && source.animation.enabled ? source.animation : null;
const animBadge = anim
? `<span class="stream-card-prop" title="${t('color_strip.animation')}">${ICON_SPARKLES} ${t('color_strip.animation.type.' + anim.type) || anim.type}</span>`
: '';
let propsHtml;
if (isStatic) {
const CSS_CARD_RENDERERS: Record<string, CardPropsRenderer> = {
static: (source, { clockBadge, animBadge }) => {
const hexColor = rgbArrayToHex(source.color!);
propsHtml = `
return `
<span class="stream-card-prop" title="${t('color_strip.static_color')}">
<span style="display:inline-block;width:14px;height:14px;background:${hexColor};border:1px solid #888;border-radius:2px;vertical-align:middle;margin-right:4px"></span>${hexColor.toUpperCase()}
</span>
${animBadge}
${clockBadge}
`;
} else if (isColorCycle) {
},
color_cycle: (source, { clockBadge }) => {
const colors = source.colors || [];
const swatches = colors.slice(0, 8).map((c: any) =>
`<span style="display:inline-block;width:12px;height:12px;background:${rgbArrayToHex(c)};border:1px solid #888;border-radius:2px;margin-right:2px"></span>`
).join('');
propsHtml = `
return `
<span class="stream-card-prop">${swatches}</span>
${clockBadge}
`;
} else if (isGradient) {
},
gradient: (source, { clockBadge, animBadge }) => {
const stops = source.stops || [];
const sortedStops = [...stops].sort((a, b) => a.position - b.position);
let cssGradient = '';
if (sortedStops.length >= 2) {
// Build CSS stops that mirror the interpolation algorithm:
// for each stop emit its primary color, then immediately emit color_right
// at the same position to produce a hard edge (bidirectional stop).
const parts: string[] = [];
sortedStops.forEach(s => {
const pct = Math.round(s.position * 100);
@@ -970,39 +964,43 @@ export function createColorStripCard(source: ColorStripSource, pictureSourceMap:
});
cssGradient = `linear-gradient(to right, ${parts.join(', ')})`;
}
propsHtml = `
return `
${cssGradient ? `<span style="flex:1 1 100%;height:12px;background:${cssGradient};border-radius:3px;border:1px solid rgba(128,128,128,0.3)"></span>` : ''}
<span class="stream-card-prop">${ICON_PALETTE} ${stops.length} ${t('color_strip.gradient.stops_count')}</span>
${animBadge}
${clockBadge}
`;
} else if (isEffect) {
},
effect: (source, { clockBadge }) => {
const effectLabel = t('color_strip.effect.' + (source.effect_type || 'fire')) || source.effect_type || 'fire';
const paletteLabel = source.palette ? (t('color_strip.palette.' + source.palette) || source.palette) : '';
propsHtml = `
return `
<span class="stream-card-prop">${ICON_FPS} ${escapeHtml(effectLabel)}</span>
${paletteLabel ? `<span class="stream-card-prop" title="${t('color_strip.effect.palette')}">${ICON_PALETTE} ${escapeHtml(paletteLabel)}</span>` : ''}
${clockBadge}
`;
} else if (isComposite) {
},
composite: (source) => {
const layerCount = (source.layers || []).length;
const enabledCount = (source.layers || []).filter((l: any) => l.enabled !== false).length;
propsHtml = `
return `
<span class="stream-card-prop">${ICON_LINK} ${enabledCount}/${layerCount} ${t('color_strip.composite.layers_count')}</span>
${source.led_count ? `<span class="stream-card-prop" title="${t('color_strip.leds')}">${ICON_LED} ${source.led_count}</span>` : ''}
`;
} else if (isMapped) {
},
mapped: (source) => {
const zoneCount = (source.zones || []).length;
propsHtml = `
return `
<span class="stream-card-prop">${ICON_MAP_PIN} ${zoneCount} ${t('color_strip.mapped.zones_count')}</span>
${source.led_count ? `<span class="stream-card-prop" title="${t('color_strip.leds')}">${ICON_LED} ${source.led_count}</span>` : ''}
`;
} else if (isAudio) {
},
audio: (source, { audioSourceMap }) => {
const vizLabel = t('color_strip.audio.viz.' + (source.visualization_mode || 'spectrum')) || source.visualization_mode || 'spectrum';
const vizMode = source.visualization_mode || 'spectrum';
const showPalette = (vizMode === 'spectrum' || vizMode === 'beat_pulse') && source.palette;
const audioPaletteLabel = showPalette ? (t('color_strip.palette.' + source.palette) || source.palette) : '';
propsHtml = `
return `
<span class="stream-card-prop">${ICON_MUSIC} ${escapeHtml(vizLabel)}</span>
${audioPaletteLabel ? `<span class="stream-card-prop" title="${t('color_strip.audio.palette')}">${ICON_PALETTE} ${escapeHtml(audioPaletteLabel)}</span>` : ''}
${source.audio_source_id ? (() => {
@@ -1013,21 +1011,23 @@ export function createColorStripCard(source: ColorStripSource, pictureSourceMap:
})() : ''}
${source.mirror ? `<span class="stream-card-prop">🪞</span>` : ''}
`;
} else if (isApiInput) {
},
api_input: (source) => {
const fbColor = rgbArrayToHex(source.fallback_color || [0, 0, 0]);
const timeoutVal = (source.timeout ?? 5.0).toFixed(1);
propsHtml = `
return `
<span class="stream-card-prop" title="${t('color_strip.api_input.fallback_color')}">
<span style="display:inline-block;width:14px;height:14px;background:${fbColor};border:1px solid #888;border-radius:2px;vertical-align:middle;margin-right:4px"></span>${fbColor.toUpperCase()}
</span>
<span class="stream-card-prop" title="${t('color_strip.api_input.timeout')}">${ICON_TIMER} ${timeoutVal}s</span>
`;
} else if (isNotification) {
},
notification: (source) => {
const effectLabel = t('color_strip.notification.effect.' + (source.notification_effect || 'flash')) || source.notification_effect || 'flash';
const durationVal = source.duration_ms || 1500;
const defColor = source.default_color || '#FFFFFF';
const appCount = source.app_colors ? Object.keys(source.app_colors).length : 0;
propsHtml = `
return `
<span class="stream-card-prop" title="${t('color_strip.notification.effect')}">${ICON_BELL} ${escapeHtml(effectLabel)}</span>
<span class="stream-card-prop" title="${t('color_strip.notification.duration')}">${ICON_TIMER} ${durationVal}ms</span>
<span class="stream-card-prop" title="${t('color_strip.notification.default_color')}">
@@ -1035,73 +1035,96 @@ export function createColorStripCard(source: ColorStripSource, pictureSourceMap:
</span>
${appCount > 0 ? `<span class="stream-card-prop">${ICON_PALETTE} ${appCount} ${t('color_strip.notification.app_count')}</span>` : ''}
`;
} else if (source.source_type === 'daylight') {
},
daylight: (source, { clockBadge }) => {
const useRealTime = source.use_real_time;
const speedVal = (source.speed ?? 1.0).toFixed(1);
propsHtml = `
return `
<span class="stream-card-prop">${useRealTime ? '🕐 ' + t('color_strip.daylight.real_time') : '⏩ ' + speedVal + 'x'}</span>
${clockBadge}
`;
} else if (source.source_type === 'candlelight') {
},
candlelight: (source, { clockBadge }) => {
const hexColor = rgbArrayToHex(source.color || [255, 147, 41]);
const numCandles = source.num_candles ?? 3;
propsHtml = `
return `
<span class="stream-card-prop" title="${t('color_strip.candlelight.color')}">
<span style="display:inline-block;width:14px;height:14px;background:${hexColor};border:1px solid #888;border-radius:2px;vertical-align:middle;margin-right:4px"></span>${hexColor.toUpperCase()}
</span>
<span class="stream-card-prop">${numCandles} ${t('color_strip.candlelight.num_candles')}</span>
${clockBadge}
`;
} else if (source.source_type === 'processed') {
},
processed: (source) => {
const inputSrc = ((colorStripSourcesCache.data || []) as any[]).find(s => s.id === source.input_source_id);
const inputName = inputSrc?.name || source.input_source_id || '—';
const tplName = source.processing_template_id
? (_cachedCSPTemplates.find(t => t.id === source.processing_template_id)?.name || source.processing_template_id)
: '—';
propsHtml = `
return `
<span class="stream-card-prop" title="${t('color_strip.processed.input')}">${ICON_LINK_SOURCE} ${escapeHtml(inputName)}</span>
<span class="stream-card-prop" title="${t('color_strip.processed.template')}">${ICON_SPARKLES} ${escapeHtml(tplName)}</span>
`;
} else if (isPictureAdvanced) {
},
picture_advanced: (source, { pictureSourceMap }) => {
const cal = source.calibration ?? {} as Partial<import('../types.ts').Calibration>;
const lines = cal.lines || [];
const totalLeds = lines.reduce((s: any, l: any) => s + (l.led_count || 0), 0);
const ledCount = (source.led_count > 0) ? source.led_count : totalLeds;
// Collect unique picture source names
const psIds: any[] = [...new Set(lines.map((l: any) => l.picture_source_id).filter(Boolean))];
const psNames = psIds.map((id: any) => {
const ps = pictureSourceMap && pictureSourceMap[id];
return ps ? ps.name : id;
});
propsHtml = `
return `
<span class="stream-card-prop" title="${t('calibration.advanced.lines_title')}">${ICON_MAP_PIN} ${lines.length} ${t('calibration.advanced.lines_title').toLowerCase()}</span>
${ledCount ? `<span class="stream-card-prop" title="${t('color_strip.leds')}">${ICON_LED} ${ledCount}</span>` : ''}
${psNames.length ? `<span class="stream-card-prop stream-card-prop-full" title="${t('color_strip.picture_source')}">${ICON_LINK_SOURCE} ${escapeHtml(psNames.join(', '))}</span>` : ''}
`;
} else {
const ps = pictureSourceMap && pictureSourceMap[source.picture_source_id!];
const srcName = ps ? ps.name : source.picture_source_id || '—';
const cal = source.calibration ?? {} as Partial<import('../types.ts').Calibration>;
const calLeds = (cal.leds_top || 0) + (cal.leds_right || 0) + (cal.leds_bottom || 0) + (cal.leds_left || 0);
const ledCount = (source.led_count > 0) ? source.led_count : calLeds;
let psSubTab = 'raw', psSection = 'raw-streams';
if (ps) {
if (ps.stream_type === 'static_image') { psSubTab = 'static_image'; psSection = 'static-streams'; }
else if (ps.stream_type === 'processed') { psSubTab = 'processed'; psSection = 'proc-streams'; }
}
propsHtml = `
<span class="stream-card-prop stream-card-prop-full${ps ? ' stream-card-link' : ''}" title="${t('color_strip.picture_source')}"${ps ? ` onclick="event.stopPropagation(); navigateToCard('streams','${psSubTab}','${psSection}','data-stream-id','${source.picture_source_id}')"` : ''}>${ICON_LINK_SOURCE} ${escapeHtml(srcName)}</span>
${ledCount ? `<span class="stream-card-prop" title="${t('color_strip.leds')}">${ICON_LED} ${ledCount}</span>` : ''}
`;
},
};
/** Fallback renderer for picture-type sources (plain picture). */
function _renderPictureCardProps(source: ColorStripSource, pictureSourceMap: Record<string, any>): string {
const ps = pictureSourceMap && pictureSourceMap[source.picture_source_id!];
const srcName = ps ? ps.name : source.picture_source_id || '';
const cal = source.calibration ?? {} as Partial<import('../types.ts').Calibration>;
const calLeds = (cal.leds_top || 0) + (cal.leds_right || 0) + (cal.leds_bottom || 0) + (cal.leds_left || 0);
const ledCount = (source.led_count > 0) ? source.led_count : calLeds;
let psSubTab = 'raw', psSection = 'raw-streams';
if (ps) {
if (ps.stream_type === 'static_image') { psSubTab = 'static_image'; psSection = 'static-streams'; }
else if (ps.stream_type === 'processed') { psSubTab = 'processed'; psSection = 'proc-streams'; }
}
return `
<span class="stream-card-prop stream-card-prop-full${ps ? ' stream-card-link' : ''}" title="${t('color_strip.picture_source')}"${ps ? ` onclick="event.stopPropagation(); navigateToCard('streams','${psSubTab}','${psSection}','data-stream-id','${source.picture_source_id}')"` : ''}>${ICON_LINK_SOURCE} ${escapeHtml(srcName)}</span>
${ledCount ? `<span class="stream-card-prop" title="${t('color_strip.leds')}">${ICON_LED} ${ledCount}</span>` : ''}
`;
}
export function createColorStripCard(source: ColorStripSource, pictureSourceMap: Record<string, any>, audioSourceMap: Record<string, any>) {
// Clock crosslink badge (replaces speed badge when clock is assigned)
const clockObj = source.clock_id ? _cachedSyncClocks.find(c => c.id === source.clock_id) : null;
const clockBadge = clockObj
? `<span class="stream-card-prop stream-card-link" title="${t('color_strip.clock')}" onclick="event.stopPropagation(); navigateToCard('streams','sync','sync-clocks','data-id','${source.clock_id}')">${ICON_CLOCK} ${escapeHtml(clockObj.name)}</span>`
: source.clock_id ? `<span class="stream-card-prop">${ICON_CLOCK} ${source.clock_id}</span>` : '';
const isAnimatable = source.source_type === 'static' || source.source_type === 'gradient';
const anim = isAnimatable && source.animation && source.animation.enabled ? source.animation : null;
const animBadge = anim
? `<span class="stream-card-prop" title="${t('color_strip.animation')}">${ICON_SPARKLES} ${t('color_strip.animation.type.' + anim.type) || anim.type}</span>`
: '';
const renderer = CSS_CARD_RENDERERS[source.source_type];
const propsHtml = renderer
? renderer(source, { clockBadge, animBadge, audioSourceMap, pictureSourceMap })
: _renderPictureCardProps(source, pictureSourceMap);
const icon = getColorStripIcon(source.source_type);
const isDaylight = source.source_type === 'daylight';
const isCandlelight = source.source_type === 'candlelight';
const isProcessed = source.source_type === 'processed';
const isPictureKind = (!isStatic && !isGradient && !isColorCycle && !isEffect && !isComposite && !isMapped && !isAudio && !isApiInput && !isNotification && !isDaylight && !isCandlelight && !isProcessed);
const isNotification = source.source_type === 'notification';
const isPictureKind = !NON_PICTURE_TYPES.has(source.source_type);
const calibrationBtn = isPictureKind
? `<button class="btn btn-icon btn-secondary" onclick="${isPictureAdvanced ? `showAdvancedCalibration('${source.id}')` : `showCSSCalibration('${source.id}')`}" title="${t('calibration.title')}">${ICON_CALIBRATION}</button>`
? `<button class="btn btn-icon btn-secondary" onclick="${source.source_type === 'picture_advanced' ? `showAdvancedCalibration('${source.id}')` : `showCSSCalibration('${source.id}')`}" title="${t('calibration.title')}">${ICON_CALIBRATION}</button>`
: '';
const overlayBtn = isPictureKind
? `<button class="btn btn-icon btn-secondary" onclick="event.stopPropagation(); toggleCSSOverlay('${source.id}')" title="${t('overlay.toggle')}">${ICON_OVERLAY}</button>`

View File

@@ -276,6 +276,54 @@ const _streamSectionMap = {
sync: [csSyncClocks],
};
type StreamCardRenderer = (stream: any) => string;
const PICTURE_SOURCE_CARD_RENDERERS: Record<string, StreamCardRenderer> = {
raw: (stream) => {
let capTmplName = '';
if (stream.capture_template_id) {
const capTmpl = _cachedCaptureTemplates.find(t => t.id === stream.capture_template_id);
if (capTmpl) capTmplName = escapeHtml(capTmpl.name);
}
return `<div class="stream-card-props">
<span class="stream-card-prop" title="${t('streams.display')}">${ICON_MONITOR} ${stream.display_index ?? 0}</span>
<span class="stream-card-prop" title="${t('streams.target_fps')}">${ICON_FPS} ${stream.target_fps ?? 30}</span>
${capTmplName ? `<span class="stream-card-prop stream-card-link" title="${t('streams.capture_template')}" onclick="event.stopPropagation(); navigateToCard('streams','raw_templates','raw-templates','data-template-id','${stream.capture_template_id}')">${ICON_TEMPLATE} ${capTmplName}</span>` : ''}
</div>`;
},
processed: (stream) => {
const sourceStream = _cachedStreams.find(s => s.id === stream.source_stream_id);
const sourceName = sourceStream ? escapeHtml(sourceStream.name) : (stream.source_stream_id || '-');
const sourceSubTab = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static_image' : 'raw') : 'raw';
const sourceSection = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static-streams' : 'raw-streams') : 'raw-streams';
let ppTmplName = '';
if (stream.postprocessing_template_id) {
const ppTmpl = _cachedPPTemplates.find(p => p.id === stream.postprocessing_template_id);
if (ppTmpl) ppTmplName = escapeHtml(ppTmpl.name);
}
return `<div class="stream-card-props">
<span class="stream-card-prop stream-card-link" title="${t('streams.source')}" onclick="event.stopPropagation(); navigateToCard('streams','${sourceSubTab}','${sourceSection}','data-stream-id','${stream.source_stream_id}')">${ICON_LINK_SOURCE} ${sourceName}</span>
${ppTmplName ? `<span class="stream-card-prop stream-card-link" title="${t('streams.pp_template')}" onclick="event.stopPropagation(); navigateToCard('streams','proc_templates','proc-templates','data-pp-template-id','${stream.postprocessing_template_id}')">${ICON_TEMPLATE} ${ppTmplName}</span>` : ''}
</div>`;
},
static_image: (stream) => {
const src = stream.image_source || '';
return `<div class="stream-card-props">
<span class="stream-card-prop stream-card-prop-full" title="${escapeHtml(src)}">${ICON_WEB} ${escapeHtml(src)}</span>
</div>`;
},
video: (stream) => {
const url = stream.url || '';
const shortUrl = url.length > 40 ? url.slice(0, 37) + '...' : url;
return `<div class="stream-card-props">
<span class="stream-card-prop stream-card-prop-full" title="${escapeHtml(url)}">${ICON_WEB} ${escapeHtml(shortUrl)}</span>
<span class="stream-card-prop" title="${t('streams.target_fps')}">${ICON_FPS} ${stream.target_fps ?? 30}</span>
${stream.loop !== false ? `<span class="stream-card-prop">↻</span>` : ''}
${stream.playback_speed && stream.playback_speed !== 1.0 ? `<span class="stream-card-prop">${stream.playback_speed}×</span>` : ''}
</div>`;
},
};
function renderPictureSourcesList(streams: any) {
const container = document.getElementById('streams-list')!;
const activeTab = localStorage.getItem('activeStreamTab') || 'raw';
@@ -283,47 +331,8 @@ function renderPictureSourcesList(streams: any) {
const renderStreamCard = (stream: any) => {
const typeIcon = getPictureSourceIcon(stream.stream_type);
let detailsHtml = '';
if (stream.stream_type === 'raw') {
let capTmplName = '';
if (stream.capture_template_id) {
const capTmpl = _cachedCaptureTemplates.find(t => t.id === stream.capture_template_id);
if (capTmpl) capTmplName = escapeHtml(capTmpl.name);
}
detailsHtml = `<div class="stream-card-props">
<span class="stream-card-prop" title="${t('streams.display')}">${ICON_MONITOR} ${stream.display_index ?? 0}</span>
<span class="stream-card-prop" title="${t('streams.target_fps')}">${ICON_FPS} ${stream.target_fps ?? 30}</span>
${capTmplName ? `<span class="stream-card-prop stream-card-link" title="${t('streams.capture_template')}" onclick="event.stopPropagation(); navigateToCard('streams','raw_templates','raw-templates','data-template-id','${stream.capture_template_id}')">${ICON_TEMPLATE} ${capTmplName}</span>` : ''}
</div>`;
} else if (stream.stream_type === 'processed') {
const sourceStream = _cachedStreams.find(s => s.id === stream.source_stream_id);
const sourceName = sourceStream ? escapeHtml(sourceStream.name) : (stream.source_stream_id || '-');
const sourceSubTab = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static_image' : 'raw') : 'raw';
const sourceSection = sourceStream ? (sourceStream.stream_type === 'static_image' ? 'static-streams' : 'raw-streams') : 'raw-streams';
let ppTmplName = '';
if (stream.postprocessing_template_id) {
const ppTmpl = _cachedPPTemplates.find(p => p.id === stream.postprocessing_template_id);
if (ppTmpl) ppTmplName = escapeHtml(ppTmpl.name);
}
detailsHtml = `<div class="stream-card-props">
<span class="stream-card-prop stream-card-link" title="${t('streams.source')}" onclick="event.stopPropagation(); navigateToCard('streams','${sourceSubTab}','${sourceSection}','data-stream-id','${stream.source_stream_id}')">${ICON_LINK_SOURCE} ${sourceName}</span>
${ppTmplName ? `<span class="stream-card-prop stream-card-link" title="${t('streams.pp_template')}" onclick="event.stopPropagation(); navigateToCard('streams','proc_templates','proc-templates','data-pp-template-id','${stream.postprocessing_template_id}')">${ICON_TEMPLATE} ${ppTmplName}</span>` : ''}
</div>`;
} else if (stream.stream_type === 'static_image') {
const src = stream.image_source || '';
detailsHtml = `<div class="stream-card-props">
<span class="stream-card-prop stream-card-prop-full" title="${escapeHtml(src)}">${ICON_WEB} ${escapeHtml(src)}</span>
</div>`;
} else if (stream.stream_type === 'video') {
const url = stream.url || '';
const shortUrl = url.length > 40 ? url.slice(0, 37) + '...' : url;
detailsHtml = `<div class="stream-card-props">
<span class="stream-card-prop stream-card-prop-full" title="${escapeHtml(url)}">${ICON_WEB} ${escapeHtml(shortUrl)}</span>
<span class="stream-card-prop" title="${t('streams.target_fps')}">${ICON_FPS} ${stream.target_fps ?? 30}</span>
${stream.loop !== false ? `<span class="stream-card-prop">↻</span>` : ''}
${stream.playback_speed && stream.playback_speed !== 1.0 ? `<span class="stream-card-prop">${stream.playback_speed}×</span>` : ''}
</div>`;
}
const renderer = PICTURE_SOURCE_CARD_RENDERERS[stream.stream_type];
const detailsHtml = renderer ? renderer(stream) : '';
return wrapCard({
type: 'template-card',

View File

@@ -2,7 +2,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from typing import Dict, List, Optional, Type
@dataclass
@@ -16,25 +16,12 @@ class Condition:
@classmethod
def from_dict(cls, data: dict) -> "Condition":
"""Factory: dispatch to the correct subclass."""
"""Factory: dispatch to the correct subclass via registry."""
ct = data.get("condition_type", "")
if ct == "always":
return AlwaysCondition.from_dict(data)
if ct == "application":
return ApplicationCondition.from_dict(data)
if ct == "time_of_day":
return TimeOfDayCondition.from_dict(data)
if ct == "system_idle":
return SystemIdleCondition.from_dict(data)
if ct == "display_state":
return DisplayStateCondition.from_dict(data)
if ct == "mqtt":
return MQTTCondition.from_dict(data)
if ct == "webhook":
return WebhookCondition.from_dict(data)
if ct == "startup":
return StartupCondition.from_dict(data)
raise ValueError(f"Unknown condition type: {ct}")
subcls = _CONDITION_MAP.get(ct)
if subcls is None:
raise ValueError(f"Unknown condition type: {ct}")
return subcls.from_dict(data)
@dataclass
@@ -190,6 +177,18 @@ class StartupCondition(Condition):
return cls()
_CONDITION_MAP: Dict[str, Type[Condition]] = {
"always": AlwaysCondition,
"application": ApplicationCondition,
"time_of_day": TimeOfDayCondition,
"system_idle": SystemIdleCondition,
"display_state": DisplayStateCondition,
"mqtt": MQTTCondition,
"webhook": WebhookCondition,
"startup": StartupCondition,
}
@dataclass
class Automation:
"""Automation that activates a scene preset based on conditions."""

View File

@@ -104,216 +104,53 @@ class ColorStripSource:
@staticmethod
def from_dict(data: dict) -> "ColorStripSource":
"""Factory: dispatch to the correct subclass based on source_type."""
source_type: str = data.get("source_type", "picture") or "picture"
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
source_type = data.get("source_type", "picture") or "picture"
subcls = _SOURCE_TYPE_MAP.get(source_type, PictureColorStripSource)
return subcls.from_dict(data)
clock_id: str | None = data.get("clock_id")
tags: list = data.get("tags", [])
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.now(timezone.utc)
)
def _parse_css_common(data: dict) -> dict:
"""Extract fields common to all ColorStripSource types."""
raw_created = data.get("created_at")
created_at = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.now(timezone.utc)
)
return dict(
id=data["id"],
name=data["name"],
description=data.get("description"),
clock_id=data.get("clock_id"),
tags=data.get("tags", []),
created_at=created_at,
updated_at=updated_at,
)
calibration_data = data.get("calibration")
calibration = (
calibration_from_dict(calibration_data)
if calibration_data
else CalibrationConfig(layout="clockwise", start_position="bottom_left")
)
if source_type == "static":
raw_color = data.get("color")
color = (
raw_color if isinstance(raw_color, list) and len(raw_color) == 3
else [255, 255, 255]
)
return StaticColorStripSource(
id=sid, name=name, source_type="static",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, color=color,
animation=data.get("animation"),
)
if source_type == "gradient":
raw_stops = data.get("stops")
stops = raw_stops if isinstance(raw_stops, list) else []
return GradientColorStripSource(
id=sid, name=name, source_type="gradient",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, stops=stops,
animation=data.get("animation"),
easing=data.get("easing") or "linear",
gradient_id=data.get("gradient_id"),
)
if source_type == "color_cycle":
raw_colors = data.get("colors")
colors = raw_colors if isinstance(raw_colors, list) else []
return ColorCycleColorStripSource(
id=sid, name=name, source_type="color_cycle",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, colors=colors,
)
if source_type == "composite":
return CompositeColorStripSource(
id=sid, name=name, source_type="composite",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, layers=data.get("layers") or [],
led_count=data.get("led_count") or 0,
)
if source_type == "mapped":
return MappedColorStripSource(
id=sid, name=name, source_type="mapped",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, zones=data.get("zones") or [],
led_count=data.get("led_count") or 0,
)
if source_type == "audio":
raw_color = data.get("color")
color = raw_color if isinstance(raw_color, list) and len(raw_color) == 3 else [0, 255, 0]
raw_peak = data.get("color_peak")
color_peak = raw_peak if isinstance(raw_peak, list) and len(raw_peak) == 3 else [255, 0, 0]
return AudioColorStripSource(
id=sid, name=name, source_type="audio",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, visualization_mode=data.get("visualization_mode") or "spectrum",
audio_source_id=data.get("audio_source_id") or "",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
palette=data.get("palette") or "rainbow",
gradient_id=data.get("gradient_id"),
color=color,
color_peak=color_peak,
led_count=data.get("led_count") or 0,
mirror=bool(data.get("mirror", False)),
)
if source_type == "effect":
raw_color = data.get("color")
color = (
raw_color if isinstance(raw_color, list) and len(raw_color) == 3
else [255, 80, 0]
)
return EffectColorStripSource(
id=sid, name=name, source_type="effect",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags, effect_type=data.get("effect_type") or "fire",
palette=data.get("palette") or "fire",
gradient_id=data.get("gradient_id"),
color=color,
intensity=float(data.get("intensity") or 1.0),
scale=float(data.get("scale") or 1.0),
mirror=bool(data.get("mirror", False)),
custom_palette=data.get("custom_palette"),
)
if source_type == "api_input":
raw_fallback = data.get("fallback_color")
fallback_color = (
raw_fallback if isinstance(raw_fallback, list) and len(raw_fallback) == 3
else [0, 0, 0]
)
return ApiInputColorStripSource(
id=sid, name=name, source_type="api_input",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags,
fallback_color=fallback_color,
timeout=float(data.get("timeout") or 5.0),
)
elif source_type == "notification":
raw_app_colors = data.get("app_colors")
app_colors = raw_app_colors if isinstance(raw_app_colors, dict) else {}
raw_app_filter_list = data.get("app_filter_list")
app_filter_list = raw_app_filter_list if isinstance(raw_app_filter_list, list) else []
return NotificationColorStripSource(
id=sid, name=name, source_type="notification",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags,
notification_effect=data.get("notification_effect") or "flash",
duration_ms=int(data.get("duration_ms") or 1500),
default_color=data.get("default_color") or "#FFFFFF",
app_colors=app_colors,
app_filter_mode=data.get("app_filter_mode") or "off",
app_filter_list=app_filter_list,
os_listener=bool(data.get("os_listener", False)),
)
if source_type == "daylight":
return DaylightColorStripSource(
id=sid, name=name, source_type="daylight",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags,
speed=float(data.get("speed") or 1.0),
use_real_time=bool(data.get("use_real_time", False)),
latitude=float(data.get("latitude") or 50.0),
)
if source_type == "processed":
return ProcessedColorStripSource(
id=sid, name=name, source_type="processed",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags,
input_source_id=data.get("input_source_id") or "",
processing_template_id=data.get("processing_template_id") or "",
)
if source_type == "candlelight":
raw_color = data.get("color")
color = (
raw_color if isinstance(raw_color, list) and len(raw_color) == 3
else [255, 147, 41]
)
return CandlelightColorStripSource(
id=sid, name=name, source_type="candlelight",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags,
color=color,
intensity=float(data.get("intensity") or 1.0),
num_candles=int(data.get("num_candles") or 3),
speed=float(data.get("speed") or 1.0),
)
# Shared picture-type field extraction
_picture_kwargs = dict(
tags=tags,
fps=data.get("fps") or 30,
smoothing=data["smoothing"] if data.get("smoothing") is not None else 0.3,
interpolation_mode=data.get("interpolation_mode") or "average",
calibration=calibration,
led_count=data.get("led_count") or 0,
)
if source_type == "picture_advanced":
return AdvancedPictureColorStripSource(
id=sid, name=name, source_type="picture_advanced",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, **_picture_kwargs,
)
# Default: "picture" type (simple 4-edge calibration)
return PictureColorStripSource(
id=sid, name=name, source_type=source_type,
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, picture_source_id=data.get("picture_source_id") or "",
**_picture_kwargs,
)
def _parse_picture_fields(data: dict) -> dict:
"""Extract fields shared by picture-type color strip sources."""
calibration_data = data.get("calibration")
calibration = (
calibration_from_dict(calibration_data)
if calibration_data
else CalibrationConfig(layout="clockwise", start_position="bottom_left")
)
return dict(
fps=data.get("fps") or 30,
smoothing=data["smoothing"] if data.get("smoothing") is not None else 0.3,
interpolation_mode=data.get("interpolation_mode") or "average",
calibration=calibration,
led_count=data.get("led_count") or 0,
)
def _picture_base_to_dict(source, d: dict) -> dict:
@@ -367,6 +204,17 @@ class PictureColorStripSource(ColorStripSource):
d["picture_source_id"] = self.picture_source_id
return _picture_base_to_dict(self, d)
@classmethod
def from_dict(cls, data: dict) -> "PictureColorStripSource":
common = _parse_css_common(data)
pic = _parse_picture_fields(data)
return cls(
**common,
source_type=data.get("source_type", "picture") or "picture",
picture_source_id=data.get("picture_source_id") or "",
**pic,
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -419,6 +267,12 @@ class AdvancedPictureColorStripSource(ColorStripSource):
d = super().to_dict()
return _picture_base_to_dict(self, d)
@classmethod
def from_dict(cls, data: dict) -> "AdvancedPictureColorStripSource":
common = _parse_css_common(data)
pic = _parse_picture_fields(data)
return cls(**common, source_type="picture_advanced", **pic)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -459,6 +313,15 @@ class StaticColorStripSource(ColorStripSource):
d["animation"] = self.animation
return d
@classmethod
def from_dict(cls, data: dict) -> "StaticColorStripSource":
common = _parse_css_common(data)
color = _validate_rgb(data.get("color"), [255, 255, 255])
return cls(
**common, source_type="static",
color=color, animation=data.get("animation"),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -509,6 +372,18 @@ class GradientColorStripSource(ColorStripSource):
d["gradient_id"] = self.gradient_id
return d
@classmethod
def from_dict(cls, data: dict) -> "GradientColorStripSource":
common = _parse_css_common(data)
raw_stops = data.get("stops")
return cls(
**common, source_type="gradient",
stops=raw_stops if isinstance(raw_stops, list) else [],
animation=data.get("animation"),
easing=data.get("easing") or "linear",
gradient_id=data.get("gradient_id"),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -559,6 +434,15 @@ class ColorCycleColorStripSource(ColorStripSource):
d["colors"] = [list(c) for c in self.colors]
return d
@classmethod
def from_dict(cls, data: dict) -> "ColorCycleColorStripSource":
common = _parse_css_common(data)
raw_colors = data.get("colors")
return cls(
**common, source_type="color_cycle",
colors=raw_colors if isinstance(raw_colors, list) else [],
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -611,6 +495,22 @@ class EffectColorStripSource(ColorStripSource):
d["custom_palette"] = self.custom_palette
return d
@classmethod
def from_dict(cls, data: dict) -> "EffectColorStripSource":
common = _parse_css_common(data)
color = _validate_rgb(data.get("color"), [255, 80, 0])
return cls(
**common, source_type="effect",
effect_type=data.get("effect_type") or "fire",
palette=data.get("palette") or "fire",
gradient_id=data.get("gradient_id"),
color=color,
intensity=float(data.get("intensity") or 1.0),
scale=float(data.get("scale") or 1.0),
mirror=bool(data.get("mirror", False)),
custom_palette=data.get("custom_palette"),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -687,6 +587,24 @@ class AudioColorStripSource(ColorStripSource):
d["mirror"] = self.mirror
return d
@classmethod
def from_dict(cls, data: dict) -> "AudioColorStripSource":
common = _parse_css_common(data)
color = _validate_rgb(data.get("color"), [0, 255, 0])
color_peak = _validate_rgb(data.get("color_peak"), [255, 0, 0])
return cls(
**common, source_type="audio",
visualization_mode=data.get("visualization_mode") or "spectrum",
audio_source_id=data.get("audio_source_id") or "",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
palette=data.get("palette") or "rainbow",
gradient_id=data.get("gradient_id"),
color=color, color_peak=color_peak,
led_count=data.get("led_count") or 0,
mirror=bool(data.get("mirror", False)),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -756,6 +674,15 @@ class CompositeColorStripSource(ColorStripSource):
d["led_count"] = self.led_count
return d
@classmethod
def from_dict(cls, data: dict) -> "CompositeColorStripSource":
common = _parse_css_common(data)
return cls(
**common, source_type="composite",
layers=data.get("layers") or [],
led_count=data.get("led_count") or 0,
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -797,6 +724,15 @@ class MappedColorStripSource(ColorStripSource):
d["led_count"] = self.led_count
return d
@classmethod
def from_dict(cls, data: dict) -> "MappedColorStripSource":
common = _parse_css_common(data)
return cls(
**common, source_type="mapped",
zones=data.get("zones") or [],
led_count=data.get("led_count") or 0,
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -837,6 +773,16 @@ class ApiInputColorStripSource(ColorStripSource):
d["timeout"] = self.timeout
return d
@classmethod
def from_dict(cls, data: dict) -> "ApiInputColorStripSource":
common = _parse_css_common(data)
fallback_color = _validate_rgb(data.get("fallback_color"), [0, 0, 0])
return cls(
**common, source_type="api_input",
fallback_color=fallback_color,
timeout=float(data.get("timeout") or 5.0),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -890,6 +836,22 @@ class NotificationColorStripSource(ColorStripSource):
d["os_listener"] = self.os_listener
return d
@classmethod
def from_dict(cls, data: dict) -> "NotificationColorStripSource":
common = _parse_css_common(data)
raw_app_colors = data.get("app_colors")
raw_app_filter_list = data.get("app_filter_list")
return cls(
**common, source_type="notification",
notification_effect=data.get("notification_effect") or "flash",
duration_ms=int(data.get("duration_ms") or 1500),
default_color=data.get("default_color") or "#FFFFFF",
app_colors=raw_app_colors if isinstance(raw_app_colors, dict) else {},
app_filter_mode=data.get("app_filter_mode") or "off",
app_filter_list=raw_app_filter_list if isinstance(raw_app_filter_list, list) else [],
os_listener=bool(data.get("os_listener", False)),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -957,6 +919,16 @@ class DaylightColorStripSource(ColorStripSource):
d["longitude"] = self.longitude
return d
@classmethod
def from_dict(cls, data: dict) -> "DaylightColorStripSource":
common = _parse_css_common(data)
return cls(
**common, source_type="daylight",
speed=float(data.get("speed") or 1.0),
use_real_time=bool(data.get("use_real_time", False)),
latitude=float(data.get("latitude") or 50.0),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -1010,6 +982,18 @@ class CandlelightColorStripSource(ColorStripSource):
d["candle_type"] = self.candle_type
return d
@classmethod
def from_dict(cls, data: dict) -> "CandlelightColorStripSource":
common = _parse_css_common(data)
color = _validate_rgb(data.get("color"), [255, 147, 41])
return cls(
**common, source_type="candlelight",
color=color,
intensity=float(data.get("intensity") or 1.0),
num_candles=int(data.get("num_candles") or 3),
speed=float(data.get("speed") or 1.0),
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
@@ -1066,6 +1050,15 @@ class ProcessedColorStripSource(ColorStripSource):
d["processing_template_id"] = self.processing_template_id
return d
@classmethod
def from_dict(cls, data: dict) -> "ProcessedColorStripSource":
common = _parse_css_common(data)
return cls(
**common, source_type="processed",
input_source_id=data.get("input_source_id") or "",
processing_template_id=data.get("processing_template_id") or "",
)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,

View File

@@ -2,7 +2,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from typing import Dict, List, Optional, Type
@dataclass
@@ -54,61 +54,35 @@ class PictureSource:
@staticmethod
def from_dict(data: dict) -> "PictureSource":
"""Factory: dispatch to the correct subclass based on stream_type."""
stream_type: str = data.get("stream_type", "raw") or "raw"
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
tags: list = data.get("tags", [])
stream_type = data.get("stream_type", "raw") or "raw"
subcls = _PICTURE_SOURCE_MAP.get(stream_type, ScreenCapturePictureSource)
return subcls.from_dict(data)
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.now(timezone.utc)
)
if stream_type == "processed":
return ProcessedPictureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
source_stream_id=data.get("source_stream_id") or "",
postprocessing_template_id=data.get("postprocessing_template_id") or "",
)
elif stream_type == "static_image":
return StaticImagePictureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
image_source=data.get("image_source") or "",
)
elif stream_type == "video":
return VideoCaptureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
url=data.get("url") or "",
loop=data.get("loop", True),
playback_speed=data.get("playback_speed", 1.0),
start_time=data.get("start_time"),
end_time=data.get("end_time"),
resolution_limit=data.get("resolution_limit"),
clock_id=data.get("clock_id"),
target_fps=data.get("target_fps") or 30,
)
else:
return ScreenCapturePictureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
display_index=data.get("display_index") or 0,
capture_template_id=data.get("capture_template_id") or "",
target_fps=data.get("target_fps") or 30,
)
def _parse_common_fields(data: dict) -> dict:
"""Extract common fields shared by all picture source types."""
raw_created = data.get("created_at")
created_at = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.now(timezone.utc)
)
return dict(
id=data["id"],
name=data["name"],
description=data.get("description"),
tags=data.get("tags", []),
created_at=created_at,
updated_at=updated_at,
)
@dataclass
@@ -126,6 +100,17 @@ class ScreenCapturePictureSource(PictureSource):
d["target_fps"] = self.target_fps
return d
@classmethod
def from_dict(cls, data: dict) -> "ScreenCapturePictureSource":
common = _parse_common_fields(data)
return cls(
**common,
stream_type=data.get("stream_type", "raw") or "raw",
display_index=data.get("display_index") or 0,
capture_template_id=data.get("capture_template_id") or "",
target_fps=data.get("target_fps") or 30,
)
@dataclass
class ProcessedPictureSource(PictureSource):
@@ -140,6 +125,16 @@ class ProcessedPictureSource(PictureSource):
d["postprocessing_template_id"] = self.postprocessing_template_id
return d
@classmethod
def from_dict(cls, data: dict) -> "ProcessedPictureSource":
common = _parse_common_fields(data)
return cls(
**common,
stream_type="processed",
source_stream_id=data.get("source_stream_id") or "",
postprocessing_template_id=data.get("postprocessing_template_id") or "",
)
@dataclass
class StaticImagePictureSource(PictureSource):
@@ -152,6 +147,15 @@ class StaticImagePictureSource(PictureSource):
d["image_source"] = self.image_source
return d
@classmethod
def from_dict(cls, data: dict) -> "StaticImagePictureSource":
common = _parse_common_fields(data)
return cls(
**common,
stream_type="static_image",
image_source=data.get("image_source") or "",
)
@dataclass
class VideoCaptureSource(PictureSource):
@@ -177,3 +181,29 @@ class VideoCaptureSource(PictureSource):
d["clock_id"] = self.clock_id
d["target_fps"] = self.target_fps
return d
@classmethod
def from_dict(cls, data: dict) -> "VideoCaptureSource":
common = _parse_common_fields(data)
return cls(
**common,
stream_type="video",
url=data.get("url") or "",
loop=data.get("loop", True),
playback_speed=data.get("playback_speed", 1.0),
start_time=data.get("start_time"),
end_time=data.get("end_time"),
resolution_limit=data.get("resolution_limit"),
clock_id=data.get("clock_id"),
target_fps=data.get("target_fps") or 30,
)
# -- Source type registry --
# Maps stream_type string to its subclass for factory dispatch.
_PICTURE_SOURCE_MAP: Dict[str, Type[PictureSource]] = {
"raw": ScreenCapturePictureSource,
"processed": ProcessedPictureSource,
"static_image": StaticImagePictureSource,
"video": VideoCaptureSource,
}

View File

@@ -13,7 +13,7 @@ parameters like brightness. Six types:
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from typing import Dict, List, Optional, Type
@dataclass
@@ -59,88 +59,35 @@ class ValueSource:
@staticmethod
def from_dict(data: dict) -> "ValueSource":
"""Factory: dispatch to the correct subclass based on source_type."""
source_type: str = data.get("source_type", "static") or "static"
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
tags: list = data.get("tags", [])
source_type = data.get("source_type", "static") or "static"
subcls = _VALUE_SOURCE_MAP.get(source_type, StaticValueSource)
return subcls.from_dict(data)
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.now(timezone.utc)
)
if source_type == "animated":
return AnimatedValueSource(
id=sid, name=name, source_type="animated",
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
waveform=data.get("waveform") or "sine",
speed=float(data.get("speed") or 10.0),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
if source_type == "audio":
return AudioValueSource(
id=sid, name=name, source_type="audio",
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
audio_source_id=data.get("audio_source_id") or "",
mode=data.get("mode") or "rms",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
auto_gain=bool(data.get("auto_gain", False)),
)
if source_type == "adaptive_time":
return AdaptiveValueSource(
id=sid, name=name, source_type="adaptive_time",
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
schedule=data.get("schedule") or [],
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
if source_type == "adaptive_scene":
return AdaptiveValueSource(
id=sid, name=name, source_type="adaptive_scene",
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
picture_source_id=data.get("picture_source_id") or "",
scene_behavior=data.get("scene_behavior") or "complement",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
if source_type == "daylight":
return DaylightValueSource(
id=sid, name=name, source_type="daylight",
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
speed=float(data.get("speed") or 1.0),
use_real_time=bool(data.get("use_real_time", False)),
latitude=float(data.get("latitude") or 50.0),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
# Default: "static" type
return StaticValueSource(
id=sid, name=name, source_type="static",
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
value=float(data["value"]) if data.get("value") is not None else 1.0,
)
def _parse_common_fields(data: dict) -> dict:
"""Extract common fields shared by all value source types."""
raw_created = data.get("created_at")
created_at = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.now(timezone.utc)
)
return dict(
id=data["id"],
name=data["name"],
description=data.get("description"),
tags=data.get("tags", []),
created_at=created_at,
updated_at=updated_at,
)
@dataclass
@@ -157,6 +104,15 @@ class StaticValueSource(ValueSource):
d["value"] = self.value
return d
@classmethod
def from_dict(cls, data: dict) -> "StaticValueSource":
common = _parse_common_fields(data)
return cls(
**common,
source_type="static",
value=float(data["value"]) if data.get("value") is not None else 1.0,
)
@dataclass
class AnimatedValueSource(ValueSource):
@@ -179,6 +135,18 @@ class AnimatedValueSource(ValueSource):
d["max_value"] = self.max_value
return d
@classmethod
def from_dict(cls, data: dict) -> "AnimatedValueSource":
common = _parse_common_fields(data)
return cls(
**common,
source_type="animated",
waveform=data.get("waveform") or "sine",
speed=float(data.get("speed") or 10.0),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
@dataclass
class AudioValueSource(ValueSource):
@@ -207,6 +175,21 @@ class AudioValueSource(ValueSource):
d["auto_gain"] = self.auto_gain
return d
@classmethod
def from_dict(cls, data: dict) -> "AudioValueSource":
common = _parse_common_fields(data)
return cls(
**common,
source_type="audio",
audio_source_id=data.get("audio_source_id") or "",
mode=data.get("mode") or "rms",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
auto_gain=bool(data.get("auto_gain", False)),
)
@dataclass
class AdaptiveValueSource(ValueSource):
@@ -236,6 +219,22 @@ class AdaptiveValueSource(ValueSource):
d["max_value"] = self.max_value
return d
@classmethod
def from_dict(cls, data: dict) -> "AdaptiveValueSource":
common = _parse_common_fields(data)
source_type = data.get("source_type", "adaptive_time")
return cls(
**common,
source_type=source_type,
schedule=data.get("schedule") or [],
picture_source_id=data.get("picture_source_id") or "",
scene_behavior=data.get("scene_behavior") or "complement",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
@dataclass
class DaylightValueSource(ValueSource):
@@ -259,3 +258,28 @@ class DaylightValueSource(ValueSource):
d["min_value"] = self.min_value
d["max_value"] = self.max_value
return d
@classmethod
def from_dict(cls, data: dict) -> "DaylightValueSource":
common = _parse_common_fields(data)
return cls(
**common,
source_type="daylight",
speed=float(data.get("speed") or 1.0),
use_real_time=bool(data.get("use_real_time", False)),
latitude=float(data.get("latitude") or 50.0),
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
)
# -- Source type registry --
# Maps source_type string to its subclass for factory dispatch.
_VALUE_SOURCE_MAP: Dict[str, Type[ValueSource]] = {
"static": StaticValueSource,
"animated": AnimatedValueSource,
"audio": AudioValueSource,
"adaptive_time": AdaptiveValueSource,
"adaptive_scene": AdaptiveValueSource,
"daylight": DaylightValueSource,
}