diff --git a/frontend/src/lib/types.ts b/frontend/src/lib/types.ts index 382b9a6..2edf024 100644 --- a/frontend/src/lib/types.ts +++ b/frontend/src/lib/types.ts @@ -106,6 +106,7 @@ export interface NotificationTarget { name: string; icon: string; config: Record; + chat_action?: string | null; chat_name?: string; receiver_count: number; receivers: TargetReceiver[]; diff --git a/frontend/src/routes/targets/+page.svelte b/frontend/src/routes/targets/+page.svelte index 96015f5..f231f2b 100644 --- a/frontend/src/routes/targets/+page.svelte +++ b/frontend/src/routes/targets/+page.svelte @@ -229,7 +229,7 @@ max_media_to_send: c.max_media_to_send ?? 50, max_media_per_group: c.max_media_per_group ?? 10, media_delay: c.media_delay ?? 500, max_asset_size: c.max_asset_size ?? 50, disable_url_preview: c.disable_url_preview ?? false, send_large_photos_as_documents: c.send_large_photos_as_documents ?? false, - ai_captions: c.ai_captions ?? false, chat_action: c.chat_action ?? 'typing', + ai_captions: c.ai_captions ?? false, chat_action: tgt.chat_action ?? c.chat_action ?? 'typing', // discord/slack username: c.username || '', // ntfy @@ -268,7 +268,7 @@ max_media_to_send: form.max_media_to_send, max_media_per_group: form.max_media_per_group, media_delay: form.media_delay, max_asset_size: form.max_asset_size, disable_url_preview: form.disable_url_preview, send_large_photos_as_documents: form.send_large_photos_as_documents, - ai_captions: form.ai_captions, chat_action: form.chat_action || undefined, + ai_captions: form.ai_captions, }; } else if (formType === 'webhook') { config = { ai_captions: form.ai_captions }; @@ -284,10 +284,12 @@ config = { child_target_ids: form.child_target_ids }; } + const body: Record = { name: form.name, icon: form.icon, config }; + if (formType === 'telegram') body.chat_action = form.chat_action || null; if (editing) { - await api(`/targets/${editing}`, { method: 'PUT', body: JSON.stringify({ name: form.name, icon: form.icon, config }) }); + await api(`/targets/${editing}`, { method: 'PUT', body: JSON.stringify(body) }); } else { - await api('/targets', { method: 'POST', body: JSON.stringify({ type: formType, name: form.name, icon: form.icon, config }) }); + await api('/targets', { method: 'POST', body: JSON.stringify({ type: formType, ...body }) }); } showForm = false; editing = null; diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/client.py b/packages/core/src/notify_bridge_core/notifications/telegram/client.py index e8501f9..b8d51ff 100644 --- a/packages/core/src/notify_bridge_core/notifications/telegram/client.py +++ b/packages/core/src/notify_bridge_core/notifications/telegram/client.py @@ -6,12 +6,46 @@ import asyncio import json import logging import mimetypes +import re from dataclasses import dataclass from typing import Any, Callable import aiohttp from aiohttp import FormData +# Telegram 429 / flood-control retry settings. Telegram returns +# ``parameters.retry_after`` for rate limits; we honor it up to a cap so a +# pathological value can't park the dispatcher for minutes. +_TG_429_MAX_ATTEMPTS = 4 +_TG_429_MAX_WAIT_S = 60 +_RETRY_AFTER_RE = re.compile(r"retry after (\d+)", re.IGNORECASE) + + +def _extract_retry_after(result: dict[str, Any]) -> int | None: + """Return the retry_after seconds from a Telegram error response. + + Prefers the structured ``parameters.retry_after`` field; falls back to + parsing the human-readable description (``"Too Many Requests: retry + after N"``) which Telegram has been known to return without the + structured field on some endpoints. + """ + params = result.get("parameters") or {} + ra = params.get("retry_after") + if isinstance(ra, (int, float)) and ra > 0: + return int(ra) + desc = str(result.get("description", "")) + m = _RETRY_AFTER_RE.search(desc) + if m: + try: + return int(m.group(1)) + except ValueError: + return None + return None + + +def _is_rate_limited(status: int, result: dict[str, Any]) -> bool: + return status == 429 or result.get("error_code") == 429 + from .cache import TelegramFileCache from .media import ( TELEGRAM_API_BASE_URL, @@ -193,40 +227,58 @@ class TelegramClient: thumbhash: str | None, ) -> NotificationResult: """Multipart-upload ``data`` to Telegram and cache the returned file_id.""" - form = FormData() - form.add_field("chat_id", chat_id) - form.add_field(kind.form_field, data, filename=filename, content_type=content_type) - form.add_field("parse_mode", parse_mode) - if caption: - form.add_field("caption", caption) - if reply_to_message_id: - form.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id})) + def _build_form() -> FormData: + f = FormData() + f.add_field("chat_id", chat_id) + f.add_field(kind.form_field, data, filename=filename, content_type=content_type) + f.add_field("parse_mode", parse_mode) + if caption: + f.add_field("caption", caption) + if reply_to_message_id: + f.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id})) + return f telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/{kind.api_method}" - try: - async with self._session.post(telegram_url, data=form) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - res = result.get("result", {}) - file_id = kind.file_id_from_result(res) - if file_id and cache and cache_key: - await cache.async_set( - cache_key, file_id, kind.cache_type, - thumbhash=thumbhash, size=len(data), + for attempt in range(1, _TG_429_MAX_ATTEMPTS + 1): + try: + async with self._session.post(telegram_url, data=_build_form()) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + res = result.get("result", {}) + file_id = kind.file_id_from_result(res) + if file_id and cache and cache_key: + await cache.async_set( + cache_key, file_id, kind.cache_type, + thumbhash=thumbhash, size=len(data), + ) + return {"success": True, "message_id": res.get("message_id")} + + if _is_rate_limited(response.status, result) and attempt < _TG_429_MAX_ATTEMPTS: + retry_after = _extract_retry_after(result) or 1 + wait_s = min(retry_after + 1, _TG_429_MAX_WAIT_S) + _LOGGER.warning( + "Telegram %s 429 (retry_after=%ds, attempt %d/%d) bytes=%d — sleeping %ds", + kind.api_method, retry_after, attempt, _TG_429_MAX_ATTEMPTS, + len(data), wait_s, ) - return {"success": True, "message_id": res.get("message_id")} + await asyncio.sleep(wait_s) + continue + + _LOGGER.error( + "Telegram %s failed: status=%s code=%s desc=%r bytes=%d", + kind.api_method, response.status, result.get("error_code"), + result.get("description", "Unknown"), len(data), + ) + return {"success": False, "error": result.get("description", "Unknown Telegram error")} + except aiohttp.ClientError as err: _LOGGER.error( - "Telegram %s failed: status=%s code=%s desc=%r bytes=%d", - kind.api_method, response.status, result.get("error_code"), - result.get("description", "Unknown"), len(data), + "Telegram %s transport error (bytes=%d): %s", + kind.api_method, len(data), err, exc_info=True, ) - return {"success": False, "error": result.get("description", "Unknown Telegram error")} - except aiohttp.ClientError as err: - _LOGGER.error( - "Telegram %s transport error (bytes=%d): %s", - kind.api_method, len(data), err, exc_info=True, - ) - return {"success": False, "error": str(err)} + return {"success": False, "error": str(err)} + # All attempts exhausted via 429 — should be unreachable, but keep + # an explicit error path so we never return None. + return {"success": False, "error": "Telegram rate limit: max retries exhausted"} async def send_notification( self, @@ -299,12 +351,7 @@ class TelegramClient: send_large_photos_as_documents, ) finally: - if typing_task: - typing_task.cancel() - try: - await typing_task - except asyncio.CancelledError: - pass + await self.stop_keepalive(typing_task) async def send_message( self, @@ -368,20 +415,53 @@ class TelegramClient: return False def start_chat_action_keepalive(self, chat_id: str, action: str = "typing") -> asyncio.Task: - """Repeatedly post ``action`` every 4s until the returned task is cancelled. + """Repeatedly post ``action`` every 4s until stopped. Telegram chat actions expire after ~5s, so callers that want the hint to persist through longer work (fetching assets, multi-chunk uploads) - need a keep-alive. Cancel the task in a ``finally`` to stop it. + need a keep-alive. + + The returned task carries an attached ``stop_event`` (``asyncio.Event``). + Stop cleanly via :meth:`stop_keepalive` — setting the event before + cancellation prevents the loop from firing one last ``sendChatAction`` + after the caller's final user-visible message, which would otherwise + leave a phantom indicator hanging for ~5s. """ + stop_event = asyncio.Event() + async def action_loop() -> None: try: - while True: + while not stop_event.is_set(): await self.send_chat_action(chat_id, action) - await asyncio.sleep(4) + try: + await asyncio.wait_for(stop_event.wait(), timeout=4) + except asyncio.TimeoutError: + pass # 4s elapsed, refresh the action except asyncio.CancelledError: pass - return asyncio.create_task(action_loop()) + + task: asyncio.Task = asyncio.create_task(action_loop()) + task.stop_event = stop_event # type: ignore[attr-defined] + return task + + @staticmethod + async def stop_keepalive(task: asyncio.Task | None) -> None: + """Stop a keepalive task started by :meth:`start_chat_action_keepalive`. + + Sets the attached stop event before cancelling so the loop won't + fire another ``sendChatAction`` after the caller's final message + landed at Telegram. + """ + if task is None: + return + stop_event = getattr(task, "stop_event", None) + if stop_event is not None: + stop_event.set() + task.cancel() + try: + await task + except asyncio.CancelledError: + pass async def _send_photo( self, chat_id: str, url: str | None, caption: str | None = None, @@ -526,12 +606,10 @@ class TelegramClient: all_message_ids.append(result.get("message_id")) continue - # Multi-item: download all, build form, send media group - form = FormData() - form.add_field("chat_id", chat_id) - if reply_to_message_id and chunk_idx == 0: - form.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id})) - + # Multi-item: download all, build form, send media group. + # Attachments are recorded separately so we can rebuild FormData on + # 429 retry — aiohttp.FormData is single-use after a request. + attachments: list[tuple[str, bytes, str, str]] = [] # (name, data, filename, content_type) media_json = [] upload_idx = 0 # Track cache info per media_json entry (in order) so we can map @@ -646,7 +724,7 @@ class TelegramClient: attach_name = f"file{upload_idx}" ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4") ext = "jpg" if media_type == "photo" else "mp4" - form.add_field(attach_name, data, filename=f"media_{idx}.{ext}", content_type=ct) + attachments.append((attach_name, data, f"media_{idx}.{ext}", ct)) mij = {"type": media_type, "media": f"attach://{attach_name}"} upload_idx += 1 # Record cache key so we can store file_id from response @@ -674,59 +752,86 @@ class TelegramClient: ) continue - form.add_field("media", json.dumps(media_json)) telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup" - try: - async with self._session.post(telegram_url, data=form) as response: - result = await response.json() - if response.status == 200 and result.get("ok"): - result_msgs = result.get("result", []) - all_message_ids.extend(msg.get("message_id") for msg in result_msgs) + def _build_form() -> FormData: + f = FormData() + f.add_field("chat_id", chat_id) + if reply_to_message_id and chunk_idx == 0: + f.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id})) + for name, payload, filename, ct in attachments: + f.add_field(name, payload, filename=filename, content_type=ct) + f.add_field("media", json.dumps(media_json)) + return f + + chunk_failed_result: dict[str, Any] | None = None + for attempt in range(1, _TG_429_MAX_ATTEMPTS + 1): + try: + async with self._session.post(telegram_url, data=_build_form()) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + result_msgs = result.get("result", []) + all_message_ids.extend(msg.get("message_id") for msg in result_msgs) + + # Cache file_ids from response — map by position + cache_entries: list[tuple[str, str, str, str | None, int | None]] = [] + for i, msg in enumerate(result_msgs): + if i >= len(media_cache_info): + break + info = media_cache_info[i] + if info is None: + continue # was a cache hit, skip + ck, mt, th, sz = info + file_id = None + if msg.get("photo"): + file_id = msg["photo"][-1].get("file_id") + elif msg.get("video"): + file_id = msg["video"].get("file_id") + elif msg.get("document"): + file_id = msg["document"].get("file_id") + if file_id: + cache_entries.append((ck, file_id, mt, th, sz)) + if cache_entries: + # All entries in a chunk share the same cache backend + eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0])) + if eff_cache: + await eff_cache.async_set_many(cache_entries) + break # chunk succeeded + + if _is_rate_limited(response.status, result) and attempt < _TG_429_MAX_ATTEMPTS: + retry_after = _extract_retry_after(result) or 1 + wait_s = min(retry_after + 1, _TG_429_MAX_WAIT_S) + _LOGGER.warning( + "Telegram sendMediaGroup 429 (retry_after=%ds, attempt %d/%d) chunk=%d/%d items=%d — sleeping %ds", + retry_after, attempt, _TG_429_MAX_ATTEMPTS, + chunk_idx + 1, len(chunks), len(media_json), wait_s, + ) + await asyncio.sleep(wait_s) + continue - # Cache file_ids from response — map by position - cache_entries: list[tuple[str, str, str, str | None, int | None]] = [] - for i, msg in enumerate(result_msgs): - if i >= len(media_cache_info): - break - info = media_cache_info[i] - if info is None: - continue # was a cache hit, skip - ck, mt, th, sz = info - file_id = None - if msg.get("photo"): - file_id = msg["photo"][-1].get("file_id") - elif msg.get("video"): - file_id = msg["video"].get("file_id") - elif msg.get("document"): - file_id = msg["document"].get("file_id") - if file_id: - cache_entries.append((ck, file_id, mt, th, sz)) - if cache_entries: - # All entries in a chunk share the same cache backend - eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0])) - if eff_cache: - await eff_cache.async_set_many(cache_entries) - else: _LOGGER.error( "Telegram sendMediaGroup failed: status=%s code=%s desc=%r chunk=%d/%d items=%d", response.status, result.get("error_code"), result.get("description", "Unknown"), chunk_idx + 1, len(chunks), len(media_json), ) - return { + chunk_failed_result = { "success": False, "error": result.get("description", "Unknown"), "error_code": result.get("error_code"), "failed_at_chunk": chunk_idx + 1, } - except aiohttp.ClientError as err: - _LOGGER.error( - "Telegram sendMediaGroup transport error on chunk %d/%d (%d items): %s", - chunk_idx + 1, len(chunks), len(media_json), err, - exc_info=True, - ) - return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} + break + except aiohttp.ClientError as err: + _LOGGER.error( + "Telegram sendMediaGroup transport error on chunk %d/%d (%d items): %s", + chunk_idx + 1, len(chunks), len(media_json), err, + exc_info=True, + ) + return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} + + if chunk_failed_result is not None: + return chunk_failed_result # Distinguish "posted something" from "posted nothing" so the caller # can surface an ERROR when a command produced a caption reply but no diff --git a/packages/server/src/notify_bridge_server/database/migrations.py b/packages/server/src/notify_bridge_server/database/migrations.py index 285158a..6d4ad22 100644 --- a/packages/server/src/notify_bridge_server/database/migrations.py +++ b/packages/server/src/notify_bridge_server/database/migrations.py @@ -1391,6 +1391,40 @@ async def migrate_performance_indexes(engine: AsyncEngine) -> None: ) +async def migrate_chat_action_to_column(engine: AsyncEngine) -> None: + """Move ``chat_action`` from ``config`` JSON to the dedicated column. + + Earlier versions of the frontend stored ``chat_action`` inside + ``notification_target.config``; the dedicated ``chat_action`` column + was rarely set or held a stale default. The dispatcher's resolver + overrode the config value with the (stale) column, so a user's UI + choice silently had no effect on outgoing chat actions. + + This backfill takes the config value as authoritative (it's what the + UI was writing) and copies it to the column, then strips it from + config so the column becomes the single source of truth. Idempotent: + a second run finds nothing to migrate. + """ + async with engine.begin() as conn: + if not await _has_table(conn, "notification_target"): + return + if not await _has_column(conn, "notification_target", "chat_action"): + return + # Copy config["chat_action"] → column where present. + await conn.execute(text( + "UPDATE notification_target " + "SET chat_action = json_extract(config, '$.chat_action') " + "WHERE json_extract(config, '$.chat_action') IS NOT NULL" + )) + # Strip the legacy key so the column is unambiguous going forward. + await conn.execute(text( + "UPDATE notification_target " + "SET config = json_remove(config, '$.chat_action') " + "WHERE json_extract(config, '$.chat_action') IS NOT NULL" + )) + logger.info("Migrated chat_action from config JSON to column where present") + + # --------------------------------------------------------------------------- # Schema version tracking — lightweight alternative to Alembic while the # hand-rolled idempotent migrations remain the source of truth. Gives diff --git a/packages/server/src/notify_bridge_server/main.py b/packages/server/src/notify_bridge_server/main.py index 94ec3a3..303e411 100644 --- a/packages/server/src/notify_bridge_server/main.py +++ b/packages/server/src/notify_bridge_server/main.py @@ -75,6 +75,7 @@ async def lifespan(app: FastAPI): migrate_notification_slot_locale, migrate_user_token_version, migrate_performance_indexes, + migrate_chat_action_to_column, migrate_schema_version, ) from .database.snapshot import snapshot_and_prune @@ -98,6 +99,7 @@ async def lifespan(app: FastAPI): await migrate_notification_slot_locale(engine) await migrate_user_token_version(engine) await migrate_performance_indexes(engine) + await migrate_chat_action_to_column(engine) await migrate_schema_version(engine) from .database.seeds import seed_all await seed_all() diff --git a/packages/server/src/notify_bridge_server/services/dispatch_helpers.py b/packages/server/src/notify_bridge_server/services/dispatch_helpers.py index 169d7d2..11087ea 100644 --- a/packages/server/src/notify_bridge_server/services/dispatch_helpers.py +++ b/packages/server/src/notify_bridge_server/services/dispatch_helpers.py @@ -326,7 +326,11 @@ async def _resolve_target( receivers.append(build_receiver(target.type, dict(r.config), locale)) target_config = dict(target.config) - # Inject chat_action for Telegram targets + # chat_action lives on the model column — single source of truth. + # Strip any legacy/stale value from config so an old config-stored value + # can't shadow the user's UI choice. When the column is unset, leave the + # key absent so the dispatcher's "typing" fallback applies. + target_config.pop("chat_action", None) if hasattr(target, 'chat_action') and target.chat_action: target_config["chat_action"] = target.chat_action # Inject bot credentials for bot-backed target types diff --git a/packages/server/src/notify_bridge_server/services/telegram_send.py b/packages/server/src/notify_bridge_server/services/telegram_send.py index db24d0a..b80992b 100644 --- a/packages/server/src/notify_bridge_server/services/telegram_send.py +++ b/packages/server/src/notify_bridge_server/services/telegram_send.py @@ -19,7 +19,6 @@ this module just guarantees every caller gets a properly-wired client. from __future__ import annotations -import asyncio import contextlib from typing import Any, AsyncIterator, Callable @@ -144,6 +143,4 @@ async def telegram_chat_action( try: yield finally: - task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await task + await client.stop_keepalive(task)