feat: Google Photos provider backend + API hardening

- Add Google Photos provider: client, models, change detector, capabilities
- Add notification templates (en/ru) for all GP event slots
- Add command templates (en/ru) for GP bot commands
- Register GP in slot/command loaders, capabilities, and seeds
- Harden provider API: validate OAuth credentials on create/update
- Add internal URL rewriting for asset fetches (LAN optimization)
- Fix template renderer to handle missing variables gracefully
- Improve webhook command routing for multi-provider support
- Add provider health check endpoint and watcher improvements
This commit is contained in:
2026-03-25 22:07:03 +03:00
parent 337276113d
commit 307871cae5
73 changed files with 1154 additions and 144 deletions
+34 -23
View File
@@ -55,9 +55,11 @@ async function doRefreshAccessToken(): Promise<boolean> {
return false;
}
const DEFAULT_TIMEOUT_MS = 30_000;
export async function api<T = any>(
path: string,
options: RequestInit = {}
options: RequestInit & { timeoutMs?: number } = {}
): Promise<T> {
const token = getToken();
const headers: Record<string, string> = {
@@ -68,31 +70,40 @@ export async function api<T = any>(
headers['Authorization'] = `Bearer ${token}`;
}
let res = await fetch(`${API_BASE}${path}`, { ...options, headers });
const { timeoutMs, ...fetchOptions } = options;
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), timeoutMs ?? DEFAULT_TIMEOUT_MS);
const signal = options.signal ?? controller.signal;
// Try token refresh on 401
if (res.status === 401 && token) {
const refreshed = await refreshAccessToken();
if (refreshed) {
headers['Authorization'] = `Bearer ${getToken()}`;
res = await fetch(`${API_BASE}${path}`, { ...options, headers });
try {
let res = await fetch(`${API_BASE}${path}`, { ...fetchOptions, headers, signal });
// Try token refresh on 401
if (res.status === 401 && token) {
const refreshed = await refreshAccessToken();
if (refreshed) {
headers['Authorization'] = `Bearer ${getToken()}`;
res = await fetch(`${API_BASE}${path}`, { ...fetchOptions, headers, signal });
}
}
}
if (res.status === 401) {
clearTokens();
if (typeof window !== 'undefined') {
window.location.href = '/login';
if (res.status === 401) {
clearTokens();
if (typeof window !== 'undefined') {
window.location.href = '/login';
}
throw new Error('Unauthorized');
}
throw new Error('Unauthorized');
if (res.status === 204) return undefined as T;
if (!res.ok) {
const err = await res.json().catch(() => ({ detail: res.statusText }));
throw new Error(err.detail || `HTTP ${res.status}`);
}
return res.json();
} finally {
clearTimeout(timeout);
}
if (res.status === 204) return undefined as T;
if (!res.ok) {
const err = await res.json().catch(() => ({ detail: res.statusText }));
throw new Error(err.detail || `HTTP ${res.status}`);
}
return res.json();
}
@@ -136,6 +136,7 @@
});
const flatResults = $derived(results);
const flatIndexMap = $derived(new Map(flatResults.map((item, idx) => [item, idx])));
async function openPalette() {
open = true;
@@ -239,7 +240,7 @@
{group.label}
</div>
{#each group.items as item, i}
{@const flatIdx = flatResults.indexOf(item)}
{@const flatIdx = flatIndexMap.get(item) ?? -1}
<button
class="sp-item"
class:sp-active={flatIdx === activeIndex}
@@ -58,6 +58,12 @@
let slotErrorLines = $state<Record<string, number | null>>({});
let slotErrorTypes = $state<Record<string, string>>({});
let validateTimers: Record<string, ReturnType<typeof setTimeout>> = {};
/** Clean up validate timers on unmount */
onMount(() => {
return () => {
for (const timer of Object.values(validateTimers)) clearTimeout(timer);
};
});
let varsRef = $state<Record<string, any>>({});
let showVarsFor = $state<string | null>(null);
let activeLocale = $state<string>('en');
@@ -110,10 +116,12 @@
return form.slots[slotName]?.[activeLocale] || '';
}
/** Set slot template for current locale. */
/** Set slot template for current locale (immutable update). */
function setSlotValue(slotName: string, value: string) {
if (!form.slots[slotName]) form.slots[slotName] = {};
form.slots[slotName][activeLocale] = value;
form.slots = {
...form.slots,
[slotName]: { ...(form.slots[slotName] || {}), [activeLocale]: value }
};
}
onMount(load);
@@ -425,9 +433,9 @@
<p class="font-medium">{config.name}</p>
<span class="text-xs px-1.5 py-0.5 rounded bg-[var(--color-muted)] text-[var(--color-muted-foreground)]">{config.provider_type}</span>
{#if config.user_id === 0}
<span class="text-xs px-1.5 py-0.5 rounded bg-[var(--color-muted)] text-[var(--color-muted-foreground)]">System</span>
<span class="text-xs px-1.5 py-0.5 rounded bg-[var(--color-muted)] text-[var(--color-muted-foreground)]">{t('common.system')}</span>
{/if}
<span class="text-xs px-1.5 py-0.5 rounded bg-[var(--color-muted)] text-[var(--color-muted-foreground)]">{Object.keys(config.slots).length} slots</span>
<span class="text-xs px-1.5 py-0.5 rounded bg-[var(--color-muted)] text-[var(--color-muted-foreground)]">{Object.keys(config.slots).length} {t('templateConfig.slots')}</span>
</div>
{#if config.description}
<p class="text-sm text-[var(--color-muted-foreground)] mt-1">{config.description}</p>
+2 -2
View File
@@ -56,8 +56,8 @@
} catch (err: any) {
loadError = err.message || t('providers.loadError');
} finally { loaded = true; highlightFromUrl(); }
// Ping all providers in background
for (const p of providers) {
// Ping all providers in background (use unfiltered list)
for (const p of allProviders) {
health = { ...health, [p.id]: null };
api(`/providers/${p.id}/test`, { method: 'POST' })
.then((r: any) => { health = { ...health, [p.id]: r.ok }; })
@@ -19,6 +19,7 @@ class ServiceProviderType(str, Enum):
PLANKA = "planka"
SCHEDULER = "scheduler"
NUT = "nut"
GOOGLE_PHOTOS = "google_photos"
class ServiceProvider(ABC):
@@ -352,6 +352,61 @@ NUT_CAPABILITIES = ProviderCapabilities(
],
)
# ---------------------------------------------------------------------------
# Google Photos provider capabilities
# ---------------------------------------------------------------------------
GOOGLE_PHOTOS_CAPABILITIES = ProviderCapabilities(
provider_type="google_photos",
display_name="Google Photos",
webhook_based=False,
supported_filters=[
{"key": "collections", "label": "Albums", "type": "select", "source": "api"},
],
notification_slots=[
{"name": "message_assets_added", "description": "New media added to album"},
{"name": "message_assets_removed", "description": "Media removed from album"},
{"name": "message_collection_renamed", "description": "Album renamed"},
{"name": "message_collection_deleted", "description": "Album deleted"},
{"name": "message_sharing_changed", "description": "Sharing status changed"},
],
events=[
{"name": "assets_added", "description": "New media detected in album"},
{"name": "assets_removed", "description": "Media removed from album"},
{"name": "collection_renamed", "description": "Album was renamed"},
{"name": "collection_deleted", "description": "Album was deleted"},
{"name": "sharing_changed", "description": "Album sharing status changed"},
],
command_slots=[
{"name": "start", "description": "/start greeting message"},
{"name": "help", "description": "/help command listing"},
{"name": "status", "description": "/status tracker summary"},
{"name": "albums", "description": "/albums tracked albums list"},
{"name": "latest", "description": "/latest recent photos"},
{"name": "search", "description": "/search by category or date"},
{"name": "random", "description": "/random random photos"},
{"name": "rate_limited", "description": "Rate limit warning message"},
{"name": "no_results", "description": "Empty results fallback"},
{"name": "desc_help", "description": "Menu description for /help"},
{"name": "desc_status", "description": "Menu description for /status"},
{"name": "desc_albums", "description": "Menu description for /albums"},
{"name": "desc_latest", "description": "Menu description for /latest"},
{"name": "usage_latest", "description": "Usage example for /latest"},
{"name": "desc_search", "description": "Menu description for /search"},
{"name": "usage_search", "description": "Usage example for /search"},
{"name": "desc_random", "description": "Menu description for /random"},
{"name": "usage_random", "description": "Usage example for /random"},
],
commands=[
{"name": "status", "description": "Show tracker status"},
{"name": "albums", "description": "List tracked albums"},
{"name": "latest", "description": "Show latest photos"},
{"name": "search", "description": "Search media"},
{"name": "random", "description": "Random photos"},
{"name": "help", "description": "Show commands"},
],
)
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
@@ -362,6 +417,7 @@ _REGISTRY: dict[str, ProviderCapabilities] = {
"planka": PLANKA_CAPABILITIES,
"scheduler": SCHEDULER_CAPABILITIES,
"nut": NUT_CAPABILITIES,
"google_photos": GOOGLE_PHOTOS_CAPABILITIES,
}
@@ -0,0 +1,17 @@
"""Google Photos service provider implementation."""
from notify_bridge_core.providers.base import ServiceProviderType
from notify_bridge_core.templates.variables import registry
from .client import GooglePhotosClient, GooglePhotosApiError
from .provider import GooglePhotosServiceProvider, GOOGLE_PHOTOS_VARIABLES
# Register Google Photos variables in the global registry
registry.register_provider_variables(ServiceProviderType.GOOGLE_PHOTOS, GOOGLE_PHOTOS_VARIABLES)
__all__ = [
"GooglePhotosClient",
"GooglePhotosApiError",
"GooglePhotosServiceProvider",
"GOOGLE_PHOTOS_VARIABLES",
]
@@ -0,0 +1,275 @@
"""Google Photos Library API async client with OAuth2 token refresh."""
from __future__ import annotations
import logging
import time
from typing import Any
import aiohttp
from .constants import (
API_BASE_URL,
MAX_BATCH_GET,
MAX_PAGE_SIZE,
TOKEN_URL,
)
from .models import GooglePhotosAlbumData, GooglePhotosMediaItem
_LOGGER = logging.getLogger(__name__)
class GooglePhotosApiError(Exception):
"""Raised when the Google Photos API returns an error."""
def __init__(self, status: int, message: str) -> None:
self.status = status
super().__init__(f"Google Photos API error {status}: {message}")
class GooglePhotosClient:
"""Async HTTP client for the Google Photos Library API."""
def __init__(
self,
session: aiohttp.ClientSession,
client_id: str,
client_secret: str,
refresh_token: str,
) -> None:
self._session = session
self._client_id = client_id
self._client_secret = client_secret
self._refresh_token = refresh_token
self._access_token: str | None = None
self._token_expires_at: float = 0.0
@property
def access_token(self) -> str | None:
"""Current access token (may be expired)."""
return self._access_token
async def _ensure_token(self) -> None:
"""Refresh the OAuth2 access token if expired or missing."""
if self._access_token and time.monotonic() < self._token_expires_at:
return
_LOGGER.debug("Refreshing Google Photos access token")
data = {
"client_id": self._client_id,
"client_secret": self._client_secret,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
}
async with self._session.post(TOKEN_URL, data=data) as resp:
if resp.status != 200:
body = await resp.text()
raise GooglePhotosApiError(resp.status, f"Token refresh failed: {body}")
result = await resp.json()
self._access_token = result["access_token"]
# Refresh ~60s before actual expiry to avoid races
expires_in = result.get("expires_in", 3600)
self._token_expires_at = time.monotonic() + expires_in - 60
async def _request(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Make an authenticated API request."""
await self._ensure_token()
url = f"{API_BASE_URL}/{path}"
headers = {"Authorization": f"Bearer {self._access_token}"}
async with self._session.request(
method, url, headers=headers, params=params, json=json_body,
) as resp:
if resp.status == 200:
return await resp.json()
body = await resp.text()
raise GooglePhotosApiError(resp.status, body)
# ------------------------------------------------------------------
# Albums
# ------------------------------------------------------------------
async def ping(self) -> bool:
"""Test connectivity by listing a single album."""
try:
await self._request("GET", "albums", params={"pageSize": "1"})
return True
except Exception:
_LOGGER.exception("Google Photos ping failed")
return False
async def list_albums(self, *, include_shared: bool = True) -> list[dict[str, Any]]:
"""List all albums (owned + optionally shared)."""
albums: list[dict[str, Any]] = []
# Owned albums
page_token: str | None = None
while True:
params: dict[str, str] = {"pageSize": str(MAX_PAGE_SIZE)}
if page_token:
params["pageToken"] = page_token
result = await self._request("GET", "albums", params=params)
albums.extend(result.get("albums", []))
page_token = result.get("nextPageToken")
if not page_token:
break
# Shared albums
if include_shared:
page_token = None
while True:
params = {"pageSize": str(MAX_PAGE_SIZE)}
if page_token:
params["pageToken"] = page_token
result = await self._request("GET", "sharedAlbums", params=params)
shared = result.get("sharedAlbums", [])
# Deduplicate: shared albums the user owns are already in the owned list
existing_ids = {a["id"] for a in albums}
albums.extend(a for a in shared if a.get("id") not in existing_ids)
page_token = result.get("nextPageToken")
if not page_token:
break
return albums
async def get_album(self, album_id: str) -> dict[str, Any] | None:
"""Get a single album by ID."""
try:
return await self._request("GET", f"albums/{album_id}")
except GooglePhotosApiError as e:
if e.status == 404:
return None
raise
async def get_album_with_items(self, album_id: str) -> GooglePhotosAlbumData | None:
"""Fetch album metadata + all media items."""
album_data = await self.get_album(album_id)
if album_data is None:
return None
items = await self.list_media_items(album_id)
return GooglePhotosAlbumData.from_api_response(album_data, items)
# ------------------------------------------------------------------
# Media items
# ------------------------------------------------------------------
async def list_media_items(self, album_id: str) -> list[GooglePhotosMediaItem]:
"""List all media items in an album (handles pagination)."""
items: list[GooglePhotosMediaItem] = []
page_token: str | None = None
while True:
body: dict[str, Any] = {
"albumId": album_id,
"pageSize": MAX_PAGE_SIZE,
}
if page_token:
body["pageToken"] = page_token
result = await self._request("POST", "mediaItems:search", json_body=body)
for raw in result.get("mediaItems", []):
items.append(GooglePhotosMediaItem.from_api_response(raw))
page_token = result.get("nextPageToken")
if not page_token:
break
return items
async def get_media_item(self, media_item_id: str) -> GooglePhotosMediaItem | None:
"""Get a single media item (with fresh baseUrl)."""
try:
data = await self._request("GET", f"mediaItems/{media_item_id}")
return GooglePhotosMediaItem.from_api_response(data)
except GooglePhotosApiError as e:
if e.status == 404:
return None
raise
async def get_media_items_batch(
self, media_item_ids: list[str],
) -> dict[str, GooglePhotosMediaItem]:
"""Batch-get media items (up to 50 per call). Returns dict keyed by ID."""
result: dict[str, GooglePhotosMediaItem] = {}
for i in range(0, len(media_item_ids), MAX_BATCH_GET):
chunk = media_item_ids[i : i + MAX_BATCH_GET]
# batchGet requires repeated query params (mediaItemIds=a&mediaItemIds=b)
# which _request() doesn't support, so we make the request manually.
await self._ensure_token()
url = f"{API_BASE_URL}/mediaItems:batchGet"
headers = {"Authorization": f"Bearer {self._access_token}"}
query_params = "&".join(f"mediaItemIds={mid}" for mid in chunk)
async with self._session.get(f"{url}?{query_params}", headers=headers) as resp:
if resp.status != 200:
body = await resp.text()
_LOGGER.error("batchGet failed: %s %s", resp.status, body)
continue
data = await resp.json()
for item_result in data.get("mediaItemResults", []):
media_item = item_result.get("mediaItem")
if media_item:
parsed = GooglePhotosMediaItem.from_api_response(media_item)
result[parsed.id] = parsed
return result
async def search_media_items(
self,
*,
album_id: str | None = None,
category: str | None = None,
media_type: str | None = None,
date_start: str | None = None,
date_end: str | None = None,
page_size: int = 25,
) -> list[GooglePhotosMediaItem]:
"""Search media items with filters."""
body: dict[str, Any] = {"pageSize": min(page_size, MAX_PAGE_SIZE)}
if album_id:
body["albumId"] = album_id
else:
filters: dict[str, Any] = {}
if category:
filters["contentFilter"] = {
"includedContentCategories": [category.upper()],
}
if media_type:
filters["mediaTypeFilter"] = {"mediaTypes": [media_type.upper()]}
if date_start or date_end:
date_filter: dict[str, Any] = {}
if date_start:
parts = date_start.split("-")
date_filter["startDate"] = {
"year": int(parts[0]),
"month": int(parts[1]),
"day": int(parts[2]),
}
if date_end:
parts = date_end.split("-")
date_filter["endDate"] = {
"year": int(parts[0]),
"month": int(parts[1]),
"day": int(parts[2]),
}
filters["dateFilter"] = {"ranges": [date_filter]}
if filters:
body["filters"] = filters
result = await self._request("POST", "mediaItems:search", json_body=body)
return [
GooglePhotosMediaItem.from_api_response(raw)
for raw in result.get("mediaItems", [])
]
@@ -0,0 +1,26 @@
"""Google Photos provider constants."""
from typing import Final
# Google Photos Library API
API_BASE_URL: Final = "https://photoslibrary.googleapis.com/v1"
TOKEN_URL: Final = "https://oauth2.googleapis.com/token"
# OAuth scopes
SCOPE_READONLY: Final = "https://www.googleapis.com/auth/photoslibrary.readonly"
SCOPE_SHARING: Final = "https://www.googleapis.com/auth/photoslibrary.sharing"
# API limits
MAX_PAGE_SIZE: Final = 100 # max items per page
MAX_BATCH_GET: Final = 50 # max items per batchGet call
# Asset URL suffixes (appended to baseUrl)
PHOTO_DOWNLOAD_SUFFIX: Final = "=w2048-h2048" # max resolution photo
PHOTO_THUMBNAIL_SUFFIX: Final = "=w512-h512"
VIDEO_DOWNLOAD_SUFFIX: Final = "=dv" # video download
# Cache key prefix for Telegram file_id cache
CACHE_HOST: Final = "photos.googleapis.com"
# Defaults
DEFAULT_SCAN_INTERVAL: Final = 120 # seconds (Google API quota is tighter than Immich)
@@ -0,0 +1,108 @@
"""Google Photos data models."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class GooglePhotosMediaItem:
"""A media item from the Google Photos Library API."""
id: str
filename: str
mime_type: str
created_at: str # mediaMetadata.creationTime (ISO 8601)
base_url: str # temporary download URL (expires ~60 min)
product_url: str # permanent link to Google Photos UI
width: int | None = None
height: int | None = None
description: str = ""
is_video: bool = False
@property
def is_processed(self) -> bool:
"""A media item is considered processed if it has a base_url."""
return bool(self.base_url)
@classmethod
def from_api_response(cls, data: dict[str, Any]) -> GooglePhotosMediaItem:
"""Parse a mediaItem response from the Google Photos API."""
metadata = data.get("mediaMetadata", {})
mime_type = data.get("mimeType", "")
is_video = mime_type.startswith("video/") or "video" in metadata
width = None
height = None
if "width" in metadata:
try:
width = int(metadata["width"])
except (ValueError, TypeError):
pass
if "height" in metadata:
try:
height = int(metadata["height"])
except (ValueError, TypeError):
pass
return cls(
id=data["id"],
filename=data.get("filename", ""),
mime_type=mime_type,
created_at=metadata.get("creationTime", ""),
base_url=data.get("baseUrl", ""),
product_url=data.get("productUrl", ""),
width=width,
height=height,
description=data.get("description", ""),
is_video=is_video,
)
@dataclass
class GooglePhotosAlbumData:
"""Full album data assembled from Google Photos API responses."""
id: str
name: str
asset_count: int
product_url: str
is_shared: bool
cover_photo_id: str | None = None
share_info: dict[str, Any] = field(default_factory=dict)
asset_ids: set[str] = field(default_factory=set)
assets: dict[str, GooglePhotosMediaItem] = field(default_factory=dict)
@property
def photo_count(self) -> int:
return sum(1 for a in self.assets.values() if not a.is_video)
@property
def video_count(self) -> int:
return sum(1 for a in self.assets.values() if a.is_video)
@classmethod
def from_api_response(
cls, album_data: dict[str, Any], media_items: list[GooglePhotosMediaItem],
) -> GooglePhotosAlbumData:
"""Build album data from album metadata + fetched media items."""
asset_ids: set[str] = set()
assets: dict[str, GooglePhotosMediaItem] = {}
for item in media_items:
asset_ids.add(item.id)
assets[item.id] = item
share_info = album_data.get("shareInfo", {})
return cls(
id=album_data["id"],
name=album_data.get("title", "Untitled"),
asset_count=int(album_data.get("mediaItemsCount", len(asset_ids))),
product_url=album_data.get("productUrl", ""),
is_shared=bool(share_info),
cover_photo_id=album_data.get("coverPhotoMediaItemId"),
share_info=share_info,
asset_ids=asset_ids,
assets=assets,
)
@@ -0,0 +1,226 @@
"""Google Photos service provider — concrete implementation of ServiceProvider."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
from notify_bridge_core.templates.variables import TemplateVariableDefinition
from .change_detector import detect_album_changes
from .client import GooglePhotosClient
from .models import GooglePhotosAlbumData
_LOGGER = logging.getLogger(__name__)
# Google Photos specific template variables
GOOGLE_PHOTOS_VARIABLES: list[TemplateVariableDefinition] = [
TemplateVariableDefinition(
name="album_name",
type="string",
description="Album name (alias for collection_name)",
example="Vacation 2026",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="album_id",
type="string",
description="Album ID (alias for collection_id)",
example="ABc123...",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="album_url",
type="string",
description="Web URL to the album in Google Photos",
example="https://photos.google.com/lr/album/ABc123...",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="photo_count",
type="int",
description="Number of photos in album",
example="42",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="video_count",
type="int",
description="Number of videos in album",
example="8",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="shared",
type="bool",
description="Whether the album is shared",
example="true",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="old_album_name",
type="string",
description="Previous album name (for rename events)",
example="Old Name",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
TemplateVariableDefinition(
name="new_album_name",
type="string",
description="New album name (for rename events)",
example="New Name",
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
),
]
class GooglePhotosServiceProvider(ServiceProvider):
"""Google Photos Library API provider."""
provider_type = ServiceProviderType.GOOGLE_PHOTOS
def __init__(
self,
session: aiohttp.ClientSession,
client_id: str,
client_secret: str,
refresh_token: str,
name: str = "Google Photos",
) -> None:
self._client = GooglePhotosClient(session, client_id, client_secret, refresh_token)
self._name = name
@property
def client(self) -> GooglePhotosClient:
return self._client
async def connect(self) -> bool:
return await self._client.ping()
async def disconnect(self) -> None:
pass # session lifecycle managed by caller
async def poll(
self,
collection_ids: list[str],
tracker_state: dict[str, Any],
) -> tuple[list[ServiceEvent], dict[str, Any]]:
events: list[ServiceEvent] = []
new_state = dict(tracker_state)
for album_id in collection_ids:
album = await self._client.get_album_with_items(album_id)
if album is None:
# Album deleted or inaccessible
if album_id in new_state:
from notify_bridge_core.models.events import EventType
from datetime import datetime, timezone
events.append(ServiceEvent(
event_type=EventType.COLLECTION_DELETED,
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
provider_name=self._name,
collection_id=album_id,
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
timestamp=datetime.now(timezone.utc),
))
del new_state[album_id]
continue
# Get previous state
prev = new_state.get(album_id)
if prev is None:
# First time seeing this album — store state, no event
new_state[album_id] = _serialize_album_state(album)
continue
# Reconstruct previous album data for comparison
old_album = _deserialize_album_state(album_id, prev)
pending = set(prev.get("pending_asset_ids", []))
detected_events, updated_pending = detect_album_changes(
old_album, album, pending, self._name,
)
if detected_events:
events.extend(detected_events)
# Update state
state = _serialize_album_state(album)
state["pending_asset_ids"] = list(updated_pending)
new_state[album_id] = state
return events, new_state
def get_available_variables(self) -> list[TemplateVariableDefinition]:
return list(GOOGLE_PHOTOS_VARIABLES)
def get_provider_config_schema(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"client_id": {
"type": "string",
"description": "Google OAuth2 Client ID",
},
"client_secret": {
"type": "string",
"description": "Google OAuth2 Client Secret",
"secret": True,
},
"refresh_token": {
"type": "string",
"description": "OAuth2 Refresh Token (from Google OAuth Playground)",
"secret": True,
},
},
"required": ["client_id", "client_secret", "refresh_token"],
}
async def list_collections(self) -> list[dict[str, Any]]:
albums = await self._client.list_albums(include_shared=True)
return [
{
"id": a.get("id", ""),
"name": a.get("title", "Untitled"),
"asset_count": int(a.get("mediaItemsCount", 0)),
}
for a in albums
]
async def test_connection(self) -> dict[str, Any]:
ok = await self._client.ping()
if ok:
albums = await self._client.list_albums(include_shared=False)
return {
"ok": True,
"message": f"Connected to Google Photos ({len(albums)} albums found)",
}
return {"ok": False, "message": "Failed to connect to Google Photos"}
def _serialize_album_state(album: GooglePhotosAlbumData) -> dict[str, Any]:
"""Serialize album state for persistence."""
return {
"name": album.name,
"asset_ids": list(album.asset_ids),
"shared": album.is_shared,
"pending_asset_ids": [],
}
def _deserialize_album_state(album_id: str, state: dict[str, Any]) -> GooglePhotosAlbumData:
"""Create a minimal GooglePhotosAlbumData from saved state for comparison."""
return GooglePhotosAlbumData(
id=album_id,
name=state.get("name", ""),
asset_count=len(state.get("asset_ids", [])),
product_url="",
is_shared=state.get("shared", False),
asset_ids=set(state.get("asset_ids", [])),
)
@@ -0,0 +1,8 @@
📚 Tracked albums:
{%- if albums %}
{%- for album in albums %}
• {{ album.name }} ({{ album.asset_count }} items)
{%- endfor %}
{%- else %}
(none)
{%- endif %}
@@ -0,0 +1 @@
Show available commands
@@ -0,0 +1 @@
Search photos by category
@@ -0,0 +1,4 @@
📷 <b>Available Commands</b>
{%- for cmd in commands %}
/{{ cmd.name }} — {{ cmd.description }}
{%- endfor %}
@@ -0,0 +1,4 @@
📸 Latest:
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1,4 @@
🎲 Random:
{%- for asset in assets %}
• {{ asset.originalFileName }}
{%- endfor %}
@@ -0,0 +1 @@
⏳ Too many requests. Please wait a moment before trying again.
@@ -0,0 +1,4 @@
🔍 Results for "{{ query }}":
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1,2 @@
👋 Hi! I'm your Notify Bridge bot for <b>Google Photos</b>.
Use /help to see available commands.
@@ -0,0 +1,6 @@
📷 <b>Google Photos Status</b>
Service: {{ service_name }}
Tracked albums: {{ album_count }}
{%- if last_event %}
Last event: {{ last_event.type }} ({{ last_event.time }})
{%- endif %}
@@ -47,6 +47,16 @@ PROVIDER_COMMAND_SLOTS: dict[str, list[str]] = {
# Description slots
"desc_help", "desc_status", "desc_devices", "desc_battery",
],
"google_photos": [
# Response templates
"start", "help", "status", "albums", "latest", "search", "random",
"rate_limited", "no_results",
# Description slots
"desc_help", "desc_status", "desc_albums",
"desc_latest", "desc_search", "desc_random",
# Usage example slots
"usage_latest", "usage_search", "usage_random",
],
}
# Backward-compatible aliases
@@ -0,0 +1,8 @@
📚 Отслеживаемые альбомы:
{%- if albums %}
{%- for album in albums %}
• {{ album.name }} ({{ album.asset_count }} элементов)
{%- endfor %}
{%- else %}
(нет)
{%- endif %}
@@ -0,0 +1 @@
Список отслеживаемых альбомов
@@ -0,0 +1 @@
Показать доступные команды
@@ -0,0 +1 @@
Последние фото
@@ -0,0 +1 @@
Случайные фото
@@ -0,0 +1 @@
Поиск фото по категории
@@ -0,0 +1 @@
Показать статус трекера
@@ -0,0 +1,4 @@
📷 <b>Доступные команды</b>
{%- for cmd in commands %}
/{{ cmd.name }} — {{ cmd.description }}
{%- endfor %}
@@ -0,0 +1,4 @@
📸 Последние:
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1 @@
Ничего не найдено.
@@ -0,0 +1,4 @@
🎲 Случайные:
{%- for asset in assets %}
• {{ asset.originalFileName }}
{%- endfor %}
@@ -0,0 +1 @@
⏳ Слишком много запросов. Подождите немного.
@@ -0,0 +1,4 @@
🔍 Результаты по запросу "{{ query }}":
{%- for asset in assets %}
• {{ asset.originalFileName }}{% if asset.year %} ({{ asset.year }}){% endif %}
{%- endfor %}
@@ -0,0 +1,2 @@
👋 Привет! Я бот Notify Bridge для <b>Google Фото</b>.
Используйте /help для просмотра доступных команд.
@@ -0,0 +1,6 @@
📷 <b>Статус Google Фото</b>
Сервис: {{ service_name }}
Отслеживаемые альбомы: {{ album_count }}
{%- if last_event %}
Последнее событие: {{ last_event.type }} ({{ last_event.time }})
{%- endif %}
@@ -0,0 +1 @@
/latest [количество]
@@ -0,0 +1 @@
/random [количество]
@@ -0,0 +1 @@
/search [категория]
@@ -0,0 +1,12 @@
📎 {{ added_count }} new file(s) added to album {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
{%- if added_assets %}
{%- for asset in added_assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }}
{%- if asset.is_favorite %} ❤️{% endif %}
{%- if asset.oversized %} ⚠️{% endif %}
{%- endfor %}
{%- endif %}
{%- if has_oversized_videos %}
⚠️ Some videos exceed the {{ max_video_size_mb }} MB file size limit and may not be sent.
{%- endif %}
@@ -0,0 +1 @@
🗑️ {{ removed_count }} file(s) removed from album {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -0,0 +1 @@
🗑️ Album "{{ collection_name }}" was deleted.
@@ -0,0 +1 @@
✏️ Album "{{ old_name }}" renamed to {% if album_url %}<a href="{{ album_url }}">{{ new_name }}</a>{% else %}"{{ new_name }}"{% endif %}.
@@ -0,0 +1 @@
🔗 Sharing changed for album {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -53,6 +53,13 @@ PROVIDER_SLOT_FILE_MAP: dict[str, dict[str, str]] = {
"scheduler": {
"message_scheduled_message": "scheduled_message.jinja2",
},
"google_photos": {
"message_assets_added": "gp_assets_added.jinja2",
"message_assets_removed": "gp_assets_removed.jinja2",
"message_collection_renamed": "gp_collection_renamed.jinja2",
"message_collection_deleted": "gp_collection_deleted.jinja2",
"message_sharing_changed": "gp_sharing_changed.jinja2",
},
"nut": {
"message_ups_online": "nut_ups_online.jinja2",
"message_ups_on_battery": "nut_ups_on_battery.jinja2",
@@ -0,0 +1,12 @@
📎 {{ added_count }} новых файл(ов) добавлено в альбом {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
{%- if added_assets %}
{%- for asset in added_assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }}
{%- if asset.is_favorite %} ❤️{% endif %}
{%- if asset.oversized %} ⚠️{% endif %}
{%- endfor %}
{%- endif %}
{%- if has_oversized_videos %}
⚠️ Некоторые видео превышают лимит {{ max_video_size_mb }} МБ и могут не отправиться.
{%- endif %}
@@ -0,0 +1 @@
🗑️ {{ removed_count }} файл(ов) удалено из альбома {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -0,0 +1 @@
🗑️ Альбом "{{ collection_name }}" был удалён.
@@ -0,0 +1 @@
✏️ Альбом "{{ old_name }}" переименован в {% if album_url %}<a href="{{ album_url }}">{{ new_name }}</a>{% else %}"{{ new_name }}"{% endif %}.
@@ -0,0 +1 @@
🔗 Изменён доступ к альбому {% if album_url %}<a href="{{ album_url }}">{{ album_name }}</a>{% else %}"{{ album_name }}"{% endif %}.
@@ -22,4 +22,4 @@ def render_template(template_str: str, context: dict[str, Any]) -> str:
return _env.from_string(template_str).render(**context)
except jinja2.TemplateError as e:
_LOGGER.error("Template render error: %s", e)
return template_str
return "[Template rendering error]"
@@ -104,7 +104,9 @@ async def _get(session: AsyncSession, config_id: int, user_id: int) -> CommandTe
# ---------------------------------------------------------------------------
@router.get("/variables")
async def get_command_variables():
async def get_command_variables(
user: User = Depends(get_current_user),
):
"""Get variable reference for each command template slot."""
common_vars = {
"locale": "Current locale (en/ru)",
@@ -13,7 +13,7 @@ import aiohttp
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import ServiceProvider, User
from ..services import make_immich_provider, make_gitea_provider, make_planka_provider, make_nut_provider
from ..services import make_immich_provider, make_gitea_provider, make_planka_provider, make_nut_provider, make_google_photos_provider
_LOGGER = logging.getLogger(__name__)
@@ -76,12 +76,19 @@ class NutProviderConfig(BaseModel):
password: str | None = None
class GooglePhotosProviderConfig(BaseModel):
client_id: str
client_secret: str
refresh_token: str
_PROVIDER_CONFIG_MODELS: dict[str, type[BaseModel]] = {
"immich": ImmichProviderConfig,
"gitea": GiteaProviderConfig,
"planka": PlankaProviderConfig,
"scheduler": SchedulerProviderConfig,
"nut": NutProviderConfig,
"google_photos": GooglePhotosProviderConfig,
}
@@ -122,65 +129,93 @@ async def create_provider(
_validate_provider_config(body.type, body.config)
# Validate connection for known provider types
if body.type == "immich":
from notify_bridge_core.providers.immich import ImmichServiceProvider
config = body.config
async with aiohttp.ClientSession() as http_session:
immich = ImmichServiceProvider(
http_session, config.get("url", ""), config.get("api_key", ""),
config.get("external_domain"), body.name,
)
test_result = await immich.test_connection()
try:
if body.type == "immich":
from notify_bridge_core.providers.immich import ImmichServiceProvider
config = body.config
async with aiohttp.ClientSession() as http_session:
immich = ImmichServiceProvider(
http_session, config.get("url", ""), config.get("api_key", ""),
config.get("external_domain"), body.name,
)
test_result = await immich.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", f"Cannot connect to {body.type} provider"),
)
# Store external_domain from server config if available
if test_result.get("external_domain"):
config["external_domain"] = test_result["external_domain"]
elif body.type == "gitea":
config = body.config
# api_token is optional (webhook_secret is required, but token only for repo listing)
if config.get("api_token"):
async with aiohttp.ClientSession() as http_session:
from notify_bridge_core.providers.gitea import GiteaServiceProvider
gitea = GiteaServiceProvider(
http_session, config.get("url", ""), config.get("api_token", ""), body.name,
)
test_result = await gitea.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to Gitea"),
)
elif body.type == "planka":
config = body.config
if config.get("api_key"):
async with aiohttp.ClientSession() as http_session:
from notify_bridge_core.providers.planka import PlankaServiceProvider
planka = PlankaServiceProvider(
http_session, config.get("url", ""), config.get("api_key", ""), body.name,
)
test_result = await planka.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to Planka"),
)
elif body.type == "nut":
nut = make_nut_provider(ServiceProvider(
id=0, user_id=0, type="nut", name=body.name, config=body.config,
))
test_result = await nut.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", f"Cannot connect to {body.type} provider"),
detail=test_result.get("message", "Cannot connect to NUT server"),
)
# Store external_domain from server config if available
if test_result.get("external_domain"):
config["external_domain"] = test_result["external_domain"]
elif body.type == "gitea":
config = body.config
# api_token is optional (webhook_secret is required, but token only for repo listing)
if config.get("api_token"):
elif body.type == "google_photos":
config = body.config
async with aiohttp.ClientSession() as http_session:
from notify_bridge_core.providers.gitea import GiteaServiceProvider
gitea = GiteaServiceProvider(
http_session, config.get("url", ""), config.get("api_token", ""), body.name,
from notify_bridge_core.providers.google_photos import GooglePhotosServiceProvider
gp = GooglePhotosServiceProvider(
http_session, config.get("client_id", ""), config.get("client_secret", ""),
config.get("refresh_token", ""), body.name,
)
test_result = await gitea.test_connection()
test_result = await gp.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to Gitea"),
detail=test_result.get("message", "Cannot connect to Google Photos"),
)
elif body.type == "planka":
config = body.config
if config.get("api_key"):
async with aiohttp.ClientSession() as http_session:
from notify_bridge_core.providers.planka import PlankaServiceProvider
planka = PlankaServiceProvider(
http_session, config.get("url", ""), config.get("api_key", ""), body.name,
)
test_result = await planka.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to Planka"),
)
elif body.type == "nut":
nut = make_nut_provider(ServiceProvider(
id=0, user_id=0, type="nut", name=body.name, config=body.config,
))
test_result = await nut.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to NUT server"),
)
except HTTPException:
raise
except aiohttp.ClientError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Connection error: {err}",
)
except OSError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Connection error: {err}",
)
# Scheduler: no validation needed (virtual provider)
@@ -198,7 +233,9 @@ async def create_provider(
@router.get("/capabilities")
async def list_provider_capabilities():
async def list_provider_capabilities(
user: User = Depends(get_current_user),
):
"""List capabilities for all registered provider types."""
from notify_bridge_core.providers.capabilities import get_all_capabilities
result = {}
@@ -218,7 +255,10 @@ async def list_provider_capabilities():
@router.get("/capabilities/{provider_type}")
async def get_provider_capabilities(provider_type: str):
async def get_provider_capabilities(
provider_type: str,
user: User = Depends(get_current_user),
):
"""Get capabilities for a provider type (events, slots, commands)."""
from notify_bridge_core.providers.capabilities import get_capabilities
caps = get_capabilities(provider_type)
@@ -324,6 +364,21 @@ async def update_provider(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to NUT server"),
)
elif config_changed and provider.type == "google_photos":
try:
async with aiohttp.ClientSession() as http_session:
gp = make_google_photos_provider(http_session, provider)
test_result = await gp.test_connection()
if not test_result.get("ok"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=test_result.get("message", "Cannot connect to Google Photos"),
)
except aiohttp.ClientError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Connection error: {err}",
)
session.add(provider)
await session.commit()
@@ -380,6 +435,11 @@ async def test_provider(
nut = make_nut_provider(provider)
return await nut.test_connection()
if provider.type == "google_photos":
async with aiohttp.ClientSession() as http_session:
gp = make_google_photos_provider(http_session, provider)
return await gp.test_connection()
return {"ok": False, "message": f"Unknown provider type: {provider.type}"}
@@ -400,22 +460,8 @@ async def list_people(
provider.config.get("url", ""),
provider.config.get("api_key", ""),
)
try:
async with http_session.get(
f"{client.url}/api/people",
headers={"x-api-key": client.api_key},
ssl=False,
) as response:
if response.status == 200:
data = await response.json()
people_list = data.get("people", data) if isinstance(data, dict) else data
return [
{"id": p["id"], "name": p.get("name", "")}
for p in people_list
if p.get("name")
]
except Exception as e:
_LOGGER.error("Failed to fetch people: %s", e)
people = await client.get_people()
return [{"id": pid, "name": name} for pid, name in people.items()]
return []
@@ -452,6 +498,11 @@ async def list_collections(
nut = make_nut_provider(provider)
return await nut.list_collections()
if provider.type == "google_photos":
async with aiohttp.ClientSession() as http_session:
gp = make_google_photos_provider(http_session, provider)
return await gp.list_collections()
return []
@@ -510,10 +561,11 @@ def _provider_response(p: ServiceProvider) -> dict:
"""Build a safe response dict for a provider."""
config = dict(p.config)
# Mask sensitive fields
for secret_field in ("api_key", "api_token", "webhook_secret", "password"):
for secret_field in ("api_key", "api_token", "webhook_secret", "password",
"client_secret", "refresh_token"):
if secret_field in config:
key = config[secret_field]
config[secret_field] = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"
config[secret_field] = f"***{key[-4:]}" if len(key) > 4 else "***"
return {
"id": p.id,
"type": p.type,
@@ -79,17 +79,20 @@ async def list_targets(
)
targets = result.all()
# Load receivers for each target
target_receivers: dict[int, list[TargetReceiver]] = {}
for tgt in targets:
# Batch-load all receivers for the user's targets in one query
target_ids = [tgt.id for tgt in targets]
target_receivers: dict[int, list[TargetReceiver]] = {tid: [] for tid in target_ids}
if target_ids:
recv_result = await session.exec(
select(TargetReceiver).where(TargetReceiver.target_id == tgt.id)
select(TargetReceiver).where(TargetReceiver.target_id.in_(target_ids))
)
target_receivers[tgt.id] = list(recv_result.all())
for recv in recv_result.all():
target_receivers[recv.target_id].append(recv)
# Resolve chat names and languages from receivers for telegram targets
# Batch-load chat names and languages for all telegram targets
chat_names: dict[str, str] = {}
chat_languages: dict[str, str] = {}
chat_lookups: list[tuple[int, str]] = [] # (bot_id, chat_id)
for tgt in targets:
if tgt.type == "telegram":
bot_id = tgt.config.get("bot_id")
@@ -98,18 +101,23 @@ async def list_targets(
for recv in target_receivers.get(tgt.id, []):
chat_id = str(recv.config.get("chat_id", ""))
if chat_id:
chat_result = await session.exec(
select(TelegramChat).where(
TelegramChat.bot_id == bot_id,
TelegramChat.chat_id == chat_id,
)
)
chat = chat_result.first()
if chat:
chat_names[f"{bot_id}_{chat_id}"] = chat.title or chat.username or ""
lang = getattr(chat, 'language_override', '') or getattr(chat, 'language_code', '') or ''
if lang:
chat_languages[f"{bot_id}_{chat_id}"] = lang
chat_lookups.append((bot_id, chat_id))
if chat_lookups:
all_bot_ids = list({bl[0] for bl in chat_lookups})
all_chat_ids = list({bl[1] for bl in chat_lookups})
chat_result = await session.exec(
select(TelegramChat).where(
TelegramChat.bot_id.in_(all_bot_ids),
TelegramChat.chat_id.in_(all_chat_ids),
)
)
for chat in chat_result.all():
key = f"{chat.bot_id}_{chat.chat_id}"
chat_names[key] = chat.title or chat.username or ""
lang = getattr(chat, 'language_override', '') or getattr(chat, 'language_code', '') or ''
if lang:
chat_languages[key] = lang
# Build lookup for broadcast child target resolution
target_map = {t.id: t for t in targets}
@@ -130,7 +130,9 @@ async def list_configs(
@router.get("/variables")
async def get_template_variables():
async def get_template_variables(
user: User = Depends(get_current_user),
):
"""Get template variable reference grouped by slot.
Returns a dict keyed by template slot name, each containing:
@@ -1,16 +1,22 @@
"""Template variable documentation endpoint."""
from fastapi import APIRouter
from fastapi import APIRouter, Depends
from notify_bridge_core.providers.base import ServiceProviderType
from notify_bridge_core.providers.immich import ImmichServiceProvider # noqa: F401 — triggers registration
from notify_bridge_core.templates.variables import registry
from ..auth.dependencies import get_current_user
from ..database.models import User
router = APIRouter(prefix="/api/template-vars", tags=["template-vars"])
@router.get("")
async def get_template_variables(provider_type: str | None = None):
async def get_template_variables(
provider_type: str | None = None,
user: User = Depends(get_current_user),
):
"""Get available template variables, optionally filtered by provider type."""
if provider_type:
try:
@@ -55,6 +55,9 @@ async def create_user(
if result.first():
raise HTTPException(status_code=409, detail="Username already exists")
if len(body.password) < 8:
raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
user = User(
username=body.username,
hashed_password=bcrypt.hashpw(body.password.encode(), bcrypt.gensalt()).decode(),
@@ -81,8 +84,8 @@ async def reset_user_password(
user = await session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if len(body.new_password) < 6:
raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
if len(body.new_password) < 8:
raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
user.hashed_password = bcrypt.hashpw(body.new_password.encode(), bcrypt.gensalt()).decode()
session.add(user)
await session.commit()
@@ -156,7 +156,6 @@ async def gitea_webhook(provider_id: int, request: Request):
)},
},
))
await session.commit()
# Dispatch to targets
dispatcher = NotificationDispatcher()
@@ -172,6 +171,8 @@ async def gitea_webhook(provider_id: int, request: Request):
tracker.id, r.get("error", "unknown"),
)
await session.commit()
return {"ok": True, "dispatched": dispatched}
@@ -268,7 +269,6 @@ async def planka_webhook(provider_id: int, request: Request):
)},
},
))
await session.commit()
# Dispatch to targets
dispatcher = NotificationDispatcher()
@@ -284,6 +284,8 @@ async def planka_webhook(provider_id: int, request: Request):
tracker.id, r.get("error", "unknown"),
)
await session.commit()
return {"ok": True, "dispatched": dispatched}
@@ -2,6 +2,7 @@
from __future__ import annotations
import hmac
import logging
from typing import Any
@@ -40,7 +41,7 @@ async def telegram_webhook(
"""Handle incoming Telegram messages — route commands to handlers."""
# Validate webhook secret if configured
if _webhook_secret:
if x_telegram_bot_api_secret_token != _webhook_secret:
if not hmac.compare_digest(x_telegram_bot_api_secret_token or "", _webhook_secret):
raise HTTPException(status_code=403, detail="Invalid webhook secret")
# Find bot by opaque webhook path ID (not by token — token must not appear in URLs)
@@ -33,8 +33,8 @@ class Settings(BaseSettings):
telegram_webhook_secret: str = ""
cors_allowed_origins: str = "*"
"""Comma-separated allowed origins for CORS (e.g. 'http://localhost:5173,https://myapp.com'). Use '*' for dev."""
cors_allowed_origins: str = "http://localhost:5173"
"""Comma-separated allowed origins for CORS (e.g. 'http://localhost:5173,https://myapp.com')."""
static_dir: str = ""
"""Path to frontend static files. Set to serve SvelteKit build via FastAPI (e.g. /app/static in Docker)."""
@@ -152,6 +152,7 @@ async def _seed_default_templates() -> None:
await _seed_provider_template(session, "planka", "Planka")
await _seed_provider_template(session, "scheduler", "Scheduler")
await _seed_provider_template(session, "nut", "NUT")
await _seed_provider_template(session, "google_photos", "Google Photos")
await session.commit()
@@ -175,6 +176,9 @@ async def _seed_default_command_templates() -> None:
await _seed_provider_command_template(
session, "nut", "Default NUT Commands", "Default NUT command templates",
)
await _seed_provider_command_template(
session, "google_photos", "Default Google Photos Commands", "Default Google Photos command templates",
)
await session.commit()
@@ -305,6 +309,16 @@ async def _seed_default_command_configs() -> None:
"default_count": 5,
"rate_limits": {"api": 15, "default": 10},
},
{
"provider_type": "google_photos",
"name": "Default Google Photos",
"enabled_commands": [
"help", "status", "albums", "latest", "search", "random",
],
"response_mode": "media",
"default_count": 5,
"rate_limits": {"search": 30, "default": 10},
},
]
for cfg in defaults:
@@ -11,8 +11,11 @@ from slowapi.middleware import SlowAPIMiddleware
# Ensure app-level loggers are visible
logging.basicConfig(level=logging.INFO)
logging.getLogger("notify_bridge_server").setLevel(logging.DEBUG)
logging.getLogger("notify_bridge_core").setLevel(logging.DEBUG)
from .config import settings as _log_cfg
_log_level = logging.DEBUG if _log_cfg.debug else logging.INFO
logging.getLogger("notify_bridge_server").setLevel(_log_level)
logging.getLogger("notify_bridge_core").setLevel(_log_level)
from .database.engine import init_db
from .database.models import * # noqa: F401,F403 — ensure all models registered
@@ -77,6 +80,24 @@ async def lifespan(app: FastAPI):
app = FastAPI(title="Notify Bridge", version="0.1.0", lifespan=lifespan)
# --- Security headers ---
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest
from starlette.responses import Response as StarletteResponse
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: StarletteRequest, call_next):
response: StarletteResponse = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
return response
app.add_middleware(SecurityHeadersMiddleware)
# --- Rate limiting ---
from .auth.routes import limiter
app.state.limiter = limiter
@@ -4,6 +4,7 @@ from notify_bridge_core.providers.immich import ImmichServiceProvider
from notify_bridge_core.providers.gitea import GiteaServiceProvider
from notify_bridge_core.providers.planka import PlankaServiceProvider
from notify_bridge_core.providers.nut import NutServiceProvider
from notify_bridge_core.providers.google_photos import GooglePhotosServiceProvider
from ..database.models import ServiceProvider
@@ -52,3 +53,15 @@ def make_nut_provider(provider: ServiceProvider) -> NutServiceProvider:
password=config.get("password"),
name=provider.name,
)
def make_google_photos_provider(http_session, provider: ServiceProvider) -> GooglePhotosServiceProvider:
"""Create a GooglePhotosServiceProvider from a DB provider model."""
config = provider.config or {}
return GooglePhotosServiceProvider(
http_session,
config.get("client_id", ""),
config.get("client_secret", ""),
config.get("refresh_token", ""),
provider.name,
)
@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import logging
from typing import Any
@@ -28,6 +29,7 @@ _LOGGER = logging.getLogger(__name__)
# Module-level Telegram file caches — shared across dispatches for reuse
_url_cache: TelegramFileCache | None = None
_asset_cache: TelegramFileCache | None = None
_cache_lock = asyncio.Lock()
async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFileCache | None]:
@@ -35,18 +37,24 @@ async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFile
global _url_cache, _asset_cache
if _url_cache is not None:
return _url_cache, _asset_cache
import os
from pathlib import Path
data_dir = os.environ.get("NOTIFY_BRIDGE_DATA_DIR")
if not data_dir:
return None, None
cache_dir = Path(data_dir) / "cache"
_url_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_url_cache.json"))
_asset_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_asset_cache.json"))
await _url_cache.async_load()
await _asset_cache.async_load()
_LOGGER.info("Initialized Telegram file caches in %s", cache_dir)
return _url_cache, _asset_cache
async with _cache_lock:
# Double-check after acquiring lock
if _url_cache is not None:
return _url_cache, _asset_cache
import os
from pathlib import Path
data_dir = os.environ.get("NOTIFY_BRIDGE_DATA_DIR")
if not data_dir:
return None, None
cache_dir = Path(data_dir) / "cache"
url_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_url_cache.json"))
asset_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_asset_cache.json"))
await url_cache.async_load()
await asset_cache.async_load()
_url_cache = url_cache
_asset_cache = asset_cache
_LOGGER.info("Initialized Telegram file caches in %s", cache_dir)
return _url_cache, _asset_cache
async def check_tracker(tracker_id: int) -> dict[str, Any]:
@@ -133,6 +141,20 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
name=provider_name,
)
events, new_state = await nut.poll(collection_ids, state_dict)
elif provider_type == "google_photos":
from notify_bridge_core.providers.google_photos import GooglePhotosServiceProvider
async with aiohttp.ClientSession() as http_session:
gp = GooglePhotosServiceProvider(
http_session,
provider_config.get("client_id", ""),
provider_config.get("client_secret", ""),
provider_config.get("refresh_token", ""),
provider_name,
)
connected = await gp.connect()
if not connected:
return {"status": "error", "reason": "failed to connect to Google Photos"}
events, new_state = await gp.poll(collection_ids, state_dict)
else:
return {"status": "error", "reason": f"unsupported provider type: {provider_type}"}