"""Gradient storage with built-in seeding. Provides CRUD for gradient entities. On startup, seeds any missing built-in gradients. Built-in gradients are read-only and cannot be deleted or modified. """ import uuid from datetime import datetime, timezone from typing import List, Optional from wled_controller.storage.base_sqlite_store import BaseSqliteStore from wled_controller.storage.database import Database from wled_controller.storage.gradient import Gradient from wled_controller.utils import get_logger logger = get_logger(__name__) # Built-in gradient definitions — matches the legacy _PALETTE_DEFS. # Format: (position, R, G, B) tuples converted to stop dicts on seed. _BUILTIN_DEFS = { "fire": [(0, 0, 0, 0), (0.33, 200, 24, 0), (0.66, 255, 160, 0), (1.0, 255, 255, 200)], "ocean": [(0, 0, 0, 32), (0.33, 0, 16, 128), (0.66, 0, 128, 255), (1.0, 128, 224, 255)], "lava": [(0, 0, 0, 0), (0.25, 128, 0, 0), (0.5, 255, 32, 0), (0.75, 255, 160, 0), (1.0, 255, 255, 128)], "forest": [(0, 0, 16, 0), (0.33, 0, 80, 0), (0.66, 32, 160, 0), (1.0, 128, 255, 64)], "rainbow": [ (0, 255, 0, 0), (0.17, 255, 255, 0), (0.33, 0, 255, 0), (0.5, 0, 255, 255), (0.67, 0, 0, 255), (0.83, 255, 0, 255), (1.0, 255, 0, 0), ], "aurora": [ (0, 0, 16, 32), (0.2, 0, 80, 64), (0.4, 0, 200, 100), (0.6, 64, 128, 255), (0.8, 128, 0, 200), (1.0, 0, 16, 32), ], "sunset": [ (0, 32, 0, 64), (0.25, 128, 0, 128), (0.5, 255, 64, 0), (0.75, 255, 192, 64), (1.0, 255, 255, 192), ], "ice": [(0, 0, 0, 64), (0.33, 0, 64, 192), (0.66, 128, 192, 255), (1.0, 240, 248, 255)], "warm": [(0, 255, 255, 80), (0.33, 255, 160, 0), (0.67, 255, 60, 0), (1.0, 160, 0, 0)], "cool": [(0, 0, 255, 200), (0.33, 0, 120, 255), (0.67, 60, 0, 255), (1.0, 120, 0, 180)], "neon": [ (0, 255, 0, 200), (0.25, 0, 255, 255), (0.5, 0, 255, 50), (0.75, 255, 255, 0), (1.0, 255, 0, 100), ], "pastel": [ (0, 255, 180, 180), (0.2, 255, 220, 160), (0.4, 255, 255, 180), (0.6, 180, 255, 200), (0.8, 180, 200, 255), (1.0, 220, 180, 255), ], } def _tuples_to_stops(tuples: list) -> list: """Convert [(pos, R, G, B), ...] to [{"position": pos, "color": [R, G, B]}, ...].""" return [{"position": t[0], "color": [t[1], t[2], t[3]]} for t in tuples] class GradientStore(BaseSqliteStore[Gradient]): _table_name = "gradients" _entity_name = "Gradient" def __init__(self, db: Database): super().__init__(db, Gradient.from_dict) self._seed_missing_builtins() def _seed_missing_builtins(self) -> None: """Seed any built-in gradients not yet in the store.""" now = datetime.now(timezone.utc) added = 0 for name, tuples in _BUILTIN_DEFS.items(): gid = f"gr_builtin_{name}" if gid in self._items: continue gradient = Gradient( id=gid, name=name.capitalize(), stops=_tuples_to_stops(tuples), is_builtin=True, created_at=now, updated_at=now, description=f"Built-in {name} gradient", ) self._items[gid] = gradient self._save_item(gid, gradient) added += 1 if added: logger.info(f"Seeded {added} new built-in gradients") # Aliases get_all_gradients = BaseSqliteStore.get_all def get_gradient(self, gradient_id: str) -> Gradient: return self.get(gradient_id) def get_by_name(self, name: str) -> Optional[Gradient]: """Look up a gradient by name (case-insensitive).""" lower = name.lower() for g in self._items.values(): if g.name.lower() == lower: return g return None def create_gradient( self, name: str, stops: list, description: Optional[str] = None, tags: Optional[List[str]] = None, ) -> Gradient: self._check_name_unique(name) gid = f"gr_{uuid.uuid4().hex[:8]}" now = datetime.now(timezone.utc) gradient = Gradient( id=gid, name=name, stops=stops, is_builtin=False, created_at=now, updated_at=now, description=description, tags=tags or [], ) self._items[gid] = gradient self._save_item(gid, gradient) logger.info(f"Created gradient: {name} ({gid})") return gradient def update_gradient( self, gradient_id: str, name: Optional[str] = None, stops: Optional[list] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, ) -> Gradient: gradient = self.get(gradient_id) if gradient.is_builtin: raise ValueError("Built-in gradients are read-only. Clone to edit.") if name is not None: self._check_name_unique(name, exclude_id=gradient_id) gradient.name = name if stops is not None: gradient.stops = stops if description is not None: gradient.description = description if tags is not None: gradient.tags = tags gradient.updated_at = datetime.now(timezone.utc) self._save_item(gradient_id, gradient) logger.info(f"Updated gradient: {gradient_id}") return gradient def delete_gradient(self, gradient_id: str) -> None: gradient = self.get(gradient_id) if gradient.is_builtin: raise ValueError("Built-in gradients cannot be deleted") self.delete(gradient_id) def get_stops_by_id(self, gradient_id: str) -> Optional[list]: """Return stops list for a gradient ID, or None if not found.""" gradient = self._items.get(gradient_id) if gradient is None: return None return gradient.stops def resolve_stops(self, gradient_id: Optional[str]) -> Optional[list]: """Resolve a gradient_id to its stops list, or None if invalid/missing.""" if not gradient_id: return None return self.get_stops_by_id(gradient_id) def migrate_palette_references(self, color_strip_store) -> int: """One-way migration: convert legacy palette names to gradient_id. For each color strip source that has a `palette` field but no `gradient_id`, sets `gradient_id` to the matching builtin gradient ID. Returns the number of sources migrated. """ migrated = 0 for source in color_strip_store.get_all_sources(): if getattr(source, "gradient_id", None): continue # already has a gradient_id palette_name = getattr(source, "palette", None) if not palette_name: continue # Map legacy palette name to builtin gradient ID builtin_id = f"gr_builtin_{palette_name}" if builtin_id in self._items: source.gradient_id = builtin_id migrated += 1 if migrated: color_strip_store._save() logger.info(f"Migrated {migrated} color strip sources from palette names to gradient IDs") return migrated