"""Album watcher service - polls Immich and detects changes.""" from __future__ import annotations import logging from datetime import datetime, timezone from typing import Any import aiohttp from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from sqlalchemy.ext.asyncio import AsyncEngine from immich_watcher_core.asset_utils import build_asset_detail, get_any_url from immich_watcher_core.change_detector import detect_album_changes from immich_watcher_core.immich_client import ImmichApiError, ImmichClient from immich_watcher_core.models import AlbumChange, AlbumData from ..database.engine import get_engine from ..database.models import ( AlbumState, AlbumTracker, EventLog, ImmichServer, NotificationTarget, TemplateConfig, TrackingConfig, ) from .notifier import send_notification _LOGGER = logging.getLogger(__name__) async def check_tracker(tracker_id: int) -> dict[str, Any]: """Check a single tracker for album changes. Called by the scheduler (creates its own session). """ engine = get_engine() async with AsyncSession(engine) as session: return await check_tracker_with_session(tracker_id, session) async def check_tracker_with_session( tracker_id: int, session: AsyncSession ) -> dict[str, Any]: """Check a single tracker using a provided session. Called by API trigger (reuses route session) or by check_tracker. """ tracker = await session.get(AlbumTracker, tracker_id) if not tracker or not tracker.enabled: return {"skipped": True, "reason": "disabled or not found"} server = await session.get(ImmichServer, tracker.server_id) if not server: return {"error": "Server not found"} # Eagerly read all needed data before entering aiohttp context # (SQLAlchemy async greenlet context doesn't survive across other async CMs) album_ids = list(tracker.album_ids) target_ids = list(tracker.target_ids) tracker_db_id = tracker_id server_url = server.url server_api_key = server.api_key results = [] async with aiohttp.ClientSession() as http_session: client = ImmichClient(http_session, server_url, server_api_key) await client.get_server_config() users_cache = await client.get_users() for album_id in album_ids: result = await _check_album( session, http_session, client, tracker_db_id, album_id, users_cache, target_ids, ) results.append(result) await session.commit() return {"albums_checked": len(album_ids), "results": results} async def _check_album( session: AsyncSession, http_session: aiohttp.ClientSession, client: ImmichClient, tracker_id: int, album_id: str, users_cache: dict[str, str], target_ids: list[int], ) -> dict[str, Any]: """Check a single album for changes.""" try: album = await client.get_album(album_id, users_cache) except ImmichApiError as err: _LOGGER.error("Failed to fetch album %s: %s", album_id, err) return {"album_id": album_id, "error": str(err)} if album is None: return {"album_id": album_id, "status": "not_found"} # Load previous state result = await session.exec( select(AlbumState).where( AlbumState.tracker_id == tracker_id, AlbumState.album_id == album_id, ) ) state = result.first() if state is None: # First check - save state, no change detection state = AlbumState( tracker_id=tracker_id, album_id=album_id, asset_ids=list(album.asset_ids), pending_asset_ids=[], last_updated=datetime.now(timezone.utc), ) session.add(state) return {"album_id": album_id, "status": "initialized", "asset_count": album.asset_count} # Build previous AlbumData from persisted state for change detection previous_asset_ids = set(state.asset_ids) pending = set(state.pending_asset_ids) # Create a minimal previous AlbumData for comparison prev_album = AlbumData( id=album_id, name=album.name, # Use current name (rename detection compares) asset_count=len(previous_asset_ids), photo_count=0, video_count=0, created_at=album.created_at, updated_at="", shared=album.shared, # Use current (sharing detection compares) owner=album.owner, thumbnail_asset_id=None, asset_ids=previous_asset_ids, ) # Detect changes change, updated_pending = detect_album_changes(prev_album, album, pending) # Update persisted state state.asset_ids = list(album.asset_ids) state.pending_asset_ids = list(updated_pending) state.last_updated = datetime.now(timezone.utc) session.add(state) if change is None: return {"album_id": album_id, "status": "no_changes"} # Log the event shared_links = await client.get_shared_links(album_id) event_data = _build_event_data(change, album, client.external_url, shared_links) event_log = EventLog( tracker_id=tracker_id, event_type=change.change_type, album_id=album_id, album_name=album.name, details={"added_count": change.added_count, "removed_count": change.removed_count}, ) session.add(event_log) # Send notifications to each target, filtered by its tracking config for target_id in target_ids: target = await session.get(NotificationTarget, target_id) if not target: continue # Check target's tracking config for event filtering tracking_config = None if target.tracking_config_id: tracking_config = await session.get(TrackingConfig, target.tracking_config_id) if tracking_config: # Filter by event type should_notify = False if change.change_type == "assets_added" and tracking_config.track_assets_added: should_notify = True elif change.change_type == "assets_removed" and tracking_config.track_assets_removed: should_notify = True elif change.change_type == "album_renamed" and tracking_config.track_album_renamed: should_notify = True elif change.change_type == "album_deleted" and tracking_config.track_album_deleted: should_notify = True elif change.change_type == "changed": should_notify = True # "changed" = mixed, always notify if not should_notify: continue # Get target's template config template_config = None if target.template_config_id: template_config = await session.get(TemplateConfig, target.template_config_id) try: use_ai = target.config.get("ai_captions", False) await send_notification(target, event_data, template_config, use_ai_caption=use_ai) except Exception: _LOGGER.exception("Failed to send notification to target %d", target_id) return { "album_id": album_id, "status": "changed", "change_type": change.change_type, "added_count": change.added_count, "removed_count": change.removed_count, } def _build_event_data( change: AlbumChange, album: AlbumData, external_url: str, shared_links: list, ) -> dict[str, Any]: """Build event data dict for template rendering and webhook payload.""" added_details = [] for asset in change.added_assets: if asset.is_processed: added_details.append( build_asset_detail(asset, external_url, shared_links, include_thumbnail=False) ) album_url = get_any_url(external_url, shared_links) return { "album_id": change.album_id, "album_name": change.album_name, "album_url": album_url or "", "change_type": change.change_type, "added_count": change.added_count, "removed_count": change.removed_count, "added_assets": added_details, "removed_assets": change.removed_asset_ids, "people": list(album.people), "shared": album.shared, "old_name": change.old_name, "new_name": change.new_name, "old_shared": change.old_shared, "new_shared": change.new_shared, }