"""Adversarial / edge-case tests for ActivityLogRepository (Phase 1 — storage layer).

These tests are intentionally skeptical — they derive expected behaviour from the
acceptance criteria in plans/activity-log/phase-1-storage.md, NOT from what the
code happens to do today. If a test fails, it is a real bug.

Coverage areas
--------------
1. SQL-injection / parameterization safety (message_like with %, _, ;, --, quotes, etc.)
2. Keyset pagination edge cases (empty table, before_seq bounds, stability,
   ordering contract, no duplicates/gaps)
3. Prune edge cases (before_ts only, max_entries only, both, max_entries=0,
   larger than count, deleted count, keeps NEWEST entries)
4. Filter combination edge cases (AND semantics, empty sequence vs None,
   since/until inclusive bounds, tz-aware datetimes)
5. Codec / data integrity (metadata round-trip: nested, unicode, JSON-escape;
   entity_* None vs empty string; microsecond ts)
6. Migration idempotency (table + all indexes present; double-run is no-op)
7. iter_export vs query consistency (same filters yield same rows; empty table; filter)
"""

from __future__ import annotations

import uuid
from datetime import datetime, timedelta, timezone

import pytest

from ledgrab.storage.activity_log import (
    ActivityCategory,
    ActivityLogEntry,
    ActivityLogFilters,
    ActivitySeverity,
)
from ledgrab.storage.activity_log_repository import ActivityLogRepository
from ledgrab.storage.database import Database

# ---------------------------------------------------------------------------
# Helpers (mirror the implementer's helpers so tests are self-contained)
# ---------------------------------------------------------------------------


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


# Distinguishes "entity_id not passed" (generate one) from an explicit None.
_SENTINEL = object()


def _entry(
    *,
    id: str | None = None,
    ts: datetime | None = None,
    category: str = ActivityCategory.ENTITY,
    action: str = "entity.created",
    severity: str = ActivitySeverity.INFO,
    actor: str = "test_actor",
    entity_type: str | None = "output_target",
    entity_id: object = _SENTINEL,
    entity_name: str | None = "My Target",
    message: str = "Created output target",
    metadata: dict | None = None,
) -> ActivityLogEntry:
    """Build an ActivityLogEntry with sensible defaults for every field.

    ``id`` and ``entity_id`` get a fresh random value when omitted; pass
    ``entity_id=None`` explicitly to build an entry without an entity id.
    ``ts`` defaults to the current UTC time and ``metadata`` to ``{}``.
    """
    resolved_entity_id: str | None = (
        f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id  # type: ignore[assignment]
    )
    return ActivityLogEntry(
        id=id or f"al_{uuid.uuid4().hex[:8]}",
        ts=ts or _now(),
        category=category,
        action=action,
        severity=severity,
        actor=actor,
        entity_type=entity_type,
        entity_id=resolved_entity_id,
        entity_name=entity_name,
        message=message,
        metadata=metadata if metadata is not None else {},
    )


@pytest.fixture
def repo(tmp_db: Database) -> ActivityLogRepository:
    """Fresh ActivityLogRepository backed by a temp database."""
    return ActivityLogRepository(tmp_db)


def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int:
    """Look up the DB-assigned ``seq`` of the row whose ``id`` is *entry_id*."""
    cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,))
    row = cursor.fetchone()
    assert row is not None, f"No row found for id={entry_id!r}"
    return int(row["seq"])


def _walk_all_pages(
    repo: ActivityLogRepository, *, limit: int, expected_total: int
) -> list[str]:
    """Page through the whole table with the keyset cursor and return every id seen.

    Each page's ``page[0]`` seq is passed as ``before_seq`` for the next page.
    The walk is capped: a correct implementation needs at most
    ``expected_total`` non-empty pages plus one empty page, so exceeding the cap
    means the cursor is not advancing. Failing here keeps a broken cursor from
    hanging the whole test run in an endless loop.
    """
    seen_ids: list[str] = []
    before_seq: int | None = None
    for _ in range(expected_total + 2):
        page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
        if not page:
            return seen_ids
        seen_ids.extend(e.id for e in page)
        before_seq = _get_seq(repo, page[0].id)
    pytest.fail(
        f"Pagination did not terminate after {expected_total + 2} pages "
        f"(collected {len(seen_ids)} ids, expected {expected_total}) — cursor is not advancing"
    )


# ---------------------------------------------------------------------------
# 1. SQL-injection / parameterization safety
# ---------------------------------------------------------------------------


