feat: large polish pass — UX fixes, per-chat scope, restore/backup, action events
Backend
- Per-chat album scope for Immich commands (search/latest/memory/...): new
allowed_album_ids on CommandTrackerListener, threaded listener/page kwargs
through ProviderCommandHandler.handle; PATCH listener-scope endpoint.
- /search and /find accept a trailing page number; Immich client search_smart
/ search_metadata take a page param.
- Immich person-asset lookup switched from removed GET /api/people/{id}/assets
to POST /api/search/metadata with personIds (fixes /person command and
auto_organize rules silently returning zero candidates on Immich 1.106+).
- Auto_organize rule now sets the target album's thumbnail to the first added
image when missing (falls back to any asset type); failures do not fail the
rule. add_assets_to_album surfaces the Immich error body on non-2xx.
- EventLog.user_id / action_id / action_name columns with defensive migration
+ backfill. Status query filters by user_id directly; Immich/webhook paths
emit user_id explicitly. action_runner writes an action_success/partial/
failed event on each non-dry-run.
- Dashboard DELETE /api/status/events (scoped to user_id) + rendering live
tracker/provider/action names via FK join with snapshot fallback.
- PATCH /api/users/{id} for username/role change with last-admin guard.
- Deletion protection returns structured {message, entity, blocked_by}
(ApiError carries .blockedBy; frontend opens BlockedByModal).
- Backup prepare-restore → AppSetting markers + atomic write of
pending_restore.json; lifespan hook applies on next startup and archives
under data/applied_restores/. apply-restart sends SIGTERM so the lifespan
shutdown runs; NOTIFY_BRIDGE_SUPERVISED env override gates the button.
Manual POST /api/backup/files (same format as scheduled).
- New periodic-summary test path reuses shared collect_scheduled_assets
(limit=0) so test and future production code go through one primitive.
- Per-receiver locale for Telegram test messages (resolves
TelegramChat.language_override per chat instead of applying the first
receiver's locale to everyone).
- Bounded concurrency (semaphores) in NotificationDispatcher._preload_asset_data
and _refresh_telegram_chat_titles; chat title sweep extended to 24h since
save_chat_from_webhook covers active chats opportunistically.
- Telegram poller detects the \"webhook is active\" 409 and auto-calls
deleteWebhook for bots whose DB update_mode is polling (throttled per bot).
- TelegramClient.get_chat added (CLAUDE.md rule 6); set_album_thumbnail added.
- Seeds: rename \"Default Commands\" → \"Default Immich Commands\";
track_assets_removed default False.
Frontend
- Global provider selector visible when there is only one provider.
- Clear-events button + i18n + ConfirmModal on the dashboard; new icons/
labels/filters/colors for action_success / action_partial / action_failed.
- Auto-select first available tracking/template/command/config + bot on
create forms (trackers, command-trackers, targets, template/command
configs).
- Telegram target disable_url_preview defaults to true.
- BlockedByModal wired into 8 deletion flows; fetchAuth helper for
multipart/binary calls (reuses api()'s refresh + ApiError mapping).
- Immich tracker 'Checking links' parallelised (concurrency cap 6).
- Backup page: pending-restore banner + Apply-now / Apply-later modal,
restarting overlay polling /api/health, manual 'Create backup' button.
- Command-trackers listener row gets an 'Edit album scope' modal with
inherit/explicit multiselect.
- Users page: Edit user modal (username + role).
- parseDate helper for consistent UTC date rendering.
Migrations / schema
- event_log: + user_id, action_id, action_name (+ backfill user_id from
notification_tracker).
- command_tracker_listener: + allowed_album_ids.
This commit is contained in:
@@ -16,6 +16,7 @@ from ..database.models import (
|
||||
Action,
|
||||
ActionExecution,
|
||||
ActionRule,
|
||||
EventLog,
|
||||
ServiceProvider,
|
||||
)
|
||||
|
||||
@@ -115,13 +116,42 @@ async def run_action(
|
||||
execution.error = action_result.error or ""
|
||||
session.add(execution)
|
||||
|
||||
# Update action last_run metadata (skip for dry runs)
|
||||
# Update action last_run metadata + emit a dashboard EventLog row
|
||||
# (skip both for dry runs — dashboards should not count previews).
|
||||
if not is_dry_run:
|
||||
action = await session.get(Action, action_id)
|
||||
if action:
|
||||
action.last_run_at = datetime.now(timezone.utc)
|
||||
action.last_run_status = execution.status if execution else ""
|
||||
session.add(action)
|
||||
provider = await session.get(ServiceProvider, action.provider_id)
|
||||
status_str = execution.status if execution else "success"
|
||||
event_type = f"action_{status_str}" # action_success|partial|failed
|
||||
session.add(EventLog(
|
||||
user_id=action.user_id,
|
||||
tracker_id=None,
|
||||
tracker_name="",
|
||||
action_id=action.id,
|
||||
action_name=action.name,
|
||||
provider_id=provider.id if provider else None,
|
||||
provider_name=(provider.name if provider else "") or "",
|
||||
event_type=event_type,
|
||||
collection_id=str(action.id),
|
||||
# ``collection_name`` is what the dashboard row shows as the
|
||||
# event subject; use the action name so the row is readable
|
||||
# without a separate action_name renderer.
|
||||
collection_name=action.name,
|
||||
assets_count=action_result.total_items_affected,
|
||||
details={
|
||||
"action_type": action.action_type,
|
||||
"trigger": trigger,
|
||||
"rules_processed": action_result.rules_processed,
|
||||
"rules_succeeded": action_result.rules_succeeded,
|
||||
"rules_failed": action_result.rules_failed,
|
||||
"error": action_result.error or "",
|
||||
"execution_id": execution_id,
|
||||
},
|
||||
))
|
||||
|
||||
await session.commit()
|
||||
|
||||
|
||||
@@ -298,13 +298,82 @@ async def send_to_receiver(target: NotificationTarget, receiver_config: dict, me
|
||||
|
||||
|
||||
async def send_test_notification(target: NotificationTarget, locale: str = "en") -> dict:
|
||||
"""Send a simple test message. For broadcast targets, fans out to all children."""
|
||||
"""Send a simple test message. For broadcast targets, fans out to all children.
|
||||
|
||||
For Telegram targets, per-receiver locale (TargetReceiver.locale or
|
||||
TelegramChat.language_override/language_code) is resolved individually so
|
||||
each chat receives the message in its own configured language.
|
||||
"""
|
||||
if target.type == "broadcast":
|
||||
return await _send_broadcast_test(target, locale)
|
||||
if target.type == "telegram":
|
||||
return await _send_telegram_test_per_receiver(target, default_locale=locale)
|
||||
message = _get_test_message(locale, target.type)
|
||||
return await send_to_target(target, message)
|
||||
|
||||
|
||||
async def _send_telegram_test_per_receiver(
|
||||
target: NotificationTarget, default_locale: str = "en",
|
||||
) -> dict:
|
||||
"""Send a test message to each Telegram receiver in its own resolved locale."""
|
||||
from notify_bridge_core.notifications.telegram.client import TelegramClient
|
||||
|
||||
from ..database.models import TargetReceiver, TelegramChat
|
||||
from .http_session import get_http_session
|
||||
|
||||
bot_token = target.config.get("bot_token")
|
||||
bot_id = target.config.get("bot_id")
|
||||
disable_preview = target.config.get("disable_url_preview", False)
|
||||
if not bot_token:
|
||||
return {"success": False, "error": "Missing bot_token"}
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
recv_rows = (await session.exec(
|
||||
select(TargetReceiver).where(
|
||||
TargetReceiver.target_id == target.id,
|
||||
TargetReceiver.enabled == True,
|
||||
)
|
||||
)).all()
|
||||
if not recv_rows:
|
||||
return {"success": False, "error": "No receivers configured"}
|
||||
|
||||
# Resolve per-receiver locale
|
||||
chat_ids = [str(r.config.get("chat_id", "")) for r in recv_rows if r.config.get("chat_id")]
|
||||
chat_locale_map: dict[str, str] = {}
|
||||
if bot_id and chat_ids:
|
||||
chat_rows = (await session.exec(
|
||||
select(TelegramChat).where(
|
||||
TelegramChat.bot_id == bot_id,
|
||||
TelegramChat.chat_id.in_(chat_ids),
|
||||
)
|
||||
)).all()
|
||||
for chat in chat_rows:
|
||||
override = (
|
||||
getattr(chat, "language_override", "") or
|
||||
getattr(chat, "language_code", "") or ""
|
||||
)
|
||||
if override:
|
||||
chat_locale_map[chat.chat_id] = override[:2].lower()
|
||||
|
||||
http = await get_http_session()
|
||||
client = TelegramClient(http, bot_token)
|
||||
results: list[dict] = []
|
||||
for r in recv_rows:
|
||||
chat_id = str(r.config.get("chat_id", ""))
|
||||
if not chat_id:
|
||||
continue
|
||||
explicit = getattr(r, "locale", "") or ""
|
||||
locale = explicit or chat_locale_map.get(chat_id) or default_locale
|
||||
message = _get_test_message(locale[:2].lower(), "telegram")
|
||||
results.append(await client.send_message(
|
||||
chat_id=chat_id,
|
||||
text=message,
|
||||
disable_web_page_preview=bool(disable_preview),
|
||||
))
|
||||
return _aggregate(results)
|
||||
|
||||
|
||||
async def _send_broadcast_test(target: NotificationTarget, locale: str) -> dict:
|
||||
"""Send test notifications to all child targets of a broadcast target."""
|
||||
child_ids = target.config.get("child_target_ids", [])
|
||||
|
||||
@@ -0,0 +1,162 @@
|
||||
"""Startup hook that applies a pending restore prepared via the backup API.
|
||||
|
||||
When an admin uploads a backup via /api/backup/prepare-restore, the file is
|
||||
staged at data/pending_restore.json and marker rows are written to AppSetting.
|
||||
This module is invoked during app startup (after migrations + seeds) to
|
||||
atomically apply that pending restore — if present — before the server begins
|
||||
serving requests.
|
||||
|
||||
If the apply fails, the pending file is kept so the operator can inspect it
|
||||
and markers are updated to record the last error. On success, the staged file
|
||||
is archived under data/applied_restores/<timestamp>.json and markers are
|
||||
cleared.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from ..api.backup import (
|
||||
PENDING_RESTORE_CONFLICT_KEY,
|
||||
PENDING_RESTORE_PATH_KEY,
|
||||
PENDING_RESTORE_UPLOADED_AT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_BY_KEY,
|
||||
_applied_restores_dir,
|
||||
_pending_restore_path,
|
||||
)
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import AppSetting
|
||||
from .backup_schema import BackupFile, ConflictMode
|
||||
from .backup_service import import_backup
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PENDING_RESTORE_LAST_ERROR_KEY = "pending_restore_last_error"
|
||||
PENDING_RESTORE_LAST_APPLIED_KEY = "pending_restore_last_applied"
|
||||
|
||||
|
||||
async def apply_pending_restore_if_any() -> None:
|
||||
"""Apply a staged restore if one exists. Idempotent and safe to call at startup."""
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
path_row = await session.get(AppSetting, PENDING_RESTORE_PATH_KEY)
|
||||
if not path_row or not path_row.value:
|
||||
return
|
||||
|
||||
pending_path = _pending_restore_path()
|
||||
if not pending_path.exists():
|
||||
_LOGGER.warning(
|
||||
"Pending-restore marker present but file missing at %s — clearing marker",
|
||||
pending_path,
|
||||
)
|
||||
await _clear_markers(session)
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
conflict_row = await session.get(AppSetting, PENDING_RESTORE_CONFLICT_KEY)
|
||||
conflict_mode = ConflictMode(conflict_row.value) if conflict_row and conflict_row.value else ConflictMode.SKIP
|
||||
uploaded_by_row = await session.get(AppSetting, PENDING_RESTORE_UPLOADED_BY_KEY)
|
||||
uploaded_by = uploaded_by_row.value if uploaded_by_row else "admin"
|
||||
|
||||
try:
|
||||
raw = json.loads(pending_path.read_text(encoding="utf-8"))
|
||||
backup = BackupFile.model_validate(raw)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.exception("Pending-restore file unreadable")
|
||||
await _record_error(session, f"Unreadable backup: {err}")
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
# Resolve the target user: first admin (restore is cross-user).
|
||||
# The backup carries its own user_id per-record, so this is mostly
|
||||
# used for provenance.
|
||||
from sqlmodel import select
|
||||
from ..database.models import User
|
||||
admin_row = (await session.exec(select(User).where(User.role == "admin"))).first()
|
||||
if not admin_row:
|
||||
_LOGGER.error("No admin user found; refusing to apply pending restore")
|
||||
await _record_error(session, "No admin user available to own the restore")
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
try:
|
||||
result = await import_backup(session, admin_row.id, backup, conflict_mode)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.exception("Pending-restore apply failed")
|
||||
await _record_error(session, str(err))
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
# Archive the file
|
||||
archive_dir = _applied_restores_dir()
|
||||
archive_dir.mkdir(parents=True, exist_ok=True)
|
||||
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
|
||||
archived_name = f"applied-{ts}.json"
|
||||
try:
|
||||
shutil.move(str(pending_path), str(archive_dir / archived_name))
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.warning("Could not archive applied restore file: %s", err)
|
||||
# Still consider the apply a success; just best-effort cleanup
|
||||
try:
|
||||
pending_path.unlink()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await _clear_markers(session)
|
||||
applied_summary = {
|
||||
"applied_at": datetime.now(timezone.utc).isoformat(),
|
||||
"uploaded_by": uploaded_by,
|
||||
"archived_file": archived_name,
|
||||
"stats": result.model_dump() if hasattr(result, "model_dump") else {},
|
||||
}
|
||||
await _set_setting(
|
||||
session,
|
||||
PENDING_RESTORE_LAST_APPLIED_KEY,
|
||||
json.dumps(applied_summary, default=str),
|
||||
)
|
||||
# Clear any prior error marker.
|
||||
err_row = await session.get(AppSetting, PENDING_RESTORE_LAST_ERROR_KEY)
|
||||
if err_row:
|
||||
await session.delete(err_row)
|
||||
await session.commit()
|
||||
_LOGGER.info(
|
||||
"Applied pending restore (uploaded by %s): %s",
|
||||
uploaded_by, applied_summary["stats"],
|
||||
)
|
||||
|
||||
|
||||
async def _clear_markers(session: AsyncSession) -> None:
|
||||
for key in (
|
||||
PENDING_RESTORE_PATH_KEY,
|
||||
PENDING_RESTORE_CONFLICT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_AT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_BY_KEY,
|
||||
):
|
||||
row = await session.get(AppSetting, key)
|
||||
if row:
|
||||
await session.delete(row)
|
||||
|
||||
|
||||
async def _record_error(session: AsyncSession, message: str) -> None:
|
||||
await _set_setting(
|
||||
session,
|
||||
PENDING_RESTORE_LAST_ERROR_KEY,
|
||||
json.dumps({
|
||||
"at": datetime.now(timezone.utc).isoformat(),
|
||||
"message": message[:2048],
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
async def _set_setting(session: AsyncSession, key: str, value: str) -> None:
|
||||
row = await session.get(AppSetting, key)
|
||||
if row:
|
||||
row.value = value
|
||||
else:
|
||||
row = AppSetting(key=key, value=value)
|
||||
session.add(row)
|
||||
@@ -29,7 +29,8 @@ _SAMPLE_ASSET = {
|
||||
"public_url": "https://immich.example.com/share/abc123/photos/a1b2c3d4-e5f6-7890-abcd-ef1234567890",
|
||||
"download_url": "https://immich.example.com/api/assets/abc123/original",
|
||||
"photo_url": "https://immich.example.com/api/assets/abc123/thumbnail",
|
||||
"file_size": 3_500_000, # 3.5 MB
|
||||
"file_size": 3_500_000, # 3.5 MB — original asset bytes
|
||||
"playback_size": None, # photos are sent as-is, no transcoded variant
|
||||
"oversized": False,
|
||||
}
|
||||
|
||||
@@ -43,7 +44,8 @@ _SAMPLE_VIDEO_ASSET = {
|
||||
"photo_url": None,
|
||||
"public_url": "https://immich.example.com/share/abc123/photos/d4e5f6a7-b8c9-0123-defg-456789abcdef",
|
||||
"playback_url": "https://immich.example.com/api/assets/def456/video",
|
||||
"file_size": 75_000_000, # 75 MB — exceeds Telegram's 50 MB limit
|
||||
"file_size": 180_000_000, # 180 MB — original HEVC
|
||||
"playback_size": 62_000_000, # 62 MB transcoded — exceeds Telegram's 50 MB limit
|
||||
"oversized": True,
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,9 @@ async def start_scheduler() -> None:
|
||||
# Schedule daily cleanup of old event log entries
|
||||
_schedule_event_cleanup()
|
||||
|
||||
# Schedule periodic Telegram chat title refresh
|
||||
_schedule_telegram_chat_sync()
|
||||
|
||||
# Start debounced command auto-sync scheduler
|
||||
from .command_sync import start_sync_scheduler
|
||||
start_sync_scheduler()
|
||||
@@ -60,6 +63,139 @@ def _schedule_event_cleanup() -> None:
|
||||
_LOGGER.info("Scheduled daily event log cleanup at 03:00 UTC")
|
||||
|
||||
|
||||
# Chat-title refresh tuning.
|
||||
# Sweep runs daily as a fallback — we additionally refresh opportunistically
|
||||
# on every incoming webhook/long-poll update (``save_chat_from_webhook``), so
|
||||
# the sweep only catches chats that haven't sent anything recently.
|
||||
_CHAT_SYNC_INTERVAL_HOURS = 24
|
||||
_CHAT_SYNC_INITIAL_DELAY_SECONDS = 60
|
||||
_CHAT_SYNC_CONCURRENCY = 10
|
||||
|
||||
|
||||
def _schedule_telegram_chat_sync() -> None:
|
||||
"""Schedule periodic refresh of Telegram chat titles via getChat."""
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
|
||||
scheduler = get_scheduler()
|
||||
job_id = "refresh_telegram_chat_titles"
|
||||
if scheduler.get_job(job_id):
|
||||
return
|
||||
scheduler.add_job(
|
||||
_refresh_telegram_chat_titles,
|
||||
IntervalTrigger(hours=_CHAT_SYNC_INTERVAL_HOURS),
|
||||
id=job_id,
|
||||
replace_existing=True,
|
||||
max_instances=1,
|
||||
next_run_time=None,
|
||||
)
|
||||
# Fire once shortly after startup so stale names refresh without waiting a day.
|
||||
from datetime import datetime, timedelta, timezone
|
||||
scheduler.add_job(
|
||||
_refresh_telegram_chat_titles,
|
||||
"date",
|
||||
run_date=datetime.now(timezone.utc) + timedelta(seconds=_CHAT_SYNC_INITIAL_DELAY_SECONDS),
|
||||
id="refresh_telegram_chat_titles_once",
|
||||
replace_existing=True,
|
||||
max_instances=1,
|
||||
)
|
||||
_LOGGER.info(
|
||||
"Scheduled Telegram chat title refresh every %sh (concurrency %s)",
|
||||
_CHAT_SYNC_INTERVAL_HOURS, _CHAT_SYNC_CONCURRENCY,
|
||||
)
|
||||
|
||||
|
||||
async def _refresh_telegram_chat_titles() -> None:
|
||||
"""Refresh TelegramChat.title/username via getChat for all known chats.
|
||||
|
||||
Runs requests in bounded parallel (``_CHAT_SYNC_CONCURRENCY``) so a fleet
|
||||
of 50 chats finishes in ~5 round-trips instead of 50. Telegram's
|
||||
``getChat`` rate limit is well above 10 concurrent per bot, and the cap is
|
||||
global across bots so we never flood the shared HTTP session.
|
||||
"""
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from notify_bridge_core.notifications.telegram.client import TelegramClient
|
||||
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import TelegramBot, TelegramChat
|
||||
from .http_session import get_http_session
|
||||
|
||||
engine = get_engine()
|
||||
async with AsyncSession(engine) as session:
|
||||
bots = (await session.exec(select(TelegramBot))).all()
|
||||
bot_tokens = {b.id: b.token for b in bots if b.token}
|
||||
if not bot_tokens:
|
||||
return
|
||||
chats = (await session.exec(select(TelegramChat))).all()
|
||||
|
||||
by_bot: dict[int, list[TelegramChat]] = defaultdict(list)
|
||||
for chat in chats:
|
||||
if chat.bot_id in bot_tokens:
|
||||
by_bot[chat.bot_id].append(chat)
|
||||
if not by_bot:
|
||||
return
|
||||
|
||||
http = await get_http_session()
|
||||
clients_by_bot = {
|
||||
bot_id: TelegramClient(http, token) for bot_id, token in bot_tokens.items()
|
||||
}
|
||||
|
||||
sem = asyncio.Semaphore(_CHAT_SYNC_CONCURRENCY)
|
||||
|
||||
async def _fetch(bot_id: int, chat: TelegramChat) -> tuple[int, dict | None, str | None]:
|
||||
"""Return (chat_row_id, info_dict_or_None, error_message_or_None)."""
|
||||
async with sem:
|
||||
try:
|
||||
res = await clients_by_bot[bot_id].get_chat(chat.chat_id)
|
||||
except Exception as err: # noqa: BLE001
|
||||
return chat.id, None, str(err)
|
||||
if not res.get("success"):
|
||||
return chat.id, None, res.get("error") or "unknown"
|
||||
return chat.id, (res.get("result") or {}), None
|
||||
|
||||
tasks = [
|
||||
_fetch(bot_id, chat)
|
||||
for bot_id, bot_chats in by_bot.items()
|
||||
for chat in bot_chats
|
||||
]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
refreshed = 0
|
||||
errors = 0
|
||||
async with AsyncSession(engine) as session:
|
||||
for chat_id, info, err in results:
|
||||
if err is not None or info is None:
|
||||
errors += 1
|
||||
if err:
|
||||
_LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err)
|
||||
continue
|
||||
merged = await session.get(TelegramChat, chat_id)
|
||||
if not merged:
|
||||
continue
|
||||
title = info.get("title") or (
|
||||
(info.get("first_name", "") + " " + info.get("last_name", "")).strip()
|
||||
)
|
||||
changed = False
|
||||
if title and merged.title != title:
|
||||
merged.title = title
|
||||
changed = True
|
||||
new_username = info.get("username")
|
||||
if new_username is not None and merged.username != new_username:
|
||||
merged.username = new_username
|
||||
changed = True
|
||||
if changed:
|
||||
session.add(merged)
|
||||
refreshed += 1
|
||||
await session.commit()
|
||||
_LOGGER.info(
|
||||
"Telegram chat title refresh: %s updated, %s errors", refreshed, errors
|
||||
)
|
||||
|
||||
|
||||
async def _cleanup_old_events() -> None:
|
||||
"""Delete EventLog entries older than 90 days."""
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
@@ -28,6 +28,16 @@ _LOGGER = logging.getLogger(__name__)
|
||||
# Track last update_id per bot to use as offset
|
||||
_last_update_id: dict[int, int] = {}
|
||||
|
||||
# Throttle auto-reclaim attempts so we don't hammer deleteWebhook when a
|
||||
# stubborn external instance keeps re-setting the webhook. (bot_id → unix ts)
|
||||
_last_webhook_reclaim_at: dict[int, float] = {}
|
||||
_WEBHOOK_RECLAIM_COOLDOWN_SECONDS = 60.0
|
||||
|
||||
# Phrase Telegram uses in the 409 response description for the
|
||||
# "webhook is active" conflict. Matched case-insensitively so we don't
|
||||
# depend on exact wording.
|
||||
_WEBHOOK_CONFLICT_PHRASE = "webhook is active"
|
||||
|
||||
|
||||
async def _get_bot_ids_with_active_listeners() -> set[int]:
|
||||
"""Return bot IDs that have at least one active command tracker listener.
|
||||
@@ -141,6 +151,64 @@ def unschedule_bot_polling(bot_id: int) -> None:
|
||||
_LOGGER.info("Stopped polling for bot %d", bot_id)
|
||||
|
||||
|
||||
async def _handle_webhook_conflict(bot_id: int, bot_token: str, description: str) -> None:
|
||||
"""Reclaim a bot stuck behind an active webhook set by another instance.
|
||||
|
||||
Telegram's ``getUpdates`` returns 409 ``Conflict: can't use getUpdates
|
||||
method while webhook is active`` whenever a webhook is currently
|
||||
registered for the bot. Since this bot row has ``update_mode="polling"``
|
||||
in our DB (that's the only reason we're polling it), the user's intent
|
||||
is polling, so we drop the webhook and resume. Throttled to once per
|
||||
minute per bot so a rival instance constantly re-registering the
|
||||
webhook doesn't trigger a reclaim storm.
|
||||
"""
|
||||
import time
|
||||
now = time.time()
|
||||
last = _last_webhook_reclaim_at.get(bot_id, 0.0)
|
||||
if now - last < _WEBHOOK_RECLAIM_COOLDOWN_SECONDS:
|
||||
# Already logged recently; stay quiet until cooldown expires so the
|
||||
# user gets one clear warning line per minute, not one every 3s.
|
||||
return
|
||||
_last_webhook_reclaim_at[bot_id] = now
|
||||
|
||||
from .http_session import get_http_session
|
||||
http = await get_http_session()
|
||||
client = TelegramClient(http, bot_token)
|
||||
|
||||
# Surface which URL stole the bot so the user can tell where it came from.
|
||||
conflicting_url = ""
|
||||
try:
|
||||
info = await client.get_webhook_info()
|
||||
if info.get("success"):
|
||||
conflicting_url = info.get("result", {}).get("url", "") or ""
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.debug("getWebhookInfo during conflict recovery failed: %s", err)
|
||||
|
||||
_LOGGER.warning(
|
||||
"Bot %d: webhook is active (url=%r) but this instance is in polling "
|
||||
"mode — calling deleteWebhook to reclaim. Telegram said: %s",
|
||||
bot_id, conflicting_url, description,
|
||||
)
|
||||
|
||||
try:
|
||||
del_result = await client.delete_webhook()
|
||||
if del_result.get("success"):
|
||||
_LOGGER.warning(
|
||||
"Bot %d: webhook cleared; polling will resume on next tick",
|
||||
bot_id,
|
||||
)
|
||||
# Reset offset so we don't skip updates that accumulated during the
|
||||
# conflict window (Telegram held them until a client acknowledged).
|
||||
_last_update_id.pop(bot_id, None)
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"Bot %d: deleteWebhook failed: %s",
|
||||
bot_id, del_result.get("error"),
|
||||
)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.error("Bot %d: deleteWebhook raised: %s", bot_id, err)
|
||||
|
||||
|
||||
async def _poll_bot(bot_id: int) -> None:
|
||||
"""Fetch updates from Telegram and process them."""
|
||||
engine = get_engine()
|
||||
@@ -167,6 +235,15 @@ async def _poll_bot(bot_id: int) -> None:
|
||||
offset=offset + 1 if offset else None, limit=50,
|
||||
)
|
||||
if not result.get("success"):
|
||||
err_text = str(result.get("error") or "")
|
||||
# Detect the webhook-is-active conflict: another instance (or a
|
||||
# stale registration) owns this bot's delivery, so getUpdates
|
||||
# returns 409 and we get zero updates forever. Reclaim it —
|
||||
# but only for bots the user explicitly set to polling mode.
|
||||
if _WEBHOOK_CONFLICT_PHRASE in err_text.lower():
|
||||
await _handle_webhook_conflict(bot_id, bot_token, err_text)
|
||||
else:
|
||||
_LOGGER.debug("Polling error for bot %d: %s", bot_id, err_text)
|
||||
return
|
||||
updates = result.get("result", [])
|
||||
except Exception as e:
|
||||
|
||||
@@ -79,7 +79,8 @@ async def dispatch_test_notification(
|
||||
if locale_map:
|
||||
template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map}
|
||||
|
||||
# Resolve target config + receivers (same as watcher)
|
||||
# Resolve target config + receivers (same as watcher — this already sets
|
||||
# each receiver.locale from TargetReceiver.locale or TelegramChat override)
|
||||
resolved = await _resolve_target(session, target)
|
||||
|
||||
target_cfg = TargetConfig(
|
||||
@@ -95,21 +96,47 @@ async def dispatch_test_notification(
|
||||
receivers=resolved["receivers"],
|
||||
)
|
||||
|
||||
if not template_slots:
|
||||
return {
|
||||
"success": False,
|
||||
"error": (
|
||||
f"No '{slot_name}' template defined for this target's template config "
|
||||
f"(locale: {locale}). Add the slot under Template Configs."
|
||||
),
|
||||
}
|
||||
|
||||
# Fetch assets and build event
|
||||
event = await _build_event(
|
||||
provider_type=provider.type,
|
||||
provider_config=provider_config,
|
||||
provider_name=provider.name or provider.type,
|
||||
tracker_name=tracker.name or "",
|
||||
tracker_filters=dict(tracker.filters) if tracker.filters else {},
|
||||
collection_ids=collection_ids,
|
||||
test_type=test_type,
|
||||
tracking_config=tracking_config,
|
||||
)
|
||||
try:
|
||||
event = await _build_event(
|
||||
provider_type=provider.type,
|
||||
provider_config=provider_config,
|
||||
provider_name=provider.name or provider.type,
|
||||
tracker_name=tracker.name or "",
|
||||
tracker_filters=dict(tracker.filters) if tracker.filters else {},
|
||||
collection_ids=collection_ids,
|
||||
test_type=test_type,
|
||||
tracking_config=tracking_config,
|
||||
)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.exception("Test dispatch event build failed")
|
||||
return {"success": False, "error": f"Provider connection failed: {err}"}
|
||||
if event is None:
|
||||
return {"success": False, "error": "No data returned from provider"}
|
||||
return {
|
||||
"success": False,
|
||||
"error": (
|
||||
"Provider returned no data. Check that the provider is reachable, "
|
||||
"credentials are valid, and the tracker has collections configured."
|
||||
),
|
||||
}
|
||||
# Periodic summary only needs album stats (extra.albums), not assets — skip the asset check.
|
||||
if not event.added_assets and test_type in ("scheduled", "memory"):
|
||||
return {"success": False, "error": "No matching assets found" + (" for today" if test_type == "memory" else "")}
|
||||
return {
|
||||
"success": False,
|
||||
"error": (
|
||||
"No matching assets found. Verify the tracker's albums contain assets "
|
||||
"that pass the tracking config filters (favorites only, rating, asset type)."
|
||||
) + (" for today" if test_type == "memory" else ""),
|
||||
}
|
||||
|
||||
# Dispatch through the real NotificationDispatcher
|
||||
url_cache, asset_cache = await _get_telegram_caches()
|
||||
@@ -136,6 +163,13 @@ async def _build_event(
|
||||
from datetime import datetime, timezone
|
||||
|
||||
if provider_type == "immich":
|
||||
if test_type == "periodic":
|
||||
return await _build_immich_periodic_event(
|
||||
provider_config=provider_config,
|
||||
provider_name=provider_name,
|
||||
tracker_name=tracker_name,
|
||||
collection_ids=collection_ids,
|
||||
)
|
||||
return await _build_immich_event(
|
||||
provider_config=provider_config,
|
||||
provider_name=provider_name,
|
||||
@@ -237,6 +271,76 @@ async def _build_immich_event(
|
||||
)
|
||||
|
||||
|
||||
async def _build_immich_periodic_event(
|
||||
*,
|
||||
provider_config: dict,
|
||||
provider_name: str,
|
||||
tracker_name: str,
|
||||
collection_ids: list[str],
|
||||
) -> ServiceEvent | None:
|
||||
"""Build a periodic-summary event (album stats only, no assets).
|
||||
|
||||
Reuses the same shared core utility (`collect_scheduled_assets`) that
|
||||
scheduled/memory tests use, invoked with limit=0 so we get the full
|
||||
``collections_extra`` block (album name/url/counts/...) without selecting
|
||||
any individual assets — which is exactly what the
|
||||
``periodic_summary_message`` template renders.
|
||||
"""
|
||||
from datetime import datetime, timezone
|
||||
from notify_bridge_core.providers.immich import ImmichServiceProvider
|
||||
from notify_bridge_core.providers.immich.asset_utils import collect_scheduled_assets
|
||||
from notify_bridge_core.providers.immich.models import ImmichAlbumData, SharedLinkInfo
|
||||
|
||||
from .http_session import get_http_session
|
||||
http_session = await get_http_session()
|
||||
immich = ImmichServiceProvider(
|
||||
http_session,
|
||||
provider_config.get("url", ""),
|
||||
provider_config.get("api_key", ""),
|
||||
provider_config.get("external_domain"),
|
||||
provider_name,
|
||||
)
|
||||
if not await immich.connect():
|
||||
return None
|
||||
|
||||
ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
|
||||
|
||||
albums: dict[str, ImmichAlbumData] = {}
|
||||
shared_links: dict[str, list[SharedLinkInfo]] = {}
|
||||
for album_id in collection_ids:
|
||||
album = await immich.client.get_album(album_id)
|
||||
if album:
|
||||
albums[album_id] = album
|
||||
shared_links[album_id] = await immich.client.get_shared_links(album_id)
|
||||
|
||||
# limit=0 → returns ([], collections_extra) with full per-album stats.
|
||||
_assets, collections_extra = collect_scheduled_assets(
|
||||
albums, shared_links, ext_domain,
|
||||
limit=0,
|
||||
asset_type="all",
|
||||
favorite_only=False,
|
||||
min_rating=0,
|
||||
is_memory=False,
|
||||
)
|
||||
|
||||
first_col = collections_extra[0] if collections_extra else {}
|
||||
return ServiceEvent(
|
||||
event_type=EventType.SCHEDULED_MESSAGE,
|
||||
provider_type=ServiceProviderType.IMMICH,
|
||||
provider_name=provider_name,
|
||||
collection_id=collection_ids[0] if collection_ids else "",
|
||||
collection_name=first_col.get("name", tracker_name),
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
added_assets=[],
|
||||
added_count=0,
|
||||
extra={
|
||||
"collections": collections_extra,
|
||||
"albums": collections_extra,
|
||||
**(first_col if first_col else {}),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def _build_native_memory_event(
|
||||
immich,
|
||||
ext_domain: str,
|
||||
|
||||
@@ -191,6 +191,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||
for event in events:
|
||||
assets_count = event.added_count or event.removed_count or 0
|
||||
log = EventLog(
|
||||
user_id=tracker.user_id,
|
||||
tracker_id=tracker_id,
|
||||
tracker_name=tracker.name,
|
||||
provider_id=provider.id,
|
||||
|
||||
Reference in New Issue
Block a user