diff --git a/frontend/src/lib/components/IconPicker.svelte b/frontend/src/lib/components/IconPicker.svelte index 1ae2961..d2c2ee3 100644 --- a/frontend/src/lib/components/IconPicker.svelte +++ b/frontend/src/lib/components/IconPicker.svelte @@ -1,5 +1,5 @@ diff --git a/frontend/src/lib/components/MultiEntitySelect.svelte b/frontend/src/lib/components/MultiEntitySelect.svelte new file mode 100644 index 0000000..c666637 --- /dev/null +++ b/frontend/src/lib/components/MultiEntitySelect.svelte @@ -0,0 +1,352 @@ + + + +
+ {#if selectedItems.length > 0} +
+ {#each selectedItems as item} + + {#if item.icon}{/if} + {item.label} + + + {/each} +
+ {/if} + +
+ + +{#if open} + + +
+
+ + + {(values || []).length}/{items.length} + ESC +
+ +
+ {#if filtered.length === 0} +
No matches
+ {:else} + {#each filtered as item, i} + {@const checked = (values || []).includes(item.value)} + + {/each} + {/if} +
+
+{/if} + + diff --git a/frontend/src/lib/i18n/en.json b/frontend/src/lib/i18n/en.json index 79680b8..1ab2f12 100644 --- a/frontend/src/lib/i18n/en.json +++ b/frontend/src/lib/i18n/en.json @@ -33,7 +33,9 @@ "targetDiscord": "Discord", "targetSlack": "Slack", "targetNtfy": "ntfy", - "targetMatrix": "Matrix" + "targetMatrix": "Matrix", + "automation": "Automation", + "actions": "Actions" }, "auth": { "signIn": "Sign in", @@ -138,6 +140,7 @@ "server": "Provider", "selectServer": "Select provider...", "albums": "Albums", + "selectAlbums": "Select albums...", "eventTypes": "Event Types", "notificationTargets": "Notification Targets", "scanInterval": "Scan Interval (seconds)", @@ -828,5 +831,59 @@ "navigate": "navigate", "open": "open", "close": "close" + }, + "actions": { + "title": "Actions", + "description": "Scheduled mutations on external services", + "addAction": "Add Action", + "noActions": "No actions configured yet.", + "provider": "Provider", + "selectProvider": "Select provider...", + "actionType": "Action Type", + "name": "Name", + "schedule": "Schedule", + "interval": "Interval", + "seconds": "seconds", + "cronHint": "Standard cron expression (e.g. 0 3 * * * for daily at 3 AM)", + "enabled": "Enabled", + "rules": "rules", + "addRule": "Add Rule", + "ruleName": "Rule Name", + "ruleNamePlaceholder": "e.g. Alice → Family Album", + "unnamedRule": "Unnamed rule", + "noRules": "No rules yet. Add a rule to define what this action does.", + "on": "ON", + "off": "OFF", + "criteria": "Criteria", + "persons": "Persons", + "addPerson": "Add person...", + "searchQuery": "Smart Search Query", + "searchQueryPlaceholder": "e.g. sunset, beach, birthday...", + "assetType": "Asset type", + "dateFrom": "From date", + "dateTo": "To date", + "favoritesOnly": "Favorites only", + "targetAlbum": "Target Album", + "selectAlbum": "Album", + "selectAlbumPlaceholder": "— Select album —", + "albumId": "Album ID", + "createAlbumIfMissing": "Create album if it doesn't exist", + "newAlbumName": "New album name", + "execute": "Execute", + "dryRun": "Dry Run", + "history": "History", + "affected": "affected", + "executeResult": "Action executed: {affected} items affected", + "dryRunResult": "Dry run: {affected} items would be affected", + "saved": "Action saved", + "deleted": "Action deleted", + "ruleSaved": "Rule saved", + "ruleDeleted": "Rule deleted", + "confirmDelete": "Are you sure you want to delete this action? All rules and execution history will be lost.", + "loadError": "Failed to load actions", + "noExecutions": "No executions yet.", + "triggerManual": "manual", + "triggerDryRun": "dry-run", + "triggerScheduled": "scheduled" } } \ No newline at end of file diff --git a/frontend/src/lib/i18n/ru.json b/frontend/src/lib/i18n/ru.json index 1370524..8370dea 100644 --- a/frontend/src/lib/i18n/ru.json +++ b/frontend/src/lib/i18n/ru.json @@ -33,7 +33,9 @@ "targetDiscord": "Discord", "targetSlack": "Slack", "targetNtfy": "ntfy", - "targetMatrix": "Matrix" + "targetMatrix": "Matrix", + "automation": "Автоматизация", + "actions": "Действия" }, "auth": { "signIn": "Войти", @@ -138,6 +140,7 @@ "server": "Провайдер", "selectServer": "Выберите провайдер...", "albums": "Альбомы", + "selectAlbums": "Выберите альбомы...", "eventTypes": "Типы событий", "notificationTargets": "Получатели уведомлений", "scanInterval": "Интервал проверки (секунды)", @@ -828,5 +831,59 @@ "navigate": "навигация", "open": "открыть", "close": "закрыть" + }, + "actions": { + "title": "Действия", + "description": "Запланированные операции над внешними сервисами", + "addAction": "Добавить действие", + "noActions": "Действия ещё не настроены.", + "provider": "Провайдер", + "selectProvider": "Выберите провайдер...", + "actionType": "Тип действия", + "name": "Название", + "schedule": "Расписание", + "interval": "Интервал", + "seconds": "секунд", + "cronHint": "Стандартное cron-выражение (напр. 0 3 * * * — ежедневно в 3:00)", + "enabled": "Включено", + "rules": "правил", + "addRule": "Добавить правило", + "ruleName": "Название правила", + "ruleNamePlaceholder": "напр. Алиса → Семейный альбом", + "unnamedRule": "Без названия", + "noRules": "Правил пока нет. Добавьте правило, чтобы определить, что делает это действие.", + "on": "ВКЛ", + "off": "ВЫКЛ", + "criteria": "Критерии", + "persons": "Люди", + "addPerson": "Добавить человека...", + "searchQuery": "Умный поиск", + "searchQueryPlaceholder": "напр. закат, пляж, день рождения...", + "assetType": "Тип файла", + "dateFrom": "С даты", + "dateTo": "По дату", + "favoritesOnly": "Только избранное", + "targetAlbum": "Целевой альбом", + "selectAlbum": "Альбом", + "selectAlbumPlaceholder": "— Выберите альбом —", + "albumId": "ID альбома", + "createAlbumIfMissing": "Создать альбом, если не существует", + "newAlbumName": "Название нового альбома", + "execute": "Выполнить", + "dryRun": "Пробный запуск", + "history": "История", + "affected": "затронуто", + "executeResult": "Действие выполнено: затронуто {affected} объектов", + "dryRunResult": "Пробный запуск: было бы затронуто {affected} объектов", + "saved": "Действие сохранено", + "deleted": "Действие удалено", + "ruleSaved": "Правило сохранено", + "ruleDeleted": "Правило удалено", + "confirmDelete": "Вы уверены, что хотите удалить это действие? Все правила и история выполнений будут потеряны.", + "loadError": "Не удалось загрузить действия", + "noExecutions": "Выполнений пока нет.", + "triggerManual": "вручную", + "triggerDryRun": "пробный", + "triggerScheduled": "по расписанию" } } \ No newline at end of file diff --git a/frontend/src/lib/mdi-lookup.ts b/frontend/src/lib/mdi-lookup.svelte.ts similarity index 82% rename from frontend/src/lib/mdi-lookup.ts rename to frontend/src/lib/mdi-lookup.svelte.ts index 277a795..d2f8a36 100644 --- a/frontend/src/lib/mdi-lookup.ts +++ b/frontend/src/lib/mdi-lookup.svelte.ts @@ -4,9 +4,13 @@ * Instead of `import * as mdi from '@mdi/js'` (which loads ~5MB of SVG paths * into memory), this module loads the full set once on first use and caches it. * Vite only processes the import once, reducing HMR memory pressure. + * + * Uses $state so that components re-render once the icon module finishes loading + * (fixes icons not appearing until page reload after cache-clearing navigations). */ let _cache: Record | null = null; +let _ready = $state | null>(null); async function _load(): Promise> { if (_cache) return _cache; @@ -16,7 +20,6 @@ async function _load(): Promise> { } // Eagerly load on module init (runs once) -let _ready: Record | null = null; _load().then(m => { _ready = m; }); /** diff --git a/frontend/src/lib/stores/caches.svelte.ts b/frontend/src/lib/stores/caches.svelte.ts index 8843905..16270cf 100644 --- a/frontend/src/lib/stores/caches.svelte.ts +++ b/frontend/src/lib/stores/caches.svelte.ts @@ -18,6 +18,7 @@ import type { CommandConfig, CommandTemplateConfig, CommandTracker, + Action, } from '$lib/types'; /** Service providers — used by Dashboard, Trackers, Command Trackers, Providers page. */ @@ -53,6 +54,9 @@ export const commandTemplateConfigsCache = createEntityCache('/command-trackers'); +/** Actions — used by Actions page. */ +export const actionsCache = createEntityCache('/actions'); + /** Provider capabilities — used by Template Configs, Command Configs. */ export const capabilitiesCache = (() => { let data = $state>({}); @@ -85,6 +89,7 @@ export const allCaches = { command_configs: commandConfigsCache, command_template_configs: commandTemplateConfigsCache, command_trackers: commandTrackersCache, + actions: actionsCache, } as const; /** diff --git a/frontend/src/lib/types.ts b/frontend/src/lib/types.ts index 07b60c1..0349a43 100644 --- a/frontend/src/lib/types.ts +++ b/frontend/src/lib/types.ts @@ -219,6 +219,55 @@ export interface CommandTracker { created_at: string; } +export interface ActionRule { + id: number; + action_id: number; + name: string; + rule_config: Record; + enabled: boolean; + order: number; + created_at: string; +} + +export interface Action { + id: number; + name: string; + icon: string; + provider_id: number; + action_type: string; + config: Record; + schedule_type: string; + schedule_interval: number; + schedule_cron: string; + enabled: boolean; + last_run_at: string | null; + last_run_status: string; + rules: ActionRule[]; + created_at: string; +} + +export interface ActionExecution { + id: number; + action_id: number; + started_at: string; + finished_at: string | null; + status: string; + rules_processed: number; + rules_succeeded: number; + rules_failed: number; + total_items_affected: number; + summary: Record; + error: string; + trigger: string; +} + +export interface ActionTypeInfo { + key: string; + name: string; + description: string; + provider_type: string; +} + export interface DashboardStatus { providers: number; trackers: { total: number; active: number }; diff --git a/frontend/src/routes/+layout.svelte b/frontend/src/routes/+layout.svelte index 489faac..3d76219 100644 --- a/frontend/src/routes/+layout.svelte +++ b/frontend/src/routes/+layout.svelte @@ -84,6 +84,12 @@ { href: '/command-template-configs', key: 'nav.templates', icon: 'mdiCodeBracesBox', countKey: 'command_template_configs' }, ], }, + { + key: 'nav.automation', icon: 'mdiRobotOutline', + children: [ + { href: '/actions', key: 'nav.actions', icon: 'mdiPlayCircleOutline', countKey: 'actions' }, + ], + }, { key: 'nav.bots', icon: 'mdiRobot', children: [ diff --git a/frontend/src/routes/actions/+page.svelte b/frontend/src/routes/actions/+page.svelte new file mode 100644 index 0000000..d401f38 --- /dev/null +++ b/frontend/src/routes/actions/+page.svelte @@ -0,0 +1,337 @@ + + + + + + +{#if !loaded} + +{:else} + +{#if loadError} + +
+ + {loadError} +
+
+{/if} + +{#if showForm} +
+ + {#if error} +
{error}
+ {/if} +
+
+ + +
+ + {#if actionTypes.length > 0} +
+ + {#if !editing} +
+ {#each actionTypes as at} + + {/each} +
+ {:else} +

{form.action_type}

+ {/if} +
+ {/if} + +
+ +
+ form.icon = v} /> + +
+
+ +
+ +
+ + +
+ {#if form.schedule_type === 'interval'} +
+ + {t('actions.seconds')} +
+ {:else} + +

{t('actions.cronHint')}

+ {/if} +
+ +
+ + +
+ + +
+ + {#if editing} +
+ + {/if} +
+
+{/if} + +{#if !showForm && allActions.length > 0} +
+ +
+{/if} + +{#if allActions.length === 0 && !showForm} + + + +{:else if actions.length === 0 && !showForm} + + + +{:else if !showForm} +
+ {#each actions as action} + +
+
+
+ +
+
+

{action.name}

+ {action.action_type} +
+
+ {getProviderName(action.provider_id)} + {formatSchedule(action)} + {action.rules?.length || 0} {t('actions.rules')} + {#if action.last_run_status} + + {action.last_run_status} + + {/if} +
+
+
+
+ executeAction(action.id)} + disabled={executing[action.id]} /> + executeAction(action.id, true)} + disabled={executing[action.id]} /> + historyActionId = historyActionId === action.id ? null : action.id} /> + edit(action)} /> + startDelete(action)} variant="danger" /> +
+
+ {#if historyActionId === action.id} +
+ +
+ {/if} +
+ {/each} +
+{/if} + +{/if} + + confirmDelete = null} /> diff --git a/frontend/src/routes/actions/ExecutionHistory.svelte b/frontend/src/routes/actions/ExecutionHistory.svelte new file mode 100644 index 0000000..06e3e91 --- /dev/null +++ b/frontend/src/routes/actions/ExecutionHistory.svelte @@ -0,0 +1,114 @@ + + +
+

+ {t('actions.history')} +

+ + {#if loading} +

{t('common.loading')}...

+ {:else if executions.length === 0} +

{t('actions.noExecutions')}

+ {:else} +
+ {#each executions as exec} + + + {#if expandedId === exec.id} +
+ {#if exec.error} +

{exec.error}

+ {/if} + {#if exec.summary?.rule_results} + {#each exec.summary.rule_results as rr} +
+ + + + {rr.rule_name} + + {rr.items_matched} matched, {rr.items_affected} affected, {rr.items_skipped} skipped + + {#if rr.error} + {rr.error} + {/if} +
+ {/each} + {/if} +
+ {/if} + {/each} +
+ {/if} +
diff --git a/frontend/src/routes/actions/RuleEditor.svelte b/frontend/src/routes/actions/RuleEditor.svelte new file mode 100644 index 0000000..4db3e8a --- /dev/null +++ b/frontend/src/routes/actions/RuleEditor.svelte @@ -0,0 +1,310 @@ + + +
+
+

{t('actions.rules')} ({rules.length})

+ +
+ + {#if loading} +

{t('common.loading')}...

+ {/if} + + {#if showAddForm} +
+
+ + +
+ + {#if actionType === 'auto_organize'} + {@render criteriaFields(newRule.rule_config)} + {/if} + + +
+ {/if} + + {#each rules as rule} +
+
+ +
+ + deleteRule(rule.id)} variant="danger" size={16} /> +
+
+ + {#if expandedRule === rule.id} +
+
+ + +
+ + {#if actionType === 'auto_organize'} + {@render criteriaFields(rule.rule_config)} + {/if} + + +
+ {/if} +
+ {/each} + + {#if rules.length === 0 && !loading && !showAddForm} +

{t('actions.noRules')}

+ {/if} +
+ +{#snippet criteriaFields(ruleConfig: any)} +
+ {t('actions.criteria')} + + + {#if personItems.length > 0} +
+ + { + ruleConfig.criteria.person_names = ids.map(id => people.find(p => p.id === id)?.name || id); + }} /> +
+ {/if} + + +
+ + +
+ + +
+ + {#each ['all', 'image', 'video'] as at} + + {/each} +
+ + +
+
+ + +
+
+ + +
+
+ + + +
+ + +
+ {t('actions.targetAlbum')} + + {#if albumItems.length > 0} +
+ + { + ruleConfig.target_album_names = ids.map(id => albums.find(a => a.id === id)?.name || id); + }} /> +
+ {:else} +
+ + +
+ {/if} + + + + {#if ruleConfig.create_album_if_missing} +
+ + +
+ {/if} +
+{/snippet} diff --git a/frontend/src/routes/notification-trackers/TrackerForm.svelte b/frontend/src/routes/notification-trackers/TrackerForm.svelte index 85d2328..0796bca 100644 --- a/frontend/src/routes/notification-trackers/TrackerForm.svelte +++ b/frontend/src/routes/notification-trackers/TrackerForm.svelte @@ -5,6 +5,7 @@ import IconPicker from '$lib/components/IconPicker.svelte'; import Hint from '$lib/components/Hint.svelte'; import EntitySelect from '$lib/components/EntitySelect.svelte'; + import MultiEntitySelect from '$lib/components/MultiEntitySelect.svelte'; interface Props { form: { @@ -18,15 +19,15 @@ }; providerItems: { value: number; label: string; icon: string; desc: string }[]; collections: any[]; - collectionFilter: string; + collectionFilter?: string; editing: number | null; submitting: boolean; linkCheckLoading: boolean; error: string; providerType: string; onsave: (e: SubmitEvent) => void; - ontoggleCollection: (collectionId: string) => void; - formatDate: (dateStr: string) => string; + ontoggleCollection?: (collectionId: string) => void; + formatDate?: (dateStr: string) => string; } let { @@ -91,22 +92,17 @@ {#if !isScheduler && collections.length > 0}
- - -
- {#each collections.filter(a => !collectionFilter || (a.albumName || a.name || '').toLowerCase().includes(collectionFilter.toLowerCase())) as col} - - {/each} -
+ + ({ + value: col.id, + label: col.albumName || col.name, + icon: 'mdiImageMultiple', + desc: `${col.assetCount ?? col.asset_count ?? 0} assets`, + }))} + bind:values={form.collection_ids} + placeholder={t('notificationTracker.selectAlbums')} + />
{/if} diff --git a/packages/core/src/notify_bridge_core/providers/action_executor.py b/packages/core/src/notify_bridge_core/providers/action_executor.py new file mode 100644 index 0000000..a984ebe --- /dev/null +++ b/packages/core/src/notify_bridge_core/providers/action_executor.py @@ -0,0 +1,112 @@ +"""Action executor base class and result types. + +Provides the abstract interface that provider-specific action executors must +implement. Lives in core (not server) so business logic stays testable +without database dependencies. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class RuleResult: + """Outcome of executing a single rule.""" + + rule_name: str + success: bool + items_matched: int = 0 + items_affected: int = 0 + items_skipped: int = 0 + error: str | None = None + details: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ActionResult: + """Aggregate outcome of executing an action (all rules).""" + + success: bool + rules_processed: int = 0 + rules_succeeded: int = 0 + rules_failed: int = 0 + total_items_affected: int = 0 + rule_results: list[RuleResult] = field(default_factory=list) + error: str | None = None + + def to_dict(self) -> dict[str, Any]: + """Serialize to a JSON-compatible dict for storage.""" + return { + "success": self.success, + "rules_processed": self.rules_processed, + "rules_succeeded": self.rules_succeeded, + "rules_failed": self.rules_failed, + "total_items_affected": self.total_items_affected, + "rule_results": [ + { + "rule_name": r.rule_name, + "success": r.success, + "items_matched": r.items_matched, + "items_affected": r.items_affected, + "items_skipped": r.items_skipped, + "error": r.error, + "details": r.details, + } + for r in self.rule_results + ], + "error": self.error, + } + + +class ActionExecutor(ABC): + """Abstract base class for provider-specific action executors. + + Each provider that supports actions implements a concrete executor. + The executor receives rule configs (plain dicts) and performs the + actual mutations on the external service. + """ + + @abstractmethod + async def execute( + self, + action_type: str, + rules: list[dict[str, Any]], + config: dict[str, Any], + ) -> ActionResult: + """Execute the action with given rules. + + Args: + action_type: The action type key (e.g. "auto_organize"). + rules: List of rule_config dicts (from ActionRule rows). + config: Action-level config dict. + + Returns: + ActionResult with per-rule outcomes. + """ + + @abstractmethod + async def validate_rules( + self, + action_type: str, + rules: list[dict[str, Any]], + ) -> list[str]: + """Validate rules before saving. + + Returns a list of error messages. Empty list means valid. + """ + + @abstractmethod + async def dry_run( + self, + action_type: str, + rules: list[dict[str, Any]], + config: dict[str, Any], + ) -> ActionResult: + """Preview what would happen without mutating anything. + + Returns the same ActionResult shape as execute(), but + items_affected reflects what *would* be changed. + """ diff --git a/packages/core/src/notify_bridge_core/providers/actions.py b/packages/core/src/notify_bridge_core/providers/actions.py new file mode 100644 index 0000000..116f58c --- /dev/null +++ b/packages/core/src/notify_bridge_core/providers/actions.py @@ -0,0 +1,126 @@ +"""Action type registry. + +Defines what action types each provider supports. Used by the API to expose +available actions and validate action configurations. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class ActionTypeDefinition: + """Describes one type of action a provider supports.""" + + key: str # e.g. "auto_organize" + provider_type: str # e.g. "immich" + display_name: str # e.g. "Auto-Organize" + description: str + rule_schema: dict[str, Any] = field(default_factory=dict) + config_schema: dict[str, Any] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +_ACTION_TYPE_REGISTRY: dict[str, list[ActionTypeDefinition]] = {} + + +def register_action_type(defn: ActionTypeDefinition) -> None: + """Register an action type for a provider.""" + _ACTION_TYPE_REGISTRY.setdefault(defn.provider_type, []).append(defn) + + +def get_action_types(provider_type: str) -> list[ActionTypeDefinition]: + """Get all action types for a provider.""" + return list(_ACTION_TYPE_REGISTRY.get(provider_type, [])) + + +def get_action_type(provider_type: str, key: str) -> ActionTypeDefinition | None: + """Get a specific action type by provider and key.""" + for defn in _ACTION_TYPE_REGISTRY.get(provider_type, []): + if defn.key == key: + return defn + return None + + +def get_all_action_types() -> dict[str, list[ActionTypeDefinition]]: + """Get all registered action types grouped by provider.""" + return {k: list(v) for k, v in _ACTION_TYPE_REGISTRY.items()} + + +# --------------------------------------------------------------------------- +# Immich action types +# --------------------------------------------------------------------------- + +IMMICH_AUTO_ORGANIZE = ActionTypeDefinition( + key="auto_organize", + provider_type="immich", + display_name="Auto-Organize", + description="Sort assets into albums by person, search query, date, or favorites", + rule_schema={ + "type": "object", + "properties": { + "criteria": { + "type": "object", + "properties": { + "person_ids": { + "type": "array", + "items": {"type": "string"}, + "description": "Immich person UUIDs", + }, + "person_names": { + "type": "array", + "items": {"type": "string"}, + "description": "Display names (UI only)", + }, + "query": { + "type": "string", + "description": "Smart search query (CLIP)", + }, + "asset_type": { + "type": "string", + "enum": ["all", "image", "video"], + "default": "all", + }, + "date_from": { + "type": "string", + "format": "date", + "description": "ISO date lower bound", + }, + "date_to": { + "type": "string", + "format": "date", + "description": "ISO date upper bound", + }, + "favorite_only": { + "type": "boolean", + "default": False, + }, + }, + }, + "target_album_id": { + "type": "string", + "description": "Immich album UUID", + }, + "target_album_name": { + "type": "string", + "description": "Display name (UI only)", + }, + "create_album_if_missing": { + "type": "boolean", + "default": False, + }, + "create_album_name": { + "type": "string", + "description": "Name for auto-created album", + }, + }, + "required": ["criteria", "target_album_id"], + }, +) + +register_action_type(IMMICH_AUTO_ORGANIZE) diff --git a/packages/core/src/notify_bridge_core/providers/capabilities.py b/packages/core/src/notify_bridge_core/providers/capabilities.py index 6bb6916..906f296 100644 --- a/packages/core/src/notify_bridge_core/providers/capabilities.py +++ b/packages/core/src/notify_bridge_core/providers/capabilities.py @@ -35,6 +35,9 @@ class ProviderCapabilities: # Whether this provider receives webhooks (vs polling) webhook_based: bool = False + # Action types this provider supports (used by Actions feature) + action_types: list[dict[str, str]] = field(default_factory=list) + # --------------------------------------------------------------------------- # Immich provider capabilities @@ -46,6 +49,13 @@ IMMICH_CAPABILITIES = ProviderCapabilities( supported_filters=[ {"key": "collections", "label": "Albums", "type": "select", "source": "api"}, ], + action_types=[ + { + "key": "auto_organize", + "name": "Auto-Organize", + "description": "Sort assets into albums by person, search query, date, or favorites", + }, + ], notification_slots=[ {"name": "message_assets_added", "description": "New assets added to album"}, {"name": "message_assets_removed", "description": "Assets removed from album"}, diff --git a/packages/core/src/notify_bridge_core/providers/immich/action_executor.py b/packages/core/src/notify_bridge_core/providers/immich/action_executor.py new file mode 100644 index 0000000..9df6d65 --- /dev/null +++ b/packages/core/src/notify_bridge_core/providers/immich/action_executor.py @@ -0,0 +1,292 @@ +"""Immich action executor — implements auto_organize and future action types.""" + +from __future__ import annotations + +import logging +from typing import Any + +from ..action_executor import ActionExecutor, ActionResult, RuleResult +from .client import ImmichApiError, ImmichClient + +_LOGGER = logging.getLogger(__name__) + + +class ImmichActionExecutor(ActionExecutor): + """Executes actions against an Immich server.""" + + def __init__(self, client: ImmichClient) -> None: + self._client = client + + async def execute( + self, + action_type: str, + rules: list[dict[str, Any]], + config: dict[str, Any], + ) -> ActionResult: + if action_type == "auto_organize": + return await self._execute_auto_organize(rules, config, dry_run=False) + return ActionResult(success=False, error=f"Unknown action type: {action_type}") + + async def validate_rules( + self, + action_type: str, + rules: list[dict[str, Any]], + ) -> list[str]: + errors: list[str] = [] + if action_type == "auto_organize": + for i, rule in enumerate(rules): + criteria = rule.get("criteria", {}) + if not criteria: + errors.append(f"Rule {i + 1}: criteria is required") + target = rule.get("target_album_id", "") + create = rule.get("create_album_if_missing", False) + if not target and not create: + errors.append( + f"Rule {i + 1}: target_album_id is required " + "unless create_album_if_missing is true" + ) + if create and not rule.get("create_album_name"): + errors.append( + f"Rule {i + 1}: create_album_name is required " + "when create_album_if_missing is true" + ) + # Must have at least one criteria source + has_source = bool( + criteria.get("person_ids") + or criteria.get("query") + ) + if not has_source: + errors.append( + f"Rule {i + 1}: criteria must include " + "at least person_ids or query" + ) + else: + errors.append(f"Unknown action type: {action_type}") + return errors + + async def dry_run( + self, + action_type: str, + rules: list[dict[str, Any]], + config: dict[str, Any], + ) -> ActionResult: + if action_type == "auto_organize": + return await self._execute_auto_organize(rules, config, dry_run=True) + return ActionResult(success=False, error=f"Unknown action type: {action_type}") + + # ------------------------------------------------------------------ + # auto_organize implementation + # ------------------------------------------------------------------ + + async def _execute_auto_organize( + self, + rules: list[dict[str, Any]], + config: dict[str, Any], + *, + dry_run: bool, + ) -> ActionResult: + rule_results: list[RuleResult] = [] + total_affected = 0 + + for rule in rules: + result = await self._execute_single_organize_rule(rule, dry_run=dry_run) + rule_results.append(result) + if result.success: + total_affected += result.items_affected + + succeeded = sum(1 for r in rule_results if r.success) + failed = len(rule_results) - succeeded + + if failed == 0: + status = True + elif succeeded == 0: + status = False + else: + status = True # partial success is still "success" at action level + + return ActionResult( + success=status, + rules_processed=len(rule_results), + rules_succeeded=succeeded, + rules_failed=failed, + total_items_affected=total_affected, + rule_results=rule_results, + ) + + async def _execute_single_organize_rule( + self, + rule: dict[str, Any], + *, + dry_run: bool, + ) -> RuleResult: + rule_name = rule.get("name", rule.get("target_album_name", "unnamed")) + criteria = rule.get("criteria", {}) + create_if_missing = rule.get("create_album_if_missing", False) + create_album_name = rule.get("create_album_name", "") + + # Support both target_album_ids (array) and target_album_id (single, backward compat) + target_album_ids: list[str] = list(rule.get("target_album_ids") or []) + if not target_album_ids: + single = rule.get("target_album_id", "") + if single: + target_album_ids = [single] + + try: + # Step 1: Gather candidate assets from criteria + candidate_ids = await self._gather_candidates(criteria) + + if not candidate_ids: + return RuleResult( + rule_name=rule_name, + success=True, + items_matched=0, + items_affected=0, + items_skipped=0, + details={"message": "No assets matched criteria"}, + ) + + # If no target albums and create_if_missing, create one + if not target_album_ids and create_if_missing and create_album_name: + if dry_run: + _LOGGER.info("[DRY RUN] Would create album '%s'", create_album_name) + target_album_ids = ["__dry_run_new__"] + else: + created = await self._client.create_album(create_album_name) + target_album_ids = [created.get("id", "")] + _LOGGER.info("Created album '%s' with id %s", create_album_name, target_album_ids[0]) + + if not target_album_ids: + return RuleResult( + rule_name=rule_name, + success=False, + error="No target albums specified", + ) + + # Step 2-4: For each target album, diff and add + total_affected = 0 + total_skipped = 0 + album_details: list[dict[str, Any]] = [] + + for album_id in target_album_ids: + album_asset_ids: set[str] = set() + + if album_id and album_id != "__dry_run_new__": + album = await self._client.get_album(album_id) + if album is None and create_if_missing and create_album_name: + if not dry_run: + created = await self._client.create_album(create_album_name) + album_id = created.get("id", album_id) + _LOGGER.info("Created album '%s' with id %s", create_album_name, album_id) + elif album is None: + album_details.append({"album_id": album_id, "error": "not found"}) + continue + elif album is not None: + album_asset_ids = set(album.asset_ids) + + new_asset_ids = [aid for aid in candidate_ids if aid not in album_asset_ids] + skipped = len(candidate_ids) - len(new_asset_ids) + + if new_asset_ids and not dry_run and album_id: + for i in range(0, len(new_asset_ids), 500): + batch = new_asset_ids[i : i + 500] + await self._client.add_assets_to_album(album_id, batch) + _LOGGER.info("Added %d assets to album %s", len(new_asset_ids), album_id) + elif dry_run and new_asset_ids: + _LOGGER.info("[DRY RUN] Would add %d assets to album %s", len(new_asset_ids), album_id) + + total_affected += len(new_asset_ids) + total_skipped += skipped + album_details.append({"album_id": album_id, "added": len(new_asset_ids), "skipped": skipped}) + + return RuleResult( + rule_name=rule_name, + success=True, + items_matched=len(candidate_ids), + items_affected=total_affected, + items_skipped=total_skipped, + details={ + "target_album_ids": target_album_ids, + "albums": album_details, + "dry_run": dry_run, + }, + ) + + except ImmichApiError as err: + _LOGGER.error("Rule '%s' failed: %s", rule_name, err) + return RuleResult( + rule_name=rule_name, + success=False, + error=str(err), + ) + except Exception as err: + _LOGGER.error("Unexpected error in rule '%s': %s", rule_name, err) + return RuleResult( + rule_name=rule_name, + success=False, + error=f"Unexpected error: {err}", + ) + + async def _gather_candidates( + self, criteria: dict[str, Any] + ) -> list[str]: + """Gather asset IDs matching the criteria (union of all sources).""" + seen: set[str] = set() + result: list[str] = [] + + # Source 1: Person assets + person_ids = criteria.get("person_ids", []) + for pid in person_ids: + assets = await self._client.get_person_assets_all(pid) + for asset in assets: + aid = asset.get("id", "") + if aid and aid not in seen: + if self._matches_filters(asset, criteria): + seen.add(aid) + result.append(aid) + + # Source 2: Smart search + query = criteria.get("query", "") + if query: + assets = await self._client.search_smart_all(query) + for asset in assets: + aid = asset.get("id", "") + if aid and aid not in seen: + if self._matches_filters(asset, criteria): + seen.add(aid) + result.append(aid) + + return result + + def _matches_filters( + self, asset: dict[str, Any], criteria: dict[str, Any] + ) -> bool: + """Apply client-side filters (asset_type, date range, favorites).""" + # Asset type filter + asset_type_filter = criteria.get("asset_type", "all") + if asset_type_filter != "all": + asset_type = (asset.get("type") or "").lower() + if asset_type_filter == "image" and asset_type != "image": + return False + if asset_type_filter == "video" and asset_type != "video": + return False + + # Favorite filter + if criteria.get("favorite_only") and not asset.get("isFavorite"): + return False + + # Date range filter + date_from = criteria.get("date_from") + date_to = criteria.get("date_to") + if date_from or date_to: + created = asset.get("fileCreatedAt") or asset.get("createdAt") or "" + if created: + try: + asset_date = created[:10] # "YYYY-MM-DD" + if date_from and asset_date < date_from: + return False + if date_to and asset_date > date_to: + return False + except (ValueError, IndexError): + pass + + return True diff --git a/packages/core/src/notify_bridge_core/providers/immich/client.py b/packages/core/src/notify_bridge_core/providers/immich/client.py index a856c44..c246d0d 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/client.py +++ b/packages/core/src/notify_bridge_core/providers/immich/client.py @@ -322,6 +322,120 @@ class ImmichClient: _LOGGER.warning("Failed to fetch memories: %s", err) return [] + # ------------------------------------------------------------------ + # Write methods (used by action executors) + # ------------------------------------------------------------------ + + async def add_assets_to_album( + self, album_id: str, asset_ids: list[str] + ) -> dict[str, Any]: + """Add assets to an album. Returns API response with success/error arrays.""" + payload = {"ids": asset_ids} + try: + async with self._session.put( + f"{self._url}/api/albums/{album_id}/assets", + headers=self._json_headers, + json=payload, + ) as response: + if response.status == 200: + return await response.json() + raise ImmichApiError( + f"Failed to add assets to album {album_id}: HTTP {response.status}" + ) + except aiohttp.ClientError as err: + raise ImmichApiError(f"Error adding assets to album: {err}") from err + + async def remove_assets_from_album( + self, album_id: str, asset_ids: list[str] + ) -> dict[str, Any]: + """Remove assets from an album.""" + payload = {"ids": asset_ids} + try: + async with self._session.delete( + f"{self._url}/api/albums/{album_id}/assets", + headers=self._json_headers, + json=payload, + ) as response: + if response.status == 200: + return await response.json() + raise ImmichApiError( + f"Failed to remove assets from album {album_id}: HTTP {response.status}" + ) + except aiohttp.ClientError as err: + raise ImmichApiError(f"Error removing assets from album: {err}") from err + + async def create_album( + self, name: str, asset_ids: list[str] | None = None + ) -> dict[str, Any]: + """Create a new album, optionally with initial assets.""" + payload: dict[str, Any] = {"albumName": name} + if asset_ids: + payload["assetIds"] = asset_ids + try: + async with self._session.post( + f"{self._url}/api/albums", + headers=self._json_headers, + json=payload, + ) as response: + if response.status == 201: + return await response.json() + raise ImmichApiError( + f"Failed to create album '{name}': HTTP {response.status}" + ) + except aiohttp.ClientError as err: + raise ImmichApiError(f"Error creating album: {err}") from err + + async def get_person_assets_all(self, person_id: str) -> list[dict[str, Any]]: + """Fetch ALL assets for a person (no limit).""" + try: + async with self._session.get( + f"{self._url}/api/people/{person_id}/assets", + headers=self._headers, + ) as response: + if response.status == 200: + data = await response.json() + return data if isinstance(data, list) else [] + if response.status == 404: + return [] + raise ImmichApiError( + f"Failed to fetch person {person_id} assets: HTTP {response.status}" + ) + except aiohttp.ClientError as err: + raise ImmichApiError(f"Error fetching person assets: {err}") from err + + async def search_smart_all( + self, query: str, limit: int = 1000 + ) -> list[dict[str, Any]]: + """Smart search with pagination up to limit.""" + all_items: list[dict[str, Any]] = [] + page = 1 + page_size = min(limit, 100) + while len(all_items) < limit: + payload = {"query": query, "page": page, "size": page_size} + try: + async with self._session.post( + f"{self._url}/api/search/smart", + headers=self._json_headers, + json=payload, + ) as response: + if response.status != 200: + break + data = await response.json() + items = data.get("assets", {}).get("items", []) + if not items: + break + all_items.extend(items) + if len(items) < page_size: + break + page += 1 + except aiohttp.ClientError: + break + return all_items[:limit] + + # ------------------------------------------------------------------ + # Read methods (continued) + # ------------------------------------------------------------------ + async def get_asset_thumbnail(self, asset_id: str, size: str = "preview") -> bytes | None: try: async with self._session.get( diff --git a/packages/server/src/notify_bridge_server/api/action_rules.py b/packages/server/src/notify_bridge_server/api/action_rules.py new file mode 100644 index 0000000..72c24fb --- /dev/null +++ b/packages/server/src/notify_bridge_server/api/action_rules.py @@ -0,0 +1,162 @@ +"""Action rule management API routes.""" + +import logging + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import Action, ActionRule, User + +_LOGGER = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/actions", tags=["action-rules"]) + + +# --------------------------------------------------------------------------- +# Request schemas +# --------------------------------------------------------------------------- + + +class ActionRuleCreate(BaseModel): + name: str = "" + rule_config: dict = {} + enabled: bool = True + order: int = 0 + + +class ActionRuleUpdate(BaseModel): + name: str | None = None + rule_config: dict | None = None + enabled: bool | None = None + order: int | None = None + + +class ReorderBody(BaseModel): + rule_ids: list[int] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _rule_response(rule: ActionRule) -> dict: + return { + "id": rule.id, + "action_id": rule.action_id, + "name": rule.name, + "rule_config": rule.rule_config, + "enabled": rule.enabled, + "order": rule.order, + "created_at": rule.created_at.isoformat() if rule.created_at else None, + } + + +async def _get_user_action( + session: AsyncSession, action_id: int, user: User +) -> Action: + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + return action + + +# --------------------------------------------------------------------------- +# Routes +# --------------------------------------------------------------------------- + + +@router.get("/{action_id}/rules") +async def list_rules( + action_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + await _get_user_action(session, action_id, user) + result = await session.exec( + select(ActionRule) + .where(ActionRule.action_id == action_id) + .order_by(ActionRule.order) + ) + return [_rule_response(r) for r in result.all()] + + +@router.post("/{action_id}/rules", status_code=status.HTTP_201_CREATED) +async def create_rule( + action_id: int, + body: ActionRuleCreate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + await _get_user_action(session, action_id, user) + rule = ActionRule(action_id=action_id, **body.model_dump()) + session.add(rule) + await session.commit() + await session.refresh(rule) + return _rule_response(rule) + + +@router.put("/{action_id}/rules/reorder") +async def reorder_rules( + action_id: int, + body: ReorderBody, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + await _get_user_action(session, action_id, user) + result = await session.exec( + select(ActionRule).where(ActionRule.action_id == action_id) + ) + rules_by_id = {r.id: r for r in result.all()} + + for idx, rule_id in enumerate(body.rule_ids): + rule = rules_by_id.get(rule_id) + if rule: + rule.order = idx + session.add(rule) + + await session.commit() + return {"status": "ok"} + + +@router.put("/{action_id}/rules/{rule_id}") +async def update_rule( + action_id: int, + rule_id: int, + body: ActionRuleUpdate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + await _get_user_action(session, action_id, user) + rule = await session.get(ActionRule, rule_id) + if not rule or rule.action_id != action_id: + raise HTTPException(status_code=404, detail="Rule not found") + + updates = body.model_dump(exclude_unset=True) + for key, value in updates.items(): + setattr(rule, key, value) + session.add(rule) + await session.commit() + await session.refresh(rule) + return _rule_response(rule) + + +@router.delete( + "/{action_id}/rules/{rule_id}", status_code=status.HTTP_204_NO_CONTENT +) +async def delete_rule( + action_id: int, + rule_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + await _get_user_action(session, action_id, user) + rule = await session.get(ActionRule, rule_id) + if not rule or rule.action_id != action_id: + raise HTTPException(status_code=404, detail="Rule not found") + await session.delete(rule) + await session.commit() diff --git a/packages/server/src/notify_bridge_server/api/action_types.py b/packages/server/src/notify_bridge_server/api/action_types.py new file mode 100644 index 0000000..bb8ed36 --- /dev/null +++ b/packages/server/src/notify_bridge_server/api/action_types.py @@ -0,0 +1,78 @@ +"""Action type discovery API routes.""" + +from fastapi import APIRouter, Depends, HTTPException + +from notify_bridge_core.providers.actions import ( + get_action_type, + get_action_types, + get_all_action_types, +) +from notify_bridge_core.providers.capabilities import get_all_capabilities + +from ..auth.dependencies import get_current_user +from ..database.models import User + +router = APIRouter(prefix="/api/action-types", tags=["action-types"]) + + +@router.get("") +async def list_all_action_types( + user: User = Depends(get_current_user), +): + """All action types grouped by provider.""" + all_types = get_all_action_types() + result: dict[str, list[dict]] = {} + for provider_type, defns in all_types.items(): + result[provider_type] = [ + { + "key": d.key, + "provider_type": d.provider_type, + "display_name": d.display_name, + "description": d.description, + } + for d in defns + ] + # Also include providers with no action types (from capabilities) + all_caps = get_all_capabilities() + for ptype in all_caps: + if ptype not in result: + result[ptype] = [] + return result + + +@router.get("/{provider_type}") +async def list_provider_action_types( + provider_type: str, + user: User = Depends(get_current_user), +): + """Action types for a specific provider.""" + defns = get_action_types(provider_type) + return [ + { + "key": d.key, + "provider_type": d.provider_type, + "display_name": d.display_name, + "description": d.description, + } + for d in defns + ] + + +@router.get("/{provider_type}/{key}/schema") +async def get_action_type_schema( + provider_type: str, + key: str, + user: User = Depends(get_current_user), +): + """Get rule and config JSON schemas for an action type.""" + defn = get_action_type(provider_type, key) + if not defn: + raise HTTPException(status_code=404, detail="Action type not found") + return { + "key": defn.key, + "provider_type": defn.provider_type, + "display_name": defn.display_name, + "description": defn.description, + "rule_schema": defn.rule_schema, + "config_schema": defn.config_schema, + } diff --git a/packages/server/src/notify_bridge_server/api/actions.py b/packages/server/src/notify_bridge_server/api/actions.py new file mode 100644 index 0000000..7395557 --- /dev/null +++ b/packages/server/src/notify_bridge_server/api/actions.py @@ -0,0 +1,291 @@ +"""Action management API routes — CRUD, execute, dry-run, executions.""" + +import logging + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import ( + Action, + ActionExecution, + ActionRule, + ServiceProvider, + User, +) + +_LOGGER = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/actions", tags=["actions"]) + + +# --------------------------------------------------------------------------- +# Request / response schemas +# --------------------------------------------------------------------------- + + +class ActionCreate(BaseModel): + provider_id: int + name: str + icon: str = "" + action_type: str + config: dict = {} + schedule_type: str = "interval" + schedule_interval: int = 3600 + schedule_cron: str = "" + enabled: bool = False + + +class ActionUpdate(BaseModel): + name: str | None = None + icon: str | None = None + config: dict | None = None + schedule_type: str | None = None + schedule_interval: int | None = None + schedule_cron: str | None = None + enabled: bool | None = None + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _action_response(session: AsyncSession, action: Action) -> dict: + """Build response dict with rules inlined.""" + result = await session.exec( + select(ActionRule) + .where(ActionRule.action_id == action.id) + .order_by(ActionRule.order) + ) + rules = result.all() + return { + "id": action.id, + "user_id": action.user_id, + "provider_id": action.provider_id, + "name": action.name, + "icon": action.icon, + "action_type": action.action_type, + "config": action.config, + "schedule_type": action.schedule_type, + "schedule_interval": action.schedule_interval, + "schedule_cron": action.schedule_cron, + "enabled": action.enabled, + "last_run_at": action.last_run_at.isoformat() if action.last_run_at else None, + "last_run_status": action.last_run_status, + "created_at": action.created_at.isoformat() if action.created_at else None, + "rules": [ + { + "id": r.id, + "action_id": r.action_id, + "name": r.name, + "rule_config": r.rule_config, + "enabled": r.enabled, + "order": r.order, + "created_at": r.created_at.isoformat() if r.created_at else None, + } + for r in rules + ], + } + + +# --------------------------------------------------------------------------- +# CRUD +# --------------------------------------------------------------------------- + + +@router.get("") +async def list_actions( + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + result = await session.exec( + select(Action).where(Action.user_id == user.id) + ) + actions = result.all() + return [await _action_response(session, a) for a in actions] + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_action( + body: ActionCreate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + provider = await session.get(ServiceProvider, body.provider_id) + if not provider or provider.user_id != user.id: + raise HTTPException(status_code=404, detail="Provider not found") + + # Validate action_type against the registry + from notify_bridge_core.providers.actions import get_action_type + if not get_action_type(provider.type, body.action_type): + raise HTTPException( + status_code=400, + detail=f"Invalid action type '{body.action_type}' for provider type '{provider.type}'", + ) + + action = Action(user_id=user.id, **body.model_dump()) + session.add(action) + await session.commit() + await session.refresh(action) + + if action.enabled: + from ..services.scheduler import schedule_action + await schedule_action( + action.id, + action.schedule_type, + action.schedule_interval, + action.schedule_cron, + ) + + return await _action_response(session, action) + + +@router.get("/{action_id}") +async def get_action( + action_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + return await _action_response(session, action) + + +@router.put("/{action_id}") +async def update_action( + action_id: int, + body: ActionUpdate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + + updates = body.model_dump(exclude_unset=True) + for key, value in updates.items(): + setattr(action, key, value) + session.add(action) + await session.commit() + await session.refresh(action) + + # Reschedule or unschedule based on enabled state + from ..services.scheduler import schedule_action, unschedule_action + if action.enabled: + await schedule_action( + action.id, + action.schedule_type, + action.schedule_interval, + action.schedule_cron, + ) + else: + await unschedule_action(action.id) + + return await _action_response(session, action) + + +@router.delete("/{action_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_action( + action_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + + # Unschedule + from ..services.scheduler import unschedule_action + await unschedule_action(action.id) + + # Bulk delete rules and executions + from sqlalchemy import delete + await session.exec(delete(ActionRule).where(ActionRule.action_id == action_id)) + await session.exec(delete(ActionExecution).where(ActionExecution.action_id == action_id)) + + await session.delete(action) + await session.commit() + + +# --------------------------------------------------------------------------- +# Execute / Dry-run +# --------------------------------------------------------------------------- + + +@router.post("/{action_id}/execute") +async def execute_action( + action_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + + from ..services.action_runner import run_action + result = await run_action(action_id, trigger="manual") + return result.to_dict() + + +@router.post("/{action_id}/dry-run") +async def dry_run_action( + action_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + + from ..services.action_runner import dry_run_action as _dry_run + result = await _dry_run(action_id) + return result.to_dict() + + +# --------------------------------------------------------------------------- +# Execution history +# --------------------------------------------------------------------------- + + +@router.get("/{action_id}/executions") +async def list_executions( + action_id: int, + limit: int = Query(default=20, le=100), + offset: int = Query(default=0, ge=0), + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + action = await session.get(Action, action_id) + if not action or action.user_id != user.id: + raise HTTPException(status_code=404, detail="Action not found") + + result = await session.exec( + select(ActionExecution) + .where(ActionExecution.action_id == action_id) + .order_by(ActionExecution.started_at.desc()) + .offset(offset) + .limit(limit) + ) + executions = result.all() + return [ + { + "id": e.id, + "action_id": e.action_id, + "started_at": e.started_at.isoformat() if e.started_at else None, + "finished_at": e.finished_at.isoformat() if e.finished_at else None, + "status": e.status, + "rules_processed": e.rules_processed, + "rules_succeeded": e.rules_succeeded, + "rules_failed": e.rules_failed, + "total_items_affected": e.total_items_affected, + "summary": e.summary, + "error": e.error, + "trigger": e.trigger, + } + for e in executions + ] diff --git a/packages/server/src/notify_bridge_server/api/providers.py b/packages/server/src/notify_bridge_server/api/providers.py index fabddd2..286d226 100644 --- a/packages/server/src/notify_bridge_server/api/providers.py +++ b/packages/server/src/notify_bridge_server/api/providers.py @@ -193,6 +193,7 @@ async def list_provider_capabilities(): "commands": caps.commands, "supported_filters": caps.supported_filters, "webhook_based": caps.webhook_based, + "action_types": caps.action_types, } return result @@ -351,6 +352,29 @@ async def test_provider( return {"ok": False, "message": f"Unknown provider type: {provider.type}"} +@router.get("/{provider_id}/people") +async def list_people( + provider_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Fetch people from a service provider (Immich only).""" + provider = await _get_user_provider(session, provider_id, user.id) + + if provider.type == "immich": + from notify_bridge_core.providers.immich.client import ImmichClient + async with aiohttp.ClientSession() as http_session: + client = ImmichClient( + http_session, + provider.config.get("url", ""), + provider.config.get("api_key", ""), + ) + people_dict = await client.get_people() + return [{"id": pid, "name": name} for pid, name in people_dict.items()] + + return [] + + @router.get("/{provider_id}/collections") async def list_collections( provider_id: int, diff --git a/packages/server/src/notify_bridge_server/database/models.py b/packages/server/src/notify_bridge_server/database/models.py index ae40422..b4814b9 100644 --- a/packages/server/src/notify_bridge_server/database/models.py +++ b/packages/server/src/notify_bridge_server/database/models.py @@ -470,6 +470,60 @@ class EventLog(SQLModel, table=True): created_at: datetime = Field(default_factory=_utcnow) +class Action(SQLModel, table=True): + """A scheduled action that mutates an external service.""" + + __tablename__ = "action" + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(foreign_key="user.id") + provider_id: int = Field(foreign_key="service_provider.id") + name: str + icon: str = Field(default="") + action_type: str # e.g. "auto_organize" + config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + schedule_type: str = Field(default="interval") # "interval" or "cron" + schedule_interval: int = Field(default=3600) # seconds + schedule_cron: str = Field(default="") + enabled: bool = Field(default=False) # default disabled for safety + last_run_at: datetime | None = Field(default=None) + last_run_status: str = Field(default="") # "success", "partial", "failed", "" + created_at: datetime = Field(default_factory=_utcnow) + + +class ActionRule(SQLModel, table=True): + """One rule within an Action. Executed in order.""" + + __tablename__ = "action_rule" + + id: int | None = Field(default=None, primary_key=True) + action_id: int = Field(foreign_key="action.id", index=True) + name: str = Field(default="") + rule_config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + enabled: bool = Field(default=True) + order: int = Field(default=0) + created_at: datetime = Field(default_factory=_utcnow) + + +class ActionExecution(SQLModel, table=True): + """Log of an action execution (scheduled, manual, or dry-run).""" + + __tablename__ = "action_execution" + + id: int | None = Field(default=None, primary_key=True) + action_id: int = Field(foreign_key="action.id", index=True) + started_at: datetime = Field(default_factory=_utcnow) + finished_at: datetime | None = Field(default=None) + status: str = Field(default="running") # "running", "success", "partial", "failed" + rules_processed: int = Field(default=0) + rules_succeeded: int = Field(default=0) + rules_failed: int = Field(default=0) + total_items_affected: int = Field(default=0) + summary: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + error: str = Field(default="") + trigger: str = Field(default="scheduled") # "scheduled", "manual", "dry_run" + + class AppSetting(SQLModel, table=True): """Key-value app-level settings (admin-configurable).""" diff --git a/packages/server/src/notify_bridge_server/main.py b/packages/server/src/notify_bridge_server/main.py index e33c150..469b2f7 100644 --- a/packages/server/src/notify_bridge_server/main.py +++ b/packages/server/src/notify_bridge_server/main.py @@ -35,6 +35,9 @@ from .api.app_settings import router as app_settings_router from .api.command_configs import router as command_configs_router from .api.command_trackers import router as command_trackers_router from .api.command_template_configs import router as command_template_configs_router +from .api.actions import router as actions_router +from .api.action_rules import router as action_rules_router +from .api.action_types import router as action_types_router from .commands.webhook import router as webhook_router, set_webhook_secret from .api.webhooks import router as webhooks_router @@ -106,6 +109,9 @@ app.include_router(matrix_bots_router) app.include_router(users_router) app.include_router(status_router) app.include_router(app_settings_router) +app.include_router(action_types_router) +app.include_router(action_rules_router) +app.include_router(actions_router) app.include_router(command_configs_router) app.include_router(command_trackers_router) app.include_router(command_template_configs_router) diff --git a/packages/server/src/notify_bridge_server/services/action_runner.py b/packages/server/src/notify_bridge_server/services/action_runner.py new file mode 100644 index 0000000..97abfb5 --- /dev/null +++ b/packages/server/src/notify_bridge_server/services/action_runner.py @@ -0,0 +1,187 @@ +"""Action runner — orchestrates loading, executing, and logging actions.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +import aiohttp +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from notify_bridge_core.providers.action_executor import ActionResult + +from ..database.engine import get_engine +from ..database.models import ( + Action, + ActionExecution, + ActionRule, + ServiceProvider, +) + +_LOGGER = logging.getLogger(__name__) + + +async def run_action( + action_id: int, *, trigger: str = "scheduled" +) -> ActionResult: + """Load an action from DB, execute it, and save the execution log.""" + engine = get_engine() + + # ------------------------------------------------------------------ + # 1. Load all DB data eagerly (before aiohttp context) + # ------------------------------------------------------------------ + async with AsyncSession(engine) as session: + action = await session.get(Action, action_id) + if not action: + return ActionResult(success=False, error="Action not found") + if not action.enabled and trigger == "scheduled": + return ActionResult(success=False, error="Action is disabled") + + provider = await session.get(ServiceProvider, action.provider_id) + if not provider: + return ActionResult(success=False, error="Provider not found") + + result = await session.exec( + select(ActionRule) + .where(ActionRule.action_id == action_id) + .where(ActionRule.enabled == True) # noqa: E712 + .order_by(ActionRule.order) + ) + rules = result.all() + + if not rules: + return ActionResult(success=True, rules_processed=0) + + # Snapshot data + provider_type = provider.type + provider_config = dict(provider.config) + provider_name = provider.name + action_type = action.action_type + action_config = dict(action.config) if action.config else {} + rule_configs = [ + {**dict(r.rule_config), "name": r.name} for r in rules + ] + + # Create execution record + execution = ActionExecution( + action_id=action_id, + trigger=trigger, + status="running", + ) + session.add(execution) + await session.commit() + await session.refresh(execution) + execution_id = execution.id + + # ------------------------------------------------------------------ + # 2. Execute via provider-specific executor + # ------------------------------------------------------------------ + is_dry_run = trigger == "dry_run" + action_result: ActionResult + + try: + action_result = await _execute_with_provider( + provider_type=provider_type, + provider_config=provider_config, + provider_name=provider_name, + action_type=action_type, + action_config=action_config, + rule_configs=rule_configs, + dry_run=is_dry_run, + ) + except Exception as err: + _LOGGER.error("Action %d execution error: %s", action_id, err) + action_result = ActionResult(success=False, error=str(err)) + + # ------------------------------------------------------------------ + # 3. Save execution results + # ------------------------------------------------------------------ + async with AsyncSession(engine) as session: + execution = await session.get(ActionExecution, execution_id) + if execution: + execution.finished_at = datetime.now(timezone.utc) + if action_result.error is not None and action_result.rules_succeeded == 0: + execution.status = "failed" + elif action_result.rules_failed > 0: + execution.status = "partial" + else: + execution.status = "success" + execution.rules_processed = action_result.rules_processed + execution.rules_succeeded = action_result.rules_succeeded + execution.rules_failed = action_result.rules_failed + execution.total_items_affected = action_result.total_items_affected + execution.summary = action_result.to_dict() + execution.error = action_result.error or "" + session.add(execution) + + # Update action last_run metadata (skip for dry runs) + if not is_dry_run: + action = await session.get(Action, action_id) + if action: + action.last_run_at = datetime.now(timezone.utc) + action.last_run_status = execution.status if execution else "" + session.add(action) + + await session.commit() + + _LOGGER.info( + "Action %d (%s) completed: %d/%d rules succeeded, %d items affected", + action_id, + trigger, + action_result.rules_succeeded, + action_result.rules_processed, + action_result.total_items_affected, + ) + return action_result + + +async def dry_run_action(action_id: int) -> ActionResult: + """Execute a dry-run of an action (no mutations).""" + return await run_action(action_id, trigger="dry_run") + + +async def _execute_with_provider( + *, + provider_type: str, + provider_config: dict[str, Any], + provider_name: str, + action_type: str, + action_config: dict[str, Any], + rule_configs: list[dict[str, Any]], + dry_run: bool, +) -> ActionResult: + """Instantiate the appropriate executor and run.""" + if provider_type == "immich": + from notify_bridge_core.providers.immich.action_executor import ( + ImmichActionExecutor, + ) + from notify_bridge_core.providers.immich.client import ImmichClient + + async with aiohttp.ClientSession() as http_session: + client = ImmichClient( + http_session, + provider_config.get("url", ""), + provider_config.get("api_key", ""), + ) + external_domain = provider_config.get("external_domain") + if external_domain: + client.external_domain = external_domain + + # Verify connectivity + if not await client.ping(): + return ActionResult( + success=False, + error=f"Cannot connect to Immich server ({provider_name})", + ) + + executor = ImmichActionExecutor(client) + if dry_run: + return await executor.dry_run(action_type, rule_configs, action_config) + return await executor.execute(action_type, rule_configs, action_config) + + return ActionResult( + success=False, + error=f"No action executor for provider type: {provider_type}", + ) diff --git a/packages/server/src/notify_bridge_server/services/scheduler.py b/packages/server/src/notify_bridge_server/services/scheduler.py index e206fa6..33937cd 100644 --- a/packages/server/src/notify_bridge_server/services/scheduler.py +++ b/packages/server/src/notify_bridge_server/services/scheduler.py @@ -1,4 +1,4 @@ -"""APScheduler-based polling scheduler for trackers.""" +"""APScheduler-based polling scheduler for trackers and actions.""" from __future__ import annotations @@ -25,6 +25,7 @@ async def start_scheduler() -> None: _LOGGER.info("Scheduler started") await _load_tracker_jobs() + await _load_action_jobs() # Start Telegram bot polling for bots with active command listeners from .telegram_poller import start_command_listener_polling @@ -156,3 +157,123 @@ async def _poll_tracker(tracker_id: int) -> None: await check_tracker(tracker_id) except Exception as e: _LOGGER.error("Error polling tracker %d: %s", tracker_id, e) + + +# --------------------------------------------------------------------------- +# Action scheduling +# --------------------------------------------------------------------------- + + +async def _load_action_jobs() -> None: + """Load enabled actions and schedule execution jobs.""" + from sqlmodel import select + from sqlmodel.ext.asyncio.session import AsyncSession + from ..database.engine import get_engine + from ..database.models import Action + + engine = get_engine() + scheduler = get_scheduler() + + async with AsyncSession(engine) as session: + result = await session.exec( + select(Action).where(Action.enabled == True) # noqa: E712 + ) + actions = result.all() + + for action in actions: + job_id = f"action_{action.id}" + if scheduler.get_job(job_id): + continue + + if action.schedule_type == "cron" and action.schedule_cron: + try: + from apscheduler.triggers.cron import CronTrigger + trigger = CronTrigger.from_crontab(action.schedule_cron) + scheduler.add_job( + _run_action, + trigger, + id=job_id, + args=[action.id], + replace_existing=True, + ) + _LOGGER.info( + "Scheduled action %d (%s) with cron: %s", + action.id, action.name, action.schedule_cron, + ) + continue + except Exception as e: + _LOGGER.error( + "Invalid cron for action %d (%s): %s — falling back to interval", + action.id, action.name, e, + ) + + scheduler.add_job( + _run_action, + "interval", + seconds=action.schedule_interval, + id=job_id, + args=[action.id], + replace_existing=True, + ) + _LOGGER.info( + "Scheduled action %d (%s) every %ds", + action.id, action.name, action.schedule_interval, + ) + + +async def schedule_action( + action_id: int, + schedule_type: str = "interval", + interval: int = 3600, + cron_expression: str = "", +) -> None: + """Add or update a scheduler job for an action.""" + scheduler = get_scheduler() + job_id = f"action_{action_id}" + + if scheduler.get_job(job_id): + scheduler.remove_job(job_id) + + if schedule_type == "cron" and cron_expression: + try: + from apscheduler.triggers.cron import CronTrigger + trigger = CronTrigger.from_crontab(cron_expression) + scheduler.add_job( + _run_action, + trigger, + id=job_id, + args=[action_id], + replace_existing=True, + ) + _LOGGER.info("Scheduled action %d with cron: %s", action_id, cron_expression) + return + except Exception as e: + _LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e) + + scheduler.add_job( + _run_action, + "interval", + seconds=interval, + id=job_id, + args=[action_id], + replace_existing=True, + ) + _LOGGER.info("Scheduled action %d every %ds", action_id, interval) + + +async def unschedule_action(action_id: int) -> None: + """Remove a scheduler job for an action.""" + scheduler = get_scheduler() + job_id = f"action_{action_id}" + if scheduler.get_job(job_id): + scheduler.remove_job(job_id) + _LOGGER.info("Unscheduled action %d", action_id) + + +async def _run_action(action_id: int) -> None: + """Run an action (called by APScheduler).""" + from .action_runner import run_action + try: + await run_action(action_id, trigger="scheduled") + except Exception as e: + _LOGGER.error("Error running action %d: %s", action_id, e)