Files
notify-bridge/packages/server/src/notify_bridge_server/services/notifier.py
T
alexei.dolgolyov ef942b77cc feat(telegram): per-chat command localization + unified locale resolver
Two related Telegram changes:

1. Per-chat command localization. setMyCommands now accepts a scope
   (BotCommandScopeChat) and deleteMyCommands clears scoped bindings.
   Command registration runs three tiers: default → per-language
   (Telegram client language) → per-chat (UI override). Saving a
   chat's language_override or commands_enabled toggle pushes the
   binding to Telegram inline rather than waiting on the 30s
   debounced bot-wide sync.

2. Unified Telegram locale resolution. Three test paths (bot test_chat,
   target receiver test, target-level fan-out) used to disagree on
   locale priority — the target receiver test in particular only
   consulted receiver.locale and ignored the chat's language_override.
   Introduced pick_telegram_locale (pure) and
   resolve_telegram_chat_locale (async DB lookup) in services/notifier
   so all three paths share one priority order:

       receiver.locale → chat.language_override → chat.language_code → fallback

   Fan-out keeps batch-loading TelegramChat rows for efficiency, just
   runs them through the same priority function now.
2026-04-25 14:41:28 +03:00

490 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Notification sender — unified send logic for all paths (dispatch + test)."""
import asyncio
import logging
from typing import Any
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import NotificationTarget, TargetReceiver
_LOGGER = logging.getLogger(__name__)
# Cap on concurrent per-receiver test sends. Keeps us under Telegram's per-bot
# rate limit (~30 msg/s) while still saving ~N×RTT on multi-chat broadcasts.
_TEST_SEND_CONCURRENCY = 5
_TEST_MESSAGES: dict[str, dict[str, str]] = {
"en": {
"telegram": "\u2705 Test message from <b>Notify Bridge</b>",
"webhook": "Test notification from Notify Bridge",
"email": "Test email from Notify Bridge",
"discord": "Test message from **Notify Bridge**",
"slack": "Test message from *Notify Bridge*",
"ntfy": "Test notification from Notify Bridge",
"matrix": "Test message from Notify Bridge",
},
"ru": {
"telegram": "\u2705 Тестовое сообщение от <b>Notify Bridge</b>",
"webhook": "Тестовое уведомление от Notify Bridge",
"email": "Тестовое письмо от Notify Bridge",
"discord": "Тестовое сообщение от **Notify Bridge**",
"slack": "Тестовое сообщение от *Notify Bridge*",
"ntfy": "Тестовое уведомление от Notify Bridge",
"matrix": "Тестовое сообщение от Notify Bridge",
},
}
def _get_test_message(locale: str, target_type: str) -> str:
msgs = _TEST_MESSAGES.get(locale, _TEST_MESSAGES["en"])
return msgs.get(target_type, msgs.get("webhook", "Test"))
def pick_telegram_locale(
*,
receiver_locale: str = "",
chat_override: str = "",
chat_language_code: str = "",
fallback: str = "en",
) -> str:
"""Pick the effective 2-letter locale for a Telegram chat.
Priority (highest first):
1. ``receiver_locale`` — explicit per-receiver override on a target.
2. ``chat_override`` — explicit ``TelegramChat.language_override``
set in the bot/chat manager UI.
3. ``chat_language_code`` — Telegram-provided ``language_code``.
4. ``fallback`` — caller-supplied default (e.g. query param).
All inputs are coerced to lowercase 2-letter codes.
"""
for candidate in (receiver_locale, chat_override, chat_language_code, fallback):
if candidate:
return candidate[:2].lower()
return "en"
async def resolve_telegram_chat_locale(
session: AsyncSession,
*,
bot_id: int | None,
chat_id: str | int | None,
receiver: TargetReceiver | None = None,
fallback: str = "en",
) -> str:
"""Look up a Telegram chat and resolve its effective locale.
Single source of truth for "what language should I send to this chat in?".
Used by every Telegram test/preview path (bot test_chat, target test
receiver, per-receiver fan-out) so they stay in lockstep.
"""
from ..database.models import TelegramChat
chat_row = None
if bot_id and chat_id:
chat_row = (await session.exec(
select(TelegramChat).where(
TelegramChat.bot_id == bot_id,
TelegramChat.chat_id == str(chat_id),
)
)).first()
return pick_telegram_locale(
receiver_locale=getattr(receiver, "locale", "") if receiver else "",
chat_override=getattr(chat_row, "language_override", "") if chat_row else "",
chat_language_code=getattr(chat_row, "language_code", "") if chat_row else "",
fallback=fallback,
)
async def _load_receivers(target_id: int) -> list[dict]:
"""Load enabled receivers for a target from DB."""
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
select(TargetReceiver).where(
TargetReceiver.target_id == target_id,
TargetReceiver.enabled == True,
)
)
return [dict(r.config) for r in result.all()]
async def send_to_target(target: NotificationTarget, message: str) -> dict:
"""Send a text message to a target, broadcasting to all receivers.
Used for basic test and template preview sends (text only, no media).
Real notifications with media go through NotificationDispatcher.
"""
try:
receivers = await _load_receivers(target.id)
send_fn = {
"telegram": _send_telegram_broadcast,
"webhook": _send_webhook_broadcast,
"email": _send_email_broadcast,
"discord": _send_webhook_like_broadcast,
"slack": _send_webhook_like_broadcast,
"ntfy": _send_ntfy_broadcast,
"matrix": _send_matrix_broadcast,
}.get(target.type)
if send_fn:
return await send_fn(target, message, receivers)
return {"success": False, "error": f"Unknown target type: {target.type}"}
except Exception as e:
_LOGGER.error("Send failed: %s", e)
return {"success": False, "error": str(e)}
async def _send_telegram_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
from notify_bridge_core.notifications.telegram.client import TelegramClient
bot_token = target.config.get("bot_token")
disable_preview = target.config.get("disable_url_preview", False)
if not bot_token:
return {"success": False, "error": "Missing bot_token"}
if not receivers:
return {"success": False, "error": "No receivers configured"}
from .http_session import get_http_session
http = await get_http_session()
results: list[dict] = []
client = TelegramClient(http, bot_token)
for recv in receivers:
chat_id = recv.get("chat_id")
if not chat_id:
continue
result = await client.send_message(
chat_id=str(chat_id),
text=message,
disable_web_page_preview=bool(disable_preview),
)
results.append(result)
return _aggregate(results)
async def _send_webhook_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
from notify_bridge_core.notifications.webhook.client import WebhookClient
if not receivers:
return {"success": False, "error": "No receivers configured"}
from .http_session import get_http_session
http = await get_http_session()
results: list[dict] = []
for recv in receivers:
url = recv.get("url")
headers = recv.get("headers", {})
if not url:
continue
client = WebhookClient(http, url, headers)
results.append(await client.send({"message": message, "event_type": "notification"}))
return _aggregate(results)
async def _send_email_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
from notify_bridge_core.notifications.email.client import EmailClient, SmtpConfig
from ..database.models import EmailBot
email_bot_id = target.config.get("email_bot_id")
if not email_bot_id:
return {"success": False, "error": "No email bot configured for this target"}
engine = get_engine()
async with AsyncSession(engine) as session:
email_bot = await session.get(EmailBot, email_bot_id)
if not email_bot:
return {"success": False, "error": "Email bot not found"}
smtp_cfg = SmtpConfig(
host=email_bot.smtp_host,
port=email_bot.smtp_port,
username=email_bot.smtp_username,
password=email_bot.smtp_password,
from_address=email_bot.email,
from_name=email_bot.name,
use_tls=email_bot.smtp_use_tls,
)
if not smtp_cfg.host or not smtp_cfg.from_address:
return {"success": False, "error": "Email bot SMTP not configured"}
if not receivers:
return {"success": False, "error": "No email receivers configured"}
client = EmailClient(smtp_cfg)
results: list[dict] = []
for recv in receivers:
email = recv.get("email")
if not email:
continue
result = await client.send(
to_email=email,
subject="Notification from Notify Bridge",
body_text=message,
body_html=message,
to_name=recv.get("name", ""),
)
results.append(result)
return _aggregate(results)
async def _send_webhook_like_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
"""Broadcast for Discord and Slack — both use webhook URLs as receivers."""
if not receivers:
return {"success": False, "error": "No receivers configured"}
from .http_session import get_http_session
http = await get_http_session()
results: list[dict] = []
if target.type == "discord":
from notify_bridge_core.notifications.discord.client import DiscordClient
client = DiscordClient(http)
for recv in receivers:
url = recv.get("webhook_url")
if url:
results.append(await client.send(url, message, username=target.config.get("username")))
elif target.type == "slack":
from notify_bridge_core.notifications.slack.client import SlackClient
client = SlackClient(http)
for recv in receivers:
url = recv.get("webhook_url")
if url:
results.append(await client.send(url, message, username=target.config.get("username")))
return _aggregate(results)
async def _send_ntfy_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
"""Broadcast to ntfy topics."""
server_url = target.config.get("server_url", "https://ntfy.sh")
auth_token = target.config.get("auth_token")
if not receivers:
return {"success": False, "error": "No receivers configured"}
from notify_bridge_core.notifications.ntfy.client import NtfyClient
from .http_session import get_http_session
http = await get_http_session()
results: list[dict] = []
client = NtfyClient(http)
for recv in receivers:
topic = recv.get("topic")
if topic:
results.append(await client.send(
server_url, topic, message,
title="Notify Bridge",
priority=recv.get("priority", 3),
auth_token=auth_token,
))
return _aggregate(results)
async def _send_matrix_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
"""Broadcast to Matrix rooms."""
from notify_bridge_core.notifications.matrix.client import MatrixClient
from ..database.models import MatrixBot
matrix_bot_id = target.config.get("matrix_bot_id")
if not matrix_bot_id:
return {"success": False, "error": "No Matrix bot configured for this target"}
engine = get_engine()
async with AsyncSession(engine) as session:
bot = await session.get(MatrixBot, matrix_bot_id)
if not bot:
return {"success": False, "error": "Matrix bot not found"}
homeserver = bot.homeserver_url
access_token = bot.access_token
if not receivers:
return {"success": False, "error": "No receivers configured"}
from .http_session import get_http_session
http = await get_http_session()
results: list[dict] = []
client = MatrixClient(http, homeserver, access_token)
for recv in receivers:
room_id = recv.get("room_id")
if room_id:
results.append(await client.send_message(room_id, message, html_message=message))
return _aggregate(results)
def _aggregate(results: list[dict]) -> dict:
"""Aggregate broadcast results."""
successes = sum(1 for r in results if r.get("success"))
if successes == len(results) and results:
return {"success": True, "receivers": len(results)}
elif successes > 0:
return {"success": True, "receivers": len(results), "partial_failures": len(results) - successes}
elif results:
return results[0]
return {"success": False, "error": "No valid receivers"}
# --- Public API used by routes ---
async def send_to_receiver(target: NotificationTarget, receiver_config: dict, message: str) -> dict:
"""Send a message to a single receiver of a target."""
try:
send_fn = {
"telegram": _send_telegram_broadcast,
"webhook": _send_webhook_broadcast,
"email": _send_email_broadcast,
"discord": _send_webhook_like_broadcast,
"slack": _send_webhook_like_broadcast,
"ntfy": _send_ntfy_broadcast,
"matrix": _send_matrix_broadcast,
}.get(target.type)
if send_fn:
return await send_fn(target, message, [receiver_config])
return {"success": False, "error": f"Unknown target type: {target.type}"}
except Exception as e:
_LOGGER.error("Send to receiver failed: %s", e)
return {"success": False, "error": str(e)}
async def send_test_notification(target: NotificationTarget, locale: str = "en") -> dict:
"""Send a simple test message. For broadcast targets, fans out to all children.
For Telegram targets, per-receiver locale (TargetReceiver.locale or
TelegramChat.language_override/language_code) is resolved individually so
each chat receives the message in its own configured language.
"""
if target.type == "broadcast":
return await _send_broadcast_test(target, locale)
if target.type == "telegram":
return await _send_telegram_test_per_receiver(target, default_locale=locale)
message = _get_test_message(locale, target.type)
return await send_to_target(target, message)
async def _send_telegram_test_per_receiver(
target: NotificationTarget, default_locale: str = "en",
) -> dict:
"""Send a test message to each Telegram receiver in its own resolved locale."""
from notify_bridge_core.notifications.telegram.client import TelegramClient
from ..database.models import TargetReceiver, TelegramChat
from .http_session import get_http_session
bot_token = target.config.get("bot_token")
bot_id = target.config.get("bot_id")
disable_preview = target.config.get("disable_url_preview", False)
if not bot_token:
return {"success": False, "error": "Missing bot_token"}
engine = get_engine()
async with AsyncSession(engine) as session:
recv_rows = (await session.exec(
select(TargetReceiver).where(
TargetReceiver.target_id == target.id,
TargetReceiver.enabled == True,
)
)).all()
if not recv_rows:
return {"success": False, "error": "No receivers configured"}
# Batch-load TelegramChat rows so per-receiver locale picks don't
# round-trip the DB N times. Priority resolution then runs through the
# shared pick_telegram_locale() helper so single-shot test endpoints
# and this fan-out agree on the same rules.
chat_ids = [str(r.config.get("chat_id", "")) for r in recv_rows if r.config.get("chat_id")]
chat_row_map: dict[str, TelegramChat] = {}
if bot_id and chat_ids:
chat_rows = (await session.exec(
select(TelegramChat).where(
TelegramChat.bot_id == bot_id,
TelegramChat.chat_id.in_(chat_ids),
)
)).all()
chat_row_map = {chat.chat_id: chat for chat in chat_rows}
http = await get_http_session()
client = TelegramClient(http, bot_token)
# Parallelize per-receiver sends with a small semaphore — broadcast to
# N chats now takes ~ceil(N / concurrency) × RTT instead of N × RTT,
# matching the dispatcher's bounded-concurrency pattern. Capped below
# Telegram's rate limit so we don't trigger 429s on large fleets.
sem = asyncio.Semaphore(_TEST_SEND_CONCURRENCY)
async def _send_one(r: TargetReceiver) -> dict | None:
chat_id = str(r.config.get("chat_id", ""))
if not chat_id:
return None
chat_row = chat_row_map.get(chat_id)
locale = pick_telegram_locale(
receiver_locale=getattr(r, "locale", "") or "",
chat_override=getattr(chat_row, "language_override", "") if chat_row else "",
chat_language_code=getattr(chat_row, "language_code", "") if chat_row else "",
fallback=default_locale,
)
message = _get_test_message(locale, "telegram")
async with sem:
return await client.send_message(
chat_id=chat_id,
text=message,
disable_web_page_preview=bool(disable_preview),
)
raw = await asyncio.gather(*(_send_one(r) for r in recv_rows))
results = [r for r in raw if r is not None]
return _aggregate(results)
async def _send_broadcast_test(target: NotificationTarget, locale: str) -> dict:
"""Send test notifications to all child targets of a broadcast target."""
child_ids = target.config.get("child_target_ids", [])
if not child_ids:
return {"success": False, "error": "No child targets configured"}
disabled_ids = set(target.config.get("disabled_child_ids", []))
engine = get_engine()
results: list[dict] = []
async with AsyncSession(engine) as session:
for child_id in child_ids:
if child_id in disabled_ids:
continue
child = await session.get(NotificationTarget, child_id)
if not child or child.type == "broadcast":
continue
message = _get_test_message(locale, child.type)
results.append(await send_to_target(child, message))
return _aggregate(results)
async def send_test_template_notification(
target: NotificationTarget, slot: str, template_str: str
) -> dict:
"""Render a template slot with sample data and send."""
from jinja2.sandbox import SandboxedEnvironment
from .sample_context import _SAMPLE_CONTEXT
if not template_str:
return await send_test_notification(target)
try:
env = SandboxedEnvironment(autoescape=False)
tmpl = env.from_string(template_str)
message = tmpl.render(**_SAMPLE_CONTEXT)
except Exception as e:
return {"success": False, "error": f"Template render error: {e}"}
return await send_to_target(target, message)