feat(commands): keep chat-action hint alive during slow command fetches
Slow bot commands (/latest, /random, /favorites, /memory, /search, /find, /person, /place, /summary) spend most of their wall time fetching assets from the service provider, not uploading to Telegram. Telegram chat actions expire after ~5s, so the previous one-shot hint vanished long before media arrived — users saw nothing happening. - TelegramClient.start_chat_action_keepalive: promoted from private helper to public API, posts the action every 4s until cancelled. - telegram_send.telegram_chat_action: async context manager that starts the keep-alive task on enter and cancels + awaits it on exit. A None action makes it a no-op so callers don't branch. - classify_command_chat_action: maps command name to the right Telegram action (upload_photo for media-returning commands, typing for /summary, None for fast DB-only commands like /status /events). - webhook.py + telegram_poller.py: wrap handle_command in the context manager so the hint persists through the whole fetch+upload window in both webhook and long-poll modes.
This commit is contained in:
@@ -229,7 +229,7 @@ class TelegramClient:
|
||||
|
||||
typing_task = None
|
||||
if chat_action:
|
||||
typing_task = self._start_typing_indicator(chat_id, chat_action)
|
||||
typing_task = self.start_chat_action_keepalive(chat_id, chat_action)
|
||||
|
||||
try:
|
||||
if len(assets) == 1 and assets[0].get("type") == "photo":
|
||||
@@ -340,7 +340,13 @@ class TelegramClient:
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
def _start_typing_indicator(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
||||
def start_chat_action_keepalive(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
||||
"""Repeatedly post ``action`` every 4s until the returned task is cancelled.
|
||||
|
||||
Telegram chat actions expire after ~5s, so callers that want the hint
|
||||
to persist through longer work (fetching assets, multi-chunk uploads)
|
||||
need a keep-alive. Cancel the task in a ``finally`` to stop it.
|
||||
"""
|
||||
async def action_loop() -> None:
|
||||
try:
|
||||
while True:
|
||||
|
||||
@@ -41,6 +41,36 @@ _rate_limits: TTLCache = TTLCache(maxsize=10000, ttl=3600)
|
||||
# Maximum responses per command to avoid Telegram rate limits
|
||||
_MAX_RESPONSES_PER_COMMAND = 5
|
||||
|
||||
# Commands that fetch assets from the service provider and usually reply
|
||||
# with media — "uploading photo" is the accurate UX hint while we wait on
|
||||
# the provider API + Telegram upload.
|
||||
_UPLOAD_PHOTO_COMMANDS = frozenset({
|
||||
"latest", "random", "favorites", "memory",
|
||||
"search", "find", "person", "place",
|
||||
})
|
||||
|
||||
# Commands that fetch from the provider but reply with text only.
|
||||
# "typing" is accurate; we still want an indicator because the fetch is slow.
|
||||
_TYPING_COMMANDS = frozenset({"summary"})
|
||||
|
||||
|
||||
def classify_command_chat_action(text: str) -> str | None:
|
||||
"""Return the Telegram chat-action hint to show for this command, or None.
|
||||
|
||||
The classification is by command name alone — good enough for the
|
||||
cases where a chat action is worthwhile (slow provider fetches). Fast
|
||||
DB-only commands (``/status``, ``/albums``, ``/events``, ``/people``)
|
||||
return ``None`` and skip the indicator entirely.
|
||||
"""
|
||||
cmd, _, _ = parse_command(text)
|
||||
if not cmd:
|
||||
return None
|
||||
if cmd in _UPLOAD_PHOTO_COMMANDS:
|
||||
return "upload_photo"
|
||||
if cmd in _TYPING_COMMANDS:
|
||||
return "typing"
|
||||
return None
|
||||
|
||||
|
||||
def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int]) -> int | None:
|
||||
"""Check rate limit. Returns seconds to wait, or None if OK."""
|
||||
|
||||
@@ -15,8 +15,9 @@ from notify_bridge_core.notifications.telegram.client import TelegramClient
|
||||
from ..database.engine import get_session
|
||||
from ..database.models import TelegramBot, TelegramChat
|
||||
from ..services.telegram import save_chat_from_webhook
|
||||
from ..services.telegram_send import telegram_chat_action
|
||||
from .base import CommandResponse
|
||||
from .handler import handle_command, send_media_group, send_reply
|
||||
from .handler import classify_command_chat_action, handle_command, send_media_group, send_reply
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -95,14 +96,17 @@ async def telegram_webhook(
|
||||
return {"ok": True, "skipped": "commands_disabled"}
|
||||
effective_lang = chat_row.language_override or msg_language
|
||||
message_id = message.get("message_id")
|
||||
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
return {"ok": True}
|
||||
async with telegram_chat_action(
|
||||
bot_token, chat_id, classify_command_chat_action(text),
|
||||
):
|
||||
responses = await handle_command(bot, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
return {"ok": True}
|
||||
|
||||
return {"ok": True, "skipped": "not_a_command"}
|
||||
|
||||
|
||||
@@ -257,7 +257,13 @@ async def _poll_bot(bot_id: int) -> None:
|
||||
_last_update_id[bot_id] = updates[-1]["update_id"]
|
||||
|
||||
# Process each update
|
||||
from ..commands.handler import handle_command, send_media_group, send_reply
|
||||
from ..commands.handler import (
|
||||
classify_command_chat_action,
|
||||
handle_command,
|
||||
send_media_group,
|
||||
send_reply,
|
||||
)
|
||||
from .telegram_send import telegram_chat_action
|
||||
|
||||
for update in updates:
|
||||
message = update.get("message")
|
||||
@@ -295,13 +301,16 @@ async def _poll_bot(bot_id: int) -> None:
|
||||
continue
|
||||
effective_lang = chat_row.language_override or msg_language
|
||||
message_id = message.get("message_id")
|
||||
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
async with telegram_chat_action(
|
||||
bot_token, chat_id, classify_command_chat_action(text),
|
||||
):
|
||||
responses = await handle_command(bot_obj, chat_id, text, language_code=effective_lang)
|
||||
if responses:
|
||||
for resp in responses:
|
||||
if resp.text:
|
||||
await send_reply(bot_token, chat_id, resp.text, reply_to_message_id=message_id)
|
||||
if resp.media:
|
||||
await send_media_group(bot_token, chat_id, resp.media, reply_to_message_id=message_id)
|
||||
except Exception:
|
||||
_LOGGER.error("Error handling command from bot %d", bot_id, exc_info=True)
|
||||
|
||||
|
||||
@@ -19,7 +19,9 @@ this module just guarantees every caller gets a properly-wired client.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Callable
|
||||
import asyncio
|
||||
import contextlib
|
||||
from typing import Any, AsyncIterator, Callable
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -117,3 +119,31 @@ async def send_telegram_media(
|
||||
send_large_photos_as_documents=send_large_photos_as_documents,
|
||||
chat_action=chat_action,
|
||||
)
|
||||
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def telegram_chat_action(
|
||||
bot_token: str,
|
||||
chat_id: str,
|
||||
action: str | None,
|
||||
) -> AsyncIterator[None]:
|
||||
"""Hold a Telegram chat action (e.g. ``upload_photo``) for the block's duration.
|
||||
|
||||
Used by the command path to show ``typing`` / ``uploading photo`` while
|
||||
the bot fetches assets from the service (Immich, etc.) AND uploads them
|
||||
to Telegram — i.e. for the whole user-visible wait, not just the upload.
|
||||
|
||||
A ``None`` action makes this a no-op so callers don't have to branch.
|
||||
"""
|
||||
if not action:
|
||||
yield
|
||||
return
|
||||
|
||||
client = await get_telegram_client(bot_token)
|
||||
task = client.start_chat_action_keepalive(chat_id, action)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
Reference in New Issue
Block a user