class TestSQLInjectionSafety:
    """All user-supplied filter values must be treated as literal text, not SQL."""

    def test_message_like_percent_is_literal(self, repo: ActivityLogRepository) -> None:
        """A literal '%' in message_like must NOT act as a LIKE wildcard."""
        repo.record(_entry(message="100% done"))
        repo.record(_entry(message="all done"))
        repo.record(_entry(message="percent sign here"))

        results = repo.query(ActivityLogFilters(message_like="100%"), limit=10)
        assert len(results) == 1, "% in message_like should be a literal percent, not a wildcard"
        assert results[0].message == "100% done"

    def test_message_like_underscore_is_literal(self, repo: ActivityLogRepository) -> None:
        """A literal '_' in message_like must NOT act as a single-char wildcard."""
        repo.record(_entry(message="device_01"))
        repo.record(_entry(message="device001"))  # would match if _ were a wildcard
        repo.record(_entry(message="some other message"))

        results = repo.query(ActivityLogFilters(message_like="device_01"), limit=10)
        assert (
            len(results) == 1
        ), "_ in message_like should be a literal underscore, not a single-char wildcard"
        assert results[0].message == "device_01"

    def test_message_like_single_quote_does_not_break_query(
        self, repo: ActivityLogRepository
    ) -> None:
        """A single quote in message_like must not cause a SQL syntax error."""
        repo.record(_entry(message="it's working"))
        repo.record(_entry(message="no quote here"))

        # Must not raise
        results = repo.query(ActivityLogFilters(message_like="it's"), limit=10)
        assert len(results) == 1
        assert results[0].message == "it's working"

    def test_message_like_semicolon_does_not_execute_second_statement(
        self, repo: ActivityLogRepository
    ) -> None:
        """';' in message_like must not let a second SQL statement execute."""
        repo.record(_entry(message="a; DROP TABLE activity_log; --"))
        repo.record(_entry(message="safe message"))

        # If injection succeeded, table would be dropped and next call would error
        results = repo.query(
            ActivityLogFilters(message_like="a; DROP TABLE activity_log; --"), limit=10
        )
        # Table must still exist
        assert repo.count() == 2
        assert len(results) == 1

    def test_message_like_sql_comment_sequence(self, repo: ActivityLogRepository) -> None:
        """'--' (SQL comment) in message_like must be treated literally."""
        repo.record(_entry(message="value -- comment"))
        repo.record(_entry(message="value no comment"))

        results = repo.query(ActivityLogFilters(message_like="value --"), limit=10)
        assert len(results) == 1
        assert results[0].message == "value -- comment"

    def test_message_like_backslash_literal(self, repo: ActivityLogRepository) -> None:
        """Backslash in message_like must be treated as a literal character."""
        repo.record(_entry(message="path\\to\\file"))
        repo.record(_entry(message="path/to/file"))

        results = repo.query(ActivityLogFilters(message_like="path\\to"), limit=10)
        assert len(results) == 1
        assert results[0].message == "path\\to\\file"

    def test_message_like_classic_injection_pattern(self, repo: ActivityLogRepository) -> None:
        """Classic ') OR '1'='1 injection attempt must return no false positives."""
        repo.record(_entry(message="innocent message"))
        repo.record(_entry(message="another message"))

        # If injection worked, all rows would match
        results = repo.query(ActivityLogFilters(message_like="') OR '1'='1"), limit=10)
        assert (
            len(results) == 0
        ), "Injection payload matched rows it shouldn't — parameterization may be broken"

    def test_message_like_all_wildcards_returns_nothing_for_no_match(
        self, repo: ActivityLogRepository
    ) -> None:
        """'%_%' as a literal search term should return no rows unless that exact
        substring appears in a message."""
        repo.record(_entry(message="some message"))

        results = repo.query(ActivityLogFilters(message_like="%_%"), limit=10)
        # '%_%' as literal text does not appear in "some message"
        assert (
            len(results) == 0
        ), "% and _ in message_like were treated as SQL wildcards instead of literals"

    def test_actor_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
        """actor filter is exact match — SQL wildcards in value must not act as wildcards."""
        repo.record(_entry(actor="alice"))
        repo.record(_entry(actor="alice_admin"))

        results = repo.query(ActivityLogFilters(actor="alice"), limit=10)
        assert (
            len(results) == 1
        ), "actor filter is exact-match; 'alice' should not match 'alice_admin'"
        assert results[0].actor == "alice"

    def test_entity_id_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
        """entity_id filter is exact match — prefix should not leak."""
        repo.record(_entry(entity_id="ot_abc"))
        repo.record(_entry(entity_id="ot_abc_extra"))

        results = repo.query(ActivityLogFilters(entity_id="ot_abc"), limit=10)
        assert len(results) == 1
        # A count of one is not enough: make sure it is the exact-match row.
        assert results[0].entity_id == "ot_abc"


# ---------------------------------------------------------------------------
# 2. Keyset pagination edge cases
# ---------------------------------------------------------------------------


