feat(cache): thumbhash-validated asset cache + settings UX overhaul
Cache engine: - TelegramFileCache: configurable max_entries (LRU cap applies in both TTL and thumbhash modes), ttl_seconds<=0 disables TTL, stats() method. - Dispatcher builds an asset.id -> thumbhash resolver from event.added_assets (Immich populates thumbhash in extra) and passes it to TelegramClient, so asset-cache entries invalidate on visual change rather than age. - Watcher wires app settings into cache init: URL cache = TTL + LRU cap, asset cache = thumbhash + LRU cap. Adds soft-reset (in-memory only) used when cache params change. Settings: - New key telegram_asset_cache_max_entries (default 5000). - telegram_cache_ttl_hours default bumped 48 -> 720 (30d); now URL-only. - PUT /settings resets in-memory caches when cache keys change (files kept). - New endpoints: GET/POST /settings/telegram-cache/stats and /clear. Settings page: - Cache stats card (count + size + oldest/newest per bucket) with a hint explaining that the size is cumulative uploaded-to-Telegram bytes. - Clear-cache button behind a confirm modal. - New TimezoneSelector + LocaleSelector components replace raw inputs. - max-entries input, TTL range updated (0..8760, 0 = disabled). Mobile nav: - "More" panel now mirrors the full sidebar tree (groups + subnodes) so every destination is reachable on mobile; previously flat hand-picked list. - Nav height uses env(safe-area-inset-bottom); panel bottom + z-index fixed so content can't visually overlay the bottom bar. A11y / DOM warnings: - Password-change form has a hidden username field for password-manager association; autocomplete hints on all three password inputs. - Telegram webhook secret wrapped in a no-op form + autocomplete=off. Bug fix: - update_settings used any(await ... for ...) which raised TypeError at runtime (async generator not an iterator); replaced with explicit loop.
This commit is contained in:
@@ -294,10 +294,24 @@ class NotificationDispatcher:
|
||||
await self._preload_asset_data(assets, media_assets, session, max_size)
|
||||
default_message = self._render_message(event, target, target.locale)
|
||||
|
||||
# Asset cache (when in thumbhash mode) invalidates entries when the
|
||||
# asset's visual content changes. The resolver maps asset id → its
|
||||
# current thumbhash. Providers that expose thumbhash put it in
|
||||
# ``asset.extra["thumbhash"]`` (currently Immich).
|
||||
thumbhash_map = {
|
||||
asset.id: asset.extra.get("thumbhash")
|
||||
for asset in event.added_assets
|
||||
if asset.extra.get("thumbhash")
|
||||
}
|
||||
thumbhash_resolver = (
|
||||
(lambda key: thumbhash_map.get(key)) if thumbhash_map else None
|
||||
)
|
||||
|
||||
client = TelegramClient(
|
||||
session, bot_token,
|
||||
url_cache=self._url_cache,
|
||||
asset_cache=self._asset_cache,
|
||||
thumbhash_resolver=thumbhash_resolver,
|
||||
)
|
||||
|
||||
for receiver in target.receivers:
|
||||
|
||||
@@ -11,56 +11,69 @@ from notify_bridge_core.storage import StorageBackend
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_TELEGRAM_CACHE_TTL = 48 * 60 * 60
|
||||
DEFAULT_MAX_ENTRIES = 5000
|
||||
|
||||
|
||||
class TelegramFileCache:
|
||||
"""Cache for Telegram file_ids to avoid re-uploading media.
|
||||
|
||||
Supports two validation modes:
|
||||
- TTL mode (default): entries expire after a configured time-to-live
|
||||
- Thumbhash mode: entries validated by comparing stored thumbhash with current
|
||||
"""
|
||||
Two complementary invalidation strategies, usable together or separately:
|
||||
- TTL: entries expire after ``ttl_seconds``. Set to 0 to disable TTL
|
||||
(cache essentially forever, subject only to the size cap).
|
||||
- Thumbhash mode: entries are validated on read by comparing the stored
|
||||
thumbhash with the one the caller supplies; a mismatch drops the entry.
|
||||
Intended for content-addressable assets (e.g. Immich) where re-uploads
|
||||
should be triggered by visual change, not elapsed time.
|
||||
|
||||
THUMBHASH_MAX_ENTRIES = 2000
|
||||
``max_entries`` always applies as an LRU size cap (by ``cached_at``).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
backend: StorageBackend,
|
||||
ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL,
|
||||
use_thumbhash: bool = False,
|
||||
max_entries: int = DEFAULT_MAX_ENTRIES,
|
||||
) -> None:
|
||||
self._backend = backend
|
||||
self._data: dict[str, Any] | None = None
|
||||
self._ttl_seconds = ttl_seconds
|
||||
self._use_thumbhash = use_thumbhash
|
||||
self._max_entries = max_entries
|
||||
|
||||
async def async_load(self) -> None:
|
||||
self._data = await self._backend.load() or {"files": {}}
|
||||
await self._cleanup_expired()
|
||||
|
||||
async def _cleanup_expired(self) -> None:
|
||||
if self._use_thumbhash:
|
||||
files = self._data.get("files", {}) if self._data else {}
|
||||
if len(files) > self.THUMBHASH_MAX_ENTRIES:
|
||||
sorted_keys = sorted(files, key=lambda k: files[k].get("cached_at", ""))
|
||||
for key in sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES]:
|
||||
del files[key]
|
||||
await self._backend.save(self._data)
|
||||
return
|
||||
|
||||
if not self._data or "files" not in self._data:
|
||||
return
|
||||
files = self._data["files"]
|
||||
changed = False
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
expired = [
|
||||
url for url, entry in self._data["files"].items()
|
||||
if entry.get("cached_at") and
|
||||
(now - datetime.fromisoformat(entry["cached_at"])).total_seconds() > self._ttl_seconds
|
||||
]
|
||||
|
||||
if expired:
|
||||
# TTL sweep — only when TTL validation is active (i.e. no thumbhash
|
||||
# mode and a positive TTL). In thumbhash mode we rely entirely on
|
||||
# content validation; in "TTL disabled" mode (ttl_seconds <= 0) we
|
||||
# cache forever, subject only to the size cap.
|
||||
if not self._use_thumbhash and self._ttl_seconds > 0:
|
||||
now = datetime.now(timezone.utc)
|
||||
expired = [
|
||||
url for url, entry in files.items()
|
||||
if entry.get("cached_at") and
|
||||
(now - datetime.fromisoformat(entry["cached_at"])).total_seconds() > self._ttl_seconds
|
||||
]
|
||||
for key in expired:
|
||||
del self._data["files"][key]
|
||||
del files[key]
|
||||
changed = True
|
||||
|
||||
# LRU cap — always enforced. Evicts oldest-cached entries first.
|
||||
if self._max_entries > 0 and len(files) > self._max_entries:
|
||||
sorted_keys = sorted(files, key=lambda k: files[k].get("cached_at", ""))
|
||||
for key in sorted_keys[: len(files) - self._max_entries]:
|
||||
del files[key]
|
||||
changed = True
|
||||
|
||||
if changed:
|
||||
await self._backend.save(self._data)
|
||||
|
||||
def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None:
|
||||
@@ -77,7 +90,7 @@ class TelegramFileCache:
|
||||
if stored and stored != thumbhash:
|
||||
del self._data["files"][key]
|
||||
return None
|
||||
else:
|
||||
elif self._ttl_seconds > 0:
|
||||
cached_at_str = entry.get("cached_at")
|
||||
if cached_at_str:
|
||||
age = (datetime.now(timezone.utc) - datetime.fromisoformat(cached_at_str)).total_seconds()
|
||||
@@ -152,3 +165,32 @@ class TelegramFileCache:
|
||||
async def async_remove(self) -> None:
|
||||
await self._backend.remove()
|
||||
self._data = None
|
||||
|
||||
def stats(self) -> dict[str, Any]:
|
||||
"""Return summary stats about the current cache contents.
|
||||
|
||||
Includes the number of cached entries, total tracked size in bytes
|
||||
(only counts entries with a recorded ``size``), and the oldest /
|
||||
newest ``cached_at`` timestamps (ISO strings, or ``None`` if empty).
|
||||
"""
|
||||
files = self._data.get("files", {}) if self._data else {}
|
||||
count = len(files)
|
||||
total_size = 0
|
||||
oldest: str | None = None
|
||||
newest: str | None = None
|
||||
for entry in files.values():
|
||||
size = entry.get("size")
|
||||
if isinstance(size, int):
|
||||
total_size += size
|
||||
cached_at = entry.get("cached_at")
|
||||
if cached_at:
|
||||
if oldest is None or cached_at < oldest:
|
||||
oldest = cached_at
|
||||
if newest is None or cached_at > newest:
|
||||
newest = cached_at
|
||||
return {
|
||||
"count": count,
|
||||
"total_size_bytes": total_size,
|
||||
"oldest": oldest,
|
||||
"newest": newest,
|
||||
}
|
||||
|
||||
@@ -20,7 +20,8 @@ router = APIRouter(prefix="/api/settings", tags=["settings"])
|
||||
_SETTING_KEYS = {
|
||||
"external_url": "NOTIFY_BRIDGE_EXTERNAL_URL",
|
||||
"telegram_webhook_secret": "NOTIFY_BRIDGE_TELEGRAM_WEBHOOK_SECRET",
|
||||
"telegram_cache_ttl_hours": None, # no env fallback, default 48
|
||||
"telegram_cache_ttl_hours": None, # URL cache TTL; 0 disables TTL
|
||||
"telegram_asset_cache_max_entries": None, # LRU cap for both caches
|
||||
"supported_locales": None, # comma-separated locale codes
|
||||
"timezone": "NOTIFY_BRIDGE_TIMEZONE", # IANA tz (e.g. "Europe/Warsaw"); empty = UTC
|
||||
}
|
||||
@@ -28,11 +29,18 @@ _SETTING_KEYS = {
|
||||
_DEFAULTS = {
|
||||
"external_url": "",
|
||||
"telegram_webhook_secret": "",
|
||||
"telegram_cache_ttl_hours": "48",
|
||||
# 720h = 30d. URL cache only; asset cache uses thumbhash validation
|
||||
# (content-addressable) and ignores TTL entirely.
|
||||
"telegram_cache_ttl_hours": "720",
|
||||
"telegram_asset_cache_max_entries": "5000",
|
||||
"supported_locales": "en,ru",
|
||||
"timezone": "UTC",
|
||||
}
|
||||
|
||||
# Settings whose changes require dropping in-memory Telegram caches so the
|
||||
# next dispatch rebuilds them with the new parameters. Files are preserved.
|
||||
_CACHE_SETTING_KEYS = {"telegram_cache_ttl_hours", "telegram_asset_cache_max_entries"}
|
||||
|
||||
|
||||
async def get_setting(session: AsyncSession, key: str) -> str:
|
||||
"""Read a setting from DB, falling back to env var then default."""
|
||||
@@ -51,6 +59,7 @@ class SettingsUpdate(BaseModel):
|
||||
external_url: str | None = None
|
||||
telegram_webhook_secret: str | None = None
|
||||
telegram_cache_ttl_hours: str | None = None
|
||||
telegram_asset_cache_max_entries: str | None = None
|
||||
supported_locales: str | None = None
|
||||
timezone: str | None = None
|
||||
|
||||
@@ -80,6 +89,7 @@ async def update_settings(
|
||||
"""Update app settings (admin). Re-registers webhooks when base URL changes."""
|
||||
old_base_url = await get_setting(session, "external_url")
|
||||
old_secret = await get_setting(session, "telegram_webhook_secret")
|
||||
old_cache_values = {k: await get_setting(session, k) for k in _CACHE_SETTING_KEYS}
|
||||
|
||||
for key in _SETTING_KEYS:
|
||||
value = getattr(body, key, None)
|
||||
@@ -93,6 +103,17 @@ async def update_settings(
|
||||
session.add(row)
|
||||
await session.commit()
|
||||
|
||||
# Drop in-memory caches if any cache-tuning setting actually changed, so
|
||||
# the next dispatch rebuilds them with the new parameters. Files survive.
|
||||
cache_changed = False
|
||||
for key in _CACHE_SETTING_KEYS:
|
||||
if await get_setting(session, key) != old_cache_values[key]:
|
||||
cache_changed = True
|
||||
break
|
||||
if cache_changed:
|
||||
from ..services.watcher import reset_telegram_caches_in_memory
|
||||
await reset_telegram_caches_in_memory()
|
||||
|
||||
new_base_url = await get_setting(session, "external_url")
|
||||
new_secret = await get_setting(session, "telegram_webhook_secret")
|
||||
|
||||
@@ -111,6 +132,25 @@ async def update_settings(
|
||||
return result
|
||||
|
||||
|
||||
@router.get("/telegram-cache/stats")
|
||||
async def telegram_cache_stats(
|
||||
user: User = Depends(require_admin),
|
||||
):
|
||||
"""Return counts and sizes for the Telegram file_id caches."""
|
||||
from ..services.watcher import get_telegram_cache_stats
|
||||
return await get_telegram_cache_stats()
|
||||
|
||||
|
||||
@router.post("/telegram-cache/clear")
|
||||
async def clear_telegram_cache(
|
||||
user: User = Depends(require_admin),
|
||||
):
|
||||
"""Clear the Telegram file_id cache (URL and asset) from disk and memory."""
|
||||
from ..services.watcher import clear_telegram_caches
|
||||
result = await clear_telegram_caches()
|
||||
return result
|
||||
|
||||
|
||||
@router.get("/locales")
|
||||
async def get_supported_locales(
|
||||
user: User = Depends(get_current_user),
|
||||
|
||||
@@ -35,8 +35,34 @@ _asset_cache: TelegramFileCache | None = None
|
||||
_cache_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _load_cache_settings() -> tuple[int, int]:
|
||||
"""Return (url_ttl_seconds, asset_max_entries) from app settings.
|
||||
|
||||
Defaults apply when the settings rows are missing. Reads in a short-lived
|
||||
session to avoid coupling to the caller's transaction.
|
||||
"""
|
||||
from ..api.app_settings import get_setting
|
||||
async with AsyncSession(get_engine()) as session:
|
||||
ttl_hours_str = await get_setting(session, "telegram_cache_ttl_hours")
|
||||
max_entries_str = await get_setting(session, "telegram_asset_cache_max_entries")
|
||||
try:
|
||||
ttl_hours = int(ttl_hours_str) if ttl_hours_str else 720
|
||||
except ValueError:
|
||||
ttl_hours = 720
|
||||
try:
|
||||
max_entries = int(max_entries_str) if max_entries_str else 5000
|
||||
except ValueError:
|
||||
max_entries = 5000
|
||||
return ttl_hours * 3600, max_entries
|
||||
|
||||
|
||||
async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFileCache | None]:
|
||||
"""Lazily initialize shared Telegram file caches using NOTIFY_BRIDGE_DATA_DIR."""
|
||||
"""Lazily initialize shared Telegram file caches using NOTIFY_BRIDGE_DATA_DIR.
|
||||
|
||||
The URL cache runs in TTL mode (URLs aren't content-addressable); the asset
|
||||
cache runs in thumbhash mode so entries invalidate on visual change rather
|
||||
than age. Both honor an LRU size cap from settings.
|
||||
"""
|
||||
global _url_cache, _asset_cache
|
||||
if _url_cache is not None:
|
||||
return _url_cache, _asset_cache
|
||||
@@ -50,16 +76,91 @@ async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFile
|
||||
if not data_dir:
|
||||
return None, None
|
||||
cache_dir = Path(data_dir) / "cache"
|
||||
url_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_url_cache.json"))
|
||||
asset_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_asset_cache.json"))
|
||||
ttl_seconds, max_entries = await _load_cache_settings()
|
||||
url_cache = TelegramFileCache(
|
||||
JsonFileBackend(cache_dir / "telegram_url_cache.json"),
|
||||
ttl_seconds=ttl_seconds,
|
||||
max_entries=max_entries,
|
||||
)
|
||||
asset_cache = TelegramFileCache(
|
||||
JsonFileBackend(cache_dir / "telegram_asset_cache.json"),
|
||||
use_thumbhash=True,
|
||||
max_entries=max_entries,
|
||||
)
|
||||
await url_cache.async_load()
|
||||
await asset_cache.async_load()
|
||||
_url_cache = url_cache
|
||||
_asset_cache = asset_cache
|
||||
_LOGGER.info("Initialized Telegram file caches in %s", cache_dir)
|
||||
_LOGGER.info(
|
||||
"Initialized Telegram caches in %s (url ttl=%ds, max_entries=%d, asset thumbhash mode)",
|
||||
cache_dir, ttl_seconds, max_entries,
|
||||
)
|
||||
return _url_cache, _asset_cache
|
||||
|
||||
|
||||
async def reset_telegram_caches_in_memory() -> None:
|
||||
"""Drop in-memory cache refs without touching files on disk.
|
||||
|
||||
Used after settings changes so the next dispatch re-initializes caches
|
||||
with fresh parameters. Contrast with ``clear_telegram_caches`` which also
|
||||
deletes cached file_ids.
|
||||
"""
|
||||
global _url_cache, _asset_cache
|
||||
async with _cache_lock:
|
||||
_url_cache = None
|
||||
_asset_cache = None
|
||||
_LOGGER.info("Reset Telegram cache refs in memory (files preserved)")
|
||||
|
||||
|
||||
async def get_telegram_cache_stats() -> dict[str, Any]:
|
||||
"""Return stats for the URL and asset Telegram caches.
|
||||
|
||||
Loads caches lazily if they haven't been touched by a dispatch yet.
|
||||
Returns zero-counts when ``NOTIFY_BRIDGE_DATA_DIR`` is not configured.
|
||||
"""
|
||||
url_cache, asset_cache = await _get_telegram_caches()
|
||||
empty = {"count": 0, "total_size_bytes": 0, "oldest": None, "newest": None}
|
||||
return {
|
||||
"url": url_cache.stats() if url_cache else empty,
|
||||
"asset": asset_cache.stats() if asset_cache else empty,
|
||||
}
|
||||
|
||||
|
||||
async def clear_telegram_caches() -> dict[str, Any]:
|
||||
"""Delete both Telegram file caches from disk and reset in-memory state.
|
||||
|
||||
Next dispatch re-initializes the caches via `_get_telegram_caches()`.
|
||||
Returns a summary with the paths that were removed.
|
||||
"""
|
||||
global _url_cache, _asset_cache
|
||||
async with _cache_lock:
|
||||
removed: list[str] = []
|
||||
for cache, label in ((_url_cache, "url"), (_asset_cache, "asset")):
|
||||
if cache is not None:
|
||||
await cache.async_remove()
|
||||
removed.append(label)
|
||||
|
||||
# Also remove files from disk in case caches were never initialized
|
||||
# in this process (data_dir set but dispatch never ran).
|
||||
import os
|
||||
from pathlib import Path
|
||||
data_dir = os.environ.get("NOTIFY_BRIDGE_DATA_DIR")
|
||||
if data_dir:
|
||||
cache_dir = Path(data_dir) / "cache"
|
||||
for name in ("telegram_url_cache.json", "telegram_asset_cache.json"):
|
||||
path = cache_dir / name
|
||||
if path.exists():
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError as e:
|
||||
_LOGGER.warning("Failed to remove %s: %s", path, e)
|
||||
|
||||
_url_cache = None
|
||||
_asset_cache = None
|
||||
_LOGGER.info("Cleared Telegram file caches: %s", removed or "none in memory")
|
||||
return {"cleared": True, "removed": removed}
|
||||
|
||||
|
||||
async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||
"""Poll a tracker's provider for changes and dispatch notifications."""
|
||||
engine = get_engine()
|
||||
|
||||
Reference in New Issue
Block a user