diff --git a/packages/core/src/notify_bridge_core/notifications/__init__.py b/packages/core/src/notify_bridge_core/notifications/__init__.py index f7dc22e..51fea54 100644 --- a/packages/core/src/notify_bridge_core/notifications/__init__.py +++ b/packages/core/src/notify_bridge_core/notifications/__init__.py @@ -1 +1,5 @@ """Notification dispatch — Telegram, webhooks, queue.""" + +from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig + +__all__ = ["NotificationDispatcher", "TargetConfig"] diff --git a/packages/core/src/notify_bridge_core/notifications/dispatcher.py b/packages/core/src/notify_bridge_core/notifications/dispatcher.py new file mode 100644 index 0000000..0ed8eec --- /dev/null +++ b/packages/core/src/notify_bridge_core/notifications/dispatcher.py @@ -0,0 +1,124 @@ +"""Notification dispatcher — routes events to targets via templates.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any + +import aiohttp + +from notify_bridge_core.models.events import ServiceEvent +from notify_bridge_core.templates.context import build_template_context +from notify_bridge_core.templates.renderer import render_template + +from .telegram.client import TelegramClient +from .webhook.client import WebhookClient + +_LOGGER = logging.getLogger(__name__) + +DEFAULT_TEMPLATE = ( + '{{ added_count }} new item(s) added to "{{ collection_name }}".' + '{% if people %}\nPeople: {{ people | join(", ") }}{% endif %}' +) + + +@dataclass +class TargetConfig: + """Configuration for a notification target.""" + + type: str # "telegram" or "webhook" + config: dict[str, Any] # type-specific config + template_slots: dict[str, str] | None = None # event_type -> template string + + +class NotificationDispatcher: + """Dispatches ServiceEvent notifications to configured targets.""" + + async def dispatch( + self, + event: ServiceEvent, + targets: list[TargetConfig], + ) -> list[dict[str, Any]]: + """Send event notification to all targets. + + Returns list of results (one per target). + """ + results = [] + for target in targets: + try: + result = await self._send_to_target(event, target) + results.append(result) + except Exception as e: + _LOGGER.error("Failed to dispatch to target: %s", e) + results.append({"success": False, "error": str(e)}) + return results + + async def _send_to_target( + self, event: ServiceEvent, target: TargetConfig + ) -> dict[str, Any]: + """Send event to a single target.""" + # Select template + template_str = DEFAULT_TEMPLATE + if target.template_slots: + slot = target.template_slots.get(event.event_type.value) + if slot: + template_str = slot + + # Build context and render + ctx = build_template_context(event, target_type=target.type) + message = render_template(template_str, ctx) + + if target.type == "telegram": + return await self._send_telegram(target, message, event) + elif target.type == "webhook": + return await self._send_webhook(target, message, event) + return {"success": False, "error": f"Unknown target type: {target.type}"} + + async def _send_telegram( + self, target: TargetConfig, message: str, event: ServiceEvent + ) -> dict[str, Any]: + bot_token = target.config.get("bot_token") + chat_id = target.config.get("chat_id") + + if not bot_token or not chat_id: + return {"success": False, "error": "Missing bot_token or chat_id"} + + async with aiohttp.ClientSession() as session: + client = TelegramClient(session, bot_token) + + # Build asset list for media sending + assets = [] + for asset in event.added_assets: + url = asset.full_url or asset.thumbnail_url + if url: + asset_type = "video" if asset.type.value == "video" else "photo" + assets.append({"url": url, "type": asset_type}) + + return await client.send_notification( + chat_id=str(chat_id), + caption=message, + assets=assets if assets else None, + ) + + async def _send_webhook( + self, target: TargetConfig, message: str, event: ServiceEvent + ) -> dict[str, Any]: + url = target.config.get("url") + headers = target.config.get("headers", {}) + + if not url: + return {"success": False, "error": "Missing url in target config"} + + payload = { + "message": message, + "event_type": event.event_type.value, + "provider_type": event.provider_type.value, + "collection_name": event.collection_name, + "collection_id": event.collection_id, + "timestamp": event.timestamp.isoformat(), + } + + async with aiohttp.ClientSession() as session: + client = WebhookClient(session, url, headers) + return await client.send(payload) diff --git a/packages/core/src/notify_bridge_core/notifications/queue.py b/packages/core/src/notify_bridge_core/notifications/queue.py new file mode 100644 index 0000000..1cc02bd --- /dev/null +++ b/packages/core/src/notify_bridge_core/notifications/queue.py @@ -0,0 +1,48 @@ +"""Persistent notification queue for deferred notifications.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +from notify_bridge_core.storage import StorageBackend + +_LOGGER = logging.getLogger(__name__) + + +class NotificationQueue: + """Persistent queue for notifications deferred during quiet hours.""" + + def __init__(self, backend: StorageBackend) -> None: + self._backend = backend + self._data: dict[str, Any] | None = None + + async def async_load(self) -> None: + self._data = await self._backend.load() or {"queue": []} + + async def async_enqueue(self, notification_params: dict[str, Any]) -> None: + if self._data is None: + self._data = {"queue": []} + self._data["queue"].append({ + "params": notification_params, + "queued_at": datetime.now(timezone.utc).isoformat(), + }) + await self._backend.save(self._data) + + def get_all(self) -> list[dict[str, Any]]: + if not self._data: + return [] + return list(self._data.get("queue", [])) + + def has_pending(self) -> bool: + return bool(self._data and self._data.get("queue")) + + async def async_clear(self) -> None: + if self._data: + self._data["queue"] = [] + await self._backend.save(self._data) + + async def async_remove(self) -> None: + await self._backend.remove() + self._data = None diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/__init__.py b/packages/core/src/notify_bridge_core/notifications/telegram/__init__.py index 8eda49a..522ffec 100644 --- a/packages/core/src/notify_bridge_core/notifications/telegram/__init__.py +++ b/packages/core/src/notify_bridge_core/notifications/telegram/__init__.py @@ -1 +1,6 @@ """Telegram notification client.""" + +from notify_bridge_core.notifications.telegram.client import TelegramClient +from notify_bridge_core.notifications.telegram.cache import TelegramFileCache + +__all__ = ["TelegramClient", "TelegramFileCache"] diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/cache.py b/packages/core/src/notify_bridge_core/notifications/telegram/cache.py new file mode 100644 index 0000000..7b9414e --- /dev/null +++ b/packages/core/src/notify_bridge_core/notifications/telegram/cache.py @@ -0,0 +1,129 @@ +"""Telegram file_id cache with pluggable storage backend.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +from notify_bridge_core.storage import StorageBackend + +_LOGGER = logging.getLogger(__name__) + +DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60 + + +class TelegramFileCache: + """Cache for Telegram file_ids to avoid re-uploading media. + + Supports two validation modes: + - TTL mode (default): entries expire after a configured time-to-live + - Thumbhash mode: entries validated by comparing stored thumbhash with current + """ + + THUMBHASH_MAX_ENTRIES = 2000 + + def __init__( + self, + backend: StorageBackend, + ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL, + use_thumbhash: bool = False, + ) -> None: + self._backend = backend + self._data: dict[str, Any] | None = None + self._ttl_seconds = ttl_seconds + self._use_thumbhash = use_thumbhash + + async def async_load(self) -> None: + self._data = await self._backend.load() or {"files": {}} + await self._cleanup_expired() + + async def _cleanup_expired(self) -> None: + if self._use_thumbhash: + files = self._data.get("files", {}) if self._data else {} + if len(files) > self.THUMBHASH_MAX_ENTRIES: + sorted_keys = sorted(files, key=lambda k: files[k].get("cached_at", "")) + for key in sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES]: + del files[key] + await self._backend.save(self._data) + return + + if not self._data or "files" not in self._data: + return + + now = datetime.now(timezone.utc) + expired = [ + url for url, entry in self._data["files"].items() + if entry.get("cached_at") and + (now - datetime.fromisoformat(entry["cached_at"])).total_seconds() > self._ttl_seconds + ] + + if expired: + for key in expired: + del self._data["files"][key] + await self._backend.save(self._data) + + def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None: + if not self._data or "files" not in self._data: + return None + + entry = self._data["files"].get(key) + if not entry: + return None + + if self._use_thumbhash: + if thumbhash is not None: + stored = entry.get("thumbhash") + if stored and stored != thumbhash: + del self._data["files"][key] + return None + else: + cached_at_str = entry.get("cached_at") + if cached_at_str: + age = (datetime.now(timezone.utc) - datetime.fromisoformat(cached_at_str)).total_seconds() + if age > self._ttl_seconds: + return None + + return {"file_id": entry.get("file_id"), "type": entry.get("type")} + + async def async_set( + self, key: str, file_id: str, media_type: str, thumbhash: str | None = None + ) -> None: + if self._data is None: + self._data = {"files": {}} + + entry: dict[str, Any] = { + "file_id": file_id, + "type": media_type, + "cached_at": datetime.now(timezone.utc).isoformat(), + } + if thumbhash is not None: + entry["thumbhash"] = thumbhash + + self._data["files"][key] = entry + await self._backend.save(self._data) + + async def async_set_many( + self, entries: list[tuple[str, str, str, str | None]] + ) -> None: + if not entries: + return + if self._data is None: + self._data = {"files": {}} + + now_iso = datetime.now(timezone.utc).isoformat() + for key, file_id, media_type, thumbhash in entries: + entry: dict[str, Any] = { + "file_id": file_id, + "type": media_type, + "cached_at": now_iso, + } + if thumbhash is not None: + entry["thumbhash"] = thumbhash + self._data["files"][key] = entry + + await self._backend.save(self._data) + + async def async_remove(self) -> None: + await self._backend.remove() + self._data = None diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/client.py b/packages/core/src/notify_bridge_core/notifications/telegram/client.py new file mode 100644 index 0000000..a6d3d3c --- /dev/null +++ b/packages/core/src/notify_bridge_core/notifications/telegram/client.py @@ -0,0 +1,481 @@ +"""Telegram Bot API client for sending notifications with media.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import mimetypes +from typing import Any, Callable + +import aiohttp +from aiohttp import FormData + +from .cache import TelegramFileCache +from .media import ( + TELEGRAM_API_BASE_URL, + TELEGRAM_MAX_PHOTO_SIZE, + TELEGRAM_MAX_VIDEO_SIZE, + check_photo_limits, + extract_asset_id_from_url, + is_asset_id, + split_media_by_upload_size, +) + +_LOGGER = logging.getLogger(__name__) + +NotificationResult = dict[str, Any] + + +class TelegramClient: + """Async Telegram Bot API client for sending notifications with media.""" + + def __init__( + self, + session: aiohttp.ClientSession, + bot_token: str, + *, + url_cache: TelegramFileCache | None = None, + asset_cache: TelegramFileCache | None = None, + url_resolver: Callable[[str], str] | None = None, + thumbhash_resolver: Callable[[str], str | None] | None = None, + ) -> None: + self._session = session + self._token = bot_token + self._url_cache = url_cache + self._asset_cache = asset_cache + self._url_resolver = url_resolver + self._thumbhash_resolver = thumbhash_resolver + + def _resolve_url(self, url: str) -> str: + if self._url_resolver: + return self._url_resolver(url) + return url + + def _get_cache_and_key( + self, url: str | None, cache_key: str | None = None, + ) -> tuple[TelegramFileCache | None, str | None, str | None]: + if cache_key: + return self._url_cache, cache_key, None + if url: + if is_asset_id(url): + thumbhash = self._thumbhash_resolver(url) if self._thumbhash_resolver else None + return self._asset_cache, url, thumbhash + asset_id = extract_asset_id_from_url(url) + if asset_id: + thumbhash = self._thumbhash_resolver(asset_id) if self._thumbhash_resolver else None + return self._asset_cache, asset_id, thumbhash + return self._url_cache, url, None + return None, None, None + + def _get_cache_for_key(self, key: str, is_asset: bool | None = None) -> TelegramFileCache | None: + if is_asset is None: + is_asset = is_asset_id(key) + return self._asset_cache if is_asset else self._url_cache + + async def send_notification( + self, + chat_id: str, + assets: list[dict[str, str]] | None = None, + caption: str | None = None, + reply_to_message_id: int | None = None, + disable_web_page_preview: bool | None = None, + parse_mode: str = "HTML", + max_group_size: int = 10, + chunk_delay: int = 0, + max_asset_data_size: int | None = None, + send_large_photos_as_documents: bool = False, + chat_action: str | None = "typing", + ) -> NotificationResult: + if not assets: + return await self.send_message( + chat_id, caption or "", reply_to_message_id, + disable_web_page_preview, parse_mode, + ) + + typing_task = None + if chat_action: + typing_task = self._start_typing_indicator(chat_id, chat_action) + + try: + if len(assets) == 1 and assets[0].get("type") == "photo": + return await self._send_photo( + chat_id, assets[0].get("url"), caption, reply_to_message_id, + parse_mode, max_asset_data_size, send_large_photos_as_documents, + assets[0].get("content_type"), assets[0].get("cache_key"), + ) + if len(assets) == 1 and assets[0].get("type") == "video": + return await self._send_video( + chat_id, assets[0].get("url"), caption, reply_to_message_id, + parse_mode, max_asset_data_size, + assets[0].get("content_type"), assets[0].get("cache_key"), + ) + if len(assets) == 1 and assets[0].get("type", "document") == "document": + url = assets[0].get("url") + if not url: + return {"success": False, "error": "Missing 'url' for document"} + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"} + data = await resp.read() + if max_asset_data_size is not None and len(data) > max_asset_data_size: + return {"success": False, "error": f"Media size exceeds limit"} + filename = url.split("/")[-1].split("?")[0] or "file" + return await self._send_document( + chat_id, data, filename, caption, reply_to_message_id, + parse_mode, url, assets[0].get("content_type"), + assets[0].get("cache_key"), + ) + except aiohttp.ClientError as err: + return {"success": False, "error": f"Failed to download media: {err}"} + + return await self._send_media_group( + chat_id, assets, caption, reply_to_message_id, max_group_size, + chunk_delay, parse_mode, max_asset_data_size, + send_large_photos_as_documents, + ) + finally: + if typing_task: + typing_task.cancel() + try: + await typing_task + except asyncio.CancelledError: + pass + + async def send_message( + self, + chat_id: str, + text: str, + reply_to_message_id: int | None = None, + disable_web_page_preview: bool | None = None, + parse_mode: str = "HTML", + ) -> NotificationResult: + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMessage" + payload: dict[str, Any] = { + "chat_id": chat_id, + "text": text or "Notification", + "parse_mode": parse_mode, + } + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + if disable_web_page_preview is not None: + payload["disable_web_page_preview"] = disable_web_page_preview + + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err)} + + async def send_chat_action(self, chat_id: str, action: str = "typing") -> bool: + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendChatAction" + try: + async with self._session.post(telegram_url, json={"chat_id": chat_id, "action": action}) as response: + result = await response.json() + return response.status == 200 and result.get("ok", False) + except aiohttp.ClientError: + return False + + def _start_typing_indicator(self, chat_id: str, action: str = "typing") -> asyncio.Task: + async def action_loop() -> None: + try: + while True: + await self.send_chat_action(chat_id, action) + await asyncio.sleep(4) + except asyncio.CancelledError: + pass + return asyncio.create_task(action_loop()) + + async def _send_photo( + self, chat_id: str, url: str | None, caption: str | None = None, + reply_to_message_id: int | None = None, parse_mode: str = "HTML", + max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False, + content_type: str | None = None, cache_key: str | None = None, + ) -> NotificationResult: + if not content_type: + content_type = "image/jpeg" + if not url: + return {"success": False, "error": "Missing 'url' for photo"} + + effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key) + + # Check cache + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None + if cached and cached.get("file_id"): + payload = {"chat_id": chat_id, "photo": cached["file_id"], "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True} + except aiohttp.ClientError: + pass + + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download photo: HTTP {resp.status}"} + data = await resp.read() + + if max_asset_data_size is not None and len(data) > max_asset_data_size: + return {"success": False, "error": "Photo exceeds size limit", "skipped": True} + + exceeds_limits, reason, _, _ = check_photo_limits(data) + if exceeds_limits: + if send_large_photos_as_documents: + return await self._send_document(chat_id, data, "photo.jpg", caption, reply_to_message_id, parse_mode, url, None, cache_key) + return {"success": False, "error": f"Photo {reason}", "skipped": True} + + form = FormData() + form.add_field("chat_id", chat_id) + form.add_field("photo", data, filename="photo.jpg", content_type=content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to_message_id: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto" + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + photos = result.get("result", {}).get("photo", []) + if photos and effective_cache and effective_cache_key: + file_id = photos[-1].get("file_id") + if file_id: + await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash) + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + return {"success": False, "error": result.get("description", "Unknown Telegram error")} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err)} + + async def _send_video( + self, chat_id: str, url: str | None, caption: str | None = None, + reply_to_message_id: int | None = None, parse_mode: str = "HTML", + max_asset_data_size: int | None = None, content_type: str | None = None, + cache_key: str | None = None, + ) -> NotificationResult: + if not content_type: + content_type = "video/mp4" + if not url: + return {"success": False, "error": "Missing 'url' for video"} + + effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key) + + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None + if cached and cached.get("file_id"): + payload = {"chat_id": chat_id, "video": cached["file_id"], "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True} + except aiohttp.ClientError: + pass + + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download video: HTTP {resp.status}"} + data = await resp.read() + + if max_asset_data_size is not None and len(data) > max_asset_data_size: + return {"success": False, "error": "Video exceeds size limit", "skipped": True} + if len(data) > TELEGRAM_MAX_VIDEO_SIZE: + return {"success": False, "error": f"Video exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE // (1024*1024)} MB limit", "skipped": True} + + form = FormData() + form.add_field("chat_id", chat_id) + form.add_field("video", data, filename="video.mp4", content_type=content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to_message_id: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo" + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + video = result.get("result", {}).get("video", {}) + if video and effective_cache and effective_cache_key: + file_id = video.get("file_id") + if file_id: + await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash) + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + return {"success": False, "error": result.get("description", "Unknown Telegram error")} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err)} + + async def _send_document( + self, chat_id: str, data: bytes, filename: str = "file", + caption: str | None = None, reply_to_message_id: int | None = None, + parse_mode: str = "HTML", source_url: str | None = None, + content_type: str | None = None, cache_key: str | None = None, + ) -> NotificationResult: + if not content_type: + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = "application/octet-stream" + + effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(source_url, cache_key) + + if effective_cache and effective_cache_key: + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) + if cached and cached.get("file_id") and cached.get("type") == "document": + payload = {"chat_id": chat_id, "document": cached["file_id"], "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True} + except aiohttp.ClientError: + pass + + try: + form = FormData() + form.add_field("chat_id", chat_id) + form.add_field("document", data, filename=filename, content_type=content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to_message_id: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument" + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + if effective_cache_key and effective_cache: + document = result.get("result", {}).get("document", {}) + file_id = document.get("file_id") + if file_id: + await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash) + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + return {"success": False, "error": result.get("description", "Unknown Telegram error")} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err)} + + async def _send_media_group( + self, chat_id: str, assets: list[dict[str, str]], + caption: str | None = None, reply_to_message_id: int | None = None, + max_group_size: int = 10, chunk_delay: int = 0, parse_mode: str = "HTML", + max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False, + ) -> NotificationResult: + chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)] + all_message_ids: list = [] + + for chunk_idx, chunk in enumerate(chunks): + if chunk_idx > 0 and chunk_delay > 0: + await asyncio.sleep(chunk_delay / 1000) + + if len(chunk) == 1: + item = chunk[0] + chunk_caption = caption if chunk_idx == 0 else None + chunk_reply = reply_to_message_id if chunk_idx == 0 else None + if item.get("type") == "photo": + result = await self._send_photo(chat_id, item.get("url"), chunk_caption, chunk_reply, parse_mode, max_asset_data_size, send_large_photos_as_documents, item.get("content_type"), item.get("cache_key")) + elif item.get("type") == "video": + result = await self._send_video(chat_id, item.get("url"), chunk_caption, chunk_reply, parse_mode, max_asset_data_size, item.get("content_type"), item.get("cache_key")) + else: + continue + if not result.get("success"): + result["failed_at_chunk"] = chunk_idx + 1 + return result + all_message_ids.append(result.get("message_id")) + continue + + # Multi-item: download all, build form, send media group + form = FormData() + form.add_field("chat_id", chat_id) + if reply_to_message_id and chunk_idx == 0: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + media_json = [] + upload_idx = 0 + + for i, item in enumerate(chunk): + url = item.get("url") + if not url: + continue + media_type = item.get("type", "photo") + custom_cache_key = item.get("cache_key") + + # Check cache + ck = custom_cache_key or extract_asset_id_from_url(url) or url + ck_is_asset = is_asset_id(ck) + item_cache = self._get_cache_for_key(ck, ck_is_asset) + item_thumbhash = self._thumbhash_resolver(ck) if ck_is_asset and self._thumbhash_resolver else None + cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None + + if cached and cached.get("file_id"): + mij: dict[str, Any] = {"type": media_type, "media": cached["file_id"]} + else: + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + continue + data = await resp.read() + if max_asset_data_size and len(data) > max_asset_data_size: + continue + if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: + continue + if media_type == "photo": + exceeds, _, _, _ = check_photo_limits(data) + if exceeds: + continue + + attach_name = f"file{upload_idx}" + ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4") + ext = "jpg" if media_type == "photo" else "mp4" + form.add_field(attach_name, data, filename=f"media_{i}.{ext}", content_type=ct) + mij = {"type": media_type, "media": f"attach://{attach_name}"} + upload_idx += 1 + except aiohttp.ClientError: + continue + + if i == 0 and chunk_idx == 0 and caption: + mij["caption"] = caption + mij["parse_mode"] = parse_mode + media_json.append(mij) + + if not media_json: + continue + + form.add_field("media", json.dumps(media_json)) + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup" + + try: + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.extend(msg.get("message_id") for msg in result.get("result", [])) + else: + return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} + + return {"success": True, "message_ids": all_message_ids, "chunks_sent": len(chunks)} diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/media.py b/packages/core/src/notify_bridge_core/notifications/telegram/media.py new file mode 100644 index 0000000..2cbacae --- /dev/null +++ b/packages/core/src/notify_bridge_core/notifications/telegram/media.py @@ -0,0 +1,92 @@ +"""Telegram media utilities - constants, URL helpers, and size splitting.""" + +from __future__ import annotations + +import re +from typing import Final + +# Telegram constants +TELEGRAM_API_BASE_URL: Final = "https://api.telegram.org/bot" +TELEGRAM_MAX_PHOTO_SIZE: Final = 10 * 1024 * 1024 # 10 MB +TELEGRAM_MAX_VIDEO_SIZE: Final = 50 * 1024 * 1024 # 50 MB +TELEGRAM_MAX_DIMENSION_SUM: Final = 10000 + +# Generic UUID pattern for asset IDs +_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$") + +# URL patterns to extract asset IDs (generic enough for Immich-style URLs) +_ASSET_ID_URL_PATTERNS = [ + re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"), + re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"), +] + + +def is_asset_id(value: str) -> bool: + """Check if a string is a valid asset ID (UUID format).""" + return bool(_ASSET_ID_PATTERN.match(value)) + + +def extract_asset_id_from_url(url: str) -> str | None: + """Extract asset ID from a URL if possible.""" + if not url: + return None + for pattern in _ASSET_ID_URL_PATTERNS: + match = pattern.search(url) + if match: + return match.group(1) + return None + + +def split_media_by_upload_size( + media_items: list[tuple], max_upload_size: int +) -> list[list[tuple]]: + """Split media items into sub-groups respecting upload size limit.""" + if not media_items: + return [] + + groups: list[list[tuple]] = [] + current_group: list[tuple] = [] + current_size = 0 + + for item in media_items: + media_ref = item[1] + is_cached = item[4] + item_size = 0 if is_cached else (len(media_ref) if isinstance(media_ref, bytes) else 0) + + if current_group and current_size + item_size > max_upload_size: + groups.append(current_group) + current_group = [] + current_size = 0 + + current_group.append(item) + current_size += item_size + + if current_group: + groups.append(current_group) + + return groups + + +def check_photo_limits( + data: bytes, +) -> tuple[bool, str | None, int | None, int | None]: + """Check if photo data exceeds Telegram photo limits.""" + if len(data) > TELEGRAM_MAX_PHOTO_SIZE: + return (True, f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", None, None) + + try: + from PIL import Image + import io + + img = Image.open(io.BytesIO(data)) + width, height = img.size + dimension_sum = width + height + + if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM: + return (True, f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", width, height) + + return False, None, width, height + except ImportError: + return False, None, None, None + except Exception: + return False, None, None, None diff --git a/packages/core/src/notify_bridge_core/notifications/webhook/__init__.py b/packages/core/src/notify_bridge_core/notifications/webhook/__init__.py index 020b298..7d6a2dd 100644 --- a/packages/core/src/notify_bridge_core/notifications/webhook/__init__.py +++ b/packages/core/src/notify_bridge_core/notifications/webhook/__init__.py @@ -1 +1,5 @@ """Webhook notification client.""" + +from notify_bridge_core.notifications.webhook.client import WebhookClient + +__all__ = ["WebhookClient"] diff --git a/packages/core/src/notify_bridge_core/notifications/webhook/client.py b/packages/core/src/notify_bridge_core/notifications/webhook/client.py new file mode 100644 index 0000000..45bd9a9 --- /dev/null +++ b/packages/core/src/notify_bridge_core/notifications/webhook/client.py @@ -0,0 +1,33 @@ +"""Generic webhook notification client.""" + +from __future__ import annotations + +import logging +from typing import Any + +import aiohttp + +_LOGGER = logging.getLogger(__name__) + + +class WebhookClient: + """Send JSON payloads to a webhook URL.""" + + def __init__(self, session: aiohttp.ClientSession, url: str, headers: dict[str, str] | None = None) -> None: + self._session = session + self._url = url + self._headers = headers or {} + + async def send(self, payload: dict[str, Any]) -> dict[str, Any]: + try: + async with self._session.post( + self._url, + json=payload, + headers={"Content-Type": "application/json", **self._headers}, + ) as response: + if 200 <= response.status < 300: + return {"success": True, "status_code": response.status} + body = await response.text() + return {"success": False, "error": f"HTTP {response.status}", "body": body[:200]} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err)} diff --git a/packages/core/src/notify_bridge_core/storage.py b/packages/core/src/notify_bridge_core/storage.py new file mode 100644 index 0000000..d92161e --- /dev/null +++ b/packages/core/src/notify_bridge_core/storage.py @@ -0,0 +1,52 @@ +"""Abstract storage backends and JSON file implementation.""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any, Protocol, runtime_checkable + +_LOGGER = logging.getLogger(__name__) + + +@runtime_checkable +class StorageBackend(Protocol): + """Abstract storage backend for persisting JSON-serializable data.""" + + async def load(self) -> dict[str, Any] | None: ... + async def save(self, data: dict[str, Any]) -> None: ... + async def remove(self) -> None: ... + + +class JsonFileBackend: + """Simple JSON file storage backend.""" + + def __init__(self, path: Path) -> None: + self._path = path + + async def load(self) -> dict[str, Any] | None: + if not self._path.exists(): + return None + try: + text = self._path.read_text(encoding="utf-8") + return json.loads(text) + except (json.JSONDecodeError, OSError) as err: + _LOGGER.warning("Failed to load %s: %s", self._path, err) + return None + + async def save(self, data: dict[str, Any]) -> None: + try: + self._path.parent.mkdir(parents=True, exist_ok=True) + self._path.write_text( + json.dumps(data, default=str), encoding="utf-8" + ) + except OSError as err: + _LOGGER.error("Failed to save %s: %s", self._path, err) + + async def remove(self) -> None: + try: + if self._path.exists(): + self._path.unlink() + except OSError as err: + _LOGGER.error("Failed to remove %s: %s", self._path, err)