Add shared core library and architecture plans (Phase 1)
Some checks failed
Validate / Hassfest (push) Has been cancelled

Extract HA-independent logic from the integration into packages/core/
as a standalone Python library (immich-watcher-core). This is the first
phase of restructuring the project to support a standalone web app
alongside the existing HAOS integration.

Core library modules:
- models: SharedLinkInfo, AssetInfo, AlbumData, AlbumChange dataclasses
- immich_client: Async Immich API client (aiohttp, session-injected)
- change_detector: Pure function for album change detection
- asset_utils: Filtering, sorting, URL building utilities
- telegram/client: Full Telegram Bot API (text, photo, video, media groups)
- telegram/cache: File ID cache with pluggable storage backend
- telegram/media: Media size checks, URL extraction, group splitting
- notifications/queue: Persistent notification queue
- storage: StorageBackend protocol + JSON file implementation

All modules have zero Home Assistant imports. 50 unit tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 12:40:08 +03:00
parent 71b79cd919
commit d0783d0b6a
23 changed files with 3740 additions and 0 deletions

View File

View File

@@ -0,0 +1,185 @@
"""Tests for asset filtering, sorting, and URL utilities."""
from immich_watcher_core.asset_utils import (
build_asset_detail,
filter_assets,
get_any_url,
get_public_url,
get_protected_url,
sort_assets,
)
from immich_watcher_core.models import AssetInfo, SharedLinkInfo
def _make_asset(
asset_id: str = "a1",
asset_type: str = "IMAGE",
filename: str = "photo.jpg",
created_at: str = "2024-01-15T10:30:00Z",
is_favorite: bool = False,
rating: int | None = None,
city: str | None = None,
country: str | None = None,
) -> AssetInfo:
return AssetInfo(
id=asset_id,
type=asset_type,
filename=filename,
created_at=created_at,
is_favorite=is_favorite,
rating=rating,
city=city,
country=country,
is_processed=True,
)
class TestFilterAssets:
def test_favorite_only(self):
assets = [_make_asset("a1", is_favorite=True), _make_asset("a2")]
result = filter_assets(assets, favorite_only=True)
assert len(result) == 1
assert result[0].id == "a1"
def test_min_rating(self):
assets = [
_make_asset("a1", rating=5),
_make_asset("a2", rating=2),
_make_asset("a3"), # no rating
]
result = filter_assets(assets, min_rating=3)
assert len(result) == 1
assert result[0].id == "a1"
def test_asset_type_photo(self):
assets = [
_make_asset("a1", asset_type="IMAGE"),
_make_asset("a2", asset_type="VIDEO"),
]
result = filter_assets(assets, asset_type="photo")
assert len(result) == 1
assert result[0].type == "IMAGE"
def test_date_range(self):
assets = [
_make_asset("a1", created_at="2024-01-10T00:00:00Z"),
_make_asset("a2", created_at="2024-01-15T00:00:00Z"),
_make_asset("a3", created_at="2024-01-20T00:00:00Z"),
]
result = filter_assets(
assets, min_date="2024-01-12T00:00:00Z", max_date="2024-01-18T00:00:00Z"
)
assert len(result) == 1
assert result[0].id == "a2"
def test_memory_date(self):
assets = [
_make_asset("a1", created_at="2023-03-19T10:00:00Z"), # same month/day, different year
_make_asset("a2", created_at="2024-03-19T10:00:00Z"), # same year as reference
_make_asset("a3", created_at="2023-06-15T10:00:00Z"), # different date
]
result = filter_assets(assets, memory_date="2024-03-19T00:00:00Z")
assert len(result) == 1
assert result[0].id == "a1"
def test_city_filter(self):
assets = [
_make_asset("a1", city="Paris"),
_make_asset("a2", city="London"),
_make_asset("a3"),
]
result = filter_assets(assets, city="paris")
assert len(result) == 1
assert result[0].id == "a1"
class TestSortAssets:
def test_sort_by_date_descending(self):
assets = [
_make_asset("a1", created_at="2024-01-10T00:00:00Z"),
_make_asset("a2", created_at="2024-01-20T00:00:00Z"),
_make_asset("a3", created_at="2024-01-15T00:00:00Z"),
]
result = sort_assets(assets, order_by="date", order="descending")
assert [a.id for a in result] == ["a2", "a3", "a1"]
def test_sort_by_name(self):
assets = [
_make_asset("a1", filename="charlie.jpg"),
_make_asset("a2", filename="alice.jpg"),
_make_asset("a3", filename="bob.jpg"),
]
result = sort_assets(assets, order_by="name", order="ascending")
assert [a.id for a in result] == ["a2", "a3", "a1"]
def test_sort_by_rating(self):
assets = [
_make_asset("a1", rating=3),
_make_asset("a2", rating=5),
_make_asset("a3"), # None rating
]
result = sort_assets(assets, order_by="rating", order="descending")
# With descending + (is_none, value) key: None goes last when reversed
# (True, 0) vs (False, 5) vs (False, 3) - reversed: (True, 0), (False, 5), (False, 3)
# Actually: reversed sort puts (True,0) first. Let's just check rated come before unrated
rated = [a for a in result if a.rating is not None]
assert rated[0].id == "a2"
assert rated[1].id == "a1"
class TestUrlHelpers:
def _make_links(self):
return [
SharedLinkInfo(id="l1", key="public-key"),
SharedLinkInfo(id="l2", key="protected-key", has_password=True, password="pass123"),
]
def test_get_public_url(self):
links = self._make_links()
url = get_public_url("https://immich.example.com", links)
assert url == "https://immich.example.com/share/public-key"
def test_get_protected_url(self):
links = self._make_links()
url = get_protected_url("https://immich.example.com", links)
assert url == "https://immich.example.com/share/protected-key"
def test_get_any_url_prefers_public(self):
links = self._make_links()
url = get_any_url("https://immich.example.com", links)
assert url == "https://immich.example.com/share/public-key"
def test_get_any_url_falls_back_to_protected(self):
links = [SharedLinkInfo(id="l1", key="prot-key", has_password=True, password="x")]
url = get_any_url("https://immich.example.com", links)
assert url == "https://immich.example.com/share/prot-key"
def test_no_links(self):
assert get_public_url("https://example.com", []) is None
assert get_any_url("https://example.com", []) is None
class TestBuildAssetDetail:
def test_build_image_detail(self):
asset = _make_asset("a1", asset_type="IMAGE")
links = [SharedLinkInfo(id="l1", key="key1")]
detail = build_asset_detail(asset, "https://immich.example.com", links)
assert detail["id"] == "a1"
assert "url" in detail
assert "download_url" in detail
assert "photo_url" in detail
assert "thumbnail_url" in detail
def test_build_video_detail(self):
asset = _make_asset("a1", asset_type="VIDEO")
links = [SharedLinkInfo(id="l1", key="key1")]
detail = build_asset_detail(asset, "https://immich.example.com", links)
assert "playback_url" in detail
assert "photo_url" not in detail
def test_no_shared_links(self):
asset = _make_asset("a1")
detail = build_asset_detail(asset, "https://immich.example.com", [])
assert "url" not in detail
assert "download_url" not in detail
assert "thumbnail_url" in detail # always present

