From d4cb388c74d2e19913f292e31c5cf3831114cd91 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Tue, 24 Mar 2026 19:32:40 +0300 Subject: [PATCH] refactor: unify test dispatch with real NotificationDispatcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Route scheduled/memory test sends through the same NotificationDispatcher the watcher uses — identical template rendering, media handling, caching - Add preview_url field to MediaAsset (transcoded mid-size), separate from thumbnail_url (small) and full_url (original). Dispatcher prefers preview_url - Fix sendMediaGroup cache: extract file_ids from Telegram response and store via async_set_many so repeat sends use cached file_ids - Parallelize asset downloads in _send_media_group with asyncio.gather - Filter unprocessed assets (archived/trashed/offline/no-thumbhash) at album parse time in ImmichAlbumData.from_api_response - Extract shared asset_to_media + collect_scheduled_assets into asset_utils.py (single source for test dispatch and future real scheduler) - Respect tracking config filters: limit, asset_type, favorite_only, min_rating - Random asset sampling for scheduled sends - Memory mode: "On This Day" date filter (same month+day, previous year) - Skip dispatch when no matching assets found - Remove ~250 lines of duplicated send logic from notifier.py - Fix restart-backend.sh: proper env var export, Python path resolution, error log --- .../src/notify_bridge_core/models/media.py | 1 + .../notifications/dispatcher.py | 2 +- .../notifications/telegram/client.py | 115 +++++-- .../google_photos/change_detector.py | 184 ++++++++++ .../providers/immich/asset_utils.py | 126 ++++++- .../providers/immich/change_detector.py | 38 +-- .../providers/immich/models.py | 2 + .../notify_bridge_core/templates/context.py | 1 + .../api/notification_tracker_targets.py | 63 +--- .../notify_bridge_server/services/notifier.py | 243 +------------- .../services/test_dispatch.py | 314 ++++++++++++++++++ scripts/restart-backend.sh | 27 +- 12 files changed, 746 insertions(+), 370 deletions(-) create mode 100644 packages/core/src/notify_bridge_core/providers/google_photos/change_detector.py create mode 100644 packages/server/src/notify_bridge_server/services/test_dispatch.py diff --git a/packages/core/src/notify_bridge_core/models/media.py b/packages/core/src/notify_bridge_core/models/media.py index fdbedac..10d29ae 100644 --- a/packages/core/src/notify_bridge_core/models/media.py +++ b/packages/core/src/notify_bridge_core/models/media.py @@ -33,6 +33,7 @@ class MediaAsset: description: str | None = None tags: list[str] = field(default_factory=list) thumbnail_url: str | None = None + preview_url: str | None = None # transcoded/mid-size — used for sending media full_url: str | None = None # Provider-specific extras (e.g., rating, location, people for Immich) diff --git a/packages/core/src/notify_bridge_core/notifications/dispatcher.py b/packages/core/src/notify_bridge_core/notifications/dispatcher.py index 8b1ac71..aa1cf56 100644 --- a/packages/core/src/notify_bridge_core/notifications/dispatcher.py +++ b/packages/core/src/notify_bridge_core/notifications/dispatcher.py @@ -156,7 +156,7 @@ class NotificationDispatcher: provider_urls = [u for u in (internal_url, external_url) if u] assets = [] for asset in event.added_assets[:max_media]: - url = asset.full_url or asset.thumbnail_url + url = asset.preview_url or asset.thumbnail_url or asset.full_url if url: # Rewrite external URL to internal for faster LAN fetching if internal_url and external_url and url.startswith(external_url): diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/client.py b/packages/core/src/notify_bridge_core/notifications/telegram/client.py index 1ddb5a5..2a0f3dd 100644 --- a/packages/core/src/notify_bridge_core/notifications/telegram/client.py +++ b/packages/core/src/notify_bridge_core/notifications/telegram/client.py @@ -430,15 +430,20 @@ class TelegramClient: media_json = [] upload_idx = 0 + # Track cache info per media_json entry (in order) so we can map + # Telegram response items back to cache keys for newly uploaded items. + # None = already cached (no need to store), tuple = needs caching. + media_cache_info: list[tuple[str, str, str | None] | None] = [] - for i, item in enumerate(chunk): + # Resolve cache hits and collect download tasks in parallel + async def _fetch_asset(idx: int, item: dict) -> tuple[int, dict | None, bytes | None]: + """Return (index, cache_entry_or_None, downloaded_bytes_or_None).""" url = item.get("url") if not url: - continue + return idx, None, None media_type = item.get("type", "photo") custom_cache_key = item.get("cache_key") - # Check cache ck = custom_cache_key or extract_asset_id_from_url(url) or url ck_is_asset = is_asset_cache_key(ck) item_cache = self._get_cache_for_key(ck, ck_is_asset) @@ -447,34 +452,59 @@ class TelegramClient: cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None if cached and cached.get("file_id"): - mij: dict[str, Any] = {"type": media_type, "media": cached["file_id"]} + return idx, cached, None + + try: + download_url = self._resolve_url(url) + dl_headers = item.get("headers") or {} + async with self._session.get(download_url, headers=dl_headers) as resp: + if resp.status != 200: + return idx, None, None + data = await resp.read() + if max_asset_data_size and len(data) > max_asset_data_size: + return idx, None, None + if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: + return idx, None, None + if media_type == "photo": + exceeds, _, _, _ = check_photo_limits(data) + if exceeds: + return idx, None, None + return idx, None, data + except aiohttp.ClientError: + return idx, None, None + + results = await asyncio.gather( + *(_fetch_asset(i, item) for i, item in enumerate(chunk)) + ) + + for idx, cached_entry, data in results: + item = chunk[idx] + url = item.get("url") + if not url: + continue + media_type = item.get("type", "photo") + custom_cache_key = item.get("cache_key") + + if cached_entry and cached_entry.get("file_id"): + mij: dict[str, Any] = {"type": media_type, "media": cached_entry["file_id"]} + media_cache_info.append(None) # already cached + elif data is not None: + attach_name = f"file{upload_idx}" + ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4") + ext = "jpg" if media_type == "photo" else "mp4" + form.add_field(attach_name, data, filename=f"media_{idx}.{ext}", content_type=ct) + mij = {"type": media_type, "media": f"attach://{attach_name}"} + upload_idx += 1 + # Record cache key so we can store file_id from response + ck = custom_cache_key or extract_asset_id_from_url(url) or url + ck_is_asset = is_asset_cache_key(ck) + bare_ck = asset_id_from_cache_key(ck) if ck_is_asset else ck + th = self._thumbhash_resolver(bare_ck) if ck_is_asset and self._thumbhash_resolver else None + media_cache_info.append((ck, media_type, th)) else: - try: - download_url = self._resolve_url(url) - dl_headers = item.get("headers") or {} - async with self._session.get(download_url, headers=dl_headers) as resp: - if resp.status != 200: - continue - data = await resp.read() - if max_asset_data_size and len(data) > max_asset_data_size: - continue - if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: - continue - if media_type == "photo": - exceeds, _, _, _ = check_photo_limits(data) - if exceeds: - continue + continue - attach_name = f"file{upload_idx}" - ct = item.get("content_type") or ("image/jpeg" if media_type == "photo" else "video/mp4") - ext = "jpg" if media_type == "photo" else "mp4" - form.add_field(attach_name, data, filename=f"media_{i}.{ext}", content_type=ct) - mij = {"type": media_type, "media": f"attach://{attach_name}"} - upload_idx += 1 - except aiohttp.ClientError: - continue - - if i == 0 and chunk_idx == 0 and caption: + if idx == 0 and chunk_idx == 0 and caption: mij["caption"] = caption mij["parse_mode"] = parse_mode media_json.append(mij) @@ -489,7 +519,32 @@ class TelegramClient: async with self._session.post(telegram_url, data=form) as response: result = await response.json() if response.status == 200 and result.get("ok"): - all_message_ids.extend(msg.get("message_id") for msg in result.get("result", [])) + result_msgs = result.get("result", []) + all_message_ids.extend(msg.get("message_id") for msg in result_msgs) + + # Cache file_ids from response — map by position + cache_entries: list[tuple[str, str, str, str | None]] = [] + for i, msg in enumerate(result_msgs): + if i >= len(media_cache_info): + break + info = media_cache_info[i] + if info is None: + continue # was a cache hit, skip + ck, mt, th = info + file_id = None + if msg.get("photo"): + file_id = msg["photo"][-1].get("file_id") + elif msg.get("video"): + file_id = msg["video"].get("file_id") + elif msg.get("document"): + file_id = msg["document"].get("file_id") + if file_id: + cache_entries.append((ck, file_id, mt, th)) + if cache_entries: + # All entries in a chunk share the same cache backend + eff_cache = self._get_cache_for_key(cache_entries[0][0], is_asset_cache_key(cache_entries[0][0])) + if eff_cache: + await eff_cache.async_set_many(cache_entries) else: return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1} except aiohttp.ClientError as err: diff --git a/packages/core/src/notify_bridge_core/providers/google_photos/change_detector.py b/packages/core/src/notify_bridge_core/providers/google_photos/change_detector.py new file mode 100644 index 0000000..3b4d500 --- /dev/null +++ b/packages/core/src/notify_bridge_core/providers/google_photos/change_detector.py @@ -0,0 +1,184 @@ +"""Google Photos album change detection — produces generic ServiceEvent objects.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from notify_bridge_core.models.events import EventType, ServiceEvent +from notify_bridge_core.models.media import MediaAsset, MediaType +from notify_bridge_core.providers.base import ServiceProviderType + +from .constants import CACHE_HOST, PHOTO_DOWNLOAD_SUFFIX, PHOTO_THUMBNAIL_SUFFIX, VIDEO_DOWNLOAD_SUFFIX +from .models import GooglePhotosAlbumData, GooglePhotosMediaItem + +_LOGGER = logging.getLogger(__name__) + + +def _asset_to_media(item: GooglePhotosMediaItem) -> MediaAsset: + """Convert GooglePhotosMediaItem to generic MediaAsset.""" + media_type = MediaType.VIDEO if item.is_video else MediaType.IMAGE + + try: + created_at = datetime.fromisoformat(item.created_at.replace("Z", "+00:00")) + except (ValueError, AttributeError): + created_at = datetime.now(timezone.utc) + + if item.is_video: + full_url = f"{item.base_url}{VIDEO_DOWNLOAD_SUFFIX}" if item.base_url else "" + thumbnail_url = f"{item.base_url}{PHOTO_THUMBNAIL_SUFFIX}" if item.base_url else "" + else: + full_url = f"{item.base_url}{PHOTO_DOWNLOAD_SUFFIX}" if item.base_url else "" + thumbnail_url = f"{item.base_url}{PHOTO_THUMBNAIL_SUFFIX}" if item.base_url else "" + + return MediaAsset( + id=item.id, + type=media_type, + filename=item.filename, + created_at=created_at, + description=item.description or None, + thumbnail_url=thumbnail_url, + preview_url=full_url, # Google Photos: transcoded via size suffix, same as full + full_url=full_url, + extra={ + "google_photos_media_id": item.id, + "product_url": item.product_url, + "mime_type": item.mime_type, + "width": item.width, + "height": item.height, + "cache_key": f"{CACHE_HOST}:{item.id}", + }, + ) + + +def _make_base_extra(album: GooglePhotosAlbumData) -> dict: + """Build common extra dict for album events.""" + return { + "album_url": album.product_url, + "shared": album.is_shared, + "photo_count": album.photo_count, + "video_count": album.video_count, + "asset_count": album.asset_count, + } + + +def detect_album_changes( + old_album: GooglePhotosAlbumData, + new_album: GooglePhotosAlbumData, + pending_asset_ids: set[str], + provider_name: str, +) -> tuple[list[ServiceEvent], set[str]]: + """Detect changes between two album states, producing generic ServiceEvents. + + Args: + old_album: Previous album data. + new_album: Current album data. + pending_asset_ids: Assets detected but not yet fully processed. + provider_name: Name of the provider instance. + + Returns: + Tuple of (list of ServiceEvents, updated pending_asset_ids). + """ + added_ids = new_album.asset_ids - old_album.asset_ids + removed_ids = old_album.asset_ids - new_album.asset_ids + + pending = set(pending_asset_ids) + + # Collect newly added processed assets + added_assets: list[GooglePhotosMediaItem] = [] + for aid in added_ids: + if aid not in new_album.assets: + continue + asset = new_album.assets[aid] + if asset.is_processed: + added_assets.append(asset) + else: + pending.add(aid) + + # Check if pending assets are now processed + for aid in list(pending): + if aid not in new_album.assets: + pending.discard(aid) + continue + asset = new_album.assets[aid] + if asset.is_processed: + added_assets.append(asset) + pending.discard(aid) + + # Detect metadata changes + name_changed = old_album.name != new_album.name + sharing_changed = old_album.is_shared != new_album.is_shared + + if not added_assets and not removed_ids and not name_changed and not sharing_changed: + return [], pending + + now = datetime.now(timezone.utc) + extra = _make_base_extra(new_album) + events: list[ServiceEvent] = [] + + if added_assets: + media_assets = [_asset_to_media(a) for a in added_assets] + events.append(ServiceEvent( + event_type=EventType.ASSETS_ADDED, + provider_type=ServiceProviderType.GOOGLE_PHOTOS, + provider_name=provider_name, + collection_id=new_album.id, + collection_name=new_album.name, + timestamp=now, + added_assets=media_assets, + removed_asset_ids=[], + added_count=len(added_assets), + removed_count=0, + extra=dict(extra), + )) + + if removed_ids: + events.append(ServiceEvent( + event_type=EventType.ASSETS_REMOVED, + provider_type=ServiceProviderType.GOOGLE_PHOTOS, + provider_name=provider_name, + collection_id=new_album.id, + collection_name=new_album.name, + timestamp=now, + added_assets=[], + removed_asset_ids=list(removed_ids), + added_count=0, + removed_count=len(removed_ids), + extra=dict(extra), + )) + + if name_changed: + events.append(ServiceEvent( + event_type=EventType.COLLECTION_RENAMED, + provider_type=ServiceProviderType.GOOGLE_PHOTOS, + provider_name=provider_name, + collection_id=new_album.id, + collection_name=new_album.name, + timestamp=now, + added_assets=[], + removed_asset_ids=[], + added_count=0, + removed_count=0, + old_name=old_album.name, + new_name=new_album.name, + extra=dict(extra), + )) + + if sharing_changed: + events.append(ServiceEvent( + event_type=EventType.SHARING_CHANGED, + provider_type=ServiceProviderType.GOOGLE_PHOTOS, + provider_name=provider_name, + collection_id=new_album.id, + collection_name=new_album.name, + timestamp=now, + added_assets=[], + removed_asset_ids=[], + added_count=0, + removed_count=0, + old_shared=old_album.is_shared, + new_shared=new_album.is_shared, + extra=dict(extra), + )) + + return events, pending diff --git a/packages/core/src/notify_bridge_core/providers/immich/asset_utils.py b/packages/core/src/notify_bridge_core/providers/immich/asset_utils.py index 3a324a9..2da9a4b 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/asset_utils.py +++ b/packages/core/src/notify_bridge_core/providers/immich/asset_utils.py @@ -4,11 +4,13 @@ from __future__ import annotations import logging import random -from datetime import datetime +from datetime import datetime, timezone from typing import Any +from notify_bridge_core.models.media import MediaAsset, MediaType + from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO -from .models import ImmichAssetInfo, SharedLinkInfo +from .models import ImmichAlbumData, ImmichAssetInfo, SharedLinkInfo _LOGGER = logging.getLogger(__name__) @@ -231,3 +233,123 @@ def build_asset_detail( ) return detail + + +def asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset: + """Convert ImmichAssetInfo to generic MediaAsset.""" + media_type = MediaType.IMAGE if asset.type == ASSET_TYPE_IMAGE else MediaType.VIDEO + try: + created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00")) + except (ValueError, AttributeError): + created_at = datetime.now(timezone.utc) + + return MediaAsset( + id=asset.id, + type=media_type, + filename=asset.filename, + created_at=created_at, + owner_name=asset.owner_name or None, + description=asset.description or None, + tags=list(asset.people), + thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail", + preview_url=f"{external_url}/api/assets/{asset.id}/thumbnail?size=preview", + full_url=f"{external_url}/api/assets/{asset.id}/original", + extra={ + "owner_id": asset.owner_id, + "is_favorite": asset.is_favorite, + "rating": asset.rating, + "latitude": asset.latitude, + "longitude": asset.longitude, + "city": asset.city, + "state": asset.state, + "country": asset.country, + "thumbhash": asset.thumbhash, + "file_size": asset.file_size, + }, + ) + + +def collect_scheduled_assets( + albums: dict[str, "ImmichAlbumData"], + shared_links: dict[str, list[SharedLinkInfo]], + external_url: str, + *, + limit: int = 10, + asset_type: str = "all", + favorite_only: bool = False, + min_rating: int = 0, + is_memory: bool = False, +) -> tuple[list[MediaAsset], list[dict[str, Any]]]: + """Collect and filter assets for scheduled/memory sends. + + This is the SINGLE function used by both test dispatch and the real + scheduled runner. Assets are filtered, randomly sampled, and converted + to MediaAsset objects. + + Args: + albums: Album ID → ImmichAlbumData mapping. + shared_links: Album ID → shared links mapping. + external_url: External domain for URL construction. + limit: Max number of assets to return. + asset_type: "all", "photo", or "video". + favorite_only: Only include favorites. + min_rating: Minimum rating filter. + is_memory: If True, filter to "On This Day" (same month+day, previous year). + + Returns: + (list of MediaAsset, list of collection info dicts) + """ + now = datetime.now(timezone.utc) + memory_date = now.isoformat() if is_memory else None + + all_eligible: list[ImmichAssetInfo] = [] + # Track which album each asset belongs to for public URL construction + asset_album_map: dict[str, tuple[str, str]] = {} # asset_id → (album_id, public_url) + collections_extra: list[dict[str, Any]] = [] + + for album_id, album in albums.items(): + links = shared_links.get(album_id, []) + album_public_url = get_public_url(external_url, links) or "" + + collections_extra.append({ + "name": album.name, + "url": album_public_url or f"{external_url}/albums/{album_id}", + "public_url": album_public_url, + "asset_count": album.asset_count, + "shared": album.shared, + "photo_count": album.photo_count, + "video_count": album.video_count, + "owner": album.owner, + }) + + filtered = filter_assets( + list(album.assets.values()), + favorite_only=favorite_only, + min_rating=min_rating, + asset_type=asset_type, + memory_date=memory_date, + ) + for asset in filtered: + if asset.id not in asset_album_map: + asset_album_map[asset.id] = (album_id, album_public_url) + all_eligible.append(asset) + + # Random sample + if len(all_eligible) > limit: + selected = random.sample(all_eligible, limit) + else: + random.shuffle(all_eligible) + selected = all_eligible + + # Convert to MediaAsset with public URLs + result: list[MediaAsset] = [] + for asset in selected: + media = asset_to_media(asset, external_url) + _, album_pub_url = asset_album_map.get(asset.id, ("", "")) + if album_pub_url: + media.extra["public_url"] = f"{album_pub_url}/photos/{asset.id}" + else: + media.extra.setdefault("public_url", "") + result.append(media) + + return result, collections_extra diff --git a/packages/core/src/notify_bridge_core/providers/immich/change_detector.py b/packages/core/src/notify_bridge_core/providers/immich/change_detector.py index 3679f08..6e26619 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/change_detector.py +++ b/packages/core/src/notify_bridge_core/providers/immich/change_detector.py @@ -6,48 +6,14 @@ import logging from datetime import datetime, timezone from notify_bridge_core.models.events import EventType, ServiceEvent -from notify_bridge_core.models.media import MediaAsset, MediaType from notify_bridge_core.providers.base import ServiceProviderType +from .asset_utils import asset_to_media from .models import ImmichAlbumData, ImmichAssetInfo _LOGGER = logging.getLogger(__name__) -def _asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset: - """Convert ImmichAssetInfo to generic MediaAsset.""" - media_type = MediaType.IMAGE if asset.type == "IMAGE" else MediaType.VIDEO - - try: - created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00")) - except (ValueError, AttributeError): - created_at = datetime.now(timezone.utc) - - return MediaAsset( - id=asset.id, - type=media_type, - filename=asset.filename, - created_at=created_at, - owner_name=asset.owner_name or None, - description=asset.description or None, - tags=list(asset.people), - thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail", - full_url=f"{external_url}/api/assets/{asset.id}/original", - extra={ - "owner_id": asset.owner_id, - "is_favorite": asset.is_favorite, - "rating": asset.rating, - "latitude": asset.latitude, - "longitude": asset.longitude, - "city": asset.city, - "state": asset.state, - "country": asset.country, - "thumbhash": asset.thumbhash, - "file_size": asset.file_size, - }, - ) - - def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict: """Build the common extra dict for album events.""" return { @@ -119,7 +85,7 @@ def detect_album_changes( # Emit one event per change type detected if added_assets: - media_assets = [_asset_to_media(a, external_url) for a in added_assets] + media_assets = [asset_to_media(a, external_url) for a in added_assets] events.append(ServiceEvent( event_type=EventType.ASSETS_ADDED, provider_type=ServiceProviderType.IMMICH, diff --git a/packages/core/src/notify_bridge_core/providers/immich/models.py b/packages/core/src/notify_bridge_core/providers/immich/models.py index 65b2747..7c9b61f 100644 --- a/packages/core/src/notify_bridge_core/providers/immich/models.py +++ b/packages/core/src/notify_bridge_core/providers/immich/models.py @@ -177,6 +177,8 @@ class ImmichAlbumData: for asset_data in assets_data: asset = ImmichAssetInfo.from_api_response(asset_data, users_cache) + if not asset.is_processed: + continue asset_ids.add(asset.id) assets[asset.id] = asset people.update(asset.people) diff --git a/packages/core/src/notify_bridge_core/templates/context.py b/packages/core/src/notify_bridge_core/templates/context.py index 346ace1..0d4b313 100644 --- a/packages/core/src/notify_bridge_core/templates/context.py +++ b/packages/core/src/notify_bridge_core/templates/context.py @@ -58,6 +58,7 @@ def build_template_context( "description": asset.description or "", "tags": asset.tags, "thumbnail_url": asset.thumbnail_url or "", + "preview_url": asset.preview_url or "", "full_url": asset.full_url or "", } # Flatten extras into asset dict for template access diff --git a/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py b/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py index 7f35683..95acae3 100644 --- a/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py +++ b/packages/server/src/notify_bridge_server/api/notification_tracker_targets.py @@ -21,7 +21,8 @@ from ..database.models import ( TrackingConfig, User, ) -from ..services.notifier import send_real_data_notification, send_test_notification +from ..services.notifier import send_test_notification +from ..services.test_dispatch import dispatch_test_notification _LOGGER = logging.getLogger(__name__) @@ -242,62 +243,14 @@ async def test_notification_tracker_target( r = await send_test_notification(target, locale=effective_locale) return {"target": target.name, **r} - # For periodic/scheduled/memory — fetch real data from provider - template_config = None - template_str = "" - if tt.template_config_id: - template_config = await session.get(TemplateConfig, tt.template_config_id) - if template_config: - slot_map = { - "periodic": "periodic_summary_message", - "scheduled": "scheduled_assets_message", - "memory": "memory_mode_message", - } - slot_name = slot_map[test_type] - slot_result = await session.exec( - select(TemplateSlot).where( - TemplateSlot.config_id == template_config.id, - TemplateSlot.slot_name == slot_name, - TemplateSlot.locale == effective_locale, - ) - ) - slot = slot_result.first() - if not slot: - # Fallback: any locale - slot_result2 = await session.exec( - select(TemplateSlot).where( - TemplateSlot.config_id == template_config.id, - TemplateSlot.slot_name == slot_name, - ) - ) - slot = slot_result2.first() - template_str = slot.template if slot else "" - - # Load provider and tracker data eagerly before aiohttp context - provider = await session.get(ServiceProvider, tracker.provider_id) - if not provider: - raise HTTPException(status_code=404, detail="Provider not found") - provider_config = dict(provider.config) - collection_ids = list(tracker.collection_ids or []) - - # Load tracking config to get memory_source - memory_source = "albums" - if tt.tracking_config_id: - tracking_config = await session.get(TrackingConfig, tt.tracking_config_id) - if tracking_config: - memory_source = tracking_config.memory_source or "albums" - - # Fetch real data from provider - r = await send_real_data_notification( + # For periodic/scheduled/memory — dispatch through the real NotificationDispatcher + r = await dispatch_test_notification( + session=session, + tracker=tracker, + tt=tt, target=target, - template_str=template_str, test_type=test_type, - provider_type=provider.type, - provider_config=provider_config, - collection_ids=collection_ids, - date_format=template_config.date_format if template_config else "%d.%m.%Y, %H:%M UTC", - date_only_format=template_config.date_only_format if template_config and template_config.date_only_format else "%d.%m.%Y", - memory_source=memory_source, + locale=effective_locale, ) return {"target": target.name, **r} diff --git a/packages/server/src/notify_bridge_server/services/notifier.py b/packages/server/src/notify_bridge_server/services/notifier.py index 0d50b5c..93c9d61 100644 --- a/packages/server/src/notify_bridge_server/services/notifier.py +++ b/packages/server/src/notify_bridge_server/services/notifier.py @@ -54,9 +54,10 @@ async def _load_receivers(target_id: int) -> list[dict]: async def send_to_target(target: NotificationTarget, message: str) -> dict: - """Send a message to a target, broadcasting to all receivers. + """Send a text message to a target, broadcasting to all receivers. - This is the SINGLE send path used by dispatch, test, and real-data notifications. + Used for basic test and template preview sends (text only, no media). + Real notifications with media go through NotificationDispatcher. """ try: receivers = await _load_receivers(target.id) @@ -356,241 +357,3 @@ async def send_test_template_notification( return {"success": False, "error": f"Template render error: {e}"} return await send_to_target(target, message) - - -async def send_real_data_notification( - target: NotificationTarget, - template_str: str, - test_type: str, - provider_type: str, - provider_config: dict, - collection_ids: list[str], - date_format: str = "%d.%m.%Y, %H:%M UTC", - date_only_format: str = "%d.%m.%Y", - memory_source: str = "albums", -) -> dict: - """Fetch real data from provider, render template, and send.""" - from jinja2.sandbox import SandboxedEnvironment - - if not template_str: - return {"success": False, "error": f"No template configured for {test_type}"} - - try: - ctx = await _build_real_context( - provider_type, provider_config, collection_ids, - test_type, date_format, date_only_format, - memory_source=memory_source, - ) - except Exception as e: - _LOGGER.error("Failed to fetch real data for test: %s", e) - return {"success": False, "error": f"Failed to fetch provider data: {e}"} - - ctx["target_type"] = target.type - ctx["date_format"] = date_format - ctx["date_only_format"] = date_only_format - - try: - env = SandboxedEnvironment(autoescape=False) - tmpl = env.from_string(template_str) - message = tmpl.render(**ctx) - except Exception as e: - return {"success": False, "error": f"Template render error: {e}"} - - return await send_to_target(target, message) - - -async def _build_real_context( - provider_type: str, - provider_config: dict, - collection_ids: list[str], - test_type: str, - date_format: str, - date_only_format: str, - memory_source: str = "albums", -) -> dict: - """Build template context from real provider data.""" - from datetime import datetime, timezone - - if provider_type != "immich": - return {"date": datetime.now(timezone.utc).strftime(date_only_format)} - - from notify_bridge_core.providers.immich import ImmichServiceProvider - - async with aiohttp.ClientSession() as http_session: - immich = ImmichServiceProvider( - http_session, - provider_config.get("url", ""), - provider_config.get("api_key", ""), - provider_config.get("external_domain"), - "Immich", - ) - connected = await immich.connect() - if not connected: - raise RuntimeError("Failed to connect to Immich") - - ext_domain = provider_config.get("external_domain") or provider_config.get("url", "") - - # --- Native Immich memories --- - if test_type == "memory" and memory_source == "native": - memories = await immich.client.get_memories() - all_assets: list[dict[str, Any]] = [] - tracked_ids = set(collection_ids) if collection_ids else None - for mem in memories: - for raw_asset in mem.get("assets", []): - asset_id = raw_asset.get("id", "") - # Optional album filtering: keep only assets in tracked albums - if tracked_ids: - asset_albums = raw_asset.get("albums", []) - if not any(a.get("id") in tracked_ids for a in asset_albums): - continue - exif = raw_asset.get("exifInfo") or {} - people_raw = raw_asset.get("people", []) - all_assets.append({ - "id": asset_id, - "filename": raw_asset.get("originalFileName", ""), - "type": (raw_asset.get("type") or "IMAGE").upper(), - "created_at": raw_asset.get("fileCreatedAt", raw_asset.get("createdAt", "")), - "owner": "", - "description": exif.get("description", "") or raw_asset.get("description", "") or "", - "people": [p.get("name", "") for p in people_raw if p.get("name")], - "is_favorite": raw_asset.get("isFavorite", False), - "rating": exif.get("rating"), - "city": exif.get("city", "") or "", - "state": exif.get("state", "") or "", - "country": exif.get("country", "") or "", - "public_url": "", - "url": f"{ext_domain.rstrip('/')}/api/assets/{asset_id}/original", - "photo_url": f"{ext_domain.rstrip('/')}/api/assets/{asset_id}/thumbnail", - "year": mem.get("data", {}).get("year"), - }) - - now = datetime.now(timezone.utc) - ctx: dict[str, Any] = { - "date": now.strftime(date_only_format), - "timestamp": now.isoformat(), - "service_name": "Immich", - "service_type": "immich", - "collections": [], - "albums": [], - "assets": all_assets, - "common_date": "", - "common_location": "", - "collection_name": "", "album_name": "", - "public_url": "", "album_url": "", - "shared": False, "photo_count": 0, "video_count": 0, "owner": "", - } - people: set[str] = set() - for a in all_assets: - people.update(a.get("people", [])) - ctx["people"] = list(people) - ctx["has_videos"] = any(a.get("type") == "VIDEO" for a in all_assets) - ctx["has_photos"] = any(a.get("type") == "IMAGE" for a in all_assets) - ctx["added_count"] = len(all_assets) - ctx["added_assets"] = all_assets - ctx["protected_url"] = "" - return ctx - - # --- Album-based asset collection (default path) --- - collections: list[dict[str, Any]] = [] - all_assets: list[dict[str, Any]] = [] - - for album_id in collection_ids: - album = await immich.client.get_album(album_id) - if not album: - continue - - shared_links = await immich.client.get_shared_links(album_id) - ext_domain = provider_config.get("external_domain") or provider_config.get("url", "") - album_public_url = "" - for link in shared_links: - if link.is_accessible and not link.is_expired and not link.has_password: - album_public_url = f"{ext_domain.rstrip('/')}/share/{link.key}" - break - - collections.append({ - "name": album.name, - "url": album_public_url or f"{ext_domain.rstrip('/')}/albums/{album_id}", - "public_url": album_public_url, - "asset_count": album.asset_count, - "shared": album.shared, - "photo_count": album.photo_count, - "video_count": album.video_count, - "owner": album.owner, - }) - - for asset_id, asset in list(album.assets.items())[:10]: - asset_public_url = f"{album_public_url}/photos/{asset_id}" if album_public_url else "" - all_assets.append({ - "id": asset.id, - "filename": asset.filename, - "type": asset.type.upper(), - "created_at": asset.created_at, - "owner": asset.owner_name or "", - "description": asset.description or "", - "people": list(asset.people), - "is_favorite": asset.is_favorite, - "rating": asset.rating, - "city": asset.city or "", - "state": asset.state or "", - "country": asset.country or "", - "public_url": asset_public_url, - "url": f"{ext_domain.rstrip('/')}/api/assets/{asset.id}/original", - "photo_url": f"{ext_domain.rstrip('/')}/api/assets/{asset.id}/thumbnail", - }) - - now = datetime.now(timezone.utc) - ctx: dict[str, Any] = { - "date": now.strftime(date_only_format), - "timestamp": now.isoformat(), - "service_name": "Immich", - "service_type": "immich", - "collections": collections, - "albums": collections, - "assets": all_assets, - "common_date": "", - "common_location": "", - } - - if len(all_assets) > 1: - dates = {a.get("created_at", "")[:10] for a in all_assets if a.get("created_at")} - if len(dates) == 1: - try: - ctx["common_date"] = datetime.fromisoformat(dates.pop()).strftime(date_only_format) - except (ValueError, TypeError): - pass - - locations = set() - for a in all_assets: - city = a.get("city", "") - country = a.get("country", "") - locations.add(f"{city}, {country}" if city and country else city or "") - if len(locations) == 1 and "" not in locations: - ctx["common_location"] = locations.pop() - - if collections: - first = collections[0] - ctx.update({ - "collection_name": first["name"], "album_name": first["name"], - "public_url": first.get("public_url", ""), "album_url": first.get("url", ""), - "shared": first.get("shared", False), - "photo_count": first.get("photo_count", 0), "video_count": first.get("video_count", 0), - "owner": first.get("owner", ""), - }) - else: - ctx.update({ - "collection_name": "", "album_name": "", - "public_url": "", "album_url": "", - "shared": False, "photo_count": 0, "video_count": 0, "owner": "", - }) - - people: set[str] = set() - for a in all_assets: - people.update(a.get("people", [])) - ctx["people"] = list(people) - ctx["has_videos"] = any(a.get("type") == "VIDEO" for a in all_assets) - ctx["has_photos"] = any(a.get("type") == "IMAGE" for a in all_assets) - ctx["added_count"] = len(all_assets) - ctx["added_assets"] = all_assets - ctx["protected_url"] = "" - - return ctx diff --git a/packages/server/src/notify_bridge_server/services/test_dispatch.py b/packages/server/src/notify_bridge_server/services/test_dispatch.py new file mode 100644 index 0000000..e310a17 --- /dev/null +++ b/packages/server/src/notify_bridge_server/services/test_dispatch.py @@ -0,0 +1,314 @@ +"""Test dispatch — manual trigger through the real NotificationDispatcher. + +No separate logic — just builds a ServiceEvent + TargetConfig from DB +objects and dispatches through the same path the watcher uses. +""" + +import logging +from typing import Any + +import aiohttp +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from notify_bridge_core.models.events import EventType, ServiceEvent +from notify_bridge_core.models.media import MediaAsset +from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig +from notify_bridge_core.providers.base import ServiceProviderType + +from ..database.models import ( + NotificationTarget, + NotificationTracker, + NotificationTrackerTarget, + ServiceProvider, + TemplateConfig, + TemplateSlot, + TrackingConfig, +) +from .dispatch_helpers import _resolve_target +from .watcher import _get_telegram_caches + +_LOGGER = logging.getLogger(__name__) + +# Maps test_type → DB template slot name +_TEST_TYPE_SLOT_MAP = { + "periodic": "periodic_summary_message", + "scheduled": "scheduled_assets_message", + "memory": "memory_mode_message", +} + + +async def dispatch_test_notification( + *, + session: AsyncSession, + tracker: NotificationTracker, + tt: NotificationTrackerTarget, + target: NotificationTarget, + test_type: str, + locale: str = "en", +) -> dict[str, Any]: + """Dispatch a test notification through the real NotificationDispatcher.""" + + # Load provider + provider = await session.get(ServiceProvider, tracker.provider_id) + if not provider: + return {"success": False, "error": "Provider not found"} + provider_config = dict(provider.config) + collection_ids = list(tracker.collection_ids or []) + + # Load tracking config + tracking_config = None + if tt.tracking_config_id: + tracking_config = await session.get(TrackingConfig, tt.tracking_config_id) + + # Load template slots keyed by EventType.SCHEDULED_MESSAGE.value + template_config = None + template_slots: dict[str, dict[str, str]] | None = None + slot_name = _TEST_TYPE_SLOT_MAP.get(test_type, test_type) + if tt.template_config_id: + template_config = await session.get(TemplateConfig, tt.template_config_id) + if template_config: + slot_result = await session.exec( + select(TemplateSlot).where( + TemplateSlot.config_id == template_config.id, + TemplateSlot.slot_name == slot_name, + ) + ) + locale_map: dict[str, str] = {} + for s in slot_result.all(): + locale_map[s.locale] = s.template + if locale_map: + template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map} + + # Resolve target config + receivers (same as watcher) + resolved = await _resolve_target(session, target) + + target_cfg = TargetConfig( + type=resolved["target_type"], + config=resolved["target_config"], + template_slots=template_slots, + locale=locale, + date_format=template_config.date_format if template_config else "%d.%m.%Y, %H:%M UTC", + date_only_format=template_config.date_only_format if template_config and template_config.date_only_format else "%d.%m.%Y", + provider_api_key=provider_config.get("api_key"), + provider_internal_url=provider_config.get("url", ""), + provider_external_url=provider_config.get("external_domain", ""), + receivers=resolved["receivers"], + ) + + # Fetch assets and build event + event = await _build_event( + provider_type=provider.type, + provider_config=provider_config, + provider_name=provider.name or provider.type, + tracker_name=tracker.name or "", + tracker_filters=dict(tracker.filters) if tracker.filters else {}, + collection_ids=collection_ids, + test_type=test_type, + tracking_config=tracking_config, + ) + if event is None: + return {"success": False, "error": "No data returned from provider"} + if not event.added_assets and test_type in ("scheduled", "memory"): + return {"success": False, "error": "No matching assets found" + (" for today" if test_type == "memory" else "")} + + # Dispatch through the real NotificationDispatcher + url_cache, asset_cache = await _get_telegram_caches() + dispatcher = NotificationDispatcher(url_cache=url_cache, asset_cache=asset_cache) + results = await dispatcher.dispatch(event, [target_cfg]) + + if not results: + return {"success": False, "error": "No dispatch results"} + return results[0] + + +async def _build_event( + *, + provider_type: str, + provider_config: dict, + provider_name: str, + tracker_name: str, + tracker_filters: dict, + collection_ids: list[str], + test_type: str, + tracking_config: TrackingConfig | None = None, +) -> ServiceEvent | None: + """Build a ServiceEvent with real provider data.""" + from datetime import datetime, timezone + + if provider_type == "immich": + return await _build_immich_event( + provider_config=provider_config, + provider_name=provider_name, + tracker_name=tracker_name, + collection_ids=collection_ids, + test_type=test_type, + tracking_config=tracking_config, + ) + elif provider_type == "scheduler": + from notify_bridge_core.providers.scheduler import SchedulerServiceProvider + custom_vars = tracker_filters.get("custom_variables", {}) + sched = SchedulerServiceProvider( + name=provider_name, + tracker_name=tracker_name, + custom_variables=custom_vars, + ) + events, _ = await sched.poll(collection_ids, {}) + return events[0] if events else None + + return None + + +async def _build_immich_event( + *, + provider_config: dict, + provider_name: str, + tracker_name: str, + collection_ids: list[str], + test_type: str, + tracking_config: TrackingConfig | None = None, +) -> ServiceEvent | None: + """Build an Immich scheduled/memory event using shared core utilities.""" + from datetime import datetime, timezone + from notify_bridge_core.providers.immich import ImmichServiceProvider + from notify_bridge_core.providers.immich.asset_utils import collect_scheduled_assets + from notify_bridge_core.providers.immich.models import ImmichAlbumData, SharedLinkInfo + + ext_domain = provider_config.get("external_domain") or provider_config.get("url", "") + prefix = "memory" if test_type == "memory" else "scheduled" + limit = getattr(tracking_config, f"{prefix}_limit", 10) if tracking_config else 10 + asset_type = getattr(tracking_config, f"{prefix}_asset_type", "all") if tracking_config else "all" + favorite_only = getattr(tracking_config, f"{prefix}_favorite_only", False) if tracking_config else False + min_rating = getattr(tracking_config, f"{prefix}_min_rating", 0) if tracking_config else 0 + memory_source = getattr(tracking_config, "memory_source", "albums") if tracking_config else "albums" + is_memory = test_type == "memory" + + async with aiohttp.ClientSession() as http_session: + immich = ImmichServiceProvider( + http_session, + provider_config.get("url", ""), + provider_config.get("api_key", ""), + provider_config.get("external_domain"), + provider_name, + ) + if not await immich.connect(): + return None + + # Native Immich memories API path + if is_memory and memory_source == "native": + return await _build_native_memory_event( + immich, ext_domain, provider_name, tracker_name, + collection_ids, limit, asset_type, favorite_only, min_rating, + ) + + # Album-based path: use shared collect_scheduled_assets + albums: dict[str, ImmichAlbumData] = {} + shared_links: dict[str, list[SharedLinkInfo]] = {} + for album_id in collection_ids: + album = await immich.client.get_album(album_id) + if album: + albums[album_id] = album + shared_links[album_id] = await immich.client.get_shared_links(album_id) + + assets, collections_extra = collect_scheduled_assets( + albums, shared_links, ext_domain, + limit=limit, + asset_type=asset_type, + favorite_only=favorite_only, + min_rating=min_rating, + is_memory=is_memory, + ) + + first_col = collections_extra[0] if collections_extra else {} + return ServiceEvent( + event_type=EventType.SCHEDULED_MESSAGE, + provider_type=ServiceProviderType.IMMICH, + provider_name=provider_name, + collection_id=collection_ids[0] if collection_ids else "", + collection_name=first_col.get("name", tracker_name), + timestamp=datetime.now(timezone.utc), + added_assets=assets, + added_count=len(assets), + extra={ + "collections": collections_extra, + "albums": collections_extra, + **(first_col if first_col else {}), + }, + ) + + +async def _build_native_memory_event( + immich, + ext_domain: str, + provider_name: str, + tracker_name: str, + collection_ids: list[str], + limit: int, + asset_type: str, + favorite_only: bool, + min_rating: int, +) -> ServiceEvent | None: + """Build event from Immich native memories API.""" + import random + from datetime import datetime, timezone + from notify_bridge_core.models.media import MediaAsset, MediaType + from notify_bridge_core.providers.immich.asset_utils import filter_assets + from notify_bridge_core.providers.immich.models import ImmichAssetInfo + + memories = await immich.client.get_memories() + tracked_ids = set(collection_ids) if collection_ids else None + + # Collect raw assets, convert to ImmichAssetInfo for unified filtering + raw_assets: list[ImmichAssetInfo] = [] + year_map: dict[str, int | None] = {} # asset_id → memory year + for mem in memories: + mem_year = mem.get("data", {}).get("year") + for raw in mem.get("assets", []): + asset_id = raw.get("id", "") + if tracked_ids: + asset_albums = raw.get("albums", []) + if not any(a.get("id") in tracked_ids for a in asset_albums): + continue + asset = ImmichAssetInfo.from_api_response(raw) + if not asset.is_processed: + continue + raw_assets.append(asset) + year_map[asset_id] = mem_year + + # Apply standard filters (no memory_date — native API already filters by date) + filtered = filter_assets( + raw_assets, + favorite_only=favorite_only, + min_rating=min_rating, + asset_type=asset_type, + ) + + # Random sample + if len(filtered) > limit: + selected = random.sample(filtered, limit) + else: + random.shuffle(filtered) + selected = filtered + + from notify_bridge_core.providers.immich.asset_utils import asset_to_media + + all_assets = [] + for asset in selected: + media = asset_to_media(asset, ext_domain) + media.extra["year"] = year_map.get(asset.id) + all_assets.append(media) + + return ServiceEvent( + event_type=EventType.SCHEDULED_MESSAGE, + provider_type=ServiceProviderType.IMMICH, + provider_name=provider_name, + collection_id=collection_ids[0] if collection_ids else "", + collection_name=tracker_name, + timestamp=datetime.now(timezone.utc), + added_assets=all_assets, + added_count=len(all_assets), + extra={ + "collections": [], + "albums": [], + }, + ) diff --git a/scripts/restart-backend.sh b/scripts/restart-backend.sh index f8aadaa..9b9ea84 100644 --- a/scripts/restart-backend.sh +++ b/scripts/restart-backend.sh @@ -2,7 +2,6 @@ # Restart the backend dev server. # Usage: bash scripts/restart-backend.sh -set -e cd "$(dirname "$0")/.." # Kill existing backend @@ -12,12 +11,28 @@ if [ -n "$PID" ]; then sleep 1 fi +# Resolve python — py launcher needs absolute path for nohup on Windows +if command -v python3 &>/dev/null; then + PYTHON=python3 +elif command -v python &>/dev/null; then + PYTHON=python +else + PYTHON=$(py -3.13 -c "import sys; print(sys.executable)" 2>/dev/null \ + || py -3 -c "import sys; print(sys.executable)") +fi + # Start backend -NOTIFY_BRIDGE_DATA_DIR=./test-data \ -NOTIFY_BRIDGE_SECRET_KEY=test-secret-key-minimum-32chars \ -PYTHON=$(command -v python3 2>/dev/null || command -v python 2>/dev/null || echo "py -3.13") +export NOTIFY_BRIDGE_DATA_DIR=./test-data +export NOTIFY_BRIDGE_SECRET_KEY=test-secret-key-minimum-32chars nohup "$PYTHON" -m uvicorn notify_bridge_server.main:app \ - --host 0.0.0.0 --port 8420 > /dev/null 2>&1 & + --host 0.0.0.0 --port 8420 > .backend.log 2>&1 & sleep 3 -curl -s http://localhost:8420/api/health +if curl -sf http://localhost:8420/api/health; then + echo "" + echo "Backend started (PID $!)" +else + echo "Backend failed to start. Log:" + tail -20 .backend.log + exit 1 +fi