Files
notify-bridge/packages/server/tests/test_telegram_media_group_partial.py
alexei.dolgolyov 6a8f374678 feat: observability, per-receiver Telegram options, oversized-video fallback
Operability:
- Correlation IDs end-to-end: shared dispatch_id between log lines and
  EventLog rows (event/watcher/scheduled/deferred/action/HA/command paths)
  and a new X-Request-Id middleware that normalizes inbound ids and binds
  request_id into log context.
- dispatch_summary block merged into EventLog.details: per-target
  success/failure counts plus Telegram media delivered/skipped/failed and
  truncated error lists, so partial outcomes surface in the UI.
- Diagnostic mode: admin can flip one module to DEBUG for a bounded
  window with auto-revert (in-memory only; setup_logging() resets on
  boot, lifespan reverts on shutdown). New /diagnostic-mode endpoints
  plus DiagnosticsCassette UI on the settings page.

Telegram:
- Per-receiver options: disable_notification (silent send) and
  message_thread_id (forum-topic routing), wired through the dispatcher
  via a ContextVar so all four send sites (sendMessage / sendPhoto-Video-
  Document / sendMediaGroup / cache-hit POST) pick them up.
- send_large_videos_as_documents target setting: bypass the 50 MB
  sendVideo cap by falling back to sendDocument for oversized videos.
- sendMediaGroup byte-budget enforcement (TELEGRAM_MAX_GROUP_TOTAL_BYTES,
  45 MB) with per-item fallback on chunk failure so a stale file_id no
  longer silently drops a cached asset.

Tests:
- New: diagnostic_mode, dispatch_summary, request_correlation,
  telegram_media_group_partial, telegram_per_send_options.

Docs:
- .claude/reviews/: six-axis production-readiness review of v0.8.1.
- .claude/docs/functional-review-2026-05-28.md: focused review of
  Telegram/Immich/logging subsystems.
2026-05-28 15:19:31 +03:00

512 lines
19 KiB
Python

"""Tests for partial-delivery resilience in TelegramClient._send_media_group.
Covers the three independent failure modes that previously aborted the
whole send:

1. **Per-item oversize** — one item over ``max_asset_data_size`` is
   silently dropped; siblings still deliver. ``skipped_count`` reflects
   the drop.
2. **Combined chunk over Telegram's byte envelope** — pre-flight splits
   into byte-budgeted sub-chunks, avoiding the 413 entirely.
3. **Telegram-side chunk rejection after pre-flight** — fall back to
   sending each item individually so partial delivery still happens.

Also exercises the ``send_large_videos_as_documents`` opt-in: an oversized
video is delivered via ``sendDocument`` when the flag is set and skipped
when it is not.
"""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
import aiohttp
import pytest
from aioresponses import aioresponses
from notify_bridge_core.notifications.telegram.client import (
TelegramClient,
_MediaItem,
)
from notify_bridge_core.notifications.telegram.media import (
TELEGRAM_MAX_GROUP_TOTAL_BYTES,
)
BOT_TOKEN = "TEST_TOKEN"
TG = f"https://api.telegram.org/bot{BOT_TOKEN}"
CHAT_ID = "-1001234567890"
# ---------------------------------------------------------------------------
# Pure unit tests for the new helpers
# ---------------------------------------------------------------------------
def _item(upload_bytes: int, media_type: str = "photo") -> _MediaItem:
    """Create a fake ``_MediaItem`` carrying ``upload_bytes`` of payload.

    A size of ``0`` yields an item that references an existing file id and
    has neither an attachment nor cache info. Any other size yields an
    ``attach://`` item backed by that many zero bytes.
    """
    needs_upload = upload_bytes != 0
    if needs_upload:
        payload = b"\x00" * upload_bytes
        return _MediaItem(
            media_json={"type": media_type, "media": "attach://x"},
            cache_info=("ck", media_type, None, upload_bytes),
            attachment=("x", payload, "f.jpg", "image/jpeg"),
        )
    # Nothing to upload: the item only points at an already-known file id.
    return _MediaItem(
        media_json={"type": media_type, "media": "file_id_cached"},
        cache_info=None,
        attachment=None,
    )
def test_split_empty_returns_empty() -> None:
    """Splitting an empty item list produces no groups at all."""
    groups = TelegramClient._split_items_by_byte_budget([], 1000)
    assert groups == []
def test_split_fits_in_single_group() -> None:
    """Items totalling less than the budget stay together in one group."""
    batch = [_item(size) for size in (10, 20, 30)]
    groups = TelegramClient._split_items_by_byte_budget(batch, 100)
    assert len(groups) == 1
    total_bytes = sum(member.upload_bytes for member in groups[0])
    assert total_bytes == 60
