feat: Google Photos provider backend + API hardening

- Add Google Photos provider: client, models, change detector, capabilities
- Add notification templates (en/ru) for all GP event slots
- Add command templates (en/ru) for GP bot commands
- Register GP in slot/command loaders, capabilities, and seeds
- Harden provider API: validate OAuth credentials on create/update
- Add internal URL rewriting for asset fetches (LAN optimization)
- Fix template renderer to handle missing variables gracefully
- Improve webhook command routing for multi-provider support
- Add provider health check endpoint and watcher improvements
This commit is contained in:
2026-03-25 22:07:03 +03:00
parent 337276113d
commit 307871cae5
73 changed files with 1154 additions and 144 deletions
@@ -19,6 +19,7 @@ class ServiceProviderType(str, Enum):
PLANKA = "planka"
SCHEDULER = "scheduler"
NUT = "nut"
GOOGLE_PHOTOS = "google_photos"
class ServiceProvider(ABC):
@@ -352,6 +352,61 @@ NUT_CAPABILITIES = ProviderCapabilities(
],
)
# ---------------------------------------------------------------------------
# Google Photos provider capabilities
# ---------------------------------------------------------------------------
GOOGLE_PHOTOS_CAPABILITIES = ProviderCapabilities(
provider_type="google_photos",
display_name="Google Photos",
webhook_based=False,
supported_filters=[
{"key": "collections", "label": "Albums", "type": "select", "source": "api"},
],
notification_slots=[
{"name": "message_assets_added", "description": "New media added to album"},
{"name": "message_assets_removed", "description": "Media removed from album"},
{"name": "message_collection_renamed", "description": "Album renamed"},
{"name": "message_collection_deleted", "description": "Album deleted"},
{"name": "message_sharing_changed", "description": "Sharing status changed"},
],
events=[
{"name": "assets_added", "description": "New media detected in album"},
{"name": "assets_removed", "description": "Media removed from album"},
{"name": "collection_renamed", "description": "Album was renamed"},
{"name": "collection_deleted", "description": "Album was deleted"},
{"name": "sharing_changed", "description": "Album sharing status changed"},
],
command_slots=[
{"name": "start", "description": "/start greeting message"},
{"name": "help", "description": "/help command listing"},
{"name": "status", "description": "/status tracker summary"},
{"name": "albums", "description": "/albums tracked albums list"},
{"name": "latest", "description": "/latest recent photos"},
{"name": "search", "description": "/search by category or date"},
{"name": "random", "description": "/random random photos"},
{"name": "rate_limited", "description": "Rate limit warning message"},
{"name": "no_results", "description": "Empty results fallback"},
{"name": "desc_help", "description": "Menu description for /help"},
{"name": "desc_status", "description": "Menu description for /status"},
{"name": "desc_albums", "description": "Menu description for /albums"},
{"name": "desc_latest", "description": "Menu description for /latest"},
{"name": "usage_latest", "description": "Usage example for /latest"},
{"name": "desc_search", "description": "Menu description for /search"},
{"name": "usage_search", "description": "Usage example for /search"},
{"name": "desc_random", "description": "Menu description for /random"},
{"name": "usage_random", "description": "Usage example for /random"},
],
commands=[
{"name": "status", "description": "Show tracker status"},
{"name": "albums", "description": "List tracked albums"},
{"name": "latest", "description": "Show latest photos"},
{"name": "search", "description": "Search media"},
{"name": "random", "description": "Random photos"},
{"name": "help", "description": "Show commands"},
],
)
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
@@ -362,6 +417,7 @@ _REGISTRY: dict[str, ProviderCapabilities] = {
"planka": PLANKA_CAPABILITIES,
"scheduler": SCHEDULER_CAPABILITIES,
"nut": NUT_CAPABILITIES,
"google_photos": GOOGLE_PHOTOS_CAPABILITIES,
}
@@ -0,0 +1,17 @@
"""Google Photos service provider implementation."""
from notify_bridge_core.providers.base import ServiceProviderType
from notify_bridge_core.templates.variables import registry
from .client import GooglePhotosClient, GooglePhotosApiError
from .provider import GooglePhotosServiceProvider, GOOGLE_PHOTOS_VARIABLES
# Register Google Photos variables in the global registry
registry.register_provider_variables(ServiceProviderType.GOOGLE_PHOTOS, GOOGLE_PHOTOS_VARIABLES)
__all__ = [
"GooglePhotosClient",
"GooglePhotosApiError",
"GooglePhotosServiceProvider",
"GOOGLE_PHOTOS_VARIABLES",
]
@@ -0,0 +1,275 @@
"""Google Photos Library API async client with OAuth2 token refresh."""
from __future__ import annotations
import logging
import time
from typing import Any
import aiohttp
from .constants import (
API_BASE_URL,
MAX_BATCH_GET,
MAX_PAGE_SIZE,
TOKEN_URL,
)
from .models import GooglePhotosAlbumData, GooglePhotosMediaItem
_LOGGER = logging.getLogger(__name__)
class GooglePhotosApiError(Exception):
"""Raised when the Google Photos API returns an error."""
def __init__(self, status: int, message: str) -> None:
self.status = status
super().__init__(f"Google Photos API error {status}: {message}")
class GooglePhotosClient:
"""Async HTTP client for the Google Photos Library API."""
def __init__(
self,
session: aiohttp.ClientSession,
client_id: str,
client_secret: str,
refresh_token: str,
) -> None:
self._session = session
self._client_id = client_id
self._client_secret = client_secret
self._refresh_token = refresh_token
self._access_token: str | None = None
self._token_expires_at: float = 0.0
@property
def access_token(self) -> str | None:
"""Current access token (may be expired)."""
return self._access_token
async def _ensure_token(self) -> None:
"""Refresh the OAuth2 access token if expired or missing."""
if self._access_token and time.monotonic() < self._token_expires_at:
return
_LOGGER.debug("Refreshing Google Photos access token")
data = {
"client_id": self._client_id,
"client_secret": self._client_secret,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
}
async with self._session.post(TOKEN_URL, data=data) as resp:
if resp.status != 200:
body = await resp.text()
raise GooglePhotosApiError(resp.status, f"Token refresh failed: {body}")
result = await resp.json()
self._access_token = result["access_token"]
# Refresh ~60s before actual expiry to avoid races
expires_in = result.get("expires_in", 3600)
self._token_expires_at = time.monotonic() + expires_in - 60
async def _request(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Make an authenticated API request."""
await self._ensure_token()
url = f"{API_BASE_URL}/{path}"
headers = {"Authorization": f"Bearer {self._access_token}"}
async with self._session.request(
method, url, headers=headers, params=params, json=json_body,
) as resp:
if resp.status == 200:
return await resp.json()
body = await resp.text()
raise GooglePhotosApiError(resp.status, body)
# ------------------------------------------------------------------
# Albums
# ------------------------------------------------------------------
async def ping(self) -> bool:
"""Test connectivity by listing a single album."""
try:
await self._request("GET", "albums", params={"pageSize": "1"})
return True
except Exception:
_LOGGER.exception("Google Photos ping failed")
return False
async def list_albums(self, *, include_shared: bool = True) -> list[dict[str, Any]]:
"""List all albums (owned + optionally shared)."""
albums: list[dict[str, Any]] = []
# Owned albums
page_token: str | None = None
while True:
params: dict[str, str] = {"pageSize": str(MAX_PAGE_SIZE)}
if page_token:
params["pageToken"] = page_token
result = await self._request("GET", "albums", params=params)
albums.extend(result.get("albums", []))
page_token = result.get("nextPageToken")
if not page_token:
break
# Shared albums
if include_shared:
page_token = None
while True:
params = {"pageSize": str(MAX_PAGE_SIZE)}
if page_token:
params["pageToken"] = page_token
result = await self._request("GET", "sharedAlbums", params=params)
shared = result.get("sharedAlbums", [])
# Deduplicate: shared albums the user owns are already in the owned list
existing_ids = {a["id"] for a in albums}
albums.extend(a for a in shared if a.get("id") not in existing_ids)
page_token = result.get("nextPageToken")
if not page_token:
break
return albums
async def get_album(self, album_id: str) -> dict[str, Any] | None:
"""Get a single album by ID."""
try:
return await self._request("GET", f"albums/{album_id}")
except GooglePhotosApiError as e:
if e.status == 404:
return None
raise
async def get_album_with_items(self, album_id: str) -> GooglePhotosAlbumData | None:
"""Fetch album metadata + all media items."""
album_data = await self.get_album(album_id)
if album_data is None:
return None
items = await self.list_media_items(album_id)
return GooglePhotosAlbumData.from_api_response(album_data, items)
# ------------------------------------------------------------------
# Media items
# ------------------------------------------------------------------
async def list_media_items(self, album_id: str) -> list[GooglePhotosMediaItem]:
"""List all media items in an album (handles pagination)."""
items: list[GooglePhotosMediaItem] = []
page_token: str | None = None
while True:
body: dict[str, Any] = {
"albumId": album_id,
"pageSize": MAX_PAGE_SIZE,
}
if page_token:
body["pageToken"] = page_token
result = await self._request("POST", "mediaItems:search", json_body=body)
for raw in result.get("mediaItems", []):
items.append(GooglePhotosMediaItem.from_api_response(raw))
page_token = result.get("nextPageToken")
if not page_token:
break
return items
async def get_media_item(self, media_item_id: str) -> GooglePhotosMediaItem | None:
"""Get a single media item (with fresh baseUrl)."""
try:
data = await self._request("GET", f"mediaItems/{media_item_id}")
return GooglePhotosMediaItem.from_api_response(data)
except GooglePhotosApiError as e:
if e.status == 404:
return None
raise
async def get_media_items_batch(
self, media_item_ids: list[str],
) -> dict[str, GooglePhotosMediaItem]:
"""Batch-get media items (up to 50 per call). Returns dict keyed by ID."""
result: dict[str, GooglePhotosMediaItem] = {}
for i in range(0, len(media_item_ids), MAX_BATCH_GET):
chunk = media_item_ids[i : i + MAX_BATCH_GET]
# batchGet requires repeated query params (mediaItemIds=a&mediaItemIds=b)
# which _request() doesn't support, so we make the request manually.
await self._ensure_token()
url = f"{API_BASE_URL}/mediaItems:batchGet"
headers = {"Authorization": f"Bearer {self._access_token}"}
query_params = "&".join(f"mediaItemIds={mid}" for mid in chunk)
async with self._session.get(f"{url}?{query_params}", headers=headers) as resp:
if resp.status != 200:
body = await resp.text()
_LOGGER.error("batchGet failed: %s %s", resp.status, body)
continue
data = await resp.json()
for item_result in data.get("mediaItemResults", []):
media_item = item_result.get("mediaItem")
if media_item:
parsed = GooglePhotosMediaItem.from_api_response(media_item)
result[parsed.id] = parsed
return result
async def search_media_items(
self,
*,
album_id: str | None = None,
category: str | None = None,
media_type: str | None = None,
date_start: str | None = None,
date_end: str | None = None,
page_size: int = 25,
) -> list[GooglePhotosMediaItem]:
"""Search media items with filters."""
body: dict[str, Any] = {"pageSize": min(page_size, MAX_PAGE_SIZE)}
if album_id:
body["albumId"] = album_id
else:
filters: dict[str, Any] = {}
if category:
filters["contentFilter"] = {
"includedContentCategories": [category.upper()],
}
if media_type:
filters["mediaTypeFilter"] = {"mediaTypes": [media_type.upper()]}
if date_start or date_end:
date_filter: dict[str, Any] = {}
if date_start:
parts = date_start.split("-")
date_filter["startDate"] = {
"year": int(parts[0]),
"month": int(parts[1]),
"day": int(parts[2]),
}
if date_end:
parts = date_end.split("-")
date_filter["endDate"] = {
"year": int(parts[0]),
"month": int(parts[1]),
"day": int(parts[2]),
}
filters["dateFilter"] = {"ranges": [date_filter]}
if filters:
body["filters"] = filters
result = await self._request("POST", "mediaItems:search", json_body=body)
return [
GooglePhotosMediaItem.from_api_response(raw)
for raw in result.get("mediaItems", [])
]
@@ -0,0 +1,26 @@
"""Google Photos provider constants."""
from typing import Final
# Google Photos Library API
API_BASE_URL: Final = "https://photoslibrary.googleapis.com/v1"
TOKEN_URL: Final = "https://oauth2.googleapis.com/token"
# OAuth scopes
SCOPE_READONLY: Final = "https://www.googleapis.com/auth/photoslibrary.readonly"
SCOPE_SHARING: Final = "https://www.googleapis.com/auth/photoslibrary.sharing"
# API limits
MAX_PAGE_SIZE: Final = 100 # max items per page
MAX_BATCH_GET: Final = 50 # max items per batchGet call
# Asset URL suffixes (appended to baseUrl)
PHOTO_DOWNLOAD_SUFFIX: Final = "=w2048-h2048" # max resolution photo
PHOTO_THUMBNAIL_SUFFIX: Final = "=w512-h512"
VIDEO_DOWNLOAD_SUFFIX: Final = "=dv" # video download
# Cache key prefix for Telegram file_id cache
CACHE_HOST: Final = "photos.googleapis.com"
# Defaults
DEFAULT_SCAN_INTERVAL: Final = 120 # seconds (Google API quota is tighter than Immich)
@@ -0,0 +1,108 @@
"""Google Photos data models."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class GooglePhotosMediaItem:
"""A media item from the Google Photos Library API."""
id: str
filename: str
mime_type: str
created_at: str # mediaMetadata.creationTime (ISO 8601)
base_url: str # temporary download URL (expires ~60 min)
product_url: str # permanent link to Google Photos UI
width: int | None = None
height: int | None = None
description: str = ""
is_video: bool = False
@property
def is_processed(self) -> bool:
"""A media item is considered processed if it has a base_url."""
return bool(self.base_url)
@classmethod
def from_api_response(cls, data: dict[str, Any]) -> GooglePhotosMediaItem:
"""Parse a mediaItem response from the Google Photos API."""
metadata = data.get("mediaMetadata", {})
mime_type = data.get("mimeType", "")
is_video = mime_type.startswith("video/") or "video" in metadata
width = None
height = None
if "width" in metadata:
try:
width = int(metadata["width"])
except (ValueError, TypeError):
pass
if "height" in metadata:
try:
height = int(metadata["height"])
except (ValueError, TypeError):
pass
return cls(
id=data["id"],
filename=data.get("filename", ""),
mime_type=mime_type,
created_at=metadata.get("creationTime", ""),
base_url=data.get("baseUrl", ""),
product_url=data.get("productUrl", ""),
width=width,
height=height,
description=data.get("description", ""),
is_video=is_video,
)
@dataclass
class GooglePhotosAlbumData:
"""Full album data assembled from Google Photos API responses."""
id: str
name: str
asset_count: int
product_url: str
is_shared: bool
cover_photo_id: str | None = None
share_info: dict[str, Any] = field(default_factory=dict)
asset_ids: set[str] = field(default_factory=set)
assets: dict[str, GooglePhotosMediaItem] = field(default_factory=dict)
@property
def photo_count(self) -> int:
return sum(1 for a in self.assets.values() if not a.is_video)
@property
def video_count(self) -> int:
return sum(1 for a in self.assets.values() if a.is_video)
@classmethod
def from_api_response(
cls, album_data: dict[str, Any], media_items: list[GooglePhotosMediaItem],
) -> GooglePhotosAlbumData:
"""Build album data from album metadata + fetched media items."""
asset_ids: set[str] = set()
assets: dict[str, GooglePhotosMediaItem] = {}
for item in media_items:
asset_ids.add(item.id)
assets[item.id] = item
share_info = album_data.get("shareInfo", {})
return cls(
id=album_data["id"],
name=album_data.get("title", "Untitled"),
asset_count=int(album_data.get("mediaItemsCount", len(asset_ids))),
product_url=album_data.get("productUrl", ""),
is_shared=bool(share_info),
cover_photo_id=album_data.get("coverPhotoMediaItemId"),
share_info=share_info,
asset_ids=asset_ids,
assets=assets,
)
@@ -0,0 +1,226 @@
"""Google Photos service provider — concrete implementation of ServiceProvider."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
from notify_bridge_core.templates.variables import TemplateVariableDefinition
from .change_detector import detect_album_changes
from .client import GooglePhotosClient
from .models import GooglePhotosAlbumData
_LOGGER = logging.getLogger(__name__)
# Google Photos specific template variables
GOOGLE_PHOTOS_VARIABLES: list[TemplateVariableDefinition] = [
TemplateVariableDefinition(
name="album_name",
type="string",
description="Album name (alias for collection_name)",
example="Vacation 2026",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="album_id",
type="string",
description="Album ID (alias for collection_id)",
example="ABc123...",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="album_url",
type="string",
description="Web URL to the album in Google Photos",
example="https://photos.google.com/lr/album/ABc123...",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="photo_count",
type="int",
description="Number of photos in album",
example="42",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="video_count",
type="int",
description="Number of videos in album",
example="8",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="shared",
type="bool",
description="Whether the album is shared",
example="true",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="old_album_name",
type="string",
description="Previous album name (for rename events)",
example="Old Name",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="new_album_name",
type="string",
description="New album name (for rename events)",
example="New Name",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
]
class GooglePhotosServiceProvider(ServiceProvider):
"""Google Photos Library API provider."""
provider_type = ServiceProviderType.GOOGLE_PHOTOS
def __init__(
self,
session: aiohttp.ClientSession,
client_id: str,
client_secret: str,
refresh_token: str,
name: str = "Google Photos",
) -> None:
self._client = GooglePhotosClient(session, client_id, client_secret, refresh_token)
self._name = name
@property
def client(self) -> GooglePhotosClient:
return self._client
async def connect(self) -> bool:
return await self._client.ping()
async def disconnect(self) -> None:
pass # session lifecycle managed by caller
async def poll(
self,
collection_ids: list[str],
tracker_state: dict[str, Any],
) -> tuple[list[ServiceEvent], dict[str, Any]]:
events: list[ServiceEvent] = []
new_state = dict(tracker_state)
for album_id in collection_ids:
album = await self._client.get_album_with_items(album_id)
if album is None:
# Album deleted or inaccessible
if album_id in new_state:
from notify_bridge_core.models.events import EventType
from datetime import datetime, timezone
events.append(ServiceEvent(
event_type=EventType.COLLECTION_DELETED,
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
provider_name=self._name,
collection_id=album_id,
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
timestamp=datetime.now(timezone.utc),
))
del new_state[album_id]
continue
# Get previous state
prev = new_state.get(album_id)
if prev is None:
# First time seeing this album — store state, no event
new_state[album_id] = _serialize_album_state(album)
continue
# Reconstruct previous album data for comparison
old_album = _deserialize_album_state(album_id, prev)
pending = set(prev.get("pending_asset_ids", []))
detected_events, updated_pending = detect_album_changes(
old_album, album, pending, self._name,
)
if detected_events:
events.extend(detected_events)
# Update state
state = _serialize_album_state(album)
state["pending_asset_ids"] = list(updated_pending)
new_state[album_id] = state
return events, new_state
def get_available_variables(self) -> list[TemplateVariableDefinition]:
return list(GOOGLE_PHOTOS_VARIABLES)
def get_provider_config_schema(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"client_id": {
"type": "string",
"description": "Google OAuth2 Client ID",
},
"client_secret": {
"type": "string",
"description": "Google OAuth2 Client Secret",
"secret": True,
},
"refresh_token": {
"type": "string",
"description": "OAuth2 Refresh Token (from Google OAuth Playground)",
"secret": True,
},
},
"required": ["client_id", "client_secret", "refresh_token"],
}
async def list_collections(self) -> list[dict[str, Any]]:
albums = await self._client.list_albums(include_shared=True)
return [
{
"id": a.get("id", ""),
"name": a.get("title", "Untitled"),
"asset_count": int(a.get("mediaItemsCount", 0)),
}
for a in albums
]
async def test_connection(self) -> dict[str, Any]:
ok = await self._client.ping()
if ok:
albums = await self._client.list_albums(include_shared=False)
return {
"ok": True,
"message": f"Connected to Google Photos ({len(albums)} albums found)",
}
return {"ok": False, "message": "Failed to connect to Google Photos"}
def _serialize_album_state(album: GooglePhotosAlbumData) -> dict[str, Any]:
"""Serialize album state for persistence."""
return {
"name": album.name,
"asset_ids": list(album.asset_ids),
"shared": album.is_shared,
"pending_asset_ids": [],
}
def _deserialize_album_state(album_id: str, state: dict[str, Any]) -> GooglePhotosAlbumData:
"""Create a minimal GooglePhotosAlbumData from saved state for comparison."""
return GooglePhotosAlbumData(
id=album_id,
name=state.get("name", ""),
asset_count=len(state.get("asset_ids", [])),
product_url="",
is_shared=state.get("shared", False),
asset_ids=set(state.get("asset_ids", [])),
)
@@ -0,0 +1,8 @@
📚 Tracked albums:
{%- if albums %}
{%- for album in albums %}
• {{ album.name }} ({{ album.asset_count }} items)
{%- endfor %}
{%- else %}
(none)
{%- endif %}
@@ -0,0 +1 @@
Show available commands
@@ -0,0 +1 @@
Search photos by category
@@ -0,0 +1,4 @@
📷 <b>Available Commands</b>
{%- for cmd in commands %}
/{{ cmd.name }} — {{ cmd.description }}
{%- endfor %}
@@ -0,0 +1,4 @@
📸 Latest:
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1,4 @@
🎲 Random:
{%- for asset in assets %}
• {{ asset.originalFileName }}
{%- endfor %}
@@ -0,0 +1 @@
⏳ Too many requests. Please wait a moment before trying again.
@@ -0,0 +1,4 @@
🔍 Results for "{{ query }}":
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1,2 @@
👋 Hi! I'm your Notify Bridge bot for <b>Google Photos</b>.
Use /help to see available commands.
@@ -0,0 +1,6 @@
📷 <b>Google Photos Status</b>
Service: {{ service_name }}
Tracked albums: {{ album_count }}
{%- if last_event %}
Last event: {{ last_event.type }} ({{ last_event.time }})
{%- endif %}
@@ -47,6 +47,16 @@ PROVIDER_COMMAND_SLOTS: dict[str, list[str]] = {
# Description slots
"desc_help", "desc_status", "desc_devices", "desc_battery",
],
"google_photos": [
# Response templates
"start", "help", "status", "albums", "latest", "search", "random",
"rate_limited", "no_results",
# Description slots
"desc_help", "desc_status", "desc_albums",
"desc_latest", "desc_search", "desc_random",
# Usage example slots
"usage_latest", "usage_search", "usage_random",
],
}
# Backward-compatible aliases
@@ -0,0 +1,8 @@
📚 Отслеживаемые альбомы:
{%- if albums %}
{%- for album in albums %}
• {{ album.name }} ({{ album.asset_count }} элементов)
{%- endfor %}
{%- else %}
(нет)
{%- endif %}
@@ -0,0 +1 @@
Список отслеживаемых альбомов
@@ -0,0 +1 @@
Показать доступные команды
@@ -0,0 +1 @@
Последние фото
@@ -0,0 +1 @@
Случайные фото
@@ -0,0 +1 @@
Поиск фото по категории
@@ -0,0 +1 @@
Показать статус трекера
@@ -0,0 +1,4 @@
📷 <b>Доступные команды</b>
{%- for cmd in commands %}
/{{ cmd.name }} — {{ cmd.description }}
{%- endfor %}
@@ -0,0 +1,4 @@
📸 Последние:
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1 @@
Ничего не найдено.
@@ -0,0 +1,4 @@
🎲 Случайные:
{%- for asset in assets %}
• {{ asset.originalFileName }}
{%- endfor %}
@@ -0,0 +1 @@
⏳ Слишком много запросов. Подождите немного.
@@ -0,0 +1,4 @@
🔍 Результаты по запросу "{{ query }}":
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1,2 @@
👋 Привет! Я бот Notify Bridge для <b>Google Фото</b>.
Используйте /help для просмотра доступных команд.
@@ -0,0 +1,6 @@
📷 <b>Статус Google Фото</b>
Сервис: {{ service_name }}
Отслеживаемые альбомы: {{ album_count }}
{%- if last_event %}
Последнее событие: {{ last_event.type }} ({{ last_event.time }})
{%- endif %}
@@ -0,0 +1 @@
/latest [количество]
@@ -0,0 +1 @@
/random [количество]
@@ -0,0 +1 @@
/search [категория]
@@ -0,0 +1,12 @@
📎 {{ added_count }} new file(s) added to album {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
{%- if added_assets %}
{%- for asset in added_assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }}
{%- if asset.is_favorite %} ❤️{% endif %}
{%- if asset.oversized %} ⚠️{% endif %}
{%- endfor %}
{%- endif %}
{%- if has_oversized_videos %}
⚠️ Some videos exceed the {{ max_video_size_mb }} MB file size limit and may not be sent.
{%- endif %}
@@ -0,0 +1 @@
🗑️ {{ removed_count }} file(s) removed from album {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -0,0 +1 @@
🗑️ Album "{{ collection_name }}" was deleted.
@@ -0,0 +1 @@
✏️ Album "{{ old_name }}" renamed to {% if album_url %}<a href="{{ album_url }}">{{ new_name }}</a>{% else %}"{{ new_name }}"{% endif %}.
@@ -0,0 +1 @@
🔗 Sharing changed for album {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -53,6 +53,13 @@ PROVIDER_SLOT_FILE_MAP: dict[str, dict[str, str]] = {
"scheduler": {
"message_scheduled_message": "scheduled_message.jinja2",
},
"google_photos": {
"message_assets_added": "gp_assets_added.jinja2",
"message_assets_removed": "gp_assets_removed.jinja2",
"message_collection_renamed": "gp_collection_renamed.jinja2",
"message_collection_deleted": "gp_collection_deleted.jinja2",
"message_sharing_changed": "gp_sharing_changed.jinja2",
},
"nut": {
"message_ups_online": "nut_ups_online.jinja2",
"message_ups_on_battery": "nut_ups_on_battery.jinja2",
@@ -0,0 +1,12 @@
📎 {{ added_count }} новых файл(ов) добавлено в альбом {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
{%- if added_assets %}
{%- for asset in added_assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }}
{%- if asset.is_favorite %} ❤️{% endif %}
{%- if asset.oversized %} ⚠️{% endif %}
{%- endfor %}
{%- endif %}
{%- if has_oversized_videos %}
⚠️ Некоторые видео превышают лимит {{ max_video_size_mb }} МБ и могут не отправиться.
{%- endif %}
@@ -0,0 +1 @@
🗑️ {{ removed_count }} файл(ов) удалено из альбома {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -0,0 +1 @@
🗑️ Альбом "{{ collection_name }}" был удалён.
@@ -0,0 +1 @@
✏️ Альбом "{{ old_name }}" переименован в {% if album_url %}<a href="{{ album_url }}">{{ new_name }}</a>{% else %}"{{ new_name }}"{% endif %}.
@@ -0,0 +1 @@
🔗 Изменён доступ к альбому {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -22,4 +22,4 @@ def render_template(template_str: str, context: dict[str, Any]) -> str:
return _env.from_string(template_str).render(**context)
except jinja2.TemplateError as e:
_LOGGER.error("Template render error: %s", e)
return template_str
return "[Template rendering error]"