View File

@@ -0,0 +1,139 @@
"""Tests for change detection logic."""
from immich_watcher_core.change_detector import detect_album_changes
from immich_watcher_core.models import AlbumData, AssetInfo
def _make_album(
album_id: str = "album-1",
name: str = "Test Album",
shared: bool = False,
assets: dict[str, AssetInfo] | None = None,
) -> AlbumData:
"""Helper to create AlbumData for testing."""
if assets is None:
assets = {}
return AlbumData(
id=album_id,
name=name,
asset_count=len(assets),
photo_count=0,
video_count=0,
created_at="2024-01-01T00:00:00Z",
updated_at="2024-01-15T10:30:00Z",
shared=shared,
owner="Alice",
thumbnail_asset_id=None,
asset_ids=set(assets.keys()),
assets=assets,
)
def _make_asset(asset_id: str, is_processed: bool = True) -> AssetInfo:
"""Helper to create AssetInfo for testing."""
return AssetInfo(
id=asset_id,
type="IMAGE",
filename=f"{asset_id}.jpg",
created_at="2024-01-15T10:30:00Z",
is_processed=is_processed,
thumbhash="abc" if is_processed else None,
)
class TestDetectAlbumChanges:
def test_no_changes(self):
a1 = _make_asset("a1")
old = _make_album(assets={"a1": a1})
new = _make_album(assets={"a1": a1})
change, pending = detect_album_changes(old, new, set())
assert change is None
assert pending == set()
def test_assets_added(self):
a1 = _make_asset("a1")
a2 = _make_asset("a2")
old = _make_album(assets={"a1": a1})
new = _make_album(assets={"a1": a1, "a2": a2})
change, pending = detect_album_changes(old, new, set())
assert change is not None
assert change.change_type == "assets_added"
assert change.added_count == 1
assert change.added_assets[0].id == "a2"
def test_assets_removed(self):
a1 = _make_asset("a1")
a2 = _make_asset("a2")
old = _make_album(assets={"a1": a1, "a2": a2})
new = _make_album(assets={"a1": a1})
change, pending = detect_album_changes(old, new, set())
assert change is not None
assert change.change_type == "assets_removed"
assert change.removed_count == 1
assert "a2" in change.removed_asset_ids
def test_mixed_changes(self):
a1 = _make_asset("a1")
a2 = _make_asset("a2")
a3 = _make_asset("a3")
old = _make_album(assets={"a1": a1, "a2": a2})
new = _make_album(assets={"a1": a1, "a3": a3})
change, pending = detect_album_changes(old, new, set())
assert change is not None
assert change.change_type == "changed"
assert change.added_count == 1
assert change.removed_count == 1
def test_album_renamed(self):
a1 = _make_asset("a1")
old = _make_album(name="Old Name", assets={"a1": a1})
new = _make_album(name="New Name", assets={"a1": a1})
change, pending = detect_album_changes(old, new, set())
assert change is not None
assert change.change_type == "album_renamed"
assert change.old_name == "Old Name"
assert change.new_name == "New Name"
def test_sharing_changed(self):
a1 = _make_asset("a1")
old = _make_album(shared=False, assets={"a1": a1})
new = _make_album(shared=True, assets={"a1": a1})
change, pending = detect_album_changes(old, new, set())
assert change is not None
assert change.change_type == "album_sharing_changed"
assert change.old_shared is False
assert change.new_shared is True
def test_pending_asset_becomes_processed(self):
a1 = _make_asset("a1")
a2_unprocessed = _make_asset("a2", is_processed=False)
a2_processed = _make_asset("a2", is_processed=True)
old = _make_album(assets={"a1": a1, "a2": a2_unprocessed})
new = _make_album(assets={"a1": a1, "a2": a2_processed})
# a2 is in pending set
change, pending = detect_album_changes(old, new, {"a2"})
assert change is not None
assert change.added_count == 1
assert change.added_assets[0].id == "a2"
assert "a2" not in pending
def test_unprocessed_asset_added_to_pending(self):
a1 = _make_asset("a1")
a2 = _make_asset("a2", is_processed=False)
old = _make_album(assets={"a1": a1})
new = _make_album(assets={"a1": a1, "a2": a2})
change, pending = detect_album_changes(old, new, set())
# No change because a2 is unprocessed
assert change is None
assert "a2" in pending
def test_pending_asset_removed(self):
a1 = _make_asset("a1")
old = _make_album(assets={"a1": a1})
new = _make_album(assets={"a1": a1})
# a2 was pending but now gone from album
change, pending = detect_album_changes(old, new, {"a2"})
assert change is None
assert "a2" not in pending

