diff --git a/plans/activity-log/CONTEXT.md b/plans/activity-log/CONTEXT.md index d3b5ba6..0776c6a 100644 --- a/plans/activity-log/CONTEXT.md +++ b/plans/activity-log/CONTEXT.md @@ -45,8 +45,8 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p ## Frozen contracts (fill as phases complete) -- ActivityLogEntry fields / dict shape: _(Phase 1/2 handoff)_ -- ActivityLogFilters shape: _(Phase 1 handoff)_ +- ActivityLogEntry fields / dict shape: **frozen** — see phase-1-storage.md Handoff section. 11 fields: `id`, `ts`, `category`, `action`, `severity`, `actor`, `message`, `entity_type`, `entity_id`, `entity_name`, `metadata`. `seq` is DB-only (not on dataclass). +- ActivityLogFilters shape: **frozen** — 8 optional fields: `categories`, `severities`, `actor`, `entity_type`, `entity_id`, `since`, `until`, `message_like`. See phase-1-storage.md Handoff. - recorder.record(...) signature + actor ContextVar import path: _(Phase 2 handoff)_ - API endpoints + query params + page envelope + settings bounds: _(Phase 4 handoff)_ @@ -65,4 +65,4 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p ## Phase progress notes -_(Orchestrator appends a short note per phase: what landed, commit sha, any warnings.)_ +Phase 1 landed (2026-06-09): `activity_log.py` (dataclass + enums + filters + codec), `AddActivityLogTableMigration` (`002_add_activity_log`) appended to `ALL_MIGRATIONS`, `ActivityLogRepository` (record/query/count/prune/clear/iter_export), 41 new tests — all green. Full suite 2226 passed, 0 failed. Schema and method signatures frozen in phase-1-storage.md Handoff. Gotcha: `Database.execute` takes a positional tuple — use `?` placeholders (not `:name`), otherwise Python 3.14 will raise `ProgrammingError`. 
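Reviewer note on the placeholder gotcha recorded in the Phase 1 progress note: the following is a minimal sketch of the two binding styles, assuming only the standard-library `sqlite3` module rather than the project's `Database` wrapper (whose internals are not part of this hunk). The `demo_log` table and its columns are hypothetical and exist only for illustration.

```python
import sqlite3

# Hypothetical in-memory table; not the real activity_log schema.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE demo_log ("
    " seq INTEGER PRIMARY KEY AUTOINCREMENT,"
    " actor TEXT NOT NULL,"
    " message TEXT NOT NULL)"
)

# Positional tuple pairs with qmark ("?") placeholders; values bind in order.
conn.execute(
    "INSERT INTO demo_log (actor, message) VALUES (?, ?)",
    ("system", "started"),
)

# Named (":name") placeholders pair with a mapping, never with a tuple.
conn.execute(
    "INSERT INTO demo_log (actor, message) VALUES (:actor, :message)",
    {"actor": "alice", "message": "logged in"},
)

rows = conn.execute(
    "SELECT actor, message FROM demo_log WHERE actor = ?",
    ("alice",),
).fetchall()
total = conn.execute("SELECT COUNT(*) AS cnt FROM demo_log").fetchone()["cnt"]
```

Because a wrapper that forwards a tuple can only satisfy the first style, keeping every statement on `?` placeholders avoids the mismatch entirely.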
diff --git a/plans/activity-log/PLAN.md b/plans/activity-log/PLAN.md index 67819cd..b74fc71 100644 --- a/plans/activity-log/PLAN.md +++ b/plans/activity-log/PLAN.md @@ -59,7 +59,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem). ## Phases -- [ ] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md) +- [x] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md) - [ ] Phase 2: Recorder, actor context, retention, lifecycle [domain: backend] → [subplan](./phase-2-recorder-retention.md) - [ ] Phase 3: Event instrumentation (4 categories) [domain: backend] → [subplan](./phase-3-instrumentation.md) - [ ] Phase 4: REST API — query/filter/export/settings/clear [domain: backend] → [subplan](./phase-4-api.md) @@ -79,7 +79,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem). | Phase | Domain | Status | Review | Build | Committed | |-------|--------|--------|--------|-------|-----------| -| Phase 1: Storage | data | ⬜ Not Started | ⬜ | ⬜ | ⬜ | +| Phase 1: Storage | data | ✅ Done | ✅ Passed | ✅ Passed | ✅ | | Phase 2: Recorder/Retention | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 3: Instrumentation | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 4: REST API | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | diff --git a/plans/activity-log/phase-1-storage.md b/plans/activity-log/phase-1-storage.md index 5b13255..0cb7518 100644 --- a/plans/activity-log/phase-1-storage.md +++ b/plans/activity-log/phase-1-storage.md @@ -1,6 +1,6 @@ # Phase 1: Storage — model, migration, repository -**Status:** ⬜ Not Started +**Status:** ✅ Done **Parent plan:** [PLAN.md](./PLAN.md) **Domain:** data @@ -13,7 +13,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex ## Tasks -- [ ] Create `server/src/ledgrab/storage/activity_log.py`: +- [x] Create `server/src/ledgrab/storage/activity_log.py`: - `ActivityCategory` and `ActivitySeverity` string enums (or 
`Literal` unions used as constants). Categories: `auth`, `device`, `entity`, `capture`, `system`. Severities: `info`, `warning`, `error`. @@ -22,7 +22,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex `entity_type: str | None`, `entity_id: str | None`, `entity_name: str | None`, `message: str`, `metadata: dict` (small JSON; default empty). Provide `to_row()` / `from_row()` (column tuple/dict ↔ dataclass; `metadata` JSON-encoded; `ts` isoformat). -- [ ] Add migration to `server/src/ledgrab/storage/data_migrations.py`: +- [x] Add migration to `server/src/ledgrab/storage/data_migrations.py`: - New `DataMigration` subclass `AddActivityLogTableMigration` with unique `name` (next sequential id, e.g. `"NNN_add_activity_log"` — match existing naming) and `apply(conn)` creating `activity_log` with an INTEGER PRIMARY KEY AUTOINCREMENT `seq` @@ -33,7 +33,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex - Indexes: `(ts DESC, seq DESC)` (primary keyset/sort), `category`, `severity`, `actor`, `(entity_type, entity_id)`. Use `CREATE TABLE/INDEX IF NOT EXISTS` for idempotency. - Append the instance to `ALL_MIGRATIONS` (never reorder existing entries). -- [ ] Create `server/src/ledgrab/storage/activity_log_repository.py`: +- [x] Create `server/src/ledgrab/storage/activity_log_repository.py`: - `class ActivityLogRepository` taking `db: Database` (NOT subclassing `BaseSqliteStore`). - `record(entry: ActivityLogEntry) -> None`: single parameterized INSERT via `db.execute(...)` (auto-commit). The `seq` is DB-assigned. **Caller guarantees this runs @@ -51,7 +51,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex (does not load all rows into memory). - Define a small `ActivityLogFilters` dataclass (all-optional fields) in the repository or `activity_log.py` and reuse it across query/count/prune/export. 
-- [ ] Unit tests in `server/tests/storage/test_activity_log_repository.py`: +- [x] Unit tests in `server/tests/storage/test_activity_log_repository.py`: - insert + read back round-trip (incl. metadata JSON, UTC ts); - filter by each dimension (category/severity/actor/entity/date/free-text); - keyset pagination stability across two pages with same-`ts` rows (seq tiebreaker); @@ -89,14 +89,89 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex ## Review Checklist -- [ ] All tasks completed -- [ ] Code follows project conventions (dataclass codec style, migration naming) -- [ ] No unintended side effects (no startup wiring yet) -- [ ] Build passes (ruff + pytest) -- [ ] Tests pass (new + existing) +- [x] All tasks completed +- [x] Code follows project conventions (dataclass codec style, migration naming) +- [x] No unintended side effects (no startup wiring yet) +- [x] Build passes (ruff + pytest) +- [x] Tests pass (new + existing) ## Handoff to Next Phase - +### ActivityLogEntry — final field list and dict shape + +```python +@dataclass +class ActivityLogEntry: + id: str # "al_" — caller-assigned + ts: datetime # UTC-aware; stored as ISO-8601 string in DB + category: str # ActivityCategory constant + action: str # verb-object label, e.g. "entity.created" + severity: str # ActivitySeverity constant + actor: str # API-key label or "system" + message: str # human-readable description + entity_type: str | None # e.g. "output_target" + entity_id: str | None # stable entity id + entity_name: str | None # name at time of event + metadata: dict # JSON-serialisable; default {} +``` + +`to_row()` returns a flat dict with 11 keys (same names); `metadata` is JSON string, `ts` is isoformat string. `seq` is NOT in `to_row()` — it is DB-assigned. + +### ActivityLogFilters — shape (all fields optional, default None) + +```python +@dataclass +class ActivityLogFilters: + categories: Sequence[str] | None # category IN (...) 
+ severities: Sequence[str] | None # severity IN (...) + actor: str | None # exact match + entity_type: str | None # exact match + entity_id: str | None # exact match + since: datetime | None # ts >= since + until: datetime | None # ts <= until + message_like: str | None # LIKE %value% (escaped) +``` + +### Migration name used + +`"002_add_activity_log"` — appended as position [1] in `ALL_MIGRATIONS`. + +### ActivityLogRepository — exact method signatures + +```python +class ActivityLogRepository: + def __init__(self, db: Database) -> None + def record(self, entry: ActivityLogEntry) -> None + def query( + self, + filters: ActivityLogFilters, + *, + before_seq: int | None = None, + limit: int = 50, + ) -> list[ActivityLogEntry] + def count(self, filters: ActivityLogFilters | None = None) -> int + def prune( + self, + *, + before_ts: datetime | None = None, + max_entries: int | None = None, + ) -> int + def clear(self) -> int + def iter_export( + self, filters: ActivityLogFilters | None = None + ) -> Iterator[ActivityLogEntry] +``` + +### Key behavioural notes for Phase 2/3/4 + +- `record()` expects to be called from the event-loop thread (or with `Database` RLock already held). Phase 2 is responsible for thread marshaling via `loop.call_soon_threadsafe`. +- `query()` returns entries in **ascending chronological order within the page** (reversed internally from DESC fetch for display convenience). The smallest `seq` on a page is `page[0]`'s seq — pass that as `before_seq` for the next page. +- `count(None)` == `count(ActivityLogFilters())` — both count all rows. +- `prune(before_ts=X, max_entries=N)` applies both predicates independently (age prune first, then count cap). +- `iter_export` holds `db._lock` for the entire iteration. Phase 4 should stream the response and consume promptly. +- `ActivityCategory` and `ActivitySeverity` are plain classes with string class-attributes and an `ALL` tuple — NOT `enum.Enum`. 
+- Imports for Phase 2/3/4: + ```python + from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters, ActivityCategory, ActivitySeverity + from ledgrab.storage.activity_log_repository import ActivityLogRepository + ``` diff --git a/server/src/ledgrab/storage/activity_log.py b/server/src/ledgrab/storage/activity_log.py new file mode 100644 index 0000000..84ff3df --- /dev/null +++ b/server/src/ledgrab/storage/activity_log.py @@ -0,0 +1,171 @@ +"""Activity / audit log data model. + +Defines the ``ActivityLogEntry`` dataclass together with its category and +severity enumerations and the ``ActivityLogFilters`` query object. Row-level +codec (``to_row`` / ``from_row``) converts between the dataclass and a flat +SQLite column dict. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Sequence + + +# --------------------------------------------------------------------------- +# Enumerations (string constants, not enum.Enum, to stay consistent with the +# rest of the codebase which uses plain string literals) +# --------------------------------------------------------------------------- + + +class ActivityCategory: + """Valid ``category`` values for an ``ActivityLogEntry``.""" + + AUTH = "auth" + DEVICE = "device" + ENTITY = "entity" + CAPTURE = "capture" + SYSTEM = "system" + + ALL: tuple[str, ...] = (AUTH, DEVICE, ENTITY, CAPTURE, SYSTEM) + + +class ActivitySeverity: + """Valid ``severity`` values for an ``ActivityLogEntry``.""" + + INFO = "info" + WARNING = "warning" + ERROR = "error" + + ALL: tuple[str, ...] = (INFO, WARNING, ERROR) + + +# --------------------------------------------------------------------------- +# Entry dataclass +# --------------------------------------------------------------------------- + + +@dataclass +class ActivityLogEntry: + """One immutable audit record. 
+ + Fields + ------ + id Stable application-assigned identifier, e.g. ``al_``. + ts UTC timestamp of the event (server-assigned at record time). + category Broad bucket — one of :class:`ActivityCategory`. + action Verb-object label, e.g. ``"entity.created"`` or ``"auth.rejected"``. + severity One of :class:`ActivitySeverity`. + actor Who triggered the action, e.g. an API-key label or ``"system"``. + entity_type Optional: kind of entity involved, e.g. ``"output_target"``. + entity_id Optional: stable entity identifier. + entity_name Optional: human-readable entity name at the time of the event. + message Human-readable description suitable for display. + metadata Small structured context (device address, error code, …). + Must be JSON-serialisable; defaults to empty dict. + """ + + id: str + ts: datetime + category: str + action: str + severity: str + actor: str + message: str + entity_type: str | None = None + entity_id: str | None = None + entity_name: str | None = None + metadata: dict = field(default_factory=dict) + + # -- Row codec ----------------------------------------------------------- + + def to_row(self) -> dict: + """Return a flat dict suitable for a parameterised SQLite INSERT. + + ``ts`` is stored as an ISO-8601 string (UTC). ``metadata`` is stored + as a JSON string. ``seq`` is DB-assigned and is NOT included. + """ + return { + "id": self.id, + "ts": self.ts.isoformat(), + "category": self.category, + "action": self.action, + "severity": self.severity, + "actor": self.actor, + "entity_type": self.entity_type, + "entity_id": self.entity_id, + "entity_name": self.entity_name, + "message": self.message, + "metadata": json.dumps(self.metadata, ensure_ascii=False), + } + + @staticmethod + def from_row(row: dict) -> "ActivityLogEntry": + """Reconstruct an ``ActivityLogEntry`` from a SQLite row dict. 
+ + ``row`` may include the ``seq`` column — it is ignored here (callers + that need ``seq`` for keyset pagination access it directly from the + row before calling this method). + """ + raw_meta = row.get("metadata") or "{}" + try: + metadata = json.loads(raw_meta) + except (json.JSONDecodeError, TypeError): + metadata = {} + + raw_ts = row["ts"] + ts = datetime.fromisoformat(raw_ts) + # Ensure tz-aware UTC even if stored without offset (legacy rows) + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + + return ActivityLogEntry( + id=row["id"], + ts=ts, + category=row["category"], + action=row["action"], + severity=row["severity"], + actor=row["actor"], + entity_type=row.get("entity_type"), + entity_id=row.get("entity_id"), + entity_name=row.get("entity_name"), + message=row["message"], + metadata=metadata, + ) + + +# --------------------------------------------------------------------------- +# Filters dataclass +# --------------------------------------------------------------------------- + + +@dataclass +class ActivityLogFilters: + """Optional query-time filters for :class:`ActivityLogRepository`. + + All fields are optional. ``None`` / empty sequence means "no restriction + on this dimension". + + Fields + ------ + categories Restrict to entries whose ``category`` is in this set. + severities Restrict to entries whose ``severity`` is in this set. + actor Exact match on the ``actor`` field. + entity_type Exact match on the ``entity_type`` field. + entity_id Exact match on the ``entity_id`` field. + since Inclusive lower bound on ``ts`` (``ts >= since``). + until Inclusive upper bound on ``ts`` (``ts <= until``). + message_like Free-text substring match on ``message`` (LIKE ``%value%``). + The value is escaped — no SQL injection risk. 
+ """ + + categories: Sequence[str] | None = None + severities: Sequence[str] | None = None + actor: str | None = None + entity_type: str | None = None + entity_id: str | None = None + since: datetime | None = None + until: datetime | None = None + message_like: str | None = None diff --git a/server/src/ledgrab/storage/activity_log_repository.py b/server/src/ledgrab/storage/activity_log_repository.py new file mode 100644 index 0000000..b54df49 --- /dev/null +++ b/server/src/ledgrab/storage/activity_log_repository.py @@ -0,0 +1,293 @@ +"""Append-only repository for the persistent activity / audit log. + +Design notes +------------ +* Does NOT subclass ``BaseSqliteStore`` — that base loads every row into an + in-memory cache at construction time, which is wrong for an unbounded, + append-heavy log. +* All SQL parameters are passed as positional ``?`` placeholders — no user + input is interpolated into the query string. +* Keyset pagination is implemented via the ``seq`` INTEGER PRIMARY KEY + AUTOINCREMENT column, which is strictly monotonic and survives rows with + identical ``ts`` values. +* The repository takes a ``Database`` instance and calls ``db.execute`` / + ``db.transaction`` directly. +* Thread safety: ``Database.execute`` holds the ``RLock`` for the duration of + each call. The repository itself adds no extra locking. Callers that + originate from non-event-loop threads MUST marshal via + ``loop.call_soon_threadsafe`` (that is Phase 2's responsibility). 
+""" + +from __future__ import annotations + +import sqlite3 +from datetime import datetime +from typing import Iterator + +from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters +from ledgrab.storage.database import Database +from ledgrab.utils import get_logger + +logger = get_logger(__name__) + +_TABLE = "activity_log" + + +def _build_filter_clause( + filters: ActivityLogFilters, + params: list, + *, + extra_where: str | None = None, +) -> str: + """Return a WHERE clause string and append bound parameters to *params*. + + The caller is responsible for providing the opening ``WHERE`` keyword when + the returned string is non-empty, or ``AND`` when appending to an existing + predicate. This function returns only the condition fragment(s) joined by + ``AND``, or an empty string when *filters* has no restrictions. + + ``extra_where`` is prepended (with AND) before the filter conditions if + provided (used for the ``seq < ?`` keyset predicate). + """ + conditions: list[str] = [] + + if extra_where: + conditions.append(extra_where) + + if filters.categories: + placeholders = ",".join("?" * len(filters.categories)) + conditions.append(f"category IN ({placeholders})") + params.extend(filters.categories) + + if filters.severities: + placeholders = ",".join("?" 
* len(filters.severities)) + conditions.append(f"severity IN ({placeholders})") + params.extend(filters.severities) + + if filters.actor is not None: + conditions.append("actor = ?") + params.append(filters.actor) + + if filters.entity_type is not None: + conditions.append("entity_type = ?") + params.append(filters.entity_type) + + if filters.entity_id is not None: + conditions.append("entity_id = ?") + params.append(filters.entity_id) + + if filters.since is not None: + conditions.append("ts >= ?") + params.append(filters.since.isoformat()) + + if filters.until is not None: + conditions.append("ts <= ?") + params.append(filters.until.isoformat()) + + if filters.message_like is not None: + # Escape LIKE special characters in the user-supplied substring so that + # a literal '%' or '_' in the message does not act as a wildcard. + escaped = filters.message_like.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + conditions.append("message LIKE ? ESCAPE '\\'") + params.append(f"%{escaped}%") + + return " AND ".join(conditions) + + +class ActivityLogRepository: + """Purpose-built repository for the ``activity_log`` table. + + Parameters + ---------- + db: + The shared ``Database`` singleton. The migration that creates the + ``activity_log`` table must have been applied before the first call to + :meth:`record`. Construction triggers migration via + ``MigrationRunner`` so users can create the repository directly + without a separate startup step. + """ + + def __init__(self, db: Database) -> None: + self._db = db + # Apply pending migrations (idempotent; most runs are no-ops) + from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner + + MigrationRunner(db).run(ALL_MIGRATIONS) + + # -- Write --------------------------------------------------------------- + + def record(self, entry: ActivityLogEntry) -> None: + """Append *entry* to the log. 
+ + The ``seq`` column is assigned by SQLite (AUTOINCREMENT) — ``entry`` + does not carry a ``seq`` field. + + Caller contract: this must be called from the event-loop thread (or + from a context already serialised through the ``Database`` RLock). + Cross-thread callers must marshal via ``loop.call_soon_threadsafe``; + that is Phase 2's responsibility. + """ + row = entry.to_row() + self._db.execute( + f"INSERT INTO {_TABLE} " + f"(id, ts, category, action, severity, actor, " + f" entity_type, entity_id, entity_name, message, metadata) " + f"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + row["id"], + row["ts"], + row["category"], + row["action"], + row["severity"], + row["actor"], + row["entity_type"], + row["entity_id"], + row["entity_name"], + row["message"], + row["metadata"], + ), + ) + + # -- Read ---------------------------------------------------------------- + + def query( + self, + filters: ActivityLogFilters, + *, + before_seq: int | None = None, + limit: int = 50, + ) -> list[ActivityLogEntry]: + """Return the newest matching entries, oldest-first within the page. + + Keyset pagination: pass the smallest ``seq`` seen on the previous page + as *before_seq* to get the next page. Entries are fetched in + ``seq DESC`` order from SQLite and then reversed before returning so + that the caller receives them in chronological order within the page. + + Parameters + ---------- + filters: + Optional filter criteria. + before_seq: + Exclusive upper bound on ``seq``. ``None`` returns the first + (newest) page. + limit: + Maximum number of entries to return. + """ + params: list = [] + keyset = "seq < ?" 
if before_seq is not None else None + if before_seq is not None: + params.append(before_seq) + + where_fragment = _build_filter_clause(filters, params, extra_where=keyset) + where_clause = f"WHERE {where_fragment}" if where_fragment else "" + params.append(limit) + + sql = ( + f"SELECT seq, id, ts, category, action, severity, actor, " + f"entity_type, entity_id, entity_name, message, metadata " + f"FROM {_TABLE} " + f"{where_clause} " + f"ORDER BY seq DESC " + f"LIMIT ?" + ) + + cursor = self._db.execute(sql, tuple(params)) + rows = cursor.fetchall() + # Reverse to return chronological order within the page + return [ActivityLogEntry.from_row(dict(row)) for row in reversed(rows)] + + def count(self, filters: ActivityLogFilters | None = None) -> int: + """Return the number of entries matching *filters* (or all entries).""" + if filters is None: + filters = ActivityLogFilters() + params: list = [] + where_fragment = _build_filter_clause(filters, params) + where_clause = f"WHERE {where_fragment}" if where_fragment else "" + + sql = f"SELECT COUNT(*) AS cnt FROM {_TABLE} {where_clause}" + cursor = self._db.execute(sql, tuple(params)) + row = cursor.fetchone() + return int(row["cnt"]) + + # -- Maintenance --------------------------------------------------------- + + def prune( + self, + *, + before_ts: datetime | None = None, + max_entries: int | None = None, + ) -> int: + """Delete old / excess entries; return the total rows deleted. + + Both predicates are applied independently: + + 1. Delete all rows where ``ts < before_ts`` (age-based pruning). + 2. Keep only the newest ``max_entries`` rows, deleting the rest + (count-based pruning). + + The two operations run in separate statements so that each can be + independently enabled or disabled. 
+ """ + deleted = 0 + + if before_ts is not None: + cursor = self._db.execute( + f"DELETE FROM {_TABLE} WHERE ts < ?", + (before_ts.isoformat(),), + ) + deleted += cursor.rowcount + + if max_entries is not None and max_entries >= 0: + # Find the first row past the newest max_entries; delete it and all older rows. + cursor = self._db.execute( + f"SELECT seq FROM {_TABLE} ORDER BY seq DESC LIMIT 1 OFFSET ?", + (max_entries,), + ) + cutoff_row = cursor.fetchone() + if cutoff_row is not None: + cutoff_seq = cutoff_row["seq"] + cursor = self._db.execute( + f"DELETE FROM {_TABLE} WHERE seq <= ?", + (cutoff_seq,), + ) + deleted += cursor.rowcount + + return deleted + + def clear(self) -> int: + """Delete all entries; return the number of rows deleted.""" + cursor = self._db.execute(f"DELETE FROM {_TABLE}") + return cursor.rowcount + + # -- Export -------------------------------------------------------------- + + def iter_export(self, filters: ActivityLogFilters | None = None) -> Iterator[ActivityLogEntry]: + """Yield all matching entries in ascending ``seq`` order. + + Iterates the SQLite cursor lazily so the entire result set is never + loaded into memory, which is safe for large tables. The ``Database`` + ``RLock`` is held for the duration of the iteration; callers should + consume this iterator promptly. + """ + if filters is None: + filters = ActivityLogFilters() + + params: list = [] + where_fragment = _build_filter_clause(filters, params) + where_clause = f"WHERE {where_fragment}" if where_fragment else "" + + sql = ( + f"SELECT seq, id, ts, category, action, severity, actor, " + f"entity_type, entity_id, entity_name, message, metadata " + f"FROM {_TABLE} " + f"{where_clause} " + f"ORDER BY seq ASC" + ) + + # Use the raw connection directly to get a streaming cursor. + # We borrow the lock for the full iteration. 
+ with self._db._lock: # noqa: SLF001 — internal access; no public cursor API + cursor: sqlite3.Cursor = self._db._conn.execute(sql, tuple(params)) # noqa: SLF001 + for row in cursor: + yield ActivityLogEntry.from_row(dict(row)) diff --git a/server/src/ledgrab/storage/data_migrations.py b/server/src/ledgrab/storage/data_migrations.py index b8515d1..ea26de3 100644 --- a/server/src/ledgrab/storage/data_migrations.py +++ b/server/src/ledgrab/storage/data_migrations.py @@ -213,7 +213,57 @@ class StaticToSingleColorMigration(DataMigration): return rows_changed +class AddActivityLogTableMigration(DataMigration): + """Create the ``activity_log`` table and its indexes. + + This is a purely additive migration — it does not touch any existing table. + ``CREATE TABLE / INDEX IF NOT EXISTS`` ensures idempotency if the migration + somehow runs twice (e.g. after a partial restore). + """ + + name = "002_add_activity_log" + + def apply(self, conn: sqlite3.Connection) -> int: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS activity_log ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + id TEXT UNIQUE NOT NULL, + ts TEXT NOT NULL, + category TEXT NOT NULL, + action TEXT NOT NULL, + severity TEXT NOT NULL, + actor TEXT NOT NULL, + entity_type TEXT, + entity_id TEXT, + entity_name TEXT, + message TEXT NOT NULL, + metadata TEXT NOT NULL DEFAULT '{}' + ) + """ + ) + # Primary read path: newest-first keyset pagination + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_activity_log_ts_seq " + "ON activity_log (ts DESC, seq DESC)" + ) + # Filter indexes + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_activity_log_category " "ON activity_log (category)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_activity_log_severity " "ON activity_log (severity)" + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_activity_log_actor " "ON activity_log (actor)") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_activity_log_entity " + "ON activity_log (entity_type, entity_id)" + ) + return 0 + + # 
Master list — ORDER MATTERS. Append new migrations; never reorder. ALL_MIGRATIONS: list[DataMigration] = [ StaticToSingleColorMigration(), + AddActivityLogTableMigration(), ] diff --git a/server/tests/storage/test_activity_log_adversarial.py b/server/tests/storage/test_activity_log_adversarial.py new file mode 100644 index 0000000..28070e7 --- /dev/null +++ b/server/tests/storage/test_activity_log_adversarial.py @@ -0,0 +1,884 @@ +"""Adversarial / edge-case tests for ActivityLogRepository (Phase 1 — storage layer). + +These tests are intentionally skeptical — they derive expected behaviour from the +acceptance criteria in plans/activity-log/phase-1-storage.md, NOT from what the +code happens to do today. If a test fails, it is a real bug. + +Coverage areas +-------------- +1. SQL-injection / parameterization safety (message_like with %, _, ;, --, quotes, etc.) +2. Keyset pagination edge cases (empty table, before_seq bounds, stability, + ordering contract, no duplicates/gaps) +3. Prune edge cases (before_ts only, max_entries only, both, + max_entries=0, larger than count, deleted count, + keeps NEWEST entries) +4. Filter combination edge cases (AND semantics, empty sequence vs None, + since/until inclusive bounds, tz-aware datetimes) +5. Codec / data integrity (metadata round-trip: nested, unicode, JSON-escape; + entity_* None vs empty string; microsecond ts) +6. Migration idempotency (table + all indexes present; double-run is no-op) +7. 
iter_export vs query consistency (same filters yield same rows; empty table; filter) +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest + +from ledgrab.storage.activity_log import ( + ActivityCategory, + ActivityLogEntry, + ActivityLogFilters, + ActivitySeverity, +) +from ledgrab.storage.activity_log_repository import ActivityLogRepository +from ledgrab.storage.database import Database + + +# --------------------------------------------------------------------------- +# Helpers (mirror the implementer's helpers so tests are self-contained) +# --------------------------------------------------------------------------- + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +_SENTINEL = object() + + +def _entry( + *, + id: str | None = None, + ts: datetime | None = None, + category: str = ActivityCategory.ENTITY, + action: str = "entity.created", + severity: str = ActivitySeverity.INFO, + actor: str = "test_actor", + entity_type: str | None = "output_target", + entity_id: object = _SENTINEL, + entity_name: str | None = "My Target", + message: str = "Created output target", + metadata: dict | None = None, +) -> ActivityLogEntry: + resolved_entity_id: str | None = ( + f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id # type: ignore[assignment] + ) + return ActivityLogEntry( + id=id or f"al_{uuid.uuid4().hex[:8]}", + ts=ts or _now(), + category=category, + action=action, + severity=severity, + actor=actor, + entity_type=entity_type, + entity_id=resolved_entity_id, + entity_name=entity_name, + message=message, + metadata=metadata if metadata is not None else {}, + ) + + +@pytest.fixture +def repo(tmp_db: Database) -> ActivityLogRepository: + """Fresh ActivityLogRepository backed by a temp database.""" + return ActivityLogRepository(tmp_db) + + +def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int: + cursor = repo._db.execute("SELECT seq FROM 
activity_log WHERE id = ?", (entry_id,)) + row = cursor.fetchone() + assert row is not None, f"No row found for id={entry_id!r}" + return int(row["seq"]) + + +# --------------------------------------------------------------------------- +# 1. SQL-injection / parameterization safety +# --------------------------------------------------------------------------- + + +class TestSQLInjectionSafety: + """All user-supplied filter values must be treated as literal text, not SQL.""" + + def test_message_like_percent_is_literal(self, repo: ActivityLogRepository) -> None: + """A literal '%' in message_like must NOT act as a LIKE wildcard.""" + repo.record(_entry(message="100% done")) + repo.record(_entry(message="all done")) + repo.record(_entry(message="percent sign here")) + + results = repo.query(ActivityLogFilters(message_like="100%"), limit=10) + assert len(results) == 1, "% in message_like should be a literal percent, not a wildcard" + assert results[0].message == "100% done" + + def test_message_like_underscore_is_literal(self, repo: ActivityLogRepository) -> None: + """A literal '_' in message_like must NOT act as a single-char wildcard.""" + repo.record(_entry(message="device_01")) + repo.record(_entry(message="device001")) # would match if _ were a wildcard + repo.record(_entry(message="some other message")) + + results = repo.query(ActivityLogFilters(message_like="device_01"), limit=10) + assert ( + len(results) == 1 + ), "_ in message_like should be a literal underscore, not a single-char wildcard" + assert results[0].message == "device_01" + + def test_message_like_single_quote_does_not_break_query( + self, repo: ActivityLogRepository + ) -> None: + """A single quote in message_like must not cause a SQL syntax error.""" + repo.record(_entry(message="it's working")) + repo.record(_entry(message="no quote here")) + + # Must not raise + results = repo.query(ActivityLogFilters(message_like="it's"), limit=10) + assert len(results) == 1 + assert results[0].message == 
"it's working" + + def test_message_like_semicolon_does_not_execute_second_statement( + self, repo: ActivityLogRepository + ) -> None: + """';' in message_like must not let a second SQL statement execute.""" + repo.record(_entry(message="a; DROP TABLE activity_log; --")) + repo.record(_entry(message="safe message")) + + # If injection succeeded, table would be dropped and next call would error + results = repo.query( + ActivityLogFilters(message_like="a; DROP TABLE activity_log; --"), limit=10 + ) + # Table must still exist + assert repo.count() == 2 + assert len(results) == 1 + + def test_message_like_sql_comment_sequence(self, repo: ActivityLogRepository) -> None: + """'--' (SQL comment) in message_like must be treated literally.""" + repo.record(_entry(message="value -- comment")) + repo.record(_entry(message="value no comment")) + + results = repo.query(ActivityLogFilters(message_like="value --"), limit=10) + assert len(results) == 1 + assert results[0].message == "value -- comment" + + def test_message_like_backslash_literal(self, repo: ActivityLogRepository) -> None: + """Backslash in message_like must be treated as a literal character.""" + repo.record(_entry(message="path\\to\\file")) + repo.record(_entry(message="path/to/file")) + + results = repo.query(ActivityLogFilters(message_like="path\\to"), limit=10) + assert len(results) == 1 + assert results[0].message == "path\\to\\file" + + def test_message_like_classic_injection_pattern(self, repo: ActivityLogRepository) -> None: + """Classic ') OR '1'='1 injection attempt must return no false positives.""" + repo.record(_entry(message="innocent message")) + repo.record(_entry(message="another message")) + + # If injection worked, all rows would match + results = repo.query(ActivityLogFilters(message_like="') OR '1'='1"), limit=10) + assert ( + len(results) == 0 + ), "Injection payload matched rows it shouldn't — parameterization may be broken" + + def 
test_message_like_all_wildcards_returns_nothing_for_no_match( + self, repo: ActivityLogRepository + ) -> None: + """'%_%' as a literal search term should return no rows unless that exact + substring appears in a message.""" + repo.record(_entry(message="some message")) + + results = repo.query(ActivityLogFilters(message_like="%_%"), limit=10) + # '%_%' as literal text does not appear in "some message" + assert ( + len(results) == 0 + ), "% and _ in message_like were treated as SQL wildcards instead of literals" + + def test_actor_exact_match_not_like(self, repo: ActivityLogRepository) -> None: + """actor filter is exact match — SQL wildcards in value must not act as wildcards.""" + repo.record(_entry(actor="alice")) + repo.record(_entry(actor="alice_admin")) + + results = repo.query(ActivityLogFilters(actor="alice"), limit=10) + assert ( + len(results) == 1 + ), "actor filter is exact-match; 'alice' should not match 'alice_admin'" + assert results[0].actor == "alice" + + def test_entity_id_exact_match_not_like(self, repo: ActivityLogRepository) -> None: + """entity_id filter is exact match — prefix should not leak.""" + repo.record(_entry(entity_id="ot_abc")) + repo.record(_entry(entity_id="ot_abc_extra")) + + results = repo.query(ActivityLogFilters(entity_id="ot_abc"), limit=10) + assert len(results) == 1 + + +# --------------------------------------------------------------------------- +# 2. 
Keyset pagination edge cases +# --------------------------------------------------------------------------- + + +class TestKeysetPaginationEdges: + def test_empty_table_returns_empty_list(self, repo: ActivityLogRepository) -> None: + """Query on empty table must return [] not raise.""" + results = repo.query(ActivityLogFilters(), limit=10) + assert results == [] + + def test_before_seq_none_is_first_page(self, repo: ActivityLogRepository) -> None: + """before_seq=None must return the newest (first) page.""" + base = datetime(2026, 1, 1, tzinfo=timezone.utc) + for i in range(5): + repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}")) + + page = repo.query(ActivityLogFilters(), before_seq=None, limit=3) + assert len(page) == 3 + # Page should contain the 3 newest entries + messages = {e.message for e in page} + assert "e4" in messages + assert "e3" in messages + assert "e2" in messages + + def test_before_seq_smaller_than_all_rows_returns_empty( + self, repo: ActivityLogRepository + ) -> None: + """before_seq=1 (smaller than all autoincrement seqs) returns empty page.""" + for i in range(5): + repo.record(_entry(message=f"e{i}")) + + # seq starts at 1, so before_seq=1 means seq < 1 — no rows + results = repo.query(ActivityLogFilters(), before_seq=1, limit=10) + assert ( + results == [] + ), "before_seq=1 should yield empty page since autoincrement starts at 1 (seq<1 = nothing)" + + def test_before_seq_larger_than_max_returns_full_first_page( + self, repo: ActivityLogRepository + ) -> None: + """before_seq larger than any seq in the table behaves like before_seq=None.""" + for i in range(5): + repo.record(_entry(message=f"e{i}")) + + page_none = repo.query(ActivityLogFilters(), before_seq=None, limit=5) + page_large = repo.query(ActivityLogFilters(), before_seq=999_999, limit=5) + + ids_none = {e.id for e in page_none} + ids_large = {e.id for e in page_large} + assert ids_none == ids_large + + def test_page_boundary_limit_equals_row_count(self, repo: 
ActivityLogRepository) -> None: + """When limit == total rows, one page covers all rows and a second page is empty.""" + for i in range(5): + repo.record(_entry(message=f"e{i}")) + + page1 = repo.query(ActivityLogFilters(), limit=5) + assert len(page1) == 5 + + first_seq = _get_seq(repo, page1[0].id) + page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=5) + assert page2 == [] + + def test_ordering_contract_page_zero_is_smallest_seq(self, repo: ActivityLogRepository) -> None: + """Within a page, page[0] must have the smallest seq (ascending chrono order). + The acceptance criteria state: 'The smallest seq on a page is page[0]'s seq — + pass that as before_seq for the next page.'""" + base = datetime(2026, 1, 1, tzinfo=timezone.utc) + for i in range(6): + repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}")) + + page = repo.query(ActivityLogFilters(), limit=6) + seqs = [_get_seq(repo, e.id) for e in page] + assert seqs == sorted( + seqs + ), "page must be in ascending seq order (page[0] is oldest/smallest seq)" + + def test_no_duplicates_across_full_walk(self, repo: ActivityLogRepository) -> None: + """Walking the entire table page by page yields each row exactly once.""" + total = 11 + base = datetime(2026, 1, 1, tzinfo=timezone.utc) + for i in range(total): + repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}")) + + all_ids: list[str] = [] + before_seq: int | None = None + limit = 4 + + while True: + page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit) + if not page: + break + all_ids.extend(e.id for e in page) + before_seq = _get_seq(repo, page[0].id) + + assert len(all_ids) == total, "Total rows from all pages must equal inserted count" + assert len(set(all_ids)) == total, "No duplicate IDs across pages" + + def test_no_gaps_across_full_walk(self, repo: ActivityLogRepository) -> None: + """Walking the entire table page by page with limit=1 yields every row.""" + total = 7 + for i in 
range(total): + repo.record(_entry(message=f"e{i}")) + + all_ids: list[str] = [] + before_seq: int | None = None + while True: + page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=1) + if not page: + break + all_ids.append(page[0].id) + before_seq = _get_seq(repo, page[0].id) + + assert len(all_ids) == total + + def test_many_rows_same_ts_no_duplicates_or_gaps(self, repo: ActivityLogRepository) -> None: + """With many identical timestamps, pagination via seq prevents any dup or gap.""" + same_ts = datetime(2026, 5, 1, 10, 0, 0, tzinfo=timezone.utc) + count = 9 + for i in range(count): + repo.record(_entry(ts=same_ts, message=f"same-ts {i}")) + + all_ids: list[str] = [] + before_seq: int | None = None + limit = 4 + while True: + page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit) + if not page: + break + all_ids.extend(e.id for e in page) + before_seq = _get_seq(repo, page[0].id) + + assert len(all_ids) == count + assert len(set(all_ids)) == count, "Duplicates found in same-ts pagination walk" + + def test_next_page_cursor_is_page_zero_seq(self, repo: ActivityLogRepository) -> None: + """The documented contract: pass page[0].seq as before_seq for next page. + Verify the next page does NOT overlap with the current page.""" + for i in range(6): + repo.record(_entry(message=f"e{i}")) + + page1 = repo.query(ActivityLogFilters(), limit=3) + cursor = _get_seq(repo, page1[0].id) # page[0] = smallest seq on page + page2 = repo.query(ActivityLogFilters(), before_seq=cursor, limit=3) + + ids1 = {e.id for e in page1} + ids2 = {e.id for e in page2} + assert ids1.isdisjoint(ids2), "Pages overlap — cursor contract broken" + + +# --------------------------------------------------------------------------- +# 3. 
Prune edge cases +# --------------------------------------------------------------------------- + + +class TestPruneEdgeCases: + def test_prune_before_ts_only_no_max_entries(self, repo: ActivityLogRepository) -> None: + """before_ts alone removes only old rows; recent rows untouched.""" + cutoff = datetime(2026, 3, 1, tzinfo=timezone.utc) + repo.record(_entry(ts=cutoff - timedelta(days=2), message="old")) + repo.record(_entry(ts=cutoff + timedelta(days=1), message="new")) + + deleted = repo.prune(before_ts=cutoff) + assert deleted == 1 + remaining = repo.query(ActivityLogFilters(), limit=10) + assert len(remaining) == 1 + assert remaining[0].message == "new" + + def test_prune_max_entries_only_no_before_ts(self, repo: ActivityLogRepository) -> None: + """max_entries alone trims to N newest; no age filter applied.""" + for i in range(6): + repo.record(_entry(message=f"e{i}")) + + deleted = repo.prune(max_entries=2) + assert deleted == 4 + assert repo.count() == 2 + + def test_prune_max_entries_zero_deletes_all(self, repo: ActivityLogRepository) -> None: + """max_entries=0 means keep nothing — all rows deleted.""" + for i in range(5): + repo.record(_entry()) + + deleted = repo.prune(max_entries=0) + assert deleted == 5 + assert repo.count() == 0 + + def test_prune_max_entries_larger_than_count_is_noop(self, repo: ActivityLogRepository) -> None: + """max_entries > actual count must not delete anything.""" + for i in range(3): + repo.record(_entry(message=f"e{i}")) + + deleted = repo.prune(max_entries=100) + assert deleted == 0 + assert repo.count() == 3 + + def test_prune_keeps_newest_entries_by_seq(self, repo: ActivityLogRepository) -> None: + """max_entries prune MUST keep the rows with the HIGHEST seq values.""" + base = datetime(2026, 1, 1, tzinfo=timezone.utc) + all_ids = [] + for i in range(6): + e = _entry(ts=base + timedelta(seconds=i), message=f"e{i}") + all_ids.append(e.id) + repo.record(e) + + # keep only 2 + repo.prune(max_entries=2) + remaining = 
repo.query(ActivityLogFilters(), limit=10) + remaining_ids = {r.id for r in remaining} + + # Must keep the last two inserted (highest seq = newest) + assert all_ids[-1] in remaining_ids, "Newest entry (e5) must be kept" + assert all_ids[-2] in remaining_ids, "Second newest entry (e4) must be kept" + # Oldest must be gone + assert all_ids[0] not in remaining_ids, "Oldest entry (e0) must be pruned" + + def test_prune_both_returns_sum_of_deleted(self, repo: ActivityLogRepository) -> None: + """prune(before_ts, max_entries) returns the TOTAL rows deleted by both steps.""" + base = datetime(2026, 4, 1, tzinfo=timezone.utc) + # 4 old entries (before base) + for i in range(4): + repo.record(_entry(ts=base - timedelta(hours=i + 1), message=f"old{i}")) + # 4 new entries (after base) + for i in range(4): + repo.record(_entry(ts=base + timedelta(hours=i + 1), message=f"new{i}")) + + # prune old, then keep only 2 new + deleted = repo.prune(before_ts=base, max_entries=2) + # 4 old + 2 of the 4 new = 6 total + assert deleted == 6 + assert repo.count() == 2 + + def test_prune_no_args_is_noop(self, repo: ActivityLogRepository) -> None: + """prune() with no args should delete 0 rows.""" + for i in range(3): + repo.record(_entry()) + + deleted = repo.prune() + assert deleted == 0 + assert repo.count() == 3 + + def test_prune_before_ts_boundary_is_exclusive(self, repo: ActivityLogRepository) -> None: + """prune(before_ts=X) uses strict < X; a row exactly at X must survive.""" + ts = datetime(2026, 5, 1, 12, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=ts - timedelta(seconds=1), message="before")) + repo.record(_entry(ts=ts, message="exact boundary")) + repo.record(_entry(ts=ts + timedelta(seconds=1), message="after")) + + deleted = repo.prune(before_ts=ts) + assert deleted == 1 # only "before" deleted + remaining = {r.message for r in repo.query(ActivityLogFilters(), limit=10)} + assert "exact boundary" in remaining + assert "before" not in remaining + + +# 
--------------------------------------------------------------------------- +# 4. Filter combination edge cases +# --------------------------------------------------------------------------- + + +class TestFilterCombinationEdges: + def test_multiple_filters_are_anded(self, repo: ActivityLogRepository) -> None: + """All non-None filters must be AND-ed together, not OR-ed.""" + repo.record( + _entry(actor="alice", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR) + ) + repo.record( + _entry(actor="alice", category=ActivityCategory.DEVICE, severity=ActivitySeverity.INFO) + ) + repo.record( + _entry(actor="bob", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR) + ) + + results = repo.query( + ActivityLogFilters( + actor="alice", + categories=[ActivityCategory.AUTH], + severities=[ActivitySeverity.ERROR], + ), + limit=10, + ) + assert len(results) == 1 + r = results[0] + assert r.actor == "alice" + assert r.category == ActivityCategory.AUTH + assert r.severity == ActivitySeverity.ERROR + + def test_empty_categories_sequence_means_no_restriction( + self, repo: ActivityLogRepository + ) -> None: + """An empty list for categories must behave the same as None (no restriction). 
+ The acceptance criteria state empty sequence == None for this dimension.""" + repo.record(_entry(category=ActivityCategory.AUTH)) + repo.record(_entry(category=ActivityCategory.DEVICE)) + + # empty list + results_empty = repo.query(ActivityLogFilters(categories=[]), limit=10) + # None + results_none = repo.query(ActivityLogFilters(categories=None), limit=10) + + assert len(results_empty) == len(results_none), ( + "categories=[] and categories=None should behave identically (no restriction); " + f"got {len(results_empty)} vs {len(results_none)}" + ) + + def test_empty_severities_sequence_means_no_restriction( + self, repo: ActivityLogRepository + ) -> None: + """An empty list for severities must behave the same as None (no restriction).""" + repo.record(_entry(severity=ActivitySeverity.INFO)) + repo.record(_entry(severity=ActivitySeverity.ERROR)) + + results_empty = repo.query(ActivityLogFilters(severities=[]), limit=10) + results_none = repo.query(ActivityLogFilters(severities=None), limit=10) + + assert len(results_empty) == len( + results_none + ), "severities=[] and severities=None should behave identically" + + def test_since_is_inclusive(self, repo: ActivityLogRepository) -> None: + """since is an INCLUSIVE lower bound: ts >= since.""" + ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=ts - timedelta(seconds=1), message="before")) + repo.record(_entry(ts=ts, message="at boundary")) + repo.record(_entry(ts=ts + timedelta(seconds=1), message="after")) + + results = repo.query(ActivityLogFilters(since=ts), limit=10) + messages = {r.message for r in results} + assert "at boundary" in messages, "since boundary row (ts == since) must be included" + assert "before" not in messages + + def test_until_is_inclusive(self, repo: ActivityLogRepository) -> None: + """until is an INCLUSIVE upper bound: ts <= until.""" + ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=ts - timedelta(seconds=1), 
message="before")) + repo.record(_entry(ts=ts, message="at boundary")) + repo.record(_entry(ts=ts + timedelta(seconds=1), message="after")) + + results = repo.query(ActivityLogFilters(until=ts), limit=10) + messages = {r.message for r in results} + assert "at boundary" in messages, "until boundary row (ts == until) must be included" + assert "after" not in messages + + def test_since_and_until_define_closed_range(self, repo: ActivityLogRepository) -> None: + """Combining since + until must keep rows in [since, until] inclusive.""" + base = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=base - timedelta(hours=1), message="out_before")) + repo.record(_entry(ts=base, message="in_start")) + repo.record(_entry(ts=base + timedelta(hours=1), message="in_middle")) + repo.record(_entry(ts=base + timedelta(hours=2), message="in_end")) + repo.record(_entry(ts=base + timedelta(hours=3), message="out_after")) + + results = repo.query( + ActivityLogFilters(since=base, until=base + timedelta(hours=2)), + limit=10, + ) + messages = {r.message for r in results} + assert {"in_start", "in_middle", "in_end"} == messages + + def test_tz_aware_datetime_round_trip(self, repo: ActivityLogRepository) -> None: + """UTC-aware datetimes must survive storage and come back tz-aware.""" + ts = datetime(2026, 1, 15, 8, 30, 0, tzinfo=timezone.utc) + e = _entry(ts=ts) + repo.record(e) + + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.ts.tzinfo is not None, "Returned ts must be tz-aware" + assert got.ts.utcoffset().total_seconds() == 0, "Returned ts must be UTC" # type: ignore[union-attr] + assert got.ts == ts + + def test_count_none_equals_count_empty_filters(self, repo: ActivityLogRepository) -> None: + """count(None) == count(ActivityLogFilters()) per acceptance criteria.""" + for i in range(4): + repo.record(_entry()) + + assert repo.count(None) == repo.count(ActivityLogFilters()) + + +# 
--------------------------------------------------------------------------- +# 5. Codec / data integrity +# --------------------------------------------------------------------------- + + +class TestCodecDataIntegrity: + def test_metadata_nested_dict_round_trip(self, repo: ActivityLogRepository) -> None: + """Deeply nested metadata survives JSON round-trip.""" + meta = { + "level1": { + "level2": {"level3": [1, 2, 3]}, + "list": [{"a": True}, {"b": None}], + }, + "count": 42, + } + e = _entry(metadata=meta) + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.metadata == meta + + def test_metadata_unicode_round_trip(self, repo: ActivityLogRepository) -> None: + """Unicode (including emoji and CJK) in metadata survives storage.""" + meta = {"label": "こんにちは", "emoji": "🎉", "arrow": "→"} + e = _entry(metadata=meta) + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.metadata == meta + + def test_metadata_empty_dict_round_trip(self, repo: ActivityLogRepository) -> None: + """An empty {} metadata must come back as {} not None.""" + e = _entry(metadata={}) + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.metadata == {} + assert isinstance(got.metadata, dict) + + def test_metadata_json_special_chars(self, repo: ActivityLogRepository) -> None: + """Metadata with JSON-special characters (backslash, quotes) round-trips correctly.""" + meta = {"path": "C:\\Users\\test", "quoted": '"hello"', "newline": "line1\nline2"} + e = _entry(metadata=meta) + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.metadata == meta + + def test_entity_type_none_vs_empty_string(self, repo: ActivityLogRepository) -> None: + """None entity_type must come back as None (not empty string ''). 
+ These are semantically different — None means 'not applicable'.""" + e = _entry(entity_type=None, entity_id=None, entity_name=None) + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + # Must be exactly None, not "" + assert got.entity_type is None + assert got.entity_id is None + assert got.entity_name is None + + def test_ts_microsecond_precision_preserved(self, repo: ActivityLogRepository) -> None: + """Microsecond component of ts must survive the isoformat() round-trip.""" + ts = datetime(2026, 6, 9, 12, 34, 56, 789012, tzinfo=timezone.utc) + e = _entry(ts=ts) + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.ts == ts, f"Expected {ts!r}, got {got.ts!r} — microsecond precision may be lost" + + def test_to_row_does_not_include_seq(self) -> None: + """to_row() must NOT include 'seq' (it's DB-assigned).""" + e = _entry() + row = e.to_row() + assert "seq" not in row, "to_row() must not include seq — it is DB-assigned" + + def test_to_row_has_exactly_11_keys(self) -> None: + """Acceptance criteria: to_row() returns 11 keys.""" + e = _entry() + row = e.to_row() + expected_keys = { + "id", + "ts", + "category", + "action", + "severity", + "actor", + "entity_type", + "entity_id", + "entity_name", + "message", + "metadata", + } + assert set(row.keys()) == expected_keys + + def test_from_row_ignores_seq_column(self) -> None: + """from_row() must not raise or fail when 'seq' is present in the dict.""" + e = _entry() + row = e.to_row() + row["seq"] = 42 # inject seq as if from DB + recovered = ActivityLogEntry.from_row(row) + assert recovered.id == e.id + + def test_from_row_naive_ts_becomes_utc_aware(self) -> None: + """If a stored ts has no timezone offset (legacy row), from_row must attach UTC.""" + e = _entry() + row = e.to_row() + # Strip timezone from the isoformat string to simulate a legacy row + row["ts"] = datetime(2026, 1, 1, 10, 0, 0).isoformat() # naive + recovered = ActivityLogEntry.from_row(row) + assert 
recovered.ts.tzinfo is not None, "Legacy naive ts must become tz-aware (UTC)" + + def test_metadata_with_numeric_keys_round_trip(self, repo: ActivityLogRepository) -> None: + """JSON only supports string keys; numeric keys are coerced to strings.""" + # This tests that the codec doesn't silently crash on non-string keys + # (Python allows them but JSON does not — json.dumps coerces to string) + meta = {1: "one", "two": 2} + e = _entry(metadata=meta) # type: ignore[arg-type] + repo.record(e) + got = repo.query(ActivityLogFilters(), limit=1)[0] + # json.dumps coerces int key 1 → "1" + assert "1" in got.metadata or 1 in got.metadata + + def test_all_category_values_round_trip(self, repo: ActivityLogRepository) -> None: + """Every ActivityCategory constant must survive storage without corruption.""" + for cat in ActivityCategory.ALL: + repo.record(_entry(category=cat, message=f"cat:{cat}")) + + for cat in ActivityCategory.ALL: + results = repo.query(ActivityLogFilters(categories=[cat]), limit=10) + assert len(results) == 1 + assert results[0].category == cat + + def test_all_severity_values_round_trip(self, repo: ActivityLogRepository) -> None: + """Every ActivitySeverity constant must survive storage without corruption.""" + for sev in ActivitySeverity.ALL: + repo.record(_entry(severity=sev, message=f"sev:{sev}")) + + for sev in ActivitySeverity.ALL: + results = repo.query(ActivityLogFilters(severities=[sev]), limit=10) + assert len(results) == 1 + assert results[0].severity == sev + + +# --------------------------------------------------------------------------- +# 6. 
Migration idempotency (additional structural checks) +# --------------------------------------------------------------------------- + + +class TestMigrationIdempotencyExtended: + def test_table_has_autoincrement_seq(self, tmp_db: Database) -> None: + """The seq column must be INTEGER PRIMARY KEY AUTOINCREMENT — never reuse deleted seqs.""" + repo = ActivityLogRepository(tmp_db) + e1 = _entry(message="first") + e2 = _entry(message="second") + repo.record(e1) + repo.record(e2) + seq1 = _get_seq(repo, e1.id) + seq2 = _get_seq(repo, e2.id) + assert seq2 > seq1, "AUTOINCREMENT must produce monotonically increasing seqs" + + # After clear, a new record must get a seq higher than the previous max + repo.clear() + e3 = _entry(message="third") + repo.record(e3) + seq3 = _get_seq(repo, e3.id) + assert seq3 > seq2, "AUTOINCREMENT must not reuse seqs after DELETE" + + def test_all_expected_indexes_present(self, tmp_db: Database) -> None: + """All 5 indexes declared in the acceptance criteria must exist.""" + ActivityLogRepository(tmp_db) + + cursor = tmp_db.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'" + ) + index_names = {row["name"] for row in cursor.fetchall()} + required = { + "idx_activity_log_ts_seq", + "idx_activity_log_category", + "idx_activity_log_severity", + "idx_activity_log_actor", + "idx_activity_log_entity", + } + missing = required - index_names + assert not missing, f"Missing indexes: {missing}" + + def test_id_column_has_unique_constraint(self, tmp_db: Database) -> None: + """Inserting duplicate id must raise IntegrityError.""" + import sqlite3 as sqlite_module + + repo = ActivityLogRepository(tmp_db) + fixed_id = f"al_{uuid.uuid4().hex[:8]}" + repo.record(_entry(id=fixed_id, message="first")) + + with pytest.raises((Exception, sqlite_module.IntegrityError)): + repo.record(_entry(id=fixed_id, message="duplicate id")) + + def test_migration_name_is_002_add_activity_log(self, tmp_db: Database) -> None: + """The 
migration name must exactly match '002_add_activity_log'.""" + from ledgrab.storage.data_migrations import AddActivityLogTableMigration + + migration = AddActivityLogTableMigration() + assert migration.name == "002_add_activity_log" + + def test_migration_is_second_in_all_migrations(self) -> None: + """AddActivityLogTableMigration must be at index [1] in ALL_MIGRATIONS.""" + from ledgrab.storage.data_migrations import ( + ALL_MIGRATIONS, + AddActivityLogTableMigration, + ) + + assert len(ALL_MIGRATIONS) >= 2, "ALL_MIGRATIONS must have at least 2 entries" + assert isinstance( + ALL_MIGRATIONS[1], AddActivityLogTableMigration + ), "AddActivityLogTableMigration must be the second migration (index 1)" + + def test_apply_twice_is_noop_no_error(self, tmp_db: Database) -> None: + """Calling apply() on the connection twice must not raise — IF NOT EXISTS ensures this.""" + from ledgrab.storage.data_migrations import AddActivityLogTableMigration + + migration = AddActivityLogTableMigration() + with tmp_db.transaction() as conn: + migration.apply(conn) + # Second apply — must not raise + with tmp_db.transaction() as conn: + migration.apply(conn) + + # Table should still be accessible + cursor = tmp_db.execute("SELECT COUNT(*) AS cnt FROM activity_log") + assert cursor.fetchone()["cnt"] == 0 + + +# --------------------------------------------------------------------------- +# 7. 
iter_export vs query consistency +# --------------------------------------------------------------------------- + + +class TestIterExportConsistency: + def test_iter_export_empty_table_yields_nothing(self, repo: ActivityLogRepository) -> None: + """iter_export on empty table must yield nothing, not raise.""" + exported = list(repo.iter_export()) + assert exported == [] + + def test_iter_export_matches_query_results(self, repo: ActivityLogRepository) -> None: + """iter_export(filters) and query(filters) must return the same entries.""" + for i in range(8): + cat = ActivityCategory.AUTH if i % 2 == 0 else ActivityCategory.DEVICE + repo.record(_entry(category=cat, message=f"e{i}")) + + filters = ActivityLogFilters(categories=[ActivityCategory.AUTH]) + + exported_ids = {e.id for e in repo.iter_export(filters)} + queried_ids = {e.id for e in repo.query(filters, limit=100)} + assert ( + exported_ids == queried_ids + ), "iter_export and query must return the same set of entries for the same filters" + + def test_iter_export_none_filter_yields_all(self, repo: ActivityLogRepository) -> None: + """iter_export(None) must yield all rows (same as query with no filter).""" + for i in range(5): + repo.record(_entry(message=f"e{i}")) + + all_exported = list(repo.iter_export(None)) + all_queried = repo.query(ActivityLogFilters(), limit=100) + + assert len(all_exported) == len(all_queried) == 5 + + def test_iter_export_ascending_seq_order(self, repo: ActivityLogRepository) -> None: + """iter_export must yield rows in ascending seq order (oldest first).""" + base = datetime(2026, 1, 1, tzinfo=timezone.utc) + for i in range(5): + repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}")) + + exported = list(repo.iter_export()) + seqs = [_get_seq(repo, e.id) for e in exported] + assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order" + + def test_iter_export_respects_message_like_filter(self, repo: ActivityLogRepository) -> None: + """iter_export 
should honour message_like just as query does.""" + repo.record(_entry(message="found: hello world")) + repo.record(_entry(message="nothing relevant here")) + repo.record(_entry(message="also found: hello there")) + + exported = list(repo.iter_export(ActivityLogFilters(message_like="found"))) + assert len(exported) == 2 + assert all("found" in e.message for e in exported) + + def test_iter_export_is_lazy_generator(self, repo: ActivityLogRepository) -> None: + """iter_export must return a generator (lazy), not a list.""" + import types + + for _ in range(3): + repo.record(_entry()) + + result = repo.iter_export() + assert isinstance( + result, types.GeneratorType + ), "iter_export must return a generator for streaming — not a pre-loaded list" diff --git a/server/tests/storage/test_activity_log_repository.py b/server/tests/storage/test_activity_log_repository.py new file mode 100644 index 0000000..906a055 --- /dev/null +++ b/server/tests/storage/test_activity_log_repository.py @@ -0,0 +1,609 @@ +"""Unit tests for ActivityLogRepository (Phase 1 — storage layer). 
+ +Coverage +-------- +* round-trip: record + read back, including metadata JSON and UTC ts +* filter by each dimension: category, severity, actor, entity_type/id, date range, message_like +* keyset pagination stability with same-ts rows (seq tiebreaker) +* prune by age (before_ts) and by max_entries +* clear; count (filtered + unfiltered); export iterator +* migration idempotency: constructing the repo twice does not re-run the migration +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest + +from ledgrab.storage.activity_log import ( + ActivityCategory, + ActivityLogEntry, + ActivityLogFilters, + ActivitySeverity, +) +from ledgrab.storage.activity_log_repository import ActivityLogRepository +from ledgrab.storage.database import Database + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +_SENTINEL = object() # sentinel for "caller did not pass this kwarg" + + +def _entry( + *, + id: str | None = None, + ts: datetime | None = None, + category: str = ActivityCategory.ENTITY, + action: str = "entity.created", + severity: str = ActivitySeverity.INFO, + actor: str = "test_actor", + entity_type: str | None = "output_target", + entity_id: object = _SENTINEL, + entity_name: str | None = "My Target", + message: str = "Created output target", + metadata: dict | None = None, +) -> ActivityLogEntry: + """Build a test ``ActivityLogEntry``. + + ``entity_id`` defaults to a random id when not supplied at all. Pass + ``entity_id=None`` explicitly to get ``None`` stored in the entry. 
+ """ + resolved_entity_id: str | None = ( + f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id # type: ignore[assignment] + ) + return ActivityLogEntry( + id=id or f"al_{uuid.uuid4().hex[:8]}", + ts=ts or _now(), + category=category, + action=action, + severity=severity, + actor=actor, + entity_type=entity_type, + entity_id=resolved_entity_id, + entity_name=entity_name, + message=message, + metadata=metadata if metadata is not None else {}, + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def repo(tmp_db: Database) -> ActivityLogRepository: + """Fresh ActivityLogRepository backed by a temp database.""" + return ActivityLogRepository(tmp_db) + + +# --------------------------------------------------------------------------- +# Round-trip +# --------------------------------------------------------------------------- + + +class TestRoundTrip: + def test_record_and_read_back(self, repo: ActivityLogRepository) -> None: + e = _entry(message="Hello world", metadata={"key": "value", "n": 42}) + repo.record(e) + + page = repo.query(ActivityLogFilters(), limit=10) + assert len(page) == 1 + got = page[0] + assert got.id == e.id + assert got.category == e.category + assert got.action == e.action + assert got.severity == e.severity + assert got.actor == e.actor + assert got.entity_type == e.entity_type + assert got.entity_id == e.entity_id + assert got.entity_name == e.entity_name + assert got.message == e.message + + def test_metadata_json_round_trip(self, repo: ActivityLogRepository) -> None: + meta = {"device": "wled_01", "led_count": 150, "nested": {"x": True}} + e = _entry(metadata=meta) + repo.record(e) + + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.metadata == meta + + def test_utc_timestamp_preserved(self, repo: ActivityLogRepository) -> None: + ts = datetime(2026, 1, 15, 12, 30, 
45, tzinfo=timezone.utc) + e = _entry(ts=ts) + repo.record(e) + + got = repo.query(ActivityLogFilters(), limit=1)[0] + # Should round-trip to the same UTC moment + assert got.ts.replace(tzinfo=timezone.utc) == ts.replace(tzinfo=timezone.utc) + + def test_none_optional_fields_preserved(self, repo: ActivityLogRepository) -> None: + e = _entry(entity_type=None, entity_id=None, entity_name=None) + repo.record(e) + + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.entity_type is None + assert got.entity_id is None + assert got.entity_name is None + + def test_empty_metadata_default(self, repo: ActivityLogRepository) -> None: + e = _entry(metadata={}) + repo.record(e) + + got = repo.query(ActivityLogFilters(), limit=1)[0] + assert got.metadata == {} + + +# --------------------------------------------------------------------------- +# Filters +# --------------------------------------------------------------------------- + + +class TestFilters: + def test_filter_by_category(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(category=ActivityCategory.AUTH, action="auth.rejected")) + repo.record(_entry(category=ActivityCategory.DEVICE, action="device.connected")) + repo.record(_entry(category=ActivityCategory.ENTITY, action="entity.deleted")) + + results = repo.query(ActivityLogFilters(categories=[ActivityCategory.AUTH]), limit=10) + assert len(results) == 1 + assert results[0].category == ActivityCategory.AUTH + + def test_filter_by_multiple_categories(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(category=ActivityCategory.AUTH)) + repo.record(_entry(category=ActivityCategory.DEVICE)) + repo.record(_entry(category=ActivityCategory.SYSTEM)) + + results = repo.query( + ActivityLogFilters(categories=[ActivityCategory.AUTH, ActivityCategory.DEVICE]), + limit=10, + ) + assert len(results) == 2 + cats = {r.category for r in results} + assert cats == {ActivityCategory.AUTH, ActivityCategory.DEVICE} + + def 
test_filter_by_severity(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(severity=ActivitySeverity.INFO)) + repo.record(_entry(severity=ActivitySeverity.WARNING)) + repo.record(_entry(severity=ActivitySeverity.ERROR)) + + results = repo.query(ActivityLogFilters(severities=[ActivitySeverity.ERROR]), limit=10) + assert len(results) == 1 + assert results[0].severity == ActivitySeverity.ERROR + + def test_filter_by_multiple_severities(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(severity=ActivitySeverity.INFO)) + repo.record(_entry(severity=ActivitySeverity.WARNING)) + repo.record(_entry(severity=ActivitySeverity.ERROR)) + + results = repo.query( + ActivityLogFilters(severities=[ActivitySeverity.WARNING, ActivitySeverity.ERROR]), + limit=10, + ) + assert len(results) == 2 + + def test_filter_by_actor(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(actor="alice")) + repo.record(_entry(actor="bob")) + repo.record(_entry(actor="alice")) + + results = repo.query(ActivityLogFilters(actor="alice"), limit=10) + assert len(results) == 2 + assert all(r.actor == "alice" for r in results) + + def test_filter_by_entity_type(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(entity_type="output_target")) + repo.record(_entry(entity_type="device")) + repo.record(_entry(entity_type="output_target")) + + results = repo.query(ActivityLogFilters(entity_type="output_target"), limit=10) + assert len(results) == 2 + + def test_filter_by_entity_id(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(entity_id="ot_aabbccdd")) + repo.record(_entry(entity_id="ot_11223344")) + repo.record(_entry(entity_id="ot_aabbccdd")) + + results = repo.query(ActivityLogFilters(entity_id="ot_aabbccdd"), limit=10) + assert len(results) == 2 + + def test_filter_by_since(self, repo: ActivityLogRepository) -> None: + base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=base - timedelta(hours=2), 
message="old")) + repo.record(_entry(ts=base, message="boundary")) + repo.record(_entry(ts=base + timedelta(hours=1), message="new")) + + results = repo.query(ActivityLogFilters(since=base), limit=10) + assert len(results) == 2 + messages = {r.message for r in results} + assert "old" not in messages + + def test_filter_by_until(self, repo: ActivityLogRepository) -> None: + base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=base - timedelta(hours=1), message="old")) + repo.record(_entry(ts=base, message="boundary")) + repo.record(_entry(ts=base + timedelta(hours=2), message="new")) + + results = repo.query(ActivityLogFilters(until=base), limit=10) + assert len(results) == 2 + messages = {r.message for r in results} + assert "new" not in messages + + def test_filter_message_like_substring(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(message="Created output target MyStrip")) + repo.record(_entry(message="Deleted device sensor-01")) + repo.record(_entry(message="Updated output target MyStrip")) + + results = repo.query(ActivityLogFilters(message_like="output target"), limit=10) + assert len(results) == 2 + + def test_filter_message_like_escapes_percent(self, repo: ActivityLogRepository) -> None: + """A literal % in message_like should not act as a SQL wildcard.""" + repo.record(_entry(message="100% done")) + repo.record(_entry(message="partial done")) + # Contains "100" but no literal "%": if % were treated as a wildcard, + # this row would match as well and the length assertion would fail. + repo.record(_entry(message="100 items done")) + + results = repo.query(ActivityLogFilters(message_like="100%"), limit=10) + assert len(results) == 1 + assert results[0].message == "100% done" + + def test_combined_filters(self, repo: ActivityLogRepository) -> None: + repo.record( + _entry( + actor="alice", + category=ActivityCategory.ENTITY, + severity=ActivitySeverity.INFO, + ) + ) + repo.record( + _entry( + actor="alice", + category=ActivityCategory.AUTH, + severity=ActivitySeverity.WARNING, + ) + ) + repo.record( + _entry( + actor="bob", + category=ActivityCategory.ENTITY, + severity=ActivitySeverity.INFO, + ) + ) 
+ + results = repo.query( + ActivityLogFilters( + actor="alice", + categories=[ActivityCategory.ENTITY], + severities=[ActivitySeverity.INFO], + ), + limit=10, + ) + assert len(results) == 1 + assert results[0].actor == "alice" + assert results[0].category == ActivityCategory.ENTITY + + +# --------------------------------------------------------------------------- +# Keyset pagination +# --------------------------------------------------------------------------- + + +class TestKeysetPagination: + def test_basic_pagination(self, repo: ActivityLogRepository) -> None: + """Consecutive pages do not overlap, and the last page holds the remainder.""" + for i in range(10): + repo.record(_entry(message=f"entry {i}")) + + page1 = repo.query(ActivityLogFilters(), limit=4) + assert len(page1) == 4 + + # The cursor for the next page is the smallest seq on the current page. + # The repo returns entries in ascending seq order within a page, so + # page1[0] carries that minimum. seq is DB-only (not on the dataclass), + # so it is looked up by id via _get_seq. 
+ first_seq = self._get_seq(repo, page1[0].id) + page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=4) + assert len(page2) == 4 + + ids1 = {e.id for e in page1} + ids2 = {e.id for e in page2} + assert ids1.isdisjoint(ids2), "Pages must not overlap" + + page3 = repo.query( + ActivityLogFilters(), + before_seq=self._get_seq(repo, page2[0].id), + limit=4, + ) + assert len(page3) == 2 # 10 total; 4 + 4 + 2 + + def test_same_ts_stability(self, repo: ActivityLogRepository) -> None: + """Rows with identical ts are ordered by seq, not ts — no duplicates across pages.""" + # Insert 6 rows all sharing the exact same timestamp + same_ts = datetime(2026, 3, 10, 15, 0, 0, tzinfo=timezone.utc) + entries = [_entry(ts=same_ts, message=f"same-ts {i}") for i in range(6)] + for e in entries: + repo.record(e) + + page1 = repo.query(ActivityLogFilters(), limit=3) + first_seq = self._get_seq(repo, page1[0].id) + page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=3) + + ids1 = {e.id for e in page1} + ids2 = {e.id for e in page2} + assert ids1.isdisjoint(ids2), "Same-ts rows leaked across page boundary" + assert ids1 | ids2 == {e.id for e in entries}, "All rows covered exactly once" + + def test_empty_page_at_end(self, repo: ActivityLogRepository) -> None: + """Requesting a page beyond the last entry returns an empty list.""" + for i in range(3): + repo.record(_entry(message=f"e{i}")) + + page1 = repo.query(ActivityLogFilters(), limit=3) + assert len(page1) == 3 + first_seq = self._get_seq(repo, page1[0].id) + page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=3) + assert page2 == [] + + @staticmethod + def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int: + """Helper: retrieve the seq for an entry by its application id.""" + cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,)) + row = cursor.fetchone() + assert row is not None, f"No row found for id={entry_id!r}" + return int(row["seq"]) + + 
+# --------------------------------------------------------------------------- +# Prune +# --------------------------------------------------------------------------- + + +class TestPrune: + def test_prune_by_age(self, repo: ActivityLogRepository) -> None: + base = datetime(2026, 2, 1, 12, 0, 0, tzinfo=timezone.utc) + repo.record(_entry(ts=base - timedelta(days=10), message="old1")) + repo.record(_entry(ts=base - timedelta(days=5), message="old2")) + repo.record(_entry(ts=base, message="boundary")) + repo.record(_entry(ts=base + timedelta(days=1), message="new")) + + # Prune everything strictly older than base + deleted = repo.prune(before_ts=base) + assert deleted == 2 + remaining = repo.query(ActivityLogFilters(), limit=20) + messages = {r.message for r in remaining} + assert "old1" not in messages + assert "old2" not in messages + assert "boundary" in messages + assert "new" in messages + + def test_prune_by_max_entries(self, repo: ActivityLogRepository) -> None: + for i in range(10): + repo.record(_entry(message=f"entry {i}")) + + deleted = repo.prune(max_entries=3) + assert deleted == 7 + assert repo.count() == 3 + + def test_prune_keeps_newest_on_max_entries(self, repo: ActivityLogRepository) -> None: + base = datetime(2026, 1, 1, tzinfo=timezone.utc) + ids = [] + for i in range(5): + e = _entry(ts=base + timedelta(hours=i), message=f"entry {i}") + ids.append(e.id) + repo.record(e) + + # Keep only the 2 newest + repo.prune(max_entries=2) + remaining = repo.query(ActivityLogFilters(), limit=10) + remaining_ids = {r.id for r in remaining} + # The 2 newest are the last 2 inserted (highest seq) + assert ids[3] in remaining_ids + assert ids[4] in remaining_ids + assert ids[0] not in remaining_ids + + def test_prune_both_predicates(self, repo: ActivityLogRepository) -> None: + base = datetime(2026, 2, 1, 0, 0, 0, tzinfo=timezone.utc) + # Insert 6 entries: 3 old, 3 recent + for i in range(3): + repo.record(_entry(ts=base - timedelta(days=i + 1), message=f"old{i}")) 
+ for i in range(3): + repo.record(_entry(ts=base + timedelta(hours=i), message=f"new{i}")) + + # Prune old entries AND keep at most 2 of the remaining + deleted = repo.prune(before_ts=base, max_entries=2) + # 3 age-pruned + 1 count-pruned = 4 + assert deleted == 4 + assert repo.count() == 2 + + def test_prune_max_entries_zero_clears_all(self, repo: ActivityLogRepository) -> None: + for i in range(5): + repo.record(_entry()) + + repo.prune(max_entries=0) + assert repo.count() == 0 + + def test_prune_no_op_when_below_max(self, repo: ActivityLogRepository) -> None: + for i in range(3): + repo.record(_entry()) + + deleted = repo.prune(max_entries=10) + assert deleted == 0 + assert repo.count() == 3 + + +# --------------------------------------------------------------------------- +# Clear +# --------------------------------------------------------------------------- + + +class TestClear: + def test_clear_returns_row_count(self, repo: ActivityLogRepository) -> None: + for _ in range(5): + repo.record(_entry()) + + deleted = repo.clear() + assert deleted == 5 + + def test_clear_empties_table(self, repo: ActivityLogRepository) -> None: + for _ in range(3): + repo.record(_entry()) + + repo.clear() + assert repo.count() == 0 + + def test_clear_on_empty_table(self, repo: ActivityLogRepository) -> None: + deleted = repo.clear() + assert deleted == 0 + + +# --------------------------------------------------------------------------- +# Count +# --------------------------------------------------------------------------- + + +class TestCount: + def test_count_all(self, repo: ActivityLogRepository) -> None: + for _ in range(7): + repo.record(_entry()) + assert repo.count() == 7 + + def test_count_filtered(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(category=ActivityCategory.AUTH)) + repo.record(_entry(category=ActivityCategory.DEVICE)) + repo.record(_entry(category=ActivityCategory.AUTH)) + + n = 
repo.count(ActivityLogFilters(categories=[ActivityCategory.AUTH])) + assert n == 2 + + def test_count_empty(self, repo: ActivityLogRepository) -> None: + assert repo.count() == 0 + + def test_count_no_match(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(category=ActivityCategory.ENTITY)) + n = repo.count(ActivityLogFilters(categories=[ActivityCategory.AUTH])) + assert n == 0 + + +# --------------------------------------------------------------------------- +# Export iterator +# --------------------------------------------------------------------------- + + +class TestExportIterator: + def test_iter_export_yields_all_rows(self, repo: ActivityLogRepository) -> None: + entries = [_entry(message=f"e{i}") for i in range(5)] + for e in entries: + repo.record(e) + + exported = list(repo.iter_export()) + assert len(exported) == 5 + exported_ids = {e.id for e in exported} + assert exported_ids == {e.id for e in entries} + + def test_iter_export_ascending_order(self, repo: ActivityLogRepository) -> None: + base = datetime(2026, 4, 1, tzinfo=timezone.utc) + for i in range(5): + repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}")) + + exported = list(repo.iter_export()) + seqs = [ + repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (e.id,)).fetchone()["seq"] + for e in exported + ] + assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order" + + def test_iter_export_with_filter(self, repo: ActivityLogRepository) -> None: + repo.record(_entry(category=ActivityCategory.AUTH)) + repo.record(_entry(category=ActivityCategory.ENTITY)) + repo.record(_entry(category=ActivityCategory.AUTH)) + + exported = list(repo.iter_export(ActivityLogFilters(categories=[ActivityCategory.AUTH]))) + assert len(exported) == 2 + assert all(e.category == ActivityCategory.AUTH for e in exported) + + def test_iter_export_streaming_not_all_in_memory(self, repo: ActivityLogRepository) -> None: + """Verify iter_export is a generator (lazy), 
not a pre-loaded list.""" + import types + + for _ in range(3): + repo.record(_entry()) + + result = repo.iter_export() + assert isinstance(result, types.GeneratorType) + + +# --------------------------------------------------------------------------- +# Migration idempotency +# --------------------------------------------------------------------------- + + +class TestMigrationIdempotency: + def test_construct_repo_twice_is_noop(self, tmp_db: Database) -> None: + """Creating two repos on the same DB does not re-run the migration.""" + repo1 = ActivityLogRepository(tmp_db) + repo1.record(_entry(message="before second construction")) + + # Second construction must not raise or re-apply the migration + repo2 = ActivityLogRepository(tmp_db) + assert repo2.count() == 1 + + # The migration name must appear exactly once in data_migrations + cursor = tmp_db.execute( + "SELECT COUNT(*) AS cnt FROM data_migrations WHERE name = ?", + ("002_add_activity_log",), + ) + assert cursor.fetchone()["cnt"] == 1 + + def test_running_migrations_twice_is_noop(self, tmp_db: Database) -> None: + """MigrationRunner.run is idempotent for AddActivityLogTableMigration.""" + from ledgrab.storage.data_migrations import ( + AddActivityLogTableMigration, + MigrationRunner, + ) + + runner = MigrationRunner(tmp_db) + migration = AddActivityLogTableMigration() + + first_run = runner.run([migration]) + assert len(first_run) == 1 + assert first_run[0].name == "002_add_activity_log" + + second_run = runner.run([migration]) + assert second_run == [], "Second run must be a no-op" + + def test_activity_log_table_exists_after_migration(self, tmp_db: Database) -> None: + """The activity_log table is present after the migration runs.""" + ActivityLogRepository(tmp_db) + + cursor = tmp_db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='activity_log'" + ) + assert cursor.fetchone() is not None + + def 
test_activity_log_indexes_exist_after_migration(self, tmp_db: Database) -> None: + """All declared indexes are present after migration.""" + ActivityLogRepository(tmp_db) + + cursor = tmp_db.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'" + ) + index_names = {row["name"] for row in cursor.fetchall()} + expected = { + "idx_activity_log_ts_seq", + "idx_activity_log_category", + "idx_activity_log_severity", + "idx_activity_log_actor", + "idx_activity_log_entity", + } + assert expected.issubset(index_names)