refactor: unify test dispatch with real NotificationDispatcher
- Route scheduled/memory test sends through the same NotificationDispatcher the watcher uses — identical template rendering, media handling, caching - Add preview_url field to MediaAsset (transcoded mid-size), separate from thumbnail_url (small) and full_url (original). Dispatcher prefers preview_url - Fix sendMediaGroup cache: extract file_ids from Telegram response and store via async_set_many so repeat sends use cached file_ids - Parallelize asset downloads in _send_media_group with asyncio.gather - Filter unprocessed assets (archived/trashed/offline/no-thumbhash) at album parse time in ImmichAlbumData.from_api_response - Extract shared asset_to_media + collect_scheduled_assets into asset_utils.py (single source for test dispatch and future real scheduler) - Respect tracking config filters: limit, asset_type, favorite_only, min_rating - Random asset sampling for scheduled sends - Memory mode: "On This Day" date filter (same month+day, previous year) - Skip dispatch when no matching assets found - Remove ~250 lines of duplicated send logic from notifier.py - Fix restart-backend.sh: proper env var export, Python path resolution, error log
This commit is contained in:
@@ -33,6 +33,7 @@ class MediaAsset:
|
|||||||
description: str | None = None
|
description: str | None = None
|
||||||
tags: list[str] = field(default_factory=list)
|
tags: list[str] = field(default_factory=list)
|
||||||
thumbnail_url: str | None = None
|
thumbnail_url: str | None = None
|
||||||
|
preview_url: str | None = None # transcoded/mid-size — used for sending media
|
||||||
full_url: str | None = None
|
full_url: str | None = None
|
||||||
|
|
||||||
# Provider-specific extras (e.g., rating, location, people for Immich)
|
# Provider-specific extras (e.g., rating, location, people for Immich)
|
||||||
|
|||||||
@@ -156,7 +156,7 @@ class NotificationDispatcher:
|
|||||||
provider_urls = [u for u in (internal_url, external_url) if u]
|
provider_urls = [u for u in (internal_url, external_url) if u]
|
||||||
assets = []
|
assets = []
|
||||||
for asset in event.added_assets[:max_media]:
|
for asset in event.added_assets[:max_media]:
|
||||||
url = asset.full_url or asset.thumbnail_url
|
url = asset.preview_url or asset.thumbnail_url or asset.full_url
|
||||||
if url:
|
if url:
|
||||||
# Rewrite external URL to internal for faster LAN fetching
|
# Rewrite external URL to internal for faster LAN fetching
|
||||||
if internal_url and external_url and url.startswith(external_url):
|
if internal_url and external_url and url.startswith(external_url):
|
||||||
|
|||||||
@@ -430,15 +430,20 @@ class TelegramClient:
|
|||||||
|
|
||||||
media_json = []
|
media_json = []
|
||||||
upload_idx = 0
|
upload_idx = 0
|
||||||
|
# Track cache info per media_json entry (in order) so we can map
|
||||||
|
# Telegram response items back to cache keys for newly uploaded items.
|
||||||
|
# None = already cached (no need to store), tuple = needs caching.
|
||||||
|
media_cache_info: list[tuple[str, str, str | None] | None] = []
|
||||||
|
|
||||||
for i, item in enumerate(chunk):
|
# Resolve cache hits and collect download tasks in parallel
|
||||||
|
async def _fetch_asset(idx: int, item: dict) -> tuple[int, dict | None, bytes | None]:
|
||||||
|
"""Return (index, cache_entry_or_None, downloaded_bytes_or_None)."""
|
||||||
url = item.get("url")
|
url = item.get("url")
|
||||||
if not url:
|
if not url:
|
||||||
continue
|
return idx, None, None
|
||||||
media_type = item.get("type", "photo")
|
media_type = item.get("type", "photo")
|
||||||
custom_cache_key = item.get("cache_key")
|
custom_cache_key = item.get("cache_key")
|
||||||
|
|
||||||
# Check cache
|
|
||||||
ck = custom_cache_key or extract_asset_id_from_url(url) or url
|
ck = custom_cache_key or extract_asset_id_from_url(url) or url
|
||||||
ck_is_asset = is_asset_cache_key(ck)
|
ck_is_asset = is_asset_cache_key(ck)
|
||||||
item_cache = self._get_cache_for_key(ck, ck_is_asset)
|
item_cache = self._get_cache_for_key(ck, ck_is_asset)
|
||||||
@@ -447,34 +452,59 @@ class TelegramClient:
|
|||||||
cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None
|
cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None
|
||||||
|
|
||||||
if cached and cached.get("file_id"):
|
if cached and cached.get("file_id"):
|
||||||
mij: dict[str, Any] = {"type": media_type, "media": cached["file_id"]}
|
return idx, cached, None
|
||||||
|
|
||||||
|
try:
|
||||||
|
download_url = self._resolve_url(url)
|
||||||
|
dl_headers = item.get("headers") or {}
|
||||||
|
async with self._session.get(download_url, headers=dl_headers) as resp:
|
||||||
|
if resp.status != 200:
|
||||||
|
return idx, None, None
|
||||||
|
data = await resp.read()
|
||||||
|
if max_asset_data_size and len(data) > max_asset_data_size:
|
||||||
|
return idx, None, None
|
||||||
|
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
|
||||||
|
return idx, None, None
|
||||||
|
if media_type == "photo":
|
||||||
|
exceeds, _, _, _ = check_photo_limits(data)
|
||||||
|
if exceeds:
|
||||||
|
return idx, None, None
|
||||||
|
return idx, None, data
|
||||||
|
except aiohttp.ClientError:
|
||||||
|
return idx, None, None
|
||||||
|
|
||||||
|
results = await asyncio.gather(
|
||||||
|
*(_fetch_asset(i, item) for i, item in enumerate(chunk))
|
||||||
|
)
|
||||||
|
|
||||||
|
for idx, cached_entry, data in results:
|
||||||
|
item = chunk[idx]
|
||||||
|
url = item.get("url")
|
||||||
|
if not url:
|
||||||
|
continue
|
||||||
|
media_type = item.get("type", "photo")
|
||||||
|
custom_cache_key = item.get("cache_key")
|
||||||
|
|
||||||
|
if cached_entry and cached_entry.get("file_id"):
|
||||||
|
mij: dict[str, Any] = {"type": media_type, "media": cached_entry["file_id"]}
|
||||||
|
media_cache_info.append(None) # already cached
|
||||||
|
elif data is not None:
|
||||||
|
attach_name = f"file{upload_idx}"
|
||||||
|
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
|
||||||
|
ext = "jpg" if media_type == "photo" else "mp4"
|
||||||
|
form.add_field(attach_name, data, filename=f"media_{idx}.{ext}", content_type=ct)
|
||||||
|
mij = {"type": media_type, "media": f"attach://{attach_name}"}
|
||||||
|
upload_idx += 1
|
||||||
|
# Record cache key so we can store file_id from response
|
||||||
|
ck = custom_cache_key or extract_asset_id_from_url(url) or url
|
||||||
|
ck_is_asset = is_asset_cache_key(ck)
|
||||||
|
bare_ck = asset_id_from_cache_key(ck) if ck_is_asset else ck
|
||||||
|
th = self._thumbhash_resolver(bare_ck) if ck_is_asset and self._thumbhash_resolver else None
|
||||||
|
media_cache_info.append((ck, media_type, th))
|
||||||
else:
|
else:
|
||||||
try:
|
continue
|
||||||
download_url = self._resolve_url(url)
|
|
||||||
dl_headers = item.get("headers") or {}
|
|
||||||
async with self._session.get(download_url, headers=dl_headers) as resp:
|
|
||||||
if resp.status != 200:
|
|
||||||
continue
|
|
||||||
data = await resp.read()
|
|
||||||
if max_asset_data_size and len(data) > max_asset_data_size:
|
|
||||||
continue
|
|
||||||
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
|
|
||||||
continue
|
|
||||||
if media_type == "photo":
|
|
||||||
exceeds, _, _, _ = check_photo_limits(data)
|
|
||||||
if exceeds:
|
|
||||||
continue
|
|
||||||
|
|
||||||
attach_name = f"file{upload_idx}"
|
if idx == 0 and chunk_idx == 0 and caption:
|
||||||
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
|
|
||||||
ext = "jpg" if media_type == "photo" else "mp4"
|
|
||||||
form.add_field(attach_name, data, filename=f"media_{i}.{ext}", content_type=ct)
|
|
||||||
mij = {"type": media_type, "media": f"attach://{attach_name}"}
|
|
||||||
upload_idx += 1
|
|
||||||
except aiohttp.ClientError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if i == 0 and chunk_idx == 0 and caption:
|
|
||||||
mij["caption"] = caption
|
mij["caption"] = caption
|
||||||
mij["parse_mode"] = parse_mode
|
mij["parse_mode"] = parse_mode
|
||||||
media_json.append(mij)
|
media_json.append(mij)
|
||||||
@@ -489,7 +519,32 @@ class TelegramClient:
|
|||||||
async with self._session.post(telegram_url, data=form) as response:
|
async with self._session.post(telegram_url, data=form) as response:
|
||||||
result = await response.json()
|
result = await response.json()
|
||||||
if response.status == 200 and result.get("ok"):
|
if response.status == 200 and result.get("ok"):
|
||||||
all_message_ids.extend(msg.get("message_id") for msg in result.get("result", []))
|
result_msgs = result.get("result", [])
|
||||||
|
all_message_ids.extend(msg.get("message_id") for msg in result_msgs)
|
||||||
|
|
||||||
|
# Cache file_ids from response — map by position
|
||||||
|
cache_entries: list[tuple[str, str, str, str | None]] = []
|
||||||
|
for i, msg in enumerate(result_msgs):
|
||||||
|
if i >= len(media_cache_info):
|
||||||
|
break
|
||||||
|
info = media_cache_info[i]
|
||||||
|
if info is None:
|
||||||
|
continue # was a cache hit, skip
|
||||||
|
ck, mt, th = info
|
||||||
|
file_id = None
|
||||||
|
if msg.get("photo"):
|
||||||
|
file_id = msg["photo"][-1].get("file_id")
|
||||||
|
elif msg.get("video"):
|
||||||
|
file_id = msg["video"].get("file_id")
|
||||||
|
elif msg.get("document"):
|
||||||
|
file_id = msg["document"].get("file_id")
|
||||||
|
if file_id:
|
||||||
|
cache_entries.append((ck, file_id, mt, th))
|
||||||
|
if cache_entries:
|
||||||
|
# All entries in a chunk share the same cache backend
|
||||||
|
eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0]))
|
||||||
|
if eff_cache:
|
||||||
|
await eff_cache.async_set_many(cache_entries)
|
||||||
else:
|
else:
|
||||||
return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1}
|
return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1}
|
||||||
except aiohttp.ClientError as err:
|
except aiohttp.ClientError as err:
|
||||||
|
|||||||
@@ -0,0 +1,184 @@
|
|||||||
|
"""Google Photos album change detection — produces generic ServiceEvent objects."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
from notify_bridge_core.models.events import EventType, ServiceEvent
|
||||||
|
from notify_bridge_core.models.media import MediaAsset, MediaType
|
||||||
|
from notify_bridge_core.providers.base import ServiceProviderType
|
||||||
|
|
||||||
|
from .constants import CACHE_HOST, PHOTO_DOWNLOAD_SUFFIX, PHOTO_THUMBNAIL_SUFFIX, VIDEO_DOWNLOAD_SUFFIX
|
||||||
|
from .models import GooglePhotosAlbumData, GooglePhotosMediaItem
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _asset_to_media(item: GooglePhotosMediaItem) -> MediaAsset:
|
||||||
|
"""Convert GooglePhotosMediaItem to generic MediaAsset."""
|
||||||
|
media_type = MediaType.VIDEO if item.is_video else MediaType.IMAGE
|
||||||
|
|
||||||
|
try:
|
||||||
|
created_at = datetime.fromisoformat(item.created_at.replace("Z", "+00:00"))
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
created_at = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
if item.is_video:
|
||||||
|
full_url = f"{item.base_url}{VIDEO_DOWNLOAD_SUFFIX}" if item.base_url else ""
|
||||||
|
thumbnail_url = f"{item.base_url}{PHOTO_THUMBNAIL_SUFFIX}" if item.base_url else ""
|
||||||
|
else:
|
||||||
|
full_url = f"{item.base_url}{PHOTO_DOWNLOAD_SUFFIX}" if item.base_url else ""
|
||||||
|
thumbnail_url = f"{item.base_url}{PHOTO_THUMBNAIL_SUFFIX}" if item.base_url else ""
|
||||||
|
|
||||||
|
return MediaAsset(
|
||||||
|
id=item.id,
|
||||||
|
type=media_type,
|
||||||
|
filename=item.filename,
|
||||||
|
created_at=created_at,
|
||||||
|
description=item.description or None,
|
||||||
|
thumbnail_url=thumbnail_url,
|
||||||
|
preview_url=full_url, # Google Photos: transcoded via size suffix, same as full
|
||||||
|
full_url=full_url,
|
||||||
|
extra={
|
||||||
|
"google_photos_media_id": item.id,
|
||||||
|
"product_url": item.product_url,
|
||||||
|
"mime_type": item.mime_type,
|
||||||
|
"width": item.width,
|
||||||
|
"height": item.height,
|
||||||
|
"cache_key": f"{CACHE_HOST}:{item.id}",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_base_extra(album: GooglePhotosAlbumData) -> dict:
|
||||||
|
"""Build common extra dict for album events."""
|
||||||
|
return {
|
||||||
|
"album_url": album.product_url,
|
||||||
|
"shared": album.is_shared,
|
||||||
|
"photo_count": album.photo_count,
|
||||||
|
"video_count": album.video_count,
|
||||||
|
"asset_count": album.asset_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def detect_album_changes(
|
||||||
|
old_album: GooglePhotosAlbumData,
|
||||||
|
new_album: GooglePhotosAlbumData,
|
||||||
|
pending_asset_ids: set[str],
|
||||||
|
provider_name: str,
|
||||||
|
) -> tuple[list[ServiceEvent], set[str]]:
|
||||||
|
"""Detect changes between two album states, producing generic ServiceEvents.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
old_album: Previous album data.
|
||||||
|
new_album: Current album data.
|
||||||
|
pending_asset_ids: Assets detected but not yet fully processed.
|
||||||
|
provider_name: Name of the provider instance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (list of ServiceEvents, updated pending_asset_ids).
|
||||||
|
"""
|
||||||
|
added_ids = new_album.asset_ids - old_album.asset_ids
|
||||||
|
removed_ids = old_album.asset_ids - new_album.asset_ids
|
||||||
|
|
||||||
|
pending = set(pending_asset_ids)
|
||||||
|
|
||||||
|
# Collect newly added processed assets
|
||||||
|
added_assets: list[GooglePhotosMediaItem] = []
|
||||||
|
for aid in added_ids:
|
||||||
|
if aid not in new_album.assets:
|
||||||
|
continue
|
||||||
|
asset = new_album.assets[aid]
|
||||||
|
if asset.is_processed:
|
||||||
|
added_assets.append(asset)
|
||||||
|
else:
|
||||||
|
pending.add(aid)
|
||||||
|
|
||||||
|
# Check if pending assets are now processed
|
||||||
|
for aid in list(pending):
|
||||||
|
if aid not in new_album.assets:
|
||||||
|
pending.discard(aid)
|
||||||
|
continue
|
||||||
|
asset = new_album.assets[aid]
|
||||||
|
if asset.is_processed:
|
||||||
|
added_assets.append(asset)
|
||||||
|
pending.discard(aid)
|
||||||
|
|
||||||
|
# Detect metadata changes
|
||||||
|
name_changed = old_album.name != new_album.name
|
||||||
|
sharing_changed = old_album.is_shared != new_album.is_shared
|
||||||
|
|
||||||
|
if not added_assets and not removed_ids and not name_changed and not sharing_changed:
|
||||||
|
return [], pending
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
extra = _make_base_extra(new_album)
|
||||||
|
events: list[ServiceEvent] = []
|
||||||
|
|
||||||
|
if added_assets:
|
||||||
|
media_assets = [_asset_to_media(a) for a in added_assets]
|
||||||
|
events.append(ServiceEvent(
|
||||||
|
event_type=EventType.ASSETS_ADDED,
|
||||||
|
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
|
||||||
|
provider_name=provider_name,
|
||||||
|
collection_id=new_album.id,
|
||||||
|
collection_name=new_album.name,
|
||||||
|
timestamp=now,
|
||||||
|
added_assets=media_assets,
|
||||||
|
removed_asset_ids=[],
|
||||||
|
added_count=len(added_assets),
|
||||||
|
removed_count=0,
|
||||||
|
extra=dict(extra),
|
||||||
|
))
|
||||||
|
|
||||||
|
if removed_ids:
|
||||||
|
events.append(ServiceEvent(
|
||||||
|
event_type=EventType.ASSETS_REMOVED,
|
||||||
|
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
|
||||||
|
provider_name=provider_name,
|
||||||
|
collection_id=new_album.id,
|
||||||
|
collection_name=new_album.name,
|
||||||
|
timestamp=now,
|
||||||
|
added_assets=[],
|
||||||
|
removed_asset_ids=list(removed_ids),
|
||||||
|
added_count=0,
|
||||||
|
removed_count=len(removed_ids),
|
||||||
|
extra=dict(extra),
|
||||||
|
))
|
||||||
|
|
||||||
|
if name_changed:
|
||||||
|
events.append(ServiceEvent(
|
||||||
|
event_type=EventType.COLLECTION_RENAMED,
|
||||||
|
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
|
||||||
|
provider_name=provider_name,
|
||||||
|
collection_id=new_album.id,
|
||||||
|
collection_name=new_album.name,
|
||||||
|
timestamp=now,
|
||||||
|
added_assets=[],
|
||||||
|
removed_asset_ids=[],
|
||||||
|
added_count=0,
|
||||||
|
removed_count=0,
|
||||||
|
old_name=old_album.name,
|
||||||
|
new_name=new_album.name,
|
||||||
|
extra=dict(extra),
|
||||||
|
))
|
||||||
|
|
||||||
|
if sharing_changed:
|
||||||
|
events.append(ServiceEvent(
|
||||||
|
event_type=EventType.SHARING_CHANGED,
|
||||||
|
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
|
||||||
|
provider_name=provider_name,
|
||||||
|
collection_id=new_album.id,
|
||||||
|
collection_name=new_album.name,
|
||||||
|
timestamp=now,
|
||||||
|
added_assets=[],
|
||||||
|
removed_asset_ids=[],
|
||||||
|
added_count=0,
|
||||||
|
removed_count=0,
|
||||||
|
old_shared=old_album.is_shared,
|
||||||
|
new_shared=new_album.is_shared,
|
||||||
|
extra=dict(extra),
|
||||||
|
))
|
||||||
|
|
||||||
|
return events, pending
|
||||||
@@ -4,11 +4,13 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
from datetime import datetime
|
from datetime import datetime, timezone
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from notify_bridge_core.models.media import MediaAsset, MediaType
|
||||||
|
|
||||||
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
|
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
|
||||||
from .models import ImmichAssetInfo, SharedLinkInfo
|
from .models import ImmichAlbumData, ImmichAssetInfo, SharedLinkInfo
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -231,3 +233,123 @@ def build_asset_detail(
|
|||||||
)
|
)
|
||||||
|
|
||||||
return detail
|
return detail
|
||||||
|
|
||||||
|
|
||||||
|
def asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
|
||||||
|
"""Convert ImmichAssetInfo to generic MediaAsset."""
|
||||||
|
media_type = MediaType.IMAGE if asset.type == ASSET_TYPE_IMAGE else MediaType.VIDEO
|
||||||
|
try:
|
||||||
|
created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
created_at = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
return MediaAsset(
|
||||||
|
id=asset.id,
|
||||||
|
type=media_type,
|
||||||
|
filename=asset.filename,
|
||||||
|
created_at=created_at,
|
||||||
|
owner_name=asset.owner_name or None,
|
||||||
|
description=asset.description or None,
|
||||||
|
tags=list(asset.people),
|
||||||
|
thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail",
|
||||||
|
preview_url=f"{external_url}/api/assets/{asset.id}/thumbnail?size=preview",
|
||||||
|
full_url=f"{external_url}/api/assets/{asset.id}/original",
|
||||||
|
extra={
|
||||||
|
"owner_id": asset.owner_id,
|
||||||
|
"is_favorite": asset.is_favorite,
|
||||||
|
"rating": asset.rating,
|
||||||
|
"latitude": asset.latitude,
|
||||||
|
"longitude": asset.longitude,
|
||||||
|
"city": asset.city,
|
||||||
|
"state": asset.state,
|
||||||
|
"country": asset.country,
|
||||||
|
"thumbhash": asset.thumbhash,
|
||||||
|
"file_size": asset.file_size,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def collect_scheduled_assets(
|
||||||
|
albums: dict[str, "ImmichAlbumData"],
|
||||||
|
shared_links: dict[str, list[SharedLinkInfo]],
|
||||||
|
external_url: str,
|
||||||
|
*,
|
||||||
|
limit: int = 10,
|
||||||
|
asset_type: str = "all",
|
||||||
|
favorite_only: bool = False,
|
||||||
|
min_rating: int = 0,
|
||||||
|
is_memory: bool = False,
|
||||||
|
) -> tuple[list[MediaAsset], list[dict[str, Any]]]:
|
||||||
|
"""Collect and filter assets for scheduled/memory sends.
|
||||||
|
|
||||||
|
This is the SINGLE function used by both test dispatch and the real
|
||||||
|
scheduled runner. Assets are filtered, randomly sampled, and converted
|
||||||
|
to MediaAsset objects.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
albums: Album ID → ImmichAlbumData mapping.
|
||||||
|
shared_links: Album ID → shared links mapping.
|
||||||
|
external_url: External domain for URL construction.
|
||||||
|
limit: Max number of assets to return.
|
||||||
|
asset_type: "all", "photo", or "video".
|
||||||
|
favorite_only: Only include favorites.
|
||||||
|
min_rating: Minimum rating filter.
|
||||||
|
is_memory: If True, filter to "On This Day" (same month+day, previous year).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list of MediaAsset, list of collection info dicts)
|
||||||
|
"""
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
memory_date = now.isoformat() if is_memory else None
|
||||||
|
|
||||||
|
all_eligible: list[ImmichAssetInfo] = []
|
||||||
|
# Track which album each asset belongs to for public URL construction
|
||||||
|
asset_album_map: dict[str, tuple[str, str]] = {} # asset_id → (album_id, public_url)
|
||||||
|
collections_extra: list[dict[str, Any]] = []
|
||||||
|
|
||||||
|
for album_id, album in albums.items():
|
||||||
|
links = shared_links.get(album_id, [])
|
||||||
|
album_public_url = get_public_url(external_url, links) or ""
|
||||||
|
|
||||||
|
collections_extra.append({
|
||||||
|
"name": album.name,
|
||||||
|
"url": album_public_url or f"{external_url}/albums/{album_id}",
|
||||||
|
"public_url": album_public_url,
|
||||||
|
"asset_count": album.asset_count,
|
||||||
|
"shared": album.shared,
|
||||||
|
"photo_count": album.photo_count,
|
||||||
|
"video_count": album.video_count,
|
||||||
|
"owner": album.owner,
|
||||||
|
})
|
||||||
|
|
||||||
|
filtered = filter_assets(
|
||||||
|
list(album.assets.values()),
|
||||||
|
favorite_only=favorite_only,
|
||||||
|
min_rating=min_rating,
|
||||||
|
asset_type=asset_type,
|
||||||
|
memory_date=memory_date,
|
||||||
|
)
|
||||||
|
for asset in filtered:
|
||||||
|
if asset.id not in asset_album_map:
|
||||||
|
asset_album_map[asset.id] = (album_id, album_public_url)
|
||||||
|
all_eligible.append(asset)
|
||||||
|
|
||||||
|
# Random sample
|
||||||
|
if len(all_eligible) > limit:
|
||||||
|
selected = random.sample(all_eligible, limit)
|
||||||
|
else:
|
||||||
|
random.shuffle(all_eligible)
|
||||||
|
selected = all_eligible
|
||||||
|
|
||||||
|
# Convert to MediaAsset with public URLs
|
||||||
|
result: list[MediaAsset] = []
|
||||||
|
for asset in selected:
|
||||||
|
media = asset_to_media(asset, external_url)
|
||||||
|
_, album_pub_url = asset_album_map.get(asset.id, ("", ""))
|
||||||
|
if album_pub_url:
|
||||||
|
media.extra["public_url"] = f"{album_pub_url}/photos/{asset.id}"
|
||||||
|
else:
|
||||||
|
media.extra.setdefault("public_url", "")
|
||||||
|
result.append(media)
|
||||||
|
|
||||||
|
return result, collections_extra
|
||||||
|
|||||||
@@ -6,48 +6,14 @@ import logging
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from notify_bridge_core.models.events import EventType, ServiceEvent
|
from notify_bridge_core.models.events import EventType, ServiceEvent
|
||||||
from notify_bridge_core.models.media import MediaAsset, MediaType
|
|
||||||
from notify_bridge_core.providers.base import ServiceProviderType
|
from notify_bridge_core.providers.base import ServiceProviderType
|
||||||
|
|
||||||
|
from .asset_utils import asset_to_media
|
||||||
from .models import ImmichAlbumData, ImmichAssetInfo
|
from .models import ImmichAlbumData, ImmichAssetInfo
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
|
|
||||||
"""Convert ImmichAssetInfo to generic MediaAsset."""
|
|
||||||
media_type = MediaType.IMAGE if asset.type == "IMAGE" else MediaType.VIDEO
|
|
||||||
|
|
||||||
try:
|
|
||||||
created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
|
|
||||||
except (ValueError, AttributeError):
|
|
||||||
created_at = datetime.now(timezone.utc)
|
|
||||||
|
|
||||||
return MediaAsset(
|
|
||||||
id=asset.id,
|
|
||||||
type=media_type,
|
|
||||||
filename=asset.filename,
|
|
||||||
created_at=created_at,
|
|
||||||
owner_name=asset.owner_name or None,
|
|
||||||
description=asset.description or None,
|
|
||||||
tags=list(asset.people),
|
|
||||||
thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail",
|
|
||||||
full_url=f"{external_url}/api/assets/{asset.id}/original",
|
|
||||||
extra={
|
|
||||||
"owner_id": asset.owner_id,
|
|
||||||
"is_favorite": asset.is_favorite,
|
|
||||||
"rating": asset.rating,
|
|
||||||
"latitude": asset.latitude,
|
|
||||||
"longitude": asset.longitude,
|
|
||||||
"city": asset.city,
|
|
||||||
"state": asset.state,
|
|
||||||
"country": asset.country,
|
|
||||||
"thumbhash": asset.thumbhash,
|
|
||||||
"file_size": asset.file_size,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
|
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
|
||||||
"""Build the common extra dict for album events."""
|
"""Build the common extra dict for album events."""
|
||||||
return {
|
return {
|
||||||
@@ -119,7 +85,7 @@ def detect_album_changes(
|
|||||||
|
|
||||||
# Emit one event per change type detected
|
# Emit one event per change type detected
|
||||||
if added_assets:
|
if added_assets:
|
||||||
media_assets = [_asset_to_media(a, external_url) for a in added_assets]
|
media_assets = [asset_to_media(a, external_url) for a in added_assets]
|
||||||
events.append(ServiceEvent(
|
events.append(ServiceEvent(
|
||||||
event_type=EventType.ASSETS_ADDED,
|
event_type=EventType.ASSETS_ADDED,
|
||||||
provider_type=ServiceProviderType.IMMICH,
|
provider_type=ServiceProviderType.IMMICH,
|
||||||
|
|||||||
@@ -177,6 +177,8 @@ class ImmichAlbumData:
|
|||||||
|
|
||||||
for asset_data in assets_data:
|
for asset_data in assets_data:
|
||||||
asset = ImmichAssetInfo.from_api_response(asset_data, users_cache)
|
asset = ImmichAssetInfo.from_api_response(asset_data, users_cache)
|
||||||
|
if not asset.is_processed:
|
||||||
|
continue
|
||||||
asset_ids.add(asset.id)
|
asset_ids.add(asset.id)
|
||||||
assets[asset.id] = asset
|
assets[asset.id] = asset
|
||||||
people.update(asset.people)
|
people.update(asset.people)
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ def build_template_context(
|
|||||||
"description": asset.description or "",
|
"description": asset.description or "",
|
||||||
"tags": asset.tags,
|
"tags": asset.tags,
|
||||||
"thumbnail_url": asset.thumbnail_url or "",
|
"thumbnail_url": asset.thumbnail_url or "",
|
||||||
|
"preview_url": asset.preview_url or "",
|
||||||
"full_url": asset.full_url or "",
|
"full_url": asset.full_url or "",
|
||||||
}
|
}
|
||||||
# Flatten extras into asset dict for template access
|
# Flatten extras into asset dict for template access
|
||||||
|
|||||||
@@ -21,7 +21,8 @@ from ..database.models import (
|
|||||||
TrackingConfig,
|
TrackingConfig,
|
||||||
User,
|
User,
|
||||||
)
|
)
|
||||||
from ..services.notifier import send_real_data_notification, send_test_notification
|
from ..services.notifier import send_test_notification
|
||||||
|
from ..services.test_dispatch import dispatch_test_notification
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -242,62 +243,14 @@ async def test_notification_tracker_target(
|
|||||||
r = await send_test_notification(target, locale=effective_locale)
|
r = await send_test_notification(target, locale=effective_locale)
|
||||||
return {"target": target.name, **r}
|
return {"target": target.name, **r}
|
||||||
|
|
||||||
# For periodic/scheduled/memory — fetch real data from provider
|
# For periodic/scheduled/memory — dispatch through the real NotificationDispatcher
|
||||||
template_config = None
|
r = await dispatch_test_notification(
|
||||||
template_str = ""
|
session=session,
|
||||||
if tt.template_config_id:
|
tracker=tracker,
|
||||||
template_config = await session.get(TemplateConfig, tt.template_config_id)
|
tt=tt,
|
||||||
if template_config:
|
|
||||||
slot_map = {
|
|
||||||
"periodic": "periodic_summary_message",
|
|
||||||
"scheduled": "scheduled_assets_message",
|
|
||||||
"memory": "memory_mode_message",
|
|
||||||
}
|
|
||||||
slot_name = slot_map[test_type]
|
|
||||||
slot_result = await session.exec(
|
|
||||||
select(TemplateSlot).where(
|
|
||||||
TemplateSlot.config_id == template_config.id,
|
|
||||||
TemplateSlot.slot_name == slot_name,
|
|
||||||
TemplateSlot.locale == effective_locale,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
slot = slot_result.first()
|
|
||||||
if not slot:
|
|
||||||
# Fallback: any locale
|
|
||||||
slot_result2 = await session.exec(
|
|
||||||
select(TemplateSlot).where(
|
|
||||||
TemplateSlot.config_id == template_config.id,
|
|
||||||
TemplateSlot.slot_name == slot_name,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
slot = slot_result2.first()
|
|
||||||
template_str = slot.template if slot else ""
|
|
||||||
|
|
||||||
# Load provider and tracker data eagerly before aiohttp context
|
|
||||||
provider = await session.get(ServiceProvider, tracker.provider_id)
|
|
||||||
if not provider:
|
|
||||||
raise HTTPException(status_code=404, detail="Provider not found")
|
|
||||||
provider_config = dict(provider.config)
|
|
||||||
collection_ids = list(tracker.collection_ids or [])
|
|
||||||
|
|
||||||
# Load tracking config to get memory_source
|
|
||||||
memory_source = "albums"
|
|
||||||
if tt.tracking_config_id:
|
|
||||||
tracking_config = await session.get(TrackingConfig, tt.tracking_config_id)
|
|
||||||
if tracking_config:
|
|
||||||
memory_source = tracking_config.memory_source or "albums"
|
|
||||||
|
|
||||||
# Fetch real data from provider
|
|
||||||
r = await send_real_data_notification(
|
|
||||||
target=target,
|
target=target,
|
||||||
template_str=template_str,
|
|
||||||
test_type=test_type,
|
test_type=test_type,
|
||||||
provider_type=provider.type,
|
locale=effective_locale,
|
||||||
provider_config=provider_config,
|
|
||||||
collection_ids=collection_ids,
|
|
||||||
date_format=template_config.date_format if template_config else "%d.%m.%Y, %H:%M UTC",
|
|
||||||
date_only_format=template_config.date_only_format if template_config and template_config.date_only_format else "%d.%m.%Y",
|
|
||||||
memory_source=memory_source,
|
|
||||||
)
|
)
|
||||||
return {"target": target.name, **r}
|
return {"target": target.name, **r}
|
||||||
|
|
||||||
|
|||||||
@@ -54,9 +54,10 @@ async def _load_receivers(target_id: int) -> list[dict]:
|
|||||||
|
|
||||||
|
|
||||||
async def send_to_target(target: NotificationTarget, message: str) -> dict:
|
async def send_to_target(target: NotificationTarget, message: str) -> dict:
|
||||||
"""Send a message to a target, broadcasting to all receivers.
|
"""Send a text message to a target, broadcasting to all receivers.
|
||||||
|
|
||||||
This is the SINGLE send path used by dispatch, test, and real-data notifications.
|
Used for basic test and template preview sends (text only, no media).
|
||||||
|
Real notifications with media go through NotificationDispatcher.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
receivers = await _load_receivers(target.id)
|
receivers = await _load_receivers(target.id)
|
||||||
@@ -356,241 +357,3 @@ async def send_test_template_notification(
|
|||||||
return {"success": False, "error": f"Template render error: {e}"}
|
return {"success": False, "error": f"Template render error: {e}"}
|
||||||
|
|
||||||
return await send_to_target(target, message)
|
return await send_to_target(target, message)
|
||||||
|
|
||||||
|
|
||||||
async def send_real_data_notification(
|
|
||||||
target: NotificationTarget,
|
|
||||||
template_str: str,
|
|
||||||
test_type: str,
|
|
||||||
provider_type: str,
|
|
||||||
provider_config: dict,
|
|
||||||
collection_ids: list[str],
|
|
||||||
date_format: str = "%d.%m.%Y, %H:%M UTC",
|
|
||||||
date_only_format: str = "%d.%m.%Y",
|
|
||||||
memory_source: str = "albums",
|
|
||||||
) -> dict:
|
|
||||||
"""Fetch real data from provider, render template, and send."""
|
|
||||||
from jinja2.sandbox import SandboxedEnvironment
|
|
||||||
|
|
||||||
if not template_str:
|
|
||||||
return {"success": False, "error": f"No template configured for {test_type}"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
ctx = await _build_real_context(
|
|
||||||
provider_type, provider_config, collection_ids,
|
|
||||||
test_type, date_format, date_only_format,
|
|
||||||
memory_source=memory_source,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
_LOGGER.error("Failed to fetch real data for test: %s", e)
|
|
||||||
return {"success": False, "error": f"Failed to fetch provider data: {e}"}
|
|
||||||
|
|
||||||
ctx["target_type"] = target.type
|
|
||||||
ctx["date_format"] = date_format
|
|
||||||
ctx["date_only_format"] = date_only_format
|
|
||||||
|
|
||||||
try:
|
|
||||||
env = SandboxedEnvironment(autoescape=False)
|
|
||||||
tmpl = env.from_string(template_str)
|
|
||||||
message = tmpl.render(**ctx)
|
|
||||||
except Exception as e:
|
|
||||||
return {"success": False, "error": f"Template render error: {e}"}
|
|
||||||
|
|
||||||
return await send_to_target(target, message)
|
|
||||||
|
|
||||||
|
|
||||||
async def _build_real_context(
|
|
||||||
provider_type: str,
|
|
||||||
provider_config: dict,
|
|
||||||
collection_ids: list[str],
|
|
||||||
test_type: str,
|
|
||||||
date_format: str,
|
|
||||||
date_only_format: str,
|
|
||||||
memory_source: str = "albums",
|
|
||||||
) -> dict:
|
|
||||||
"""Build template context from real provider data."""
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
|
|
||||||
if provider_type != "immich":
|
|
||||||
return {"date": datetime.now(timezone.utc).strftime(date_only_format)}
|
|
||||||
|
|
||||||
from notify_bridge_core.providers.immich import ImmichServiceProvider
|
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as http_session:
|
|
||||||
immich = ImmichServiceProvider(
|
|
||||||
http_session,
|
|
||||||
provider_config.get("url", ""),
|
|
||||||
provider_config.get("api_key", ""),
|
|
||||||
provider_config.get("external_domain"),
|
|
||||||
"Immich",
|
|
||||||
)
|
|
||||||
connected = await immich.connect()
|
|
||||||
if not connected:
|
|
||||||
raise RuntimeError("Failed to connect to Immich")
|
|
||||||
|
|
||||||
ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
|
|
||||||
|
|
||||||
# --- Native Immich memories ---
|
|
||||||
if test_type == "memory" and memory_source == "native":
|
|
||||||
memories = await immich.client.get_memories()
|
|
||||||
all_assets: list[dict[str, Any]] = []
|
|
||||||
tracked_ids = set(collection_ids) if collection_ids else None
|
|
||||||
for mem in memories:
|
|
||||||
for raw_asset in mem.get("assets", []):
|
|
||||||
asset_id = raw_asset.get("id", "")
|
|
||||||
# Optional album filtering: keep only assets in tracked albums
|
|
||||||
if tracked_ids:
|
|
||||||
asset_albums = raw_asset.get("albums", [])
|
|
||||||
if not any(a.get("id") in tracked_ids for a in asset_albums):
|
|
||||||
continue
|
|
||||||
exif = raw_asset.get("exifInfo") or {}
|
|
||||||
people_raw = raw_asset.get("people", [])
|
|
||||||
all_assets.append({
|
|
||||||
"id": asset_id,
|
|
||||||
"filename": raw_asset.get("originalFileName", ""),
|
|
||||||
"type": (raw_asset.get("type") or "IMAGE").upper(),
|
|
||||||
"created_at": raw_asset.get("fileCreatedAt", raw_asset.get("createdAt", "")),
|
|
||||||
"owner": "",
|
|
||||||
"description": exif.get("description", "") or raw_asset.get("description", "") or "",
|
|
||||||
"people": [p.get("name", "") for p in people_raw if p.get("name")],
|
|
||||||
"is_favorite": raw_asset.get("isFavorite", False),
|
|
||||||
"rating": exif.get("rating"),
|
|
||||||
"city": exif.get("city", "") or "",
|
|
||||||
"state": exif.get("state", "") or "",
|
|
||||||
"country": exif.get("country", "") or "",
|
|
||||||
"public_url": "",
|
|
||||||
"url": f"{ext_domain.rstrip('/')}/api/assets/{asset_id}/original",
|
|
||||||
"photo_url": f"{ext_domain.rstrip('/')}/api/assets/{asset_id}/thumbnail",
|
|
||||||
"year": mem.get("data", {}).get("year"),
|
|
||||||
})
|
|
||||||
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
ctx: dict[str, Any] = {
|
|
||||||
"date": now.strftime(date_only_format),
|
|
||||||
"timestamp": now.isoformat(),
|
|
||||||
"service_name": "Immich",
|
|
||||||
"service_type": "immich",
|
|
||||||
"collections": [],
|
|
||||||
"albums": [],
|
|
||||||
"assets": all_assets,
|
|
||||||
"common_date": "",
|
|
||||||
"common_location": "",
|
|
||||||
"collection_name": "", "album_name": "",
|
|
||||||
"public_url": "", "album_url": "",
|
|
||||||
"shared": False, "photo_count": 0, "video_count": 0, "owner": "",
|
|
||||||
}
|
|
||||||
people: set[str] = set()
|
|
||||||
for a in all_assets:
|
|
||||||
people.update(a.get("people", []))
|
|
||||||
ctx["people"] = list(people)
|
|
||||||
ctx["has_videos"] = any(a.get("type") == "VIDEO" for a in all_assets)
|
|
||||||
ctx["has_photos"] = any(a.get("type") == "IMAGE" for a in all_assets)
|
|
||||||
ctx["added_count"] = len(all_assets)
|
|
||||||
ctx["added_assets"] = all_assets
|
|
||||||
ctx["protected_url"] = ""
|
|
||||||
return ctx
|
|
||||||
|
|
||||||
# --- Album-based asset collection (default path) ---
|
|
||||||
collections: list[dict[str, Any]] = []
|
|
||||||
all_assets: list[dict[str, Any]] = []
|
|
||||||
|
|
||||||
for album_id in collection_ids:
|
|
||||||
album = await immich.client.get_album(album_id)
|
|
||||||
if not album:
|
|
||||||
continue
|
|
||||||
|
|
||||||
shared_links = await immich.client.get_shared_links(album_id)
|
|
||||||
ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
|
|
||||||
album_public_url = ""
|
|
||||||
for link in shared_links:
|
|
||||||
if link.is_accessible and not link.is_expired and not link.has_password:
|
|
||||||
album_public_url = f"{ext_domain.rstrip('/')}/share/{link.key}"
|
|
||||||
break
|
|
||||||
|
|
||||||
collections.append({
|
|
||||||
"name": album.name,
|
|
||||||
"url": album_public_url or f"{ext_domain.rstrip('/')}/albums/{album_id}",
|
|
||||||
"public_url": album_public_url,
|
|
||||||
"asset_count": album.asset_count,
|
|
||||||
"shared": album.shared,
|
|
||||||
"photo_count": album.photo_count,
|
|
||||||
"video_count": album.video_count,
|
|
||||||
"owner": album.owner,
|
|
||||||
})
|
|
||||||
|
|
||||||
for asset_id, asset in list(album.assets.items())[:10]:
|
|
||||||
asset_public_url = f"{album_public_url}/photos/{asset_id}" if album_public_url else ""
|
|
||||||
all_assets.append({
|
|
||||||
"id": asset.id,
|
|
||||||
"filename": asset.filename,
|
|
||||||
"type": asset.type.upper(),
|
|
||||||
"created_at": asset.created_at,
|
|
||||||
"owner": asset.owner_name or "",
|
|
||||||
"description": asset.description or "",
|
|
||||||
"people": list(asset.people),
|
|
||||||
"is_favorite": asset.is_favorite,
|
|
||||||
"rating": asset.rating,
|
|
||||||
"city": asset.city or "",
|
|
||||||
"state": asset.state or "",
|
|
||||||
"country": asset.country or "",
|
|
||||||
"public_url": asset_public_url,
|
|
||||||
"url": f"{ext_domain.rstrip('/')}/api/assets/{asset.id}/original",
|
|
||||||
"photo_url": f"{ext_domain.rstrip('/')}/api/assets/{asset.id}/thumbnail",
|
|
||||||
})
|
|
||||||
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
ctx: dict[str, Any] = {
|
|
||||||
"date": now.strftime(date_only_format),
|
|
||||||
"timestamp": now.isoformat(),
|
|
||||||
"service_name": "Immich",
|
|
||||||
"service_type": "immich",
|
|
||||||
"collections": collections,
|
|
||||||
"albums": collections,
|
|
||||||
"assets": all_assets,
|
|
||||||
"common_date": "",
|
|
||||||
"common_location": "",
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(all_assets) > 1:
|
|
||||||
dates = {a.get("created_at", "")[:10] for a in all_assets if a.get("created_at")}
|
|
||||||
if len(dates) == 1:
|
|
||||||
try:
|
|
||||||
ctx["common_date"] = datetime.fromisoformat(dates.pop()).strftime(date_only_format)
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
locations = set()
|
|
||||||
for a in all_assets:
|
|
||||||
city = a.get("city", "")
|
|
||||||
country = a.get("country", "")
|
|
||||||
locations.add(f"{city}, {country}" if city and country else city or "")
|
|
||||||
if len(locations) == 1 and "" not in locations:
|
|
||||||
ctx["common_location"] = locations.pop()
|
|
||||||
|
|
||||||
if collections:
|
|
||||||
first = collections[0]
|
|
||||||
ctx.update({
|
|
||||||
"collection_name": first["name"], "album_name": first["name"],
|
|
||||||
"public_url": first.get("public_url", ""), "album_url": first.get("url", ""),
|
|
||||||
"shared": first.get("shared", False),
|
|
||||||
"photo_count": first.get("photo_count", 0), "video_count": first.get("video_count", 0),
|
|
||||||
"owner": first.get("owner", ""),
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
ctx.update({
|
|
||||||
"collection_name": "", "album_name": "",
|
|
||||||
"public_url": "", "album_url": "",
|
|
||||||
"shared": False, "photo_count": 0, "video_count": 0, "owner": "",
|
|
||||||
})
|
|
||||||
|
|
||||||
people: set[str] = set()
|
|
||||||
for a in all_assets:
|
|
||||||
people.update(a.get("people", []))
|
|
||||||
ctx["people"] = list(people)
|
|
||||||
ctx["has_videos"] = any(a.get("type") == "VIDEO" for a in all_assets)
|
|
||||||
ctx["has_photos"] = any(a.get("type") == "IMAGE" for a in all_assets)
|
|
||||||
ctx["added_count"] = len(all_assets)
|
|
||||||
ctx["added_assets"] = all_assets
|
|
||||||
ctx["protected_url"] = ""
|
|
||||||
|
|
||||||
return ctx
|
|
||||||
|
|||||||
@@ -0,0 +1,314 @@
|
|||||||
|
"""Test dispatch — manual trigger through the real NotificationDispatcher.
|
||||||
|
|
||||||
|
No separate logic — just builds a ServiceEvent + TargetConfig from DB
|
||||||
|
objects and dispatches through the same path the watcher uses.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
from sqlmodel import select
|
||||||
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
|
|
||||||
|
from notify_bridge_core.models.events import EventType, ServiceEvent
|
||||||
|
from notify_bridge_core.models.media import MediaAsset
|
||||||
|
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
|
||||||
|
from notify_bridge_core.providers.base import ServiceProviderType
|
||||||
|
|
||||||
|
from ..database.models import (
|
||||||
|
NotificationTarget,
|
||||||
|
NotificationTracker,
|
||||||
|
NotificationTrackerTarget,
|
||||||
|
ServiceProvider,
|
||||||
|
TemplateConfig,
|
||||||
|
TemplateSlot,
|
||||||
|
TrackingConfig,
|
||||||
|
)
|
||||||
|
from .dispatch_helpers import _resolve_target
|
||||||
|
from .watcher import _get_telegram_caches
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Maps test_type → DB template slot name
|
||||||
|
_TEST_TYPE_SLOT_MAP = {
|
||||||
|
"periodic": "periodic_summary_message",
|
||||||
|
"scheduled": "scheduled_assets_message",
|
||||||
|
"memory": "memory_mode_message",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def dispatch_test_notification(
|
||||||
|
*,
|
||||||
|
session: AsyncSession,
|
||||||
|
tracker: NotificationTracker,
|
||||||
|
tt: NotificationTrackerTarget,
|
||||||
|
target: NotificationTarget,
|
||||||
|
test_type: str,
|
||||||
|
locale: str = "en",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Dispatch a test notification through the real NotificationDispatcher."""
|
||||||
|
|
||||||
|
# Load provider
|
||||||
|
provider = await session.get(ServiceProvider, tracker.provider_id)
|
||||||
|
if not provider:
|
||||||
|
return {"success": False, "error": "Provider not found"}
|
||||||
|
provider_config = dict(provider.config)
|
||||||
|
collection_ids = list(tracker.collection_ids or [])
|
||||||
|
|
||||||
|
# Load tracking config
|
||||||
|
tracking_config = None
|
||||||
|
if tt.tracking_config_id:
|
||||||
|
tracking_config = await session.get(TrackingConfig, tt.tracking_config_id)
|
||||||
|
|
||||||
|
# Load template slots keyed by EventType.SCHEDULED_MESSAGE.value
|
||||||
|
template_config = None
|
||||||
|
template_slots: dict[str, dict[str, str]] | None = None
|
||||||
|
slot_name = _TEST_TYPE_SLOT_MAP.get(test_type, test_type)
|
||||||
|
if tt.template_config_id:
|
||||||
|
template_config = await session.get(TemplateConfig, tt.template_config_id)
|
||||||
|
if template_config:
|
||||||
|
slot_result = await session.exec(
|
||||||
|
select(TemplateSlot).where(
|
||||||
|
TemplateSlot.config_id == template_config.id,
|
||||||
|
TemplateSlot.slot_name == slot_name,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
locale_map: dict[str, str] = {}
|
||||||
|
for s in slot_result.all():
|
||||||
|
locale_map[s.locale] = s.template
|
||||||
|
if locale_map:
|
||||||
|
template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map}
|
||||||
|
|
||||||
|
# Resolve target config + receivers (same as watcher)
|
||||||
|
resolved = await _resolve_target(session, target)
|
||||||
|
|
||||||
|
target_cfg = TargetConfig(
|
||||||
|
type=resolved["target_type"],
|
||||||
|
config=resolved["target_config"],
|
||||||
|
template_slots=template_slots,
|
||||||
|
locale=locale,
|
||||||
|
date_format=template_config.date_format if template_config else "%d.%m.%Y, %H:%M UTC",
|
||||||
|
date_only_format=template_config.date_only_format if template_config and template_config.date_only_format else "%d.%m.%Y",
|
||||||
|
provider_api_key=provider_config.get("api_key"),
|
||||||
|
provider_internal_url=provider_config.get("url", ""),
|
||||||
|
provider_external_url=provider_config.get("external_domain", ""),
|
||||||
|
receivers=resolved["receivers"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fetch assets and build event
|
||||||
|
event = await _build_event(
|
||||||
|
provider_type=provider.type,
|
||||||
|
provider_config=provider_config,
|
||||||
|
provider_name=provider.name or provider.type,
|
||||||
|
tracker_name=tracker.name or "",
|
||||||
|
tracker_filters=dict(tracker.filters) if tracker.filters else {},
|
||||||
|
collection_ids=collection_ids,
|
||||||
|
test_type=test_type,
|
||||||
|
tracking_config=tracking_config,
|
||||||
|
)
|
||||||
|
if event is None:
|
||||||
|
return {"success": False, "error": "No data returned from provider"}
|
||||||
|
if not event.added_assets and test_type in ("scheduled", "memory"):
|
||||||
|
return {"success": False, "error": "No matching assets found" + (" for today" if test_type == "memory" else "")}
|
||||||
|
|
||||||
|
# Dispatch through the real NotificationDispatcher
|
||||||
|
url_cache, asset_cache = await _get_telegram_caches()
|
||||||
|
dispatcher = NotificationDispatcher(url_cache=url_cache, asset_cache=asset_cache)
|
||||||
|
results = await dispatcher.dispatch(event, [target_cfg])
|
||||||
|
|
||||||
|
if not results:
|
||||||
|
return {"success": False, "error": "No dispatch results"}
|
||||||
|
return results[0]
|
||||||
|
|
||||||
|
|
||||||
|
async def _build_event(
|
||||||
|
*,
|
||||||
|
provider_type: str,
|
||||||
|
provider_config: dict,
|
||||||
|
provider_name: str,
|
||||||
|
tracker_name: str,
|
||||||
|
tracker_filters: dict,
|
||||||
|
collection_ids: list[str],
|
||||||
|
test_type: str,
|
||||||
|
tracking_config: TrackingConfig | None = None,
|
||||||
|
) -> ServiceEvent | None:
|
||||||
|
"""Build a ServiceEvent with real provider data."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
if provider_type == "immich":
|
||||||
|
return await _build_immich_event(
|
||||||
|
provider_config=provider_config,
|
||||||
|
provider_name=provider_name,
|
||||||
|
tracker_name=tracker_name,
|
||||||
|
collection_ids=collection_ids,
|
||||||
|
test_type=test_type,
|
||||||
|
tracking_config=tracking_config,
|
||||||
|
)
|
||||||
|
elif provider_type == "scheduler":
|
||||||
|
from notify_bridge_core.providers.scheduler import SchedulerServiceProvider
|
||||||
|
custom_vars = tracker_filters.get("custom_variables", {})
|
||||||
|
sched = SchedulerServiceProvider(
|
||||||
|
name=provider_name,
|
||||||
|
tracker_name=tracker_name,
|
||||||
|
custom_variables=custom_vars,
|
||||||
|
)
|
||||||
|
events, _ = await sched.poll(collection_ids, {})
|
||||||
|
return events[0] if events else None
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def _build_immich_event(
|
||||||
|
*,
|
||||||
|
provider_config: dict,
|
||||||
|
provider_name: str,
|
||||||
|
tracker_name: str,
|
||||||
|
collection_ids: list[str],
|
||||||
|
test_type: str,
|
||||||
|
tracking_config: TrackingConfig | None = None,
|
||||||
|
) -> ServiceEvent | None:
|
||||||
|
"""Build an Immich scheduled/memory event using shared core utilities."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from notify_bridge_core.providers.immich import ImmichServiceProvider
|
||||||
|
from notify_bridge_core.providers.immich.asset_utils import collect_scheduled_assets
|
||||||
|
from notify_bridge_core.providers.immich.models import ImmichAlbumData, SharedLinkInfo
|
||||||
|
|
||||||
|
ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
|
||||||
|
prefix = "memory" if test_type == "memory" else "scheduled"
|
||||||
|
limit = getattr(tracking_config, f"{prefix}_limit", 10) if tracking_config else 10
|
||||||
|
asset_type = getattr(tracking_config, f"{prefix}_asset_type", "all") if tracking_config else "all"
|
||||||
|
favorite_only = getattr(tracking_config, f"{prefix}_favorite_only", False) if tracking_config else False
|
||||||
|
min_rating = getattr(tracking_config, f"{prefix}_min_rating", 0) if tracking_config else 0
|
||||||
|
memory_source = getattr(tracking_config, "memory_source", "albums") if tracking_config else "albums"
|
||||||
|
is_memory = test_type == "memory"
|
||||||
|
|
||||||
|
async with aiohttp.ClientSession() as http_session:
|
||||||
|
immich = ImmichServiceProvider(
|
||||||
|
http_session,
|
||||||
|
provider_config.get("url", ""),
|
||||||
|
provider_config.get("api_key", ""),
|
||||||
|
provider_config.get("external_domain"),
|
||||||
|
provider_name,
|
||||||
|
)
|
||||||
|
if not await immich.connect():
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Native Immich memories API path
|
||||||
|
if is_memory and memory_source == "native":
|
||||||
|
return await _build_native_memory_event(
|
||||||
|
immich, ext_domain, provider_name, tracker_name,
|
||||||
|
collection_ids, limit, asset_type, favorite_only, min_rating,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Album-based path: use shared collect_scheduled_assets
|
||||||
|
albums: dict[str, ImmichAlbumData] = {}
|
||||||
|
shared_links: dict[str, list[SharedLinkInfo]] = {}
|
||||||
|
for album_id in collection_ids:
|
||||||
|
album = await immich.client.get_album(album_id)
|
||||||
|
if album:
|
||||||
|
albums[album_id] = album
|
||||||
|
shared_links[album_id] = await immich.client.get_shared_links(album_id)
|
||||||
|
|
||||||
|
assets, collections_extra = collect_scheduled_assets(
|
||||||
|
albums, shared_links, ext_domain,
|
||||||
|
limit=limit,
|
||||||
|
asset_type=asset_type,
|
||||||
|
favorite_only=favorite_only,
|
||||||
|
min_rating=min_rating,
|
||||||
|
is_memory=is_memory,
|
||||||
|
)
|
||||||
|
|
||||||
|
first_col = collections_extra[0] if collections_extra else {}
|
||||||
|
return ServiceEvent(
|
||||||
|
event_type=EventType.SCHEDULED_MESSAGE,
|
||||||
|
provider_type=ServiceProviderType.IMMICH,
|
||||||
|
provider_name=provider_name,
|
||||||
|
collection_id=collection_ids[0] if collection_ids else "",
|
||||||
|
collection_name=first_col.get("name", tracker_name),
|
||||||
|
timestamp=datetime.now(timezone.utc),
|
||||||
|
added_assets=assets,
|
||||||
|
added_count=len(assets),
|
||||||
|
extra={
|
||||||
|
"collections": collections_extra,
|
||||||
|
"albums": collections_extra,
|
||||||
|
**(first_col if first_col else {}),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _build_native_memory_event(
|
||||||
|
immich,
|
||||||
|
ext_domain: str,
|
||||||
|
provider_name: str,
|
||||||
|
tracker_name: str,
|
||||||
|
collection_ids: list[str],
|
||||||
|
limit: int,
|
||||||
|
asset_type: str,
|
||||||
|
favorite_only: bool,
|
||||||
|
min_rating: int,
|
||||||
|
) -> ServiceEvent | None:
|
||||||
|
"""Build event from Immich native memories API."""
|
||||||
|
import random
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from notify_bridge_core.models.media import MediaAsset, MediaType
|
||||||
|
from notify_bridge_core.providers.immich.asset_utils import filter_assets
|
||||||
|
from notify_bridge_core.providers.immich.models import ImmichAssetInfo
|
||||||
|
|
||||||
|
memories = await immich.client.get_memories()
|
||||||
|
tracked_ids = set(collection_ids) if collection_ids else None
|
||||||
|
|
||||||
|
# Collect raw assets, convert to ImmichAssetInfo for unified filtering
|
||||||
|
raw_assets: list[ImmichAssetInfo] = []
|
||||||
|
year_map: dict[str, int | None] = {} # asset_id → memory year
|
||||||
|
for mem in memories:
|
||||||
|
mem_year = mem.get("data", {}).get("year")
|
||||||
|
for raw in mem.get("assets", []):
|
||||||
|
asset_id = raw.get("id", "")
|
||||||
|
if tracked_ids:
|
||||||
|
asset_albums = raw.get("albums", [])
|
||||||
|
if not any(a.get("id") in tracked_ids for a in asset_albums):
|
||||||
|
continue
|
||||||
|
asset = ImmichAssetInfo.from_api_response(raw)
|
||||||
|
if not asset.is_processed:
|
||||||
|
continue
|
||||||
|
raw_assets.append(asset)
|
||||||
|
year_map[asset_id] = mem_year
|
||||||
|
|
||||||
|
# Apply standard filters (no memory_date — native API already filters by date)
|
||||||
|
filtered = filter_assets(
|
||||||
|
raw_assets,
|
||||||
|
favorite_only=favorite_only,
|
||||||
|
min_rating=min_rating,
|
||||||
|
asset_type=asset_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Random sample
|
||||||
|
if len(filtered) > limit:
|
||||||
|
selected = random.sample(filtered, limit)
|
||||||
|
else:
|
||||||
|
random.shuffle(filtered)
|
||||||
|
selected = filtered
|
||||||
|
|
||||||
|
from notify_bridge_core.providers.immich.asset_utils import asset_to_media
|
||||||
|
|
||||||
|
all_assets = []
|
||||||
|
for asset in selected:
|
||||||
|
media = asset_to_media(asset, ext_domain)
|
||||||
|
media.extra["year"] = year_map.get(asset.id)
|
||||||
|
all_assets.append(media)
|
||||||
|
|
||||||
|
return ServiceEvent(
|
||||||
|
event_type=EventType.SCHEDULED_MESSAGE,
|
||||||
|
provider_type=ServiceProviderType.IMMICH,
|
||||||
|
provider_name=provider_name,
|
||||||
|
collection_id=collection_ids[0] if collection_ids else "",
|
||||||
|
collection_name=tracker_name,
|
||||||
|
timestamp=datetime.now(timezone.utc),
|
||||||
|
added_assets=all_assets,
|
||||||
|
added_count=len(all_assets),
|
||||||
|
extra={
|
||||||
|
"collections": [],
|
||||||
|
"albums": [],
|
||||||
|
},
|
||||||
|
)
|
||||||
@@ -2,7 +2,6 @@
|
|||||||
# Restart the backend dev server.
|
# Restart the backend dev server.
|
||||||
# Usage: bash scripts/restart-backend.sh
|
# Usage: bash scripts/restart-backend.sh
|
||||||
|
|
||||||
set -e
|
|
||||||
cd "$(dirname "$0")/.."
|
cd "$(dirname "$0")/.."
|
||||||
|
|
||||||
# Kill existing backend
|
# Kill existing backend
|
||||||
@@ -12,12 +11,28 @@ if [ -n "$PID" ]; then
|
|||||||
sleep 1
|
sleep 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# Resolve python — py launcher needs absolute path for nohup on Windows
|
||||||
|
if command -v python3 &>/dev/null; then
|
||||||
|
PYTHON=python3
|
||||||
|
elif command -v python &>/dev/null; then
|
||||||
|
PYTHON=python
|
||||||
|
else
|
||||||
|
PYTHON=$(py -3.13 -c "import sys; print(sys.executable)" 2>/dev/null \
|
||||||
|
|| py -3 -c "import sys; print(sys.executable)")
|
||||||
|
fi
|
||||||
|
|
||||||
# Start backend
|
# Start backend
|
||||||
NOTIFY_BRIDGE_DATA_DIR=./test-data \
|
export NOTIFY_BRIDGE_DATA_DIR=./test-data
|
||||||
NOTIFY_BRIDGE_SECRET_KEY=test-secret-key-minimum-32chars \
|
export NOTIFY_BRIDGE_SECRET_KEY=test-secret-key-minimum-32chars
|
||||||
PYTHON=$(command -v python3 2>/dev/null || command -v python 2>/dev/null || echo "py -3.13")
|
|
||||||
nohup "$PYTHON" -m uvicorn notify_bridge_server.main:app \
|
nohup "$PYTHON" -m uvicorn notify_bridge_server.main:app \
|
||||||
--host 0.0.0.0 --port 8420 > /dev/null 2>&1 &
|
--host 0.0.0.0 --port 8420 > .backend.log 2>&1 &
|
||||||
|
|
||||||
sleep 3
|
sleep 3
|
||||||
curl -s http://localhost:8420/api/health
|
if curl -sf http://localhost:8420/api/health; then
|
||||||
|
echo ""
|
||||||
|
echo "Backend started (PID $!)"
|
||||||
|
else
|
||||||
|
echo "Backend failed to start. Log:"
|
||||||
|
tail -20 .backend.log
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|||||||
Reference in New Issue
Block a user