Files
ledgrab/server/src/ledgrab/api/routes/automations.py
T
alexei.dolgolyov 3fe66d80cb feat(automations): expand automation rules + UI + engine coverage
Storage model + Pydantic schema + route surface gain the new rule
shapes the engine already supports. Frontend automations editor grows
the matching inputs. New core/test_automation_engine.py pins the
dispatch table rules behind ~285 lines of unit coverage.
2026-05-23 00:48:19 +03:00

389 lines
12 KiB
Python

"""Automation management API routes."""
import secrets
from fastapi import APIRouter, Depends, HTTPException, Request
from ledgrab.api.auth import AuthRequired
from ledgrab.api.dependencies import (
fire_entity_event,
get_automation_engine,
get_automation_store,
get_scene_preset_store,
)
from ledgrab.api.schemas.automations import (
AutomationCreate,
AutomationListResponse,
AutomationResponse,
AutomationUpdate,
RuleSchema,
)
from ledgrab.core.automations.automation_engine import AutomationEngine
from ledgrab.storage.automation import (
ApplicationRule,
DisplayStateRule,
HomeAssistantRule,
HTTPPollRule,
MQTTRule,
Rule,
StartupRule,
SystemIdleRule,
TimeOfDayRule,
WebhookRule,
)
from ledgrab.storage.automation_store import AutomationStore
from ledgrab.storage.scene_preset_store import ScenePresetStore
from ledgrab.utils import get_logger
from ledgrab.storage.base_store import EntityNotFoundError
logger = get_logger(__name__)
router = APIRouter()
# ===== Helpers =====
def _rule_from_schema(s: RuleSchema) -> Rule:
    """Build the storage-layer rule that corresponds to an API rule schema.

    Optional schema fields that are unset (or otherwise falsy) are replaced
    with a per-rule default. ``idle_minutes`` and ``when_idle`` are only
    defaulted when they are ``None``, so ``0`` and ``False`` are kept.
    A webhook rule without a token receives a newly generated one.

    Args:
        s: Rule schema received from the API.

    Returns:
        The rule object selected by ``s.rule_type``.

    Raises:
        ValueError: If ``s.rule_type`` is not a known rule type.
    """
    kind = s.rule_type

    if kind == "application":
        return ApplicationRule(
            apps=s.apps or [],
            match_type=s.match_type or "running",
        )

    if kind == "time_of_day":
        return TimeOfDayRule(
            start_time=s.start_time or "00:00",
            end_time=s.end_time or "23:59",
        )

    if kind == "system_idle":
        # Explicit None checks: 0 minutes / False are legitimate values here.
        minutes = 5 if s.idle_minutes is None else s.idle_minutes
        idle_flag = True if s.when_idle is None else s.when_idle
        return SystemIdleRule(idle_minutes=minutes, when_idle=idle_flag)

    if kind == "display_state":
        return DisplayStateRule(state=s.state or "on")

    if kind == "mqtt":
        return MQTTRule(
            mqtt_source_id=s.mqtt_source_id or "",
            topic=s.topic or "",
            payload=s.payload or "",
            match_mode=s.match_mode or "exact",
        )

    if kind == "webhook":
        # Generate a token on the server when the client did not supply one.
        return WebhookRule(token=s.token or secrets.token_hex(16))

    if kind == "startup":
        return StartupRule()

    if kind == "home_assistant":
        return HomeAssistantRule(
            ha_source_id=s.ha_source_id or "",
            entity_id=s.entity_id or "",
            state=s.state or "",
            match_mode=s.match_mode or "exact",
        )

    if kind == "http_poll":
        return HTTPPollRule(
            value_source_id=s.value_source_id or "",
            operator=s.operator or "equals",
            value=s.value or "",
        )

    raise ValueError(f"Unknown rule type: {s.rule_type}")
def _rule_to_schema(r: Rule) -> RuleSchema:
    """Convert a storage rule into its API schema using the rule's ``to_dict()`` output."""
    return RuleSchema(**r.to_dict())
