Add tags to all entity types with chip-based input and autocomplete

- Add `tags: List[str]` field to all 13 entity types (devices, output targets,
  CSS sources, picture sources, audio sources, value sources, sync clocks,
  automations, scene presets, capture/audio/PP/pattern templates)
- Update all stores, schemas, and route handlers for tag CRUD
- Add GET /api/v1/tags endpoint aggregating unique tags across all stores
- Create TagInput component with chip display, autocomplete dropdown,
  keyboard navigation, and API-backed suggestions
- Display tag chips on all entity cards (searchable via existing text filter)
- Add tag input to all 14 editor modals with dirty check support
- Add CSS styles and i18n keys (en/ru/zh) for tag UI
- Also includes code review fixes: thread safety, perf, store dedup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 22:20:19 +03:00
parent 2712c6682e
commit 30fa107ef7
120 changed files with 2471 additions and 1949 deletions

View File

@@ -5,9 +5,9 @@ An AudioSource represents a reusable audio input configuration:
MonoAudioSource — extracts a single channel from a multichannel source
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
@dataclass
@@ -20,6 +20,7 @@ class AudioSource:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert source to dictionary. Subclasses extend this."""
@@ -30,6 +31,7 @@ class AudioSource:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
# Subclass fields default to None for forward compat
"device_index": None,
"is_loopback": None,
@@ -45,26 +47,27 @@ class AudioSource:
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
tags: list = data.get("tags", [])
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
if source_type == "mono":
return MonoAudioSource(
id=sid, name=name, source_type="mono",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
audio_source_id=data.get("audio_source_id") or "",
channel=data.get("channel") or "mono",
)
@@ -72,7 +75,7 @@ class AudioSource:
# Default: multichannel
return MultichannelAudioSource(
id=sid, name=name, source_type="multichannel",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
device_index=int(data.get("device_index", -1)),
is_loopback=bool(data.get("is_loopback", True)),
audio_template_id=data.get("audio_template_id"),

View File

@@ -1,87 +1,36 @@
"""Audio source storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timezone
from typing import List, Optional, Tuple
from wled_controller.storage.audio_source import (
AudioSource,
MonoAudioSource,
MultichannelAudioSource,
)
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AudioSourceStore:
class AudioSourceStore(BaseJsonStore[AudioSource]):
"""Persistent storage for audio sources."""
_json_key = "audio_sources"
_entity_name = "Audio source"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._sources: Dict[str, AudioSource] = {}
self._load()
super().__init__(file_path, AudioSource.from_dict)
def _load(self) -> None:
if not self.file_path.exists():
logger.info("Audio source store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
sources_data = data.get("audio_sources", {})
loaded = 0
for source_id, source_dict in sources_data.items():
try:
source = AudioSource.from_dict(source_dict)
self._sources[source_id] = source
loaded += 1
except Exception as e:
logger.error(
f"Failed to load audio source {source_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} audio sources from storage")
except Exception as e:
logger.error(f"Failed to load audio sources from {self.file_path}: {e}")
raise
logger.info(f"Audio source store initialized with {len(self._sources)} sources")
def _save(self) -> None:
try:
data = {
"version": "1.0.0",
"audio_sources": {
sid: source.to_dict()
for sid, source in self._sources.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save audio sources to {self.file_path}: {e}")
raise
# ── CRUD ─────────────────────────────────────────────────────────
def get_all_sources(self) -> List[AudioSource]:
return list(self._sources.values())
# Backward-compatible aliases
get_all_sources = BaseJsonStore.get_all
get_source = BaseJsonStore.get
def get_mono_sources(self) -> List[MonoAudioSource]:
"""Return only mono audio sources (for CSS dropdown)."""
return [s for s in self._sources.values() if isinstance(s, MonoAudioSource)]
def get_source(self, source_id: str) -> AudioSource:
if source_id not in self._sources:
raise ValueError(f"Audio source not found: {source_id}")
return self._sources[source_id]
return [s for s in self._items.values() if isinstance(s, MonoAudioSource)]
def create_source(
self,
@@ -93,25 +42,21 @@ class AudioSourceStore:
channel: Optional[str] = None,
description: Optional[str] = None,
audio_template_id: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> AudioSource:
if not name or not name.strip():
raise ValueError("Name is required")
self._check_name_unique(name)
if source_type not in ("multichannel", "mono"):
raise ValueError(f"Invalid source type: {source_type}")
for source in self._sources.values():
if source.name == name:
raise ValueError(f"Audio source with name '{name}' already exists")
sid = f"as_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
if source_type == "mono":
if not audio_source_id:
raise ValueError("Mono sources require audio_source_id")
# Validate parent exists and is multichannel
parent = self._sources.get(audio_source_id)
parent = self._items.get(audio_source_id)
if not parent:
raise ValueError(f"Parent audio source not found: {audio_source_id}")
if not isinstance(parent, MultichannelAudioSource):
@@ -119,20 +64,20 @@ class AudioSourceStore:
source = MonoAudioSource(
id=sid, name=name, source_type="mono",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=tags or [],
audio_source_id=audio_source_id,
channel=channel or "mono",
)
else:
source = MultichannelAudioSource(
id=sid, name=name, source_type="multichannel",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=tags or [],
device_index=device_index if device_index is not None else -1,
is_loopback=bool(is_loopback) if is_loopback is not None else True,
audio_template_id=audio_template_id,
)
self._sources[sid] = source
self._items[sid] = source
self._save()
logger.info(f"Created audio source: {name} ({sid}, type={source_type})")
@@ -148,20 +93,18 @@ class AudioSourceStore:
channel: Optional[str] = None,
description: Optional[str] = None,
audio_template_id: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> AudioSource:
if source_id not in self._sources:
raise ValueError(f"Audio source not found: {source_id}")
source = self._sources[source_id]
source = self.get(source_id)
if name is not None:
for other in self._sources.values():
if other.id != source_id and other.name == name:
raise ValueError(f"Audio source with name '{name}' already exists")
self._check_name_unique(name, exclude_id=source_id)
source.name = name
if description is not None:
source.description = description
if tags is not None:
source.tags = tags
if isinstance(source, MultichannelAudioSource):
if device_index is not None:
@@ -172,7 +115,7 @@ class AudioSourceStore:
source.audio_template_id = audio_template_id
elif isinstance(source, MonoAudioSource):
if audio_source_id is not None:
parent = self._sources.get(audio_source_id)
parent = self._items.get(audio_source_id)
if not parent:
raise ValueError(f"Parent audio source not found: {audio_source_id}")
if not isinstance(parent, MultichannelAudioSource):
@@ -181,27 +124,27 @@ class AudioSourceStore:
if channel is not None:
source.channel = channel
source.updated_at = datetime.utcnow()
source.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated audio source: {source_id}")
return source
def delete_source(self, source_id: str) -> None:
if source_id not in self._sources:
raise ValueError(f"Audio source not found: {source_id}")
if source_id not in self._items:
raise ValueError(f"{self._entity_name} not found: {source_id}")
source = self._sources[source_id]
source = self._items[source_id]
# Prevent deleting multichannel sources referenced by mono sources
if isinstance(source, MultichannelAudioSource):
for other in self._sources.values():
for other in self._items.values():
if isinstance(other, MonoAudioSource) and other.audio_source_id == source_id:
raise ValueError(
f"Cannot delete '{source.name}': referenced by mono source '{other.name}'"
)
del self._sources[source_id]
del self._items[source_id]
self._save()
logger.info(f"Deleted audio source: {source_id}")
@@ -231,4 +174,3 @@ class AudioSourceStore:
return parent.device_index, parent.is_loopback, source.channel, parent.audio_template_id
raise ValueError(f"Audio source {source_id} is not a valid audio source")

View File

@@ -1,8 +1,8 @@
"""Audio capture template data model."""
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
@dataclass
@@ -16,6 +16,7 @@ class AudioCaptureTemplate:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert template to dictionary."""
@@ -27,6 +28,7 @@ class AudioCaptureTemplate:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
}
@classmethod
@@ -39,9 +41,10 @@ class AudioCaptureTemplate:
engine_config=data.get("engine_config", {}),
created_at=datetime.fromisoformat(data["created_at"])
if isinstance(data.get("created_at"), str)
else data.get("created_at", datetime.utcnow()),
else data.get("created_at", datetime.now(timezone.utc)),
updated_at=datetime.fromisoformat(data["updated_at"])
if isinstance(data.get("updated_at"), str)
else data.get("updated_at", datetime.utcnow()),
else data.get("updated_at", datetime.now(timezone.utc)),
description=data.get("description"),
tags=data.get("tags", []),
)

View File

@@ -1,19 +1,18 @@
"""Audio template storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional
from wled_controller.core.audio.factory import AudioEngineRegistry
from wled_controller.storage.audio_template import AudioCaptureTemplate
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AudioTemplateStore:
class AudioTemplateStore(BaseJsonStore[AudioCaptureTemplate]):
"""Storage for audio capture templates.
All templates are persisted to the JSON file.
@@ -21,15 +20,20 @@ class AudioTemplateStore:
highest-priority available engine.
"""
_json_key = "templates"
_entity_name = "Audio capture template"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._templates: Dict[str, AudioCaptureTemplate] = {}
self._load()
super().__init__(file_path, AudioCaptureTemplate.from_dict)
self._ensure_initial_template()
# Backward-compatible aliases
get_all_templates = BaseJsonStore.get_all
get_template = BaseJsonStore.get
def _ensure_initial_template(self) -> None:
"""Auto-create a template if none exist, using the best available engine."""
if self._templates:
if self._items:
return
best_engine = AudioEngineRegistry.get_best_available_engine()
@@ -39,7 +43,7 @@ class AudioTemplateStore:
engine_class = AudioEngineRegistry.get_engine(best_engine)
default_config = engine_class.get_default_config()
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template_id = f"atpl_{uuid.uuid4().hex[:8]}"
template = AudioCaptureTemplate(
@@ -52,71 +56,17 @@ class AudioTemplateStore:
description=f"Default audio template using {best_engine.upper()} engine",
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(
f"Auto-created initial audio template: {template.name} "
f"({template_id}, engine={best_engine})"
)
def _load(self) -> None:
"""Load templates from file."""
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
templates_data = data.get("templates", {})
loaded = 0
for template_id, template_dict in templates_data.items():
try:
template = AudioCaptureTemplate.from_dict(template_dict)
self._templates[template_id] = template
loaded += 1
except Exception as e:
logger.error(
f"Failed to load audio template {template_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} audio templates from storage")
except Exception as e:
logger.error(f"Failed to load audio templates from {self.file_path}: {e}")
raise
logger.info(f"Audio template store initialized with {len(self._templates)} templates")
def _save(self) -> None:
"""Save all templates to file."""
try:
data = {
"version": "1.0.0",
"templates": {
template_id: template.to_dict()
for template_id, template in self._templates.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save audio templates to {self.file_path}: {e}")
raise
def get_all_templates(self) -> List[AudioCaptureTemplate]:
return list(self._templates.values())
def get_template(self, template_id: str) -> AudioCaptureTemplate:
if template_id not in self._templates:
raise ValueError(f"Audio template not found: {template_id}")
return self._templates[template_id]
def get_default_template_id(self) -> Optional[str]:
"""Return the ID of the first template, or None if none exist."""
if self._templates:
return next(iter(self._templates))
if self._items:
return next(iter(self._items))
return None
def create_template(
@@ -125,13 +75,12 @@ class AudioTemplateStore:
engine_type: str,
engine_config: Dict[str, any],
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> AudioCaptureTemplate:
for template in self._templates.values():
if template.name == name:
raise ValueError(f"Audio template with name '{name}' already exists")
self._check_name_unique(name)
template_id = f"atpl_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template = AudioCaptureTemplate(
id=template_id,
name=name,
@@ -140,9 +89,10 @@ class AudioTemplateStore:
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Created audio template: {name} ({template_id})")
return template
@@ -154,16 +104,12 @@ class AudioTemplateStore:
engine_type: Optional[str] = None,
engine_config: Optional[Dict[str, any]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> AudioCaptureTemplate:
if template_id not in self._templates:
raise ValueError(f"Audio template not found: {template_id}")
template = self._templates[template_id]
template = self.get(template_id)
if name is not None:
for tid, t in self._templates.items():
if tid != template_id and t.name == name:
raise ValueError(f"Audio template with name '{name}' already exists")
self._check_name_unique(name, exclude_id=template_id)
template.name = name
if engine_type is not None:
template.engine_type = engine_type
@@ -171,8 +117,10 @@ class AudioTemplateStore:
template.engine_config = engine_config
if description is not None:
template.description = description
if tags is not None:
template.tags = tags
template.updated_at = datetime.utcnow()
template.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated audio template: {template_id}")
return template
@@ -187,8 +135,8 @@ class AudioTemplateStore:
Raises:
ValueError: If template not found or still referenced
"""
if template_id not in self._templates:
raise ValueError(f"Audio template not found: {template_id}")
if template_id not in self._items:
raise ValueError(f"{self._entity_name} not found: {template_id}")
# Check if any multichannel audio source references this template
if audio_source_store is not None:
@@ -203,6 +151,6 @@ class AudioTemplateStore:
f"referenced by audio source '{source.name}' ({source.id})"
)
del self._templates[template_id]
del self._items[template_id]
self._save()
logger.info(f"Deleted audio template: {template_id}")

View File

@@ -1,7 +1,7 @@
"""Automation and Condition data models."""
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional
@@ -204,6 +204,7 @@ class Automation:
deactivation_scene_preset_id: Optional[str] # scene for fallback_scene mode
created_at: datetime
updated_at: datetime
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
@@ -215,6 +216,7 @@ class Automation:
"scene_preset_id": self.scene_preset_id,
"deactivation_mode": self.deactivation_mode,
"deactivation_scene_preset_id": self.deactivation_scene_preset_id,
"tags": self.tags,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@@ -237,6 +239,7 @@ class Automation:
scene_preset_id=data.get("scene_preset_id"),
deactivation_mode=data.get("deactivation_mode", "none"),
deactivation_scene_preset_id=data.get("deactivation_scene_preset_id"),
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
tags=data.get("tags", []),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)

View File

@@ -1,72 +1,27 @@
"""Automation storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.automation import Automation, Condition
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AutomationStore:
"""Persistent storage for automations."""
class AutomationStore(BaseJsonStore[Automation]):
_json_key = "automations"
_entity_name = "Automation"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._automations: Dict[str, Automation] = {}
self._load()
super().__init__(file_path, Automation.from_dict)
def _load(self) -> None:
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
automations_data = data.get("automations", {})
loaded = 0
for auto_id, auto_dict in automations_data.items():
try:
automation = Automation.from_dict(auto_dict)
self._automations[auto_id] = automation
loaded += 1
except Exception as e:
logger.error(f"Failed to load automation {auto_id}: {e}", exc_info=True)
if loaded > 0:
logger.info(f"Loaded {loaded} automations from storage")
except Exception as e:
logger.error(f"Failed to load automations from {self.file_path}: {e}")
raise
logger.info(f"Automation store initialized with {len(self._automations)} automations")
def _save(self) -> None:
try:
data = {
"version": "1.0.0",
"automations": {
aid: a.to_dict() for aid, a in self._automations.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save automations to {self.file_path}: {e}")
raise
def get_all_automations(self) -> List[Automation]:
return list(self._automations.values())
def get_automation(self, automation_id: str) -> Automation:
if automation_id not in self._automations:
raise ValueError(f"Automation not found: {automation_id}")
return self._automations[automation_id]
# Backward-compatible aliases
get_all_automations = BaseJsonStore.get_all
get_automation = BaseJsonStore.get
delete_automation = BaseJsonStore.delete
def create_automation(
self,
@@ -77,13 +32,14 @@ class AutomationStore:
scene_preset_id: Optional[str] = None,
deactivation_mode: str = "none",
deactivation_scene_preset_id: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> Automation:
for a in self._automations.values():
for a in self._items.values():
if a.name == name:
raise ValueError(f"Automation with name '{name}' already exists")
automation_id = f"auto_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
automation = Automation(
id=automation_id,
@@ -96,11 +52,11 @@ class AutomationStore:
deactivation_scene_preset_id=deactivation_scene_preset_id,
created_at=now,
updated_at=now,
tags=tags or [],
)
self._automations[automation_id] = automation
self._items[automation_id] = automation
self._save()
logger.info(f"Created automation: {name} ({automation_id})")
return automation
@@ -114,16 +70,12 @@ class AutomationStore:
scene_preset_id: str = "__unset__",
deactivation_mode: Optional[str] = None,
deactivation_scene_preset_id: str = "__unset__",
tags: Optional[List[str]] = None,
) -> Automation:
if automation_id not in self._automations:
raise ValueError(f"Automation not found: {automation_id}")
automation = self._automations[automation_id]
automation = self.get(automation_id)
if name is not None:
for aid, a in self._automations.items():
if aid != automation_id and a.name == name:
raise ValueError(f"Automation with name '{name}' already exists")
self._check_name_unique(name, exclude_id=automation_id)
automation.name = name
if enabled is not None:
automation.enabled = enabled
@@ -137,21 +89,10 @@ class AutomationStore:
automation.deactivation_mode = deactivation_mode
if deactivation_scene_preset_id != "__unset__":
automation.deactivation_scene_preset_id = deactivation_scene_preset_id
if tags is not None:
automation.tags = tags
automation.updated_at = datetime.utcnow()
automation.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated automation: {automation_id}")
return automation
def delete_automation(self, automation_id: str) -> None:
if automation_id not in self._automations:
raise ValueError(f"Automation not found: {automation_id}")
del self._automations[automation_id]
self._save()
logger.info(f"Deleted automation: {automation_id}")
def count(self) -> int:
return len(self._automations)

View File

@@ -0,0 +1,115 @@
"""Base class for JSON entity stores — eliminates boilerplate across 12+ stores."""
import json
from pathlib import Path
from typing import Callable, Dict, Generic, List, TypeVar
from wled_controller.utils import atomic_write_json, get_logger
T = TypeVar("T")
logger = get_logger(__name__)
class BaseJsonStore(Generic[T]):
"""JSON-file-backed entity store with common CRUD helpers.
Provides:
- ``_load()`` / ``_save()``: atomic JSON file I/O
- ``get_all()`` / ``get(id)`` / ``delete(id)`` / ``count()``: read/delete
- ``_check_name_unique(name, exclude_id)``: duplicate-name guard
Subclasses must set class attributes:
- ``_json_key``: root key in JSON file (e.g. ``"sync_clocks"``)
- ``_entity_name``: human label for errors (e.g. ``"Sync clock"``)
- ``_version``: schema version string (default ``"1.0.0"``)
"""
_json_key: str
_entity_name: str
_version: str = "1.0.0"
def __init__(self, file_path: str, deserializer: Callable[[dict], T]):
self.file_path = Path(file_path)
self._items: Dict[str, T] = {}
self._deserializer = deserializer
self._load()
# ── I/O ────────────────────────────────────────────────────────
def _load(self) -> None:
if not self.file_path.exists():
logger.info(f"{self._entity_name} store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
items_data = data.get(self._json_key, {})
loaded = 0
for item_id, item_dict in items_data.items():
try:
self._items[item_id] = self._deserializer(item_dict)
loaded += 1
except Exception as e:
logger.error(
f"Failed to load {self._entity_name} {item_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} {self._json_key} from storage")
except Exception as e:
logger.error(f"Failed to load {self._json_key} from {self.file_path}: {e}")
raise
logger.info(
f"{self._entity_name} store initialized with {len(self._items)} items"
)
def _save(self) -> None:
try:
data = {
"version": self._version,
self._json_key: {
item_id: item.to_dict()
for item_id, item in self._items.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save {self._json_key} to {self.file_path}: {e}")
raise
# ── Common CRUD ────────────────────────────────────────────────
def get_all(self) -> List[T]:
return list(self._items.values())
def get(self, item_id: str) -> T:
if item_id not in self._items:
raise ValueError(f"{self._entity_name} not found: {item_id}")
return self._items[item_id]
def delete(self, item_id: str) -> None:
if item_id not in self._items:
raise ValueError(f"{self._entity_name} not found: {item_id}")
del self._items[item_id]
self._save()
logger.info(f"Deleted {self._entity_name}: {item_id}")
def count(self) -> int:
return len(self._items)
# ── Helpers ────────────────────────────────────────────────────
def _check_name_unique(self, name: str, exclude_id: str = None) -> None:
"""Raise ValueError if *name* is empty or already taken."""
if not name or not name.strip():
raise ValueError("Name is required")
for item_id, item in self._items.items():
if item_id != exclude_id and getattr(item, "name", None) == name:
raise ValueError(
f"{self._entity_name} with name '{name}' already exists"
)

View File

@@ -16,8 +16,8 @@ Current types:
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.capture.calibration import (
CalibrationConfig,
@@ -37,6 +37,7 @@ class ColorStripSource:
updated_at: datetime
description: Optional[str] = None
clock_id: Optional[str] = None # optional SyncClock reference
tags: List[str] = field(default_factory=list)
@property
def sharable(self) -> bool:
@@ -57,6 +58,7 @@ class ColorStripSource:
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"clock_id": self.clock_id,
"tags": self.tags,
# Subclass fields default to None for forward compat
"picture_source_id": None,
"fps": None,
@@ -102,20 +104,21 @@ class ColorStripSource:
description: str | None = data.get("description")
clock_id: str | None = data.get("clock_id")
tags: list = data.get("tags", [])
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
calibration_data = data.get("calibration")
@@ -134,7 +137,7 @@ class ColorStripSource:
return StaticColorStripSource(
id=sid, name=name, source_type="static",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, color=color,
clock_id=clock_id, tags=tags, color=color,
animation=data.get("animation"),
)
@@ -144,7 +147,7 @@ class ColorStripSource:
return GradientColorStripSource(
id=sid, name=name, source_type="gradient",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, stops=stops,
clock_id=clock_id, tags=tags, stops=stops,
animation=data.get("animation"),
)
@@ -154,14 +157,14 @@ class ColorStripSource:
return ColorCycleColorStripSource(
id=sid, name=name, source_type="color_cycle",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, colors=colors,
clock_id=clock_id, tags=tags, colors=colors,
)
if source_type == "composite":
return CompositeColorStripSource(
id=sid, name=name, source_type="composite",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, layers=data.get("layers") or [],
clock_id=clock_id, tags=tags, layers=data.get("layers") or [],
led_count=data.get("led_count") or 0,
)
@@ -169,7 +172,7 @@ class ColorStripSource:
return MappedColorStripSource(
id=sid, name=name, source_type="mapped",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, zones=data.get("zones") or [],
clock_id=clock_id, tags=tags, zones=data.get("zones") or [],
led_count=data.get("led_count") or 0,
)
@@ -181,7 +184,7 @@ class ColorStripSource:
return AudioColorStripSource(
id=sid, name=name, source_type="audio",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, visualization_mode=data.get("visualization_mode") or "spectrum",
clock_id=clock_id, tags=tags, visualization_mode=data.get("visualization_mode") or "spectrum",
audio_source_id=data.get("audio_source_id") or "",
sensitivity=float(data.get("sensitivity") or 1.0),
smoothing=float(data.get("smoothing") or 0.3),
@@ -201,7 +204,7 @@ class ColorStripSource:
return EffectColorStripSource(
id=sid, name=name, source_type="effect",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, effect_type=data.get("effect_type") or "fire",
clock_id=clock_id, tags=tags, effect_type=data.get("effect_type") or "fire",
palette=data.get("palette") or "fire",
color=color,
intensity=float(data.get("intensity") or 1.0),
@@ -218,7 +221,7 @@ class ColorStripSource:
return ApiInputColorStripSource(
id=sid, name=name, source_type="api_input",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id, led_count=data.get("led_count") or 0,
clock_id=clock_id, tags=tags, led_count=data.get("led_count") or 0,
fallback_color=fallback_color,
timeout=float(data.get("timeout") or 5.0),
)
@@ -231,7 +234,7 @@ class ColorStripSource:
return NotificationColorStripSource(
id=sid, name=name, source_type="notification",
created_at=created_at, updated_at=updated_at, description=description,
clock_id=clock_id,
clock_id=clock_id, tags=tags,
notification_effect=data.get("notification_effect") or "flash",
duration_ms=int(data.get("duration_ms") or 1500),
default_color=data.get("default_color") or "#FFFFFF",
@@ -243,6 +246,7 @@ class ColorStripSource:
# Shared picture-type field extraction
_picture_kwargs = dict(
tags=tags,
fps=data.get("fps") or 30,
brightness=data["brightness"] if data.get("brightness") is not None else 1.0,
saturation=data["saturation"] if data.get("saturation") is not None else 1.0,

View File

@@ -1,12 +1,11 @@
"""Color strip source storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.capture.calibration import CalibrationConfig, calibration_to_dict
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.color_strip_source import (
AdvancedPictureColorStripSource,
ApiInputColorStripSource,
@@ -21,73 +20,27 @@ from wled_controller.storage.color_strip_source import (
PictureColorStripSource,
StaticColorStripSource,
)
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ColorStripStore:
class ColorStripStore(BaseJsonStore[ColorStripSource]):
"""Persistent storage for color strip sources."""
_json_key = "color_strip_sources"
_entity_name = "Color strip source"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._sources: Dict[str, ColorStripSource] = {}
self._load()
super().__init__(file_path, ColorStripSource.from_dict)
def _load(self) -> None:
if not self.file_path.exists():
logger.info("Color strip store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
sources_data = data.get("color_strip_sources", {})
loaded = 0
for source_id, source_dict in sources_data.items():
try:
source = ColorStripSource.from_dict(source_dict)
self._sources[source_id] = source
loaded += 1
except Exception as e:
logger.error(f"Failed to load color strip source {source_id}: {e}", exc_info=True)
if loaded > 0:
logger.info(f"Loaded {loaded} color strip sources from storage")
except Exception as e:
logger.error(f"Failed to load color strip sources from {self.file_path}: {e}")
raise
logger.info(f"Color strip store initialized with {len(self._sources)} sources")
def _save(self) -> None:
try:
data = {
"version": "1.0.0",
"color_strip_sources": {
sid: source.to_dict()
for sid, source in self._sources.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save color strip sources to {self.file_path}: {e}")
raise
def get_all_sources(self) -> List[ColorStripSource]:
return list(self._sources.values())
# Backward-compatible aliases
get_all_sources = BaseJsonStore.get_all
delete_source = BaseJsonStore.delete
def get_source(self, source_id: str) -> ColorStripSource:
"""Get a color strip source by ID.
Raises:
ValueError: If source not found
"""
if source_id not in self._sources:
raise ValueError(f"Color strip source not found: {source_id}")
return self._sources[source_id]
"""Get a color strip source by ID (alias for get())."""
return self.get(source_id)
def create_source(
self,
@@ -129,6 +82,7 @@ class ColorStripStore:
app_filter_mode: Optional[str] = None,
app_filter_list: Optional[list] = None,
os_listener: Optional[bool] = None,
tags: Optional[List[str]] = None,
) -> ColorStripSource:
"""Create a new color strip source.
@@ -138,12 +92,12 @@ class ColorStripStore:
if not name or not name.strip():
raise ValueError("Name is required")
for source in self._sources.values():
for source in self._items.values():
if source.name == name:
raise ValueError(f"Color strip source with name '{name}' already exists")
source_id = f"css_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
if source_type == "static":
rgb = color if isinstance(color, list) and len(color) == 3 else [255, 255, 255]
@@ -325,7 +279,8 @@ class ColorStripStore:
frame_interpolation=frame_interpolation,
)
self._sources[source_id] = source
source.tags = tags or []
self._items[source_id] = source
self._save()
logger.info(f"Created color strip source: {name} ({source_id}, type={source_type})")
@@ -371,19 +326,20 @@ class ColorStripStore:
app_filter_mode: Optional[str] = None,
app_filter_list: Optional[list] = None,
os_listener: Optional[bool] = None,
tags: Optional[List[str]] = None,
) -> ColorStripSource:
"""Update an existing color strip source.
Raises:
ValueError: If source not found
"""
if source_id not in self._sources:
if source_id not in self._items:
raise ValueError(f"Color strip source not found: {source_id}")
source = self._sources[source_id]
source = self._items[source_id]
if name is not None:
for other in self._sources.values():
for other in self._items.values():
if other.id != source_id and other.name == name:
raise ValueError(f"Color strip source with name '{name}' already exists")
source.name = name
@@ -394,6 +350,9 @@ class ColorStripStore:
if clock_id is not None:
source.clock_id = clock_id if clock_id else None
if tags is not None:
source.tags = tags
if isinstance(source, (PictureColorStripSource, AdvancedPictureColorStripSource)):
if picture_source_id is not None and isinstance(source, PictureColorStripSource):
source.picture_source_id = picture_source_id
@@ -494,30 +453,16 @@ class ColorStripStore:
if os_listener is not None:
source.os_listener = bool(os_listener)
source.updated_at = datetime.utcnow()
source.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated color strip source: {source_id}")
return source
def delete_source(self, source_id: str) -> None:
"""Delete a color strip source.
Raises:
ValueError: If source not found
"""
if source_id not in self._sources:
raise ValueError(f"Color strip source not found: {source_id}")
del self._sources[source_id]
self._save()
logger.info(f"Deleted color strip source: {source_id}")
def get_composites_referencing(self, source_id: str) -> List[str]:
"""Return names of composite sources that reference a given source as a layer."""
names = []
for source in self._sources.values():
for source in self._items.values():
if isinstance(source, CompositeColorStripSource):
for layer in source.layers:
if layer.get("source_id") == source_id:
@@ -528,7 +473,7 @@ class ColorStripStore:
def get_mapped_referencing(self, source_id: str) -> List[str]:
"""Return names of mapped sources that reference a given source as a zone."""
names = []
for source in self._sources.values():
for source in self._items.values():
if isinstance(source, MappedColorStripSource):
for zone in source.zones:
if zone.get("source_id") == source_id:

View File

@@ -2,11 +2,11 @@
import json
import uuid
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from wled_controller.utils import get_logger
from wled_controller.utils import atomic_write_json, get_logger
logger = get_logger(__name__)
@@ -33,6 +33,7 @@ class Device:
send_latency_ms: int = 0,
rgbw: bool = False,
zone_mode: str = "combined",
tags: List[str] = None,
created_at: Optional[datetime] = None,
updated_at: Optional[datetime] = None,
):
@@ -48,8 +49,9 @@ class Device:
self.send_latency_ms = send_latency_ms
self.rgbw = rgbw
self.zone_mode = zone_mode
self.created_at = created_at or datetime.utcnow()
self.updated_at = updated_at or datetime.utcnow()
self.tags = tags or []
self.created_at = created_at or datetime.now(timezone.utc)
self.updated_at = updated_at or datetime.now(timezone.utc)
def to_dict(self) -> dict:
"""Convert device to dictionary."""
@@ -75,6 +77,8 @@ class Device:
d["rgbw"] = True
if self.zone_mode != "combined":
d["zone_mode"] = self.zone_mode
if self.tags:
d["tags"] = self.tags
return d
@classmethod
@@ -93,8 +97,9 @@ class Device:
send_latency_ms=data.get("send_latency_ms", 0),
rgbw=data.get("rgbw", False),
zone_mode=data.get("zone_mode", "combined"),
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
tags=data.get("tags", []),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)
@@ -158,11 +163,7 @@ class DeviceStore:
}
}
temp_file = self.storage_file.with_suffix(".tmp")
with open(temp_file, "w") as f:
json.dump(data, f, indent=2)
temp_file.replace(self.storage_file)
atomic_write_json(self.storage_file, data)
logger.debug(f"Saved {len(self._devices)} devices to storage")
@@ -181,6 +182,7 @@ class DeviceStore:
send_latency_ms: int = 0,
rgbw: bool = False,
zone_mode: str = "combined",
tags: Optional[List[str]] = None,
) -> Device:
"""Create a new device."""
device_id = f"device_{uuid.uuid4().hex[:8]}"
@@ -200,6 +202,7 @@ class DeviceStore:
send_latency_ms=send_latency_ms,
rgbw=rgbw,
zone_mode=zone_mode,
tags=tags or [],
)
self._devices[device_id] = device
@@ -228,6 +231,7 @@ class DeviceStore:
send_latency_ms: Optional[int] = None,
rgbw: Optional[bool] = None,
zone_mode: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> Device:
"""Update device."""
device = self._devices.get(device_id)
@@ -252,8 +256,10 @@ class DeviceStore:
device.rgbw = rgbw
if zone_mode is not None:
device.zone_mode = zone_mode
if tags is not None:
device.tags = tags
device.updated_at = datetime.utcnow()
device.updated_at = datetime.now(timezone.utc)
self.save()
logger.info(f"Updated device {device_id}")

View File

@@ -1,7 +1,7 @@
"""Key colors output target — extracts key colors from image rectangles."""
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.output_target import OutputTarget
@@ -100,9 +100,10 @@ class KeyColorsOutputTarget(OutputTarget):
def update_fields(self, *, name=None, device_id=None, picture_source_id=None,
settings=None, key_colors_settings=None, description=None,
tags=None,
**_kwargs) -> None:
"""Apply mutable field updates for KC targets."""
super().update_fields(name=name, description=description)
super().update_fields(name=name, description=description, tags=tags)
if picture_source_id is not None:
self.picture_source_id = picture_source_id
if key_colors_settings is not None:
@@ -130,6 +131,7 @@ class KeyColorsOutputTarget(OutputTarget):
picture_source_id=data.get("picture_source_id", ""),
settings=settings,
description=data.get("description"),
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
tags=data.get("tags", []),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)

View File

@@ -1,8 +1,8 @@
"""Output target base data model."""
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from typing import List, Optional
@dataclass
@@ -15,6 +15,7 @@ class OutputTarget:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def register_with_manager(self, manager) -> None:
"""Register this target with the processor manager. Subclasses override."""
@@ -26,12 +27,15 @@ class OutputTarget:
def update_fields(self, *, name=None, device_id=None, picture_source_id=None,
settings=None, key_colors_settings=None, description=None,
tags: Optional[List[str]] = None,
**_kwargs) -> None:
"""Apply mutable field updates. Base handles common fields; subclasses handle type-specific ones."""
if name is not None:
self.name = name
if description is not None:
self.description = description
if tags is not None:
self.tags = tags
@property
def has_picture_source(self) -> bool:
@@ -45,6 +49,7 @@ class OutputTarget:
"name": self.name,
"target_type": self.target_type,
"description": self.description,
"tags": self.tags,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}

View File

@@ -2,93 +2,61 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.output_target import OutputTarget
from wled_controller.storage.wled_output_target import WledOutputTarget
from wled_controller.storage.key_colors_output_target import (
KeyColorsSettings,
KeyColorsOutputTarget,
)
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
DEFAULT_STATE_CHECK_INTERVAL = 30 # seconds
class OutputTargetStore:
class OutputTargetStore(BaseJsonStore[OutputTarget]):
"""Persistent storage for output targets."""
def __init__(self, file_path: str):
"""Initialize output target store.
_json_key = "output_targets"
_entity_name = "Output target"
Args:
file_path: Path to targets JSON file
"""
self.file_path = Path(file_path)
self._targets: Dict[str, OutputTarget] = {}
self._load()
def __init__(self, file_path: str):
super().__init__(file_path, OutputTarget.from_dict)
def _load(self) -> None:
"""Load targets from file."""
"""Override to support legacy 'picture_targets' JSON key."""
import json as _json
from pathlib import Path
if not self.file_path.exists():
logger.info(f"{self._entity_name} store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Support both new "output_targets" and legacy "picture_targets" keys
data = _json.load(f)
targets_data = data.get("output_targets") or data.get("picture_targets", {})
loaded = 0
for target_id, target_dict in targets_data.items():
try:
target = OutputTarget.from_dict(target_dict)
self._targets[target_id] = target
self._items[target_id] = self._deserializer(target_dict)
loaded += 1
except Exception as e:
logger.error(f"Failed to load output target {target_id}: {e}", exc_info=True)
logger.error(f"Failed to load {self._entity_name} {target_id}: {e}", exc_info=True)
if loaded > 0:
logger.info(f"Loaded {loaded} output targets from storage")
logger.info(f"Loaded {loaded} {self._json_key} from storage")
except Exception as e:
logger.error(f"Failed to load output targets from {self.file_path}: {e}")
logger.error(f"Failed to load {self._json_key} from {self.file_path}: {e}")
raise
logger.info(f"{self._entity_name} store initialized with {len(self._items)} items")
logger.info(f"Output target store initialized with {len(self._targets)} targets")
def _save(self) -> None:
"""Save all targets to file."""
try:
data = {
"version": "1.0.0",
"output_targets": {
target_id: target.to_dict()
for target_id, target in self._targets.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save output targets to {self.file_path}: {e}")
raise
def get_all_targets(self) -> List[OutputTarget]:
"""Get all output targets."""
return list(self._targets.values())
def get_target(self, target_id: str) -> OutputTarget:
"""Get target by ID.
Raises:
ValueError: If target not found
"""
if target_id not in self._targets:
raise ValueError(f"Output target not found: {target_id}")
return self._targets[target_id]
# Backward-compatible aliases
get_all_targets = BaseJsonStore.get_all
get_target = BaseJsonStore.get
delete_target = BaseJsonStore.delete
def create_target(
self,
@@ -106,6 +74,7 @@ class OutputTargetStore:
key_colors_settings: Optional[KeyColorsSettings] = None,
description: Optional[str] = None,
picture_source_id: str = "",
tags: Optional[List[str]] = None,
) -> OutputTarget:
"""Create a new output target.
@@ -116,12 +85,12 @@ class OutputTargetStore:
raise ValueError(f"Invalid target type: {target_type}")
# Check for duplicate name
for target in self._targets.values():
for target in self._items.values():
if target.name == name:
raise ValueError(f"Output target with name '{name}' already exists")
target_id = f"pt_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
if target_type == "led":
target: OutputTarget = WledOutputTarget(
@@ -155,7 +124,8 @@ class OutputTargetStore:
else:
raise ValueError(f"Unknown target type: {target_type}")
self._targets[target_id] = target
target.tags = tags or []
self._items[target_id] = target
self._save()
logger.info(f"Created output target: {name} ({target_id}, type={target_type})")
@@ -176,20 +146,21 @@ class OutputTargetStore:
protocol: Optional[str] = None,
key_colors_settings: Optional[KeyColorsSettings] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> OutputTarget:
"""Update an output target.
Raises:
ValueError: If target not found or validation fails
"""
if target_id not in self._targets:
if target_id not in self._items:
raise ValueError(f"Output target not found: {target_id}")
target = self._targets[target_id]
target = self._items[target_id]
if name is not None:
# Check for duplicate name (exclude self)
for other in self._targets.values():
for other in self._items.values():
if other.id != target_id and other.name == name:
raise ValueError(f"Output target with name '{name}' already exists")
@@ -206,50 +177,37 @@ class OutputTargetStore:
protocol=protocol,
key_colors_settings=key_colors_settings,
description=description,
tags=tags,
)
target.updated_at = datetime.utcnow()
target.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated output target: {target_id}")
return target
def delete_target(self, target_id: str) -> None:
"""Delete an output target.
Raises:
ValueError: If target not found
"""
if target_id not in self._targets:
raise ValueError(f"Output target not found: {target_id}")
del self._targets[target_id]
self._save()
logger.info(f"Deleted output target: {target_id}")
def get_targets_for_device(self, device_id: str) -> List[OutputTarget]:
"""Get all targets that reference a specific device."""
return [
t for t in self._targets.values()
t for t in self._items.values()
if isinstance(t, WledOutputTarget) and t.device_id == device_id
]
def get_targets_referencing_source(self, source_id: str) -> List[str]:
"""Return names of KC targets that reference a picture source."""
return [
target.name for target in self._targets.values()
target.name for target in self._items.values()
if isinstance(target, KeyColorsOutputTarget) and target.picture_source_id == source_id
]
def get_targets_referencing_css(self, css_id: str) -> List[str]:
"""Return names of LED targets that reference a color strip source."""
return [
target.name for target in self._targets.values()
target.name for target in self._items.values()
if isinstance(target, WledOutputTarget)
and target.color_strip_source_id == css_id
]
def count(self) -> int:
"""Get number of targets."""
return len(self._targets)
return len(self._items)

View File

@@ -1,7 +1,7 @@
"""Pattern template data model for key color rectangle layouts."""
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.key_colors_output_target import KeyColorRectangle
@@ -17,6 +17,7 @@ class PatternTemplate:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert to dictionary."""
@@ -27,6 +28,7 @@ class PatternTemplate:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
}
@classmethod
@@ -39,9 +41,10 @@ class PatternTemplate:
rectangles=rectangles,
created_at=datetime.fromisoformat(data["created_at"])
if isinstance(data.get("created_at"), str)
else data.get("created_at", datetime.utcnow()),
else data.get("created_at", datetime.now(timezone.utc)),
updated_at=datetime.fromisoformat(data["updated_at"])
if isinstance(data.get("updated_at"), str)
else data.get("updated_at", datetime.utcnow()),
else data.get("updated_at", datetime.now(timezone.utc)),
description=data.get("description"),
tags=data.get("tags", []),
)

View File

@@ -1,42 +1,42 @@
"""Pattern template storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.key_colors_output_target import KeyColorRectangle
from wled_controller.storage.pattern_template import PatternTemplate
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class PatternTemplateStore:
class PatternTemplateStore(BaseJsonStore[PatternTemplate]):
"""Storage for pattern templates (rectangle layouts for key color extraction).
All templates are persisted to the JSON file.
On startup, if no templates exist, a default one is auto-created.
"""
def __init__(self, file_path: str):
"""Initialize pattern template store.
_json_key = "pattern_templates"
_entity_name = "Pattern template"
Args:
file_path: Path to templates JSON file
"""
self.file_path = Path(file_path)
self._templates: Dict[str, PatternTemplate] = {}
self._load()
def __init__(self, file_path: str):
super().__init__(file_path, PatternTemplate.from_dict)
self._ensure_initial_template()
# Backward-compatible aliases
get_all_templates = BaseJsonStore.get_all
get_template = BaseJsonStore.get
delete_template = BaseJsonStore.delete
def _ensure_initial_template(self) -> None:
"""Auto-create a default pattern template if none exist."""
if self._templates:
if self._items:
return
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template_id = f"pat_{uuid.uuid4().hex[:8]}"
template = PatternTemplate(
@@ -50,95 +50,24 @@ class PatternTemplateStore:
description="Default pattern template with full-frame rectangle",
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Auto-created initial pattern template: {template.name} ({template_id})")
def _load(self) -> None:
"""Load templates from file."""
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
templates_data = data.get("pattern_templates", {})
loaded = 0
for template_id, template_dict in templates_data.items():
try:
template = PatternTemplate.from_dict(template_dict)
self._templates[template_id] = template
loaded += 1
except Exception as e:
logger.error(
f"Failed to load pattern template {template_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} pattern templates from storage")
except Exception as e:
logger.error(f"Failed to load pattern templates from {self.file_path}: {e}")
raise
logger.info(f"Pattern template store initialized with {len(self._templates)} templates")
def _save(self) -> None:
"""Save all templates to file."""
try:
data = {
"version": "1.0.0",
"pattern_templates": {
template_id: template.to_dict()
for template_id, template in self._templates.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save pattern templates to {self.file_path}: {e}")
raise
def get_all_templates(self) -> List[PatternTemplate]:
"""Get all pattern templates."""
return list(self._templates.values())
def get_template(self, template_id: str) -> PatternTemplate:
"""Get template by ID.
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Pattern template not found: {template_id}")
return self._templates[template_id]
def create_template(
self,
name: str,
rectangles: Optional[List[KeyColorRectangle]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> PatternTemplate:
"""Create a new pattern template.
Args:
name: Template name (must be unique)
rectangles: List of named rectangles
description: Optional description
Raises:
ValueError: If template with same name exists
"""
for template in self._templates.values():
if template.name == name:
raise ValueError(f"Pattern template with name '{name}' already exists")
self._check_name_unique(name)
if rectangles is None:
rectangles = []
template_id = f"pat_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template = PatternTemplate(
id=template_id,
@@ -147,9 +76,10 @@ class PatternTemplateStore:
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Created pattern template: {name} ({template_id})")
@@ -161,48 +91,26 @@ class PatternTemplateStore:
name: Optional[str] = None,
rectangles: Optional[List[KeyColorRectangle]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> PatternTemplate:
"""Update an existing pattern template.
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Pattern template not found: {template_id}")
template = self._templates[template_id]
template = self.get(template_id)
if name is not None:
for tid, t in self._templates.items():
if tid != template_id and t.name == name:
raise ValueError(f"Pattern template with name '{name}' already exists")
self._check_name_unique(name, exclude_id=template_id)
template.name = name
if rectangles is not None:
template.rectangles = rectangles
if description is not None:
template.description = description
if tags is not None:
template.tags = tags
template.updated_at = datetime.utcnow()
template.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated pattern template: {template_id}")
return template
def delete_template(self, template_id: str) -> None:
"""Delete a pattern template.
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Pattern template not found: {template_id}")
del self._templates[template_id]
self._save()
logger.info(f"Deleted pattern template: {template_id}")
def get_targets_referencing(self, template_id: str, output_target_store) -> List[str]:
"""Return names of KC targets that reference this template."""
from wled_controller.storage.key_colors_output_target import KeyColorsOutputTarget

View File

@@ -1,8 +1,8 @@
"""Picture source data model with inheritance-based stream types."""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
@dataclass
@@ -21,6 +21,7 @@ class PictureSource:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert stream to dictionary. Subclasses extend this."""
@@ -31,6 +32,7 @@ class PictureSource:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
# Subclass fields default to None for backward compat
"display_index": None,
"capture_template_id": None,
@@ -47,39 +49,40 @@ class PictureSource:
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
tags: list = data.get("tags", [])
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
if stream_type == "processed":
return ProcessedPictureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
source_stream_id=data.get("source_stream_id") or "",
postprocessing_template_id=data.get("postprocessing_template_id") or "",
)
elif stream_type == "static_image":
return StaticImagePictureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
image_source=data.get("image_source") or "",
)
else:
return ScreenCapturePictureSource(
id=sid, name=name, stream_type=stream_type,
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
display_index=data.get("display_index") or 0,
capture_template_id=data.get("capture_template_id") or "",
target_fps=data.get("target_fps") or 30,

View File

@@ -1,84 +1,43 @@
"""Picture source storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime, timezone
from typing import List, Optional, Set
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.picture_source import (
PictureSource,
ScreenCapturePictureSource,
ProcessedPictureSource,
ScreenCapturePictureSource,
StaticImagePictureSource,
)
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class PictureSourceStore:
class PictureSourceStore(BaseJsonStore[PictureSource]):
"""Storage for picture sources.
Supports raw and processed stream types with cycle detection
for processed streams that reference other streams.
"""
_json_key = "picture_sources"
_entity_name = "Picture source"
def __init__(self, file_path: str):
"""Initialize picture source store.
super().__init__(file_path, PictureSource.from_dict)
Args:
file_path: Path to streams JSON file
"""
self.file_path = Path(file_path)
self._streams: Dict[str, PictureSource] = {}
self._load()
# Backward-compatible aliases
get_all_sources = BaseJsonStore.get_all
get_source = BaseJsonStore.get
def _load(self) -> None:
"""Load streams from file."""
if not self.file_path.exists():
return
# Legacy aliases (old code used "stream" naming)
get_all_streams = BaseJsonStore.get_all
get_stream = BaseJsonStore.get
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
streams_data = data.get("picture_sources", {})
loaded = 0
for stream_id, stream_dict in streams_data.items():
try:
stream = PictureSource.from_dict(stream_dict)
self._streams[stream_id] = stream
loaded += 1
except Exception as e:
logger.error(
f"Failed to load picture source {stream_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} picture sources from storage")
except Exception as e:
logger.error(f"Failed to load picture sources from {self.file_path}: {e}")
raise
logger.info(f"Picture source store initialized with {len(self._streams)} streams")
def _save(self) -> None:
"""Save all streams to file."""
try:
data = {
"version": "1.0.0",
"picture_sources": {
stream_id: stream.to_dict()
for stream_id, stream in self._streams.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save picture sources to {self.file_path}: {e}")
raise
# ── Helpers ───────────────────────────────────────────────────────
def _detect_cycle(self, source_stream_id: str, exclude_stream_id: Optional[str] = None) -> bool:
"""Detect if following the source chain from source_stream_id would create a cycle.
@@ -100,7 +59,7 @@ class PictureSourceStore:
return True
visited.add(current_id)
current_stream = self._streams.get(current_id)
current_stream = self._items.get(current_id)
if not current_stream:
break
if not isinstance(current_stream, ProcessedPictureSource):
@@ -109,19 +68,7 @@ class PictureSourceStore:
return False
def get_all_streams(self) -> List[PictureSource]:
"""Get all picture sources."""
return list(self._streams.values())
def get_stream(self, stream_id: str) -> PictureSource:
"""Get stream by ID.
Raises:
ValueError: If stream not found
"""
if stream_id not in self._streams:
raise ValueError(f"Picture source not found: {stream_id}")
return self._streams[stream_id]
# ── CRUD ──────────────────────────────────────────────────────────
def create_stream(
self,
@@ -134,6 +81,7 @@ class PictureSourceStore:
postprocessing_template_id: Optional[str] = None,
image_source: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> PictureSource:
"""Create a new picture source.
@@ -167,7 +115,7 @@ class PictureSourceStore:
if not postprocessing_template_id:
raise ValueError("Processed streams require postprocessing_template_id")
# Validate source stream exists
if source_stream_id not in self._streams:
if source_stream_id not in self._items:
raise ValueError(f"Source stream not found: {source_stream_id}")
# Check for cycles
if self._detect_cycle(source_stream_id):
@@ -177,16 +125,15 @@ class PictureSourceStore:
raise ValueError("Static image streams require image_source")
# Check for duplicate name
for stream in self._streams.values():
if stream.name == name:
raise ValueError(f"Picture source with name '{name}' already exists")
self._check_name_unique(name)
stream_id = f"ps_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
common = dict(
id=stream_id, name=name, stream_type=stream_type,
created_at=now, updated_at=now, description=description,
tags=tags or [],
)
stream: PictureSource
@@ -209,7 +156,7 @@ class PictureSourceStore:
image_source=image_source, # type: ignore[arg-type]
)
self._streams[stream_id] = stream
self._items[stream_id] = stream
self._save()
logger.info(f"Created picture source: {name} ({stream_id}, type={stream_type})")
@@ -226,28 +173,29 @@ class PictureSourceStore:
postprocessing_template_id: Optional[str] = None,
image_source: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> PictureSource:
"""Update an existing picture source.
Raises:
ValueError: If stream not found, validation fails, or cycle detected
"""
if stream_id not in self._streams:
raise ValueError(f"Picture source not found: {stream_id}")
stream = self._streams[stream_id]
stream = self.get(stream_id)
# If changing source_stream_id on a processed stream, check for cycles
if source_stream_id is not None and isinstance(stream, ProcessedPictureSource):
if source_stream_id not in self._streams:
if source_stream_id not in self._items:
raise ValueError(f"Source stream not found: {source_stream_id}")
if self._detect_cycle(source_stream_id, exclude_stream_id=stream_id):
raise ValueError("Cycle detected in stream chain")
if name is not None:
self._check_name_unique(name, exclude_id=stream_id)
stream.name = name
if description is not None:
stream.description = description
if tags is not None:
stream.tags = tags
if isinstance(stream, ScreenCapturePictureSource):
if display_index is not None:
@@ -265,7 +213,7 @@ class PictureSourceStore:
if image_source is not None:
stream.image_source = image_source
stream.updated_at = datetime.utcnow()
stream.updated_at = datetime.now(timezone.utc)
self._save()
@@ -278,22 +226,29 @@ class PictureSourceStore:
Raises:
ValueError: If stream not found or is referenced by another stream
"""
if stream_id not in self._streams:
if stream_id not in self._items:
raise ValueError(f"Picture source not found: {stream_id}")
# Check if any other stream references this one as source
for other_stream in self._streams.values():
for other_stream in self._items.values():
if isinstance(other_stream, ProcessedPictureSource) and other_stream.source_stream_id == stream_id:
raise ValueError(
f"Cannot delete stream '{self._streams[stream_id].name}': "
f"Cannot delete stream '{self._items[stream_id].name}': "
f"it is referenced by stream '{other_stream.name}'"
)
del self._streams[stream_id]
del self._items[stream_id]
self._save()
logger.info(f"Deleted picture source: {stream_id}")
# Also expose as delete_source for consistency
def delete_source(self, source_id: str) -> None:
"""Alias for delete_stream with reference checking."""
self.delete_stream(source_id)
# ── Query helpers ─────────────────────────────────────────────────
def get_targets_referencing(self, stream_id: str, target_store) -> List[str]:
"""Return names of targets that reference this stream."""
return target_store.get_targets_referencing_source(stream_id)
@@ -324,7 +279,7 @@ class PictureSourceStore:
raise ValueError(f"Cycle detected in stream chain at {current_id}")
visited.add(current_id)
stream = self.get_stream(current_id)
stream = self.get(current_id)
if not isinstance(stream, ProcessedPictureSource):
return {

View File

@@ -1,7 +1,7 @@
"""Postprocessing template data model."""
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.filters.filter_instance import FilterInstance
@@ -17,6 +17,7 @@ class PostprocessingTemplate:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert template to dictionary."""
@@ -27,6 +28,7 @@ class PostprocessingTemplate:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
}
@classmethod
@@ -40,9 +42,10 @@ class PostprocessingTemplate:
filters=filters,
created_at=datetime.fromisoformat(data["created_at"])
if isinstance(data.get("created_at"), str)
else data.get("created_at", datetime.utcnow()),
else data.get("created_at", datetime.now(timezone.utc)),
updated_at=datetime.fromisoformat(data["updated_at"])
if isinstance(data.get("updated_at"), str)
else data.get("updated_at", datetime.utcnow()),
else data.get("updated_at", datetime.now(timezone.utc)),
description=data.get("description"),
tags=data.get("tags", []),
)

View File

@@ -1,44 +1,45 @@
"""Postprocessing template storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.core.filters.filter_instance import FilterInstance
from wled_controller.core.filters.registry import FilterRegistry
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.picture_source import ProcessedPictureSource
from wled_controller.storage.postprocessing_template import PostprocessingTemplate
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class PostprocessingTemplateStore:
class PostprocessingTemplateStore(BaseJsonStore[PostprocessingTemplate]):
"""Storage for postprocessing templates.
All templates are persisted to the JSON file.
On startup, if no templates exist, a default one is auto-created.
"""
def __init__(self, file_path: str):
"""Initialize postprocessing template store.
_json_key = "postprocessing_templates"
_entity_name = "Postprocessing template"
_version = "2.0.0"
Args:
file_path: Path to templates JSON file
"""
self.file_path = Path(file_path)
self._templates: Dict[str, PostprocessingTemplate] = {}
self._load()
def __init__(self, file_path: str):
super().__init__(file_path, PostprocessingTemplate.from_dict)
self._ensure_initial_template()
# Backward-compatible aliases
get_all_templates = BaseJsonStore.get_all
get_template = BaseJsonStore.get
delete_template = BaseJsonStore.delete
def _ensure_initial_template(self) -> None:
"""Auto-create a default postprocessing template if none exist."""
if self._templates:
if self._items:
return
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template_id = f"pp_{uuid.uuid4().hex[:8]}"
template = PostprocessingTemplate(
@@ -54,89 +55,18 @@ class PostprocessingTemplateStore:
description="Default postprocessing template",
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Auto-created initial postprocessing template: {template.name} ({template_id})")
def _load(self) -> None:
"""Load templates from file."""
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
templates_data = data.get("postprocessing_templates", {})
loaded = 0
for template_id, template_dict in templates_data.items():
try:
template = PostprocessingTemplate.from_dict(template_dict)
self._templates[template_id] = template
loaded += 1
except Exception as e:
logger.error(
f"Failed to load postprocessing template {template_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} postprocessing templates from storage")
except Exception as e:
logger.error(f"Failed to load postprocessing templates from {self.file_path}: {e}")
raise
logger.info(f"Postprocessing template store initialized with {len(self._templates)} templates")
def _save(self) -> None:
"""Save all templates to file."""
try:
data = {
"version": "2.0.0",
"postprocessing_templates": {
template_id: template.to_dict()
for template_id, template in self._templates.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save postprocessing templates to {self.file_path}: {e}")
raise
def get_all_templates(self) -> List[PostprocessingTemplate]:
"""Get all postprocessing templates."""
return list(self._templates.values())
def get_template(self, template_id: str) -> PostprocessingTemplate:
"""Get template by ID.
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Postprocessing template not found: {template_id}")
return self._templates[template_id]
def create_template(
self,
name: str,
filters: Optional[List[FilterInstance]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> PostprocessingTemplate:
"""Create a new postprocessing template.
Args:
name: Template name (must be unique)
filters: Ordered list of filter instances
description: Optional description
Raises:
ValueError: If template with same name exists or invalid filter_id
"""
for template in self._templates.values():
if template.name == name:
raise ValueError(f"Postprocessing template with name '{name}' already exists")
self._check_name_unique(name)
if filters is None:
filters = []
@@ -147,7 +77,7 @@ class PostprocessingTemplateStore:
raise ValueError(f"Unknown filter type: '{fi.filter_id}'")
template_id = f"pp_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template = PostprocessingTemplate(
id=template_id,
@@ -156,9 +86,10 @@ class PostprocessingTemplateStore:
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Created postprocessing template: {name} ({template_id})")
@@ -170,21 +101,12 @@ class PostprocessingTemplateStore:
name: Optional[str] = None,
filters: Optional[List[FilterInstance]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> PostprocessingTemplate:
"""Update an existing postprocessing template.
Raises:
ValueError: If template not found or invalid filter_id
"""
if template_id not in self._templates:
raise ValueError(f"Postprocessing template not found: {template_id}")
template = self._templates[template_id]
template = self.get(template_id)
if name is not None:
for tid, t in self._templates.items():
if tid != template_id and t.name == name:
raise ValueError(f"Postprocessing template with name '{name}' already exists")
self._check_name_unique(name, exclude_id=template_id)
template.name = name
if filters is not None:
# Validate filter IDs
@@ -194,28 +116,15 @@ class PostprocessingTemplateStore:
template.filters = filters
if description is not None:
template.description = description
if tags is not None:
template.tags = tags
template.updated_at = datetime.utcnow()
template.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated postprocessing template: {template_id}")
return template
def delete_template(self, template_id: str) -> None:
"""Delete a postprocessing template.
Raises:
ValueError: If template not found or is referenced by a picture source
"""
if template_id not in self._templates:
raise ValueError(f"Postprocessing template not found: {template_id}")
del self._templates[template_id]
self._save()
logger.info(f"Deleted postprocessing template: {template_id}")
def resolve_filter_instances(self, filter_instances, _visited=None):
"""Recursively resolve filter instances, expanding filter_template references.

View File

@@ -1,7 +1,7 @@
"""Scene preset data models — snapshot of target state."""
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import List
@@ -42,16 +42,18 @@ class ScenePreset:
id: str
name: str
description: str = ""
tags: List[str] = field(default_factory=list)
targets: List[TargetSnapshot] = field(default_factory=list)
order: int = 0
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"tags": self.tags,
"targets": [t.to_dict() for t in self.targets],
"order": self.order,
"created_at": self.created_at.isoformat(),
@@ -64,8 +66,9 @@ class ScenePreset:
id=data["id"],
name=data["name"],
description=data.get("description", ""),
tags=data.get("tags", []),
targets=[TargetSnapshot.from_dict(t) for t in data.get("targets", [])],
order=data.get("order", 0),
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)

View File

@@ -1,79 +1,40 @@
"""Scene preset storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.scene_preset import ScenePreset, TargetSnapshot
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ScenePresetStore:
class ScenePresetStore(BaseJsonStore[ScenePreset]):
"""Persistent storage for scene presets."""
_json_key = "scene_presets"
_entity_name = "Scene preset"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._presets: Dict[str, ScenePreset] = {}
self._load()
super().__init__(file_path, ScenePreset.from_dict)
def _load(self) -> None:
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
presets_data = data.get("scene_presets", {})
loaded = 0
for preset_id, preset_dict in presets_data.items():
try:
preset = ScenePreset.from_dict(preset_dict)
self._presets[preset_id] = preset
loaded += 1
except Exception as e:
logger.error(f"Failed to load scene preset {preset_id}: {e}", exc_info=True)
if loaded > 0:
logger.info(f"Loaded {loaded} scene presets from storage")
except Exception as e:
logger.error(f"Failed to load scene presets from {self.file_path}: {e}")
raise
logger.info(f"Scene preset store initialized with {len(self._presets)} presets")
def _save(self) -> None:
try:
data = {
"version": "1.0.0",
"scene_presets": {
pid: p.to_dict() for pid, p in self._presets.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save scene presets to {self.file_path}: {e}")
raise
# Backward-compatible aliases
get_preset = BaseJsonStore.get
delete_preset = BaseJsonStore.delete
def get_all_presets(self) -> List[ScenePreset]:
return sorted(self._presets.values(), key=lambda p: p.order)
"""Get all presets sorted by order field."""
return sorted(self._items.values(), key=lambda p: p.order)
def get_preset(self, preset_id: str) -> ScenePreset:
if preset_id not in self._presets:
raise ValueError(f"Scene preset not found: {preset_id}")
return self._presets[preset_id]
# Override get_all to also sort by order for consistency
def get_all(self) -> List[ScenePreset]:
return self.get_all_presets()
def create_preset(self, preset: ScenePreset) -> ScenePreset:
for p in self._presets.values():
if p.name == preset.name:
raise ValueError(f"Scene preset with name '{preset.name}' already exists")
self._check_name_unique(preset.name)
self._presets[preset.id] = preset
self._items[preset.id] = preset
self._save()
logger.info(f"Created scene preset: {preset.name} ({preset.id})")
return preset
@@ -85,16 +46,12 @@ class ScenePresetStore:
description: Optional[str] = None,
order: Optional[int] = None,
targets: Optional[List[TargetSnapshot]] = None,
tags: Optional[List[str]] = None,
) -> ScenePreset:
if preset_id not in self._presets:
raise ValueError(f"Scene preset not found: {preset_id}")
preset = self._presets[preset_id]
preset = self.get(preset_id)
if name is not None:
for pid, p in self._presets.items():
if pid != preset_id and p.name == name:
raise ValueError(f"Scene preset with name '{name}' already exists")
self._check_name_unique(name, exclude_id=preset_id)
preset.name = name
if description is not None:
preset.description = description
@@ -102,31 +59,20 @@ class ScenePresetStore:
preset.order = order
if targets is not None:
preset.targets = targets
if tags is not None:
preset.tags = tags
preset.updated_at = datetime.utcnow()
preset.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated scene preset: {preset_id}")
return preset
def recapture_preset(self, preset_id: str, preset: ScenePreset) -> ScenePreset:
"""Replace snapshot data of an existing preset (recapture current state)."""
if preset_id not in self._presets:
raise ValueError(f"Scene preset not found: {preset_id}")
existing = self.get(preset_id)
existing = self._presets[preset_id]
existing.targets = preset.targets
existing.updated_at = datetime.utcnow()
existing.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Recaptured scene preset: {preset_id}")
return existing
def delete_preset(self, preset_id: str) -> None:
if preset_id not in self._presets:
raise ValueError(f"Scene preset not found: {preset_id}")
del self._presets[preset_id]
self._save()
logger.info(f"Deleted scene preset: {preset_id}")
def count(self) -> int:
return len(self._presets)

View File

@@ -5,9 +5,9 @@ color strip sources. Multiple CSS sources referencing the same clock
animate in sync and share speed / pause / resume / reset controls.
"""
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from typing import List, Optional
@dataclass
@@ -20,6 +20,7 @@ class SyncClock:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
@@ -27,6 +28,7 @@ class SyncClock:
"name": self.name,
"speed": self.speed,
"description": self.description,
"tags": self.tags,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@@ -38,6 +40,7 @@ class SyncClock:
name=data["name"],
speed=float(data.get("speed", 1.0)),
description=data.get("description"),
tags=data.get("tags", []),
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
)

View File

@@ -1,95 +1,38 @@
"""Synchronization clock storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.sync_clock import SyncClock
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class SyncClockStore:
"""Persistent storage for synchronization clocks."""
class SyncClockStore(BaseJsonStore[SyncClock]):
_json_key = "sync_clocks"
_entity_name = "Sync clock"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._clocks: Dict[str, SyncClock] = {}
self._load()
super().__init__(file_path, SyncClock.from_dict)
def _load(self) -> None:
if not self.file_path.exists():
logger.info("Sync clock store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
clocks_data = data.get("sync_clocks", {})
loaded = 0
for clock_id, clock_dict in clocks_data.items():
try:
clock = SyncClock.from_dict(clock_dict)
self._clocks[clock_id] = clock
loaded += 1
except Exception as e:
logger.error(
f"Failed to load sync clock {clock_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} sync clocks from storage")
except Exception as e:
logger.error(f"Failed to load sync clocks from {self.file_path}: {e}")
raise
logger.info(f"Sync clock store initialized with {len(self._clocks)} clocks")
def _save(self) -> None:
try:
data = {
"version": "1.0.0",
"sync_clocks": {
cid: clock.to_dict()
for cid, clock in self._clocks.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save sync clocks to {self.file_path}: {e}")
raise
# ── CRUD ─────────────────────────────────────────────────────────
def get_all_clocks(self) -> List[SyncClock]:
return list(self._clocks.values())
def get_clock(self, clock_id: str) -> SyncClock:
if clock_id not in self._clocks:
raise ValueError(f"Sync clock not found: {clock_id}")
return self._clocks[clock_id]
# Backward-compatible aliases
get_all_clocks = BaseJsonStore.get_all
get_clock = BaseJsonStore.get
delete_clock = BaseJsonStore.delete
def create_clock(
self,
name: str,
speed: float = 1.0,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> SyncClock:
if not name or not name.strip():
raise ValueError("Name is required")
for clock in self._clocks.values():
if clock.name == name:
raise ValueError(f"Sync clock with name '{name}' already exists")
self._check_name_unique(name)
cid = f"sc_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
clock = SyncClock(
id=cid,
@@ -98,11 +41,11 @@ class SyncClockStore:
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
)
self._clocks[cid] = clock
self._items[cid] = clock
self._save()
logger.info(f"Created sync clock: {name} ({cid}, speed={clock.speed})")
return clock
@@ -112,35 +55,21 @@ class SyncClockStore:
name: Optional[str] = None,
speed: Optional[float] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> SyncClock:
if clock_id not in self._clocks:
raise ValueError(f"Sync clock not found: {clock_id}")
clock = self._clocks[clock_id]
clock = self.get(clock_id)
if name is not None:
for other in self._clocks.values():
if other.id != clock_id and other.name == name:
raise ValueError(f"Sync clock with name '{name}' already exists")
self._check_name_unique(name, exclude_id=clock_id)
clock.name = name
if speed is not None:
clock.speed = max(0.1, min(10.0, speed))
if description is not None:
clock.description = description
if tags is not None:
clock.tags = tags
clock.updated_at = datetime.utcnow()
clock.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated sync clock: {clock_id}")
return clock
def delete_clock(self, clock_id: str) -> None:
if clock_id not in self._clocks:
raise ValueError(f"Sync clock not found: {clock_id}")
del self._clocks[clock_id]
self._save()
logger.info(f"Deleted sync clock: {clock_id}")

View File

@@ -1,8 +1,8 @@
"""Capture template data model."""
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
@dataclass
@@ -16,6 +16,7 @@ class CaptureTemplate:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert template to dictionary.
@@ -31,6 +32,7 @@ class CaptureTemplate:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
}
@classmethod
@@ -50,9 +52,10 @@ class CaptureTemplate:
engine_config=data.get("engine_config", {}),
created_at=datetime.fromisoformat(data["created_at"])
if isinstance(data.get("created_at"), str)
else data.get("created_at", datetime.utcnow()),
else data.get("created_at", datetime.now(timezone.utc)),
updated_at=datetime.fromisoformat(data["updated_at"])
if isinstance(data.get("updated_at"), str)
else data.get("updated_at", datetime.utcnow()),
else data.get("updated_at", datetime.now(timezone.utc)),
description=data.get("description"),
tags=data.get("tags", []),
)

View File

@@ -1,19 +1,18 @@
"""Template storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional
from wled_controller.core.capture_engines.factory import EngineRegistry
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.template import CaptureTemplate
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class TemplateStore:
class TemplateStore(BaseJsonStore[CaptureTemplate]):
"""Storage for capture templates.
All templates are persisted to the JSON file.
@@ -21,20 +20,21 @@ class TemplateStore:
highest-priority available engine.
"""
def __init__(self, file_path: str):
"""Initialize template store.
_json_key = "templates"
_entity_name = "Capture template"
Args:
file_path: Path to templates JSON file
"""
self.file_path = Path(file_path)
self._templates: Dict[str, CaptureTemplate] = {}
self._load()
def __init__(self, file_path: str):
super().__init__(file_path, CaptureTemplate.from_dict)
self._ensure_initial_template()
# Backward-compatible aliases
get_all_templates = BaseJsonStore.get_all
get_template = BaseJsonStore.get
delete_template = BaseJsonStore.delete
def _ensure_initial_template(self) -> None:
"""Auto-create a template if none exist, using the best available engine."""
if self._templates:
if self._items:
return
best_engine = EngineRegistry.get_best_available_engine()
@@ -44,7 +44,7 @@ class TemplateStore:
engine_class = EngineRegistry.get_engine(best_engine)
default_config = engine_class.get_default_config()
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template_id = f"tpl_{uuid.uuid4().hex[:8]}"
template = CaptureTemplate(
@@ -57,111 +57,22 @@ class TemplateStore:
description=f"Default capture template using {best_engine.upper()} engine",
)
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Auto-created initial template: {template.name} ({template_id}, engine={best_engine})")
def _load(self) -> None:
"""Load templates from file."""
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
templates_data = data.get("templates", {})
loaded = 0
for template_id, template_dict in templates_data.items():
try:
template = CaptureTemplate.from_dict(template_dict)
self._templates[template_id] = template
loaded += 1
except Exception as e:
logger.error(
f"Failed to load template {template_id}: {e}",
exc_info=True
)
if loaded > 0:
logger.info(f"Loaded {loaded} templates from storage")
except Exception as e:
logger.error(f"Failed to load templates from {self.file_path}: {e}")
raise
logger.info(f"Template store initialized with {len(self._templates)} templates")
def _save(self) -> None:
"""Save all templates to file."""
try:
data = {
"version": "1.0.0",
"templates": {
template_id: template.to_dict()
for template_id, template in self._templates.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save templates to {self.file_path}: {e}")
raise
def get_all_templates(self) -> List[CaptureTemplate]:
"""Get all templates.
Returns:
List of all templates
"""
return list(self._templates.values())
def get_template(self, template_id: str) -> CaptureTemplate:
"""Get template by ID.
Args:
template_id: Template ID
Returns:
Template instance
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Template not found: {template_id}")
return self._templates[template_id]
def create_template(
self,
name: str,
engine_type: str,
engine_config: Dict[str, any],
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> CaptureTemplate:
"""Create a new template.
self._check_name_unique(name)
Args:
name: Template name
engine_type: Engine type (mss, dxcam, wgc)
engine_config: Engine-specific configuration
description: Optional description
Returns:
Created template
Raises:
ValueError: If template with same name exists
"""
# Check for duplicate name
for template in self._templates.values():
if template.name == name:
raise ValueError(f"Template with name '{name}' already exists")
# Generate new ID
template_id = f"tpl_{uuid.uuid4().hex[:8]}"
# Create template
now = datetime.utcnow()
now = datetime.now(timezone.utc)
template = CaptureTemplate(
id=template_id,
name=name,
@@ -170,10 +81,10 @@ class TemplateStore:
created_at=now,
updated_at=now,
description=description,
tags=tags or [],
)
# Store and save
self._templates[template_id] = template
self._items[template_id] = template
self._save()
logger.info(f"Created template: {name} ({template_id})")
@@ -186,32 +97,12 @@ class TemplateStore:
engine_type: Optional[str] = None,
engine_config: Optional[Dict[str, any]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> CaptureTemplate:
"""Update an existing template.
template = self.get(template_id)
Args:
template_id: Template ID
name: New name (optional)
engine_type: New engine type (optional)
engine_config: New engine config (optional)
description: New description (optional)
Returns:
Updated template
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Template not found: {template_id}")
template = self._templates[template_id]
# Update fields
if name is not None:
for tid, t in self._templates.items():
if tid != template_id and t.name == name:
raise ValueError(f"Template with name '{name}' already exists")
self._check_name_unique(name, exclude_id=template_id)
template.name = name
if engine_type is not None:
template.engine_type = engine_type
@@ -219,29 +110,11 @@ class TemplateStore:
template.engine_config = engine_config
if description is not None:
template.description = description
if tags is not None:
template.tags = tags
template.updated_at = datetime.utcnow()
# Save
template.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated template: {template_id}")
return template
def delete_template(self, template_id: str) -> None:
"""Delete a template.
Args:
template_id: Template ID
Raises:
ValueError: If template not found
"""
if template_id not in self._templates:
raise ValueError(f"Template not found: {template_id}")
# Remove and save
del self._templates[template_id]
self._save()
logger.info(f"Deleted template: {template_id}")

View File

@@ -11,7 +11,7 @@ parameters like brightness. Five types:
"""
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional
@@ -25,6 +25,7 @@ class ValueSource:
created_at: datetime
updated_at: datetime
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert source to dictionary. Subclasses extend this."""
@@ -35,6 +36,7 @@ class ValueSource:
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
# Subclass fields default to None for forward compat
"value": None,
"waveform": None,
@@ -58,26 +60,27 @@ class ValueSource:
sid: str = data["id"]
name: str = data["name"]
description: str | None = data.get("description")
tags: list = data.get("tags", [])
raw_created = data.get("created_at")
created_at: datetime = (
datetime.fromisoformat(raw_created)
if isinstance(raw_created, str)
else raw_created if isinstance(raw_created, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
raw_updated = data.get("updated_at")
updated_at: datetime = (
datetime.fromisoformat(raw_updated)
if isinstance(raw_updated, str)
else raw_updated if isinstance(raw_updated, datetime)
else datetime.utcnow()
else datetime.now(timezone.utc)
)
if source_type == "animated":
return AnimatedValueSource(
id=sid, name=name, source_type="animated",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
waveform=data.get("waveform") or "sine",
speed=float(data.get("speed") or 10.0),
min_value=float(data.get("min_value") or 0.0),
@@ -87,7 +90,7 @@ class ValueSource:
if source_type == "audio":
return AudioValueSource(
id=sid, name=name, source_type="audio",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
audio_source_id=data.get("audio_source_id") or "",
mode=data.get("mode") or "rms",
sensitivity=float(data.get("sensitivity") or 1.0),
@@ -100,7 +103,7 @@ class ValueSource:
if source_type == "adaptive_time":
return AdaptiveValueSource(
id=sid, name=name, source_type="adaptive_time",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
schedule=data.get("schedule") or [],
min_value=float(data.get("min_value") or 0.0),
max_value=float(data["max_value"]) if data.get("max_value") is not None else 1.0,
@@ -109,7 +112,7 @@ class ValueSource:
if source_type == "adaptive_scene":
return AdaptiveValueSource(
id=sid, name=name, source_type="adaptive_scene",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
picture_source_id=data.get("picture_source_id") or "",
scene_behavior=data.get("scene_behavior") or "complement",
sensitivity=float(data.get("sensitivity") or 1.0),
@@ -121,7 +124,7 @@ class ValueSource:
# Default: "static" type
return StaticValueSource(
id=sid, name=name, source_type="static",
created_at=created_at, updated_at=updated_at, description=description,
created_at=created_at, updated_at=updated_at, description=description, tags=tags,
value=float(data["value"]) if data.get("value") is not None else 1.0,
)

View File

@@ -1,11 +1,10 @@
"""Value source storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.base_store import BaseJsonStore
from wled_controller.storage.value_source import (
AdaptiveValueSource,
AnimatedValueSource,
@@ -13,74 +12,27 @@ from wled_controller.storage.value_source import (
StaticValueSource,
ValueSource,
)
from wled_controller.utils import atomic_write_json, get_logger
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ValueSourceStore:
class ValueSourceStore(BaseJsonStore[ValueSource]):
"""Persistent storage for value sources."""
_json_key = "value_sources"
_entity_name = "Value source"
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._sources: Dict[str, ValueSource] = {}
self._load()
super().__init__(file_path, ValueSource.from_dict)
def _load(self) -> None:
if not self.file_path.exists():
logger.info("Value source store file not found — starting empty")
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
sources_data = data.get("value_sources", {})
loaded = 0
for source_id, source_dict in sources_data.items():
try:
source = ValueSource.from_dict(source_dict)
self._sources[source_id] = source
loaded += 1
except Exception as e:
logger.error(
f"Failed to load value source {source_id}: {e}",
exc_info=True,
)
if loaded > 0:
logger.info(f"Loaded {loaded} value sources from storage")
except Exception as e:
logger.error(f"Failed to load value sources from {self.file_path}: {e}")
raise
logger.info(f"Value source store initialized with {len(self._sources)} sources")
def _save(self) -> None:
try:
data = {
"version": "1.0.0",
"value_sources": {
sid: source.to_dict()
for sid, source in self._sources.items()
},
}
atomic_write_json(self.file_path, data)
except Exception as e:
logger.error(f"Failed to save value sources to {self.file_path}: {e}")
raise
# Backward-compatible aliases
get_all_sources = BaseJsonStore.get_all
get_source = BaseJsonStore.get
delete_source = BaseJsonStore.delete
# ── CRUD ─────────────────────────────────────────────────────────
def get_all_sources(self) -> List[ValueSource]:
return list(self._sources.values())
def get_source(self, source_id: str) -> ValueSource:
if source_id not in self._sources:
raise ValueError(f"Value source not found: {source_id}")
return self._sources[source_id]
def create_source(
self,
name: str,
@@ -99,30 +51,28 @@ class ValueSourceStore:
picture_source_id: Optional[str] = None,
scene_behavior: Optional[str] = None,
auto_gain: Optional[bool] = None,
tags: Optional[List[str]] = None,
) -> ValueSource:
if not name or not name.strip():
raise ValueError("Name is required")
if source_type not in ("static", "animated", "audio", "adaptive_time", "adaptive_scene"):
raise ValueError(f"Invalid source type: {source_type}")
for source in self._sources.values():
if source.name == name:
raise ValueError(f"Value source with name '{name}' already exists")
self._check_name_unique(name)
sid = f"vs_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
now = datetime.now(timezone.utc)
common_tags = tags or []
if source_type == "static":
source: ValueSource = StaticValueSource(
id=sid, name=name, source_type="static",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=common_tags,
value=value if value is not None else 1.0,
)
elif source_type == "animated":
source = AnimatedValueSource(
id=sid, name=name, source_type="animated",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=common_tags,
waveform=waveform or "sine",
speed=speed if speed is not None else 10.0,
min_value=min_value if min_value is not None else 0.0,
@@ -131,7 +81,7 @@ class ValueSourceStore:
elif source_type == "audio":
source = AudioValueSource(
id=sid, name=name, source_type="audio",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=common_tags,
audio_source_id=audio_source_id or "",
mode=mode or "rms",
sensitivity=sensitivity if sensitivity is not None else 1.0,
@@ -146,7 +96,7 @@ class ValueSourceStore:
raise ValueError("Time of day schedule requires at least 2 points")
source = AdaptiveValueSource(
id=sid, name=name, source_type="adaptive_time",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=common_tags,
schedule=schedule_data,
min_value=min_value if min_value is not None else 0.0,
max_value=max_value if max_value is not None else 1.0,
@@ -154,7 +104,7 @@ class ValueSourceStore:
elif source_type == "adaptive_scene":
source = AdaptiveValueSource(
id=sid, name=name, source_type="adaptive_scene",
created_at=now, updated_at=now, description=description,
created_at=now, updated_at=now, description=description, tags=common_tags,
picture_source_id=picture_source_id or "",
scene_behavior=scene_behavior or "complement",
sensitivity=sensitivity if sensitivity is not None else 1.0,
@@ -163,7 +113,7 @@ class ValueSourceStore:
max_value=max_value if max_value is not None else 1.0,
)
self._sources[sid] = source
self._items[sid] = source
self._save()
logger.info(f"Created value source: {name} ({sid}, type={source_type})")
@@ -187,20 +137,18 @@ class ValueSourceStore:
picture_source_id: Optional[str] = None,
scene_behavior: Optional[str] = None,
auto_gain: Optional[bool] = None,
tags: Optional[List[str]] = None,
) -> ValueSource:
if source_id not in self._sources:
raise ValueError(f"Value source not found: {source_id}")
source = self._sources[source_id]
source = self.get(source_id)
if name is not None:
for other in self._sources.values():
if other.id != source_id and other.name == name:
raise ValueError(f"Value source with name '{name}' already exists")
self._check_name_unique(name, exclude_id=source_id)
source.name = name
if description is not None:
source.description = description
if tags is not None:
source.tags = tags
if isinstance(source, StaticValueSource):
if value is not None:
@@ -247,17 +195,8 @@ class ValueSourceStore:
if max_value is not None:
source.max_value = max_value
source.updated_at = datetime.utcnow()
source.updated_at = datetime.now(timezone.utc)
self._save()
logger.info(f"Updated value source: {source_id}")
return source
def delete_source(self, source_id: str) -> None:
if source_id not in self._sources:
raise ValueError(f"Value source not found: {source_id}")
del self._sources[source_id]
self._save()
logger.info(f"Deleted value source: {source_id}")

View File

@@ -1,8 +1,8 @@
"""LED output target — sends color strip sources to an LED device."""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from datetime import datetime, timezone
from typing import List, Optional
from wled_controller.storage.output_target import OutputTarget
@@ -63,9 +63,10 @@ class WledOutputTarget(OutputTarget):
brightness_value_source_id=None,
fps=None, keepalive_interval=None, state_check_interval=None,
min_brightness_threshold=None, adaptive_fps=None, protocol=None,
description=None, **_kwargs) -> None:
description=None, tags: Optional[List[str]] = None,
**_kwargs) -> None:
"""Apply mutable field updates for WLED targets."""
super().update_fields(name=name, description=description)
super().update_fields(name=name, description=description, tags=tags)
if device_id is not None:
self.device_id = device_id
if color_strip_source_id is not None:
@@ -120,6 +121,7 @@ class WledOutputTarget(OutputTarget):
adaptive_fps=data.get("adaptive_fps", False),
protocol=data.get("protocol", "ddp"),
description=data.get("description"),
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
tags=data.get("tags", []),
created_at=datetime.fromisoformat(data.get("created_at", datetime.now(timezone.utc).isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.now(timezone.utc).isoformat())),
)