class TestKeysetPaginationEdges:
    """Boundary behaviour of ``query(before_seq=..., limit=...)`` keyset paging."""

    def test_empty_table_returns_empty_list(self, repo: ActivityLogRepository) -> None:
        """Query on empty table must return [] not raise."""
        results = repo.query(ActivityLogFilters(), limit=10)
        assert results == []

    def test_before_seq_none_is_first_page(self, repo: ActivityLogRepository) -> None:
        """before_seq=None must return the newest (first) page."""
        base = datetime(2026, 1, 1, tzinfo=timezone.utc)
        for i in range(5):
            repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))

        page = repo.query(ActivityLogFilters(), before_seq=None, limit=3)
        assert len(page) == 3
        # Page should contain the 3 newest entries
        messages = {e.message for e in page}
        assert "e4" in messages
        assert "e3" in messages
        assert "e2" in messages

    def test_before_seq_smaller_than_all_rows_returns_empty(
        self, repo: ActivityLogRepository
    ) -> None:
        """before_seq=1 (smaller than all autoincrement seqs) returns empty page."""
        for i in range(5):
            repo.record(_entry(message=f"e{i}"))

        # seq starts at 1, so before_seq=1 means seq < 1 — no rows
        results = repo.query(ActivityLogFilters(), before_seq=1, limit=10)
        assert (
            results == []
        ), "before_seq=1 should yield empty page since autoincrement starts at 1 (seq<1 = nothing)"

    def test_before_seq_larger_than_max_returns_full_first_page(
        self, repo: ActivityLogRepository
    ) -> None:
        """before_seq larger than any seq in the table behaves like before_seq=None."""
        for i in range(5):
            repo.record(_entry(message=f"e{i}"))

        page_none = repo.query(ActivityLogFilters(), before_seq=None, limit=5)
        page_large = repo.query(ActivityLogFilters(), before_seq=999_999, limit=5)
        ids_none = {e.id for e in page_none}
        ids_large = {e.id for e in page_large}
        assert ids_none == ids_large

    def test_page_boundary_limit_equals_row_count(self, repo: ActivityLogRepository) -> None:
        """When limit == total rows, one page covers all rows and a second page is empty."""
        for i in range(5):
            repo.record(_entry(message=f"e{i}"))

        page1 = repo.query(ActivityLogFilters(), limit=5)
        assert len(page1) == 5

        first_seq = _get_seq(repo, page1[0].id)
        page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=5)
        assert page2 == []

    def test_ordering_contract_page_zero_is_smallest_seq(self, repo: ActivityLogRepository) -> None:
        """Within a page, page[0] must have the smallest seq (ascending chrono order).

        The acceptance criteria state: 'The smallest seq on a page is page[0]'s seq —
        pass that as before_seq for the next page.'"""
        base = datetime(2026, 1, 1, tzinfo=timezone.utc)
        for i in range(6):
            repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))

        page = repo.query(ActivityLogFilters(), limit=6)
        seqs = [_get_seq(repo, e.id) for e in page]
        assert seqs == sorted(
            seqs
        ), "page must be in ascending seq order (page[0] is oldest/smallest seq)"

    def test_no_duplicates_across_full_walk(self, repo: ActivityLogRepository) -> None:
        """Walking the entire table page by page yields each row exactly once."""
        total = 11
        base = datetime(2026, 1, 1, tzinfo=timezone.utc)
        for i in range(total):
            repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))

        all_ids = _walk_all_pages(repo, limit=4, expected_total=total)

        assert len(all_ids) == total, "Total rows from all pages must equal inserted count"
        assert len(set(all_ids)) == total, "No duplicate IDs across pages"

    def test_no_gaps_across_full_walk(self, repo: ActivityLogRepository) -> None:
        """Walking the entire table page by page with limit=1 yields every row."""
        total = 7
        for i in range(total):
            repo.record(_entry(message=f"e{i}"))

        all_ids = _walk_all_pages(repo, limit=1, expected_total=total)

        assert len(all_ids) == total

    def test_many_rows_same_ts_no_duplicates_or_gaps(self, repo: ActivityLogRepository) -> None:
        """With many identical timestamps, pagination via seq prevents any dup or gap."""
        same_ts = datetime(2026, 5, 1, 10, 0, 0, tzinfo=timezone.utc)
        count = 9
        for i in range(count):
            repo.record(_entry(ts=same_ts, message=f"same-ts {i}"))

        all_ids = _walk_all_pages(repo, limit=4, expected_total=count)

        assert len(all_ids) == count
        assert len(set(all_ids)) == count, "Duplicates found in same-ts pagination walk"

    def test_next_page_cursor_is_page_zero_seq(self, repo: ActivityLogRepository) -> None:
        """The documented contract: pass page[0].seq as before_seq for next page.

        Verify the next page does NOT overlap with the current page."""
        for i in range(6):
            repo.record(_entry(message=f"e{i}"))

        page1 = repo.query(ActivityLogFilters(), limit=3)
        cursor = _get_seq(repo, page1[0].id)  # page[0] = smallest seq on page
        page2 = repo.query(ActivityLogFilters(), before_seq=cursor, limit=3)

        ids1 = {e.id for e in page1}
        ids2 = {e.id for e in page2}
        assert ids1.isdisjoint(ids2), "Pages overlap — cursor contract broken"


