refactor: unify test dispatch with real NotificationDispatcher

- Route scheduled/memory test sends through the same NotificationDispatcher
  the watcher uses — identical template rendering, media handling, caching
- Add preview_url field to MediaAsset (transcoded mid-size), separate from
  thumbnail_url (small) and full_url (original). Dispatcher prefers preview_url
- Fix sendMediaGroup cache: extract file_ids from Telegram response and store
  via async_set_many so repeat sends use cached file_ids
- Parallelize asset downloads in _send_media_group with asyncio.gather
- Filter unprocessed assets (archived/trashed/offline/no-thumbhash) at album
  parse time in ImmichAlbumData.from_api_response
- Extract shared asset_to_media + collect_scheduled_assets into asset_utils.py
  (single source for test dispatch and future real scheduler)
- Respect tracking config filters: limit, asset_type, favorite_only, min_rating
- Random asset sampling for scheduled sends
- Memory mode: "On This Day" date filter (same month+day, previous year)
- Skip dispatch when no matching assets found
- Remove ~250 lines of duplicated send logic from notifier.py
- Fix restart-backend.sh: proper env var export, Python path resolution, error log
This commit is contained in:
2026-03-24 19:32:40 +03:00
parent 1a8c95e942
commit d4cb388c74
12 changed files with 746 additions and 370 deletions
@@ -33,6 +33,7 @@ class MediaAsset:
description: str | None = None
tags: list[str] = field(default_factory=list)
thumbnail_url: str | None = None
preview_url: str | None = None # transcoded/mid-size — used for sending media
full_url: str | None = None
# Provider-specific extras (e.g., rating, location, people for Immich)
@@ -156,7 +156,7 @@ class NotificationDispatcher:
provider_urls = [u for u in (internal_url, external_url) if u]
assets = []
for asset in event.added_assets[:max_media]:
url = asset.full_url or asset.thumbnail_url
url = asset.preview_url or asset.thumbnail_url or asset.full_url
if url:
# Rewrite external URL to internal for faster LAN fetching
if internal_url and external_url and url.startswith(external_url):
@@ -430,15 +430,20 @@ class TelegramClient:
media_json = []
upload_idx = 0
# Track cache info per media_json entry (in order) so we can map
# Telegram response items back to cache keys for newly uploaded items.
# None = already cached (no need to store), tuple = needs caching.
media_cache_info: list[tuple[str, str, str | None] | None] = []
for i, item in enumerate(chunk):
# Resolve cache hits and collect download tasks in parallel
async def _fetch_asset(idx: int, item: dict) -> tuple[int, dict | None, bytes | None]:
"""Return (index, cache_entry_or_None, downloaded_bytes_or_None)."""
url = item.get("url")
if not url:
continue
return idx, None, None
media_type = item.get("type", "photo")
custom_cache_key = item.get("cache_key")
# Check cache
ck = custom_cache_key or extract_asset_id_from_url(url) or url
ck_is_asset = is_asset_cache_key(ck)
item_cache = self._get_cache_for_key(ck, ck_is_asset)
@@ -447,34 +452,59 @@ class TelegramClient:
cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None
if cached and cached.get("file_id"):
mij: dict[str, Any] = {"type": media_type, "media": cached["file_id"]}
return idx, cached, None
try:
download_url = self._resolve_url(url)
dl_headers = item.get("headers") or {}
async with self._session.get(download_url, headers=dl_headers) as resp:
if resp.status != 200:
return idx, None, None
data = await resp.read()
if max_asset_data_size and len(data) > max_asset_data_size:
return idx, None, None
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
return idx, None, None
if media_type == "photo":
exceeds, _, _, _ = check_photo_limits(data)
if exceeds:
return idx, None, None
return idx, None, data
except aiohttp.ClientError:
return idx, None, None
results = await asyncio.gather(
*(_fetch_asset(i, item) for i, item in enumerate(chunk))
)
for idx, cached_entry, data in results:
item = chunk[idx]
url = item.get("url")
if not url:
continue
media_type = item.get("type", "photo")
custom_cache_key = item.get("cache_key")
if cached_entry and cached_entry.get("file_id"):
mij: dict[str, Any] = {"type": media_type, "media": cached_entry["file_id"]}
media_cache_info.append(None) # already cached
elif data is not None:
attach_name = f"file{upload_idx}"
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
ext = "jpg" if media_type == "photo" else "mp4"
form.add_field(attach_name, data, filename=f"media_{idx}.{ext}", content_type=ct)
mij = {"type": media_type, "media": f"attach://{attach_name}"}
upload_idx += 1
# Record cache key so we can store file_id from response
ck = custom_cache_key or extract_asset_id_from_url(url) or url
ck_is_asset = is_asset_cache_key(ck)
bare_ck = asset_id_from_cache_key(ck) if ck_is_asset else ck
th = self._thumbhash_resolver(bare_ck) if ck_is_asset and self._thumbhash_resolver else None
media_cache_info.append((ck, media_type, th))
else:
try:
download_url = self._resolve_url(url)
dl_headers = item.get("headers") or {}
async with self._session.get(download_url, headers=dl_headers) as resp:
if resp.status != 200:
continue
data = await resp.read()
if max_asset_data_size and len(data) > max_asset_data_size:
continue
if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE:
continue
if media_type == "photo":
exceeds, _, _, _ = check_photo_limits(data)
if exceeds:
continue
continue
attach_name = f"file{upload_idx}"
ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4")
ext = "jpg" if media_type == "photo" else "mp4"
form.add_field(attach_name, data, filename=f"media_{i}.{ext}", content_type=ct)
mij = {"type": media_type, "media": f"attach://{attach_name}"}
upload_idx += 1
except aiohttp.ClientError:
continue
if i == 0 and chunk_idx == 0 and caption:
if idx == 0 and chunk_idx == 0 and caption:
mij["caption"] = caption
mij["parse_mode"] = parse_mode
media_json.append(mij)
@@ -489,7 +519,32 @@ class TelegramClient:
async with self._session.post(telegram_url, data=form) as response:
result = await response.json()
if response.status == 200 and result.get("ok"):
all_message_ids.extend(msg.get("message_id") for msg in result.get("result", []))
result_msgs = result.get("result", [])
all_message_ids.extend(msg.get("message_id") for msg in result_msgs)
# Cache file_ids from response — map by position
cache_entries: list[tuple[str, str, str, str | None]] = []
for i, msg in enumerate(result_msgs):
if i >= len(media_cache_info):
break
info = media_cache_info[i]
if info is None:
continue # was a cache hit, skip
ck, mt, th = info
file_id = None
if msg.get("photo"):
file_id = msg["photo"][-1].get("file_id")
elif msg.get("video"):
file_id = msg["video"].get("file_id")
elif msg.get("document"):
file_id = msg["document"].get("file_id")
if file_id:
cache_entries.append((ck, file_id, mt, th))
if cache_entries:
# All entries in a chunk share the same cache backend
eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0]))
if eff_cache:
await eff_cache.async_set_many(cache_entries)
else:
return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1}
except aiohttp.ClientError as err:
@@ -0,0 +1,184 @@
"""Google Photos album change detection — produces generic ServiceEvent objects."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from notify_bridge_core.models.events import EventType, ServiceEvent
from notify_bridge_core.models.media import MediaAsset, MediaType
from notify_bridge_core.providers.base import ServiceProviderType
from .constants import CACHE_HOST, PHOTO_DOWNLOAD_SUFFIX, PHOTO_THUMBNAIL_SUFFIX, VIDEO_DOWNLOAD_SUFFIX
from .models import GooglePhotosAlbumData, GooglePhotosMediaItem
_LOGGER = logging.getLogger(__name__)
def _asset_to_media(item: GooglePhotosMediaItem) -> MediaAsset:
"""Convert GooglePhotosMediaItem to generic MediaAsset."""
media_type = MediaType.VIDEO if item.is_video else MediaType.IMAGE
try:
created_at = datetime.fromisoformat(item.created_at.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(timezone.utc)
if item.is_video:
full_url = f"{item.base_url}{VIDEO_DOWNLOAD_SUFFIX}" if item.base_url else ""
thumbnail_url = f"{item.base_url}{PHOTO_THUMBNAIL_SUFFIX}" if item.base_url else ""
else:
full_url = f"{item.base_url}{PHOTO_DOWNLOAD_SUFFIX}" if item.base_url else ""
thumbnail_url = f"{item.base_url}{PHOTO_THUMBNAIL_SUFFIX}" if item.base_url else ""
return MediaAsset(
id=item.id,
type=media_type,
filename=item.filename,
created_at=created_at,
description=item.description or None,
thumbnail_url=thumbnail_url,
preview_url=full_url, # Google Photos: transcoded via size suffix, same as full
full_url=full_url,
extra={
"google_photos_media_id": item.id,
"product_url": item.product_url,
"mime_type": item.mime_type,
"width": item.width,
"height": item.height,
"cache_key": f"{CACHE_HOST}:{item.id}",
},
)
def _make_base_extra(album: GooglePhotosAlbumData) -> dict:
"""Build common extra dict for album events."""
return {
"album_url": album.product_url,
"shared": album.is_shared,
"photo_count": album.photo_count,
"video_count": album.video_count,
"asset_count": album.asset_count,
}
def detect_album_changes(
old_album: GooglePhotosAlbumData,
new_album: GooglePhotosAlbumData,
pending_asset_ids: set[str],
provider_name: str,
) -> tuple[list[ServiceEvent], set[str]]:
"""Detect changes between two album states, producing generic ServiceEvents.
Args:
old_album: Previous album data.
new_album: Current album data.
pending_asset_ids: Assets detected but not yet fully processed.
provider_name: Name of the provider instance.
Returns:
Tuple of (list of ServiceEvents, updated pending_asset_ids).
"""
added_ids = new_album.asset_ids - old_album.asset_ids
removed_ids = old_album.asset_ids - new_album.asset_ids
pending = set(pending_asset_ids)
# Collect newly added processed assets
added_assets: list[GooglePhotosMediaItem] = []
for aid in added_ids:
if aid not in new_album.assets:
continue
asset = new_album.assets[aid]
if asset.is_processed:
added_assets.append(asset)
else:
pending.add(aid)
# Check if pending assets are now processed
for aid in list(pending):
if aid not in new_album.assets:
pending.discard(aid)
continue
asset = new_album.assets[aid]
if asset.is_processed:
added_assets.append(asset)
pending.discard(aid)
# Detect metadata changes
name_changed = old_album.name != new_album.name
sharing_changed = old_album.is_shared != new_album.is_shared
if not added_assets and not removed_ids and not name_changed and not sharing_changed:
return [], pending
now = datetime.now(timezone.utc)
extra = _make_base_extra(new_album)
events: list[ServiceEvent] = []
if added_assets:
media_assets = [_asset_to_media(a) for a in added_assets]
events.append(ServiceEvent(
event_type=EventType.ASSETS_ADDED,
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
provider_name=provider_name,
collection_id=new_album.id,
collection_name=new_album.name,
timestamp=now,
added_assets=media_assets,
removed_asset_ids=[],
added_count=len(added_assets),
removed_count=0,
extra=dict(extra),
))
if removed_ids:
events.append(ServiceEvent(
event_type=EventType.ASSETS_REMOVED,
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
provider_name=provider_name,
collection_id=new_album.id,
collection_name=new_album.name,
timestamp=now,
added_assets=[],
removed_asset_ids=list(removed_ids),
added_count=0,
removed_count=len(removed_ids),
extra=dict(extra),
))
if name_changed:
events.append(ServiceEvent(
event_type=EventType.COLLECTION_RENAMED,
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
provider_name=provider_name,
collection_id=new_album.id,
collection_name=new_album.name,
timestamp=now,
added_assets=[],
removed_asset_ids=[],
added_count=0,
removed_count=0,
old_name=old_album.name,
new_name=new_album.name,
extra=dict(extra),
))
if sharing_changed:
events.append(ServiceEvent(
event_type=EventType.SHARING_CHANGED,
provider_type=ServiceProviderType.GOOGLE_PHOTOS,
provider_name=provider_name,
collection_id=new_album.id,
collection_name=new_album.name,
timestamp=now,
added_assets=[],
removed_asset_ids=[],
added_count=0,
removed_count=0,
old_shared=old_album.is_shared,
new_shared=new_album.is_shared,
extra=dict(extra),
))
return events, pending
@@ -4,11 +4,13 @@ from __future__ import annotations
import logging
import random
from datetime import datetime
from datetime import datetime, timezone
from typing import Any
from notify_bridge_core.models.media import MediaAsset, MediaType
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
from .models import ImmichAssetInfo, SharedLinkInfo
from .models import ImmichAlbumData, ImmichAssetInfo, SharedLinkInfo
_LOGGER = logging.getLogger(__name__)
@@ -231,3 +233,123 @@ def build_asset_detail(
)
return detail
def asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
"""Convert ImmichAssetInfo to generic MediaAsset."""
media_type = MediaType.IMAGE if asset.type == ASSET_TYPE_IMAGE else MediaType.VIDEO
try:
created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(timezone.utc)
return MediaAsset(
id=asset.id,
type=media_type,
filename=asset.filename,
created_at=created_at,
owner_name=asset.owner_name or None,
description=asset.description or None,
tags=list(asset.people),
thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail",
preview_url=f"{external_url}/api/assets/{asset.id}/thumbnail?size=preview",
full_url=f"{external_url}/api/assets/{asset.id}/original",
extra={
"owner_id": asset.owner_id,
"is_favorite": asset.is_favorite,
"rating": asset.rating,
"latitude": asset.latitude,
"longitude": asset.longitude,
"city": asset.city,
"state": asset.state,
"country": asset.country,
"thumbhash": asset.thumbhash,
"file_size": asset.file_size,
},
)
def collect_scheduled_assets(
albums: dict[str, "ImmichAlbumData"],
shared_links: dict[str, list[SharedLinkInfo]],
external_url: str,
*,
limit: int = 10,
asset_type: str = "all",
favorite_only: bool = False,
min_rating: int = 0,
is_memory: bool = False,
) -> tuple[list[MediaAsset], list[dict[str, Any]]]:
"""Collect and filter assets for scheduled/memory sends.
This is the SINGLE function used by both test dispatch and the real
scheduled runner. Assets are filtered, randomly sampled, and converted
to MediaAsset objects.
Args:
albums: Album ID → ImmichAlbumData mapping.
shared_links: Album ID → shared links mapping.
external_url: External domain for URL construction.
limit: Max number of assets to return.
asset_type: "all", "photo", or "video".
favorite_only: Only include favorites.
min_rating: Minimum rating filter.
is_memory: If True, filter to "On This Day" (same month+day, previous year).
Returns:
(list of MediaAsset, list of collection info dicts)
"""
now = datetime.now(timezone.utc)
memory_date = now.isoformat() if is_memory else None
all_eligible: list[ImmichAssetInfo] = []
# Track which album each asset belongs to for public URL construction
asset_album_map: dict[str, tuple[str, str]] = {} # asset_id → (album_id, public_url)
collections_extra: list[dict[str, Any]] = []
for album_id, album in albums.items():
links = shared_links.get(album_id, [])
album_public_url = get_public_url(external_url, links) or ""
collections_extra.append({
"name": album.name,
"url": album_public_url or f"{external_url}/albums/{album_id}",
"public_url": album_public_url,
"asset_count": album.asset_count,
"shared": album.shared,
"photo_count": album.photo_count,
"video_count": album.video_count,
"owner": album.owner,
})
filtered = filter_assets(
list(album.assets.values()),
favorite_only=favorite_only,
min_rating=min_rating,
asset_type=asset_type,
memory_date=memory_date,
)
for asset in filtered:
if asset.id not in asset_album_map:
asset_album_map[asset.id] = (album_id, album_public_url)
all_eligible.append(asset)
# Random sample
if len(all_eligible) > limit:
selected = random.sample(all_eligible, limit)
else:
random.shuffle(all_eligible)
selected = all_eligible
# Convert to MediaAsset with public URLs
result: list[MediaAsset] = []
for asset in selected:
media = asset_to_media(asset, external_url)
_, album_pub_url = asset_album_map.get(asset.id, ("", ""))
if album_pub_url:
media.extra["public_url"] = f"{album_pub_url}/photos/{asset.id}"
else:
media.extra.setdefault("public_url", "")
result.append(media)
return result, collections_extra
@@ -6,48 +6,14 @@ import logging
from datetime import datetime, timezone
from notify_bridge_core.models.events import EventType, ServiceEvent
from notify_bridge_core.models.media import MediaAsset, MediaType
from notify_bridge_core.providers.base import ServiceProviderType
from .asset_utils import asset_to_media
from .models import ImmichAlbumData, ImmichAssetInfo
_LOGGER = logging.getLogger(__name__)
def _asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
"""Convert ImmichAssetInfo to generic MediaAsset."""
media_type = MediaType.IMAGE if asset.type == "IMAGE" else MediaType.VIDEO
try:
created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(timezone.utc)
return MediaAsset(
id=asset.id,
type=media_type,
filename=asset.filename,
created_at=created_at,
owner_name=asset.owner_name or None,
description=asset.description or None,
tags=list(asset.people),
thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail",
full_url=f"{external_url}/api/assets/{asset.id}/original",
extra={
"owner_id": asset.owner_id,
"is_favorite": asset.is_favorite,
"rating": asset.rating,
"latitude": asset.latitude,
"longitude": asset.longitude,
"city": asset.city,
"state": asset.state,
"country": asset.country,
"thumbhash": asset.thumbhash,
"file_size": asset.file_size,
},
)
def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
"""Build the common extra dict for album events."""
return {
@@ -119,7 +85,7 @@ def detect_album_changes(
# Emit one event per change type detected
if added_assets:
media_assets = [_asset_to_media(a, external_url) for a in added_assets]
media_assets = [asset_to_media(a, external_url) for a in added_assets]
events.append(ServiceEvent(
event_type=EventType.ASSETS_ADDED,
provider_type=ServiceProviderType.IMMICH,
@@ -177,6 +177,8 @@ class ImmichAlbumData:
for asset_data in assets_data:
asset = ImmichAssetInfo.from_api_response(asset_data, users_cache)
if not asset.is_processed:
continue
asset_ids.add(asset.id)
assets[asset.id] = asset
people.update(asset.people)
@@ -58,6 +58,7 @@ def build_template_context(
"description": asset.description or "",
"tags": asset.tags,
"thumbnail_url": asset.thumbnail_url or "",
"preview_url": asset.preview_url or "",
"full_url": asset.full_url or "",
}
# Flatten extras into asset dict for template access