feat(db): pre-migration SQLite snapshots via VACUUM INTO
Build and Test / test-backend (push) Successful in 2m38s
Build and Test / test-frontend (push) Successful in 9m44s
Build and Test / build-image (push) Failing after 17m9s

Take a consistent, atomic copy of the DB at lifespan startup BEFORE
migrations run, so a botched future upgrade is recoverable by restoring
a single file instead of a data-loss incident.

Uses SQLite's VACUUM INTO — safe under WAL, cannot tear against
concurrent writes. Best-effort: failures are logged, never raised —
the main DB remains the source of truth.

Configurable via NOTIFY_BRIDGE_PRE_MIGRATE_SNAPSHOT_KEEP (default 5;
0 disables). Snapshots land in ``data_dir/backups/pre-migrate-<ts>.db``
and the N oldest are pruned each boot.
This commit is contained in:
2026-04-23 19:53:15 +03:00
parent 920920bc67
commit 7cbb02b1ef
4 changed files with 288 additions and 0 deletions
@@ -70,6 +70,12 @@ class Settings(BaseSettings):
event_log_retention_days: int = 30
"""Days of event_log history to retain. 0 disables the retention job."""
pre_migrate_snapshot_keep: int = 5
"""Number of pre-migration DB snapshots to keep in ``data_dir/backups/``.
0 disables snapshotting entirely. Each snapshot is produced at boot
before migrations run using SQLite's ``VACUUM INTO`` (atomic, consistent).
"""
model_config = {"env_prefix": "NOTIFY_BRIDGE_"}
def model_post_init(self, __context: Any) -> None:
@@ -102,6 +108,8 @@ class Settings(BaseSettings):
raise ValueError("port must be in range 1..65535")
if self.event_log_retention_days < 0:
raise ValueError("event_log_retention_days must be >= 0")
if self.pre_migrate_snapshot_keep < 0:
raise ValueError("pre_migrate_snapshot_keep must be >= 0")
@property
def effective_database_url(self) -> str:
@@ -0,0 +1,155 @@
"""Pre-migration database snapshots.
Runs at lifespan startup BEFORE migrations execute. Produces a consistent
point-in-time copy of the SQLite database using ``VACUUM INTO`` (atomic,
cannot tear against concurrent activity, works with WAL).
The snapshot is the operator's fallback if a future migration corrupts the
schema — restore is a single ``mv`` / ``docker cp``. We keep the N most
recent files (default 5) and never fail startup if the snapshot itself
fails: a snapshot is best-effort safety net, not a gate.
"""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
_LOGGER = logging.getLogger(__name__)
_SNAPSHOT_GLOB = "pre-migrate-*.db"
_SNAPSHOT_NAME_RE = re.compile(r"^[A-Za-z0-9._+\-:]+$")
def _sqlite_path_from_url(url: str) -> Path | None:
"""Extract the filesystem path from a ``sqlite+aiosqlite:///...`` URL."""
if not url.startswith("sqlite"):
return None
# e.g. "sqlite+aiosqlite:///C:/data/notify_bridge.db"
prefix, _, rest = url.partition(":///")
if not rest:
return None
return Path(rest)
async def snapshot_database(
engine: AsyncEngine,
target_dir: Path,
*,
label: str = "pre-migrate",
) -> Path | None:
"""Write a consistent copy of the SQLite DB to ``target_dir``.
Uses ``VACUUM INTO`` which SQLite executes atomically against a read
snapshot — safe under WAL, cannot produce a torn copy. Returns the
snapshot path on success, ``None`` when skipped or on non-fatal
failure. Never raises: callers treat a missing snapshot as acceptable
(the main DB remains the source of truth).
"""
if not _SNAPSHOT_NAME_RE.match(label):
_LOGGER.warning("Snapshot label %r contains unsafe characters; skipping", label)
return None
url = str(engine.url)
src = _sqlite_path_from_url(url)
if src is None:
_LOGGER.debug("Non-SQLite engine; skipping snapshot")
return None
if not src.exists():
_LOGGER.debug("DB file %s does not exist yet (fresh install); skipping snapshot", src)
return None
target_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
dest = target_dir / f"{label}-{ts}.db"
# VACUUM INTO accepts a string literal, not a bind parameter. The dest
# path is built from our own label + timestamp (never user input), so
# escaping is straightforward — still, reject any dest containing a
# single quote as a belt-and-braces check.
dest_str = str(dest)
if "'" in dest_str:
_LOGGER.warning("Refusing to snapshot to path containing a single quote: %s", dest_str)
return None
try:
async with engine.connect() as conn:
# VACUUM cannot run inside an explicit transaction; use the
# plain connection without begin().
await conn.execute(text(f"VACUUM INTO '{dest_str}'"))
_LOGGER.info("Database snapshot written: %s (%.1f KiB)", dest, dest.stat().st_size / 1024)
return dest
except Exception:
_LOGGER.warning(
"Pre-migration snapshot failed — continuing with startup. "
"Check disk space in %s.",
target_dir,
exc_info=True,
)
# Partial file can linger if VACUUM INTO aborted mid-write; clean up.
try:
if dest.exists():
dest.unlink()
except OSError:
pass
return None
def prune_old_snapshots(target_dir: Path, keep: int) -> list[Path]:
"""Keep the ``keep`` most recent pre-migrate snapshots, delete the rest.
Returns the list of paths that were deleted. Safe to call with
``keep=0`` (deletes everything) or when the directory does not exist.
"""
if keep < 0:
raise ValueError("keep must be >= 0")
if not target_dir.is_dir():
return []
try:
snapshots = sorted(
target_dir.glob(_SNAPSHOT_GLOB),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
except OSError:
return []
deleted: list[Path] = []
for old in snapshots[keep:]:
try:
old.unlink()
deleted.append(old)
except OSError:
_LOGGER.debug("Could not delete old snapshot %s", old, exc_info=True)
if deleted:
_LOGGER.info(
"Pruned %d old pre-migrate snapshot(s); kept %d most recent",
len(deleted), min(keep, len(snapshots)),
)
return deleted
async def snapshot_and_prune(
engine: AsyncEngine,
target_dir: Path,
*,
keep: int,
) -> Path | None:
"""Take a snapshot and prune old ones. Used by the lifespan startup path.
``keep=0`` disables snapshotting entirely.
"""
if keep <= 0:
return None
snapshot_path = await snapshot_database(engine, target_dir)
# Always prune even if this run's snapshot failed — old files still
# cost disk and may have been written by prior successful boots.
await asyncio.to_thread(prune_old_snapshots, target_dir, keep)
return snapshot_path
@@ -77,7 +77,16 @@ async def lifespan(app: FastAPI):
migrate_performance_indexes,
migrate_schema_version,
)
from .database.snapshot import snapshot_and_prune
engine = get_engine()
# Take a consistent DB snapshot BEFORE migrations run, so operators can
# roll back a bad upgrade by restoring one file. Best-effort — failures
# are logged, not raised.
await snapshot_and_prune(
engine,
_log_cfg.data_dir / "backups",
keep=_log_cfg.pre_migrate_snapshot_keep,
)
await migrate_schema(engine)
await migrate_tracker_targets(engine)
await migrate_entity_refactor(engine)
+116
View File
@@ -0,0 +1,116 @@
"""Pre-migration snapshot: atomic copy + retention pruning."""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from notify_bridge_server.database.snapshot import (
prune_old_snapshots,
snapshot_and_prune,
snapshot_database,
)
@pytest.fixture
async def sqlite_engine(tmp_path: Path):
"""Tiny SQLite DB with one table + one row, closed cleanly after the test."""
db_path = tmp_path / "app.db"
engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
async with engine.begin() as conn:
await conn.execute(text("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)"))
await conn.execute(text("INSERT INTO t (v) VALUES ('seed')"))
yield engine, db_path, tmp_path / "backups"
await engine.dispose()
class TestSnapshot:
@pytest.mark.asyncio
async def test_creates_consistent_copy(self, sqlite_engine) -> None:
engine, _db, backups = sqlite_engine
dest = await snapshot_database(engine, backups)
assert dest is not None
assert dest.exists()
# Can open the snapshot and see the seed row — proves it's a real DB copy.
copy = create_async_engine(f"sqlite+aiosqlite:///{dest}")
async with copy.connect() as c:
result = await c.execute(text("SELECT v FROM t"))
rows = result.all()
await copy.dispose()
assert rows == [("seed",)]
@pytest.mark.asyncio
async def test_skips_when_db_missing(self, tmp_path: Path) -> None:
# Engine pointing at a path that doesn't exist yet.
engine = create_async_engine(
f"sqlite+aiosqlite:///{tmp_path / 'does-not-exist.db'}"
)
try:
dest = await snapshot_database(engine, tmp_path / "backups")
finally:
await engine.dispose()
assert dest is None
@pytest.mark.asyncio
async def test_rejects_unsafe_label(self, sqlite_engine) -> None:
engine, _db, backups = sqlite_engine
dest = await snapshot_database(engine, backups, label="bad'; DROP TABLE t;--")
assert dest is None
class TestPrune:
def _make_snapshot(self, backups: Path, age_seconds: int) -> Path:
backups.mkdir(parents=True, exist_ok=True)
ts = datetime.now(timezone.utc) - timedelta(seconds=age_seconds)
name = f"pre-migrate-{ts.strftime('%Y-%m-%dT%H-%M-%S')}.db"
p = backups / name
p.write_bytes(b"x")
mtime = ts.timestamp()
import os
os.utime(p, (mtime, mtime))
return p
def test_keeps_n_newest(self, tmp_path: Path) -> None:
backups = tmp_path / "backups"
for age in (100, 80, 60, 40, 20, 0):
self._make_snapshot(backups, age)
deleted = prune_old_snapshots(backups, keep=3)
remaining = sorted(backups.glob("pre-migrate-*.db"))
assert len(deleted) == 3
assert len(remaining) == 3
def test_keep_zero_deletes_all(self, tmp_path: Path) -> None:
backups = tmp_path / "backups"
for age in (30, 20, 10):
self._make_snapshot(backups, age)
prune_old_snapshots(backups, keep=0)
assert list(backups.glob("pre-migrate-*.db")) == []
def test_missing_dir_is_noop(self, tmp_path: Path) -> None:
assert prune_old_snapshots(tmp_path / "never-created", keep=5) == []
class TestSnapshotAndPrune:
@pytest.mark.asyncio
async def test_keep_zero_disables(self, sqlite_engine) -> None:
engine, _db, backups = sqlite_engine
result = await snapshot_and_prune(engine, backups, keep=0)
assert result is None
assert not backups.exists() or list(backups.glob("*.db")) == []
@pytest.mark.asyncio
async def test_end_to_end(self, sqlite_engine) -> None:
engine, _db, backups = sqlite_engine
# Run twice — second run should keep both snapshots (keep=5).
a = await snapshot_and_prune(engine, backups, keep=5)
# Guarantee distinct filenames (timestamp has second resolution).
await asyncio.sleep(1.05)
b = await snapshot_and_prune(engine, backups, keep=5)
assert a and b and a != b
assert a.exists() and b.exists()