Some checks failed
Validate / Hassfest (push) Has been cancelled
Wire the integration to delegate all HA-independent logic to immich-watcher-core, eliminating ~2300 lines of duplicated code. Changes: - const.py: Import shared constants from core, keep HA-specific ones - storage.py: Create HAStorageBackend adapter wrapping HA's Store, use core TelegramFileCache and NotificationQueue via adapter - coordinator.py: Delegate to core ImmichClient for API calls, detect_album_changes() for change detection, and asset_utils for filtering/sorting/URL building. Keep HA-specific event firing. - sensor.py: Replace ~1300 lines of Telegram code with 15-line delegation to core TelegramClient. Keep entity classes unchanged. - __init__.py: Use factory functions for creating core instances with HA storage backends - manifest.json: Add immich-watcher-core dependency Integration line count: 3600 -> 1295 lines (-64%) Zero behavior changes for end users. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
"""Storage helpers for Immich Album Watcher."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.storage import Store
|
|
|
|
from immich_watcher_core.notifications.queue import (
|
|
NotificationQueue as CoreNotificationQueue,
|
|
)
|
|
from immich_watcher_core.telegram.cache import TelegramFileCache as CoreTelegramFileCache
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
STORAGE_VERSION = 1
|
|
STORAGE_KEY_PREFIX = "immich_album_watcher"
|
|
|
|
|
|
class HAStorageBackend:
|
|
"""Home Assistant storage backend adapter.
|
|
|
|
Wraps homeassistant.helpers.storage.Store to satisfy the
|
|
StorageBackend protocol from immich_watcher_core.
|
|
"""
|
|
|
|
def __init__(self, hass: HomeAssistant, key: str) -> None:
|
|
"""Initialize with HA store.
|
|
|
|
Args:
|
|
hass: Home Assistant instance
|
|
key: Storage key (e.g. "immich_album_watcher.telegram_cache.xxx")
|
|
"""
|
|
self._store: Store[dict[str, Any]] = Store(hass, STORAGE_VERSION, key)
|
|
|
|
async def load(self) -> dict[str, Any] | None:
|
|
"""Load data from HA storage."""
|
|
return await self._store.async_load()
|
|
|
|
async def save(self, data: dict[str, Any]) -> None:
|
|
"""Save data to HA storage."""
|
|
await self._store.async_save(data)
|
|
|
|
async def remove(self) -> None:
|
|
"""Remove all stored data."""
|
|
await self._store.async_remove()
|
|
|
|
|
|
class ImmichAlbumStorage:
|
|
"""Handles persistence of album state across restarts.
|
|
|
|
This remains HA-native as it manages HA-specific album tracking state.
|
|
"""
|
|
|
|
def __init__(self, hass: HomeAssistant, entry_id: str) -> None:
|
|
"""Initialize the storage."""
|
|
self._store: Store[dict[str, Any]] = Store(
|
|
hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.{entry_id}"
|
|
)
|
|
self._data: dict[str, Any] | None = None
|
|
|
|
async def async_load(self) -> dict[str, Any]:
|
|
"""Load data from storage."""
|
|
self._data = await self._store.async_load() or {"albums": {}}
|
|
_LOGGER.debug("Loaded storage data with %d albums", len(self._data.get("albums", {})))
|
|
return self._data
|
|
|
|
async def async_save_album_state(self, album_id: str, asset_ids: set[str]) -> None:
|
|
"""Save album asset IDs to storage."""
|
|
if self._data is None:
|
|
self._data = {"albums": {}}
|
|
|
|
self._data["albums"][album_id] = {
|
|
"asset_ids": list(asset_ids),
|
|
"last_updated": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
await self._store.async_save(self._data)
|
|
|
|
def get_album_asset_ids(self, album_id: str) -> set[str] | None:
|
|
"""Get persisted asset IDs for an album.
|
|
|
|
Returns None if no persisted state exists for the album.
|
|
"""
|
|
if self._data and "albums" in self._data:
|
|
album_data = self._data["albums"].get(album_id)
|
|
if album_data:
|
|
return set(album_data.get("asset_ids", []))
|
|
return None
|
|
|
|
async def async_remove_album(self, album_id: str) -> None:
|
|
"""Remove an album from storage."""
|
|
if self._data and "albums" in self._data:
|
|
self._data["albums"].pop(album_id, None)
|
|
await self._store.async_save(self._data)
|
|
|
|
async def async_remove(self) -> None:
|
|
"""Remove all storage data."""
|
|
await self._store.async_remove()
|
|
self._data = None
|
|
|
|
|
|
# Convenience factory functions for creating core classes with HA backends
|
|
|
|
|
|
def create_telegram_cache(
|
|
hass: HomeAssistant,
|
|
entry_id: str,
|
|
ttl_seconds: int = 48 * 60 * 60,
|
|
use_thumbhash: bool = False,
|
|
) -> CoreTelegramFileCache:
|
|
"""Create a TelegramFileCache with HA storage backend.
|
|
|
|
Args:
|
|
hass: Home Assistant instance
|
|
entry_id: Config entry ID for scoping
|
|
ttl_seconds: TTL for cache entries (TTL mode only)
|
|
use_thumbhash: Use thumbhash validation instead of TTL
|
|
"""
|
|
suffix = f"_assets" if use_thumbhash else ""
|
|
backend = HAStorageBackend(
|
|
hass, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}{suffix}"
|
|
)
|
|
return CoreTelegramFileCache(backend, ttl_seconds=ttl_seconds, use_thumbhash=use_thumbhash)
|
|
|
|
|
|
def create_notification_queue(
|
|
hass: HomeAssistant, entry_id: str
|
|
) -> CoreNotificationQueue:
|
|
"""Create a NotificationQueue with HA storage backend."""
|
|
backend = HAStorageBackend(
|
|
hass, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}"
|
|
)
|
|
return CoreNotificationQueue(backend)
|
|
|
|
|
|
# Re-export core types for backward compatibility
|
|
TelegramFileCache = CoreTelegramFileCache
|
|
NotificationQueue = CoreNotificationQueue
|