Add CSPT entity, processed CSS source type, reverse filter, and UI improvements

- Add Color Strip Processing Template (CSPT) entity: reusable filter chains
  for 1D LED strip postprocessing (backend, storage, API, frontend CRUD)
- Add "processed" color strip source type that wraps another CSS source and
  applies a CSPT filter chain (dataclass, stream, schema, modal, cards)
- Add Reverse filter for strip LED order reversal
- Add CSPT and processed CSS nodes/edges to visual graph editor
- Add CSPT test preview WS endpoint with input source selection
- Add device settings CSPT template selector (add + edit modals with hints)
- Use icon grids for palette quantization preset selector in filter lists
- Use EntitySelect for template references and test modal source selectors
- Fix filters.css_filter_template.desc missing localization
- Fix icon grid cell height inequality (grid-auto-rows: 1fr)
- Rename "Processed" subtab to "Processing Templates"
- Localize all new strings (en/ru/zh)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-15 02:16:59 +03:00
parent 7e78323c9c
commit 294d704eb0
72 changed files with 2992 additions and 1416 deletions

View File

@@ -1,6 +1,7 @@
"""Base class for JSON entity stores — eliminates boilerplate across 12+ stores."""
import json
import threading
from pathlib import Path
from typing import Callable, Dict, Generic, List, TypeVar
@@ -32,6 +33,7 @@ class BaseJsonStore(Generic[T]):
self.file_path = Path(file_path)
self._items: Dict[str, T] = {}
self._deserializer = deserializer
self._lock = threading.Lock()
self._load()
# ── I/O ────────────────────────────────────────────────────────
@@ -69,6 +71,12 @@ class BaseJsonStore(Generic[T]):
)
def _save(self) -> None:
"""Persist all items to disk atomically.
Note: This is synchronous blocking I/O. When called from async route
handlers, it briefly blocks the event loop (typically < 5ms for small
stores). Acceptable for user-initiated CRUD; not suitable for hot loops.
"""
try:
data = {
"version": self._version,
@@ -105,7 +113,11 @@ class BaseJsonStore(Generic[T]):
# ── Helpers ────────────────────────────────────────────────────
def _check_name_unique(self, name: str, exclude_id: str = None) -> None:
"""Raise ValueError if *name* is empty or already taken."""
"""Raise ValueError if *name* is empty or already taken.
Callers should hold ``self._lock`` when calling this + mutating
``_items`` to prevent race conditions between concurrent requests.
"""
if not name or not name.strip():
raise ValueError("Name is required")
for item_id, item in self._items.items():

View File

@@ -0,0 +1,53 @@
"""Color strip processing template data model."""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.filters.filter_instance import FilterInstance
@dataclass
class ColorStripProcessingTemplate:
"""Processing template for color strip sources — ordered list of filters
applied to 1D LED arrays (N, 3) uint8.
"""
id: str
name: str
filters: List[FilterInstance]
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert template to dictionary."""
return {
"id": self.id,
"name": self.name,
"filters": [f.to_dict() for f in self.filters],
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
}
@classmethod
def from_dict(cls, data: dict) -> "ColorStripProcessingTemplate":
"""Create template from dictionary."""
filters = [FilterInstance.from_dict(f) for f in data.get("filters", [])]
return cls(
id=data["id"],
name=data["name"],
filters=filters,
created_at=datetime.fromisoformat(data["created_at"])
if isinstance(data.get("created_at"), str)
else data.get("created_at", datetime.now(timezone.utc)),
updated_at=datetime.fromisoformat(data["updated_at"])
if isinstance(data.get("updated_at"), str)
else data.get("updated_at", datetime.now(timezone.utc)),
description=data.get("description"),
tags=data.get("tags", []),
)

View File

@@ -0,0 +1,170 @@
"""Color strip processing template storage using JSON files."""
import uuid
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.filters.filter_instance import FilterInstance
from wled_controller.core.filters.registry import FilterRegistry
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.color_strip_processing_template import ColorStripProcessingTemplate
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ColorStripProcessingTemplateStore(BaseJsonStore[ColorStripProcessingTemplate]):
"""Storage for color strip processing templates.
All templates are persisted to the JSON file.
On startup, if no templates exist, a default one is auto-created.
"""
_json_key = "color_strip_processing_templates"
_entity_name = "Color strip processing template"
_version = "1.0.0"
def __init__(self, file_path: str):
super().__init__(file_path, ColorStripProcessingTemplate.from_dict)
self._ensure_initial_template()
# Backward-compatible aliases
get_all_templates = BaseJsonStore.get_all
get_template = BaseJsonStore.get
delete_template = BaseJsonStore.delete
def _ensure_initial_template(self) -> None:
"""Auto-create a default color strip processing template if none exist."""
if self._items:
return
now = datetime.now(timezone.utc)
template_id = f"cspt_{uuid.uuid4().hex[:8]}"
template = ColorStripProcessingTemplate(
id=template_id,
name="Default",
filters=[
FilterInstance("brightness", {"value": 1.0}),
FilterInstance("gamma", {"value": 2.2}),
],
created_at=now,
updated_at=now,
description="Default color strip processing template",
)
self._items[template_id] = template
self._save()
logger.info(f"Auto-created initial color strip processing template: {template.name} ({template_id})")
def _validate_strip_filters(self, filters: List[FilterInstance]) -> None:
"""Validate that all filters support strip processing."""
for fi in filters:
if not FilterRegistry.is_registered(fi.filter_id):
raise ValueError(f"Unknown filter type: '{fi.filter_id}'")
filter_cls = FilterRegistry.get(fi.filter_id)
if not getattr(filter_cls, "supports_strip", True):
raise ValueError(
f"Filter '{fi.filter_id}' does not support strip processing"
)
def create_template(
self,
name: str,
filters: Optional[List[FilterInstance]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> ColorStripProcessingTemplate:
self._check_name_unique(name)
if filters is None:
filters = []
self._validate_strip_filters(filters)
template_id = f"cspt_{uuid.uuid4().hex[:8]}"
now = datetime.now(timezone.utc)
template = ColorStripProcessingTemplate(
id=template_id,
name=name,
filters=filters,
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
)
self._items[template_id] = template
self._save()
logger.info(f"Created color strip processing template: {name} ({template_id})")
return template
def update_template(
self,
template_id: str,
name: Optional[str] = None,
filters: Optional[List[FilterInstance]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> ColorStripProcessingTemplate:
template = self.get(template_id)
if name is not None:
self._check_name_unique(name, exclude_id=template_id)
template.name = name
if filters is not None:
self._validate_strip_filters(filters)
template.filters = filters
if description is not None:
template.description = description
if tags is not None:
template.tags = tags
template.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated color strip processing template: {template_id}")
return template
def resolve_filter_instances(self, filter_instances, _visited=None):
"""Recursively resolve filter instances, expanding css_filter_template references.
Returns a flat list of FilterInstance objects with no css_filter_template entries.
"""
if _visited is None:
_visited = set()
resolved = []
for fi in filter_instances:
if fi.filter_id == "css_filter_template":
template_id = fi.options.get("template_id", "")
if not template_id or template_id in _visited:
continue
try:
ref_template = self.get_template(template_id)
_visited.add(template_id)
resolved.extend(self.resolve_filter_instances(ref_template.filters, _visited))
_visited.discard(template_id)
except ValueError:
logger.warning(f"Referenced CSS filter template '{template_id}' not found, skipping")
else:
resolved.append(fi)
return resolved
def get_references(self, template_id: str, device_store=None, css_store=None) -> List[str]:
"""Return names of entities that reference this template."""
refs = []
if device_store:
for dev in device_store.get_all():
if getattr(dev, "default_css_processing_template_id", "") == template_id:
refs.append(f"Device: {dev.name}")
if css_store:
for src in css_store.get_all():
layers = getattr(src, "layers", None)
if layers:
for layer in layers:
if isinstance(layer, dict) and layer.get("processing_template_id") == template_id:
refs.append(f"Composite layer in: {src.name}")
break
return refs

View File

@@ -19,13 +19,19 @@ Current types:
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from typing import Dict, List, Optional, Type
from wled_controller.core.capture.calibration import (
CalibrationConfig,
calibration_from_dict,
calibration_to_dict,
)
from wled_controller.storage.utils import resolve_ref
def _validate_rgb(value, default: list) -> list:
"""Return value if it's a 3-element list, otherwise return default."""
return value if isinstance(value, list) and len(value) == 3 else list(default)
@dataclass
@@ -51,7 +57,7 @@ class ColorStripSource:
return False
def to_dict(self) -> dict:
"""Convert source to dictionary. Subclasses extend this."""
"""Convert source to dictionary. Subclasses extend this with their own fields."""
return {
"id": self.id,
"name": self.name,
@@ -61,48 +67,40 @@ class ColorStripSource:
"description": self.description,
"clock_id": self.clock_id,
"tags": self.tags,
# Subclass fields default to None for forward compat
"picture_source_id": None,
"fps": None,
"brightness": None,
"saturation": None,
"gamma": None,
"smoothing": None,
"interpolation_mode": None,
"calibration": None,
"led_count": None,
"color": None,
"stops": None,
"animation": None,
"colors": None,
"effect_type": None,
"palette": None,
"intensity": None,
"scale": None,
"mirror": None,
"layers": None,
"zones": None,
"visualization_mode": None,
"audio_source_id": None,
"sensitivity": None,
"color_peak": None,
"fallback_color": None,
"timeout": None,
"notification_effect": None,
"duration_ms": None,
"default_color": None,
"app_colors": None,
"app_filter_mode": None,
"app_filter_list": None,
"os_listener": None,
# daylight-type fields
"speed": None,
"use_real_time": None,
"latitude": None,
# candlelight-type fields
"num_candles": None,
}
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description: Optional[str] = None,
clock_id: Optional[str] = None,
tags: Optional[List[str]] = None,
**kwargs) -> "ColorStripSource":
"""Create an instance from keyword arguments.
Base implementation — subclasses override to extract type-specific fields.
This should not be called directly; use create_instance() instead.
"""
raise NotImplementedError(f"create_from_kwargs not implemented on {cls.__name__}")
def apply_update(self, **kwargs) -> None:
"""Apply type-specific field updates. Subclasses override this.
Only fields present in kwargs (and not None) are updated.
Base class handles common fields (name, description, clock_id, tags)
in the store; this method handles subclass-specific fields only.
"""
pass
@staticmethod
def create_instance(source_type: str, **kwargs) -> "ColorStripSource":
"""Factory: create the correct subclass based on source_type."""
cls = _SOURCE_TYPE_MAP.get(source_type)
if cls is None:
# Default to PictureColorStripSource for unknown types
cls = PictureColorStripSource
return cls.create_from_kwargs(source_type=source_type, **kwargs)
@staticmethod
def from_dict(data: dict) -> "ColorStripSource":
"""Factory: dispatch to the correct subclass based on source_type."""
@@ -262,6 +260,15 @@ class ColorStripSource:
latitude=float(data.get("latitude") or 50.0),
)
if source_type == "processed":
return ProcessedColorStripSource(
id=sid, name=name, source_type="processed",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, tags=tags,
input_source_id=data.get("input_source_id") or "",
processing_template_id=data.get("processing_template_id") or "",
)
if source_type == "candlelight":
raw_color = data.get("color")
color = (
@@ -282,14 +289,10 @@ class ColorStripSource:
_picture_kwargs = dict(
tags=tags,
fps=data.get("fps") or 30,
brightness=data["brightness"] if data.get("brightness") is not None else 1.0,
saturation=data["saturation"] if data.get("saturation") is not None else 1.0,
gamma=data["gamma"] if data.get("gamma") is not None else 1.0,
smoothing=data["smoothing"] if data.get("smoothing") is not None else 0.3,
interpolation_mode=data.get("interpolation_mode") or "average",
calibration=calibration,
led_count=data.get("led_count") or 0,
frame_interpolation=bool(data.get("frame_interpolation", False)),
)
if source_type == "picture_advanced":
@@ -311,17 +314,27 @@ class ColorStripSource:
def _picture_base_to_dict(source, d: dict) -> dict:
"""Populate dict with fields common to both picture source types."""
d["fps"] = source.fps
d["brightness"] = source.brightness
d["saturation"] = source.saturation
d["gamma"] = source.gamma
d["smoothing"] = source.smoothing
d["interpolation_mode"] = source.interpolation_mode
d["calibration"] = calibration_to_dict(source.calibration)
d["led_count"] = source.led_count
d["frame_interpolation"] = source.frame_interpolation
return d
def _apply_picture_update(source, **kwargs) -> None:
"""Apply update fields common to both picture source types."""
if kwargs.get("fps") is not None:
source.fps = kwargs["fps"]
if kwargs.get("smoothing") is not None:
source.smoothing = kwargs["smoothing"]
if kwargs.get("interpolation_mode") is not None:
source.interpolation_mode = kwargs["interpolation_mode"]
if kwargs.get("calibration") is not None:
source.calibration = kwargs["calibration"]
if kwargs.get("led_count") is not None:
source.led_count = kwargs["led_count"]
@dataclass
class PictureColorStripSource(ColorStripSource):
"""Color strip source driven by a single PictureSource (simple 4-edge calibration).
@@ -337,29 +350,50 @@ class PictureColorStripSource(ColorStripSource):
picture_source_id: str = ""
fps: int = 30
brightness: float = 1.0 # color correction multiplier (0.02.0; 1.0 = unchanged)
saturation: float = 1.0 # 1.0 = unchanged, 0.0 = grayscale, 2.0 = double saturation
gamma: float = 1.0 # 1.0 = no correction; <1 = brighter, >1 = darker mids
smoothing: float = 0.3 # temporal smoothing (0.0 = none, 1.0 = full)
interpolation_mode: str = "average" # "average" | "median" | "dominant"
calibration: CalibrationConfig = field(
default_factory=lambda: CalibrationConfig(layout="clockwise", start_position="bottom_left")
)
led_count: int = 0 # explicit LED count; 0 = auto (derived from calibration)
frame_interpolation: bool = False # blend between consecutive captured frames
def to_dict(self) -> dict:
d = super().to_dict()
d["picture_source_id"] = self.picture_source_id
return _picture_base_to_dict(self, d)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
picture_source_id="", fps=30,
smoothing=0.3,
interpolation_mode="average", calibration=None,
led_count=0, **_kwargs):
if calibration is None:
calibration = CalibrationConfig(layout="clockwise", start_position="bottom_left")
return cls(
id=id, name=name, source_type=source_type,
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
picture_source_id=picture_source_id, fps=fps,
smoothing=smoothing, interpolation_mode=interpolation_mode,
calibration=calibration, led_count=led_count,
)
def apply_update(self, **kwargs) -> None:
picture_source_id = kwargs.get("picture_source_id")
if picture_source_id is not None:
self.picture_source_id = resolve_ref(picture_source_id, self.picture_source_id)
_apply_picture_update(self, **kwargs)
@dataclass
class AdvancedPictureColorStripSource(ColorStripSource):
"""Color strip source with line-based calibration across multiple picture sources.
Each calibration line references its own picture source and edge, enabling
LED strips that span multiple monitors. No single picture_source_id the
LED strips that span multiple monitors. No single picture_source_id -- the
picture sources are defined per-line in the calibration config.
"""
@@ -369,27 +403,44 @@ class AdvancedPictureColorStripSource(ColorStripSource):
return True
fps: int = 30
brightness: float = 1.0
saturation: float = 1.0
gamma: float = 1.0
smoothing: float = 0.3
interpolation_mode: str = "average"
calibration: CalibrationConfig = field(
default_factory=lambda: CalibrationConfig(mode="advanced")
)
led_count: int = 0
frame_interpolation: bool = False
def to_dict(self) -> dict:
d = super().to_dict()
return _picture_base_to_dict(self, d)
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
fps=30,
smoothing=0.3, interpolation_mode="average",
calibration=None, led_count=0, **_kwargs):
if calibration is None:
calibration = CalibrationConfig(mode="advanced")
return cls(
id=id, name=name, source_type="picture_advanced",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
fps=fps,
smoothing=smoothing, interpolation_mode=interpolation_mode,
calibration=calibration, led_count=led_count,
)
def apply_update(self, **kwargs) -> None:
_apply_picture_update(self, **kwargs)
@dataclass
class StaticColorStripSource(ColorStripSource):
"""Color strip source that fills all LEDs with a single static color.
No capture or processing the entire LED strip is set to one constant
No capture or processing -- the entire LED strip is set to one constant
RGB color. Useful for solid-color accents or as a placeholder while
a PictureColorStripSource is being configured.
"""
@@ -403,12 +454,33 @@ class StaticColorStripSource(ColorStripSource):
d["animation"] = self.animation
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
color=None, animation=None, **_kwargs):
rgb = _validate_rgb(color, [255, 255, 255])
return cls(
id=id, name=name, source_type="static",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
color=rgb, animation=animation,
)
def apply_update(self, **kwargs) -> None:
color = kwargs.get("color")
if color is not None:
if isinstance(color, list) and len(color) == 3:
self.color = color
if kwargs.get("animation") is not None:
self.animation = kwargs["animation"]
@dataclass
class GradientColorStripSource(ColorStripSource):
"""Color strip source that produces a linear gradient across all LEDs.
The gradient is defined by color stops at relative positions (0.01.0).
The gradient is defined by color stops at relative positions (0.0-1.0).
Each stop has a primary color; optionally a second "right" color to create
a hard discontinuity (bidirectional stop) at that position.
@@ -428,6 +500,29 @@ class GradientColorStripSource(ColorStripSource):
d["animation"] = self.animation
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
stops=None, animation=None, **_kwargs):
return cls(
id=id, name=name, source_type="gradient",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
stops=stops if isinstance(stops, list) else [
{"position": 0.0, "color": [255, 0, 0]},
{"position": 1.0, "color": [0, 0, 255]},
],
animation=animation,
)
def apply_update(self, **kwargs) -> None:
stops = kwargs.get("stops")
if stops is not None and isinstance(stops, list):
self.stops = stops
if kwargs.get("animation") is not None:
self.animation = kwargs["animation"]
@dataclass
class ColorCycleColorStripSource(ColorStripSource):
@@ -448,6 +543,27 @@ class ColorCycleColorStripSource(ColorStripSource):
d["colors"] = [list(c) for c in self.colors]
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
colors=None, **_kwargs):
default_colors = [
[255, 0, 0], [255, 255, 0], [0, 255, 0],
[0, 255, 255], [0, 0, 255], [255, 0, 255],
]
return cls(
id=id, name=name, source_type="color_cycle",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
colors=colors if isinstance(colors, list) and len(colors) >= 2 else default_colors,
)
def apply_update(self, **kwargs) -> None:
colors = kwargs.get("colors")
if colors is not None and isinstance(colors, list) and len(colors) >= 2:
self.colors = colors
@dataclass
class EffectColorStripSource(ColorStripSource):
@@ -461,8 +577,8 @@ class EffectColorStripSource(ColorStripSource):
effect_type: str = "fire" # fire | meteor | plasma | noise | aurora
palette: str = "fire" # named color palette
color: list = field(default_factory=lambda: [255, 80, 0]) # [R,G,B] for meteor head
intensity: float = 1.0 # effect-specific intensity (0.12.0)
scale: float = 1.0 # spatial scale / zoom (0.55.0)
intensity: float = 1.0 # effect-specific intensity (0.1-2.0)
scale: float = 1.0 # spatial scale / zoom (0.5-5.0)
mirror: bool = False # bounce mode (meteor)
def to_dict(self) -> dict:
@@ -475,6 +591,39 @@ class EffectColorStripSource(ColorStripSource):
d["mirror"] = self.mirror
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
effect_type="fire", palette="fire", color=None,
intensity=1.0, scale=1.0, mirror=False, **_kwargs):
rgb = _validate_rgb(color, [255, 80, 0])
return cls(
id=id, name=name, source_type="effect",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
effect_type=effect_type or "fire", palette=palette or "fire",
color=rgb,
intensity=float(intensity) if intensity else 1.0,
scale=float(scale) if scale else 1.0,
mirror=bool(mirror),
)
def apply_update(self, **kwargs) -> None:
if kwargs.get("effect_type") is not None:
self.effect_type = kwargs["effect_type"]
if kwargs.get("palette") is not None:
self.palette = kwargs["palette"]
color = kwargs.get("color")
if color is not None and isinstance(color, list) and len(color) == 3:
self.color = color
if kwargs.get("intensity") is not None:
self.intensity = float(kwargs["intensity"])
if kwargs.get("scale") is not None:
self.scale = float(kwargs["scale"])
if kwargs.get("mirror") is not None:
self.mirror = bool(kwargs["mirror"])
@dataclass
class AudioColorStripSource(ColorStripSource):
@@ -487,8 +636,8 @@ class AudioColorStripSource(ColorStripSource):
visualization_mode: str = "spectrum" # spectrum | beat_pulse | vu_meter
audio_source_id: str = "" # references a MonoAudioSource
sensitivity: float = 1.0 # gain multiplier (0.15.0)
smoothing: float = 0.3 # temporal smoothing (0.01.0)
sensitivity: float = 1.0 # gain multiplier (0.1-5.0)
smoothing: float = 0.3 # temporal smoothing (0.0-1.0)
palette: str = "rainbow" # named color palette
color: list = field(default_factory=lambda: [0, 255, 0]) # base RGB for VU meter
color_peak: list = field(default_factory=lambda: [255, 0, 0]) # peak RGB for VU meter
@@ -508,6 +657,52 @@ class AudioColorStripSource(ColorStripSource):
d["mirror"] = self.mirror
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
visualization_mode="spectrum", audio_source_id="",
sensitivity=1.0, smoothing=0.3, palette="rainbow",
color=None, color_peak=None, led_count=0,
mirror=False, **_kwargs):
rgb = _validate_rgb(color, [0, 255, 0])
peak = _validate_rgb(color_peak, [255, 0, 0])
return cls(
id=id, name=name, source_type="audio",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
visualization_mode=visualization_mode or "spectrum",
audio_source_id=audio_source_id or "",
sensitivity=float(sensitivity) if sensitivity else 1.0,
smoothing=float(smoothing) if smoothing else 0.3,
palette=palette or "rainbow",
color=rgb, color_peak=peak, led_count=led_count,
mirror=bool(mirror),
)
def apply_update(self, **kwargs) -> None:
if kwargs.get("visualization_mode") is not None:
self.visualization_mode = kwargs["visualization_mode"]
audio_source_id = kwargs.get("audio_source_id")
if audio_source_id is not None:
self.audio_source_id = resolve_ref(audio_source_id, self.audio_source_id)
if kwargs.get("sensitivity") is not None:
self.sensitivity = float(kwargs["sensitivity"])
if kwargs.get("smoothing") is not None:
self.smoothing = float(kwargs["smoothing"])
if kwargs.get("palette") is not None:
self.palette = kwargs["palette"]
color = kwargs.get("color")
if color is not None and isinstance(color, list) and len(color) == 3:
self.color = color
color_peak = kwargs.get("color_peak")
if color_peak is not None and isinstance(color_peak, list) and len(color_peak) == 3:
self.color_peak = color_peak
if kwargs.get("led_count") is not None:
self.led_count = kwargs["led_count"]
if kwargs.get("mirror") is not None:
self.mirror = bool(kwargs["mirror"])
@dataclass
class CompositeColorStripSource(ColorStripSource):
@@ -528,6 +723,26 @@ class CompositeColorStripSource(ColorStripSource):
d["led_count"] = self.led_count
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
layers=None, led_count=0, **_kwargs):
return cls(
id=id, name=name, source_type="composite",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
layers=layers if isinstance(layers, list) else [],
led_count=led_count,
)
def apply_update(self, **kwargs) -> None:
layers = kwargs.get("layers")
if layers is not None and isinstance(layers, list):
self.layers = layers
if kwargs.get("led_count") is not None:
self.led_count = kwargs["led_count"]
@dataclass
class MappedColorStripSource(ColorStripSource):
@@ -549,6 +764,26 @@ class MappedColorStripSource(ColorStripSource):
d["led_count"] = self.led_count
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
zones=None, led_count=0, **_kwargs):
return cls(
id=id, name=name, source_type="mapped",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
zones=zones if isinstance(zones, list) else [],
led_count=led_count,
)
def apply_update(self, **kwargs) -> None:
zones = kwargs.get("zones")
if zones is not None and isinstance(zones, list):
self.zones = zones
if kwargs.get("led_count") is not None:
self.led_count = kwargs["led_count"]
@dataclass
class ApiInputColorStripSource(ColorStripSource):
@@ -571,6 +806,30 @@ class ApiInputColorStripSource(ColorStripSource):
d["timeout"] = self.timeout
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
led_count=0, fallback_color=None, timeout=None,
**_kwargs):
fb = _validate_rgb(fallback_color, [0, 0, 0])
return cls(
id=id, name=name, source_type="api_input",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
led_count=led_count, fallback_color=fb,
timeout=float(timeout) if timeout is not None else 5.0,
)
def apply_update(self, **kwargs) -> None:
fallback_color = kwargs.get("fallback_color")
if fallback_color is not None and isinstance(fallback_color, list) and len(fallback_color) == 3:
self.fallback_color = fallback_color
if kwargs.get("timeout") is not None:
self.timeout = float(kwargs["timeout"])
if kwargs.get("led_count") is not None:
self.led_count = kwargs["led_count"]
@dataclass
class NotificationColorStripSource(ColorStripSource):
@@ -586,7 +845,7 @@ class NotificationColorStripSource(ColorStripSource):
notification_effect: str = "flash" # flash | pulse | sweep
duration_ms: int = 1500 # effect duration in milliseconds
default_color: str = "#FFFFFF" # hex color for notifications without app match
app_colors: dict = field(default_factory=dict) # app name hex color
app_colors: dict = field(default_factory=dict) # app name -> hex color
app_filter_mode: str = "off" # off | whitelist | blacklist
app_filter_list: list = field(default_factory=list) # app names for filter
os_listener: bool = False # whether to listen for OS notifications
@@ -602,6 +861,45 @@ class NotificationColorStripSource(ColorStripSource):
d["os_listener"] = self.os_listener
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
notification_effect=None, duration_ms=None,
default_color=None, app_colors=None,
app_filter_mode=None, app_filter_list=None,
os_listener=None, **_kwargs):
return cls(
id=id, name=name, source_type="notification",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
notification_effect=notification_effect or "flash",
duration_ms=int(duration_ms) if duration_ms is not None else 1500,
default_color=default_color or "#FFFFFF",
app_colors=app_colors if isinstance(app_colors, dict) else {},
app_filter_mode=app_filter_mode or "off",
app_filter_list=app_filter_list if isinstance(app_filter_list, list) else [],
os_listener=bool(os_listener) if os_listener is not None else False,
)
def apply_update(self, **kwargs) -> None:
if kwargs.get("notification_effect") is not None:
self.notification_effect = kwargs["notification_effect"]
if kwargs.get("duration_ms") is not None:
self.duration_ms = int(kwargs["duration_ms"])
if kwargs.get("default_color") is not None:
self.default_color = kwargs["default_color"]
app_colors = kwargs.get("app_colors")
if app_colors is not None and isinstance(app_colors, dict):
self.app_colors = app_colors
if kwargs.get("app_filter_mode") is not None:
self.app_filter_mode = kwargs["app_filter_mode"]
app_filter_list = kwargs.get("app_filter_list")
if app_filter_list is not None and isinstance(app_filter_list, list):
self.app_filter_list = app_filter_list
if kwargs.get("os_listener") is not None:
self.os_listener = bool(kwargs["os_listener"])
@dataclass
class DaylightColorStripSource(ColorStripSource):
@@ -614,7 +912,7 @@ class DaylightColorStripSource(ColorStripSource):
When use_real_time is True, the current wall-clock hour determines
the color; speed is ignored. When False, speed controls how fast
a full 24-hour cycle plays (1.0 4 minutes per full cycle).
a full 24-hour cycle plays (1.0 = 4 minutes per full cycle).
"""
speed: float = 1.0 # cycle speed (ignored when use_real_time)
@@ -628,6 +926,29 @@ class DaylightColorStripSource(ColorStripSource):
d["latitude"] = self.latitude
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
speed=None, use_real_time=None, latitude=None,
**_kwargs):
return cls(
id=id, name=name, source_type="daylight",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
speed=float(speed) if speed is not None else 1.0,
use_real_time=bool(use_real_time) if use_real_time is not None else False,
latitude=float(latitude) if latitude is not None else 50.0,
)
def apply_update(self, **kwargs) -> None:
if kwargs.get("speed") is not None:
self.speed = float(kwargs["speed"])
if kwargs.get("use_real_time") is not None:
self.use_real_time = bool(kwargs["use_real_time"])
if kwargs.get("latitude") is not None:
self.latitude = float(kwargs["latitude"])
@dataclass
class CandlelightColorStripSource(ColorStripSource):
@@ -639,7 +960,7 @@ class CandlelightColorStripSource(ColorStripSource):
"""
color: list = field(default_factory=lambda: [255, 147, 41]) # warm candle base [R,G,B]
intensity: float = 1.0 # flicker intensity (0.12.0)
intensity: float = 1.0 # flicker intensity (0.1-2.0)
num_candles: int = 3 # number of independent candle sources
speed: float = 1.0 # flicker speed multiplier
@@ -650,3 +971,93 @@ class CandlelightColorStripSource(ColorStripSource):
d["num_candles"] = self.num_candles
d["speed"] = self.speed
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
color=None, intensity=1.0, num_candles=None,
speed=None, **_kwargs):
rgb = _validate_rgb(color, [255, 147, 41])
return cls(
id=id, name=name, source_type="candlelight",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
color=rgb,
intensity=float(intensity) if intensity else 1.0,
num_candles=int(num_candles) if num_candles is not None else 3,
speed=float(speed) if speed is not None else 1.0,
)
def apply_update(self, **kwargs) -> None:
color = kwargs.get("color")
if color is not None and isinstance(color, list) and len(color) == 3:
self.color = color
if kwargs.get("intensity") is not None:
self.intensity = float(kwargs["intensity"])
if kwargs.get("num_candles") is not None:
self.num_candles = int(kwargs["num_candles"])
if kwargs.get("speed") is not None:
self.speed = float(kwargs["speed"])
@dataclass
class ProcessedColorStripSource(ColorStripSource):
"""Color strip source that takes another CSS and applies a processing template.
Wraps an existing color strip source and pipes its output through a
ColorStripProcessingTemplate filter chain. Useful for applying brightness,
gamma, palette quantization, etc. to any source without modifying the
original.
"""
input_source_id: str = "" # ID of the input color strip source
processing_template_id: str = "" # ID of the CSPT to apply
def to_dict(self) -> dict:
d = super().to_dict()
d["input_source_id"] = self.input_source_id
d["processing_template_id"] = self.processing_template_id
return d
@classmethod
def create_from_kwargs(cls, *, id: str, name: str, source_type: str,
created_at: datetime, updated_at: datetime,
description=None, clock_id=None, tags=None,
input_source_id="", processing_template_id="",
**_kwargs):
return cls(
id=id, name=name, source_type="processed",
created_at=created_at, updated_at=updated_at,
description=description, clock_id=clock_id, tags=tags or [],
input_source_id=input_source_id,
processing_template_id=processing_template_id,
)
def apply_update(self, **kwargs) -> None:
input_source_id = kwargs.get("input_source_id")
if input_source_id is not None:
self.input_source_id = resolve_ref(input_source_id, self.input_source_id)
processing_template_id = kwargs.get("processing_template_id")
if processing_template_id is not None:
self.processing_template_id = resolve_ref(processing_template_id, self.processing_template_id)
# -- Source type registry --
# Maps source_type string to its subclass for factory dispatch.
_SOURCE_TYPE_MAP: Dict[str, Type[ColorStripSource]] = {
"picture": PictureColorStripSource,
"picture_advanced": AdvancedPictureColorStripSource,
"static": StaticColorStripSource,
"gradient": GradientColorStripSource,
"color_cycle": ColorCycleColorStripSource,
"effect": EffectColorStripSource,
"audio": AudioColorStripSource,
"composite": CompositeColorStripSource,
"mapped": MappedColorStripSource,
"api_input": ApiInputColorStripSource,
"notification": NotificationColorStripSource,
"daylight": DaylightColorStripSource,
"candlelight": CandlelightColorStripSource,
"processed": ProcessedColorStripSource,
}

