"""Tests for partial-delivery resilience in TelegramClient._send_media_group. Covers the three independent failure modes that previously aborted the whole send: 1. **Per-item oversize** — one item over ``max_asset_data_size`` is silently dropped; siblings still deliver. ``skipped_count`` reflects the drop. 2. **Combined chunk over Telegram's byte envelope** — pre-flight splits into byte-budgeted sub-chunks, avoiding the 413 entirely. 3. **Telegram-side chunk rejection after pre-flight** — fall back to sending each item individually so partial delivery still happens. """ from __future__ import annotations from typing import Any from unittest.mock import patch import aiohttp import pytest from aioresponses import aioresponses from notify_bridge_core.notifications.telegram.client import ( TelegramClient, _MediaItem, ) from notify_bridge_core.notifications.telegram.media import ( TELEGRAM_MAX_GROUP_TOTAL_BYTES, ) BOT_TOKEN = "TEST_TOKEN" TG = f"https://api.telegram.org/bot{BOT_TOKEN}" CHAT_ID = "-1001234567890" # --------------------------------------------------------------------------- # Pure unit tests for the new helpers # --------------------------------------------------------------------------- def _item(upload_bytes: int, media_type: str = "photo") -> _MediaItem: """Build a synthetic _MediaItem with the given upload byte cost.""" if upload_bytes == 0: return _MediaItem( media_json={"type": media_type, "media": "file_id_cached"}, cache_info=None, attachment=None, ) return _MediaItem( media_json={"type": media_type, "media": "attach://x"}, cache_info=("ck", media_type, None, upload_bytes), attachment=("x", b"\x00" * upload_bytes, "f.jpg", "image/jpeg"), ) def test_split_empty_returns_empty() -> None: assert TelegramClient._split_items_by_byte_budget([], 1000) == [] def test_split_fits_in_single_group() -> None: items = [_item(10), _item(20), _item(30)] groups = TelegramClient._split_items_by_byte_budget(items, 100) assert len(groups) == 1 assert sum(it.upload_bytes for it in groups[0]) == 60 def test_split_packs_greedily_across_budget() -> None: # Three items @ 40 each, budget 100 → groups of [40,40] and [40]. items = [_item(40), _item(40), _item(40)] groups = TelegramClient._split_items_by_byte_budget(items, 100) assert [len(g) for g in groups] == [2, 1] assert sum(it.upload_bytes for it in groups[0]) == 80 assert sum(it.upload_bytes for it in groups[1]) == 40 def test_split_oversized_single_item_kept_alone() -> None: # An item that exceeds the budget on its own goes alone — Telegram # gets to return a precise per-item error instead of silently # dropping it client-side. items = [_item(200)] groups = TelegramClient._split_items_by_byte_budget(items, 100) assert len(groups) == 1 assert groups[0][0].upload_bytes == 200 def test_split_cached_items_are_free() -> None: # Cached items contribute 0 bytes — they never force a split. items = [_item(0), _item(0), _item(0)] groups = TelegramClient._split_items_by_byte_budget(items, 10) assert len(groups) == 1 assert len(groups[0]) == 3 def test_split_mixes_cached_and_fresh_correctly() -> None: # Cached items piggyback freely into whatever group they land in. items = [_item(40), _item(0), _item(40), _item(0), _item(40)] groups = TelegramClient._split_items_by_byte_budget(items, 100) # [40, 0, 40] = 80 bytes (fits), next 0 fits, next 40 starts new. assert [len(g) for g in groups] == [4, 1] def test_attach_caption_to_first_idempotent() -> None: items = [_item(10), _item(10)] TelegramClient._attach_caption_to_first(items, "Hello", "HTML") assert items[0].media_json["caption"] == "Hello" assert items[0].media_json["parse_mode"] == "HTML" assert "caption" not in items[1].media_json # Re-attaching overwrites in-place, doesn't duplicate. TelegramClient._attach_caption_to_first(items, "Bye", "MarkdownV2") assert items[0].media_json["caption"] == "Bye" assert items[0].media_json["parse_mode"] == "MarkdownV2" def test_attach_caption_truncates_to_telegram_limit() -> None: from notify_bridge_core.notifications.telegram.media import ( TELEGRAM_MAX_CAPTION_LENGTH, ) items = [_item(10)] long_caption = "A" * (TELEGRAM_MAX_CAPTION_LENGTH + 500) TelegramClient._attach_caption_to_first(items, long_caption, "HTML") assert len(items[0].media_json["caption"]) <= TELEGRAM_MAX_CAPTION_LENGTH def test_attach_caption_no_items_is_noop() -> None: TelegramClient._attach_caption_to_first([], "x", "HTML") # must not raise # --------------------------------------------------------------------------- # Integration tests for the full _send_media_group flow # --------------------------------------------------------------------------- def _png_bytes(size: int) -> bytes: """Minimal valid PNG header + pad bytes to reach the requested size. Required so ``check_photo_limits`` can identify the bytes as an image rather than rejecting them. The PIL inspection only reads the header so padding with zeros is harmless. """ # 8-byte PNG signature + IHDR chunk for a 1x1 image (zero-padded # to size). Pillow accepts this enough to read dimensions; the # remaining bytes after IHDR are treated as trailing garbage. sig = b"\x89PNG\r\n\x1a\n" ihdr = bytes.fromhex( # length=13, type=IHDR, w=1, h=1, depth=8, color=2 (RGB), # compression=0, filter=0, interlace=0, crc=ignored "0000000d49484452000000010000000108020000009077" "53de" ) base = sig + ihdr if len(base) >= size: return base[:size] return base + b"\x00" * (size - len(base)) async def _build_client(session: aiohttp.ClientSession) -> TelegramClient: return TelegramClient(session, BOT_TOKEN) @pytest.mark.asyncio async def test_oversized_item_skipped_others_delivered() -> None: """One item over max_asset_data_size is dropped; siblings still go.""" mock_url_big = "http://assets.test/big.jpg" mock_url_a = "http://assets.test/a.jpg" mock_url_b = "http://assets.test/b.jpg" max_size = 1_000_000 # 1 MB cap # We pre-load bytes via the asset dict so we don't have to mock the # asset HTTP server. Telegram side is mocked so sendMediaGroup # returns a clean 200 with two message IDs. assets = [ {"type": "photo", "url": mock_url_big, "data": _png_bytes(2_000_000)}, {"type": "photo", "url": mock_url_a, "data": _png_bytes(50_000)}, {"type": "photo", "url": mock_url_b, "data": _png_bytes(50_000)}, ] with aioresponses() as mocked: mocked.post( f"{TG}/sendMediaGroup", payload={ "ok": True, "result": [ {"message_id": 100, "photo": [{"file_id": "fa"}]}, {"message_id": 101, "photo": [{"file_id": "fb"}]}, ], }, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) result = await client._send_media_group( CHAT_ID, assets, max_asset_data_size=max_size, ) assert result["success"] is True assert result["delivered_count"] == 2 assert result["skipped_count"] == 1 assert result["failed_count"] == 0 assert result["message_ids"] == [100, 101] @pytest.mark.asyncio async def test_byte_budget_splits_into_sub_chunks() -> None: """Three items that combined exceed the byte budget pre-split into 2 calls.""" # Sized so 2 fit (sum < budget) but 3 don't (sum > budget) → # [2 items, 1 item] split. per_item = TELEGRAM_MAX_GROUP_TOTAL_BYTES // 3 + 1 # Use generated PNGs so check_photo_limits doesn't reject them as # malformed; the size doesn't matter for the photo dimension check # since the PNG header advertises 1x1. assets = [ {"type": "photo", "url": f"http://t/{i}.jpg", "data": _png_bytes(per_item)} for i in range(3) ] calls: list[int] = [] def _ok_response_for_n(n: int) -> dict[str, Any]: return { "ok": True, "result": [ {"message_id": 200 + i, "photo": [{"file_id": f"x{i}"}]} for i in range(n) ], } with aioresponses() as mocked: # We don't know item count per call up front, so respond with # 10-item payloads (Telegram ignores trailing IDs we don't use). mocked.post( f"{TG}/sendMediaGroup", payload=_ok_response_for_n(10), repeat=True, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) # Disable photo limits — large PNG bodies trip dimension # checks since we pad past the IHDR. with patch( "notify_bridge_core.notifications.telegram.client.check_photo_limits", return_value=(False, None, None, None), ): result = await client._send_media_group(CHAT_ID, assets) # Count outbound sendMediaGroup calls via the mock registry. req_log = mocked.requests send_calls = [ k for k in req_log if k[1].path.endswith("/sendMediaGroup") ] assert len(send_calls) >= 1 # At least one call → multiple requests recorded. for k in send_calls: calls.append(len(req_log[k])) assert result["success"] is True # Pre-split avoided 413 entirely. assert result["failed_count"] == 0 # The 3 items went out across 2 sub-chunks (2+1). assert sum(calls) == 2 @pytest.mark.asyncio async def test_chunk_413_falls_back_to_per_item() -> None: """If Telegram 413s a chunk anyway, retry each item individually.""" assets = [ {"type": "photo", "url": f"http://t/{i}.jpg", "data": _png_bytes(50_000)} for i in range(2) ] with aioresponses() as mocked: # The group send fails hard (Telegram-side rejection). mocked.post( f"{TG}/sendMediaGroup", status=413, payload={"ok": False, "error_code": 413, "description": "Request Entity Too Large"}, ) # Per-item fallback: two sendPhoto calls succeed. mocked.post( f"{TG}/sendPhoto", payload={"ok": True, "result": {"message_id": 300, "photo": [{"file_id": "z0"}]}}, ) mocked.post( f"{TG}/sendPhoto", payload={"ok": True, "result": {"message_id": 301, "photo": [{"file_id": "z1"}]}}, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) with patch( "notify_bridge_core.notifications.telegram.client.check_photo_limits", return_value=(False, None, None, None), ): result = await client._send_media_group(CHAT_ID, assets) assert result["success"] is True assert result["delivered_count"] == 2 assert result["failed_count"] == 0 # We still record the original chunk-level error for diagnostics, # tagged with kind="chunk" so operators can distinguish cause from # per-item consequences. assert result["errors"] is not None chunk_errors = [e for e in result["errors"] if e.get("kind") == "chunk"] assert len(chunk_errors) == 1 assert "Request Entity Too Large" in str(chunk_errors[0]["error"]) @pytest.mark.asyncio async def test_chunk_failure_with_per_item_partial_failure() -> None: """Per-item fallback can itself partially fail; we report both.""" assets = [ {"type": "photo", "url": f"http://t/{i}.jpg", "data": _png_bytes(50_000)} for i in range(2) ] with aioresponses() as mocked: mocked.post( f"{TG}/sendMediaGroup", status=400, payload={"ok": False, "error_code": 400, "description": "Bad Request"}, ) # First per-item OK, second fails. mocked.post( f"{TG}/sendPhoto", payload={"ok": True, "result": {"message_id": 400, "photo": [{"file_id": "p0"}]}}, ) mocked.post( f"{TG}/sendPhoto", status=400, payload={"ok": False, "error_code": 400, "description": "PHOTO_INVALID_DIMENSIONS"}, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) with patch( "notify_bridge_core.notifications.telegram.client.check_photo_limits", return_value=(False, None, None, None), ): result = await client._send_media_group(CHAT_ID, assets) # At least one item delivered → overall success. assert result["success"] is True assert result["delivered_count"] == 1 assert result["failed_count"] == 1 assert result["message_ids"] == [400] # The failed item carries its index so operators can correlate # with the original asset list. item_errors = [e for e in result["errors"] if e.get("kind") == "item"] assert len(item_errors) == 1 assert item_errors[0]["item_index"] == 1 @pytest.mark.asyncio async def test_document_chunk_failure_falls_back_to_sendDocument() -> None: """Document items must hit /sendDocument in fallback, not /sendVideo. Regression guard: an earlier draft routed any non-photo through _VIDEO_KIND, silently misrouting documents to the video endpoint where Telegram would reject them with a confusing error. """ assets = [ {"type": "document", "url": f"http://t/f{i}.bin", "data": b"\x00" * 50_000} for i in range(2) ] with aioresponses() as mocked: mocked.post( f"{TG}/sendMediaGroup", status=400, payload={"ok": False, "error_code": 400, "description": "Bad Request"}, ) mocked.post( f"{TG}/sendDocument", payload={"ok": True, "result": {"message_id": 500, "document": {"file_id": "d0"}}}, ) mocked.post( f"{TG}/sendDocument", payload={"ok": True, "result": {"message_id": 501, "document": {"file_id": "d1"}}}, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) result = await client._send_media_group(CHAT_ID, assets) # No /sendVideo or /sendPhoto calls should have been made. for key in mocked.requests: assert "/sendVideo" not in key[1].path assert "/sendPhoto" not in key[1].path assert result["success"] is True assert result["delivered_count"] == 2 assert result["message_ids"] == [500, 501] @pytest.mark.asyncio async def test_oversized_video_deferred_as_document_when_opted_in() -> None: """Oversized videos are sent as documents post-chunk when the flag is set. Telegram caps sendVideo at 50 MB but accepts up to 2 GB via sendDocument. With ``send_large_videos_as_documents=True``, an oversized video should be deferred out of the media group, then delivered as its own document send instead of being silently dropped. Other items in the same group must ride through the normal sendMediaGroup path unaffected. """ # 60 MB exceeds the 50 MB sendVideo cap but is under document's 2 GB cap. oversized_video = b"\x00" * (60 * 1024 * 1024) assets = [ {"type": "video", "url": "http://t/big.mp4", "data": oversized_video, "content_type": "video/mp4"}, {"type": "photo", "url": "http://t/a.jpg", "data": _png_bytes(50_000)}, {"type": "photo", "url": "http://t/b.jpg", "data": _png_bytes(50_000)}, ] with aioresponses() as mocked: # The 2 photos ride out in sendMediaGroup together. mocked.post( f"{TG}/sendMediaGroup", payload={ "ok": True, "result": [ {"message_id": 700, "photo": [{"file_id": "p0"}]}, {"message_id": 701, "photo": [{"file_id": "p1"}]}, ], }, ) # The deferred video lands as a document after the chunk. mocked.post( f"{TG}/sendDocument", payload={"ok": True, "result": {"message_id": 702, "document": {"file_id": "d0"}}}, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) with patch( "notify_bridge_core.notifications.telegram.client.check_photo_limits", return_value=(False, None, None, None), ): result = await client._send_media_group( CHAT_ID, assets, send_large_videos_as_documents=True, ) # sendVideo must NOT have been called — the oversized video # bypasses sendVideo entirely and goes straight to sendDocument. for key in mocked.requests: assert "/sendVideo" not in key[1].path assert result["success"] is True assert result["delivered_count"] == 3 assert result["skipped_count"] == 0 assert result["failed_count"] == 0 assert sorted(result["message_ids"]) == [700, 701, 702] @pytest.mark.asyncio async def test_oversized_video_skipped_when_flag_off() -> None: """Without the opt-in flag, oversized videos are dropped (legacy behavior).""" oversized_video = b"\x00" * (60 * 1024 * 1024) assets = [ {"type": "video", "url": "http://t/big.mp4", "data": oversized_video, "content_type": "video/mp4"}, {"type": "photo", "url": "http://t/a.jpg", "data": _png_bytes(50_000)}, ] with aioresponses() as mocked: mocked.post( f"{TG}/sendMediaGroup", payload={ "ok": True, "result": [{"message_id": 800, "photo": [{"file_id": "p0"}]}], }, ) async with aiohttp.ClientSession() as sess: client = await _build_client(sess) with patch( "notify_bridge_core.notifications.telegram.client.check_photo_limits", return_value=(False, None, None, None), ): result = await client._send_media_group(CHAT_ID, assets) # No sendDocument call either — video is simply dropped. for key in mocked.requests: assert "/sendDocument" not in key[1].path assert result["success"] is True assert result["delivered_count"] == 1 assert result["skipped_count"] == 1 @pytest.mark.asyncio async def test_all_items_oversized_returns_failure() -> None: """When every asset is filtered before send, success is False.""" assets = [ {"type": "photo", "url": "http://t/big.jpg", "data": _png_bytes(5_000_000)} for _ in range(2) ] async with aiohttp.ClientSession() as sess: client = await _build_client(sess) # No HTTP mock needed — nothing should reach Telegram. result = await client._send_media_group( CHAT_ID, assets, max_asset_data_size=1_000_000, ) assert result["success"] is False assert result["delivered_count"] == 0 assert result["skipped_count"] == 2 assert result["failed_count"] == 0 assert "filtered" in result["error"]