03ec9b3c86
Adds telegram bot command system with 13 commands (search, latest, random, etc.), webhook/polling handlers, rate limiting, app settings page, and various UI/UX improvements across all entity pages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
616 lines
26 KiB
Python
616 lines
26 KiB
Python
"""Telegram bot command handler — implements all /commands."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import random as rng
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
|
||
import aiohttp
|
||
from sqlmodel import select
|
||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||
|
||
from notify_bridge_core.notifications.telegram.media import TELEGRAM_API_BASE_URL
|
||
from ..database.engine import get_engine
|
||
from ..services import make_immich_provider
|
||
from ..database.models import (
|
||
EventLog,
|
||
NotificationTarget,
|
||
ServiceProvider,
|
||
TelegramBot,
|
||
Tracker,
|
||
TrackerTarget,
|
||
TrackingConfig,
|
||
)
|
||
from .parser import parse_command
|
||
from .registry import COMMAND_DESCRIPTIONS, get_rate_category
|
||
|
||
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Rate limit state: { (bot_id, chat_id, category): last_used_timestamp }
# This is a plain in-process dict: it starts empty whenever the module is
# imported, and entries are only ever added or overwritten, never removed.
_rate_limits: dict[tuple[int, str, str], float] = {}
|
||
|
||
|
||
def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int]) -> int | None:
    """Check the cooldown for a command and record the use if it is allowed.

    Cooldowns are tracked per (bot, chat, rate category), where the category
    comes from ``get_rate_category(cmd)``.

    Args:
        bot_id: Id of the bot handling the command.
        chat_id: Chat the command was sent from.
        cmd: Command name used to look up the rate category.
        limits: Cooldown in seconds per category. A missing category falls
            back to ``limits["default"]`` and then to 10 seconds. A cooldown
            of zero or less disables limiting for that category.

    Returns:
        Whole seconds the caller still has to wait (always >= 1), or None
        when the command may run. Only allowed calls update the stored time.
    """
    category = get_rate_category(cmd)
    cooldown = limits.get(category, limits.get("default", 10))
    if cooldown <= 0:
        return None

    key = (bot_id, chat_id, category)
    # Use the monotonic clock: the state never leaves this process, and
    # wall-clock adjustments (NTP step, manual change) must not stretch or
    # cancel a cooldown.
    now = time.monotonic()
    # None (not 0) marks "never used": the monotonic clock may itself be
    # close to zero shortly after boot.
    last = _rate_limits.get(key)
    if last is not None:
        elapsed = now - last
        if elapsed < cooldown:
            return int(cooldown - elapsed) + 1

    _rate_limits[key] = now
    return None
|
||
|
||
|
||
async def handle_command(
    bot: TelegramBot,
    chat_id: str,
    text: str,
) -> str | list[dict[str, Any]] | None:
    """Parse an incoming message and run the matching bot command.

    Args:
        bot: Bot whose ``commands_config`` supplies the enabled commands,
            locale, default result count and rate limits.
        chat_id: Chat the message came from (used for rate limiting).
        text: Raw message text, handed to ``parse_command``.

    Returns:
        A text reply, a list of media item dicts, or None when the message
        is not a command, the command is disabled, or nothing handles it.
    """
    max_count = 20
    fallback_count = 5

    cmd, args, count_override = parse_command(text)
    if not cmd:
        return None

    config = bot.commands_config or {}
    # "or" instead of .get defaults so that explicit null values in the
    # stored config behave like missing keys rather than raising below.
    enabled = config.get("enabled") or []
    locale = config.get("locale") or "en"
    rate_limits = config.get("rate_limits") or {}

    # /start is always answered, even when not listed as enabled.
    if cmd == "start":
        msgs = {
            "en": "Hi! I'm your Notify Bridge bot. Use /help to see available commands.",
            "ru": "Привет! Я бот Notify Bridge. Используйте /help для списка команд.",
        }
        return msgs.get(locale, msgs["en"])

    if cmd not in enabled:
        return None  # Silently ignore disabled commands

    # Rate limit check
    wait = _check_rate_limit(bot.id, chat_id, cmd, rate_limits)
    if wait is not None:
        msgs = {
            "en": f"Please wait {wait}s before using this command again.",
            "ru": f"Подождите {wait} сек. перед повторным использованием.",
        }
        return msgs.get(locale, msgs["en"])

    # Clamp to 1..max_count: the value is later used as a SQL LIMIT, a slice
    # bound and a random.sample size, none of which behave sensibly for
    # zero or negative numbers.
    requested = count_override or config.get("default_count") or fallback_count
    count = max(1, min(requested, max_count))

    # Dispatch
    if cmd == "help":
        return _cmd_help(enabled, locale)
    if cmd == "status":
        return await _cmd_status(bot, locale)
    if cmd == "albums":
        return await _cmd_albums(bot, locale)
    if cmd == "events":
        return await _cmd_events(bot, count, locale)
    if cmd == "people":
        return await _cmd_people(bot, locale)
    if cmd in ("search", "find", "person", "place", "latest", "random",
               "favorites", "summary", "memory"):
        return await _cmd_immich(bot, cmd, args, count, locale)

    return None
|
||
|
||
|
||
def _cmd_help(enabled: list[str], locale: str) -> str:
|
||
lines = []
|
||
for cmd in enabled:
|
||
desc = COMMAND_DESCRIPTIONS.get(cmd, {})
|
||
lines.append(f"/{cmd} — {desc.get(locale, desc.get('en', ''))}")
|
||
header = {"en": "Available commands:", "ru": "Доступные команды:"}
|
||
return header.get(locale, header["en"]) + "\n" + "\n".join(lines)
|
||
|
||
|
||
async def _get_bot_context(bot: TelegramBot) -> tuple[
    list[Tracker], dict[int, ServiceProvider]
]:
    """Get trackers and providers associated with a bot via its targets.

    The chain is: telegram targets of the bot's user whose config carries
    this bot's token -> TrackerTarget links -> trackers -> their providers.

    Args:
        bot: Bot whose ``user_id`` and ``token`` select the targets.

    Returns:
        ``(trackers, providers_map)``: trackers ordered by id, and providers
        keyed by provider id. Both are empty when nothing is linked.
    """
    engine = get_engine()
    async with AsyncSession(engine) as session:
        # Find targets that use this bot's token. The token sits inside the
        # target's config mapping, so it is matched here in Python.
        result = await session.exec(
            select(NotificationTarget).where(
                NotificationTarget.type == "telegram",
                NotificationTarget.user_id == bot.user_id,
            )
        )
        bot_target_ids = {
            target.id
            for target in result.all()
            if (target.config or {}).get("bot_token") == bot.token
        }
        if not bot_target_ids:
            return [], {}

        # Find trackers linked to these targets via TrackerTarget
        tt_result = await session.exec(
            select(TrackerTarget).where(TrackerTarget.target_id.in_(bot_target_ids))
        )
        tracker_ids = {link.tracker_id for link in tt_result.all()}
        if not tracker_ids:
            return [], {}

        # Load all trackers in one query instead of one lookup per id, with a
        # stable order so callers that pick trackers[0] get a deterministic
        # result.
        tracker_result = await session.exec(
            select(Tracker).where(Tracker.id.in_(tracker_ids)).order_by(Tracker.id)
        )
        trackers = list(tracker_result.all())

        providers_map: dict[int, ServiceProvider] = {}
        for pid in {tracker.provider_id for tracker in trackers}:
            provider = await session.get(ServiceProvider, pid)
            if provider:
                providers_map[pid] = provider

        return trackers, providers_map
|
||
|
||
|
||
async def _check_native_memory(bot: TelegramBot) -> bool:
    """Report whether any tracker-target link of this bot uses the native memory source.

    Looks up the telegram targets of the bot's user that carry this bot's
    token, then walks their TrackerTarget links and returns True as soon as
    one linked TrackingConfig has ``memory_source == "native"``.
    """
    async with AsyncSession(get_engine()) as session:
        telegram_targets = (
            await session.exec(
                select(NotificationTarget).where(
                    NotificationTarget.type == "telegram",
                    NotificationTarget.user_id == bot.user_id,
                )
            )
        ).all()
        own_target_ids = {
            target.id
            for target in telegram_targets
            if target.config.get("bot_token") == bot.token
        }
        if not own_target_ids:
            return False

        links = (
            await session.exec(
                select(TrackerTarget).where(TrackerTarget.target_id.in_(own_target_ids))
            )
        ).all()
        for link in links:
            # Links without a tracking config cannot select a memory source.
            if not link.tracking_config_id:
                continue
            tracking_config = await session.get(TrackingConfig, link.tracking_config_id)
            if tracking_config and tracking_config.memory_source == "native":
                return True
        return False
|
||
|
||
|
||
async def _cmd_status(bot: TelegramBot, locale: str) -> str:
    """Build the /status reply for the trackers linked to this bot.

    Reports enabled/total tracker counts, the total number of tracked
    collection ids, and the time of the most recent event.

    Args:
        bot: Bot whose linked trackers are summarized.
        locale: ``"ru"`` selects Russian text; anything else gets English.

    Returns:
        The formatted multi-line status message.
    """
    trackers, _ = await _get_bot_context(bot)
    active = sum(1 for t in trackers if t.enabled)
    total = len(trackers)
    total_albums = sum(len(t.collection_ids or []) for t in trackers)

    last_str = "-"
    tracker_ids = [t.id for t in trackers]
    if tracker_ids:
        engine = get_engine()
        async with AsyncSession(engine) as session:
            # Restrict to this bot's trackers (as /events does); an
            # unfiltered query would report activity of unrelated bots.
            result = await session.exec(
                select(EventLog)
                .where(EventLog.tracker_id.in_(tracker_ids))
                .order_by(EventLog.created_at.desc())
                .limit(1)
            )
            last_event = result.first()
        if last_event:
            last_str = last_event.created_at.strftime("%Y-%m-%d %H:%M")

    if locale == "ru":
        return (
            f"📊 Статус\n"
            f"Трекеры: {active}/{total} активных\n"
            f"Альбомы: {total_albums}\n"
            f"Последнее событие: {last_str}"
        )
    return (
        f"📊 Status\n"
        f"Trackers: {active}/{total} active\n"
        f"Albums: {total_albums}\n"
        f"Last event: {last_str}"
    )
|
||
|
||
|
||
async def _cmd_albums(bot: TelegramBot, locale: str) -> str:
    """Build the /albums reply listing every album tracked through this bot.

    For each linked tracker backed by an "immich" provider, every collection
    id is fetched and listed with its name and asset count. An album that
    fails to load is listed by the first 8 characters of its id with an
    "(error)" marker; an album the client returns nothing for is omitted.

    Args:
        bot: Bot whose linked trackers are listed.
        locale: ``"ru"`` selects Russian text; anything else gets English.

    Returns:
        The formatted list, or a "no tracked albums" message.
    """
    russian = locale == "ru"
    trackers, providers_map = await _get_bot_context(bot)
    if not trackers:
        return "Нет отслеживаемых альбомов." if russian else "No tracked albums."

    lines = []
    async with aiohttp.ClientSession() as http:
        for tracker in trackers:
            provider = providers_map.get(tracker.provider_id)
            if not provider or provider.type != "immich":
                continue
            immich = make_immich_provider(http, provider)
            for album_id in (tracker.collection_ids or []):
                try:
                    album = await immich.client.get_album(album_id)
                    if album:
                        lines.append(f" • {album.name} ({album.asset_count} assets)")
                except Exception as err:
                    # Best effort: keep listing the remaining albums, but
                    # leave a trace of what went wrong.
                    _LOGGER.warning("Failed to load album %s: %s", album_id, err)
                    lines.append(f" • {album_id[:8]}... (error)")

    header = "📚 Отслеживаемые альбомы:" if russian else "📚 Tracked albums:"
    return header + "\n" + "\n".join(lines) if lines else header + "\n (none)"
|
||
|
||
|
||
async def _cmd_events(bot: TelegramBot, count: int, locale: str) -> str:
    """Build the /events reply with the most recent events of this bot's trackers.

    Args:
        bot: Bot whose linked trackers select the events.
        count: Maximum number of events to list (newest first).
        locale: ``"ru"`` selects Russian text; anything else gets English.

    Returns:
        One line per event (``MM/DD HH:MM — type: collection``) under a
        header, or a "no events" message.
    """
    russian = locale == "ru"
    trackers, _ = await _get_bot_context(bot)
    tracker_ids = [t.id for t in trackers]
    if not tracker_ids:
        return "Нет событий." if russian else "No events."

    engine = get_engine()
    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(EventLog)
            .where(EventLog.tracker_id.in_(tracker_ids))
            .order_by(EventLog.created_at.desc())
            .limit(count)
        )
        events = result.all()

    if not events:
        return "Пока нет событий." if russian else "No events yet."

    header = (
        f"📋 Последние {len(events)} событий:" if russian
        else f"📋 Last {len(events)} events:"
    )
    lines = []
    for e in events:
        ts = e.created_at.strftime("%m/%d %H:%M")
        lines.append(f" {ts} — {e.event_type}: {e.collection_name}")
    return header + "\n" + "\n".join(lines)
|
||
|
||
|
||
async def _cmd_people(bot: TelegramBot, locale: str) -> str:
    """Build the /people reply listing people known to the bot's Immich providers.

    People from every linked provider of type "immich" are merged by id and
    their names are returned sorted.

    Args:
        bot: Bot whose linked providers are queried.
        locale: ``"ru"`` selects Russian text; anything else gets English.

    Returns:
        A header with the number of people followed by a comma-separated
        list of names, or a "no people" message.
    """
    russian = locale == "ru"
    _, providers_map = await _get_bot_context(bot)
    all_people: dict[str, str] = {}

    async with aiohttp.ClientSession() as http:
        for provider in providers_map.values():
            if provider.type != "immich":
                continue
            immich = make_immich_provider(http, provider)
            people = await immich.client.get_people()
            all_people.update(people)

    if not all_people:
        return "Люди не обнаружены." if russian else "No people detected."

    names = sorted(all_people.values())
    header = f"👥 {len(names)} людей:" if russian else f"👥 {len(names)} people:"
    return header + "\n" + ", ".join(names)
|
||
|
||
|
||
async def _cmd_immich(
    bot: TelegramBot, cmd: str, args: str, count: int, locale: str,
) -> str | list[dict[str, Any]]:
    """Handle commands that need Immich API access and may return media.

    Supported commands: search, find, person, place, favorites, latest,
    random, summary and memory.

    Args:
        bot: Bot whose linked trackers supply the album ids and provider.
        cmd: Command name (without the leading slash).
        args: Free-text argument; required by search, find, person and place.
        count: Maximum number of assets to return.
        locale: ``"ru"`` selects Russian text; anything else gets English.

    Returns:
        A text message, or whatever ``_format_assets`` produces for the
        bot's ``response_mode`` (text or a list of media item dicts).
    """
    def tr(en: str, ru: str) -> str:
        # English is the fallback for every locale except Russian.
        return ru if locale == "ru" else en

    trackers, providers_map = await _get_bot_context(bot)
    if not trackers:
        return tr("No trackers configured.", "Трекеры не настроены.")

    # Several trackers may watch the same album; de-duplicate (keeping
    # order) so an album is not scanned or reported twice.
    all_album_ids: list[str] = list(dict.fromkeys(
        album_id for t in trackers for album_id in (t.collection_ids or [])
    ))

    # NOTE(review): every album id is queried against the first tracker's
    # provider, even if other trackers use a different one - confirm that
    # this is intended.
    provider = providers_map.get(trackers[0].provider_id)
    if not provider or provider.type != "immich":
        return tr("Server not found.", "Сервер не найден.")

    response_mode = (bot.commands_config or {}).get("response_mode", "media")

    async with aiohttp.ClientSession() as http:
        client = make_immich_provider(http, provider).client

        if cmd == "search":
            if not args:
                return tr("Usage: /search <query>", "Использование: /search <запрос>")
            assets = await client.search_smart(args, album_ids=all_album_ids, limit=count)
            return _format_assets(assets, cmd, args, locale, response_mode, client)

        if cmd == "find":
            if not args:
                return tr("Usage: /find <text>", "Использование: /find <текст>")
            assets = await client.search_metadata(args, album_ids=all_album_ids, limit=count)
            return _format_assets(assets, cmd, args, locale, response_mode, client)

        if cmd == "person":
            if not args:
                return tr("Usage: /person <name>", "Использование: /person <имя>")
            people = await client.get_people()
            needle = args.lower()
            # First person whose name contains the argument, ignoring case.
            person_id = next(
                (pid for pid, pname in people.items() if needle in (pname or "").lower()),
                None,
            )
            if not person_id:
                return tr(f"Person '{args}' not found.", f"Человек '{args}' не найден.")
            assets = await client.search_by_person(person_id, limit=count)
            return _format_assets(assets, cmd, args, locale, response_mode, client)

        if cmd == "place":
            if not args:
                return tr("Usage: /place <location>", "Использование: /place <место>")
            assets = await client.search_smart(
                f"photos taken in {args}", album_ids=all_album_ids, limit=count
            )
            return _format_assets(assets, cmd, args, locale, response_mode, client)

        if cmd == "favorites":
            fav_assets = await _collect_favorites(client, all_album_ids, count)
            return _format_assets(fav_assets, cmd, "", locale, response_mode, client)

        if cmd == "latest":
            latest_assets = await _collect_latest(client, all_album_ids, count)
            return _format_assets(latest_assets, cmd, "", locale, response_mode, client)

        if cmd == "random":
            random_assets = await _collect_random(client, all_album_ids, count)
            return _format_assets(random_assets, cmd, "", locale, response_mode, client)

        if cmd == "summary":
            lines = await _album_summary_lines(client, all_album_ids)
            header = tr(
                f"📋 Album summary ({len(lines)}):",
                f"📋 Сводка альбомов ({len(lines)}):",
            )
            return header + "\n" + "\n".join(lines) if lines else header

        if cmd == "memory":
            # Check if any linked tracking config uses native memories
            if await _check_native_memory(bot):
                memory_assets = await _collect_native_memories(client, all_album_ids)
            else:
                memory_assets = await _collect_album_memories(
                    client, all_album_ids, datetime.now(timezone.utc)
                )
            memory_assets = memory_assets[:count]
            if not memory_assets:
                return tr("No memories for today.", "Нет воспоминаний за сегодня.")
            return _format_assets(memory_assets, cmd, "", locale, response_mode, client)

    return tr("Unknown command.", "Неизвестная команда.")


# At most this many albums are scanned by the album-walking commands.
_IMMICH_ALBUM_SCAN_LIMIT = 10


def _asset_entry(asset: Any, **extra: Any) -> dict[str, Any]:
    """Convert an album asset object into the dict shape _format_assets reads."""
    return {"id": asset.id, "originalFileName": asset.filename, "type": asset.type, **extra}


async def _collect_favorites(client: Any, album_ids: list[str], count: int) -> list[dict[str, Any]]:
    """Collect up to ``count`` favorites from the first 50 assets of each scanned album."""
    found: list[dict[str, Any]] = []
    for album_id in album_ids[:_IMMICH_ALBUM_SCAN_LIMIT]:
        try:
            album = await client.get_album(album_id)
            if album:
                for asset in list(album.assets.values())[:50]:
                    if asset.is_favorite and len(found) < count:
                        found.append(_asset_entry(asset))
        except Exception as err:
            # Best effort: a broken album must not fail the whole command.
            _LOGGER.warning("Skipping album %s for /favorites: %s", album_id, err)
        if len(found) >= count:
            break
    return found


async def _collect_latest(client: Any, album_ids: list[str], count: int) -> list[dict[str, Any]]:
    """Return the ``count`` most recently created assets across the scanned albums."""
    found: list[dict[str, Any]] = []
    for album_id in album_ids[:_IMMICH_ALBUM_SCAN_LIMIT]:
        try:
            album = await client.get_album(album_id)
            if album:
                # Take every asset: truncating per album before sorting
                # would depend on the album's internal ordering and could
                # drop the newest items.
                found.extend(
                    _asset_entry(asset, createdAt=asset.created_at)
                    for asset in album.assets.values()
                )
        except Exception as err:
            _LOGGER.warning("Skipping album %s for /latest: %s", album_id, err)
    # "or ''" keeps assets without a creation time sortable (they go last).
    found.sort(key=lambda a: a.get("createdAt") or "", reverse=True)
    return found[:count]


async def _collect_random(client: Any, album_ids: list[str], count: int) -> list[dict[str, Any]]:
    """Sample up to ``count`` assets per scanned album, then shuffle and keep ``count``."""
    picked: list[dict[str, Any]] = []
    for album_id in album_ids[:_IMMICH_ALBUM_SCAN_LIMIT]:
        try:
            album = await client.get_album(album_id)
            if album:
                pool = list(album.assets.values())
                picked.extend(
                    _asset_entry(asset)
                    for asset in rng.sample(pool, min(count, len(pool)))
                )
        except Exception as err:
            _LOGGER.warning("Skipping album %s for /random: %s", album_id, err)
    rng.shuffle(picked)
    return picked[:count]


async def _album_summary_lines(client: Any, album_ids: list[str]) -> list[str]:
    """Return one "name: N assets" line per album that could be loaded."""
    lines: list[str] = []
    for album_id in album_ids:
        try:
            album = await client.get_album(album_id)
            if album:
                lines.append(f" • {album.name}: {album.asset_count} assets")
        except Exception as err:
            _LOGGER.warning("Skipping album %s for /summary: %s", album_id, err)
    return lines


async def _collect_native_memories(client: Any, album_ids: list[str]) -> list[dict[str, Any]]:
    """Collect assets from ``client.get_memories()``, optionally filtered by album.

    When ``album_ids`` is non-empty, an asset is kept only if one of the
    entries in its ``albums`` list has an id among them.
    """
    tracked_ids = set(album_ids)
    collected: list[dict[str, Any]] = []
    for mem in await client.get_memories():
        year = mem.get("data", {}).get("year")
        for raw_asset in mem.get("assets", []):
            # Optional album filtering
            if tracked_ids and not any(
                a.get("id") in tracked_ids for a in raw_asset.get("albums", [])
            ):
                continue
            collected.append({
                "id": raw_asset.get("id", ""),
                "originalFileName": raw_asset.get("originalFileName", ""),
                "type": raw_asset.get("type", "IMAGE"),
                "createdAt": raw_asset.get("fileCreatedAt", raw_asset.get("createdAt", "")),
                "year": year,
            })
    return collected


async def _collect_album_memories(
    client: Any, album_ids: list[str], today: datetime,
) -> list[dict[str, Any]]:
    """Scan albums for assets created on today's month/day in a different year."""
    month_day = (today.month, today.day)
    collected: list[dict[str, Any]] = []
    for album_id in album_ids[:_IMMICH_ALBUM_SCAN_LIMIT]:
        try:
            album = await client.get_album(album_id)
            if not album:
                continue
            for asset in album.assets.values():
                try:
                    # fromisoformat on older Pythons rejects a trailing "Z".
                    dt = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
                except (ValueError, AttributeError):
                    continue  # missing or unparsable creation time
                if (dt.month, dt.day) == month_day and dt.year != today.year:
                    collected.append(
                        _asset_entry(asset, createdAt=asset.created_at, year=dt.year)
                    )
        except Exception as err:
            _LOGGER.warning("Skipping album %s for /memory: %s", album_id, err)
    return collected
