fix(telegram): respect chat_action UI choice, drop phantom indicator
chat_action was stored in two places — the model column and config JSON — and dispatch_helpers unconditionally overrode the config value with the column. The frontend only ever wrote the JSON path, so the UI choice silently had no effect on outgoing chat actions. Make the column the single source of truth: frontend sends chat_action top-level, dispatch_helpers reads from the column, and a one-time backfill migrates existing config values to the column and strips the legacy key. Also fix a long-standing race where the keepalive's bare sleep(4) + finally cancel could fire one last sendChatAction after the response already arrived, leaving a phantom indicator for ~5s. Replace with a stop event + wait_for so callers can signal stop cleanly via the new stop_keepalive helper.
This commit is contained in:
@@ -106,6 +106,7 @@ export interface NotificationTarget {
|
|||||||
name: string;
|
name: string;
|
||||||
icon: string;
|
icon: string;
|
||||||
config: Record<string, any>;
|
config: Record<string, any>;
|
||||||
|
chat_action?: string | null;
|
||||||
chat_name?: string;
|
chat_name?: string;
|
||||||
receiver_count: number;
|
receiver_count: number;
|
||||||
receivers: TargetReceiver[];
|
receivers: TargetReceiver[];
|
||||||
|
|||||||
@@ -229,7 +229,7 @@
|
|||||||
max_media_to_send: c.max_media_to_send ?? 50, max_media_per_group: c.max_media_per_group ?? 10,
|
max_media_to_send: c.max_media_to_send ?? 50, max_media_per_group: c.max_media_per_group ?? 10,
|
||||||
media_delay: c.media_delay ?? 500, max_asset_size: c.max_asset_size ?? 50,
|
media_delay: c.media_delay ?? 500, max_asset_size: c.max_asset_size ?? 50,
|
||||||
disable_url_preview: c.disable_url_preview ?? false, send_large_photos_as_documents: c.send_large_photos_as_documents ?? false,
|
disable_url_preview: c.disable_url_preview ?? false, send_large_photos_as_documents: c.send_large_photos_as_documents ?? false,
|
||||||
ai_captions: c.ai_captions ?? false, chat_action: c.chat_action ?? 'typing',
|
ai_captions: c.ai_captions ?? false, chat_action: tgt.chat_action ?? c.chat_action ?? 'typing',
|
||||||
// discord/slack
|
// discord/slack
|
||||||
username: c.username || '',
|
username: c.username || '',
|
||||||
// ntfy
|
// ntfy
|
||||||
@@ -268,7 +268,7 @@
|
|||||||
max_media_to_send: form.max_media_to_send, max_media_per_group: form.max_media_per_group,
|
max_media_to_send: form.max_media_to_send, max_media_per_group: form.max_media_per_group,
|
||||||
media_delay: form.media_delay, max_asset_size: form.max_asset_size,
|
media_delay: form.media_delay, max_asset_size: form.max_asset_size,
|
||||||
disable_url_preview: form.disable_url_preview, send_large_photos_as_documents: form.send_large_photos_as_documents,
|
disable_url_preview: form.disable_url_preview, send_large_photos_as_documents: form.send_large_photos_as_documents,
|
||||||
ai_captions: form.ai_captions, chat_action: form.chat_action || undefined,
|
ai_captions: form.ai_captions,
|
||||||
};
|
};
|
||||||
} else if (formType === 'webhook') {
|
} else if (formType === 'webhook') {
|
||||||
config = { ai_captions: form.ai_captions };
|
config = { ai_captions: form.ai_captions };
|
||||||
@@ -284,10 +284,12 @@
|
|||||||
config = { child_target_ids: form.child_target_ids };
|
config = { child_target_ids: form.child_target_ids };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const body: Record<string, any> = { name: form.name, icon: form.icon, config };
|
||||||
|
if (formType === 'telegram') body.chat_action = form.chat_action || null;
|
||||||
if (editing) {
|
if (editing) {
|
||||||
await api(`/targets/${editing}`, { method: 'PUT', body: JSON.stringify({ name: form.name, icon: form.icon, config }) });
|
await api(`/targets/${editing}`, { method: 'PUT', body: JSON.stringify(body) });
|
||||||
} else {
|
} else {
|
||||||
await api('/targets', { method: 'POST', body: JSON.stringify({ type: formType, name: form.name, icon: form.icon, config }) });
|
await api('/targets', { method: 'POST', body: JSON.stringify({ type: formType, ...body }) });
|
||||||
}
|
}
|
||||||
showForm = false;
|
showForm = false;
|
||||||
editing = null;
|
editing = null;
|
||||||
|
|||||||
@@ -6,12 +6,46 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import mimetypes
|
import mimetypes
|
||||||
|
import re
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from aiohttp import FormData
|
from aiohttp import FormData
|
||||||
|
|
||||||
|
# Telegram 429 / flood-control retry settings. Telegram returns
|
||||||
|
# ``parameters.retry_after`` for rate limits; we honor it up to a cap so a
|
||||||
|
# pathological value can't park the dispatcher for minutes.
|
||||||
|
_TG_429_MAX_ATTEMPTS = 4
|
||||||
|
_TG_429_MAX_WAIT_S = 60
|
||||||
|
_RETRY_AFTER_RE = re.compile(r"retry after (\d+)", re.IGNORECASE)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_retry_after(result: dict[str, Any]) -> int | None:
|
||||||
|
"""Return the retry_after seconds from a Telegram error response.
|
||||||
|
|
||||||
|
Prefers the structured ``parameters.retry_after`` field; falls back to
|
||||||
|
parsing the human-readable description (``"Too Many Requests: retry
|
||||||
|
after N"``) which Telegram has been known to return without the
|
||||||
|
structured field on some endpoints.
|
||||||
|
"""
|
||||||
|
params = result.get("parameters") or {}
|
||||||
|
ra = params.get("retry_after")
|
||||||
|
if isinstance(ra, (int, float)) and ra > 0:
|
||||||
|
return int(ra)
|
||||||
|
desc = str(result.get("description", ""))
|
||||||
|
m = _RETRY_AFTER_RE.search(desc)
|
||||||
|
if m:
|
||||||
|
try:
|
||||||
|
return int(m.group(1))
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _is_rate_limited(status: int, result: dict[str, Any]) -> bool:
|
||||||
|
return status == 429 or result.get("error_code") == 429
|
||||||
|
|
||||||
from .cache import TelegramFileCache
|
from .cache import TelegramFileCache
|
||||||
from .media import (
|
from .media import (
|
||||||
TELEGRAM_API_BASE_URL,
|
TELEGRAM_API_BASE_URL,
|
||||||
@@ -193,18 +227,21 @@ class TelegramClient:
|
|||||||
thumbhash: str | None,
|
thumbhash: str | None,
|
||||||
) -> NotificationResult:
|
) -> NotificationResult:
|
||||||
"""Multipart-upload ``data`` to Telegram and cache the returned file_id."""
|
"""Multipart-upload ``data`` to Telegram and cache the returned file_id."""
|
||||||
form = FormData()
|
def _build_form() -> FormData:
|
||||||
form.add_field("chat_id", chat_id)
|
f = FormData()
|
||||||
form.add_field(kind.form_field, data, filename=filename, content_type=content_type)
|
f.add_field("chat_id", chat_id)
|
||||||
form.add_field("parse_mode", parse_mode)
|
f.add_field(kind.form_field, data, filename=filename, content_type=content_type)
|
||||||
|
f.add_field("parse_mode", parse_mode)
|
||||||
if caption:
|
if caption:
|
||||||
form.add_field("caption", caption)
|
f.add_field("caption", caption)
|
||||||
if reply_to_message_id:
|
if reply_to_message_id:
|
||||||
form.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id}))
|
f.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id}))
|
||||||
|
return f
|
||||||
|
|
||||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/{kind.api_method}"
|
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/{kind.api_method}"
|
||||||
|
for attempt in range(1, _TG_429_MAX_ATTEMPTS + 1):
|
||||||
try:
|
try:
|
||||||
async with self._session.post(telegram_url, data=form) as response:
|
async with self._session.post(telegram_url, data=_build_form()) as response:
|
||||||
result = await response.json()
|
result = await response.json()
|
||||||
if response.status == 200 and result.get("ok"):
|
if response.status == 200 and result.get("ok"):
|
||||||
res = result.get("result", {})
|
res = result.get("result", {})
|
||||||
@@ -215,6 +252,18 @@ class TelegramClient:
|
|||||||
thumbhash=thumbhash, size=len(data),
|
thumbhash=thumbhash, size=len(data),
|
||||||
)
|
)
|
||||||
return {"success": True, "message_id": res.get("message_id")}
|
return {"success": True, "message_id": res.get("message_id")}
|
||||||
|
|
||||||
|
if _is_rate_limited(response.status, result) and attempt < _TG_429_MAX_ATTEMPTS:
|
||||||
|
retry_after = _extract_retry_after(result) or 1
|
||||||
|
wait_s = min(retry_after + 1, _TG_429_MAX_WAIT_S)
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Telegram %s 429 (retry_after=%ds, attempt %d/%d) bytes=%d — sleeping %ds",
|
||||||
|
kind.api_method, retry_after, attempt, _TG_429_MAX_ATTEMPTS,
|
||||||
|
len(data), wait_s,
|
||||||
|
)
|
||||||
|
await asyncio.sleep(wait_s)
|
||||||
|
continue
|
||||||
|
|
||||||
_LOGGER.error(
|
_LOGGER.error(
|
||||||
"Telegram %s failed: status=%s code=%s desc=%r bytes=%d",
|
"Telegram %s failed: status=%s code=%s desc=%r bytes=%d",
|
||||||
kind.api_method, response.status, result.get("error_code"),
|
kind.api_method, response.status, result.get("error_code"),
|
||||||
@@ -227,6 +276,9 @@ class TelegramClient:
|
|||||||
kind.api_method, len(data), err, exc_info=True,
|
kind.api_method, len(data), err, exc_info=True,
|
||||||
)
|
)
|
||||||
return {"success": False, "error": str(err)}
|
return {"success": False, "error": str(err)}
|
||||||
|
# All attempts exhausted via 429 — should be unreachable, but keep
|
||||||
|
# an explicit error path so we never return None.
|
||||||
|
return {"success": False, "error": "Telegram rate limit: max retries exhausted"}
|
||||||
|
|
||||||
async def send_notification(
|
async def send_notification(
|
||||||
self,
|
self,
|
||||||
@@ -299,12 +351,7 @@ class TelegramClient:
|
|||||||
send_large_photos_as_documents,
|
send_large_photos_as_documents,
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
if typing_task:
|
await self.stop_keepalive(typing_task)
|
||||||
typing_task.cancel()
|
|
||||||
try:
|
|
||||||
await typing_task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def send_message(
|
async def send_message(
|
||||||
self,
|
self,
|
||||||
@@ -368,20 +415,53 @@ class TelegramClient:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def start_chat_action_keepalive(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
def start_chat_action_keepalive(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
||||||
"""Repeatedly post ``action`` every 4s until the returned task is cancelled.
|
"""Repeatedly post ``action`` every 4s until stopped.
|
||||||
|
|
||||||
Telegram chat actions expire after ~5s, so callers that want the hint
|
Telegram chat actions expire after ~5s, so callers that want the hint
|
||||||
to persist through longer work (fetching assets, multi-chunk uploads)
|
to persist through longer work (fetching assets, multi-chunk uploads)
|
||||||
need a keep-alive. Cancel the task in a ``finally`` to stop it.
|
need a keep-alive.
|
||||||
|
|
||||||
|
The returned task carries an attached ``stop_event`` (``asyncio.Event``).
|
||||||
|
Stop cleanly via :meth:`stop_keepalive` — setting the event before
|
||||||
|
cancellation prevents the loop from firing one last ``sendChatAction``
|
||||||
|
after the caller's final user-visible message, which would otherwise
|
||||||
|
leave a phantom indicator hanging for ~5s.
|
||||||
"""
|
"""
|
||||||
|
stop_event = asyncio.Event()
|
||||||
|
|
||||||
async def action_loop() -> None:
|
async def action_loop() -> None:
|
||||||
try:
|
try:
|
||||||
while True:
|
while not stop_event.is_set():
|
||||||
await self.send_chat_action(chat_id, action)
|
await self.send_chat_action(chat_id, action)
|
||||||
await asyncio.sleep(4)
|
try:
|
||||||
|
await asyncio.wait_for(stop_event.wait(), timeout=4)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass # 4s elapsed, refresh the action
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
task: asyncio.Task = asyncio.create_task(action_loop())
|
||||||
|
task.stop_event = stop_event # type: ignore[attr-defined]
|
||||||
|
return task
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def stop_keepalive(task: asyncio.Task | None) -> None:
|
||||||
|
"""Stop a keepalive task started by :meth:`start_chat_action_keepalive`.
|
||||||
|
|
||||||
|
Sets the attached stop event before cancelling so the loop won't
|
||||||
|
fire another ``sendChatAction`` after the caller's final message
|
||||||
|
landed at Telegram.
|
||||||
|
"""
|
||||||
|
if task is None:
|
||||||
|
return
|
||||||
|
stop_event = getattr(task, "stop_event", None)
|
||||||
|
if stop_event is not None:
|
||||||
|
stop_event.set()
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
return asyncio.create_task(action_loop())
|
|
||||||
|
|
||||||
async def _send_photo(
|
async def _send_photo(
|
||||||
self, chat_id: str, url: str | None, caption: str | None = None,
|
self, chat_id: str, url: str | None, caption: str | None = None,
|
||||||
@@ -526,12 +606,10 @@ class TelegramClient:
|
|||||||
all_message_ids.append(result.get("message_id"))
|
all_message_ids.append(result.get("message_id"))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Multi-item: download all, build form, send media group
|
# Multi-item: download all, build form, send media group.
|
||||||
form = FormData()
|
# Attachments are recorded separately so we can rebuild FormData on
|
||||||
form.add_field("chat_id", chat_id)
|
# 429 retry — aiohttp.FormData is single-use after a request.
|
||||||
if reply_to_message_id and chunk_idx == 0:
|
attachments: list[tuple[str, bytes, str, str]] = [] # (name, data, filename, content_type)
|
||||||
form.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id}))
|
|
||||||
|
|
||||||
media_json = []
|
media_json = []
|
||||||
upload_idx = 0
|
upload_idx = 0
|
||||||
# Track cache info per media_json entry (in order) so we can map
|
# Track cache info per media_json entry (in order) so we can map
|
||||||
@@ -646,7 +724,7 @@ class TelegramClient:
|
|||||||
attach_name = f"file{upload_idx}"
|
attach_name = f"file{upload_idx}"
|
||||||
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
|
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
|
||||||
ext = "jpg" if media_type == "photo" else "mp4"
|
ext = "jpg" if media_type == "photo" else "mp4"
|
||||||
form.add_field(attach_name, data, filename=f"media_{idx}.{ext}", content_type=ct)
|
attachments.append((attach_name, data, f"media_{idx}.{ext}", ct))
|
||||||
mij = {"type": media_type, "media": f"attach://{attach_name}"}
|
mij = {"type": media_type, "media": f"attach://{attach_name}"}
|
||||||
upload_idx += 1
|
upload_idx += 1
|
||||||
# Record cache key so we can store file_id from response
|
# Record cache key so we can store file_id from response
|
||||||
@@ -674,11 +752,22 @@ class TelegramClient:
|
|||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
form.add_field("media", json.dumps(media_json))
|
|
||||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup"
|
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup"
|
||||||
|
|
||||||
|
def _build_form() -> FormData:
|
||||||
|
f = FormData()
|
||||||
|
f.add_field("chat_id", chat_id)
|
||||||
|
if reply_to_message_id and chunk_idx == 0:
|
||||||
|
f.add_field("reply_parameters", json.dumps({"message_id": reply_to_message_id}))
|
||||||
|
for name, payload, filename, ct in attachments:
|
||||||
|
f.add_field(name, payload, filename=filename, content_type=ct)
|
||||||
|
f.add_field("media", json.dumps(media_json))
|
||||||
|
return f
|
||||||
|
|
||||||
|
chunk_failed_result: dict[str, Any] | None = None
|
||||||
|
for attempt in range(1, _TG_429_MAX_ATTEMPTS + 1):
|
||||||
try:
|
try:
|
||||||
async with self._session.post(telegram_url, data=form) as response:
|
async with self._session.post(telegram_url, data=_build_form()) as response:
|
||||||
result = await response.json()
|
result = await response.json()
|
||||||
if response.status == 200 and result.get("ok"):
|
if response.status == 200 and result.get("ok"):
|
||||||
result_msgs = result.get("result", [])
|
result_msgs = result.get("result", [])
|
||||||
@@ -707,19 +796,32 @@ class TelegramClient:
|
|||||||
eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0]))
|
eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0]))
|
||||||
if eff_cache:
|
if eff_cache:
|
||||||
await eff_cache.async_set_many(cache_entries)
|
await eff_cache.async_set_many(cache_entries)
|
||||||
else:
|
break # chunk succeeded
|
||||||
|
|
||||||
|
if _is_rate_limited(response.status, result) and attempt < _TG_429_MAX_ATTEMPTS:
|
||||||
|
retry_after = _extract_retry_after(result) or 1
|
||||||
|
wait_s = min(retry_after + 1, _TG_429_MAX_WAIT_S)
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Telegram sendMediaGroup 429 (retry_after=%ds, attempt %d/%d) chunk=%d/%d items=%d — sleeping %ds",
|
||||||
|
retry_after, attempt, _TG_429_MAX_ATTEMPTS,
|
||||||
|
chunk_idx + 1, len(chunks), len(media_json), wait_s,
|
||||||
|
)
|
||||||
|
await asyncio.sleep(wait_s)
|
||||||
|
continue
|
||||||
|
|
||||||
_LOGGER.error(
|
_LOGGER.error(
|
||||||
"Telegram sendMediaGroup failed: status=%s code=%s desc=%r chunk=%d/%d items=%d",
|
"Telegram sendMediaGroup failed: status=%s code=%s desc=%r chunk=%d/%d items=%d",
|
||||||
response.status, result.get("error_code"),
|
response.status, result.get("error_code"),
|
||||||
result.get("description", "Unknown"),
|
result.get("description", "Unknown"),
|
||||||
chunk_idx + 1, len(chunks), len(media_json),
|
chunk_idx + 1, len(chunks), len(media_json),
|
||||||
)
|
)
|
||||||
return {
|
chunk_failed_result = {
|
||||||
"success": False,
|
"success": False,
|
||||||
"error": result.get("description", "Unknown"),
|
"error": result.get("description", "Unknown"),
|
||||||
"error_code": result.get("error_code"),
|
"error_code": result.get("error_code"),
|
||||||
"failed_at_chunk": chunk_idx + 1,
|
"failed_at_chunk": chunk_idx + 1,
|
||||||
}
|
}
|
||||||
|
break
|
||||||
except aiohttp.ClientError as err:
|
except aiohttp.ClientError as err:
|
||||||
_LOGGER.error(
|
_LOGGER.error(
|
||||||
"Telegram sendMediaGroup transport error on chunk %d/%d (%d items): %s",
|
"Telegram sendMediaGroup transport error on chunk %d/%d (%d items): %s",
|
||||||
@@ -728,6 +830,9 @@ class TelegramClient:
|
|||||||
)
|
)
|
||||||
return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
|
return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
|
||||||
|
|
||||||
|
if chunk_failed_result is not None:
|
||||||
|
return chunk_failed_result
|
||||||
|
|
||||||
# Distinguish "posted something" from "posted nothing" so the caller
|
# Distinguish "posted something" from "posted nothing" so the caller
|
||||||
# can surface an ERROR when a command produced a caption reply but no
|
# can surface an ERROR when a command produced a caption reply but no
|
||||||
# media ever reached Telegram.
|
# media ever reached Telegram.
|
||||||
|
|||||||
@@ -1391,6 +1391,40 @@ async def migrate_performance_indexes(engine: AsyncEngine) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def migrate_chat_action_to_column(engine: AsyncEngine) -> None:
|
||||||
|
"""Move ``chat_action`` from ``config`` JSON to the dedicated column.
|
||||||
|
|
||||||
|
Earlier versions of the frontend stored ``chat_action`` inside
|
||||||
|
``notification_target.config``; the dedicated ``chat_action`` column
|
||||||
|
was rarely set or held a stale default. The dispatcher's resolver
|
||||||
|
overrode the config value with the (stale) column, so a user's UI
|
||||||
|
choice silently had no effect on outgoing chat actions.
|
||||||
|
|
||||||
|
This backfill takes the config value as authoritative (it's what the
|
||||||
|
UI was writing) and copies it to the column, then strips it from
|
||||||
|
config so the column becomes the single source of truth. Idempotent:
|
||||||
|
a second run finds nothing to migrate.
|
||||||
|
"""
|
||||||
|
async with engine.begin() as conn:
|
||||||
|
if not await _has_table(conn, "notification_target"):
|
||||||
|
return
|
||||||
|
if not await _has_column(conn, "notification_target", "chat_action"):
|
||||||
|
return
|
||||||
|
# Copy config["chat_action"] → column where present.
|
||||||
|
await conn.execute(text(
|
||||||
|
"UPDATE notification_target "
|
||||||
|
"SET chat_action = json_extract(config, '$.chat_action') "
|
||||||
|
"WHERE json_extract(config, '$.chat_action') IS NOT NULL"
|
||||||
|
))
|
||||||
|
# Strip the legacy key so the column is unambiguous going forward.
|
||||||
|
await conn.execute(text(
|
||||||
|
"UPDATE notification_target "
|
||||||
|
"SET config = json_remove(config, '$.chat_action') "
|
||||||
|
"WHERE json_extract(config, '$.chat_action') IS NOT NULL"
|
||||||
|
))
|
||||||
|
logger.info("Migrated chat_action from config JSON to column where present")
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Schema version tracking — lightweight alternative to Alembic while the
|
# Schema version tracking — lightweight alternative to Alembic while the
|
||||||
# hand-rolled idempotent migrations remain the source of truth. Gives
|
# hand-rolled idempotent migrations remain the source of truth. Gives
|
||||||
|
|||||||
@@ -75,6 +75,7 @@ async def lifespan(app: FastAPI):
|
|||||||
migrate_notification_slot_locale,
|
migrate_notification_slot_locale,
|
||||||
migrate_user_token_version,
|
migrate_user_token_version,
|
||||||
migrate_performance_indexes,
|
migrate_performance_indexes,
|
||||||
|
migrate_chat_action_to_column,
|
||||||
migrate_schema_version,
|
migrate_schema_version,
|
||||||
)
|
)
|
||||||
from .database.snapshot import snapshot_and_prune
|
from .database.snapshot import snapshot_and_prune
|
||||||
@@ -98,6 +99,7 @@ async def lifespan(app: FastAPI):
|
|||||||
await migrate_notification_slot_locale(engine)
|
await migrate_notification_slot_locale(engine)
|
||||||
await migrate_user_token_version(engine)
|
await migrate_user_token_version(engine)
|
||||||
await migrate_performance_indexes(engine)
|
await migrate_performance_indexes(engine)
|
||||||
|
await migrate_chat_action_to_column(engine)
|
||||||
await migrate_schema_version(engine)
|
await migrate_schema_version(engine)
|
||||||
from .database.seeds import seed_all
|
from .database.seeds import seed_all
|
||||||
await seed_all()
|
await seed_all()
|
||||||
|
|||||||
@@ -326,7 +326,11 @@ async def _resolve_target(
|
|||||||
receivers.append(build_receiver(target.type, dict(r.config), locale))
|
receivers.append(build_receiver(target.type, dict(r.config), locale))
|
||||||
|
|
||||||
target_config = dict(target.config)
|
target_config = dict(target.config)
|
||||||
# Inject chat_action for Telegram targets
|
# chat_action lives on the model column — single source of truth.
|
||||||
|
# Strip any legacy/stale value from config so an old config-stored value
|
||||||
|
# can't shadow the user's UI choice. When the column is unset, leave the
|
||||||
|
# key absent so the dispatcher's "typing" fallback applies.
|
||||||
|
target_config.pop("chat_action", None)
|
||||||
if hasattr(target, 'chat_action') and target.chat_action:
|
if hasattr(target, 'chat_action') and target.chat_action:
|
||||||
target_config["chat_action"] = target.chat_action
|
target_config["chat_action"] = target.chat_action
|
||||||
# Inject bot credentials for bot-backed target types
|
# Inject bot credentials for bot-backed target types
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ this module just guarantees every caller gets a properly-wired client.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import contextlib
|
import contextlib
|
||||||
from typing import Any, AsyncIterator, Callable
|
from typing import Any, AsyncIterator, Callable
|
||||||
|
|
||||||
@@ -144,6 +143,4 @@ async def telegram_chat_action(
|
|||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
task.cancel()
|
await client.stop_keepalive(task)
|
||||||
with contextlib.suppress(asyncio.CancelledError):
|
|
||||||
await task
|
|
||||||
|
|||||||
Reference in New Issue
Block a user