Files
haos-hacs-immich-album-watcher/packages/server/src/immich_watcher_server/services/notifier.py
alexei.dolgolyov 58b2281dc6
Some checks failed
Validate / Hassfest (push) Has been cancelled
Add standalone FastAPI server backend (Phase 3)
Build a complete standalone web server for Immich album change
notifications, independent of Home Assistant. Uses the shared
core library from Phase 1.

Server features:
- FastAPI with async SQLite (SQLModel + aiosqlite)
- Multi-user auth with JWT (admin/user roles, setup wizard)
- CRUD APIs: Immich servers, album trackers, message templates,
  notification targets (Telegram + webhook), user management
- APScheduler background polling per tracker
- Jinja2 template rendering with live preview
- Album browser proxied from Immich API
- Event logging and dashboard status endpoint
- Docker deployment (single container, SQLite in volume)

39 API routes, 14 integration tests passing.

Also adds Phase 6 (Claude AI Telegram bot enhancement) to the
primary plan as an optional future phase.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:56:22 +03:00

129 lines
4.1 KiB
Python

"""Notification dispatch service."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
import jinja2
from immich_watcher_core.telegram.client import TelegramClient
from ..database.models import MessageTemplate, NotificationTarget
from ..webhook.client import WebhookClient
# Module-scoped logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Jinja2 source of the fallback message used when no custom template is
# configured: a one-line summary, followed by a "People: ..." line only when
# ``people`` is truthy.
DEFAULT_TEMPLATE = "".join(
    (
        '{{ added_count }} new item(s) added to album "{{ album_name }}".',
        "{% if people %}",
        "\nPeople: {{ people | join(', ') }}",
        "{% endif %}",
    )
)
def render_template(template_body: str, context: dict[str, Any]) -> str:
    """Render a Jinja2 template with the given context.

    Template bodies can be user-authored, so rendering happens inside
    Jinja2's sandbox: access to unsafe attributes raises
    ``jinja2.sandbox.SecurityError`` (a ``jinja2.TemplateError`` subclass)
    instead of being evaluated.

    Args:
        template_body: Jinja2 template source.
        context: Variables exposed to the template; keys become top-level
            template names.

    Returns:
        The rendered text. Autoescaping stays disabled, so the output is
        plain text rather than HTML-escaped.

    Raises:
        jinja2.TemplateError: On syntax errors, undefined-variable errors or
            sandbox violations.
    """
    # Imported locally so this function does not depend on the top-level
    # ``import jinja2`` exposing the ``sandbox`` submodule.
    from jinja2.sandbox import SandboxedEnvironment

    env = SandboxedEnvironment(autoescape=False)
    return env.from_string(template_body).render(**context)
async def send_notification(
    target: NotificationTarget,
    event_data: dict[str, Any],
    template: MessageTemplate | None = None,
) -> dict[str, Any]:
    """Render the message for an album event and dispatch it to ``target``.

    Rendering is best-effort: if the template cannot be rendered for any
    reason, a short generic message naming the album is sent instead, so a
    broken template never prevents the notification from going out.

    Args:
        target: Notification destination; ``target.type`` selects the
            transport ("telegram" or "webhook").
        event_data: Album change event data (album_name, added_count, etc.).
            Used as the template context and forwarded to the transport.
        template: Optional message template (``DEFAULT_TEMPLATE`` is used
            when none is given).

    Returns:
        The result dict produced by the transport helper, or
        ``{"success": False, "error": ...}`` for an unknown target type.
    """
    template_body = template.body if template else DEFAULT_TEMPLATE

    message = None
    try:
        message = render_template(template_body, event_data)
    except jinja2.TemplateError as err:
        _LOGGER.error("Template rendering failed: %s", err)
    except Exception:
        # Jinja2 does not wrap ordinary Python errors raised while evaluating
        # template expressions (e.g. ZeroDivisionError, TypeError), so they
        # must be caught here for the fallback to apply to them as well.
        _LOGGER.exception("Template rendering raised an unexpected error")

    if message is None:
        message = f"Album changed: {event_data.get('album_name', 'unknown')}"

    if target.type == "telegram":
        return await _send_telegram(target, message, event_data)
    if target.type == "webhook":
        return await _send_webhook(target, message, event_data)
    return {"success": False, "error": f"Unknown target type: {target.type}"}
async def send_test_notification(target: NotificationTarget) -> dict[str, Any]:
    """Send a fixed test message to ``target`` to check its configuration.

    No template is rendered; the message text is constant and is accompanied
    by a small synthetic event (one added item, no people, no assets).

    Returns:
        The transport helper's result dict, or
        ``{"success": False, "error": ...}`` for an unknown target type.
    """
    kind = target.type
    if kind == "telegram":
        sender = _send_telegram
    elif kind == "webhook":
        sender = _send_webhook
    else:
        return {"success": False, "error": f"Unknown target type: {target.type}"}

    sample_event = {
        "album_name": "Test Album",
        "added_count": 1,
        "removed_count": 0,
        "change_type": "assets_added",
        "people": [],
        "added_assets": [],
    }
    return await sender(target, "Test notification from Immich Watcher", sample_event)
async def _send_telegram(
    target: NotificationTarget, message: str, event_data: dict[str, Any]
) -> dict[str, Any]:
    """Send notification via Telegram.

    Args:
        target: Target whose ``config`` supplies ``bot_token`` and ``chat_id``.
        message: Text sent as the caption.
        event_data: Event data; entries of ``added_assets`` that carry a
            ``download_url`` or ``url`` are attached as media.

    Returns:
        ``{"success": False, "error": ...}`` when the token or chat id is
        missing; otherwise whatever ``TelegramClient.send_notification``
        returns.
    """
    config = target.config
    bot_token = config.get("bot_token")
    chat_id = config.get("chat_id")
    if not bot_token or not chat_id:
        return {"success": False, "error": "Missing bot_token or chat_id in target config"}

    # Build the media list before opening the HTTP session. ``or []`` also
    # covers an explicit ``"added_assets": None``, which the ``.get`` default
    # alone would not and which would otherwise raise TypeError here.
    assets = []
    for asset in event_data.get("added_assets") or []:
        url = asset.get("download_url") or asset.get("url")
        if not url:
            continue  # nothing to attach for this entry
        # Only "VIDEO" is sent as video; every other type is sent as a photo.
        asset_type = "video" if asset.get("type") == "VIDEO" else "photo"
        assets.append({"url": url, "type": asset_type})

    async with aiohttp.ClientSession() as session:
        client = TelegramClient(session, bot_token)
        return await client.send_notification(
            chat_id=str(chat_id),
            caption=message,
            assets=assets if assets else None,
        )
async def _send_webhook(
    target: NotificationTarget, message: str, event_data: dict[str, Any]
) -> dict[str, Any]:
    """Deliver the notification to the webhook configured on ``target``.

    The request payload is ``{"message": message, "event": event_data}``.
    ``target.config`` supplies ``url`` (required) and ``headers`` (defaults
    to an empty dict when the key is absent).

    Returns:
        ``{"success": False, "error": ...}`` when no URL is configured;
        otherwise whatever ``WebhookClient.send`` returns.
    """
    settings = target.config
    endpoint = settings.get("url")
    extra_headers = settings.get("headers", {})
    if not endpoint:
        return {"success": False, "error": "Missing url in target config"}

    body = {"message": message, "event": event_data}
    async with aiohttp.ClientSession() as http:
        return await WebhookClient(http, endpoint, extra_headers).send(body)