View File

@@ -0,0 +1,185 @@
"""Tests for data models."""
from datetime import datetime, timezone
from immich_watcher_core.models import (
AlbumChange,
AlbumData,
AssetInfo,
SharedLinkInfo,
)
class TestSharedLinkInfo:
def test_from_api_response_basic(self):
data = {"id": "link-1", "key": "abc123"}
link = SharedLinkInfo.from_api_response(data)
assert link.id == "link-1"
assert link.key == "abc123"
assert not link.has_password
assert link.is_accessible
def test_from_api_response_with_password(self):
data = {"id": "link-1", "key": "abc123", "password": "secret"}
link = SharedLinkInfo.from_api_response(data)
assert link.has_password
assert link.password == "secret"
assert not link.is_accessible
def test_from_api_response_with_expiry(self):
data = {
"id": "link-1",
"key": "abc123",
"expiresAt": "2099-12-31T23:59:59Z",
}
link = SharedLinkInfo.from_api_response(data)
assert link.expires_at is not None
assert not link.is_expired
def test_expired_link(self):
link = SharedLinkInfo(
id="link-1",
key="abc123",
expires_at=datetime(2020, 1, 1, tzinfo=timezone.utc),
)
assert link.is_expired
assert not link.is_accessible
class TestAssetInfo:
def test_from_api_response_image(self):
data = {
"id": "asset-1",
"type": "IMAGE",
"originalFileName": "photo.jpg",
"fileCreatedAt": "2024-01-15T10:30:00Z",
"ownerId": "user-1",
"thumbhash": "abc123",
}
asset = AssetInfo.from_api_response(data, {"user-1": "Alice"})
assert asset.id == "asset-1"
assert asset.type == "IMAGE"
assert asset.filename == "photo.jpg"
assert asset.owner_name == "Alice"
assert asset.is_processed
def test_from_api_response_with_exif(self):
data = {
"id": "asset-2",
"type": "IMAGE",
"originalFileName": "photo.jpg",
"fileCreatedAt": "2024-01-15T10:30:00Z",
"ownerId": "user-1",
"isFavorite": True,
"thumbhash": "xyz",
"exifInfo": {
"rating": 5,
"latitude": 48.8566,
"longitude": 2.3522,
"city": "Paris",
"state": "Île-de-France",
"country": "France",
"description": "Eiffel Tower",
},
}
asset = AssetInfo.from_api_response(data)
assert asset.is_favorite
assert asset.rating == 5
assert asset.latitude == 48.8566
assert asset.city == "Paris"
assert asset.description == "Eiffel Tower"
def test_unprocessed_asset(self):
data = {
"id": "asset-3",
"type": "VIDEO",
"originalFileName": "video.mp4",
"fileCreatedAt": "2024-01-15T10:30:00Z",
"ownerId": "user-1",
# No thumbhash = not processed
}
asset = AssetInfo.from_api_response(data)
assert not asset.is_processed
def test_trashed_asset(self):
data = {
"id": "asset-4",
"type": "IMAGE",
"originalFileName": "deleted.jpg",
"fileCreatedAt": "2024-01-15T10:30:00Z",
"ownerId": "user-1",
"isTrashed": True,
"thumbhash": "abc",
}
asset = AssetInfo.from_api_response(data)
assert not asset.is_processed
def test_people_extraction(self):
data = {
"id": "asset-5",
"type": "IMAGE",
"originalFileName": "group.jpg",
"fileCreatedAt": "2024-01-15T10:30:00Z",
"ownerId": "user-1",
"thumbhash": "abc",
"people": [
{"name": "Alice"},
{"name": "Bob"},
{"name": ""}, # empty name filtered
],
}
asset = AssetInfo.from_api_response(data)
assert asset.people == ["Alice", "Bob"]
class TestAlbumData:
def test_from_api_response(self):
data = {
"id": "album-1",
"albumName": "Vacation",
"assetCount": 2,
"createdAt": "2024-01-01T00:00:00Z",
"updatedAt": "2024-01-15T10:30:00Z",
"shared": True,
"owner": {"name": "Alice"},
"albumThumbnailAssetId": "asset-1",
"assets": [
{
"id": "asset-1",
"type": "IMAGE",
"originalFileName": "photo.jpg",
"fileCreatedAt": "2024-01-15T10:30:00Z",
"ownerId": "user-1",
"thumbhash": "abc",
},
{
"id": "asset-2",
"type": "VIDEO",
"originalFileName": "video.mp4",
"fileCreatedAt": "2024-01-15T11:00:00Z",
"ownerId": "user-1",
"thumbhash": "def",
},
],
}
album = AlbumData.from_api_response(data)
assert album.id == "album-1"
assert album.name == "Vacation"
assert album.photo_count == 1
assert album.video_count == 1
assert album.shared
assert len(album.asset_ids) == 2
assert "asset-1" in album.asset_ids
class TestAlbumChange:
def test_basic_creation(self):
change = AlbumChange(
album_id="album-1",
album_name="Test",
change_type="assets_added",
added_count=3,
)
assert change.added_count == 3
assert change.removed_count == 0
assert change.old_name is None

