notify-bridge/packages/server/src/notify_bridge_server/services/notifier.py
alexei.dolgolyov 10d30fc956 feat: production readiness — security, perf, bug fixes, bridge self-monitoring
Comprehensive multi-area pass driven by a parallel 8-agent production
review. Frontend, backend, database, security, performance, operational,
plus a new self-monitoring feature.

## Critical fixes
- Planka webhook: reads bounded raw body (was NameError on every call)
- HA quiet hours: ha_state_changed/automation_triggered/service_called/
  event_fired added to deferrable set (were silently dropped)
- DNS-rebinding SSRF: PinnedResolver wired into shared aiohttp session
- Telegram inbound webhook: secret now mandatory (401 without)
- Generic webhook: auth_mode="none" requires explicit
  acknowledge_unauthenticated=true; per-IP rate limit 60/min
- svelte-check: 5 null-narrowing errors in EventDetailModal fixed
- Provider hardcoding: Immich-only block extracted to descriptor
  featureDiscoveryHint
- command_sync: snapshot+expunge bot before exiting AsyncSession
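
The per-IP limit on the generic webhook endpoint (60 requests per minute) can be illustrated with a sliding-window counter. This is a minimal sketch under stated assumptions, not the bridge's implementation: the class name `PerIpRateLimiter`, the in-memory single-process storage and the injectable `clock` are all hypothetical, and a deployment behind a reverse proxy would also have to decide which header carries the real client IP.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class PerIpRateLimiter:
    """Allow at most `limit` requests per `window` seconds for each source IP.

    Hypothetical sketch: in-memory, single process, not the bridge's code.
    """

    def __init__(
        self,
        limit: int = 60,
        window: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window = window
        self._clock = clock
        # One deque of accepted-request timestamps per IP (oldest first).
        self._hits: defaultdict[str, deque[float]] = defaultdict(deque)

    def allow(self, ip: str) -> bool:
        now = self._clock()
        hits = self._hits[ip]
        # Forget requests that have slid out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # rejected requests are not recorded
        hits.append(now)
        return True


limiter = PerIpRateLimiter()
if not limiter.allow("203.0.113.7"):
    print("429 Too Many Requests")
```

Not recording rejected requests means a client that keeps hammering the endpoint regains access as soon as its accepted requests age out, rather than extending its own block.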

## Bug fixes
- notifier asyncio.gather(return_exceptions=True): one bad chat no longer
  aborts the fan-out and discards the peer sends' results
- NotificationDispatcher hoisted out of per-tracker loop
- Provider credential resolution unified across all 5 dispatch sites
- HA asyncio.shield now drains inner task on cancellation
- Provider construction switched from if/elif ladder to factory registry
- NUT first poll seeds silently (no spurious ups_on_battery)
- Quiet-hours gate: event-type-disabled now wins over deferral
- APScheduler drain job ID resolution upgraded to seconds
- HA on_status_change wired through to EventLog
- Webhook payload rollback failures now logged (not swallowed)
- Batched receivers/chats/bots in load_link_data (was per-target N+1)
- flag_modified on JSON column reassignments in deferred_dispatch
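
For context on the `asyncio.gather` bullet: with the default `return_exceptions=False`, the first exception is propagated to whoever awaits the gather, so the caller loses the results of every sibling send (per the asyncio docs, those awaitables are not themselves cancelled). With `return_exceptions=True`, exceptions come back as ordinary items in the result list. The `send()` coroutine below is a hypothetical stand-in, not one of the bridge's clients.

```python
import asyncio


async def send(chat_id: str) -> dict:
    # Hypothetical stand-in for a per-chat send; the "bad" chat always fails.
    if chat_id == "bad":
        raise ConnectionError(f"network error for {chat_id}")
    await asyncio.sleep(0)
    return {"success": True, "chat_id": chat_id}


async def fan_out(chat_ids: list[str]) -> tuple[list[dict], list[BaseException]]:
    # return_exceptions=True turns a raised exception into a list item
    # instead of letting it escape gather(); result order matches input order.
    raw = await asyncio.gather(*(send(c) for c in chat_ids), return_exceptions=True)
    ok = [r for r in raw if not isinstance(r, BaseException)]
    errors = [r for r in raw if isinstance(r, BaseException)]
    return ok, errors


ok, errors = asyncio.run(fan_out(["a", "bad", "c"]))
print(len(ok), len(errors))  # 2 1
```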

## Database
- UNIQUE indexes on service_provider.webhook_token,
  telegram_bot.webhook_path_id, partial UNIQUE on telegram_bot.bot_id,
  telegram_chat(bot_id, chat_id), notification_tracker_target unique link,
  partial UNIQUE on bridge_self provider per user
- Composite ix_event_log_user_event_type_created index
- save_chat_from_webhook switched to ON CONFLICT DO UPDATE
- ondelete=CASCADE on user-id FKs (model annotation; app-side cascade
  delete added for existing data)
- delete_notification_tracker converted from N+1 to bulk DELETE/UPDATE
- Module-level asyncio.Lock replaced with lazy _get_lock() pattern
- VACUUM INTO snapshot now PRAGMA integrity_check verified
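
Switching `save_chat_from_webhook` to `ON CONFLICT DO UPDATE` depends on the new unique index on `telegram_chat(bot_id, chat_id)`, because an upsert's conflict target must match a unique index or constraint. Assuming the SQLite backend implied by `VACUUM INTO`, here is a minimal sketch with the standard-library `sqlite3` module (the upsert syntax needs SQLite 3.24 or later). The reduced table layout, the `title` column and the `save_chat()` helper are assumptions for illustration, not the bridge's real schema or code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE telegram_chat (
        id INTEGER PRIMARY KEY,
        bot_id INTEGER NOT NULL,
        chat_id TEXT NOT NULL,
        title TEXT NOT NULL DEFAULT ''
    )
    """
)
# The upsert's conflict target must be backed by a UNIQUE index or constraint.
conn.execute(
    "CREATE UNIQUE INDEX ux_telegram_chat_bot_chat ON telegram_chat (bot_id, chat_id)"
)


def save_chat(bot_id: int, chat_id: str, title: str) -> None:
    # One statement: insert a new row, or update the existing one when the
    # (bot_id, chat_id) pair already exists. No read-then-write race.
    conn.execute(
        """
        INSERT INTO telegram_chat (bot_id, chat_id, title)
        VALUES (?, ?, ?)
        ON CONFLICT (bot_id, chat_id) DO UPDATE SET title = excluded.title
        """,
        (bot_id, chat_id, title),
    )


save_chat(1, "-100123", "Old title")
save_chat(1, "-100123", "New title")  # the same chat seen again via webhook
print(conn.execute("SELECT COUNT(*), MAX(title) FROM telegram_chat").fetchone())
# (1, 'New title')
```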

## Performance
- Jinja2 template compilation LRU cached (lru_cache maxsize=512)
- Per-locale render cache in NotificationDispatcher (skips re-rendering
  identical content for receivers sharing a locale)
- Tracker list cached per provider_id with 5s TTL + explicit invalidation
  on tracker CRUD (relieves HA chat-bus rate query pressure)
- Nav-counts collapsed from 16 round-trips to single UNION ALL
- HA event_log: skip persisting empty assets_added/removed events
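
The tracker-list cache (5 s TTL per `provider_id`, explicitly invalidated on tracker CRUD) follows a common pattern: serve the cached value while it is fresh, reload when it is stale, and drop the entry whenever a write makes it wrong. This is a minimal synchronous sketch; `TtlCache`, the `load` callback and the injectable clock are hypothetical names, and the real cache presumably wraps an async database query.

```python
import time
from typing import Callable, Generic, TypeVar

V = TypeVar("V")


class TtlCache(Generic[V]):
    """Cache one value per provider_id for `ttl` seconds, with explicit invalidation."""

    def __init__(self, ttl: float = 5.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self._clock = clock
        # provider_id -> (loaded_at, value)
        self._entries: dict[int, tuple[float, V]] = {}

    def get(self, provider_id: int, load: Callable[[int], V]) -> V:
        now = self._clock()
        entry = self._entries.get(provider_id)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]  # fresh hit: no database round trip
        value = load(provider_id)  # miss or stale entry: reload
        self._entries[provider_id] = (now, value)
        return value

    def invalidate(self, provider_id: int) -> None:
        """Call after tracker create/update/delete so readers do not wait out the TTL."""
        self._entries.pop(provider_id, None)


cache: TtlCache[list[str]] = TtlCache(ttl=5.0)
trackers = cache.get(42, lambda pid: [f"tracker-of-{pid}"])  # loads once
cache.invalidate(42)  # e.g. after a tracker is created for provider 42
```

The TTL bounds how stale a read can get if an invalidation is ever missed; the explicit `invalidate()` keeps the common write path from depending on that bound.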

## Security hardening
- Mass-assignment guard on Action create/update; cron sub-minute reject
- Backup JSON depth/node-count cap (depth ≤ 10, nodes ≤ 100k)
- _sanitize_config extended to all JSON-typed fields on backup import
- Telegram _safe_get walks redirects manually with SSRF revalidation
- Bcrypt 72-byte password length cap with clear 422
- Webhook payload body redaction; sensitive substring set extended with
  oauth/client_secret/webhook_secret/csrf in both header filter and
  template extras filter
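
The backup-import cap (depth ≤ 10, nodes ≤ 100k) can be enforced with a single iterative walk over the parsed document. This sketch assumes the root counts as depth 1 and that only values (not object keys) count as nodes; the source does not say how the bridge counts either, and `check_json_shape` and `BackupTooComplex` are hypothetical names.

```python
import json
from typing import Any

MAX_DEPTH = 10
MAX_NODES = 100_000


class BackupTooComplex(ValueError):
    """Raised when an imported backup exceeds a structural cap."""


def check_json_shape(doc: Any, max_depth: int = MAX_DEPTH, max_nodes: int = MAX_NODES) -> None:
    """Raise BackupTooComplex if `doc` nests too deeply or holds too many values."""
    nodes = 0
    # Explicit stack of (value, depth) pairs, so the check itself never recurses.
    stack: list[tuple[Any, int]] = [(doc, 1)]  # assumption: the root is depth 1
    while stack:
        value, depth = stack.pop()
        nodes += 1
        if nodes > max_nodes:
            raise BackupTooComplex(f"backup has more than {max_nodes} nodes")
        if depth > max_depth:
            raise BackupTooComplex(f"backup nests deeper than {max_depth} levels")
        if isinstance(value, dict):
            stack.extend((child, depth + 1) for child in value.values())
        elif isinstance(value, list):
            stack.extend((child, depth + 1) for child in value)


check_json_shape(json.loads('{"users": [{"id": 1, "trackers": []}]}'))  # passes
```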

## Frontend
- 76 catch (err: any) sites converted to errMsg(err) helper
- globalProviderFilter: pure getter; reconciliation moved to one-time
  $effect in +layout
- Provider-filter binding: removed paired $effects + _syncingFilter flag,
  now one-way derived
- entity-cache: separate _refreshing flag for background re-fetches
- api.ts 401 handling: AuthRedirectError class + dedup _redirecting flag,
  goto() instead of window.location.href
- a11y: aria-expanded on mobile More, role=switch + aria-checked on
  Telegram bot toggles

## Tests & operations
- CI pytest gate added to .gitea/workflows/build.yml + release.yml
  (wheel-built install to dodge editable-install slowness)
- /api/ready upgraded to deep healthcheck (db SELECT 1, scheduler.running,
  HA supervisor presence) returning {ready, checks, errors, version}
- /api/metrics endpoint with prometheus_client (deferred_pending,
  event_log_total, dispatch_duration, poll_failures, send_failures)
- New OPERATIONS.md covering deploy, healthchecks, metrics, backup/restore
  procedures, log handling, common scenarios, upgrade flow
- New tests: test_bridge_self (11), test_gitea_parser (9),
  test_planka_parser (6), test_immich_change_detector (6),
  test_backup_roundtrip (1)
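
The deep `/api/ready` healthcheck returns `{ready, checks, errors, version}`. One plausible way to assemble that payload is to run each probe, treat a probe that raises as failed, and report ready only when every probe passes. The `readiness()` helper and the probe functions are hypothetical stand-ins for the bridge's `SELECT 1`, `scheduler.running` and HA supervisor checks.

```python
import asyncio
from typing import Awaitable, Callable

Probe = Callable[[], Awaitable[bool]]


async def readiness(probes: dict[str, Probe], version: str) -> dict:
    """Run each probe in turn; report ready only if every probe passes."""
    checks: dict[str, bool] = {}
    errors: list[str] = []
    for name, probe in probes.items():
        try:
            checks[name] = bool(await probe())
        except Exception as exc:  # a crashing probe counts as a failed check
            checks[name] = False
            errors.append(f"{name}: {exc}")
            continue
        if not checks[name]:
            errors.append(f"{name}: failed")
    return {"ready": all(checks.values()), "checks": checks, "errors": errors, "version": version}


async def db_probe() -> bool:
    return True  # stand-in for running "SELECT 1"


async def scheduler_probe() -> bool:
    raise RuntimeError("scheduler not running")  # simulated failure


payload = asyncio.run(readiness({"db": db_probe, "scheduler": scheduler_probe}, "1.0.0"))
print(payload["ready"], payload["errors"])
# False ['scheduler: scheduler not running']
```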

## New feature: bridge self-monitoring
- New bridge_self provider type — internal sink for bridge health events
- Three event types: bridge_self_poll_failures (consecutive tracker poll
  failures), bridge_self_deferred_backlog (pending count crosses
  threshold), bridge_self_target_failures (consecutive 5xx/network
  failures per target)
- Per-user thresholds (defaults: 3 / 100 / 5) configurable via the
  provider config form
- Auto-seeded on user create + /setup + boot backfill for existing users
- Anti-spam: counters reset after emission; backlog uses transition latch
- Self-loop guard: bridge_self failures don't count toward target-failure
  thresholds (logged only) — wire to your own Telegram/Email/Matrix to
  get notified when polls/dispatches/sends fail
- 6 default templates (3 events × 2 locales), tracking config columns
  with backfill migration, frontend descriptor (excluded from "create
  provider" wizard since auto-managed)
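
The anti-spam rules for the bridge_self events (counters reset after emission, a transition latch for the backlog) can be sketched as two small state machines. Assumptions: a success resets a failure streak, "crosses threshold" means `pending >= threshold`, and one counter instance exists per tracker or target. The class names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ConsecutiveFailureCounter:
    """Fire once per `threshold` consecutive failures, then start counting afresh."""

    threshold: int = 3
    _count: int = field(default=0, init=False)

    def record(self, ok: bool) -> bool:
        """Record one poll/send outcome; return True when an alert should be emitted."""
        if ok:
            self._count = 0  # assumption: any success breaks the streak
            return False
        self._count += 1
        if self._count >= self.threshold:
            self._count = 0  # anti-spam: reset after emitting
            return True
        return False


@dataclass
class BacklogLatch:
    """Fire only when the pending count goes from below to at/above the threshold."""

    threshold: int = 100
    _above: bool = field(default=False, init=False)

    def observe(self, pending: int) -> bool:
        above = pending >= self.threshold
        fired = above and not self._above
        self._above = above  # re-arms once the backlog drops back below
        return fired


poll = ConsecutiveFailureCounter(threshold=3)
print([poll.record(ok) for ok in [False, False, False, False, True, False]])
# [False, False, True, False, False, False]

backlog = BacklogLatch(threshold=100)
print([backlog.observe(n) for n in [10, 150, 200, 50, 120]])
# [False, True, False, False, True]
```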

Operator-visible behavior changes (call out in release notes):
- NOTIFY_BRIDGE_TELEGRAM_WEBHOOK_SECRET now REQUIRED for webhook mode
- Existing webhook providers with auth_mode="none" need explicit opt-in
- Generic webhook endpoint rate-limited 60/min per source IP
- HA disconnect/reconnect writes ha_status_* EventLog rows
- Every user gets a bridge_self provider — wire it to a target to
  receive failure alerts
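
Telegram's Bot API lets `setWebhook` register a `secret_token`, which Telegram then sends back in the `X-Telegram-Bot-Api-Secret-Token` header of every webhook request. A minimal sketch of the now-mandatory check follows; the function name, the status-code return value and raising when no secret is configured are assumptions, not the bridge's code. A real handler should read headers case-insensitively, as most web frameworks already do.

```python
import hmac
from typing import Mapping

SECRET_HEADER = "X-Telegram-Bot-Api-Secret-Token"


def check_telegram_secret(headers: Mapping[str, str], expected: str) -> int:
    """Return 200 if the request carries the configured secret, else 401."""
    if not expected:
        # Hypothetical policy: webhook mode refuses to run without a secret.
        raise RuntimeError("NOTIFY_BRIDGE_TELEGRAM_WEBHOOK_SECRET is required for webhook mode")
    supplied = headers.get(SECRET_HEADER, "")
    # compare_digest on bytes: constant-time for equal lengths, no str/ASCII pitfalls.
    if hmac.compare_digest(supplied.encode(), expected.encode()):
        return 200
    return 401


print(check_telegram_secret({SECRET_HEADER: "s3cret"}, "s3cret"))  # 200
```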

Pre-existing test failures (test_ssrf, test_release_provider) on
Python 3.13 are unrelated; CI runs on 3.12.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 02:16:49 +03:00

502 lines
19 KiB
Python

"""Notification sender — unified send logic for all paths (dispatch + test)."""

import asyncio
import logging
from typing import Any

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from ..database.engine import get_engine
from ..database.models import NotificationTarget, TargetReceiver

_LOGGER = logging.getLogger(__name__)

# Cap on concurrent per-receiver test sends. It bounds in-flight requests
# rather than the send rate itself; the intent is to keep test bursts under
# Telegram's ~30 msg/s bulk-sending guideline while cutting an N-chat
# broadcast from N×RTT to roughly ceil(N / 5)×RTT.
_TEST_SEND_CONCURRENCY = 5


_TEST_MESSAGES: dict[str, dict[str, str]] = {
    "en": {
        "telegram": "\u2705 Test message from <b>Notify Bridge</b>",
        "webhook": "Test notification from Notify Bridge",
        "email": "Test email from Notify Bridge",
        "discord": "Test message from **Notify Bridge**",
        "slack": "Test message from *Notify Bridge*",
        "ntfy": "Test notification from Notify Bridge",
        "matrix": "Test message from Notify Bridge",
    },
    "ru": {
        "telegram": "\u2705 Тестовое сообщение от <b>Notify Bridge</b>",
        "webhook": "Тестовое уведомление от Notify Bridge",
        "email": "Тестовое письмо от Notify Bridge",
        "discord": "Тестовое сообщение от **Notify Bridge**",
        "slack": "Тестовое сообщение от *Notify Bridge*",
        "ntfy": "Тестовое уведомление от Notify Bridge",
        "matrix": "Тестовое сообщение от Notify Bridge",
    },
}


def _get_test_message(locale: str, target_type: str) -> str:
    """Return the localized test text for a target type.

    Unknown locales fall back to English; unknown target types fall back to
    the locale's plain ``webhook`` text.
    """
    msgs = _TEST_MESSAGES.get(locale, _TEST_MESSAGES["en"])
    return msgs.get(target_type, msgs.get("webhook", "Test"))


def pick_telegram_locale(
    *,
    receiver_locale: str = "",
    chat_override: str = "",
    chat_language_code: str = "",
    fallback: str = "en",
) -> str:
    """Pick the effective 2-letter locale for a Telegram chat.

    Priority (highest first):

    1. ``receiver_locale`` — explicit per-receiver override on a target.
    2. ``chat_override`` — explicit ``TelegramChat.language_override``
       set in the bot/chat manager UI.
    3. ``chat_language_code`` — Telegram-provided ``language_code``.
    4. ``fallback`` — caller-supplied default (e.g. query param).

    The first non-empty candidate wins; it is truncated to its first two
    characters and lowercased (e.g. ``"pt-BR"`` becomes ``"pt"``). If every
    input is empty, ``"en"`` is returned.
    """
    for candidate in (receiver_locale, chat_override, chat_language_code, fallback):
        if candidate:
            return candidate[:2].lower()
    return "en"


async def resolve_telegram_chat_locale(
    session: AsyncSession,
    *,
    bot_id: int | None,
    chat_id: str | int | None,
    receiver: TargetReceiver | None = None,
    fallback: str = "en",
) -> str:
    """Look up a Telegram chat and resolve its effective locale.

    Single source of truth for "what language should I send to this chat in?".
    Used by every Telegram test/preview path (bot test_chat, target test
    receiver, per-receiver fan-out) so they stay in lockstep.
    """
    from ..database.models import TelegramChat

    chat_row = None
    if bot_id and chat_id:
        chat_row = (await session.exec(
            select(TelegramChat).where(
                TelegramChat.bot_id == bot_id,
                TelegramChat.chat_id == str(chat_id),
            )
        )).first()
    return pick_telegram_locale(
        receiver_locale=getattr(receiver, "locale", "") if receiver else "",
        chat_override=getattr(chat_row, "language_override", "") if chat_row else "",
        chat_language_code=getattr(chat_row, "language_code", "") if chat_row else "",
        fallback=fallback,
    )


async def _load_receivers(target_id: int) -> list[dict]:
    """Load enabled receivers for a target from DB."""
    engine = get_engine()
    async with AsyncSession(engine) as session:
        result = await session.exec(
            select(TargetReceiver).where(
                TargetReceiver.target_id == target_id,
                TargetReceiver.enabled == True,
            )
        )
        return [dict(r.config) for r in result.all()]


async def send_to_target(target: NotificationTarget, message: str) -> dict:
    """Send a text message to a target, broadcasting to all receivers.

    Used for basic test and template preview sends (text only, no media).
    Real notifications with media go through NotificationDispatcher.
    """
    try:
        receivers = await _load_receivers(target.id)
        send_fn = {
            "telegram": _send_telegram_broadcast,
            "webhook": _send_webhook_broadcast,
            "email": _send_email_broadcast,
            "discord": _send_webhook_like_broadcast,
            "slack": _send_webhook_like_broadcast,
            "ntfy": _send_ntfy_broadcast,
            "matrix": _send_matrix_broadcast,
        }.get(target.type)
        if send_fn:
            return await send_fn(target, message, receivers)
        return {"success": False, "error": f"Unknown target type: {target.type}"}
    except Exception as e:
        _LOGGER.error("Send failed: %s", e)
        return {"success": False, "error": str(e)}


async def _send_telegram_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
    from notify_bridge_core.notifications.telegram.client import TelegramClient

    bot_token = target.config.get("bot_token")
    disable_preview = target.config.get("disable_url_preview", False)
    if not bot_token:
        return {"success": False, "error": "Missing bot_token"}
    if not receivers:
        return {"success": False, "error": "No receivers configured"}

    from .http_session import get_http_session

    http = await get_http_session()
    results: list[dict] = []
    client = TelegramClient(http, bot_token)
    for recv in receivers:
        chat_id = recv.get("chat_id")
        if not chat_id:
            continue
        result = await client.send_message(
            chat_id=str(chat_id),
            text=message,
            disable_web_page_preview=bool(disable_preview),
        )
        results.append(result)
    return _aggregate(results)


async def _send_webhook_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
    from notify_bridge_core.notifications.webhook.client import WebhookClient

    if not receivers:
        return {"success": False, "error": "No receivers configured"}

    from .http_session import get_http_session

    http = await get_http_session()
    results: list[dict] = []
    for recv in receivers:
        url = recv.get("url")
        headers = recv.get("headers", {})
        if not url:
            continue
        client = WebhookClient(http, url, headers)
        results.append(await client.send({"message": message, "event_type": "notification"}))
    return _aggregate(results)


async def _send_email_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
    from notify_bridge_core.notifications.email.client import EmailClient, SmtpConfig
    from ..database.models import EmailBot

    email_bot_id = target.config.get("email_bot_id")
    if not email_bot_id:
        return {"success": False, "error": "No email bot configured for this target"}

    engine = get_engine()
    async with AsyncSession(engine) as session:
        email_bot = await session.get(EmailBot, email_bot_id)
        if not email_bot:
            return {"success": False, "error": "Email bot not found"}
        smtp_cfg = SmtpConfig(
            host=email_bot.smtp_host,
            port=email_bot.smtp_port,
            username=email_bot.smtp_username,
            password=email_bot.smtp_password,
            from_address=email_bot.email,
            from_name=email_bot.name,
            use_tls=email_bot.smtp_use_tls,
        )
    if not smtp_cfg.host or not smtp_cfg.from_address:
        return {"success": False, "error": "Email bot SMTP not configured"}
    if not receivers:
        return {"success": False, "error": "No email receivers configured"}

    client = EmailClient(smtp_cfg)
    results: list[dict] = []
    for recv in receivers:
        email = recv.get("email")
        if not email:
            continue
        result = await client.send(
            to_email=email,
            subject="Notification from Notify Bridge",
            body_text=message,
            body_html=message,
            to_name=recv.get("name", ""),
        )
        results.append(result)
    return _aggregate(results)


async def _send_webhook_like_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
    """Broadcast for Discord and Slack — both use webhook URLs as receivers."""
    if not receivers:
        return {"success": False, "error": "No receivers configured"}

    from .http_session import get_http_session

    http = await get_http_session()
    results: list[dict] = []
    if target.type == "discord":
        from notify_bridge_core.notifications.discord.client import DiscordClient

        client = DiscordClient(http)
        for recv in receivers:
            url = recv.get("webhook_url")
            if url:
                results.append(await client.send(url, message, username=target.config.get("username")))
    elif target.type == "slack":
        from notify_bridge_core.notifications.slack.client import SlackClient

        client = SlackClient(http)
        for recv in receivers:
            url = recv.get("webhook_url")
            if url:
                results.append(await client.send(url, message, username=target.config.get("username")))
    return _aggregate(results)


async def _send_ntfy_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
    """Broadcast to ntfy topics."""
    server_url = target.config.get("server_url", "https://ntfy.sh")
    auth_token = target.config.get("auth_token")
    if not receivers:
        return {"success": False, "error": "No receivers configured"}

    from notify_bridge_core.notifications.ntfy.client import NtfyClient
    from .http_session import get_http_session

    http = await get_http_session()
    results: list[dict] = []
    client = NtfyClient(http)
    for recv in receivers:
        topic = recv.get("topic")
        if topic:
            results.append(await client.send(
                server_url, topic, message,
                title="Notify Bridge",
                priority=recv.get("priority", 3),
                auth_token=auth_token,
            ))
    return _aggregate(results)


async def _send_matrix_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
    """Broadcast to Matrix rooms."""
    from notify_bridge_core.notifications.matrix.client import MatrixClient
    from ..database.models import MatrixBot

    matrix_bot_id = target.config.get("matrix_bot_id")
    if not matrix_bot_id:
        return {"success": False, "error": "No Matrix bot configured for this target"}

    engine = get_engine()
    async with AsyncSession(engine) as session:
        bot = await session.get(MatrixBot, matrix_bot_id)
        if not bot:
            return {"success": False, "error": "Matrix bot not found"}
        homeserver = bot.homeserver_url
        access_token = bot.access_token
    if not receivers:
        return {"success": False, "error": "No receivers configured"}

    from .http_session import get_http_session

    http = await get_http_session()
    results: list[dict] = []
    client = MatrixClient(http, homeserver, access_token)
    for recv in receivers:
        room_id = recv.get("room_id")
        if room_id:
            results.append(await client.send_message(room_id, message, html_message=message))
    return _aggregate(results)


def _aggregate(results: list[dict]) -> dict:
    """Aggregate per-receiver results into one broadcast result.

    - every send succeeded: ``success`` with the receiver count;
    - some succeeded: still ``success``, plus a ``partial_failures`` count;
    - none succeeded: the first failure's result dict, unchanged;
    - no sends attempted (no usable receivers): a "No valid receivers" error.
    """
    successes = sum(1 for r in results if r.get("success"))
    if successes == len(results) and results:
        return {"success": True, "receivers": len(results)}
    elif successes > 0:
        return {"success": True, "receivers": len(results), "partial_failures": len(results) - successes}
    elif results:
        return results[0]
    return {"success": False, "error": "No valid receivers"}


# --- Public API used by routes ---


async def send_to_receiver(target: NotificationTarget, receiver_config: dict, message: str) -> dict:
    """Send a message to a single receiver of a target."""
    try:
        send_fn = {
            "telegram": _send_telegram_broadcast,
            "webhook": _send_webhook_broadcast,
            "email": _send_email_broadcast,
            "discord": _send_webhook_like_broadcast,
            "slack": _send_webhook_like_broadcast,
            "ntfy": _send_ntfy_broadcast,
            "matrix": _send_matrix_broadcast,
        }.get(target.type)
        if send_fn:
            return await send_fn(target, message, [receiver_config])
        return {"success": False, "error": f"Unknown target type: {target.type}"}
    except Exception as e:
        _LOGGER.error("Send to receiver failed: %s", e)
        return {"success": False, "error": str(e)}


async def send_test_notification(target: NotificationTarget, locale: str = "en") -> dict:
    """Send a simple test message. For broadcast targets, fans out to all children.

    For Telegram targets, per-receiver locale (TargetReceiver.locale or
    TelegramChat.language_override/language_code) is resolved individually so
    each chat receives the message in its own configured language.
    """
    if target.type == "broadcast":
        return await _send_broadcast_test(target, locale)
    if target.type == "telegram":
        return await _send_telegram_test_per_receiver(target, default_locale=locale)
    message = _get_test_message(locale, target.type)
    return await send_to_target(target, message)


async def _send_telegram_test_per_receiver(
    target: NotificationTarget, default_locale: str = "en",
) -> dict:
    """Send a test message to each Telegram receiver in its own resolved locale."""
    from notify_bridge_core.notifications.telegram.client import TelegramClient
    from ..database.models import TargetReceiver, TelegramChat
    from .http_session import get_http_session

    bot_token = target.config.get("bot_token")
    bot_id = target.config.get("bot_id")
    disable_preview = target.config.get("disable_url_preview", False)
    if not bot_token:
        return {"success": False, "error": "Missing bot_token"}

    engine = get_engine()
    async with AsyncSession(engine) as session:
        recv_rows = (await session.exec(
            select(TargetReceiver).where(
                TargetReceiver.target_id == target.id,
                TargetReceiver.enabled == True,
            )
        )).all()
        if not recv_rows:
            return {"success": False, "error": "No receivers configured"}

        # Batch-load TelegramChat rows so per-receiver locale picks don't
        # round-trip the DB N times. Priority resolution then runs through the
        # shared pick_telegram_locale() helper so single-shot test endpoints
        # and this fan-out agree on the same rules.
        chat_ids = [str(r.config.get("chat_id", "")) for r in recv_rows if r.config.get("chat_id")]
        chat_row_map: dict[str, TelegramChat] = {}
        if bot_id and chat_ids:
            chat_rows = (await session.exec(
                select(TelegramChat).where(
                    TelegramChat.bot_id == bot_id,
                    TelegramChat.chat_id.in_(chat_ids),
                )
            )).all()
            chat_row_map = {chat.chat_id: chat for chat in chat_rows}

    http = await get_http_session()
    client = TelegramClient(http, bot_token)

    # Parallelize per-receiver sends with a small semaphore: broadcasting to
    # N chats takes ~ceil(N / concurrency) × RTT instead of N × RTT, matching
    # the dispatcher's bounded-concurrency pattern. The cap is kept small to
    # avoid tripping Telegram's 429 rate limiting on large fleets.
    sem = asyncio.Semaphore(_TEST_SEND_CONCURRENCY)

    async def _send_one(r: TargetReceiver) -> dict | None:
        # Check the raw value first: str(None) would yield the truthy "None".
        raw_chat_id = r.config.get("chat_id")
        if not raw_chat_id:
            return None
        chat_id = str(raw_chat_id)
        chat_row = chat_row_map.get(chat_id)
        locale = pick_telegram_locale(
            receiver_locale=getattr(r, "locale", "") or "",
            chat_override=getattr(chat_row, "language_override", "") if chat_row else "",
            chat_language_code=getattr(chat_row, "language_code", "") if chat_row else "",
            fallback=default_locale,
        )
        message = _get_test_message(locale, "telegram")
        async with sem:
            return await client.send_message(
                chat_id=chat_id,
                text=message,
                disable_web_page_preview=bool(disable_preview),
            )

    # ``return_exceptions=True`` so a single send raising (e.g. transient
    # network error to one chat) doesn't abort the entire fan-out and lose
    # the successful sibling sends from the aggregate count.
    raw = await asyncio.gather(
        *(_send_one(r) for r in recv_rows), return_exceptions=True,
    )
    results: list[dict] = []
    for r in raw:
        if isinstance(r, BaseException):
            _LOGGER.warning("Test send to receiver raised: %s", r)
            continue
        if r is None:
            continue
        results.append(r)
    return _aggregate(results)


async def _send_broadcast_test(target: NotificationTarget, locale: str) -> dict:
    """Send test notifications to all child targets of a broadcast target."""
    child_ids = target.config.get("child_target_ids", [])
    if not child_ids:
        return {"success": False, "error": "No child targets configured"}
    disabled_ids = set(target.config.get("disabled_child_ids", []))

    engine = get_engine()
    results: list[dict] = []
    async with AsyncSession(engine) as session:
        for child_id in child_ids:
            if child_id in disabled_ids:
                continue
            child = await session.get(NotificationTarget, child_id)
            if not child or child.type == "broadcast":
                continue
            message = _get_test_message(locale, child.type)
            results.append(await send_to_target(child, message))
    return _aggregate(results)


async def send_test_template_notification(
    target: NotificationTarget, slot: str, template_str: str
) -> dict:
    """Render a template slot with sample data and send."""
    from jinja2.sandbox import SandboxedEnvironment
    from .sample_context import _SAMPLE_CONTEXT

    if not template_str:
        return await send_test_notification(target)
    try:
        env = SandboxedEnvironment(autoescape=False)
        tmpl = env.from_string(template_str)
        message = tmpl.render(**_SAMPLE_CONTEXT)
    except Exception as e:
        return {"success": False, "error": f"Template render error: {e}"}
    return await send_to_target(target, message)
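
The bounded fan-out in `_send_telegram_test_per_receiver` (an `asyncio.Semaphore` with `_TEST_SEND_CONCURRENCY` slots around each send, gathered with `return_exceptions=True`) can be exercised in isolation. In this sketch `asyncio.sleep()` stands in for one Telegram round trip and the `peak` counter is instrumentation added for illustration; neither is part of the module above.

```python
import asyncio

CONCURRENCY = 5  # mirrors _TEST_SEND_CONCURRENCY in the module above


async def fan_out(chat_ids: list[str], rtt: float = 0.01) -> tuple[list[str], int]:
    sem = asyncio.Semaphore(CONCURRENCY)
    in_flight = 0
    peak = 0

    async def send_one(chat_id: str) -> str:
        nonlocal in_flight, peak
        async with sem:  # at most CONCURRENCY sends get past this point at once
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                await asyncio.sleep(rtt)  # stand-in for one Telegram round trip
            finally:
                in_flight -= 1
            return chat_id

    sent = await asyncio.gather(*(send_one(c) for c in chat_ids), return_exceptions=True)
    return [s for s in sent if not isinstance(s, BaseException)], peak


sent, peak = asyncio.run(fan_out([f"chat-{i}" for i in range(12)]))
print(len(sent), peak)  # 12 5
```

With 12 chats and 5 slots the sends complete in three waves, so wall time is roughly 3 × `rtt` instead of 12 × `rtt`.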