def test_split_packs_greedily_across_budget() -> None:
    """Three 40-byte items under a 100-byte budget pack as [40, 40] + [40]."""
    batch = [_item(40) for _ in range(3)]
    groups = TelegramClient._split_items_by_byte_budget(batch, 100)
    group_lengths = [len(group) for group in groups]
    assert group_lengths == [2, 1]
    # The first group is filled up to 80 bytes; the third item would push it
    # past 100, so it starts a group of its own.
    first_total = sum(member.upload_bytes for member in groups[0])
    assert first_total == 80
    second_total = sum(member.upload_bytes for member in groups[1])
    assert second_total == 40
def test_split_oversized_single_item_kept_alone() -> None:
    """An item larger than the whole budget is still emitted, on its own.

    The splitter does not drop it client-side; the intent is to let
    Telegram report a precise per-item error instead.
    """
    too_big = _item(200)
    groups = TelegramClient._split_items_by_byte_budget([too_big], 100)
    assert len(groups) == 1
    lone_member = groups[0][0]
    assert lone_member.upload_bytes == 200
def test_split_cached_items_are_free() -> None:
    """Zero-byte (cached) items never trigger a split, whatever the budget."""
    cached_batch = [_item(0) for _ in range(3)]
    groups = TelegramClient._split_items_by_byte_budget(cached_batch, 10)
    assert len(groups) == 1
    only_group = groups[0]
    assert len(only_group) == 3
def test_split_mixes_cached_and_fresh_correctly() -> None:
    """Cached items ride along in whichever group is open when they arrive."""
    byte_costs = (40, 0, 40, 0, 40)
    batch = [_item(cost) for cost in byte_costs]
    groups = TelegramClient._split_items_by_byte_budget(batch, 100)
    # 40 + 0 + 40 = 80 fits; the following 0 is free and joins too; the
    # final 40 would reach 120, so it opens a second group.
    group_lengths = [len(group) for group in groups]
    assert group_lengths == [4, 1]
def test_attach_caption_to_first_idempotent() -> None:
    """The caption goes on the first item only and a second call replaces it."""
    batch = [_item(10), _item(10)]

    TelegramClient._attach_caption_to_first(batch, "Hello", "HTML")
    head_json = batch[0].media_json
    assert head_json["caption"] == "Hello"
    assert head_json["parse_mode"] == "HTML"
    assert "caption" not in batch[1].media_json

    # A second attach must overwrite the existing caption rather than
    # stacking a duplicate next to it.
    TelegramClient._attach_caption_to_first(batch, "Bye", "MarkdownV2")
    head_json = batch[0].media_json
    assert head_json["caption"] == "Bye"
    assert head_json["parse_mode"] == "MarkdownV2"
def test_attach_caption_truncates_to_telegram_limit() -> None:
    """A caption 500 characters over the limit is cut down to fit."""
    from notify_bridge_core.notifications.telegram.media import (
        TELEGRAM_MAX_CAPTION_LENGTH,
    )

    overlong = "A" * (TELEGRAM_MAX_CAPTION_LENGTH + 500)
    batch = [_item(10)]
    TelegramClient._attach_caption_to_first(batch, overlong, "HTML")
    stored_caption = batch[0].media_json["caption"]
    assert len(stored_caption) <= TELEGRAM_MAX_CAPTION_LENGTH
def test_attach_caption_no_items_is_noop() -> None:
    """Attaching a caption to an empty item list must not raise."""
    nothing_to_caption: list = []
    TelegramClient._attach_caption_to_first(nothing_to_caption, "x", "HTML")
# ---------------------------------------------------------------------------
# Integration tests for the full _send_media_group flow
# ---------------------------------------------------------------------------
def _png_bytes(size: int) -> bytes:
"""Minimal valid PNG header + pad bytes to reach the requested size.
Required so ``check_photo_limits`` can identify the bytes as an
image rather than rejecting them. The PIL inspection only reads the
header so padding with zeros is harmless.
"""
# 8-byte PNG signature + IHDR chunk for a 1x1 image (zero-padded
# to size). Pillow accepts this enough to read dimensions; the
# remaining bytes after IHDR are treated as trailing garbage.
sig = b"\x89PNG\r\n\x1a\n"
ihdr = bytes.fromhex(
# length=13, type=IHDR, w=1, h=1, depth=8, color=2 (RGB),
# compression=0, filter=0, interlace=0, crc=ignored
"0000000d49484452000000010000000108020000009077"
"53de"
)
base = sig + ihdr
if len(base) >= size:
return base[:size]
return base + b"\x00" * (size - len(base))
async def _build_client(session: aiohttp.ClientSession) -> TelegramClient:
    """Construct a ``TelegramClient`` on ``session`` using the test bot token."""
    client = TelegramClient(session, BOT_TOKEN)
    return client
