6 Commits

Author SHA1 Message Date
71b79cd919 Move quiet hours from hub config to per-call service params
All checks were successful
Validate / Hassfest (push) Successful in 1m19s
Quiet hours are now specified per send_telegram_notification call via
quiet_hours_start/quiet_hours_end params instead of being a hub-wide
integration option. This allows different automations to use different
quiet hours windows (or none at all).

- Remove quiet_hours_start/end from config options UI and const.py
- Add quiet_hours_start/end as optional HH:MM params on the service
- Remove ignore_quiet_hours param (omit quiet hours params to send immediately)
- Queue stores quiet_hours_end per item; each unique end time gets its
  own async_track_time_change timer for replay
- On startup, items whose quiet hours have passed are sent immediately
- Add async_remove_indices() to NotificationQueue for selective removal
- Timers are cleaned up when no more items need them

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:04:20 +03:00
678e8a6e62 Add quiet hours, fix Telegram bugs, and improve cache performance
All checks were successful
Validate / Hassfest (push) Successful in 5s
- Add quiet hours support to queue notifications during configured time windows
- Fix UnboundLocalError when single-item document chunk exceeds max_asset_data_size
- Fix document-only multi-item chunks being silently dropped (missing skip guard)
- Fix notification queue entity lookup by storing entity_id in queued params
- Fix quiet hours using OS timezone instead of HA-configured timezone (dt_util.now)
- Fix chat_action schema rejecting empty string from "Disabled" selector
- Fix stale thumbhash cache entries not being removed on mismatch
- Fix translation descriptions for send_large_photos_as_documents
- Add batch async_set_many() to TelegramFileCache to reduce disk writes
- Add max-entries eviction (2000) for thumbhash cache to prevent unbounded growth
- Eliminate redundant _is_asset_id/get_asset_thumbhash lookups in media group loop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 09:45:34 +03:00
dd7032b411 Replace TTL with thumbhash-based cache validation and add Telegram video size limits
Some checks failed
Validate / Hassfest (push) Has been cancelled
- Asset cache now validates entries by comparing stored thumbhash with current
  Immich thumbhash instead of using TTL expiration. This makes cache invalidation
  precise (only when content actually changes) and eliminates unnecessary re-uploads.
  URL-based cache retains TTL for non-Immich URLs.
- Add TELEGRAM_MAX_VIDEO_SIZE (50 MB) check to skip oversized videos in both
  single-video and media-group paths, preventing entire groups from failing.
- Split media groups into sub-groups by cumulative upload size to ensure each
  sendMediaGroup request stays under Telegram's 50 MB upload limit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:28:33 +03:00
65ca81a3f3 Link to local server repository
All checks were successful
Validate / Hassfest (push) Successful in 5s
2026-02-05 00:17:22 +03:00
3ba33a36cf Minor refactoring to use common const for telegram API url
All checks were successful
Validate / Hassfest (push) Successful in 4s
2026-02-04 17:46:14 +03:00
6ca3cae5df Add document type and content_type support for send_telegram_notification
All checks were successful
Validate / Hassfest (push) Successful in 3s
- Add type: document as default media type (instead of photo)
- Add optional content_type field for explicit MIME type specification
- Documents are sent separately (Telegram API limitation for media groups)
- Default content types: image/jpeg (photo), video/mp4 (video), auto-detect (document)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:35:57 +03:00
10 changed files with 1214 additions and 292 deletions

View File

