feat(notify-bridge): phase 5 - notification system

Extract and generalize notification dispatch:
- TelegramClient: full Bot API client with photo/video/document/media group support
- TelegramFileCache: TTL and thumbhash-based file_id caching
- WebhookClient: simple JSON POST client
- NotificationQueue: persistent deferred notification queue for quiet hours
- NotificationDispatcher: routes ServiceEvent to targets, renders templates
- StorageBackend protocol + JsonFileBackend for persistence
- TargetConfig dataclass for target configuration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 23:28:10 +03:00
parent f36f070478
commit 16a41efec1
10 changed files with 972 additions and 0 deletions
@@ -1 +1,5 @@
"""Notification dispatch — Telegram, webhooks, queue."""
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
__all__ = ["NotificationDispatcher", "TargetConfig"]
@@ -0,0 +1,124 @@
"""Notification dispatcher — routes events to targets via templates."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
import aiohttp
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.templates.context import build_template_context
from notify_bridge_core.templates.renderer import render_template
from .telegram.client import TelegramClient
from .webhook.client import WebhookClient
_LOGGER = logging.getLogger(__name__)
DEFAULT_TEMPLATE = (
'{{ added_count }} new item(s) added to "{{ collection_name }}".'
'{% if people %}\nPeople: {{ people | join(", ") }}{% endif %}'
)
@dataclass
class TargetConfig:
"""Configuration for a notification target."""
type: str # "telegram" or "webhook"
config: dict[str, Any] # type-specific config
template_slots: dict[str, str] | None = None # event_type -> template string
class NotificationDispatcher:
"""Dispatches ServiceEvent notifications to configured targets."""
async def dispatch(
self,
event: ServiceEvent,
targets: list[TargetConfig],
) -> list[dict[str, Any]]:
"""Send event notification to all targets.
Returns list of results (one per target).
"""
results = []
for target in targets:
try:
result = await self._send_to_target(event, target)
results.append(result)
except Exception as e:
_LOGGER.error("Failed to dispatch to target: %s", e)
results.append({"success": False, "error": str(e)})
return results
async def _send_to_target(
self, event: ServiceEvent, target: TargetConfig
) -> dict[str, Any]:
"""Send event to a single target."""
# Select template
template_str = DEFAULT_TEMPLATE
if target.template_slots:
slot = target.template_slots.get(event.event_type.value)
if slot:
template_str = slot
# Build context and render
ctx = build_template_context(event, target_type=target.type)
message = render_template(template_str, ctx)
if target.type == "telegram":
return await self._send_telegram(target, message, event)
elif target.type == "webhook":
return await self._send_webhook(target, message, event)
return {"success": False, "error": f"Unknown target type: {target.type}"}
async def _send_telegram(
self, target: TargetConfig, message: str, event: ServiceEvent
) -> dict[str, Any]:
bot_token = target.config.get("bot_token")
chat_id = target.config.get("chat_id")
if not bot_token or not chat_id:
return {"success": False, "error": "Missing bot_token or chat_id"}
async with aiohttp.ClientSession() as session:
client = TelegramClient(session, bot_token)
# Build asset list for media sending
assets = []
for asset in event.added_assets:
url = asset.full_url or asset.thumbnail_url
if url:
asset_type = "video" if asset.type.value == "video" else "photo"
assets.append({"url": url, "type": asset_type})
return await client.send_notification(
chat_id=str(chat_id),
caption=message,
assets=assets if assets else None,
)
async def _send_webhook(
self, target: TargetConfig, message: str, event: ServiceEvent
) -> dict[str, Any]:
url = target.config.get("url")
headers = target.config.get("headers", {})
if not url:
return {"success": False, "error": "Missing url in target config"}
payload = {
"message": message,
"event_type": event.event_type.value,
"provider_type": event.provider_type.value,
"collection_name": event.collection_name,
"collection_id": event.collection_id,
"timestamp": event.timestamp.isoformat(),
}
async with aiohttp.ClientSession() as session:
client = WebhookClient(session, url, headers)
return await client.send(payload)
@@ -0,0 +1,48 @@
"""Persistent notification queue for deferred notifications."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
from notify_bridge_core.storage import StorageBackend
_LOGGER = logging.getLogger(__name__)
class NotificationQueue:
"""Persistent queue for notifications deferred during quiet hours."""
def __init__(self, backend: StorageBackend) -> None:
self._backend = backend
self._data: dict[str, Any] | None = None
async def async_load(self) -> None:
self._data = await self._backend.load() or {"queue": []}
async def async_enqueue(self, notification_params: dict[str, Any]) -> None:
if self._data is None:
self._data = {"queue": []}
self._data["queue"].append({
"params": notification_params,
"queued_at": datetime.now(timezone.utc).isoformat(),
})
await self._backend.save(self._data)
def get_all(self) -> list[dict[str, Any]]:
if not self._data:
return []
return list(self._data.get("queue", []))
def has_pending(self) -> bool:
return bool(self._data and self._data.get("queue"))
async def async_clear(self) -> None:
if self._data:
self._data["queue"] = []
await self._backend.save(self._data)
async def async_remove(self) -> None:
await self._backend.remove()
self._data = None
@@ -1 +1,6 @@
"""Telegram notification client."""
from notify_bridge_core.notifications.telegram.client import TelegramClient
from notify_bridge_core.notifications.telegram.cache import TelegramFileCache
__all__ = ["TelegramClient", "TelegramFileCache"]
@@ -0,0 +1,129 @@
"""Telegram file_id cache with pluggable storage backend."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
from notify_bridge_core.storage import StorageBackend
_LOGGER = logging.getLogger(__name__)
DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60
class TelegramFileCache:
"""Cache for Telegram file_ids to avoid re-uploading media.
Supports two validation modes:
- TTL mode (default): entries expire after a configured time-to-live
- Thumbhash mode: entries validated by comparing stored thumbhash with current
"""
THUMBHASH_MAX_ENTRIES = 2000
def __init__(
self,
backend: StorageBackend,
ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL,
use_thumbhash: bool = False,
) -> None:
self._backend = backend
self._data: dict[str, Any] | None = None
self._ttl_seconds = ttl_seconds
self._use_thumbhash = use_thumbhash
async def async_load(self) -> None:
self._data = await self._backend.load() or {"files": {}}
await self._cleanup_expired()
async def _cleanup_expired(self) -> None:
if self._use_thumbhash:
files = self._data.get("files", {}) if self._data else {}
if len(files) > self.THUMBHASH_MAX_ENTRIES:
sorted_keys = sorted(files, key=lambda k: files[k].get("cached_at", ""))
for key in sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES]:
del files[key]
await self._backend.save(self._data)
return
if not self._data or "files" not in self._data:
return
now = datetime.now(timezone.utc)
expired = [
url for url, entry in self._data["files"].items()
if entry.get("cached_at") and
(now - datetime.fromisoformat(entry["cached_at"])).total_seconds() > self._ttl_seconds
]
if expired:
for key in expired:
del self._data["files"][key]
await self._backend.save(self._data)
def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None:
if not self._data or "files" not in self._data:
return None
entry = self._data["files"].get(key)
if not entry:
return None
if self._use_thumbhash:
if thumbhash is not None:
stored = entry.get("thumbhash")
if stored and stored != thumbhash:
del self._data["files"][key]
return None
else:
cached_at_str = entry.get("cached_at")
if cached_at_str:
age = (datetime.now(timezone.utc) - datetime.fromisoformat(cached_at_str)).total_seconds()
if age > self._ttl_seconds:
return None
return {"file_id": entry.get("file_id"), "type": entry.get("type")}
async def async_set(
self, key: str, file_id: str, media_type: str, thumbhash: str | None = None
) -> None:
if self._data is None:
self._data = {"files": {}}
entry: dict[str, Any] = {
"file_id": file_id,
"type": media_type,
"cached_at": datetime.now(timezone.utc).isoformat(),
}
if thumbhash is not None:
entry["thumbhash"] = thumbhash
self._data["files"][key] = entry
await self._backend.save(self._data)
async def async_set_many(
self, entries: list[tuple[str, str, str, str | None]]
) -> None:
if not entries:
return
if self._data is None:
self._data = {"files": {}}
now_iso = datetime.now(timezone.utc).isoformat()
for key, file_id, media_type, thumbhash in entries:
entry: dict[str, Any] = {
"file_id": file_id,
"type": media_type,
"cached_at": now_iso,
}
if thumbhash is not None:
entry["thumbhash"] = thumbhash
self._data["files"][key] = entry
await self._backend.save(self._data)
async def async_remove(self) -> None:
await self._backend.remove()
self._data = None
@@ -0,0 +1,481 @@
"""Telegram Bot API client for sending notifications with media."""
from __future__ import annotations
import asyncio
import json
import logging
import mimetypes
from typing import Any, Callable
import aiohttp
from aiohttp import FormData
from .cache import TelegramFileCache
from .media import (
TELEGRAM_API_BASE_URL,
TELEGRAM_MAX_PHOTO_SIZE,
TELEGRAM_MAX_VIDEO_SIZE,
check_photo_limits,
extract_asset_id_from_url,
is_asset_id,
split_media_by_upload_size,
)
_LOGGER = logging.getLogger(__name__)
NotificationResult = dict[str, Any]
class TelegramClient:
"""Async Telegram Bot API client for sending notifications with media."""
def __init__(
self,
session: aiohttp.ClientSession,
bot_token: str,
*,
url_cache: TelegramFileCache | None = None,
asset_cache: TelegramFileCache | None = None,
url_resolver: Callable[[str], str] | None = None,
thumbhash_resolver: Callable[[str], str | None] | None = None,
) -> None:
self._session = session
self._token = bot_token
self._url_cache = url_cache
self._asset_cache = asset_cache
self._url_resolver = url_resolver
self._thumbhash_resolver = thumbhash_resolver
def _resolve_url(self, url: str) -> str:
if self._url_resolver:
return self._url_resolver(url)
return url
def _get_cache_and_key(
self, url: str | None, cache_key: str | None = None,
) -> tuple[TelegramFileCache | None, str | None, str | None]:
if cache_key:
return self._url_cache, cache_key, None
if url:
if is_asset_id(url):
thumbhash = self._thumbhash_resolver(url) if self._thumbhash_resolver else None
return self._asset_cache, url, thumbhash
asset_id = extract_asset_id_from_url(url)
if asset_id:
thumbhash = self._thumbhash_resolver(asset_id) if self._thumbhash_resolver else None
return self._asset_cache, asset_id, thumbhash
return self._url_cache, url, None
return None, None, None
def _get_cache_for_key(self, key: str, is_asset: bool | None = None) -> TelegramFileCache | None:
if is_asset is None:
is_asset = is_asset_id(key)
return self._asset_cache if is_asset else self._url_cache
async def send_notification(
self,
chat_id: str,
assets: list[dict[str, str]] | None = None,
caption: str | None = None,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
max_group_size: int = 10,
chunk_delay: int = 0,
max_asset_data_size: int | None = None,
send_large_photos_as_documents: bool = False,
chat_action: str | None = "typing",
) -> NotificationResult:
if not assets:
return await self.send_message(
chat_id, caption or "", reply_to_message_id,
disable_web_page_preview, parse_mode,
)
typing_task = None
if chat_action:
typing_task = self._start_typing_indicator(chat_id, chat_action)
try:
if len(assets) == 1 and assets[0].get("type") == "photo":
return await self._send_photo(
chat_id, assets[0].get("url"), caption, reply_to_message_id,
parse_mode, max_asset_data_size, send_large_photos_as_documents,
assets[0].get("content_type"), assets[0].get("cache_key"),
)
if len(assets) == 1 and assets[0].get("type") == "video":
return await self._send_video(
chat_id, assets[0].get("url"), caption, reply_to_message_id,
parse_mode, max_asset_data_size,
assets[0].get("content_type"), assets[0].get("cache_key"),
)
if len(assets) == 1 and assets[0].get("type", "document") == "document":
url = assets[0].get("url")
if not url:
return {"success": False, "error": "Missing 'url' for document"}
try:
download_url = self._resolve_url(url)
async with self._session.get(download_url) as resp:
if resp.status != 200:
return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"}
data = await resp.read()
if max_asset_data_size is not None and len(data) > max_asset_data_size:
return {"success": False, "error": f"Media size exceeds limit"}
filename = url.split("/")[-1].split("?")[0] or "file"
return await self._send_document(
chat_id, data, filename, caption, reply_to_message_id,
parse_mode, url, assets[0].get("content_type"),
assets[0].get("cache_key"),
)
except aiohttp.ClientError as err:
return {"success": False, "error": f"Failed to download media: {err}"}
return await self._send_media_group(
chat_id, assets, caption, reply_to_message_id, max_group_size,
chunk_delay, parse_mode, max_asset_data_size,
send_large_photos_as_documents,
)
finally:
if typing_task:
typing_task.cancel()
try:
await typing_task
except asyncio.CancelledError:
pass
async def send_message(
self,
chat_id: str,
text: str,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool | None = None,
parse_mode: str = "HTML",
) -> NotificationResult:
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMessage"
payload: dict[str, Any] = {
"chat_id": chat_id,
"text": text or "Notification",
"parse_mode": parse_mode,
}
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
if disable_web_page_preview is not None:
payload["disable_web_page_preview"] = disable_web_page_preview
try:
async with self._session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")}
except aiohttp.ClientError as err:
return {"success": False, "error": str(err)}
async def send_chat_action(self, chat_id: str, action: str = "typing") -> bool:
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendChatAction"
try:
async with self._session.post(telegram_url, json={"chat_id": chat_id, "action": action}) as response:
result = await response.json()
return response.status == 200 and result.get("ok", False)
except aiohttp.ClientError:
return False
def _start_typing_indicator(self, chat_id: str, action: str = "typing") -> asyncio.Task:
async def action_loop() -> None:
try:
while True:
await self.send_chat_action(chat_id, action)
await asyncio.sleep(4)
except asyncio.CancelledError:
pass
return asyncio.create_task(action_loop())
async def _send_photo(
self, chat_id: str, url: str | None, caption: str | None = None,
reply_to_message_id: int | None = None, parse_mode: str = "HTML",
max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False,
content_type: str | None = None, cache_key: str | None = None,
) -> NotificationResult:
if not content_type:
content_type = "image/jpeg"
if not url:
return {"success": False, "error": "Missing 'url' for photo"}
effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key)
# Check cache
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
if cached and cached.get("file_id"):
payload = {"chat_id": chat_id, "photo": cached["file_id"], "parse_mode": parse_mode}
if caption:
payload["caption"] = caption
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto"
try:
async with self._session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True}
except aiohttp.ClientError:
pass
try:
download_url = self._resolve_url(url)
async with self._session.get(download_url) as resp:
if resp.status != 200:
return {"success": False, "error": f"Failed to download photo: HTTP {resp.status}"}
data = await resp.read()
if max_asset_data_size is not None and len(data) > max_asset_data_size:
return {"success": False, "error": "Photo exceeds size limit", "skipped": True}
exceeds_limits, reason, _, _ = check_photo_limits(data)
if exceeds_limits:
if send_large_photos_as_documents:
return await self._send_document(chat_id, data, "photo.jpg", caption, reply_to_message_id, parse_mode, url, None, cache_key)
return {"success": False, "error": f"Photo {reason}", "skipped": True}
form = FormData()
form.add_field("chat_id", chat_id)
form.add_field("photo", data, filename="photo.jpg", content_type=content_type)
form.add_field("parse_mode", parse_mode)
if caption:
form.add_field("caption", caption)
if reply_to_message_id:
form.add_field("reply_to_message_id", str(reply_to_message_id))
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto"
async with self._session.post(telegram_url, data=form) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
photos = result.get("result", {}).get("photo", [])
if photos and effective_cache and effective_cache_key:
file_id = photos[-1].get("file_id")
if file_id:
await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash)
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
except aiohttp.ClientError as err:
return {"success": False, "error": str(err)}
async def _send_video(
self, chat_id: str, url: str | None, caption: str | None = None,
reply_to_message_id: int | None = None, parse_mode: str = "HTML",
max_asset_data_size: int | None = None, content_type: str | None = None,
cache_key: str | None = None,
) -> NotificationResult:
if not content_type:
content_type = "video/mp4"
if not url:
return {"success": False, "error": "Missing 'url' for video"}
effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key)
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
if cached and cached.get("file_id"):
payload = {"chat_id": chat_id, "video": cached["file_id"], "parse_mode": parse_mode}
if caption:
payload["caption"] = caption
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo"
try:
async with self._session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True}
except aiohttp.ClientError:
pass
try:
download_url = self._resolve_url(url)
async with self._session.get(download_url) as resp:
if resp.status != 200:
return {"success": False, "error": f"Failed to download video: HTTP {resp.status}"}
data = await resp.read()
if max_asset_data_size is not None and len(data) > max_asset_data_size:
return {"success": False, "error": "Video exceeds size limit", "skipped": True}
if len(data) > TELEGRAM_MAX_VIDEO_SIZE:
return {"success": False, "error": f"Video exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE // (1024*1024)} MB limit", "skipped": True}
form = FormData()
form.add_field("chat_id", chat_id)
form.add_field("video", data, filename="video.mp4", content_type=content_type)
form.add_field("parse_mode", parse_mode)
if caption:
form.add_field("caption", caption)
if reply_to_message_id:
form.add_field("reply_to_message_id", str(reply_to_message_id))
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo"
async with self._session.post(telegram_url, data=form) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
video = result.get("result", {}).get("video", {})
if video and effective_cache and effective_cache_key:
file_id = video.get("file_id")
if file_id:
await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash)
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
except aiohttp.ClientError as err:
return {"success": False, "error": str(err)}
async def _send_document(
self, chat_id: str, data: bytes, filename: str = "file",
caption: str | None = None, reply_to_message_id: int | None = None,
parse_mode: str = "HTML", source_url: str | None = None,
content_type: str | None = None, cache_key: str | None = None,
) -> NotificationResult:
if not content_type:
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = "application/octet-stream"
effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(source_url, cache_key)
if effective_cache and effective_cache_key:
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash)
if cached and cached.get("file_id") and cached.get("type") == "document":
payload = {"chat_id": chat_id, "document": cached["file_id"], "parse_mode": parse_mode}
if caption:
payload["caption"] = caption
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument"
try:
async with self._session.post(telegram_url, json=payload) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True}
except aiohttp.ClientError:
pass
try:
form = FormData()
form.add_field("chat_id", chat_id)
form.add_field("document", data, filename=filename, content_type=content_type)
form.add_field("parse_mode", parse_mode)
if caption:
form.add_field("caption", caption)
if reply_to_message_id:
form.add_field("reply_to_message_id", str(reply_to_message_id))
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument"
async with self._session.post(telegram_url, data=form) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
if effective_cache_key and effective_cache:
document = result.get("result", {}).get("document", {})
file_id = document.get("file_id")
if file_id:
await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash)
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
except aiohttp.ClientError as err:
return {"success": False, "error": str(err)}
async def _send_media_group(
self, chat_id: str, assets: list[dict[str, str]],
caption: str | None = None, reply_to_message_id: int | None = None,
max_group_size: int = 10, chunk_delay: int = 0, parse_mode: str = "HTML",
max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False,
) -> NotificationResult:
chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)]
all_message_ids: list = []
for chunk_idx, chunk in enumerate(chunks):
if chunk_idx > 0 and chunk_delay > 0:
await asyncio.sleep(chunk_delay / 1000)
if len(chunk) == 1:
item = chunk[0]
chunk_caption = caption if chunk_idx == 0 else None
chunk_reply = reply_to_message_id if chunk_idx == 0 else None
if item.get("type") == "photo":
result = await self._send_photo(chat_id, item.get("url"), chunk_caption, chunk_reply, parse_mode, max_asset_data_size, send_large_photos_as_documents, item.get("content_type"), item.get("cache_key"))
elif item.get("type") == "video":
result = await self._send_video(chat_id, item.get("url"), chunk_caption, chunk_reply, parse_mode, max_asset_data_size, item.get("content_type"), item.get("cache_key"))
else:
continue
if not result.get("success"):
result["failed_at_chunk"] = chunk_idx + 1
return result
all_message_ids.append(result.get("message_id"))
continue
# Multi-item: download all, build form, send media group
form = FormData()
form.add_field("chat_id", chat_id)
if reply_to_message_id and chunk_idx == 0:
form.add_field("reply_to_message_id", str(reply_to_message_id))
media_json = []
upload_idx = 0
for i, item in enumerate(chunk):
url = item.get("url")
if not url:
continue
media_type = item.get("type", "photo")
custom_cache_key = item.get("cache_key")
# Check cache
ck = custom_cache_key or extract_asset_id_from_url(url) or url
ck_is_asset = is_asset_id(ck)
item_cache = self._get_cache_for_key(ck, ck_is_asset)
item_thumbhash = self._thumbhash_resolver(ck) if ck_is_asset and self._thumbhash_resolver else None
cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None
if cached and cached.get("file_id"):
mij: dict[str, Any] = {"type": media_type, "media": cached["file_id"]}
else:
try:
download_url = self._resolve_url(url)
async with self._session.get(download_url) as resp:
if resp.status != 200:
continue
data = await resp.read()
if max_asset_data_size and len(data) > max_asset_data_size:
continue
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
continue
if media_type == "photo":
exceeds, _, _, _ = check_photo_limits(data)
if exceeds:
continue
attach_name = f"file{upload_idx}"
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
ext = "jpg" if media_type == "photo" else "mp4"
form.add_field(attach_name, data, filename=f"media_{i}.{ext}", content_type=ct)
mij = {"type": media_type, "media": f"attach://{attach_name}"}
upload_idx += 1
except aiohttp.ClientError:
continue
if i == 0 and chunk_idx == 0 and caption:
mij["caption"] = caption
mij["parse_mode"] = parse_mode
media_json.append(mij)
if not media_json:
continue
form.add_field("media", json.dumps(media_json))
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup"
try:
async with self._session.post(telegram_url, data=form) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
all_message_ids.extend(msg.get("message_id") for msg in result.get("result", []))
else:
return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1}
except aiohttp.ClientError as err:
return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
return {"success": True, "message_ids": all_message_ids, "chunks_sent": len(chunks)}
@@ -0,0 +1,92 @@
"""Telegram media utilities - constants, URL helpers, and size splitting."""
from __future__ import annotations
import re
from typing import Final
# Telegram constants
TELEGRAM_API_BASE_URL: Final = "https://api.telegram.org/bot"
TELEGRAM_MAX_PHOTO_SIZE: Final = 10 * 1024 * 1024 # 10 MB
TELEGRAM_MAX_VIDEO_SIZE: Final = 50 * 1024 * 1024 # 50 MB
TELEGRAM_MAX_DIMENSION_SUM: Final = 10000
# Generic UUID pattern for asset IDs
_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$")
# URL patterns to extract asset IDs (generic enough for Immich-style URLs)
_ASSET_ID_URL_PATTERNS = [
re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"),
re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"),
]
def is_asset_id(value: str) -> bool:
"""Check if a string is a valid asset ID (UUID format)."""
return bool(_ASSET_ID_PATTERN.match(value))
def extract_asset_id_from_url(url: str) -> str | None:
"""Extract asset ID from a URL if possible."""
if not url:
return None
for pattern in _ASSET_ID_URL_PATTERNS:
match = pattern.search(url)
if match:
return match.group(1)
return None
def split_media_by_upload_size(
media_items: list[tuple], max_upload_size: int
) -> list[list[tuple]]:
"""Split media items into sub-groups respecting upload size limit."""
if not media_items:
return []
groups: list[list[tuple]] = []
current_group: list[tuple] = []
current_size = 0
for item in media_items:
media_ref = item[1]
is_cached = item[4]
item_size = 0 if is_cached else (len(media_ref) if isinstance(media_ref, bytes) else 0)
if current_group and current_size + item_size > max_upload_size:
groups.append(current_group)
current_group = []
current_size = 0
current_group.append(item)
current_size += item_size
if current_group:
groups.append(current_group)
return groups
def check_photo_limits(
data: bytes,
) -> tuple[bool, str | None, int | None, int | None]:
"""Check if photo data exceeds Telegram photo limits."""
if len(data) > TELEGRAM_MAX_PHOTO_SIZE:
return (True, f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", None, None)
try:
from PIL import Image
import io
img = Image.open(io.BytesIO(data))
width, height = img.size
dimension_sum = width + height
if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM:
return (True, f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", width, height)
return False, None, width, height
except ImportError:
return False, None, None, None
except Exception:
return False, None, None, None
@@ -1 +1,5 @@
"""Webhook notification client."""
from notify_bridge_core.notifications.webhook.client import WebhookClient
__all__ = ["WebhookClient"]
@@ -0,0 +1,33 @@
"""Generic webhook notification client."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
_LOGGER = logging.getLogger(__name__)
class WebhookClient:
"""Send JSON payloads to a webhook URL."""
def __init__(self, session: aiohttp.ClientSession, url: str, headers: dict[str, str] | None = None) -> None:
self._session = session
self._url = url
self._headers = headers or {}
async def send(self, payload: dict[str, Any]) -> dict[str, Any]:
try:
async with self._session.post(
self._url,
json=payload,
headers={"Content-Type": "application/json", **self._headers},
) as response:
if 200 <= response.status < 300:
return {"success": True, "status_code": response.status}
body = await response.text()
return {"success": False, "error": f"HTTP {response.status}", "body": body[:200]}
except aiohttp.ClientError as err:
return {"success": False, "error": str(err)}
@@ -0,0 +1,52 @@
"""Abstract storage backends and JSON file implementation."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
_LOGGER = logging.getLogger(__name__)
@runtime_checkable
class StorageBackend(Protocol):
"""Abstract storage backend for persisting JSON-serializable data."""
async def load(self) -> dict[str, Any] | None: ...
async def save(self, data: dict[str, Any]) -> None: ...
async def remove(self) -> None: ...
class JsonFileBackend:
"""Simple JSON file storage backend."""
def __init__(self, path: Path) -> None:
self._path = path
async def load(self) -> dict[str, Any] | None:
if not self._path.exists():
return None
try:
text = self._path.read_text(encoding="utf-8")
return json.loads(text)
except (json.JSONDecodeError, OSError) as err:
_LOGGER.warning("Failed to load %s: %s", self._path, err)
return None
async def save(self, data: dict[str, Any]) -> None:
try:
self._path.parent.mkdir(parents=True, exist_ok=True)
self._path.write_text(
json.dumps(data, default=str), encoding="utf-8"
)
except OSError as err:
_LOGGER.error("Failed to save %s: %s", self._path, err)
async def remove(self) -> None:
try:
if self._path.exists():
self._path.unlink()
except OSError as err:
_LOGGER.error("Failed to remove %s: %s", self._path, err)