@pytest.mark.asyncio
async def test_oversized_item_skipped_others_delivered() -> None:
    """An asset above ``max_asset_data_size`` is skipped; the rest are sent.

    A 2 MB photo is offered against a 1 MB cap together with two 50 KB
    photos. The big one must show up in ``skipped_count`` while the other
    two are delivered through the single mocked ``sendMediaGroup`` reply.
    """
    size_cap = 1_000_000  # 1 MB cap
    # Bytes are supplied inline through the "data" key, so only the
    # Telegram endpoint needs a mock — no asset server is involved.
    sized_urls = (
        ("http://assets.test/big.jpg", 2_000_000),
        ("http://assets.test/a.jpg", 50_000),
        ("http://assets.test/b.jpg", 50_000),
    )
    assets = [
        {"type": "photo", "url": url, "data": _png_bytes(length)}
        for url, length in sized_urls
    ]
    # A clean 200 carrying one message per delivered photo.
    telegram_reply = {
        "ok": True,
        "result": [
            {"message_id": 100, "photo": [{"file_id": "fa"}]},
            {"message_id": 101, "photo": [{"file_id": "fb"}]},
        ],
    }

    with aioresponses() as mocked:
        mocked.post(f"{TG}/sendMediaGroup", payload=telegram_reply)
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            result = await client._send_media_group(
                CHAT_ID, assets, max_asset_data_size=size_cap,
            )

    assert result["success"] is True
    assert result["delivered_count"] == 2
    assert result["skipped_count"] == 1
    assert result["failed_count"] == 0
    assert result["message_ids"] == [100, 101]
@pytest.mark.asyncio
async def test_byte_budget_splits_into_sub_chunks() -> None:
    """Three items that together exceed the byte budget go out in 2 calls."""
    # Each item is just over a third of the budget: any two fit together,
    # all three do not, so the expected split is [2 items, 1 item].
    item_size = TELEGRAM_MAX_GROUP_TOTAL_BYTES // 3 + 1
    # Generated PNG bytes keep the payload image-shaped; the header
    # advertises 1x1, so the byte size is independent of the dimensions.
    assets = [
        {
            "type": "photo",
            "url": f"http://t/{index}.jpg",
            "data": _png_bytes(item_size),
        }
        for index in range(3)
    ]

    def _ok_response_for_n(n: int) -> dict[str, Any]:
        """Build a successful sendMediaGroup reply with ``n`` result rows."""
        rows = []
        for offset in range(n):
            rows.append(
                {"message_id": 200 + offset, "photo": [{"file_id": f"x{offset}"}]}
            )
        return {"ok": True, "result": rows}

    with aioresponses() as mocked:
        # The per-call item count is not known up front, so every call gets
        # the same 10-row reply. NOTE(review): this presumes the client
        # tolerates surplus result rows; delivered_count is not asserted.
        mocked.post(
            f"{TG}/sendMediaGroup",
            payload=_ok_response_for_n(10),
            repeat=True,
        )
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            # Photo limit checks are stubbed out: the padded PNG bodies
            # would otherwise trip them.
            with patch(
                "notify_bridge_core.notifications.telegram.client.check_photo_limits",
                return_value=(False, None, None, None),
            ):
                result = await client._send_media_group(CHAT_ID, assets)

        # The mock registry maps (method, url) to the list of recorded
        # calls; read it while the mock context is still active.
        recorded = mocked.requests
        group_keys = [
            key for key in recorded if key[1].path.endswith("/sendMediaGroup")
        ]
        assert len(group_keys) >= 1
        calls: list[int] = [len(recorded[key]) for key in group_keys]

    assert result["success"] is True
    # Pre-splitting means no chunk was rejected.
    assert result["failed_count"] == 0
    # Three items, two sub-chunks (2 + 1) → exactly two outbound calls.
    assert sum(calls) == 2