# ---------------------------------------------------------------------------
# 3.
# Prune edge cases
# ---------------------------------------------------------------------------


class TestPruneEdgeCases:
    """Edge cases for ``prune(before_ts=..., max_entries=...)``."""

    def test_prune_before_ts_only_no_max_entries(self, repo: ActivityLogRepository) -> None:
        """With only before_ts given, old rows go and recent rows stay."""
        cutoff = datetime(2026, 3, 1, tzinfo=timezone.utc)
        old_entry = _entry(ts=cutoff - timedelta(days=2), message="old")
        new_entry = _entry(ts=cutoff + timedelta(days=1), message="new")
        repo.record(old_entry)
        repo.record(new_entry)

        removed = repo.prune(before_ts=cutoff)

        assert removed == 1
        survivors = repo.query(ActivityLogFilters(), limit=10)
        assert len(survivors) == 1
        assert survivors[0].message == "new"

    def test_prune_max_entries_only_no_before_ts(self, repo: ActivityLogRepository) -> None:
        """With only max_entries given, the table is trimmed to N rows regardless of age."""
        for n in range(6):
            repo.record(_entry(message=f"e{n}"))

        removed = repo.prune(max_entries=2)

        assert removed == 4
        assert repo.count() == 2

    def test_prune_max_entries_zero_deletes_all(self, repo: ActivityLogRepository) -> None:
        """max_entries=0 keeps nothing, so every row is deleted."""
        for _ in range(5):
            repo.record(_entry())

        removed = repo.prune(max_entries=0)

        assert removed == 5
        assert repo.count() == 0

    def test_prune_max_entries_larger_than_count_is_noop(self, repo: ActivityLogRepository) -> None:
        """A max_entries above the current row count deletes nothing."""
        for n in range(3):
            repo.record(_entry(message=f"e{n}"))

        removed = repo.prune(max_entries=100)

        assert removed == 0
        assert repo.count() == 3

    def test_prune_keeps_newest_entries_by_seq(self, repo: ActivityLogRepository) -> None:
        """A max_entries prune MUST retain the rows with the HIGHEST seq values."""
        base = datetime(2026, 1, 1, tzinfo=timezone.utc)
        entries = [_entry(ts=base + timedelta(seconds=n), message=f"e{n}") for n in range(6)]
        for entry in entries:
            repo.record(entry)
        inserted_ids = [entry.id for entry in entries]

        # keep only 2
        repo.prune(max_entries=2)

        kept_ids = {row.id for row in repo.query(ActivityLogFilters(), limit=10)}

        # Must keep the last two inserted (highest seq = newest)
        assert inserted_ids[-1] in kept_ids, "Newest entry (e5) must be kept"
        assert inserted_ids[-2] in kept_ids, "Second newest entry (e4) must be kept"
        # Oldest must be gone
        assert inserted_ids[0] not in kept_ids, "Oldest entry (e0) must be pruned"

    def test_prune_both_returns_sum_of_deleted(self, repo: ActivityLogRepository) -> None:
        """prune(before_ts, max_entries) reports the TOTAL rows deleted across both steps."""
        base = datetime(2026, 4, 1, tzinfo=timezone.utc)
        # 4 old entries (before base)
        for n in range(4):
            repo.record(_entry(ts=base - timedelta(hours=n + 1), message=f"old{n}"))
        # 4 new entries (after base)
        for n in range(4):
            repo.record(_entry(ts=base + timedelta(hours=n + 1), message=f"new{n}"))

        # prune old, then keep only 2 new
        removed = repo.prune(before_ts=base, max_entries=2)

        # 4 old + 2 of the 4 new = 6 total
        assert removed == 6
        assert repo.count() == 2

    def test_prune_no_args_is_noop(self, repo: ActivityLogRepository) -> None:
        """Calling prune() without arguments deletes 0 rows."""
        for _ in range(3):
            repo.record(_entry())

        removed = repo.prune()

        assert removed == 0
        assert repo.count() == 3

    def test_prune_before_ts_boundary_is_exclusive(self, repo: ActivityLogRepository) -> None:
        """prune(before_ts=X) is strict (< X): a row stamped exactly X must survive."""
        boundary = datetime(2026, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
        one_second = timedelta(seconds=1)
        repo.record(_entry(ts=boundary - one_second, message="before"))
        repo.record(_entry(ts=boundary, message="exact boundary"))
        repo.record(_entry(ts=boundary + one_second, message="after"))

        removed = repo.prune(before_ts=boundary)

        assert removed == 1  # only "before" deleted
        surviving_messages = {row.message for row in repo.query(ActivityLogFilters(), limit=10)}
        assert "exact boundary" in surviving_messages
        assert "before" not in surviving_messages


# ---------------------------------------------------------------------------
# 4.
# Filter combination edge cases
# ---------------------------------------------------------------------------


class TestFilterCombinationEdges:
    """How individual filters combine, and how their boundary values behave."""

    def test_multiple_filters_are_anded(self, repo: ActivityLogRepository) -> None:
        """All non-None filters must be AND-ed together, not OR-ed."""
        repo.record(
            _entry(actor="alice", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
        )
        repo.record(
            _entry(actor="alice", category=ActivityCategory.DEVICE, severity=ActivitySeverity.INFO)
        )
        repo.record(
            _entry(actor="bob", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
        )

        results = repo.query(
            ActivityLogFilters(
                actor="alice",
                categories=[ActivityCategory.AUTH],
                severities=[ActivitySeverity.ERROR],
            ),
            limit=10,
        )
        assert len(results) == 1
        r = results[0]
        assert r.actor == "alice"
        assert r.category == ActivityCategory.AUTH
        assert r.severity == ActivitySeverity.ERROR

    def test_empty_categories_sequence_means_no_restriction(
        self, repo: ActivityLogRepository
    ) -> None:
        """An empty list for categories must behave the same as None (no restriction).

        The acceptance criteria state empty sequence == None for this dimension."""
        repo.record(_entry(category=ActivityCategory.AUTH))
        repo.record(_entry(category=ActivityCategory.DEVICE))

        # empty list
        results_empty = repo.query(ActivityLogFilters(categories=[]), limit=10)
        # None
        results_none = repo.query(ActivityLogFilters(categories=None), limit=10)

        assert len(results_empty) == len(results_none), (
            "categories=[] and categories=None should behave identically (no restriction); "
            f"got {len(results_empty)} vs {len(results_none)}"
        )
        # Equal lengths alone would also hold if both calls wrongly returned
        # nothing, so pin the unrestricted result to the two recorded rows.
        assert len(results_none) == 2, "categories=None must not restrict the result set"
        assert {e.id for e in results_empty} == {e.id for e in results_none}

    def test_empty_severities_sequence_means_no_restriction(
        self, repo: ActivityLogRepository
    ) -> None:
        """An empty list for severities must behave the same as None (no restriction)."""
        repo.record(_entry(severity=ActivitySeverity.INFO))
        repo.record(_entry(severity=ActivitySeverity.ERROR))

        results_empty = repo.query(ActivityLogFilters(severities=[]), limit=10)
        results_none = repo.query(ActivityLogFilters(severities=None), limit=10)

        assert len(results_empty) == len(
            results_none
        ), "severities=[] and severities=None should behave identically"
        # Guard against the vacuous case where both calls return no rows.
        assert len(results_none) == 2, "severities=None must not restrict the result set"
        assert {e.id for e in results_empty} == {e.id for e in results_none}

    def test_since_is_inclusive(self, repo: ActivityLogRepository) -> None:
        """since is an INCLUSIVE lower bound: ts >= since."""
        ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
        repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
        repo.record(_entry(ts=ts, message="at boundary"))
        repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))

        results = repo.query(ActivityLogFilters(since=ts), limit=10)
        messages = {r.message for r in results}
        assert "at boundary" in messages, "since boundary row (ts == since) must be included"
        assert "before" not in messages

    def test_until_is_inclusive(self, repo: ActivityLogRepository) -> None:
        """until is an INCLUSIVE upper bound: ts <= until."""
        ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
        repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
        repo.record(_entry(ts=ts, message="at boundary"))
        repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))

        results = repo.query(ActivityLogFilters(until=ts), limit=10)
        messages = {r.message for r in results}
        assert "at boundary" in messages, "until boundary row (ts == until) must be included"
        assert "after" not in messages

    def test_since_and_until_define_closed_range(self, repo: ActivityLogRepository) -> None:
        """Combining since + until must keep rows in [since, until] inclusive."""
        base = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
        repo.record(_entry(ts=base - timedelta(hours=1), message="out_before"))
        repo.record(_entry(ts=base, message="in_start"))
        repo.record(_entry(ts=base + timedelta(hours=1), message="in_middle"))
        repo.record(_entry(ts=base + timedelta(hours=2), message="in_end"))
        repo.record(_entry(ts=base + timedelta(hours=3), message="out_after"))

        results = repo.query(
            ActivityLogFilters(since=base, until=base + timedelta(hours=2)),
            limit=10,
        )
        messages = {r.message for r in results}
        assert {"in_start", "in_middle", "in_end"} == messages

    def test_tz_aware_datetime_round_trip(self, repo: ActivityLogRepository) -> None:
        """UTC-aware datetimes must survive storage and come back tz-aware."""
        ts = datetime(2026, 1, 15, 8, 30, 0, tzinfo=timezone.utc)
        e = _entry(ts=ts)
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        assert got.ts.tzinfo is not None, "Returned ts must be tz-aware"
        assert got.ts.utcoffset().total_seconds() == 0, "Returned ts must be UTC"  # type: ignore[union-attr]
        assert got.ts == ts

    def test_count_none_equals_count_empty_filters(self, repo: ActivityLogRepository) -> None:
        """count(None) == count(ActivityLogFilters()) per acceptance criteria."""
        for _ in range(4):
            repo.record(_entry())

        assert repo.count(None) == repo.count(ActivityLogFilters())
        # Both sides returning 0 would satisfy the equality above; pin the value.
        assert repo.count(None) == 4


