diff --git a/packages/core/pyproject.toml b/packages/core/pyproject.toml new file mode 100644 index 0000000..0dbe24c --- /dev/null +++ b/packages/core/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "immich-watcher-core" +version = "0.1.0" +description = "Core library for Immich album change detection and notifications" +requires-python = ">=3.12" +dependencies = [ + "aiohttp>=3.9", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "aioresponses>=0.7", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/immich_watcher_core"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/packages/core/src/immich_watcher_core/__init__.py b/packages/core/src/immich_watcher_core/__init__.py new file mode 100644 index 0000000..103ccc5 --- /dev/null +++ b/packages/core/src/immich_watcher_core/__init__.py @@ -0,0 +1 @@ +"""Immich Watcher Core - shared library for Immich album change detection and notifications.""" diff --git a/packages/core/src/immich_watcher_core/asset_utils.py b/packages/core/src/immich_watcher_core/asset_utils.py new file mode 100644 index 0000000..0e9b4dc --- /dev/null +++ b/packages/core/src/immich_watcher_core/asset_utils.py @@ -0,0 +1,337 @@ +"""Asset filtering, sorting, and URL utilities.""" + +from __future__ import annotations + +import logging +import random +from datetime import datetime +from typing import Any + +from .constants import ( + ASSET_TYPE_IMAGE, + ASSET_TYPE_VIDEO, + ATTR_ASSET_CITY, + ATTR_ASSET_COUNTRY, + ATTR_ASSET_CREATED, + ATTR_ASSET_DESCRIPTION, + ATTR_ASSET_DOWNLOAD_URL, + ATTR_ASSET_FILENAME, + ATTR_ASSET_IS_FAVORITE, + ATTR_ASSET_LATITUDE, + ATTR_ASSET_LONGITUDE, + ATTR_ASSET_OWNER, + ATTR_ASSET_OWNER_ID, + ATTR_ASSET_PLAYBACK_URL, + ATTR_ASSET_RATING, + ATTR_ASSET_STATE, + ATTR_ASSET_TYPE, + ATTR_ASSET_URL, + ATTR_PEOPLE, + ATTR_THUMBNAIL_URL, +) +from .models import AssetInfo, SharedLinkInfo + +_LOGGER = logging.getLogger(__name__) + + +def filter_assets( + assets: list[AssetInfo], + *, + favorite_only: bool = False, + min_rating: int = 1, + asset_type: str = "all", + min_date: str | None = None, + max_date: str | None = None, + memory_date: str | None = None, + city: str | None = None, + state: str | None = None, + country: str | None = None, + processed_only: bool = True, +) -> list[AssetInfo]: + """Filter assets by various criteria. + + Args: + assets: List of assets to filter + favorite_only: Only include favorite assets + min_rating: Minimum rating (1-5) + asset_type: "all", "photo", or "video" + min_date: Minimum creation date (ISO 8601) + max_date: Maximum creation date (ISO 8601) + memory_date: Match month/day excluding same year (ISO 8601) + city: City substring filter (case-insensitive) + state: State substring filter (case-insensitive) + country: Country substring filter (case-insensitive) + processed_only: Only include fully processed assets + + Returns: + Filtered list of assets + """ + result = list(assets) + + if processed_only: + result = [a for a in result if a.is_processed] + + if favorite_only: + result = [a for a in result if a.is_favorite] + + if min_rating > 1: + result = [a for a in result if a.rating is not None and a.rating >= min_rating] + + if asset_type == "photo": + result = [a for a in result if a.type == ASSET_TYPE_IMAGE] + elif asset_type == "video": + result = [a for a in result if a.type == ASSET_TYPE_VIDEO] + + if min_date: + result = [a for a in result if a.created_at >= min_date] + if max_date: + result = [a for a in result if a.created_at <= max_date] + + if memory_date: + try: + ref_date = datetime.fromisoformat(memory_date.replace("Z", "+00:00")) + ref_year = ref_date.year + ref_month = ref_date.month + ref_day = ref_date.day + + def matches_memory(asset: AssetInfo) -> bool: + try: + asset_date = datetime.fromisoformat( + asset.created_at.replace("Z", "+00:00") + ) + return ( + asset_date.month == ref_month + and asset_date.day == ref_day + and asset_date.year != ref_year + ) + except (ValueError, AttributeError): + return False + + result = [a for a in result if matches_memory(a)] + except ValueError: + _LOGGER.warning("Invalid memory_date format: %s", memory_date) + + if city: + city_lower = city.lower() + result = [a for a in result if a.city and city_lower in a.city.lower()] + if state: + state_lower = state.lower() + result = [a for a in result if a.state and state_lower in a.state.lower()] + if country: + country_lower = country.lower() + result = [a for a in result if a.country and country_lower in a.country.lower()] + + return result + + +def sort_assets( + assets: list[AssetInfo], + order_by: str = "date", + order: str = "descending", +) -> list[AssetInfo]: + """Sort assets by the specified field. + + Args: + assets: List of assets to sort + order_by: "date", "rating", "name", or "random" + order: "ascending" or "descending" + + Returns: + Sorted list of assets + """ + result = list(assets) + + if order_by == "random": + random.shuffle(result) + elif order_by == "rating": + result = sorted( + result, + key=lambda a: (a.rating is None, a.rating if a.rating is not None else 0), + reverse=(order == "descending"), + ) + elif order_by == "name": + result = sorted( + result, + key=lambda a: a.filename.lower(), + reverse=(order == "descending"), + ) + else: # date (default) + result = sorted( + result, + key=lambda a: a.created_at, + reverse=(order == "descending"), + ) + + return result + + +# --- Shared link URL helpers --- + + +def get_accessible_links(links: list[SharedLinkInfo]) -> list[SharedLinkInfo]: + """Get all accessible (no password, not expired) shared links.""" + return [link for link in links if link.is_accessible] + + +def get_protected_links(links: list[SharedLinkInfo]) -> list[SharedLinkInfo]: + """Get password-protected but not expired shared links.""" + return [link for link in links if link.has_password and not link.is_expired] + + +def get_public_url(external_url: str, links: list[SharedLinkInfo]) -> str | None: + """Get the public URL if album has an accessible shared link.""" + accessible = get_accessible_links(links) + if accessible: + return f"{external_url}/share/{accessible[0].key}" + return None + + +def get_any_url(external_url: str, links: list[SharedLinkInfo]) -> str | None: + """Get any non-expired URL (prefers accessible, falls back to protected).""" + accessible = get_accessible_links(links) + if accessible: + return f"{external_url}/share/{accessible[0].key}" + non_expired = [link for link in links if not link.is_expired] + if non_expired: + return f"{external_url}/share/{non_expired[0].key}" + return None + + +def get_protected_url(external_url: str, links: list[SharedLinkInfo]) -> str | None: + """Get a protected URL if any password-protected link exists.""" + protected = get_protected_links(links) + if protected: + return f"{external_url}/share/{protected[0].key}" + return None + + +def get_protected_password(links: list[SharedLinkInfo]) -> str | None: + """Get the password for the first protected link.""" + protected = get_protected_links(links) + if protected and protected[0].password: + return protected[0].password + return None + + +def get_public_urls(external_url: str, links: list[SharedLinkInfo]) -> list[str]: + """Get all accessible public URLs.""" + return [f"{external_url}/share/{link.key}" for link in get_accessible_links(links)] + + +def get_protected_urls(external_url: str, links: list[SharedLinkInfo]) -> list[str]: + """Get all password-protected URLs.""" + return [f"{external_url}/share/{link.key}" for link in get_protected_links(links)] + + +# --- Asset URL builders --- + + +def _get_best_link_key(links: list[SharedLinkInfo]) -> str | None: + """Get the best available link key (prefers accessible, falls back to non-expired).""" + accessible = get_accessible_links(links) + if accessible: + return accessible[0].key + non_expired = [link for link in links if not link.is_expired] + if non_expired: + return non_expired[0].key + return None + + +def get_asset_public_url( + external_url: str, links: list[SharedLinkInfo], asset_id: str +) -> str | None: + """Get the public viewer URL for an asset (web page).""" + key = _get_best_link_key(links) + if key: + return f"{external_url}/share/{key}/photos/{asset_id}" + return None + + +def get_asset_download_url( + external_url: str, links: list[SharedLinkInfo], asset_id: str +) -> str | None: + """Get the direct download URL for an asset (media file).""" + key = _get_best_link_key(links) + if key: + return f"{external_url}/api/assets/{asset_id}/original?key={key}" + return None + + +def get_asset_video_url( + external_url: str, links: list[SharedLinkInfo], asset_id: str +) -> str | None: + """Get the transcoded video playback URL for a video asset.""" + key = _get_best_link_key(links) + if key: + return f"{external_url}/api/assets/{asset_id}/video/playback?key={key}" + return None + + +def get_asset_photo_url( + external_url: str, links: list[SharedLinkInfo], asset_id: str +) -> str | None: + """Get the preview-sized thumbnail URL for a photo asset.""" + key = _get_best_link_key(links) + if key: + return f"{external_url}/api/assets/{asset_id}/thumbnail?size=preview&key={key}" + return None + + +def build_asset_detail( + asset: AssetInfo, + external_url: str, + shared_links: list[SharedLinkInfo], + include_thumbnail: bool = True, +) -> dict[str, Any]: + """Build asset detail dictionary with all available data. + + Args: + asset: AssetInfo object + external_url: Base URL for constructing links + shared_links: Available shared links for URL building + include_thumbnail: If True, include thumbnail_url + + Returns: + Dictionary with asset details using ATTR_* constants + """ + asset_detail: dict[str, Any] = { + "id": asset.id, + ATTR_ASSET_TYPE: asset.type, + ATTR_ASSET_FILENAME: asset.filename, + ATTR_ASSET_CREATED: asset.created_at, + ATTR_ASSET_OWNER: asset.owner_name, + ATTR_ASSET_OWNER_ID: asset.owner_id, + ATTR_ASSET_DESCRIPTION: asset.description, + ATTR_PEOPLE: asset.people, + ATTR_ASSET_IS_FAVORITE: asset.is_favorite, + ATTR_ASSET_RATING: asset.rating, + ATTR_ASSET_LATITUDE: asset.latitude, + ATTR_ASSET_LONGITUDE: asset.longitude, + ATTR_ASSET_CITY: asset.city, + ATTR_ASSET_STATE: asset.state, + ATTR_ASSET_COUNTRY: asset.country, + } + + if include_thumbnail: + asset_detail[ATTR_THUMBNAIL_URL] = ( + f"{external_url}/api/assets/{asset.id}/thumbnail" + ) + + asset_url = get_asset_public_url(external_url, shared_links, asset.id) + if asset_url: + asset_detail[ATTR_ASSET_URL] = asset_url + + download_url = get_asset_download_url(external_url, shared_links, asset.id) + if download_url: + asset_detail[ATTR_ASSET_DOWNLOAD_URL] = download_url + + if asset.type == ASSET_TYPE_VIDEO: + video_url = get_asset_video_url(external_url, shared_links, asset.id) + if video_url: + asset_detail[ATTR_ASSET_PLAYBACK_URL] = video_url + elif asset.type == ASSET_TYPE_IMAGE: + photo_url = get_asset_photo_url(external_url, shared_links, asset.id) + if photo_url: + asset_detail["photo_url"] = photo_url + + return asset_detail diff --git a/packages/core/src/immich_watcher_core/change_detector.py b/packages/core/src/immich_watcher_core/change_detector.py new file mode 100644 index 0000000..0ec2a40 --- /dev/null +++ b/packages/core/src/immich_watcher_core/change_detector.py @@ -0,0 +1,115 @@ +"""Album change detection logic.""" + +from __future__ import annotations + +import logging + +from .models import AlbumChange, AlbumData + +_LOGGER = logging.getLogger(__name__) + + +def detect_album_changes( + old_state: AlbumData, + new_state: AlbumData, + pending_asset_ids: set[str], +) -> tuple[AlbumChange | None, set[str]]: + """Detect changes between two album states. + + Args: + old_state: Previous album data + new_state: Current album data + pending_asset_ids: Set of asset IDs that were detected but not yet + fully processed by Immich (no thumbhash yet) + + Returns: + Tuple of (change or None if no changes, updated pending_asset_ids) + """ + added_ids = new_state.asset_ids - old_state.asset_ids + removed_ids = old_state.asset_ids - new_state.asset_ids + + _LOGGER.debug( + "Change detection: added_ids=%d, removed_ids=%d, pending=%d", + len(added_ids), + len(removed_ids), + len(pending_asset_ids), + ) + + # Make a mutable copy of pending set + pending = set(pending_asset_ids) + + # Track new unprocessed assets and collect processed ones + added_assets = [] + for aid in added_ids: + if aid not in new_state.assets: + _LOGGER.debug("Asset %s: not in assets dict", aid) + continue + asset = new_state.assets[aid] + _LOGGER.debug( + "New asset %s (%s): is_processed=%s, filename=%s", + aid, + asset.type, + asset.is_processed, + asset.filename, + ) + if asset.is_processed: + added_assets.append(asset) + else: + pending.add(aid) + _LOGGER.debug("Asset %s added to pending (not yet processed)", aid) + + # Check if any pending assets are now processed + newly_processed = [] + for aid in list(pending): + if aid not in new_state.assets: + # Asset was removed, no longer pending + pending.discard(aid) + continue + asset = new_state.assets[aid] + if asset.is_processed: + _LOGGER.debug( + "Pending asset %s (%s) is now processed: filename=%s", + aid, + asset.type, + asset.filename, + ) + newly_processed.append(asset) + pending.discard(aid) + + # Include newly processed pending assets + added_assets.extend(newly_processed) + + # Detect metadata changes + name_changed = old_state.name != new_state.name + sharing_changed = old_state.shared != new_state.shared + + # Return None only if nothing changed at all + if not added_assets and not removed_ids and not name_changed and not sharing_changed: + return None, pending + + # Determine primary change type (use added_assets not added_ids) + change_type = "changed" + if name_changed and not added_assets and not removed_ids and not sharing_changed: + change_type = "album_renamed" + elif sharing_changed and not added_assets and not removed_ids and not name_changed: + change_type = "album_sharing_changed" + elif added_assets and not removed_ids and not name_changed and not sharing_changed: + change_type = "assets_added" + elif removed_ids and not added_assets and not name_changed and not sharing_changed: + change_type = "assets_removed" + + change = AlbumChange( + album_id=new_state.id, + album_name=new_state.name, + change_type=change_type, + added_count=len(added_assets), + removed_count=len(removed_ids), + added_assets=added_assets, + removed_asset_ids=list(removed_ids), + old_name=old_state.name if name_changed else None, + new_name=new_state.name if name_changed else None, + old_shared=old_state.shared if sharing_changed else None, + new_shared=new_state.shared if sharing_changed else None, + ) + + return change, pending diff --git a/packages/core/src/immich_watcher_core/constants.py b/packages/core/src/immich_watcher_core/constants.py new file mode 100644 index 0000000..0846d83 --- /dev/null +++ b/packages/core/src/immich_watcher_core/constants.py @@ -0,0 +1,64 @@ +"""Shared constants for Immich Watcher.""" + +from typing import Final + +# Defaults +DEFAULT_SCAN_INTERVAL: Final = 60 # seconds +DEFAULT_TELEGRAM_CACHE_TTL: Final = 48 # hours +NEW_ASSETS_RESET_DELAY: Final = 300 # 5 minutes +DEFAULT_SHARE_PASSWORD: Final = "immich123" + +# Events +EVENT_ALBUM_CHANGED: Final = "album_changed" +EVENT_ASSETS_ADDED: Final = "assets_added" +EVENT_ASSETS_REMOVED: Final = "assets_removed" +EVENT_ALBUM_RENAMED: Final = "album_renamed" +EVENT_ALBUM_DELETED: Final = "album_deleted" +EVENT_ALBUM_SHARING_CHANGED: Final = "album_sharing_changed" + +# Attributes +ATTR_HUB_NAME: Final = "hub_name" +ATTR_ALBUM_ID: Final = "album_id" +ATTR_ALBUM_NAME: Final = "album_name" +ATTR_ALBUM_URL: Final = "album_url" +ATTR_ALBUM_URLS: Final = "album_urls" +ATTR_ALBUM_PROTECTED_URL: Final = "album_protected_url" +ATTR_ALBUM_PROTECTED_PASSWORD: Final = "album_protected_password" +ATTR_ASSET_COUNT: Final = "asset_count" +ATTR_PHOTO_COUNT: Final = "photo_count" +ATTR_VIDEO_COUNT: Final = "video_count" +ATTR_ADDED_COUNT: Final = "added_count" +ATTR_REMOVED_COUNT: Final = "removed_count" +ATTR_ADDED_ASSETS: Final = "added_assets" +ATTR_REMOVED_ASSETS: Final = "removed_assets" +ATTR_CHANGE_TYPE: Final = "change_type" +ATTR_LAST_UPDATED: Final = "last_updated_at" +ATTR_CREATED_AT: Final = "created_at" +ATTR_THUMBNAIL_URL: Final = "thumbnail_url" +ATTR_SHARED: Final = "shared" +ATTR_OWNER: Final = "owner" +ATTR_PEOPLE: Final = "people" +ATTR_OLD_NAME: Final = "old_name" +ATTR_NEW_NAME: Final = "new_name" +ATTR_OLD_SHARED: Final = "old_shared" +ATTR_NEW_SHARED: Final = "new_shared" +ATTR_ASSET_TYPE: Final = "type" +ATTR_ASSET_FILENAME: Final = "filename" +ATTR_ASSET_CREATED: Final = "created_at" +ATTR_ASSET_OWNER: Final = "owner" +ATTR_ASSET_OWNER_ID: Final = "owner_id" +ATTR_ASSET_URL: Final = "url" +ATTR_ASSET_DOWNLOAD_URL: Final = "download_url" +ATTR_ASSET_PLAYBACK_URL: Final = "playback_url" +ATTR_ASSET_DESCRIPTION: Final = "description" +ATTR_ASSET_IS_FAVORITE: Final = "is_favorite" +ATTR_ASSET_RATING: Final = "rating" +ATTR_ASSET_LATITUDE: Final = "latitude" +ATTR_ASSET_LONGITUDE: Final = "longitude" +ATTR_ASSET_CITY: Final = "city" +ATTR_ASSET_STATE: Final = "state" +ATTR_ASSET_COUNTRY: Final = "country" + +# Asset types +ASSET_TYPE_IMAGE: Final = "IMAGE" +ASSET_TYPE_VIDEO: Final = "VIDEO" diff --git a/packages/core/src/immich_watcher_core/immich_client.py b/packages/core/src/immich_watcher_core/immich_client.py new file mode 100644 index 0000000..089cd47 --- /dev/null +++ b/packages/core/src/immich_watcher_core/immich_client.py @@ -0,0 +1,362 @@ +"""Async Immich API client.""" + +from __future__ import annotations + +import logging +from typing import Any + +import aiohttp + +from .models import AlbumData, SharedLinkInfo + +_LOGGER = logging.getLogger(__name__) + + +class ImmichClient: + """Async client for the Immich API. + + Accepts an aiohttp.ClientSession via constructor so that + Home Assistant can provide its managed session and the standalone + server can create its own. + """ + + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + ) -> None: + """Initialize the client. + + Args: + session: aiohttp client session (caller manages lifecycle) + url: Immich server base URL (e.g. http://immich:2283) + api_key: Immich API key + """ + self._session = session + self._url = url.rstrip("/") + self._api_key = api_key + self._external_domain: str | None = None + + @property + def url(self) -> str: + """Return the Immich API URL.""" + return self._url + + @property + def external_url(self) -> str: + """Return the external URL for public links. + + Uses externalDomain from Immich server config if set, + otherwise falls back to the connection URL. + """ + if self._external_domain: + return self._external_domain.rstrip("/") + return self._url + + @property + def api_key(self) -> str: + """Return the API key.""" + return self._api_key + + def get_internal_download_url(self, url: str) -> str: + """Convert an external URL to internal URL for faster downloads. + + If the URL starts with the external domain, replace it with the + internal connection URL to download via local network. + """ + if self._external_domain: + external = self._external_domain.rstrip("/") + if url.startswith(external): + return url.replace(external, self._url, 1) + return url + + @property + def _headers(self) -> dict[str, str]: + """Return common API headers.""" + return {"x-api-key": self._api_key} + + @property + def _json_headers(self) -> dict[str, str]: + """Return API headers for JSON requests.""" + return { + "x-api-key": self._api_key, + "Content-Type": "application/json", + } + + async def ping(self) -> bool: + """Validate connection to Immich server.""" + try: + async with self._session.get( + f"{self._url}/api/server/ping", + headers=self._headers, + ) as response: + return response.status == 200 + except aiohttp.ClientError: + return False + + async def get_server_config(self) -> str | None: + """Fetch server config and return the external domain (if set). + + Also updates the internal external_domain cache. + """ + try: + async with self._session.get( + f"{self._url}/api/server/config", + headers=self._headers, + ) as response: + if response.status == 200: + data = await response.json() + external_domain = data.get("externalDomain", "") or "" + self._external_domain = external_domain + if external_domain: + _LOGGER.debug( + "Using external domain from Immich: %s", external_domain + ) + else: + _LOGGER.debug( + "No external domain configured in Immich, using connection URL" + ) + return external_domain or None + _LOGGER.warning( + "Failed to fetch server config: HTTP %s", response.status + ) + except aiohttp.ClientError as err: + _LOGGER.warning("Failed to fetch server config: %s", err) + return None + + async def get_users(self) -> dict[str, str]: + """Fetch all users from Immich. + + Returns: + Dict mapping user_id -> display name + """ + try: + async with self._session.get( + f"{self._url}/api/users", + headers=self._headers, + ) as response: + if response.status == 200: + data = await response.json() + return { + u["id"]: u.get("name", u.get("email", "Unknown")) + for u in data + if u.get("id") + } + except aiohttp.ClientError as err: + _LOGGER.warning("Failed to fetch users: %s", err) + return {} + + async def get_people(self) -> dict[str, str]: + """Fetch all people from Immich. + + Returns: + Dict mapping person_id -> name + """ + try: + async with self._session.get( + f"{self._url}/api/people", + headers=self._headers, + ) as response: + if response.status == 200: + data = await response.json() + people_list = data.get("people", data) if isinstance(data, dict) else data + return { + p["id"]: p.get("name", "") + for p in people_list + if p.get("name") + } + except aiohttp.ClientError as err: + _LOGGER.warning("Failed to fetch people: %s", err) + return {} + + async def get_shared_links(self, album_id: str) -> list[SharedLinkInfo]: + """Fetch shared links for an album from Immich. + + Args: + album_id: The album ID to filter links for + + Returns: + List of SharedLinkInfo for the specified album + """ + links: list[SharedLinkInfo] = [] + try: + async with self._session.get( + f"{self._url}/api/shared-links", + headers=self._headers, + ) as response: + if response.status == 200: + data = await response.json() + for link in data: + album = link.get("album") + key = link.get("key") + if album and key and album.get("id") == album_id: + link_info = SharedLinkInfo.from_api_response(link) + links.append(link_info) + _LOGGER.debug( + "Found shared link for album: key=%s, has_password=%s", + key[:8], + link_info.has_password, + ) + except aiohttp.ClientError as err: + _LOGGER.warning("Failed to fetch shared links: %s", err) + return links + + async def get_album( + self, + album_id: str, + users_cache: dict[str, str] | None = None, + ) -> AlbumData | None: + """Fetch album data from Immich. + + Args: + album_id: The album ID to fetch + users_cache: Optional user_id -> name mapping for owner resolution + + Returns: + AlbumData if found, None if album doesn't exist (404) + + Raises: + ImmichApiError: On non-200/404 HTTP responses or connection errors + """ + try: + async with self._session.get( + f"{self._url}/api/albums/{album_id}", + headers=self._headers, + ) as response: + if response.status == 404: + _LOGGER.warning("Album %s not found", album_id) + return None + if response.status != 200: + raise ImmichApiError( + f"Error fetching album {album_id}: HTTP {response.status}" + ) + data = await response.json() + return AlbumData.from_api_response(data, users_cache) + except aiohttp.ClientError as err: + raise ImmichApiError(f"Error communicating with Immich: {err}") from err + + async def get_albums(self) -> list[dict[str, Any]]: + """Fetch all albums from Immich. + + Returns: + List of album dicts with id, albumName, assetCount, etc. + """ + try: + async with self._session.get( + f"{self._url}/api/albums", + headers=self._headers, + ) as response: + if response.status == 200: + return await response.json() + _LOGGER.warning("Failed to fetch albums: HTTP %s", response.status) + except aiohttp.ClientError as err: + _LOGGER.warning("Failed to fetch albums: %s", err) + return [] + + async def create_shared_link( + self, album_id: str, password: str | None = None + ) -> bool: + """Create a new shared link for an album. + + Args: + album_id: The album to share + password: Optional password for the link + + Returns: + True if created successfully + """ + payload: dict[str, Any] = { + "albumId": album_id, + "type": "ALBUM", + "allowDownload": True, + "allowUpload": False, + "showMetadata": True, + } + if password: + payload["password"] = password + + try: + async with self._session.post( + f"{self._url}/api/shared-links", + headers=self._json_headers, + json=payload, + ) as response: + if response.status == 201: + _LOGGER.info( + "Successfully created shared link for album %s", album_id + ) + return True + error_text = await response.text() + _LOGGER.error( + "Failed to create shared link: HTTP %s - %s", + response.status, + error_text, + ) + return False + except aiohttp.ClientError as err: + _LOGGER.error("Error creating shared link: %s", err) + return False + + async def delete_shared_link(self, link_id: str) -> bool: + """Delete a shared link. + + Args: + link_id: The shared link ID to delete + + Returns: + True if deleted successfully + """ + try: + async with self._session.delete( + f"{self._url}/api/shared-links/{link_id}", + headers=self._headers, + ) as response: + if response.status == 200: + _LOGGER.info("Successfully deleted shared link") + return True + error_text = await response.text() + _LOGGER.error( + "Failed to delete shared link: HTTP %s - %s", + response.status, + error_text, + ) + return False + except aiohttp.ClientError as err: + _LOGGER.error("Error deleting shared link: %s", err) + return False + + async def set_shared_link_password( + self, link_id: str, password: str | None + ) -> bool: + """Update the password for a shared link. + + Args: + link_id: The shared link ID + password: New password (None to remove) + + Returns: + True if updated successfully + """ + payload = {"password": password if password else None} + try: + async with self._session.patch( + f"{self._url}/api/shared-links/{link_id}", + headers=self._json_headers, + json=payload, + ) as response: + if response.status == 200: + _LOGGER.info("Successfully updated shared link password") + return True + _LOGGER.error( + "Failed to update shared link password: HTTP %s", + response.status, + ) + return False + except aiohttp.ClientError as err: + _LOGGER.error("Error updating shared link password: %s", err) + return False + + +class ImmichApiError(Exception): + """Raised when an Immich API call fails.""" diff --git a/packages/core/src/immich_watcher_core/models.py b/packages/core/src/immich_watcher_core/models.py new file mode 100644 index 0000000..4893b92 --- /dev/null +++ b/packages/core/src/immich_watcher_core/models.py @@ -0,0 +1,266 @@ +"""Data models for Immich Watcher.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +from .constants import ASSET_TYPE_IMAGE, ASSET_TYPE_VIDEO + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class SharedLinkInfo: + """Data class for shared link information.""" + + id: str + key: str + has_password: bool = False + password: str | None = None + expires_at: datetime | None = None + allow_download: bool = True + show_metadata: bool = True + + @property + def is_expired(self) -> bool: + """Check if the link has expired.""" + if self.expires_at is None: + return False + return datetime.now(self.expires_at.tzinfo) > self.expires_at + + @property + def is_accessible(self) -> bool: + """Check if the link is accessible without password and not expired.""" + return not self.has_password and not self.is_expired + + @classmethod + def from_api_response(cls, data: dict[str, Any]) -> SharedLinkInfo: + """Create SharedLinkInfo from API response.""" + expires_at = None + if data.get("expiresAt"): + try: + expires_at = datetime.fromisoformat( + data["expiresAt"].replace("Z", "+00:00") + ) + except ValueError: + pass + + password = data.get("password") + return cls( + id=data["id"], + key=data["key"], + has_password=bool(password), + password=password if password else None, + expires_at=expires_at, + allow_download=data.get("allowDownload", True), + show_metadata=data.get("showMetadata", True), + ) + + +@dataclass +class AssetInfo: + """Data class for asset information.""" + + id: str + type: str # IMAGE or VIDEO + filename: str + created_at: str + owner_id: str = "" + owner_name: str = "" + description: str = "" + people: list[str] = field(default_factory=list) + is_favorite: bool = False + rating: int | None = None + latitude: float | None = None + longitude: float | None = None + city: str | None = None + state: str | None = None + country: str | None = None + is_processed: bool = True # Whether asset is fully processed by Immich + thumbhash: str | None = None # Perceptual hash for cache validation + + @classmethod + def from_api_response( + cls, data: dict[str, Any], users_cache: dict[str, str] | None = None + ) -> AssetInfo: + """Create AssetInfo from API response.""" + people = [] + if "people" in data: + people = [p.get("name", "") for p in data["people"] if p.get("name")] + + owner_id = data.get("ownerId", "") + owner_name = "" + if users_cache and owner_id: + owner_name = users_cache.get(owner_id, "") + + # Get description - prioritize user-added description over EXIF description + description = data.get("description", "") or "" + exif_info = data.get("exifInfo") + if not description and exif_info: + description = exif_info.get("description", "") or "" + + # Get favorites and rating + is_favorite = data.get("isFavorite", False) + rating = exif_info.get("rating") if exif_info else None + + # Get geolocation + latitude = exif_info.get("latitude") if exif_info else None + longitude = exif_info.get("longitude") if exif_info else None + + # Get reverse geocoded location + city = exif_info.get("city") if exif_info else None + state = exif_info.get("state") if exif_info else None + country = exif_info.get("country") if exif_info else None + + # Check if asset is fully processed by Immich + asset_type = data.get("type", ASSET_TYPE_IMAGE) + is_processed = cls._check_processing_status(data, asset_type) + thumbhash = data.get("thumbhash") + + return cls( + id=data["id"], + type=asset_type, + filename=data.get("originalFileName", ""), + created_at=data.get("fileCreatedAt", ""), + owner_id=owner_id, + owner_name=owner_name, + description=description, + people=people, + is_favorite=is_favorite, + rating=rating, + latitude=latitude, + longitude=longitude, + city=city, + state=state, + country=country, + is_processed=is_processed, + thumbhash=thumbhash, + ) + + @staticmethod + def _check_processing_status(data: dict[str, Any], _asset_type: str) -> bool: + """Check if asset has been fully processed by Immich. + + For all assets: Check if thumbnails have been generated (thumbhash exists). + Immich generates thumbnails for both photos and videos regardless of + whether video transcoding is needed. + + Args: + data: Asset data from API response + _asset_type: Asset type (IMAGE or VIDEO) - unused but kept for API stability + + Returns: + True if asset is fully processed and not trashed/offline/archived, False otherwise + """ + asset_id = data.get("id", "unknown") + asset_type = data.get("type", "unknown") + is_offline = data.get("isOffline", False) + is_trashed = data.get("isTrashed", False) + is_archived = data.get("isArchived", False) + thumbhash = data.get("thumbhash") + + _LOGGER.debug( + "Asset %s (%s): isOffline=%s, isTrashed=%s, isArchived=%s, thumbhash=%s", + asset_id, + asset_type, + is_offline, + is_trashed, + is_archived, + bool(thumbhash), + ) + + if is_offline: + _LOGGER.debug("Asset %s excluded: offline", asset_id) + return False + + if is_trashed: + _LOGGER.debug("Asset %s excluded: trashed", asset_id) + return False + + if is_archived: + _LOGGER.debug("Asset %s excluded: archived", asset_id) + return False + + is_processed = bool(thumbhash) + if not is_processed: + _LOGGER.debug("Asset %s excluded: no thumbhash", asset_id) + return is_processed + + +@dataclass +class AlbumData: + """Data class for album information.""" + + id: str + name: str + asset_count: int + photo_count: int + video_count: int + created_at: str + updated_at: str + shared: bool + owner: str + thumbnail_asset_id: str | None + asset_ids: set[str] = field(default_factory=set) + assets: dict[str, AssetInfo] = field(default_factory=dict) + people: set[str] = field(default_factory=set) + has_new_assets: bool = False + last_change_time: datetime | None = None + + @classmethod + def from_api_response( + cls, data: dict[str, Any], users_cache: dict[str, str] | None = None + ) -> AlbumData: + """Create AlbumData from API response.""" + assets_data = data.get("assets", []) + asset_ids = set() + assets = {} + people = set() + photo_count = 0 + video_count = 0 + + for asset_data in assets_data: + asset = AssetInfo.from_api_response(asset_data, users_cache) + asset_ids.add(asset.id) + assets[asset.id] = asset + people.update(asset.people) + if asset.type == ASSET_TYPE_IMAGE: + photo_count += 1 + elif asset.type == ASSET_TYPE_VIDEO: + video_count += 1 + + return cls( + id=data["id"], + name=data.get("albumName", "Unnamed"), + asset_count=data.get("assetCount", len(asset_ids)), + photo_count=photo_count, + video_count=video_count, + created_at=data.get("createdAt", ""), + updated_at=data.get("updatedAt", ""), + shared=data.get("shared", False), + owner=data.get("owner", {}).get("name", "Unknown"), + thumbnail_asset_id=data.get("albumThumbnailAssetId"), + asset_ids=asset_ids, + assets=assets, + people=people, + ) + + +@dataclass +class AlbumChange: + """Data class for album changes.""" + + album_id: str + album_name: str + change_type: str + added_count: int = 0 + removed_count: int = 0 + added_assets: list[AssetInfo] = field(default_factory=list) + removed_asset_ids: list[str] = field(default_factory=list) + old_name: str | None = None + new_name: str | None = None + old_shared: bool | None = None + new_shared: bool | None = None diff --git a/packages/core/src/immich_watcher_core/notifications/__init__.py b/packages/core/src/immich_watcher_core/notifications/__init__.py new file mode 100644 index 0000000..fded320 --- /dev/null +++ b/packages/core/src/immich_watcher_core/notifications/__init__.py @@ -0,0 +1 @@ +"""Notification providers.""" diff --git a/packages/core/src/immich_watcher_core/notifications/queue.py b/packages/core/src/immich_watcher_core/notifications/queue.py new file mode 100644 index 0000000..00bbf44 --- /dev/null +++ b/packages/core/src/immich_watcher_core/notifications/queue.py @@ -0,0 +1,81 @@ +"""Persistent notification queue for deferred notifications.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +from ..storage import StorageBackend + +_LOGGER = logging.getLogger(__name__) + + +class NotificationQueue: + """Persistent queue for notifications deferred during quiet hours. + + Stores full service call parameters so notifications can be replayed + exactly as they were originally called. + """ + + def __init__(self, backend: StorageBackend) -> None: + """Initialize the notification queue. + + Args: + backend: Storage backend for persistence + """ + self._backend = backend + self._data: dict[str, Any] | None = None + + async def async_load(self) -> None: + """Load queue data from storage.""" + self._data = await self._backend.load() or {"queue": []} + _LOGGER.debug( + "Loaded notification queue with %d items", + len(self._data.get("queue", [])), + ) + + async def async_enqueue(self, notification_params: dict[str, Any]) -> None: + """Add a notification to the queue.""" + if self._data is None: + self._data = {"queue": []} + + self._data["queue"].append({ + "params": notification_params, + "queued_at": datetime.now(timezone.utc).isoformat(), + }) + await self._backend.save(self._data) + _LOGGER.debug( + "Queued notification during quiet hours (total: %d)", + len(self._data["queue"]), + ) + + def get_all(self) -> list[dict[str, Any]]: + """Get all queued notifications.""" + if not self._data: + return [] + return list(self._data.get("queue", [])) + + def has_pending(self) -> bool: + """Check if there are pending notifications.""" + return bool(self._data and self._data.get("queue")) + + async def async_remove_indices(self, indices: list[int]) -> None: + """Remove specific items by index (indices must be in descending order).""" + if not self._data or not indices: + return + for idx in indices: + if 0 <= idx < len(self._data["queue"]): + del self._data["queue"][idx] + await self._backend.save(self._data) + + async def async_clear(self) -> None: + """Clear all queued notifications.""" + if self._data: + self._data["queue"] = [] + await self._backend.save(self._data) + + async def async_remove(self) -> None: + """Remove all queue data.""" + await self._backend.remove() + self._data = None diff --git a/packages/core/src/immich_watcher_core/storage.py b/packages/core/src/immich_watcher_core/storage.py new file mode 100644 index 0000000..4791e0b --- /dev/null +++ b/packages/core/src/immich_watcher_core/storage.py @@ -0,0 +1,72 @@ +"""Abstract storage backends and JSON file implementation.""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any, Protocol, runtime_checkable + +_LOGGER = logging.getLogger(__name__) + + +@runtime_checkable +class StorageBackend(Protocol): + """Abstract storage backend for persisting JSON-serializable data.""" + + async def load(self) -> dict[str, Any] | None: + """Load data from storage. Returns None if no data exists.""" + ... + + async def save(self, data: dict[str, Any]) -> None: + """Save data to storage.""" + ... + + async def remove(self) -> None: + """Remove all stored data.""" + ... + + +class JsonFileBackend: + """Simple JSON file storage backend. + + Suitable for standalone server use. For Home Assistant, + use an adapter wrapping homeassistant.helpers.storage.Store. + """ + + def __init__(self, path: Path) -> None: + """Initialize with a file path. + + Args: + path: Path to the JSON file (will be created if it doesn't exist) + """ + self._path = path + + async def load(self) -> dict[str, Any] | None: + """Load data from the JSON file.""" + if not self._path.exists(): + return None + try: + text = self._path.read_text(encoding="utf-8") + return json.loads(text) + except (json.JSONDecodeError, OSError) as err: + _LOGGER.warning("Failed to load %s: %s", self._path, err) + return None + + async def save(self, data: dict[str, Any]) -> None: + """Save data to the JSON file.""" + try: + self._path.parent.mkdir(parents=True, exist_ok=True) + self._path.write_text( + json.dumps(data, default=str), encoding="utf-8" + ) + except OSError as err: + _LOGGER.error("Failed to save %s: %s", self._path, err) + + async def remove(self) -> None: + """Remove the JSON file.""" + try: + if self._path.exists(): + self._path.unlink() + except OSError as err: + _LOGGER.error("Failed to remove %s: %s", self._path, err) diff --git a/packages/core/src/immich_watcher_core/telegram/__init__.py b/packages/core/src/immich_watcher_core/telegram/__init__.py new file mode 100644 index 0000000..98c63a8 --- /dev/null +++ b/packages/core/src/immich_watcher_core/telegram/__init__.py @@ -0,0 +1 @@ +"""Telegram notification support.""" diff --git a/packages/core/src/immich_watcher_core/telegram/cache.py b/packages/core/src/immich_watcher_core/telegram/cache.py new file mode 100644 index 0000000..8e99f24 --- /dev/null +++ b/packages/core/src/immich_watcher_core/telegram/cache.py @@ -0,0 +1,199 @@ +"""Telegram file_id cache with pluggable storage backend.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +from ..storage import StorageBackend + +_LOGGER = logging.getLogger(__name__) + +# Default TTL for Telegram file_id cache (48 hours in seconds) +DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60 + + +class TelegramFileCache: + """Cache for Telegram file_ids to avoid re-uploading media. + + When a file is uploaded to Telegram, it returns a file_id that can be reused + to send the same file without re-uploading. This cache stores these file_ids + keyed by the source URL or asset ID. + + Supports two validation modes: + - TTL mode (default): entries expire after a configured time-to-live + - Thumbhash mode: entries are validated by comparing stored thumbhash with + the current asset thumbhash from Immich + """ + + # Maximum number of entries to keep in thumbhash mode to prevent unbounded growth + THUMBHASH_MAX_ENTRIES = 2000 + + def __init__( + self, + backend: StorageBackend, + ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL, + use_thumbhash: bool = False, + ) -> None: + """Initialize the Telegram file cache. + + Args: + backend: Storage backend for persistence + ttl_seconds: Time-to-live for cache entries in seconds (TTL mode only) + use_thumbhash: Use thumbhash-based validation instead of TTL + """ + self._backend = backend + self._data: dict[str, Any] | None = None + self._ttl_seconds = ttl_seconds + self._use_thumbhash = use_thumbhash + + async def async_load(self) -> None: + """Load cache data from storage.""" + self._data = await self._backend.load() or {"files": {}} + await self._cleanup_expired() + mode = "thumbhash" if self._use_thumbhash else "TTL" + _LOGGER.debug( + "Loaded Telegram file cache with %d entries (mode: %s)", + len(self._data.get("files", {})), + mode, + ) + + async def _cleanup_expired(self) -> None: + """Remove expired cache entries (TTL mode) or trim old entries (thumbhash mode).""" + if self._use_thumbhash: + files = self._data.get("files", {}) if self._data else {} + if len(files) > self.THUMBHASH_MAX_ENTRIES: + sorted_keys = sorted( + files, key=lambda k: files[k].get("cached_at", "") + ) + keys_to_remove = sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES] + for key in keys_to_remove: + del files[key] + await self._backend.save(self._data) + _LOGGER.debug( + "Trimmed thumbhash cache from %d to %d entries", + len(keys_to_remove) + self.THUMBHASH_MAX_ENTRIES, + self.THUMBHASH_MAX_ENTRIES, + ) + return + + if not self._data or "files" not in self._data: + return + + now = datetime.now(timezone.utc) + expired_keys = [] + + for url, entry in self._data["files"].items(): + cached_at_str = entry.get("cached_at") + if cached_at_str: + cached_at = datetime.fromisoformat(cached_at_str) + age_seconds = (now - cached_at).total_seconds() + if age_seconds > self._ttl_seconds: + expired_keys.append(url) + + if expired_keys: + for key in expired_keys: + del self._data["files"][key] + await self._backend.save(self._data) + _LOGGER.debug("Cleaned up %d expired Telegram cache entries", len(expired_keys)) + + def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None: + """Get cached file_id for a key. + + Args: + key: The cache key (URL or asset ID) + thumbhash: Current thumbhash for validation (thumbhash mode only). + If provided, compares with stored thumbhash. Mismatch = cache miss. + + Returns: + Dict with 'file_id' and 'type' if cached and valid, None otherwise + """ + if not self._data or "files" not in self._data: + return None + + entry = self._data["files"].get(key) + if not entry: + return None + + if self._use_thumbhash: + if thumbhash is not None: + stored_thumbhash = entry.get("thumbhash") + if stored_thumbhash and stored_thumbhash != thumbhash: + _LOGGER.debug( + "Cache miss for %s: thumbhash changed, removing stale entry", + key[:36], + ) + del self._data["files"][key] + return None + else: + cached_at_str = entry.get("cached_at") + if cached_at_str: + cached_at = datetime.fromisoformat(cached_at_str) + age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds() + if age_seconds > self._ttl_seconds: + return None + + return { + "file_id": entry.get("file_id"), + "type": entry.get("type"), + } + + async def async_set( + self, key: str, file_id: str, media_type: str, thumbhash: str | None = None + ) -> None: + """Store a file_id for a key. + + Args: + key: The cache key (URL or asset ID) + file_id: The Telegram file_id + media_type: The type of media ('photo', 'video', 'document') + thumbhash: Current thumbhash to store alongside file_id (thumbhash mode only) + """ + if self._data is None: + self._data = {"files": {}} + + entry_data: dict[str, Any] = { + "file_id": file_id, + "type": media_type, + "cached_at": datetime.now(timezone.utc).isoformat(), + } + if thumbhash is not None: + entry_data["thumbhash"] = thumbhash + + self._data["files"][key] = entry_data + await self._backend.save(self._data) + _LOGGER.debug("Cached Telegram file_id for key (type: %s)", media_type) + + async def async_set_many( + self, entries: list[tuple[str, str, str, str | None]] + ) -> None: + """Store multiple file_ids in a single disk write. + + Args: + entries: List of (key, file_id, media_type, thumbhash) tuples + """ + if not entries: + return + + if self._data is None: + self._data = {"files": {}} + + now_iso = datetime.now(timezone.utc).isoformat() + for key, file_id, media_type, thumbhash in entries: + entry_data: dict[str, Any] = { + "file_id": file_id, + "type": media_type, + "cached_at": now_iso, + } + if thumbhash is not None: + entry_data["thumbhash"] = thumbhash + self._data["files"][key] = entry_data + + await self._backend.save(self._data) + _LOGGER.debug("Batch cached %d Telegram file_ids", len(entries)) + + async def async_remove(self) -> None: + """Remove all cache data.""" + await self._backend.remove() + self._data = None diff --git a/packages/core/src/immich_watcher_core/telegram/client.py b/packages/core/src/immich_watcher_core/telegram/client.py new file mode 100644 index 0000000..9778635 --- /dev/null +++ b/packages/core/src/immich_watcher_core/telegram/client.py @@ -0,0 +1,931 @@ +"""Telegram Bot API client for sending notifications with media.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import mimetypes +from typing import Any, Callable + +import aiohttp +from aiohttp import FormData + +from .cache import TelegramFileCache +from .media import ( + TELEGRAM_API_BASE_URL, + TELEGRAM_MAX_PHOTO_SIZE, + TELEGRAM_MAX_VIDEO_SIZE, + check_photo_limits, + extract_asset_id_from_url, + is_asset_id, + split_media_by_upload_size, +) + +_LOGGER = logging.getLogger(__name__) + +# Type alias for notification results +NotificationResult = dict[str, Any] + + +class TelegramClient: + """Async Telegram Bot API client for sending notifications with media. + + Decoupled from Home Assistant - accepts session, caches, and resolver + callbacks via constructor. + """ + + def __init__( + self, + session: aiohttp.ClientSession, + bot_token: str, + *, + url_cache: TelegramFileCache | None = None, + asset_cache: TelegramFileCache | None = None, + url_resolver: Callable[[str], str] | None = None, + thumbhash_resolver: Callable[[str], str | None] | None = None, + ) -> None: + """Initialize the Telegram client. + + Args: + session: aiohttp client session (caller manages lifecycle) + bot_token: Telegram Bot API token + url_cache: Cache for URL-keyed file_ids (TTL mode) + asset_cache: Cache for asset ID-keyed file_ids (thumbhash mode) + url_resolver: Optional callback to convert external URLs to internal + URLs for faster local downloads + thumbhash_resolver: Optional callback to get current thumbhash for + an asset ID (for cache validation) + """ + self._session = session + self._token = bot_token + self._url_cache = url_cache + self._asset_cache = asset_cache + self._url_resolver = url_resolver + self._thumbhash_resolver = thumbhash_resolver + + def _resolve_url(self, url: str) -> str: + """Convert external URL to internal URL if resolver is available.""" + if self._url_resolver: + return self._url_resolver(url) + return url + + def _get_cache_and_key( + self, + url: str | None, + cache_key: str | None = None, + ) -> tuple[TelegramFileCache | None, str | None, str | None]: + """Determine which cache, key, and thumbhash to use. + + Priority: custom cache_key -> direct asset ID -> extracted asset ID -> URL + """ + if cache_key: + return self._url_cache, cache_key, None + + if url: + if is_asset_id(url): + thumbhash = self._thumbhash_resolver(url) if self._thumbhash_resolver else None + return self._asset_cache, url, thumbhash + asset_id = extract_asset_id_from_url(url) + if asset_id: + thumbhash = self._thumbhash_resolver(asset_id) if self._thumbhash_resolver else None + return self._asset_cache, asset_id, thumbhash + return self._url_cache, url, None + + return None, None, None + + def _get_cache_for_key(self, key: str, is_asset: bool | None = None) -> TelegramFileCache | None: + """Return asset cache if key is a UUID, otherwise URL cache.""" + if is_asset is None: + is_asset = is_asset_id(key) + return self._asset_cache if is_asset else self._url_cache + + async def send_notification( + self, + chat_id: str, + assets: list[dict[str, str]] | None = None, + caption: str | None = None, + reply_to_message_id: int | None = None, + disable_web_page_preview: bool | None = None, + parse_mode: str = "HTML", + max_group_size: int = 10, + chunk_delay: int = 0, + max_asset_data_size: int | None = None, + send_large_photos_as_documents: bool = False, + chat_action: str | None = "typing", + ) -> NotificationResult: + """Send a Telegram notification (text and/or media). + + This is the main entry point. Dispatches to appropriate method + based on assets list. + """ + if not assets: + return await self.send_message( + chat_id, caption or "", reply_to_message_id, + disable_web_page_preview, parse_mode, + ) + + typing_task = None + if chat_action: + typing_task = self._start_typing_indicator(chat_id, chat_action) + + try: + if len(assets) == 1 and assets[0].get("type") == "photo": + return await self._send_photo( + chat_id, assets[0].get("url"), caption, reply_to_message_id, + parse_mode, max_asset_data_size, send_large_photos_as_documents, + assets[0].get("content_type"), assets[0].get("cache_key"), + ) + + if len(assets) == 1 and assets[0].get("type") == "video": + return await self._send_video( + chat_id, assets[0].get("url"), caption, reply_to_message_id, + parse_mode, max_asset_data_size, + assets[0].get("content_type"), assets[0].get("cache_key"), + ) + + if len(assets) == 1 and assets[0].get("type", "document") == "document": + url = assets[0].get("url") + if not url: + return {"success": False, "error": "Missing 'url' for document"} + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download media: HTTP {resp.status}"} + data = await resp.read() + if max_asset_data_size is not None and len(data) > max_asset_data_size: + return {"success": False, "error": f"Media size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)"} + filename = url.split("/")[-1].split("?")[0] or "file" + return await self._send_document( + chat_id, data, filename, caption, reply_to_message_id, + parse_mode, url, assets[0].get("content_type"), + assets[0].get("cache_key"), + ) + except aiohttp.ClientError as err: + return {"success": False, "error": f"Failed to download media: {err}"} + + return await self._send_media_group( + chat_id, assets, caption, reply_to_message_id, max_group_size, + chunk_delay, parse_mode, max_asset_data_size, + send_large_photos_as_documents, + ) + finally: + if typing_task: + typing_task.cancel() + try: + await typing_task + except asyncio.CancelledError: + pass + + async def send_message( + self, + chat_id: str, + text: str, + reply_to_message_id: int | None = None, + disable_web_page_preview: bool | None = None, + parse_mode: str = "HTML", + ) -> NotificationResult: + """Send a simple text message.""" + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMessage" + + payload: dict[str, Any] = { + "chat_id": chat_id, + "text": text or "Notification", + "parse_mode": parse_mode, + } + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + if disable_web_page_preview is not None: + payload["disable_web_page_preview"] = disable_web_page_preview + + try: + _LOGGER.debug("Sending text message to Telegram") + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return { + "success": True, + "message_id": result.get("result", {}).get("message_id"), + } + _LOGGER.error("Telegram API error: %s", result) + return { + "success": False, + "error": result.get("description", "Unknown Telegram error"), + "error_code": result.get("error_code"), + } + except aiohttp.ClientError as err: + _LOGGER.error("Telegram message send failed: %s", err) + return {"success": False, "error": str(err)} + + async def send_chat_action( + self, chat_id: str, action: str = "typing" + ) -> bool: + """Send a chat action indicator (typing, upload_photo, etc.).""" + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendChatAction" + payload = {"chat_id": chat_id, "action": action} + + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return True + _LOGGER.debug("Failed to send chat action: %s", result.get("description")) + return False + except aiohttp.ClientError as err: + _LOGGER.debug("Chat action request failed: %s", err) + return False + + def _start_typing_indicator( + self, chat_id: str, action: str = "typing" + ) -> asyncio.Task: + """Start a background task that sends chat action every 4 seconds.""" + + async def action_loop() -> None: + try: + while True: + await self.send_chat_action(chat_id, action) + await asyncio.sleep(4) + except asyncio.CancelledError: + _LOGGER.debug("Chat action indicator stopped for action '%s'", action) + + return asyncio.create_task(action_loop()) + + def _log_error( + self, + error_code: int | None, + description: str, + data: bytes | None = None, + media_type: str = "photo", + ) -> None: + """Log detailed Telegram API error with diagnostics.""" + error_msg = f"Telegram API error ({error_code}): {description}" + + if data: + error_msg += f" | Media size: {len(data)} bytes ({len(data) / (1024 * 1024):.2f} MB)" + + if media_type == "photo": + try: + from PIL import Image + import io + + img = Image.open(io.BytesIO(data)) + width, height = img.size + dimension_sum = width + height + error_msg += f" | Dimensions: {width}x{height} (sum={dimension_sum})" + + if len(data) > TELEGRAM_MAX_PHOTO_SIZE: + error_msg += f" | EXCEEDS size limit ({TELEGRAM_MAX_PHOTO_SIZE / (1024 * 1024):.0f} MB)" + if dimension_sum > 10000: + error_msg += f" | EXCEEDS dimension limit (10000)" + except Exception: + pass + + if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: + error_msg += f" | EXCEEDS upload limit ({TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB)" + + suggestions = [] + if "dimension" in description.lower() or "PHOTO_INVALID_DIMENSIONS" in description: + suggestions.append("Photo dimensions too large - consider send_large_photos_as_documents=true") + elif "too large" in description.lower() or error_code == 413: + suggestions.append("File too large - consider send_large_photos_as_documents=true or max_asset_data_size") + elif "entity too large" in description.lower(): + suggestions.append("Request entity too large - reduce max_group_size or set max_asset_data_size") + + if suggestions: + error_msg += f" | Suggestions: {'; '.join(suggestions)}" + + _LOGGER.error(error_msg) + + async def _send_photo( + self, + chat_id: str, + url: str | None, + caption: str | None = None, + reply_to_message_id: int | None = None, + parse_mode: str = "HTML", + max_asset_data_size: int | None = None, + send_large_photos_as_documents: bool = False, + content_type: str | None = None, + cache_key: str | None = None, + ) -> NotificationResult: + """Send a single photo to Telegram.""" + if not content_type: + content_type = "image/jpeg" + if not url: + return {"success": False, "error": "Missing 'url' for photo"} + + effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key) + + # Check cache + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None + if cached and cached.get("file_id") and effective_cache_key: + file_id = cached["file_id"] + _LOGGER.debug("Using cached Telegram file_id for photo") + payload = {"chat_id": chat_id, "photo": file_id, "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True} + _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description")) + except aiohttp.ClientError as err: + _LOGGER.debug("Cached file_id request failed: %s", err) + + try: + download_url = self._resolve_url(url) + _LOGGER.debug("Downloading photo from %s", download_url[:80]) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download photo: HTTP {resp.status}"} + data = await resp.read() + _LOGGER.debug("Downloaded photo: %d bytes", len(data)) + + if max_asset_data_size is not None and len(data) > max_asset_data_size: + return {"success": False, "error": f"Photo size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)", "skipped": True} + + exceeds_limits, reason, width, height = check_photo_limits(data) + if exceeds_limits: + if send_large_photos_as_documents: + _LOGGER.info("Photo %s, sending as document", reason) + return await self._send_document( + chat_id, data, "photo.jpg", caption, reply_to_message_id, + parse_mode, url, None, cache_key, + ) + return {"success": False, "error": f"Photo {reason}", "skipped": True} + + form = FormData() + form.add_field("chat_id", chat_id) + form.add_field("photo", data, filename="photo.jpg", content_type=content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to_message_id: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendPhoto" + _LOGGER.debug("Uploading photo to Telegram") + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + photos = result.get("result", {}).get("photo", []) + if photos and effective_cache and effective_cache_key: + file_id = photos[-1].get("file_id") + if file_id: + await effective_cache.async_set(effective_cache_key, file_id, "photo", thumbhash=effective_thumbhash) + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + self._log_error(result.get("error_code"), result.get("description", "Unknown Telegram error"), data, "photo") + return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")} + except aiohttp.ClientError as err: + _LOGGER.error("Telegram photo upload failed: %s", err) + return {"success": False, "error": str(err)} + + async def _send_video( + self, + chat_id: str, + url: str | None, + caption: str | None = None, + reply_to_message_id: int | None = None, + parse_mode: str = "HTML", + max_asset_data_size: int | None = None, + content_type: str | None = None, + cache_key: str | None = None, + ) -> NotificationResult: + """Send a single video to Telegram.""" + if not content_type: + content_type = "video/mp4" + if not url: + return {"success": False, "error": "Missing 'url' for video"} + + effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(url, cache_key) + + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) if effective_cache and effective_cache_key else None + if cached and cached.get("file_id") and effective_cache_key: + file_id = cached["file_id"] + _LOGGER.debug("Using cached Telegram file_id for video") + payload = {"chat_id": chat_id, "video": file_id, "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True} + _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description")) + except aiohttp.ClientError as err: + _LOGGER.debug("Cached file_id request failed: %s", err) + + try: + download_url = self._resolve_url(url) + _LOGGER.debug("Downloading video from %s", download_url[:80]) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download video: HTTP {resp.status}"} + data = await resp.read() + _LOGGER.debug("Downloaded video: %d bytes", len(data)) + + if max_asset_data_size is not None and len(data) > max_asset_data_size: + return {"success": False, "error": f"Video size ({len(data)} bytes) exceeds max_asset_data_size limit ({max_asset_data_size} bytes)", "skipped": True} + + if len(data) > TELEGRAM_MAX_VIDEO_SIZE: + return {"success": False, "error": f"Video size ({len(data) / (1024 * 1024):.1f} MB) exceeds Telegram's {TELEGRAM_MAX_VIDEO_SIZE / (1024 * 1024):.0f} MB upload limit", "skipped": True} + + form = FormData() + form.add_field("chat_id", chat_id) + form.add_field("video", data, filename="video.mp4", content_type=content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to_message_id: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendVideo" + _LOGGER.debug("Uploading video to Telegram") + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + video = result.get("result", {}).get("video", {}) + if video and effective_cache and effective_cache_key: + file_id = video.get("file_id") + if file_id: + await effective_cache.async_set(effective_cache_key, file_id, "video", thumbhash=effective_thumbhash) + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + self._log_error(result.get("error_code"), result.get("description", "Unknown Telegram error"), data, "video") + return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")} + except aiohttp.ClientError as err: + _LOGGER.error("Telegram video upload failed: %s", err) + return {"success": False, "error": str(err)} + + async def _send_document( + self, + chat_id: str, + data: bytes, + filename: str = "file", + caption: str | None = None, + reply_to_message_id: int | None = None, + parse_mode: str = "HTML", + source_url: str | None = None, + content_type: str | None = None, + cache_key: str | None = None, + ) -> NotificationResult: + """Send a file as a document to Telegram.""" + if not content_type: + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = "application/octet-stream" + + effective_cache, effective_cache_key, effective_thumbhash = self._get_cache_and_key(source_url, cache_key) + + if effective_cache and effective_cache_key: + cached = effective_cache.get(effective_cache_key, thumbhash=effective_thumbhash) + if cached and cached.get("file_id") and cached.get("type") == "document": + file_id = cached["file_id"] + _LOGGER.debug("Using cached Telegram file_id for document") + payload = {"chat_id": chat_id, "document": file_id, "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to_message_id: + payload["reply_to_message_id"] = reply_to_message_id + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + return {"success": True, "message_id": result.get("result", {}).get("message_id"), "cached": True} + _LOGGER.debug("Cached file_id failed, will re-upload: %s", result.get("description")) + except aiohttp.ClientError as err: + _LOGGER.debug("Cached file_id request failed: %s", err) + + try: + form = FormData() + form.add_field("chat_id", chat_id) + form.add_field("document", data, filename=filename, content_type=content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to_message_id: + form.add_field("reply_to_message_id", str(reply_to_message_id)) + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendDocument" + _LOGGER.debug("Uploading document to Telegram (%d bytes, %s)", len(data), content_type) + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + if effective_cache_key and effective_cache: + document = result.get("result", {}).get("document", {}) + file_id = document.get("file_id") + if file_id: + await effective_cache.async_set(effective_cache_key, file_id, "document", thumbhash=effective_thumbhash) + return {"success": True, "message_id": result.get("result", {}).get("message_id")} + self._log_error(result.get("error_code"), result.get("description", "Unknown Telegram error"), data, "document") + return {"success": False, "error": result.get("description", "Unknown Telegram error"), "error_code": result.get("error_code")} + except aiohttp.ClientError as err: + _LOGGER.error("Telegram document upload failed: %s", err) + return {"success": False, "error": str(err)} + + async def _send_media_group( + self, + chat_id: str, + assets: list[dict[str, str]], + caption: str | None = None, + reply_to_message_id: int | None = None, + max_group_size: int = 10, + chunk_delay: int = 0, + parse_mode: str = "HTML", + max_asset_data_size: int | None = None, + send_large_photos_as_documents: bool = False, + ) -> NotificationResult: + """Send media assets as media group(s).""" + chunks = [assets[i:i + max_group_size] for i in range(0, len(assets), max_group_size)] + all_message_ids = [] + + _LOGGER.debug( + "Sending %d media items in %d chunk(s) of max %d items (delay: %dms)", + len(assets), len(chunks), max_group_size, chunk_delay, + ) + + for chunk_idx, chunk in enumerate(chunks): + if chunk_idx > 0 and chunk_delay > 0: + await asyncio.sleep(chunk_delay / 1000) + + # Single-item chunks use dedicated APIs + if len(chunk) == 1: + item = chunk[0] + media_type = item.get("type", "document") + url = item.get("url") + item_content_type = item.get("content_type") + item_cache_key = item.get("cache_key") + chunk_caption = caption if chunk_idx == 0 else None + chunk_reply_to = reply_to_message_id if chunk_idx == 0 else None + result = None + + if media_type == "photo": + result = await self._send_photo( + chat_id, url, chunk_caption, chunk_reply_to, parse_mode, + max_asset_data_size, send_large_photos_as_documents, + item_content_type, item_cache_key, + ) + elif media_type == "video": + result = await self._send_video( + chat_id, url, chunk_caption, chunk_reply_to, parse_mode, + max_asset_data_size, item_content_type, item_cache_key, + ) + else: + if not url: + return {"success": False, "error": "Missing 'url' for document", "failed_at_chunk": chunk_idx + 1} + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download media: HTTP {resp.status}", "failed_at_chunk": chunk_idx + 1} + data = await resp.read() + if max_asset_data_size is not None and len(data) > max_asset_data_size: + continue + filename = url.split("/")[-1].split("?")[0] or "file" + result = await self._send_document( + chat_id, data, filename, chunk_caption, chunk_reply_to, + parse_mode, url, item_content_type, item_cache_key, + ) + except aiohttp.ClientError as err: + return {"success": False, "error": f"Failed to download media: {err}", "failed_at_chunk": chunk_idx + 1} + + if result is None: + continue + if not result.get("success"): + result["failed_at_chunk"] = chunk_idx + 1 + return result + all_message_ids.append(result.get("message_id")) + continue + + # Multi-item chunk: collect media items + result = await self._process_media_group_chunk( + chat_id, chunk, chunk_idx, len(chunks), caption, + reply_to_message_id, max_group_size, chunk_delay, parse_mode, + max_asset_data_size, send_large_photos_as_documents, all_message_ids, + ) + if result is not None: + return result + + return {"success": True, "message_ids": all_message_ids, "chunks_sent": len(chunks)} + + async def _process_media_group_chunk( + self, + chat_id: str, + chunk: list[dict[str, str]], + chunk_idx: int, + total_chunks: int, + caption: str | None, + reply_to_message_id: int | None, + max_group_size: int, + chunk_delay: int, + parse_mode: str, + max_asset_data_size: int | None, + send_large_photos_as_documents: bool, + all_message_ids: list, + ) -> NotificationResult | None: + """Process a multi-item media group chunk. Returns error result or None on success.""" + # media_items: (type, media_ref, filename, cache_key, is_cached, content_type) + media_items: list[tuple[str, str | bytes, str, str, bool, str | None]] = [] + oversized_photos: list[tuple[bytes, str | None, str, str | None]] = [] + documents_to_send: list[tuple[bytes, str | None, str, str | None, str, str | None]] = [] + skipped_count = 0 + + for i, item in enumerate(chunk): + url = item.get("url") + if not url: + return {"success": False, "error": f"Missing 'url' in item {chunk_idx * max_group_size + i}"} + + media_type = item.get("type", "document") + item_content_type = item.get("content_type") + custom_cache_key = item.get("cache_key") + extracted_asset_id = extract_asset_id_from_url(url) if not custom_cache_key else None + item_cache_key = custom_cache_key or extracted_asset_id or url + + if media_type not in ("photo", "video", "document"): + return {"success": False, "error": f"Invalid type '{media_type}' in item {chunk_idx * max_group_size + i}"} + + if media_type == "document": + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}"} + data = await resp.read() + if max_asset_data_size is not None and len(data) > max_asset_data_size: + skipped_count += 1 + continue + doc_caption = caption if chunk_idx == 0 and i == 0 and not media_items and not documents_to_send else None + filename = url.split("/")[-1].split("?")[0] or f"file_{i}" + documents_to_send.append((data, doc_caption, url, custom_cache_key, filename, item_content_type)) + except aiohttp.ClientError as err: + return {"success": False, "error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}"} + continue + + # Check cache for photos/videos + ck_is_asset = is_asset_id(item_cache_key) + item_cache = self._get_cache_for_key(item_cache_key, ck_is_asset) + item_thumbhash = self._thumbhash_resolver(item_cache_key) if ck_is_asset and self._thumbhash_resolver else None + cached = item_cache.get(item_cache_key, thumbhash=item_thumbhash) if item_cache else None + if cached and cached.get("file_id"): + ext = "jpg" if media_type == "photo" else "mp4" + filename = f"media_{chunk_idx * max_group_size + i}.{ext}" + media_items.append((media_type, cached["file_id"], filename, item_cache_key, True, item_content_type)) + continue + + try: + download_url = self._resolve_url(url) + async with self._session.get(download_url) as resp: + if resp.status != 200: + return {"success": False, "error": f"Failed to download media {chunk_idx * max_group_size + i}: HTTP {resp.status}"} + data = await resp.read() + + if max_asset_data_size is not None and len(data) > max_asset_data_size: + skipped_count += 1 + continue + + if media_type == "video" and len(data) > TELEGRAM_MAX_VIDEO_SIZE: + skipped_count += 1 + continue + + if media_type == "photo": + exceeds_limits, reason, _, _ = check_photo_limits(data) + if exceeds_limits: + if send_large_photos_as_documents: + photo_caption = caption if chunk_idx == 0 and i == 0 and not media_items else None + oversized_photos.append((data, photo_caption, url, custom_cache_key)) + continue + skipped_count += 1 + continue + + ext = "jpg" if media_type == "photo" else "mp4" + filename = f"media_{chunk_idx * max_group_size + i}.{ext}" + media_items.append((media_type, data, filename, item_cache_key, False, item_content_type)) + except aiohttp.ClientError as err: + return {"success": False, "error": f"Failed to download media {chunk_idx * max_group_size + i}: {err}"} + + if not media_items and not oversized_photos and not documents_to_send: + return None + + # Send media groups + if media_items: + media_sub_groups = split_media_by_upload_size(media_items, TELEGRAM_MAX_VIDEO_SIZE) + first_caption_used = False + + for sub_idx, sub_group_items in enumerate(media_sub_groups): + is_first = chunk_idx == 0 and sub_idx == 0 + sub_caption = caption if is_first and not first_caption_used and not oversized_photos else None + sub_reply_to = reply_to_message_id if is_first else None + + if sub_idx > 0 and chunk_delay > 0: + await asyncio.sleep(chunk_delay / 1000) + + result = await self._send_sub_group( + chat_id, sub_group_items, sub_caption, sub_reply_to, + parse_mode, chunk_idx, sub_idx, len(media_sub_groups), + all_message_ids, + ) + if result is not None: + if result.get("caption_used"): + first_caption_used = True + del result["caption_used"] + if not result.get("success", True): + return result + + # Send oversized photos as documents + for i, (data, photo_caption, photo_url, photo_cache_key) in enumerate(oversized_photos): + result = await self._send_document( + chat_id, data, f"photo_{i}.jpg", photo_caption, None, + parse_mode, photo_url, None, photo_cache_key, + ) + if result.get("success"): + all_message_ids.append(result.get("message_id")) + + # Send documents + for i, (data, doc_caption, doc_url, doc_cache_key, filename, doc_ct) in enumerate(documents_to_send): + result = await self._send_document( + chat_id, data, filename, doc_caption, None, + parse_mode, doc_url, doc_ct, doc_cache_key, + ) + if result.get("success"): + all_message_ids.append(result.get("message_id")) + + return None + + async def _send_sub_group( + self, + chat_id: str, + items: list[tuple], + caption: str | None, + reply_to: int | None, + parse_mode: str, + chunk_idx: int, + sub_idx: int, + total_sub_groups: int, + all_message_ids: list, + ) -> NotificationResult | None: + """Send a sub-group of media items. Returns error result, caption_used marker, or None.""" + # Single item - use sendPhoto/sendVideo + if len(items) == 1: + sg_type, sg_ref, sg_fname, sg_ck, sg_cached, sg_ct = items[0] + api_method = "sendPhoto" if sg_type == "photo" else "sendVideo" + media_field = "photo" if sg_type == "photo" else "video" + + try: + if sg_cached: + payload: dict[str, Any] = {"chat_id": chat_id, media_field: sg_ref, "parse_mode": parse_mode} + if caption: + payload["caption"] = caption + if reply_to: + payload["reply_to_message_id"] = reply_to + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/{api_method}" + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.append(result["result"].get("message_id")) + return {"caption_used": True} if caption else None + sg_cached = False + + if not sg_cached: + form = FormData() + form.add_field("chat_id", chat_id) + sg_content_type = sg_ct or ("image/jpeg" if sg_type == "photo" else "video/mp4") + form.add_field(media_field, sg_ref, filename=sg_fname, content_type=sg_content_type) + form.add_field("parse_mode", parse_mode) + if caption: + form.add_field("caption", caption) + if reply_to: + form.add_field("reply_to_message_id", str(reply_to)) + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/{api_method}" + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.append(result["result"].get("message_id")) + # Cache uploaded file + ck_is_asset = is_asset_id(sg_ck) + sg_cache = self._get_cache_for_key(sg_ck, ck_is_asset) + if sg_cache: + sg_thumbhash = self._thumbhash_resolver(sg_ck) if ck_is_asset and self._thumbhash_resolver else None + result_data = result.get("result", {}) + if sg_type == "photo": + photos = result_data.get("photo", []) + if photos: + await sg_cache.async_set(sg_ck, photos[-1].get("file_id"), "photo", thumbhash=sg_thumbhash) + elif sg_type == "video": + video = result_data.get("video", {}) + if video.get("file_id"): + await sg_cache.async_set(sg_ck, video["file_id"], "video", thumbhash=sg_thumbhash) + return {"caption_used": True} if caption else None + self._log_error(result.get("error_code"), result.get("description", "Unknown"), sg_ref if isinstance(sg_ref, bytes) else None, sg_type) + return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} + return None + + # Multiple items - sendMediaGroup + all_cached = all(item[4] for item in items) + + if all_cached: + media_json = [] + for i, (media_type, file_id, _, _, _, _) in enumerate(items): + mij: dict[str, Any] = {"type": media_type, "media": file_id} + if i == 0 and caption: + mij["caption"] = caption + mij["parse_mode"] = parse_mode + media_json.append(mij) + + payload = {"chat_id": chat_id, "media": media_json} + if reply_to: + payload["reply_to_message_id"] = reply_to + + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup" + try: + async with self._session.post(telegram_url, json=payload) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.extend(msg.get("message_id") for msg in result.get("result", [])) + return {"caption_used": True} if caption else None + all_cached = False + except aiohttp.ClientError: + all_cached = False + + if not all_cached: + form = FormData() + form.add_field("chat_id", chat_id) + if reply_to: + form.add_field("reply_to_message_id", str(reply_to)) + + media_json = [] + upload_idx = 0 + keys_to_cache: list[tuple[str, int, str, bool, str | None]] = [] + + for i, (media_type, media_ref, filename, item_cache_key, is_cached, item_ct) in enumerate(items): + if is_cached: + mij = {"type": media_type, "media": media_ref} + else: + attach_name = f"file{upload_idx}" + mij = {"type": media_type, "media": f"attach://{attach_name}"} + ct = item_ct or ("image/jpeg" if media_type == "photo" else "video/mp4") + form.add_field(attach_name, media_ref, filename=filename, content_type=ct) + ck_is_asset = is_asset_id(item_cache_key) + ck_thumbhash = self._thumbhash_resolver(item_cache_key) if ck_is_asset and self._thumbhash_resolver else None + keys_to_cache.append((item_cache_key, i, media_type, ck_is_asset, ck_thumbhash)) + upload_idx += 1 + + if i == 0 and caption: + mij["caption"] = caption + mij["parse_mode"] = parse_mode + media_json.append(mij) + + form.add_field("media", json.dumps(media_json)) + telegram_url = f"{TELEGRAM_API_BASE_URL}{self._token}/sendMediaGroup" + + try: + async with self._session.post(telegram_url, data=form) as response: + result = await response.json() + if response.status == 200 and result.get("ok"): + all_message_ids.extend(msg.get("message_id") for msg in result.get("result", [])) + + # Batch cache new file_ids + if keys_to_cache: + result_messages = result.get("result", []) + cache_batches: dict[int, tuple[TelegramFileCache, list[tuple[str, str, str, str | None]]]] = {} + for ck, result_idx, m_type, ck_is_asset, ck_thumbhash in keys_to_cache: + ck_cache = self._get_cache_for_key(ck, ck_is_asset) + if result_idx >= len(result_messages) or not ck_cache: + continue + msg = result_messages[result_idx] + file_id = None + if m_type == "photo": + photos = msg.get("photo", []) + if photos: + file_id = photos[-1].get("file_id") + elif m_type == "video": + video = msg.get("video", {}) + file_id = video.get("file_id") + if file_id: + cache_id = id(ck_cache) + if cache_id not in cache_batches: + cache_batches[cache_id] = (ck_cache, []) + cache_batches[cache_id][1].append((ck, file_id, m_type, ck_thumbhash)) + for ck_cache, batch_entries in cache_batches.values(): + await ck_cache.async_set_many(batch_entries) + + return {"caption_used": True} if caption else None + + _LOGGER.error("Telegram API error for media group: %s", result.get("description")) + return {"success": False, "error": result.get("description", "Unknown"), "failed_at_chunk": chunk_idx + 1} + except aiohttp.ClientError as err: + return {"success": False, "error": str(err), "failed_at_chunk": chunk_idx + 1} + + return None diff --git a/packages/core/src/immich_watcher_core/telegram/media.py b/packages/core/src/immich_watcher_core/telegram/media.py new file mode 100644 index 0000000..0739153 --- /dev/null +++ b/packages/core/src/immich_watcher_core/telegram/media.py @@ -0,0 +1,133 @@ +"""Telegram media utilities - constants, URL helpers, and size splitting.""" + +from __future__ import annotations + +import re +from typing import Final + +# Telegram constants +TELEGRAM_API_BASE_URL: Final = "https://api.telegram.org/bot" +TELEGRAM_MAX_PHOTO_SIZE: Final = 10 * 1024 * 1024 # 10 MB +TELEGRAM_MAX_VIDEO_SIZE: Final = 50 * 1024 * 1024 # 50 MB +TELEGRAM_MAX_DIMENSION_SUM: Final = 10000 # Max width + height in pixels + +# Regex pattern for Immich asset ID (UUID format) +_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$") + +# Regex patterns to extract asset ID from Immich URLs +_IMMICH_ASSET_ID_PATTERNS = [ + re.compile(r"/api/assets/([a-f0-9-]{36})/(?:original|thumbnail|video)"), + re.compile(r"/share/[^/]+/photos/([a-f0-9-]{36})"), +] + + +def is_asset_id(value: str) -> bool: + """Check if a string is a valid Immich asset ID (UUID format).""" + return bool(_ASSET_ID_PATTERN.match(value)) + + +def extract_asset_id_from_url(url: str) -> str | None: + """Extract asset ID from Immich URL if possible. + + Supports: + - /api/assets/{asset_id}/original?... + - /api/assets/{asset_id}/thumbnail?... + - /api/assets/{asset_id}/video/playback?... + - /share/{key}/photos/{asset_id} + """ + if not url: + return None + for pattern in _IMMICH_ASSET_ID_PATTERNS: + match = pattern.search(url) + if match: + return match.group(1) + return None + + +def split_media_by_upload_size( + media_items: list[tuple], max_upload_size: int +) -> list[list[tuple]]: + """Split media items into sub-groups respecting upload size limit. + + Cached items (file_id references) don't count toward upload size since + they aren't uploaded. Only items with bytes data count. + + Args: + media_items: List of tuples where index [1] is str (file_id) or bytes (data) + and index [4] is bool (is_cached) + max_upload_size: Maximum total upload size in bytes per group + + Returns: + List of sub-groups, each respecting the size limit + """ + if not media_items: + return [] + + groups: list[list[tuple]] = [] + current_group: list[tuple] = [] + current_size = 0 + + for item in media_items: + media_ref = item[1] + is_cached = item[4] + + # Cached items don't count toward upload size + item_size = 0 if is_cached else (len(media_ref) if isinstance(media_ref, bytes) else 0) + + # If adding this item would exceed the limit and we have items already, + # start a new group + if current_group and current_size + item_size > max_upload_size: + groups.append(current_group) + current_group = [] + current_size = 0 + + current_group.append(item) + current_size += item_size + + if current_group: + groups.append(current_group) + + return groups + + +def check_photo_limits( + data: bytes, +) -> tuple[bool, str | None, int | None, int | None]: + """Check if photo data exceeds Telegram photo limits. + + Telegram limits for photos: + - Max file size: 10 MB + - Max dimension sum: ~10,000 pixels (width + height) + + Returns: + Tuple of (exceeds_limits, reason, width, height) + """ + if len(data) > TELEGRAM_MAX_PHOTO_SIZE: + return ( + True, + f"size {len(data)} bytes exceeds {TELEGRAM_MAX_PHOTO_SIZE} bytes limit", + None, + None, + ) + + try: + from PIL import Image + import io + + img = Image.open(io.BytesIO(data)) + width, height = img.size + dimension_sum = width + height + + if dimension_sum > TELEGRAM_MAX_DIMENSION_SUM: + return ( + True, + f"dimensions {width}x{height} (sum={dimension_sum}) exceed {TELEGRAM_MAX_DIMENSION_SUM} limit", + width, + height, + ) + + return False, None, width, height + except ImportError: + return False, None, None, None + except Exception: + return False, None, None, None diff --git a/packages/core/tests/__init__.py b/packages/core/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/core/tests/test_asset_utils.py b/packages/core/tests/test_asset_utils.py new file mode 100644 index 0000000..8ecd7fb --- /dev/null +++ b/packages/core/tests/test_asset_utils.py @@ -0,0 +1,185 @@ +"""Tests for asset filtering, sorting, and URL utilities.""" + +from immich_watcher_core.asset_utils import ( + build_asset_detail, + filter_assets, + get_any_url, + get_public_url, + get_protected_url, + sort_assets, +) +from immich_watcher_core.models import AssetInfo, SharedLinkInfo + + +def _make_asset( + asset_id: str = "a1", + asset_type: str = "IMAGE", + filename: str = "photo.jpg", + created_at: str = "2024-01-15T10:30:00Z", + is_favorite: bool = False, + rating: int | None = None, + city: str | None = None, + country: str | None = None, +) -> AssetInfo: + return AssetInfo( + id=asset_id, + type=asset_type, + filename=filename, + created_at=created_at, + is_favorite=is_favorite, + rating=rating, + city=city, + country=country, + is_processed=True, + ) + + +class TestFilterAssets: + def test_favorite_only(self): + assets = [_make_asset("a1", is_favorite=True), _make_asset("a2")] + result = filter_assets(assets, favorite_only=True) + assert len(result) == 1 + assert result[0].id == "a1" + + def test_min_rating(self): + assets = [ + _make_asset("a1", rating=5), + _make_asset("a2", rating=2), + _make_asset("a3"), # no rating + ] + result = filter_assets(assets, min_rating=3) + assert len(result) == 1 + assert result[0].id == "a1" + + def test_asset_type_photo(self): + assets = [ + _make_asset("a1", asset_type="IMAGE"), + _make_asset("a2", asset_type="VIDEO"), + ] + result = filter_assets(assets, asset_type="photo") + assert len(result) == 1 + assert result[0].type == "IMAGE" + + def test_date_range(self): + assets = [ + _make_asset("a1", created_at="2024-01-10T00:00:00Z"), + _make_asset("a2", created_at="2024-01-15T00:00:00Z"), + _make_asset("a3", created_at="2024-01-20T00:00:00Z"), + ] + result = filter_assets( + assets, min_date="2024-01-12T00:00:00Z", max_date="2024-01-18T00:00:00Z" + ) + assert len(result) == 1 + assert result[0].id == "a2" + + def test_memory_date(self): + assets = [ + _make_asset("a1", created_at="2023-03-19T10:00:00Z"), # same month/day, different year + _make_asset("a2", created_at="2024-03-19T10:00:00Z"), # same year as reference + _make_asset("a3", created_at="2023-06-15T10:00:00Z"), # different date + ] + result = filter_assets(assets, memory_date="2024-03-19T00:00:00Z") + assert len(result) == 1 + assert result[0].id == "a1" + + def test_city_filter(self): + assets = [ + _make_asset("a1", city="Paris"), + _make_asset("a2", city="London"), + _make_asset("a3"), + ] + result = filter_assets(assets, city="paris") + assert len(result) == 1 + assert result[0].id == "a1" + + +class TestSortAssets: + def test_sort_by_date_descending(self): + assets = [ + _make_asset("a1", created_at="2024-01-10T00:00:00Z"), + _make_asset("a2", created_at="2024-01-20T00:00:00Z"), + _make_asset("a3", created_at="2024-01-15T00:00:00Z"), + ] + result = sort_assets(assets, order_by="date", order="descending") + assert [a.id for a in result] == ["a2", "a3", "a1"] + + def test_sort_by_name(self): + assets = [ + _make_asset("a1", filename="charlie.jpg"), + _make_asset("a2", filename="alice.jpg"), + _make_asset("a3", filename="bob.jpg"), + ] + result = sort_assets(assets, order_by="name", order="ascending") + assert [a.id for a in result] == ["a2", "a3", "a1"] + + def test_sort_by_rating(self): + assets = [ + _make_asset("a1", rating=3), + _make_asset("a2", rating=5), + _make_asset("a3"), # None rating + ] + result = sort_assets(assets, order_by="rating", order="descending") + # With descending + (is_none, value) key: None goes last when reversed + # (True, 0) vs (False, 5) vs (False, 3) - reversed: (True, 0), (False, 5), (False, 3) + # Actually: reversed sort puts (True,0) first. Let's just check rated come before unrated + rated = [a for a in result if a.rating is not None] + assert rated[0].id == "a2" + assert rated[1].id == "a1" + + +class TestUrlHelpers: + def _make_links(self): + return [ + SharedLinkInfo(id="l1", key="public-key"), + SharedLinkInfo(id="l2", key="protected-key", has_password=True, password="pass123"), + ] + + def test_get_public_url(self): + links = self._make_links() + url = get_public_url("https://immich.example.com", links) + assert url == "https://immich.example.com/share/public-key" + + def test_get_protected_url(self): + links = self._make_links() + url = get_protected_url("https://immich.example.com", links) + assert url == "https://immich.example.com/share/protected-key" + + def test_get_any_url_prefers_public(self): + links = self._make_links() + url = get_any_url("https://immich.example.com", links) + assert url == "https://immich.example.com/share/public-key" + + def test_get_any_url_falls_back_to_protected(self): + links = [SharedLinkInfo(id="l1", key="prot-key", has_password=True, password="x")] + url = get_any_url("https://immich.example.com", links) + assert url == "https://immich.example.com/share/prot-key" + + def test_no_links(self): + assert get_public_url("https://example.com", []) is None + assert get_any_url("https://example.com", []) is None + + +class TestBuildAssetDetail: + def test_build_image_detail(self): + asset = _make_asset("a1", asset_type="IMAGE") + links = [SharedLinkInfo(id="l1", key="key1")] + detail = build_asset_detail(asset, "https://immich.example.com", links) + assert detail["id"] == "a1" + assert "url" in detail + assert "download_url" in detail + assert "photo_url" in detail + assert "thumbnail_url" in detail + + def test_build_video_detail(self): + asset = _make_asset("a1", asset_type="VIDEO") + links = [SharedLinkInfo(id="l1", key="key1")] + detail = build_asset_detail(asset, "https://immich.example.com", links) + assert "playback_url" in detail + assert "photo_url" not in detail + + def test_no_shared_links(self): + asset = _make_asset("a1") + detail = build_asset_detail(asset, "https://immich.example.com", []) + assert "url" not in detail + assert "download_url" not in detail + assert "thumbnail_url" in detail # always present diff --git a/packages/core/tests/test_change_detector.py b/packages/core/tests/test_change_detector.py new file mode 100644 index 0000000..accfd39 --- /dev/null +++ b/packages/core/tests/test_change_detector.py @@ -0,0 +1,139 @@ +"""Tests for change detection logic.""" + +from immich_watcher_core.change_detector import detect_album_changes +from immich_watcher_core.models import AlbumData, AssetInfo + + +def _make_album( + album_id: str = "album-1", + name: str = "Test Album", + shared: bool = False, + assets: dict[str, AssetInfo] | None = None, +) -> AlbumData: + """Helper to create AlbumData for testing.""" + if assets is None: + assets = {} + return AlbumData( + id=album_id, + name=name, + asset_count=len(assets), + photo_count=0, + video_count=0, + created_at="2024-01-01T00:00:00Z", + updated_at="2024-01-15T10:30:00Z", + shared=shared, + owner="Alice", + thumbnail_asset_id=None, + asset_ids=set(assets.keys()), + assets=assets, + ) + + +def _make_asset(asset_id: str, is_processed: bool = True) -> AssetInfo: + """Helper to create AssetInfo for testing.""" + return AssetInfo( + id=asset_id, + type="IMAGE", + filename=f"{asset_id}.jpg", + created_at="2024-01-15T10:30:00Z", + is_processed=is_processed, + thumbhash="abc" if is_processed else None, + ) + + +class TestDetectAlbumChanges: + def test_no_changes(self): + a1 = _make_asset("a1") + old = _make_album(assets={"a1": a1}) + new = _make_album(assets={"a1": a1}) + change, pending = detect_album_changes(old, new, set()) + assert change is None + assert pending == set() + + def test_assets_added(self): + a1 = _make_asset("a1") + a2 = _make_asset("a2") + old = _make_album(assets={"a1": a1}) + new = _make_album(assets={"a1": a1, "a2": a2}) + change, pending = detect_album_changes(old, new, set()) + assert change is not None + assert change.change_type == "assets_added" + assert change.added_count == 1 + assert change.added_assets[0].id == "a2" + + def test_assets_removed(self): + a1 = _make_asset("a1") + a2 = _make_asset("a2") + old = _make_album(assets={"a1": a1, "a2": a2}) + new = _make_album(assets={"a1": a1}) + change, pending = detect_album_changes(old, new, set()) + assert change is not None + assert change.change_type == "assets_removed" + assert change.removed_count == 1 + assert "a2" in change.removed_asset_ids + + def test_mixed_changes(self): + a1 = _make_asset("a1") + a2 = _make_asset("a2") + a3 = _make_asset("a3") + old = _make_album(assets={"a1": a1, "a2": a2}) + new = _make_album(assets={"a1": a1, "a3": a3}) + change, pending = detect_album_changes(old, new, set()) + assert change is not None + assert change.change_type == "changed" + assert change.added_count == 1 + assert change.removed_count == 1 + + def test_album_renamed(self): + a1 = _make_asset("a1") + old = _make_album(name="Old Name", assets={"a1": a1}) + new = _make_album(name="New Name", assets={"a1": a1}) + change, pending = detect_album_changes(old, new, set()) + assert change is not None + assert change.change_type == "album_renamed" + assert change.old_name == "Old Name" + assert change.new_name == "New Name" + + def test_sharing_changed(self): + a1 = _make_asset("a1") + old = _make_album(shared=False, assets={"a1": a1}) + new = _make_album(shared=True, assets={"a1": a1}) + change, pending = detect_album_changes(old, new, set()) + assert change is not None + assert change.change_type == "album_sharing_changed" + assert change.old_shared is False + assert change.new_shared is True + + def test_pending_asset_becomes_processed(self): + a1 = _make_asset("a1") + a2_unprocessed = _make_asset("a2", is_processed=False) + a2_processed = _make_asset("a2", is_processed=True) + + old = _make_album(assets={"a1": a1, "a2": a2_unprocessed}) + new = _make_album(assets={"a1": a1, "a2": a2_processed}) + + # a2 is in pending set + change, pending = detect_album_changes(old, new, {"a2"}) + assert change is not None + assert change.added_count == 1 + assert change.added_assets[0].id == "a2" + assert "a2" not in pending + + def test_unprocessed_asset_added_to_pending(self): + a1 = _make_asset("a1") + a2 = _make_asset("a2", is_processed=False) + old = _make_album(assets={"a1": a1}) + new = _make_album(assets={"a1": a1, "a2": a2}) + change, pending = detect_album_changes(old, new, set()) + # No change because a2 is unprocessed + assert change is None + assert "a2" in pending + + def test_pending_asset_removed(self): + a1 = _make_asset("a1") + old = _make_album(assets={"a1": a1}) + new = _make_album(assets={"a1": a1}) + # a2 was pending but now gone from album + change, pending = detect_album_changes(old, new, {"a2"}) + assert change is None + assert "a2" not in pending diff --git a/packages/core/tests/test_models.py b/packages/core/tests/test_models.py new file mode 100644 index 0000000..5537c01 --- /dev/null +++ b/packages/core/tests/test_models.py @@ -0,0 +1,185 @@ +"""Tests for data models.""" + +from datetime import datetime, timezone + +from immich_watcher_core.models import ( + AlbumChange, + AlbumData, + AssetInfo, + SharedLinkInfo, +) + + +class TestSharedLinkInfo: + def test_from_api_response_basic(self): + data = {"id": "link-1", "key": "abc123"} + link = SharedLinkInfo.from_api_response(data) + assert link.id == "link-1" + assert link.key == "abc123" + assert not link.has_password + assert link.is_accessible + + def test_from_api_response_with_password(self): + data = {"id": "link-1", "key": "abc123", "password": "secret"} + link = SharedLinkInfo.from_api_response(data) + assert link.has_password + assert link.password == "secret" + assert not link.is_accessible + + def test_from_api_response_with_expiry(self): + data = { + "id": "link-1", + "key": "abc123", + "expiresAt": "2099-12-31T23:59:59Z", + } + link = SharedLinkInfo.from_api_response(data) + assert link.expires_at is not None + assert not link.is_expired + + def test_expired_link(self): + link = SharedLinkInfo( + id="link-1", + key="abc123", + expires_at=datetime(2020, 1, 1, tzinfo=timezone.utc), + ) + assert link.is_expired + assert not link.is_accessible + + +class TestAssetInfo: + def test_from_api_response_image(self): + data = { + "id": "asset-1", + "type": "IMAGE", + "originalFileName": "photo.jpg", + "fileCreatedAt": "2024-01-15T10:30:00Z", + "ownerId": "user-1", + "thumbhash": "abc123", + } + asset = AssetInfo.from_api_response(data, {"user-1": "Alice"}) + assert asset.id == "asset-1" + assert asset.type == "IMAGE" + assert asset.filename == "photo.jpg" + assert asset.owner_name == "Alice" + assert asset.is_processed + + def test_from_api_response_with_exif(self): + data = { + "id": "asset-2", + "type": "IMAGE", + "originalFileName": "photo.jpg", + "fileCreatedAt": "2024-01-15T10:30:00Z", + "ownerId": "user-1", + "isFavorite": True, + "thumbhash": "xyz", + "exifInfo": { + "rating": 5, + "latitude": 48.8566, + "longitude": 2.3522, + "city": "Paris", + "state": "Île-de-France", + "country": "France", + "description": "Eiffel Tower", + }, + } + asset = AssetInfo.from_api_response(data) + assert asset.is_favorite + assert asset.rating == 5 + assert asset.latitude == 48.8566 + assert asset.city == "Paris" + assert asset.description == "Eiffel Tower" + + def test_unprocessed_asset(self): + data = { + "id": "asset-3", + "type": "VIDEO", + "originalFileName": "video.mp4", + "fileCreatedAt": "2024-01-15T10:30:00Z", + "ownerId": "user-1", + # No thumbhash = not processed + } + asset = AssetInfo.from_api_response(data) + assert not asset.is_processed + + def test_trashed_asset(self): + data = { + "id": "asset-4", + "type": "IMAGE", + "originalFileName": "deleted.jpg", + "fileCreatedAt": "2024-01-15T10:30:00Z", + "ownerId": "user-1", + "isTrashed": True, + "thumbhash": "abc", + } + asset = AssetInfo.from_api_response(data) + assert not asset.is_processed + + def test_people_extraction(self): + data = { + "id": "asset-5", + "type": "IMAGE", + "originalFileName": "group.jpg", + "fileCreatedAt": "2024-01-15T10:30:00Z", + "ownerId": "user-1", + "thumbhash": "abc", + "people": [ + {"name": "Alice"}, + {"name": "Bob"}, + {"name": ""}, # empty name filtered + ], + } + asset = AssetInfo.from_api_response(data) + assert asset.people == ["Alice", "Bob"] + + +class TestAlbumData: + def test_from_api_response(self): + data = { + "id": "album-1", + "albumName": "Vacation", + "assetCount": 2, + "createdAt": "2024-01-01T00:00:00Z", + "updatedAt": "2024-01-15T10:30:00Z", + "shared": True, + "owner": {"name": "Alice"}, + "albumThumbnailAssetId": "asset-1", + "assets": [ + { + "id": "asset-1", + "type": "IMAGE", + "originalFileName": "photo.jpg", + "fileCreatedAt": "2024-01-15T10:30:00Z", + "ownerId": "user-1", + "thumbhash": "abc", + }, + { + "id": "asset-2", + "type": "VIDEO", + "originalFileName": "video.mp4", + "fileCreatedAt": "2024-01-15T11:00:00Z", + "ownerId": "user-1", + "thumbhash": "def", + }, + ], + } + album = AlbumData.from_api_response(data) + assert album.id == "album-1" + assert album.name == "Vacation" + assert album.photo_count == 1 + assert album.video_count == 1 + assert album.shared + assert len(album.asset_ids) == 2 + assert "asset-1" in album.asset_ids + + +class TestAlbumChange: + def test_basic_creation(self): + change = AlbumChange( + album_id="album-1", + album_name="Test", + change_type="assets_added", + added_count=3, + ) + assert change.added_count == 3 + assert change.removed_count == 0 + assert change.old_name is None diff --git a/packages/core/tests/test_notification_queue.py b/packages/core/tests/test_notification_queue.py new file mode 100644 index 0000000..624ce3d --- /dev/null +++ b/packages/core/tests/test_notification_queue.py @@ -0,0 +1,83 @@ +"""Tests for notification queue.""" + +import pytest +from typing import Any + +from immich_watcher_core.notifications.queue import NotificationQueue + + +class InMemoryBackend: + """In-memory storage backend for testing.""" + + def __init__(self, initial_data: dict[str, Any] | None = None): + self._data = initial_data + + async def load(self) -> dict[str, Any] | None: + return self._data + + async def save(self, data: dict[str, Any]) -> None: + self._data = data + + async def remove(self) -> None: + self._data = None + + +@pytest.fixture +def backend(): + return InMemoryBackend() + + +class TestNotificationQueue: + @pytest.mark.asyncio + async def test_empty_queue(self, backend): + queue = NotificationQueue(backend) + await queue.async_load() + assert not queue.has_pending() + assert queue.get_all() == [] + + @pytest.mark.asyncio + async def test_enqueue_and_get(self, backend): + queue = NotificationQueue(backend) + await queue.async_load() + await queue.async_enqueue({"chat_id": "123", "text": "Hello"}) + assert queue.has_pending() + items = queue.get_all() + assert len(items) == 1 + assert items[0]["params"]["chat_id"] == "123" + + @pytest.mark.asyncio + async def test_multiple_enqueue(self, backend): + queue = NotificationQueue(backend) + await queue.async_load() + await queue.async_enqueue({"msg": "first"}) + await queue.async_enqueue({"msg": "second"}) + assert len(queue.get_all()) == 2 + + @pytest.mark.asyncio + async def test_clear(self, backend): + queue = NotificationQueue(backend) + await queue.async_load() + await queue.async_enqueue({"msg": "test"}) + await queue.async_clear() + assert not queue.has_pending() + + @pytest.mark.asyncio + async def test_remove_indices(self, backend): + queue = NotificationQueue(backend) + await queue.async_load() + await queue.async_enqueue({"msg": "first"}) + await queue.async_enqueue({"msg": "second"}) + await queue.async_enqueue({"msg": "third"}) + # Remove indices in descending order + await queue.async_remove_indices([2, 0]) + items = queue.get_all() + assert len(items) == 1 + assert items[0]["params"]["msg"] == "second" + + @pytest.mark.asyncio + async def test_remove_all(self, backend): + queue = NotificationQueue(backend) + await queue.async_load() + await queue.async_enqueue({"msg": "test"}) + await queue.async_remove() + assert backend._data is None diff --git a/packages/core/tests/test_telegram_cache.py b/packages/core/tests/test_telegram_cache.py new file mode 100644 index 0000000..5ed82d9 --- /dev/null +++ b/packages/core/tests/test_telegram_cache.py @@ -0,0 +1,112 @@ +"""Tests for Telegram file cache.""" + +import pytest +from datetime import datetime, timezone, timedelta +from typing import Any + +from immich_watcher_core.storage import StorageBackend +from immich_watcher_core.telegram.cache import TelegramFileCache + + +class InMemoryBackend: + """In-memory storage backend for testing.""" + + def __init__(self, initial_data: dict[str, Any] | None = None): + self._data = initial_data + + async def load(self) -> dict[str, Any] | None: + return self._data + + async def save(self, data: dict[str, Any]) -> None: + self._data = data + + async def remove(self) -> None: + self._data = None + + +@pytest.fixture +def backend(): + return InMemoryBackend() + + +class TestTelegramFileCacheTTL: + @pytest.mark.asyncio + async def test_set_and_get(self, backend): + cache = TelegramFileCache(backend, ttl_seconds=3600) + await cache.async_load() + await cache.async_set("url1", "file_id_1", "photo") + result = cache.get("url1") + assert result is not None + assert result["file_id"] == "file_id_1" + assert result["type"] == "photo" + + @pytest.mark.asyncio + async def test_miss(self, backend): + cache = TelegramFileCache(backend, ttl_seconds=3600) + await cache.async_load() + assert cache.get("nonexistent") is None + + @pytest.mark.asyncio + async def test_ttl_expiry(self): + # Pre-populate with an old entry + old_time = (datetime.now(timezone.utc) - timedelta(hours=100)).isoformat() + data = {"files": {"url1": {"file_id": "old", "type": "photo", "cached_at": old_time}}} + backend = InMemoryBackend(data) + cache = TelegramFileCache(backend, ttl_seconds=3600) + await cache.async_load() + # Old entry should be cleaned up on load + assert cache.get("url1") is None + + @pytest.mark.asyncio + async def test_set_many(self, backend): + cache = TelegramFileCache(backend, ttl_seconds=3600) + await cache.async_load() + entries = [ + ("url1", "fid1", "photo", None), + ("url2", "fid2", "video", None), + ] + await cache.async_set_many(entries) + assert cache.get("url1")["file_id"] == "fid1" + assert cache.get("url2")["file_id"] == "fid2" + + +class TestTelegramFileCacheThumbhash: + @pytest.mark.asyncio + async def test_thumbhash_validation(self, backend): + cache = TelegramFileCache(backend, use_thumbhash=True) + await cache.async_load() + await cache.async_set("asset-1", "fid1", "photo", thumbhash="hash_v1") + + # Match + result = cache.get("asset-1", thumbhash="hash_v1") + assert result is not None + assert result["file_id"] == "fid1" + + # Mismatch - cache miss + result = cache.get("asset-1", thumbhash="hash_v2") + assert result is None + + @pytest.mark.asyncio + async def test_thumbhash_max_entries(self): + # Create cache with many entries + files = {} + for i in range(2100): + files[f"asset-{i}"] = { + "file_id": f"fid-{i}", + "type": "photo", + "cached_at": datetime(2024, 1, 1 + i // 1440, (i // 60) % 24, i % 60, tzinfo=timezone.utc).isoformat(), + } + backend = InMemoryBackend({"files": files}) + cache = TelegramFileCache(backend, use_thumbhash=True) + await cache.async_load() + # Should be trimmed to 2000 + remaining = backend._data["files"] + assert len(remaining) == 2000 + + @pytest.mark.asyncio + async def test_remove(self, backend): + cache = TelegramFileCache(backend, ttl_seconds=3600) + await cache.async_load() + await cache.async_set("url1", "fid1", "photo") + await cache.async_remove() + assert backend._data is None diff --git a/plans/README.md b/plans/README.md new file mode 100644 index 0000000..24b77be --- /dev/null +++ b/plans/README.md @@ -0,0 +1,22 @@ +# Plans + +This folder contains the primary architecture plan and phase-specific subplans for the Immich Watcher project restructuring. + +## Structure + +- `primary-plan.md` -- Master plan with architecture decisions and phase overview +- `phase-1-core-library.md` -- Extract shared core library +- `phase-2-haos-refactor.md` -- Wire core into HAOS integration +- `phase-3-server-backend.md` -- Build standalone FastAPI server +- `phase-4-frontend.md` -- Build SvelteKit web UI +- `phase-5-haos-server-sync.md` -- Optional HAOS-Server integration + +## Tracking + +Each plan uses checkbox tracking: +- `[ ]` -- Not started +- `[x]` -- Completed +- `[~]` -- In progress +- `[-]` -- Skipped/deferred + +Phase subplans are created when work on that phase begins (not all upfront). diff --git a/plans/phase-1-core-library.md b/plans/phase-1-core-library.md new file mode 100644 index 0000000..773681b --- /dev/null +++ b/plans/phase-1-core-library.md @@ -0,0 +1,211 @@ +# Phase 1: Extract Core Library + +**Status**: Not started +**Parent**: [primary-plan.md](primary-plan.md) + +--- + +## Goal + +Extract all HA-independent logic from the integration into `packages/core/` as a standalone Python library (`immich-watcher-core`). This library will be consumed by both the HAOS integration and the standalone server. + +--- + +## Directory Structure + +``` +packages/core/ + pyproject.toml + src/immich_watcher_core/ + __init__.py + constants.py # Event types, attribute names, asset types, defaults + models.py # SharedLinkInfo, AssetInfo, AlbumData, AlbumChange + immich_client.py # Async Immich API client (aiohttp) + change_detector.py # detect_album_changes() pure function + asset_utils.py # Asset filtering, sorting, URL building + telegram/ + __init__.py + client.py # TelegramClient - full Bot API operations + cache.py # TelegramFileCache with CacheBackend protocol + media.py # _split_media_by_upload_size, photo limit checks, URL helpers + notifications/ + __init__.py + queue.py # NotificationQueue with QueueBackend protocol + storage.py # CacheBackend/QueueBackend protocols + JSON file implementations + tests/ + __init__.py + test_models.py + test_immich_client.py + test_change_detector.py + test_asset_utils.py + test_telegram_client.py + test_telegram_cache.py + test_notification_queue.py +``` + +--- + +## Tasks + +### 1. Package setup `[ ]` + +- [x] Create `packages/core/pyproject.toml` with: + - Name: `immich-watcher-core` + - Dependencies: `aiohttp`, `jinja2` + - Optional dev deps: `pytest`, `pytest-asyncio`, `aioresponses` +- [x] Create `packages/core/src/immich_watcher_core/__init__.py` + +### 2. Extract constants `[ ]` + +**Source**: `custom_components/immich_album_watcher/const.py` + +Extract to `constants.py`: +- Event names: `EVENT_ALBUM_CHANGED`, `EVENT_ASSETS_ADDED`, etc. (L29-34) +- Attribute names: all `ATTR_*` constants (L37-77) +- Asset types: `ASSET_TYPE_IMAGE`, `ASSET_TYPE_VIDEO` (L80-81) +- Defaults: `DEFAULT_SCAN_INTERVAL`, `DEFAULT_TELEGRAM_CACHE_TTL`, `NEW_ASSETS_RESET_DELAY`, `DEFAULT_SHARE_PASSWORD` (L23-27) + +**Keep in HA const.py**: `DOMAIN`, `CONF_*`, `SUBENTRY_TYPE_ALBUM`, `PLATFORMS`, `SERVICE_*` (HA-specific) + +### 3. Extract data models `[ ]` + +**Source**: `custom_components/immich_album_watcher/coordinator.py` L66-326 + +Extract to `models.py`: +- `SharedLinkInfo` (L67-111) -- dataclass, zero HA deps +- `AssetInfo` (L114-249) -- dataclass, uses only `ASSET_TYPE_IMAGE` from constants +- `AlbumData` (L252-308) -- dataclass, uses `AssetInfo` + `ASSET_TYPE_*` +- `AlbumChange` (L311-325) -- dataclass, pure data + +All use only stdlib + our constants. No changes needed except import paths. + +### 4. Extract Immich API client `[ ]` + +**Source**: `custom_components/immich_album_watcher/coordinator.py` + +Extract to `immich_client.py` as `ImmichClient` class: +- Constructor takes: `session: aiohttp.ClientSession`, `url: str`, `api_key: str` +- `async get_server_config() -> str | None` (L640-668) -- returns external_domain +- `async get_users() -> dict[str, str]` (L617-638) -- user_id -> name +- `async get_people() -> dict[str, str]` (L593-615) -- person_id -> name +- `async get_shared_links(album_id: str) -> list[SharedLinkInfo]` (L670-699) +- `async get_album(album_id: str) -> AlbumData | None` (L876-898 fetch part) +- `async create_shared_link(album_id, password?) -> bool` (L1199-1243) +- `async delete_shared_link(link_id) -> bool` (L1245-1271) +- `async set_shared_link_password(link_id, password?) -> bool` (L1144-1176) +- `async ping() -> bool` -- validate connection (used by config_flow) +- Properties: `url`, `external_url`, `api_key` +- Helper: `get_internal_download_url(url)` (L384-400) + +**Key design**: Accept `aiohttp.ClientSession` via constructor. HA provides `async_get_clientsession(hass)`, standalone creates its own. + +### 5. Extract asset utilities `[ ]` + +**Source**: `custom_components/immich_album_watcher/coordinator.py` L458-591, L761-856 + +Extract to `asset_utils.py`: +- `filter_assets(assets, ...)` -- favorite_only, min_rating, asset_type, date range, memory_date, geolocation (L498-557) +- `sort_assets(assets, order_by, order)` (L559-581) +- `build_asset_detail(asset, external_url, shared_links, include_thumbnail)` (L801-856) +- URL builders: `get_asset_public_url`, `get_asset_download_url`, `get_asset_video_url`, `get_asset_photo_url` (L761-799) +- Album URL helpers: `get_public_url`, `get_any_url`, `get_protected_url`, etc. (L709-759) + +### 6. Extract change detection `[ ]` + +**Source**: `custom_components/immich_album_watcher/coordinator.py` L979-1066 + +Extract to `change_detector.py`: +- `detect_album_changes(old_state, new_state, pending_asset_ids) -> tuple[AlbumChange | None, set[str]]` +- Pure function: takes two `AlbumData` + pending set, returns change + updated pending set +- No HA dependencies + +### 7. Extract storage protocols `[ ]` + +**Source**: `custom_components/immich_album_watcher/storage.py` + +Extract to `storage.py`: + +```python +class CacheBackend(Protocol): + """Abstract storage backend for caches.""" + async def load(self) -> dict[str, Any]: ... + async def save(self, data: dict[str, Any]) -> None: ... + async def remove(self) -> None: ... + +class QueueBackend(Protocol): + """Abstract storage backend for queues.""" + async def load(self) -> dict[str, Any]: ... + async def save(self, data: dict[str, Any]) -> None: ... + async def remove(self) -> None: ... + +class JsonFileBackend: + """Simple JSON file storage backend (for standalone server).""" + def __init__(self, path: Path): ... +``` + +### 8. Extract TelegramFileCache `[ ]` + +**Source**: `custom_components/immich_album_watcher/storage.py` L71-262 + +Extract to `telegram/cache.py`: +- `TelegramFileCache` -- takes `CacheBackend` instead of `hass + Store` +- All logic unchanged: TTL mode, thumbhash mode, cleanup, get/set/set_many +- Remove `hass`/`Store` imports + +### 9. Extract NotificationQueue `[ ]` + +**Source**: `custom_components/immich_album_watcher/storage.py` L265-328 + +Extract to `notifications/queue.py`: +- `NotificationQueue` -- takes `QueueBackend` instead of `hass + Store` +- All logic unchanged: enqueue, get_all, has_pending, remove_indices, clear + +### 10. Extract Telegram client `[ ]` + +**Source**: `custom_components/immich_album_watcher/sensor.py` L55-60, L61-111, L114-170, L455-550, L551-1700+ + +Extract to `telegram/client.py` as `TelegramClient`: +- Constructor: `session: aiohttp.ClientSession`, `bot_token: str`, `cache: TelegramFileCache | None`, `asset_cache: TelegramFileCache | None`, `url_resolver: Callable[[str], str] | None` (for internal URL conversion) +- `async send_notification(chat_id, assets?, caption?, ...)` -- main entry (L455-549) +- `async send_message(...)` (L551-597) +- `async send_chat_action(...)` (L599-665) +- `_log_error(...)` (L667-803) +- `async send_photo(...)` (L805-958) +- `async send_video(...)` (L960-1105) +- `async send_document(...)` (L1107-1215) +- `async send_media_group(...)` (L1217-end) + +Extract to `telegram/media.py`: +- Constants: `TELEGRAM_API_BASE_URL`, `TELEGRAM_MAX_PHOTO_SIZE`, `TELEGRAM_MAX_VIDEO_SIZE`, `TELEGRAM_MAX_DIMENSION_SUM` (L56-59) +- `_is_asset_id(value)` (L76-85) +- `_extract_asset_id_from_url(url)` (L88-111) +- `_split_media_by_upload_size(media_items, max_upload_size)` (L114-170) + +### 11. Write tests `[ ]` + +- `test_models.py` -- `SharedLinkInfo.from_api_response`, `AssetInfo.from_api_response`, `AlbumData.from_api_response`, processing status checks +- `test_immich_client.py` -- Mock aiohttp responses for each API call (use `aioresponses`) +- `test_change_detector.py` -- Various change scenarios: add only, remove only, rename, sharing changed, pending assets becoming processed, no change +- `test_asset_utils.py` -- Filter/sort combinations, URL building +- `test_telegram_cache.py` -- TTL expiry, thumbhash validation, batch set, cleanup +- `test_notification_queue.py` -- Enqueue, get_all, remove_indices, clear + +--- + +## Acceptance Criteria + +- [ ] All extracted modules have zero Home Assistant imports +- [ ] `pyproject.toml` is valid and installable (`pip install -e packages/core`) +- [ ] All tests pass +- [ ] The HAOS integration is NOT modified yet (that's Phase 2) +- [ ] No functionality is lost in extraction -- behavior matches original exactly + +--- + +## Key Design Decisions + +1. **Session injection**: `ImmichClient` and `TelegramClient` accept `aiohttp.ClientSession` -- no global session creation +2. **Storage protocols**: `CacheBackend`/`QueueBackend` protocols allow HA's `Store` and standalone's SQLite/JSON to satisfy the same interface +3. **URL resolver callback**: Telegram client accepts optional `url_resolver: Callable[[str], str]` for converting external URLs to internal ones (coordinator owns this mapping) +4. **Logging**: Use stdlib `logging` throughout. Consumers configure their own handlers. +5. **No async_get_clientsession**: All HA-specific session management stays in the integration diff --git a/plans/primary-plan.md b/plans/primary-plan.md new file mode 100644 index 0000000..7a1ca6e --- /dev/null +++ b/plans/primary-plan.md @@ -0,0 +1,214 @@ +# Immich Watcher: Standalone Web App + Shared Core Architecture + +**Status**: Planning +**Created**: 2026-03-19 + +--- + +## Context + +The current `immich_album_watcher` HA integration contains ~3,600 lines of tightly coupled code: Immich API client, change detection, Telegram notifications, and HA entities/services. A separate HA Blueprint (~2,000 lines) adds message templating, filtering, scheduled notifications, and memory mode. + +**Goal**: Enable Immich album change notifications **without Home Assistant** via a standalone web application, while keeping the HAOS integration functional and sharing as much logic as possible. + +--- + +## Decisions + +| Decision | Choice | Rationale | +|---|---|---| +| Architecture | Hybrid (Option C) | HAOS standalone with shared core lib, optional server sync. No breaking changes for HA-only users. | +| Frontend | SvelteKit + Shadcn-svelte | Small bundle, fast, calm UI aesthetic. Good fit for self-hosted. | +| Notifications | Telegram + Generic webhook | Telegram from existing code. Webhook enables Discord/Slack/ntfy/custom. | +| Auth | Multi-user (admin/user roles) | Supports shared Immich servers. Admin manages servers/users, users manage own trackers. | +| Backend | FastAPI + SQLite + APScheduler | Async-native Python, zero external DB deps, proven scheduler. | + +--- + +## Repository Structure + +``` +immich-watcher/ + packages/ + core/ # Shared Python library + pyproject.toml + src/immich_watcher_core/ + immich_client.py # Immich API (from coordinator.py) + models.py # AssetInfo, AlbumData, AlbumChange, SharedLinkInfo + change_detector.py # Change detection (from coordinator.py) + telegram/ + client.py # Telegram Bot API (from sensor.py) + cache.py # File cache with pluggable backend + media.py # Media download, size checks, group splitting + webhook/ + client.py # Generic webhook notification provider + templates.py # Jinja2 template engine + storage.py # Abstract storage protocol + SQLite impl + constants.py # Shared constants (from const.py) + tests/ + + server/ # Standalone FastAPI app + pyproject.toml + src/immich_watcher_server/ + main.py # FastAPI entry point + database/models.py # SQLModel ORM + database/migrations/ # Alembic + api/ # REST endpoints + services/ + scheduler.py # APScheduler background polling + watcher.py # Album polling orchestrator + notifier.py # Notification dispatch + Dockerfile + docker-compose.yml + + haos/ # Home Assistant integration (moved) + custom_components/immich_album_watcher/ + ... (refactored to use core library) + hacs.json + + frontend/ # SvelteKit web UI source + package.json + src/ + dist/ # Built static files served by FastAPI + + plans/ # This folder + README.md + LICENSE +``` + +--- + +## Shared Core Library Extractions + +| Core Module | Source File | What Gets Extracted | +|---|---|---| +| `immich_client.py` | coordinator.py | All `/api/` calls, session injection via constructor | +| `models.py` | coordinator.py L66-326 | Dataclasses (already HA-independent) | +| `change_detector.py` | coordinator.py L979-1066 | `detect_album_changes()` pure function | +| `telegram/client.py` | sensor.py (~1200 lines) | Full Telegram Bot API: send_message/photo/video/media_group | +| `telegram/cache.py` | storage.py | TelegramFileCache with `CacheBackend` protocol | +| `templates.py` | NEW (from blueprint logic) | Jinja2 renderer with ~40 variables matching blueprint | +| `storage.py` | storage.py | Abstract protocol + SQLite implementation | +| `webhook/client.py` | NEW | Generic webhook POST JSON with event data | + +The `ImmichClient` accepts an `aiohttp.ClientSession` in constructor -- HA provides its managed session, standalone creates its own. + +--- + +## Standalone Server Design + +### Backend: FastAPI + SQLite + APScheduler + +**Database tables**: `users`, `immich_servers`, `notification_targets`, `message_templates`, `album_trackers`, `album_states`, `telegram_cache`, `notification_queue` + +**Key API endpoints**: +- `POST /api/auth/setup` / `POST /api/auth/login` -- JWT auth +- `CRUD /api/servers` -- Immich server connections +- `GET /api/servers/{id}/albums` -- Fetch album list from Immich +- `CRUD /api/trackers` -- Album trackers (album selection, event types, template overrides, targets) +- `CRUD /api/templates` -- Message templates with preview +- `CRUD /api/targets` -- Notification targets (Telegram chats, webhooks) +- `CRUD /api/users` -- User management (admin only) +- `GET /api/status` -- Dashboard data + +**Background**: APScheduler runs one job per tracker at its scan interval. Each job: fetch album -> detect changes -> render template -> dispatch notification. + +### Frontend: SvelteKit + Shadcn-svelte + +**Pages**: +1. **Setup wizard** -- First-run: create admin account, connect Immich server +2. **Login** -- Username/password +3. **Dashboard** -- Active trackers overview, recent events timeline, server status +4. **Servers** -- Add/edit Immich server connections (URL + API key validation) +5. **Trackers** -- Create/edit album trackers: + - Album picker (multi-select, fetched from Immich) + - Event type toggles (assets added/removed, renamed, sharing changed, deleted) + - Notification target selection + - Template selection or per-tracker override + - Scan interval, quiet hours +6. **Templates** -- Jinja2 template editor: + - CodeMirror with Jinja2 syntax highlighting + - Live preview with sample album data + - Variable reference sidebar + - Default templates for common use cases +7. **Targets** -- Manage notification destinations (Telegram chats, webhooks) +8. **Users** -- User management (admin only) +9. **Settings** -- Global defaults + +### Auth: Multi-user, bcrypt + JWT + +- Multiple user accounts with admin/user roles +- Admin: full access (user management, server configuration) +- User: manage own trackers, templates, and targets +- First-run setup creates initial admin account + +### Deployment: Single Docker container, SQLite in mounted volume + +--- + +## HAOS Integration Changes + +The integration gets refactored to delegate to core: + +```python +# coordinator.py becomes thin wrapper +class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator): + def __init__(self, ...): + self._client = ImmichClient(session, url, api_key) # from core + + async def _async_update_data(self): + album = await self._client.get_album(self._album_id) + change = detect_album_changes(old, album, pending) # from core + if change: self._fire_events(change, album) # HA-specific + return album + +# sensor.py Telegram methods delegate to core +async def _execute_telegram_notification(self, ...): + telegram = TelegramClient(session, token, cache) # from core + return await telegram.send_notification(...) +``` + +--- + +## Phases + +> **Rule**: Before starting work on any phase, create a detailed trackable subplan at `plans/phase-N-.md` with granular tasks, specific files to create/modify, and acceptance criteria. Do not begin implementation until the subplan is reviewed. + +### Phase 1: Extract Core Library `[x]` +- Extract models, Immich client, change detection, Telegram client, cache into `packages/core/` +- Write unit tests for all extracted modules +- **Subplan**: `plans/phase-1-core-library.md` + +### Phase 2: Wire Core into HAOS Integration `[ ]` +- Move integration to `packages/haos/` +- Refactor coordinator.py, sensor.py, storage.py to use core library +- Update manifest.json, hacs.json for new structure +- Verify identical behavior with real Immich server +- **Subplan**: `plans/phase-2-haos-refactor.md` + +### Phase 3: Build Server Backend `[ ]` +- FastAPI app with database, scheduler, API endpoints +- Telegram + webhook notification providers +- Jinja2 template engine with variable system matching blueprint +- **Subplan**: `plans/phase-3-server-backend.md` + +### Phase 4: Build Frontend `[ ]` +- SvelteKit app with all pages (setup, dashboard, trackers, templates, targets, users) +- Template editor with live preview +- Album picker connected to Immich API +- **Subplan**: `plans/phase-4-frontend.md` + +### Phase 5: HAOS-Server Sync (Optional) `[ ]` +- Add optional server URL to HA config flow +- Implement tracker/template config sync +- **Subplan**: `plans/phase-5-haos-server-sync.md` + +--- + +## Verification + +1. **Core library**: Unit tests for Immich client (mocked HTTP), change detection (pure function), Telegram client (mocked API), template rendering +2. **HAOS integration**: After refactoring, verify all existing entities, services, and events work identically with a real Immich server +3. **Server backend**: API integration tests with SQLite in-memory DB, scheduler tests +4. **Frontend**: Manual testing of all pages, template editor preview, album picker +5. **End-to-end**: Docker Compose up -> create account -> connect Immich -> create tracker -> trigger album change -> verify notification received