@pytest.mark.asyncio
async def test_chunk_413_falls_back_to_per_item() -> None:
    """A 413 on the group send triggers one individual send per item."""
    assets = []
    for index in range(2):
        assets.append(
            {
                "type": "photo",
                "url": f"http://t/{index}.jpg",
                "data": _png_bytes(50_000),
            }
        )

    with aioresponses() as mocked:
        # Telegram rejects the grouped request outright.
        mocked.post(
            f"{TG}/sendMediaGroup",
            status=413,
            payload={"ok": False, "error_code": 413, "description": "Request Entity Too Large"},
        )
        # Each single-use mock answers one of the two fallback sendPhoto
        # calls, in registration order.
        for message_id, file_id in ((300, "z0"), (301, "z1")):
            mocked.post(
                f"{TG}/sendPhoto",
                payload={
                    "ok": True,
                    "result": {"message_id": message_id, "photo": [{"file_id": file_id}]},
                },
            )
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            with patch(
                "notify_bridge_core.notifications.telegram.client.check_photo_limits",
                return_value=(False, None, None, None),
            ):
                result = await client._send_media_group(CHAT_ID, assets)

    assert result["success"] is True
    assert result["delivered_count"] == 2
    assert result["failed_count"] == 0
    # The chunk-level rejection is still reported, tagged kind="chunk", so
    # the root cause can be told apart from per-item outcomes.
    assert result["errors"] is not None
    chunk_errors = []
    for entry in result["errors"]:
        if entry.get("kind") == "chunk":
            chunk_errors.append(entry)
    assert len(chunk_errors) == 1
    assert "Request Entity Too Large" in str(chunk_errors[0]["error"])
@pytest.mark.asyncio
async def test_chunk_failure_with_per_item_partial_failure() -> None:
    """Per-item fallback can itself partially fail; both outcomes are reported.

    The grouped send is rejected with a 400, then the first individual
    ``sendPhoto`` succeeds and the second is rejected. The result must
    count one delivery and one failure, and the failure entry must carry
    the index of the asset it belongs to.
    """
    assets = [
        {"type": "photo", "url": f"http://t/{i}.jpg", "data": _png_bytes(50_000)}
        for i in range(2)
    ]
    with aioresponses() as mocked:
        mocked.post(
            f"{TG}/sendMediaGroup",
            status=400,
            payload={"ok": False, "error_code": 400, "description": "Bad Request"},
        )
        # First per-item OK, second fails.
        mocked.post(
            f"{TG}/sendPhoto",
            payload={"ok": True, "result": {"message_id": 400, "photo": [{"file_id": "p0"}]}},
        )
        mocked.post(
            f"{TG}/sendPhoto",
            status=400,
            payload={"ok": False, "error_code": 400, "description": "PHOTO_INVALID_DIMENSIONS"},
        )
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            with patch(
                "notify_bridge_core.notifications.telegram.client.check_photo_limits",
                return_value=(False, None, None, None),
            ):
                result = await client._send_media_group(CHAT_ID, assets)

    # At least one item delivered → overall success.
    assert result["success"] is True
    assert result["delivered_count"] == 1
    assert result["failed_count"] == 1
    assert result["message_ids"] == [400]
    # Guard before iterating: a missing error list should fail as a clear
    # assertion rather than as a TypeError in the comprehension below.
    assert result["errors"] is not None
    # The failed item carries its index so operators can correlate
    # with the original asset list.
    item_errors = [e for e in result["errors"] if e.get("kind") == "item"]
    assert len(item_errors) == 1
    assert item_errors[0]["item_index"] == 1
@pytest.mark.asyncio
async def test_document_chunk_failure_falls_back_to_sendDocument() -> None:
    """Fallback for document items must use /sendDocument, never /sendVideo.

    Regression guard: an earlier draft is described as routing every
    non-photo item through ``_VIDEO_KIND``, which sent documents to the
    video endpoint where Telegram would reject them with a confusing
    error.
    """
    blob = b"\x00" * 50_000
    assets = [
        {"type": "document", "url": f"http://t/f{index}.bin", "data": blob}
        for index in range(2)
    ]
    forbidden_fragments = ("/sendVideo", "/sendPhoto")

    with aioresponses() as mocked:
        # The grouped send is rejected, forcing the per-item path.
        mocked.post(
            f"{TG}/sendMediaGroup",
            status=400,
            payload={"ok": False, "error_code": 400, "description": "Bad Request"},
        )
        for message_id, file_id in ((500, "d0"), (501, "d1")):
            mocked.post(
                f"{TG}/sendDocument",
                payload={
                    "ok": True,
                    "result": {"message_id": message_id, "document": {"file_id": file_id}},
                },
            )
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            result = await client._send_media_group(CHAT_ID, assets)

        # None of the recorded request URLs may point at the photo or
        # video endpoints.
        assert not any(
            fragment in key[1].path
            for key in mocked.requests
            for fragment in forbidden_fragments
        )

    assert result["success"] is True
    assert result["delivered_count"] == 2
    assert result["message_ids"] == [500, 501]