# ---------------------------------------------------------------------------
# 5.
# Codec / data integrity
# ---------------------------------------------------------------------------


class TestCodecDataIntegrity:
    """Round-trip fidelity of entries through ``to_row`` / ``from_row`` and the repository."""

    def test_metadata_nested_dict_round_trip(self, repo: ActivityLogRepository) -> None:
        """Deeply nested metadata survives JSON round-trip."""
        meta = {
            "level1": {
                "level2": {"level3": [1, 2, 3]},
                "list": [{"a": True}, {"b": None}],
            },
            "count": 42,
        }
        e = _entry(metadata=meta)
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        assert got.metadata == meta

    def test_metadata_unicode_round_trip(self, repo: ActivityLogRepository) -> None:
        """Unicode (including emoji and CJK) in metadata survives storage."""
        meta = {"label": "こんにちは", "emoji": "🎉", "arrow": "→"}
        e = _entry(metadata=meta)
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        assert got.metadata == meta

    def test_metadata_empty_dict_round_trip(self, repo: ActivityLogRepository) -> None:
        """An empty {} metadata must come back as {} not None."""
        e = _entry(metadata={})
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        assert got.metadata == {}
        assert isinstance(got.metadata, dict)

    def test_metadata_json_special_chars(self, repo: ActivityLogRepository) -> None:
        """Metadata with JSON-special characters (backslash, quotes) round-trips correctly."""
        meta = {"path": "C:\\Users\\test", "quoted": '"hello"', "newline": "line1\nline2"}
        e = _entry(metadata=meta)
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        assert got.metadata == meta

    def test_entity_type_none_vs_empty_string(self, repo: ActivityLogRepository) -> None:
        """None entity_type must come back as None (not empty string '').

        These are semantically different — None means 'not applicable'."""
        e = _entry(entity_type=None, entity_id=None, entity_name=None)
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        # Must be exactly None, not ""
        assert got.entity_type is None
        assert got.entity_id is None
        assert got.entity_name is None

    def test_ts_microsecond_precision_preserved(self, repo: ActivityLogRepository) -> None:
        """Microsecond component of ts must survive the isoformat() round-trip."""
        ts = datetime(2026, 6, 9, 12, 34, 56, 789012, tzinfo=timezone.utc)
        e = _entry(ts=ts)
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        assert got.ts == ts, f"Expected {ts!r}, got {got.ts!r} — microsecond precision may be lost"

    def test_to_row_does_not_include_seq(self) -> None:
        """to_row() must NOT include 'seq' (it's DB-assigned)."""
        e = _entry()
        row = e.to_row()
        assert "seq" not in row, "to_row() must not include seq — it is DB-assigned"

    def test_to_row_has_exactly_11_keys(self) -> None:
        """Acceptance criteria: to_row() returns 11 keys."""
        e = _entry()
        row = e.to_row()
        expected_keys = {
            "id",
            "ts",
            "category",
            "action",
            "severity",
            "actor",
            "entity_type",
            "entity_id",
            "entity_name",
            "message",
            "metadata",
        }
        assert set(row.keys()) == expected_keys

    def test_from_row_ignores_seq_column(self) -> None:
        """from_row() must not raise or fail when 'seq' is present in the dict."""
        e = _entry()
        row = e.to_row()
        row["seq"] = 42  # inject seq as if from DB
        recovered = ActivityLogEntry.from_row(row)
        assert recovered.id == e.id

    def test_from_row_naive_ts_becomes_utc_aware(self) -> None:
        """If a stored ts has no timezone offset (legacy row), from_row must attach UTC."""
        e = _entry()
        row = e.to_row()
        # Strip timezone from the isoformat string to simulate a legacy row
        naive_ts = datetime(2026, 1, 1, 10, 0, 0)
        row["ts"] = naive_ts.isoformat()  # naive
        recovered = ActivityLogEntry.from_row(row)
        assert recovered.ts.tzinfo is not None, "Legacy naive ts must become tz-aware (UTC)"
        # Being tz-aware is not enough: the attached zone must be UTC and the
        # wall-clock value must be kept as stored (attach, not convert).
        assert recovered.ts.utcoffset() == timedelta(0), "Legacy naive ts must be tagged as UTC"
        assert recovered.ts == naive_ts.replace(tzinfo=timezone.utc)

    def test_metadata_with_numeric_keys_round_trip(self, repo: ActivityLogRepository) -> None:
        """JSON only supports string keys; numeric keys are coerced to strings."""
        # This tests that the codec doesn't silently crash on non-string keys
        # (Python allows them but JSON does not — json.dumps coerces to string)
        meta = {1: "one", "two": 2}
        e = _entry(metadata=meta)  # type: ignore[arg-type]
        repo.record(e)

        got = repo.query(ActivityLogFilters(), limit=1)[0]
        # json.dumps coerces int key 1 → "1"
        assert "1" in got.metadata or 1 in got.metadata

    def test_all_category_values_round_trip(self, repo: ActivityLogRepository) -> None:
        """Every ActivityCategory constant must survive storage without corruption."""
        for cat in ActivityCategory.ALL:
            repo.record(_entry(category=cat, message=f"cat:{cat}"))

        for cat in ActivityCategory.ALL:
            results = repo.query(ActivityLogFilters(categories=[cat]), limit=10)
            assert len(results) == 1
            assert results[0].category == cat

    def test_all_severity_values_round_trip(self, repo: ActivityLogRepository) -> None:
        """Every ActivitySeverity constant must survive storage without corruption."""
        for sev in ActivitySeverity.ALL:
            repo.record(_entry(severity=sev, message=f"sev:{sev}"))

        for sev in ActivitySeverity.ALL:
            results = repo.query(ActivityLogFilters(severities=[sev]), limit=10)
            assert len(results) == 1
            assert results[0].severity == sev