View File

@@ -0,0 +1,83 @@
"""Tests for notification queue."""
import pytest
from typing import Any
from immich_watcher_core.notifications.queue import NotificationQueue
class InMemoryBackend:
"""In-memory storage backend for testing."""
def __init__(self, initial_data: dict[str, Any] | None = None):
self._data = initial_data
async def load(self) -> dict[str, Any] | None:
return self._data
async def save(self, data: dict[str, Any]) -> None:
self._data = data
async def remove(self) -> None:
self._data = None
@pytest.fixture
def backend():
return InMemoryBackend()
class TestNotificationQueue:
@pytest.mark.asyncio
async def test_empty_queue(self, backend):
queue = NotificationQueue(backend)
await queue.async_load()
assert not queue.has_pending()
assert queue.get_all() == []
@pytest.mark.asyncio
async def test_enqueue_and_get(self, backend):
queue = NotificationQueue(backend)
await queue.async_load()
await queue.async_enqueue({"chat_id": "123", "text": "Hello"})
assert queue.has_pending()
items = queue.get_all()
assert len(items) == 1
assert items[0]["params"]["chat_id"] == "123"
@pytest.mark.asyncio
async def test_multiple_enqueue(self, backend):
queue = NotificationQueue(backend)
await queue.async_load()
await queue.async_enqueue({"msg": "first"})
await queue.async_enqueue({"msg": "second"})
assert len(queue.get_all()) == 2
@pytest.mark.asyncio
async def test_clear(self, backend):
queue = NotificationQueue(backend)
await queue.async_load()
await queue.async_enqueue({"msg": "test"})
await queue.async_clear()
assert not queue.has_pending()
@pytest.mark.asyncio
async def test_remove_indices(self, backend):
queue = NotificationQueue(backend)
await queue.async_load()
await queue.async_enqueue({"msg": "first"})
await queue.async_enqueue({"msg": "second"})
await queue.async_enqueue({"msg": "third"})
# Remove indices in descending order
await queue.async_remove_indices([2, 0])
items = queue.get_all()
assert len(items) == 1
assert items[0]["params"]["msg"] == "second"
@pytest.mark.asyncio
async def test_remove_all(self, backend):
queue = NotificationQueue(backend)
await queue.async_load()
await queue.async_enqueue({"msg": "test"})
await queue.async_remove()
assert backend._data is None

