"""Notification dispatch service with full Jinja2 template rendering.""" from __future__ import annotations import logging from datetime import datetime from typing import Any import aiohttp import jinja2 from jinja2.sandbox import SandboxedEnvironment from immich_watcher_core.telegram.client import TelegramClient from ..database.models import NotificationTarget, TemplateConfig from ..webhook.client import WebhookClient _LOGGER = logging.getLogger(__name__) _env = SandboxedEnvironment(autoescape=False) # Default template (Jinja2 syntax) when no config is assigned DEFAULT_TEMPLATE = ( '{{ added_count }} new item(s) added to album "{{ album_name }}".' '{% if people %}\nPeople: {{ people | join(", ") }}{% endif %}' ) def _render(template_str: str, ctx: dict[str, Any]) -> str: """Render a Jinja2 template string with context. Falls back on error.""" try: return _env.from_string(template_str).render(**ctx) except jinja2.TemplateError as e: _LOGGER.error("Template render error: %s", e) return template_str def build_full_context( event_data: dict[str, Any], template_config: TemplateConfig | None = None, ) -> dict[str, Any]: """Build template context by passing raw data directly to Jinja2. The templates use {% for %}, {% if %} etc. to handle formatting, so no pre-rendering of sub-templates is needed. """ ctx = dict(event_data) # Ensure lists are actual lists (not strings) if isinstance(ctx.get("people"), str): ctx["people"] = [ctx["people"]] if ctx["people"] else [] # Video warning added_assets = ctx.get("added_assets", []) has_videos = any(a.get("type") == "VIDEO" for a in added_assets) if added_assets else False ctx["video_warning"] = (template_config.video_warning if template_config and has_videos else "") return ctx async def send_notification( target: NotificationTarget, event_data: dict[str, Any], template_config: TemplateConfig | None = None, use_ai_caption: bool = False, ) -> dict[str, Any]: """Send a notification to a target using event data.""" message = None # Try AI caption first if enabled if use_ai_caption: from ..ai.service import generate_caption, is_ai_enabled if is_ai_enabled(): message = await generate_caption(event_data) # Render with template engine if message is None: ctx = build_full_context(event_data, template_config) template_body = DEFAULT_TEMPLATE if template_config: change_type = event_data.get("change_type", "") slot_map = { "assets_added": "message_assets_added", "assets_removed": "message_assets_removed", "album_renamed": "message_album_renamed", "album_deleted": "message_album_deleted", } slot = slot_map.get(change_type, "message_assets_added") template_body = getattr(template_config, slot, DEFAULT_TEMPLATE) or DEFAULT_TEMPLATE message = _render(template_body, ctx) if target.type == "telegram": return await _send_telegram(target, message, event_data) elif target.type == "webhook": return await _send_webhook(target, message, event_data) return {"success": False, "error": f"Unknown target type: {target.type}"} async def send_test_notification(target: NotificationTarget) -> dict[str, Any]: """Send a test notification to verify target configuration.""" test_data = { "album_name": "Test Album", "added_count": 1, "removed_count": 0, "change_type": "assets_added", "people": [], "added_assets": [], } if target.type == "telegram": return await _send_telegram( target, "Test notification from Immich Watcher", test_data ) elif target.type == "webhook": return await _send_webhook( target, "Test notification from Immich Watcher", test_data ) return {"success": False, "error": f"Unknown target type: {target.type}"} async def _send_telegram( target: NotificationTarget, message: str, event_data: dict[str, Any] ) -> dict[str, Any]: """Send notification via Telegram.""" config = target.config bot_token = config.get("bot_token") chat_id = config.get("chat_id") if not bot_token or not chat_id: return {"success": False, "error": "Missing bot_token or chat_id in target config"} async with aiohttp.ClientSession() as session: client = TelegramClient(session, bot_token) assets = [] for asset in event_data.get("added_assets", []): url = asset.get("download_url") or asset.get("url") if url: asset_type = "video" if asset.get("type") == "VIDEO" else "photo" assets.append({"url": url, "type": asset_type}) return await client.send_notification( chat_id=str(chat_id), caption=message, assets=assets if assets else None, ) async def _send_webhook( target: NotificationTarget, message: str, event_data: dict[str, Any] ) -> dict[str, Any]: """Send notification via webhook.""" config = target.config url = config.get("url") headers = config.get("headers", {}) if not url: return {"success": False, "error": "Missing url in target config"} payload = {"message": message, "event": event_data} async with aiohttp.ClientSession() as session: client = WebhookClient(session, url, headers) return await client.send(payload)