Add profile system for automatic target activation

Profiles monitor running processes and foreground windows to
automatically start/stop targets when conditions are met.
Includes profile engine, platform detector (WMI), REST API,
process browser endpoint, and calibration persistence fix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-18 15:12:34 +03:00
parent d6cf45c873
commit 29d9b95885
15 changed files with 933 additions and 10 deletions

View File

@@ -9,6 +9,7 @@ from .routes.postprocessing import router as postprocessing_router
from .routes.picture_sources import router as picture_sources_router from .routes.picture_sources import router as picture_sources_router
from .routes.pattern_templates import router as pattern_templates_router from .routes.pattern_templates import router as pattern_templates_router
from .routes.picture_targets import router as picture_targets_router from .routes.picture_targets import router as picture_targets_router
from .routes.profiles import router as profiles_router
router = APIRouter() router = APIRouter()
router.include_router(system_router) router.include_router(system_router)
@@ -18,5 +19,6 @@ router.include_router(postprocessing_router)
router.include_router(pattern_templates_router) router.include_router(pattern_templates_router)
router.include_router(picture_sources_router) router.include_router(picture_sources_router)
router.include_router(picture_targets_router) router.include_router(picture_targets_router)
router.include_router(profiles_router)
__all__ = ["router"] __all__ = ["router"]

View File

