ledgrab/plans/activity-log/phase-1-storage.md
alexei.dolgolyov 1ac4a0f66d feat(activity-log): phase 1 - storage model, migration, repository
- ActivityLogEntry dataclass + ActivityCategory/ActivitySeverity + ActivityLogFilters
- additive idempotent migration 002_add_activity_log (indexed activity_log table, seq keyset tiebreaker)
- ActivityLogRepository (record/query/count/prune/clear/iter_export), keyset pagination, parameterized SQL
- 102 unit + adversarial tests (SQL-injection, pagination, prune, codec, migration idempotency)
2026-06-09 17:40:37 +03:00

# Phase 1: Storage — model, migration, repository
**Status:** ✅ Done
**Parent plan:** [PLAN.md](./PLAN.md)
**Domain:** data
## Objective
Create the persistent foundation for the audit log: an `ActivityLogEntry` dataclass, an
additive idempotent SQLite migration that creates a dedicated indexed `activity_log` table,
and a purpose-built `ActivityLogRepository` (NOT `BaseSqliteStore`) supporting append,
keyset-paginated filtered query, count, time/count-based prune, and streaming export.
## Tasks
- [x] Create `server/src/ledgrab/storage/activity_log.py`:
  - `ActivityCategory` and `ActivitySeverity` string enums (or `Literal` unions used as
    constants). Categories: `auth`, `device`, `entity`, `capture`, `system`. Severities:
    `info`, `warning`, `error`.
  - `@dataclass ActivityLogEntry` with fields: `id: str` (e.g. `al_<uuid8>`), `ts: datetime`
    (UTC, server-assigned), `category: str`, `action: str`, `severity: str`, `actor: str`,
    `entity_type: str | None`, `entity_id: str | None`, `entity_name: str | None`,
    `message: str`, `metadata: dict` (small JSON; default empty). Provide `to_row()` /
    `from_row()` (column tuple/dict ↔ dataclass; `metadata` JSON-encoded; `ts` isoformat).
- [x] Add migration to `server/src/ledgrab/storage/data_migrations.py`:
  - New `DataMigration` subclass `AddActivityLogTableMigration` with unique `name`
    (next sequential id, e.g. `"NNN_add_activity_log"`; match existing naming) and
    `apply(conn)` creating `activity_log` with an INTEGER PRIMARY KEY AUTOINCREMENT `seq`
    (monotonic keyset tiebreaker) plus columns: `id TEXT UNIQUE NOT NULL`, `ts TEXT NOT NULL`,
    `category TEXT NOT NULL`, `action TEXT NOT NULL`, `severity TEXT NOT NULL`,
    `actor TEXT NOT NULL`, `entity_type TEXT`, `entity_id TEXT`, `entity_name TEXT`,
    `message TEXT NOT NULL`, `metadata TEXT NOT NULL DEFAULT '{}'`.
  - Indexes: `(ts DESC, seq DESC)` (primary keyset/sort), `category`, `severity`, `actor`,
    `(entity_type, entity_id)`. Use `CREATE TABLE/INDEX IF NOT EXISTS` for idempotency.
  - Append the instance to `ALL_MIGRATIONS` (never reorder existing entries).
- [x] Create `server/src/ledgrab/storage/activity_log_repository.py`:
  - `class ActivityLogRepository` taking `db: Database` (NOT subclassing `BaseSqliteStore`).
  - `record(entry: ActivityLogEntry) -> None`: single parameterized INSERT via
    `db.execute(...)` (auto-commit). The `seq` is DB-assigned. **Caller guarantees this runs
    on the event-loop thread** (see Phase 2; cross-thread marshaling lives in the recorder).
  - `query(filters: ActivityLogFilters, *, before_seq: int | None, limit: int) -> list[ActivityLogEntry]`:
    keyset pagination `WHERE seq < ? ORDER BY seq DESC LIMIT ?` plus optional filters:
    `category IN (...)`, `severity IN (...)`, `actor = ?`, `entity_type = ?`, `entity_id = ?`,
    `ts >= ?` / `ts <= ?`, `message LIKE ?` (free-text, `%q%`, escaped). All parameterized.
  - `count(filters) -> int`.
  - `prune(*, before_ts: datetime | None, max_entries: int | None) -> int`: delete rows older
    than `before_ts`, and/or trim to the newest `max_entries` by `seq`. Returns rows deleted.
  - `clear() -> int`: delete all rows (used by the API clear endpoint; the clear action is
    itself audited by the recorder, not here). Returns rows deleted.
  - `iter_export(filters) -> Iterator[ActivityLogEntry]`: cursor-based streaming for export
    (does not load all rows into memory).
  - Define a small `ActivityLogFilters` dataclass (all-optional fields) in the repository or
    `activity_log.py` and reuse it across query/count/export (`prune` takes its own
    `before_ts` / `max_entries` arguments instead).
- [x] Unit tests in `server/tests/storage/test_activity_log_repository.py`:
  - insert + read back round-trip (incl. metadata JSON, UTC ts);
  - filter by each dimension (category/severity/actor/entity/date/free-text);
  - keyset pagination stability across two pages with same-`ts` rows (seq tiebreaker);
  - prune by age and by max_entries;
  - clear; count; export iterator yields all matching rows;
  - migration idempotency (constructing the repo twice / running migrations twice is safe).
## Files to Modify/Create
- `server/src/ledgrab/storage/activity_log.py` — new: dataclass + enums + filters + row codec
- `server/src/ledgrab/storage/data_migrations.py` — modify: add migration + append to `ALL_MIGRATIONS`
- `server/src/ledgrab/storage/activity_log_repository.py` — new: repository
- `server/tests/storage/test_activity_log_repository.py` — new: unit tests
## Acceptance Criteria
- `activity_log` table + indexes created idempotently on startup (running migrations twice is a no-op).
- Query is keyset-paginated and index-backed; a 10k-row table never loads fully into memory.
- Pagination is stable when many rows share the same millisecond `ts` (uses `seq` tiebreaker).
- `prune` removes by age AND by max-entry cap; `clear` empties the table; `export` streams.
- All filters use parameterized SQL (no string interpolation of user input).
- New unit tests pass; `ruff check` clean; existing tests still green.
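
To make the "parameterized, escaped" requirement for the free-text filter concrete, here is a small sketch of one way to escape `LIKE` wildcards. The helper name `like_pattern` and the choice of backslash as the escape character are assumptions for illustration; the repository may escape differently.

```python
import sqlite3


def like_pattern(term: str) -> str:
    """Escape LIKE wildcards so user text matches literally, then wrap in %...%."""
    escaped = (
        term.replace("\\", "\\\\")  # escape the escape character first
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    return f"%{escaped}%"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE activity_log (message TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO activity_log (message) VALUES (?)",
    [("100% done",), ("100 items done",), ("x'; DROP TABLE activity_log; --",)],
)

# The user value only ever travels as a bound parameter, never inside the SQL text.
sql = "SELECT message FROM activity_log WHERE message LIKE ? ESCAPE '\\'"
matches = [row[0] for row in conn.execute(sql, (like_pattern("100%"),))]
```

Without the escaping step, a search for `100%` would also match `100 items done`, because `%` would act as a wildcard.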
## Notes
- Reference patterns: `storage/database.py` (`execute`, `transaction`, `get_setting`),
`storage/data_migrations.py` (`DataMigration`, `MigrationRunner`, `ALL_MIGRATIONS`),
`storage/sync_clock.py` (dataclass `to_dict`/`from_dict` style).
- 🔒 **Migration-safety addendum (data domain):** this migration is purely additive (new
table) — no rename, no field/key/file move, no data movement → no data-loss risk. Still
idempotent (`IF NOT EXISTS`). Rollback = drop the table; no user data is transformed.
- Do NOT wire the repository into `main.py` or `dependencies.py` here — that is Phase 2.
- `Database`'s connection is created with the existing threading model; the repository must
not assume it can be called from arbitrary threads. Thread marshaling is Phase 2's job.
## Review Checklist
- [x] All tasks completed
- [x] Code follows project conventions (dataclass codec style, migration naming)
- [x] No unintended side effects (no startup wiring yet)
- [x] Build passes (ruff + pytest)
- [x] Tests pass (new + existing)
## Handoff to Next Phase
### ActivityLogEntry — final field list and dict shape
```python
@dataclass
class ActivityLogEntry:
    id: str                  # "al_<uuid8>", caller-assigned
    ts: datetime             # UTC-aware; stored as ISO-8601 string in DB
    category: str            # ActivityCategory constant
    action: str              # verb-object label, e.g. "entity.created"
    severity: str            # ActivitySeverity constant
    actor: str               # API-key label or "system"
    message: str             # human-readable description
    entity_type: str | None  # e.g. "output_target"
    entity_id: str | None    # stable entity id
    entity_name: str | None  # name at time of event
    metadata: dict           # JSON-serialisable; default {}
```
`to_row()` returns a flat dict with 11 keys (same names); `metadata` is JSON string, `ts` is isoformat string. `seq` is NOT in `to_row()` — it is DB-assigned.
### ActivityLogFilters — shape (all fields optional, default None)
```python
@dataclass
class ActivityLogFilters:
    categories: Sequence[str] | None = None  # category IN (...)
    severities: Sequence[str] | None = None  # severity IN (...)
    actor: str | None = None                 # exact match
    entity_type: str | None = None           # exact match
    entity_id: str | None = None             # exact match
    since: datetime | None = None            # ts >= since
    until: datetime | None = None            # ts <= until
    message_like: str | None = None          # LIKE %value% (escaped)
```
### Migration name used
`"002_add_activity_log"` — appended as position [1] in `ALL_MIGRATIONS`.
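
For orientation, the schema this migration is specified to create can be approximated with the sketch below. The column list follows the task description; the index names and the standalone `apply` function are assumptions, since the real `DataMigration` subclass is not reproduced here.

```python
import sqlite3

# Index names are hypothetical; columns follow the task list in this plan.
DDL = """
CREATE TABLE IF NOT EXISTS activity_log (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    id TEXT UNIQUE NOT NULL,
    ts TEXT NOT NULL,
    category TEXT NOT NULL,
    action TEXT NOT NULL,
    severity TEXT NOT NULL,
    actor TEXT NOT NULL,
    entity_type TEXT,
    entity_id TEXT,
    entity_name TEXT,
    message TEXT NOT NULL,
    metadata TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_activity_log_ts_seq ON activity_log (ts DESC, seq DESC);
CREATE INDEX IF NOT EXISTS idx_activity_log_category ON activity_log (category);
CREATE INDEX IF NOT EXISTS idx_activity_log_severity ON activity_log (severity);
CREATE INDEX IF NOT EXISTS idx_activity_log_actor ON activity_log (actor);
CREATE INDEX IF NOT EXISTS idx_activity_log_entity ON activity_log (entity_type, entity_id);
"""


def apply(conn: sqlite3.Connection) -> None:
    """Create the table and indexes; IF NOT EXISTS makes a second run a no-op."""
    conn.executescript(DDL)


conn = sqlite3.connect(":memory:")
apply(conn)
apply(conn)  # running twice must not raise or duplicate anything
```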
### ActivityLogRepository — exact method signatures
```python
class ActivityLogRepository:
    def __init__(self, db: Database) -> None: ...
    def record(self, entry: ActivityLogEntry) -> None: ...
    def query(
        self,
        filters: ActivityLogFilters,
        *,
        before_seq: int | None = None,
        limit: int = 50,
    ) -> list[ActivityLogEntry]: ...
    def count(self, filters: ActivityLogFilters | None = None) -> int: ...
    def prune(
        self,
        *,
        before_ts: datetime | None = None,
        max_entries: int | None = None,
    ) -> int: ...
    def clear(self) -> int: ...
    def iter_export(
        self, filters: ActivityLogFilters | None = None
    ) -> Iterator[ActivityLogEntry]: ...
```
### Key behavioural notes for Phase 2/3/4
- `record()` expects to be called from the event-loop thread (or with `Database` RLock already held). Phase 2 is responsible for thread marshaling via `loop.call_soon_threadsafe`.
- `query()` returns entries in **ascending chronological order within the page** (reversed internally from DESC fetch for display convenience). The smallest `seq` on a page is `page[0]`'s seq — pass that as `before_seq` for the next page.
- `count(None)` == `count(ActivityLogFilters())` — both count all rows.
- `prune(before_ts=X, max_entries=N)` applies both predicates independently (age prune first, then count cap).
- `iter_export` holds `db._lock` for the entire iteration. Phase 4 should stream the response and consume promptly.
- `ActivityCategory` and `ActivitySeverity` are plain classes with string class attributes and an `ALL` tuple, NOT `enum.Enum`.
- Imports for Phase 2/3/4:
```python
from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters, ActivityCategory, ActivitySeverity
from ledgrab.storage.activity_log_repository import ActivityLogRepository
```