1ac4a0f66d
- ActivityLogEntry dataclass + ActivityCategory/ActivitySeverity + ActivityLogFilters - additive idempotent migration 002_add_activity_log (indexed activity_log table, seq keyset tiebreaker) - ActivityLogRepository (record/query/count/prune/clear/iter_export), keyset pagination, parameterized SQL - 102 unit + adversarial tests (SQL-injection, pagination, prune, codec, migration idempotency)
885 lines
38 KiB
Python
885 lines
38 KiB
Python
"""Adversarial / edge-case tests for ActivityLogRepository (Phase 1 — storage layer).
|
|
|
|
These tests are intentionally skeptical — they derive expected behaviour from the
|
|
acceptance criteria in plans/activity-log/phase-1-storage.md, NOT from what the
|
|
code happens to do today. If a test fails, it is a real bug.
|
|
|
|
Coverage areas
|
|
--------------
|
|
1. SQL-injection / parameterization safety (message_like with %, _, ;, --, quotes, etc.)
|
|
2. Keyset pagination edge cases (empty table, before_seq bounds, stability,
|
|
ordering contract, no duplicates/gaps)
|
|
3. Prune edge cases (before_ts only, max_entries only, both,
|
|
max_entries=0, larger than count, deleted count,
|
|
keeps NEWEST entries)
|
|
4. Filter combination edge cases (AND semantics, empty sequence vs None,
|
|
since/until inclusive bounds, tz-aware datetimes)
|
|
5. Codec / data integrity (metadata round-trip: nested, unicode, JSON-escape;
|
|
entity_* None vs empty string; microsecond ts)
|
|
6. Migration idempotency (table + all indexes present; double-run is no-op)
|
|
7. iter_export vs query consistency (same filters yield same rows; empty table; filter)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
|
|
from ledgrab.storage.activity_log import (
|
|
ActivityCategory,
|
|
ActivityLogEntry,
|
|
ActivityLogFilters,
|
|
ActivitySeverity,
|
|
)
|
|
from ledgrab.storage.activity_log_repository import ActivityLogRepository
|
|
from ledgrab.storage.database import Database
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers (mirror the implementer's helpers so tests are self-contained)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
_SENTINEL = object()
|
|
|
|
|
|
def _entry(
|
|
*,
|
|
id: str | None = None,
|
|
ts: datetime | None = None,
|
|
category: str = ActivityCategory.ENTITY,
|
|
action: str = "entity.created",
|
|
severity: str = ActivitySeverity.INFO,
|
|
actor: str = "test_actor",
|
|
entity_type: str | None = "output_target",
|
|
entity_id: object = _SENTINEL,
|
|
entity_name: str | None = "My Target",
|
|
message: str = "Created output target",
|
|
metadata: dict | None = None,
|
|
) -> ActivityLogEntry:
|
|
resolved_entity_id: str | None = (
|
|
f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id # type: ignore[assignment]
|
|
)
|
|
return ActivityLogEntry(
|
|
id=id or f"al_{uuid.uuid4().hex[:8]}",
|
|
ts=ts or _now(),
|
|
category=category,
|
|
action=action,
|
|
severity=severity,
|
|
actor=actor,
|
|
entity_type=entity_type,
|
|
entity_id=resolved_entity_id,
|
|
entity_name=entity_name,
|
|
message=message,
|
|
metadata=metadata if metadata is not None else {},
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def repo(tmp_db: Database) -> ActivityLogRepository:
|
|
"""Fresh ActivityLogRepository backed by a temp database."""
|
|
return ActivityLogRepository(tmp_db)
|
|
|
|
|
|
def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int:
|
|
cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,))
|
|
row = cursor.fetchone()
|
|
assert row is not None, f"No row found for id={entry_id!r}"
|
|
return int(row["seq"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. SQL-injection / parameterization safety
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSQLInjectionSafety:
|
|
"""All user-supplied filter values must be treated as literal text, not SQL."""
|
|
|
|
def test_message_like_percent_is_literal(self, repo: ActivityLogRepository) -> None:
|
|
"""A literal '%' in message_like must NOT act as a LIKE wildcard."""
|
|
repo.record(_entry(message="100% done"))
|
|
repo.record(_entry(message="all done"))
|
|
repo.record(_entry(message="percent sign here"))
|
|
|
|
results = repo.query(ActivityLogFilters(message_like="100%"), limit=10)
|
|
assert len(results) == 1, "% in message_like should be a literal percent, not a wildcard"
|
|
assert results[0].message == "100% done"
|
|
|
|
def test_message_like_underscore_is_literal(self, repo: ActivityLogRepository) -> None:
|
|
"""A literal '_' in message_like must NOT act as a single-char wildcard."""
|
|
repo.record(_entry(message="device_01"))
|
|
repo.record(_entry(message="device001")) # would match if _ were a wildcard
|
|
repo.record(_entry(message="some other message"))
|
|
|
|
results = repo.query(ActivityLogFilters(message_like="device_01"), limit=10)
|
|
assert (
|
|
len(results) == 1
|
|
), "_ in message_like should be a literal underscore, not a single-char wildcard"
|
|
assert results[0].message == "device_01"
|
|
|
|
def test_message_like_single_quote_does_not_break_query(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""A single quote in message_like must not cause a SQL syntax error."""
|
|
repo.record(_entry(message="it's working"))
|
|
repo.record(_entry(message="no quote here"))
|
|
|
|
# Must not raise
|
|
results = repo.query(ActivityLogFilters(message_like="it's"), limit=10)
|
|
assert len(results) == 1
|
|
assert results[0].message == "it's working"
|
|
|
|
def test_message_like_semicolon_does_not_execute_second_statement(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""';' in message_like must not let a second SQL statement execute."""
|
|
repo.record(_entry(message="a; DROP TABLE activity_log; --"))
|
|
repo.record(_entry(message="safe message"))
|
|
|
|
# If injection succeeded, table would be dropped and next call would error
|
|
results = repo.query(
|
|
ActivityLogFilters(message_like="a; DROP TABLE activity_log; --"), limit=10
|
|
)
|
|
# Table must still exist
|
|
assert repo.count() == 2
|
|
assert len(results) == 1
|
|
|
|
def test_message_like_sql_comment_sequence(self, repo: ActivityLogRepository) -> None:
|
|
"""'--' (SQL comment) in message_like must be treated literally."""
|
|
repo.record(_entry(message="value -- comment"))
|
|
repo.record(_entry(message="value no comment"))
|
|
|
|
results = repo.query(ActivityLogFilters(message_like="value --"), limit=10)
|
|
assert len(results) == 1
|
|
assert results[0].message == "value -- comment"
|
|
|
|
def test_message_like_backslash_literal(self, repo: ActivityLogRepository) -> None:
|
|
"""Backslash in message_like must be treated as a literal character."""
|
|
repo.record(_entry(message="path\\to\\file"))
|
|
repo.record(_entry(message="path/to/file"))
|
|
|
|
results = repo.query(ActivityLogFilters(message_like="path\\to"), limit=10)
|
|
assert len(results) == 1
|
|
assert results[0].message == "path\\to\\file"
|
|
|
|
def test_message_like_classic_injection_pattern(self, repo: ActivityLogRepository) -> None:
|
|
"""Classic ') OR '1'='1 injection attempt must return no false positives."""
|
|
repo.record(_entry(message="innocent message"))
|
|
repo.record(_entry(message="another message"))
|
|
|
|
# If injection worked, all rows would match
|
|
results = repo.query(ActivityLogFilters(message_like="') OR '1'='1"), limit=10)
|
|
assert (
|
|
len(results) == 0
|
|
), "Injection payload matched rows it shouldn't — parameterization may be broken"
|
|
|
|
def test_message_like_all_wildcards_returns_nothing_for_no_match(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""'%_%' as a literal search term should return no rows unless that exact
|
|
substring appears in a message."""
|
|
repo.record(_entry(message="some message"))
|
|
|
|
results = repo.query(ActivityLogFilters(message_like="%_%"), limit=10)
|
|
# '%_%' as literal text does not appear in "some message"
|
|
assert (
|
|
len(results) == 0
|
|
), "% and _ in message_like were treated as SQL wildcards instead of literals"
|
|
|
|
def test_actor_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
|
|
"""actor filter is exact match — SQL wildcards in value must not act as wildcards."""
|
|
repo.record(_entry(actor="alice"))
|
|
repo.record(_entry(actor="alice_admin"))
|
|
|
|
results = repo.query(ActivityLogFilters(actor="alice"), limit=10)
|
|
assert (
|
|
len(results) == 1
|
|
), "actor filter is exact-match; 'alice' should not match 'alice_admin'"
|
|
assert results[0].actor == "alice"
|
|
|
|
def test_entity_id_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
|
|
"""entity_id filter is exact match — prefix should not leak."""
|
|
repo.record(_entry(entity_id="ot_abc"))
|
|
repo.record(_entry(entity_id="ot_abc_extra"))
|
|
|
|
results = repo.query(ActivityLogFilters(entity_id="ot_abc"), limit=10)
|
|
assert len(results) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Keyset pagination edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestKeysetPaginationEdges:
|
|
def test_empty_table_returns_empty_list(self, repo: ActivityLogRepository) -> None:
|
|
"""Query on empty table must return [] not raise."""
|
|
results = repo.query(ActivityLogFilters(), limit=10)
|
|
assert results == []
|
|
|
|
def test_before_seq_none_is_first_page(self, repo: ActivityLogRepository) -> None:
|
|
"""before_seq=None must return the newest (first) page."""
|
|
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
|
for i in range(5):
|
|
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
|
|
|
page = repo.query(ActivityLogFilters(), before_seq=None, limit=3)
|
|
assert len(page) == 3
|
|
# Page should contain the 3 newest entries
|
|
messages = {e.message for e in page}
|
|
assert "e4" in messages
|
|
assert "e3" in messages
|
|
assert "e2" in messages
|
|
|
|
def test_before_seq_smaller_than_all_rows_returns_empty(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""before_seq=1 (smaller than all autoincrement seqs) returns empty page."""
|
|
for i in range(5):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
# seq starts at 1, so before_seq=1 means seq < 1 — no rows
|
|
results = repo.query(ActivityLogFilters(), before_seq=1, limit=10)
|
|
assert (
|
|
results == []
|
|
), "before_seq=1 should yield empty page since autoincrement starts at 1 (seq<1 = nothing)"
|
|
|
|
def test_before_seq_larger_than_max_returns_full_first_page(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""before_seq larger than any seq in the table behaves like before_seq=None."""
|
|
for i in range(5):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
page_none = repo.query(ActivityLogFilters(), before_seq=None, limit=5)
|
|
page_large = repo.query(ActivityLogFilters(), before_seq=999_999, limit=5)
|
|
|
|
ids_none = {e.id for e in page_none}
|
|
ids_large = {e.id for e in page_large}
|
|
assert ids_none == ids_large
|
|
|
|
def test_page_boundary_limit_equals_row_count(self, repo: ActivityLogRepository) -> None:
|
|
"""When limit == total rows, one page covers all rows and a second page is empty."""
|
|
for i in range(5):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
page1 = repo.query(ActivityLogFilters(), limit=5)
|
|
assert len(page1) == 5
|
|
|
|
first_seq = _get_seq(repo, page1[0].id)
|
|
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=5)
|
|
assert page2 == []
|
|
|
|
def test_ordering_contract_page_zero_is_smallest_seq(self, repo: ActivityLogRepository) -> None:
|
|
"""Within a page, page[0] must have the smallest seq (ascending chrono order).
|
|
The acceptance criteria state: 'The smallest seq on a page is page[0]'s seq —
|
|
pass that as before_seq for the next page.'"""
|
|
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
|
for i in range(6):
|
|
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
|
|
|
page = repo.query(ActivityLogFilters(), limit=6)
|
|
seqs = [_get_seq(repo, e.id) for e in page]
|
|
assert seqs == sorted(
|
|
seqs
|
|
), "page must be in ascending seq order (page[0] is oldest/smallest seq)"
|
|
|
|
def test_no_duplicates_across_full_walk(self, repo: ActivityLogRepository) -> None:
|
|
"""Walking the entire table page by page yields each row exactly once."""
|
|
total = 11
|
|
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
|
for i in range(total):
|
|
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
|
|
|
all_ids: list[str] = []
|
|
before_seq: int | None = None
|
|
limit = 4
|
|
|
|
while True:
|
|
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
|
|
if not page:
|
|
break
|
|
all_ids.extend(e.id for e in page)
|
|
before_seq = _get_seq(repo, page[0].id)
|
|
|
|
assert len(all_ids) == total, "Total rows from all pages must equal inserted count"
|
|
assert len(set(all_ids)) == total, "No duplicate IDs across pages"
|
|
|
|
def test_no_gaps_across_full_walk(self, repo: ActivityLogRepository) -> None:
|
|
"""Walking the entire table page by page with limit=1 yields every row."""
|
|
total = 7
|
|
for i in range(total):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
all_ids: list[str] = []
|
|
before_seq: int | None = None
|
|
while True:
|
|
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=1)
|
|
if not page:
|
|
break
|
|
all_ids.append(page[0].id)
|
|
before_seq = _get_seq(repo, page[0].id)
|
|
|
|
assert len(all_ids) == total
|
|
|
|
def test_many_rows_same_ts_no_duplicates_or_gaps(self, repo: ActivityLogRepository) -> None:
|
|
"""With many identical timestamps, pagination via seq prevents any dup or gap."""
|
|
same_ts = datetime(2026, 5, 1, 10, 0, 0, tzinfo=timezone.utc)
|
|
count = 9
|
|
for i in range(count):
|
|
repo.record(_entry(ts=same_ts, message=f"same-ts {i}"))
|
|
|
|
all_ids: list[str] = []
|
|
before_seq: int | None = None
|
|
limit = 4
|
|
while True:
|
|
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
|
|
if not page:
|
|
break
|
|
all_ids.extend(e.id for e in page)
|
|
before_seq = _get_seq(repo, page[0].id)
|
|
|
|
assert len(all_ids) == count
|
|
assert len(set(all_ids)) == count, "Duplicates found in same-ts pagination walk"
|
|
|
|
def test_next_page_cursor_is_page_zero_seq(self, repo: ActivityLogRepository) -> None:
|
|
"""The documented contract: pass page[0].seq as before_seq for next page.
|
|
Verify the next page does NOT overlap with the current page."""
|
|
for i in range(6):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
page1 = repo.query(ActivityLogFilters(), limit=3)
|
|
cursor = _get_seq(repo, page1[0].id) # page[0] = smallest seq on page
|
|
page2 = repo.query(ActivityLogFilters(), before_seq=cursor, limit=3)
|
|
|
|
ids1 = {e.id for e in page1}
|
|
ids2 = {e.id for e in page2}
|
|
assert ids1.isdisjoint(ids2), "Pages overlap — cursor contract broken"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Prune edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPruneEdgeCases:
|
|
def test_prune_before_ts_only_no_max_entries(self, repo: ActivityLogRepository) -> None:
|
|
"""before_ts alone removes only old rows; recent rows untouched."""
|
|
cutoff = datetime(2026, 3, 1, tzinfo=timezone.utc)
|
|
repo.record(_entry(ts=cutoff - timedelta(days=2), message="old"))
|
|
repo.record(_entry(ts=cutoff + timedelta(days=1), message="new"))
|
|
|
|
deleted = repo.prune(before_ts=cutoff)
|
|
assert deleted == 1
|
|
remaining = repo.query(ActivityLogFilters(), limit=10)
|
|
assert len(remaining) == 1
|
|
assert remaining[0].message == "new"
|
|
|
|
def test_prune_max_entries_only_no_before_ts(self, repo: ActivityLogRepository) -> None:
|
|
"""max_entries alone trims to N newest; no age filter applied."""
|
|
for i in range(6):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
deleted = repo.prune(max_entries=2)
|
|
assert deleted == 4
|
|
assert repo.count() == 2
|
|
|
|
def test_prune_max_entries_zero_deletes_all(self, repo: ActivityLogRepository) -> None:
|
|
"""max_entries=0 means keep nothing — all rows deleted."""
|
|
for i in range(5):
|
|
repo.record(_entry())
|
|
|
|
deleted = repo.prune(max_entries=0)
|
|
assert deleted == 5
|
|
assert repo.count() == 0
|
|
|
|
def test_prune_max_entries_larger_than_count_is_noop(self, repo: ActivityLogRepository) -> None:
|
|
"""max_entries > actual count must not delete anything."""
|
|
for i in range(3):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
deleted = repo.prune(max_entries=100)
|
|
assert deleted == 0
|
|
assert repo.count() == 3
|
|
|
|
def test_prune_keeps_newest_entries_by_seq(self, repo: ActivityLogRepository) -> None:
|
|
"""max_entries prune MUST keep the rows with the HIGHEST seq values."""
|
|
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
|
all_ids = []
|
|
for i in range(6):
|
|
e = _entry(ts=base + timedelta(seconds=i), message=f"e{i}")
|
|
all_ids.append(e.id)
|
|
repo.record(e)
|
|
|
|
# keep only 2
|
|
repo.prune(max_entries=2)
|
|
remaining = repo.query(ActivityLogFilters(), limit=10)
|
|
remaining_ids = {r.id for r in remaining}
|
|
|
|
# Must keep the last two inserted (highest seq = newest)
|
|
assert all_ids[-1] in remaining_ids, "Newest entry (e5) must be kept"
|
|
assert all_ids[-2] in remaining_ids, "Second newest entry (e4) must be kept"
|
|
# Oldest must be gone
|
|
assert all_ids[0] not in remaining_ids, "Oldest entry (e0) must be pruned"
|
|
|
|
def test_prune_both_returns_sum_of_deleted(self, repo: ActivityLogRepository) -> None:
|
|
"""prune(before_ts, max_entries) returns the TOTAL rows deleted by both steps."""
|
|
base = datetime(2026, 4, 1, tzinfo=timezone.utc)
|
|
# 4 old entries (before base)
|
|
for i in range(4):
|
|
repo.record(_entry(ts=base - timedelta(hours=i + 1), message=f"old{i}"))
|
|
# 4 new entries (after base)
|
|
for i in range(4):
|
|
repo.record(_entry(ts=base + timedelta(hours=i + 1), message=f"new{i}"))
|
|
|
|
# prune old, then keep only 2 new
|
|
deleted = repo.prune(before_ts=base, max_entries=2)
|
|
# 4 old + 2 of the 4 new = 6 total
|
|
assert deleted == 6
|
|
assert repo.count() == 2
|
|
|
|
def test_prune_no_args_is_noop(self, repo: ActivityLogRepository) -> None:
|
|
"""prune() with no args should delete 0 rows."""
|
|
for i in range(3):
|
|
repo.record(_entry())
|
|
|
|
deleted = repo.prune()
|
|
assert deleted == 0
|
|
assert repo.count() == 3
|
|
|
|
def test_prune_before_ts_boundary_is_exclusive(self, repo: ActivityLogRepository) -> None:
|
|
"""prune(before_ts=X) uses strict < X; a row exactly at X must survive."""
|
|
ts = datetime(2026, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
|
|
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
|
|
repo.record(_entry(ts=ts, message="exact boundary"))
|
|
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
|
|
|
|
deleted = repo.prune(before_ts=ts)
|
|
assert deleted == 1 # only "before" deleted
|
|
remaining = {r.message for r in repo.query(ActivityLogFilters(), limit=10)}
|
|
assert "exact boundary" in remaining
|
|
assert "before" not in remaining
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Filter combination edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFilterCombinationEdges:
|
|
def test_multiple_filters_are_anded(self, repo: ActivityLogRepository) -> None:
|
|
"""All non-None filters must be AND-ed together, not OR-ed."""
|
|
repo.record(
|
|
_entry(actor="alice", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
|
|
)
|
|
repo.record(
|
|
_entry(actor="alice", category=ActivityCategory.DEVICE, severity=ActivitySeverity.INFO)
|
|
)
|
|
repo.record(
|
|
_entry(actor="bob", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
|
|
)
|
|
|
|
results = repo.query(
|
|
ActivityLogFilters(
|
|
actor="alice",
|
|
categories=[ActivityCategory.AUTH],
|
|
severities=[ActivitySeverity.ERROR],
|
|
),
|
|
limit=10,
|
|
)
|
|
assert len(results) == 1
|
|
r = results[0]
|
|
assert r.actor == "alice"
|
|
assert r.category == ActivityCategory.AUTH
|
|
assert r.severity == ActivitySeverity.ERROR
|
|
|
|
def test_empty_categories_sequence_means_no_restriction(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""An empty list for categories must behave the same as None (no restriction).
|
|
The acceptance criteria state empty sequence == None for this dimension."""
|
|
repo.record(_entry(category=ActivityCategory.AUTH))
|
|
repo.record(_entry(category=ActivityCategory.DEVICE))
|
|
|
|
# empty list
|
|
results_empty = repo.query(ActivityLogFilters(categories=[]), limit=10)
|
|
# None
|
|
results_none = repo.query(ActivityLogFilters(categories=None), limit=10)
|
|
|
|
assert len(results_empty) == len(results_none), (
|
|
"categories=[] and categories=None should behave identically (no restriction); "
|
|
f"got {len(results_empty)} vs {len(results_none)}"
|
|
)
|
|
|
|
def test_empty_severities_sequence_means_no_restriction(
|
|
self, repo: ActivityLogRepository
|
|
) -> None:
|
|
"""An empty list for severities must behave the same as None (no restriction)."""
|
|
repo.record(_entry(severity=ActivitySeverity.INFO))
|
|
repo.record(_entry(severity=ActivitySeverity.ERROR))
|
|
|
|
results_empty = repo.query(ActivityLogFilters(severities=[]), limit=10)
|
|
results_none = repo.query(ActivityLogFilters(severities=None), limit=10)
|
|
|
|
assert len(results_empty) == len(
|
|
results_none
|
|
), "severities=[] and severities=None should behave identically"
|
|
|
|
def test_since_is_inclusive(self, repo: ActivityLogRepository) -> None:
|
|
"""since is an INCLUSIVE lower bound: ts >= since."""
|
|
ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
|
|
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
|
|
repo.record(_entry(ts=ts, message="at boundary"))
|
|
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
|
|
|
|
results = repo.query(ActivityLogFilters(since=ts), limit=10)
|
|
messages = {r.message for r in results}
|
|
assert "at boundary" in messages, "since boundary row (ts == since) must be included"
|
|
assert "before" not in messages
|
|
|
|
def test_until_is_inclusive(self, repo: ActivityLogRepository) -> None:
|
|
"""until is an INCLUSIVE upper bound: ts <= until."""
|
|
ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
|
|
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
|
|
repo.record(_entry(ts=ts, message="at boundary"))
|
|
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
|
|
|
|
results = repo.query(ActivityLogFilters(until=ts), limit=10)
|
|
messages = {r.message for r in results}
|
|
assert "at boundary" in messages, "until boundary row (ts == until) must be included"
|
|
assert "after" not in messages
|
|
|
|
def test_since_and_until_define_closed_range(self, repo: ActivityLogRepository) -> None:
|
|
"""Combining since + until must keep rows in [since, until] inclusive."""
|
|
base = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
|
|
repo.record(_entry(ts=base - timedelta(hours=1), message="out_before"))
|
|
repo.record(_entry(ts=base, message="in_start"))
|
|
repo.record(_entry(ts=base + timedelta(hours=1), message="in_middle"))
|
|
repo.record(_entry(ts=base + timedelta(hours=2), message="in_end"))
|
|
repo.record(_entry(ts=base + timedelta(hours=3), message="out_after"))
|
|
|
|
results = repo.query(
|
|
ActivityLogFilters(since=base, until=base + timedelta(hours=2)),
|
|
limit=10,
|
|
)
|
|
messages = {r.message for r in results}
|
|
assert {"in_start", "in_middle", "in_end"} == messages
|
|
|
|
def test_tz_aware_datetime_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""UTC-aware datetimes must survive storage and come back tz-aware."""
|
|
ts = datetime(2026, 1, 15, 8, 30, 0, tzinfo=timezone.utc)
|
|
e = _entry(ts=ts)
|
|
repo.record(e)
|
|
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
assert got.ts.tzinfo is not None, "Returned ts must be tz-aware"
|
|
assert got.ts.utcoffset().total_seconds() == 0, "Returned ts must be UTC" # type: ignore[union-attr]
|
|
assert got.ts == ts
|
|
|
|
def test_count_none_equals_count_empty_filters(self, repo: ActivityLogRepository) -> None:
|
|
"""count(None) == count(ActivityLogFilters()) per acceptance criteria."""
|
|
for i in range(4):
|
|
repo.record(_entry())
|
|
|
|
assert repo.count(None) == repo.count(ActivityLogFilters())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. Codec / data integrity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCodecDataIntegrity:
|
|
def test_metadata_nested_dict_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""Deeply nested metadata survives JSON round-trip."""
|
|
meta = {
|
|
"level1": {
|
|
"level2": {"level3": [1, 2, 3]},
|
|
"list": [{"a": True}, {"b": None}],
|
|
},
|
|
"count": 42,
|
|
}
|
|
e = _entry(metadata=meta)
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
assert got.metadata == meta
|
|
|
|
def test_metadata_unicode_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""Unicode (including emoji and CJK) in metadata survives storage."""
|
|
meta = {"label": "こんにちは", "emoji": "🎉", "arrow": "→"}
|
|
e = _entry(metadata=meta)
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
assert got.metadata == meta
|
|
|
|
def test_metadata_empty_dict_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""An empty {} metadata must come back as {} not None."""
|
|
e = _entry(metadata={})
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
assert got.metadata == {}
|
|
assert isinstance(got.metadata, dict)
|
|
|
|
def test_metadata_json_special_chars(self, repo: ActivityLogRepository) -> None:
|
|
"""Metadata with JSON-special characters (backslash, quotes) round-trips correctly."""
|
|
meta = {"path": "C:\\Users\\test", "quoted": '"hello"', "newline": "line1\nline2"}
|
|
e = _entry(metadata=meta)
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
assert got.metadata == meta
|
|
|
|
def test_entity_type_none_vs_empty_string(self, repo: ActivityLogRepository) -> None:
|
|
"""None entity_type must come back as None (not empty string '').
|
|
These are semantically different — None means 'not applicable'."""
|
|
e = _entry(entity_type=None, entity_id=None, entity_name=None)
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
# Must be exactly None, not ""
|
|
assert got.entity_type is None
|
|
assert got.entity_id is None
|
|
assert got.entity_name is None
|
|
|
|
def test_ts_microsecond_precision_preserved(self, repo: ActivityLogRepository) -> None:
|
|
"""Microsecond component of ts must survive the isoformat() round-trip."""
|
|
ts = datetime(2026, 6, 9, 12, 34, 56, 789012, tzinfo=timezone.utc)
|
|
e = _entry(ts=ts)
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
assert got.ts == ts, f"Expected {ts!r}, got {got.ts!r} — microsecond precision may be lost"
|
|
|
|
def test_to_row_does_not_include_seq(self) -> None:
|
|
"""to_row() must NOT include 'seq' (it's DB-assigned)."""
|
|
e = _entry()
|
|
row = e.to_row()
|
|
assert "seq" not in row, "to_row() must not include seq — it is DB-assigned"
|
|
|
|
def test_to_row_has_exactly_11_keys(self) -> None:
|
|
"""Acceptance criteria: to_row() returns 11 keys."""
|
|
e = _entry()
|
|
row = e.to_row()
|
|
expected_keys = {
|
|
"id",
|
|
"ts",
|
|
"category",
|
|
"action",
|
|
"severity",
|
|
"actor",
|
|
"entity_type",
|
|
"entity_id",
|
|
"entity_name",
|
|
"message",
|
|
"metadata",
|
|
}
|
|
assert set(row.keys()) == expected_keys
|
|
|
|
def test_from_row_ignores_seq_column(self) -> None:
|
|
"""from_row() must not raise or fail when 'seq' is present in the dict."""
|
|
e = _entry()
|
|
row = e.to_row()
|
|
row["seq"] = 42 # inject seq as if from DB
|
|
recovered = ActivityLogEntry.from_row(row)
|
|
assert recovered.id == e.id
|
|
|
|
def test_from_row_naive_ts_becomes_utc_aware(self) -> None:
|
|
"""If a stored ts has no timezone offset (legacy row), from_row must attach UTC."""
|
|
e = _entry()
|
|
row = e.to_row()
|
|
# Strip timezone from the isoformat string to simulate a legacy row
|
|
row["ts"] = datetime(2026, 1, 1, 10, 0, 0).isoformat() # naive
|
|
recovered = ActivityLogEntry.from_row(row)
|
|
assert recovered.ts.tzinfo is not None, "Legacy naive ts must become tz-aware (UTC)"
|
|
|
|
def test_metadata_with_numeric_keys_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""JSON only supports string keys; numeric keys are coerced to strings."""
|
|
# This tests that the codec doesn't silently crash on non-string keys
|
|
# (Python allows them but JSON does not — json.dumps coerces to string)
|
|
meta = {1: "one", "two": 2}
|
|
e = _entry(metadata=meta) # type: ignore[arg-type]
|
|
repo.record(e)
|
|
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
|
# json.dumps coerces int key 1 → "1"
|
|
assert "1" in got.metadata or 1 in got.metadata
|
|
|
|
def test_all_category_values_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""Every ActivityCategory constant must survive storage without corruption."""
|
|
for cat in ActivityCategory.ALL:
|
|
repo.record(_entry(category=cat, message=f"cat:{cat}"))
|
|
|
|
for cat in ActivityCategory.ALL:
|
|
results = repo.query(ActivityLogFilters(categories=[cat]), limit=10)
|
|
assert len(results) == 1
|
|
assert results[0].category == cat
|
|
|
|
def test_all_severity_values_round_trip(self, repo: ActivityLogRepository) -> None:
|
|
"""Every ActivitySeverity constant must survive storage without corruption."""
|
|
for sev in ActivitySeverity.ALL:
|
|
repo.record(_entry(severity=sev, message=f"sev:{sev}"))
|
|
|
|
for sev in ActivitySeverity.ALL:
|
|
results = repo.query(ActivityLogFilters(severities=[sev]), limit=10)
|
|
assert len(results) == 1
|
|
assert results[0].severity == sev
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. Migration idempotency (additional structural checks)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMigrationIdempotencyExtended:
|
|
def test_table_has_autoincrement_seq(self, tmp_db: Database) -> None:
|
|
"""The seq column must be INTEGER PRIMARY KEY AUTOINCREMENT — never reuse deleted seqs."""
|
|
repo = ActivityLogRepository(tmp_db)
|
|
e1 = _entry(message="first")
|
|
e2 = _entry(message="second")
|
|
repo.record(e1)
|
|
repo.record(e2)
|
|
seq1 = _get_seq(repo, e1.id)
|
|
seq2 = _get_seq(repo, e2.id)
|
|
assert seq2 > seq1, "AUTOINCREMENT must produce monotonically increasing seqs"
|
|
|
|
# After clear, a new record must get a seq higher than the previous max
|
|
repo.clear()
|
|
e3 = _entry(message="third")
|
|
repo.record(e3)
|
|
seq3 = _get_seq(repo, e3.id)
|
|
assert seq3 > seq2, "AUTOINCREMENT must not reuse seqs after DELETE"
|
|
|
|
def test_all_expected_indexes_present(self, tmp_db: Database) -> None:
|
|
"""All 5 indexes declared in the acceptance criteria must exist."""
|
|
ActivityLogRepository(tmp_db)
|
|
|
|
cursor = tmp_db.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
|
|
)
|
|
index_names = {row["name"] for row in cursor.fetchall()}
|
|
required = {
|
|
"idx_activity_log_ts_seq",
|
|
"idx_activity_log_category",
|
|
"idx_activity_log_severity",
|
|
"idx_activity_log_actor",
|
|
"idx_activity_log_entity",
|
|
}
|
|
missing = required - index_names
|
|
assert not missing, f"Missing indexes: {missing}"
|
|
|
|
def test_id_column_has_unique_constraint(self, tmp_db: Database) -> None:
|
|
"""Inserting duplicate id must raise IntegrityError."""
|
|
import sqlite3 as sqlite_module
|
|
|
|
repo = ActivityLogRepository(tmp_db)
|
|
fixed_id = f"al_{uuid.uuid4().hex[:8]}"
|
|
repo.record(_entry(id=fixed_id, message="first"))
|
|
|
|
with pytest.raises((Exception, sqlite_module.IntegrityError)):
|
|
repo.record(_entry(id=fixed_id, message="duplicate id"))
|
|
|
|
def test_migration_name_is_002_add_activity_log(self, tmp_db: Database) -> None:
|
|
"""The migration name must exactly match '002_add_activity_log'."""
|
|
from ledgrab.storage.data_migrations import AddActivityLogTableMigration
|
|
|
|
migration = AddActivityLogTableMigration()
|
|
assert migration.name == "002_add_activity_log"
|
|
|
|
def test_migration_is_second_in_all_migrations(self) -> None:
|
|
"""AddActivityLogTableMigration must be at index [1] in ALL_MIGRATIONS."""
|
|
from ledgrab.storage.data_migrations import (
|
|
ALL_MIGRATIONS,
|
|
AddActivityLogTableMigration,
|
|
)
|
|
|
|
assert len(ALL_MIGRATIONS) >= 2, "ALL_MIGRATIONS must have at least 2 entries"
|
|
assert isinstance(
|
|
ALL_MIGRATIONS[1], AddActivityLogTableMigration
|
|
), "AddActivityLogTableMigration must be the second migration (index 1)"
|
|
|
|
def test_apply_twice_is_noop_no_error(self, tmp_db: Database) -> None:
|
|
"""Calling apply() on the connection twice must not raise — IF NOT EXISTS ensures this."""
|
|
from ledgrab.storage.data_migrations import AddActivityLogTableMigration
|
|
|
|
migration = AddActivityLogTableMigration()
|
|
with tmp_db.transaction() as conn:
|
|
migration.apply(conn)
|
|
# Second apply — must not raise
|
|
with tmp_db.transaction() as conn:
|
|
migration.apply(conn)
|
|
|
|
# Table should still be accessible
|
|
cursor = tmp_db.execute("SELECT COUNT(*) AS cnt FROM activity_log")
|
|
assert cursor.fetchone()["cnt"] == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 7. iter_export vs query consistency
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIterExportConsistency:
|
|
def test_iter_export_empty_table_yields_nothing(self, repo: ActivityLogRepository) -> None:
|
|
"""iter_export on empty table must yield nothing, not raise."""
|
|
exported = list(repo.iter_export())
|
|
assert exported == []
|
|
|
|
def test_iter_export_matches_query_results(self, repo: ActivityLogRepository) -> None:
|
|
"""iter_export(filters) and query(filters) must return the same entries."""
|
|
for i in range(8):
|
|
cat = ActivityCategory.AUTH if i % 2 == 0 else ActivityCategory.DEVICE
|
|
repo.record(_entry(category=cat, message=f"e{i}"))
|
|
|
|
filters = ActivityLogFilters(categories=[ActivityCategory.AUTH])
|
|
|
|
exported_ids = {e.id for e in repo.iter_export(filters)}
|
|
queried_ids = {e.id for e in repo.query(filters, limit=100)}
|
|
assert (
|
|
exported_ids == queried_ids
|
|
), "iter_export and query must return the same set of entries for the same filters"
|
|
|
|
def test_iter_export_none_filter_yields_all(self, repo: ActivityLogRepository) -> None:
|
|
"""iter_export(None) must yield all rows (same as query with no filter)."""
|
|
for i in range(5):
|
|
repo.record(_entry(message=f"e{i}"))
|
|
|
|
all_exported = list(repo.iter_export(None))
|
|
all_queried = repo.query(ActivityLogFilters(), limit=100)
|
|
|
|
assert len(all_exported) == len(all_queried) == 5
|
|
|
|
def test_iter_export_ascending_seq_order(self, repo: ActivityLogRepository) -> None:
|
|
"""iter_export must yield rows in ascending seq order (oldest first)."""
|
|
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
|
for i in range(5):
|
|
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
|
|
|
exported = list(repo.iter_export())
|
|
seqs = [_get_seq(repo, e.id) for e in exported]
|
|
assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order"
|
|
|
|
def test_iter_export_respects_message_like_filter(self, repo: ActivityLogRepository) -> None:
|
|
"""iter_export should honour message_like just as query does."""
|
|
repo.record(_entry(message="found: hello world"))
|
|
repo.record(_entry(message="nothing relevant here"))
|
|
repo.record(_entry(message="also found: hello there"))
|
|
|
|
exported = list(repo.iter_export(ActivityLogFilters(message_like="found")))
|
|
assert len(exported) == 2
|
|
assert all("found" in e.message for e in exported)
|
|
|
|
def test_iter_export_is_lazy_generator(self, repo: ActivityLogRepository) -> None:
|
|
"""iter_export must return a generator (lazy), not a list."""
|
|
import types
|
|
|
|
for _ in range(3):
|
|
repo.record(_entry())
|
|
|
|
result = repo.iter_export()
|
|
assert isinstance(
|
|
result, types.GeneratorType
|
|
), "iter_export must return a generator for streaming — not a pre-loaded list"
|