@@ -7,6 +7,8 @@ from wled_controller.storage.postprocessing_template_store import Postprocessing
from wled_controller.storage.pattern_template_store import PatternTemplateStore from wled_controller.storage.pattern_template_store import PatternTemplateStore
from wled_controller.storage.picture_source_store import PictureSourceStore from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_target_store import PictureTargetStore from wled_controller.storage.picture_target_store import PictureTargetStore
from wled_controller.storage.profile_store import ProfileStore
from wled_controller.core.profiles.profile_engine import ProfileEngine
# Global instances (initialized in main.py) # Global instances (initialized in main.py)
_device_store: DeviceStore | None = None _device_store: DeviceStore | None = None
@@ -16,6 +18,8 @@ _pattern_template_store: PatternTemplateStore | None = None
_picture_source_store: PictureSourceStore | None = None _picture_source_store: PictureSourceStore | None = None
_picture_target_store: PictureTargetStore | None = None _picture_target_store: PictureTargetStore | None = None
_processor_manager: ProcessorManager | None = None _processor_manager: ProcessorManager | None = None
_profile_store: ProfileStore | None = None
_profile_engine: ProfileEngine | None = None
def get_device_store() -> DeviceStore: def get_device_store() -> DeviceStore:
@@ -67,6 +71,20 @@ def get_processor_manager() -> ProcessorManager:
return _processor_manager return _processor_manager
def get_profile_store() -> ProfileStore:
"""Get profile store dependency."""
if _profile_store is None:
raise RuntimeError("Profile store not initialized")
return _profile_store
def get_profile_engine() -> ProfileEngine:
"""Get profile engine dependency."""
if _profile_engine is None:
raise RuntimeError("Profile engine not initialized")
return _profile_engine
def init_dependencies( def init_dependencies(
device_store: DeviceStore, device_store: DeviceStore,
template_store: TemplateStore, template_store: TemplateStore,
@@ -75,10 +93,13 @@ def init_dependencies(
pattern_template_store: PatternTemplateStore | None = None, pattern_template_store: PatternTemplateStore | None = None,
picture_source_store: PictureSourceStore | None = None, picture_source_store: PictureSourceStore | None = None,
picture_target_store: PictureTargetStore | None = None, picture_target_store: PictureTargetStore | None = None,
profile_store: ProfileStore | None = None,
profile_engine: ProfileEngine | None = None,
): ):
"""Initialize global dependencies.""" """Initialize global dependencies."""
global _device_store, _template_store, _processor_manager global _device_store, _template_store, _processor_manager
global _pp_template_store, _pattern_template_store, _picture_source_store, _picture_target_store global _pp_template_store, _pattern_template_store, _picture_source_store, _picture_target_store
global _profile_store, _profile_engine
_device_store = device_store _device_store = device_store
_template_store = template_store _template_store = template_store
_processor_manager = processor_manager _processor_manager = processor_manager
@@ -86,3 +107,5 @@ def init_dependencies(
_pattern_template_store = pattern_template_store _pattern_template_store = pattern_template_store
_picture_source_store = picture_source_store _picture_source_store = picture_source_store
_picture_target_store = picture_target_store _picture_target_store = picture_target_store
_profile_store = profile_store
_profile_engine = profile_engine

View File

@@ -0,0 +1,255 @@
"""Profile management API routes."""
from fastapi import APIRouter, Depends, HTTPException
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_picture_target_store,
get_profile_engine,
get_profile_store,
)
from wled_controller.api.schemas.profiles import (
ConditionSchema,
ProfileCreate,
ProfileListResponse,
ProfileResponse,
ProfileUpdate,
)
from wled_controller.core.profiles.profile_engine import ProfileEngine
from wled_controller.storage.picture_target_store import PictureTargetStore
from wled_controller.storage.profile import ApplicationCondition, Condition
from wled_controller.storage.profile_store import ProfileStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
# ===== Helpers =====
def _condition_from_schema(s: ConditionSchema) -> Condition:
if s.condition_type == "application":
return ApplicationCondition(
apps=s.apps or [],
match_type=s.match_type or "running",
)
raise ValueError(f"Unknown condition type: {s.condition_type}")
def _condition_to_schema(c: Condition) -> ConditionSchema:
d = c.to_dict()
return ConditionSchema(**d)
def _profile_to_response(profile, engine: ProfileEngine) -> ProfileResponse:
state = engine.get_profile_state(profile.id)
return ProfileResponse(
id=profile.id,
name=profile.name,
enabled=profile.enabled,
condition_logic=profile.condition_logic,
conditions=[_condition_to_schema(c) for c in profile.conditions],
target_ids=profile.target_ids,
is_active=state["is_active"],
active_target_ids=state["active_target_ids"],
last_activated_at=state.get("last_activated_at"),
last_deactivated_at=state.get("last_deactivated_at"),
created_at=profile.created_at,
updated_at=profile.updated_at,
)
def _validate_condition_logic(logic: str) -> None:
if logic not in ("or", "and"):
raise HTTPException(status_code=400, detail=f"Invalid condition_logic: {logic}. Must be 'or' or 'and'.")
def _validate_target_ids(target_ids: list, target_store: PictureTargetStore) -> None:
for tid in target_ids:
try:
target_store.get_target(tid)
except ValueError:
raise HTTPException(status_code=400, detail=f"Target not found: {tid}")
# ===== CRUD Endpoints =====
@router.post(
"/api/v1/profiles",
response_model=ProfileResponse,
tags=["Profiles"],
status_code=201,
)
async def create_profile(
data: ProfileCreate,
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
target_store: PictureTargetStore = Depends(get_picture_target_store),
):
"""Create a new profile."""
_validate_condition_logic(data.condition_logic)
_validate_target_ids(data.target_ids, target_store)
try:
conditions = [_condition_from_schema(c) for c in data.conditions]
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
profile = store.create_profile(
name=data.name,
enabled=data.enabled,
condition_logic=data.condition_logic,
conditions=conditions,
target_ids=data.target_ids,
)
return _profile_to_response(profile, engine)
@router.get(
"/api/v1/profiles",
response_model=ProfileListResponse,
tags=["Profiles"],
)
async def list_profiles(
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
):
"""List all profiles."""
profiles = store.get_all_profiles()
return ProfileListResponse(
profiles=[_profile_to_response(p, engine) for p in profiles],
count=len(profiles),
)
@router.get(
"/api/v1/profiles/{profile_id}",
response_model=ProfileResponse,
tags=["Profiles"],
)
async def get_profile(
profile_id: str,
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
):
"""Get a single profile."""
try:
profile = store.get_profile(profile_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
return _profile_to_response(profile, engine)
@router.put(
"/api/v1/profiles/{profile_id}",
response_model=ProfileResponse,
tags=["Profiles"],
)
async def update_profile(
profile_id: str,
data: ProfileUpdate,
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
target_store: PictureTargetStore = Depends(get_picture_target_store),
):
"""Update a profile."""
if data.condition_logic is not None:
_validate_condition_logic(data.condition_logic)
if data.target_ids is not None:
_validate_target_ids(data.target_ids, target_store)
conditions = None
if data.conditions is not None:
try:
conditions = [_condition_from_schema(c) for c in data.conditions]
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
try:
# If disabling, deactivate first
if data.enabled is False:
await engine.deactivate_if_active(profile_id)
profile = store.update_profile(
profile_id=profile_id,
name=data.name,
enabled=data.enabled,
condition_logic=data.condition_logic,
conditions=conditions,
target_ids=data.target_ids,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
return _profile_to_response(profile, engine)
@router.delete(
"/api/v1/profiles/{profile_id}",
status_code=204,
tags=["Profiles"],
)
async def delete_profile(
profile_id: str,
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
):
"""Delete a profile."""
# Deactivate first (stop owned targets)
await engine.deactivate_if_active(profile_id)
try:
store.delete_profile(profile_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# ===== Enable/Disable =====
@router.post(
"/api/v1/profiles/{profile_id}/enable",
response_model=ProfileResponse,
tags=["Profiles"],
)
async def enable_profile(
profile_id: str,
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
):
"""Enable a profile."""
try:
profile = store.update_profile(profile_id=profile_id, enabled=True)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
return _profile_to_response(profile, engine)
@router.post(
"/api/v1/profiles/{profile_id}/disable",
response_model=ProfileResponse,
tags=["Profiles"],
)
async def disable_profile(
profile_id: str,
_auth: AuthRequired,
store: ProfileStore = Depends(get_profile_store),
engine: ProfileEngine = Depends(get_profile_engine),
):
"""Disable a profile and stop any targets it owns."""
await engine.deactivate_if_active(profile_id)
try:
profile = store.update_profile(profile_id=profile_id, enabled=False)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
return _profile_to_response(profile, engine)

View File

@@ -11,6 +11,7 @@ from wled_controller.api.schemas.system import (
DisplayInfo, DisplayInfo,
DisplayListResponse, DisplayListResponse,
HealthResponse, HealthResponse,
ProcessListResponse,
VersionResponse, VersionResponse,
) )
from wled_controller.core.capture.screen_capture import get_available_displays from wled_controller.core.capture.screen_capture import get_available_displays
@@ -91,3 +92,24 @@ async def get_displays(_: AuthRequired):
status_code=500, status_code=500,
detail=f"Failed to retrieve display information: {str(e)}" detail=f"Failed to retrieve display information: {str(e)}"
) )
@router.get("/api/v1/system/processes", response_model=ProcessListResponse, tags=["Config"])
async def get_running_processes(_: AuthRequired):
"""Get list of currently running process names.
Returns a sorted list of unique process names for use in profile conditions.
"""
from wled_controller.core.profiles.platform_detector import PlatformDetector
try:
detector = PlatformDetector()
processes = await detector.get_running_processes()
sorted_procs = sorted(processes)
return ProcessListResponse(processes=sorted_procs, count=len(sorted_procs))
except Exception as e:
logger.error(f"Failed to get processes: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve process list: {str(e)}"
)

View File

@@ -0,0 +1,58 @@
"""Profile-related schemas."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
class ConditionSchema(BaseModel):
"""A single condition within a profile."""
condition_type: str = Field(description="Condition type discriminator (e.g. 'application')")
apps: Optional[List[str]] = Field(None, description="Process names (for application condition)")
match_type: Optional[str] = Field(None, description="'running' or 'topmost' (for application condition)")
class ProfileCreate(BaseModel):
"""Request to create a profile."""
name: str = Field(description="Profile name", min_length=1, max_length=100)
enabled: bool = Field(default=True, description="Whether the profile is enabled")
condition_logic: str = Field(default="or", description="How conditions combine: 'or' or 'and'")
conditions: List[ConditionSchema] = Field(default_factory=list, description="List of conditions")
target_ids: List[str] = Field(default_factory=list, description="Target IDs to activate")
class ProfileUpdate(BaseModel):
"""Request to update a profile."""
name: Optional[str] = Field(None, description="Profile name", min_length=1, max_length=100)
enabled: Optional[bool] = Field(None, description="Whether the profile is enabled")
condition_logic: Optional[str] = Field(None, description="How conditions combine: 'or' or 'and'")
conditions: Optional[List[ConditionSchema]] = Field(None, description="List of conditions")
target_ids: Optional[List[str]] = Field(None, description="Target IDs to activate")
class ProfileResponse(BaseModel):
"""Profile information response."""
id: str = Field(description="Profile ID")
name: str = Field(description="Profile name")
enabled: bool = Field(description="Whether the profile is enabled")
condition_logic: str = Field(description="Condition combination logic")
conditions: List[ConditionSchema] = Field(description="List of conditions")
target_ids: List[str] = Field(description="Target IDs to activate")
is_active: bool = Field(default=False, description="Whether the profile is currently active")
active_target_ids: List[str] = Field(default_factory=list, description="Targets currently owned by this profile")
last_activated_at: Optional[datetime] = Field(None, description="Last time this profile was activated")
last_deactivated_at: Optional[datetime] = Field(None, description="Last time this profile was deactivated")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
class ProfileListResponse(BaseModel):
"""List of profiles response."""
profiles: List[ProfileResponse] = Field(description="List of profiles")
count: int = Field(description="Number of profiles")

View File

@@ -40,3 +40,10 @@ class DisplayListResponse(BaseModel):
displays: List[DisplayInfo] = Field(description="Available displays") displays: List[DisplayInfo] = Field(description="Available displays")
count: int = Field(description="Number of displays") count: int = Field(description="Number of displays")
class ProcessListResponse(BaseModel):
"""List of running processes."""
processes: List[str] = Field(description="Sorted list of unique process names")
count: int = Field(description="Number of unique processes")

View File

@@ -33,6 +33,7 @@ class StorageConfig(BaseSettings):
picture_sources_file: str = "data/picture_sources.json" picture_sources_file: str = "data/picture_sources.json"
picture_targets_file: str = "data/picture_targets.json" picture_targets_file: str = "data/picture_targets.json"
pattern_templates_file: str = "data/pattern_templates.json" pattern_templates_file: str = "data/pattern_templates.json"
profiles_file: str = "data/profiles.json"
class LoggingConfig(BaseSettings): class LoggingConfig(BaseSettings):

View File

@@ -691,22 +691,16 @@ class ProcessorManager:
state.device_type, state.device_url, client, state.health, state.device_type, state.device_url, client, state.health,
) )
# Auto-sync LED count # Auto-sync LED count (preserve existing calibration)
reported = state.health.device_led_count reported = state.health.device_led_count
if reported and reported != state.led_count and self._device_store: if reported and reported != state.led_count and self._device_store:
old_count = state.led_count old_count = state.led_count
logger.info( logger.info(
f"Device {device_id} LED count changed: {old_count}{reported}, " f"Device {device_id} LED count changed: {old_count}{reported}"
f"updating calibration"
) )
try: try:
device = self._device_store.update_device(device_id, led_count=reported) self._device_store.update_device(device_id, led_count=reported)
state.led_count = reported state.led_count = reported
state.calibration = device.calibration
# Propagate to WLED processors using this device
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.device_id == device_id:
proc.update_calibration(device.calibration)
except Exception as e: except Exception as e:
logger.error(f"Failed to sync LED count for {device_id}: {e}") logger.error(f"Failed to sync LED count for {device_id}: {e}")

View File

@@ -0,0 +1 @@
"""Profile automation — condition evaluation and target management."""

View File

@@ -0,0 +1,91 @@
"""Platform-specific process and window detection.
Windows: uses wmi for process listing, ctypes for foreground window detection.
Non-Windows: graceful degradation (returns empty results).
"""
import asyncio
import ctypes
import ctypes.wintypes
import os
import sys
from typing import Optional, Set
from wled_controller.utils import get_logger
logger = get_logger(__name__)
_IS_WINDOWS = sys.platform == "win32"
class PlatformDetector:
"""Detect running processes and the foreground window's process."""
def _get_running_processes_sync(self) -> Set[str]:
"""Get set of lowercase process names (blocking, call via executor)."""
if not _IS_WINDOWS:
return set()
try:
import pythoncom
pythoncom.CoInitialize()
try:
import wmi
w = wmi.WMI()
return {p.Name.lower() for p in w.Win32_Process() if p.Name}
finally:
pythoncom.CoUninitialize()
except Exception as e:
logger.error(f"Failed to enumerate processes: {e}")
return set()
def _get_topmost_process_sync(self) -> Optional[str]:
"""Get lowercase process name of the foreground window (blocking, call via executor)."""
if not _IS_WINDOWS:
return None
try:
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
psapi = ctypes.windll.psapi
hwnd = user32.GetForegroundWindow()
if not hwnd:
return None
pid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
if not pid.value:
return None
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
handle = kernel32.OpenProcess(
PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid.value
)
if not handle:
return None
try:
buf = ctypes.create_unicode_buffer(512)
psapi.GetModuleFileNameExW(handle, None, buf, 512)
full_path = buf.value
if full_path:
return os.path.basename(full_path).lower()
finally:
kernel32.CloseHandle(handle)
return None
except Exception as e:
logger.error(f"Failed to get foreground process: {e}")
return None
async def get_running_processes(self) -> Set[str]:
"""Get set of lowercase process names (async-safe)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._get_running_processes_sync)
async def get_topmost_process(self) -> Optional[str]:
"""Get lowercase process name of the foreground window (async-safe)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._get_topmost_process_sync)