|
||
|
||
|
||
def _format_assets(
|
||
assets: list[dict[str, Any]], cmd: str, query: str,
|
||
locale: str, response_mode: str, client: Any,
|
||
) -> str | list[dict[str, Any]]:
|
||
"""Format asset results as text or media payload."""
|
||
if not assets:
|
||
return {"en": "No results found.", "ru": "Ничего не найдено."}.get(locale, "No results found.")
|
||
|
||
if response_mode == "media":
|
||
media_items = []
|
||
for asset in assets:
|
||
asset_id = asset.get("id", "")
|
||
filename = asset.get("originalFileName", "")
|
||
year = asset.get("year", "")
|
||
caption = f"{filename} ({year})" if year else filename
|
||
media_items.append({
|
||
"type": "photo",
|
||
"asset_id": asset_id,
|
||
"caption": caption,
|
||
"thumbnail_url": f"{client.url}/api/assets/{asset_id}/thumbnail?size=preview",
|
||
"api_key": client.api_key,
|
||
})
|
||
return media_items
|
||
|
||
# Text mode
|
||
header_map = {
|
||
"search": {"en": f'🔍 Results for "{query}":', "ru": f'🔍 Результаты для "{query}":'},
|
||
"find": {"en": f'📄 Files matching "{query}":', "ru": f'📄 Файлы по запросу "{query}":'},
|
||
"person": {"en": f"👤 Photos of {query}:", "ru": f"👤 Фото {query}:"},
|
||
"place": {"en": f"📍 Photos from {query}:", "ru": f"📍 Фото из {query}:"},
|
||
"favorites": {"en": "⭐ Favorites:", "ru": "⭐ Избранное:"},
|
||
"latest": {"en": "📸 Latest:", "ru": "📸 Последние:"},
|
||
"random": {"en": "🎲 Random:", "ru": "🎲 Случайные:"},
|
||
"memory": {"en": "📅 On this day:", "ru": "📅 В этот день:"},
|
||
}
|
||
header = header_map.get(cmd, {}).get(locale, f"Results ({len(assets)}):")
|
||
lines = []
|
||
for a in assets:
|
||
name = a.get("originalFileName", a.get("id", "?")[:8])
|
||
year = a.get("year", "")
|
||
lines.append(f" • {name} ({year})" if year else f" • {name}")
|
||
return header + "\n" + "\n".join(lines)
|
||
|
||
|
||
async def send_media_group(
    bot_token: str, chat_id: str, media_items: list[dict[str, Any]],
) -> None:
    """Send media items as a Telegram media group (album).

    Each item's thumbnail is downloaded first (with the item's API key sent
    as ``x-api-key``); items that fail to download are skipped. The photos
    are then sent in chunks of at most 10, the Telegram limit per media
    group. A single-photo chunk uses sendPhoto. If sendMediaGroup fails for
    a chunk, its photos are sent individually with sendPhoto instead.

    Args:
        bot_token: Telegram bot token used to build the API URLs.
        chat_id: Destination chat.
        media_items: Dicts with ``asset_id``, ``caption``, ``thumbnail_url``
            and ``api_key`` keys.

    Failures are logged as warnings and never raised to the caller.
    """
    if not media_items:
        return

    async with aiohttp.ClientSession() as http:
        # Download all thumbnails first
        downloaded: list[tuple[bytes, str, str]] = []  # (photo_bytes, asset_id, caption)
        for item in media_items:
            asset_id = item.get("asset_id", "")
            try:
                async with http.get(
                    item.get("thumbnail_url", ""),
                    headers={"x-api-key": item.get("api_key", "")},
                ) as resp:
                    if resp.status != 200:
                        _LOGGER.warning("Failed to download thumbnail for %s: HTTP %d", asset_id, resp.status)
                        continue
                    downloaded.append((await resp.read(), asset_id, item.get("caption", "")))
            except aiohttp.ClientError as err:
                _LOGGER.warning("Failed to download thumbnail for %s: %s", asset_id, err)

        if not downloaded:
            return

        # Send in groups of 10 (Telegram limit)
        for start in range(0, len(downloaded), _TELEGRAM_MEDIA_GROUP_MAX):
            chunk = downloaded[start:start + _TELEGRAM_MEDIA_GROUP_MAX]
            # A media group needs at least two items.
            if len(chunk) > 1 and await _send_photo_group(http, bot_token, chat_id, chunk):
                continue
            # Single photo, or the group request failed: send one by one.
            for photo_bytes, asset_id, caption in chunk:
                await _send_single_photo(http, bot_token, chat_id, photo_bytes, asset_id, caption)


