"""Automation and Rule data models.""" import logging from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Dict, List, Optional, Type logger = logging.getLogger(__name__) @dataclass class Rule: """Base rule — polymorphic via rule_type discriminator.""" rule_type: str def to_dict(self) -> dict: return {"rule_type": self.rule_type} @classmethod def from_dict(cls, data: dict) -> "Rule": """Factory: dispatch to the correct subclass via registry.""" # Support legacy "condition_type" key for migration rt = data.get("rule_type") or data.get("condition_type", "") subcls = _RULE_MAP.get(rt) if subcls is None: raise ValueError(f"Unknown rule type: {rt}") return subcls.from_dict(data) @dataclass class ApplicationRule(Rule): """Activate when specified applications are running or topmost.""" rule_type: str = "application" apps: List[str] = field(default_factory=list) match_type: str = "running" # "running" | "topmost" def to_dict(self) -> dict: d = super().to_dict() d["apps"] = list(self.apps) d["match_type"] = self.match_type return d @classmethod def from_dict(cls, data: dict) -> "ApplicationRule": return cls( apps=data.get("apps", []), match_type=data.get("match_type", "running"), ) @dataclass class TimeOfDayRule(Rule): """Activate during a specific time range (server local time). Supports overnight ranges: if start_time > end_time, the range wraps around midnight (e.g. 22:00 → 06:00). """ rule_type: str = "time_of_day" start_time: str = "00:00" # HH:MM end_time: str = "23:59" # HH:MM def to_dict(self) -> dict: d = super().to_dict() d["start_time"] = self.start_time d["end_time"] = self.end_time return d @classmethod def from_dict(cls, data: dict) -> "TimeOfDayRule": return cls( start_time=data.get("start_time", "00:00"), end_time=data.get("end_time", "23:59"), ) @dataclass class SystemIdleRule(Rule): """Activate based on system idle time (keyboard/mouse inactivity).""" rule_type: str = "system_idle" idle_minutes: int = 5 when_idle: bool = True # True = active when idle; False = active when NOT idle def to_dict(self) -> dict: d = super().to_dict() d["idle_minutes"] = self.idle_minutes d["when_idle"] = self.when_idle return d @classmethod def from_dict(cls, data: dict) -> "SystemIdleRule": return cls( idle_minutes=data.get("idle_minutes", 5), when_idle=data.get("when_idle", True), ) @dataclass class DisplayStateRule(Rule): """Activate based on display/monitor power state.""" rule_type: str = "display_state" state: str = "on" # "on" | "off" def to_dict(self) -> dict: d = super().to_dict() d["state"] = self.state return d @classmethod def from_dict(cls, data: dict) -> "DisplayStateRule": return cls( state=data.get("state", "on"), ) @dataclass class MQTTRule(Rule): """Activate based on an MQTT topic value.""" rule_type: str = "mqtt" topic: str = "" payload: str = "" match_mode: str = "exact" # "exact" | "contains" | "regex" def to_dict(self) -> dict: d = super().to_dict() d["topic"] = self.topic d["payload"] = self.payload d["match_mode"] = self.match_mode return d @classmethod def from_dict(cls, data: dict) -> "MQTTRule": return cls( topic=data.get("topic", ""), payload=data.get("payload", ""), match_mode=data.get("match_mode", "exact"), ) @dataclass class WebhookRule(Rule): """Activate via an HTTP webhook call with a secret token.""" rule_type: str = "webhook" token: str = "" # auto-generated 128-bit hex secret def to_dict(self) -> dict: d = super().to_dict() d["token"] = self.token return d @classmethod def from_dict(cls, data: dict) -> "WebhookRule": return cls(token=data.get("token", "")) @dataclass class StartupRule(Rule): """Activate when the server starts — stays active while enabled.""" rule_type: str = "startup" @classmethod def from_dict(cls, data: dict) -> "StartupRule": return cls() @dataclass class HomeAssistantRule(Rule): """Activate based on a Home Assistant entity state.""" rule_type: str = "home_assistant" ha_source_id: str = "" # references HomeAssistantSource entity_id: str = "" # e.g. "binary_sensor.front_door" state: str = "" # expected state value match_mode: str = "exact" # "exact" | "contains" | "regex" def to_dict(self) -> dict: d = super().to_dict() d["ha_source_id"] = self.ha_source_id d["entity_id"] = self.entity_id d["state"] = self.state d["match_mode"] = self.match_mode return d @classmethod def from_dict(cls, data: dict) -> "HomeAssistantRule": return cls( ha_source_id=data.get("ha_source_id", ""), entity_id=data.get("entity_id", ""), state=data.get("state", ""), match_mode=data.get("match_mode", "exact"), ) _RULE_MAP: Dict[str, Type[Rule]] = { "application": ApplicationRule, "time_of_day": TimeOfDayRule, "system_idle": SystemIdleRule, "display_state": DisplayStateRule, "mqtt": MQTTRule, "webhook": WebhookRule, "startup": StartupRule, "home_assistant": HomeAssistantRule, # Legacy: "always" maps to StartupRule for migration "always": StartupRule, } # ── Backward-compatible aliases (for imports in other modules during transition) ── Condition = Rule ApplicationCondition = ApplicationRule TimeOfDayCondition = TimeOfDayRule SystemIdleCondition = SystemIdleRule DisplayStateCondition = DisplayStateRule MQTTCondition = MQTTRule WebhookCondition = WebhookRule StartupCondition = StartupRule HomeAssistantCondition = HomeAssistantRule AlwaysCondition = StartupRule # "Always" removed — maps to Startup @dataclass class Automation: """Automation that activates a scene preset based on rules.""" id: str name: str enabled: bool rule_logic: str # "or" | "and" rules: List[Rule] scene_preset_id: Optional[str] # scene to activate when rules are met deactivation_mode: str # "none" | "revert" | "fallback_scene" deactivation_scene_preset_id: Optional[str] # scene for fallback_scene mode created_at: datetime updated_at: datetime tags: List[str] = field(default_factory=list) # Backward-compatible property aliases @property def condition_logic(self) -> str: return self.rule_logic @condition_logic.setter def condition_logic(self, value: str) -> None: self.rule_logic = value @property def conditions(self) -> List[Rule]: return self.rules @conditions.setter def conditions(self, value: List[Rule]) -> None: self.rules = value def to_dict(self) -> dict: return { "id": self.id, "name": self.name, "enabled": self.enabled, "rule_logic": self.rule_logic, "rules": [r.to_dict() for r in self.rules], "scene_preset_id": self.scene_preset_id, "deactivation_mode": self.deactivation_mode, "deactivation_scene_preset_id": self.deactivation_scene_preset_id, "tags": self.tags, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat(), } @classmethod def from_dict(cls, data: dict) -> "Automation": rules = [] # Support legacy "conditions" key for migration raw_rules = data.get("rules") or data.get("conditions", []) for r_data in raw_rules: try: rule = Rule.from_dict(r_data) # Skip "always" rules during migration (they're redundant) if r_data.get("rule_type") == "always" or r_data.get("condition_type") == "always": logger.info("Migrating 'always' condition to startup rule") rule = StartupRule() rules.append(rule) except ValueError as e: logger.warning("Skipping unknown rule type on load: %s", e) return cls( id=data["id"], name=data["name"], enabled=data.get("enabled", True), rule_logic=data.get("rule_logic") or data.get("condition_logic", "or"), rules=rules, scene_preset_id=data.get("scene_preset_id"), deactivation_mode=data.get("deactivation_mode", "none"), deactivation_scene_preset_id=data.get("deactivation_scene_preset_id"), tags=data.get("tags", []), created_at=datetime.fromisoformat( data.get("created_at", datetime.now(timezone.utc).isoformat()) ), updated_at=datetime.fromisoformat( data.get("updated_at", datetime.now(timezone.utc).isoformat()) ), )