# ---------------------------------------------------------------------------
# 6.
# Migration idempotency (additional structural checks)
# ---------------------------------------------------------------------------


class TestMigrationIdempotencyExtended:
    """Structural checks on the activity_log schema and its migration."""

    def test_table_has_autoincrement_seq(self, tmp_db: Database) -> None:
        """The seq column must be INTEGER PRIMARY KEY AUTOINCREMENT — never reuse deleted seqs."""
        repo = ActivityLogRepository(tmp_db)
        e1 = _entry(message="first")
        e2 = _entry(message="second")
        repo.record(e1)
        repo.record(e2)

        seq1 = _get_seq(repo, e1.id)
        seq2 = _get_seq(repo, e2.id)
        assert seq2 > seq1, "AUTOINCREMENT must produce monotonically increasing seqs"

        # After clear, a new record must get a seq higher than the previous max
        repo.clear()
        e3 = _entry(message="third")
        repo.record(e3)
        seq3 = _get_seq(repo, e3.id)
        assert seq3 > seq2, "AUTOINCREMENT must not reuse seqs after DELETE"

    def test_all_expected_indexes_present(self, tmp_db: Database) -> None:
        """All 5 indexes declared in the acceptance criteria must exist."""
        ActivityLogRepository(tmp_db)
        cursor = tmp_db.execute(
            "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
        )
        index_names = {row["name"] for row in cursor.fetchall()}

        required = {
            "idx_activity_log_ts_seq",
            "idx_activity_log_category",
            "idx_activity_log_severity",
            "idx_activity_log_actor",
            "idx_activity_log_entity",
        }
        missing = required - index_names
        assert not missing, f"Missing indexes: {missing}"

    def test_id_column_has_unique_constraint(self, tmp_db: Database) -> None:
        """Inserting duplicate id must raise IntegrityError."""
        import sqlite3

        repo = ActivityLogRepository(tmp_db)
        fixed_id = f"al_{uuid.uuid4().hex[:8]}"
        repo.record(_entry(id=fixed_id, message="first"))

        # Expect IntegrityError specifically. Accepting any Exception would let
        # this test pass on unrelated failures (e.g. a TypeError in record())
        # even if the UNIQUE constraint were missing.
        # NOTE(review): assumes record() lets sqlite3.IntegrityError propagate
        # unwrapped — confirm against the repository implementation.
        with pytest.raises(sqlite3.IntegrityError):
            repo.record(_entry(id=fixed_id, message="duplicate id"))

    def test_migration_name_is_002_add_activity_log(self, tmp_db: Database) -> None:
        """The migration name must exactly match '002_add_activity_log'."""
        from ledgrab.storage.data_migrations import AddActivityLogTableMigration

        migration = AddActivityLogTableMigration()
        assert migration.name == "002_add_activity_log"

    def test_migration_is_second_in_all_migrations(self) -> None:
        """AddActivityLogTableMigration must be at index [1] in ALL_MIGRATIONS."""
        from ledgrab.storage.data_migrations import (
            ALL_MIGRATIONS,
            AddActivityLogTableMigration,
        )

        assert len(ALL_MIGRATIONS) >= 2, "ALL_MIGRATIONS must have at least 2 entries"
        assert isinstance(
            ALL_MIGRATIONS[1], AddActivityLogTableMigration
        ), "AddActivityLogTableMigration must be the second migration (index 1)"

    def test_apply_twice_is_noop_no_error(self, tmp_db: Database) -> None:
        """Calling apply() on the connection twice must not raise — IF NOT EXISTS ensures this."""
        from ledgrab.storage.data_migrations import AddActivityLogTableMigration

        migration = AddActivityLogTableMigration()
        with tmp_db.transaction() as conn:
            migration.apply(conn)
        # Second apply — must not raise
        with tmp_db.transaction() as conn:
            migration.apply(conn)

        # Table should still be accessible
        cursor = tmp_db.execute("SELECT COUNT(*) AS cnt FROM activity_log")
        assert cursor.fetchone()["cnt"] == 0


