feat(activity-log): phase 1 - storage model, migration, repository

- ActivityLogEntry dataclass + ActivityCategory/ActivitySeverity + ActivityLogFilters
- additive idempotent migration 002_add_activity_log (indexed activity_log table, seq keyset tiebreaker)
- ActivityLogRepository (record/query/count/prune/clear/iter_export), keyset pagination, parameterized SQL
- 102 unit + adversarial tests (SQL-injection, pagination, prune, codec, migration idempotency)
2026-06-09 17:40:37 +03:00
parent 1afe7d6fcc
commit 1ac4a0f66d
8 changed files with 2100 additions and 18 deletions
+3 -3
@@ -45,8 +45,8 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p
## Frozen contracts (fill as phases complete)
- ActivityLogEntry fields / dict shape: _(Phase 1/2 handoff)_
- ActivityLogFilters shape: _(Phase 1 handoff)_
- ActivityLogEntry fields / dict shape: **frozen** — see phase-1-storage.md Handoff section. 11 fields: `id`, `ts`, `category`, `action`, `severity`, `actor`, `message`, `entity_type`, `entity_id`, `entity_name`, `metadata`. `seq` is DB-only (not on dataclass).
- ActivityLogFilters shape: **frozen** — 8 optional fields: `categories`, `severities`, `actor`, `entity_type`, `entity_id`, `since`, `until`, `message_like`. See phase-1-storage.md Handoff.
- recorder.record(...) signature + actor ContextVar import path: _(Phase 2 handoff)_
- API endpoints + query params + page envelope + settings bounds: _(Phase 4 handoff)_
@@ -65,4 +65,4 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p
## Phase progress notes
_(Orchestrator appends a short note per phase: what landed, commit sha, any warnings.)_
Phase 1 landed (2026-06-09): `activity_log.py` (dataclass + enums + filters + codec), `AddActivityLogTableMigration` (`002_add_activity_log`) appended to `ALL_MIGRATIONS`, `ActivityLogRepository` (record/query/count/prune/clear/iter_export), 41 new tests — all green. Full suite 2226 passed, 0 failed. Schema and method signatures frozen in phase-1-storage.md Handoff. Gotcha: `Database.execute` takes a positional tuple — use `?` placeholders (not `:name`), otherwise Python 3.14 will raise `ProgrammingError`.
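The placeholder gotcha can be illustrated with the standard `sqlite3` module alone. This is a minimal sketch, not the project's `Database` wrapper: the `demo` table and its rows are hypothetical, and only the two binding styles are the point.

```python
import sqlite3

# Hypothetical in-memory table, used only to show the two binding styles.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id TEXT, actor TEXT)")

# qmark style: positional "?" placeholders are bound from a tuple.
conn.execute("INSERT INTO demo (id, actor) VALUES (?, ?)", ("al_1", "system"))

# named style: ":name" placeholders are bound from a mapping, not a tuple.
conn.execute(
    "INSERT INTO demo (id, actor) VALUES (:id, :actor)",
    {"id": "al_2", "actor": "alice"},
)

rows = conn.execute("SELECT id, actor FROM demo ORDER BY id").fetchall()
```

Positional `?` placeholders pair with a tuple and `:name` placeholders pair with a mapping; combining `:name` with a tuple is the mix the note above warns about.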
+2 -2
@@ -59,7 +59,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
## Phases
- [ ] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md)
- [x] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md)
- [ ] Phase 2: Recorder, actor context, retention, lifecycle [domain: backend] → [subplan](./phase-2-recorder-retention.md)
- [ ] Phase 3: Event instrumentation (4 categories) [domain: backend] → [subplan](./phase-3-instrumentation.md)
- [ ] Phase 4: REST API — query/filter/export/settings/clear [domain: backend] → [subplan](./phase-4-api.md)
@@ -79,7 +79,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
| Phase | Domain | Status | Review | Build | Committed |
|-------|--------|--------|--------|-------|-----------|
| Phase 1: Storage | data | ⬜ Not Started | ⬜ | ⬜ | |
| Phase 1: Storage | data | ✅ Done | ✅ Passed | ✅ Passed | |
| Phase 2: Recorder/Retention | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 3: Instrumentation | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 4: REST API | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
+88 -13
@@ -1,6 +1,6 @@
# Phase 1: Storage — model, migration, repository
**Status:** ⬜ Not Started
**Status:** ✅ Done
**Parent plan:** [PLAN.md](./PLAN.md)
**Domain:** data
@@ -13,7 +13,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
## Tasks
- [ ] Create `server/src/ledgrab/storage/activity_log.py`:
- [x] Create `server/src/ledgrab/storage/activity_log.py`:
  - `ActivityCategory` and `ActivitySeverity` string enums (or `Literal` unions used as
    constants). Categories: `auth`, `device`, `entity`, `capture`, `system`. Severities:
    `info`, `warning`, `error`.
@@ -22,7 +22,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
    `entity_type: str | None`, `entity_id: str | None`, `entity_name: str | None`,
    `message: str`, `metadata: dict` (small JSON; default empty). Provide `to_row()` /
    `from_row()` (column tuple/dict ↔ dataclass; `metadata` JSON-encoded; `ts` isoformat).
- [ ] Add migration to `server/src/ledgrab/storage/data_migrations.py`:
- [x] Add migration to `server/src/ledgrab/storage/data_migrations.py`:
  - New `DataMigration` subclass `AddActivityLogTableMigration` with unique `name`
    (next sequential id, e.g. `"NNN_add_activity_log"`, matching existing naming) and
    `apply(conn)` creating `activity_log` with an INTEGER PRIMARY KEY AUTOINCREMENT `seq`
@@ -33,7 +33,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
  - Indexes: `(ts DESC, seq DESC)` (primary keyset/sort), `category`, `severity`, `actor`,
    `(entity_type, entity_id)`. Use `CREATE TABLE/INDEX IF NOT EXISTS` for idempotency.
  - Append the instance to `ALL_MIGRATIONS` (never reorder existing entries).
- [ ] Create `server/src/ledgrab/storage/activity_log_repository.py`:
- [x] Create `server/src/ledgrab/storage/activity_log_repository.py`:
  - `class ActivityLogRepository` taking `db: Database` (NOT subclassing `BaseSqliteStore`).
  - `record(entry: ActivityLogEntry) -> None`: single parameterized INSERT via
    `db.execute(...)` (auto-commit). The `seq` is DB-assigned. **Caller guarantees this runs
@@ -51,7 +51,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
    (does not load all rows into memory).
  - Define a small `ActivityLogFilters` dataclass (all-optional fields) in the repository or
    `activity_log.py` and reuse it across query/count/prune/export.
- [ ] Unit tests in `server/tests/storage/test_activity_log_repository.py`:
- [x] Unit tests in `server/tests/storage/test_activity_log_repository.py`:
  - insert + read back round-trip (incl. metadata JSON, UTC ts);
  - filter by each dimension (category/severity/actor/entity/date/free-text);
  - keyset pagination stability across two pages with same-`ts` rows (seq tiebreaker);
@@ -89,14 +89,89 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
## Review Checklist
- [ ] All tasks completed
- [ ] Code follows project conventions (dataclass codec style, migration naming)
- [ ] No unintended side effects (no startup wiring yet)
- [ ] Build passes (ruff + pytest)
- [ ] Tests pass (new + existing)
- [x] All tasks completed
- [x] Code follows project conventions (dataclass codec style, migration naming)
- [x] No unintended side effects (no startup wiring yet)
- [x] Build passes (ruff + pytest)
- [x] Tests pass (new + existing)
## Handoff to Next Phase
<!-- Filled in by the implementer: final ActivityLogEntry field list + the ActivityLogFilters
shape (Phase 2/4 depend on the frozen schema), the migration name used, and the exact
repository method signatures. -->
### ActivityLogEntry — final field list and dict shape
```python
@dataclass
class ActivityLogEntry:
    id: str                    # "al_<uuid8>", caller-assigned
    ts: datetime               # UTC-aware; stored as ISO-8601 string in DB
    category: str              # ActivityCategory constant
    action: str                # verb-object label, e.g. "entity.created"
    severity: str              # ActivitySeverity constant
    actor: str                 # API-key label or "system"
    message: str               # human-readable description
    entity_type: str | None    # e.g. "output_target"
    entity_id: str | None      # stable entity id
    entity_name: str | None    # name at time of event
    metadata: dict             # JSON-serialisable; default {}
```
`to_row()` returns a flat dict with the same 11 keys; `metadata` is a JSON string and `ts` is an ISO-8601 string. `seq` is NOT in `to_row()` because it is DB-assigned.
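The codec rules above (ISO-8601 `ts`, JSON `metadata`, naive timestamps read back as UTC) can be sketched in isolation. The helpers `encode` and `decode` below are hypothetical stand-ins that cover only these two columns; they are not the project's `to_row()` / `from_row()`.

```python
from __future__ import annotations

import json
from datetime import datetime, timezone


def encode(ts: datetime, metadata: dict) -> dict:
    """Hypothetical helper: flatten the two non-trivial columns to strings."""
    return {
        "ts": ts.isoformat(),
        "metadata": json.dumps(metadata, ensure_ascii=False),
    }


def decode(row: dict) -> tuple[datetime, dict]:
    """Hypothetical helper: parse the strings back, tolerating bad input."""
    ts = datetime.fromisoformat(row["ts"])
    if ts.tzinfo is None:
        # Assumption carried over from the notes: naive values are UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    try:
        metadata = json.loads(row.get("metadata") or "{}")
    except (json.JSONDecodeError, TypeError):
        metadata = {}
    return ts, metadata


original_ts = datetime(2026, 6, 9, 14, 40, 37, 123456, tzinfo=timezone.utc)
original_meta = {"address": "192.0.2.10", "note": "привет"}

row = encode(original_ts, original_meta)
decoded_ts, decoded_meta = decode(row)

# A naive timestamp with unparseable metadata still decodes to safe values.
fallback_ts, fallback_meta = decode({"ts": "2026-06-09T14:40:37", "metadata": "not json"})
```

Microseconds and the UTC offset survive the round trip because `isoformat()` and `fromisoformat()` are inverses for aware datetimes.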
### ActivityLogFilters — shape (all fields optional, default None)
```python
@dataclass
class ActivityLogFilters:
    categories: Sequence[str] | None   # category IN (...)
    severities: Sequence[str] | None   # severity IN (...)
    actor: str | None                  # exact match
    entity_type: str | None            # exact match
    entity_id: str | None              # exact match
    since: datetime | None             # ts >= since
    until: datetime | None             # ts <= until
    message_like: str | None           # LIKE %value% (escaped)
```
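The `message_like` escaping can be tried against plain `sqlite3`. This is a sketch under assumptions: `escape_like`, `search`, and the `demo` table are hypothetical names, and the backslash escape character mirrors what the repository code in this commit uses.

```python
import sqlite3


def escape_like(term: str) -> str:
    """Escape the backslash first, then the two LIKE wildcards."""
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (message TEXT)")
conn.executemany(
    "INSERT INTO demo (message) VALUES (?)",
    [("100% done",), ("all done",), ("device_01",), ("device001",)],
)


def search(term: str) -> list[str]:
    """Substring search where the user's term is matched literally."""
    pattern = f"%{escape_like(term)}%"
    cursor = conn.execute(
        "SELECT message FROM demo WHERE message LIKE ? ESCAPE '\\' ORDER BY message",
        (pattern,),
    )
    return [row[0] for row in cursor]


percent_hits = search("100%")
underscore_hits = search("device_01")
```

Escaping only controls wildcard interpretation; protection against SQL injection comes from binding the pattern as a parameter rather than interpolating it.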
### Migration name used
`"002_add_activity_log"` — appended as position [1] in `ALL_MIGRATIONS`.
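Why the migration is safe to run twice can be shown with a cut-down sketch. The table and index names below (`activity_log_demo`, `idx_demo_ts_seq`) are hypothetical, and the free function `apply` only imitates the shape of a migration's `apply(conn)`.

```python
import sqlite3


def apply(conn: sqlite3.Connection) -> None:
    """Hypothetical migration body: every statement uses IF NOT EXISTS."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS activity_log_demo (
            seq INTEGER PRIMARY KEY AUTOINCREMENT,
            id  TEXT UNIQUE NOT NULL,
            ts  TEXT NOT NULL
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_demo_ts_seq "
        "ON activity_log_demo (ts DESC, seq DESC)"
    )


conn = sqlite3.connect(":memory:")
apply(conn)
apply(conn)  # second run is a no-op instead of an "already exists" error

names = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE name IN ('activity_log_demo', 'idx_demo_ts_seq') ORDER BY name"
    )
]
```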
### ActivityLogRepository — exact method signatures
```python
class ActivityLogRepository:
    def __init__(self, db: Database) -> None: ...

    def record(self, entry: ActivityLogEntry) -> None: ...

    def query(
        self,
        filters: ActivityLogFilters,
        *,
        before_seq: int | None = None,
        limit: int = 50,
    ) -> list[ActivityLogEntry]: ...

    def count(self, filters: ActivityLogFilters | None = None) -> int: ...

    def prune(
        self,
        *,
        before_ts: datetime | None = None,
        max_entries: int | None = None,
    ) -> int: ...

    def clear(self) -> int: ...

    def iter_export(
        self, filters: ActivityLogFilters | None = None
    ) -> Iterator[ActivityLogEntry]: ...
```
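The two-step `prune` (age cut first, then count cap) can be sketched with plain `sqlite3`. The free function and the `log_demo` table are hypothetical; the `LIMIT 1 OFFSET ?` cutoff lookup follows the approach the repository in this commit takes.

```python
import sqlite3


def prune(conn, *, before_ts=None, max_entries=None) -> int:
    """Hypothetical stand-in: delete by age, then cap the row count."""
    deleted = 0
    if before_ts is not None:
        cursor = conn.execute("DELETE FROM log_demo WHERE ts < ?", (before_ts,))
        deleted += cursor.rowcount
    if max_entries is not None and max_entries >= 0:
        # The row at offset N (newest first) is the first one beyond the cap.
        cutoff = conn.execute(
            "SELECT seq FROM log_demo ORDER BY seq DESC LIMIT 1 OFFSET ?",
            (max_entries,),
        ).fetchone()
        if cutoff is not None:
            cursor = conn.execute("DELETE FROM log_demo WHERE seq <= ?", (cutoff[0],))
            deleted += cursor.rowcount
    return deleted


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log_demo (seq INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT NOT NULL)"
)
# Ten rows, one per day, so seq 1..10 map to June 1..10.
conn.executemany(
    "INSERT INTO log_demo (ts) VALUES (?)",
    [(f"2026-06-{day:02d}T00:00:00+00:00",) for day in range(1, 11)],
)

deleted = prune(conn, before_ts="2026-06-04T00:00:00+00:00", max_entries=3)
remaining = [row[0] for row in conn.execute("SELECT seq FROM log_demo ORDER BY seq")]
```

Here the age cut removes three rows (June 1 to 3) and the count cap removes four more, leaving the three newest rows.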
### Key behavioural notes for Phase 2/3/4
- `record()` expects to be called from the event-loop thread (or with `Database` RLock already held). Phase 2 is responsible for thread marshaling via `loop.call_soon_threadsafe`.
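- `query()` returns entries in **ascending chronological order within the page** (reversed internally from the DESC fetch for display convenience). The smallest `seq` on a page belongs to `page[0]`; pass it as `before_seq` for the next page. Because `seq` is not a field on `ActivityLogEntry`, the caller has to obtain it separately (the tests look it up by `id`).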
- `count(None)` == `count(ActivityLogFilters())` — both count all rows.
- `prune(before_ts=X, max_entries=N)` applies both predicates independently (age prune first, then count cap).
- `iter_export` holds `db._lock` for the entire iteration. Phase 4 should stream the response and consume promptly.
- `ActivityCategory` and `ActivitySeverity` are plain classes with string class-attributes and an `ALL` tuple, NOT `enum.Enum`.
- Imports for Phase 2/3/4:
```python
from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters, ActivityCategory, ActivitySeverity
from ledgrab.storage.activity_log_repository import ActivityLogRepository
```
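The keyset contract described in the notes above (fetch newest first, return oldest first, page with `before_seq`) can be sketched as follows. `fetch_page` and `log_demo` are hypothetical; unlike `query()`, this sketch returns `(seq, id)` tuples so the cursor value is visible to the caller.

```python
import sqlite3


def fetch_page(conn, *, before_seq=None, limit=3):
    """Hypothetical stand-in: newest `limit` rows below `before_seq`."""
    params = []
    where = ""
    if before_seq is not None:
        where = "WHERE seq < ?"
        params.append(before_seq)
    params.append(limit)
    rows = conn.execute(
        f"SELECT seq, id FROM log_demo {where} ORDER BY seq DESC LIMIT ?",
        tuple(params),
    ).fetchall()
    # Reverse so each page reads oldest-first, as the notes describe.
    return list(reversed(rows))


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log_demo ("
    "seq INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT, ts TEXT)"
)
# All rows share one timestamp; seq alone keeps the ordering stable.
conn.executemany(
    "INSERT INTO log_demo (id, ts) VALUES (?, ?)",
    [(f"e{n}", "2026-06-09T14:40:37+00:00") for n in range(1, 8)],
)

page1 = fetch_page(conn)
page2 = fetch_page(conn, before_seq=page1[0][0])
page3 = fetch_page(conn, before_seq=page2[0][0])
```

Each page's first element carries the smallest `seq`, which becomes the exclusive upper bound for the next call, so pages neither overlap nor skip rows.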
+171
@@ -0,0 +1,171 @@
"""Activity / audit log data model.

Defines the ``ActivityLogEntry`` dataclass together with its category and
severity enumerations and the ``ActivityLogFilters`` query object. Row-level
codec (``to_row`` / ``from_row``) converts between the dataclass and a flat
SQLite column dict.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Sequence
# ---------------------------------------------------------------------------
# Enumerations (string constants, not enum.Enum, to stay consistent with the
# rest of the codebase which uses plain string literals)
# ---------------------------------------------------------------------------


class ActivityCategory:
    """Valid ``category`` values for an ``ActivityLogEntry``."""

    AUTH = "auth"
    DEVICE = "device"
    ENTITY = "entity"
    CAPTURE = "capture"
    SYSTEM = "system"

    ALL: tuple[str, ...] = (AUTH, DEVICE, ENTITY, CAPTURE, SYSTEM)


class ActivitySeverity:
    """Valid ``severity`` values for an ``ActivityLogEntry``."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"

    ALL: tuple[str, ...] = (INFO, WARNING, ERROR)
# ---------------------------------------------------------------------------
# Entry dataclass
# ---------------------------------------------------------------------------


@dataclass
class ActivityLogEntry:
    """One audit record; treat it as immutable once recorded.

    The dataclass itself is not frozen, so immutability is a convention.

    Fields
    ------
    id            Stable application-assigned identifier, e.g. ``al_<uuid8>``.
    ts            UTC timestamp of the event (server-assigned at record time).
    category      Broad bucket, one of :class:`ActivityCategory`.
    action        Verb-object label, e.g. ``"entity.created"`` or ``"auth.rejected"``.
    severity      One of :class:`ActivitySeverity`.
    actor         Who triggered the action, e.g. an API-key label or ``"system"``.
    entity_type   Optional: kind of entity involved, e.g. ``"output_target"``.
    entity_id     Optional: stable entity identifier.
    entity_name   Optional: human-readable entity name at the time of the event.
    message       Human-readable description suitable for display.
    metadata      Small structured context (device address, error code, …).
                  Must be JSON-serialisable; defaults to empty dict.
    """

    id: str
    ts: datetime
    category: str
    action: str
    severity: str
    actor: str
    message: str
    entity_type: str | None = None
    entity_id: str | None = None
    entity_name: str | None = None
    metadata: dict = field(default_factory=dict)

    # -- Row codec -----------------------------------------------------------

    def to_row(self) -> dict:
        """Return a flat dict suitable for a parameterised SQLite INSERT.

        ``ts`` is stored as an ISO-8601 string (UTC). ``metadata`` is stored
        as a JSON string. ``seq`` is DB-assigned and is NOT included.
        """
        return {
            "id": self.id,
            "ts": self.ts.isoformat(),
            "category": self.category,
            "action": self.action,
            "severity": self.severity,
            "actor": self.actor,
            "entity_type": self.entity_type,
            "entity_id": self.entity_id,
            "entity_name": self.entity_name,
            "message": self.message,
            "metadata": json.dumps(self.metadata, ensure_ascii=False),
        }

    @staticmethod
    def from_row(row: dict) -> "ActivityLogEntry":
        """Reconstruct an ``ActivityLogEntry`` from a SQLite row dict.

        ``row`` may include the ``seq`` column; it is ignored here (callers
        that need ``seq`` for keyset pagination access it directly from the
        row before calling this method).
        """
        raw_meta = row.get("metadata") or "{}"
        try:
            metadata = json.loads(raw_meta)
        except (json.JSONDecodeError, TypeError):
            # Malformed metadata degrades to an empty dict instead of failing the read.
            metadata = {}

        raw_ts = row["ts"]
        ts = datetime.fromisoformat(raw_ts)
        # Ensure tz-aware UTC even if stored without offset (legacy rows)
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)

        return ActivityLogEntry(
            id=row["id"],
            ts=ts,
            category=row["category"],
            action=row["action"],
            severity=row["severity"],
            actor=row["actor"],
            entity_type=row.get("entity_type"),
            entity_id=row.get("entity_id"),
            entity_name=row.get("entity_name"),
            message=row["message"],
            metadata=metadata,
        )
# ---------------------------------------------------------------------------
# Filters dataclass
# ---------------------------------------------------------------------------


@dataclass
class ActivityLogFilters:
    """Optional query-time filters for :class:`ActivityLogRepository`.

    All fields are optional. ``None`` / empty sequence means "no restriction
    on this dimension".

    Fields
    ------
    categories    Restrict to entries whose ``category`` is in this set.
    severities    Restrict to entries whose ``severity`` is in this set.
    actor         Exact match on the ``actor`` field.
    entity_type   Exact match on the ``entity_type`` field.
    entity_id     Exact match on the ``entity_id`` field.
    since         Inclusive lower bound on ``ts`` (``ts >= since``).
    until         Inclusive upper bound on ``ts`` (``ts <= until``).
    message_like  Free-text substring match on ``message`` (LIKE ``%value%``).
                  LIKE wildcards in the value are escaped so they match
                  literally, and the pattern is bound as a parameter.
    """

    categories: Sequence[str] | None = None
    severities: Sequence[str] | None = None
    actor: str | None = None
    entity_type: str | None = None
    entity_id: str | None = None
    since: datetime | None = None
    until: datetime | None = None
    message_like: str | None = None
@@ -0,0 +1,293 @@
"""Append-only repository for the persistent activity / audit log.

Design notes
------------
* Does NOT subclass ``BaseSqliteStore``: that base loads every row into an
  in-memory cache at construction time, which is wrong for an unbounded,
  append-heavy log.
* All SQL parameters are passed as positional ``?`` placeholders; no user
  input is interpolated into the query string.
* Keyset pagination is implemented via the ``seq`` INTEGER PRIMARY KEY
  AUTOINCREMENT column, which is strictly monotonic and survives rows with
  identical ``ts`` values.
* The repository takes a ``Database`` instance and calls ``db.execute`` /
  ``db.transaction`` directly.
* Thread safety: ``Database.execute`` holds the ``RLock`` for the duration of
  each call. The repository itself adds no extra locking. Callers that
  originate from non-event-loop threads MUST marshal via
  ``loop.call_soon_threadsafe`` (that is Phase 2's responsibility).
"""

from __future__ import annotations

import sqlite3
from datetime import datetime
from typing import Iterator

from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters
from ledgrab.storage.database import Database
from ledgrab.utils import get_logger

logger = get_logger(__name__)

_TABLE = "activity_log"


def _build_filter_clause(
    filters: ActivityLogFilters,
    params: list,
    *,
    extra_where: str | None = None,
) -> str:
    """Return the WHERE condition fragment and append bound parameters to *params*.

    The caller is responsible for providing the opening ``WHERE`` keyword when
    the returned string is non-empty, or ``AND`` when appending to an existing
    predicate. This function returns only the condition fragment(s) joined by
    ``AND``, or an empty string when *filters* has no restrictions.

    ``extra_where`` is prepended (with AND) before the filter conditions if
    provided (used for the ``seq < ?`` keyset predicate). Its bound value must
    already be in *params* so that parameter order matches placeholder order.
    """
    conditions: list[str] = []

    if extra_where:
        conditions.append(extra_where)

    if filters.categories:
        placeholders = ",".join("?" * len(filters.categories))
        conditions.append(f"category IN ({placeholders})")
        params.extend(filters.categories)

    if filters.severities:
        placeholders = ",".join("?" * len(filters.severities))
        conditions.append(f"severity IN ({placeholders})")
        params.extend(filters.severities)

    if filters.actor is not None:
        conditions.append("actor = ?")
        params.append(filters.actor)

    if filters.entity_type is not None:
        conditions.append("entity_type = ?")
        params.append(filters.entity_type)

    if filters.entity_id is not None:
        conditions.append("entity_id = ?")
        params.append(filters.entity_id)

    if filters.since is not None:
        conditions.append("ts >= ?")
        params.append(filters.since.isoformat())

    if filters.until is not None:
        conditions.append("ts <= ?")
        params.append(filters.until.isoformat())

    if filters.message_like is not None:
        # Escape LIKE special characters in the user-supplied substring so that
        # a literal '%' or '_' in the message does not act as a wildcard.
        escaped = filters.message_like.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        conditions.append("message LIKE ? ESCAPE '\\'")
        params.append(f"%{escaped}%")

    return " AND ".join(conditions)


class ActivityLogRepository:
    """Purpose-built repository for the ``activity_log`` table.

    Parameters
    ----------
    db:
        The shared ``Database`` singleton. The migration that creates the
        ``activity_log`` table must have been applied before the first call to
        :meth:`record`. Construction triggers migration via
        ``MigrationRunner`` so users can create the repository directly
        without a separate startup step.
    """

    def __init__(self, db: Database) -> None:
        self._db = db
        # Apply pending migrations (idempotent; most runs are no-ops)
        from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner

        MigrationRunner(db).run(ALL_MIGRATIONS)

    # -- Write ---------------------------------------------------------------

    def record(self, entry: ActivityLogEntry) -> None:
        """Append *entry* to the log.

        The ``seq`` column is assigned by SQLite (AUTOINCREMENT); ``entry``
        does not carry a ``seq`` field.

        Caller contract: this must be called from the event-loop thread (or
        from a context already serialised through the ``Database`` RLock).
        Cross-thread callers must marshal via ``loop.call_soon_threadsafe``;
        that is Phase 2's responsibility.
        """
        row = entry.to_row()
        self._db.execute(
            f"INSERT INTO {_TABLE} "
            "(id, ts, category, action, severity, actor, "
            " entity_type, entity_id, entity_name, message, metadata) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                row["id"],
                row["ts"],
                row["category"],
                row["action"],
                row["severity"],
                row["actor"],
                row["entity_type"],
                row["entity_id"],
                row["entity_name"],
                row["message"],
                row["metadata"],
            ),
        )

    # -- Read ----------------------------------------------------------------

    def query(
        self,
        filters: ActivityLogFilters,
        *,
        before_seq: int | None = None,
        limit: int = 50,
    ) -> list[ActivityLogEntry]:
        """Return the newest matching entries, oldest-first within the page.

        Keyset pagination: pass the smallest ``seq`` seen on the previous page
        as *before_seq* to get the next page. Entries are fetched in
        ``seq DESC`` order from SQLite and then reversed before returning so
        that the caller receives them in chronological order within the page.

        Parameters
        ----------
        filters:
            Filter criteria (required). Pass ``ActivityLogFilters()`` for no
            restriction.
        before_seq:
            Exclusive upper bound on ``seq``. ``None`` returns the first
            (newest) page.
        limit:
            Maximum number of entries to return.
        """
        params: list = []
        keyset = "seq < ?" if before_seq is not None else None
        if before_seq is not None:
            # Bound first: the keyset predicate is the first WHERE condition.
            params.append(before_seq)

        where_fragment = _build_filter_clause(filters, params, extra_where=keyset)
        where_clause = f"WHERE {where_fragment}" if where_fragment else ""

        params.append(limit)
        sql = (
            "SELECT seq, id, ts, category, action, severity, actor, "
            "entity_type, entity_id, entity_name, message, metadata "
            f"FROM {_TABLE} "
            f"{where_clause} "
            "ORDER BY seq DESC "
            "LIMIT ?"
        )
        cursor = self._db.execute(sql, tuple(params))
        rows = cursor.fetchall()
        # Reverse to return chronological order within the page
        return [ActivityLogEntry.from_row(dict(row)) for row in reversed(rows)]

    def count(self, filters: ActivityLogFilters | None = None) -> int:
        """Return the number of entries matching *filters* (or all entries)."""
        if filters is None:
            filters = ActivityLogFilters()
        params: list = []
        where_fragment = _build_filter_clause(filters, params)
        where_clause = f"WHERE {where_fragment}" if where_fragment else ""
        sql = f"SELECT COUNT(*) AS cnt FROM {_TABLE} {where_clause}"
        cursor = self._db.execute(sql, tuple(params))
        row = cursor.fetchone()
        return int(row["cnt"])

    # -- Maintenance ---------------------------------------------------------

    def prune(
        self,
        *,
        before_ts: datetime | None = None,
        max_entries: int | None = None,
    ) -> int:
        """Delete old / excess entries; return the total rows deleted.

        Both predicates are applied independently:

        1. Delete all rows where ``ts < before_ts`` (age-based pruning).
        2. Keep only the newest ``max_entries`` rows, deleting the rest
           (count-based pruning).

        The two operations run in separate statements so that each can be
        independently enabled or disabled.
        """
        deleted = 0

        if before_ts is not None:
            cursor = self._db.execute(
                f"DELETE FROM {_TABLE} WHERE ts < ?",
                (before_ts.isoformat(),),
            )
            deleted += cursor.rowcount

        if max_entries is not None and max_entries >= 0:
            # Find the first row beyond the cap (offset ``max_entries`` in
            # newest-first order); delete it and everything older.
            cursor = self._db.execute(
                f"SELECT seq FROM {_TABLE} ORDER BY seq DESC LIMIT 1 OFFSET ?",
                (max_entries,),
            )
            cutoff_row = cursor.fetchone()
            if cutoff_row is not None:
                cutoff_seq = cutoff_row["seq"]
                cursor = self._db.execute(
                    f"DELETE FROM {_TABLE} WHERE seq <= ?",
                    (cutoff_seq,),
                )
                deleted += cursor.rowcount

        return deleted

    def clear(self) -> int:
        """Delete all entries; return the number of rows deleted."""
        cursor = self._db.execute(f"DELETE FROM {_TABLE}")
        return cursor.rowcount

    # -- Export --------------------------------------------------------------

    def iter_export(self, filters: ActivityLogFilters | None = None) -> Iterator[ActivityLogEntry]:
        """Yield all matching entries in ascending ``seq`` order.

        Iterates the ``sqlite3`` cursor lazily (SQLite is embedded, so there
        is no server-side cursor), which means the entire result set is never
        loaded into memory at once. The ``Database`` ``RLock`` is held for the
        duration of the iteration, including while the generator is suspended;
        callers should consume this iterator promptly.
        """
        if filters is None:
            filters = ActivityLogFilters()
        params: list = []
        where_fragment = _build_filter_clause(filters, params)
        where_clause = f"WHERE {where_fragment}" if where_fragment else ""
        sql = (
            "SELECT seq, id, ts, category, action, severity, actor, "
            "entity_type, entity_id, entity_name, message, metadata "
            f"FROM {_TABLE} "
            f"{where_clause} "
            "ORDER BY seq ASC"
        )
        # Use the raw connection directly to get a streaming cursor; there is
        # no public cursor API, hence the private-attribute access below.
        # We borrow the lock for the full iteration.
        with self._db._lock:  # noqa: SLF001
            cursor: sqlite3.Cursor = self._db._conn.execute(sql, tuple(params))  # noqa: SLF001
            for row in cursor:
                yield ActivityLogEntry.from_row(dict(row))
@@ -213,7 +213,57 @@ class StaticToSingleColorMigration(DataMigration):
return rows_changed


class AddActivityLogTableMigration(DataMigration):
    """Create the ``activity_log`` table and its indexes.

    This is a purely additive migration; it does not touch any existing table.
    ``CREATE TABLE / INDEX IF NOT EXISTS`` ensures idempotency if the migration
    somehow runs twice (e.g. after a partial restore).
    """

    name = "002_add_activity_log"

    def apply(self, conn: sqlite3.Connection) -> int:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS activity_log (
                seq         INTEGER PRIMARY KEY AUTOINCREMENT,
                id          TEXT UNIQUE NOT NULL,
                ts          TEXT NOT NULL,
                category    TEXT NOT NULL,
                action      TEXT NOT NULL,
                severity    TEXT NOT NULL,
                actor       TEXT NOT NULL,
                entity_type TEXT,
                entity_id   TEXT,
                entity_name TEXT,
                message     TEXT NOT NULL,
                metadata    TEXT NOT NULL DEFAULT '{}'
            )
            """
        )
        # Primary read path: newest-first keyset pagination
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_activity_log_ts_seq "
            "ON activity_log (ts DESC, seq DESC)"
        )
        # Filter indexes
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_activity_log_category ON activity_log (category)"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_activity_log_severity ON activity_log (severity)"
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_activity_log_actor ON activity_log (actor)")
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_activity_log_entity "
            "ON activity_log (entity_type, entity_id)"
        )
        # DDL only: no data rows are changed by this migration.
        return 0


# Master list: ORDER MATTERS. Append new migrations; never reorder.
ALL_MIGRATIONS: list[DataMigration] = [
    StaticToSingleColorMigration(),
    AddActivityLogTableMigration(),
]
@@ -0,0 +1,884 @@
"""Adversarial / edge-case tests for ActivityLogRepository (Phase 1, storage layer).

These tests are intentionally skeptical: they derive expected behaviour from the
acceptance criteria in plans/activity-log/phase-1-storage.md, NOT from what the
code happens to do today. If a test fails, it is a real bug.

Coverage areas
--------------
1. SQL-injection / parameterization safety  (message_like with %, _, ;, --, quotes, etc.)
2. Keyset pagination edge cases             (empty table, before_seq bounds, stability,
                                             ordering contract, no duplicates/gaps)
3. Prune edge cases                         (before_ts only, max_entries only, both,
                                             max_entries=0, larger than count, deleted count,
                                             keeps NEWEST entries)
4. Filter combination edge cases            (AND semantics, empty sequence vs None,
                                             since/until inclusive bounds, tz-aware datetimes)
5. Codec / data integrity                   (metadata round-trip: nested, unicode, JSON-escape;
                                             entity_* None vs empty string; microsecond ts)
6. Migration idempotency                    (table + all indexes present; double-run is no-op)
7. iter_export vs query consistency         (same filters yield same rows; empty table; filter)
"""

from __future__ import annotations

import uuid
from datetime import datetime, timedelta, timezone

import pytest

from ledgrab.storage.activity_log import (
    ActivityCategory,
    ActivityLogEntry,
    ActivityLogFilters,
    ActivitySeverity,
)
from ledgrab.storage.activity_log_repository import ActivityLogRepository
from ledgrab.storage.database import Database

# ---------------------------------------------------------------------------
# Helpers (mirror the implementer's helpers so tests are self-contained)
# ---------------------------------------------------------------------------


def _now() -> datetime:
    return datetime.now(timezone.utc)


# Distinguishes "entity_id not passed" from an explicit ``entity_id=None``.
_SENTINEL = object()


def _entry(
    *,
    id: str | None = None,
    ts: datetime | None = None,
    category: str = ActivityCategory.ENTITY,
    action: str = "entity.created",
    severity: str = ActivitySeverity.INFO,
    actor: str = "test_actor",
    entity_type: str | None = "output_target",
    entity_id: object = _SENTINEL,
    entity_name: str | None = "My Target",
    message: str = "Created output target",
    metadata: dict | None = None,
) -> ActivityLogEntry:
    resolved_entity_id: str | None = (
        f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id  # type: ignore[assignment]
    )
    return ActivityLogEntry(
        id=id or f"al_{uuid.uuid4().hex[:8]}",
        ts=ts or _now(),
        category=category,
        action=action,
        severity=severity,
        actor=actor,
        entity_type=entity_type,
        entity_id=resolved_entity_id,
        entity_name=entity_name,
        message=message,
        metadata=metadata if metadata is not None else {},
    )


@pytest.fixture
def repo(tmp_db: Database) -> ActivityLogRepository:
    """Fresh ActivityLogRepository backed by a temp database."""
    return ActivityLogRepository(tmp_db)


def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int:
    cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,))
    row = cursor.fetchone()
    assert row is not None, f"No row found for id={entry_id!r}"
    return int(row["seq"])


# ---------------------------------------------------------------------------
# 1. SQL-injection / parameterization safety
# ---------------------------------------------------------------------------


class TestSQLInjectionSafety:
    """All user-supplied filter values must be treated as literal text, not SQL."""

    def test_message_like_percent_is_literal(self, repo: ActivityLogRepository) -> None:
        """A literal '%' in message_like must NOT act as a LIKE wildcard."""
        repo.record(_entry(message="100% done"))
        repo.record(_entry(message="all done"))
        repo.record(_entry(message="percent sign here"))

        results = repo.query(ActivityLogFilters(message_like="100%"), limit=10)
        assert len(results) == 1, "% in message_like should be a literal percent, not a wildcard"
        assert results[0].message == "100% done"

    def test_message_like_underscore_is_literal(self, repo: ActivityLogRepository) -> None:
        """A literal '_' in message_like must NOT act as a single-char wildcard."""
        repo.record(_entry(message="device_01"))
        repo.record(_entry(message="device001"))  # would match if _ were a wildcard
        repo.record(_entry(message="some other message"))

        results = repo.query(ActivityLogFilters(message_like="device_01"), limit=10)
        assert (
            len(results) == 1
        ), "_ in message_like should be a literal underscore, not a single-char wildcard"
        assert results[0].message == "device_01"

    def test_message_like_single_quote_does_not_break_query(
        self, repo: ActivityLogRepository
    ) -> None:
        """A single quote in message_like must not cause a SQL syntax error."""
        repo.record(_entry(message="it's working"))
        repo.record(_entry(message="no quote here"))

        # Must not raise
        results = repo.query(ActivityLogFilters(message_like="it's"), limit=10)
        assert len(results) == 1
        assert results[0].message == "it's working"

    def test_message_like_semicolon_does_not_execute_second_statement(
        self, repo: ActivityLogRepository
    ) -> None:
        """';' in message_like must not let a second SQL statement execute."""
        repo.record(_entry(message="a; DROP TABLE activity_log; --"))
        repo.record(_entry(message="safe message"))

        # If injection succeeded, table would be dropped and next call would error
        results = repo.query(
            ActivityLogFilters(message_like="a; DROP TABLE activity_log; --"), limit=10
        )
        # Table must still exist
        assert repo.count() == 2
        assert len(results) == 1

    def test_message_like_sql_comment_sequence(self, repo: ActivityLogRepository) -> None:
        """'--' (SQL comment) in message_like must be treated literally."""
        repo.record(_entry(message="value -- comment"))
        repo.record(_entry(message="value no comment"))

        results = repo.query(ActivityLogFilters(message_like="value --"), limit=10)
        assert len(results) == 1
        assert results[0].message == "value -- comment"

    def test_message_like_backslash_literal(self, repo: ActivityLogRepository) -> None:
        """Backslash in message_like must be treated as a literal character."""
        repo.record(_entry(message="path\\to\\file"))
        repo.record(_entry(message="path/to/file"))

        results = repo.query(ActivityLogFilters(message_like="path\\to"), limit=10)
        assert len(results) == 1
        assert results[0].message == "path\\to\\file"

    def test_message_like_classic_injection_pattern(self, repo: ActivityLogRepository) -> None:
        """Classic ') OR '1'='1 injection attempt must return no false positives."""
        repo.record(_entry(message="innocent message"))
        repo.record(_entry(message="another message"))

        # If injection worked, all rows would match
        results = repo.query(ActivityLogFilters(message_like="') OR '1'='1"), limit=10)
        assert (
            len(results) == 0
        ), "Injection payload matched rows it shouldn't; parameterization may be broken"

    def test_message_like_all_wildcards_returns_nothing_for_no_match(
        self, repo: ActivityLogRepository
    ) -> None:
        """'%_%' as a literal search term should return no rows unless that exact
        substring appears in a message."""
        repo.record(_entry(message="some message"))

        results = repo.query(ActivityLogFilters(message_like="%_%"), limit=10)
        # '%_%' as literal text does not appear in "some message"
        assert (
            len(results) == 0
        ), "% and _ in message_like were treated as SQL wildcards instead of literals"

    def test_actor_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
        """actor filter is exact match; SQL wildcards in value must not act as wildcards."""
        repo.record(_entry(actor="alice"))
        repo.record(_entry(actor="alice_admin"))

        results = repo.query(ActivityLogFilters(actor="alice"), limit=10)
        assert (
            len(results) == 1
        ), "actor filter is exact-match; 'alice' should not match 'alice_admin'"
        assert results[0].actor == "alice"

    def test_entity_id_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
        """entity_id filter is exact match; a prefix should not leak."""
        repo.record(_entry(entity_id="ot_abc"))
        repo.record(_entry(entity_id="ot_abc_extra"))

        results = repo.query(ActivityLogFilters(entity_id="ot_abc"), limit=10)
        assert len(results) == 1
# ---------------------------------------------------------------------------
# 2. Keyset pagination edge cases
# ---------------------------------------------------------------------------
class TestKeysetPaginationEdges:
def test_empty_table_returns_empty_list(self, repo: ActivityLogRepository) -> None:
"""Query on empty table must return [] not raise."""
results = repo.query(ActivityLogFilters(), limit=10)
assert results == []
def test_before_seq_none_is_first_page(self, repo: ActivityLogRepository) -> None:
"""before_seq=None must return the newest (first) page."""
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
for i in range(5):
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
page = repo.query(ActivityLogFilters(), before_seq=None, limit=3)
assert len(page) == 3
# Page should contain the 3 newest entries
messages = {e.message for e in page}
assert "e4" in messages
assert "e3" in messages
assert "e2" in messages
def test_before_seq_smaller_than_all_rows_returns_empty(
self, repo: ActivityLogRepository
) -> None:
"""before_seq=1 (the smallest autoincrement seq; the cursor is exclusive) returns an empty page."""
for i in range(5):
repo.record(_entry(message=f"e{i}"))
# seq starts at 1, so before_seq=1 means seq < 1 — no rows
results = repo.query(ActivityLogFilters(), before_seq=1, limit=10)
assert (
results == []
), "before_seq=1 should yield empty page since autoincrement starts at 1 (seq<1 = nothing)"
def test_before_seq_larger_than_max_returns_full_first_page(
self, repo: ActivityLogRepository
) -> None:
"""before_seq larger than any seq in the table behaves like before_seq=None."""
for i in range(5):
repo.record(_entry(message=f"e{i}"))
page_none = repo.query(ActivityLogFilters(), before_seq=None, limit=5)
page_large = repo.query(ActivityLogFilters(), before_seq=999_999, limit=5)
ids_none = {e.id for e in page_none}
ids_large = {e.id for e in page_large}
assert ids_none == ids_large
def test_page_boundary_limit_equals_row_count(self, repo: ActivityLogRepository) -> None:
"""When limit == total rows, one page covers all rows and a second page is empty."""
for i in range(5):
repo.record(_entry(message=f"e{i}"))
page1 = repo.query(ActivityLogFilters(), limit=5)
assert len(page1) == 5
first_seq = _get_seq(repo, page1[0].id)
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=5)
assert page2 == []
def test_ordering_contract_page_zero_is_smallest_seq(self, repo: ActivityLogRepository) -> None:
"""Within a page, page[0] must have the smallest seq (ascending chrono order).
The acceptance criteria state: 'The smallest seq on a page is page[0]'s seq —
pass that as before_seq for the next page.'"""
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
for i in range(6):
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
page = repo.query(ActivityLogFilters(), limit=6)
seqs = [_get_seq(repo, e.id) for e in page]
assert seqs == sorted(
seqs
), "page must be in ascending seq order (page[0] is oldest/smallest seq)"
def test_no_duplicates_across_full_walk(self, repo: ActivityLogRepository) -> None:
"""Walking the entire table page by page yields each row exactly once."""
total = 11
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
for i in range(total):
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
all_ids: list[str] = []
before_seq: int | None = None
limit = 4
while True:
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
if not page:
break
all_ids.extend(e.id for e in page)
before_seq = _get_seq(repo, page[0].id)
assert len(all_ids) == total, "Total rows from all pages must equal inserted count"
assert len(set(all_ids)) == total, "No duplicate IDs across pages"
def test_no_gaps_across_full_walk(self, repo: ActivityLogRepository) -> None:
"""Walking the entire table page by page with limit=1 yields every row."""
total = 7
for i in range(total):
repo.record(_entry(message=f"e{i}"))
all_ids: list[str] = []
before_seq: int | None = None
while True:
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=1)
if not page:
break
all_ids.append(page[0].id)
before_seq = _get_seq(repo, page[0].id)
assert len(all_ids) == total
def test_many_rows_same_ts_no_duplicates_or_gaps(self, repo: ActivityLogRepository) -> None:
"""With many identical timestamps, pagination via seq prevents any dup or gap."""
same_ts = datetime(2026, 5, 1, 10, 0, 0, tzinfo=timezone.utc)
count = 9
for i in range(count):
repo.record(_entry(ts=same_ts, message=f"same-ts {i}"))
all_ids: list[str] = []
before_seq: int | None = None
limit = 4
while True:
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
if not page:
break
all_ids.extend(e.id for e in page)
before_seq = _get_seq(repo, page[0].id)
assert len(all_ids) == count
assert len(set(all_ids)) == count, "Duplicates found in same-ts pagination walk"
def test_next_page_cursor_is_page_zero_seq(self, repo: ActivityLogRepository) -> None:
"""The documented contract: pass page[0].seq as before_seq for next page.
Verify the next page does NOT overlap with the current page."""
for i in range(6):
repo.record(_entry(message=f"e{i}"))
page1 = repo.query(ActivityLogFilters(), limit=3)
cursor = _get_seq(repo, page1[0].id) # page[0] = smallest seq on page
page2 = repo.query(ActivityLogFilters(), before_seq=cursor, limit=3)
ids1 = {e.id for e in page1}
ids2 = {e.id for e in page2}
assert ids1.isdisjoint(ids2), "Pages overlap — cursor contract broken"
# ---------------------------------------------------------------------------
# 3. Prune edge cases
# ---------------------------------------------------------------------------
class TestPruneEdgeCases:
def test_prune_before_ts_only_no_max_entries(self, repo: ActivityLogRepository) -> None:
"""before_ts alone removes only old rows; recent rows untouched."""
cutoff = datetime(2026, 3, 1, tzinfo=timezone.utc)
repo.record(_entry(ts=cutoff - timedelta(days=2), message="old"))
repo.record(_entry(ts=cutoff + timedelta(days=1), message="new"))
deleted = repo.prune(before_ts=cutoff)
assert deleted == 1
remaining = repo.query(ActivityLogFilters(), limit=10)
assert len(remaining) == 1
assert remaining[0].message == "new"
def test_prune_max_entries_only_no_before_ts(self, repo: ActivityLogRepository) -> None:
"""max_entries alone trims to N newest; no age filter applied."""
for i in range(6):
repo.record(_entry(message=f"e{i}"))
deleted = repo.prune(max_entries=2)
assert deleted == 4
assert repo.count() == 2
def test_prune_max_entries_zero_deletes_all(self, repo: ActivityLogRepository) -> None:
"""max_entries=0 means keep nothing — all rows deleted."""
for _ in range(5):
repo.record(_entry())
deleted = repo.prune(max_entries=0)
assert deleted == 5
assert repo.count() == 0
def test_prune_max_entries_larger_than_count_is_noop(self, repo: ActivityLogRepository) -> None:
"""max_entries > actual count must not delete anything."""
for i in range(3):
repo.record(_entry(message=f"e{i}"))
deleted = repo.prune(max_entries=100)
assert deleted == 0
assert repo.count() == 3
def test_prune_keeps_newest_entries_by_seq(self, repo: ActivityLogRepository) -> None:
"""max_entries prune MUST keep the rows with the HIGHEST seq values."""
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
all_ids = []
for i in range(6):
e = _entry(ts=base + timedelta(seconds=i), message=f"e{i}")
all_ids.append(e.id)
repo.record(e)
# keep only 2
repo.prune(max_entries=2)
remaining = repo.query(ActivityLogFilters(), limit=10)
remaining_ids = {r.id for r in remaining}
# Must keep the last two inserted (highest seq = newest)
assert all_ids[-1] in remaining_ids, "Newest entry (e5) must be kept"
assert all_ids[-2] in remaining_ids, "Second newest entry (e4) must be kept"
# Oldest must be gone
assert all_ids[0] not in remaining_ids, "Oldest entry (e0) must be pruned"
def test_prune_both_returns_sum_of_deleted(self, repo: ActivityLogRepository) -> None:
"""prune(before_ts, max_entries) returns the TOTAL rows deleted by both steps."""
base = datetime(2026, 4, 1, tzinfo=timezone.utc)
# 4 old entries (before base)
for i in range(4):
repo.record(_entry(ts=base - timedelta(hours=i + 1), message=f"old{i}"))
# 4 new entries (after base)
for i in range(4):
repo.record(_entry(ts=base + timedelta(hours=i + 1), message=f"new{i}"))
# prune old, then keep only 2 new
deleted = repo.prune(before_ts=base, max_entries=2)
# 4 old + 2 of the 4 new = 6 total
assert deleted == 6
assert repo.count() == 2
def test_prune_no_args_is_noop(self, repo: ActivityLogRepository) -> None:
"""prune() with no args should delete 0 rows."""
for _ in range(3):
repo.record(_entry())
deleted = repo.prune()
assert deleted == 0
assert repo.count() == 3
def test_prune_before_ts_boundary_is_exclusive(self, repo: ActivityLogRepository) -> None:
"""prune(before_ts=X) uses strict < X; a row exactly at X must survive."""
ts = datetime(2026, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
repo.record(_entry(ts=ts, message="exact boundary"))
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
deleted = repo.prune(before_ts=ts)
assert deleted == 1 # only "before" deleted
remaining = {r.message for r in repo.query(ActivityLogFilters(), limit=10)}
assert "exact boundary" in remaining
assert "before" not in remaining
# ---------------------------------------------------------------------------
# 4. Filter combination edge cases
# ---------------------------------------------------------------------------
class TestFilterCombinationEdges:
def test_multiple_filters_are_anded(self, repo: ActivityLogRepository) -> None:
"""All non-None filters must be AND-ed together, not OR-ed."""
repo.record(
_entry(actor="alice", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
)
repo.record(
_entry(actor="alice", category=ActivityCategory.DEVICE, severity=ActivitySeverity.INFO)
)
repo.record(
_entry(actor="bob", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
)
results = repo.query(
ActivityLogFilters(
actor="alice",
categories=[ActivityCategory.AUTH],
severities=[ActivitySeverity.ERROR],
),
limit=10,
)
assert len(results) == 1
r = results[0]
assert r.actor == "alice"
assert r.category == ActivityCategory.AUTH
assert r.severity == ActivitySeverity.ERROR
def test_empty_categories_sequence_means_no_restriction(
self, repo: ActivityLogRepository
) -> None:
"""An empty list for categories must behave the same as None (no restriction).
The acceptance criteria state empty sequence == None for this dimension."""
repo.record(_entry(category=ActivityCategory.AUTH))
repo.record(_entry(category=ActivityCategory.DEVICE))
# empty list
results_empty = repo.query(ActivityLogFilters(categories=[]), limit=10)
# None
results_none = repo.query(ActivityLogFilters(categories=None), limit=10)
assert len(results_empty) == len(results_none), (
"categories=[] and categories=None should behave identically (no restriction); "
f"got {len(results_empty)} vs {len(results_none)}"
)
def test_empty_severities_sequence_means_no_restriction(
self, repo: ActivityLogRepository
) -> None:
"""An empty list for severities must behave the same as None (no restriction)."""
repo.record(_entry(severity=ActivitySeverity.INFO))
repo.record(_entry(severity=ActivitySeverity.ERROR))
results_empty = repo.query(ActivityLogFilters(severities=[]), limit=10)
results_none = repo.query(ActivityLogFilters(severities=None), limit=10)
assert len(results_empty) == len(
results_none
), "severities=[] and severities=None should behave identically"
def test_since_is_inclusive(self, repo: ActivityLogRepository) -> None:
"""since is an INCLUSIVE lower bound: ts >= since."""
ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
repo.record(_entry(ts=ts, message="at boundary"))
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
results = repo.query(ActivityLogFilters(since=ts), limit=10)
messages = {r.message for r in results}
assert "at boundary" in messages, "since boundary row (ts == since) must be included"
assert "before" not in messages
def test_until_is_inclusive(self, repo: ActivityLogRepository) -> None:
"""until is an INCLUSIVE upper bound: ts <= until."""
ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
repo.record(_entry(ts=ts, message="at boundary"))
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
results = repo.query(ActivityLogFilters(until=ts), limit=10)
messages = {r.message for r in results}
assert "at boundary" in messages, "until boundary row (ts == until) must be included"
assert "after" not in messages
def test_since_and_until_define_closed_range(self, repo: ActivityLogRepository) -> None:
"""Combining since + until must keep rows in [since, until] inclusive."""
base = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=base - timedelta(hours=1), message="out_before"))
repo.record(_entry(ts=base, message="in_start"))
repo.record(_entry(ts=base + timedelta(hours=1), message="in_middle"))
repo.record(_entry(ts=base + timedelta(hours=2), message="in_end"))
repo.record(_entry(ts=base + timedelta(hours=3), message="out_after"))
results = repo.query(
ActivityLogFilters(since=base, until=base + timedelta(hours=2)),
limit=10,
)
messages = {r.message for r in results}
assert {"in_start", "in_middle", "in_end"} == messages
def test_tz_aware_datetime_round_trip(self, repo: ActivityLogRepository) -> None:
"""UTC-aware datetimes must survive storage and come back tz-aware."""
ts = datetime(2026, 1, 15, 8, 30, 0, tzinfo=timezone.utc)
e = _entry(ts=ts)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.ts.tzinfo is not None, "Returned ts must be tz-aware"
assert got.ts.utcoffset().total_seconds() == 0, "Returned ts must be UTC" # type: ignore[union-attr]
assert got.ts == ts
def test_count_none_equals_count_empty_filters(self, repo: ActivityLogRepository) -> None:
"""count(None) == count(ActivityLogFilters()) per acceptance criteria."""
for _ in range(4):
repo.record(_entry())
assert repo.count(None) == repo.count(ActivityLogFilters())
# ---------------------------------------------------------------------------
# 5. Codec / data integrity
# ---------------------------------------------------------------------------
class TestCodecDataIntegrity:
def test_metadata_nested_dict_round_trip(self, repo: ActivityLogRepository) -> None:
"""Deeply nested metadata survives JSON round-trip."""
meta = {
"level1": {
"level2": {"level3": [1, 2, 3]},
"list": [{"a": True}, {"b": None}],
},
"count": 42,
}
e = _entry(metadata=meta)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.metadata == meta
def test_metadata_unicode_round_trip(self, repo: ActivityLogRepository) -> None:
"""Unicode (including emoji and CJK) in metadata survives storage."""
meta = {"label": "こんにちは", "emoji": "🎉", "arrow": "→"}
e = _entry(metadata=meta)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.metadata == meta
def test_metadata_empty_dict_round_trip(self, repo: ActivityLogRepository) -> None:
"""An empty {} metadata must come back as {} not None."""
e = _entry(metadata={})
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.metadata == {}
assert isinstance(got.metadata, dict)
def test_metadata_json_special_chars(self, repo: ActivityLogRepository) -> None:
"""Metadata with JSON-special characters (backslash, quotes) round-trips correctly."""
meta = {"path": "C:\\Users\\test", "quoted": '"hello"', "newline": "line1\nline2"}
e = _entry(metadata=meta)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.metadata == meta
def test_entity_type_none_vs_empty_string(self, repo: ActivityLogRepository) -> None:
"""None entity_type must come back as None (not empty string '').
These are semantically different — None means 'not applicable'."""
e = _entry(entity_type=None, entity_id=None, entity_name=None)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
# Must be exactly None, not ""
assert got.entity_type is None
assert got.entity_id is None
assert got.entity_name is None
def test_ts_microsecond_precision_preserved(self, repo: ActivityLogRepository) -> None:
"""Microsecond component of ts must survive the isoformat() round-trip."""
ts = datetime(2026, 6, 9, 12, 34, 56, 789012, tzinfo=timezone.utc)
e = _entry(ts=ts)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.ts == ts, f"Expected {ts!r}, got {got.ts!r} — microsecond precision may be lost"
def test_to_row_does_not_include_seq(self) -> None:
"""to_row() must NOT include 'seq' (it's DB-assigned)."""
e = _entry()
row = e.to_row()
assert "seq" not in row, "to_row() must not include seq — it is DB-assigned"
def test_to_row_has_exactly_11_keys(self) -> None:
"""Acceptance criteria: to_row() returns 11 keys."""
e = _entry()
row = e.to_row()
expected_keys = {
"id",
"ts",
"category",
"action",
"severity",
"actor",
"entity_type",
"entity_id",
"entity_name",
"message",
"metadata",
}
assert set(row.keys()) == expected_keys
def test_from_row_ignores_seq_column(self) -> None:
"""from_row() must not raise or fail when 'seq' is present in the dict."""
e = _entry()
row = e.to_row()
row["seq"] = 42 # inject seq as if from DB
recovered = ActivityLogEntry.from_row(row)
assert recovered.id == e.id
def test_from_row_naive_ts_becomes_utc_aware(self) -> None:
"""If a stored ts has no timezone offset (legacy row), from_row must attach UTC."""
e = _entry()
row = e.to_row()
# Replace ts with a naive ISO string (no UTC offset) to simulate a legacy row
row["ts"] = datetime(2026, 1, 1, 10, 0, 0).isoformat() # naive
recovered = ActivityLogEntry.from_row(row)
assert recovered.ts.tzinfo is not None, "Legacy naive ts must become tz-aware (UTC)"
def test_metadata_with_numeric_keys_round_trip(self, repo: ActivityLogRepository) -> None:
"""JSON only supports string keys; numeric keys are coerced to strings."""
# This tests that the codec doesn't silently crash on non-string keys
# (Python allows them but JSON does not — json.dumps coerces to string)
meta = {1: "one", "two": 2}
e = _entry(metadata=meta) # type: ignore[arg-type]
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
# json.dumps coerces int key 1 → "1"
assert "1" in got.metadata or 1 in got.metadata
def test_all_category_values_round_trip(self, repo: ActivityLogRepository) -> None:
"""Every ActivityCategory constant must survive storage without corruption."""
for cat in ActivityCategory.ALL:
repo.record(_entry(category=cat, message=f"cat:{cat}"))
for cat in ActivityCategory.ALL:
results = repo.query(ActivityLogFilters(categories=[cat]), limit=10)
assert len(results) == 1
assert results[0].category == cat
def test_all_severity_values_round_trip(self, repo: ActivityLogRepository) -> None:
"""Every ActivitySeverity constant must survive storage without corruption."""
for sev in ActivitySeverity.ALL:
repo.record(_entry(severity=sev, message=f"sev:{sev}"))
for sev in ActivitySeverity.ALL:
results = repo.query(ActivityLogFilters(severities=[sev]), limit=10)
assert len(results) == 1
assert results[0].severity == sev
# ---------------------------------------------------------------------------
# 6. Migration idempotency (additional structural checks)
# ---------------------------------------------------------------------------
class TestMigrationIdempotencyExtended:
def test_table_has_autoincrement_seq(self, tmp_db: Database) -> None:
"""The seq column must be INTEGER PRIMARY KEY AUTOINCREMENT — never reuse deleted seqs."""
repo = ActivityLogRepository(tmp_db)
e1 = _entry(message="first")
e2 = _entry(message="second")
repo.record(e1)
repo.record(e2)
seq1 = _get_seq(repo, e1.id)
seq2 = _get_seq(repo, e2.id)
assert seq2 > seq1, "AUTOINCREMENT must produce monotonically increasing seqs"
# After clear, a new record must get a seq higher than the previous max
repo.clear()
e3 = _entry(message="third")
repo.record(e3)
seq3 = _get_seq(repo, e3.id)
assert seq3 > seq2, "AUTOINCREMENT must not reuse seqs after DELETE"
def test_all_expected_indexes_present(self, tmp_db: Database) -> None:
"""All 5 indexes declared in the acceptance criteria must exist."""
ActivityLogRepository(tmp_db)
cursor = tmp_db.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
)
index_names = {row["name"] for row in cursor.fetchall()}
required = {
"idx_activity_log_ts_seq",
"idx_activity_log_category",
"idx_activity_log_severity",
"idx_activity_log_actor",
"idx_activity_log_entity",
}
missing = required - index_names
assert not missing, f"Missing indexes: {missing}"
def test_id_column_has_unique_constraint(self, tmp_db: Database) -> None:
"""Inserting duplicate id must raise IntegrityError."""
import sqlite3 as sqlite_module
repo = ActivityLogRepository(tmp_db)
fixed_id = f"al_{uuid.uuid4().hex[:8]}"
repo.record(_entry(id=fixed_id, message="first"))
# NOTE: listing Exception makes this match any error, not only sqlite3.IntegrityError.
with pytest.raises((Exception, sqlite_module.IntegrityError)):
repo.record(_entry(id=fixed_id, message="duplicate id"))
def test_migration_name_is_002_add_activity_log(self, tmp_db: Database) -> None:
"""The migration name must exactly match '002_add_activity_log'."""
from ledgrab.storage.data_migrations import AddActivityLogTableMigration
migration = AddActivityLogTableMigration()
assert migration.name == "002_add_activity_log"
def test_migration_is_second_in_all_migrations(self) -> None:
"""AddActivityLogTableMigration must be at index [1] in ALL_MIGRATIONS."""
from ledgrab.storage.data_migrations import (
ALL_MIGRATIONS,
AddActivityLogTableMigration,
)
assert len(ALL_MIGRATIONS) >= 2, "ALL_MIGRATIONS must have at least 2 entries"
assert isinstance(
ALL_MIGRATIONS[1], AddActivityLogTableMigration
), "AddActivityLogTableMigration must be the second migration (index 1)"
def test_apply_twice_is_noop_no_error(self, tmp_db: Database) -> None:
"""Calling apply() on the connection twice must not raise — IF NOT EXISTS ensures this."""
from ledgrab.storage.data_migrations import AddActivityLogTableMigration
migration = AddActivityLogTableMigration()
with tmp_db.transaction() as conn:
migration.apply(conn)
# Second apply — must not raise
with tmp_db.transaction() as conn:
migration.apply(conn)
# Table should still be accessible
cursor = tmp_db.execute("SELECT COUNT(*) AS cnt FROM activity_log")
assert cursor.fetchone()["cnt"] == 0
# ---------------------------------------------------------------------------
# 7. iter_export vs query consistency
# ---------------------------------------------------------------------------
class TestIterExportConsistency:
def test_iter_export_empty_table_yields_nothing(self, repo: ActivityLogRepository) -> None:
"""iter_export on empty table must yield nothing, not raise."""
exported = list(repo.iter_export())
assert exported == []
def test_iter_export_matches_query_results(self, repo: ActivityLogRepository) -> None:
"""iter_export(filters) and query(filters) must return the same entries."""
for i in range(8):
cat = ActivityCategory.AUTH if i % 2 == 0 else ActivityCategory.DEVICE
repo.record(_entry(category=cat, message=f"e{i}"))
filters = ActivityLogFilters(categories=[ActivityCategory.AUTH])
exported_ids = {e.id for e in repo.iter_export(filters)}
queried_ids = {e.id for e in repo.query(filters, limit=100)}
assert (
exported_ids == queried_ids
), "iter_export and query must return the same set of entries for the same filters"
def test_iter_export_none_filter_yields_all(self, repo: ActivityLogRepository) -> None:
"""iter_export(None) must yield all rows (same as query with no filter)."""
for i in range(5):
repo.record(_entry(message=f"e{i}"))
all_exported = list(repo.iter_export(None))
all_queried = repo.query(ActivityLogFilters(), limit=100)
assert len(all_exported) == len(all_queried) == 5
def test_iter_export_ascending_seq_order(self, repo: ActivityLogRepository) -> None:
"""iter_export must yield rows in ascending seq order (oldest first)."""
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
for i in range(5):
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
exported = list(repo.iter_export())
seqs = [_get_seq(repo, e.id) for e in exported]
assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order"
def test_iter_export_respects_message_like_filter(self, repo: ActivityLogRepository) -> None:
"""iter_export should honour message_like just as query does."""
repo.record(_entry(message="found: hello world"))
repo.record(_entry(message="nothing relevant here"))
repo.record(_entry(message="also found: hello there"))
exported = list(repo.iter_export(ActivityLogFilters(message_like="found")))
assert len(exported) == 2
assert all("found" in e.message for e in exported)
def test_iter_export_is_lazy_generator(self, repo: ActivityLogRepository) -> None:
"""iter_export must return a generator (lazy), not a list."""
import types
for _ in range(3):
repo.record(_entry())
result = repo.iter_export()
assert isinstance(
result, types.GeneratorType
), "iter_export must return a generator for streaming — not a pre-loaded list"
@@ -0,0 +1,609 @@
"""Unit tests for ActivityLogRepository (Phase 1 — storage layer).
Coverage
--------
* round-trip: record + read back, including metadata JSON and UTC ts
* filter by each dimension: category, severity, actor, entity_type/id, date range, message_like
* keyset pagination stability with same-ts rows (seq tiebreaker)
* prune by age (before_ts) and by max_entries
* clear; count (filtered + unfiltered); export iterator
* migration idempotency: constructing the repo twice does not re-run the migration
"""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from ledgrab.storage.activity_log import (
ActivityCategory,
ActivityLogEntry,
ActivityLogFilters,
ActivitySeverity,
)
from ledgrab.storage.activity_log_repository import ActivityLogRepository
from ledgrab.storage.database import Database
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _now() -> datetime:
return datetime.now(timezone.utc)
_SENTINEL = object() # sentinel for "caller did not pass this kwarg"
def _entry(
*,
id: str | None = None,
ts: datetime | None = None,
category: str = ActivityCategory.ENTITY,
action: str = "entity.created",
severity: str = ActivitySeverity.INFO,
actor: str = "test_actor",
entity_type: str | None = "output_target",
entity_id: object = _SENTINEL,
entity_name: str | None = "My Target",
message: str = "Created output target",
metadata: dict | None = None,
) -> ActivityLogEntry:
"""Build a test ``ActivityLogEntry``.
``entity_id`` defaults to a random id when not supplied at all. Pass
``entity_id=None`` explicitly to get ``None`` stored in the entry.
"""
resolved_entity_id: str | None = (
f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id # type: ignore[assignment]
)
return ActivityLogEntry(
id=id or f"al_{uuid.uuid4().hex[:8]}",
ts=ts or _now(),
category=category,
action=action,
severity=severity,
actor=actor,
entity_type=entity_type,
entity_id=resolved_entity_id,
entity_name=entity_name,
message=message,
metadata=metadata if metadata is not None else {},
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def repo(tmp_db: Database) -> ActivityLogRepository:
"""Fresh ActivityLogRepository backed by a temp database."""
return ActivityLogRepository(tmp_db)
# ---------------------------------------------------------------------------
# Round-trip
# ---------------------------------------------------------------------------
class TestRoundTrip:
def test_record_and_read_back(self, repo: ActivityLogRepository) -> None:
e = _entry(message="Hello world", metadata={"key": "value", "n": 42})
repo.record(e)
page = repo.query(ActivityLogFilters(), limit=10)
assert len(page) == 1
got = page[0]
assert got.id == e.id
assert got.category == e.category
assert got.action == e.action
assert got.severity == e.severity
assert got.actor == e.actor
assert got.entity_type == e.entity_type
assert got.entity_id == e.entity_id
assert got.entity_name == e.entity_name
assert got.message == e.message
def test_metadata_json_round_trip(self, repo: ActivityLogRepository) -> None:
meta = {"device": "wled_01", "led_count": 150, "nested": {"x": True}}
e = _entry(metadata=meta)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.metadata == meta
def test_utc_timestamp_preserved(self, repo: ActivityLogRepository) -> None:
ts = datetime(2026, 1, 15, 12, 30, 45, tzinfo=timezone.utc)
e = _entry(ts=ts)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
# Should round-trip to the same UTC moment; compare directly so a wrong offset is not masked
assert got.ts == ts
def test_none_optional_fields_preserved(self, repo: ActivityLogRepository) -> None:
e = _entry(entity_type=None, entity_id=None, entity_name=None)
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.entity_type is None
assert got.entity_id is None
assert got.entity_name is None
def test_empty_metadata_default(self, repo: ActivityLogRepository) -> None:
e = _entry(metadata={})
repo.record(e)
got = repo.query(ActivityLogFilters(), limit=1)[0]
assert got.metadata == {}
# ---------------------------------------------------------------------------
# Filters
# ---------------------------------------------------------------------------
class TestFilters:
def test_filter_by_category(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(category=ActivityCategory.AUTH, action="auth.rejected"))
repo.record(_entry(category=ActivityCategory.DEVICE, action="device.connected"))
repo.record(_entry(category=ActivityCategory.ENTITY, action="entity.deleted"))
results = repo.query(ActivityLogFilters(categories=[ActivityCategory.AUTH]), limit=10)
assert len(results) == 1
assert results[0].category == ActivityCategory.AUTH
def test_filter_by_multiple_categories(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(category=ActivityCategory.AUTH))
repo.record(_entry(category=ActivityCategory.DEVICE))
repo.record(_entry(category=ActivityCategory.SYSTEM))
results = repo.query(
ActivityLogFilters(categories=[ActivityCategory.AUTH, ActivityCategory.DEVICE]),
limit=10,
)
assert len(results) == 2
cats = {r.category for r in results}
assert cats == {ActivityCategory.AUTH, ActivityCategory.DEVICE}
def test_filter_by_severity(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(severity=ActivitySeverity.INFO))
repo.record(_entry(severity=ActivitySeverity.WARNING))
repo.record(_entry(severity=ActivitySeverity.ERROR))
results = repo.query(ActivityLogFilters(severities=[ActivitySeverity.ERROR]), limit=10)
assert len(results) == 1
assert results[0].severity == ActivitySeverity.ERROR
def test_filter_by_multiple_severities(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(severity=ActivitySeverity.INFO))
repo.record(_entry(severity=ActivitySeverity.WARNING))
repo.record(_entry(severity=ActivitySeverity.ERROR))
results = repo.query(
ActivityLogFilters(severities=[ActivitySeverity.WARNING, ActivitySeverity.ERROR]),
limit=10,
)
assert len(results) == 2
assert {r.severity for r in results} == {ActivitySeverity.WARNING, ActivitySeverity.ERROR}
def test_filter_by_actor(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(actor="alice"))
repo.record(_entry(actor="bob"))
repo.record(_entry(actor="alice"))
results = repo.query(ActivityLogFilters(actor="alice"), limit=10)
assert len(results) == 2
assert all(r.actor == "alice" for r in results)
def test_filter_by_entity_type(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(entity_type="output_target"))
repo.record(_entry(entity_type="device"))
repo.record(_entry(entity_type="output_target"))
results = repo.query(ActivityLogFilters(entity_type="output_target"), limit=10)
assert len(results) == 2
def test_filter_by_entity_id(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(entity_id="ot_aabbccdd"))
repo.record(_entry(entity_id="ot_11223344"))
repo.record(_entry(entity_id="ot_aabbccdd"))
results = repo.query(ActivityLogFilters(entity_id="ot_aabbccdd"), limit=10)
assert len(results) == 2
def test_filter_by_since(self, repo: ActivityLogRepository) -> None:
base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=base - timedelta(hours=2), message="old"))
repo.record(_entry(ts=base, message="boundary"))
repo.record(_entry(ts=base + timedelta(hours=1), message="new"))
results = repo.query(ActivityLogFilters(since=base), limit=10)
assert len(results) == 2
messages = {r.message for r in results}
assert "old" not in messages
def test_filter_by_until(self, repo: ActivityLogRepository) -> None:
base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=base - timedelta(hours=1), message="old"))
repo.record(_entry(ts=base, message="boundary"))
repo.record(_entry(ts=base + timedelta(hours=2), message="new"))
results = repo.query(ActivityLogFilters(until=base), limit=10)
assert len(results) == 2
messages = {r.message for r in results}
assert "new" not in messages
def test_filter_message_like_substring(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(message="Created output target MyStrip"))
repo.record(_entry(message="Deleted device sensor-01"))
repo.record(_entry(message="Updated output target MyStrip"))
results = repo.query(ActivityLogFilters(message_like="output target"), limit=10)
assert len(results) == 2
def test_filter_message_like_escapes_percent(self, repo: ActivityLogRepository) -> None:
"""A literal % in message_like should not act as a SQL wildcard."""
repo.record(_entry(message="100% done"))
repo.record(_entry(message="partial done"))
results = repo.query(ActivityLogFilters(message_like="100%"), limit=10)
assert len(results) == 1
assert results[0].message == "100% done"
def test_combined_filters(self, repo: ActivityLogRepository) -> None:
repo.record(
_entry(
actor="alice",
category=ActivityCategory.ENTITY,
severity=ActivitySeverity.INFO,
)
)
repo.record(
_entry(
actor="alice",
category=ActivityCategory.AUTH,
severity=ActivitySeverity.WARNING,
)
)
repo.record(
_entry(
actor="bob",
category=ActivityCategory.ENTITY,
severity=ActivitySeverity.INFO,
)
)
results = repo.query(
ActivityLogFilters(
actor="alice",
categories=[ActivityCategory.ENTITY],
severities=[ActivitySeverity.INFO],
),
limit=10,
)
assert len(results) == 1
assert results[0].actor == "alice"
assert results[0].category == ActivityCategory.ENTITY
# ---------------------------------------------------------------------------
# Keyset pagination
# ---------------------------------------------------------------------------
class TestKeysetPagination:
def test_basic_pagination(self, repo: ActivityLogRepository) -> None:
"""Records returned across two pages cover the full set without overlap."""
for i in range(10):
repo.record(_entry(message=f"entry {i}"))
page1 = repo.query(ActivityLogFilters(), limit=4)
assert len(page1) == 4
# query() returns each page in ascending seq order, so page1[0] holds the
# smallest seq on the page. Pass that seq as before_seq to fetch the next
# (older) page.
first_seq = self._get_seq(repo, page1[0].id)
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=4)
assert len(page2) == 4
ids1 = {e.id for e in page1}
ids2 = {e.id for e in page2}
assert ids1.isdisjoint(ids2), "Pages must not overlap"
page3 = repo.query(
ActivityLogFilters(),
before_seq=self._get_seq(repo, page2[0].id),
limit=4,
)
assert len(page3) == 2 # 10 total; 4 + 4 + 2
def test_same_ts_stability(self, repo: ActivityLogRepository) -> None:
"""Rows with identical ts are ordered by seq, not ts — no duplicates across pages."""
# Insert 6 rows all sharing the exact same timestamp
same_ts = datetime(2026, 3, 10, 15, 0, 0, tzinfo=timezone.utc)
entries = [_entry(ts=same_ts, message=f"same-ts {i}") for i in range(6)]
for e in entries:
repo.record(e)
page1 = repo.query(ActivityLogFilters(), limit=3)
first_seq = self._get_seq(repo, page1[0].id)
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=3)
ids1 = {e.id for e in page1}
ids2 = {e.id for e in page2}
assert ids1.isdisjoint(ids2), "Same-ts rows leaked across page boundary"
assert ids1 | ids2 == {e.id for e in entries}, "Pages must cover every row exactly once"
def test_empty_page_at_end(self, repo: ActivityLogRepository) -> None:
"""Requesting a page beyond the last entry returns an empty list."""
for i in range(3):
repo.record(_entry(message=f"e{i}"))
page1 = repo.query(ActivityLogFilters(), limit=3)
assert len(page1) == 3
first_seq = self._get_seq(repo, page1[0].id)
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=3)
assert page2 == []
@staticmethod
def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int:
"""Helper: retrieve the seq for an entry by its application id."""
cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,))
row = cursor.fetchone()
assert row is not None, f"No row found for id={entry_id!r}"
return int(row["seq"])
# ---------------------------------------------------------------------------
# Prune
# ---------------------------------------------------------------------------
class TestPrune:
def test_prune_by_age(self, repo: ActivityLogRepository) -> None:
base = datetime(2026, 2, 1, 12, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=base - timedelta(days=10), message="old1"))
repo.record(_entry(ts=base - timedelta(days=5), message="old2"))
repo.record(_entry(ts=base, message="boundary"))
repo.record(_entry(ts=base + timedelta(days=1), message="new"))
# Prune everything strictly older than base
deleted = repo.prune(before_ts=base)
assert deleted == 2
remaining = repo.query(ActivityLogFilters(), limit=20)
messages = {r.message for r in remaining}
assert "old1" not in messages
assert "old2" not in messages
assert "boundary" in messages
assert "new" in messages
def test_prune_by_max_entries(self, repo: ActivityLogRepository) -> None:
for i in range(10):
repo.record(_entry(message=f"entry {i}"))
deleted = repo.prune(max_entries=3)
assert deleted == 7
assert repo.count() == 3
def test_prune_keeps_newest_on_max_entries(self, repo: ActivityLogRepository) -> None:
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
ids = []
for i in range(5):
e = _entry(ts=base + timedelta(hours=i), message=f"entry {i}")
ids.append(e.id)
repo.record(e)
# Keep only the 2 newest
repo.prune(max_entries=2)
remaining = repo.query(ActivityLogFilters(), limit=10)
remaining_ids = {r.id for r in remaining}
# The 2 newest are the last 2 inserted (highest seq)
assert ids[3] in remaining_ids
assert ids[4] in remaining_ids
assert ids[0] not in remaining_ids
def test_prune_both_predicates(self, repo: ActivityLogRepository) -> None:
base = datetime(2026, 2, 1, 0, 0, 0, tzinfo=timezone.utc)
# Insert 6 entries: 3 old, 3 recent
for i in range(3):
repo.record(_entry(ts=base - timedelta(days=i + 1), message=f"old{i}"))
for i in range(3):
repo.record(_entry(ts=base + timedelta(hours=i), message=f"new{i}"))
# Prune old entries AND keep at most 2 of the remaining
deleted = repo.prune(before_ts=base, max_entries=2)
# 3 age-pruned + 1 count-pruned = 4
assert deleted == 4
assert repo.count() == 2
def test_prune_max_entries_zero_clears_all(self, repo: ActivityLogRepository) -> None:
for _ in range(5):
repo.record(_entry())
repo.prune(max_entries=0)
assert repo.count() == 0
def test_prune_no_op_when_below_max(self, repo: ActivityLogRepository) -> None:
for _ in range(3):
repo.record(_entry())
deleted = repo.prune(max_entries=10)
assert deleted == 0
assert repo.count() == 3
# ---------------------------------------------------------------------------
# Clear
# ---------------------------------------------------------------------------
class TestClear:
def test_clear_returns_row_count(self, repo: ActivityLogRepository) -> None:
for _ in range(5):
repo.record(_entry())
deleted = repo.clear()
assert deleted == 5
def test_clear_empties_table(self, repo: ActivityLogRepository) -> None:
for _ in range(3):
repo.record(_entry())
repo.clear()
assert repo.count() == 0
def test_clear_on_empty_table(self, repo: ActivityLogRepository) -> None:
deleted = repo.clear()
assert deleted == 0
# ---------------------------------------------------------------------------
# Count
# ---------------------------------------------------------------------------
class TestCount:
def test_count_all(self, repo: ActivityLogRepository) -> None:
for _ in range(7):
repo.record(_entry())
assert repo.count() == 7
def test_count_filtered(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(category=ActivityCategory.AUTH))
repo.record(_entry(category=ActivityCategory.DEVICE))
repo.record(_entry(category=ActivityCategory.AUTH))
n = repo.count(ActivityLogFilters(categories=[ActivityCategory.AUTH]))
assert n == 2
def test_count_empty(self, repo: ActivityLogRepository) -> None:
assert repo.count() == 0
def test_count_no_match(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(category=ActivityCategory.ENTITY))
n = repo.count(ActivityLogFilters(categories=[ActivityCategory.AUTH]))
assert n == 0
# ---------------------------------------------------------------------------
# Export iterator
# ---------------------------------------------------------------------------
class TestExportIterator:
def test_iter_export_yields_all_rows(self, repo: ActivityLogRepository) -> None:
entries = [_entry(message=f"e{i}") for i in range(5)]
for e in entries:
repo.record(e)
exported = list(repo.iter_export())
assert len(exported) == 5
exported_ids = {e.id for e in exported}
assert exported_ids == {e.id for e in entries}
def test_iter_export_ascending_order(self, repo: ActivityLogRepository) -> None:
base = datetime(2026, 4, 1, tzinfo=timezone.utc)
for i in range(5):
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
exported = list(repo.iter_export())
seqs = [
repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (e.id,)).fetchone()["seq"]
for e in exported
]
assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order"
def test_iter_export_with_filter(self, repo: ActivityLogRepository) -> None:
repo.record(_entry(category=ActivityCategory.AUTH))
repo.record(_entry(category=ActivityCategory.ENTITY))
repo.record(_entry(category=ActivityCategory.AUTH))
exported = list(repo.iter_export(ActivityLogFilters(categories=[ActivityCategory.AUTH])))
assert len(exported) == 2
assert all(e.category == ActivityCategory.AUTH for e in exported)
def test_iter_export_streaming_not_all_in_memory(self, repo: ActivityLogRepository) -> None:
"""Verify iter_export is a generator (lazy), not a pre-loaded list."""
import types
for _ in range(3):
repo.record(_entry())
result = repo.iter_export()
assert isinstance(result, types.GeneratorType)
# ---------------------------------------------------------------------------
# Migration idempotency
# ---------------------------------------------------------------------------
class TestMigrationIdempotency:
def test_construct_repo_twice_is_noop(self, tmp_db: Database) -> None:
"""Creating two repos on the same DB does not re-run the migration."""
repo1 = ActivityLogRepository(tmp_db)
repo1.record(_entry(message="before second construction"))
# Second construction must not raise or re-apply the migration
repo2 = ActivityLogRepository(tmp_db)
assert repo2.count() == 1
# Confirm the migration name is recorded exactly once in data_migrations
cursor = tmp_db.execute(
"SELECT COUNT(*) AS cnt FROM data_migrations WHERE name = ?",
("002_add_activity_log",),
)
assert cursor.fetchone()["cnt"] == 1
def test_running_migrations_twice_is_noop(self, tmp_db: Database) -> None:
"""MigrationRunner.run is idempotent for AddActivityLogTableMigration."""
from ledgrab.storage.data_migrations import (
AddActivityLogTableMigration,
MigrationRunner,
)
runner = MigrationRunner(tmp_db)
migration = AddActivityLogTableMigration()
first_run = runner.run([migration])
assert len(first_run) == 1
assert first_run[0].name == "002_add_activity_log"
second_run = runner.run([migration])
assert second_run == [], "Second run must be a no-op"
def test_activity_log_table_exists_after_migration(self, tmp_db: Database) -> None:
"""The activity_log table is present after the migration runs."""
ActivityLogRepository(tmp_db)
cursor = tmp_db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='activity_log'"
)
assert cursor.fetchone() is not None
def test_activity_log_indexes_exist_after_migration(self, tmp_db: Database) -> None:
"""All declared indexes are present after migration."""
ActivityLogRepository(tmp_db)
cursor = tmp_db.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
)
index_names = {row["name"] for row in cursor.fetchall()}
expected = {
"idx_activity_log_ts_seq",
"idx_activity_log_category",
"idx_activity_log_severity",
"idx_activity_log_actor",
"idx_activity_log_entity",
}
assert expected.issubset(index_names)