@@ -4,7 +4,7 @@
A Home Assistant custom integration that monitors [Immich](https://immich.app/) photo/video library albums for changes and exposes them as Home Assistant entities with event-firing capabilities.
> **Tip:** For the best experience, use this integration with the [Immich Album Watcher Blueprint](https://github.com/DolgolyovAlexei/haos-blueprints/tree/main/Common/Immich%20Album%20Watcher) to easily create automations for album change notifications.
> **Tip:** For the best experience, use this integration with the [Immich Album Watcher Blueprint](https://git.dolgolyov-family.by/alexei.dolgolyov/haos-blueprints/src/branch/main/Common/Immich%20Album%20Watcher) to easily create automations for album change notifications.
## Features
@@ -37,7 +37,7 @@ A Home Assistant custom integration that monitors [Immich](https://immich.app/)
- **Services** - Custom service calls:
- `immich_album_watcher.refresh` - Force immediate data refresh
- `immich_album_watcher.get_assets` - Get assets from an album with filtering and ordering
- `immich_album_watcher.send_telegram_notification` - Send text, photo, video, or media group to Telegram
- `immich_album_watcher.send_telegram_notification` - Send text, photo, video, document, or media group to Telegram
- **Share Link Management** - Button entities to create and delete share links:
- Create/delete public (unprotected) share links
- Create/delete password-protected share links
@@ -334,14 +334,29 @@ data:
Send notifications to Telegram. Supports multiple formats:
- **Text message** - When `urls` is empty or not provided
- **Single photo** - When `urls` contains one photo
- **Single video** - When `urls` contains one video
- **Media group** - When `urls` contains multiple items
- **Text message** - When `assets` is empty or not provided
- **Single document** - When `assets` contains one document (default type)
- **Single photo** - When `assets` contains one photo (`type: photo`)
- **Single video** - When `assets` contains one video (`type: video`)
- **Media group** - When `assets` contains multiple photos/videos (documents are sent separately)
The service downloads media from Immich and uploads it to Telegram, bypassing any CORS restrictions. Large lists of media are automatically split into multiple media groups based on the `max_group_size` parameter (default: 10 items per group).
The service downloads media from Immich and uploads it to Telegram, bypassing any CORS restrictions. Large lists of photos and videos are automatically split into multiple media groups based on the `max_group_size` parameter (default: 10 items per group). Documents cannot be grouped and are sent individually.
**File ID Caching:** When media is uploaded to Telegram, the service caches the returned `file_id`. Subsequent sends of the same media will use the cached `file_id` instead of re-uploading, significantly improving performance. The cache TTL is configurable in hub options (default: 48 hours, range: 1-168 hours). The cache is persistent across Home Assistant restarts and is stored per album.
**File ID Caching:** When media is uploaded to Telegram, the service caches the returned `file_id`. Subsequent sends of the same media will use the cached `file_id` instead of re-uploading, significantly improving performance. The cache TTL is configurable in hub options (default: 48 hours, range: 1-168 hours). The cache is persistent across Home Assistant restarts and is shared across all albums in the hub.
**Dual Cache System:** The integration maintains two separate caches for optimal performance:
- **Asset ID Cache** - For Immich assets with extractable asset IDs (UUIDs). The same asset accessed via different URL types (thumbnail, original, video playback, share links) shares the same cache entry.
- **URL Cache** - For non-Immich URLs or URLs without extractable asset IDs. Also used when a custom `cache_key` is provided.
**Smart Cache Keys:** The service automatically extracts asset IDs from Immich URLs. Supported URL patterns:
- `/api/assets/{asset_id}/original`
- `/api/assets/{asset_id}/thumbnail`
- `/api/assets/{asset_id}/video/playback`
- `/share/{key}/photos/{asset_id}`
You can provide a custom `cache_key` per asset to override this behavior (stored in URL cache).
**Examples:**
@@ -357,6 +372,20 @@ data:
disable_web_page_preview: true
```
Single document (default):
```yaml
service: immich_album_watcher.send_telegram_notification
target:
entity_id: sensor.album_name_asset_limit
data:
chat_id: "-1001234567890"
assets:
- url: "https://immich.example.com/api/assets/xxx/original?key=yyy"
content_type: "image/heic" # Optional: explicit MIME type
caption: "Original file"
```
Single photo:
```yaml
@@ -365,7 +394,7 @@ target:
entity_id: sensor.album_name_asset_limit
data:
chat_id: "-1001234567890"
urls:
assets:
- url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy"
type: photo
caption: "Beautiful sunset!"
@@ -379,7 +408,7 @@ target:
entity_id: sensor.album_name_asset_limit
data:
chat_id: "-1001234567890"
urls:
assets:
- url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy"
type: photo
- url: "https://immich.example.com/api/assets/zzz/video/playback?key=yyy"
@@ -411,17 +440,32 @@ target:
entity_id: sensor.album_name_asset_limit
data:
chat_id: "-1001234567890"
urls:
assets:
- url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy"
type: photo
caption: "Quick notification"
wait_for_response: false # Automation continues immediately
```
Using custom cache_key (useful when same media has different URLs):
```yaml
service: immich_album_watcher.send_telegram_notification
target:
entity_id: sensor.album_name_asset_limit
data:
chat_id: "-1001234567890"
assets:
- url: "https://immich.example.com/api/assets/xxx/thumbnail?key=yyy"
type: photo
cache_key: "asset_xxx" # Custom key for caching instead of URL
caption: "Photo with custom cache key"
```
| Field | Description | Required |
|-------|-------------|----------|
| `chat_id` | Telegram chat ID to send to | Yes |
| `urls` | List of media items with `url` and `type` (photo/video). Empty for text message. | No |
| `assets` | List of media items with `url`, optional `type` (document/photo/video, default: document), optional `content_type` (MIME type, e.g., `image/jpeg`), and optional `cache_key` (custom key for caching). Empty for text message. Photos and videos can be grouped; documents are sent separately. | No |
| `bot_token` | Telegram bot token (uses configured token if not provided) | No |
| `caption` | For media: caption applied to first item. For text: the message text. Supports HTML formatting by default. | No |
| `reply_to_message_id` | Message ID to reply to | No |

View File

@@ -4,9 +4,12 @@ from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, time as dt_time
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.event import async_track_time_change
from homeassistant.util import dt as dt_util
from .const import (
CONF_ALBUM_ID,
@@ -22,7 +25,7 @@ from .const import (
PLATFORMS,
)
from .coordinator import ImmichAlbumWatcherCoordinator
from .storage import ImmichAlbumStorage, TelegramFileCache
from .storage import ImmichAlbumStorage, NotificationQueue, TelegramFileCache
_LOGGER = logging.getLogger(__name__)
@@ -73,11 +76,31 @@ async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bo
storage = ImmichAlbumStorage(hass, entry.entry_id)
await storage.async_load()
# Create and load Telegram file caches once per hub (shared across all albums)
# TTL is in hours from config, convert to seconds
cache_ttl_seconds = telegram_cache_ttl * 60 * 60
# URL-based cache for non-Immich URLs or URLs without extractable asset IDs
telegram_cache = TelegramFileCache(hass, entry.entry_id, ttl_seconds=cache_ttl_seconds)
await telegram_cache.async_load()
# Asset ID-based cache for Immich URLs — uses thumbhash validation instead of TTL
telegram_asset_cache = TelegramFileCache(
hass, f"{entry.entry_id}_assets", use_thumbhash=True
)
await telegram_asset_cache.async_load()
# Create notification queue for quiet hours
notification_queue = NotificationQueue(hass, entry.entry_id)
await notification_queue.async_load()
# Store hub reference
hass.data[DOMAIN][entry.entry_id] = {
"hub": entry.runtime_data,
"subentries": {},
"storage": storage,
"telegram_cache": telegram_cache,
"telegram_asset_cache": telegram_asset_cache,
"notification_queue": notification_queue,
"quiet_hours_unsubs": {}, # keyed by "HH:MM" end time
}
# Track loaded subentries to detect changes
@@ -90,6 +113,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bo
# Forward platform setup once - platforms will iterate through subentries
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Check if there are queued notifications from before restart
if notification_queue.has_pending():
_register_queue_timers(hass, entry)
# Process any items whose quiet hours have already ended
hass.async_create_task(_process_ready_notifications(hass, entry))
# Register update listener for options and subentry changes
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
@@ -109,15 +138,11 @@ async def _async_setup_subentry_coordinator(
album_id = subentry.data[CONF_ALBUM_ID]
album_name = subentry.data.get(CONF_ALBUM_NAME, "Unknown Album")
storage: ImmichAlbumStorage = hass.data[DOMAIN][entry.entry_id]["storage"]
telegram_cache: TelegramFileCache = hass.data[DOMAIN][entry.entry_id]["telegram_cache"]
telegram_asset_cache: TelegramFileCache = hass.data[DOMAIN][entry.entry_id]["telegram_asset_cache"]
_LOGGER.debug("Setting up coordinator for album: %s (%s)", album_name, album_id)
# Create and load Telegram file cache for this album
# TTL is in hours from config, convert to seconds
cache_ttl_seconds = hub_data.telegram_cache_ttl * 60 * 60
telegram_cache = TelegramFileCache(hass, album_id, ttl_seconds=cache_ttl_seconds)
await telegram_cache.async_load()
# Create coordinator for this album
coordinator = ImmichAlbumWatcherCoordinator(
hass,
@@ -129,6 +154,7 @@ async def _async_setup_subentry_coordinator(
hub_name=hub_data.name,
storage=storage,
telegram_cache=telegram_cache,
telegram_asset_cache=telegram_asset_cache,
)
# Load persisted state before first refresh to detect changes during downtime
@@ -148,6 +174,195 @@ async def _async_setup_subentry_coordinator(
_LOGGER.info("Coordinator for album '%s' set up successfully", album_name)
def _is_quiet_hours(start_str: str, end_str: str) -> bool:
"""Check if current time is within quiet hours."""
if not start_str or not end_str:
return False
try:
now = dt_util.now().time()
start_time = dt_time.fromisoformat(start_str)
end_time = dt_time.fromisoformat(end_str)
except ValueError:
return False
if start_time <= end_time:
return start_time <= now < end_time
else:
# Crosses midnight (e.g., 22:00 - 08:00)
return now >= start_time or now < end_time
def _register_queue_timers(hass: HomeAssistant, entry: ImmichConfigEntry) -> None:
"""Register timers for each unique quiet_hours_end in the queue."""
entry_data = hass.data[DOMAIN][entry.entry_id]
queue: NotificationQueue = entry_data["notification_queue"]
unsubs: dict[str, list] = entry_data["quiet_hours_unsubs"]
# Collect unique end times from queued items
end_times: set[str] = set()
for item in queue.get_all():
end_str = item.get("params", {}).get("quiet_hours_end", "")
if end_str:
end_times.add(end_str)
for end_str in end_times:
if end_str in unsubs:
continue # Timer already registered for this end time
try:
end_time = dt_time.fromisoformat(end_str)
except ValueError:
_LOGGER.warning("Invalid quiet hours end time in queue: %s", end_str)
continue
async def _on_quiet_hours_end(_now: datetime, _end_str: str = end_str) -> None:
"""Handle quiet hours end — process matching queued notifications."""
_LOGGER.info("Quiet hours ended (%s), processing queued notifications", _end_str)
await _process_notifications_for_end_time(hass, entry, _end_str)
unsub = async_track_time_change(
hass, _on_quiet_hours_end, hour=end_time.hour, minute=end_time.minute, second=0
)
unsubs[end_str] = unsub
entry.async_on_unload(unsub)
_LOGGER.debug("Registered quiet hours timer for %s", end_str)
def _unregister_queue_timer(hass: HomeAssistant, entry: ImmichConfigEntry, end_str: str) -> None:
"""Unregister a quiet hours timer if no more items need it."""
entry_data = hass.data[DOMAIN][entry.entry_id]
queue: NotificationQueue = entry_data["notification_queue"]
unsubs: dict[str, list] = entry_data["quiet_hours_unsubs"]
# Check if any remaining items still use this end time
for item in queue.get_all():
if item.get("params", {}).get("quiet_hours_end", "") == end_str:
return # Still needed
unsub = unsubs.pop(end_str, None)
if unsub:
unsub()
_LOGGER.debug("Unregistered quiet hours timer for %s (no more items)", end_str)
async def _process_ready_notifications(
hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
"""Process queued notifications whose quiet hours have already ended."""
entry_data = hass.data[DOMAIN].get(entry.entry_id)
if not entry_data:
return
queue: NotificationQueue = entry_data["notification_queue"]
items = queue.get_all()
if not items:
return
# Find items whose quiet hours have ended
ready_indices = []
for i, item in enumerate(items):
params = item.get("params", {})
start_str = params.get("quiet_hours_start", "")
end_str = params.get("quiet_hours_end", "")
if not _is_quiet_hours(start_str, end_str):
ready_indices.append(i)
if not ready_indices:
return
_LOGGER.info("Found %d queued notifications ready to send (quiet hours ended)", len(ready_indices))
await _send_queued_items(hass, entry, ready_indices)
async def _process_notifications_for_end_time(
hass: HomeAssistant, entry: ImmichConfigEntry, end_str: str
) -> None:
"""Process queued notifications matching a specific quiet_hours_end time."""
entry_data = hass.data[DOMAIN].get(entry.entry_id)
if not entry_data:
return
queue: NotificationQueue = entry_data["notification_queue"]
items = queue.get_all()
if not items:
return
# Find items matching this end time that are no longer in quiet hours
matching_indices = []
for i, item in enumerate(items):
params = item.get("params", {})
if params.get("quiet_hours_end", "") == end_str:
start_str = params.get("quiet_hours_start", "")
if not _is_quiet_hours(start_str, end_str):
matching_indices.append(i)
if not matching_indices:
return
_LOGGER.info("Processing %d queued notifications for quiet hours end %s", len(matching_indices), end_str)
await _send_queued_items(hass, entry, matching_indices)
# Clean up timer if no more items need it
_unregister_queue_timer(hass, entry, end_str)
async def _send_queued_items(
hass: HomeAssistant, entry: ImmichConfigEntry, indices: list[int]
) -> None:
"""Send specific queued notifications by index and remove them from the queue."""
import asyncio
from homeassistant.helpers import entity_registry as er
entry_data = hass.data[DOMAIN].get(entry.entry_id)
if not entry_data:
return
queue: NotificationQueue = entry_data["notification_queue"]
# Find a fallback sensor entity
ent_reg = er.async_get(hass)
fallback_entity_id = None
for ent in er.async_entries_for_config_entry(ent_reg, entry.entry_id):
if ent.domain == "sensor":
fallback_entity_id = ent.entity_id
break
if not fallback_entity_id:
_LOGGER.warning("No sensor entity found to process notification queue")
return
items = queue.get_all()
sent_count = 0
for i in indices:
if i >= len(items):
continue
params = dict(items[i].get("params", {}))
try:
target_entity_id = params.pop("entity_id", None) or fallback_entity_id
# Remove quiet hours params so the replay doesn't re-queue
params.pop("quiet_hours_start", None)
params.pop("quiet_hours_end", None)
await hass.services.async_call(
DOMAIN,
"send_telegram_notification",
params,
target={"entity_id": target_entity_id},
blocking=True,
)
sent_count += 1
except Exception:
_LOGGER.exception("Failed to send queued notification %d", i + 1)
# Small delay between notifications to avoid rate limiting
await asyncio.sleep(1)
# Remove sent items from queue (in reverse order to preserve indices)
await queue.async_remove_indices(sorted(indices, reverse=True))
_LOGGER.info("Sent %d/%d queued notifications", sent_count, len(indices))
async def _async_update_listener(
hass: HomeAssistant, entry: ImmichConfigEntry
) -> None:
@@ -166,7 +381,7 @@ async def _async_update_listener(
await hass.config_entries.async_reload(entry.entry_id)
return
# Handle options-only update (scan interval change)
# Handle options-only update
new_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
# Update hub data
@@ -177,11 +392,16 @@ async def _async_update_listener(
for subentry_data in subentries_data.values():
subentry_data.coordinator.update_scan_interval(new_interval)
_LOGGER.info("Updated scan interval to %d seconds", new_interval)
_LOGGER.info("Updated hub options (scan_interval=%d)", new_interval)
async def async_unload_entry(hass: HomeAssistant, entry: ImmichConfigEntry) -> bool:
"""Unload a config entry."""
# Cancel all quiet hours timers
entry_data = hass.data[DOMAIN].get(entry.entry_id, {})
for unsub in entry_data.get("quiet_hours_unsubs", {}).values():
unsub()
# Unload all platforms
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -260,6 +260,13 @@ class ImmichAlbumWatcherOptionsFlow(OptionsFlow):
},
)
return self.async_show_form(
step_id="init",
data_schema=self._build_options_schema(),
)
def _build_options_schema(self) -> vol.Schema:
"""Build the options form schema."""
current_interval = self._config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
@@ -270,21 +277,18 @@ class ImmichAlbumWatcherOptionsFlow(OptionsFlow):
CONF_TELEGRAM_CACHE_TTL, DEFAULT_TELEGRAM_CACHE_TTL
)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
{
vol.Required(
CONF_SCAN_INTERVAL, default=current_interval
): vol.All(vol.Coerce(int), vol.Range(min=10, max=3600)),
vol.Optional(
CONF_TELEGRAM_BOT_TOKEN, default=current_bot_token
): str,
vol.Optional(
CONF_TELEGRAM_CACHE_TTL, default=current_cache_ttl
): vol.All(vol.Coerce(int), vol.Range(min=1, max=168)),
}
),
return vol.Schema(
{
vol.Required(
CONF_SCAN_INTERVAL, default=current_interval
): vol.All(vol.Coerce(int), vol.Range(min=10, max=3600)),
vol.Optional(
CONF_TELEGRAM_BOT_TOKEN, default=current_bot_token
): str,
vol.Optional(
CONF_TELEGRAM_CACHE_TTL, default=current_cache_ttl
): vol.All(vol.Coerce(int), vol.Range(min=1, max=168)),
}
)

View File

@@ -131,6 +131,7 @@ class AssetInfo:
state: str | None = None
country: str | None = None
is_processed: bool = True # Whether asset is fully processed by Immich
thumbhash: str | None = None # Perceptual hash for cache validation
@classmethod
def from_api_response(
@@ -169,6 +170,7 @@ class AssetInfo:
# Check if asset is fully processed by Immich
asset_type = data.get("type", ASSET_TYPE_IMAGE)
is_processed = cls._check_processing_status(data, asset_type)
thumbhash = data.get("thumbhash")
return cls(
id=data["id"],
@@ -187,6 +189,7 @@ class AssetInfo:
state=state,
country=country,
is_processed=is_processed,
thumbhash=thumbhash,
)
@staticmethod
@@ -336,6 +339,7 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]):
hub_name: str = "Immich",
storage: ImmichAlbumStorage | None = None,
telegram_cache: TelegramFileCache | None = None,
telegram_asset_cache: TelegramFileCache | None = None,
) -> None:
"""Initialize the coordinator."""
super().__init__(
@@ -356,6 +360,7 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]):
self._shared_links: list[SharedLinkInfo] = []
self._storage = storage
self._telegram_cache = telegram_cache
self._telegram_asset_cache = telegram_asset_cache
self._persisted_asset_ids: set[str] | None = None
self._external_domain: str | None = None # Fetched from server config
self._pending_asset_ids: set[str] = set() # Assets detected but not yet processed
@@ -411,9 +416,20 @@ class ImmichAlbumWatcherCoordinator(DataUpdateCoordinator[AlbumData | None]):
@property
def telegram_cache(self) -> TelegramFileCache | None:
"""Return the Telegram file cache."""
"""Return the Telegram file cache (URL-based)."""
return self._telegram_cache
@property
def telegram_asset_cache(self) -> TelegramFileCache | None:
"""Return the Telegram asset cache (asset ID-based)."""
return self._telegram_asset_cache
def get_asset_thumbhash(self, asset_id: str) -> str | None:
"""Get the current thumbhash for an asset from coordinator data."""
if self.data and asset_id in self.data.assets:
return self.data.assets[asset_id].thumbhash
return None
def update_scan_interval(self, scan_interval: int) -> None:
"""Update the scan interval."""
self.update_interval = timedelta(seconds=scan_interval)

View File

@@ -8,5 +8,5 @@
"iot_class": "cloud_polling",
"issue_tracker": "https://github.com/DolgolyovAlexei/haos-hacs-immich-album-watcher/issues",
"requirements": [],
"version": "2.7.1"
"version": "2.8.0"
}

File diff suppressed because it is too large Load Diff

View File

@@ -131,7 +131,7 @@ get_assets:
send_telegram_notification:
name: Send Telegram Notification
description: Send a notification to Telegram (text, photo, video, or media group).
description: Send a notification to Telegram (text, photo, video, document, or media group).
target:
entity:
integration: immich_album_watcher
@@ -149,9 +149,9 @@ send_telegram_notification:
required: true
selector:
text:
urls:
name: URLs
description: List of media URLs to send. Each item should have 'url' and 'type' (photo/video). If empty, sends a text message. Large lists are automatically split into multiple media groups.
assets:
name: Assets
description: "List of media assets to send. Each item should have 'url', optional 'type' (document/photo/video, default: document), optional 'content_type' (MIME type, e.g., 'image/jpeg'), and optional 'cache_key' (custom key for caching instead of URL). If empty, sends a text message. Photos and videos can be grouped; documents are sent separately."
required: false
selector:
object:
@@ -256,3 +256,15 @@ send_telegram_notification:
value: "upload_document"
- label: "Disabled"
value: ""
quiet_hours_start:
name: Quiet Hours Start
description: "Start time for quiet hours (HH:MM format, e.g. 22:00). When set along with quiet_hours_end, notifications during this period are queued and sent when quiet hours end. Omit to send immediately."
required: false
selector:
text:
quiet_hours_end:
name: Quiet Hours End
description: "End time for quiet hours (HH:MM format, e.g. 08:00). Queued notifications will be sent at this time."
required: false
selector:
text:

View File

@@ -73,40 +73,70 @@ class TelegramFileCache:
When a file is uploaded to Telegram, it returns a file_id that can be reused
to send the same file without re-uploading. This cache stores these file_ids
keyed by the source URL.
keyed by the source URL or asset ID.
Supports two validation modes:
- TTL mode (default): entries expire after a configured time-to-live
- Thumbhash mode: entries are validated by comparing stored thumbhash with
the current asset thumbhash from Immich
"""
def __init__(
self,
hass: HomeAssistant,
album_id: str,
entry_id: str,
ttl_seconds: int = DEFAULT_TELEGRAM_CACHE_TTL,
use_thumbhash: bool = False,
) -> None:
"""Initialize the Telegram file cache.
Args:
hass: Home Assistant instance
album_id: Album ID for scoping the cache
ttl_seconds: Time-to-live for cache entries in seconds (default: 48 hours)
entry_id: Config entry ID for scoping the cache (per hub)
ttl_seconds: Time-to-live for cache entries in seconds (TTL mode only)
use_thumbhash: Use thumbhash-based validation instead of TTL
"""
self._store: Store[dict[str, Any]] = Store(
hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{album_id}"
hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.telegram_cache.{entry_id}"
)
self._data: dict[str, Any] | None = None
self._ttl_seconds = ttl_seconds
self._use_thumbhash = use_thumbhash
async def async_load(self) -> None:
"""Load cache data from storage."""
self._data = await self._store.async_load() or {"files": {}}
# Clean up expired entries on load
# Clean up expired entries on load (TTL mode only)
await self._cleanup_expired()
mode = "thumbhash" if self._use_thumbhash else "TTL"
_LOGGER.debug(
"Loaded Telegram file cache with %d entries",
"Loaded Telegram file cache with %d entries (mode: %s)",
len(self._data.get("files", {})),
mode,
)
# Maximum number of entries to keep in thumbhash mode to prevent unbounded growth
THUMBHASH_MAX_ENTRIES = 2000
async def _cleanup_expired(self) -> None:
"""Remove expired cache entries."""
"""Remove expired cache entries (TTL mode) or trim old entries (thumbhash mode)."""
if self._use_thumbhash:
files = self._data.get("files", {}) if self._data else {}
if len(files) > self.THUMBHASH_MAX_ENTRIES:
sorted_keys = sorted(
files, key=lambda k: files[k].get("cached_at", "")
)
keys_to_remove = sorted_keys[: len(files) - self.THUMBHASH_MAX_ENTRIES]
for key in keys_to_remove:
del files[key]
await self._store.async_save(self._data)
_LOGGER.debug(
"Trimmed thumbhash cache from %d to %d entries",
len(keys_to_remove) + self.THUMBHASH_MAX_ENTRIES,
self.THUMBHASH_MAX_ENTRIES,
)
return
if not self._data or "files" not in self._data:
return
@@ -127,55 +157,171 @@ class TelegramFileCache:
await self._store.async_save(self._data)
_LOGGER.debug("Cleaned up %d expired Telegram cache entries", len(expired_keys))
def get(self, url: str) -> dict[str, Any] | None:
"""Get cached file_id for a URL.
def get(self, key: str, thumbhash: str | None = None) -> dict[str, Any] | None:
"""Get cached file_id for a key.
Args:
url: The source URL of the media
key: The cache key (URL or asset ID)
thumbhash: Current thumbhash for validation (thumbhash mode only).
If provided, compares with stored thumbhash. Mismatch = cache miss.
Returns:
Dict with 'file_id' and 'type' if cached and not expired, None otherwise
Dict with 'file_id' and 'type' if cached and valid, None otherwise
"""
if not self._data or "files" not in self._data:
return None
entry = self._data["files"].get(url)
entry = self._data["files"].get(key)
if not entry:
return None
# Check if expired
cached_at_str = entry.get("cached_at")
if cached_at_str:
cached_at = datetime.fromisoformat(cached_at_str)
age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds()
if age_seconds > self._ttl_seconds:
return None
if self._use_thumbhash:
# Thumbhash-based validation
if thumbhash is not None:
stored_thumbhash = entry.get("thumbhash")
if stored_thumbhash and stored_thumbhash != thumbhash:
_LOGGER.debug(
"Cache miss for %s: thumbhash changed, removing stale entry",
key[:36],
)
del self._data["files"][key]
return None
# If no thumbhash provided (asset not in monitored album),
# return cached entry anyway — self-heals on Telegram rejection
else:
# TTL-based validation
cached_at_str = entry.get("cached_at")
if cached_at_str:
cached_at = datetime.fromisoformat(cached_at_str)
age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds()
if age_seconds > self._ttl_seconds:
return None
return {
"file_id": entry.get("file_id"),
"type": entry.get("type"),
}
async def async_set(self, url: str, file_id: str, media_type: str) -> None:
"""Store a file_id for a URL.
async def async_set(
self, key: str, file_id: str, media_type: str, thumbhash: str | None = None
) -> None:
"""Store a file_id for a key.
Args:
url: The source URL of the media
key: The cache key (URL or asset ID)
file_id: The Telegram file_id
media_type: The type of media ('photo', 'video', 'document')
thumbhash: Current thumbhash to store alongside file_id (thumbhash mode only)
"""
if self._data is None:
self._data = {"files": {}}
self._data["files"][url] = {
entry_data: dict[str, Any] = {
"file_id": file_id,
"type": media_type,
"cached_at": datetime.now(timezone.utc).isoformat(),
}
if thumbhash is not None:
entry_data["thumbhash"] = thumbhash
self._data["files"][key] = entry_data
await self._store.async_save(self._data)
_LOGGER.debug("Cached Telegram file_id for URL (type: %s)", media_type)
_LOGGER.debug("Cached Telegram file_id for key (type: %s)", media_type)
async def async_set_many(
self, entries: list[tuple[str, str, str, str | None]]
) -> None:
"""Store multiple file_ids in a single disk write.
Args:
entries: List of (key, file_id, media_type, thumbhash) tuples
"""
if not entries:
return
if self._data is None:
self._data = {"files": {}}
now_iso = datetime.now(timezone.utc).isoformat()
for key, file_id, media_type, thumbhash in entries:
entry_data: dict[str, Any] = {
"file_id": file_id,
"type": media_type,
"cached_at": now_iso,
}
if thumbhash is not None:
entry_data["thumbhash"] = thumbhash
self._data["files"][key] = entry_data
await self._store.async_save(self._data)
_LOGGER.debug("Batch cached %d Telegram file_ids", len(entries))
async def async_remove(self) -> None:
"""Remove all cache data."""
await self._store.async_remove()
self._data = None
class NotificationQueue:
"""Persistent queue for notifications deferred during quiet hours.
Stores full service call parameters so notifications can be replayed
exactly as they were originally called.
"""
def __init__(self, hass: HomeAssistant, entry_id: str) -> None:
"""Initialize the notification queue."""
self._store: Store[dict[str, Any]] = Store(
hass, STORAGE_VERSION, f"{STORAGE_KEY_PREFIX}.notification_queue.{entry_id}"
)
self._data: dict[str, Any] | None = None
async def async_load(self) -> None:
"""Load queue data from storage."""
self._data = await self._store.async_load() or {"queue": []}
_LOGGER.debug(
"Loaded notification queue with %d items",
len(self._data.get("queue", [])),
)
async def async_enqueue(self, notification_params: dict[str, Any]) -> None:
"""Add a notification to the queue."""
if self._data is None:
self._data = {"queue": []}
self._data["queue"].append({
"params": notification_params,
"queued_at": datetime.now(timezone.utc).isoformat(),
})
await self._store.async_save(self._data)
_LOGGER.debug("Queued notification during quiet hours (total: %d)", len(self._data["queue"]))
def get_all(self) -> list[dict[str, Any]]:
"""Get all queued notifications."""
if not self._data:
return []
return list(self._data.get("queue", []))
def has_pending(self) -> bool:
"""Check if there are pending notifications."""
return bool(self._data and self._data.get("queue"))
async def async_remove_indices(self, indices: list[int]) -> None:
"""Remove specific items by index (indices must be in descending order)."""
if not self._data or not indices:
return
for idx in indices:
if 0 <= idx < len(self._data["queue"]):
del self._data["queue"][idx]
await self._store.async_save(self._data)
async def async_clear(self) -> None:
"""Clear all queued notifications."""
if self._data:
self._data["queue"] = []
await self._store.async_save(self._data)
async def async_remove(self) -> None:
"""Remove all queue data."""
await self._store.async_remove()
self._data = None

View File

@@ -195,7 +195,7 @@
},
"send_telegram_notification": {
"name": "Send Telegram Notification",
"description": "Send a notification to Telegram (text, photo, video, or media group).",
"description": "Send a notification to Telegram (text, photo, video, document, or media group).",
"fields": {
"bot_token": {
"name": "Bot Token",
@@ -205,9 +205,9 @@
"name": "Chat ID",
"description": "Telegram chat ID to send to."
},
"urls": {
"name": "URLs",
"description": "List of media URLs with type (photo/video). If empty, sends a text message. Large lists are automatically split into multiple media groups."
"assets": {
"name": "Assets",
"description": "List of media assets with 'url', optional 'type' (document/photo/video, default: document), optional 'content_type' (MIME type), and optional 'cache_key' (custom key for caching instead of URL). If empty, sends a text message. Photos and videos can be grouped; documents are sent separately."
},
"caption": {
"name": "Caption",
@@ -243,11 +243,19 @@
},
"send_large_photos_as_documents": {
"name": "Send Large Photos As Documents",
"description": "How to handle photos exceeding Telegram's limits (10MB or 10000px dimension sum). If true, send as documents. If false, downsize to fit limits."
"description": "How to handle photos exceeding Telegram's limits (10MB or 10000px dimension sum). If true, send as documents. If false, skip oversized photos."
},
"chat_action": {
"name": "Chat Action",
"description": "Chat action to display while processing (typing, upload_photo, upload_video, upload_document). Set to empty to disable."
},
"quiet_hours_start": {
"name": "Quiet Hours Start",
"description": "Start time for quiet hours (HH:MM format, e.g. 22:00). Notifications during this period are queued and sent when quiet hours end. Omit to send immediately."
},
"quiet_hours_end": {
"name": "Quiet Hours End",
"description": "End time for quiet hours (HH:MM format, e.g. 08:00). Queued notifications will be sent at this time."
}
}
}

View File

@@ -195,7 +195,7 @@
},
"send_telegram_notification": {
"name": "Отправить уведомление в Telegram",
"description": "Отправить уведомление в Telegram (текст, фото, видео или медиа-группу).",
"description": "Отправить уведомление в Telegram (текст, фото, видео, документ или медиа-группу).",
"fields": {
"bot_token": {
"name": "Токен бота",
@@ -205,9 +205,9 @@
"name": "ID чата",
"description": "ID чата Telegram для отправки."
},
"urls": {
"name": "URL-адреса",
"description": "Список URL медиа-файлов с типом (photo/video). Если пусто, отправляет текстовое сообщение. Большие списки автоматически разделяются на несколько медиа-групп."
"assets": {
"name": "Ресурсы",
"description": "Список медиа-ресурсов с 'url', опциональным 'type' (document/photo/video, по умолчанию document), опциональным 'content_type' (MIME-тип) и опциональным 'cache_key' (свой ключ кэширования вместо URL). Если пусто, отправляет текстовое сообщение. Фото и видео группируются; документы отправляются отдельно."
},
"caption": {
"name": "Подпись",
@@ -243,11 +243,19 @@
},
"send_large_photos_as_documents": {
"name": "Большие фото как документы",
"description": "Как обрабатывать фото, превышающие лимиты Telegram (10МБ или сумма размеров 10000пкс). Если true, отправлять как документы. Если false, уменьшать для соответствия лимитам."
"description": "Как обрабатывать фото, превышающие лимиты Telegram (10МБ или сумма размеров 10000пкс). Если true, отправлять как документы. Если false, пропускать."
},
"chat_action": {
"name": "Действие в чате",
"description": "Действие для отображения во время обработки (typing, upload_photo, upload_video, upload_document). Оставьте пустым для отключения."
},
"quiet_hours_start": {
"name": "Начало тихих часов",
"description": "Время начала тихих часов (формат ЧЧ:ММ, например 22:00). Уведомления в этот период ставятся в очередь и отправляются по окончании. Не указывайте для немедленной отправки."
},
"quiet_hours_end": {
"name": "Конец тихих часов",
"description": "Время окончания тихих часов (формат ЧЧ:ММ, например 08:00). Уведомления из очереди будут отправлены в это время."
}
}
}