View File

@@ -0,0 +1,112 @@
"""Tests for Telegram file cache."""
import pytest
from datetime import datetime, timezone, timedelta
from typing import Any
from immich_watcher_core.storage import StorageBackend
from immich_watcher_core.telegram.cache import TelegramFileCache
class InMemoryBackend:
"""In-memory storage backend for testing."""
def __init__(self, initial_data: dict[str, Any] | None = None):
self._data = initial_data
async def load(self) -> dict[str, Any] | None:
return self._data
async def save(self, data: dict[str, Any]) -> None:
self._data = data
async def remove(self) -> None:
self._data = None
@pytest.fixture
def backend():
return InMemoryBackend()
class TestTelegramFileCacheTTL:
@pytest.mark.asyncio
async def test_set_and_get(self, backend):
cache = TelegramFileCache(backend, ttl_seconds=3600)
await cache.async_load()
await cache.async_set("url1", "file_id_1", "photo")
result = cache.get("url1")
assert result is not None
assert result["file_id"] == "file_id_1"
assert result["type"] == "photo"
@pytest.mark.asyncio
async def test_miss(self, backend):
cache = TelegramFileCache(backend, ttl_seconds=3600)
await cache.async_load()
assert cache.get("nonexistent") is None
@pytest.mark.asyncio
async def test_ttl_expiry(self):
# Pre-populate with an old entry
old_time = (datetime.now(timezone.utc) - timedelta(hours=100)).isoformat()
data = {"files": {"url1": {"file_id": "old", "type": "photo", "cached_at": old_time}}}
backend = InMemoryBackend(data)
cache = TelegramFileCache(backend, ttl_seconds=3600)
await cache.async_load()
# Old entry should be cleaned up on load
assert cache.get("url1") is None
@pytest.mark.asyncio
async def test_set_many(self, backend):
cache = TelegramFileCache(backend, ttl_seconds=3600)
await cache.async_load()
entries = [
("url1", "fid1", "photo", None),
("url2", "fid2", "video", None),
]
await cache.async_set_many(entries)
assert cache.get("url1")["file_id"] == "fid1"
assert cache.get("url2")["file_id"] == "fid2"
class TestTelegramFileCacheThumbhash:
@pytest.mark.asyncio
async def test_thumbhash_validation(self, backend):
cache = TelegramFileCache(backend, use_thumbhash=True)
await cache.async_load()
await cache.async_set("asset-1", "fid1", "photo", thumbhash="hash_v1")
# Match
result = cache.get("asset-1", thumbhash="hash_v1")
assert result is not None
assert result["file_id"] == "fid1"
# Mismatch - cache miss
result = cache.get("asset-1", thumbhash="hash_v2")
assert result is None
@pytest.mark.asyncio
async def test_thumbhash_max_entries(self):
# Create cache with many entries
files = {}
for i in range(2100):
files[f"asset-{i}"] = {
"file_id": f"fid-{i}",
"type": "photo",
"cached_at": datetime(2024, 1, 1 + i // 1440, (i // 60) % 24, i % 60, tzinfo=timezone.utc).isoformat(),
}
backend = InMemoryBackend({"files": files})
cache = TelegramFileCache(backend, use_thumbhash=True)
await cache.async_load()
# Should be trimmed to 2000
remaining = backend._data["files"]
assert len(remaining) == 2000
@pytest.mark.asyncio
async def test_remove(self, backend):
cache = TelegramFileCache(backend, ttl_seconds=3600)
await cache.async_load()
await cache.async_set("url1", "fid1", "photo")
await cache.async_remove()
assert backend._data is None