feat(commands): keep chat-action hint alive during slow command fetches

Slow bot commands (/latest, /random, /favorites, /memory, /search,
/find, /person, /place, /summary) spend most of their wall time
fetching assets from the service provider, not uploading to Telegram.
Telegram chat actions expire after ~5s, so the previous one-shot hint
vanished long before media arrived — users saw nothing happening.

- TelegramClient.start_chat_action_keepalive: promoted from private
  helper to public API, posts the action every 4s until cancelled.
- telegram_send.telegram_chat_action: async context manager that
  starts the keep-alive task on enter and cancels + awaits it on
  exit. A None action makes it a no-op so callers don't branch.
- classify_command_chat_action: maps command name to the right
  Telegram action (upload_photo for media-returning commands, typing
  for /summary, None for fast DB-only commands like /status /events).
- webhook.py + telegram_poller.py: wrap handle_command in the context
  manager so the hint persists through the whole fetch+upload window
  in both webhook and long-poll modes.
This commit is contained in:
2026-04-22 18:56:18 +03:00
parent fe38d20b96
commit 69711bbc84
5 changed files with 99 additions and 20 deletions
@@ -229,7 +229,7 @@ class TelegramClient:
typing_task = None
if chat_action:
typing_task = self._start_typing_indicator(chat_id, chat_action)
typing_task = self.start_chat_action_keepalive(chat_id, chat_action)
try:
if len(assets) == 1 and assets[0].get("type") == "photo":
@@ -340,7 +340,13 @@ class TelegramClient:
except aiohttp.ClientError:
return False
def _start_typing_indicator(self, chat_id: str, action: str = "typing") -> asyncio.Task:
def start_chat_action_keepalive(self, chat_id: str, action: str = "typing") -> asyncio.Task:
"""Repeatedly post ``action`` every 4s until the returned task is cancelled.
Telegram chat actions expire after ~5s, so callers that want the hint
to persist through longer work (fetching assets, multi-chunk uploads)
need a keep-alive. Cancel the task in a ``finally`` to stop it.
"""
async def action_loop() -> None:
try:
while True:
@@ -41,6 +41,36 @@ _rate_limits: TTLCache = TTLCache(maxsize=10000, ttl=3600)
# Maximum responses per command to avoid Telegram rate limits
_MAX_RESPONSES_PER_COMMAND = 5
# Commands that fetch assets from the service provider and usually reply
# with media — "uploading photo" is the accurate UX hint while we wait on
# the provider API + Telegram upload.
_UPLOAD_PHOTO_COMMANDS = frozenset({
"latest", "random", "favorites", "memory",
"search", "find", "person", "place",
})
# Commands that fetch from the provider but reply with text only.
# "typing" is accurate; we still want an indicator because the fetch is slow.
_TYPING_COMMANDS = frozenset({"summary"})
def classify_command_chat_action(text: str) -> str | None:
"""Return the Telegram chat-action hint to show for this command, or None.
The classification is by command name alone — good enough for the
cases where a chat action is worthwhile (slow provider fetches). Fast
DB-only commands (``/status``, ``/albums``, ``/events``, ``/people``)
return ``None`` and skip the indicator entirely.
"""
cmd, _, _ = parse_command(text)
if not cmd:
return None
if cmd in _UPLOAD_PHOTO_COMMANDS:
return "upload_photo"
if cmd in _TYPING_COMMANDS:
return "typing"
return None
def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int]) -> int | None:
"""Check rate limit. Returns seconds to wait, or None if OK."""
@@ -15,8 +15,9 @@ from notify_bridge_core.notifications.telegram.client import TelegramClient
from ..database.engine import get_session
from ..database.models import TelegramBot, TelegramChat
from ..services.telegram import save_chat_from_webhook
from ..services.telegram_send import telegram_chat_action
from .base import CommandResponse
from .handler import handle_command, send_media_group, send_reply
from .handler import classify_command_chat_action, handle_command, send_media_group, send_reply
_LOGGER = logging.getLogger(__name__)
@@ -95,14 +96,17 @@ async def telegram_webhook(
return {"ok": True, "skipped": "commands_disabled"}
effective_lang = chat_row.language_override or msg_language
message_id = message.get("message_id")
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
if responses:
for resp in responses:
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
return {"ok": True}
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
if responses:
for resp in responses:
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
return {"ok": True}
return {"ok": True, "skipped": "not_a_command"}
@@ -257,7 +257,13 @@ async def _poll_bot(bot_id: int) -> None:
_last_update_id[bot_id] = updates[-1]["update_id"]
# Process each update
from ..commands.handler import handle_command, send_media_group, send_reply
from ..commands.handler import (
classify_command_chat_action,
handle_command,
send_media_group,
send_reply,
)
from .telegram_send import telegram_chat_action
for update in updates:
message = update.get("message")
@@ -295,13 +301,16 @@ async def _poll_bot(bot_id: int) -> None:
continue
effective_lang = chat_row.language_override or msg_language
message_id = message.get("message_id")
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
if responses:
for resp in responses:
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
async with telegram_chat_action(
bot_token, chat_id, classify_command_chat_action(text),
):
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
if responses:
for resp in responses:
if resp.text:
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
if resp.media:
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
except Exception:
_LOGGER.error("Error handling command from bot %d", bot_id, exc_info=True)
@@ -19,7 +19,9 @@ this module just guarantees every caller gets a properly-wired client.
from __future__ import annotations
from typing import Any, Callable
import asyncio
import contextlib
from typing import Any, AsyncIterator, Callable
import aiohttp
@@ -117,3 +119,31 @@ async def send_telegram_media(
send_large_photos_as_documents=send_large_photos_as_documents,
chat_action=chat_action,
)
@contextlib.asynccontextmanager
async def telegram_chat_action(
bot_token: str,
chat_id: str,
action: str | None,
) -> AsyncIterator[None]:
"""Hold a Telegram chat action (e.g. ``upload_photo``) for the block's duration.
Used by the command path to show ``typing`` / ``uploading photo`` while
the bot fetches assets from the service (Immich, etc.) AND uploads them
to Telegram — i.e. for the whole user-visible wait, not just the upload.
A ``None`` action makes this a no-op so callers don't have to branch.
"""
if not action:
yield
return
client = await get_telegram_client(bot_token)
task = client.start_chat_action_keepalive(chat_id, action)
try:
yield
finally:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task