feat(db): pre-migration SQLite snapshots via VACUUM INTO
Take a consistent, atomic copy of the DB at lifespan startup BEFORE migrations run, so a botched future upgrade is recoverable by restoring a single file instead of a data-loss incident. Uses SQLite's VACUUM INTO — safe under WAL, cannot tear against concurrent writes. Best-effort: failures are logged, never raised — the main DB remains the source of truth. Configurable via NOTIFY_BRIDGE_PRE_MIGRATE_SNAPSHOT_KEEP (default 5; 0 disables). Snapshots land in ``data_dir/backups/pre-migrate-<ts>.db`` and the N oldest are pruned each boot.
This commit is contained in:
@@ -70,6 +70,12 @@ class Settings(BaseSettings):
|
|||||||
event_log_retention_days: int = 30
|
event_log_retention_days: int = 30
|
||||||
"""Days of event_log history to retain. 0 disables the retention job."""
|
"""Days of event_log history to retain. 0 disables the retention job."""
|
||||||
|
|
||||||
|
pre_migrate_snapshot_keep: int = 5
|
||||||
|
"""Number of pre-migration DB snapshots to keep in ``data_dir/backups/``.
|
||||||
|
0 disables snapshotting entirely. Each snapshot is produced at boot
|
||||||
|
before migrations run using SQLite's ``VACUUM INTO`` (atomic, consistent).
|
||||||
|
"""
|
||||||
|
|
||||||
model_config = {"env_prefix": "NOTIFY_BRIDGE_"}
|
model_config = {"env_prefix": "NOTIFY_BRIDGE_"}
|
||||||
|
|
||||||
def model_post_init(self, __context: Any) -> None:
|
def model_post_init(self, __context: Any) -> None:
|
||||||
@@ -102,6 +108,8 @@ class Settings(BaseSettings):
|
|||||||
raise ValueError("port must be in range 1..65535")
|
raise ValueError("port must be in range 1..65535")
|
||||||
if self.event_log_retention_days < 0:
|
if self.event_log_retention_days < 0:
|
||||||
raise ValueError("event_log_retention_days must be >= 0")
|
raise ValueError("event_log_retention_days must be >= 0")
|
||||||
|
if self.pre_migrate_snapshot_keep < 0:
|
||||||
|
raise ValueError("pre_migrate_snapshot_keep must be >= 0")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def effective_database_url(self) -> str:
|
def effective_database_url(self) -> str:
|
||||||
|
|||||||
@@ -0,0 +1,155 @@
|
|||||||
|
"""Pre-migration database snapshots.
|
||||||
|
|
||||||
|
Runs at lifespan startup BEFORE migrations execute. Produces a consistent
|
||||||
|
point-in-time copy of the SQLite database using ``VACUUM INTO`` (atomic,
|
||||||
|
cannot tear against concurrent activity, works with WAL).
|
||||||
|
|
||||||
|
The snapshot is the operator's fallback if a future migration corrupts the
|
||||||
|
schema — restore is a single ``mv`` / ``docker cp``. We keep the N most
|
||||||
|
recent files (default 5) and never fail startup if the snapshot itself
|
||||||
|
fails: a snapshot is best-effort safety net, not a gate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncEngine
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_SNAPSHOT_GLOB = "pre-migrate-*.db"
|
||||||
|
_SNAPSHOT_NAME_RE = re.compile(r"^[A-Za-z0-9._+\-:]+$")
|
||||||
|
|
||||||
|
|
||||||
|
def _sqlite_path_from_url(url: str) -> Path | None:
|
||||||
|
"""Extract the filesystem path from a ``sqlite+aiosqlite:///...`` URL."""
|
||||||
|
if not url.startswith("sqlite"):
|
||||||
|
return None
|
||||||
|
# e.g. "sqlite+aiosqlite:///C:/data/notify_bridge.db"
|
||||||
|
prefix, _, rest = url.partition(":///")
|
||||||
|
if not rest:
|
||||||
|
return None
|
||||||
|
return Path(rest)
|
||||||
|
|
||||||
|
|
||||||
|
async def snapshot_database(
|
||||||
|
engine: AsyncEngine,
|
||||||
|
target_dir: Path,
|
||||||
|
*,
|
||||||
|
label: str = "pre-migrate",
|
||||||
|
) -> Path | None:
|
||||||
|
"""Write a consistent copy of the SQLite DB to ``target_dir``.
|
||||||
|
|
||||||
|
Uses ``VACUUM INTO`` which SQLite executes atomically against a read
|
||||||
|
snapshot — safe under WAL, cannot produce a torn copy. Returns the
|
||||||
|
snapshot path on success, ``None`` when skipped or on non-fatal
|
||||||
|
failure. Never raises: callers treat a missing snapshot as acceptable
|
||||||
|
(the main DB remains the source of truth).
|
||||||
|
"""
|
||||||
|
if not _SNAPSHOT_NAME_RE.match(label):
|
||||||
|
_LOGGER.warning("Snapshot label %r contains unsafe characters; skipping", label)
|
||||||
|
return None
|
||||||
|
|
||||||
|
url = str(engine.url)
|
||||||
|
src = _sqlite_path_from_url(url)
|
||||||
|
if src is None:
|
||||||
|
_LOGGER.debug("Non-SQLite engine; skipping snapshot")
|
||||||
|
return None
|
||||||
|
if not src.exists():
|
||||||
|
_LOGGER.debug("DB file %s does not exist yet (fresh install); skipping snapshot", src)
|
||||||
|
return None
|
||||||
|
|
||||||
|
target_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
|
||||||
|
dest = target_dir / f"{label}-{ts}.db"
|
||||||
|
|
||||||
|
# VACUUM INTO accepts a string literal, not a bind parameter. The dest
|
||||||
|
# path is built from our own label + timestamp (never user input), so
|
||||||
|
# escaping is straightforward — still, reject any dest containing a
|
||||||
|
# single quote as a belt-and-braces check.
|
||||||
|
dest_str = str(dest)
|
||||||
|
if "'" in dest_str:
|
||||||
|
_LOGGER.warning("Refusing to snapshot to path containing a single quote: %s", dest_str)
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with engine.connect() as conn:
|
||||||
|
# VACUUM cannot run inside an explicit transaction; use the
|
||||||
|
# plain connection without begin().
|
||||||
|
await conn.execute(text(f"VACUUM INTO '{dest_str}'"))
|
||||||
|
_LOGGER.info("Database snapshot written: %s (%.1f KiB)", dest, dest.stat().st_size / 1024)
|
||||||
|
return dest
|
||||||
|
except Exception:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Pre-migration snapshot failed — continuing with startup. "
|
||||||
|
"Check disk space in %s.",
|
||||||
|
target_dir,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
# Partial file can linger if VACUUM INTO aborted mid-write; clean up.
|
||||||
|
try:
|
||||||
|
if dest.exists():
|
||||||
|
dest.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def prune_old_snapshots(target_dir: Path, keep: int) -> list[Path]:
|
||||||
|
"""Keep the ``keep`` most recent pre-migrate snapshots, delete the rest.
|
||||||
|
|
||||||
|
Returns the list of paths that were deleted. Safe to call with
|
||||||
|
``keep=0`` (deletes everything) or when the directory does not exist.
|
||||||
|
"""
|
||||||
|
if keep < 0:
|
||||||
|
raise ValueError("keep must be >= 0")
|
||||||
|
if not target_dir.is_dir():
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
snapshots = sorted(
|
||||||
|
target_dir.glob(_SNAPSHOT_GLOB),
|
||||||
|
key=lambda p: p.stat().st_mtime,
|
||||||
|
reverse=True,
|
||||||
|
)
|
||||||
|
except OSError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
deleted: list[Path] = []
|
||||||
|
for old in snapshots[keep:]:
|
||||||
|
try:
|
||||||
|
old.unlink()
|
||||||
|
deleted.append(old)
|
||||||
|
except OSError:
|
||||||
|
_LOGGER.debug("Could not delete old snapshot %s", old, exc_info=True)
|
||||||
|
if deleted:
|
||||||
|
_LOGGER.info(
|
||||||
|
"Pruned %d old pre-migrate snapshot(s); kept %d most recent",
|
||||||
|
len(deleted), min(keep, len(snapshots)),
|
||||||
|
)
|
||||||
|
return deleted
|
||||||
|
|
||||||
|
|
||||||
|
async def snapshot_and_prune(
|
||||||
|
engine: AsyncEngine,
|
||||||
|
target_dir: Path,
|
||||||
|
*,
|
||||||
|
keep: int,
|
||||||
|
) -> Path | None:
|
||||||
|
"""Take a snapshot and prune old ones. Used by the lifespan startup path.
|
||||||
|
|
||||||
|
``keep=0`` disables snapshotting entirely.
|
||||||
|
"""
|
||||||
|
if keep <= 0:
|
||||||
|
return None
|
||||||
|
snapshot_path = await snapshot_database(engine, target_dir)
|
||||||
|
# Always prune even if this run's snapshot failed — old files still
|
||||||
|
# cost disk and may have been written by prior successful boots.
|
||||||
|
await asyncio.to_thread(prune_old_snapshots, target_dir, keep)
|
||||||
|
return snapshot_path
|
||||||
@@ -77,7 +77,16 @@ async def lifespan(app: FastAPI):
|
|||||||
migrate_performance_indexes,
|
migrate_performance_indexes,
|
||||||
migrate_schema_version,
|
migrate_schema_version,
|
||||||
)
|
)
|
||||||
|
from .database.snapshot import snapshot_and_prune
|
||||||
engine = get_engine()
|
engine = get_engine()
|
||||||
|
# Take a consistent DB snapshot BEFORE migrations run, so operators can
|
||||||
|
# roll back a bad upgrade by restoring one file. Best-effort — failures
|
||||||
|
# are logged, not raised.
|
||||||
|
await snapshot_and_prune(
|
||||||
|
engine,
|
||||||
|
_log_cfg.data_dir / "backups",
|
||||||
|
keep=_log_cfg.pre_migrate_snapshot_keep,
|
||||||
|
)
|
||||||
await migrate_schema(engine)
|
await migrate_schema(engine)
|
||||||
await migrate_tracker_targets(engine)
|
await migrate_tracker_targets(engine)
|
||||||
await migrate_entity_refactor(engine)
|
await migrate_entity_refactor(engine)
|
||||||
|
|||||||
@@ -0,0 +1,116 @@
|
|||||||
|
"""Pre-migration snapshot: atomic copy + retention pruning."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
|
||||||
|
from notify_bridge_server.database.snapshot import (
|
||||||
|
prune_old_snapshots,
|
||||||
|
snapshot_and_prune,
|
||||||
|
snapshot_database,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def sqlite_engine(tmp_path: Path):
|
||||||
|
"""Tiny SQLite DB with one table + one row, closed cleanly after the test."""
|
||||||
|
db_path = tmp_path / "app.db"
|
||||||
|
engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
|
||||||
|
async with engine.begin() as conn:
|
||||||
|
await conn.execute(text("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)"))
|
||||||
|
await conn.execute(text("INSERT INTO t (v) VALUES ('seed')"))
|
||||||
|
yield engine, db_path, tmp_path / "backups"
|
||||||
|
await engine.dispose()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSnapshot:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_creates_consistent_copy(self, sqlite_engine) -> None:
|
||||||
|
engine, _db, backups = sqlite_engine
|
||||||
|
dest = await snapshot_database(engine, backups)
|
||||||
|
assert dest is not None
|
||||||
|
assert dest.exists()
|
||||||
|
# Can open the snapshot and see the seed row — proves it's a real DB copy.
|
||||||
|
copy = create_async_engine(f"sqlite+aiosqlite:///{dest}")
|
||||||
|
async with copy.connect() as c:
|
||||||
|
result = await c.execute(text("SELECT v FROM t"))
|
||||||
|
rows = result.all()
|
||||||
|
await copy.dispose()
|
||||||
|
assert rows == [("seed",)]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skips_when_db_missing(self, tmp_path: Path) -> None:
|
||||||
|
# Engine pointing at a path that doesn't exist yet.
|
||||||
|
engine = create_async_engine(
|
||||||
|
f"sqlite+aiosqlite:///{tmp_path / 'does-not-exist.db'}"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
dest = await snapshot_database(engine, tmp_path / "backups")
|
||||||
|
finally:
|
||||||
|
await engine.dispose()
|
||||||
|
assert dest is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rejects_unsafe_label(self, sqlite_engine) -> None:
|
||||||
|
engine, _db, backups = sqlite_engine
|
||||||
|
dest = await snapshot_database(engine, backups, label="bad'; DROP TABLE t;--")
|
||||||
|
assert dest is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestPrune:
|
||||||
|
def _make_snapshot(self, backups: Path, age_seconds: int) -> Path:
|
||||||
|
backups.mkdir(parents=True, exist_ok=True)
|
||||||
|
ts = datetime.now(timezone.utc) - timedelta(seconds=age_seconds)
|
||||||
|
name = f"pre-migrate-{ts.strftime('%Y-%m-%dT%H-%M-%S')}.db"
|
||||||
|
p = backups / name
|
||||||
|
p.write_bytes(b"x")
|
||||||
|
mtime = ts.timestamp()
|
||||||
|
import os
|
||||||
|
os.utime(p, (mtime, mtime))
|
||||||
|
return p
|
||||||
|
|
||||||
|
def test_keeps_n_newest(self, tmp_path: Path) -> None:
|
||||||
|
backups = tmp_path / "backups"
|
||||||
|
for age in (100, 80, 60, 40, 20, 0):
|
||||||
|
self._make_snapshot(backups, age)
|
||||||
|
|
||||||
|
deleted = prune_old_snapshots(backups, keep=3)
|
||||||
|
remaining = sorted(backups.glob("pre-migrate-*.db"))
|
||||||
|
assert len(deleted) == 3
|
||||||
|
assert len(remaining) == 3
|
||||||
|
|
||||||
|
def test_keep_zero_deletes_all(self, tmp_path: Path) -> None:
|
||||||
|
backups = tmp_path / "backups"
|
||||||
|
for age in (30, 20, 10):
|
||||||
|
self._make_snapshot(backups, age)
|
||||||
|
prune_old_snapshots(backups, keep=0)
|
||||||
|
assert list(backups.glob("pre-migrate-*.db")) == []
|
||||||
|
|
||||||
|
def test_missing_dir_is_noop(self, tmp_path: Path) -> None:
|
||||||
|
assert prune_old_snapshots(tmp_path / "never-created", keep=5) == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestSnapshotAndPrune:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_keep_zero_disables(self, sqlite_engine) -> None:
|
||||||
|
engine, _db, backups = sqlite_engine
|
||||||
|
result = await snapshot_and_prune(engine, backups, keep=0)
|
||||||
|
assert result is None
|
||||||
|
assert not backups.exists() or list(backups.glob("*.db")) == []
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_end_to_end(self, sqlite_engine) -> None:
|
||||||
|
engine, _db, backups = sqlite_engine
|
||||||
|
# Run twice — second run should keep both snapshots (keep=5).
|
||||||
|
a = await snapshot_and_prune(engine, backups, keep=5)
|
||||||
|
# Guarantee distinct filenames (timestamp has second resolution).
|
||||||
|
await asyncio.sleep(1.05)
|
||||||
|
b = await snapshot_and_prune(engine, backups, keep=5)
|
||||||
|
assert a and b and a != b
|
||||||
|
assert a.exists() and b.exists()
|
||||||
Reference in New Issue
Block a user