Add telegram media sender as service. Also fixed the via_device warnings that would break in HA 2025.12.0.
All checks were successful
Validate / Hassfest (push) Successful in 3s
All checks were successful
Validate / Hassfest (push) Successful in 3s
This commit is contained in:
@@ -38,9 +38,11 @@ from .const import (
|
||||
CONF_ALBUM_ID,
|
||||
CONF_ALBUM_NAME,
|
||||
CONF_HUB_NAME,
|
||||
CONF_TELEGRAM_BOT_TOKEN,
|
||||
DOMAIN,
|
||||
SERVICE_GET_RECENT_ASSETS,
|
||||
SERVICE_REFRESH,
|
||||
SERVICE_SEND_TELEGRAM_MEDIA_GROUP,
|
||||
)
|
||||
from .coordinator import AlbumData, ImmichAlbumWatcherCoordinator
|
||||
|
||||
@@ -96,6 +98,19 @@ async def async_setup_entry(
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
platform.async_register_entity_service(
|
||||
SERVICE_SEND_TELEGRAM_MEDIA_GROUP,
|
||||
{
|
||||
vol.Optional("bot_token"): str,
|
||||
vol.Required("chat_id"): vol.Coerce(str),
|
||||
vol.Required("urls"): vol.All(list, vol.Length(min=1, max=10)),
|
||||
vol.Optional("caption"): str,
|
||||
vol.Optional("reply_to_message_id"): vol.Coerce(int),
|
||||
},
|
||||
"async_send_telegram_media_group",
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
|
||||
class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], SensorEntity):
|
||||
"""Base sensor for Immich album."""
|
||||
@@ -143,7 +158,6 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se
|
||||
name=self._album_name,
|
||||
manufacturer="Immich",
|
||||
entry_type=DeviceEntryType.SERVICE,
|
||||
via_device=(DOMAIN, self._entry.entry_id),
|
||||
)
|
||||
|
||||
@callback
|
||||
@@ -160,6 +174,121 @@ class ImmichAlbumBaseSensor(CoordinatorEntity[ImmichAlbumWatcherCoordinator], Se
|
||||
assets = await self.coordinator.async_get_recent_assets(count)
|
||||
return {"assets": assets}
|
||||
|
||||
async def async_send_telegram_media_group(
|
||||
self,
|
||||
chat_id: str,
|
||||
urls: list[dict[str, str]],
|
||||
bot_token: str | None = None,
|
||||
caption: str | None = None,
|
||||
reply_to_message_id: int | None = None,
|
||||
) -> ServiceResponse:
|
||||
"""Send media URLs to Telegram as a media group.
|
||||
|
||||
Each item in urls should be a dict with 'url' and 'type' (photo/video).
|
||||
Downloads media and uploads to Telegram to bypass CORS restrictions.
|
||||
"""
|
||||
import json
|
||||
import aiohttp
|
||||
from aiohttp import FormData
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
|
||||
# Get bot token from parameter or config
|
||||
token = bot_token or self._entry.options.get(CONF_TELEGRAM_BOT_TOKEN)
|
||||
if not token:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "No bot token provided. Set it in integration options or pass as parameter.",
|
||||
}
|
||||
|
||||
session = async_get_clientsession(self.hass)
|
||||
|
||||
# Download all media files
|
||||
media_files: list[tuple[str, bytes, str]] = []
|
||||
for i, item in enumerate(urls):
|
||||
url = item.get("url")
|
||||
media_type = item.get("type", "photo")
|
||||
|
||||
if not url:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Missing 'url' in item {i}",
|
||||
}
|
||||
|
||||
if media_type not in ("photo", "video"):
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Invalid type '{media_type}' in item {i}. Must be 'photo' or 'video'.",
|
||||
}
|
||||
|
||||
try:
|
||||
_LOGGER.debug("Downloading media %d from %s", i, url[:80])
|
||||
async with session.get(url) as resp:
|
||||
if resp.status != 200:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to download media {i}: HTTP {resp.status}",
|
||||
}
|
||||
data = await resp.read()
|
||||
ext = "jpg" if media_type == "photo" else "mp4"
|
||||
filename = f"media_{i}.{ext}"
|
||||
media_files.append((media_type, data, filename))
|
||||
_LOGGER.debug("Downloaded media %d: %d bytes", i, len(data))
|
||||
except aiohttp.ClientError as err:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to download media {i}: {err}",
|
||||
}
|
||||
|
||||
# Build multipart form
|
||||
form = FormData()
|
||||
form.add_field("chat_id", chat_id)
|
||||
|
||||
if reply_to_message_id:
|
||||
form.add_field("reply_to_message_id", str(reply_to_message_id))
|
||||
|
||||
# Build media JSON with attach:// references
|
||||
media_json = []
|
||||
for i, (media_type, data, filename) in enumerate(media_files):
|
||||
attach_name = f"file{i}"
|
||||
media_item: dict[str, Any] = {
|
||||
"type": media_type,
|
||||
"media": f"attach://{attach_name}",
|
||||
}
|
||||
if i == 0 and caption:
|
||||
media_item["caption"] = caption
|
||||
media_json.append(media_item)
|
||||
|
||||
content_type = "image/jpeg" if media_type == "photo" else "video/mp4"
|
||||
form.add_field(attach_name, data, filename=filename, content_type=content_type)
|
||||
|
||||
form.add_field("media", json.dumps(media_json))
|
||||
|
||||
# Send to Telegram
|
||||
telegram_url = f"https://api.telegram.org/bot{token}/sendMediaGroup"
|
||||
|
||||
try:
|
||||
_LOGGER.debug("Uploading %d files to Telegram", len(media_files))
|
||||
async with session.post(telegram_url, data=form) as response:
|
||||
result = await response.json()
|
||||
_LOGGER.debug("Telegram API response: status=%d, ok=%s", response.status, result.get("ok"))
|
||||
if response.status == 200 and result.get("ok"):
|
||||
return {
|
||||
"success": True,
|
||||
"message_ids": [
|
||||
msg.get("message_id") for msg in result.get("result", [])
|
||||
],
|
||||
}
|
||||
else:
|
||||
_LOGGER.error("Telegram API error: %s", result)
|
||||
return {
|
||||
"success": False,
|
||||
"error": result.get("description", "Unknown Telegram error"),
|
||||
"error_code": result.get("error_code"),
|
||||
}
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.error("Telegram upload failed: %s", err)
|
||||
return {"success": False, "error": str(err)}
|
||||
|
||||
|
||||
class ImmichAlbumIdSensor(ImmichAlbumBaseSensor):
|
||||
"""Sensor exposing the Immich album ID."""
|
||||
|
||||
Reference in New Issue
Block a user