# Telegram Bot API limits: items per media group and characters per caption.
_TELEGRAM_MEDIA_GROUP_MAX = 10
_TELEGRAM_CAPTION_MAX = 1024


async def _post_telegram_form(
    http: aiohttp.ClientSession, url: str, data: aiohttp.FormData, what: str,
) -> bool:
    """POST a multipart form to Telegram; log a warning and return False on failure."""
    try:
        async with http.post(url, data=data) as resp:
            if resp.status == 200:
                return True
            result = await resp.json()
            _LOGGER.warning("Failed to send %s: %s", what, result.get("description"))
    except aiohttp.ClientError as err:
        _LOGGER.warning("Failed to send %s: %s", what, err)
    return False


async def _send_single_photo(
    http: aiohttp.ClientSession, bot_token: str, chat_id: str,
    photo_bytes: bytes, asset_id: str, caption: str,
) -> bool:
    """Upload one photo with sendPhoto; return True on success."""
    data = aiohttp.FormData()
    data.add_field("chat_id", chat_id)
    data.add_field("photo", photo_bytes, filename=f"{asset_id}.jpg", content_type="image/jpeg")
    if caption:
        data.add_field("caption", caption[:_TELEGRAM_CAPTION_MAX])
    return await _post_telegram_form(
        http, f"{TELEGRAM_API_BASE_URL}{bot_token}/sendPhoto", data, "photo"
    )


