diff --git a/packages/core/src/notify_bridge_core/notifications/dispatcher.py b/packages/core/src/notify_bridge_core/notifications/dispatcher.py
index 3afa6c3..68bf1a4 100644
--- a/packages/core/src/notify_bridge_core/notifications/dispatcher.py
+++ b/packages/core/src/notify_bridge_core/notifications/dispatcher.py
@@ -12,17 +12,13 @@ from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.templates.context import build_template_context
from notify_bridge_core.templates.renderer import render_template
+from .telegram.cache import TelegramFileCache
from .telegram.client import TelegramClient
from .webhook.client import WebhookClient
_LOGGER = logging.getLogger(__name__)
-DEFAULT_TEMPLATE = (
- '{{ added_count }} new item(s) added to '
- '{% if public_url %}
{{ collection_name }}'
- '{% else %}"{{ collection_name }}"{% endif %}.'
- '{% if people %}\nPeople: {{ people | join(", ") }}{% endif %}'
-)
+DEFAULT_TEMPLATE = '{{ event_type }}: "{{ collection_name }}"'
@dataclass
@@ -42,6 +38,15 @@ class TargetConfig:
class NotificationDispatcher:
"""Dispatches ServiceEvent notifications to configured targets."""
+ def __init__(
+ self,
+ *,
+ url_cache: TelegramFileCache | None = None,
+ asset_cache: TelegramFileCache | None = None,
+ ) -> None:
+ self._url_cache = url_cache
+ self._asset_cache = asset_cache
+
async def dispatch(
self,
event: ServiceEvent,
@@ -104,13 +109,17 @@ class NotificationDispatcher:
return {"success": False, "error": "Missing bot_token or chat_id"}
async with aiohttp.ClientSession() as session:
- client = TelegramClient(session, bot_token)
+ client = TelegramClient(
+ session, bot_token,
+ url_cache=self._url_cache,
+ asset_cache=self._asset_cache,
+ )
# Step 1: Send the text message first
text_result = await client.send_message(
chat_id=str(chat_id),
text=message,
- disable_web_page_preview=disable_preview or None,
+ disable_web_page_preview=bool(disable_preview),
)
if not text_result.get("success"):
return text_result
diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/client.py b/packages/core/src/notify_bridge_core/notifications/telegram/client.py
index 61d4dbf..e5ddd8d 100644
--- a/packages/core/src/notify_bridge_core/notifications/telegram/client.py
+++ b/packages/core/src/notify_bridge_core/notifications/telegram/client.py
@@ -16,8 +16,10 @@ from .media import (
TELEGRAM_API_BASE_URL,
TELEGRAM_MAX_PHOTO_SIZE,
TELEGRAM_MAX_VIDEO_SIZE,
+ asset_id_from_cache_key,
check_photo_limits,
extract_asset_id_from_url,
+ is_asset_cache_key,
is_asset_id,
split_media_by_upload_size,
)
@@ -61,16 +63,17 @@ class TelegramClient:
if is_asset_id(url):
thumbhash = self._thumbhash_resolver(url) if self._thumbhash_resolver else None
return self._asset_cache, url, thumbhash
- asset_id = extract_asset_id_from_url(url)
- if asset_id:
- thumbhash = self._thumbhash_resolver(asset_id) if self._thumbhash_resolver else None
- return self._asset_cache, asset_id, thumbhash
+ asset_cache_key = extract_asset_id_from_url(url)
+ if asset_cache_key:
+ bare_id = asset_id_from_cache_key(asset_cache_key)
+ thumbhash = self._thumbhash_resolver(bare_id) if self._thumbhash_resolver else None
+ return self._asset_cache, asset_cache_key, thumbhash
return self._url_cache, url, None
return None, None, None
def _get_cache_for_key(self, key: str, is_asset: bool | None = None) -> TelegramFileCache | None:
if is_asset is None:
- is_asset = is_asset_id(key)
+ is_asset = is_asset_cache_key(key)
return self._asset_cache if is_asset else self._url_cache
async def send_notification(
@@ -163,8 +166,8 @@ class TelegramClient:
}
if reply_to_message_id:
payload["reply_to_message_id"] = reply_to_message_id
- if disable_web_page_preview is not None:
- payload["disable_web_page_preview"] = disable_web_page_preview
+ if disable_web_page_preview:
+ payload["link_preview_options"] = {"is_disabled": True}
try:
async with self._session.post(telegram_url, json=payload) as response:
@@ -429,9 +432,10 @@ class TelegramClient:
# Check cache
ck = custom_cache_key or extract_asset_id_from_url(url) or url
- ck_is_asset = is_asset_id(ck)
+ ck_is_asset = is_asset_cache_key(ck)
item_cache = self._get_cache_for_key(ck, ck_is_asset)
- item_thumbhash = self._thumbhash_resolver(ck) if ck_is_asset and self._thumbhash_resolver else None
+ bare_ck = asset_id_from_cache_key(ck) if ck_is_asset else ck
+ item_thumbhash = self._thumbhash_resolver(bare_ck) if ck_is_asset and self._thumbhash_resolver else None
cached = item_cache.get(ck, thumbhash=item_thumbhash) if item_cache else None
if cached and cached.get("file_id"):
diff --git a/packages/core/src/notify_bridge_core/notifications/telegram/media.py b/packages/core/src/notify_bridge_core/notifications/telegram/media.py
index 2cbacae..4a1179a 100644
--- a/packages/core/src/notify_bridge_core/notifications/telegram/media.py
+++ b/packages/core/src/notify_bridge_core/notifications/telegram/media.py
@@ -4,6 +4,7 @@ from __future__ import annotations
import re
from typing import Final
+from urllib.parse import urlparse
# Telegram constants
TELEGRAM_API_BASE_URL: Final = "https://api.telegram.org/bot"
@@ -13,6 +14,8 @@ TELEGRAM_MAX_DIMENSION_SUM: Final = 10000
# Generic UUID pattern for asset IDs
_ASSET_ID_PATTERN = re.compile(r"^[a-f0-9-]{36}$")
+# Cache key: "host:uuid" or bare "uuid"
+_ASSET_CACHE_KEY_PATTERN = re.compile(r"^(?:[^:]+:)?[a-f0-9-]{36}$")
# URL patterns to extract asset IDs (generic enough for Immich-style URLs)
_ASSET_ID_URL_PATTERNS = [
@@ -26,14 +29,26 @@ def is_asset_id(value: str) -> bool:
return bool(_ASSET_ID_PATTERN.match(value))
+def is_asset_cache_key(value: str) -> bool:
+ """Check if a string is an asset cache key (bare UUID or host:UUID)."""
+ return bool(_ASSET_CACHE_KEY_PATTERN.match(value))
+
+
+def asset_id_from_cache_key(key: str) -> str:
+ """Extract bare asset ID from a cache key (strips host: prefix if present)."""
+ idx = key.find(":")
+ return key[idx + 1:] if idx != -1 else key
+
+
def extract_asset_id_from_url(url: str) -> str | None:
- """Extract asset ID from a URL if possible."""
+ """Extract host-qualified asset cache key (host:uuid) from a URL."""
if not url:
return None
for pattern in _ASSET_ID_URL_PATTERNS:
match = pattern.search(url)
if match:
- return match.group(1)
+ host = urlparse(url).hostname or ""
+ return f"{host}:{match.group(1)}" if host else match.group(1)
return None
diff --git a/packages/core/src/notify_bridge_core/providers/immich/change_detector.py b/packages/core/src/notify_bridge_core/providers/immich/change_detector.py
index 6ccb31a..bb94c8f 100644
--- a/packages/core/src/notify_bridge_core/providers/immich/change_detector.py
+++ b/packages/core/src/notify_bridge_core/providers/immich/change_detector.py
@@ -47,14 +47,27 @@ def _asset_to_media(asset: ImmichAssetInfo, external_url: str) -> MediaAsset:
)
+def _make_base_extra(new_album: ImmichAlbumData, external_url: str) -> dict:
+ """Build the common extra dict for album events."""
+ return {
+ "album_url": f"{external_url}/albums/{new_album.id}",
+ "people": list(new_album.people),
+ "shared": new_album.shared,
+ "photo_count": new_album.photo_count,
+ "video_count": new_album.video_count,
+ "asset_count": new_album.asset_count,
+ "owner": new_album.owner,
+ }
+
+
def detect_album_changes(
old_album: ImmichAlbumData,
new_album: ImmichAlbumData,
pending_asset_ids: set[str],
provider_name: str,
external_url: str,
-) -> tuple[ServiceEvent | None, set[str]]:
- """Detect changes between two album states, producing a generic ServiceEvent.
+) -> tuple[list[ServiceEvent], set[str]]:
+ """Detect changes between two album states, producing generic ServiceEvents.
Args:
old_album: Previous album data
@@ -64,7 +77,7 @@ def detect_album_changes(
external_url: External URL for building asset URLs
Returns:
- Tuple of (ServiceEvent or None, updated pending_asset_ids)
+ Tuple of (list of ServiceEvents, updated pending_asset_ids)
"""
added_ids = new_album.asset_ids - old_album.asset_ids
removed_ids = old_album.asset_ids - new_album.asset_ids
@@ -97,47 +110,76 @@ def detect_album_changes(
sharing_changed = old_album.shared != new_album.shared
if not added_assets and not removed_ids and not name_changed and not sharing_changed:
- return None, pending
+ return [], pending
- # Determine event type
- if name_changed and not added_assets and not removed_ids and not sharing_changed:
- event_type = EventType.COLLECTION_RENAMED
- elif sharing_changed and not added_assets and not removed_ids and not name_changed:
- event_type = EventType.SHARING_CHANGED
- elif added_assets and not removed_ids and not name_changed and not sharing_changed:
- event_type = EventType.ASSETS_ADDED
- elif removed_ids and not added_assets and not name_changed and not sharing_changed:
- event_type = EventType.ASSETS_REMOVED
- else:
- event_type = EventType.ASSETS_ADDED # default for mixed changes
+ now = datetime.now(timezone.utc)
+ extra = _make_base_extra(new_album, external_url)
+ events: list[ServiceEvent] = []
- # Convert to generic MediaAssets
- media_assets = [_asset_to_media(a, external_url) for a in added_assets]
+ # Emit one event per change type detected
+ if added_assets:
+ media_assets = [_asset_to_media(a, external_url) for a in added_assets]
+ events.append(ServiceEvent(
+ event_type=EventType.ASSETS_ADDED,
+ provider_type=ServiceProviderType.IMMICH,
+ provider_name=provider_name,
+ collection_id=new_album.id,
+ collection_name=new_album.name,
+ timestamp=now,
+ added_assets=media_assets,
+ removed_asset_ids=[],
+ added_count=len(added_assets),
+ removed_count=0,
+ extra=dict(extra),
+ ))
- event = ServiceEvent(
- event_type=event_type,
- provider_type=ServiceProviderType.IMMICH,
- provider_name=provider_name,
- collection_id=new_album.id,
- collection_name=new_album.name,
- timestamp=datetime.now(timezone.utc),
- added_assets=media_assets,
- removed_asset_ids=list(removed_ids),
- added_count=len(added_assets),
- removed_count=len(removed_ids),
- old_name=old_album.name if name_changed else None,
- new_name=new_album.name if name_changed else None,
- old_shared=old_album.shared if sharing_changed else None,
- new_shared=new_album.shared if sharing_changed else None,
- extra={
- "album_url": f"{external_url}/albums/{new_album.id}",
- "people": list(new_album.people),
- "shared": new_album.shared,
- "photo_count": new_album.photo_count,
- "video_count": new_album.video_count,
- "asset_count": new_album.asset_count,
- "owner": new_album.owner,
- },
- )
+ if removed_ids:
+ events.append(ServiceEvent(
+ event_type=EventType.ASSETS_REMOVED,
+ provider_type=ServiceProviderType.IMMICH,
+ provider_name=provider_name,
+ collection_id=new_album.id,
+ collection_name=new_album.name,
+ timestamp=now,
+ added_assets=[],
+ removed_asset_ids=list(removed_ids),
+ added_count=0,
+ removed_count=len(removed_ids),
+ extra=dict(extra),
+ ))
- return event, pending
+ if name_changed:
+ events.append(ServiceEvent(
+ event_type=EventType.COLLECTION_RENAMED,
+ provider_type=ServiceProviderType.IMMICH,
+ provider_name=provider_name,
+ collection_id=new_album.id,
+ collection_name=new_album.name,
+ timestamp=now,
+ added_assets=[],
+ removed_asset_ids=[],
+ added_count=0,
+ removed_count=0,
+ old_name=old_album.name,
+ new_name=new_album.name,
+ extra=dict(extra),
+ ))
+
+ if sharing_changed:
+ events.append(ServiceEvent(
+ event_type=EventType.SHARING_CHANGED,
+ provider_type=ServiceProviderType.IMMICH,
+ provider_name=provider_name,
+ collection_id=new_album.id,
+ collection_name=new_album.name,
+ timestamp=now,
+ added_assets=[],
+ removed_asset_ids=[],
+ added_count=0,
+ removed_count=0,
+ old_shared=old_album.shared,
+ new_shared=new_album.shared,
+ extra=dict(extra),
+ ))
+
+ return events, pending
diff --git a/packages/core/src/notify_bridge_core/providers/immich/client.py b/packages/core/src/notify_bridge_core/providers/immich/client.py
index ef699cd..11433eb 100644
--- a/packages/core/src/notify_bridge_core/providers/immich/client.py
+++ b/packages/core/src/notify_bridge_core/providers/immich/client.py
@@ -30,6 +30,14 @@ class ImmichClient:
def url(self) -> str:
return self._url
+ @property
+ def external_domain(self) -> str | None:
+ return self._external_domain
+
+ @external_domain.setter
+ def external_domain(self, value: str | None) -> None:
+ self._external_domain = value
+
@property
def external_url(self) -> str:
if self._external_domain:
@@ -252,6 +260,79 @@ class ImmichClient:
pass
return []
+ async def search_metadata(
+ self,
+ query: str,
+ album_ids: list[str] | None = None,
+ limit: int = 10,
+ ) -> list[dict[str, Any]]:
+ payload: dict[str, Any] = {"originalFileName": query, "page": 1, "size": limit}
+ try:
+ async with self._session.post(
+ f"{self._url}/api/search/metadata",
+ headers=self._json_headers,
+ json=payload,
+ ) as response:
+ if response.status == 200:
+ data = await response.json()
+ items = data.get("assets", {}).get("items", [])
+ if album_ids:
+ tracked = set(album_ids)
+ items = [
+ a for a in items
+ if any(alb.get("id") in tracked for alb in a.get("albums", []))
+ ]
+ return items[:limit]
+ except aiohttp.ClientError:
+ pass
+ return []
+
+ async def search_by_person(
+ self, person_id: str, limit: int = 10
+ ) -> list[dict[str, Any]]:
+ try:
+ async with self._session.get(
+ f"{self._url}/api/people/{person_id}/assets",
+ headers=self._headers,
+ ) as response:
+ if response.status == 200:
+ data = await response.json()
+ return data[:limit] if isinstance(data, list) else []
+ except aiohttp.ClientError:
+ pass
+ return []
+
+ async def get_memories(
+ self,
+ date: str | None = None,
+ ) -> list[dict[str, Any]]:
+ """Fetch native Immich memories (On This Day).
+
+ Args:
+ date: ISO date string (e.g. "2026-03-20") to fetch memories for.
+ If None, Immich returns memories for today.
+
+ Returns a list of memory objects, each containing an ``assets`` list
+ with full ``AssetResponseDto`` items.
+ """
+ params: dict[str, str] = {}
+ if date:
+ params["for"] = date
+ try:
+ async with self._session.get(
+ f"{self._url}/api/memories",
+ headers=self._headers,
+ params=params,
+ ) as response:
+ if response.status == 200:
+ return await response.json()
+ _LOGGER.warning(
+ "Failed to fetch memories: HTTP %s", response.status
+ )
+ except aiohttp.ClientError as err:
+ _LOGGER.warning("Failed to fetch memories: %s", err)
+ return []
+
async def get_asset_thumbnail(self, asset_id: str, size: str = "preview") -> bytes | None:
try:
async with self._session.get(
diff --git a/packages/core/src/notify_bridge_core/providers/immich/provider.py b/packages/core/src/notify_bridge_core/providers/immich/provider.py
index 906e7c5..0a87ddf 100644
--- a/packages/core/src/notify_bridge_core/providers/immich/provider.py
+++ b/packages/core/src/notify_bridge_core/providers/immich/provider.py
@@ -134,7 +134,7 @@ class ImmichServiceProvider(ServiceProvider):
if ok:
await self._client.get_server_config()
if self._external_domain:
- self._client._external_domain = self._external_domain
+ self._client.external_domain = self._external_domain
self._users_cache = await self._client.get_users()
return ok
@@ -179,12 +179,12 @@ class ImmichServiceProvider(ServiceProvider):
old_album = _deserialize_album_state(album_id, prev)
pending = set(prev.get("pending_asset_ids", []))
- event, updated_pending = detect_album_changes(
+ detected_events, updated_pending = detect_album_changes(
old_album, album, pending, self._name, external_url
)
- if event:
- # Fetch shared links to enrich event with public_url
+ if detected_events:
+ # Fetch shared links to enrich events with public_url
shared_links = await self._client.get_shared_links(album_id)
public_link = None
protected_link = None
@@ -197,13 +197,13 @@ class ImmichServiceProvider(ServiceProvider):
break # prefer non-password link
ext_domain = self._external_domain or self._client.external_url
- if public_link:
- event.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
- elif protected_link:
- event.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
- # If no links, public_url stays absent — templates handle gracefully
+ for evt in detected_events:
+ if public_link:
+ evt.extra["public_url"] = f"{ext_domain}/share/{public_link.key}"
+ elif protected_link:
+ evt.extra["protected_url"] = f"{ext_domain}/share/{protected_link.key}"
- events.append(event)
+ events.extend(detected_events)
# Update state
state = _serialize_album_state(album)
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/en/assets_added.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/en/assets_added.jinja2
index c764da8..3fa0b59 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/en/assets_added.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/en/assets_added.jinja2
@@ -4,6 +4,9 @@
{%- if people %}
👤 {{ people | join(", ") }}
{%- endif %}
+{%- if public_url %}
+🔗
Album URL
+{%- endif %}
{%- if added_assets %}
{%- for asset in added_assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}
{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %}
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/en/memory_mode.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/en/memory_mode.jinja2
index 62d5984..5c0bce3 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/en/memory_mode.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/en/memory_mode.jinja2
@@ -1,4 +1,4 @@
📅 On this day:
{%- for asset in assets %}
- • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }} ({{ asset.created_at[:4] }})
-{%- endfor %}
+ • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}
{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %} ({{ asset.created_at[:4] }})
+{%- endfor %}
\ No newline at end of file
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/en/periodic_summary.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/en/periodic_summary.jinja2
index 2752d01..7d791dc 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/en/periodic_summary.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/en/periodic_summary.jinja2
@@ -1,5 +1,4 @@
📋 Tracked Albums Summary ({{ albums | length }} albums):
{%- for album in albums %}
- • {{ album.name }}: {{ album.asset_count }} assets
- {%- if album.url %} — {{ album.url }}{% endif %}
-{%- endfor %}
+ • {% if album.public_url %}
{{ album.name }}{% else %}{{ album.name }}{% endif %}: {{ album.asset_count }} assets
+{%- endfor %}
\ No newline at end of file
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2
index 4008258..3749dff 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/en/scheduled_assets.jinja2
@@ -1,4 +1,4 @@
-📸 Photos from "{{ album_name }}":
+📸 Photos from {% if public_url %}
{{ album_name }}{% else %}"{{ album_name }}"{% endif %}:
{%- for asset in assets %}
- • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }}
-{%- endfor %}
+ • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}
{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %}
+{%- endfor %}
\ No newline at end of file
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/ru/assets_added.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/ru/assets_added.jinja2
index a19ed1e..2b30ee0 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/ru/assets_added.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/ru/assets_added.jinja2
@@ -4,6 +4,9 @@
{%- if people %}
👤 {{ people | join(", ") }}
{%- endif %}
+{%- if public_url %}
+🔗
Ссылка на альбом
+{%- endif %}
{%- if added_assets %}
{%- for asset in added_assets %}
• {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}
{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %}
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/ru/memory_mode.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/ru/memory_mode.jinja2
index c1ecb71..222dfbe 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/ru/memory_mode.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/ru/memory_mode.jinja2
@@ -1,4 +1,4 @@
📅 В этот день:
{%- for asset in assets %}
- • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }} ({{ asset.created_at[:4] }})
-{%- endfor %}
+ • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}
{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %} ({{ asset.created_at[:4] }})
+{%- endfor %}
\ No newline at end of file
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/ru/periodic_summary.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/ru/periodic_summary.jinja2
index 26a9410..ac11f89 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/ru/periodic_summary.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/ru/periodic_summary.jinja2
@@ -1,5 +1,4 @@
📋 Сводка альбомов ({{ albums | length }}):
{%- for album in albums %}
- • {{ album.name }}: {{ album.asset_count }} файлов
- {%- if album.url %} — {{ album.url }}{% endif %}
-{%- endfor %}
+ • {% if album.public_url %}
{{ album.name }}{% else %}{{ album.name }}{% endif %}: {{ album.asset_count }} файлов
+{%- endfor %}
\ No newline at end of file
diff --git a/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2 b/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2
index 8665fb5..04f86df 100644
--- a/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2
+++ b/packages/core/src/notify_bridge_core/templates/defaults/ru/scheduled_assets.jinja2
@@ -1,4 +1,4 @@
-📸 Фото из "{{ album_name }}":
+📸 Фото из {% if public_url %}
{{ album_name }}{% else %}"{{ album_name }}"{% endif %}:
{%- for asset in assets %}
- • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {{ asset.filename }}
-{%- endfor %}
+ • {%- if asset.type == "VIDEO" %} 🎬{% else %} 🖼️{% endif %} {% if asset.public_url %}
{{ asset.filename }}{% else %}{{ asset.filename }}{% endif %}
+{%- endfor %}
\ No newline at end of file
diff --git a/packages/server/pyproject.toml b/packages/server/pyproject.toml
index 5df2fa2..34b2ee4 100644
--- a/packages/server/pyproject.toml
+++ b/packages/server/pyproject.toml
@@ -8,7 +8,7 @@ version = "0.1.0"
description = "Standalone Notify Bridge server — FastAPI REST API with SQLite database"
requires-python = ">=3.12"
dependencies = [
- "notify-bridge-core==0.1.0",
+ "notify-bridge-core>=0.1.0",
"fastapi>=0.115",
"uvicorn[standard]>=0.32",
"sqlmodel>=0.0.22",
@@ -18,7 +18,6 @@ dependencies = [
"apscheduler>=3.10,<4",
"aiohttp>=3.9",
"pydantic-settings>=2.0",
- "anthropic>=0.42",
]
[project.optional-dependencies]
diff --git a/packages/server/src/notify_bridge_server/api/app_settings.py b/packages/server/src/notify_bridge_server/api/app_settings.py
new file mode 100644
index 0000000..dba03f4
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/api/app_settings.py
@@ -0,0 +1,123 @@
+"""App-level settings API (admin only)."""
+
+import logging
+import os
+
+from fastapi import APIRouter, Depends
+from pydantic import BaseModel
+from sqlmodel import select
+from sqlmodel.ext.asyncio.session import AsyncSession
+
+from ..auth.dependencies import get_current_user
+from ..database.engine import get_session
+from ..database.models import AppSetting, TelegramBot, User
+
+_LOGGER = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/api/settings", tags=["settings"])
+
+# Keys exposed to the frontend with their env-var fallbacks
+_SETTING_KEYS = {
+ "external_url": "NOTIFY_BRIDGE_EXTERNAL_URL",
+ "telegram_webhook_secret": "NOTIFY_BRIDGE_TELEGRAM_WEBHOOK_SECRET",
+ "telegram_cache_ttl_hours": None, # no env fallback, default 48
+}
+
+_DEFAULTS = {
+ "external_url": "",
+ "telegram_webhook_secret": "",
+ "telegram_cache_ttl_hours": "48",
+}
+
+
+async def get_setting(session: AsyncSession, key: str) -> str:
+ """Read a setting from DB, falling back to env var then default."""
+ row = await session.get(AppSetting, key)
+ if row and row.value:
+ return row.value
+ env_key = _SETTING_KEYS.get(key)
+ if env_key:
+ env_val = os.environ.get(env_key, "")
+ if env_val:
+ return env_val
+ return _DEFAULTS.get(key, "")
+
+
+class SettingsUpdate(BaseModel):
+ external_url: str | None = None
+ telegram_webhook_secret: str | None = None
+ telegram_cache_ttl_hours: str | None = None
+
+
+@router.get("")
+async def get_settings(
+ user: User = Depends(get_current_user),
+ session: AsyncSession = Depends(get_session),
+):
+ """Return all app settings."""
+ result = {}
+ for key in _SETTING_KEYS:
+ result[key] = await get_setting(session, key)
+ return result
+
+
+@router.put("")
+async def update_settings(
+ body: SettingsUpdate,
+ user: User = Depends(get_current_user),
+ session: AsyncSession = Depends(get_session),
+):
+ """Update app settings (admin). Re-registers webhooks when base URL changes."""
+ old_base_url = await get_setting(session, "external_url")
+ old_secret = await get_setting(session, "telegram_webhook_secret")
+
+ for key in _SETTING_KEYS:
+ value = getattr(body, key, None)
+ if value is None:
+ continue
+ row = await session.get(AppSetting, key)
+ if row:
+ row.value = value
+ else:
+ row = AppSetting(key=key, value=value)
+ session.add(row)
+ await session.commit()
+
+ new_base_url = await get_setting(session, "external_url")
+ new_secret = await get_setting(session, "telegram_webhook_secret")
+
+ # Update webhook secret in the webhook handler module
+ if new_secret != old_secret:
+ from ..commands.webhook import set_webhook_secret
+ set_webhook_secret(new_secret or None)
+
+ # Re-register webhooks for all bots in webhook mode if URL or secret changed
+ if new_base_url and (new_base_url != old_base_url or new_secret != old_secret):
+ await _reregister_webhooks(session, new_base_url, new_secret)
+
+ result = {}
+ for key in _SETTING_KEYS:
+ result[key] = await get_setting(session, key)
+ return result
+
+
+async def _reregister_webhooks(
+ session: AsyncSession, base_url: str, secret: str
+) -> None:
+ """Re-register webhooks for all bots in webhook mode."""
+ from ..commands.webhook import register_webhook
+
+ result = await session.exec(
+ select(TelegramBot).where(TelegramBot.update_mode == "webhook")
+ )
+ bots = result.all()
+ for bot in bots:
+ webhook_url = f"{base_url.rstrip('/')}/api/telegram/webhook/{bot.webhook_path_id}"
+ res = await register_webhook(bot.token, webhook_url, secret or None)
+ if res.get("success"):
+ _LOGGER.info("Re-registered webhook for bot %d (%s)", bot.id, bot.name)
+ else:
+ _LOGGER.warning(
+ "Failed to re-register webhook for bot %d: %s",
+ bot.id, res.get("error"),
+ )
diff --git a/packages/server/src/notify_bridge_server/api/providers.py b/packages/server/src/notify_bridge_server/api/providers.py
index 1a1c3f9..a3215b9 100644
--- a/packages/server/src/notify_bridge_server/api/providers.py
+++ b/packages/server/src/notify_bridge_server/api/providers.py
@@ -1,5 +1,7 @@
"""Service provider management API routes."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
@@ -11,6 +13,9 @@ import aiohttp
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import ServiceProvider, User
+from ..services import make_immich_provider
+
+_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/providers", tags=["providers"])
@@ -63,11 +68,8 @@ async def create_provider(
config = body.config
async with aiohttp.ClientSession() as http_session:
immich = ImmichServiceProvider(
- http_session,
- config.get("url", ""),
- config.get("api_key", ""),
- config.get("external_domain"),
- body.name,
+ http_session, config.get("url", ""), config.get("api_key", ""),
+ config.get("external_domain"), body.name,
)
test_result = await immich.test_connection()
if not test_result.get("ok"):
@@ -124,16 +126,8 @@ async def update_provider(
# Re-validate connection when config changes for known provider types
if config_changed and provider.type == "immich":
try:
- from notify_bridge_core.providers.immich import ImmichServiceProvider
- config = provider.config
async with aiohttp.ClientSession() as http_session:
- immich = ImmichServiceProvider(
- http_session,
- config.get("url", ""),
- config.get("api_key", ""),
- config.get("external_domain"),
- provider.name,
- )
+ immich = make_immich_provider(http_session, provider)
test_result = await immich.test_connection()
if not test_result.get("ok"):
raise HTTPException(
@@ -176,16 +170,8 @@ async def test_provider(
provider = await _get_user_provider(session, provider_id, user.id)
if provider.type == "immich":
- from notify_bridge_core.providers.immich import ImmichServiceProvider
- config = provider.config
async with aiohttp.ClientSession() as http_session:
- immich = ImmichServiceProvider(
- http_session,
- config.get("url", ""),
- config.get("api_key", ""),
- config.get("external_domain"),
- provider.name,
- )
+ immich = make_immich_provider(http_session, provider)
return await immich.test_connection()
return {"ok": False, "message": f"Unknown provider type: {provider.type}"}
@@ -201,16 +187,8 @@ async def list_collections(
provider = await _get_user_provider(session, provider_id, user.id)
if provider.type == "immich":
- from notify_bridge_core.providers.immich import ImmichServiceProvider
- config = provider.config
async with aiohttp.ClientSession() as http_session:
- immich = ImmichServiceProvider(
- http_session,
- config.get("url", ""),
- config.get("api_key", ""),
- config.get("external_domain"),
- provider.name,
- )
+ immich = make_immich_provider(http_session, provider)
return await immich.list_collections()
return []
@@ -227,16 +205,8 @@ async def get_album_shared_links(
provider = await _get_user_provider(session, provider_id, user.id)
if provider.type == "immich":
- from notify_bridge_core.providers.immich import ImmichServiceProvider
- config = provider.config
async with aiohttp.ClientSession() as http_session:
- immich = ImmichServiceProvider(
- http_session,
- config.get("url", ""),
- config.get("api_key", ""),
- config.get("external_domain"),
- provider.name,
- )
+ immich = make_immich_provider(http_session, provider)
links = await immich.client.get_shared_links(album_id)
return [
{
@@ -263,16 +233,8 @@ async def create_album_shared_link(
provider = await _get_user_provider(session, provider_id, user.id)
if provider.type == "immich":
- from notify_bridge_core.providers.immich import ImmichServiceProvider
- config = provider.config
async with aiohttp.ClientSession() as http_session:
- immich = ImmichServiceProvider(
- http_session,
- config.get("url", ""),
- config.get("api_key", ""),
- config.get("external_domain"),
- provider.name,
- )
+ immich = make_immich_provider(http_session, provider)
success = await immich.client.create_shared_link(album_id)
if success:
return {"success": True}
diff --git a/packages/server/src/notify_bridge_server/api/status.py b/packages/server/src/notify_bridge_server/api/status.py
index 34e1d36..c8a4cf3 100644
--- a/packages/server/src/notify_bridge_server/api/status.py
+++ b/packages/server/src/notify_bridge_server/api/status.py
@@ -108,7 +108,7 @@ async def get_event_chart(
select(
day_col.label("day"),
EventLog.event_type,
- func.count().label("count"),
+ func.count().label("total"),
)
.join(Tracker, EventLog.tracker_id == Tracker.id)
.where(Tracker.user_id == user.id, EventLog.created_at >= cutoff)
@@ -118,13 +118,13 @@ async def get_event_chart(
rows = (await session.exec(query)).all()
- # Build a dict: { "2026-03-15": { "assets_added": 3, ... }, ... }
+ # Build a dict: { "2026-03-15": { "assets_added": 18, ... }, ... }
by_day: dict[str, dict[str, int]] = {}
for row in rows:
day_str = str(row.day)
if day_str not in by_day:
by_day[day_str] = {}
- by_day[day_str][row.event_type] = row.count
+ by_day[day_str][row.event_type] = row.total
# Fill in missing days so the frontend gets a continuous series
result = []
diff --git a/packages/server/src/notify_bridge_server/api/targets.py b/packages/server/src/notify_bridge_server/api/targets.py
index 0ec78b5..c32d46b 100644
--- a/packages/server/src/notify_bridge_server/api/targets.py
+++ b/packages/server/src/notify_bridge_server/api/targets.py
@@ -1,5 +1,7 @@
"""Notification target management API routes."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlmodel import select
@@ -9,6 +11,9 @@ from typing import Any
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import NotificationTarget, TelegramBot, TelegramChat, TrackerTarget, User
+from ..services.notifier import send_test_notification
+
+_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/targets", tags=["targets"])
@@ -137,7 +142,6 @@ async def test_target(
):
"""Send a test notification to a target."""
target = await _get_user_target(session, target_id, user.id)
- from ..services.notifier import send_test_notification
result = await send_test_notification(target, locale=locale)
return result
diff --git a/packages/server/src/notify_bridge_server/api/telegram_bots.py b/packages/server/src/notify_bridge_server/api/telegram_bots.py
index 20a66c4..400c6c9 100644
--- a/packages/server/src/notify_bridge_server/api/telegram_bots.py
+++ b/packages/server/src/notify_bridge_server/api/telegram_bots.py
@@ -1,5 +1,7 @@
"""Telegram bot management API routes."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlmodel import select
@@ -10,8 +12,15 @@ import aiohttp
from notify_bridge_core.notifications.telegram.media import TELEGRAM_API_BASE_URL
from ..auth.dependencies import get_current_user
+from ..commands.handler import register_commands_with_telegram
+from ..commands.webhook import register_webhook, unregister_webhook
from ..database.engine import get_session
-from ..database.models import TelegramBot, TelegramChat, User
+from ..database.models import AppSetting, TelegramBot, TelegramChat, User
+from ..services.notifier import _get_test_message
+from ..services.telegram_poller import schedule_bot_polling, unschedule_bot_polling
+from .app_settings import get_setting
+
+_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/telegram-bots", tags=["telegram-bots"])
@@ -24,6 +33,8 @@ class BotCreate(BaseModel):
class BotUpdate(BaseModel):
name: str | None = None
icon: str | None = None
+ update_mode: str | None = None
+ commands_config: dict | None = None
@router.get("")
@@ -69,12 +80,41 @@ async def update_bot(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
- """Update a bot's display name and icon."""
+ """Update a bot's display name, icon, commands config, and update mode."""
bot = await _get_user_bot(session, bot_id, user.id)
if body.name is not None:
bot.name = body.name
if body.icon is not None:
bot.icon = body.icon
+ if body.commands_config is not None:
+ bot.commands_config = body.commands_config
+
+ # Handle mode switching
+ if body.update_mode is not None and body.update_mode != bot.update_mode:
+ if body.update_mode == "webhook":
+ # Validate and register webhook BEFORE stopping polling
+ base_url = await get_setting(session, "external_url")
+ if not base_url:
+ raise HTTPException(
+ status_code=400,
+ detail="Cannot switch to webhook: external domain URL not configured. Set it in Settings.",
+ )
+ webhook_url = f"{base_url.rstrip('/')}/api/telegram/webhook/{bot.webhook_path_id}"
+ secret = await get_setting(session, "telegram_webhook_secret")
+ result = await register_webhook(bot.token, webhook_url, secret or None)
+ if not result.get("success"):
+ raise HTTPException(
+ status_code=400,
+ detail=f"Webhook registration failed: {result.get('error', 'unknown')}",
+ )
+ # Webhook registered successfully — now stop polling
+ unschedule_bot_polling(bot.id)
+ elif body.update_mode == "polling":
+ # Switching to polling: unregister webhook, start polling
+ await unregister_webhook(bot.token)
+ schedule_bot_polling(bot.id)
+ bot.update_mode = body.update_mode
+
session.add(bot)
await session.commit()
await session.refresh(bot)
@@ -173,6 +213,75 @@ async def discover_chats(
return [_chat_response(c) for c in result.all()]
+@router.post("/{bot_id}/sync-commands")
+async def sync_commands(
+ bot_id: int,
+ user: User = Depends(get_current_user),
+ session: AsyncSession = Depends(get_session),
+):
+ """Register enabled commands with Telegram BotFather API."""
+ bot = await _get_user_bot(session, bot_id, user.id)
+ success = await register_commands_with_telegram(bot)
+ return {"success": success}
+
+
+@router.post("/{bot_id}/webhook/register")
+async def register_bot_webhook(
+ bot_id: int,
+ user: User = Depends(get_current_user),
+ session: AsyncSession = Depends(get_session),
+):
+ """Register Telegram webhook for this bot."""
+ bot = await _get_user_bot(session, bot_id, user.id)
+ base_url = await get_setting(session, "external_url")
+ if not base_url:
+ return {"success": False, "error": "External domain URL not configured. Set it in Telegram Settings."}
+ webhook_url = f"{base_url.rstrip('/')}/api/telegram/webhook/{bot.webhook_path_id}"
+ secret = await get_setting(session, "telegram_webhook_secret")
+ result = await register_webhook(bot.token, webhook_url, secret or None)
+ if not result.get("success"):
+ return result
+ # Verify with getWebhookInfo
+ info = await _get_webhook_info(bot.token)
+ if info and info.get("url") == webhook_url:
+ result["verified"] = True
+ result["webhook_url"] = webhook_url
+ else:
+ result["verified"] = False
+ result["warning"] = "Webhook set but verification failed"
+ return result
+
+
+@router.post("/{bot_id}/webhook/unregister")
+async def unregister_bot_webhook(
+ bot_id: int,
+ user: User = Depends(get_current_user),
+ session: AsyncSession = Depends(get_session),
+):
+ """Unregister Telegram webhook for this bot."""
+ bot = await _get_user_bot(session, bot_id, user.id)
+ result = await unregister_webhook(bot.token)
+ return result
+
+
+@router.get("/{bot_id}/webhook/status")
+async def get_webhook_status(
+ bot_id: int,
+ user: User = Depends(get_current_user),
+ session: AsyncSession = Depends(get_session),
+):
+ """Get current webhook status from Telegram."""
+ bot = await _get_user_bot(session, bot_id, user.id)
+ info = await _get_webhook_info(bot.token)
+ return {
+ "url": info.get("url", "") if info else "",
+ "has_custom_certificate": info.get("has_custom_certificate", False) if info else False,
+ "pending_update_count": info.get("pending_update_count", 0) if info else 0,
+ "last_error_message": info.get("last_error_message", "") if info else "",
+ "last_error_date": info.get("last_error_date", 0) if info else 0,
+ }
+
+
@router.post("/{bot_id}/chats/{chat_id}/test")
async def test_chat(
bot_id: int,
@@ -182,8 +291,6 @@ async def test_chat(
session: AsyncSession = Depends(get_session),
):
"""Send a test message to a chat via the bot."""
- from ..services.notifier import _get_test_message
-
bot = await _get_user_bot(session, bot_id, user.id)
message = _get_test_message(locale, "telegram")
try:
@@ -222,6 +329,19 @@ async def delete_chat(
# --- Helpers ---
+async def _get_webhook_info(token: str) -> dict | None:
+ """Call Telegram getWebhookInfo to check current webhook state."""
+ try:
+ async with aiohttp.ClientSession() as http:
+ async with http.get(f"{TELEGRAM_API_BASE_URL}{token}/getWebhookInfo") as resp:
+ data = await resp.json()
+ if data.get("ok"):
+ return data.get("result", {})
+ except aiohttp.ClientError:
+ pass
+ return None
+
+
async def _get_me(token: str) -> dict | None:
"""Call Telegram getMe to validate token and get bot info."""
try:
@@ -281,6 +401,9 @@ def _bot_response(b: TelegramBot) -> dict:
"icon": b.icon,
"bot_username": b.bot_username,
"bot_id": b.bot_id,
+ "webhook_path_id": b.webhook_path_id,
+ "update_mode": b.update_mode or "polling",
+ "commands_config": b.commands_config or {},
"token_preview": f"{b.token[:8]}...{b.token[-4:]}" if len(b.token) > 12 else "***",
"created_at": b.created_at.isoformat(),
}
@@ -293,38 +416,6 @@ async def _get_user_bot(session: AsyncSession, bot_id: int, user_id: int) -> Tel
return bot
-async def save_chat_from_webhook(
- session: AsyncSession, bot_id: int, chat_data: dict
-) -> None:
- """Save or update a chat entry from an incoming webhook message.
- Called by the webhook handler to auto-persist chats.
- """
- chat_id = str(chat_data.get("id", ""))
- if not chat_id:
- return
-
- result = await session.exec(
- select(TelegramChat).where(
- TelegramChat.bot_id == bot_id,
- TelegramChat.chat_id == chat_id,
- )
- )
- existing = result.first()
-
- title = chat_data.get("title") or (
- chat_data.get("first_name", "") + (" " + chat_data.get("last_name", "")).strip()
- )
-
- if existing:
- existing.title = title
- existing.username = chat_data.get("username", existing.username)
- session.add(existing)
- else:
- session.add(TelegramChat(
- bot_id=bot_id,
- chat_id=chat_id,
- title=title,
- chat_type=chat_data.get("type", "private"),
- username=chat_data.get("username", ""),
- ))
+# Re-export for backward compatibility
+from ..services.telegram import save_chat_from_webhook # noqa: F401
diff --git a/packages/server/src/notify_bridge_server/api/template_configs.py b/packages/server/src/notify_bridge_server/api/template_configs.py
index 88613ff..efbdb8c 100644
--- a/packages/server/src/notify_bridge_server/api/template_configs.py
+++ b/packages/server/src/notify_bridge_server/api/template_configs.py
@@ -1,5 +1,7 @@
"""Template configuration CRUD API routes."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
@@ -11,103 +13,12 @@ from jinja2 import TemplateSyntaxError, UndefinedError, StrictUndefined
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TemplateConfig, User
+from ..services.sample_context import _SAMPLE_CONTEXT
+
+_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/template-configs", tags=["template-configs"])
-# Sample asset matching what build_asset_detail() actually returns
-_SAMPLE_ASSET = {
- "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
- "filename": "IMG_001.jpg",
- "type": "IMAGE",
- "created_at": "2026-03-19T10:30:00",
- "owner": "Alice",
- "owner_id": "user-uuid-1",
- "description": "Family picnic",
- "people": ["Alice", "Bob"],
- "is_favorite": True,
- "rating": 5,
- "latitude": 48.8566,
- "longitude": 2.3522,
- "city": "Paris",
- "state": "Ile-de-France",
- "country": "France",
- "url": "https://immich.example.com/photos/abc123",
- "public_url": "https://immich.example.com/share/abc123/photos/a1b2c3d4-e5f6-7890-abcd-ef1234567890",
- "download_url": "https://immich.example.com/api/assets/abc123/original",
- "photo_url": "https://immich.example.com/api/assets/abc123/thumbnail",
-}
-
-_SAMPLE_VIDEO_ASSET = {
- **_SAMPLE_ASSET,
- "id": "d4e5f6a7-b8c9-0123-defg-456789abcdef",
- "filename": "VID_002.mp4",
- "type": "VIDEO",
- "is_favorite": False,
- "rating": None,
- "photo_url": None,
- "public_url": "https://immich.example.com/share/abc123/photos/d4e5f6a7-b8c9-0123-defg-456789abcdef",
- "playback_url": "https://immich.example.com/api/assets/def456/video",
-}
-
-_SAMPLE_COLLECTION = {
- "name": "Family Photos",
- "url": "https://immich.example.com/share/abc123",
- "public_url": "https://immich.example.com/share/abc123",
- "asset_count": 42,
- "shared": True,
-}
-
-# Full context covering ALL possible template variables
-_SAMPLE_CONTEXT = {
- # Core event fields (always present)
- "collection_id": "b2eeeaa4-bba0-477a-a06f-5cb9e21818e8",
- "collection_name": "Family Photos",
- "collection_url": "https://immich.example.com/share/abc123",
- "event_type": "assets_added",
- "timestamp": "2026-03-19T10:30:00+00:00",
- "service_name": "Immich",
- "service_type": "immich",
- # Immich aliases (always present alongside collection_*)
- "album_name": "Family Photos",
- "album_id": "b2eeeaa4-bba0-477a-a06f-5cb9e21818e8",
- "album_url": "https://immich.example.com/share/abc123",
- "old_album_name": "Old Album",
- "new_album_name": "New Album",
- "change_type": "assets_added",
- "added_count": 3,
- "removed_count": 1,
- "added_assets": [_SAMPLE_ASSET, _SAMPLE_VIDEO_ASSET],
- "removed_assets": ["asset-id-1", "asset-id-2"],
- "people": ["Alice", "Bob"],
- "shared": True,
- "target_type": "telegram",
- "has_videos": True,
- "has_photos": True,
- # Rename fields (always present, empty for non-rename events)
- "old_name": "Old Album",
- "new_name": "New Album",
- "old_shared": False,
- "new_shared": True,
- # Public share URLs (may be empty if no shared link exists)
- "public_url": "https://immich.example.com/share/abc123",
- "protected_url": "",
- "album_url": "https://immich.example.com/albums/b2eeeaa4",
- # Common date/location (set when all assets share the same value)
- "common_date": "19.03.2026",
- "common_location": "Paris, France",
- # Date format strings (from template config)
- "date_format": "%d.%m.%Y, %H:%M UTC",
- "date_only_format": "%d.%m.%Y",
- # Scheduled/periodic variables (for those templates)
- "collections": [_SAMPLE_COLLECTION, {**_SAMPLE_COLLECTION, "name": "Vacation 2025", "asset_count": 120}],
- "albums": [_SAMPLE_COLLECTION, {**_SAMPLE_COLLECTION, "name": "Vacation 2025", "asset_count": 120}],
- "assets": [_SAMPLE_ASSET, {**_SAMPLE_ASSET, "id": "x1y2z3", "filename": "IMG_002.jpg", "city": "London", "country": "UK", "public_url": "https://immich.example.com/share/abc123/photos/x1y2z3"}],
- "date": "2026-03-19",
- "photo_count": 30,
- "video_count": 5,
- "owner": "Alice",
-}
-
class TemplateConfigCreate(BaseModel):
provider_type: str
@@ -335,6 +246,32 @@ async def preview_config(
raise HTTPException(status_code=400, detail=f"Template error: {e}")
+class DateFormatPreviewRequest(BaseModel):
+ date_format: str = "%d.%m.%Y, %H:%M UTC"
+ date_only_format: str = "%d.%m.%Y"
+
+
+@router.post("/preview-date-format")
+async def preview_date_format(
+ body: DateFormatPreviewRequest,
+ user: User = Depends(get_current_user),
+):
+ """Preview what date/datetime formats look like with sample data."""
+ from datetime import datetime, timezone
+ sample_dt = datetime(2026, 3, 19, 14, 30, 0, tzinfo=timezone.utc)
+ sample_date = datetime(2026, 3, 19)
+ result: dict[str, str | None] = {}
+ for key, fmt, sample in [
+ ("date_format", body.date_format, sample_dt),
+ ("date_only_format", body.date_only_format, sample_date),
+ ]:
+ try:
+ result[key] = sample.strftime(fmt)
+ except (ValueError, TypeError):
+ result[key] = None
+ return result
+
+
class PreviewRequest(BaseModel):
template: str
target_type: str = "telegram" # "telegram" or "webhook"
diff --git a/packages/server/src/notify_bridge_server/api/tracker_targets.py b/packages/server/src/notify_bridge_server/api/tracker_targets.py
index b4e9a78..75334ff 100644
--- a/packages/server/src/notify_bridge_server/api/tracker_targets.py
+++ b/packages/server/src/notify_bridge_server/api/tracker_targets.py
@@ -1,5 +1,6 @@
"""Tracker-Target link management API routes."""
+import logging
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query, status
@@ -18,6 +19,9 @@ from ..database.models import (
TrackingConfig,
User,
)
+from ..services.notifier import send_real_data_notification, send_test_notification
+
+_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/trackers/{tracker_id}/targets", tags=["tracker-targets"])
@@ -176,7 +180,6 @@ async def test_tracker_target(
raise HTTPException(status_code=404, detail="Target not found")
if test_type == "basic":
- from ..services.notifier import send_test_notification
r = await send_test_notification(target, locale=locale)
return {"target": target.name, **r}
@@ -199,8 +202,14 @@ async def test_tracker_target(
provider_config = dict(provider.config)
collection_ids = list(tracker.collection_ids or [])
+ # Load tracking config to get memory_source
+ memory_source = "albums"
+ if tt.tracking_config_id:
+ tracking_config = await session.get(TrackingConfig, tt.tracking_config_id)
+ if tracking_config:
+ memory_source = tracking_config.memory_source or "albums"
+
# Fetch real data from provider
- from ..services.notifier import send_real_data_notification
r = await send_real_data_notification(
target=target,
template_str=template_str,
@@ -209,7 +218,8 @@ async def test_tracker_target(
provider_config=provider_config,
collection_ids=collection_ids,
date_format=template_config.date_format if template_config else "%d.%m.%Y, %H:%M UTC",
- date_only_format=template_config.date_only_format if template_config and hasattr(template_config, "date_only_format") else "%d.%m.%Y",
+ date_only_format=template_config.date_only_format if template_config and template_config.date_only_format else "%d.%m.%Y",
+ memory_source=memory_source,
)
return {"target": target.name, **r}
diff --git a/packages/server/src/notify_bridge_server/api/trackers.py b/packages/server/src/notify_bridge_server/api/trackers.py
index 1a67301..887159b 100644
--- a/packages/server/src/notify_bridge_server/api/trackers.py
+++ b/packages/server/src/notify_bridge_server/api/trackers.py
@@ -1,5 +1,7 @@
"""Tracker management API routes."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlmodel import select
@@ -9,13 +11,17 @@ from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import (
EventLog,
- NotificationTarget,
ServiceProvider,
Tracker,
TrackerState,
TrackerTarget,
User,
)
+from ..services.scheduler import schedule_tracker, unschedule_tracker
+from ..services.watcher import check_tracker
+from .tracker_targets import _tt_response
+
+_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/trackers", tags=["trackers"])
@@ -66,7 +72,6 @@ async def create_tracker(
await session.commit()
await session.refresh(tracker)
if tracker.enabled:
- from ..services.scheduler import schedule_tracker
await schedule_tracker(tracker.id, tracker.scan_interval)
return await _tracker_response(session, tracker)
@@ -94,7 +99,6 @@ async def update_tracker(
session.add(tracker)
await session.commit()
await session.refresh(tracker)
- from ..services.scheduler import schedule_tracker, unschedule_tracker
if tracker.enabled:
await schedule_tracker(tracker.id, tracker.scan_interval)
else:
@@ -130,7 +134,6 @@ async def delete_tracker(
session.add(el)
await session.delete(tracker)
await session.commit()
- from ..services.scheduler import unschedule_tracker
await unschedule_tracker(tracker_id)
@@ -141,71 +144,10 @@ async def trigger_tracker(
session: AsyncSession = Depends(get_session),
):
tracker = await _get_user_tracker(session, tracker_id, user.id)
- from ..services.watcher import check_tracker
result = await check_tracker(tracker.id)
return {"triggered": True, "result": result}
-@router.post("/{tracker_id}/test-periodic")
-async def test_periodic(
- tracker_id: int,
- user: User = Depends(get_current_user),
- session: AsyncSession = Depends(get_session),
-):
- """Send a test periodic summary notification using actual templates."""
- from ..services.notifier import send_test_template_notification
- from ..database.models import TemplateConfig
- tracker = await _get_user_tracker(session, tracker_id, user.id)
- result = await session.exec(
- select(TrackerTarget).where(
- TrackerTarget.tracker_id == tracker.id,
- TrackerTarget.enabled == True,
- )
- )
- results = []
- for tt in result.all():
- target = await session.get(NotificationTarget, tt.target_id)
- if not target:
- continue
- template_config = None
- if tt.template_config_id:
- template_config = await session.get(TemplateConfig, tt.template_config_id)
- template_str = (template_config.periodic_summary_message if template_config else "") or ""
- r = await send_test_template_notification(target, "periodic_summary", template_str)
- results.append({"target": target.name, **r})
- return {"test": "periodic_summary", "results": results}
-
-
-@router.post("/{tracker_id}/test-memory")
-async def test_memory(
- tracker_id: int,
- user: User = Depends(get_current_user),
- session: AsyncSession = Depends(get_session),
-):
- """Send a test memory/on-this-day notification using actual templates."""
- from ..services.notifier import send_test_template_notification
- from ..database.models import TemplateConfig
- tracker = await _get_user_tracker(session, tracker_id, user.id)
- result = await session.exec(
- select(TrackerTarget).where(
- TrackerTarget.tracker_id == tracker.id,
- TrackerTarget.enabled == True,
- )
- )
- results = []
- for tt in result.all():
- target = await session.get(NotificationTarget, tt.target_id)
- if not target:
- continue
- template_config = None
- if tt.template_config_id:
- template_config = await session.get(TemplateConfig, tt.template_config_id)
- template_str = (template_config.memory_mode_message if template_config else "") or ""
- r = await send_test_template_notification(target, "memory_mode", template_str)
- results.append({"target": target.name, **r})
- return {"test": "memory_mode", "results": results}
-
-
@router.get("/{tracker_id}/history")
async def tracker_history(
tracker_id: int,
@@ -238,23 +180,7 @@ async def _tracker_response(session: AsyncSession, t: Tracker) -> dict:
result = await session.exec(
select(TrackerTarget).where(TrackerTarget.tracker_id == t.id)
)
- tracker_targets = []
- for tt in result.all():
- target = await session.get(NotificationTarget, tt.target_id)
- tracker_targets.append({
- "id": tt.id,
- "target_id": tt.target_id,
- "target_name": target.name if target else None,
- "target_type": target.type if target else None,
- "target_icon": target.icon if target else None,
- "tracking_config_id": tt.tracking_config_id,
- "template_config_id": tt.template_config_id,
- "enabled": tt.enabled,
- "quiet_hours_start": tt.quiet_hours_start,
- "quiet_hours_end": tt.quiet_hours_end,
- "commands_config": tt.commands_config,
- "created_at": tt.created_at.isoformat(),
- })
+ tracker_targets = [await _tt_response(session, tt) for tt in result.all()]
return {
"id": t.id,
diff --git a/packages/server/src/notify_bridge_server/api/tracking_configs.py b/packages/server/src/notify_bridge_server/api/tracking_configs.py
index b15349c..eb4c6cf 100644
--- a/packages/server/src/notify_bridge_server/api/tracking_configs.py
+++ b/packages/server/src/notify_bridge_server/api/tracking_configs.py
@@ -1,5 +1,7 @@
"""Tracking configuration CRUD API routes."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
@@ -9,6 +11,8 @@ from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TrackingConfig, User
+_LOGGER = logging.getLogger(__name__)
+
router = APIRouter(prefix="/api/tracking-configs", tags=["tracking-configs"])
@@ -43,6 +47,7 @@ class TrackingConfigCreate(BaseModel):
scheduled_order_by: str = "random"
scheduled_order: str = "descending"
memory_enabled: bool = False
+ memory_source: str = "albums"
memory_times: str = "09:00"
memory_collection_mode: str = "combined"
memory_limit: int = 10
@@ -81,6 +86,7 @@ class TrackingConfigUpdate(BaseModel):
scheduled_order_by: str | None = None
scheduled_order: str | None = None
memory_enabled: bool | None = None
+ memory_source: str | None = None
memory_times: str | None = None
memory_collection_mode: str | None = None
memory_limit: int | None = None
diff --git a/packages/server/src/notify_bridge_server/api/users.py b/packages/server/src/notify_bridge_server/api/users.py
index 65387fb..480e268 100644
--- a/packages/server/src/notify_bridge_server/api/users.py
+++ b/packages/server/src/notify_bridge_server/api/users.py
@@ -1,5 +1,7 @@
"""User management API routes (admin only)."""
+import logging
+
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
@@ -11,6 +13,8 @@ from ..auth.dependencies import require_admin
from ..database.engine import get_session
from ..database.models import User
+_LOGGER = logging.getLogger(__name__)
+
router = APIRouter(prefix="/api/users", tags=["users"])
diff --git a/packages/server/src/notify_bridge_server/commands/__init__.py b/packages/server/src/notify_bridge_server/commands/__init__.py
new file mode 100644
index 0000000..b40ff45
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/commands/__init__.py
@@ -0,0 +1 @@
+"""Telegram bot commands package."""
diff --git a/packages/server/src/notify_bridge_server/commands/__pycache__/__init__.cpython-313.pyc b/packages/server/src/notify_bridge_server/commands/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..6c43a2d
Binary files /dev/null and b/packages/server/src/notify_bridge_server/commands/__pycache__/__init__.cpython-313.pyc differ
diff --git a/packages/server/src/notify_bridge_server/commands/__pycache__/handler.cpython-313.pyc b/packages/server/src/notify_bridge_server/commands/__pycache__/handler.cpython-313.pyc
new file mode 100644
index 0000000..73ce61a
Binary files /dev/null and b/packages/server/src/notify_bridge_server/commands/__pycache__/handler.cpython-313.pyc differ
diff --git a/packages/server/src/notify_bridge_server/commands/__pycache__/parser.cpython-313.pyc b/packages/server/src/notify_bridge_server/commands/__pycache__/parser.cpython-313.pyc
new file mode 100644
index 0000000..2009d9c
Binary files /dev/null and b/packages/server/src/notify_bridge_server/commands/__pycache__/parser.cpython-313.pyc differ
diff --git a/packages/server/src/notify_bridge_server/commands/__pycache__/registry.cpython-313.pyc b/packages/server/src/notify_bridge_server/commands/__pycache__/registry.cpython-313.pyc
new file mode 100644
index 0000000..f9fc3f3
Binary files /dev/null and b/packages/server/src/notify_bridge_server/commands/__pycache__/registry.cpython-313.pyc differ
diff --git a/packages/server/src/notify_bridge_server/commands/__pycache__/webhook.cpython-313.pyc b/packages/server/src/notify_bridge_server/commands/__pycache__/webhook.cpython-313.pyc
new file mode 100644
index 0000000..523de8c
Binary files /dev/null and b/packages/server/src/notify_bridge_server/commands/__pycache__/webhook.cpython-313.pyc differ
diff --git a/packages/server/src/notify_bridge_server/commands/handler.py b/packages/server/src/notify_bridge_server/commands/handler.py
new file mode 100644
index 0000000..3764f84
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/commands/handler.py
@@ -0,0 +1,615 @@
+"""Telegram bot command handler — implements all /commands."""
+
+from __future__ import annotations
+
+import logging
+import random as rng
+import time
+from datetime import datetime, timezone
+from typing import Any
+
+import aiohttp
+from sqlmodel import select
+from sqlmodel.ext.asyncio.session import AsyncSession
+
+from notify_bridge_core.notifications.telegram.media import TELEGRAM_API_BASE_URL
+from ..database.engine import get_engine
+from ..services import make_immich_provider
+from ..database.models import (
+ EventLog,
+ NotificationTarget,
+ ServiceProvider,
+ TelegramBot,
+ Tracker,
+ TrackerTarget,
+ TrackingConfig,
+)
+from .parser import parse_command
+from .registry import COMMAND_DESCRIPTIONS, get_rate_category
+
+_LOGGER = logging.getLogger(__name__)
+
+# Rate limit state: { (bot_id, chat_id, category): last_used_timestamp }
+_rate_limits: dict[tuple[int, str, str], float] = {}
+
+
+def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int]) -> int | None:
+ """Check rate limit. Returns seconds to wait, or None if OK."""
+ category = get_rate_category(cmd)
+ cooldown = limits.get(category, limits.get("default", 10))
+ if cooldown <= 0:
+ return None
+ key = (bot_id, chat_id, category)
+ now = time.time()
+ last = _rate_limits.get(key, 0)
+ if now - last < cooldown:
+ return int(cooldown - (now - last)) + 1
+ _rate_limits[key] = now
+ return None
+
+
+async def handle_command(
+ bot: TelegramBot,
+ chat_id: str,
+ text: str,
+) -> str | list[dict[str, Any]] | None:
+ """Handle a bot command. Returns text response, media list, or None."""
+ cmd, args, count_override = parse_command(text)
+ if not cmd:
+ return None
+
+ config = bot.commands_config or {}
+ enabled = config.get("enabled", [])
+ default_count = min(config.get("default_count", 5), 20)
+ locale = config.get("locale", "en")
+ rate_limits = config.get("rate_limits", {})
+
+ if cmd == "start":
+ msgs = {
+ "en": "Hi! I'm your Notify Bridge bot. Use /help to see available commands.",
+ "ru": "Привет! Я бот Notify Bridge. Используйте /help для списка команд.",
+ }
+ return msgs.get(locale, msgs["en"])
+
+ if cmd not in enabled and cmd != "start":
+ return None # Silently ignore disabled commands
+
+ # Rate limit check
+ wait = _check_rate_limit(bot.id, chat_id, cmd, rate_limits)
+ if wait is not None:
+ msgs = {
+ "en": f"Please wait {wait}s before using this command again.",
+ "ru": f"Подождите {wait} сек. перед повторным использованием.",
+ }
+ return msgs.get(locale, msgs["en"])
+
+ count = min(count_override or default_count, 20)
+
+ # Dispatch
+ if cmd == "help":
+ return _cmd_help(enabled, locale)
+ if cmd == "status":
+ return await _cmd_status(bot, locale)
+ if cmd == "albums":
+ return await _cmd_albums(bot, locale)
+ if cmd == "events":
+ return await _cmd_events(bot, count, locale)
+ if cmd == "people":
+ return await _cmd_people(bot, locale)
+ if cmd in ("search", "find", "person", "place", "latest", "random",
+ "favorites", "summary", "memory"):
+ return await _cmd_immich(bot, cmd, args, count, locale)
+
+ return None
+
+
+def _cmd_help(enabled: list[str], locale: str) -> str:
+ lines = []
+ for cmd in enabled:
+ desc = COMMAND_DESCRIPTIONS.get(cmd, {})
+ lines.append(f"/{cmd} — {desc.get(locale, desc.get('en', ''))}")
+ header = {"en": "Available commands:", "ru": "Доступные команды:"}
+ return header.get(locale, header["en"]) + "\n" + "\n".join(lines)
+
+
+async def _get_bot_context(bot: TelegramBot) -> tuple[
+ list[Tracker], dict[int, ServiceProvider]
+]:
+ """Get trackers and providers associated with a bot via its targets."""
+ engine = get_engine()
+ async with AsyncSession(engine) as session:
+ # Find targets that use this bot's token
+ result = await session.exec(
+ select(NotificationTarget).where(
+ NotificationTarget.type == "telegram",
+ NotificationTarget.user_id == bot.user_id,
+ )
+ )
+ targets = result.all()
+ bot_target_ids = {t.id for t in targets if t.config.get("bot_token") == bot.token}
+
+ if not bot_target_ids:
+ return [], {}
+
+ # Find trackers linked to these targets via TrackerTarget
+ tt_result = await session.exec(
+ select(TrackerTarget).where(TrackerTarget.target_id.in_(bot_target_ids))
+ )
+ all_links = tt_result.all()
+ tracker_ids = {tt.tracker_id for tt in all_links}
+
+ if not tracker_ids:
+ return [], {}
+
+ trackers = []
+ provider_ids = set()
+ for tid in tracker_ids:
+ tracker = await session.get(Tracker, tid)
+ if tracker:
+ trackers.append(tracker)
+ provider_ids.add(tracker.provider_id)
+
+ providers_map: dict[int, ServiceProvider] = {}
+ for pid in provider_ids:
+ provider = await session.get(ServiceProvider, pid)
+ if provider:
+ providers_map[pid] = provider
+
+ return trackers, providers_map
+
+
+async def _check_native_memory(bot: TelegramBot) -> bool:
+ """Check if any tracker-target linked to this bot uses native memory source."""
+ engine = get_engine()
+ async with AsyncSession(engine) as session:
+ result = await session.exec(
+ select(NotificationTarget).where(
+ NotificationTarget.type == "telegram",
+ NotificationTarget.user_id == bot.user_id,
+ )
+ )
+ targets = result.all()
+ bot_target_ids = {t.id for t in targets if t.config.get("bot_token") == bot.token}
+ if not bot_target_ids:
+ return False
+ tt_result = await session.exec(
+ select(TrackerTarget).where(TrackerTarget.target_id.in_(bot_target_ids))
+ )
+ for tt in tt_result.all():
+ if tt.tracking_config_id:
+ tc = await session.get(TrackingConfig, tt.tracking_config_id)
+ if tc and tc.memory_source == "native":
+ return True
+ return False
+
+
+async def _cmd_status(bot: TelegramBot, locale: str) -> str:
+ trackers, _ = await _get_bot_context(bot)
+ active = sum(1 for t in trackers if t.enabled)
+ total = len(trackers)
+ total_albums = sum(len(t.collection_ids or []) for t in trackers)
+
+ engine = get_engine()
+ async with AsyncSession(engine) as session:
+ result = await session.exec(
+ select(EventLog).order_by(EventLog.created_at.desc()).limit(1)
+ )
+ last_event = result.first()
+ last_str = last_event.created_at.strftime("%Y-%m-%d %H:%M") if last_event else "-"
+
+ if locale == "ru":
+ return (
+ f"📊 Статус\n"
+ f"Трекеры: {active}/{total} активных\n"
+ f"Альбомы: {total_albums}\n"
+ f"Последнее событие: {last_str}"
+ )
+ return (
+ f"📊 Status\n"
+ f"Trackers: {active}/{total} active\n"
+ f"Albums: {total_albums}\n"
+ f"Last event: {last_str}"
+ )
+
+
+async def _cmd_albums(bot: TelegramBot, locale: str) -> str:
+ trackers, providers_map = await _get_bot_context(bot)
+ if not trackers:
+ return "No tracked albums." if locale == "en" else "Нет отслеживаемых альбомов."
+
+ lines = []
+ async with aiohttp.ClientSession() as http:
+ for tracker in trackers:
+ provider = providers_map.get(tracker.provider_id)
+ if not provider or provider.type != "immich":
+ continue
+ immich = make_immich_provider(http, provider)
+ for album_id in (tracker.collection_ids or []):
+ try:
+ album = await immich.client.get_album(album_id)
+ if album:
+ lines.append(f" • {album.name} ({album.asset_count} assets)")
+ except Exception:
+ lines.append(f" • {album_id[:8]}... (error)")
+
+ header = "📚 Tracked albums:" if locale == "en" else "📚 Отслеживаемые альбомы:"
+ return header + "\n" + "\n".join(lines) if lines else header + "\n (none)"
+
+
+async def _cmd_events(bot: TelegramBot, count: int, locale: str) -> str:
+ trackers, _ = await _get_bot_context(bot)
+ tracker_ids = [t.id for t in trackers]
+ if not tracker_ids:
+ return "No events." if locale == "en" else "Нет событий."
+
+ engine = get_engine()
+ async with AsyncSession(engine) as session:
+ result = await session.exec(
+ select(EventLog)
+ .where(EventLog.tracker_id.in_(tracker_ids))
+ .order_by(EventLog.created_at.desc())
+ .limit(count)
+ )
+ events = result.all()
+
+ if not events:
+ return "No events yet." if locale == "en" else "Пока нет событий."
+
+ header = f"📋 Last {len(events)} events:" if locale == "en" else f"📋 Последние {len(events)} событий:"
+ lines = []
+ for e in events:
+ ts = e.created_at.strftime("%m/%d %H:%M")
+ lines.append(f" {ts} — {e.event_type}: {e.collection_name}")
+ return header + "\n" + "\n".join(lines)
+
+
+async def _cmd_people(bot: TelegramBot, locale: str) -> str:
+ _, providers_map = await _get_bot_context(bot)
+ all_people: dict[str, str] = {}
+
+ async with aiohttp.ClientSession() as http:
+ for provider in providers_map.values():
+ if provider.type != "immich":
+ continue
+ immich = make_immich_provider(http, provider)
+ people = await immich.client.get_people()
+ all_people.update(people)
+
+ if not all_people:
+ return "No people detected." if locale == "en" else "Люди не обнаружены."
+
+ names = sorted(all_people.values())
+ header = f"👥 {len(names)} people:" if locale == "en" else f"👥 {len(names)} людей:"
+ return header + "\n" + ", ".join(names)
+
+
+async def _cmd_immich(
+ bot: TelegramBot, cmd: str, args: str, count: int, locale: str,
+) -> str | list[dict[str, Any]]:
+ """Handle commands that need Immich API access and may return media."""
+ trackers, providers_map = await _get_bot_context(bot)
+ if not trackers:
+ return "No trackers configured." if locale == "en" else "Трекеры не настроены."
+
+ all_album_ids: list[str] = []
+ for t in trackers:
+ all_album_ids.extend(t.collection_ids or [])
+
+ first_tracker = trackers[0]
+ provider = providers_map.get(first_tracker.provider_id)
+ if not provider or provider.type != "immich":
+ return "Server not found." if locale == "en" else "Сервер не найден."
+
+ config = bot.commands_config or {}
+ response_mode = config.get("response_mode", "media")
+ async with aiohttp.ClientSession() as http:
+ immich = make_immich_provider(http, provider)
+ client = immich.client
+
+ if cmd == "search":
+ if not args:
+ return "Usage: /search
" if locale == "en" else "Использование: /search <запрос>"
+ assets = await client.search_smart(args, album_ids=all_album_ids, limit=count)
+ return _format_assets(assets, cmd, args, locale, response_mode, client)
+
+ if cmd == "find":
+ if not args:
+ return "Usage: /find " if locale == "en" else "Использование: /find <текст>"
+ assets = await client.search_metadata(args, album_ids=all_album_ids, limit=count)
+ return _format_assets(assets, cmd, args, locale, response_mode, client)
+
+ if cmd == "person":
+ if not args:
+ return "Usage: /person " if locale == "en" else "Использование: /person <имя>"
+ people = await client.get_people()
+ person_id = None
+ for pid, pname in people.items():
+ if args.lower() in pname.lower():
+ person_id = pid
+ break
+ if not person_id:
+ return f"Person '{args}' not found." if locale == "en" else f"Человек '{args}' не найден."
+ assets = await client.search_by_person(person_id, limit=count)
+ return _format_assets(assets, cmd, args, locale, response_mode, client)
+
+ if cmd == "place":
+ if not args:
+ return "Usage: /place " if locale == "en" else "Использование: /place <место>"
+ assets = await client.search_smart(
+ f"photos taken in {args}", album_ids=all_album_ids, limit=count
+ )
+ return _format_assets(assets, cmd, args, locale, response_mode, client)
+
+ if cmd == "favorites":
+ fav_assets: list[dict[str, Any]] = []
+ for album_id in all_album_ids[:10]:
+ try:
+ album = await client.get_album(album_id)
+ if album:
+ for aid, asset in list(album.assets.items())[:50]:
+ if asset.is_favorite and len(fav_assets) < count:
+ fav_assets.append({
+ "id": asset.id, "originalFileName": asset.filename,
+ "type": asset.type,
+ })
+ except Exception:
+ pass
+ if len(fav_assets) >= count:
+ break
+ return _format_assets(fav_assets, cmd, "", locale, response_mode, client)
+
+ if cmd == "latest":
+ latest_assets: list[dict[str, Any]] = []
+ for album_id in all_album_ids[:10]:
+ try:
+ album = await client.get_album(album_id)
+ if album:
+ for aid, asset in list(album.assets.items())[:count]:
+ latest_assets.append({
+ "id": asset.id, "originalFileName": asset.filename,
+ "type": asset.type, "createdAt": asset.created_at,
+ })
+ except Exception:
+ pass
+ latest_assets.sort(key=lambda a: a.get("createdAt", ""), reverse=True)
+ return _format_assets(latest_assets[:count], cmd, "", locale, response_mode, client)
+
+ if cmd == "random":
+ random_assets: list[dict[str, Any]] = []
+ for album_id in all_album_ids[:10]:
+ try:
+ album = await client.get_album(album_id)
+ if album:
+ asset_list = list(album.assets.values())
+ sampled = rng.sample(asset_list, min(count, len(asset_list)))
+ for asset in sampled:
+ random_assets.append({
+ "id": asset.id, "originalFileName": asset.filename,
+ "type": asset.type,
+ })
+ except Exception:
+ pass
+ rng.shuffle(random_assets)
+ return _format_assets(random_assets[:count], cmd, "", locale, response_mode, client)
+
+ if cmd == "summary":
+ lines = []
+ for album_id in all_album_ids:
+ try:
+ album = await client.get_album(album_id)
+ if album:
+ lines.append(f" • {album.name}: {album.asset_count} assets")
+ except Exception:
+ pass
+ header = f"📋 Album summary ({len(lines)}):" if locale == "en" else f"📋 Сводка альбомов ({len(lines)}):"
+ return header + "\n" + "\n".join(lines) if lines else header
+
+ if cmd == "memory":
+ # Check if any linked tracking config uses native memories
+ use_native = await _check_native_memory(bot)
+
+ today = datetime.now(timezone.utc)
+ memory_assets: list[dict[str, Any]] = []
+
+ if use_native:
+ # Use Immich native memories API
+ memories = await client.get_memories()
+ tracked_ids = set(all_album_ids) if all_album_ids else None
+ for mem in memories:
+ year = mem.get("data", {}).get("year")
+ for raw_asset in mem.get("assets", []):
+ # Optional album filtering
+ if tracked_ids:
+ asset_albums = raw_asset.get("albums", [])
+ if not any(a.get("id") in tracked_ids for a in asset_albums):
+ continue
+ memory_assets.append({
+ "id": raw_asset.get("id", ""),
+ "originalFileName": raw_asset.get("originalFileName", ""),
+ "type": raw_asset.get("type", "IMAGE"),
+ "createdAt": raw_asset.get("fileCreatedAt", raw_asset.get("createdAt", "")),
+ "year": year,
+ })
+ else:
+ # Album-scanning fallback
+ month_day = (today.month, today.day)
+ for album_id in all_album_ids[:10]:
+ try:
+ album = await client.get_album(album_id)
+ if album:
+ for aid, asset in album.assets.items():
+ try:
+ dt = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
+ if (dt.month, dt.day) == month_day and dt.year != today.year:
+ memory_assets.append({
+ "id": asset.id, "originalFileName": asset.filename,
+ "type": asset.type, "createdAt": asset.created_at,
+ "year": dt.year,
+ })
+ except (ValueError, AttributeError):
+ pass
+ except Exception:
+ pass
+
+ memory_assets = memory_assets[:count]
+ if not memory_assets:
+ return "No memories for today." if locale == "en" else "Нет воспоминаний за сегодня."
+ return _format_assets(memory_assets, cmd, "", locale, response_mode, client)
+
+ return "Unknown command." if locale == "en" else "Неизвестная команда."
+
+
+def _format_assets(
+ assets: list[dict[str, Any]], cmd: str, query: str,
+ locale: str, response_mode: str, client: Any,
+) -> str | list[dict[str, Any]]:
+ """Format asset results as text or media payload."""
+ if not assets:
+ return {"en": "No results found.", "ru": "Ничего не найдено."}.get(locale, "No results found.")
+
+ if response_mode == "media":
+ media_items = []
+ for asset in assets:
+ asset_id = asset.get("id", "")
+ filename = asset.get("originalFileName", "")
+ year = asset.get("year", "")
+ caption = f"{filename} ({year})" if year else filename
+ media_items.append({
+ "type": "photo",
+ "asset_id": asset_id,
+ "caption": caption,
+ "thumbnail_url": f"{client.url}/api/assets/{asset_id}/thumbnail?size=preview",
+ "api_key": client.api_key,
+ })
+ return media_items
+
+ # Text mode
+ header_map = {
+ "search": {"en": f'🔍 Results for "{query}":', "ru": f'🔍 Результаты для "{query}":'},
+ "find": {"en": f'📄 Files matching "{query}":', "ru": f'📄 Файлы по запросу "{query}":'},
+ "person": {"en": f"👤 Photos of {query}:", "ru": f"👤 Фото {query}:"},
+ "place": {"en": f"📍 Photos from {query}:", "ru": f"📍 Фото из {query}:"},
+ "favorites": {"en": "⭐ Favorites:", "ru": "⭐ Избранное:"},
+ "latest": {"en": "📸 Latest:", "ru": "📸 Последние:"},
+ "random": {"en": "🎲 Random:", "ru": "🎲 Случайные:"},
+ "memory": {"en": "📅 On this day:", "ru": "📅 В этот день:"},
+ }
+ header = header_map.get(cmd, {}).get(locale, f"Results ({len(assets)}):")
+ lines = []
+ for a in assets:
+ name = a.get("originalFileName", a.get("id", "?")[:8])
+ year = a.get("year", "")
+ lines.append(f" • {name} ({year})" if year else f" • {name}")
+ return header + "\n" + "\n".join(lines)
+
+
+async def send_media_group(
+ bot_token: str, chat_id: str, media_items: list[dict[str, Any]],
+) -> None:
+ """Send media items as a Telegram media group (album).
+
+ Falls back to individual sendPhoto calls if sendMediaGroup fails.
+ Telegram allows max 10 items per media group.
+ """
+ if not media_items:
+ return
+
+ async with aiohttp.ClientSession() as http:
+ # Download all thumbnails first
+ downloaded: list[tuple[bytes, str, str]] = [] # (photo_bytes, asset_id, caption)
+ for item in media_items:
+ asset_id = item.get("asset_id", "")
+ caption = item.get("caption", "")
+ thumb_url = item.get("thumbnail_url", "")
+ api_key = item.get("api_key", "")
+ try:
+ async with http.get(thumb_url, headers={"x-api-key": api_key}) as resp:
+ if resp.status != 200:
+ _LOGGER.warning("Failed to download thumbnail for %s: HTTP %d", asset_id, resp.status)
+ continue
+ photo_bytes = await resp.read()
+ downloaded.append((photo_bytes, asset_id, caption))
+ except aiohttp.ClientError:
+ continue
+
+ if not downloaded:
+ return
+
+ # Send in groups of 10 (Telegram limit)
+ for i in range(0, len(downloaded), 10):
+ chunk = downloaded[i:i + 10]
+
+ if len(chunk) == 1:
+ # Single photo — use sendPhoto
+ photo_bytes, asset_id, caption = chunk[0]
+ data = aiohttp.FormData()
+ data.add_field("chat_id", chat_id)
+ data.add_field("photo", photo_bytes, filename=f"{asset_id}.jpg", content_type="image/jpeg")
+ if caption:
+ data.add_field("caption", caption)
+ try:
+ async with http.post(f"{TELEGRAM_API_BASE_URL}{bot_token}/sendPhoto", data=data) as resp:
+ if resp.status != 200:
+ result = await resp.json()
+ _LOGGER.warning("Failed to send photo: %s", result.get("description"))
+ except aiohttp.ClientError as err:
+ _LOGGER.warning("Failed to send photo: %s", err)
+ else:
+ # Multiple photos — use sendMediaGroup
+ import json as _json
+ data = aiohttp.FormData()
+ data.add_field("chat_id", chat_id)
+ media_array = []
+ for idx, (photo_bytes, asset_id, caption) in enumerate(chunk):
+ attach_key = f"photo_{idx}"
+ media_obj: dict[str, Any] = {"type": "photo", "media": f"attach://{attach_key}"}
+ if caption:
+ media_obj["caption"] = caption
+ media_array.append(media_obj)
+ data.add_field(attach_key, photo_bytes, filename=f"{asset_id}.jpg", content_type="image/jpeg")
+ data.add_field("media", _json.dumps(media_array))
+ try:
+ async with http.post(f"{TELEGRAM_API_BASE_URL}{bot_token}/sendMediaGroup", data=data) as resp:
+ if resp.status != 200:
+ result = await resp.json()
+ _LOGGER.warning("Failed to send media group: %s", result.get("description"))
+ except aiohttp.ClientError as err:
+ _LOGGER.warning("Failed to send media group: %s", err)
+
+
+async def register_commands_with_telegram(bot: TelegramBot) -> bool:
+ """Register enabled commands with Telegram BotFather API."""
+ config = bot.commands_config or {}
+ enabled = config.get("enabled", [])
+ locale = config.get("locale", "en")
+
+ commands = []
+ for cmd in enabled:
+ desc = COMMAND_DESCRIPTIONS.get(cmd, {})
+ commands.append({
+ "command": cmd,
+ "description": desc.get(locale, desc.get("en", cmd)),
+ })
+
+ async with aiohttp.ClientSession() as http:
+ url = f"{TELEGRAM_API_BASE_URL}{bot.token}/setMyCommands"
+ payload: dict[str, Any] = {"commands": commands}
+ try:
+ async with http.post(url, json=payload) as resp:
+ result = await resp.json()
+ if result.get("ok"):
+ _LOGGER.info("Registered %d commands for bot @%s", len(commands), bot.bot_username)
+ # Also register for the other locale
+ other_locale = "ru" if locale == "en" else "en"
+ other_commands = [
+ {"command": c, "description": COMMAND_DESCRIPTIONS.get(c, {}).get(other_locale, c)}
+ for c in enabled
+ ]
+ async with http.post(url, json={"commands": other_commands, "language_code": other_locale}) as r2:
+ pass
+ return True
+ _LOGGER.warning("Failed to register commands: %s", result.get("description"))
+ return False
+ except aiohttp.ClientError as err:
+ _LOGGER.error("Failed to register commands: %s", err)
+ return False
diff --git a/packages/server/src/notify_bridge_server/commands/parser.py b/packages/server/src/notify_bridge_server/commands/parser.py
new file mode 100644
index 0000000..749a19c
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/commands/parser.py
@@ -0,0 +1,40 @@
+"""Command text parsing — extracts command name, arguments, and optional count."""
+
+from __future__ import annotations
+
+
+def parse_command(text: str) -> tuple[str, str, int | None]:
+ """Parse a command message into (command, args, count).
+
+ Examples:
+ "/search sunset" -> ("search", "sunset", None)
+ "/latest Family 5" -> ("latest", "Family", 5)
+ "/events 10" -> ("events", "", 10)
+ "/help@mybot" -> ("help", "", None)
+ """
+ text = text.strip()
+ if not text.startswith("/"):
+ return ("", text, None)
+
+ # Strip @botname suffix: /command@botname args
+ parts = text[1:].split(None, 1)
+ cmd = parts[0].split("@")[0].lower()
+ rest = parts[1] if len(parts) > 1 else ""
+
+ # Try to extract trailing count
+ count = None
+ rest_parts = rest.rsplit(None, 1)
+ if len(rest_parts) == 2:
+ try:
+ count = int(rest_parts[1])
+ rest = rest_parts[0]
+ except ValueError:
+ pass
+ elif rest_parts and rest_parts[0]:
+ try:
+ count = int(rest_parts[0])
+ rest = ""
+ except ValueError:
+ pass
+
+ return (cmd, rest.strip(), count)
diff --git a/packages/server/src/notify_bridge_server/commands/registry.py b/packages/server/src/notify_bridge_server/commands/registry.py
new file mode 100644
index 0000000..60f858a
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/commands/registry.py
@@ -0,0 +1,42 @@
+"""Command definitions — descriptions, categories, and rate limit grouping."""
+
+from __future__ import annotations
+
+# Command descriptions for Telegram menu (EN / RU)
+COMMAND_DESCRIPTIONS: dict[str, dict[str, str]] = {
+ "status": {"en": "Show tracker status", "ru": "Показать статус трекеров"},
+ "albums": {"en": "List tracked albums", "ru": "Список отслеживаемых альбомов"},
+ "events": {"en": "Show recent events", "ru": "Показать последние события"},
+ "summary": {"en": "Send album summary now", "ru": "Отправить сводку альбомов"},
+ "latest": {"en": "Show latest photos", "ru": "Показать последние фото"},
+ "memory": {"en": "On This Day memories", "ru": "Воспоминания за этот день"},
+ "random": {"en": "Send random photo", "ru": "Отправить случайное фото"},
+ "search": {"en": "Smart search (AI)", "ru": "Умный поиск (AI)"},
+ "find": {"en": "Search by filename", "ru": "Поиск по имени файла"},
+ "person": {"en": "Find photos of person", "ru": "Найти фото человека"},
+ "place": {"en": "Find photos by location", "ru": "Найти фото по месту"},
+ "favorites": {"en": "Show favorites", "ru": "Показать избранное"},
+ "people": {"en": "List detected people", "ru": "Список людей"},
+ "help": {"en": "Show available commands", "ru": "Показать доступные команды"},
+}
+
+ALL_COMMANDS = list(COMMAND_DESCRIPTIONS.keys())
+
+# Map commands to rate limit categories
+_RATE_CATEGORY: dict[str, str] = {
+ "search": "search", "find": "search", "person": "search",
+ "place": "search", "favorites": "search", "people": "search",
+}
+
+
+def get_rate_category(cmd: str) -> str:
+ return _RATE_CATEGORY.get(cmd, "default")
+
+
+DEFAULT_COMMANDS_CONFIG = {
+ "enabled": ["help", "status", "albums", "events", "latest", "random", "favorites", "summary", "memory"],
+ "locale": "en",
+ "response_mode": "media",
+ "default_count": 5,
+ "rate_limits": {"search": 30, "default": 10},
+}
diff --git a/packages/server/src/notify_bridge_server/commands/webhook.py b/packages/server/src/notify_bridge_server/commands/webhook.py
new file mode 100644
index 0000000..90e16a9
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/commands/webhook.py
@@ -0,0 +1,136 @@
+"""Telegram webhook handler for bot commands."""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+import aiohttp
+from fastapi import APIRouter, Depends, Header, HTTPException, Request
+from sqlmodel import select
+from sqlmodel.ext.asyncio.session import AsyncSession
+
+from notify_bridge_core.notifications.telegram.media import TELEGRAM_API_BASE_URL
+
+from ..database.engine import get_session
+from ..database.models import TelegramBot
+from ..services.telegram import save_chat_from_webhook
+from .handler import handle_command, send_media_group
+
+_LOGGER = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/api/telegram", tags=["telegram-webhook"])
+
+# Webhook secret — set via NOTIFY_BRIDGE_TELEGRAM_WEBHOOK_SECRET env var
+_webhook_secret: str | None = None
+
+
+def set_webhook_secret(secret: str | None) -> None:
+ global _webhook_secret
+ _webhook_secret = secret
+
+
+@router.post("/webhook/{webhook_id}")
+async def telegram_webhook(
+ webhook_id: str,
+ request: Request,
+ x_telegram_bot_api_secret_token: str | None = Header(default=None),
+ session: AsyncSession = Depends(get_session),
+):
+ """Handle incoming Telegram messages — route commands to handlers."""
+ # Validate webhook secret if configured
+ if _webhook_secret:
+ if x_telegram_bot_api_secret_token != _webhook_secret:
+ raise HTTPException(status_code=403, detail="Invalid webhook secret")
+
+ # Find bot by opaque webhook path ID (not by token — token must not appear in URLs)
+ bot_result = await session.exec(
+ select(TelegramBot).where(TelegramBot.webhook_path_id == webhook_id)
+ )
+ bot = bot_result.first()
+ if not bot:
+ raise HTTPException(status_code=403, detail="Unknown webhook")
+
+ try:
+ update = await request.json()
+ except Exception:
+ return {"ok": True, "error": "invalid_json"}
+
+ message = update.get("message")
+ if not message:
+ return {"ok": True, "skipped": "no_message"}
+
+ chat_info = message.get("chat", {})
+ chat_id = str(chat_info.get("id", ""))
+ text = message.get("text", "")
+
+ if not chat_id or not text:
+ return {"ok": True, "skipped": "empty"}
+
+ # Auto-persist chat from incoming message
+ try:
+ await save_chat_from_webhook(session, bot.id, chat_info)
+ await session.commit()
+ except Exception:
+ _LOGGER.warning("Failed to auto-save chat %s", chat_id, exc_info=True)
+
+ # Handle commands
+ if text.startswith("/"):
+ cmd_response = await handle_command(bot, chat_id, text)
+ if cmd_response is not None:
+ if isinstance(cmd_response, list):
+ await send_media_group(bot.token, chat_id, cmd_response)
+ else:
+ await _send_reply(bot.token, chat_id, cmd_response)
+ return {"ok": True}
+
+ return {"ok": True, "skipped": "not_a_command"}
+
+
+async def _send_reply(bot_token: str, chat_id: str, text: str) -> None:
+ """Send a text reply via Telegram Bot API."""
+ async with aiohttp.ClientSession() as http_session:
+ url = f"{TELEGRAM_API_BASE_URL}{bot_token}/sendMessage"
+ payload: dict[str, Any] = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
+ try:
+ async with http_session.post(url, json=payload) as resp:
+ if resp.status != 200:
+ result = await resp.json()
+ _LOGGER.debug("Telegram reply failed: %s", result.get("description"))
+ # Retry without parse_mode if HTML fails
+ if "parse" in str(result.get("description", "")).lower():
+ payload.pop("parse_mode", None)
+ async with http_session.post(url, json=payload) as retry_resp:
+ if retry_resp.status != 200:
+ _LOGGER.warning("Telegram reply failed on retry")
+ except aiohttp.ClientError as err:
+ _LOGGER.error("Failed to send Telegram reply: %s", err)
+
+
+async def register_webhook(bot_token: str, webhook_url: str, secret: str | None = None) -> dict:
+ """Register webhook URL with Telegram Bot API."""
+ async with aiohttp.ClientSession() as http:
+ url = f"{TELEGRAM_API_BASE_URL}{bot_token}/setWebhook"
+ payload: dict[str, Any] = {"url": webhook_url}
+ if secret:
+ payload["secret_token"] = secret
+ try:
+ async with http.post(url, json=payload) as resp:
+ result = await resp.json()
+ if result.get("ok"):
+ return {"success": True}
+ return {"success": False, "error": result.get("description")}
+ except aiohttp.ClientError as err:
+ return {"success": False, "error": str(err)}
+
+
+async def unregister_webhook(bot_token: str) -> dict:
+ """Remove webhook from Telegram Bot API."""
+ async with aiohttp.ClientSession() as http:
+ url = f"{TELEGRAM_API_BASE_URL}{bot_token}/deleteWebhook"
+ try:
+ async with http.post(url) as resp:
+ result = await resp.json()
+ return {"success": result.get("ok", False)}
+ except aiohttp.ClientError as err:
+ return {"success": False, "error": str(err)}
diff --git a/packages/server/src/notify_bridge_server/database/migrations.py b/packages/server/src/notify_bridge_server/database/migrations.py
index 76a9384..5ac6011 100644
--- a/packages/server/src/notify_bridge_server/database/migrations.py
+++ b/packages/server/src/notify_bridge_server/database/migrations.py
@@ -44,6 +44,30 @@ async def migrate_schema(engine: AsyncEngine) -> None:
await conn.execute(text(sql))
logger.info("Added %s column to event_log table", col)
+ # Add commands_config to telegram_bot if missing
+ if not await _has_column("telegram_bot", "commands_config"):
+ await conn.execute(
+ text("ALTER TABLE telegram_bot ADD COLUMN commands_config TEXT DEFAULT '{}'")
+ )
+ logger.info("Added commands_config column to telegram_bot table")
+
+ # Add webhook_path_id to telegram_bot if missing
+ if not await _has_column("telegram_bot", "webhook_path_id"):
+ await conn.execute(
+ text("ALTER TABLE telegram_bot ADD COLUMN webhook_path_id TEXT DEFAULT ''")
+ )
+ logger.info("Added webhook_path_id column to telegram_bot table")
+ # Backfill existing bots with unique IDs
+ import uuid
+ bots = (await conn.execute(text("SELECT id FROM telegram_bot"))).fetchall()
+ for bot in bots:
+ await conn.execute(
+ text("UPDATE telegram_bot SET webhook_path_id = :wid WHERE id = :bid"),
+ {"wid": uuid.uuid4().hex, "bid": bot[0]},
+ )
+ if bots:
+ logger.info("Backfilled webhook_path_id for %d existing bots", len(bots))
+
# Add date_only_format to template_config if missing
if not await _has_column("template_config", "date_only_format"):
await conn.execute(
@@ -51,6 +75,20 @@ async def migrate_schema(engine: AsyncEngine) -> None:
)
logger.info("Added date_only_format column to template_config table")
+ # Add update_mode to telegram_bot if missing
+ if not await _has_column("telegram_bot", "update_mode"):
+ await conn.execute(
+ text("ALTER TABLE telegram_bot ADD COLUMN update_mode TEXT DEFAULT 'polling'")
+ )
+ logger.info("Added update_mode column to telegram_bot table")
+
+ # Add memory_source to tracking_config if missing
+ if not await _has_column("tracking_config", "memory_source"):
+ await conn.execute(
+ text("ALTER TABLE tracking_config ADD COLUMN memory_source TEXT DEFAULT 'albums'")
+ )
+ logger.info("Added memory_source column to tracking_config table")
+
# Add collection_name and shared to tracker_state if missing
if not await _has_column("tracker_state", "collection_name"):
await conn.execute(
@@ -190,7 +228,10 @@ async def migrate_tracker_targets(engine: AsyncEngine) -> None:
):
target_bot_map[tgt[0]] = bot_id
except Exception:
- pass
+ logger.warning(
+ "Failed to match bot token for target %s", tgt[0],
+ exc_info=True,
+ )
# Create TrackerTarget rows
import json
diff --git a/packages/server/src/notify_bridge_server/database/models.py b/packages/server/src/notify_bridge_server/database/models.py
index 9268e71..9c98046 100644
--- a/packages/server/src/notify_bridge_server/database/models.py
+++ b/packages/server/src/notify_bridge_server/database/models.py
@@ -4,6 +4,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
+from uuid import uuid4
from sqlmodel import JSON, Column, Field, SQLModel
@@ -44,6 +45,9 @@ class TelegramBot(SQLModel, table=True):
icon: str = Field(default="")
bot_username: str = Field(default="")
bot_id: int = Field(default=0)
+ webhook_path_id: str = Field(default_factory=lambda: uuid4().hex)
+ update_mode: str = Field(default="polling") # "polling" or "webhook"
+ commands_config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=_utcnow)
@@ -106,6 +110,7 @@ class TrackingConfig(SQLModel, table=True):
# Memory mode
memory_enabled: bool = Field(default=False)
+ memory_source: str = Field(default="albums") # "albums" or "native"
memory_times: str = Field(default="09:00")
memory_collection_mode: str = Field(default="combined")
memory_limit: int = Field(default=10)
@@ -221,13 +226,22 @@ class EventLog(SQLModel, table=True):
__tablename__ = "event_log"
id: int | None = Field(default=None, primary_key=True)
- tracker_id: int | None = Field(default=None, foreign_key="tracker.id")
+ tracker_id: int | None = Field(default=None, foreign_key="tracker.id", index=True)
tracker_name: str = Field(default="")
- provider_id: int | None = Field(default=None)
+ provider_id: int | None = Field(default=None, index=True)
provider_name: str = Field(default="")
- event_type: str
+ event_type: str = Field(index=True)
collection_id: str
collection_name: str
assets_count: int = Field(default=0)
details: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=_utcnow)
+
+
+class AppSetting(SQLModel, table=True):
+ """Key-value app-level settings (admin-configurable)."""
+
+ __tablename__ = "app_setting"
+
+ key: str = Field(primary_key=True)
+ value: str = Field(default="")
diff --git a/packages/server/src/notify_bridge_server/main.py b/packages/server/src/notify_bridge_server/main.py
index 7431615..8746c6c 100644
--- a/packages/server/src/notify_bridge_server/main.py
+++ b/packages/server/src/notify_bridge_server/main.py
@@ -24,6 +24,8 @@ from .api.telegram_bots import router as telegram_bots_router
from .api.users import router as users_router
from .api.status import router as status_router
from .api.template_vars import router as template_vars_router
+from .api.app_settings import router as app_settings_router
+from .commands.webhook import router as webhook_router, set_webhook_secret
@asynccontextmanager
@@ -36,6 +38,12 @@ async def lifespan(app: FastAPI):
await migrate_schema(engine)
await migrate_tracker_targets(engine)
await _seed_default_templates()
+ # Configure webhook secret from DB setting (falls back to env var)
+ from sqlmodel.ext.asyncio.session import AsyncSession as _AS
+ from .api.app_settings import get_setting as _get_setting
+ async with _AS(engine) as _session:
+ _secret = await _get_setting(_session, "telegram_webhook_secret")
+ set_webhook_secret(_secret or None)
from .services.scheduler import start_scheduler
await start_scheduler()
yield
@@ -55,6 +63,8 @@ app.include_router(targets_router)
app.include_router(telegram_bots_router)
app.include_router(users_router)
app.include_router(status_router)
+app.include_router(app_settings_router)
+app.include_router(webhook_router)
@app.get("/api/health")
diff --git a/packages/server/src/notify_bridge_server/services/__init__.py b/packages/server/src/notify_bridge_server/services/__init__.py
index daf1d2e..c04a3d9 100644
--- a/packages/server/src/notify_bridge_server/services/__init__.py
+++ b/packages/server/src/notify_bridge_server/services/__init__.py
@@ -1 +1,17 @@
-"""Business logic services — scheduler, watcher, notifier."""
+"""Shared service utilities."""
+
+from notify_bridge_core.providers.immich import ImmichServiceProvider
+
+from ..database.models import ServiceProvider
+
+
+def make_immich_provider(http_session, provider: ServiceProvider) -> ImmichServiceProvider:
+ """Create an ImmichServiceProvider from a DB provider model."""
+ config = provider.config or {}
+ return ImmichServiceProvider(
+ http_session,
+ config.get("url", ""),
+ config.get("api_key", ""),
+ config.get("external_domain"),
+ provider.name,
+ )
diff --git a/packages/server/src/notify_bridge_server/services/notifier.py b/packages/server/src/notify_bridge_server/services/notifier.py
index f3159a4..148b434 100644
--- a/packages/server/src/notify_bridge_server/services/notifier.py
+++ b/packages/server/src/notify_bridge_server/services/notifier.py
@@ -1,6 +1,7 @@
-"""Test notification sender."""
+"""Notification sender — unified send logic for all paths (dispatch + test)."""
import logging
+from typing import Any
import aiohttp
@@ -25,41 +26,69 @@ def _get_test_message(locale: str, target_type: str) -> str:
return msgs.get(target_type, msgs.get("webhook", "Test"))
-async def send_test_notification(target: NotificationTarget, locale: str = "en") -> dict:
- """Send a simple test message to a notification target."""
+async def send_to_target(target: NotificationTarget, message: str) -> dict:
+ """Send a message to a target, respecting all target config settings.
+
+ This is the SINGLE send path used by dispatch, test, and real-data notifications.
+ """
try:
if target.type == "telegram":
- return await _test_telegram(target, locale)
+ return await _send_telegram(target, message)
elif target.type == "webhook":
- return await _test_webhook(target, locale)
+ return await _send_webhook(target, message)
return {"success": False, "error": f"Unknown target type: {target.type}"}
except Exception as e:
- _LOGGER.error("Test notification failed: %s", e)
+ _LOGGER.error("Send failed: %s", e)
return {"success": False, "error": str(e)}
-async def _test_telegram(target: NotificationTarget, locale: str = "en") -> dict:
+async def _send_telegram(target: NotificationTarget, message: str) -> dict:
from notify_bridge_core.notifications.telegram.client import TelegramClient
bot_token = target.config.get("bot_token")
chat_id = target.config.get("chat_id")
+ disable_preview = target.config.get("disable_url_preview", False)
+
if not bot_token or not chat_id:
return {"success": False, "error": "Missing bot_token or chat_id"}
async with aiohttp.ClientSession() as session:
client = TelegramClient(session, bot_token)
- return await client.send_notification(
+ return await client.send_message(
chat_id=str(chat_id),
- caption=_get_test_message(locale, "telegram"),
+ text=message,
+ disable_web_page_preview=bool(disable_preview),
)
+async def _send_webhook(target: NotificationTarget, message: str, event_type: str = "notification") -> dict:
+ from notify_bridge_core.notifications.webhook.client import WebhookClient
+
+ url = target.config.get("url")
+ headers = target.config.get("headers", {})
+ if not url:
+ return {"success": False, "error": "Missing url in target config"}
+
+ async with aiohttp.ClientSession() as session:
+ client = WebhookClient(session, url, headers)
+ return await client.send({"message": message, "event_type": event_type})
+
+
+# --- Public API used by routes ---
+
+
+async def send_test_notification(target: NotificationTarget, locale: str = "en") -> dict:
+ """Send a simple test message."""
+ message = _get_test_message(locale, target.type)
+ return await send_to_target(target, message)
+
+
async def send_test_template_notification(
target: NotificationTarget, slot: str, template_str: str
) -> dict:
- """Render a template slot with sample data and send it to a target."""
+ """Render a template slot with sample data and send."""
from jinja2.sandbox import SandboxedEnvironment
- from ..api.template_configs import _SAMPLE_CONTEXT
+ from .sample_context import _SAMPLE_CONTEXT
if not template_str:
return await send_test_notification(target)
@@ -71,53 +100,7 @@ async def send_test_template_notification(
except Exception as e:
return {"success": False, "error": f"Template render error: {e}"}
- try:
- if target.type == "telegram":
- return await _test_telegram_with_message(target, message)
- elif target.type == "webhook":
- return await _test_webhook_with_message(target, message)
- return {"success": False, "error": f"Unknown target type: {target.type}"}
- except Exception as e:
- _LOGGER.error("Test template notification failed: %s", e)
- return {"success": False, "error": str(e)}
-
-
-async def _test_telegram_with_message(target: NotificationTarget, message: str) -> dict:
- from notify_bridge_core.notifications.telegram.client import TelegramClient
- bot_token = target.config.get("bot_token")
- chat_id = target.config.get("chat_id")
- if not bot_token or not chat_id:
- return {"success": False, "error": "Missing bot_token or chat_id"}
- async with aiohttp.ClientSession() as session:
- client = TelegramClient(session, bot_token)
- return await client.send_notification(chat_id=str(chat_id), caption=message)
-
-
-async def _test_webhook_with_message(target: NotificationTarget, message: str) -> dict:
- from notify_bridge_core.notifications.webhook.client import WebhookClient
- url = target.config.get("url")
- headers = target.config.get("headers", {})
- if not url:
- return {"success": False, "error": "Missing url in target config"}
- async with aiohttp.ClientSession() as session:
- client = WebhookClient(session, url, headers)
- return await client.send({"message": message, "event_type": "test_template"})
-
-
-async def _test_webhook(target: NotificationTarget, locale: str = "en") -> dict:
- from notify_bridge_core.notifications.webhook.client import WebhookClient
-
- url = target.config.get("url")
- headers = target.config.get("headers", {})
- if not url:
- return {"success": False, "error": "Missing url in target config"}
-
- async with aiohttp.ClientSession() as session:
- client = WebhookClient(session, url, headers)
- return await client.send({
- "message": _get_test_message(locale, "webhook"),
- "event_type": "test",
- })
+ return await send_to_target(target, message)
async def send_real_data_notification(
@@ -129,20 +112,19 @@ async def send_real_data_notification(
collection_ids: list[str],
date_format: str = "%d.%m.%Y, %H:%M UTC",
date_only_format: str = "%d.%m.%Y",
+ memory_source: str = "albums",
) -> dict:
- """Fetch real data from provider, render template, and send notification."""
- from datetime import datetime, timezone
+ """Fetch real data from provider, render template, and send."""
from jinja2.sandbox import SandboxedEnvironment
if not template_str:
return {"success": False, "error": f"No template configured for {test_type}"}
- # Fetch real data from provider
- ctx: dict = {}
try:
ctx = await _build_real_context(
provider_type, provider_config, collection_ids,
test_type, date_format, date_only_format,
+ memory_source=memory_source,
)
except Exception as e:
_LOGGER.error("Failed to fetch real data for test: %s", e)
@@ -152,7 +134,6 @@ async def send_real_data_notification(
ctx["date_format"] = date_format
ctx["date_only_format"] = date_only_format
- # Render template
try:
env = SandboxedEnvironment(autoescape=False)
tmpl = env.from_string(template_str)
@@ -160,16 +141,7 @@ async def send_real_data_notification(
except Exception as e:
return {"success": False, "error": f"Template render error: {e}"}
- # Send
- try:
- if target.type == "telegram":
- return await _test_telegram_with_message(target, message)
- elif target.type == "webhook":
- return await _test_webhook_with_message(target, message)
- return {"success": False, "error": f"Unknown target type: {target.type}"}
- except Exception as e:
- _LOGGER.error("Test notification failed: %s", e)
- return {"success": False, "error": str(e)}
+ return await send_to_target(target, message)
async def _build_real_context(
@@ -179,6 +151,7 @@ async def _build_real_context(
test_type: str,
date_format: str,
date_only_format: str,
+ memory_source: str = "albums",
) -> dict:
"""Build template context from real provider data."""
from datetime import datetime, timezone
@@ -200,16 +173,77 @@ async def _build_real_context(
if not connected:
raise RuntimeError("Failed to connect to Immich")
- # Fetch album data for all tracked collections
- collections = []
- all_assets = []
+ ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
+
+ # --- Native Immich memories ---
+ if test_type == "memory" and memory_source == "native":
+ memories = await immich.client.get_memories()
+ all_assets: list[dict[str, Any]] = []
+ tracked_ids = set(collection_ids) if collection_ids else None
+ for mem in memories:
+ for raw_asset in mem.get("assets", []):
+ asset_id = raw_asset.get("id", "")
+ # Optional album filtering: keep only assets in tracked albums
+ if tracked_ids:
+ asset_albums = raw_asset.get("albums", [])
+ if not any(a.get("id") in tracked_ids for a in asset_albums):
+ continue
+ exif = raw_asset.get("exifInfo") or {}
+ people_raw = raw_asset.get("people", [])
+ all_assets.append({
+ "id": asset_id,
+ "filename": raw_asset.get("originalFileName", ""),
+ "type": (raw_asset.get("type") or "IMAGE").upper(),
+ "created_at": raw_asset.get("fileCreatedAt", raw_asset.get("createdAt", "")),
+ "owner": "",
+ "description": exif.get("description", "") or raw_asset.get("description", "") or "",
+ "people": [p.get("name", "") for p in people_raw if p.get("name")],
+ "is_favorite": raw_asset.get("isFavorite", False),
+ "rating": exif.get("rating"),
+ "city": exif.get("city", "") or "",
+ "state": exif.get("state", "") or "",
+ "country": exif.get("country", "") or "",
+ "public_url": "",
+ "url": f"{ext_domain.rstrip('/')}/api/assets/{asset_id}/original",
+ "photo_url": f"{ext_domain.rstrip('/')}/api/assets/{asset_id}/thumbnail",
+ "year": mem.get("data", {}).get("year"),
+ })
+
+ now = datetime.now(timezone.utc)
+ ctx: dict[str, Any] = {
+ "date": now.strftime(date_only_format),
+ "timestamp": now.isoformat(),
+ "service_name": "Immich",
+ "service_type": "immich",
+ "collections": [],
+ "albums": [],
+ "assets": all_assets,
+ "common_date": "",
+ "common_location": "",
+ "collection_name": "", "album_name": "",
+ "public_url": "", "album_url": "",
+ "shared": False, "photo_count": 0, "video_count": 0, "owner": "",
+ }
+ people: set[str] = set()
+ for a in all_assets:
+ people.update(a.get("people", []))
+ ctx["people"] = list(people)
+ ctx["has_videos"] = any(a.get("type") == "VIDEO" for a in all_assets)
+ ctx["has_photos"] = any(a.get("type") == "IMAGE" for a in all_assets)
+ ctx["added_count"] = len(all_assets)
+ ctx["added_assets"] = all_assets
+ ctx["protected_url"] = ""
+ return ctx
+
+ # --- Album-based asset collection (default path) ---
+ collections: list[dict[str, Any]] = []
+ all_assets: list[dict[str, Any]] = []
for album_id in collection_ids:
album = await immich.client.get_album(album_id)
if not album:
continue
- # Get shared link for public URL
shared_links = await immich.client.get_shared_links(album_id)
ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
album_public_url = ""
@@ -229,7 +263,6 @@ async def _build_real_context(
"owner": album.owner,
})
- # Collect assets (limited sample)
for asset_id, asset in list(album.assets.items())[:10]:
asset_public_url = f"{album_public_url}/photos/{asset_id}" if album_public_url else ""
all_assets.append({
@@ -250,60 +283,42 @@ async def _build_real_context(
"photo_url": f"{ext_domain.rstrip('/')}/api/assets/{asset.id}/thumbnail",
})
- # Build context based on test type
now = datetime.now(timezone.utc)
- ctx: dict = {
+ ctx: dict[str, Any] = {
"date": now.strftime(date_only_format),
"timestamp": now.isoformat(),
"service_name": "Immich",
"service_type": "immich",
"collections": collections,
- "albums": collections, # alias
+ "albums": collections,
"assets": all_assets,
+ "common_date": "",
+ "common_location": "",
}
- # Common date/location for assets
if len(all_assets) > 1:
- dates = set()
- for a in all_assets:
- ca = a.get("created_at", "")
- if ca:
- dates.add(ca[:10])
+ dates = {a.get("created_at", "")[:10] for a in all_assets if a.get("created_at")}
if len(dates) == 1:
try:
ctx["common_date"] = datetime.fromisoformat(dates.pop()).strftime(date_only_format)
except (ValueError, TypeError):
- ctx["common_date"] = ""
- else:
- ctx["common_date"] = ""
+ pass
locations = set()
for a in all_assets:
city = a.get("city", "")
country = a.get("country", "")
- if city:
- locations.add(f"{city}, {country}" if country else city)
- else:
- locations.add("")
+ locations.add(f"{city}, {country}" if city and country else city or "")
if len(locations) == 1 and "" not in locations:
ctx["common_location"] = locations.pop()
- else:
- ctx["common_location"] = ""
- else:
- ctx["common_date"] = ""
- ctx["common_location"] = ""
- # Add first collection details as top-level for periodic-style templates
if collections:
first = collections[0]
ctx.update({
- "collection_name": first["name"],
- "album_name": first["name"],
- "public_url": first.get("public_url", ""),
- "album_url": first.get("url", ""),
+ "collection_name": first["name"], "album_name": first["name"],
+ "public_url": first.get("public_url", ""), "album_url": first.get("url", ""),
"shared": first.get("shared", False),
- "photo_count": first.get("photo_count", 0),
- "video_count": first.get("video_count", 0),
+ "photo_count": first.get("photo_count", 0), "video_count": first.get("video_count", 0),
"owner": first.get("owner", ""),
})
else:
@@ -313,8 +328,7 @@ async def _build_real_context(
"shared": False, "photo_count": 0, "video_count": 0, "owner": "",
})
- # People across all assets
- people = set()
+ people: set[str] = set()
for a in all_assets:
people.update(a.get("people", []))
ctx["people"] = list(people)
diff --git a/packages/server/src/notify_bridge_server/services/sample_context.py b/packages/server/src/notify_bridge_server/services/sample_context.py
new file mode 100644
index 0000000..fa60c1f
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/services/sample_context.py
@@ -0,0 +1,94 @@
+"""Sample template context for previews and test notifications."""
+
+# Sample asset matching what build_asset_detail() actually returns
+_SAMPLE_ASSET = {
+ "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
+ "filename": "IMG_001.jpg",
+ "type": "IMAGE",
+ "created_at": "2026-03-19T10:30:00",
+ "owner": "Alice",
+ "owner_id": "user-uuid-1",
+ "description": "Family picnic",
+ "people": ["Alice", "Bob"],
+ "is_favorite": True,
+ "rating": 5,
+ "latitude": 48.8566,
+ "longitude": 2.3522,
+ "city": "Paris",
+ "state": "Ile-de-France",
+ "country": "France",
+ "url": "https://immich.example.com/photos/abc123",
+ "public_url": "https://immich.example.com/share/abc123/photos/a1b2c3d4-e5f6-7890-abcd-ef1234567890",
+ "download_url": "https://immich.example.com/api/assets/abc123/original",
+ "photo_url": "https://immich.example.com/api/assets/abc123/thumbnail",
+}
+
+_SAMPLE_VIDEO_ASSET = {
+ **_SAMPLE_ASSET,
+ "id": "d4e5f6a7-b8c9-0123-defg-456789abcdef",
+ "filename": "VID_002.mp4",
+ "type": "VIDEO",
+ "is_favorite": False,
+ "rating": None,
+ "photo_url": None,
+ "public_url": "https://immich.example.com/share/abc123/photos/d4e5f6a7-b8c9-0123-defg-456789abcdef",
+ "playback_url": "https://immich.example.com/api/assets/def456/video",
+}
+
+_SAMPLE_COLLECTION = {
+ "name": "Family Photos",
+ "url": "https://immich.example.com/share/abc123",
+ "public_url": "https://immich.example.com/share/abc123",
+ "asset_count": 42,
+ "shared": True,
+}
+
+# Full context covering ALL possible template variables
+_SAMPLE_CONTEXT = {
+ # Core event fields (always present)
+ "collection_id": "b2eeeaa4-bba0-477a-a06f-5cb9e21818e8",
+ "collection_name": "Family Photos",
+ "collection_url": "https://immich.example.com/share/abc123",
+ "event_type": "assets_added",
+ "timestamp": "2026-03-19T10:30:00+00:00",
+ "service_name": "Immich",
+ "service_type": "immich",
+ # Immich aliases (always present alongside collection_*)
+ "album_name": "Family Photos",
+ "album_id": "b2eeeaa4-bba0-477a-a06f-5cb9e21818e8",
+ "old_album_name": "Old Album",
+ "new_album_name": "New Album",
+ "change_type": "assets_added",
+ "added_count": 3,
+ "removed_count": 1,
+ "added_assets": [_SAMPLE_ASSET, _SAMPLE_VIDEO_ASSET],
+ "removed_assets": ["asset-id-1", "asset-id-2"],
+ "people": ["Alice", "Bob"],
+ "shared": True,
+ "target_type": "telegram",
+ "has_videos": True,
+ "has_photos": True,
+ # Rename fields (always present, empty for non-rename events)
+ "old_name": "Old Album",
+ "new_name": "New Album",
+ "old_shared": False,
+ "new_shared": True,
+ # Public share URLs (may be empty if no shared link exists)
+ "public_url": "https://immich.example.com/share/abc123",
+ "protected_url": "",
+ "album_url": "https://immich.example.com/albums/b2eeeaa4",
+ # Common date/location (set when all assets share the same value)
+ "common_date": "19.03.2026",
+ "common_location": "Paris, France",
+ # Date format strings (from template config)
+ "date_format": "%d.%m.%Y, %H:%M UTC",
+ "date_only_format": "%d.%m.%Y",
+ # Scheduled/periodic variables (for those templates)
+ "collections": [_SAMPLE_COLLECTION, {**_SAMPLE_COLLECTION, "name": "Vacation 2025", "asset_count": 120}],
+ "albums": [_SAMPLE_COLLECTION, {**_SAMPLE_COLLECTION, "name": "Vacation 2025", "asset_count": 120}],
+ "assets": [_SAMPLE_ASSET, {**_SAMPLE_ASSET, "id": "x1y2z3", "filename": "IMG_002.jpg", "city": "London", "country": "UK", "public_url": "https://immich.example.com/share/abc123/photos/x1y2z3"}],
+ "date": "2026-03-19",
+ "photo_count": 30,
+ "video_count": 5,
+ "owner": "Alice",
+}
diff --git a/packages/server/src/notify_bridge_server/services/scheduler.py b/packages/server/src/notify_bridge_server/services/scheduler.py
index 0341df9..3656d87 100644
--- a/packages/server/src/notify_bridge_server/services/scheduler.py
+++ b/packages/server/src/notify_bridge_server/services/scheduler.py
@@ -26,6 +26,10 @@ async def start_scheduler() -> None:
await _load_tracker_jobs()
+ # Start Telegram bot polling for bots in polling mode
+ from .telegram_poller import start_bot_polling
+ await start_bot_polling()
+
async def _load_tracker_jobs() -> None:
"""Load enabled trackers and schedule polling jobs."""
diff --git a/packages/server/src/notify_bridge_server/services/telegram.py b/packages/server/src/notify_bridge_server/services/telegram.py
new file mode 100644
index 0000000..2943cf9
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/services/telegram.py
@@ -0,0 +1,43 @@
+"""Telegram service utilities — chat persistence helpers."""
+
+from sqlmodel import select
+from sqlmodel.ext.asyncio.session import AsyncSession
+
+from ..database.models import TelegramChat
+
+
+async def save_chat_from_webhook(
+ session: AsyncSession, bot_id: int, chat_data: dict
+) -> None:
+ """Save or update a chat entry from an incoming webhook message.
+
+ Called by the webhook handler to auto-persist chats.
+ """
+ chat_id = str(chat_data.get("id", ""))
+ if not chat_id:
+ return
+
+ result = await session.exec(
+ select(TelegramChat).where(
+ TelegramChat.bot_id == bot_id,
+ TelegramChat.chat_id == chat_id,
+ )
+ )
+ existing = result.first()
+
+ title = chat_data.get("title") or (
+ chat_data.get("first_name", "") + (" " + chat_data.get("last_name", "")).strip()
+ )
+
+ if existing:
+ existing.title = title
+ existing.username = chat_data.get("username", existing.username)
+ session.add(existing)
+ else:
+ session.add(TelegramChat(
+ bot_id=bot_id,
+ chat_id=chat_id,
+ title=title,
+ chat_type=chat_data.get("type", "private"),
+ username=chat_data.get("username", ""),
+ ))
diff --git a/packages/server/src/notify_bridge_server/services/telegram_poller.py b/packages/server/src/notify_bridge_server/services/telegram_poller.py
new file mode 100644
index 0000000..9f8dba2
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/services/telegram_poller.py
@@ -0,0 +1,158 @@
+"""Telegram long-polling service for bots in polling mode.
+
+Uses APScheduler to run getUpdates periodically for each bot
+with update_mode == "polling". Processes updates identically
+to the webhook handler (auto-save chat, dispatch commands).
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+import aiohttp
+from sqlmodel import select
+from sqlmodel.ext.asyncio.session import AsyncSession
+
+from notify_bridge_core.notifications.telegram.media import TELEGRAM_API_BASE_URL
+
+from ..database.engine import get_engine
+from ..database.models import TelegramBot
+from ..services.telegram import save_chat_from_webhook
+from .scheduler import get_scheduler
+
+_LOGGER = logging.getLogger(__name__)
+
+# Track last update_id per bot to use as offset
+_last_update_id: dict[int, int] = {}
+
+
+async def start_bot_polling() -> None:
+ """Schedule polling jobs for all bots with update_mode == 'polling'."""
+ engine = get_engine()
+ async with AsyncSession(engine) as session:
+ result = await session.exec(
+ select(TelegramBot).where(TelegramBot.update_mode == "polling")
+ )
+ bots = result.all()
+
+ for bot in bots:
+ schedule_bot_polling(bot.id)
+
+
+def schedule_bot_polling(bot_id: int) -> None:
+ """Add a polling job for a bot (idempotent)."""
+ scheduler = get_scheduler()
+ job_id = f"telegram_poll_{bot_id}"
+ if scheduler.get_job(job_id):
+ return
+ scheduler.add_job(
+ _poll_bot,
+ "interval",
+ seconds=3,
+ id=job_id,
+ args=[bot_id],
+ replace_existing=True,
+ max_instances=1,
+ )
+ _LOGGER.info("Started polling for bot %d", bot_id)
+
+
+def unschedule_bot_polling(bot_id: int) -> None:
+ """Remove polling job for a bot."""
+ scheduler = get_scheduler()
+ job_id = f"telegram_poll_{bot_id}"
+ if scheduler.get_job(job_id):
+ scheduler.remove_job(job_id)
+ _LOGGER.info("Stopped polling for bot %d", bot_id)
+
+
+async def _poll_bot(bot_id: int) -> None:
+ """Fetch updates from Telegram and process them."""
+ engine = get_engine()
+ async with AsyncSession(engine) as session:
+ bot = await session.get(TelegramBot, bot_id)
+ if not bot or bot.update_mode != "polling":
+ unschedule_bot_polling(bot_id)
+ return
+
+ offset = _last_update_id.get(bot_id, 0)
+ params: dict[str, Any] = {
+ "timeout": 0,
+ "limit": 50,
+ "allowed_updates": '["message"]',
+ }
+ if offset:
+ params["offset"] = offset + 1
+
+ try:
+ async with aiohttp.ClientSession() as http:
+ async with http.get(
+ f"{TELEGRAM_API_BASE_URL}{bot.token}/getUpdates",
+ params=params,
+ timeout=aiohttp.ClientTimeout(total=10),
+ ) as resp:
+ data = await resp.json()
+ if not data.get("ok"):
+ return
+ updates = data.get("result", [])
+ except Exception as e:
+ _LOGGER.debug("Polling error for bot %d: %s", bot_id, e)
+ return
+
+ if not updates:
+ return
+
+ # Update offset to latest
+ _last_update_id[bot_id] = updates[-1]["update_id"]
+
+ # Process each update
+ from ..commands.handler import handle_command, send_media_group
+
+ for update in updates:
+ message = update.get("message")
+ if not message:
+ continue
+
+ chat_info = message.get("chat", {})
+ chat_id = str(chat_info.get("id", ""))
+ text = message.get("text", "")
+
+ if not chat_id:
+ continue
+
+ # Auto-persist chat
+ try:
+ async with AsyncSession(engine) as save_session:
+ await save_chat_from_webhook(save_session, bot.id, chat_info)
+ await save_session.commit()
+ except Exception:
+ _LOGGER.debug("Failed to auto-save chat %s", chat_id, exc_info=True)
+
+ # Dispatch commands
+ if text and text.startswith("/"):
+ try:
+ cmd_response = await handle_command(bot, chat_id, text)
+ if cmd_response is not None:
+ if isinstance(cmd_response, list):
+ await send_media_group(bot.token, chat_id, cmd_response)
+ else:
+ await _send_reply(bot.token, chat_id, cmd_response)
+ except Exception:
+ _LOGGER.error("Error handling command from bot %d", bot_id, exc_info=True)
+
+
+async def _send_reply(bot_token: str, chat_id: str, text: str) -> None:
+ """Send a text reply via Telegram Bot API."""
+ async with aiohttp.ClientSession() as http:
+ url = f"{TELEGRAM_API_BASE_URL}{bot_token}/sendMessage"
+ payload: dict[str, Any] = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
+ try:
+ async with http.post(url, json=payload) as resp:
+ if resp.status != 200:
+ result = await resp.json()
+ if "parse" in str(result.get("description", "")).lower():
+ payload.pop("parse_mode", None)
+ await http.post(url, json=payload)
+ except aiohttp.ClientError as err:
+ _LOGGER.error("Failed to send Telegram reply: %s", err)
diff --git a/packages/server/src/notify_bridge_server/services/watcher.py b/packages/server/src/notify_bridge_server/services/watcher.py
index 93ff47e..7ce806e 100644
--- a/packages/server/src/notify_bridge_server/services/watcher.py
+++ b/packages/server/src/notify_bridge_server/services/watcher.py
@@ -12,7 +12,8 @@ from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
-from notify_bridge_core.providers.immich import ImmichServiceProvider
+from notify_bridge_core.notifications.telegram.cache import TelegramFileCache
+from notify_bridge_core.storage import JsonFileBackend
from ..database.engine import get_engine
from ..database.models import (
@@ -28,6 +29,29 @@ from ..database.models import (
_LOGGER = logging.getLogger(__name__)
+# Module-level Telegram file caches — shared across dispatches for reuse
+_url_cache: TelegramFileCache | None = None
+_asset_cache: TelegramFileCache | None = None
+
+
+async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFileCache | None]:
+ """Lazily initialize shared Telegram file caches using NOTIFY_BRIDGE_DATA_DIR."""
+ global _url_cache, _asset_cache
+ if _url_cache is not None:
+ return _url_cache, _asset_cache
+ import os
+ from pathlib import Path
+ data_dir = os.environ.get("NOTIFY_BRIDGE_DATA_DIR")
+ if not data_dir:
+ return None, None
+ cache_dir = Path(data_dir) / "cache"
+ _url_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_url_cache.json"))
+ _asset_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_asset_cache.json"))
+ await _url_cache.async_load()
+ await _asset_cache.async_load()
+ _LOGGER.info("Initialized Telegram file caches in %s", cache_dir)
+ return _url_cache, _asset_cache
+
def _in_quiet_hours(start: str | None, end: str | None) -> bool:
"""Check if the current UTC time is within the quiet hours window."""
@@ -131,6 +155,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
new_state: dict[str, Any] = {}
if provider_type == "immich":
+ from notify_bridge_core.providers.immich import ImmichServiceProvider
async with aiohttp.ClientSession() as http_session:
immich = ImmichServiceProvider(
http_session,
@@ -208,7 +233,8 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
)
if events and link_data:
- dispatcher = NotificationDispatcher()
+ url_cache, asset_cache = await _get_telegram_caches()
+ dispatcher = NotificationDispatcher(url_cache=url_cache, asset_cache=asset_cache)
for event in events:
_LOGGER.info(
"Dispatching event %s for %s (added=%d removed=%d)",
@@ -239,7 +265,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
config=ld["target_config"],
template_slots=slots,
date_format=tmpl.date_format if tmpl else "%d.%m.%Y, %H:%M UTC",
- date_only_format=tmpl.date_only_format if tmpl and hasattr(tmpl, "date_only_format") else "%d.%m.%Y",
+ date_only_format=tmpl.date_only_format if tmpl and tmpl.date_only_format else "%d.%m.%Y",
provider_api_key=provider_config.get("api_key"),
provider_internal_url=provider_config.get("url", ""),
provider_external_url=provider_config.get("external_domain", ""),