"""Action runner — orchestrates loading, executing, and logging actions.""" from __future__ import annotations import logging from datetime import datetime, timezone from typing import Any from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.providers.action_executor import ActionResult from ..database.engine import get_engine from ..database.models import ( Action, ActionExecution, ActionRule, EventLog, ServiceProvider, ) _LOGGER = logging.getLogger(__name__) async def run_action( action_id: int, *, trigger: str = "scheduled" ) -> ActionResult: """Load an action from DB, execute it, and save the execution log.""" engine = get_engine() # ------------------------------------------------------------------ # 1. Load all DB data eagerly (before aiohttp context) # ------------------------------------------------------------------ async with AsyncSession(engine) as session: action = await session.get(Action, action_id) if not action: return ActionResult(success=False, error="Action not found") if not action.enabled and trigger == "scheduled": return ActionResult(success=False, error="Action is disabled") provider = await session.get(ServiceProvider, action.provider_id) if not provider: return ActionResult(success=False, error="Provider not found") result = await session.exec( select(ActionRule) .where(ActionRule.action_id == action_id) .where(ActionRule.enabled == True) # noqa: E712 .order_by(ActionRule.order) ) rules = result.all() if not rules: return ActionResult(success=True, rules_processed=0) # Snapshot data provider_type = provider.type provider_config = dict(provider.config) provider_name = provider.name action_type = action.action_type action_config = dict(action.config) if action.config else {} rule_configs = [ {**dict(r.rule_config), "name": r.name} for r in rules ] # Create execution record execution = ActionExecution( action_id=action_id, trigger=trigger, status="running", ) session.add(execution) await session.commit() await session.refresh(execution) execution_id = execution.id # ------------------------------------------------------------------ # 2. Execute via provider-specific executor # ------------------------------------------------------------------ is_dry_run = trigger == "dry_run" action_result: ActionResult try: action_result = await _execute_with_provider( provider_type=provider_type, provider_config=provider_config, provider_name=provider_name, action_type=action_type, action_config=action_config, rule_configs=rule_configs, dry_run=is_dry_run, ) except Exception as err: _LOGGER.error("Action %d execution error: %s", action_id, err) action_result = ActionResult(success=False, error=str(err)) # ------------------------------------------------------------------ # 3. Save execution results # ------------------------------------------------------------------ async with AsyncSession(engine) as session: execution = await session.get(ActionExecution, execution_id) if execution: execution.finished_at = datetime.now(timezone.utc) if action_result.error is not None and action_result.rules_succeeded == 0: execution.status = "failed" elif action_result.rules_failed > 0: execution.status = "partial" else: execution.status = "success" execution.rules_processed = action_result.rules_processed execution.rules_succeeded = action_result.rules_succeeded execution.rules_failed = action_result.rules_failed execution.total_items_affected = action_result.total_items_affected execution.summary = action_result.to_dict() execution.error = action_result.error or "" session.add(execution) # Update action last_run metadata + emit a dashboard EventLog row # (skip both for dry runs — dashboards should not count previews). if not is_dry_run: action = await session.get(Action, action_id) if action: action.last_run_at = datetime.now(timezone.utc) action.last_run_status = execution.status if execution else "" session.add(action) provider = await session.get(ServiceProvider, action.provider_id) status_str = execution.status if execution else "success" event_type = f"action_{status_str}" # action_success|partial|failed session.add(EventLog( user_id=action.user_id, tracker_id=None, tracker_name="", action_id=action.id, action_name=action.name, provider_id=provider.id if provider else None, provider_name=(provider.name if provider else "") or "", event_type=event_type, collection_id=str(action.id), # ``collection_name`` is what the dashboard row shows as the # event subject; use the action name so the row is readable # without a separate action_name renderer. collection_name=action.name, assets_count=action_result.total_items_affected, details={ "action_type": action.action_type, "trigger": trigger, "rules_processed": action_result.rules_processed, "rules_succeeded": action_result.rules_succeeded, "rules_failed": action_result.rules_failed, "error": action_result.error or "", "execution_id": execution_id, }, )) await session.commit() _LOGGER.info( "Action %d (%s) completed: %d/%d rules succeeded, %d items affected", action_id, trigger, action_result.rules_succeeded, action_result.rules_processed, action_result.total_items_affected, ) return action_result async def dry_run_action(action_id: int) -> ActionResult: """Execute a dry-run of an action (no mutations).""" return await run_action(action_id, trigger="dry_run") async def _execute_with_provider( *, provider_type: str, provider_config: dict[str, Any], provider_name: str, action_type: str, action_config: dict[str, Any], rule_configs: list[dict[str, Any]], dry_run: bool, ) -> ActionResult: """Instantiate the appropriate executor and run.""" if provider_type == "immich": from notify_bridge_core.providers.immich.action_executor import ( ImmichActionExecutor, ) from notify_bridge_core.providers.immich.client import ImmichClient from .http_session import get_http_session http_session = await get_http_session() client = ImmichClient( http_session, provider_config.get("url", ""), provider_config.get("api_key", ""), ) external_domain = provider_config.get("external_domain") if external_domain: client.external_domain = external_domain # Verify connectivity if not await client.ping(): return ActionResult( success=False, error=f"Cannot connect to Immich server ({provider_name})", ) executor = ImmichActionExecutor(client) if dry_run: return await executor.dry_run(action_type, rule_configs, action_config) return await executor.execute(action_type, rule_configs, action_config) return ActionResult( success=False, error=f"No action executor for provider type: {provider_type}", )