def _automation_to_response(
    automation, engine: AutomationEngine, request: Request | None = None
) -> AutomationResponse:
    """Build the API response model for a stored automation.

    Args:
        automation: Stored automation entity to serialize.
        engine: Automation engine, queried for the automation's runtime state.
        request: Current request; its base URL is the fallback host for the
            webhook URL when no external URL is configured.

    Returns:
        Response combining the stored fields, the engine state and, when the
        automation has a webhook rule with a token, that rule's webhook URL.
    """
    state = engine.get_automation_state(automation.id)

    # Only the first webhook rule that carries a token is exposed as webhook_url.
    webhook_rule = next(
        (r for r in automation.rules if isinstance(r, WebhookRule) and r.token),
        None,
    )

    webhook_url = None
    if webhook_rule is not None:
        # Kept as a function-level import, as in the original code
        # (presumably to avoid a circular import between route modules).
        from ledgrab.api.routes.system import load_external_url

        path = f"/api/v1/webhooks/{webhook_rule.token}"
        ext = load_external_url()
        if ext:
            # Prefer the configured external URL. Strip a trailing slash so a
            # value like "https://host/" does not produce "https://host//api/...".
            base = ext.rstrip("/")
        elif request is not None:
            base = str(request.base_url).rstrip("/")
        else:
            # No host information available: fall back to a relative URL.
            base = ""
        webhook_url = base + path

    return AutomationResponse(
        id=automation.id,
        name=automation.name,
        enabled=automation.enabled,
        rule_logic=automation.rule_logic,
        rules=[_rule_to_schema(r) for r in automation.rules],
        scene_preset_id=automation.scene_preset_id,
        deactivation_mode=automation.deactivation_mode,
        deactivation_scene_preset_id=automation.deactivation_scene_preset_id,
        webhook_url=webhook_url,
        is_active=state["is_active"],
        last_activated_at=state.get("last_activated_at"),
        last_deactivated_at=state.get("last_deactivated_at"),
        tags=automation.tags,
        # Entities may lack the icon attributes; expose them as empty strings.
        icon=getattr(automation, "icon", "") or "",
        icon_color=getattr(automation, "icon_color", "") or "",
        created_at=automation.created_at,
        updated_at=automation.updated_at,
    )
def _validate_rule_logic(logic: str) -> None:
    """Accept only ``"or"`` or ``"and"`` as a rule_logic value.

    Raises:
        HTTPException: With status 400 when ``logic`` is any other value.
    """
    if logic in ("or", "and"):
        return
    raise HTTPException(
        status_code=400,
        detail=f"Invalid rule_logic: {logic}. Must be 'or' or 'and'.",
    )
def _validate_scene_refs(
    scene_preset_id: str | None,
    deactivation_scene_preset_id: str | None,
    scene_store: ScenePresetStore,
) -> None:
    """Check that every supplied scene preset ID can be fetched from the store.

    IDs that are ``None`` are skipped.

    Raises:
        HTTPException: With status 400 when ``scene_store.get_preset`` raises
            ``ValueError`` for one of the IDs.
    """
    references = {
        "scene_preset_id": scene_preset_id,
        "deactivation_scene_preset_id": deactivation_scene_preset_id,
    }
    for field_name, preset_id in references.items():
        if preset_id is None:
            continue
        try:
            scene_store.get_preset(preset_id)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Scene preset not found: {preset_id} ({field_name})",
            )
# ===== CRUD Endpoints =====
@router.post(
    "/api/v1/automations",
    response_model=AutomationResponse,
    tags=["Automations"],
    status_code=201,
)
async def create_automation(
    request: Request,
    data: AutomationCreate,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
    scene_store: ScenePresetStore = Depends(get_scene_preset_store),
):
    """Create a new automation."""
    # All validation runs before anything is written to the store.
    _validate_rule_logic(data.rule_logic)
    _validate_scene_refs(data.scene_preset_id, data.deactivation_scene_preset_id, scene_store)

    try:
        parsed_rules = [_rule_from_schema(schema) for schema in data.rules]
    except EntityNotFoundError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except ValueError as invalid:
        raise HTTPException(status_code=400, detail=str(invalid))

    new_fields = {
        "name": data.name,
        "enabled": data.enabled,
        "rule_logic": data.rule_logic,
        "rules": parsed_rules,
        "scene_preset_id": data.scene_preset_id,
        "deactivation_mode": data.deactivation_mode,
        "deactivation_scene_preset_id": data.deactivation_scene_preset_id,
        "tags": data.tags,
        "icon": data.icon,
        "icon_color": data.icon_color,
    }
    created = store.create_automation(**new_fields)

    # An enabled automation is evaluated right away.
    if created.enabled:
        await engine.trigger_evaluate()

    fire_entity_event("automation", "created", created.id)
    return _automation_to_response(created, engine, request)
@router.get(
    "/api/v1/automations",
    response_model=AutomationListResponse,
    tags=["Automations"],
)
async def list_automations(
    request: Request,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
):
    """List all automations."""
    stored = store.get_all_automations()
    # Each entry is serialized together with its current engine state.
    items = [_automation_to_response(entry, engine, request) for entry in stored]
    return AutomationListResponse(automations=items, count=len(stored))
@router.get(
    "/api/v1/automations/{automation_id}",
    response_model=AutomationResponse,
    tags=["Automations"],
)
async def get_automation(
    request: Request,
    automation_id: str,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
):
    """Get a single automation."""
    try:
        found = store.get_automation(automation_id)
    except ValueError as lookup_error:
        # A ValueError from the store lookup is reported to the client as 404.
        raise HTTPException(status_code=404, detail=str(lookup_error))
    else:
        return _automation_to_response(found, engine, request)
