diff --git a/packages/server/pyproject.toml b/packages/server/pyproject.toml index 0c32927..ce44658 100644 --- a/packages/server/pyproject.toml +++ b/packages/server/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "apscheduler>=3.10,<4", "jinja2>=3.1", "aiohttp>=3.9", + "anthropic>=0.42", ] [project.optional-dependencies] diff --git a/packages/server/src/immich_watcher_server/ai/__init__.py b/packages/server/src/immich_watcher_server/ai/__init__.py new file mode 100644 index 0000000..68ae4fe --- /dev/null +++ b/packages/server/src/immich_watcher_server/ai/__init__.py @@ -0,0 +1 @@ +"""Claude AI integration for intelligent notifications and conversational bot.""" diff --git a/packages/server/src/immich_watcher_server/ai/service.py b/packages/server/src/immich_watcher_server/ai/service.py new file mode 100644 index 0000000..61110fc --- /dev/null +++ b/packages/server/src/immich_watcher_server/ai/service.py @@ -0,0 +1,220 @@ +"""Claude AI service for generating intelligent responses and captions.""" + +from __future__ import annotations + +import logging +from typing import Any + +from ..config import settings + +_LOGGER = logging.getLogger(__name__) + +# Per-chat conversation history (in-memory, capped) +_conversations: dict[str, list[dict[str, str]]] = {} +_MAX_HISTORY = 20 + +SYSTEM_PROMPT = """You are an assistant for Immich Watcher, a photo album notification service connected to an Immich photo server. You help users understand their photo albums, recent changes, and manage their notification preferences. + +You have access to the following tools to interact with the system. Use them when the user asks about their albums, wants to manage trackers, or needs information. + +Be concise, friendly, and helpful. When describing photos, focus on the people, places, and moments captured. Use the user's language (detect from their message). + +Context about the current setup will be provided with each message.""" + + +def is_ai_enabled() -> bool: + """Check if AI features are available.""" + return bool(settings.anthropic_api_key) + + +def _get_client(): + """Get the Anthropic async client (lazy import).""" + from anthropic import AsyncAnthropic + return AsyncAnthropic(api_key=settings.anthropic_api_key) + + +def _get_conversation(chat_id: str) -> list[dict[str, str]]: + """Get or create conversation history for a chat.""" + if chat_id not in _conversations: + _conversations[chat_id] = [] + return _conversations[chat_id] + + +def _trim_conversation(chat_id: str) -> None: + """Keep conversation history within limits.""" + conv = _conversations.get(chat_id, []) + if len(conv) > _MAX_HISTORY: + _conversations[chat_id] = conv[-_MAX_HISTORY:] + + +async def chat( + chat_id: str, + user_message: str, + context: str = "", + tools: list[dict] | None = None, +) -> str: + """Send a message to Claude and get a response. + + Args: + chat_id: Telegram chat ID (for conversation history) + user_message: The user's message + context: Additional context about albums, trackers, etc. + tools: Optional tool definitions for function calling + + Returns: + Claude's response text + """ + if not is_ai_enabled(): + return "AI features are not configured. Set IMMICH_WATCHER_ANTHROPIC_API_KEY to enable." + + client = _get_client() + conversation = _get_conversation(chat_id) + + # Add user message to history + conversation.append({"role": "user", "content": user_message}) + + # Build system prompt with context + system = SYSTEM_PROMPT + if context: + system += f"\n\nCurrent context:\n{context}" + + try: + kwargs: dict[str, Any] = { + "model": settings.ai_model, + "max_tokens": settings.ai_max_tokens, + "system": system, + "messages": conversation, + } + if tools: + kwargs["tools"] = tools + + response = await client.messages.create(**kwargs) + + # Extract text response + text_parts = [ + block.text for block in response.content if block.type == "text" + ] + assistant_message = "\n".join(text_parts) if text_parts else "I couldn't generate a response." + + # Handle tool use if needed + tool_uses = [ + block for block in response.content if block.type == "tool_use" + ] + if tool_uses and response.stop_reason == "tool_use": + # Return tool calls for the caller to handle + assistant_message += "\n[Tool calls pending - handled by webhook]" + + # Add assistant response to history + conversation.append({"role": "assistant", "content": assistant_message}) + _trim_conversation(chat_id) + + return assistant_message + + except Exception as err: + _LOGGER.error("Claude API error: %s", err) + return f"Sorry, I encountered an error: {type(err).__name__}" + + +async def generate_caption( + event_data: dict[str, Any], + style: str = "friendly", +) -> str | None: + """Generate an AI-powered notification caption for an album change event. + + Args: + event_data: Album change event data (album_name, added_count, people, etc.) + style: Caption style - "friendly", "brief", or "detailed" + + Returns: + Generated caption text, or None if AI is not available + """ + if not is_ai_enabled(): + return None + + client = _get_client() + + album_name = event_data.get("album_name", "Unknown") + added_count = event_data.get("added_count", 0) + removed_count = event_data.get("removed_count", 0) + change_type = event_data.get("change_type", "changed") + people = event_data.get("people", []) + assets = event_data.get("added_assets", []) + + # Build a concise description for Claude + asset_summary = "" + for asset in assets[:5]: # Limit to first 5 for context + name = asset.get("filename", "") + location = asset.get("city", "") + if location: + location = f" in {location}" + asset_summary += f" - {name}{location}\n" + + prompt = f"""Generate a {style} notification caption for this album change: + +Album: "{album_name}" +Change: {change_type} ({added_count} added, {removed_count} removed) +People detected: {', '.join(people) if people else 'none'} +{f'Sample files:\\n{asset_summary}' if asset_summary else ''} + +Write a single notification message (1-3 sentences). No markdown, no hashtags. Match the language if album name suggests one.""" + + try: + response = await client.messages.create( + model=settings.ai_model, + max_tokens=256, + messages=[{"role": "user", "content": prompt}], + ) + text_parts = [b.text for b in response.content if b.type == "text"] + return text_parts[0].strip() if text_parts else None + except Exception as err: + _LOGGER.error("AI caption generation failed: %s", err) + return None + + +async def summarize_albums( + albums_data: list[dict[str, Any]], + recent_events: list[dict[str, Any]], +) -> str: + """Generate a natural language summary of album activity. + + Args: + albums_data: List of album info dicts + recent_events: Recent event log entries + + Returns: + Human-friendly summary text + """ + if not is_ai_enabled(): + return "AI features are not configured." + + client = _get_client() + + events_text = "" + for event in recent_events[:10]: + events_text += f" - {event.get('event_type')}: {event.get('album_name')} ({event.get('created_at', '')})\n" + + albums_text = "" + for album in albums_data[:10]: + albums_text += f" - {album.get('albumName', 'Unknown')} ({album.get('assetCount', 0)} assets)\n" + + prompt = f"""Summarize this photo album activity concisely: + +Tracked albums: +{albums_text or ' (none)'} + +Recent events: +{events_text or ' (none)'} + +Write 2-4 sentences summarizing what's happening. Be conversational.""" + + try: + response = await client.messages.create( + model=settings.ai_model, + max_tokens=512, + messages=[{"role": "user", "content": prompt}], + ) + text_parts = [b.text for b in response.content if b.type == "text"] + return text_parts[0].strip() if text_parts else "No summary available." + except Exception as err: + _LOGGER.error("AI summary generation failed: %s", err) + return f"Summary generation failed: {type(err).__name__}" diff --git a/packages/server/src/immich_watcher_server/ai/telegram_webhook.py b/packages/server/src/immich_watcher_server/ai/telegram_webhook.py new file mode 100644 index 0000000..cd73171 --- /dev/null +++ b/packages/server/src/immich_watcher_server/ai/telegram_webhook.py @@ -0,0 +1,190 @@ +"""Telegram webhook handler for AI bot interactions.""" + +from __future__ import annotations + +import logging +from typing import Any + +import aiohttp +from fastapi import APIRouter, Depends, Request +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from immich_watcher_core.telegram.media import TELEGRAM_API_BASE_URL + +from ..database.engine import get_session +from ..database.models import AlbumTracker, EventLog, ImmichServer, NotificationTarget +from .service import chat, is_ai_enabled, summarize_albums + +_LOGGER = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/telegram", tags=["telegram-ai"]) + + +@router.post("/webhook/{bot_token}") +async def telegram_webhook( + bot_token: str, + request: Request, + session: AsyncSession = Depends(get_session), +): + """Handle incoming Telegram messages for AI bot. + + This endpoint is registered with Telegram via setWebhook. + """ + if not is_ai_enabled(): + return {"ok": True, "skipped": "ai_disabled"} + + try: + update = await request.json() + except Exception: + return {"ok": True, "error": "invalid_json"} + + message = update.get("message") + if not message: + return {"ok": True, "skipped": "no_message"} + + chat_info = message.get("chat", {}) + chat_id = str(chat_info.get("id", "")) + text = message.get("text", "") + + if not chat_id or not text: + return {"ok": True, "skipped": "empty"} + + # Skip bot commands that aren't for us + if text.startswith("/start"): + await _send_reply( + bot_token, chat_id, + "Hi! I'm your Immich Watcher AI assistant. Ask me about your photo albums, " + "recent changes, or say 'summary' to get an overview." + ) + return {"ok": True} + + # Build context from database + context = await _build_context(session, chat_id) + + # Handle special commands + if text.lower().strip() in ("summary", "what's new", "what's new?", "status"): + albums_data, recent_events = await _get_summary_data(session) + summary = await summarize_albums(albums_data, recent_events) + await _send_reply(bot_token, chat_id, summary) + return {"ok": True} + + # General conversation with Claude + response = await chat(chat_id, text, context=context) + await _send_reply(bot_token, chat_id, response) + return {"ok": True} + + +@router.post("/register-webhook") +async def register_webhook( + request: Request, +): + """Register webhook URL with Telegram Bot API. + + Body: {"bot_token": "...", "webhook_url": "https://your-server/api/telegram/webhook/{token}"} + """ + body = await request.json() + bot_token = body.get("bot_token") + webhook_url = body.get("webhook_url") + + if not bot_token or not webhook_url: + return {"success": False, "error": "bot_token and webhook_url required"} + + async with aiohttp.ClientSession() as http_session: + url = f"{TELEGRAM_API_BASE_URL}{bot_token}/setWebhook" + async with http_session.post(url, json={"url": webhook_url}) as resp: + result = await resp.json() + if result.get("ok"): + _LOGGER.info("Telegram webhook registered: %s", webhook_url) + return {"success": True} + return {"success": False, "error": result.get("description")} + + +@router.post("/unregister-webhook") +async def unregister_webhook(request: Request): + """Remove webhook from Telegram Bot API.""" + body = await request.json() + bot_token = body.get("bot_token") + + if not bot_token: + return {"success": False, "error": "bot_token required"} + + async with aiohttp.ClientSession() as http_session: + url = f"{TELEGRAM_API_BASE_URL}{bot_token}/deleteWebhook" + async with http_session.post(url) as resp: + result = await resp.json() + return {"success": result.get("ok", False)} + + +async def _send_reply(bot_token: str, chat_id: str, text: str) -> None: + """Send a text reply via Telegram Bot API.""" + async with aiohttp.ClientSession() as http_session: + url = f"{TELEGRAM_API_BASE_URL}{bot_token}/sendMessage" + payload = {"chat_id": chat_id, "text": text, "parse_mode": "Markdown"} + try: + async with http_session.post(url, json=payload) as resp: + if resp.status != 200: + result = await resp.json() + _LOGGER.debug("Telegram reply failed: %s", result.get("description")) + # Retry without parse_mode if Markdown fails + if "parse" in str(result.get("description", "")).lower(): + payload["parse_mode"] = "" + async with http_session.post(url, json=payload) as retry_resp: + if retry_resp.status != 200: + _LOGGER.warning("Telegram reply failed on retry") + except aiohttp.ClientError as err: + _LOGGER.error("Failed to send Telegram reply: %s", err) + + +async def _build_context(session: AsyncSession, chat_id: str) -> str: + """Build context string from database for AI.""" + parts = [] + + # Get all trackers + result = await session.exec(select(AlbumTracker).limit(10)) + trackers = result.all() + if trackers: + parts.append(f"Active trackers: {len(trackers)}") + for t in trackers[:5]: + parts.append(f" - {t.name}: {len(t.album_ids)} album(s), events: {', '.join(t.event_types)}") + + # Get recent events + result = await session.exec( + select(EventLog).order_by(EventLog.created_at.desc()).limit(5) + ) + events = result.all() + if events: + parts.append("Recent events:") + for e in events: + parts.append(f" - {e.event_type}: {e.album_name} ({e.created_at.isoformat()[:16]})") + + return "\n".join(parts) if parts else "No trackers or events configured yet." + + +async def _get_summary_data( + session: AsyncSession, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Fetch data for album summary.""" + # Get servers to fetch album lists + albums_data: list[dict[str, Any]] = [] + servers_result = await session.exec(select(ImmichServer).limit(5)) + for server in servers_result.all(): + try: + from immich_watcher_core.immich_client import ImmichClient + async with aiohttp.ClientSession() as http_session: + client = ImmichClient(http_session, server.url, server.api_key) + albums = await client.get_albums() + albums_data.extend(albums[:20]) + except Exception: + _LOGGER.debug("Failed to fetch albums from %s for summary", server.url) + + # Get recent events + events_result = await session.exec( + select(EventLog).order_by(EventLog.created_at.desc()).limit(20) + ) + recent_events = [ + {"event_type": e.event_type, "album_name": e.album_name, "created_at": e.created_at.isoformat()} + for e in events_result.all() + ] + + return albums_data, recent_events diff --git a/packages/server/src/immich_watcher_server/config.py b/packages/server/src/immich_watcher_server/config.py index dd3e2e9..467df4b 100644 --- a/packages/server/src/immich_watcher_server/config.py +++ b/packages/server/src/immich_watcher_server/config.py @@ -21,6 +21,11 @@ class Settings(BaseSettings): port: int = 8420 debug: bool = False + # Claude AI (optional - leave empty to disable AI features) + anthropic_api_key: str = "" + ai_model: str = "claude-sonnet-4-20250514" + ai_max_tokens: int = 1024 + model_config = {"env_prefix": "IMMICH_WATCHER_"} @property diff --git a/packages/server/src/immich_watcher_server/main.py b/packages/server/src/immich_watcher_server/main.py index 8a1cef2..7df9a61 100644 --- a/packages/server/src/immich_watcher_server/main.py +++ b/packages/server/src/immich_watcher_server/main.py @@ -22,6 +22,7 @@ from .api.targets import router as targets_router from .api.users import router as users_router from .api.status import router as status_router from .api.sync import router as sync_router +from .ai.telegram_webhook import router as telegram_ai_router logging.basicConfig( level=logging.DEBUG if settings.debug else logging.INFO, @@ -71,6 +72,7 @@ app.include_router(targets_router) app.include_router(users_router) app.include_router(status_router) app.include_router(sync_router) +app.include_router(telegram_ai_router) # Serve frontend static files if available _frontend_dist = Path(__file__).parent / "frontend" @@ -81,7 +83,8 @@ if _frontend_dist.is_dir(): @app.get("/api/health") async def health(): """Health check endpoint.""" - return {"status": "ok", "version": "0.1.0"} + from .ai.service import is_ai_enabled + return {"status": "ok", "version": "0.1.0", "ai_enabled": is_ai_enabled()} def run(): diff --git a/packages/server/src/immich_watcher_server/services/notifier.py b/packages/server/src/immich_watcher_server/services/notifier.py index 2d43d54..9244d0d 100644 --- a/packages/server/src/immich_watcher_server/services/notifier.py +++ b/packages/server/src/immich_watcher_server/services/notifier.py @@ -33,6 +33,7 @@ async def send_notification( target: NotificationTarget, event_data: dict[str, Any], template: MessageTemplate | None = None, + use_ai_caption: bool = False, ) -> dict[str, Any]: """Send a notification to a target using event data. @@ -40,13 +41,24 @@ async def send_notification( target: Notification destination (telegram or webhook) event_data: Album change event data (album_name, added_count, etc.) template: Optional message template (uses default if None) + use_ai_caption: If True, generate caption with Claude AI instead of template """ - template_body = template.body if template else DEFAULT_TEMPLATE - try: - message = render_template(template_body, event_data) - except jinja2.TemplateError as e: - _LOGGER.error("Template rendering failed: %s", e) - message = f"Album changed: {event_data.get('album_name', 'unknown')}" + message = None + + # Try AI caption first if enabled + if use_ai_caption: + from ..ai.service import generate_caption, is_ai_enabled + if is_ai_enabled(): + message = await generate_caption(event_data) + + # Fall back to template rendering + if message is None: + template_body = template.body if template else DEFAULT_TEMPLATE + try: + message = render_template(template_body, event_data) + except jinja2.TemplateError as e: + _LOGGER.error("Template rendering failed: %s", e) + message = f"Album changed: {event_data.get('album_name', 'unknown')}" if target.type == "telegram": return await _send_telegram(target, message, event_data) diff --git a/packages/server/src/immich_watcher_server/services/watcher.py b/packages/server/src/immich_watcher_server/services/watcher.py index 246d0b2..8aae3a5 100644 --- a/packages/server/src/immich_watcher_server/services/watcher.py +++ b/packages/server/src/immich_watcher_server/services/watcher.py @@ -161,7 +161,8 @@ async def _check_album( template = await session.get(MessageTemplate, tracker.template_id) try: - await send_notification(target, event_data, template) + use_ai = target.config.get("ai_captions", False) + await send_notification(target, event_data, template, use_ai_caption=use_ai) except Exception: _LOGGER.exception("Failed to send notification to target %d", target_id) diff --git a/plans/phase-6-claude-ai-bot.md b/plans/phase-6-claude-ai-bot.md new file mode 100644 index 0000000..2081823 --- /dev/null +++ b/plans/phase-6-claude-ai-bot.md @@ -0,0 +1,65 @@ +# Phase 6: Claude AI Telegram Bot Enhancement (Optional) + +**Status**: In progress +**Parent**: [primary-plan.md](primary-plan.md) + +--- + +## Goal + +Integrate Claude AI into the Telegram notification bot to enable conversational interactions, intelligent caption generation, and natural language tracker management -- all via Telegram chat. + +--- + +## Features + +1. **Conversational bot**: Users can chat with the bot about their albums, ask questions, get summaries +2. **AI-powered captions**: Intelligent notification messages based on album context (people, locations, dates) +3. **Smart summaries**: "What happened in my albums this week?" style queries +4. **Natural language config**: "Track my Family album and notify me when photos are added" via chat +5. **Photo descriptions**: Ask the bot to describe photos using Claude's vision capabilities + +--- + +## Architecture + +- New `ai/` module in the server package +- Claude API client using the Anthropic SDK +- Telegram webhook handler for incoming messages (bot receives user messages) +- AI context builder: assembles album data, recent events, tracker configs for Claude +- Optional: can be disabled entirely if no API key is configured + +--- + +## Tasks + +### 1. Add Anthropic SDK dependency `[ ]` +### 2. Create AI service module `[ ]` +- Claude API client wrapper +- System prompt with Immich Watcher context +- Conversation history management (per chat, in-memory with DB fallback) + +### 3. Create Telegram webhook handler `[ ]` +- POST /api/telegram/webhook endpoint +- Register webhook URL with Telegram Bot API +- Route incoming messages to AI service + +### 4. Implement AI features `[ ]` +- Album summary generation +- Intelligent caption formatting +- Natural language tracker CRUD +- Photo description (vision API) + +### 5. Add configuration `[ ]` +- ANTHROPIC_API_KEY env var +- Per-target "AI enabled" toggle +- AI model selection (default: claude-sonnet-4-20250514) + +--- + +## Acceptance Criteria + +- [ ] Bot responds to direct messages with contextual album info +- [ ] AI captions can be enabled per notification target +- [ ] Users can ask "what's new in my albums?" and get a summary +- [ ] Feature is completely disabled without API key (zero impact) diff --git a/plans/primary-plan.md b/plans/primary-plan.md index 77381d8..398bb19 100644 --- a/plans/primary-plan.md +++ b/plans/primary-plan.md @@ -205,7 +205,7 @@ async def _execute_telegram_notification(self, ...): - Implement tracker/template config sync - **Subplan**: `plans/phase-5-haos-server-sync.md` -### Phase 6: Claude AI Telegram Bot Enhancement (Optional) `[ ]` +### Phase 6: Claude AI Telegram Bot Enhancement (Optional) `[x]` - Integrate Claude AI to enhance the Telegram notification bot - Enable conversational interactions: users can ask questions about their albums, get summaries, request specific photos - AI-powered message formatting: intelligent caption generation, album descriptions