Phase 10: Telegram bot commands + Phase 11: Snackbar notifications
All checks were successful
Validate / Hassfest (push) Successful in 3s

Phase 10 — Telegram Bot Commands:
- Add commands_config JSON field to TelegramBot model (enabled cmds,
  default count, response mode, rate limits, locale)
- Create command handler with 14 commands: /status, /albums, /events,
  /summary, /latest, /memory, /random, /search, /find, /person,
  /place, /favorites, /people, /help
- Add search_smart, search_metadata, search_by_person, get_random,
  download_asset, get_asset_thumbnail to ImmichClient
- Auto-register commands with Telegram setMyCommands API (EN+RU)
- Rate limiting per chat per command category
- Media mode: download thumbnails and send as photos to Telegram
- Webhook handler routes /commands before falling through to AI chat
- Frontend: expandable Commands section per bot with checkboxes,
  count/mode/locale settings, rate limit inputs, sync button

Phase 11 — Snackbar Notifications:
- Create snackbar store (snackbar.svelte.ts) with $state rune
- Create Snackbar component with fly/fade transitions, typed colors
- Mount globally in +layout.svelte
- Replace all alert() calls with typed snackbar notifications
- Add success snacks to all CRUD operations across all pages
- 4 types: success (3s), error (5s), info (3s), warning (4s)
- Max 3 visible, auto-dismiss, manual dismiss via X button

Both: Add ~30 i18n keys (EN+RU) for commands UI and snack messages.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 21:39:05 +03:00
parent ffce3ee337
commit e6ff0a423a
20 changed files with 1384 additions and 70 deletions

View File