View File

@@ -4,7 +4,6 @@ import uuid
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.capture.calibration import CalibrationConfig, calibration_to_dict
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.utils import resolve_ref
from wled_controller.storage.color_strip_source import (
@@ -21,6 +20,7 @@ from wled_controller.storage.color_strip_source import (
MappedColorStripSource,
NotificationColorStripSource,
PictureColorStripSource,
ProcessedColorStripSource,
StaticColorStripSource,
)
from wled_controller.utils import get_logger
@@ -45,56 +45,13 @@ class ColorStripStore(BaseJsonStore[ColorStripSource]):
"""Get a color strip source by ID (alias for get())."""
return self.get(source_id)
def create_source(
self,
name: str,
source_type: str = "picture",
picture_source_id: str = "",
fps: int = 30,
brightness: float = 1.0,
saturation: float = 1.0,
gamma: float = 1.0,
smoothing: float = 0.3,
interpolation_mode: str = "average",
calibration=None,
led_count: int = 0,
color: Optional[list] = None,
stops: Optional[list] = None,
description: Optional[str] = None,
frame_interpolation: bool = False,
animation: Optional[dict] = None,
colors: Optional[list] = None,
effect_type: str = "fire",
palette: str = "fire",
intensity: float = 1.0,
scale: float = 1.0,
mirror: bool = False,
layers: Optional[list] = None,
zones: Optional[list] = None,
visualization_mode: str = "spectrum",
audio_source_id: str = "",
sensitivity: float = 1.0,
color_peak: Optional[list] = None,
fallback_color: Optional[list] = None,
timeout: Optional[float] = None,
clock_id: Optional[str] = None,
notification_effect: Optional[str] = None,
duration_ms: Optional[int] = None,
default_color: Optional[str] = None,
app_colors: Optional[dict] = None,
app_filter_mode: Optional[str] = None,
app_filter_list: Optional[list] = None,
os_listener: Optional[bool] = None,
# daylight-type fields
speed: Optional[float] = None,
use_real_time: Optional[bool] = None,
latitude: Optional[float] = None,
# candlelight-type fields
num_candles: Optional[int] = None,
tags: Optional[List[str]] = None,
) -> ColorStripSource:
def create_source(self, name: str, source_type: str = "picture",
**kwargs) -> ColorStripSource:
"""Create a new color strip source.
All type-specific parameters are passed as keyword arguments and
forwarded to the subclass ``create_from_kwargs`` factory method.
Raises:
ValueError: If validation fails
"""
@@ -108,271 +65,30 @@ class ColorStripStore(BaseJsonStore[ColorStripSource]):
source_id = f"css_{uuid.uuid4().hex[:8]}"
now = datetime.now(timezone.utc)
if source_type == "static":
rgb = color if isinstance(color, list) and len(color) == 3 else [255, 255, 255]
source = StaticColorStripSource(
id=source_id,
name=name,
source_type="static",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
color=rgb,
animation=animation,
)
elif source_type == "gradient":
source = GradientColorStripSource(
id=source_id,
name=name,
source_type="gradient",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
stops=stops if isinstance(stops, list) else [
{"position": 0.0, "color": [255, 0, 0]},
{"position": 1.0, "color": [0, 0, 255]},
],
animation=animation,
)
elif source_type == "color_cycle":
default_colors = [
[255, 0, 0], [255, 255, 0], [0, 255, 0],
[0, 255, 255], [0, 0, 255], [255, 0, 255],
]
source = ColorCycleColorStripSource(
id=source_id,
name=name,
source_type="color_cycle",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
colors=colors if isinstance(colors, list) and len(colors) >= 2 else default_colors,
)
elif source_type == "effect":
rgb = color if isinstance(color, list) and len(color) == 3 else [255, 80, 0]
source = EffectColorStripSource(
id=source_id,
name=name,
source_type="effect",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
effect_type=effect_type or "fire",
palette=palette or "fire",
color=rgb,
intensity=float(intensity) if intensity else 1.0,
scale=float(scale) if scale else 1.0,
mirror=bool(mirror),
)
elif source_type == "audio":
rgb = color if isinstance(color, list) and len(color) == 3 else [0, 255, 0]
peak = color_peak if isinstance(color_peak, list) and len(color_peak) == 3 else [255, 0, 0]
source = AudioColorStripSource(
id=source_id,
name=name,
source_type="audio",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
visualization_mode=visualization_mode or "spectrum",
audio_source_id=audio_source_id or "",
sensitivity=float(sensitivity) if sensitivity else 1.0,
smoothing=float(smoothing) if smoothing else 0.3,
palette=palette or "rainbow",
color=rgb,
color_peak=peak,
led_count=led_count,
mirror=bool(mirror),
)
elif source_type == "composite":
source = CompositeColorStripSource(
id=source_id,
name=name,
source_type="composite",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
layers=layers if isinstance(layers, list) else [],
led_count=led_count,
)
elif source_type == "mapped":
source = MappedColorStripSource(
id=source_id,
name=name,
source_type="mapped",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
zones=zones if isinstance(zones, list) else [],
led_count=led_count,
)
elif source_type == "api_input":
fb = fallback_color if isinstance(fallback_color, list) and len(fallback_color) == 3 else [0, 0, 0]
source = ApiInputColorStripSource(
id=source_id,
name=name,
source_type="api_input",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
led_count=led_count,
fallback_color=fb,
timeout=float(timeout) if timeout is not None else 5.0,
)
elif source_type == "notification":
source = NotificationColorStripSource(
id=source_id,
name=name,
source_type="notification",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
notification_effect=notification_effect or "flash",
duration_ms=int(duration_ms) if duration_ms is not None else 1500,
default_color=default_color or "#FFFFFF",
app_colors=app_colors if isinstance(app_colors, dict) else {},
app_filter_mode=app_filter_mode or "off",
app_filter_list=app_filter_list if isinstance(app_filter_list, list) else [],
os_listener=bool(os_listener) if os_listener is not None else False,
)
elif source_type == "daylight":
source = DaylightColorStripSource(
id=source_id,
name=name,
source_type="daylight",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
speed=float(speed) if speed is not None else 1.0,
use_real_time=bool(use_real_time) if use_real_time is not None else False,
latitude=float(latitude) if latitude is not None else 50.0,
)
elif source_type == "candlelight":
rgb = color if isinstance(color, list) and len(color) == 3 else [255, 147, 41]
source = CandlelightColorStripSource(
id=source_id,
name=name,
source_type="candlelight",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
color=rgb,
intensity=float(intensity) if intensity else 1.0,
num_candles=int(num_candles) if num_candles is not None else 3,
speed=float(speed) if speed is not None else 1.0,
)
elif source_type == "picture_advanced":
if calibration is None:
calibration = CalibrationConfig(mode="advanced")
source = AdvancedPictureColorStripSource(
id=source_id,
name=name,
source_type="picture_advanced",
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
fps=fps,
brightness=brightness,
saturation=saturation,
gamma=gamma,
smoothing=smoothing,
interpolation_mode=interpolation_mode,
calibration=calibration,
led_count=led_count,
frame_interpolation=frame_interpolation,
)
else:
if calibration is None:
calibration = CalibrationConfig(layout="clockwise", start_position="bottom_left")
source = PictureColorStripSource(
id=source_id,
name=name,
source_type=source_type,
created_at=now,
updated_at=now,
description=description,
clock_id=clock_id,
picture_source_id=picture_source_id,
fps=fps,
brightness=brightness,
saturation=saturation,
gamma=gamma,
smoothing=smoothing,
interpolation_mode=interpolation_mode,
calibration=calibration,
led_count=led_count,
frame_interpolation=frame_interpolation,
)
tags = kwargs.pop("tags", None) or []
source = ColorStripSource.create_instance(
source_type,
id=source_id,
name=name,
created_at=now,
updated_at=now,
tags=tags,
**kwargs,
)
source.tags = tags or []
self._items[source_id] = source
self._save()
logger.info(f"Created color strip source: {name} ({source_id}, type={source_type})")
return source
def update_source(
self,
source_id: str,
name: Optional[str] = None,
picture_source_id: Optional[str] = None,
fps: Optional[int] = None,
brightness: Optional[float] = None,
saturation: Optional[float] = None,
gamma: Optional[float] = None,
smoothing: Optional[float] = None,
interpolation_mode: Optional[str] = None,
calibration=None,
led_count: Optional[int] = None,
color: Optional[list] = None,
stops: Optional[list] = None,
description: Optional[str] = None,
frame_interpolation: Optional[bool] = None,
animation: Optional[dict] = None,
colors: Optional[list] = None,
effect_type: Optional[str] = None,
palette: Optional[str] = None,
intensity: Optional[float] = None,
scale: Optional[float] = None,
mirror: Optional[bool] = None,
layers: Optional[list] = None,
zones: Optional[list] = None,
visualization_mode: Optional[str] = None,
audio_source_id: Optional[str] = None,
sensitivity: Optional[float] = None,
color_peak: Optional[list] = None,
fallback_color: Optional[list] = None,
timeout: Optional[float] = None,
clock_id: Optional[str] = None,
notification_effect: Optional[str] = None,
duration_ms: Optional[int] = None,
default_color: Optional[str] = None,
app_colors: Optional[dict] = None,
app_filter_mode: Optional[str] = None,
app_filter_list: Optional[list] = None,
os_listener: Optional[bool] = None,
# daylight-type fields
speed: Optional[float] = None,
use_real_time: Optional[bool] = None,
latitude: Optional[float] = None,
# candlelight-type fields
num_candles: Optional[int] = None,
tags: Optional[List[str]] = None,
) -> ColorStripSource:
def update_source(self, source_id: str, **kwargs) -> ColorStripSource:
"""Update an existing color strip source.
All type-specific parameters are passed as keyword arguments and
forwarded to the subclass ``apply_update`` method.
Raises:
ValueError: If source not found
"""
@@ -381,136 +97,28 @@ class ColorStripStore(BaseJsonStore[ColorStripSource]):
source = self._items[source_id]
# -- Common fields handled here (before dispatching to subclass) --
name = kwargs.pop("name", None)
if name is not None:
for other in self._items.values():
if other.id != source_id and other.name == name:
raise ValueError(f"Color strip source with name '{name}' already exists")
source.name = name
description = kwargs.pop("description", None)
if description is not None:
source.description = description
clock_id = kwargs.pop("clock_id", None)
if clock_id is not None:
source.clock_id = resolve_ref(clock_id, source.clock_id)
tags = kwargs.pop("tags", None)
if tags is not None:
source.tags = tags
if isinstance(source, (PictureColorStripSource, AdvancedPictureColorStripSource)):
if picture_source_id is not None and isinstance(source, PictureColorStripSource):
source.picture_source_id = resolve_ref(picture_source_id, source.picture_source_id)
if fps is not None:
source.fps = fps
if brightness is not None:
source.brightness = brightness
if saturation is not None:
source.saturation = saturation
if gamma is not None:
source.gamma = gamma
if smoothing is not None:
source.smoothing = smoothing
if interpolation_mode is not None:
source.interpolation_mode = interpolation_mode
if calibration is not None:
source.calibration = calibration
if led_count is not None:
source.led_count = led_count
if frame_interpolation is not None:
source.frame_interpolation = frame_interpolation
elif isinstance(source, StaticColorStripSource):
if color is not None:
if isinstance(color, list) and len(color) == 3:
source.color = color
if animation is not None:
source.animation = animation
elif isinstance(source, GradientColorStripSource):
if stops is not None and isinstance(stops, list):
source.stops = stops
if animation is not None:
source.animation = animation
elif isinstance(source, ColorCycleColorStripSource):
if colors is not None and isinstance(colors, list) and len(colors) >= 2:
source.colors = colors
elif isinstance(source, EffectColorStripSource):
if effect_type is not None:
source.effect_type = effect_type
if palette is not None:
source.palette = palette
if color is not None and isinstance(color, list) and len(color) == 3:
source.color = color
if intensity is not None:
source.intensity = float(intensity)
if scale is not None:
source.scale = float(scale)
if mirror is not None:
source.mirror = bool(mirror)
elif isinstance(source, AudioColorStripSource):
if visualization_mode is not None:
source.visualization_mode = visualization_mode
if audio_source_id is not None:
source.audio_source_id = resolve_ref(audio_source_id, source.audio_source_id)
if sensitivity is not None:
source.sensitivity = float(sensitivity)
if smoothing is not None:
source.smoothing = float(smoothing)
if palette is not None:
source.palette = palette
if color is not None and isinstance(color, list) and len(color) == 3:
source.color = color
if color_peak is not None and isinstance(color_peak, list) and len(color_peak) == 3:
source.color_peak = color_peak
if led_count is not None:
source.led_count = led_count
if mirror is not None:
source.mirror = bool(mirror)
elif isinstance(source, CompositeColorStripSource):
if layers is not None and isinstance(layers, list):
source.layers = layers
if led_count is not None:
source.led_count = led_count
elif isinstance(source, MappedColorStripSource):
if zones is not None and isinstance(zones, list):
source.zones = zones
if led_count is not None:
source.led_count = led_count
elif isinstance(source, ApiInputColorStripSource):
if fallback_color is not None and isinstance(fallback_color, list) and len(fallback_color) == 3:
source.fallback_color = fallback_color
if timeout is not None:
source.timeout = float(timeout)
if led_count is not None:
source.led_count = led_count
elif isinstance(source, NotificationColorStripSource):
if notification_effect is not None:
source.notification_effect = notification_effect
if duration_ms is not None:
source.duration_ms = int(duration_ms)
if default_color is not None:
source.default_color = default_color
if app_colors is not None and isinstance(app_colors, dict):
source.app_colors = app_colors
if app_filter_mode is not None:
source.app_filter_mode = app_filter_mode
if app_filter_list is not None and isinstance(app_filter_list, list):
source.app_filter_list = app_filter_list
if os_listener is not None:
source.os_listener = bool(os_listener)
elif isinstance(source, DaylightColorStripSource):
if speed is not None:
source.speed = float(speed)
if use_real_time is not None:
source.use_real_time = bool(use_real_time)
if latitude is not None:
source.latitude = float(latitude)
elif isinstance(source, CandlelightColorStripSource):
if color is not None and isinstance(color, list) and len(color) == 3:
source.color = color
if intensity is not None:
source.intensity = float(intensity)
if num_candles is not None:
source.num_candles = int(num_candles)
if speed is not None:
source.speed = float(speed)
# -- Type-specific fields --
source.apply_update(**kwargs)
source.updated_at = datetime.now(timezone.utc)
self._save()
@@ -539,3 +147,12 @@ class ColorStripStore(BaseJsonStore[ColorStripSource]):
names.append(source.name)
break
return names
def get_processed_referencing(self, source_id: str) -> List[str]:
"""Return names of processed sources that reference a given source as input."""
names = []
for source in self._items.values():
if isinstance(source, ProcessedColorStripSource):
if source.input_source_id == source_id:
names.append(source.name)
return names

View File

@@ -6,7 +6,8 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
@@ -52,6 +53,8 @@ class Device:
chroma_device_type: str = "chromalink",
# SteelSeries GameSense fields
gamesense_device_type: str = "keyboard",
# Default color strip processing template
default_css_processing_template_id: str = "",
created_at: Optional[datetime] = None,
updated_at: Optional[datetime] = None,
):
@@ -80,6 +83,7 @@ class Device:
self.spi_led_type = spi_led_type
self.chroma_device_type = chroma_device_type
self.gamesense_device_type = gamesense_device_type
self.default_css_processing_template_id = default_css_processing_template_id
self.created_at = created_at or datetime.now(timezone.utc)
self.updated_at = updated_at or datetime.now(timezone.utc)
@@ -133,6 +137,8 @@ class Device:
d["chroma_device_type"] = self.chroma_device_type
if self.gamesense_device_type != "keyboard":
d["gamesense_device_type"] = self.gamesense_device_type
if self.default_css_processing_template_id:
d["default_css_processing_template_id"] = self.default_css_processing_template_id
return d
@classmethod
@@ -164,78 +170,44 @@ class Device:
spi_led_type=data.get("spi_led_type", "WS2812B"),
chroma_device_type=data.get("chroma_device_type", "chromalink"),
gamesense_device_type=data.get("gamesense_device_type", "keyboard"),
default_css_processing_template_id=data.get("default_css_processing_template_id", ""),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)
class DeviceStore:
# Fields that can be updated (all Device.__init__ params except identity/timestamps)
_UPDATABLE_FIELDS = {
k for k in Device.__init__.__code__.co_varnames
if k not in ('self', 'device_id', 'created_at', 'updated_at')
}
class DeviceStore(BaseJsonStore[Device]):
"""Persistent storage for WLED devices."""
_json_key = "devices"
_entity_name = "Device"
def __init__(self, storage_file: str | Path):
self.storage_file = Path(storage_file)
self._devices: Dict[str, Device] = {}
super().__init__(file_path=str(storage_file), deserializer=Device.from_dict)
logger.info(f"Device store initialized with {len(self._items)} devices")
# Ensure directory exists
self.storage_file.parent.mkdir(parents=True, exist_ok=True)
# ── Backward-compat aliases ──────────────────────────────────
# Load existing devices
self.load()
def get_device(self, device_id: str) -> Device:
"""Get device by ID. Raises ValueError if not found."""
return self.get(device_id)
logger.info(f"Device store initialized with {len(self._devices)} devices")
def get_all_devices(self) -> List[Device]:
"""Get all devices."""
return self.get_all()
def load(self):
"""Load devices from storage file."""
if not self.storage_file.exists():
logger.info("Storage file does not exist, starting with empty store")
return
def delete_device(self, device_id: str) -> None:
"""Delete device. Raises ValueError if not found."""
self.delete(device_id)
try:
with open(self.storage_file, "r") as f:
data = json.load(f)
devices_data = data.get("devices", {})
self._devices = {
device_id: Device.from_dict(device_data)
for device_id, device_data in devices_data.items()
}
logger.info(f"Loaded {len(self._devices)} devices from storage")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse storage file: {e}")
raise
except Exception as e:
logger.error(f"Failed to load devices: {e}")
raise
def load_raw(self) -> dict:
"""Load raw JSON data from storage (for migration)."""
if not self.storage_file.exists():
return {}
try:
with open(self.storage_file, "r") as f:
return json.load(f)
except Exception:
return {}
def save(self):
"""Save devices to storage file."""
try:
data = {
"devices": {
device_id: device.to_dict()
for device_id, device in self._devices.items()
}
}
atomic_write_json(self.storage_file, data)
logger.debug(f"Saved {len(self._devices)} devices to storage")
except Exception as e:
logger.error(f"Failed to save devices: {e}")
raise
# ── Create / Update ──────────────────────────────────────────
def create_device(
self,
@@ -295,122 +267,48 @@ class DeviceStore:
gamesense_device_type=gamesense_device_type,
)
self._devices[device_id] = device
self.save()
self._items[device_id] = device
self._save()
logger.info(f"Created device {device_id}: {name}")
return device
def get_device(self, device_id: str) -> Optional[Device]:
"""Get device by ID."""
return self._devices.get(device_id)
def update_device(self, device_id: str, **kwargs) -> Device:
"""Update device fields.
def get_all_devices(self) -> List[Device]:
"""Get all devices."""
return list(self._devices.values())
Pass any updatable Device field as a keyword argument.
``None`` values are ignored (no change).
"""
device = self.get(device_id) # raises ValueError if not found
def update_device(
self,
device_id: str,
name: Optional[str] = None,
url: Optional[str] = None,
led_count: Optional[int] = None,
enabled: Optional[bool] = None,
baud_rate: Optional[int] = None,
auto_shutdown: Optional[bool] = None,
send_latency_ms: Optional[int] = None,
rgbw: Optional[bool] = None,
zone_mode: Optional[str] = None,
tags: Optional[List[str]] = None,
dmx_protocol: Optional[str] = None,
dmx_start_universe: Optional[int] = None,
dmx_start_channel: Optional[int] = None,
espnow_peer_mac: Optional[str] = None,
espnow_channel: Optional[int] = None,
hue_username: Optional[str] = None,
hue_client_key: Optional[str] = None,
hue_entertainment_group_id: Optional[str] = None,
spi_speed_hz: Optional[int] = None,
spi_led_type: Optional[str] = None,
chroma_device_type: Optional[str] = None,
gamesense_device_type: Optional[str] = None,
) -> Device:
"""Update device."""
device = self._devices.get(device_id)
if not device:
raise ValueError(f"Device {device_id} not found")
if name is not None:
device.name = name
if url is not None:
device.url = url
if led_count is not None:
device.led_count = led_count
if enabled is not None:
device.enabled = enabled
if baud_rate is not None:
device.baud_rate = baud_rate
if auto_shutdown is not None:
device.auto_shutdown = auto_shutdown
if send_latency_ms is not None:
device.send_latency_ms = send_latency_ms
if rgbw is not None:
device.rgbw = rgbw
if zone_mode is not None:
device.zone_mode = zone_mode
if tags is not None:
device.tags = tags
if dmx_protocol is not None:
device.dmx_protocol = dmx_protocol
if dmx_start_universe is not None:
device.dmx_start_universe = dmx_start_universe
if dmx_start_channel is not None:
device.dmx_start_channel = dmx_start_channel
if espnow_peer_mac is not None:
device.espnow_peer_mac = espnow_peer_mac
if espnow_channel is not None:
device.espnow_channel = espnow_channel
if hue_username is not None:
device.hue_username = hue_username
if hue_client_key is not None:
device.hue_client_key = hue_client_key
if hue_entertainment_group_id is not None:
device.hue_entertainment_group_id = hue_entertainment_group_id
if spi_speed_hz is not None:
device.spi_speed_hz = spi_speed_hz
if spi_led_type is not None:
device.spi_led_type = spi_led_type
if chroma_device_type is not None:
device.chroma_device_type = chroma_device_type
if gamesense_device_type is not None:
device.gamesense_device_type = gamesense_device_type
for key, value in kwargs.items():
if value is not None and key in _UPDATABLE_FIELDS:
setattr(device, key, value)
device.updated_at = datetime.now(timezone.utc)
self.save()
self._save()
logger.info(f"Updated device {device_id}")
return device
def delete_device(self, device_id: str):
"""Delete device."""
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not found")
del self._devices[device_id]
self.save()
logger.info(f"Deleted device {device_id}")
# ── Unique helpers ───────────────────────────────────────────
def device_exists(self, device_id: str) -> bool:
"""Check if device exists."""
return device_id in self._devices
def count(self) -> int:
"""Get number of devices."""
return len(self._devices)
return device_id in self._items
def clear(self):
"""Clear all devices (for testing)."""
self._devices.clear()
self.save()
self._items.clear()
self._save()
logger.warning("Cleared all devices from storage")
def load_raw(self) -> dict:
"""Load raw JSON data from storage (for migration)."""
if not self.file_path.exists():
return {}
try:
with open(self.file_path, "r") as f:
return json.load(f)
except Exception:
return {}