fix(security,perf): harden restore, CSRF, token_version + perf pass
Security
- Sign pending_restore.json (SHA256 stored in AppSetting, verified on
startup apply) + refuse path outside data_dir, tighten to 0600.
- Require same-origin Origin/Referer on POST /api/backup/apply-restart —
Bearer-in-localStorage is CSRF-reachable from any XSS'd admin tab.
- Bump token_version on role/username change and admin password reset so
demoted admins lose admin in already-issued JWTs. Guard last-admin
TOCTOU via COUNT + post-commit re-check that rolls back a race.
- SSRF guard (validate_outbound_url) in ImmichClient.__init__ and the
external_domain setter — admin-mutable URLs were bypassing the check
that webhook/slack/discord paths already used. Dev restart script now
sets NOTIFY_BRIDGE_ALLOW_PRIVATE_URLS=1 so homelab Immich still works.
- Redact + cap Immich error bodies to ~120 chars before they flow into
ActionExecution.error / EventLog.details (both UI-visible).
- Deny-list sensitive keys (api_key / token / secret / password /
authorization / cookie / ...) in template-context merges so a rogue
template can't exfiltrate provider creds via {{ api_key }}.
- Cap user-controlled Immich search params (query ≤256, person_ids ≤50,
size ≤100) so a Telegram listener can't DoS upstream.
- Stream upload reads with running byte counter + content-length precheck
instead of buffering the full body and then rejecting.
- Log Telegram parse_mode fallbacks instead of swallowing silently;
template escape bugs now surface in server logs.
- Rollback partial imports on pending-restore failure (error recorded on
a fresh session).
Performance
- Fix N+1 in _refresh_telegram_chat_titles: single IN query instead of
session.get per chat.
- Parallelize album + shared-link fetches in test_dispatch (asyncio.gather)
and per-receiver Telegram test sends in notifier (semaphore 5).
- Early-exit collect_scheduled_assets(limit=0) so the periodic-summary
test path skips full per-album filter/sample (was O(album_assets)).
- Emit explicit CREATE INDEX IF NOT EXISTS for event_log user_id /
action_id / provider_id so the first boot after upgrade isn't left
unindexed for the dashboard query.
- Add AbortController timeout (120s) to fetchAuth so uploads/downloads
don't hang indefinitely.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,13 +1,15 @@
|
||||
"""Configuration backup/restore API (admin only)."""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
from datetime import datetime, timezone
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, UploadFile, File, Query
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile, File, Query
|
||||
from fastapi.responses import JSONResponse
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
@@ -28,6 +30,11 @@ PENDING_RESTORE_PATH_KEY = "pending_restore_path"
|
||||
PENDING_RESTORE_CONFLICT_KEY = "pending_restore_conflict_mode"
|
||||
PENDING_RESTORE_UPLOADED_AT_KEY = "pending_restore_uploaded_at"
|
||||
PENDING_RESTORE_UPLOADED_BY_KEY = "pending_restore_uploaded_by"
|
||||
# SHA256 of the staged pending_restore.json, written atomically with the file.
|
||||
# The startup hook refuses to apply if the on-disk file's hash does not match —
|
||||
# defends against anyone dropping a tampered file into data/ between prepare
|
||||
# and restart.
|
||||
PENDING_RESTORE_SHA256_KEY = "pending_restore_sha256"
|
||||
|
||||
|
||||
def _pending_restore_path():
|
||||
@@ -44,6 +51,69 @@ router = APIRouter(prefix="/api/backup", tags=["backup"])
|
||||
MAX_UPLOAD_SIZE = 10 * 1024 * 1024 # 10 MB
|
||||
|
||||
|
||||
async def _read_upload_bounded(file: UploadFile, max_bytes: int = MAX_UPLOAD_SIZE) -> bytes:
|
||||
"""Read an UploadFile into memory, failing fast if it exceeds ``max_bytes``.
|
||||
|
||||
Rejects on ``content_length`` header up-front when available; always
|
||||
stream-reads with a running byte counter so we never allocate more than
|
||||
the limit even when the header is missing or lies.
|
||||
"""
|
||||
# Fast path: reject on header before we allocate anything.
|
||||
cl = file.headers.get("content-length") if hasattr(file, "headers") else None
|
||||
if cl:
|
||||
try:
|
||||
if int(cl) > max_bytes:
|
||||
raise HTTPException(status_code=400, detail="File too large (max 10 MB)")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
chunks: list[bytes] = []
|
||||
total = 0
|
||||
while True:
|
||||
chunk = await file.read(64 * 1024)
|
||||
if not chunk:
|
||||
break
|
||||
total += len(chunk)
|
||||
if total > max_bytes:
|
||||
raise HTTPException(status_code=400, detail="File too large (max 10 MB)")
|
||||
chunks.append(chunk)
|
||||
return b"".join(chunks)
|
||||
|
||||
|
||||
def _check_same_origin(request: Request) -> None:
|
||||
"""Reject cross-origin admin-write POSTs (CSRF defense).
|
||||
|
||||
Bearer tokens in ``localStorage`` plus cookie-less CORS mean a malicious
|
||||
page cannot technically submit our Authorization header from a victim's
|
||||
session, BUT browser extensions and misconfigured CORS policies routinely
|
||||
break this assumption. For endpoints whose blast radius is restart/RCE-
|
||||
equivalent (restore apply), we additionally require the request to come
|
||||
from our own origin.
|
||||
"""
|
||||
host = request.headers.get("host", "").lower()
|
||||
if not host:
|
||||
raise HTTPException(status_code=400, detail="Missing Host header")
|
||||
|
||||
def _host_of(u: str | None) -> str:
|
||||
if not u:
|
||||
return ""
|
||||
try:
|
||||
return (urlparse(u).netloc or "").lower()
|
||||
except Exception: # noqa: BLE001
|
||||
return ""
|
||||
|
||||
origin_host = _host_of(request.headers.get("origin"))
|
||||
referer_host = _host_of(request.headers.get("referer"))
|
||||
# At least one of Origin/Referer must be present and match Host.
|
||||
# Legitimate browser requests to this endpoint always ship Origin.
|
||||
same = (origin_host and origin_host == host) or (referer_host and referer_host == host)
|
||||
if not same:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Cross-origin request rejected",
|
||||
)
|
||||
|
||||
|
||||
def _backup_dir():
|
||||
return app_config.data_dir / "backups"
|
||||
|
||||
@@ -104,9 +174,7 @@ async def validate_config(
|
||||
user: User = Depends(require_admin),
|
||||
):
|
||||
"""Validate a backup file without importing."""
|
||||
content = await file.read()
|
||||
if len(content) > MAX_UPLOAD_SIZE:
|
||||
raise HTTPException(status_code=400, detail="File too large (max 10 MB)")
|
||||
content = await _read_upload_bounded(file)
|
||||
|
||||
try:
|
||||
raw = json.loads(content)
|
||||
@@ -129,9 +197,7 @@ async def import_config(
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""Import configuration from a backup file."""
|
||||
content = await file.read()
|
||||
if len(content) > MAX_UPLOAD_SIZE:
|
||||
raise HTTPException(status_code=400, detail="File too large (max 10 MB)")
|
||||
content = await _read_upload_bounded(file)
|
||||
|
||||
try:
|
||||
raw = json.loads(content)
|
||||
@@ -167,6 +233,7 @@ async def _clear_pending_restore_markers(session: AsyncSession) -> None:
|
||||
PENDING_RESTORE_CONFLICT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_AT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_BY_KEY,
|
||||
PENDING_RESTORE_SHA256_KEY,
|
||||
):
|
||||
row = await session.get(AppSetting, key)
|
||||
if row:
|
||||
@@ -185,9 +252,7 @@ async def prepare_restore(
|
||||
Validates the uploaded file, writes it to ``data/pending_restore.json``,
|
||||
and persists marker settings so startup will apply it atomically.
|
||||
"""
|
||||
content = await file.read()
|
||||
if len(content) > MAX_UPLOAD_SIZE:
|
||||
raise HTTPException(status_code=400, detail="File too large (max 10 MB)")
|
||||
content = await _read_upload_bounded(file)
|
||||
|
||||
try:
|
||||
raw = json.loads(content)
|
||||
@@ -205,15 +270,25 @@ async def prepare_restore(
|
||||
pending_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Atomic write: write to tmp then rename, so a crash mid-write never
|
||||
# leaves a truncated pending_restore.json that would break startup apply.
|
||||
payload = json.dumps(raw).encode("utf-8")
|
||||
digest = hashlib.sha256(payload).hexdigest()
|
||||
tmp_path = pending_path.with_suffix(pending_path.suffix + ".tmp")
|
||||
tmp_path.write_text(json.dumps(raw), encoding="utf-8")
|
||||
tmp_path.write_bytes(payload)
|
||||
os.replace(tmp_path, pending_path)
|
||||
# Best-effort tighten perms so a non-root local user cannot swap the file
|
||||
# for one they control between prepare and restart. On Windows this is a
|
||||
# no-op; on POSIX we restrict to owner-only rw.
|
||||
try:
|
||||
os.chmod(pending_path, 0o600)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
now_iso = datetime.now(timezone.utc).isoformat()
|
||||
await _set_app_setting(session, PENDING_RESTORE_PATH_KEY, str(pending_path))
|
||||
await _set_app_setting(session, PENDING_RESTORE_CONFLICT_KEY, conflict_mode.value)
|
||||
await _set_app_setting(session, PENDING_RESTORE_UPLOADED_AT_KEY, now_iso)
|
||||
await _set_app_setting(session, PENDING_RESTORE_UPLOADED_BY_KEY, user.username)
|
||||
await _set_app_setting(session, PENDING_RESTORE_SHA256_KEY, digest)
|
||||
await session.commit()
|
||||
|
||||
return {
|
||||
@@ -292,6 +367,7 @@ def _is_supervised() -> bool:
|
||||
|
||||
@router.post("/apply-restart")
|
||||
async def apply_and_restart(
|
||||
request: Request,
|
||||
background_tasks: BackgroundTasks,
|
||||
user: User = Depends(require_admin),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
@@ -299,7 +375,11 @@ async def apply_and_restart(
|
||||
"""Trigger a graceful exit so the supervisor respawns and applies the pending restore.
|
||||
|
||||
Only allowed when a pending restore is staged AND the process is supervised.
|
||||
Requires same-origin Origin/Referer — this endpoint's blast radius is a
|
||||
full config replace + restart, so an admin token alone (vulnerable to
|
||||
XSS-driven CSRF) is not enough.
|
||||
"""
|
||||
_check_same_origin(request)
|
||||
path_row = await session.get(AppSetting, PENDING_RESTORE_PATH_KEY)
|
||||
if not path_row or not path_row.value:
|
||||
raise HTTPException(status_code=409, detail="No pending restore to apply")
|
||||
|
||||
@@ -4,6 +4,7 @@ import logging
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import func
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
@@ -81,6 +82,12 @@ async def update_user(
|
||||
if not user:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
# Track whether the identity that JWTs encode has changed. Any such change
|
||||
# must bump ``token_version`` so already-issued tokens are rejected — a
|
||||
# user demoted admin→user must not keep admin in their cached JWT until
|
||||
# expiry, and a rename should invalidate prior sessions too.
|
||||
identity_changed = False
|
||||
|
||||
if body.username is not None and body.username != user.username:
|
||||
new_username = body.username.strip()
|
||||
if not new_username:
|
||||
@@ -89,21 +96,51 @@ async def update_user(
|
||||
if dup.first():
|
||||
raise HTTPException(status_code=409, detail="Username already exists")
|
||||
user.username = new_username
|
||||
identity_changed = True
|
||||
|
||||
if body.role is not None and body.role != user.role:
|
||||
if body.role not in ("admin", "user"):
|
||||
raise HTTPException(status_code=400, detail="Invalid role")
|
||||
# Prevent demoting the last admin
|
||||
# Prevent demoting the last admin. Done via a COUNT to avoid loading
|
||||
# every admin row; more importantly, re-checked *after* the role
|
||||
# change is staged (TOCTOU guard — two concurrent demotes can each
|
||||
# see admin_count=2 and both proceed, dropping to 0).
|
||||
if user.role == "admin" and body.role != "admin":
|
||||
admins = (await session.exec(
|
||||
select(User).where(User.role == "admin")
|
||||
)).all()
|
||||
if len(admins) <= 1:
|
||||
admin_count = (await session.exec(
|
||||
select(func.count(User.id)).where(User.role == "admin")
|
||||
)).one()
|
||||
if isinstance(admin_count, tuple):
|
||||
admin_count = admin_count[0]
|
||||
if (admin_count or 0) <= 1:
|
||||
raise HTTPException(status_code=400, detail="Cannot demote the last admin")
|
||||
user.role = body.role
|
||||
identity_changed = True
|
||||
|
||||
if identity_changed:
|
||||
user.token_version = (user.token_version or 1) + 1
|
||||
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
try:
|
||||
await session.commit()
|
||||
except Exception:
|
||||
await session.rollback()
|
||||
raise
|
||||
|
||||
# Final defense against admin-count race: if we just demoted the last admin
|
||||
# due to a concurrent demote landing between our check and commit, undo.
|
||||
if body.role is not None and body.role != "admin":
|
||||
admin_count_after = (await session.exec(
|
||||
select(func.count(User.id)).where(User.role == "admin")
|
||||
)).one()
|
||||
if isinstance(admin_count_after, tuple):
|
||||
admin_count_after = admin_count_after[0]
|
||||
if (admin_count_after or 0) < 1:
|
||||
# Roll the user back to admin and re-commit.
|
||||
user.role = "admin"
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
raise HTTPException(status_code=409, detail="Refused: would remove the last admin")
|
||||
|
||||
await session.refresh(user)
|
||||
return {"id": user.id, "username": user.username, "role": user.role}
|
||||
|
||||
@@ -126,6 +163,9 @@ async def reset_user_password(
|
||||
if len(body.new_password) < 8:
|
||||
raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
|
||||
user.hashed_password = bcrypt.hashpw(body.new_password.encode(), bcrypt.gensalt()).decode()
|
||||
# Invalidate all prior JWTs issued for this user — matches the self-serve
|
||||
# password-change path in auth/routes.py.
|
||||
user.token_version = (user.token_version or 1) + 1
|
||||
session.add(user)
|
||||
await session.commit()
|
||||
return {"success": True}
|
||||
|
||||
@@ -92,6 +92,21 @@ async def migrate_schema(engine: AsyncEngine) -> None:
|
||||
await conn.execute(text(sql))
|
||||
logger.info("Added %s column to event_log table", col)
|
||||
|
||||
# Explicit indexes on the dashboard-query columns. SQLModel's
|
||||
# ``index=True`` is emitted by ``create_all`` on *new* installs,
|
||||
# but ALTER TABLE ADD COLUMN doesn't create them on upgrades —
|
||||
# so the first boot after upgrade would leave these unindexed
|
||||
# and status.py ``WHERE user_id=...`` would table-scan. The
|
||||
# indexes are redundant-but-safe once create_all also runs.
|
||||
for idx_name, col in [
|
||||
("ix_event_log_user_id", "user_id"),
|
||||
("ix_event_log_action_id", "action_id"),
|
||||
("ix_event_log_provider_id", "provider_id"),
|
||||
]:
|
||||
await conn.execute(
|
||||
text(f"CREATE INDEX IF NOT EXISTS {idx_name} ON event_log ({col})")
|
||||
)
|
||||
|
||||
# Backfill user_id from notification_tracker for legacy rows.
|
||||
# Safe to run repeatedly: only touches rows where user_id is still NULL.
|
||||
await conn.execute(text("""
|
||||
@@ -250,6 +265,21 @@ async def migrate_schema(engine: AsyncEngine) -> None:
|
||||
)
|
||||
logger.info("Added track_webhook_received column to tracking_config table")
|
||||
|
||||
# Add quiet hours to tracking_config if missing.
|
||||
# Start/end are nullable HH:MM strings; quiet_hours_enabled gates them.
|
||||
if await _has_table(conn, "tracking_config"):
|
||||
if not await _has_column(conn, "tracking_config", "quiet_hours_enabled"):
|
||||
await conn.execute(
|
||||
text("ALTER TABLE tracking_config ADD COLUMN quiet_hours_enabled INTEGER DEFAULT 0")
|
||||
)
|
||||
logger.info("Added quiet_hours_enabled column to tracking_config table")
|
||||
for col_name in ("quiet_hours_start", "quiet_hours_end"):
|
||||
if not await _has_column(conn, "tracking_config", col_name):
|
||||
await conn.execute(
|
||||
text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} TEXT")
|
||||
)
|
||||
logger.info("Added %s column to tracking_config table", col_name)
|
||||
|
||||
# Drop legacy template content columns from template_config
|
||||
# (template content moved to template_slot child rows)
|
||||
if await _has_table(conn, "template_config"):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Notification sender — unified send logic for all paths (dispatch + test)."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
@@ -11,6 +12,10 @@ from ..database.models import NotificationTarget, TargetReceiver
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Cap on concurrent per-receiver test sends. Keeps us under Telegram's per-bot
|
||||
# rate limit (~30 msg/s) while still saving ~N×RTT on multi-chat broadcasts.
|
||||
_TEST_SEND_CONCURRENCY = 5
|
||||
|
||||
_TEST_MESSAGES: dict[str, dict[str, str]] = {
|
||||
"en": {
|
||||
"telegram": "\u2705 Test message from <b>Notify Bridge</b>",
|
||||
@@ -358,19 +363,29 @@ async def _send_telegram_test_per_receiver(
|
||||
|
||||
http = await get_http_session()
|
||||
client = TelegramClient(http, bot_token)
|
||||
results: list[dict] = []
|
||||
for r in recv_rows:
|
||||
|
||||
# Parallelize per-receiver sends with a small semaphore — broadcast to
|
||||
# N chats now takes ~ceil(N / concurrency) × RTT instead of N × RTT,
|
||||
# matching the dispatcher's bounded-concurrency pattern. Capped below
|
||||
# Telegram's rate limit so we don't trigger 429s on large fleets.
|
||||
sem = asyncio.Semaphore(_TEST_SEND_CONCURRENCY)
|
||||
|
||||
async def _send_one(r: TargetReceiver) -> dict | None:
|
||||
chat_id = str(r.config.get("chat_id", ""))
|
||||
if not chat_id:
|
||||
continue
|
||||
return None
|
||||
explicit = getattr(r, "locale", "") or ""
|
||||
locale = explicit or chat_locale_map.get(chat_id) or default_locale
|
||||
message = _get_test_message(locale[:2].lower(), "telegram")
|
||||
results.append(await client.send_message(
|
||||
chat_id=chat_id,
|
||||
text=message,
|
||||
disable_web_page_preview=bool(disable_preview),
|
||||
))
|
||||
async with sem:
|
||||
return await client.send_message(
|
||||
chat_id=chat_id,
|
||||
text=message,
|
||||
disable_web_page_preview=bool(disable_preview),
|
||||
)
|
||||
|
||||
raw = await asyncio.gather(*(_send_one(r) for r in recv_rows))
|
||||
results = [r for r in raw if r is not None]
|
||||
return _aggregate(results)
|
||||
|
||||
|
||||
|
||||
@@ -10,10 +10,19 @@ If the apply fails, the pending file is kept so the operator can inspect it
|
||||
and markers are updated to record the last error. On success, the staged file
|
||||
is archived under data/applied_restores/<timestamp>.json and markers are
|
||||
cleared.
|
||||
|
||||
Integrity checks on startup:
|
||||
- The on-disk file's SHA256 must match ``PENDING_RESTORE_SHA256_KEY``
|
||||
(written atomically with the staged file). Protects against tampering
|
||||
between prepare and restart.
|
||||
- The pending path must resolve *inside* ``app_config.data_dir``. Protects
|
||||
against a rogue AppSetting pointing at an arbitrary file.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
@@ -24,11 +33,13 @@ from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from ..api.backup import (
|
||||
PENDING_RESTORE_CONFLICT_KEY,
|
||||
PENDING_RESTORE_PATH_KEY,
|
||||
PENDING_RESTORE_SHA256_KEY,
|
||||
PENDING_RESTORE_UPLOADED_AT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_BY_KEY,
|
||||
_applied_restores_dir,
|
||||
_pending_restore_path,
|
||||
)
|
||||
from ..config import settings as app_config
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import AppSetting
|
||||
from .backup_schema import BackupFile, ConflictMode
|
||||
@@ -49,6 +60,23 @@ async def apply_pending_restore_if_any() -> None:
|
||||
return
|
||||
|
||||
pending_path = _pending_restore_path()
|
||||
|
||||
# Defensive: ensure the hard-coded path still lives inside data_dir.
|
||||
# If future refactors let this be read from AppSetting, this check
|
||||
# blocks arbitrary-file reads.
|
||||
try:
|
||||
resolved = pending_path.resolve()
|
||||
data_root = app_config.data_dir.resolve()
|
||||
resolved.relative_to(data_root)
|
||||
except (ValueError, OSError):
|
||||
_LOGGER.error(
|
||||
"Pending-restore path %s is outside data_dir %s — refusing to apply",
|
||||
pending_path, app_config.data_dir,
|
||||
)
|
||||
await _record_error(session, "Pending path outside data_dir")
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
if not pending_path.exists():
|
||||
_LOGGER.warning(
|
||||
"Pending-restore marker present but file missing at %s — clearing marker",
|
||||
@@ -62,9 +90,42 @@ async def apply_pending_restore_if_any() -> None:
|
||||
conflict_mode = ConflictMode(conflict_row.value) if conflict_row and conflict_row.value else ConflictMode.SKIP
|
||||
uploaded_by_row = await session.get(AppSetting, PENDING_RESTORE_UPLOADED_BY_KEY)
|
||||
uploaded_by = uploaded_by_row.value if uploaded_by_row else "admin"
|
||||
sha_row = await session.get(AppSetting, PENDING_RESTORE_SHA256_KEY)
|
||||
expected_sha = (sha_row.value or "").strip().lower() if sha_row else ""
|
||||
|
||||
try:
|
||||
raw = json.loads(pending_path.read_text(encoding="utf-8"))
|
||||
raw_bytes = await asyncio.to_thread(pending_path.read_bytes)
|
||||
except OSError as err:
|
||||
_LOGGER.exception("Pending-restore file unreadable")
|
||||
await _record_error(session, f"Unreadable backup: {err}")
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
# Integrity: reject unless hash matches what prepare-restore stored.
|
||||
# An attacker with write access to data/ (swapped file, bind-mount
|
||||
# abuse) does not also have write access to the DB.
|
||||
if not expected_sha:
|
||||
_LOGGER.error("Pending-restore marker has no SHA256; refusing to apply")
|
||||
await _record_error(session, "Missing integrity marker")
|
||||
await session.commit()
|
||||
return
|
||||
actual_sha = hashlib.sha256(raw_bytes).hexdigest()
|
||||
if actual_sha != expected_sha:
|
||||
_LOGGER.error(
|
||||
"Pending-restore SHA256 mismatch (expected %s, got %s) — refusing to apply",
|
||||
expected_sha, actual_sha,
|
||||
)
|
||||
await _record_error(
|
||||
session,
|
||||
"Integrity check failed: on-disk backup SHA256 does not match the hash "
|
||||
"recorded at prepare time. File may have been tampered with; cancel and "
|
||||
"re-upload.",
|
||||
)
|
||||
await session.commit()
|
||||
return
|
||||
|
||||
try:
|
||||
raw = json.loads(raw_bytes.decode("utf-8"))
|
||||
backup = BackupFile.model_validate(raw)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.exception("Pending-restore file unreadable")
|
||||
@@ -88,8 +149,14 @@ async def apply_pending_restore_if_any() -> None:
|
||||
result = await import_backup(session, admin_row.id, backup, conflict_mode)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.exception("Pending-restore apply failed")
|
||||
await _record_error(session, str(err))
|
||||
await session.commit()
|
||||
# Discard any partial inserts the importer made before raising —
|
||||
# committing partial state would let a crafted failing backup
|
||||
# selectively mutate entities. The error-record commit below
|
||||
# happens on a *fresh* session.
|
||||
await session.rollback()
|
||||
async with AsyncSession(engine) as fresh:
|
||||
await _record_error(fresh, str(err))
|
||||
await fresh.commit()
|
||||
return
|
||||
|
||||
# Archive the file
|
||||
@@ -136,6 +203,7 @@ async def _clear_markers(session: AsyncSession) -> None:
|
||||
PENDING_RESTORE_CONFLICT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_AT_KEY,
|
||||
PENDING_RESTORE_UPLOADED_BY_KEY,
|
||||
PENDING_RESTORE_SHA256_KEY,
|
||||
):
|
||||
row = await session.get(AppSetting, key)
|
||||
if row:
|
||||
|
||||
@@ -166,30 +166,41 @@ async def _refresh_telegram_chat_titles() -> None:
|
||||
|
||||
refreshed = 0
|
||||
errors = 0
|
||||
# Bucket results first, then fetch all rows in one IN-query instead of
|
||||
# per-row ``session.get`` — otherwise a 50-chat fleet issues 50 extra
|
||||
# SELECTs before commit.
|
||||
successes: dict[int, dict] = {}
|
||||
for chat_id, info, err in results:
|
||||
if err is not None or info is None:
|
||||
errors += 1
|
||||
if err:
|
||||
_LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err)
|
||||
continue
|
||||
if chat_id is not None:
|
||||
successes[chat_id] = info
|
||||
async with AsyncSession(engine) as session:
|
||||
for chat_id, info, err in results:
|
||||
if err is not None or info is None:
|
||||
errors += 1
|
||||
if err:
|
||||
_LOGGER.debug("getChat failed for chat row %s: %s", chat_id, err)
|
||||
continue
|
||||
merged = await session.get(TelegramChat, chat_id)
|
||||
if not merged:
|
||||
continue
|
||||
title = info.get("title") or (
|
||||
(info.get("first_name", "") + " " + info.get("last_name", "")).strip()
|
||||
)
|
||||
changed = False
|
||||
if title and merged.title != title:
|
||||
merged.title = title
|
||||
changed = True
|
||||
new_username = info.get("username")
|
||||
if new_username is not None and merged.username != new_username:
|
||||
merged.username = new_username
|
||||
changed = True
|
||||
if changed:
|
||||
session.add(merged)
|
||||
refreshed += 1
|
||||
if successes:
|
||||
rows = (await session.exec(
|
||||
select(TelegramChat).where(TelegramChat.id.in_(list(successes.keys())))
|
||||
)).all()
|
||||
for merged in rows:
|
||||
info = successes.get(merged.id)
|
||||
if not info:
|
||||
continue
|
||||
title = info.get("title") or (
|
||||
(info.get("first_name", "") + " " + info.get("last_name", "")).strip()
|
||||
)
|
||||
changed = False
|
||||
if title and merged.title != title:
|
||||
merged.title = title
|
||||
changed = True
|
||||
new_username = info.get("username")
|
||||
if new_username is not None and merged.username != new_username:
|
||||
merged.username = new_username
|
||||
changed = True
|
||||
if changed:
|
||||
session.add(merged)
|
||||
refreshed += 1
|
||||
await session.commit()
|
||||
_LOGGER.info(
|
||||
"Telegram chat title refresh: %s updated, %s errors", refreshed, errors
|
||||
|
||||
@@ -250,14 +250,23 @@ async def _build_immich_event(
|
||||
collection_ids, limit, asset_type, favorite_only, min_rating,
|
||||
)
|
||||
|
||||
# Album-based path: use shared collect_scheduled_assets
|
||||
# Album-based path: use shared collect_scheduled_assets.
|
||||
# Fetch albums + shared links in parallel — on a 20-album tracker the old
|
||||
# serial ``await`` loop took ~2 × 20 × RTT, now it's one round-trip.
|
||||
import asyncio as _asyncio
|
||||
album_tasks = [immich.client.get_album(aid) for aid in collection_ids]
|
||||
link_tasks = [immich.client.get_shared_links(aid) for aid in collection_ids]
|
||||
album_results, link_results = await _asyncio.gather(
|
||||
_asyncio.gather(*album_tasks, return_exceptions=True),
|
||||
_asyncio.gather(*link_tasks, return_exceptions=True),
|
||||
)
|
||||
albums: dict[str, ImmichAlbumData] = {}
|
||||
shared_links: dict[str, list[SharedLinkInfo]] = {}
|
||||
for album_id in collection_ids:
|
||||
album = await immich.client.get_album(album_id)
|
||||
if album:
|
||||
albums[album_id] = album
|
||||
shared_links[album_id] = await immich.client.get_shared_links(album_id)
|
||||
for album_id, album, links in zip(collection_ids, album_results, link_results):
|
||||
if isinstance(album, Exception) or album is None:
|
||||
continue
|
||||
albums[album_id] = album
|
||||
shared_links[album_id] = links if not isinstance(links, Exception) else []
|
||||
|
||||
assets, collections_extra = collect_scheduled_assets(
|
||||
albums, shared_links, ext_domain,
|
||||
@@ -320,13 +329,21 @@ async def _build_immich_periodic_event(
|
||||
|
||||
ext_domain = provider_config.get("external_domain") or provider_config.get("url", "")
|
||||
|
||||
# Parallel fetch — see _build_immich_event above for the same rationale.
|
||||
import asyncio as _asyncio
|
||||
album_tasks = [immich.client.get_album(aid) for aid in collection_ids]
|
||||
link_tasks = [immich.client.get_shared_links(aid) for aid in collection_ids]
|
||||
album_results, link_results = await _asyncio.gather(
|
||||
_asyncio.gather(*album_tasks, return_exceptions=True),
|
||||
_asyncio.gather(*link_tasks, return_exceptions=True),
|
||||
)
|
||||
albums: dict[str, ImmichAlbumData] = {}
|
||||
shared_links: dict[str, list[SharedLinkInfo]] = {}
|
||||
for album_id in collection_ids:
|
||||
album = await immich.client.get_album(album_id)
|
||||
if album:
|
||||
albums[album_id] = album
|
||||
shared_links[album_id] = await immich.client.get_shared_links(album_id)
|
||||
for album_id, album, links in zip(collection_ids, album_results, link_results):
|
||||
if isinstance(album, Exception) or album is None:
|
||||
continue
|
||||
albums[album_id] = album
|
||||
shared_links[album_id] = links if not isinstance(links, Exception) else []
|
||||
|
||||
# limit=0 → returns ([], collections_extra) with full per-album stats.
|
||||
_assets, collections_extra = collect_scheduled_assets(
|
||||
|
||||
Reference in New Issue
Block a user