"""Pre-migration database snapshots. Runs at lifespan startup BEFORE migrations execute. Produces a consistent point-in-time copy of the SQLite database using ``VACUUM INTO`` (atomic, cannot tear against concurrent activity, works with WAL). The snapshot is the operator's fallback if a future migration corrupts the schema — restore is a single ``mv`` / ``docker cp``. We keep the N most recent files (default 5) and never fail startup if the snapshot itself fails: a snapshot is best-effort safety net, not a gate. """ from __future__ import annotations import asyncio import logging import re from datetime import datetime, timezone from pathlib import Path from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncEngine _LOGGER = logging.getLogger(__name__) _SNAPSHOT_GLOB = "pre-migrate-*.db" _SNAPSHOT_NAME_RE = re.compile(r"^[A-Za-z0-9._+\-:]+$") def _sqlite_path_from_url(url: str) -> Path | None: """Extract the filesystem path from a ``sqlite+aiosqlite:///...`` URL.""" if not url.startswith("sqlite"): return None # e.g. "sqlite+aiosqlite:///C:/data/notify_bridge.db" prefix, _, rest = url.partition(":///") if not rest: return None return Path(rest) async def snapshot_database( engine: AsyncEngine, target_dir: Path, *, label: str = "pre-migrate", ) -> Path | None: """Write a consistent copy of the SQLite DB to ``target_dir``. Uses ``VACUUM INTO`` which SQLite executes atomically against a read snapshot — safe under WAL, cannot produce a torn copy. Returns the snapshot path on success, ``None`` when skipped or on non-fatal failure. Never raises: callers treat a missing snapshot as acceptable (the main DB remains the source of truth). """ if not _SNAPSHOT_NAME_RE.match(label): _LOGGER.warning("Snapshot label %r contains unsafe characters; skipping", label) return None url = str(engine.url) src = _sqlite_path_from_url(url) if src is None: _LOGGER.debug("Non-SQLite engine; skipping snapshot") return None if not src.exists(): _LOGGER.debug("DB file %s does not exist yet (fresh install); skipping snapshot", src) return None target_dir.mkdir(parents=True, exist_ok=True) ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S") dest = target_dir / f"{label}-{ts}.db" # VACUUM INTO accepts a string literal, not a bind parameter. The dest # path is built from our own label + timestamp (never user input), so # escaping is straightforward — still, reject any dest containing a # single quote as a belt-and-braces check. dest_str = str(dest) if "'" in dest_str: _LOGGER.warning("Refusing to snapshot to path containing a single quote: %s", dest_str) return None try: async with engine.connect() as conn: # VACUUM cannot run inside an explicit transaction; use the # plain connection without begin(). await conn.execute(text(f"VACUUM INTO '{dest_str}'")) _LOGGER.info("Database snapshot written: %s (%.1f KiB)", dest, dest.stat().st_size / 1024) return dest except Exception: _LOGGER.warning( "Pre-migration snapshot failed — continuing with startup. " "Check disk space in %s.", target_dir, exc_info=True, ) # Partial file can linger if VACUUM INTO aborted mid-write; clean up. try: if dest.exists(): dest.unlink() except OSError: pass return None def prune_old_snapshots(target_dir: Path, keep: int) -> list[Path]: """Keep the ``keep`` most recent pre-migrate snapshots, delete the rest. Returns the list of paths that were deleted. Safe to call with ``keep=0`` (deletes everything) or when the directory does not exist. """ if keep < 0: raise ValueError("keep must be >= 0") if not target_dir.is_dir(): return [] try: snapshots = sorted( target_dir.glob(_SNAPSHOT_GLOB), key=lambda p: p.stat().st_mtime, reverse=True, ) except OSError: return [] deleted: list[Path] = [] for old in snapshots[keep:]: try: old.unlink() deleted.append(old) except OSError: _LOGGER.debug("Could not delete old snapshot %s", old, exc_info=True) if deleted: _LOGGER.info( "Pruned %d old pre-migrate snapshot(s); kept %d most recent", len(deleted), min(keep, len(snapshots)), ) return deleted async def snapshot_and_prune( engine: AsyncEngine, target_dir: Path, *, keep: int, ) -> Path | None: """Take a snapshot and prune old ones. Used by the lifespan startup path. ``keep=0`` disables snapshotting entirely. """ if keep <= 0: return None snapshot_path = await snapshot_database(engine, target_dir) # Always prune even if this run's snapshot failed — old files still # cost disk and may have been written by prior successful boots. await asyncio.to_thread(prune_old_snapshots, target_dir, keep) return snapshot_path