From 7cbb02b1ef9b78ce9f13c82e1f64743d89d4f5a5 Mon Sep 17 00:00:00 2001
From: "alexei.dolgolyov"
Date: Thu, 23 Apr 2026 19:53:15 +0300
Subject: [PATCH] feat(db): pre-migration SQLite snapshots via VACUUM INTO
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Take a consistent, atomic copy of the DB at lifespan startup BEFORE
migrations run, so a botched future upgrade is recoverable by restoring
a single file instead of a data-loss incident. Uses SQLite's VACUUM INTO
— safe under WAL, cannot tear against concurrent writes. Best-effort:
failures are logged, never raised — the main DB remains the source of
truth.

Configurable via NOTIFY_BRIDGE_PRE_MIGRATE_SNAPSHOT_KEEP (default 5;
0 disables). Snapshots land in
``data_dir/backups/pre-migrate-YYYY-MM-DDTHH-MM-SS.db`` (UTC) and all but
the N newest are pruned each boot.
---
 .../server/src/notify_bridge_server/config.py |   8 +
 .../notify_bridge_server/database/snapshot.py | 180 ++++++++++++++++++
 .../server/src/notify_bridge_server/main.py   |   9 +
 packages/server/tests/test_snapshot.py        | 125 +++++++++++++
 4 files changed, 322 insertions(+)
 create mode 100644 packages/server/src/notify_bridge_server/database/snapshot.py
 create mode 100644 packages/server/tests/test_snapshot.py

diff --git a/packages/server/src/notify_bridge_server/config.py b/packages/server/src/notify_bridge_server/config.py
index 128aebe..a47f4ce 100644
--- a/packages/server/src/notify_bridge_server/config.py
+++ b/packages/server/src/notify_bridge_server/config.py
@@ -70,6 +70,12 @@ class Settings(BaseSettings):
     event_log_retention_days: int = 30
     """Days of event_log history to retain. 0 disables the retention job."""
 
+    pre_migrate_snapshot_keep: int = 5
+    """Number of pre-migration DB snapshots to keep in ``data_dir/backups/``.
+    0 disables snapshotting entirely. Each snapshot is produced at boot
+    before migrations run using SQLite's ``VACUUM INTO`` (atomic, consistent).
+    """
+
     model_config = {"env_prefix": "NOTIFY_BRIDGE_"}
 
     def model_post_init(self, __context: Any) -> None:
@@ -102,6 +108,8 @@ class Settings(BaseSettings):
             raise ValueError("port must be in range 1..65535")
         if self.event_log_retention_days < 0:
             raise ValueError("event_log_retention_days must be >= 0")
+        if self.pre_migrate_snapshot_keep < 0:
+            raise ValueError("pre_migrate_snapshot_keep must be >= 0")
 
     @property
     def effective_database_url(self) -> str:
