Refactor HAOS integration to use shared core library (Phase 2)
Some checks failed
Validate / Hassfest (push) Has been cancelled

Wire the integration to delegate all HA-independent logic to
immich-watcher-core, eliminating ~2300 lines of duplicated code.

Changes:
- const.py: Import shared constants from core, keep HA-specific ones
- storage.py: Create HAStorageBackend adapter wrapping HA's Store,
  use core TelegramFileCache and NotificationQueue via adapter
- coordinator.py: Delegate to core ImmichClient for API calls,
  detect_album_changes() for change detection, and asset_utils
  for filtering/sorting/URL building. Keep HA-specific event firing.
- sensor.py: Replace ~1300 lines of Telegram code with 15-line
  delegation to core TelegramClient. Keep entity classes unchanged.
- __init__.py: Use factory functions for creating core instances
  with HA storage backends
- manifest.json: Add immich-watcher-core dependency

Integration line count: 3600 -> 1295 lines (-64%)
Zero behavior changes for end users.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 12:47:18 +03:00
parent d0783d0b6a
commit b107cfe67f
8 changed files with 502 additions and 2804 deletions

View File

@@ -9,17 +9,51 @@ from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from immich_watcher_core.notifications.queue import (
NotificationQueue as CoreNotificationQueue,
)
from immich_watcher_core.telegram.cache import TelegramFileCache as CoreTelegramFileCache
_LOGGER = logging.getLogger(__name__)
STORAGE_VERSION = 1
STORAGE_KEY_PREFIX = "immich_album_watcher"
# Default TTL for Telegram file_id cache (48 hours in seconds)
DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60
class HAStorageBackend:
"""Home Assistant storage backend adapter.
Wraps homeassistant.helpers.storage.Store to satisfy the
StorageBackend protocol from immich_watcher_core.
"""
def __init__(self, hass: HomeAssistant, key: str) -> None:
"""Initialize with HA store.
Args:
hass: Home Assistant instance
key: Storage key (e.g. "immich_album_watcher.telegram_cache.xxx")
"""
self._store: Store[dict[str, Any]] = Store(hass, STORAGE_VERSION, key)
async def load(self) -> dict[str, Any] | None:
"""Load data from HA storage."""
return await self._store.async_load()
async def save(self, data: dict[str, Any]) -> None:
"""Save data to HA storage."""
await self._store.async_save(data)
async def remove(self) -> None:
"""Remove all stored data."""
await self._store.async_remove()
class ImmichAlbumStorage:
"""Handles persistence of album state across restarts."""
"""Handles persistence of album state across restarts.
This remains HA-native as it manages HA-specific album tracking state.
"""
def __init__(self, hass: HomeAssistant, entry_id: str) -> None:
"""Initialize the storage."""
@@ -68,260 +102,40 @@ class ImmichAlbumStorage:
self._data = None
class TelegramFileCache:
"""Cache for Telegram file_ids to avoid re-uploading media.
# Convenience factory functions for creating core classes with HA backends
When a file is uploaded to Telegram, it returns a file_id that can be reused
to send the same file without re-uploading. This cache stores these file_ids
keyed by the source URL or asset ID.
Supports two validation modes:
- TTL mode (default): entries expire after a configured time-to-live
- Thumbhash mode: entries are validated by comparing stored thumbhash with
the current asset thumbhash from Immich
def create_telegram_cache(
hass: HomeAssistant,
entry_id: str,
ttl_seconds: int = 48 * 60 * 60,
use_thumbhash: bool = False,
) -> CoreTelegramFileCache:
"""Create a TelegramFileCache with HA storage backend.
Args:
hass: Home Assistant instance
entry_id: Config entry ID for scoping
ttl_seconds: TTL for cache entries (TTL mode only)
use_thumbhash: Use thumbhash validation instead of TTL
"""
def __init__(
self,
hass: HomeAssistant,
entry_id: str,
ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL,
use_thumbhash: bool = False,
) -> None:
"""Initialize the Telegram file cache.
Args:
hass: Home Assistant instance
entry_id: Config entry ID for scoping the cache (per hub)
ttl_seconds: Time-to-live for cache entries in seconds (TTL mode only)
use_thumbhash: Use thumbhash-based validation instead of TTL
"""
self._store: Store[dict[str, Any]] = Store(
hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}"
)
self._data: dict[str, Any] | None = None
self._ttl_seconds = ttl_seconds
self._use_thumbhash = use_thumbhash
async def async_load(self) -> None:
"""Load cache data from storage."""
self._data = await self._store.async_load() or {"files": {}}
# Clean up expired entries on load (TTL mode only)
await self._cleanup_expired()
mode = "thumbhash" if self._use_thumbhash else "TTL"
_LOGGER.debug(
"Loaded Telegram file cache with %d entries (mode: %s)",
len(self._data.get("files", {})),
mode,
)
# Maximum number of entries to keep in thumbhash mode to prevent unbounded growth
THUMBHASH_MAX_ENTRIES = 2000
async def _cleanup_expired(self) -> None:
"""Remove expired cache entries (TTL mode) or trim old entries (thumbhash mode)."""
if self._use_thumbhash:
files = self._data.get("files", {}) if self._data else {}
if len(files) > self.THUMBHASH_MAX_ENTRIES:
sorted_keys = sorted(
files, key=lambda k: files[k].get("cached_at", "")
)
keys_to_remove = sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES]
for key in keys_to_remove:
del files[key]
await self._store.async_save(self._data)
_LOGGER.debug(
"Trimmed thumbhash cache from %d to %d entries",
len(keys_to_remove) + self.THUMBHASH_MAX_ENTRIES,
self.THUMBHASH_MAX_ENTRIES,
)
return
if not self._data or "files" not in self._data:
return
now = datetime.now(timezone.utc)
expired_keys = []
for url, entry in self._data["files"].items():
cached_at_str = entry.get("cached_at")
if cached_at_str:
cached_at = datetime.fromisoformat(cached_at_str)
age_seconds = (now - cached_at).total_seconds()
if age_seconds > self._ttl_seconds:
expired_keys.append(url)
if expired_keys:
for key in expired_keys:
del self._data["files"][key]
await self._store.async_save(self._data)
_LOGGER.debug("Cleaned up %d expired Telegram cache entries", len(expired_keys))
def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None:
"""Get cached file_id for a key.
Args:
key: The cache key (URL or asset ID)
thumbhash: Current thumbhash for validation (thumbhash mode only).
If provided, compares with stored thumbhash. Mismatch = cache miss.
Returns:
Dict with 'file_id' and 'type' if cached and valid, None otherwise
"""
if not self._data or "files" not in self._data:
return None
entry = self._data["files"].get(key)
if not entry:
return None
if self._use_thumbhash:
# Thumbhash-based validation
if thumbhash is not None:
stored_thumbhash = entry.get("thumbhash")
if stored_thumbhash and stored_thumbhash != thumbhash:
_LOGGER.debug(
"Cache miss for %s: thumbhash changed, removing stale entry",
key[:36],
)
del self._data["files"][key]
return None
# If no thumbhash provided (asset not in monitored album),
# return cached entry anyway — self-heals on Telegram rejection
else:
# TTL-based validation
cached_at_str = entry.get("cached_at")
if cached_at_str:
cached_at = datetime.fromisoformat(cached_at_str)
age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds()
if age_seconds > self._ttl_seconds:
return None
return {
"file_id": entry.get("file_id"),
"type": entry.get("type"),
}
async def async_set(
self, key: str, file_id: str, media_type: str, thumbhash: str | None = None
) -> None:
"""Store a file_id for a key.
Args:
key: The cache key (URL or asset ID)
file_id: The Telegram file_id
media_type: The type of media ('photo', 'video', 'document')
thumbhash: Current thumbhash to store alongside file_id (thumbhash mode only)
"""
if self._data is None:
self._data = {"files": {}}
entry_data: dict[str, Any] = {
"file_id": file_id,
"type": media_type,
"cached_at": datetime.now(timezone.utc).isoformat(),
}
if thumbhash is not None:
entry_data["thumbhash"] = thumbhash
self._data["files"][key] = entry_data
await self._store.async_save(self._data)
_LOGGER.debug("Cached Telegram file_id for key (type: %s)", media_type)
async def async_set_many(
self, entries: list[tuple[str, str, str, str | None]]
) -> None:
"""Store multiple file_ids in a single disk write.
Args:
entries: List of (key, file_id, media_type, thumbhash) tuples
"""
if not entries:
return
if self._data is None:
self._data = {"files": {}}
now_iso = datetime.now(timezone.utc).isoformat()
for key, file_id, media_type, thumbhash in entries:
entry_data: dict[str, Any] = {
"file_id": file_id,
"type": media_type,
"cached_at": now_iso,
}
if thumbhash is not None:
entry_data["thumbhash"] = thumbhash
self._data["files"][key] = entry_data
await self._store.async_save(self._data)
_LOGGER.debug("Batch cached %d Telegram file_ids", len(entries))
async def async_remove(self) -> None:
"""Remove all cache data."""
await self._store.async_remove()
self._data = None
suffix = f"_assets" if use_thumbhash else ""
backend = HAStorageBackend(
hass, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}{suffix}"
)
return CoreTelegramFileCache(backend, ttl_seconds=ttl_seconds, use_thumbhash=use_thumbhash)
class NotificationQueue:
"""Persistent queue for notifications deferred during quiet hours.
def create_notification_queue(
hass: HomeAssistant, entry_id: str
) -> CoreNotificationQueue:
"""Create a NotificationQueue with HA storage backend."""
backend = HAStorageBackend(
hass, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}"
)
return CoreNotificationQueue(backend)
Stores full service call parameters so notifications can be replayed
exactly as they were originally called.
"""
def __init__(self, hass: HomeAssistant, entry_id: str) -> None:
"""Initialize the notification queue."""
self._store: Store[dict[str, Any]] = Store(
hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}"
)
self._data: dict[str, Any] | None = None
async def async_load(self) -> None:
"""Load queue data from storage."""
self._data = await self._store.async_load() or {"queue": []}
_LOGGER.debug(
"Loaded notification queue with %d items",
len(self._data.get("queue", [])),
)
async def async_enqueue(self, notification_params: dict[str, Any]) -> None:
"""Add a notification to the queue."""
if self._data is None:
self._data = {"queue": []}
self._data["queue"].append({
"params": notification_params,
"queued_at": datetime.now(timezone.utc).isoformat(),
})
await self._store.async_save(self._data)
_LOGGER.debug("Queued notification during quiet hours (total: %d)", len(self._data["queue"]))
def get_all(self) -> list[dict[str, Any]]:
"""Get all queued notifications."""
if not self._data:
return []
return list(self._data.get("queue", []))
def has_pending(self) -> bool:
"""Check if there are pending notifications."""
return bool(self._data and self._data.get("queue"))
async def async_remove_indices(self, indices: list[int]) -> None:
"""Remove specific items by index (indices must be in descending order)."""
if not self._data or not indices:
return
for idx in indices:
if 0 <= idx < len(self._data["queue"]):
del self._data["queue"][idx]
await self._store.async_save(self._data)
async def async_clear(self) -> None:
"""Clear all queued notifications."""
if self._data:
self._data["queue"] = []
await self._store.async_save(self._data)
async def async_remove(self) -> None:
"""Remove all queue data."""
await self._store.async_remove()
self._data = None
# Re-export core types for backward compatibility
TelegramFileCache = CoreTelegramFileCache
NotificationQueue = CoreNotificationQueue