Phase 9: Telegram bot management with chat discovery
Some checks failed
Validate / Hassfest (push) Has been cancelled

New entity + API:
- TelegramBot model (name, token, bot_username, bot_id)
- CRUD at /api/telegram-bots with token validation via getMe
- GET /{id}/chats: discover active chats via getUpdates
- GET /{id}/token: retrieve full token (for target config)

Frontend:
- /telegram-bots page: register bots, view discovered chats
  per bot (expandable), refresh on demand
- Targets page: select from registered bots (dropdown) instead
  of raw token input. Chat selector shows discovered chats
  when bot is selected, falls back to manual input.
  Bot token resolved server-side on save.

Nav icon uses monochrome symbol for consistency.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 17:45:58 +03:00
parent 5dee7c55ca
commit cf987cbfb4
9 changed files with 440 additions and 10 deletions

View File

@@ -0,0 +1,181 @@
"""Telegram bot management API routes."""
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
import aiohttp
from immich_watcher_core.telegram.media import TELEGRAM_API_BASE_URL
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TelegramBot, User
router = APIRouter(prefix="/api/telegram-bots", tags=["telegram-bots"])
class BotCreate(BaseModel):
name: str
token: str
class BotUpdate(BaseModel):
name: str | None = None
@router.get("")
async def list_bots(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""List all registered Telegram bots."""
result = await session.exec(
select(TelegramBot).where(TelegramBot.user_id == user.id)
)
return [_bot_response(b) for b in result.all()]
@router.post("", status_code=status.HTTP_201_CREATED)
async def create_bot(
body: BotCreate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Register a new Telegram bot (validates token via getMe)."""
# Validate token by calling getMe
bot_info = await _get_me(body.token)
if not bot_info:
raise HTTPException(status_code=400, detail="Invalid bot token")
bot = TelegramBot(
user_id=user.id,
name=body.name,
token=body.token,
bot_username=bot_info.get("username", ""),
bot_id=bot_info.get("id", 0),
)
session.add(bot)
await session.commit()
await session.refresh(bot)
return _bot_response(bot)
@router.put("/{bot_id}")
async def update_bot(
bot_id: int,
body: BotUpdate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Update a bot's display name."""
bot = await _get_user_bot(session, bot_id, user.id)
if body.name is not None:
bot.name = body.name
session.add(bot)
await session.commit()
await session.refresh(bot)
return _bot_response(bot)
@router.delete("/{bot_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_bot(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Delete a registered bot."""
bot = await _get_user_bot(session, bot_id, user.id)
await session.delete(bot)
await session.commit()
@router.get("/{bot_id}/token")
async def get_bot_token(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Get the full bot token (for internal use by targets)."""
bot = await _get_user_bot(session, bot_id, user.id)
return {"token": bot.token}
@router.get("/{bot_id}/chats")
async def list_bot_chats(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Discover active chats for a bot via getUpdates.
Returns unique chats the bot has received messages from.
Note: Telegram only keeps updates for 24 hours, so this shows
recently active chats. For groups, the bot must have received
at least one message.
"""
bot = await _get_user_bot(session, bot_id, user.id)
chats = await _discover_chats(bot.token)
return chats
# --- Helpers ---
async def _get_me(token: str) -> dict | None:
"""Call Telegram getMe to validate token and get bot info."""
try:
async with aiohttp.ClientSession() as http:
async with http.get(f"{TELEGRAM_API_BASE_URL}{token}/getMe") as resp:
data = await resp.json()
if data.get("ok"):
return data.get("result", {})
except aiohttp.ClientError:
pass
return None
async def _discover_chats(token: str) -> list[dict]:
"""Discover chats by fetching recent updates from Telegram."""
seen: dict[int, dict] = {}
try:
async with aiohttp.ClientSession() as http:
async with http.get(
f"{TELEGRAM_API_BASE_URL}{token}/getUpdates",
params={"limit": 100, "allowed_updates": '["message"]'},
) as resp:
data = await resp.json()
if not data.get("ok"):
return []
for update in data.get("result", []):
msg = update.get("message", {})
chat = msg.get("chat", {})
chat_id = chat.get("id")
if chat_id and chat_id not in seen:
seen[chat_id] = {
"id": chat_id,
"title": chat.get("title") or chat.get("first_name", "") + (" " + chat.get("last_name", "")).strip(),
"type": chat.get("type", "private"),
"username": chat.get("username", ""),
}
except aiohttp.ClientError:
pass
return list(seen.values())
def _bot_response(b: TelegramBot) -> dict:
return {
"id": b.id,
"name": b.name,
"bot_username": b.bot_username,
"bot_id": b.bot_id,
"token_preview": f"{b.token[:8]}...{b.token[-4:]}" if len(b.token) > 12 else "***",
"created_at": b.created_at.isoformat(),
}
async def _get_user_bot(session: AsyncSession, bot_id: int, user_id: int) -> TelegramBot:
bot = await session.get(TelegramBot, bot_id)
if not bot or bot.user_id != user_id:
raise HTTPException(status_code=404, detail="Bot not found")
return bot

View File

@@ -7,6 +7,7 @@ from .models import (
EventLog,
ImmichServer,
NotificationTarget,
TelegramBot,
TemplateConfig,
TrackingConfig,
User,

View File

@@ -36,6 +36,20 @@ class ImmichServer(SQLModel, table=True):
created_at: datetime = Field(default_factory=_utcnow)
class TelegramBot(SQLModel, table=True):
"""Registered Telegram bot."""
__tablename__ = "telegram_bot"
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id")
name: str # User-given display name
token: str # Bot API token
bot_username: str = Field(default="") # @username from getMe
bot_id: int = Field(default=0) # Numeric bot ID from getMe
created_at: datetime = Field(default_factory=_utcnow)
class TrackingConfig(SQLModel, table=True):
"""Tracking configuration: what events/assets to react to and scheduled modes."""

View File

@@ -23,6 +23,7 @@ from .api.targets import router as targets_router
from .api.users import router as users_router
from .api.status import router as status_router
from .api.sync import router as sync_router
from .api.telegram_bots import router as telegram_bots_router
from .ai.telegram_webhook import router as telegram_ai_router
logging.basicConfig(
@@ -74,6 +75,7 @@ app.include_router(targets_router)
app.include_router(users_router)
app.include_router(status_router)
app.include_router(sync_router)
app.include_router(telegram_bots_router)
app.include_router(telegram_ai_router)
# Serve frontend static files if available