View File

@@ -0,0 +1,216 @@
"""Profile engine — background loop that evaluates conditions and manages targets."""
import asyncio
from datetime import datetime, timezone
from typing import Dict, Optional, Set
from wled_controller.core.profiles.platform_detector import PlatformDetector
from wled_controller.storage.profile import ApplicationCondition, Condition, Profile
from wled_controller.storage.profile_store import ProfileStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ProfileEngine:
"""Evaluates profile conditions and starts/stops targets accordingly."""
def __init__(self, profile_store: ProfileStore, processor_manager, poll_interval: float = 3.0):
self._store = profile_store
self._manager = processor_manager
self._poll_interval = poll_interval
self._detector = PlatformDetector()
self._task: Optional[asyncio.Task] = None
# Runtime state (not persisted)
# profile_id → set of target_ids that THIS profile started
self._active_profiles: Dict[str, Set[str]] = {}
# profile_id → datetime of last activation / deactivation
self._last_activated: Dict[str, datetime] = {}
self._last_deactivated: Dict[str, datetime] = {}
async def start(self) -> None:
if self._task is not None:
return
self._task = asyncio.create_task(self._poll_loop())
logger.info("Profile engine started")
async def stop(self) -> None:
if self._task is None:
return
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
# Deactivate all profiles (stop owned targets)
for profile_id in list(self._active_profiles.keys()):
await self._deactivate_profile(profile_id)
logger.info("Profile engine stopped")
async def _poll_loop(self) -> None:
try:
while True:
try:
await self._evaluate_all()
except Exception as e:
logger.error(f"Profile evaluation error: {e}", exc_info=True)
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError:
pass
async def _evaluate_all(self) -> None:
profiles = self._store.get_all_profiles()
if not profiles:
# No profiles — deactivate any stale state
for pid in list(self._active_profiles.keys()):
await self._deactivate_profile(pid)
return
# Gather platform state once per cycle
running_procs = await self._detector.get_running_processes()
topmost_proc = await self._detector.get_topmost_process()
active_profile_ids = set()
for profile in profiles:
should_be_active = (
profile.enabled
and len(profile.conditions) > 0
and self._evaluate_conditions(profile, running_procs, topmost_proc)
)
is_active = profile.id in self._active_profiles
if should_be_active and not is_active:
await self._activate_profile(profile)
active_profile_ids.add(profile.id)
elif should_be_active and is_active:
active_profile_ids.add(profile.id)
elif not should_be_active and is_active:
await self._deactivate_profile(profile.id)
# Deactivate profiles that were removed from store while active
for pid in list(self._active_profiles.keys()):
if pid not in active_profile_ids:
await self._deactivate_profile(pid)
def _evaluate_conditions(
self, profile: Profile, running_procs: Set[str], topmost_proc: Optional[str]
) -> bool:
results = [
self._evaluate_condition(c, running_procs, topmost_proc)
for c in profile.conditions
]
if profile.condition_logic == "and":
return all(results)
return any(results) # "or" is default
def _evaluate_condition(
self, condition: Condition, running_procs: Set[str], topmost_proc: Optional[str]
) -> bool:
if isinstance(condition, ApplicationCondition):
return self._evaluate_app_condition(condition, running_procs, topmost_proc)
return False
def _evaluate_app_condition(
self,
condition: ApplicationCondition,
running_procs: Set[str],
topmost_proc: Optional[str],
) -> bool:
if not condition.apps:
return False
apps_lower = [a.lower() for a in condition.apps]
if condition.match_type == "topmost":
if topmost_proc is None:
return False
return any(app == topmost_proc for app in apps_lower)
# Default: "running"
return any(app in running_procs for app in apps_lower)
async def _activate_profile(self, profile: Profile) -> None:
started: Set[str] = set()
for target_id in profile.target_ids:
try:
# Skip targets that are already running (manual or other profile)
proc = self._manager._processors.get(target_id)
if proc and proc.is_running:
continue
await self._manager.start_processing(target_id)
started.add(target_id)
logger.info(f"Profile '{profile.name}' started target {target_id}")
except Exception as e:
logger.warning(f"Profile '{profile.name}' failed to start target {target_id}: {e}")
if started:
self._active_profiles[profile.id] = started
self._last_activated[profile.id] = datetime.now(timezone.utc)
self._fire_event(profile.id, "activated", list(started))
logger.info(f"Profile '{profile.name}' activated ({len(started)} targets)")
else:
logger.debug(f"Profile '{profile.name}' matched but no targets started — will retry")
async def _deactivate_profile(self, profile_id: str) -> None:
owned = self._active_profiles.pop(profile_id, set())
stopped = []
for target_id in owned:
try:
proc = self._manager._processors.get(target_id)
if proc and proc.is_running:
await self._manager.stop_processing(target_id)
stopped.append(target_id)
logger.info(f"Profile {profile_id} stopped target {target_id}")
except Exception as e:
logger.warning(f"Profile {profile_id} failed to stop target {target_id}: {e}")
if stopped:
self._last_deactivated[profile_id] = datetime.now(timezone.utc)
self._fire_event(profile_id, "deactivated", stopped)
logger.info(f"Profile {profile_id} deactivated ({len(stopped)} targets stopped)")
def _fire_event(self, profile_id: str, action: str, target_ids: list) -> None:
try:
self._manager._fire_event({
"type": "profile_state_changed",
"profile_id": profile_id,
"action": action,
"target_ids": target_ids,
})
except Exception:
pass
# ===== Public query methods (used by API) =====
def get_profile_state(self, profile_id: str) -> dict:
"""Get runtime state of a single profile."""
is_active = profile_id in self._active_profiles
owned = list(self._active_profiles.get(profile_id, set()))
return {
"is_active": is_active,
"active_target_ids": owned,
"last_activated_at": self._last_activated.get(profile_id),
"last_deactivated_at": self._last_deactivated.get(profile_id),
}
def get_all_profile_states(self) -> Dict[str, dict]:
"""Get runtime states of all profiles."""
result = {}
for profile in self._store.get_all_profiles():
result[profile.id] = self.get_profile_state(profile.id)
return result
async def deactivate_if_active(self, profile_id: str) -> None:
"""Deactivate a profile immediately (used when disabling/deleting)."""
if profile_id in self._active_profiles:
await self._deactivate_profile(profile_id)