@@ -0,0 +1,637 @@
"""Telegram bot command handler — implements all /commands."""
from __future__ import annotations
import logging
import time
from datetime import datetime, timezone
from typing import Any
import aiohttp
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from immich_watcher_core.immich_client import ImmichClient
from immich_watcher_core.telegram.media import TELEGRAM_API_BASE_URL
from ..database.models import (
AlbumTracker,
EventLog,
ImmichServer,
NotificationTarget,
TelegramBot,
)
_LOGGER = logging.getLogger(__name__)
# Command descriptions for Telegram menu (EN / RU)
COMMAND_DESCRIPTIONS: dict[str, dict[str, str]] = {
"status": {"en": "Show tracker status", "ru": "Показать статус трекеров"},
"albums": {"en": "List tracked albums", "ru": "Список отслеживаемых альбомов"},
"events": {"en": "Show recent events", "ru": "Показать последние события"},
"summary": {"en": "Send album summary now", "ru": "Отправить сводку альбомов"},
"latest": {"en": "Show latest photos", "ru": "Показать последние фото"},
"memory": {"en": "On This Day memories", "ru": "Воспоминания за этот день"},
"random": {"en": "Send random photo", "ru": "Отправить случайное фото"},
"search": {"en": "Smart search (AI)", "ru": "Умный поиск (AI)"},
"find": {"en": "Search by filename", "ru": "Поиск по имени файла"},
"person": {"en": "Find photos of person", "ru": "Найти фото человека"},
"place": {"en": "Find photos by location", "ru": "Найти фото по месту"},
"favorites": {"en": "Show favorites", "ru": "Показать избранное"},
"people": {"en": "List detected people", "ru": "Список людей"},
"help": {"en": "Show available commands", "ru": "Показать доступные команды"},
}
# Rate limit state: { (bot_id, chat_id, command_category): last_used_timestamp }
_rate_limits: dict[tuple[int, str, str], float] = {}
# Map commands to rate limit categories
_RATE_CATEGORY: dict[str, str] = {
"search": "search", "find": "search", "person": "search",
"place": "search", "favorites": "search", "people": "search",
}
def _get_rate_category(cmd: str) -> str:
return _RATE_CATEGORY.get(cmd, "default")
def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int]) -> int | None:
"""Check rate limit. Returns seconds to wait, or None if OK."""
category = _get_rate_category(cmd)
cooldown = limits.get(category, limits.get("default", 10))
if cooldown <= 0:
return None
key = (bot_id, chat_id, category)
now = time.time()
last = _rate_limits.get(key, 0)
if now - last < cooldown:
return int(cooldown - (now - last)) + 1
_rate_limits[key] = now
return None
def parse_command(text: str) -> tuple[str, str, int | None]:
"""Parse a command message into (command, args, count).
Examples:
"/search sunset" -> ("search", "sunset", None)
"/latest Family 5" -> ("latest", "Family", 5)
"/events 10" -> ("events", "", 10)
"""
text = text.strip()
if not text.startswith("/"):
return ("", text, None)
# Strip @botname suffix: /command@botname args
parts = text[1:].split(None, 1)
cmd = parts[0].split("@")[0].lower()
rest = parts[1] if len(parts) > 1 else ""
# Try to extract trailing count
count = None
rest_parts = rest.rsplit(None, 1)
if len(rest_parts) == 2:
try:
count = int(rest_parts[1])
rest = rest_parts[0]
except ValueError:
pass
elif rest_parts and rest_parts[0]:
try:
count = int(rest_parts[0])
rest = ""
except ValueError:
pass
return (cmd, rest.strip(), count)
async def handle_command(
bot: TelegramBot,
chat_id: str,
text: str,
session: AsyncSession,
) -> str | list[dict[str, Any]] | None:
"""Handle a bot command. Returns text response or media list, or None if not a command."""
cmd, args, count_override = parse_command(text)
if not cmd:
return None
config = bot.commands_config or {}
enabled = config.get("enabled", [])
default_count = min(config.get("default_count", 5), 20)
locale = config.get("locale", "en")
rate_limits = config.get("rate_limits", {})
if cmd == "start":
msgs = {
"en": "Hi! I'm your Immich Watcher bot. Use /help to see available commands.",
"ru": "Привет! Я бот Immich Watcher. Используйте /help для списка команд.",
}
return msgs.get(locale, msgs["en"])
if cmd not in enabled and cmd != "start":
return None # Silently ignore disabled commands
# Rate limit check
wait = _check_rate_limit(bot.id, chat_id, cmd, rate_limits)
if wait is not None:
msgs = {
"en": f"Please wait {wait}s before using this command again.",
"ru": f"Подождите {wait} сек. перед повторным использованием.",
}
return msgs.get(locale, msgs["en"])
count = min(count_override or default_count, 20)
# Dispatch
if cmd == "help":
return _cmd_help(enabled, locale)
if cmd == "status":
return await _cmd_status(bot, session, locale)
if cmd == "albums":
return await _cmd_albums(bot, session, locale)
if cmd == "events":
return await _cmd_events(bot, session, count, locale)
if cmd == "people":
return await _cmd_people(bot, session, locale)
if cmd in ("search", "find", "person", "place", "latest", "random", "favorites", "summary", "memory"):
return await _cmd_immich(bot, cmd, args, count, session, locale)
return None
def _cmd_help(enabled: list[str], locale: str) -> str:
"""Generate /help response from enabled commands."""
lines = []
for cmd in enabled:
desc = COMMAND_DESCRIPTIONS.get(cmd, {})
lines.append(f"/{cmd}{desc.get(locale, desc.get('en', ''))}")
header = {"en": "Available commands:", "ru": "Доступные команды:"}
return header.get(locale, header["en"]) + "\n" + "\n".join(lines)
async def _cmd_status(bot: TelegramBot, session: AsyncSession, locale: str) -> str:
"""Show tracker status."""
# Find trackers via targets linked to this bot
trackers, _ = await _get_bot_trackers(bot, session)
active = sum(1 for t in trackers if t.enabled)
total = len(trackers)
total_albums = sum(len(t.album_ids) for t in trackers)
result = await session.exec(
select(EventLog).order_by(EventLog.created_at.desc()).limit(1)
)
last_event = result.first()
last_str = last_event.created_at.strftime("%Y-%m-%d %H:%M") if last_event else "-"
if locale == "ru":
return (
f"📊 Статус\n"
f"Трекеры: {active}/{total} активных\n"
f"Альбомы: {total_albums}\n"
f"Последнее событие: {last_str}"
)
return (
f"📊 Status\n"
f"Trackers: {active}/{total} active\n"
f"Albums: {total_albums}\n"
f"Last event: {last_str}"
)
async def _cmd_albums(bot: TelegramBot, session: AsyncSession, locale: str) -> str:
"""List tracked albums with asset counts."""
trackers, servers_map = await _get_bot_trackers(bot, session)
if not trackers:
return "No tracked albums." if locale == "en" else "Нет отслеживаемых альбомов."
lines = []
async with aiohttp.ClientSession() as http:
for tracker in trackers:
server = servers_map.get(tracker.server_id)
if not server:
continue
client = ImmichClient(http, server.url, server.api_key)
for album_id in tracker.album_ids:
try:
album = await client.get_album(album_id)
if album:
lines.append(f"{album.name} ({album.asset_count} assets)")
except Exception:
lines.append(f"{album_id[:8]}... (error)")
header = "📚 Tracked albums:" if locale == "en" else "📚 Отслеживаемые альбомы:"
return header + "\n" + "\n".join(lines) if lines else header + "\n (none)"
async def _cmd_events(bot: TelegramBot, session: AsyncSession, count: int, locale: str) -> str:
"""Show recent events."""
trackers, _ = await _get_bot_trackers(bot, session)
tracker_ids = [t.id for t in trackers]
if not tracker_ids:
return "No events." if locale == "en" else "Нет событий."
result = await session.exec(
select(EventLog)
.where(EventLog.tracker_id.in_(tracker_ids))
.order_by(EventLog.created_at.desc())
.limit(count)
)
events = result.all()
if not events:
return "No events yet." if locale == "en" else "Пока нет событий."
header = f"📋 Last {len(events)} events:" if locale == "en" else f"📋 Последние {len(events)} событий:"
lines = []
for e in events:
ts = e.created_at.strftime("%m/%d %H:%M")
lines.append(f" {ts}{e.event_type}: {e.album_name}")
return header + "\n" + "\n".join(lines)
async def _cmd_people(bot: TelegramBot, session: AsyncSession, locale: str) -> str:
"""List people detected across tracked albums."""
_, servers_map = await _get_bot_trackers(bot, session)
all_people: dict[str, str] = {}
async with aiohttp.ClientSession() as http:
for server in servers_map.values():
client = ImmichClient(http, server.url, server.api_key)
people = await client.get_people()
all_people.update(people)
if not all_people:
return "No people detected." if locale == "en" else "Люди не обнаружены."
names = sorted(all_people.values())
header = f"👥 {len(names)} people:" if locale == "en" else f"👥 {len(names)} людей:"
return header + "\n" + ", ".join(names)
async def _cmd_immich(
bot: TelegramBot,
cmd: str,
args: str,
count: int,
session: AsyncSession,
locale: str,
) -> str | list[dict[str, Any]]:
"""Handle commands that need Immich API access and may return media."""
trackers, servers_map = await _get_bot_trackers(bot, session)
if not trackers:
return "No trackers configured." if locale == "en" else "Трекеры не настроены."
# Collect all tracked album IDs
all_album_ids: list[str] = []
for t in trackers:
all_album_ids.extend(t.album_ids)
# Pick the first server (most commands need one)
first_tracker = trackers[0]
server = servers_map.get(first_tracker.server_id)
if not server:
return "Server not found." if locale == "en" else "Сервер не найден."
config = bot.commands_config or {}
response_mode = config.get("response_mode", "media")
async with aiohttp.ClientSession() as http:
client = ImmichClient(http, server.url, server.api_key)
await client.get_server_config()
if cmd == "search":
if not args:
return "Usage: /search <query>" if locale == "en" else "Использование: /search <запрос>"
assets = await client.search_smart(args, album_ids=all_album_ids, limit=count)
return _format_assets(assets, cmd, args, locale, response_mode, client, bot.token)
if cmd == "find":
if not args:
return "Usage: /find <text>" if locale == "en" else "Использование: /find <текст>"
assets = await client.search_metadata(args, album_ids=all_album_ids, limit=count)
return _format_assets(assets, cmd, args, locale, response_mode, client, bot.token)
if cmd == "person":
if not args:
return "Usage: /person <name>" if locale == "en" else "Использование: /person <имя>"
people = await client.get_people()
# Find matching person by name (case-insensitive)
person_id = None
for pid, pname in people.items():
if args.lower() in pname.lower():
person_id = pid
break
if not person_id:
return f"Person '{args}' not found." if locale == "en" else f"Человек '{args}' не найден."
assets = await client.search_by_person(person_id, limit=count)
return _format_assets(assets, cmd, args, locale, response_mode, client, bot.token)
if cmd == "place":
if not args:
return "Usage: /place <location>" if locale == "en" else "Использование: /place <место>"
# Use smart search scoped to location context
assets = await client.search_smart(
f"photos taken in {args}", album_ids=all_album_ids, limit=count
)
return _format_assets(assets, cmd, args, locale, response_mode, client, bot.token)
if cmd == "favorites":
# Get assets from tracked albums and filter favorites
fav_assets: list[dict[str, Any]] = []
for album_id in all_album_ids[:10]:
try:
album = await client.get_album(album_id)
if album:
for asset in album.assets[:50]:
if asset.is_favorite and len(fav_assets) < count:
fav_assets.append({
"id": asset.id,
"originalFileName": asset.filename,
"type": asset.type,
})
except Exception:
pass
if len(fav_assets) >= count:
break
return _format_assets(fav_assets, cmd, "", locale, response_mode, client, bot.token)
if cmd == "latest":
# Get latest assets from tracked albums
latest_assets: list[dict[str, Any]] = []
for album_id in all_album_ids[:10]:
try:
album = await client.get_album(album_id)
if album and album.assets:
for asset in album.assets[:count]:
latest_assets.append({
"id": asset.id,
"originalFileName": asset.filename,
"type": asset.type,
"createdAt": asset.created_at,
})
except Exception:
pass
# Sort by date descending, take top N
latest_assets.sort(key=lambda a: a.get("createdAt", ""), reverse=True)
latest_assets = latest_assets[:count]
return _format_assets(latest_assets, cmd, "", locale, response_mode, client, bot.token)
if cmd == "random":
# Get random assets scoped to tracked albums
random_assets: list[dict[str, Any]] = []
import random as rng
for album_id in all_album_ids[:10]:
try:
album = await client.get_album(album_id)
if album and album.assets:
sampled = rng.sample(album.assets, min(count, len(album.assets)))
for asset in sampled:
random_assets.append({
"id": asset.id,
"originalFileName": asset.filename,
"type": asset.type,
})
except Exception:
pass
rng.shuffle(random_assets)
random_assets = random_assets[:count]
return _format_assets(random_assets, cmd, "", locale, response_mode, client, bot.token)
if cmd == "summary":
lines = []
for album_id in all_album_ids:
try:
album = await client.get_album(album_id)
if album:
lines.append(f"{album.name}: {album.asset_count} assets")
except Exception:
pass
header = f"📋 Album summary ({len(lines)}):" if locale == "en" else f"📋 Сводка альбомов ({len(lines)}):"
return header + "\n" + "\n".join(lines) if lines else header
if cmd == "memory":
today = datetime.now(timezone.utc)
month_day = (today.month, today.day)
memory_assets: list[dict[str, Any]] = []
for album_id in all_album_ids[:10]:
try:
album = await client.get_album(album_id)
if album:
for asset in album.assets:
try:
dt = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
if (dt.month, dt.day) == month_day and dt.year != today.year:
memory_assets.append({
"id": asset.id,
"originalFileName": asset.filename,
"type": asset.type,
"createdAt": asset.created_at,
"year": dt.year,
})
except (ValueError, AttributeError):
pass
except Exception:
pass
memory_assets = memory_assets[:count]
if not memory_assets:
return "No memories for today." if locale == "en" else "Нет воспоминаний за сегодня."
return _format_assets(memory_assets, cmd, "", locale, response_mode, client, bot.token)
return "Unknown command." if locale == "en" else "Неизвестная команда."
def _format_assets(
assets: list[dict[str, Any]],
cmd: str,
query: str,
locale: str,
response_mode: str,
client: ImmichClient,
bot_token: str,
) -> str | list[dict[str, Any]]:
"""Format asset results as text or media payload."""
if not assets:
msgs = {"en": "No results found.", "ru": "Ничего не найдено."}
return msgs.get(locale, msgs["en"])
if response_mode == "media":
# Return media list for the webhook handler to send as photos
media_items = []
for asset in assets:
asset_id = asset.get("id", "")
filename = asset.get("originalFileName", "")
year = asset.get("year", "")
caption = filename
if year:
caption = f"{filename} ({year})"
media_items.append({
"type": "photo",
"asset_id": asset_id,
"caption": caption,
"thumbnail_url": f"{client.url}/api/assets/{asset_id}/thumbnail?size=preview",
"api_key": client.api_key,
})
return media_items
# Text mode
header_map = {
"search": {"en": f"🔍 Results for \"{query}\":", "ru": f"🔍 Результаты для \"{query}\":"},
"find": {"en": f"📄 Files matching \"{query}\":", "ru": f"📄 Файлы по запросу \"{query}\":"},
"person": {"en": f"👤 Photos of {query}:", "ru": f"👤 Фото {query}:"},
"place": {"en": f"📍 Photos from {query}:", "ru": f"📍 Фото из {query}:"},
"favorites": {"en": "⭐ Favorites:", "ru": "⭐ Избранное:"},
"latest": {"en": "📸 Latest:", "ru": "📸 Последние:"},
"random": {"en": "🎲 Random:", "ru": "🎲 Случайные:"},
"memory": {"en": "📅 On this day:", "ru": "📅 В этот день:"},
}
header = header_map.get(cmd, {}).get(locale, f"Results ({len(assets)}):")
lines = []
for a in assets:
name = a.get("originalFileName", a.get("id", "?")[:8])
year = a.get("year", "")
if year:
lines.append(f"{name} ({year})")
else:
lines.append(f"{name}")
return header + "\n" + "\n".join(lines)
async def _get_bot_trackers(
bot: TelegramBot, session: AsyncSession
) -> tuple[list[AlbumTracker], dict[int, ImmichServer]]:
"""Get trackers and servers associated with a bot via its targets."""
# Find targets that use this bot's token
result = await session.exec(
select(NotificationTarget).where(NotificationTarget.type == "telegram")
)
targets = result.all()
bot_target_ids = set()
for target in targets:
if target.config.get("bot_token") == bot.token:
bot_target_ids.add(target.id)
if not bot_target_ids:
return [], {}
# Find trackers that include any of these target IDs
result = await session.exec(select(AlbumTracker))
all_trackers = result.all()
trackers = []
server_ids = set()
for tracker in all_trackers:
if any(tid in bot_target_ids for tid in (tracker.target_ids or [])):
trackers.append(tracker)
server_ids.add(tracker.server_id)
# Load servers
servers_map: dict[int, ImmichServer] = {}
for sid in server_ids:
server = await session.get(ImmichServer, sid)
if server:
servers_map[sid] = server
return trackers, servers_map
async def send_media_group(
bot_token: str,
chat_id: str,
media_items: list[dict[str, Any]],
) -> None:
"""Send media items as photos to a Telegram chat."""
async with aiohttp.ClientSession() as http:
for item in media_items:
asset_id = item.get("asset_id", "")
caption = item.get("caption", "")
thumb_url = item.get("thumbnail_url", "")
api_key = item.get("api_key", "")
# Download thumbnail from Immich
try:
async with http.get(
thumb_url,
headers={"x-api-key": api_key},
) as resp:
if resp.status != 200:
_LOGGER.warning("Failed to download thumbnail for %s", asset_id)
continue
photo_bytes = await resp.read()
except aiohttp.ClientError:
continue
# Send to Telegram
url = f"{TELEGRAM_API_BASE_URL}{bot_token}/sendPhoto"
data = aiohttp.FormData()
data.add_field("chat_id", chat_id)
data.add_field("photo", photo_bytes, filename=f"{asset_id}.jpg", content_type="image/jpeg")
if caption:
data.add_field("caption", caption)
try:
async with http.post(url, data=data) as resp:
if resp.status != 200:
result = await resp.json()
_LOGGER.warning("Failed to send photo: %s", result.get("description"))
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to send photo: %s", err)
async def register_commands_with_telegram(bot: TelegramBot) -> bool:
"""Register enabled commands with Telegram BotFather API."""
config = bot.commands_config or {}
enabled = config.get("enabled", [])
locale = config.get("locale", "en")
commands = []
for cmd in enabled:
desc = COMMAND_DESCRIPTIONS.get(cmd, {})
commands.append({
"command": cmd,
"description": desc.get(locale, desc.get("en", cmd)),
})
async with aiohttp.ClientSession() as http:
# Set commands for the bot's locale
url = f"{TELEGRAM_API_BASE_URL}{bot.token}/setMyCommands"
payload: dict[str, Any] = {"commands": commands}
try:
async with http.post(url, json=payload) as resp:
result = await resp.json()
if result.get("ok"):
_LOGGER.info("Registered %d commands for bot @%s", len(commands), bot.bot_username)
# Also register for the other locale
other_locale = "ru" if locale == "en" else "en"
other_commands = []
for cmd in enabled:
desc = COMMAND_DESCRIPTIONS.get(cmd, {})
other_commands.append({
"command": cmd,
"description": desc.get(other_locale, desc.get("en", cmd)),
})
other_payload: dict[str, Any] = {
"commands": other_commands,
"language_code": other_locale,
}
async with http.post(url, json=other_payload) as resp2:
r2 = await resp2.json()
if not r2.get("ok"):
_LOGGER.warning("Failed to register %s commands: %s", other_locale, r2.get("description"))
return True
_LOGGER.warning("Failed to register commands: %s", result.get("description"))
return False
except aiohttp.ClientError as err:
_LOGGER.error("Failed to register commands: %s", err)
return False

