feat(notify-bridge): phase 5 - notification system
Extract and generalize notification dispatch: - TelegramClient: full Bot API client with photo/video/document/media group support - TelegramFileCache: TTL and thumbhash-based file_id caching - WebhookClient: simple JSON POST client - NotificationQueue: persistent deferred notification queue for quiet hours - NotificationDispatcher: routes ServiceEvent to targets, renders templates - StorageBackend protocol + JsonFileBackend for persistence - TargetConfig dataclass for target configuration Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1 +1,5 @@
|
||||
"""Notification dispatch — Telegram, webhooks, queue."""
|
||||
|
||||
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
|
||||
|
||||
__all__ = ["NotificationDispatcher", "TargetConfig"]
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
"""Notification dispatcher — routes events to targets via templates."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
from notify_bridge_core.models.events import ServiceEvent
|
||||
from notify_bridge_core.templates.context import build_template_context
|
||||
from notify_bridge_core.templates.renderer import render_template
|
||||
|
||||
from .telegram.client import TelegramClient
|
||||
from .webhook.client import WebhookClient
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_TEMPLATE = (
|
||||
'{{ added_count }} new item(s) added to "{{ collection_name }}".'
|
||||
'{% if people %}\nPeople: {{ people | join(", ") }}{% endif %}'
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TargetConfig:
|
||||
"""Configuration for a notification target."""
|
||||
|
||||
type: str # "telegram" or "webhook"
|
||||
config: dict[str, Any] # type-specific config
|
||||
template_slots: dict[str, str] | None = None # event_type -> template string
|
||||
|
||||
|
||||
class NotificationDispatcher:
|
||||
"""Dispatches ServiceEvent notifications to configured targets."""
|
||||
|
||||
async def dispatch(
|
||||
self,
|
||||
event: ServiceEvent,
|
||||
targets: list[TargetConfig],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Send event notification to all targets.
|
||||
|
||||
Returns list of results (one per target).
|
||||
"""
|
||||
results = []
|
||||
for target in targets:
|
||||
try:
|
||||
result = await self._send_to_target(event, target)
|
||||
results.append(result)
|
||||
except Exception as e:
|
||||
_LOGGER.error("Failed to dispatch to target: %s", e)
|
||||
results.append({"success": False, "error": str(e)})
|
||||
return results
|
||||
|
||||
async def _send_to_target(
|
||||
self, event: ServiceEvent, target: TargetConfig
|
||||
) -> dict[str, Any]:
|
||||
"""Send event to a single target."""
|
||||
# Select template
|
||||
template_str = DEFAULT_TEMPLATE
|
||||
if target.template_slots:
|
||||
slot = target.template_slots.get(event.event_type.value)
|
||||
if slot:
|
||||
template_str = slot
|
||||
|
||||
# Build context and render
|
||||
ctx = build_template_context(event, target_type=target.type)
|
||||
message = render_template(template_str, ctx)
|
||||
|
||||
if target.type == "telegram":
|
||||
return await self._send_telegram(target, message, event)
|
||||
elif target.type == "webhook":
|
||||
return await self._send_webhook(target, message, event)
|
||||
return {"success": False, "error": f"Unknown target type: {target.type}"}
|
||||
|
||||
async def _send_telegram(
|
||||
self, target: TargetConfig, message: str, event: ServiceEvent
|
||||
) -> dict[str, Any]:
|
||||
bot_token = target.config.get("bot_token")
|
||||
chat_id = target.config.get("chat_id")
|
||||
|
||||
if not bot_token or not chat_id:
|
||||
return {"success": False, "error": "Missing bot_token or chat_id"}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
client = TelegramClient(session, bot_token)
|
||||
|
||||
# Build asset list for media sending
|
||||
assets = []
|
||||
for asset in event.added_assets:
|
||||
url = asset.full_url or asset.thumbnail_url
|
||||
if url:
|
||||
asset_type = "video" if asset.type.value == "video" else "photo"
|
||||
assets.append({"url": url, "type": asset_type})
|
||||
|
||||
return await client.send_notification(
|
||||
chat_id=str(chat_id),
|
||||
caption=message,
|
||||
assets=assets if assets else None,
|
||||
)
|
||||
|
||||
async def _send_webhook(
|
||||
self, target: TargetConfig, message: str, event: ServiceEvent
|
||||
) -> dict[str, Any]:
|
||||
url = target.config.get("url")
|
||||
headers = target.config.get("headers", {})
|
||||
|
||||
if not url:
|
||||
return {"success": False, "error": "Missing url in target config"}
|
||||
|
||||
payload = {
|
||||
"message": message,
|
||||
"event_type": event.event_type.value,
|
||||
"provider_type": event.provider_type.value,
|
||||
"collection_name": event.collection_name,
|
||||
"collection_id": event.collection_id,
|
||||
"timestamp": event.timestamp.isoformat(),
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
client = WebhookClient(session, url, headers)
|
||||
return await client.send(payload)
|
||||
@@ -0,0 +1,48 @@
|
||||
"""Persistent notification queue for deferred notifications."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from notify_bridge_core.storage import StorageBackend
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationQueue:
|
||||
"""Persistent queue for notifications deferred during quiet hours."""
|
||||
|
||||
def __init__(self, backend: StorageBackend) -> None:
|
||||
self._backend = backend
|
||||
self._data: dict[str, Any] | None = None
|
||||
|
||||
async def async_load(self) -> None:
|
||||
self._data = await self._backend.load() or {"queue": []}
|
||||
|
||||
async def async_enqueue(self, notification_params: dict[str, Any]) -> None:
|
||||
if self._data is None:
|
||||
self._data = {"queue": []}
|
||||
self._data["queue"].append({
|
||||
"params": notification_params,
|
||||
"queued_at": datetime.now(timezone.utc).isoformat(),
|
||||
})
|
||||
await self._backend.save(self._data)
|
||||
|
||||
def get_all(self) -> list[dict[str, Any]]:
|
||||
if not self._data:
|
||||
return []
|
||||
return list(self._data.get("queue", []))
|
||||
|
||||
def has_pending(self) -> bool:
|
||||
return bool(self._data and self._data.get("queue"))
|
||||
|
||||
async def async_clear(self) -> None:
|
||||
if self._data:
|
||||
self._data["queue"] = []
|
||||
await self._backend.save(self._data)
|
||||
|
||||
async def async_remove(self) -> None:
|
||||
await self._backend.remove()
|
||||
self._data = None
|
||||
@@ -1 +1,6 @@
|
||||
"""Telegram notification client."""
|
||||
|
||||
from notify_bridge_core.notifications.telegram.client import TelegramClient
|
||||
from notify_bridge_core.notifications.telegram.cache import TelegramFileCache
|
||||
|
||||
__all__ = ["TelegramClient", "TelegramFileCache"]
|
||||
|
||||
@@ -0,0 +1,129 @@
|
||||
"""Telegram file_id cache with pluggable storage backend."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from notify_bridge_core.storage import StorageBackend
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60
|
||||
|
||||
|
||||
class TelegramFileCache:
|
||||
"""Cache for Telegram file_ids to avoid re-uploading media.
|
||||
|
||||
Supports two validation modes:
|
||||
- TTL mode (default): entries expire after a configured time-to-live
|
||||
- Thumbhash mode: entries validated by comparing stored thumbhash with current
|
||||
"""
|
||||
|
||||
THUMBHASH_MAX_ENTRIES = 2000
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
backend: StorageBackend,
|
||||
ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL,
|
||||
use_thumbhash: bool = False,
|
||||
) -> None:
|
||||
self._backend = backend
|
||||
self._data: dict[str, Any] | None = None
|
||||
self._ttl_seconds = ttl_seconds
|
||||
self._use_thumbhash = use_thumbhash
|
||||
|
||||
async def async_load(self) -> None:
|
||||
self._data = await self._backend.load() or {"files": {}}
|
||||
await self._cleanup_expired()
|
||||
|
||||
async def _cleanup_expired(self) -> None:
|
||||
if self._use_thumbhash:
|
||||
files = self._data.get("files", {}) if self._data else {}
|
||||
if len(files) > self.THUMBHASH_MAX_ENTRIES:
|
||||
sorted_keys = sorted(files, key=lambda k: files[k].get("cached_at", ""))
|
||||
for key in sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES]:
|
||||
del files[key]
|
||||
await self._backend.save(self._data)
|
||||
return
|
||||
|
||||
if not self._data or "files" not in self._data:
|
||||
return
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
expired = [
|
||||
url for url, entry in self._data["files"].items()
|
||||
if entry.get("cached_at") and
|
||||
(now - datetime.fromisoformat(entry["cached_at"])).total_seconds() > self._ttl_seconds
|
||||
]
|
||||
|
||||
if expired:
|
||||
for key in expired:
|
||||
del self._data["files"][key]
|
||||
await self._backend.save(self._data)
|
||||
|
||||
def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None:
|
||||
if not self._data or "files" not in self._data:
|
||||
return None
|
||||
|
||||
entry = self._data["files"].get(key)
|
||||
if not entry:
|
||||
return None
|
||||
|
||||
if self._use_thumbhash:
|
||||
if thumbhash is not None:
|
||||
stored = entry.get("thumbhash")
|
||||
if stored and stored != thumbhash:
|
||||
del self._data["files"][key]
|
||||
return None
|
||||
else:
|
||||
cached_at_str = entry.get("cached_at")
|
||||
if cached_at_str:
|
||||
age = (datetime.now(timezone.utc) - datetime.fromisoformat(cached_at_str)).total_seconds()
|
||||
if age > self._ttl_seconds:
|
||||
return None
|
||||
|
||||
return {"file_id": entry.get("file_id"), "type": entry.get("type")}
|
||||
|
||||
async def async_set(
|
||||
self, key: str, file_id: str, media_type: str, thumbhash: str | None = None
|
||||
) -> None:
|
||||
if self._data is None:
|
||||
self._data = {"files": {}}
|
||||
|
||||
entry: dict[str, Any] = {
|
||||
"file_id": file_id,
|
||||
"type": media_type,
|
||||
"cached_at": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
if thumbhash is not None:
|
||||
entry["thumbhash"] = thumbhash
|
||||
|
||||
self._data["files"][key] = entry
|
||||
await self._backend.save(self._data)
|
||||
|
||||
async def async_set_many(
|
||||
self, entries: list[tuple[str, str, str, str | None]]
|
||||
) -> None:
|
||||
if not entries:
|
||||
return
|
||||
if self._data is None:
|
||||
self._data = {"files": {}}
|
||||
|
||||
now_iso = datetime.now(timezone.utc).isoformat()
|
||||
for key, file_id, media_type, thumbhash in entries:
|
||||
entry: dict[str, Any] = {
|
||||
"file_id": file_id,
|
||||
"type": media_type,
|
||||
"cached_at": now_iso,
|
||||
}
|
||||
if thumbhash is not None:
|
||||
entry["thumbhash"] = thumbhash
|
||||
self._data["files"][key] = entry
|
||||
|
||||
await self._backend.save(self._data)
|
||||
|
||||
async def async_remove(self) -> None:
|
||||
await self._backend.remove()
|
||||
self._data = None
|
||||
@@ -0,0 +1,481 @@
|
||||
"""Telegram Bot API client for sending notifications with media."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import mimetypes
|
||||
from typing import Any, Callable
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import FormData
|
||||
|
||||
from .cache import TelegramFileCache
|
||||
from .media import (
|
||||
TELEGRAM_API_BASE_URL,
|
||||
TELEGRAM_MAX_PHOTO_SIZE,
|
||||
TELEGRAM_MAX_VIDEO_SIZE,
|
||||
check_photo_limits,
|
||||
extract_asset_id_from_url,
|
||||
is_asset_id,
|
||||
split_media_by_upload_size,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
NotificationResult = dict[str, Any]
|
||||
|
||||
|
||||
class TelegramClient:
|
||||
"""Async Telegram Bot API client for sending notifications with media."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session: aiohttp.ClientSession,
|
||||
bot_token: str,
|
||||
*,
|
||||
url_cache: TelegramFileCache | None = None,
|
||||
asset_cache: TelegramFileCache | None = None,
|
||||
url_resolver: Callable[[str], str] | None = None,
|
||||
thumbhash_resolver: Callable[[str], str | None] | None = None,
|
||||
) -> None:
|
||||
self._session = session
|
||||
self._token = bot_token
|
||||
self._url_cache = url_cache
|
||||
self._asset_cache = asset_cache
|
||||
self._url_resolver = url_resolver
|
||||
self._thumbhash_resolver = thumbhash_resolver
|
||||
|
||||
def _resolve_url(self, url: str) -> str:
|
||||
if self._url_resolver:
|
||||
return self._url_resolver(url)
|
||||
return url
|
||||
|
||||
def _get_cache_and_key(
|
||||
self, url: str | None, cache_key: str | None = None,
|
||||
) -> tuple[TelegramFileCache | None, str | None, str | None]:
|
||||
if cache_key:
|
||||
return self._url_cache, cache_key, None
|
||||
if url:
|
||||
if is_asset_id(url):
|
||||
thumbhash = self._thumbhash_resolver(url) if self._thumbhash_resolver else None
|
||||
return self._asset_cache, url, thumbhash
|
||||
asset_id = extract_asset_id_from_url(url)
|
||||
if asset_id:
|
||||
thumbhash = self._thumbhash_resolver(asset_id) if self._thumbhash_resolver else None
|
||||
return self._asset_cache, asset_id, thumbhash
|
||||
return self._url_cache, url, None
|
||||
return None, None, None
|
||||
|
||||
def _get_cache_for_key(self, key: str, is_asset: bool | None = None) -> TelegramFileCache | None:
|
||||
if is_asset is None:
|
||||
is_asset = is_asset_id(key)
|
||||
return self._asset_cache if is_asset else self._url_cache
|
||||
|
||||
async def send_notification(
|
||||
self,
|
||||
chat_id: str,
|
||||
assets: list[dict[str, str]] | None = None,
|
||||
caption: str | None = None,
|
||||
reply_to_message_id: int | None = None,
|
||||
disable_web_page_preview: bool | None = None,
|
||||
parse_mode: str = "HTML",
|
||||
max_group_size: int = 10,
|
||||
chunk_delay: int = 0,
|
||||
max_asset_data_size: int | None = None,
|
||||
send_large_photos_as_documents: bool = False,
|
||||
chat_action: str | None = "typing",
|
||||
) -> NotificationResult:
|
||||
if not assets:
|
||||
return await self.send_message(
|
||||
chat_id, caption or "", reply_to_message_id,
|
||||
disable_web_page_preview, parse_mode,
|
||||
)
|
||||
|
||||
typing_task = None
|
||||
if chat_action:
|
||||
typing_task = self._start_typing_indicator(chat_id, chat_action)
|
||||
|
||||
try:
|
||||
if len(assets) == 1 and assets[0].get("type") == "photo":
|
||||
return await self._send_photo(
|
||||
chat_id, assets[0].get("url"), caption, reply_to_message_id,
|
||||
parse_mode, max_asset_data_size, send_large_photos_as_documents,
|
||||
assets[0].get("content_type"), assets[0].get("cache_key"),
|
||||
)
|
||||
if len(assets) == 1 and assets[0].get("type") == "video":
|
||||
return await self._send_video(
|
||||
chat_id, assets[0].get("url"), caption, reply_to_message_id,
|
||||
parse_mode, max_asset_data_size,
|
||||
assets[0].get("content_type"), assets[0].get("cache_key"),
|
||||
)
|
||||
if len(assets) == 1 and assets[0].get("type", "document") == "document":
|
||||
url = assets[0].get("url")
|
||||
if not url:
|
||||
return {"success": False, "error": "Missing 'url' for document"}
|
||||
try:
|
||||
download_url = self._resolve_url(url)
|
||||
async with self._session.get(download_url) as resp:
|
||||
if resp.status != 200:
|
||||
return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"}
|
||||
data = await resp.read()
|
||||
if max_asset_data_size is not None and len(data) > max_asset_data_size:
|
||||
return {"success": False, "error": f"Media size exceeds limit"}
|
||||
filename = url.split("/")[-1].split("?")[0] or "file"
|
||||
return await self._send_document(
|
||||
chat_id, data, filename, caption, reply_to_message_id,
|
||||
parse_mode, url, assets[0].get("content_type"),
|
||||
assets[0].get("cache_key"),
|
||||
)
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": f"Failed to download media: {err}"}
|
||||
|
||||
return await self._send_media_group(
|
||||
chat_id, assets, caption, reply_to_message_id, max_group_size,
|
||||
chunk_delay, parse_mode, max_asset_data_size,
|
||||
send_large_photos_as_documents,
|
||||
)
|
||||
finally:
|
||||
if typing_task:
|
||||
typing_task.cancel()
|
||||
try:
|
||||
await typing_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
chat_id: str,
|
||||
text: str,
|
||||
reply_to_message_id: int | None = None,
|
||||
disable_web_page_preview: bool | None = None,
|
||||
parse_mode: str = "HTML",
|
||||
) -> NotificationResult:
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMessage"
|
||||
payload: dict[str, Any] = {
|
||||
"chat_id": chat_id,
|
||||
"text": text or "Notification",
|
||||
"parse_mode": parse_mode,
|
||||
}
|
||||
if reply_to_message_id:
|
||||
payload["reply_to_message_id"] = reply_to_message_id
|
||||
if disable_web_page_preview is not None:
|
||||
payload["disable_web_page_preview"] = disable_web_page_preview
|
||||
|
||||
try:
|
||||
async with self._session.post(telegram_url, json=payload) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
|
||||
return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")}
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": str(err)}
|
||||
|
||||
async def send_chat_action(self, chat_id: str, action: str = "typing") -> bool:
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendChatAction"
|
||||
try:
|
||||
async with self._session.post(telegram_url, json={"chat_id": chat_id, "action": action}) as response:
|
||||
result = await response.json()
|
||||
return response.status == 200 and result.get("ok", False)
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
def _start_typing_indicator(self, chat_id: str, action: str = "typing") -> asyncio.Task:
|
||||
async def action_loop() -> None:
|
||||
try:
|
||||
while True:
|
||||
await self.send_chat_action(chat_id, action)
|
||||
await asyncio.sleep(4)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
return asyncio.create_task(action_loop())
|
||||
|
||||
async def _send_photo(
|
||||
self, chat_id: str, url: str | None, caption: str | None = None,
|
||||
reply_to_message_id: int | None = None, parse_mode: str = "HTML",
|
||||
max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False,
|
||||
content_type: str | None = None, cache_key: str | None = None,
|
||||
) -> NotificationResult:
|
||||
if not content_type:
|
||||
content_type = "image/jpeg"
|
||||
if not url:
|
||||
return {"success": False, "error": "Missing 'url' for photo"}
|
||||
|
||||
effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key)
|
||||
|
||||
# Check cache
|
||||
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
|
||||
if cached and cached.get("file_id"):
|
||||
payload = {"chat_id": chat_id, "photo": cached["file_id"], "parse_mode": parse_mode}
|
||||
if caption:
|
||||
payload["caption"] = caption
|
||||
if reply_to_message_id:
|
||||
payload["reply_to_message_id"] = reply_to_message_id
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto"
|
||||
try:
|
||||
async with self._session.post(telegram_url, json=payload) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True}
|
||||
except aiohttp.ClientError:
|
||||
pass
|
||||
|
||||
try:
|
||||
download_url = self._resolve_url(url)
|
||||
async with self._session.get(download_url) as resp:
|
||||
if resp.status != 200:
|
||||
return {"success": False, "error": f"Failed to download photo: HTTP {resp.status}"}
|
||||
data = await resp.read()
|
||||
|
||||
if max_asset_data_size is not None and len(data) > max_asset_data_size:
|
||||
return {"success": False, "error": "Photo exceeds size limit", "skipped": True}
|
||||
|
||||
exceeds_limits, reason, _, _ = check_photo_limits(data)
|
||||
if exceeds_limits:
|
||||
if send_large_photos_as_documents:
|
||||
return await self._send_document(chat_id, data, "photo.jpg", caption, reply_to_message_id, parse_mode, url, None, cache_key)
|
||||
return {"success": False, "error": f"Photo {reason}", "skipped": True}
|
||||
|
||||
form = FormData()
|
||||
form.add_field("chat_id", chat_id)
|
||||
form.add_field("photo", data, filename="photo.jpg", content_type=content_type)
|
||||
form.add_field("parse_mode", parse_mode)
|
||||
if caption:
|
||||
form.add_field("caption", caption)
|
||||
if reply_to_message_id:
|
||||
form.add_field("reply_to_message_id", str(reply_to_message_id))
|
||||
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto"
|
||||
async with self._session.post(telegram_url, data=form) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
photos = result.get("result", {}).get("photo", [])
|
||||
if photos and effective_cache and effective_cache_key:
|
||||
file_id = photos[-1].get("file_id")
|
||||
if file_id:
|
||||
await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash)
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
|
||||
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": str(err)}
|
||||
|
||||
async def _send_video(
|
||||
self, chat_id: str, url: str | None, caption: str | None = None,
|
||||
reply_to_message_id: int | None = None, parse_mode: str = "HTML",
|
||||
max_asset_data_size: int | None = None, content_type: str | None = None,
|
||||
cache_key: str | None = None,
|
||||
) -> NotificationResult:
|
||||
if not content_type:
|
||||
content_type = "video/mp4"
|
||||
if not url:
|
||||
return {"success": False, "error": "Missing 'url' for video"}
|
||||
|
||||
effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key)
|
||||
|
||||
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None
|
||||
if cached and cached.get("file_id"):
|
||||
payload = {"chat_id": chat_id, "video": cached["file_id"], "parse_mode": parse_mode}
|
||||
if caption:
|
||||
payload["caption"] = caption
|
||||
if reply_to_message_id:
|
||||
payload["reply_to_message_id"] = reply_to_message_id
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo"
|
||||
try:
|
||||
async with self._session.post(telegram_url, json=payload) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True}
|
||||
except aiohttp.ClientError:
|
||||
pass
|
||||
|
||||
try:
|
||||
download_url = self._resolve_url(url)
|
||||
async with self._session.get(download_url) as resp:
|
||||
if resp.status != 200:
|
||||
return {"success": False, "error": f"Failed to download video: HTTP {resp.status}"}
|
||||
data = await resp.read()
|
||||
|
||||
if max_asset_data_size is not None and len(data) > max_asset_data_size:
|
||||
return {"success": False, "error": "Video exceeds size limit", "skipped": True}
|
||||
if len(data) > TELEGRAM_MAX_VIDEO_SIZE:
|
||||
return {"success": False, "error": f"Video exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE // (1024*1024)} MB limit", "skipped": True}
|
||||
|
||||
form = FormData()
|
||||
form.add_field("chat_id", chat_id)
|
||||
form.add_field("video", data, filename="video.mp4", content_type=content_type)
|
||||
form.add_field("parse_mode", parse_mode)
|
||||
if caption:
|
||||
form.add_field("caption", caption)
|
||||
if reply_to_message_id:
|
||||
form.add_field("reply_to_message_id", str(reply_to_message_id))
|
||||
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo"
|
||||
async with self._session.post(telegram_url, data=form) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
video = result.get("result", {}).get("video", {})
|
||||
if video and effective_cache and effective_cache_key:
|
||||
file_id = video.get("file_id")
|
||||
if file_id:
|
||||
await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash)
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
|
||||
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": str(err)}
|
||||
|
||||
async def _send_document(
|
||||
self, chat_id: str, data: bytes, filename: str = "file",
|
||||
caption: str | None = None, reply_to_message_id: int | None = None,
|
||||
parse_mode: str = "HTML", source_url: str | None = None,
|
||||
content_type: str | None = None, cache_key: str | None = None,
|
||||
) -> NotificationResult:
|
||||
if not content_type:
|
||||
content_type, _ = mimetypes.guess_type(filename)
|
||||
if not content_type:
|
||||
content_type = "application/octet-stream"
|
||||
|
||||
effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(source_url, cache_key)
|
||||
|
||||
if effective_cache and effective_cache_key:
|
||||
cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash)
|
||||
if cached and cached.get("file_id") and cached.get("type") == "document":
|
||||
payload = {"chat_id": chat_id, "document": cached["file_id"], "parse_mode": parse_mode}
|
||||
if caption:
|
||||
payload["caption"] = caption
|
||||
if reply_to_message_id:
|
||||
payload["reply_to_message_id"] = reply_to_message_id
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument"
|
||||
try:
|
||||
async with self._session.post(telegram_url, json=payload) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True}
|
||||
except aiohttp.ClientError:
|
||||
pass
|
||||
|
||||
try:
|
||||
form = FormData()
|
||||
form.add_field("chat_id", chat_id)
|
||||
form.add_field("document", data, filename=filename, content_type=content_type)
|
||||
form.add_field("parse_mode", parse_mode)
|
||||
if caption:
|
||||
form.add_field("caption", caption)
|
||||
if reply_to_message_id:
|
||||
form.add_field("reply_to_message_id", str(reply_to_message_id))
|
||||
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument"
|
||||
async with self._session.post(telegram_url, data=form) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
if effective_cache_key and effective_cache:
|
||||
document = result.get("result", {}).get("document", {})
|
||||
file_id = document.get("file_id")
|
||||
if file_id:
|
||||
await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash)
|
||||
return {"success": True, "message_id": result.get("result", {}).get("message_id")}
|
||||
return {"success": False, "error": result.get("description", "Unknown Telegram error")}
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": str(err)}
|
||||
|
||||
async def _send_media_group(
|
||||
self, chat_id: str, assets: list[dict[str, str]],
|
||||
caption: str | None = None, reply_to_message_id: int | None = None,
|
||||
max_group_size: int = 10, chunk_delay: int = 0, parse_mode: str = "HTML",
|
||||
max_asset_data_size: int | None = None, send_large_photos_as_documents: bool = False,
|
||||
) -> NotificationResult:
|
||||
chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)]
|
||||
all_message_ids: list = []
|
||||
|
||||
for chunk_idx, chunk in enumerate(chunks):
|
||||
if chunk_idx > 0 and chunk_delay > 0:
|
||||
await asyncio.sleep(chunk_delay / 1000)
|
||||
|
||||
if len(chunk) == 1:
|
||||
item = chunk[0]
|
||||
chunk_caption = caption if chunk_idx == 0 else None
|
||||
chunk_reply = reply_to_message_id if chunk_idx == 0 else None
|
||||
if item.get("type") == "photo":
|
||||
result = await self._send_photo(chat_id, item.get("url"), chunk_caption, chunk_reply, parse_mode, max_asset_data_size, send_large_photos_as_documents, item.get("content_type"), item.get("cache_key"))
|
||||
elif item.get("type") == "video":
|
||||
result = await self._send_video(chat_id, item.get("url"), chunk_caption, chunk_reply, parse_mode, max_asset_data_size, item.get("content_type"), item.get("cache_key"))
|
||||
else:
|
||||
continue
|
||||
if not result.get("success"):
|
||||
result["failed_at_chunk"] = chunk_idx + 1
|
||||
return result
|
||||
all_message_ids.append(result.get("message_id"))
|
||||
continue
|
||||
|
||||
# Multi-item: download all, build form, send media group
|
||||
form = FormData()
|
||||
form.add_field("chat_id", chat_id)
|
||||
if reply_to_message_id and chunk_idx == 0:
|
||||
form.add_field("reply_to_message_id", str(reply_to_message_id))
|
||||
|
||||
media_json = []
|
||||
upload_idx = 0
|
||||
|
||||
for i, item in enumerate(chunk):
|
||||
url = item.get("url")
|
||||
if not url:
|
||||
continue
|
||||
media_type = item.get("type", "photo")
|
||||
custom_cache_key = item.get("cache_key")
|
||||
|
||||
# Check cache
|
||||
ck = custom_cache_key or extract_asset_id_from_url(url) or url
|
||||
ck_is_asset = is_asset_id(ck)
|
||||
item_cache = self._get_cache_for_key(ck, ck_is_asset)
|
||||
item_thumbhash = self._thumbhash_resolver(ck) if ck_is_asset and self._thumbhash_resolver else None
|
||||
cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None
|
||||
|
||||
if cached and cached.get("file_id"):
|
||||
mij: dict[str, Any] = {"type": media_type, "media": cached["file_id"]}
|
||||
else:
|
||||
try:
|
||||
download_url = self._resolve_url(url)
|
||||
async with self._session.get(download_url) as resp:
|
||||
if resp.status != 200:
|
||||
continue
|
||||
data = await resp.read()
|
||||
if max_asset_data_size and len(data) > max_asset_data_size:
|
||||
continue
|
||||
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
|
||||
continue
|
||||
if media_type == "photo":
|
||||
exceeds, _, _, _ = check_photo_limits(data)
|
||||
if exceeds:
|
||||
continue
|
||||
|
||||
attach_name = f"file{upload_idx}"
|
||||
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
|
||||
ext = "jpg" if media_type == "photo" else "mp4"
|
||||
form.add_field(attach_name, data, filename=f"media_{i}.{ext}", content_type=ct)
|
||||
mij = {"type": media_type, "media": f"attach://{attach_name}"}
|
||||
upload_idx += 1
|
||||
except aiohttp.ClientError:
|
||||
continue
|
||||
|
||||
if i == 0 and chunk_idx == 0 and caption:
|
||||
mij["caption"] = caption
|
||||
mij["parse_mode"] = parse_mode
|
||||
media_json.append(mij)
|
||||
|
||||
if not media_json:
|
||||
continue
|
||||
|
||||
form.add_field("media", json.dumps(media_json))
|
||||
telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup"
|
||||
|
||||
try:
|
||||
async with self._session.post(telegram_url, data=form) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200 and result.get("ok"):
|
||||
all_message_ids.extend(msg.get("message_id") for msg in result.get("result", []))
|
||||
else:
|
||||
return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1}
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1}
|
||||
|
||||
return {"success": True, "message_ids": all_message_ids, "chunks_sent": len(chunks)}
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Telegram media utilities - constants, URL helpers, and size splitting."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Final
|
||||
|
||||
# Telegram constants
|
||||
TELEGRAM_API_BASE_URL: Final = "https://api.telegram.org/bot"
|
||||
TELEGRAM_MAX_PHOTO_SIZE: Final = 10 * 1024 * 1024 # 10 MB
|
||||
TELEGRAM_MAX_VIDEO_SIZE: Final = 50 * 1024 * 1024 # 50 MB
|
||||
TELEGRAM_MAX_DIMENSION_SUM: Final = 10000
|
||||
|
||||
# Generic UUID pattern for asset IDs
|
||||
_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$")
|
||||
|
||||
# URL patterns to extract asset IDs (generic enough for Immich-style URLs)
|
||||
_ASSET_ID_URL_PATTERNS = [
|
||||
re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"),
|
||||
re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"),
|
||||
]
|
||||
|
||||
|
||||
def is_asset_id(value: str) -> bool:
|
||||
"""Check if a string is a valid asset ID (UUID format)."""
|
||||
return bool(_ASSET_ID_PATTERN.match(value))
|
||||
|
||||
|
||||
def extract_asset_id_from_url(url: str) -> str | None:
|
||||
"""Extract asset ID from a URL if possible."""
|
||||
if not url:
|
||||
return None
|
||||
for pattern in _ASSET_ID_URL_PATTERNS:
|
||||
match = pattern.search(url)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
|
||||
def split_media_by_upload_size(
|
||||
media_items: list[tuple], max_upload_size: int
|
||||
) -> list[list[tuple]]:
|
||||
"""Split media items into sub-groups respecting upload size limit."""
|
||||
if not media_items:
|
||||
return []
|
||||
|
||||
groups: list[list[tuple]] = []
|
||||
current_group: list[tuple] = []
|
||||
current_size = 0
|
||||
|
||||
for item in media_items:
|
||||
media_ref = item[1]
|
||||
is_cached = item[4]
|
||||
item_size = 0 if is_cached else (len(media_ref) if isinstance(media_ref, bytes) else 0)
|
||||
|
||||
if current_group and current_size + item_size > max_upload_size:
|
||||
groups.append(current_group)
|
||||
current_group = []
|
||||
current_size = 0
|
||||
|
||||
current_group.append(item)
|
||||
current_size += item_size
|
||||
|
||||
if current_group:
|
||||
groups.append(current_group)
|
||||
|
||||
return groups
|
||||
|
||||
|
||||
def check_photo_limits(
|
||||
data: bytes,
|
||||
) -> tuple[bool, str | None, int | None, int | None]:
|
||||
"""Check if photo data exceeds Telegram photo limits."""
|
||||
if len(data) > TELEGRAM_MAX_PHOTO_SIZE:
|
||||
return (True, f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", None, None)
|
||||
|
||||
try:
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
img = Image.open(io.BytesIO(data))
|
||||
width, height = img.size
|
||||
dimension_sum = width + height
|
||||
|
||||
if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM:
|
||||
return (True, f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", width, height)
|
||||
|
||||
return False, None, width, height
|
||||
except ImportError:
|
||||
return False, None, None, None
|
||||
except Exception:
|
||||
return False, None, None, None
|
||||
@@ -1 +1,5 @@
|
||||
"""Webhook notification client."""
|
||||
|
||||
from notify_bridge_core.notifications.webhook.client import WebhookClient
|
||||
|
||||
__all__ = ["WebhookClient"]
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
"""Generic webhook notification client."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebhookClient:
|
||||
"""Send JSON payloads to a webhook URL."""
|
||||
|
||||
def __init__(self, session: aiohttp.ClientSession, url: str, headers: dict[str, str] | None = None) -> None:
|
||||
self._session = session
|
||||
self._url = url
|
||||
self._headers = headers or {}
|
||||
|
||||
async def send(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
try:
|
||||
async with self._session.post(
|
||||
self._url,
|
||||
json=payload,
|
||||
headers={"Content-Type": "application/json", **self._headers},
|
||||
) as response:
|
||||
if 200 <= response.status < 300:
|
||||
return {"success": True, "status_code": response.status}
|
||||
body = await response.text()
|
||||
return {"success": False, "error": f"HTTP {response.status}", "body": body[:200]}
|
||||
except aiohttp.ClientError as err:
|
||||
return {"success": False, "error": str(err)}
|
||||
@@ -0,0 +1,52 @@
|
||||
"""Abstract storage backends and JSON file implementation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class StorageBackend(Protocol):
|
||||
"""Abstract storage backend for persisting JSON-serializable data."""
|
||||
|
||||
async def load(self) -> dict[str, Any] | None: ...
|
||||
async def save(self, data: dict[str, Any]) -> None: ...
|
||||
async def remove(self) -> None: ...
|
||||
|
||||
|
||||
class JsonFileBackend:
|
||||
"""Simple JSON file storage backend."""
|
||||
|
||||
def __init__(self, path: Path) -> None:
|
||||
self._path = path
|
||||
|
||||
async def load(self) -> dict[str, Any] | None:
|
||||
if not self._path.exists():
|
||||
return None
|
||||
try:
|
||||
text = self._path.read_text(encoding="utf-8")
|
||||
return json.loads(text)
|
||||
except (json.JSONDecodeError, OSError) as err:
|
||||
_LOGGER.warning("Failed to load %s: %s", self._path, err)
|
||||
return None
|
||||
|
||||
async def save(self, data: dict[str, Any]) -> None:
|
||||
try:
|
||||
self._path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._path.write_text(
|
||||
json.dumps(data, default=str), encoding="utf-8"
|
||||
)
|
||||
except OSError as err:
|
||||
_LOGGER.error("Failed to save %s: %s", self._path, err)
|
||||
|
||||
async def remove(self) -> None:
|
||||
try:
|
||||
if self._path.exists():
|
||||
self._path.unlink()
|
||||
except OSError as err:
|
||||
_LOGGER.error("Failed to remove %s: %s", self._path, err)
|
||||
Reference in New Issue
Block a user