feat(notify-bridge): phase 3 - Immich service provider
Implement ImmichServiceProvider as first concrete ServiceProvider: - ImmichClient: async API client (ping, albums, shared links, search, thumbnails) - ImmichAssetInfo/ImmichAlbumData: Immich-specific models with from_api_response() - Change detector: produces generic ServiceEvent from album diffs - Asset utils: filter, sort, URL building for Immich assets - 12 Immich-specific template variables registered in global VariableRegistry - Provider config schema (url, api_key, external_domain) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1 +1,17 @@
|
||||
"""Immich service provider implementation."""
|
||||
|
||||
from notify_bridge_core.providers.base import ServiceProviderType
|
||||
from notify_bridge_core.templates.variables import registry
|
||||
|
||||
from .client import ImmichClient, ImmichApiError
|
||||
from .provider import ImmichServiceProvider, IMMICH_VARIABLES
|
||||
|
||||
# Register Immich variables in the global registry
|
||||
registry.register_provider_variables(ServiceProviderType.IMMICH, IMMICH_VARIABLES)
|
||||
|
||||
__all__ = [
|
||||
"ImmichClient",
|
||||
"ImmichApiError",
|
||||
"ImmichServiceProvider",
|
||||
"IMMICH_VARIABLES",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,233 @@
|
||||
"""Immich asset filtering, sorting, and URL utilities."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import random
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
|
||||
from .models import ImmichAssetInfo, SharedLinkInfo
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def filter_assets(
|
||||
assets: list[ImmichAssetInfo],
|
||||
*,
|
||||
favorite_only: bool = False,
|
||||
min_rating: int = 1,
|
||||
asset_type: str = "all",
|
||||
min_date: str | None = None,
|
||||
max_date: str | None = None,
|
||||
memory_date: str | None = None,
|
||||
city: str | None = None,
|
||||
state: str | None = None,
|
||||
country: str | None = None,
|
||||
processed_only: bool = True,
|
||||
) -> list[ImmichAssetInfo]:
|
||||
"""Filter Immich assets by various criteria."""
|
||||
result = list(assets)
|
||||
|
||||
if processed_only:
|
||||
result = [a for a in result if a.is_processed]
|
||||
if favorite_only:
|
||||
result = [a for a in result if a.is_favorite]
|
||||
if min_rating > 1:
|
||||
result = [a for a in result if a.rating is not None and a.rating >= min_rating]
|
||||
if asset_type == "photo":
|
||||
result = [a for a in result if a.type == ASSET_TYPE_IMAGE]
|
||||
elif asset_type == "video":
|
||||
result = [a for a in result if a.type == ASSET_TYPE_VIDEO]
|
||||
if min_date:
|
||||
result = [a for a in result if a.created_at >= min_date]
|
||||
if max_date:
|
||||
result = [a for a in result if a.created_at <= max_date]
|
||||
|
||||
if memory_date:
|
||||
try:
|
||||
ref_date = datetime.fromisoformat(memory_date.replace("Z", "+00:00"))
|
||||
ref_year, ref_month, ref_day = ref_date.year, ref_date.month, ref_date.day
|
||||
|
||||
def matches_memory(asset: ImmichAssetInfo) -> bool:
|
||||
try:
|
||||
asset_date = datetime.fromisoformat(
|
||||
asset.created_at.replace("Z", "+00:00")
|
||||
)
|
||||
return (
|
||||
asset_date.month == ref_month
|
||||
and asset_date.day == ref_day
|
||||
and asset_date.year != ref_year
|
||||
)
|
||||
except (ValueError, AttributeError):
|
||||
return False
|
||||
|
||||
result = [a for a in result if matches_memory(a)]
|
||||
except ValueError:
|
||||
_LOGGER.warning("Invalid memory_date format: %s", memory_date)
|
||||
|
||||
if city:
|
||||
city_lower = city.lower()
|
||||
result = [a for a in result if a.city and city_lower in a.city.lower()]
|
||||
if state:
|
||||
state_lower = state.lower()
|
||||
result = [a for a in result if a.state and state_lower in a.state.lower()]
|
||||
if country:
|
||||
country_lower = country.lower()
|
||||
result = [a for a in result if a.country and country_lower in a.country.lower()]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def sort_assets(
|
||||
assets: list[ImmichAssetInfo],
|
||||
order_by: str = "date",
|
||||
order: str = "descending",
|
||||
) -> list[ImmichAssetInfo]:
|
||||
"""Sort Immich assets by the specified field."""
|
||||
result = list(assets)
|
||||
|
||||
if order_by == "random":
|
||||
random.shuffle(result)
|
||||
elif order_by == "rating":
|
||||
result = sorted(
|
||||
result,
|
||||
key=lambda a: (a.rating is None, a.rating if a.rating is not None else 0),
|
||||
reverse=(order == "descending"),
|
||||
)
|
||||
elif order_by == "name":
|
||||
result = sorted(
|
||||
result,
|
||||
key=lambda a: a.filename.lower(),
|
||||
reverse=(order == "descending"),
|
||||
)
|
||||
else: # date
|
||||
result = sorted(
|
||||
result,
|
||||
key=lambda a: a.created_at,
|
||||
reverse=(order == "descending"),
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# --- Shared link URL helpers ---
|
||||
|
||||
|
||||
def get_accessible_links(links: list[SharedLinkInfo]) -> list[SharedLinkInfo]:
|
||||
return [link for link in links if link.is_accessible]
|
||||
|
||||
|
||||
def get_protected_links(links: list[SharedLinkInfo]) -> list[SharedLinkInfo]:
|
||||
return [link for link in links if link.has_password and not link.is_expired]
|
||||
|
||||
|
||||
def get_public_url(external_url: str, links: list[SharedLinkInfo]) -> str | None:
|
||||
accessible = get_accessible_links(links)
|
||||
if accessible:
|
||||
return f"{external_url}/share/{accessible[0].key}"
|
||||
return None
|
||||
|
||||
|
||||
def get_any_url(external_url: str, links: list[SharedLinkInfo]) -> str | None:
|
||||
accessible = get_accessible_links(links)
|
||||
if accessible:
|
||||
return f"{external_url}/share/{accessible[0].key}"
|
||||
non_expired = [link for link in links if not link.is_expired]
|
||||
if non_expired:
|
||||
return f"{external_url}/share/{non_expired[0].key}"
|
||||
return None
|
||||
|
||||
|
||||
def get_protected_url(external_url: str, links: list[SharedLinkInfo]) -> str | None:
|
||||
protected = get_protected_links(links)
|
||||
if protected:
|
||||
return f"{external_url}/share/{protected[0].key}"
|
||||
return None
|
||||
|
||||
|
||||
def get_protected_password(links: list[SharedLinkInfo]) -> str | None:
|
||||
protected = get_protected_links(links)
|
||||
if protected and protected[0].password:
|
||||
return protected[0].password
|
||||
return None
|
||||
|
||||
|
||||
def _get_best_link_key(links: list[SharedLinkInfo]) -> str | None:
|
||||
accessible = get_accessible_links(links)
|
||||
if accessible:
|
||||
return accessible[0].key
|
||||
non_expired = [link for link in links if not link.is_expired]
|
||||
if non_expired:
|
||||
return non_expired[0].key
|
||||
return None
|
||||
|
||||
|
||||
def get_asset_download_url(
|
||||
external_url: str, links: list[SharedLinkInfo], asset_id: str
|
||||
) -> str | None:
|
||||
key = _get_best_link_key(links)
|
||||
if key:
|
||||
return f"{external_url}/api/assets/{asset_id}/original?key={key}"
|
||||
return None
|
||||
|
||||
|
||||
def get_asset_photo_url(
|
||||
external_url: str, links: list[SharedLinkInfo], asset_id: str
|
||||
) -> str | None:
|
||||
key = _get_best_link_key(links)
|
||||
if key:
|
||||
return f"{external_url}/api/assets/{asset_id}/thumbnail?size=preview&key={key}"
|
||||
return None
|
||||
|
||||
|
||||
def get_asset_video_url(
|
||||
external_url: str, links: list[SharedLinkInfo], asset_id: str
|
||||
) -> str | None:
|
||||
key = _get_best_link_key(links)
|
||||
if key:
|
||||
return f"{external_url}/api/assets/{asset_id}/video/playback?key={key}"
|
||||
return None
|
||||
|
||||
|
||||
def build_asset_detail(
|
||||
asset: ImmichAssetInfo,
|
||||
external_url: str,
|
||||
shared_links: list[SharedLinkInfo],
|
||||
) -> dict[str, Any]:
|
||||
"""Build asset detail dictionary with all available data."""
|
||||
detail: dict[str, Any] = {
|
||||
"id": asset.id,
|
||||
"type": asset.type,
|
||||
"filename": asset.filename,
|
||||
"created_at": asset.created_at,
|
||||
"owner": asset.owner_name,
|
||||
"owner_id": asset.owner_id,
|
||||
"description": asset.description,
|
||||
"people": asset.people,
|
||||
"is_favorite": asset.is_favorite,
|
||||
"rating": asset.rating,
|
||||
"latitude": asset.latitude,
|
||||
"longitude": asset.longitude,
|
||||
"city": asset.city,
|
||||
"state": asset.state,
|
||||
"country": asset.country,
|
||||
"thumbnail_url": f"{external_url}/api/assets/{asset.id}/thumbnail",
|
||||
}
|
||||
|
||||
key = _get_best_link_key(shared_links)
|
||||
if key:
|
||||
detail["url"] = f"{external_url}/share/{key}/photos/{asset.id}"
|
||||
detail["download_url"] = f"{external_url}/api/assets/{asset.id}/original?key={key}"
|
||||
|
||||
if asset.type == ASSET_TYPE_VIDEO:
|
||||
detail["playback_url"] = (
|
||||
f"{external_url}/api/assets/{asset.id}/video/playback?key={key}"
|
||||
)
|
||||
elif asset.type == ASSET_TYPE_IMAGE:
|
||||
detail["photo_url"] = (
|
||||
f"{external_url}/api/assets/{asset.id}/thumbnail?size=preview&key={key}"
|
||||
)
|
||||
|
||||
return detail
|
||||
@@ -0,0 +1,143 @@
|
||||
"""Immich album change detection — produces generic ServiceEvent objects."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from notify_bridge_core.models.events import EventType, ServiceEvent
|
||||
from notify_bridge_core.models.media import MediaAsset, MediaType
|
||||
from notify_bridge_core.providers.base import ServiceProviderType
|
||||
|
||||
from .models import ImmichAlbumData, ImmichAssetInfo
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
|
||||
"""Convert ImmichAssetInfo to generic MediaAsset."""
|
||||
media_type = MediaType.IMAGE if asset.type == "IMAGE" else MediaType.VIDEO
|
||||
|
||||
try:
|
||||
created_at = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
|
||||
except (ValueError, AttributeError):
|
||||
created_at = datetime.now(timezone.utc)
|
||||
|
||||
return MediaAsset(
|
||||
id=asset.id,
|
||||
type=media_type,
|
||||
filename=asset.filename,
|
||||
created_at=created_at,
|
||||
owner_name=asset.owner_name or None,
|
||||
description=asset.description or None,
|
||||
tags=list(asset.people),
|
||||
thumbnail_url=f"{external_url}/api/assets/{asset.id}/thumbnail",
|
||||
full_url=f"{external_url}/api/assets/{asset.id}/original",
|
||||
extra={
|
||||
"owner_id": asset.owner_id,
|
||||
"is_favorite": asset.is_favorite,
|
||||
"rating": asset.rating,
|
||||
"latitude": asset.latitude,
|
||||
"longitude": asset.longitude,
|
||||
"city": asset.city,
|
||||
"state": asset.state,
|
||||
"country": asset.country,
|
||||
"thumbhash": asset.thumbhash,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def detect_album_changes(
|
||||
old_album: ImmichAlbumData,
|
||||
new_album: ImmichAlbumData,
|
||||
pending_asset_ids: set[str],
|
||||
provider_name: str,
|
||||
external_url: str,
|
||||
) -> tuple[ServiceEvent | None, set[str]]:
|
||||
"""Detect changes between two album states, producing a generic ServiceEvent.
|
||||
|
||||
Args:
|
||||
old_album: Previous album data
|
||||
new_album: Current album data
|
||||
pending_asset_ids: Assets detected but not yet fully processed
|
||||
provider_name: Name of the provider instance
|
||||
external_url: External URL for building asset URLs
|
||||
|
||||
Returns:
|
||||
Tuple of (ServiceEvent or None, updated pending_asset_ids)
|
||||
"""
|
||||
added_ids = new_album.asset_ids - old_album.asset_ids
|
||||
removed_ids = old_album.asset_ids - new_album.asset_ids
|
||||
|
||||
pending = set(pending_asset_ids)
|
||||
|
||||
# Collect newly added processed assets
|
||||
added_assets: list[ImmichAssetInfo] = []
|
||||
for aid in added_ids:
|
||||
if aid not in new_album.assets:
|
||||
continue
|
||||
asset = new_album.assets[aid]
|
||||
if asset.is_processed:
|
||||
added_assets.append(asset)
|
||||
else:
|
||||
pending.add(aid)
|
||||
|
||||
# Check if pending assets are now processed
|
||||
for aid in list(pending):
|
||||
if aid not in new_album.assets:
|
||||
pending.discard(aid)
|
||||
continue
|
||||
asset = new_album.assets[aid]
|
||||
if asset.is_processed:
|
||||
added_assets.append(asset)
|
||||
pending.discard(aid)
|
||||
|
||||
# Detect metadata changes
|
||||
name_changed = old_album.name != new_album.name
|
||||
sharing_changed = old_album.shared != new_album.shared
|
||||
|
||||
if not added_assets and not removed_ids and not name_changed and not sharing_changed:
|
||||
return None, pending
|
||||
|
||||
# Determine event type
|
||||
if name_changed and not added_assets and not removed_ids and not sharing_changed:
|
||||
event_type = EventType.COLLECTION_RENAMED
|
||||
elif sharing_changed and not added_assets and not removed_ids and not name_changed:
|
||||
event_type = EventType.SHARING_CHANGED
|
||||
elif added_assets and not removed_ids and not name_changed and not sharing_changed:
|
||||
event_type = EventType.ASSETS_ADDED
|
||||
elif removed_ids and not added_assets and not name_changed and not sharing_changed:
|
||||
event_type = EventType.ASSETS_REMOVED
|
||||
else:
|
||||
event_type = EventType.ASSETS_ADDED # default for mixed changes
|
||||
|
||||
# Convert to generic MediaAssets
|
||||
media_assets = [_asset_to_media(a, external_url) for a in added_assets]
|
||||
|
||||
event = ServiceEvent(
|
||||
event_type=event_type,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=provider_name,
|
||||
collection_id=new_album.id,
|
||||
collection_name=new_album.name,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
added_assets=media_assets,
|
||||
removed_asset_ids=list(removed_ids),
|
||||
added_count=len(added_assets),
|
||||
removed_count=len(removed_ids),
|
||||
old_name=old_album.name if name_changed else None,
|
||||
new_name=new_album.name if name_changed else None,
|
||||
old_shared=old_album.shared if sharing_changed else None,
|
||||
new_shared=new_album.shared if sharing_changed else None,
|
||||
extra={
|
||||
"album_url": f"{external_url}/albums/{new_album.id}",
|
||||
"people": list(new_album.people),
|
||||
"shared": new_album.shared,
|
||||
"photo_count": new_album.photo_count,
|
||||
"video_count": new_album.video_count,
|
||||
"asset_count": new_album.asset_count,
|
||||
"owner": new_album.owner,
|
||||
},
|
||||
)
|
||||
|
||||
return event, pending
|
||||
@@ -0,0 +1,270 @@
|
||||
"""Async Immich API client."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
from .models import ImmichAlbumData, SharedLinkInfo
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImmichClient:
|
||||
"""Async client for the Immich API."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session: aiohttp.ClientSession,
|
||||
url: str,
|
||||
api_key: str,
|
||||
) -> None:
|
||||
self._session = session
|
||||
self._url = url.rstrip("/")
|
||||
self._api_key = api_key
|
||||
self._external_domain: str | None = None
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
return self._url
|
||||
|
||||
@property
|
||||
def external_url(self) -> str:
|
||||
if self._external_domain:
|
||||
return self._external_domain.rstrip("/")
|
||||
return self._url
|
||||
|
||||
@property
|
||||
def api_key(self) -> str:
|
||||
return self._api_key
|
||||
|
||||
def get_internal_download_url(self, url: str) -> str:
|
||||
if self._external_domain:
|
||||
external = self._external_domain.rstrip("/")
|
||||
if url.startswith(external):
|
||||
return url.replace(external, self._url, 1)
|
||||
return url
|
||||
|
||||
@property
|
||||
def _headers(self) -> dict[str, str]:
|
||||
return {"x-api-key": self._api_key}
|
||||
|
||||
@property
|
||||
def _json_headers(self) -> dict[str, str]:
|
||||
return {
|
||||
"x-api-key": self._api_key,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async def ping(self) -> bool:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/server/ping",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
return response.status == 200
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
async def get_server_config(self) -> str | None:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/server/config",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
external_domain = data.get("externalDomain", "") or ""
|
||||
self._external_domain = external_domain
|
||||
return external_domain or None
|
||||
_LOGGER.warning(
|
||||
"Failed to fetch server config: HTTP %s", response.status
|
||||
)
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Failed to fetch server config: %s", err)
|
||||
return None
|
||||
|
||||
async def get_users(self) -> dict[str, str]:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/users",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
return {
|
||||
u["id"]: u.get("name", u.get("email", "Unknown"))
|
||||
for u in data
|
||||
if u.get("id")
|
||||
}
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Failed to fetch users: %s", err)
|
||||
return {}
|
||||
|
||||
async def get_people(self) -> dict[str, str]:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/people",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
people_list = data.get("people", data) if isinstance(data, dict) else data
|
||||
return {
|
||||
p["id"]: p.get("name", "")
|
||||
for p in people_list
|
||||
if p.get("name")
|
||||
}
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Failed to fetch people: %s", err)
|
||||
return {}
|
||||
|
||||
async def get_shared_links(self, album_id: str) -> list[SharedLinkInfo]:
|
||||
links: list[SharedLinkInfo] = []
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/shared-links",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
for link in data:
|
||||
album = link.get("album")
|
||||
key = link.get("key")
|
||||
if album and key and album.get("id") == album_id:
|
||||
links.append(SharedLinkInfo.from_api_response(link))
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Failed to fetch shared links: %s", err)
|
||||
return links
|
||||
|
||||
async def get_album(
|
||||
self,
|
||||
album_id: str,
|
||||
users_cache: dict[str, str] | None = None,
|
||||
) -> ImmichAlbumData | None:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/albums/{album_id}",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 404:
|
||||
return None
|
||||
if response.status != 200:
|
||||
raise ImmichApiError(
|
||||
f"Error fetching album {album_id}: HTTP {response.status}"
|
||||
)
|
||||
data = await response.json()
|
||||
return ImmichAlbumData.from_api_response(data, users_cache)
|
||||
except aiohttp.ClientError as err:
|
||||
raise ImmichApiError(f"Error communicating with Immich: {err}") from err
|
||||
|
||||
async def get_albums(self) -> list[dict[str, Any]]:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/albums",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
return await response.json()
|
||||
_LOGGER.warning("Failed to fetch albums: HTTP %s", response.status)
|
||||
except aiohttp.ClientError as err:
|
||||
_LOGGER.warning("Failed to fetch albums: %s", err)
|
||||
return []
|
||||
|
||||
async def create_shared_link(
|
||||
self, album_id: str, password: str | None = None
|
||||
) -> bool:
|
||||
payload: dict[str, Any] = {
|
||||
"albumId": album_id,
|
||||
"type": "ALBUM",
|
||||
"allowDownload": True,
|
||||
"allowUpload": False,
|
||||
"showMetadata": True,
|
||||
}
|
||||
if password:
|
||||
payload["password"] = password
|
||||
|
||||
try:
|
||||
async with self._session.post(
|
||||
f"{self._url}/api/shared-links",
|
||||
headers=self._json_headers,
|
||||
json=payload,
|
||||
) as response:
|
||||
if response.status == 201:
|
||||
return True
|
||||
return False
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
async def delete_shared_link(self, link_id: str) -> bool:
|
||||
try:
|
||||
async with self._session.delete(
|
||||
f"{self._url}/api/shared-links/{link_id}",
|
||||
headers=self._headers,
|
||||
) as response:
|
||||
return response.status == 200
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
async def set_shared_link_password(
|
||||
self, link_id: str, password: str | None
|
||||
) -> bool:
|
||||
payload = {"password": password if password else None}
|
||||
try:
|
||||
async with self._session.patch(
|
||||
f"{self._url}/api/shared-links/{link_id}",
|
||||
headers=self._json_headers,
|
||||
json=payload,
|
||||
) as response:
|
||||
return response.status == 200
|
||||
except aiohttp.ClientError:
|
||||
return False
|
||||
|
||||
async def search_smart(
|
||||
self,
|
||||
query: str,
|
||||
album_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[dict[str, Any]]:
|
||||
payload: dict[str, Any] = {"query": query, "page": 1, "size": limit}
|
||||
try:
|
||||
async with self._session.post(
|
||||
f"{self._url}/api/search/smart",
|
||||
headers=self._json_headers,
|
||||
json=payload,
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
items = data.get("assets", {}).get("items", [])
|
||||
if album_ids:
|
||||
tracked = set(album_ids)
|
||||
items = [
|
||||
a for a in items
|
||||
if any(
|
||||
alb.get("id") in tracked
|
||||
for alb in a.get("albums", [])
|
||||
)
|
||||
]
|
||||
return items[:limit]
|
||||
except aiohttp.ClientError:
|
||||
pass
|
||||
return []
|
||||
|
||||
async def get_asset_thumbnail(self, asset_id: str, size: str = "preview") -> bytes | None:
|
||||
try:
|
||||
async with self._session.get(
|
||||
f"{self._url}/api/assets/{asset_id}/thumbnail",
|
||||
headers=self._headers,
|
||||
params={"size": size},
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
return await response.read()
|
||||
except aiohttp.ClientError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
class ImmichApiError(Exception):
|
||||
"""Raised when an Immich API call fails."""
|
||||
@@ -0,0 +1,11 @@
|
||||
"""Immich-specific constants."""
|
||||
|
||||
from typing import Final
|
||||
|
||||
# Asset types (Immich API values)
|
||||
ASSET_TYPE_IMAGE: Final = "IMAGE"
|
||||
ASSET_TYPE_VIDEO: Final = "VIDEO"
|
||||
|
||||
# Defaults
|
||||
DEFAULT_SCAN_INTERVAL: Final = 60 # seconds
|
||||
DEFAULT_SHARE_PASSWORD: Final = "immich123"
|
||||
@@ -0,0 +1,199 @@
|
||||
"""Immich-specific data models."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SharedLinkInfo:
|
||||
"""Shared link information from Immich."""
|
||||
|
||||
id: str
|
||||
key: str
|
||||
has_password: bool = False
|
||||
password: str | None = None
|
||||
expires_at: datetime | None = None
|
||||
allow_download: bool = True
|
||||
show_metadata: bool = True
|
||||
|
||||
@property
|
||||
def is_expired(self) -> bool:
|
||||
if self.expires_at is None:
|
||||
return False
|
||||
return datetime.now(self.expires_at.tzinfo) > self.expires_at
|
||||
|
||||
@property
|
||||
def is_accessible(self) -> bool:
|
||||
return not self.has_password and not self.is_expired
|
||||
|
||||
@classmethod
|
||||
def from_api_response(cls, data: dict[str, Any]) -> SharedLinkInfo:
|
||||
expires_at = None
|
||||
if data.get("expiresAt"):
|
||||
try:
|
||||
expires_at = datetime.fromisoformat(
|
||||
data["expiresAt"].replace("Z", "+00:00")
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
password = data.get("password")
|
||||
return cls(
|
||||
id=data["id"],
|
||||
key=data["key"],
|
||||
has_password=bool(password),
|
||||
password=password if password else None,
|
||||
expires_at=expires_at,
|
||||
allow_download=data.get("allowDownload", True),
|
||||
show_metadata=data.get("showMetadata", True),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImmichAssetInfo:
|
||||
"""Immich asset with full detail from API."""
|
||||
|
||||
id: str
|
||||
type: str # IMAGE or VIDEO
|
||||
filename: str
|
||||
created_at: str
|
||||
owner_id: str = ""
|
||||
owner_name: str = ""
|
||||
description: str = ""
|
||||
people: list[str] = field(default_factory=list)
|
||||
is_favorite: bool = False
|
||||
rating: int | None = None
|
||||
latitude: float | None = None
|
||||
longitude: float | None = None
|
||||
city: str | None = None
|
||||
state: str | None = None
|
||||
country: str | None = None
|
||||
is_processed: bool = True
|
||||
thumbhash: str | None = None
|
||||
|
||||
@classmethod
|
||||
def from_api_response(
|
||||
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
|
||||
) -> ImmichAssetInfo:
|
||||
people = []
|
||||
if "people" in data:
|
||||
people = [p.get("name", "") for p in data["people"] if p.get("name")]
|
||||
|
||||
owner_id = data.get("ownerId", "")
|
||||
owner_name = ""
|
||||
if users_cache and owner_id:
|
||||
owner_name = users_cache.get(owner_id, "")
|
||||
|
||||
description = data.get("description", "") or ""
|
||||
exif_info = data.get("exifInfo")
|
||||
if not description and exif_info:
|
||||
description = exif_info.get("description", "") or ""
|
||||
|
||||
is_favorite = data.get("isFavorite", False)
|
||||
rating = exif_info.get("rating") if exif_info else None
|
||||
|
||||
latitude = exif_info.get("latitude") if exif_info else None
|
||||
longitude = exif_info.get("longitude") if exif_info else None
|
||||
city = exif_info.get("city") if exif_info else None
|
||||
state = exif_info.get("state") if exif_info else None
|
||||
country = exif_info.get("country") if exif_info else None
|
||||
|
||||
asset_type = data.get("type", ASSET_TYPE_IMAGE)
|
||||
is_processed = cls._check_processing_status(data)
|
||||
thumbhash = data.get("thumbhash")
|
||||
|
||||
return cls(
|
||||
id=data["id"],
|
||||
type=asset_type,
|
||||
filename=data.get("originalFileName", ""),
|
||||
created_at=data.get("fileCreatedAt", ""),
|
||||
owner_id=owner_id,
|
||||
owner_name=owner_name,
|
||||
description=description,
|
||||
people=people,
|
||||
is_favorite=is_favorite,
|
||||
rating=rating,
|
||||
latitude=latitude,
|
||||
longitude=longitude,
|
||||
city=city,
|
||||
state=state,
|
||||
country=country,
|
||||
is_processed=is_processed,
|
||||
thumbhash=thumbhash,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _check_processing_status(data: dict[str, Any]) -> bool:
|
||||
is_offline = data.get("isOffline", False)
|
||||
is_trashed = data.get("isTrashed", False)
|
||||
is_archived = data.get("isArchived", False)
|
||||
thumbhash = data.get("thumbhash")
|
||||
|
||||
if is_offline or is_trashed or is_archived:
|
||||
return False
|
||||
|
||||
return bool(thumbhash)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImmichAlbumData:
|
||||
"""Full album data from Immich API."""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
asset_count: int
|
||||
photo_count: int
|
||||
video_count: int
|
||||
created_at: str
|
||||
updated_at: str
|
||||
shared: bool
|
||||
owner: str
|
||||
thumbnail_asset_id: str | None
|
||||
asset_ids: set[str] = field(default_factory=set)
|
||||
assets: dict[str, ImmichAssetInfo] = field(default_factory=dict)
|
||||
people: set[str] = field(default_factory=set)
|
||||
|
||||
@classmethod
|
||||
def from_api_response(
|
||||
cls, data: dict[str, Any], users_cache: dict[str, str] | None = None
|
||||
) -> ImmichAlbumData:
|
||||
assets_data = data.get("assets", [])
|
||||
asset_ids = set()
|
||||
assets = {}
|
||||
people = set()
|
||||
photo_count = 0
|
||||
video_count = 0
|
||||
|
||||
for asset_data in assets_data:
|
||||
asset = ImmichAssetInfo.from_api_response(asset_data, users_cache)
|
||||
asset_ids.add(asset.id)
|
||||
assets[asset.id] = asset
|
||||
people.update(asset.people)
|
||||
if asset.type == ASSET_TYPE_IMAGE:
|
||||
photo_count += 1
|
||||
elif asset.type == ASSET_TYPE_VIDEO:
|
||||
video_count += 1
|
||||
|
||||
return cls(
|
||||
id=data["id"],
|
||||
name=data.get("albumName", "Unnamed"),
|
||||
asset_count=data.get("assetCount", len(asset_ids)),
|
||||
photo_count=photo_count,
|
||||
video_count=video_count,
|
||||
created_at=data.get("createdAt", ""),
|
||||
updated_at=data.get("updatedAt", ""),
|
||||
shared=data.get("shared", False),
|
||||
owner=data.get("owner", {}).get("name", "Unknown"),
|
||||
thumbnail_asset_id=data.get("albumThumbnailAssetId"),
|
||||
asset_ids=asset_ids,
|
||||
assets=assets,
|
||||
people=people,
|
||||
)
|
||||
@@ -0,0 +1,269 @@
|
||||
"""Immich service provider — concrete implementation of ServiceProvider."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
from notify_bridge_core.models.events import ServiceEvent
|
||||
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
|
||||
from notify_bridge_core.templates.variables import TemplateVariableDefinition
|
||||
|
||||
from .change_detector import detect_album_changes
|
||||
from .client import ImmichClient
|
||||
from .models import ImmichAlbumData
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Immich-specific template variables
|
||||
IMMICH_VARIABLES: list[TemplateVariableDefinition] = [
|
||||
TemplateVariableDefinition(
|
||||
name="album_name",
|
||||
type="string",
|
||||
description="Album name (alias for collection_name)",
|
||||
example="Vacation 2026",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="album_id",
|
||||
type="string",
|
||||
description="Album ID (alias for collection_id)",
|
||||
example="abc-123-def",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="album_url",
|
||||
type="string",
|
||||
description="Web URL to the album",
|
||||
example="https://photos.example.com/albums/abc-123",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="people",
|
||||
type="list",
|
||||
description="Detected people names across all assets",
|
||||
example='["Alice", "Bob"]',
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="shared",
|
||||
type="bool",
|
||||
description="Whether the album is shared",
|
||||
example="true",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="public_url",
|
||||
type="string",
|
||||
description="Public share link URL (if available)",
|
||||
example="https://photos.example.com/share/abc123",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="protected_url",
|
||||
type="string",
|
||||
description="Password-protected share link URL (if available)",
|
||||
example="https://photos.example.com/share/def456",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="old_album_name",
|
||||
type="string",
|
||||
description="Previous album name (for rename events)",
|
||||
example="Old Name",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="new_album_name",
|
||||
type="string",
|
||||
description="New album name (for rename events)",
|
||||
example="New Name",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="photo_count",
|
||||
type="int",
|
||||
description="Number of photos in album",
|
||||
example="42",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="video_count",
|
||||
type="int",
|
||||
description="Number of videos in album",
|
||||
example="8",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
TemplateVariableDefinition(
|
||||
name="owner",
|
||||
type="string",
|
||||
description="Album owner name",
|
||||
example="John",
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class ImmichServiceProvider(ServiceProvider):
|
||||
"""Immich photo server provider."""
|
||||
|
||||
provider_type = ServiceProviderType.IMMICH
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session: aiohttp.ClientSession,
|
||||
url: str,
|
||||
api_key: str,
|
||||
external_domain: str | None = None,
|
||||
name: str = "Immich",
|
||||
) -> None:
|
||||
self._client = ImmichClient(session, url, api_key)
|
||||
self._name = name
|
||||
self._external_domain = external_domain
|
||||
self._users_cache: dict[str, str] = {}
|
||||
|
||||
@property
|
||||
def client(self) -> ImmichClient:
|
||||
return self._client
|
||||
|
||||
async def connect(self) -> bool:
|
||||
ok = await self._client.ping()
|
||||
if ok:
|
||||
await self._client.get_server_config()
|
||||
if self._external_domain:
|
||||
self._client._external_domain = self._external_domain
|
||||
self._users_cache = await self._client.get_users()
|
||||
return ok
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
pass # session lifecycle managed by caller
|
||||
|
||||
async def poll(
|
||||
self,
|
||||
collection_ids: list[str],
|
||||
tracker_state: dict[str, Any],
|
||||
) -> tuple[list[ServiceEvent], dict[str, Any]]:
|
||||
events: list[ServiceEvent] = []
|
||||
new_state = dict(tracker_state)
|
||||
external_url = self._client.external_url
|
||||
|
||||
for album_id in collection_ids:
|
||||
album = await self._client.get_album(album_id, self._users_cache)
|
||||
if album is None:
|
||||
# Album deleted
|
||||
if album_id in new_state:
|
||||
from notify_bridge_core.models.events import EventType
|
||||
from datetime import datetime, timezone
|
||||
events.append(ServiceEvent(
|
||||
event_type=EventType.COLLECTION_DELETED,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=self._name,
|
||||
collection_id=album_id,
|
||||
collection_name=new_state.get(album_id, {}).get("name", "Unknown"),
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
))
|
||||
del new_state[album_id]
|
||||
continue
|
||||
|
||||
# Get previous state
|
||||
prev = new_state.get(album_id)
|
||||
if prev is None:
|
||||
# First time seeing this album — store state, no event
|
||||
new_state[album_id] = _serialize_album_state(album)
|
||||
continue
|
||||
|
||||
# Reconstruct previous album data for comparison
|
||||
old_album = _deserialize_album_state(album_id, prev)
|
||||
pending = set(prev.get("pending_asset_ids", []))
|
||||
|
||||
event, updated_pending = detect_album_changes(
|
||||
old_album, album, pending, self._name, external_url
|
||||
)
|
||||
|
||||
if event:
|
||||
events.append(event)
|
||||
|
||||
# Update state
|
||||
state = _serialize_album_state(album)
|
||||
state["pending_asset_ids"] = list(updated_pending)
|
||||
new_state[album_id] = state
|
||||
|
||||
return events, new_state
|
||||
|
||||
def get_available_variables(self) -> list[TemplateVariableDefinition]:
|
||||
return list(IMMICH_VARIABLES)
|
||||
|
||||
def get_provider_config_schema(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"url": {
|
||||
"type": "string",
|
||||
"description": "Immich server URL",
|
||||
"example": "http://immich:2283",
|
||||
},
|
||||
"api_key": {
|
||||
"type": "string",
|
||||
"description": "Immich API key",
|
||||
"secret": True,
|
||||
},
|
||||
"external_domain": {
|
||||
"type": "string",
|
||||
"description": "Public-facing domain (optional)",
|
||||
"example": "https://photos.example.com",
|
||||
},
|
||||
},
|
||||
"required": ["url", "api_key"],
|
||||
}
|
||||
|
||||
async def list_collections(self) -> list[dict[str, Any]]:
|
||||
albums = await self._client.get_albums()
|
||||
return [
|
||||
{
|
||||
"id": a.get("id", ""),
|
||||
"name": a.get("albumName", "Unnamed"),
|
||||
"asset_count": a.get("assetCount", 0),
|
||||
}
|
||||
for a in albums
|
||||
]
|
||||
|
||||
async def test_connection(self) -> dict[str, Any]:
|
||||
ok = await self._client.ping()
|
||||
if ok:
|
||||
config = await self._client.get_server_config()
|
||||
return {
|
||||
"ok": True,
|
||||
"message": "Connected to Immich",
|
||||
"external_domain": config,
|
||||
}
|
||||
return {"ok": False, "message": "Failed to connect to Immich"}
|
||||
|
||||
|
||||
def _serialize_album_state(album: ImmichAlbumData) -> dict[str, Any]:
|
||||
"""Serialize album state for persistence."""
|
||||
return {
|
||||
"name": album.name,
|
||||
"asset_ids": list(album.asset_ids),
|
||||
"shared": album.shared,
|
||||
"pending_asset_ids": [],
|
||||
}
|
||||
|
||||
|
||||
def _deserialize_album_state(album_id: str, state: dict[str, Any]) -> ImmichAlbumData:
|
||||
"""Create a minimal ImmichAlbumData from saved state for comparison."""
|
||||
return ImmichAlbumData(
|
||||
id=album_id,
|
||||
name=state.get("name", ""),
|
||||
asset_count=len(state.get("asset_ids", [])),
|
||||
photo_count=0,
|
||||
video_count=0,
|
||||
created_at="",
|
||||
updated_at="",
|
||||
shared=state.get("shared", False),
|
||||
owner="",
|
||||
thumbnail_asset_id=None,
|
||||
asset_ids=set(state.get("asset_ids", [])),
|
||||
)
|
||||
Reference in New Issue
Block a user