@pytest.mark.asyncio
async def test_oversized_video_deferred_as_document_when_opted_in() -> None:
    """With the opt-in flag, an oversized video is delivered as a document.

    Telegram caps sendVideo at 50 MB but accepts up to 2 GB through
    sendDocument. When ``send_large_videos_as_documents=True`` the
    oversized video is expected to be pulled out of the media group and
    sent on its own via sendDocument rather than dropped, while the
    remaining items still travel through sendMediaGroup as usual.
    """
    # 60 MB: above the 50 MB sendVideo cap, well below the 2 GB document cap.
    video_payload = b"\x00" * (60 * 1024 * 1024)
    big_video = {
        "type": "video",
        "url": "http://t/big.mp4",
        "data": video_payload,
        "content_type": "video/mp4",
    }
    photos = [
        {"type": "photo", "url": f"http://t/{stem}.jpg", "data": _png_bytes(50_000)}
        for stem in ("a", "b")
    ]
    assets = [big_video, *photos]

    with aioresponses() as mocked:
        # Both photos share one sendMediaGroup call.
        mocked.post(
            f"{TG}/sendMediaGroup",
            payload={
                "ok": True,
                "result": [
                    {"message_id": 700, "photo": [{"file_id": "p0"}]},
                    {"message_id": 701, "photo": [{"file_id": "p1"}]},
                ],
            },
        )
        # The deferred video arrives as a standalone document.
        mocked.post(
            f"{TG}/sendDocument",
            payload={"ok": True, "result": {"message_id": 702, "document": {"file_id": "d0"}}},
        )
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            with patch(
                "notify_bridge_core.notifications.telegram.client.check_photo_limits",
                return_value=(False, None, None, None),
            ):
                result = await client._send_media_group(
                    CHAT_ID, assets,
                    send_large_videos_as_documents=True,
                )

        # The video endpoint must never be touched: the oversized video is
        # routed directly to sendDocument.
        assert all("/sendVideo" not in key[1].path for key in mocked.requests)

    assert result["success"] is True
    assert result["delivered_count"] == 3
    assert result["skipped_count"] == 0
    assert result["failed_count"] == 0
    # Order between the group send and the document send is not pinned.
    assert sorted(result["message_ids"]) == [700, 701, 702]
@pytest.mark.asyncio
async def test_oversized_video_skipped_when_flag_off() -> None:
    """Without the opt-in flag an oversized video is dropped (legacy path)."""
    video_payload = b"\x00" * (60 * 1024 * 1024)
    big_video = {
        "type": "video",
        "url": "http://t/big.mp4",
        "data": video_payload,
        "content_type": "video/mp4",
    }
    small_photo = {
        "type": "photo",
        "url": "http://t/a.jpg",
        "data": _png_bytes(50_000),
    }
    assets = [big_video, small_photo]

    with aioresponses() as mocked:
        # Only the photo is expected to go out, so the reply has one row.
        mocked.post(
            f"{TG}/sendMediaGroup",
            payload={
                "ok": True,
                "result": [{"message_id": 800, "photo": [{"file_id": "p0"}]}],
            },
        )
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            with patch(
                "notify_bridge_core.notifications.telegram.client.check_photo_limits",
                return_value=(False, None, None, None),
            ):
                result = await client._send_media_group(CHAT_ID, assets)

        # The video must not be re-routed to sendDocument either.
        assert all("/sendDocument" not in key[1].path for key in mocked.requests)

    assert result["success"] is True
    assert result["delivered_count"] == 1
    assert result["skipped_count"] == 1
@pytest.mark.asyncio
async def test_all_items_oversized_returns_failure() -> None:
    """When every asset is filtered before send, success is False.

    Both photos are 5 MB against a 1 MB ``max_asset_data_size``, so the
    call is expected to finish without issuing any HTTP request and to
    report both items as skipped.
    """
    assets = [
        {"type": "photo", "url": "http://t/big.jpg", "data": _png_bytes(5_000_000)}
        for _ in range(2)
    ]
    # No routes are registered: the mock is active purely as a guard, so a
    # regression that does send cannot reach the real network and is
    # recorded in ``mocked.requests`` instead.
    with aioresponses() as mocked:
        async with aiohttp.ClientSession() as sess:
            client = await _build_client(sess)
            result = await client._send_media_group(
                CHAT_ID, assets, max_asset_data_size=1_000_000,
            )
        # Nothing should have been attempted against any endpoint.
        assert not mocked.requests

    assert result["success"] is False
    assert result["delivered_count"] == 0
    assert result["skipped_count"] == 2
    assert result["failed_count"] == 0
    assert "filtered" in result["error"]