@router.put(
    "/api/v1/automations/{automation_id}",
    response_model=AutomationResponse,
    tags=["Automations"],
)
async def update_automation(
    request: Request,
    automation_id: str,
    data: AutomationUpdate,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
    scene_store: ScenePresetStore = Depends(get_scene_preset_store),
):
    """Update an automation."""
    # rule_logic is optional on update; validate it only when supplied.
    if data.rule_logic is not None:
        _validate_rule_logic(data.rule_logic)

    # Scene references left as None are skipped by the validator.
    _validate_scene_refs(data.scene_preset_id, data.deactivation_scene_preset_id, scene_store)

    parsed_rules = None
    if data.rules is not None:
        try:
            parsed_rules = [_rule_from_schema(schema) for schema in data.rules]
        except EntityNotFoundError as not_found:
            raise HTTPException(status_code=404, detail=str(not_found))
        except ValueError as invalid:
            raise HTTPException(status_code=400, detail=str(invalid))

    changes = {
        "automation_id": automation_id,
        "name": data.name,
        "enabled": data.enabled,
        "rule_logic": data.rule_logic,
        "rules": parsed_rules,
        "deactivation_mode": data.deactivation_mode,
        "tags": data.tags,
        "icon": data.icon,
        "icon_color": data.icon_color,
    }
    # The scene preset IDs are forwarded only when the client supplied them;
    # otherwise the store's own defaults for those arguments apply
    # (the original note here mentions a sentinel for these optional fields).
    for field_name in ("scene_preset_id", "deactivation_scene_preset_id"):
        supplied = getattr(data, field_name)
        if supplied is not None:
            changes[field_name] = supplied

    try:
        # When the update disables the automation, deactivate it first.
        if data.enabled is False:
            await engine.deactivate_if_active(automation_id)
        updated = store.update_automation(**changes)
    except ValueError as lookup_error:
        # A ValueError from either step above is reported as 404.
        raise HTTPException(status_code=404, detail=str(lookup_error))

    # Re-evaluate immediately if the automation is enabled (rules/scene may have changed).
    if updated.enabled:
        await engine.trigger_evaluate()

    fire_entity_event("automation", "updated", automation_id)
    return _automation_to_response(updated, engine, request)
@router.delete(
    "/api/v1/automations/{automation_id}",
    status_code=204,
    tags=["Automations"],
)
async def delete_automation(
    automation_id: str,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
):
    """Delete an automation."""
    # The engine is asked to deactivate the automation before it is removed.
    await engine.deactivate_if_active(automation_id)

    try:
        store.delete_automation(automation_id)
    except ValueError as lookup_error:
        raise HTTPException(status_code=404, detail=str(lookup_error))
    else:
        # The event is fired only after the store deletion succeeded.
        fire_entity_event("automation", "deleted", automation_id)
# ===== Enable/Disable =====
@router.post(
    "/api/v1/automations/{automation_id}/enable",
    response_model=AutomationResponse,
    tags=["Automations"],
)
async def enable_automation(
    request: Request,
    automation_id: str,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
):
    """Enable an automation."""
    try:
        automation = store.update_automation(automation_id=automation_id, enabled=True)
    except ValueError as e:
        # A ValueError from the store update is reported to the client as 404.
        raise HTTPException(status_code=404, detail=str(e)) from e

    # Evaluate immediately so the scene activates without waiting for the next poll cycle.
    await engine.trigger_evaluate()

    # Enabling modifies the stored entity, so fire the same "updated" event
    # that the PUT route fires when it changes the `enabled` flag.
    fire_entity_event("automation", "updated", automation_id)
    return _automation_to_response(automation, engine, request)
@router.post(
    "/api/v1/automations/{automation_id}/disable",
    response_model=AutomationResponse,
    tags=["Automations"],
)
async def disable_automation(
    request: Request,
    automation_id: str,
    _auth: AuthRequired,
    store: AutomationStore = Depends(get_automation_store),
    engine: AutomationEngine = Depends(get_automation_engine),
):
    """Disable an automation and deactivate it."""
    # Deactivate first, then persist the disabled flag.
    await engine.deactivate_if_active(automation_id)

    try:
        automation = store.update_automation(automation_id=automation_id, enabled=False)
    except ValueError as e:
        # A ValueError from the store update is reported to the client as 404.
        raise HTTPException(status_code=404, detail=str(e)) from e

    # Disabling modifies the stored entity, so fire the same "updated" event
    # that the PUT route fires when it changes the `enabled` flag.
    fire_entity_event("automation", "updated", automation_id)
    return _automation_to_response(automation, engine, request)