"""Pre-migration snapshot: atomic copy + retention pruning.""" from __future__ import annotations import asyncio from datetime import datetime, timedelta, timezone from pathlib import Path import pytest from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine from notify_bridge_server.database.snapshot import ( prune_old_snapshots, snapshot_and_prune, snapshot_database, ) @pytest.fixture async def sqlite_engine(tmp_path: Path): """Tiny SQLite DB with one table + one row, closed cleanly after the test.""" db_path = tmp_path / "app.db" engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}") async with engine.begin() as conn: await conn.execute(text("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")) await conn.execute(text("INSERT INTO t (v) VALUES ('seed')")) yield engine, db_path, tmp_path / "backups" await engine.dispose() class TestSnapshot: @pytest.mark.asyncio async def test_creates_consistent_copy(self, sqlite_engine) -> None: engine, _db, backups = sqlite_engine dest = await snapshot_database(engine, backups) assert dest is not None assert dest.exists() # Can open the snapshot and see the seed row — proves it's a real DB copy. copy = create_async_engine(f"sqlite+aiosqlite:///{dest}") async with copy.connect() as c: result = await c.execute(text("SELECT v FROM t")) rows = result.all() await copy.dispose() assert rows == [("seed",)] @pytest.mark.asyncio async def test_skips_when_db_missing(self, tmp_path: Path) -> None: # Engine pointing at a path that doesn't exist yet. engine = create_async_engine( f"sqlite+aiosqlite:///{tmp_path / 'does-not-exist.db'}" ) try: dest = await snapshot_database(engine, tmp_path / "backups") finally: await engine.dispose() assert dest is None @pytest.mark.asyncio async def test_rejects_unsafe_label(self, sqlite_engine) -> None: engine, _db, backups = sqlite_engine dest = await snapshot_database(engine, backups, label="bad'; DROP TABLE t;--") assert dest is None class TestPrune: def _make_snapshot(self, backups: Path, age_seconds: int) -> Path: backups.mkdir(parents=True, exist_ok=True) ts = datetime.now(timezone.utc) - timedelta(seconds=age_seconds) name = f"pre-migrate-{ts.strftime('%Y-%m-%dT%H-%M-%S')}.db" p = backups / name p.write_bytes(b"x") mtime = ts.timestamp() import os os.utime(p, (mtime, mtime)) return p def test_keeps_n_newest(self, tmp_path: Path) -> None: backups = tmp_path / "backups" for age in (100, 80, 60, 40, 20, 0): self._make_snapshot(backups, age) deleted = prune_old_snapshots(backups, keep=3) remaining = sorted(backups.glob("pre-migrate-*.db")) assert len(deleted) == 3 assert len(remaining) == 3 def test_keep_zero_deletes_all(self, tmp_path: Path) -> None: backups = tmp_path / "backups" for age in (30, 20, 10): self._make_snapshot(backups, age) prune_old_snapshots(backups, keep=0) assert list(backups.glob("pre-migrate-*.db")) == [] def test_missing_dir_is_noop(self, tmp_path: Path) -> None: assert prune_old_snapshots(tmp_path / "never-created", keep=5) == [] class TestSnapshotAndPrune: @pytest.mark.asyncio async def test_keep_zero_disables(self, sqlite_engine) -> None: engine, _db, backups = sqlite_engine result = await snapshot_and_prune(engine, backups, keep=0) assert result is None assert not backups.exists() or list(backups.glob("*.db")) == [] @pytest.mark.asyncio async def test_end_to_end(self, sqlite_engine) -> None: engine, _db, backups = sqlite_engine # Run twice — second run should keep both snapshots (keep=5). a = await snapshot_and_prune(engine, backups, keep=5) # Guarantee distinct filenames (timestamp has second resolution). await asyncio.sleep(1.05) b = await snapshot_and_prune(engine, backups, keep=5) assert a and b and a != b assert a.exists() and b.exists()