diff --git a/packages/server/src/notify_bridge_server/database/snapshot.py b/packages/server/src/notify_bridge_server/database/snapshot.py
new file mode 100644
index 0000000..e5e6aef
--- /dev/null
+++ b/packages/server/src/notify_bridge_server/database/snapshot.py
@@ -0,0 +1,180 @@
+"""Pre-migration database snapshots.
+
+Runs at lifespan startup BEFORE migrations execute. Produces a consistent
+point-in-time copy of the SQLite database using ``VACUUM INTO`` (atomic,
+cannot tear against concurrent activity, works with WAL).
+
+The snapshot is the operator's fallback if a future migration corrupts the
+schema — restore is a single ``mv`` / ``docker cp``. We keep the N most
+recent files (default 5) and never fail startup if the snapshot itself
+fails: a snapshot is best-effort safety net, not a gate.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import re
+from datetime import datetime, timezone
+from pathlib import Path
+
+from sqlalchemy.ext.asyncio import AsyncEngine
+
+_LOGGER = logging.getLogger(__name__)
+
+_SNAPSHOT_GLOB = "pre-migrate-*.db"
+_SNAPSHOT_NAME_RE = re.compile(r"^[A-Za-z0-9._+\-:]+$")
+
+
+def _sqlite_path_from_url(url: str) -> Path | None:
+    """Extract the filesystem path from a ``sqlite+aiosqlite:///...`` URL.
+
+    Returns ``None`` for non-SQLite URLs, URLs without a path and in-memory
+    databases (``:memory:``): none of them has a file to snapshot. A
+    ``?key=value`` query string is not part of the file name and is dropped.
+    """
+    if not url.startswith("sqlite"):
+        return None
+    # e.g. "sqlite+aiosqlite:///C:/data/notify_bridge.db"
+    _, _, rest = url.partition(":///")
+    rest = rest.partition("?")[0]
+    if not rest or rest == ":memory:":
+        return None
+    return Path(rest)
+
+
+async def snapshot_database(
+    engine: AsyncEngine,
+    target_dir: Path,
+    *,
+    label: str = "pre-migrate",
+) -> Path | None:
+    """Write a consistent copy of the SQLite DB to ``target_dir``.
+
+    Uses ``VACUUM INTO`` which SQLite executes atomically against a read
+    snapshot — safe under WAL, cannot produce a torn copy. Returns the
+    snapshot path on success, ``None`` when skipped or on non-fatal
+    failure. Never raises: callers treat a missing snapshot as acceptable
+    (the main DB remains the source of truth).
+
+    An existing file at the destination (two boots within the same second)
+    is left untouched and the snapshot is skipped.
+    """
+    if not _SNAPSHOT_NAME_RE.match(label):
+        _LOGGER.warning("Snapshot label %r contains unsafe characters; skipping", label)
+        return None
+
+    url = str(engine.url)
+    src = _sqlite_path_from_url(url)
+    if src is None:
+        _LOGGER.debug("Non-SQLite engine; skipping snapshot")
+        return None
+
+    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
+    dest = target_dir / f"{label}-{ts}.db"
+
+    # VACUUM INTO accepts a string literal, not a bind parameter. The dest
+    # path is built from our own label + timestamp (never user input), so
+    # escaping is straightforward — still, reject any dest containing a
+    # single quote as a belt-and-braces check.
+    dest_str = str(dest)
+    if "'" in dest_str:
+        _LOGGER.warning("Refusing to snapshot to path containing a single quote: %s", dest_str)
+        return None
+
+    # Set once this call may have created ``dest``; only then is it ours to
+    # delete on failure.
+    created = False
+    try:
+        # Filesystem probes live inside the ``try`` so that an unreadable or
+        # unwritable data directory is logged instead of aborting startup.
+        if not src.exists():
+            _LOGGER.debug("DB file %s does not exist yet (fresh install); skipping snapshot", src)
+            return None
+        target_dir.mkdir(parents=True, exist_ok=True)
+        if dest.exists():
+            # VACUUM INTO refuses to overwrite a non-empty file, and the
+            # file already there is a valid snapshot taken this very second.
+            _LOGGER.info("Snapshot %s already exists; skipping", dest)
+            return None
+        created = True
+        async with engine.connect() as conn:
+            # VACUUM cannot run inside an explicit transaction; use the
+            # plain connection without begin(). exec_driver_sql() passes the
+            # statement to the driver verbatim, so a ``:name`` sequence in
+            # the path is not parsed as a bind parameter.
+            await conn.exec_driver_sql(f"VACUUM INTO '{dest_str}'")
+        _LOGGER.info("Database snapshot written: %s (%.1f KiB)", dest, dest.stat().st_size / 1024)
+        return dest
+    except Exception:
+        _LOGGER.warning(
+            "Pre-migration snapshot failed — continuing with startup. "
+            "Check disk space in %s.",
+            target_dir,
+            exc_info=True,
+        )
+        if created:
+            # Partial file can linger if VACUUM INTO aborted mid-write; clean up.
+            try:
+                dest.unlink(missing_ok=True)
+            except OSError:
+                pass
+        return None
+
+
+def prune_old_snapshots(target_dir: Path, keep: int) -> list[Path]:
+    """Keep the ``keep`` most recent pre-migrate snapshots, delete the rest.
+
+    Returns the list of paths that were deleted. Safe to call with
+    ``keep=0`` (deletes everything) or when the directory does not exist.
+    """
+    if keep < 0:
+        raise ValueError("keep must be >= 0")
+    if not target_dir.is_dir():
+        return []
+
+    try:
+        snapshots = sorted(
+            target_dir.glob(_SNAPSHOT_GLOB),
+            key=lambda p: p.stat().st_mtime,
+            reverse=True,
+        )
+    except OSError:
+        return []
+
+    deleted: list[Path] = []
+    for old in snapshots[keep:]:
+        try:
+            old.unlink()
+            deleted.append(old)
+        except OSError:
+            _LOGGER.debug("Could not delete old snapshot %s", old, exc_info=True)
+    if deleted:
+        _LOGGER.info(
+            "Pruned %d old pre-migrate snapshot(s); kept %d most recent",
+            len(deleted), min(keep, len(snapshots)),
+        )
+    return deleted
+
+
+async def snapshot_and_prune(
+    engine: AsyncEngine,
+    target_dir: Path,
+    *,
+    keep: int,
+) -> Path | None:
+    """Take a snapshot and prune old ones. Used by the lifespan startup path.
+
+    ``keep=0`` disables snapshotting entirely.
+    """
+    if keep <= 0:
+        return None
+    snapshot_path = await snapshot_database(engine, target_dir)
+    # Always prune even if this run's snapshot failed — old files still
+    # cost disk and may have been written by prior successful boots.
+    try:
+        await asyncio.to_thread(prune_old_snapshots, target_dir, keep)
+    except Exception:
+        # Pruning is housekeeping: like the snapshot, it must not block boot.
+        _LOGGER.warning("Could not prune old snapshots in %s", target_dir, exc_info=True)
+    return snapshot_path
diff --git a/packages/server/src/notify_bridge_server/main.py b/packages/server/src/notify_bridge_server/main.py
index 4726b4a..a73bf4b 100644
--- a/packages/server/src/notify_bridge_server/main.py
+++ b/packages/server/src/notify_bridge_server/main.py
@@ -77,7 +77,16 @@ async def lifespan(app: FastAPI):
         migrate_performance_indexes,
         migrate_schema_version,
     )
+    from .database.snapshot import snapshot_and_prune
     engine = get_engine()
+    # Take a consistent DB snapshot BEFORE migrations run, so operators can
+    # roll back a bad upgrade by restoring one file. Best-effort — failures
+    # are logged, not raised.
+    await snapshot_and_prune(
+        engine,
+        _log_cfg.data_dir / "backups",
+        keep=_log_cfg.pre_migrate_snapshot_keep,
+    )
     await migrate_schema(engine)
     await migrate_tracker_targets(engine)
     await migrate_entity_refactor(engine)
diff --git a/packages/server/tests/test_snapshot.py b/packages/server/tests/test_snapshot.py
new file mode 100644
index 0000000..c9dee0a
--- /dev/null
+++ b/packages/server/tests/test_snapshot.py
@@ -0,0 +1,125 @@
+"""Pre-migration snapshot: atomic copy + retention pruning."""
+
+from __future__ import annotations
+
+import asyncio
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+
+import pytest
+from sqlalchemy import text
+from sqlalchemy.ext.asyncio import create_async_engine
+
+from notify_bridge_server.database.snapshot import (
+    prune_old_snapshots,
+    snapshot_and_prune,
+    snapshot_database,
+)
+
+
+@pytest.fixture
+async def sqlite_engine(tmp_path: Path):
+    """Tiny SQLite DB with one table + one row, closed cleanly after the test."""
+    db_path = tmp_path / "app.db"
+    engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
+    async with engine.begin() as conn:
+        await conn.execute(text("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)"))
+        await conn.execute(text("INSERT INTO t (v) VALUES ('seed')"))
+    yield engine, db_path, tmp_path / "backups"
+    await engine.dispose()
+
+
+class TestSnapshot:
+    @pytest.mark.asyncio
+    async def test_creates_consistent_copy(self, sqlite_engine) -> None:
+        engine, _db, backups = sqlite_engine
+        dest = await snapshot_database(engine, backups)
+        assert dest is not None
+        assert dest.exists()
+        # Can open the snapshot and see the seed row — proves it's a real DB copy.
+        copy = create_async_engine(f"sqlite+aiosqlite:///{dest}")
+        async with copy.connect() as c:
+            result = await c.execute(text("SELECT v FROM t"))
+            rows = result.all()
+        await copy.dispose()
+        assert rows == [("seed",)]
+
+    @pytest.mark.asyncio
+    async def test_skips_when_db_missing(self, tmp_path: Path) -> None:
+        # Engine pointing at a path that doesn't exist yet.
+        engine = create_async_engine(
+            f"sqlite+aiosqlite:///{tmp_path / 'does-not-exist.db'}"
+        )
+        try:
+            dest = await snapshot_database(engine, tmp_path / "backups")
+        finally:
+            await engine.dispose()
+        assert dest is None
+
+    @pytest.mark.asyncio
+    async def test_rejects_unsafe_label(self, sqlite_engine) -> None:
+        engine, _db, backups = sqlite_engine
+        dest = await snapshot_database(engine, backups, label="bad'; DROP TABLE t;--")
+        assert dest is None
+
+    @pytest.mark.asyncio
+    async def test_rerun_keeps_existing_snapshot(self, sqlite_engine) -> None:
+        engine, _db, backups = sqlite_engine
+        first = await snapshot_database(engine, backups)
+        # A second run within the same second maps to the same file name; a
+        # failed or skipped rerun must never delete the snapshot already there.
+        await snapshot_database(engine, backups)
+        assert first is not None and first.exists()
+
+
+class TestPrune:
+    def _make_snapshot(self, backups: Path, age_seconds: int) -> Path:
+        backups.mkdir(parents=True, exist_ok=True)
+        ts = datetime.now(timezone.utc) - timedelta(seconds=age_seconds)
+        name = f"pre-migrate-{ts.strftime('%Y-%m-%dT%H-%M-%S')}.db"
+        p = backups / name
+        p.write_bytes(b"x")
+        mtime = ts.timestamp()
+        import os
+        os.utime(p, (mtime, mtime))
+        return p
+
+    def test_keeps_n_newest(self, tmp_path: Path) -> None:
+        backups = tmp_path / "backups"
+        for age in (100, 80, 60, 40, 20, 0):
+            self._make_snapshot(backups, age)
+
+        deleted = prune_old_snapshots(backups, keep=3)
+        remaining = sorted(backups.glob("pre-migrate-*.db"))
+        assert len(deleted) == 3
+        assert len(remaining) == 3
+
+    def test_keep_zero_deletes_all(self, tmp_path: Path) -> None:
+        backups = tmp_path / "backups"
+        for age in (30, 20, 10):
+            self._make_snapshot(backups, age)
+        prune_old_snapshots(backups, keep=0)
+        assert list(backups.glob("pre-migrate-*.db")) == []
+
+    def test_missing_dir_is_noop(self, tmp_path: Path) -> None:
+        assert prune_old_snapshots(tmp_path / "never-created", keep=5) == []
+
+
+class TestSnapshotAndPrune:
+    @pytest.mark.asyncio
+    async def test_keep_zero_disables(self, sqlite_engine) -> None:
+        engine, _db, backups = sqlite_engine
+        result = await snapshot_and_prune(engine, backups, keep=0)
+        assert result is None
+        assert not backups.exists() or list(backups.glob("*.db")) == []
+
+    @pytest.mark.asyncio
+    async def test_end_to_end(self, sqlite_engine) -> None:
+        engine, _db, backups = sqlite_engine
+        # Run twice — second run should keep both snapshots (keep=5).
+        a = await snapshot_and_prune(engine, backups, keep=5)
+        # Guarantee distinct filenames (timestamp has second resolution).
+        await asyncio.sleep(1.05)
+        b = await snapshot_and_prune(engine, backups, keep=5)
+        assert a and b and a != b
+        assert a.exists() and b.exists()