feat(notify-bridge): phase 3 - Immich service provider

Implement ImmichServiceProvider as first concrete ServiceProvider:
- ImmichClient: async API client (ping, albums, shared links, search, thumbnails)
- ImmichAssetInfo/ImmichAlbumData: Immich-specific models with from_api_response()
- Change detector: produces generic ServiceEvent from album diffs
- Asset utils: filter, sort, URL building for Immich assets
- 12 Immich-specific template variables registered in global VariableRegistry
- Provider config schema (url, api_key, external_domain)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 22:37:17 +03:00
parent 3ed0d8ce88
commit cc02558fdf
7 changed files with 1141 additions and 0 deletions
@@ -1 +1,17 @@
"""Immich service provider implementation.""" """Immich service provider implementation."""
from notify_bridge_core.providers.base import ServiceProviderType
from notify_bridge_core.templates.variables import registry
from .client import ImmichClient, ImmichApiError
from .provider import ImmichServiceProvider, IMMICH_VARIABLES
# Register Immich variables in the global registry
registry.register_provider_variables(ServiceProviderType.IMMICH, IMMICH_VARIABLES)
__all__ = [
"ImmichClient",
"ImmichApiError",
"ImmichServiceProvider",
"IMMICH_VARIABLES",
]
@@ -0,0 +1,233 @@
"""Immich asset filtering, sorting, and URL utilities."""
from __future__ import annotations
import logging
import random
from datetime import datetime
from typing import Any
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
from .models import ImmichAssetInfo, SharedLinkInfo
_LOGGER = logging.getLogger(__name__)
def filter_assets(
assets: list[ImmichAssetInfo],
*,
favorite_only: bool = False,
min_rating: int = 1,
asset_type: str = "all",
min_date: str | None = None,
max_date: str | None = None,
memory_date: str | None = None,
city: str | None = None,
state: str | None = None,
country: str | None = None,
processed_only: bool = True,
) -> list[ImmichAssetInfo]:
"""Filter Immich assets by various criteria."""
result = list(assets)
if processed_only:
result = [a for a in result if a.is_processed]
if favorite_only:
result = [a for a in result if a.is_favorite]
if min_rating > 1:
result = [a for a in result if a.rating is not None and a.rating >= min_rating]
if asset_type == "photo":
result = [a for a in result if a.type == ASSET_TYPE_IMAGE]
elif asset_type == "video":
result = [a for a in result if a.type == ASSET_TYPE_VIDEO]
if min_date:
result = [a for a in result if a.created_at >= min_date]
if max_date:
result = [a for a in result if a.created_at <= max_date]
if memory_date:
try:
ref_date = datetime.fromisoformat(memory_date.replace("Z", "+00:00"))
ref_year, ref_month, ref_day = ref_date.year, ref_date.month, ref_date.day
def matches_memory(asset: ImmichAssetInfo) -> bool:
try:
asset_date = datetime.fromisoformat(
asset.created_at.replace("Z", "+00:00")
)
return (
asset_date.month == ref_month
and asset_date.day == ref_day
and asset_date.year != ref_year
)
except (ValueError, AttributeError):
return False
result = [a for a in result if matches_memory(a)]
except ValueError:
_LOGGER.warning("Invalid memory_date format: %s", memory_date)
if city:
city_lower = city.lower()
result = [a for a in result if a.city and city_lower in a.city.lower()]
if state:
state_lower = state.lower()
result = [a for a in result if a.state and state_lower in a.state.lower()]
if country:
country_lower = country.lower()
result = [a for a in result if a.country and country_lower in a.country.lower()]
return result
def sort_assets(
assets: list[ImmichAssetInfo],
order_by: str = "date",
order: str = "descending",
) -> list[ImmichAssetInfo]:
"""Sort Immich assets by the specified field."""
result = list(assets)
if order_by == "random":
random.shuffle(result)
elif order_by == "rating":
result = sorted(
result,
key=lambda a: (a.rating is None, a.rating if a.rating is not None else 0),
reverse=(order == "descending"),
)
elif order_by == "name":
result = sorted(
result,
key=lambda a: a.filename.lower(),
reverse=(order == "descending"),
)
else: # date
result = sorted(
result,
key=lambda a: a.created_at,
reverse=(order == "descending"),
)
return result
# --- Shared link URL helpers ---
def get_accessible_links(links: list[SharedLinkInfo]) -> list[SharedLinkInfo]:
return [link for link in links if link.is_accessible]
def get_protected_links(links: list[SharedLinkInfo]) -> list[SharedLinkInfo]:
return [link for link in links if link.has_password and not link.is_expired]
def get_public_url(external_url: str, links: list[SharedLinkInfo]) -> str | None:
accessible = get_accessible_links(links)
if accessible:
return f"{external_url}/share/{accessible[0].key}"
return None
def get_any_url(external_url: str, links: list[SharedLinkInfo]) -> str | None:
accessible = get_accessible_links(links)
if accessible:
return f"{external_url}/share/{accessible[0].key}"
non_expired = [link for link in links if not link.is_expired]
if non_expired:
return f"{external_url}/share/{non_expired[0].key}"
return None
def get_protected_url(external_url: str, links: list[SharedLinkInfo]) -> str | None:
protected = get_protected_links(links)
if protected:
return f"{external_url}/share/{protected[0].key}"
return None
def get_protected_password(links: list[SharedLinkInfo]) -> str | None:
protected = get_protected_links(links)
if protected and protected[0].password:
return protected[0].password
return None
def _get_best_link_key(links: list[SharedLinkInfo]) -> str | None:
accessible = get_accessible_links(links)
if accessible:
return accessible[0].key
non_expired = [link for link in links if not link.is_expired]
if non_expired:
return non_expired[0].key
return None
def get_asset_download_url(
external_url: str, links: list[SharedLinkInfo], asset_id: str
) -> str | None:
key = _get_best_link_key(links)
if key:
return f"{external_url}/api/assets/{asset_id}/original?key={key}"
return None
def get_asset_photo_url(
external_url: str, links: list[SharedLinkInfo], asset_id: str
) -> str | None:
key = _get_best_link_key(links)
if key:
return f"{external_url}/api/assets/{asset_id}/thumbnail?size=preview&key={key}"
return None
def get_asset_video_url(
external_url: str, links: list[SharedLinkInfo], asset_id: str
) -> str | None:
key = _get_best_link_key(links)
if key:
return f"{external_url}/api/assets/{asset_id}/video/playback?key={key}"
return None
def build_asset_detail(
asset: ImmichAssetInfo,
external_url: str,
shared_links: list[SharedLinkInfo],
) -> dict[str, Any]:
"""Build asset detail dictionary with all available data."""
detail: dict[str, Any] = {
"id": asset.id,
"type": asset.type,
"filename": asset.filename,
"created_at": asset.created_at,
"owner": asset.owner_name,
"owner_id": asset.owner_id,
"description": asset.description,
"people": asset.people,
"is_favorite": asset.is_favorite,
"rating": asset.rating,
"latitude": asset.latitude,
"longitude": asset.longitude,
"city": asset.city,
"state": asset.state,
"country": asset.country,
"thumbnail_url": f"{external_url}/api/assets/{asset.id}/thumbnail",
}
key = _get_best_link_key(shared_links)
if key:
detail["url"] = f"{external_url}/share/{key}/photos/{asset.id}"
detail["download_url"] = f"{external_url}/api/assets/{asset.id}/original?key={key}"
if asset.type == ASSET_TYPE_VIDEO:
detail["playback_url"] = (
f"{external_url}/api/assets/{asset.id}/video/playback?key={key}"
)
elif asset.type == ASSET_TYPE_IMAGE:
detail["photo_url"] = (
f"{external_url}/api/assets/{asset.id}/thumbnail?size=preview&key={key}"
)
return detail
@@ -0,0 +1,143 @@
"""Immich album change detection — produces generic ServiceEvent objects."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from notify_bridge_core.models.events import EventType, ServiceEvent
from notify_bridge_core.models.media import MediaAsset, MediaType
from notify_bridge_core.providers.base import ServiceProviderType
from .models import ImmichAlbumData, ImmichAssetInfo
_LOGGER = logging.getLogger(__name__)
def _asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
"""Convert ImmichAssetInfo to generic MediaAsset."""
media_type = MediaType.IMAGE if asset.type == "IMAGE" else MediaType.VIDEO
try:
created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(timezone.utc)
return MediaAsset(
id=asset.id,
type=media_type,
filename=asset.filename,
created_at=created_at,
owner_name=asset.owner_name or None,
description=asset.description or None,
tags=list(asset.people),
thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail",
full_url=f"{external_url}/api/assets/{asset.id}/original",
extra={
"owner_id": asset.owner_id,
"is_favorite": asset.is_favorite,
"rating": asset.rating,
"latitude": asset.latitude,
"longitude": asset.longitude,
"city": asset.city,
"state": asset.state,
"country": asset.country,
"thumbhash": asset.thumbhash,
},
)
def detect_album_changes(
old_album: ImmichAlbumData,
new_album: ImmichAlbumData,
pending_asset_ids: set[str],
provider_name: str,
external_url: str,
) -> tuple[ServiceEvent | None, set[str]]:
"""Detect changes between two album states, producing a generic ServiceEvent.
Args:
old_album: Previous album data
new_album: Current album data
pending_asset_ids: Assets detected but not yet fully processed
provider_name: Name of the provider instance
external_url: External URL for building asset URLs
Returns:
Tuple of (ServiceEvent or None, updated pending_asset_ids)
"""
added_ids = new_album.asset_ids - old_album.asset_ids
removed_ids = old_album.asset_ids - new_album.asset_ids
pending = set(pending_asset_ids)
# Collect newly added processed assets
added_assets: list[ImmichAssetInfo] = []
for aid in added_ids:
if aid not in new_album.assets:
continue
asset = new_album.assets[aid]
if asset.is_processed:
added_assets.append(asset)
else:
pending.add(aid)
# Check if pending assets are now processed
for aid in list(pending):
if aid not in new_album.assets:
pending.discard(aid)
continue
asset = new_album.assets[aid]
if asset.is_processed:
added_assets.append(asset)
pending.discard(aid)
# Detect metadata changes
name_changed = old_album.name != new_album.name
sharing_changed = old_album.shared != new_album.shared
if not added_assets and not removed_ids and not name_changed and not sharing_changed:
return None, pending
# Determine event type
if name_changed and not added_assets and not removed_ids and not sharing_changed:
event_type = EventType.COLLECTION_RENAMED
elif sharing_changed and not added_assets and not removed_ids and not name_changed:
event_type = EventType.SHARING_CHANGED
elif added_assets and not removed_ids and not name_changed and not sharing_changed:
event_type = EventType.ASSETS_ADDED
elif removed_ids and not added_assets and not name_changed and not sharing_changed:
event_type = EventType.ASSETS_REMOVED
else:
event_type = EventType.ASSETS_ADDED # default for mixed changes
# Convert to generic MediaAssets
media_assets = [_asset_to_media(a, external_url) for a in added_assets]
event = ServiceEvent(
event_type=event_type,
provider_type=ServiceProviderType.IMMICH,
provider_name=provider_name,
collection_id=new_album.id,
collection_name=new_album.name,
timestamp=datetime.now(timezone.utc),
added_assets=media_assets,
removed_asset_ids=list(removed_ids),
added_count=len(added_assets),
removed_count=len(removed_ids),
old_name=old_album.name if name_changed else None,
new_name=new_album.name if name_changed else None,
old_shared=old_album.shared if sharing_changed else None,
new_shared=new_album.shared if sharing_changed else None,
extra={
"album_url": f"{external_url}/albums/{new_album.id}",
"people": list(new_album.people),
"shared": new_album.shared,
"photo_count": new_album.photo_count,
"video_count": new_album.video_count,
"asset_count": new_album.asset_count,
"owner": new_album.owner,
},
)
return event, pending
@@ -0,0 +1,270 @@
"""Async Immich API client."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
from .models import ImmichAlbumData, SharedLinkInfo
_LOGGER = logging.getLogger(__name__)
class ImmichClient:
"""Async client for the Immich API."""
def __init__(
self,
session: aiohttp.ClientSession,
url: str,
api_key: str,
) -> None:
self._session = session
self._url = url.rstrip("/")
self._api_key = api_key
self._external_domain: str | None = None
@property
def url(self) -> str:
return self._url
@property
def external_url(self) -> str:
if self._external_domain:
return self._external_domain.rstrip("/")
return self._url
@property
def api_key(self) -> str:
return self._api_key
def get_internal_download_url(self, url: str) -> str:
if self._external_domain:
external = self._external_domain.rstrip("/")
if url.startswith(external):
return url.replace(external, self._url, 1)
return url
@property
def _headers(self) -> dict[str, str]:
return {"x-api-key": self._api_key}
@property
def _json_headers(self) -> dict[str, str]:
return {
"x-api-key": self._api_key,
"Content-Type": "application/json",
}
async def ping(self) -> bool:
try:
async with self._session.get(
f"{self._url}/api/server/ping",
headers=self._headers,
) as response:
return response.status == 200
except aiohttp.ClientError:
return False
async def get_server_config(self) -> str | None:
try:
async with self._session.get(
f"{self._url}/api/server/config",
headers=self._headers,
) as response:
if response.status == 200:
data = await response.json()
external_domain = data.get("externalDomain", "") or ""
self._external_domain = external_domain
return external_domain or None
_LOGGER.warning(
"Failed to fetch server config: HTTP %s", response.status
)
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch server config: %s", err)
return None
async def get_users(self) -> dict[str, str]:
try:
async with self._session.get(
f"{self._url}/api/users",
headers=self._headers,
) as response:
if response.status == 200:
data = await response.json()
return {
u["id"]: u.get("name", u.get("email", "Unknown"))
for u in data
if u.get("id")
}
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch users: %s", err)
return {}
async def get_people(self) -> dict[str, str]:
try:
async with self._session.get(
f"{self._url}/api/people",
headers=self._headers,
) as response:
if response.status == 200:
data = await response.json()
people_list = data.get("people", data) if isinstance(data, dict) else data
return {
p["id"]: p.get("name", "")
for p in people_list
if p.get("name")
}
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch people: %s", err)
return {}
async def get_shared_links(self, album_id: str) -> list[SharedLinkInfo]:
links: list[SharedLinkInfo] = []
try:
async with self._session.get(
f"{self._url}/api/shared-links",
headers=self._headers,
) as response:
if response.status == 200:
data = await response.json()
for link in data:
album = link.get("album")
key = link.get("key")
if album and key and album.get("id") == album_id:
links.append(SharedLinkInfo.from_api_response(link))
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch shared links: %s", err)
return links
async def get_album(
self,
album_id: str,
users_cache: dict[str, str] | None = None,
) -> ImmichAlbumData | None:
try:
async with self._session.get(
f"{self._url}/api/albums/{album_id}",
headers=self._headers,
) as response:
if response.status == 404:
return None
if response.status != 200:
raise ImmichApiError(
f"Error fetching album {album_id}: HTTP {response.status}"
)
data = await response.json()
return ImmichAlbumData.from_api_response(data, users_cache)
except aiohttp.ClientError as err:
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
async def get_albums(self) -> list[dict[str, Any]]:
try:
async with self._session.get(
f"{self._url}/api/albums",
headers=self._headers,
) as response:
if response.status == 200:
return await response.json()
_LOGGER.warning("Failed to fetch albums: HTTP %s", response.status)
except aiohttp.ClientError as err:
_LOGGER.warning("Failed to fetch albums: %s", err)
return []
async def create_shared_link(
self, album_id: str, password: str | None = None
) -> bool:
payload: dict[str, Any] = {
"albumId": album_id,
"type": "ALBUM",
"allowDownload": True,
"allowUpload": False,
"showMetadata": True,
}
if password:
payload["password"] = password
try:
async with self._session.post(
f"{self._url}/api/shared-links",
headers=self._json_headers,
json=payload,
) as response:
if response.status == 201:
return True
return False
except aiohttp.ClientError:
return False
async def delete_shared_link(self, link_id: str) -> bool:
try:
async with self._session.delete(
f"{self._url}/api/shared-links/{link_id}",
headers=self._headers,
) as response:
return response.status == 200
except aiohttp.ClientError:
return False
async def set_shared_link_password(
self, link_id: str, password: str | None
) -> bool:
payload = {"password": password if password else None}
try:
async with self._session.patch(
f"{self._url}/api/shared-links/{link_id}",
headers=self._json_headers,
json=payload,
) as response:
return response.status == 200
except aiohttp.ClientError:
return False
async def search_smart(
self,
query: str,
album_ids: list[str] | None = None,
limit: int = 10,
) -> list[dict[str, Any]]:
payload: dict[str, Any] = {"query": query, "page": 1, "size": limit}
try:
async with self._session.post(
f"{self._url}/api/search/smart",
headers=self._json_headers,
json=payload,
) as response:
if response.status == 200:
data = await response.json()
items = data.get("assets", {}).get("items", [])
if album_ids:
tracked = set(album_ids)
items = [
a for a in items
if any(
alb.get("id") in tracked
for alb in a.get("albums", [])
)
]
return items[:limit]
except aiohttp.ClientError:
pass
return []
async def get_asset_thumbnail(self, asset_id: str, size: str = "preview") -> bytes | None:
try:
async with self._session.get(
f"{self._url}/api/assets/{asset_id}/thumbnail",
headers=self._headers,
params={"size": size},
) as response:
if response.status == 200:
return await response.read()
except aiohttp.ClientError:
pass
return None
class ImmichApiError(Exception):
"""Raised when an Immich API call fails."""
@@ -0,0 +1,11 @@
"""Immich-specific constants."""
from typing import Final
# Asset types (Immich API values)
ASSET_TYPE_IMAGE: Final = "IMAGE"
ASSET_TYPE_VIDEO: Final = "VIDEO"
# Defaults
DEFAULT_SCAN_INTERVAL: Final = 60 # seconds
DEFAULT_SHARE_PASSWORD: Final = "immich123"
@@ -0,0 +1,199 @@
"""Immich-specific data models."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
_LOGGER = logging.getLogger(__name__)
@dataclass
class SharedLinkInfo:
"""Shared link information from Immich."""
id: str
key: str
has_password: bool = False
password: str | None = None
expires_at: datetime | None = None
allow_download: bool = True
show_metadata: bool = True
@property
def is_expired(self) -> bool:
if self.expires_at is None:
return False
return datetime.now(self.expires_at.tzinfo) > self.expires_at
@property
def is_accessible(self) -> bool:
return not self.has_password and not self.is_expired
@classmethod
def from_api_response(cls, data: dict[str, Any]) -> SharedLinkInfo:
expires_at = None
if data.get("expiresAt"):
try:
expires_at = datetime.fromisoformat(
data["expiresAt"].replace("Z", "+00:00")
)
except ValueError:
pass
password = data.get("password")
return cls(
id=data["id"],
key=data["key"],
has_password=bool(password),
password=password if password else None,
expires_at=expires_at,
allow_download=data.get("allowDownload", True),
show_metadata=data.get("showMetadata", True),
)
@dataclass
class ImmichAssetInfo:
"""Immich asset with full detail from API."""
id: str
type: str # IMAGE or VIDEO
filename: str
created_at: str
owner_id: str = ""
owner_name: str = ""
description: str = ""
people: list[str] = field(default_factory=list)
is_favorite: bool = False
rating: int | None = None
latitude: float | None = None
longitude: float | None = None
city: str | None = None
state: str | None = None
country: str | None = None
is_processed: bool = True
thumbhash: str | None = None
@classmethod
def from_api_response(
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
) -> ImmichAssetInfo:
people = []
if "people" in data:
people = [p.get("name", "") for p in data["people"] if p.get("name")]
owner_id = data.get("ownerId", "")
owner_name = ""
if users_cache and owner_id:
owner_name = users_cache.get(owner_id, "")
description = data.get("description", "") or ""
exif_info = data.get("exifInfo")
if not description and exif_info:
description = exif_info.get("description", "") or ""
is_favorite = data.get("isFavorite", False)
rating = exif_info.get("rating") if exif_info else None
latitude = exif_info.get("latitude") if exif_info else None
longitude = exif_info.get("longitude") if exif_info else None
city = exif_info.get("city") if exif_info else None
state = exif_info.get("state") if exif_info else None
country = exif_info.get("country") if exif_info else None
asset_type = data.get("type", ASSET_TYPE_IMAGE)
is_processed = cls._check_processing_status(data)
thumbhash = data.get("thumbhash")
return cls(
id=data["id"],
type=asset_type,
filename=data.get("originalFileName", ""),
created_at=data.get("fileCreatedAt", ""),
owner_id=owner_id,
owner_name=owner_name,
description=description,
people=people,
is_favorite=is_favorite,
rating=rating,
latitude=latitude,
longitude=longitude,
city=city,
state=state,
country=country,
is_processed=is_processed,
thumbhash=thumbhash,
)
@staticmethod
def _check_processing_status(data: dict[str, Any]) -> bool:
is_offline = data.get("isOffline", False)
is_trashed = data.get("isTrashed", False)
is_archived = data.get("isArchived", False)
thumbhash = data.get("thumbhash")
if is_offline or is_trashed or is_archived:
return False
return bool(thumbhash)
@dataclass
class ImmichAlbumData:
"""Full album data from Immich API."""
id: str
name: str
asset_count: int
photo_count: int
video_count: int
created_at: str
updated_at: str
shared: bool
owner: str
thumbnail_asset_id: str | None
asset_ids: set[str] = field(default_factory=set)
assets: dict[str, ImmichAssetInfo] = field(default_factory=dict)
people: set[str] = field(default_factory=set)
@classmethod
def from_api_response(
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
) -> ImmichAlbumData:
assets_data = data.get("assets", [])
asset_ids = set()
assets = {}
people = set()
photo_count = 0
video_count = 0
for asset_data in assets_data:
asset = ImmichAssetInfo.from_api_response(asset_data, users_cache)
asset_ids.add(asset.id)
assets[asset.id] = asset
people.update(asset.people)
if asset.type == ASSET_TYPE_IMAGE:
photo_count += 1
elif asset.type == ASSET_TYPE_VIDEO:
video_count += 1
return cls(
id=data["id"],
name=data.get("albumName", "Unnamed"),
asset_count=data.get("assetCount", len(asset_ids)),
photo_count=photo_count,
video_count=video_count,
created_at=data.get("createdAt", ""),
updated_at=data.get("updatedAt", ""),
shared=data.get("shared", False),
owner=data.get("owner", {}).get("name", "Unknown"),
thumbnail_asset_id=data.get("albumThumbnailAssetId"),
asset_ids=asset_ids,
assets=assets,
people=people,
)
@@ -0,0 +1,269 @@
"""Immich service provider — concrete implementation of ServiceProvider."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
from notify_bridge_core.templates.variables import TemplateVariableDefinition
from .change_detector import detect_album_changes
from .client import ImmichClient
from .models import ImmichAlbumData
_LOGGER = logging.getLogger(__name__)
# Immich-specific template variables
IMMICH_VARIABLES: list[TemplateVariableDefinition] = [
TemplateVariableDefinition(
name="album_name",
type="string",
description="Album name (alias for collection_name)",
example="Vacation 2026",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="album_id",
type="string",
description="Album ID (alias for collection_id)",
example="abc-123-def",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="album_url",
type="string",
description="Web URL to the album",
example="https://photos.example.com/albums/abc-123",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="people",
type="list",
description="Detected people names across all assets",
example='["Alice", "Bob"]',
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="shared",
type="bool",
description="Whether the album is shared",
example="true",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="public_url",
type="string",
description="Public share link URL (if available)",
example="https://photos.example.com/share/abc123",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="protected_url",
type="string",
description="Password-protected share link URL (if available)",
example="https://photos.example.com/share/def456",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="old_album_name",
type="string",
description="Previous album name (for rename events)",
example="Old Name",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="new_album_name",
type="string",
description="New album name (for rename events)",
example="New Name",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="photo_count",
type="int",
description="Number of photos in album",
example="42",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="video_count",
type="int",
description="Number of videos in album",
example="8",
provider_type=ServiceProviderType.IMMICH,
),
TemplateVariableDefinition(
name="owner",
type="string",
description="Album owner name",
example="John",
provider_type=ServiceProviderType.IMMICH,
),
]
class ImmichServiceProvider(ServiceProvider):
"""Immich photo server provider."""
provider_type = ServiceProviderType.IMMICH
def __init__(
self,
session: aiohttp.ClientSession,
url: str,
api_key: str,
external_domain: str | None = None,
name: str = "Immich",
) -> None:
self._client = ImmichClient(session, url, api_key)
self._name = name
self._external_domain = external_domain
self._users_cache: dict[str, str] = {}
@property
def client(self) -> ImmichClient:
return self._client
async def connect(self) -> bool:
ok = await self._client.ping()
if ok:
await self._client.get_server_config()
if self._external_domain:
self._client._external_domain = self._external_domain
self._users_cache = await self._client.get_users()
return ok
async def disconnect(self) -> None:
pass # session lifecycle managed by caller
async def poll(
self,
collection_ids: list[str],
tracker_state: dict[str, Any],
) -> tuple[list[ServiceEvent], dict[str, Any]]:
events: list[ServiceEvent] = []
new_state = dict(tracker_state)
external_url = self._client.external_url
for album_id in collection_ids:
album = await self._client.get_album(album_id, self._users_cache)
if album is None:
# Album deleted
if album_id in new_state:
from notify_bridge_core.models.events import EventType
from datetime import datetime, timezone
events.append(ServiceEvent(
event_type=EventType.COLLECTION_DELETED,
provider_type=ServiceProviderType.IMMICH,
provider_name=self._name,
collection_id=album_id,
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
timestamp=datetime.now(timezone.utc),
))
del new_state[album_id]
continue
# Get previous state
prev = new_state.get(album_id)
if prev is None:
# First time seeing this album — store state, no event
new_state[album_id] = _serialize_album_state(album)
continue
# Reconstruct previous album data for comparison
old_album = _deserialize_album_state(album_id, prev)
pending = set(prev.get("pending_asset_ids", []))
event, updated_pending = detect_album_changes(
old_album, album, pending, self._name, external_url
)
if event:
events.append(event)
# Update state
state = _serialize_album_state(album)
state["pending_asset_ids"] = list(updated_pending)
new_state[album_id] = state
return events, new_state
def get_available_variables(self) -> list[TemplateVariableDefinition]:
return list(IMMICH_VARIABLES)
def get_provider_config_schema(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "Immich server URL",
"example": "http://immich:2283",
},
"api_key": {
"type": "string",
"description": "Immich API key",
"secret": True,
},
"external_domain": {
"type": "string",
"description": "Public-facing domain (optional)",
"example": "https://photos.example.com",
},
},
"required": ["url", "api_key"],
}
async def list_collections(self) -> list[dict[str, Any]]:
albums = await self._client.get_albums()
return [
{
"id": a.get("id", ""),
"name": a.get("albumName", "Unnamed"),
"asset_count": a.get("assetCount", 0),
}
for a in albums
]
async def test_connection(self) -> dict[str, Any]:
ok = await self._client.ping()
if ok:
config = await self._client.get_server_config()
return {
"ok": True,
"message": "Connected to Immich",
"external_domain": config,
}
return {"ok": False, "message": "Failed to connect to Immich"}
def _serialize_album_state(album: ImmichAlbumData) -> dict[str, Any]:
"""Serialize album state for persistence."""
return {
"name": album.name,
"asset_ids": list(album.asset_ids),
"shared": album.shared,
"pending_asset_ids": [],
}
def _deserialize_album_state(album_id: str, state: dict[str, Any]) -> ImmichAlbumData:
"""Create a minimal ImmichAlbumData from saved state for comparison."""
return ImmichAlbumData(
id=album_id,
name=state.get("name", ""),
asset_count=len(state.get("asset_ids", [])),
photo_count=0,
video_count=0,
created_at="",
updated_at="",
shared=state.get("shared", False),
owner="",
thumbnail_asset_id=None,
asset_ids=set(state.get("asset_ids", [])),
)