async def _send_photo_group(
    http: aiohttp.ClientSession, bot_token: str, chat_id: str,
    chunk: list[tuple[bytes, str, str]],
) -> bool:
    """Upload 2-10 photos with sendMediaGroup; return True on success."""
    import json

    data = aiohttp.FormData()
    data.add_field("chat_id", chat_id)
    media_array = []
    for idx, (photo_bytes, asset_id, caption) in enumerate(chunk):
        # Each file is a separate form field that the media JSON refers to
        # through an attach:// URL.
        attach_key = f"photo_{idx}"
        media_obj: dict[str, Any] = {"type": "photo", "media": f"attach://{attach_key}"}
        if caption:
            media_obj["caption"] = caption[:_TELEGRAM_CAPTION_MAX]
        media_array.append(media_obj)
        data.add_field(attach_key, photo_bytes, filename=f"{asset_id}.jpg", content_type="image/jpeg")
    data.add_field("media", json.dumps(media_array))
    return await _post_telegram_form(
        http, f"{TELEGRAM_API_BASE_URL}{bot_token}/sendMediaGroup", data, "media group"
    )
|
||
|
||
|
||
async def register_commands_with_telegram(bot: TelegramBot) -> bool:
    """Register the bot's enabled commands through Telegram's setMyCommands.

    The commands are first registered without a language code, using the
    descriptions of the bot's configured locale. On success they are
    registered a second time for the other supported locale ("ru" or "en")
    with an explicit ``language_code``.

    Args:
        bot: Bot whose ``commands_config`` lists the enabled commands and
            locale, and whose ``token`` authorizes the call.

    Returns:
        True when the primary registration succeeded, False otherwise. A
        failure of the secondary-locale registration is only logged.
    """
    config = bot.commands_config or {}
    enabled = config.get("enabled", [])
    locale = config.get("locale", "en")

    def build_commands(lang: str) -> list[dict[str, str]]:
        # Same fallback chain for both registrations:
        # requested language -> English -> the command name itself.
        commands = []
        for name in enabled:
            texts = COMMAND_DESCRIPTIONS.get(name, {})
            commands.append({
                "command": name,
                "description": texts.get(lang) or texts.get("en") or name,
            })
        return commands

    commands = build_commands(locale)
    url = f"{TELEGRAM_API_BASE_URL}{bot.token}/setMyCommands"

    async with aiohttp.ClientSession() as http:
        payload: dict[str, Any] = {"commands": commands}
        try:
            async with http.post(url, json=payload) as resp:
                result = await resp.json()
        except aiohttp.ClientError as err:
            _LOGGER.error("Failed to register commands: %s", err)
            return False

        if not result.get("ok"):
            _LOGGER.warning("Failed to register commands: %s", result.get("description"))
            return False
        _LOGGER.info("Registered %d commands for bot @%s", len(commands), bot.bot_username)

        # Also register for the other locale
        other_locale = "ru" if locale == "en" else "en"
        other_payload = {
            "commands": build_commands(other_locale),
            "language_code": other_locale,
        }
        try:
            async with http.post(url, json=other_payload) as resp:
                other_result = await resp.json()
            if not other_result.get("ok"):
                _LOGGER.warning(
                    "Failed to register %s commands: %s",
                    other_locale, other_result.get("description"),
                )
        except aiohttp.ClientError as err:
            # The primary registration already succeeded, so report the
            # problem without flipping the overall result.
            _LOGGER.warning("Failed to register %s commands: %s", other_locale, err)
        return True
|