View File

@@ -23,6 +23,8 @@ from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_target_store import PictureTargetStore from wled_controller.storage.picture_target_store import PictureTargetStore
from wled_controller.storage.wled_picture_target import WledPictureTarget from wled_controller.storage.wled_picture_target import WledPictureTarget
from wled_controller.storage.key_colors_picture_target import KeyColorsPictureTarget from wled_controller.storage.key_colors_picture_target import KeyColorsPictureTarget
from wled_controller.storage.profile_store import ProfileStore
from wled_controller.core.profiles.profile_engine import ProfileEngine
from wled_controller.utils import setup_logging, get_logger from wled_controller.utils import setup_logging, get_logger
# Initialize logging # Initialize logging
@@ -39,6 +41,7 @@ pp_template_store = PostprocessingTemplateStore(config.storage.postprocessing_te
picture_source_store = PictureSourceStore(config.storage.picture_sources_file) picture_source_store = PictureSourceStore(config.storage.picture_sources_file)
picture_target_store = PictureTargetStore(config.storage.picture_targets_file) picture_target_store = PictureTargetStore(config.storage.picture_targets_file)
pattern_template_store = PatternTemplateStore(config.storage.pattern_templates_file) pattern_template_store = PatternTemplateStore(config.storage.pattern_templates_file)
profile_store = ProfileStore(config.storage.profiles_file)
processor_manager = ProcessorManager( processor_manager = ProcessorManager(
picture_source_store=picture_source_store, picture_source_store=picture_source_store,
@@ -138,6 +141,9 @@ async def lifespan(app: FastAPI):
# Run migrations # Run migrations
_migrate_devices_to_targets() _migrate_devices_to_targets()
# Create profile engine (needs processor_manager)
profile_engine = ProfileEngine(profile_store, processor_manager)
# Initialize API dependencies # Initialize API dependencies
init_dependencies( init_dependencies(
device_store, template_store, processor_manager, device_store, template_store, processor_manager,
@@ -145,6 +151,8 @@ async def lifespan(app: FastAPI):
pattern_template_store=pattern_template_store, pattern_template_store=pattern_template_store,
picture_source_store=picture_source_store, picture_source_store=picture_source_store,
picture_target_store=picture_target_store, picture_target_store=picture_target_store,
profile_store=profile_store,
profile_engine=profile_engine,
) )
# Register devices in processor manager for health monitoring # Register devices in processor manager for health monitoring
@@ -201,11 +209,21 @@ async def lifespan(app: FastAPI):
# Start background health monitoring for all devices # Start background health monitoring for all devices
await processor_manager.start_health_monitoring() await processor_manager.start_health_monitoring()
# Start profile engine (evaluates conditions and auto-starts/stops targets)
await profile_engine.start()
yield yield
# Shutdown # Shutdown
logger.info("Shutting down LED Grab") logger.info("Shutting down LED Grab")
# Stop profile engine first (deactivates profile-managed targets)
try:
await profile_engine.stop()
logger.info("Stopped profile engine")
except Exception as e:
logger.error(f"Error stopping profile engine: {e}")
# Stop all processing # Stop all processing
try: try:
await processor_manager.stop_all() await processor_manager.stop_all()

View File

@@ -243,7 +243,6 @@ class DeviceStore:
device.url = url device.url = url
if led_count is not None: if led_count is not None:
device.led_count = led_count device.led_count = led_count
device.calibration = create_default_calibration(led_count)
if enabled is not None: if enabled is not None:
device.enabled = enabled device.enabled = enabled
if baud_rate is not None: if baud_rate is not None:

View File

@@ -0,0 +1,91 @@
"""Profile and Condition data models."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
@dataclass
class Condition:
"""Base condition — polymorphic via condition_type discriminator."""
condition_type: str
def to_dict(self) -> dict:
return {"condition_type": self.condition_type}
@classmethod
def from_dict(cls, data: dict) -> "Condition":
"""Factory: dispatch to the correct subclass."""
ct = data.get("condition_type", "")
if ct == "application":
return ApplicationCondition.from_dict(data)
raise ValueError(f"Unknown condition type: {ct}")
@dataclass
class ApplicationCondition(Condition):
"""Activate when specified applications are running or topmost."""
condition_type: str = "application"
apps: List[str] = field(default_factory=list)
match_type: str = "running" # "running" | "topmost"
def to_dict(self) -> dict:
d = super().to_dict()
d["apps"] = list(self.apps)
d["match_type"] = self.match_type
return d
@classmethod
def from_dict(cls, data: dict) -> "ApplicationCondition":
return cls(
apps=data.get("apps", []),
match_type=data.get("match_type", "running"),
)
@dataclass
class Profile:
"""Automation profile that activates targets based on conditions."""
id: str
name: str
enabled: bool
condition_logic: str # "or" | "and"
conditions: List[Condition]
target_ids: List[str]
created_at: datetime
updated_at: datetime
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"enabled": self.enabled,
"condition_logic": self.condition_logic,
"conditions": [c.to_dict() for c in self.conditions],
"target_ids": list(self.target_ids),
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict) -> "Profile":
conditions = []
for c_data in data.get("conditions", []):
try:
conditions.append(Condition.from_dict(c_data))
except ValueError:
pass # skip unknown condition types on load
return cls(
id=data["id"],
name=data["name"],
enabled=data.get("enabled", True),
condition_logic=data.get("condition_logic", "or"),
conditions=conditions,
target_ids=data.get("target_ids", []),
created_at=datetime.fromisoformat(data.get("created_at", datetime.utcnow().isoformat())),
updated_at=datetime.fromisoformat(data.get("updated_at", datetime.utcnow().isoformat())),
)

View File

@@ -0,0 +1,145 @@
"""Profile storage using JSON files."""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from wled_controller.storage.profile import Condition, Profile
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ProfileStore:
"""Persistent storage for automation profiles."""
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self._profiles: Dict[str, Profile] = {}
self._load()
def _load(self) -> None:
if not self.file_path.exists():
return
try:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json.load(f)
profiles_data = data.get("profiles", {})
loaded = 0
for profile_id, profile_dict in profiles_data.items():
try:
profile = Profile.from_dict(profile_dict)
self._profiles[profile_id] = profile
loaded += 1
except Exception as e:
logger.error(f"Failed to load profile {profile_id}: {e}", exc_info=True)
if loaded > 0:
logger.info(f"Loaded {loaded} profiles from storage")
except Exception as e:
logger.error(f"Failed to load profiles from {self.file_path}: {e}")
raise
logger.info(f"Profile store initialized with {len(self._profiles)} profiles")
def _save(self) -> None:
try:
self.file_path.parent.mkdir(parents=True, exist_ok=True)
data = {
"version": "1.0.0",
"profiles": {
pid: p.to_dict() for pid, p in self._profiles.items()
},
}
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Failed to save profiles to {self.file_path}: {e}")
raise
def get_all_profiles(self) -> List[Profile]:
return list(self._profiles.values())
def get_profile(self, profile_id: str) -> Profile:
if profile_id not in self._profiles:
raise ValueError(f"Profile not found: {profile_id}")
return self._profiles[profile_id]
def create_profile(
self,
name: str,
enabled: bool = True,
condition_logic: str = "or",
conditions: Optional[List[Condition]] = None,
target_ids: Optional[List[str]] = None,
) -> Profile:
profile_id = f"prof_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow()
profile = Profile(
id=profile_id,
name=name,
enabled=enabled,
condition_logic=condition_logic,
conditions=conditions or [],
target_ids=target_ids or [],
created_at=now,
updated_at=now,
)
self._profiles[profile_id] = profile
self._save()
logger.info(f"Created profile: {name} ({profile_id})")
return profile
def update_profile(
self,
profile_id: str,
name: Optional[str] = None,
enabled: Optional[bool] = None,
condition_logic: Optional[str] = None,
conditions: Optional[List[Condition]] = None,
target_ids: Optional[List[str]] = None,
) -> Profile:
if profile_id not in self._profiles:
raise ValueError(f"Profile not found: {profile_id}")
profile = self._profiles[profile_id]
if name is not None:
profile.name = name
if enabled is not None:
profile.enabled = enabled
if condition_logic is not None:
profile.condition_logic = condition_logic
if conditions is not None:
profile.conditions = conditions
if target_ids is not None:
profile.target_ids = target_ids
profile.updated_at = datetime.utcnow()
self._save()
logger.info(f"Updated profile: {profile_id}")
return profile
def delete_profile(self, profile_id: str) -> None:
if profile_id not in self._profiles:
raise ValueError(f"Profile not found: {profile_id}")
del self._profiles[profile_id]
self._save()
logger.info(f"Deleted profile: {profile_id}")
def count(self) -> int:
return len(self._profiles)