View File

@@ -1,4 +1,4 @@
"""Telegram webhook handler for AI bot interactions."""
"""Telegram webhook handler for AI bot interactions and commands."""
from __future__ import annotations
@@ -15,7 +15,8 @@ from immich_watcher_core.telegram.media import TELEGRAM_API_BASE_URL
from ..auth.dependencies import get_current_user
from ..config import settings
from ..database.engine import get_session
from ..database.models import AlbumTracker, EventLog, ImmichServer, NotificationTarget, User
from ..database.models import AlbumTracker, EventLog, ImmichServer, NotificationTarget, TelegramBot, User
from .commands import handle_command, send_media_group
from .service import chat, is_ai_enabled, summarize_albums
_LOGGER = logging.getLogger(__name__)
@@ -42,15 +43,16 @@ async def telegram_webhook(
if x_telegram_bot_api_secret_token != settings.telegram_webhook_secret:
raise HTTPException(status_code=403, detail="Invalid webhook secret")
# Validate bot_token against stored targets
result = await session.exec(select(NotificationTarget).where(NotificationTarget.type == "telegram"))
valid_token = False
for target in result.all():
if target.config.get("bot_token") == bot_token:
valid_token = True
break
if not valid_token:
raise HTTPException(status_code=403, detail="Unknown bot token")
# Find bot by token
bot_result = await session.exec(select(TelegramBot).where(TelegramBot.token == bot_token))
bot = bot_result.first()
if not bot:
# Fallback: check targets for legacy setups
result = await session.exec(select(NotificationTarget).where(NotificationTarget.type == "telegram"))
valid_token = any(t.config.get("bot_token") == bot_token for t in result.all())
if not valid_token:
raise HTTPException(status_code=403, detail="Unknown bot token")
try:
update = await request.json()
@@ -68,13 +70,22 @@ async def telegram_webhook(
if not chat_id or not text:
return {"ok": True, "skipped": "empty"}
if text.startswith("/start"):
await _send_reply(
bot_token, chat_id,
"Hi! I'm your Immich Watcher AI assistant. Ask me about your photo albums, "
"recent changes, or say 'summary' to get an overview."
)
return {"ok": True}
# Try bot commands first (if bot is registered)
if bot and text.startswith("/"):
cmd_response = await handle_command(bot, chat_id, text, session)
if cmd_response is not None:
if isinstance(cmd_response, list):
# Media response — send photos
await send_media_group(bot_token, chat_id, cmd_response)
else:
await _send_reply(bot_token, chat_id, cmd_response)
return {"ok": True}
# Fall through to AI chat if enabled
if not is_ai_enabled():
if text.startswith("/"):
return {"ok": True, "skipped": "command_not_handled"}
return {"ok": True, "skipped": "ai_disabled"}
# Build context from database
context = await _build_context(session, chat_id)

View File

@@ -9,6 +9,7 @@ import aiohttp
from immich_watcher_core.telegram.media import TELEGRAM_API_BASE_URL
from ..ai.commands import register_commands_with_telegram
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TelegramBot, User
@@ -23,6 +24,7 @@ class BotCreate(BaseModel):
class BotUpdate(BaseModel):
name: str | None = None
commands_config: dict | None = None
@router.get("")
@@ -69,10 +71,12 @@ async def update_bot(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Update a bot's display name."""
"""Update a bot's display name and/or commands config."""
bot = await _get_user_bot(session, bot_id, user.id)
if body.name is not None:
bot.name = body.name
if body.commands_config is not None:
bot.commands_config = body.commands_config
session.add(bot)
await session.commit()
await session.refresh(bot)
@@ -121,6 +125,20 @@ async def list_bot_chats(
return chats
@router.post("/{bot_id}/sync-commands")
async def sync_commands(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Register bot commands with Telegram BotFather API."""
bot = await _get_user_bot(session, bot_id, user.id)
success = await register_commands_with_telegram(bot)
if not success:
raise HTTPException(status_code=500, detail="Failed to register commands with Telegram")
return {"success": True}
# --- Helpers ---
async def _get_me(token: str) -> dict | None:
@@ -171,6 +189,7 @@ def _bot_response(b: TelegramBot) -> dict:
"bot_username": b.bot_username,
"bot_id": b.bot_id,
"token_preview": f"{b.token[:8]}...{b.token[-4:]}" if len(b.token) > 12 else "***",
"commands_config": b.commands_config,
"created_at": b.created_at.isoformat(),
}

View File

@@ -49,6 +49,20 @@ class TelegramBot(SQLModel, table=True):
icon: str = Field(default="") # MDI icon name
bot_username: str = Field(default="") # @username from getMe
bot_id: int = Field(default=0) # Numeric bot ID from getMe
commands_config: dict[str, Any] = Field(
default_factory=lambda: {
"enabled": [
"status", "albums", "events", "summary", "latest",
"memory", "random", "search", "find", "person",
"place", "favorites", "people", "help",
],
"default_count": 5,
"response_mode": "media",
"rate_limits": {"search": 30, "find": 30, "default": 10},
"locale": "en",
},
sa_column=Column(JSON),
)
created_at: datetime = Field(default_factory=_utcnow)