Files
notify-bridge/packages/server/tests/test_dispatcher_aggregation.py
alexei.dolgolyov 0eb899afb9 feat: harden notification stack and switch logging selectors to icon grid
Notifications:
- Add shared http_base, redact, and SSRF hardening modules
- Refactor dispatcher, queue, receiver and per-provider clients
  (telegram, discord, email, matrix, ntfy, slack, webhook) to use
  the shared base, with bounded queue and redacted error logs
- Tests for ssrf, redact, http_base, queue bounds, dispatcher
  aggregation, telegram media partition, email and matrix clients

Frontend:
- Settings: log level / log format selectors now use IconGridSelect
  with per-option icons and i18n descriptions
- Minor providers page and entity-cache store updates

Tooling:
- Document code-review-graph MCP usage in CLAUDE.md
- Ignore .code-review-graph/, register .mcp.json
2026-05-07 13:53:26 +03:00

47 lines
1.5 KiB
Python

"""Dispatcher result aggregation: per-receiver detail must survive."""
from __future__ import annotations
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher
def test_aggregate_all_success() -> None:
out = NotificationDispatcher._aggregate_results([
{"success": True, "message_id": 1},
{"success": True, "message_id": 2},
])
assert out["success"] is True
assert out["receivers"] == 2
assert out["successes"] == 2
assert out["failures"] == 0
def test_aggregate_partial() -> None:
out = NotificationDispatcher._aggregate_results([
{"success": True},
{"success": False, "error": "boom"},
])
assert out["success"] is True # at least one succeeded
assert out["successes"] == 1
assert out["failures"] == 1
assert "boom" in out["errors"]
assert "results" in out
def test_aggregate_all_fail_preserves_all_errors() -> None:
out = NotificationDispatcher._aggregate_results([
{"success": False, "error": "first"},
{"success": False, "error": "second"},
])
assert out["success"] is False
assert out["error"] == "first" # back-compat top-level field
assert out["errors"] == ["first", "second"]
# Per-receiver details survive — operator can see exactly what failed.
assert len(out["results"]) == 2
def test_aggregate_empty() -> None:
out = NotificationDispatcher._aggregate_results([])
assert out["success"] is False
assert "error" in out