# ---------------------------------------------------------------------------
# 7.
# iter_export vs query consistency
# ---------------------------------------------------------------------------


class TestIterExportConsistency:
    """``iter_export`` must agree with ``query`` and stream its results."""

    def test_iter_export_empty_table_yields_nothing(self, repo: ActivityLogRepository) -> None:
        """Exporting from an empty table produces no items and does not raise."""
        assert list(repo.iter_export()) == []

    def test_iter_export_matches_query_results(self, repo: ActivityLogRepository) -> None:
        """The same filters must select the same entries via iter_export and query."""
        for idx in range(8):
            if idx % 2 == 0:
                category = ActivityCategory.AUTH
            else:
                category = ActivityCategory.DEVICE
            repo.record(_entry(category=category, message=f"e{idx}"))

        auth_only = ActivityLogFilters(categories=[ActivityCategory.AUTH])
        ids_from_export = {entry.id for entry in repo.iter_export(auth_only)}
        ids_from_query = {entry.id for entry in repo.query(auth_only, limit=100)}

        assert (
            ids_from_export == ids_from_query
        ), "iter_export and query must return the same set of entries for the same filters"

    def test_iter_export_none_filter_yields_all(self, repo: ActivityLogRepository) -> None:
        """iter_export(None) yields every row, like an unfiltered query."""
        for idx in range(5):
            repo.record(_entry(message=f"e{idx}"))

        exported_count = len(list(repo.iter_export(None)))
        queried_count = len(repo.query(ActivityLogFilters(), limit=100))

        assert exported_count == queried_count == 5

    def test_iter_export_ascending_seq_order(self, repo: ActivityLogRepository) -> None:
        """Rows come out of iter_export oldest first, i.e. in ascending seq order."""
        start = datetime(2026, 1, 1, tzinfo=timezone.utc)
        for idx in range(5):
            repo.record(_entry(ts=start + timedelta(seconds=idx), message=f"e{idx}"))

        seq_values = [_get_seq(repo, entry.id) for entry in repo.iter_export()]

        assert seq_values == sorted(seq_values), "iter_export must yield rows in ascending seq order"

    def test_iter_export_respects_message_like_filter(self, repo: ActivityLogRepository) -> None:
        """The message_like filter applies to iter_export exactly as it does to query."""
        for text in (
            "found: hello world",
            "nothing relevant here",
            "also found: hello there",
        ):
            repo.record(_entry(message=text))

        matches = list(repo.iter_export(ActivityLogFilters(message_like="found")))

        assert len(matches) == 2
        assert all("found" in entry.message for entry in matches)

    def test_iter_export_is_lazy_generator(self, repo: ActivityLogRepository) -> None:
        """iter_export hands back a generator object rather than a materialised list."""
        import types

        for _ in range(3):
            repo.record(_entry())

        stream = repo.iter_export()

        assert isinstance(
            stream, types.GeneratorType
        ), "iter_export must return a generator for streaming — not a pre-loaded list"