7cbb02b1ef
Take a consistent, atomic copy of the DB at lifespan startup BEFORE migrations run, so a botched future upgrade is recoverable by restoring a single file instead of a data-loss incident. Uses SQLite's VACUUM INTO — safe under WAL, cannot tear against concurrent writes. Best-effort: failures are logged, never raised — the main DB remains the source of truth. Configurable via NOTIFY_BRIDGE_PRE_MIGRATE_SNAPSHOT_KEEP (default 5; 0 disables). Snapshots land in ``data_dir/backups/pre-migrate-<ts>.db`` and all but the N most recent are pruned on each boot.
156 lines
5.1 KiB
Python
156 lines
5.1 KiB
Python
"""Pre-migration database snapshots.
|
|
|
|
Runs at lifespan startup BEFORE migrations execute. Produces a consistent
|
|
point-in-time copy of the SQLite database using ``VACUUM INTO`` (atomic,
|
|
cannot tear against concurrent activity, works with WAL).
|
|
|
|
The snapshot is the operator's fallback if a future migration corrupts the
|
|
schema — restore is a single ``mv`` / ``docker cp``. We keep the N most
|
|
recent files (default 5) and never fail startup if the snapshot itself
|
|
fails: a snapshot is a best-effort safety net, not a gate.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncEngine
|
|
|
|
# Module-level logger, named after this module so its output can be filtered.
_LOGGER = logging.getLogger(__name__)

# Glob used when pruning to recognise snapshots written with the default
# ``pre-migrate`` label.
_SNAPSHOT_GLOB = "pre-migrate-*.db"

# Allow-list for snapshot labels. The label becomes part of a filename that is
# interpolated into a ``VACUUM INTO '...'`` string literal, so only a
# conservative character set is accepted. The pattern ends in ``\Z`` rather
# than ``$`` because ``$`` also matches just before a trailing newline, which
# would let a label such as ``"pre-migrate\n"`` through.
_SNAPSHOT_NAME_RE = re.compile(r"^[A-Za-z0-9._+\-:]+\Z")
|
|
|
|
|
|
def _sqlite_path_from_url(url: str) -> Path | None:
    """Extract the on-disk database path from a ``sqlite+aiosqlite:///...`` URL.

    Args:
        url: Database URL rendered as a string.

    Returns:
        The filesystem path that follows ``:///``, with any ``?query`` suffix
        removed. ``None`` when the URL is not a SQLite URL, carries no path,
        or names the in-memory database (``:memory:``), i.e. whenever there
        is no file to snapshot.
    """
    if not url.startswith("sqlite"):
        return None
    # e.g. "sqlite+aiosqlite:///C:/data/notify_bridge.db"
    _, _, rest = url.partition(":///")
    # URL query parameters ("?mode=ro&cache=shared") are connection options,
    # not part of the filename.
    location = rest.partition("?")[0]
    if not location or location == ":memory:":
        return None
    return Path(location)
|
|
|
|
|
|
async def snapshot_database(
    engine: AsyncEngine,
    target_dir: Path,
    *,
    label: str = "pre-migrate",
) -> Path | None:
    """Write a consistent copy of the SQLite DB to ``target_dir``.

    Uses ``VACUUM INTO`` which SQLite executes atomically against a read
    snapshot — safe under WAL, cannot produce a torn copy.

    Args:
        engine: Async engine whose URL identifies the database to copy.
        target_dir: Directory that receives the snapshot; created if missing.
        label: Filename prefix; the file is named ``<label>-<UTC ts>.db``.

    Returns:
        The snapshot path on success. ``None`` when the snapshot is skipped
        (unsafe label, non-SQLite engine, database file not created yet,
        destination already present) or when it fails.

    Never raises for filesystem or database errors: callers treat a missing
    snapshot as acceptable (the main DB remains the source of truth).
    """
    # fullmatch, so a label with a trailing newline cannot slip through.
    if not _SNAPSHOT_NAME_RE.fullmatch(label):
        _LOGGER.warning("Snapshot label %r contains unsafe characters; skipping", label)
        return None

    src = _sqlite_path_from_url(str(engine.url))
    if src is None:
        _LOGGER.debug("Non-SQLite engine; skipping snapshot")
        return None
    if not src.exists():
        _LOGGER.debug("DB file %s does not exist yet (fresh install); skipping snapshot", src)
        return None

    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
    dest = target_dir / f"{label}-{ts}.db"

    # VACUUM INTO accepts a string literal, not a bind parameter. The dest
    # path is built from our own label + timestamp (never user input), so
    # escaping is straightforward — still, reject any dest containing a
    # single quote as a belt-and-braces check.
    dest_str = str(dest)
    if "'" in dest_str:
        _LOGGER.warning("Refusing to snapshot to path containing a single quote: %s", dest_str)
        return None

    # Preparing the directory is part of the best-effort contract: an
    # unwritable or invalid target must not abort startup.
    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        already_present = dest.exists()
    except OSError:
        _LOGGER.warning(
            "Pre-migration snapshot failed — continuing with startup. "
            "Could not prepare snapshot directory %s.",
            target_dir,
            exc_info=True,
        )
        return None

    # The timestamp has one-second resolution, so a quick restart can map to
    # the same filename. VACUUM INTO would fail on the existing file and the
    # failure cleanup below would then delete that earlier, good snapshot —
    # so leave it alone and skip this run instead.
    if already_present:
        _LOGGER.warning("Snapshot %s already exists; keeping it and skipping this run", dest)
        return None

    try:
        async with engine.connect() as conn:
            # VACUUM cannot run inside an explicit transaction; use the
            # plain connection without begin().
            await conn.execute(text(f"VACUUM INTO '{dest_str}'"))
    except Exception:
        _LOGGER.warning(
            "Pre-migration snapshot failed — continuing with startup. "
            "Check disk space in %s.",
            target_dir,
            exc_info=True,
        )
        # Partial file can linger if VACUUM INTO aborted mid-write; clean up.
        # Safe: dest did not exist before this attempt (checked above).
        try:
            dest.unlink(missing_ok=True)
        except OSError:
            pass
        return None

    # Reporting the size is cosmetic; a stat failure here must not discard a
    # snapshot that was written successfully.
    try:
        size_kib = dest.stat().st_size / 1024
    except OSError:
        _LOGGER.info("Database snapshot written: %s", dest)
    else:
        _LOGGER.info("Database snapshot written: %s (%.1f KiB)", dest, size_kib)
    return dest
|
|
|
|
|
|
def prune_old_snapshots(target_dir: Path, keep: int) -> list[Path]:
    """Keep the ``keep`` most recent pre-migrate snapshots, delete the rest.

    Recency is decided by file modification time; the filename is used as a
    tie-breaker so the outcome is deterministic when mtimes are equal. Only
    regular files matching the snapshot glob are considered — a directory
    that happens to match neither counts towards ``keep`` nor is deleted.

    Args:
        target_dir: Directory holding the snapshots.
        keep: Number of most recent snapshots to retain; ``0`` deletes all.

    Returns:
        The paths that were deleted. Empty when the directory does not exist
        or cannot be listed.

    Raises:
        ValueError: If ``keep`` is negative.
    """
    if keep < 0:
        raise ValueError("keep must be >= 0")
    if not target_dir.is_dir():
        return []

    try:
        candidates = list(target_dir.glob(_SNAPSHOT_GLOB))
    except OSError:
        _LOGGER.debug("Could not list snapshots in %s", target_dir, exc_info=True)
        return []

    # Stat each file on its own: one entry that vanished or is unreadable
    # must not stop the remaining snapshots from being pruned.
    ranked: list[tuple[float, str, Path]] = []
    for candidate in candidates:
        try:
            if not candidate.is_file():
                continue
            ranked.append((candidate.stat().st_mtime, candidate.name, candidate))
        except OSError:
            _LOGGER.debug("Could not stat snapshot %s; leaving it in place", candidate, exc_info=True)
    # Newest first; names are unique within the directory, so the Path in
    # the third slot is never reached by the tuple comparison.
    ranked.sort(reverse=True)

    deleted: list[Path] = []
    for _, _, old in ranked[keep:]:
        try:
            old.unlink()
            deleted.append(old)
        except OSError:
            _LOGGER.debug("Could not delete old snapshot %s", old, exc_info=True)
    if deleted:
        _LOGGER.info(
            "Pruned %d old pre-migrate snapshot(s); kept %d most recent",
            len(deleted), min(keep, len(ranked)),
        )
    return deleted
|
|
|
|
|
|
async def snapshot_and_prune(
    engine: AsyncEngine,
    target_dir: Path,
    *,
    keep: int,
) -> Path | None:
    """Snapshot the database, then trim the backlog of older snapshots.

    Entry point for the lifespan startup path. A non-positive ``keep``
    switches the feature off: nothing is written, nothing is deleted and
    ``None`` is returned.

    Returns:
        Whatever ``snapshot_database`` returned — the new snapshot path, or
        ``None`` if the snapshot was skipped or failed.
    """
    feature_enabled = keep > 0
    if not feature_enabled:
        return None

    written = await snapshot_database(engine, target_dir)

    # Pruning is unconditional: even when this boot produced no snapshot,
    # files left by earlier successful boots still occupy disk. It runs in
    # a worker thread because it performs blocking filesystem calls.
    await asyncio.to_thread(prune_old_snapshots, target_dir, keep)
    return written
|