feat(activity-log): phase 1 - storage model, migration, repository
- ActivityLogEntry dataclass + ActivityCategory/ActivitySeverity + ActivityLogFilters - additive idempotent migration 002_add_activity_log (indexed activity_log table, seq keyset tiebreaker) - ActivityLogRepository (record/query/count/prune/clear/iter_export), keyset pagination, parameterized SQL - 102 unit + adversarial tests (SQL-injection, pagination, prune, codec, migration idempotency)
This commit is contained in:
@@ -45,8 +45,8 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p
|
||||
|
||||
## Frozen contracts (fill as phases complete)
|
||||
|
||||
- ActivityLogEntry fields / dict shape: _(Phase 1/2 handoff)_
|
||||
- ActivityLogFilters shape: _(Phase 1 handoff)_
|
||||
- ActivityLogEntry fields / dict shape: **frozen** — see phase-1-storage.md Handoff section. 11 fields: `id`, `ts`, `category`, `action`, `severity`, `actor`, `message`, `entity_type`, `entity_id`, `entity_name`, `metadata`. `seq` is DB-only (not on dataclass).
|
||||
- ActivityLogFilters shape: **frozen** — 8 optional fields: `categories`, `severities`, `actor`, `entity_type`, `entity_id`, `since`, `until`, `message_like`. See phase-1-storage.md Handoff.
|
||||
- recorder.record(...) signature + actor ContextVar import path: _(Phase 2 handoff)_
|
||||
- API endpoints + query params + page envelope + settings bounds: _(Phase 4 handoff)_
|
||||
|
||||
@@ -65,4 +65,4 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p
|
||||
|
||||
## Phase progress notes
|
||||
|
||||
_(Orchestrator appends a short note per phase: what landed, commit sha, any warnings.)_
|
||||
Phase 1 landed (2026-06-09): `activity_log.py` (dataclass + enums + filters + codec), `AddActivityLogTableMigration` (`002_add_activity_log`) appended to `ALL_MIGRATIONS`, `ActivityLogRepository` (record/query/count/prune/clear/iter_export), 41 new tests — all green. Full suite 2226 passed, 0 failed. Schema and method signatures frozen in phase-1-storage.md Handoff. Gotcha: `Database.execute` takes a positional tuple — use `?` placeholders (not `:name`), otherwise Python 3.14 will raise `ProgrammingError`.
|
||||
|
||||
@@ -59,7 +59,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
|
||||
|
||||
## Phases
|
||||
|
||||
- [ ] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md)
|
||||
- [x] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md)
|
||||
- [ ] Phase 2: Recorder, actor context, retention, lifecycle [domain: backend] → [subplan](./phase-2-recorder-retention.md)
|
||||
- [ ] Phase 3: Event instrumentation (4 categories) [domain: backend] → [subplan](./phase-3-instrumentation.md)
|
||||
- [ ] Phase 4: REST API — query/filter/export/settings/clear [domain: backend] → [subplan](./phase-4-api.md)
|
||||
@@ -79,7 +79,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
|
||||
|
||||
| Phase | Domain | Status | Review | Build | Committed |
|
||||
|-------|--------|--------|--------|-------|-----------|
|
||||
| Phase 1: Storage | data | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 1: Storage | data | ✅ Done | ✅ Passed | ✅ Passed | ✅ |
|
||||
| Phase 2: Recorder/Retention | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 3: Instrumentation | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
| Phase 4: REST API | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Phase 1: Storage — model, migration, repository
|
||||
|
||||
**Status:** ⬜ Not Started
|
||||
**Status:** ✅ Done
|
||||
**Parent plan:** [PLAN.md](./PLAN.md)
|
||||
**Domain:** data
|
||||
|
||||
@@ -13,7 +13,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
|
||||
|
||||
## Tasks
|
||||
|
||||
- [ ] Create `server/src/ledgrab/storage/activity_log.py`:
|
||||
- [x] Create `server/src/ledgrab/storage/activity_log.py`:
|
||||
- `ActivityCategory` and `ActivitySeverity` string enums (or `Literal` unions used as
|
||||
constants). Categories: `auth`, `device`, `entity`, `capture`, `system`. Severities:
|
||||
`info`, `warning`, `error`.
|
||||
@@ -22,7 +22,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
|
||||
`entity_type: str | None`, `entity_id: str | None`, `entity_name: str | None`,
|
||||
`message: str`, `metadata: dict` (small JSON; default empty). Provide `to_row()` /
|
||||
`from_row()` (column tuple/dict ↔ dataclass; `metadata` JSON-encoded; `ts` isoformat).
|
||||
- [ ] Add migration to `server/src/ledgrab/storage/data_migrations.py`:
|
||||
- [x] Add migration to `server/src/ledgrab/storage/data_migrations.py`:
|
||||
- New `DataMigration` subclass `AddActivityLogTableMigration` with unique `name`
|
||||
(next sequential id, e.g. `"NNN_add_activity_log"` — match existing naming) and
|
||||
`apply(conn)` creating `activity_log` with an INTEGER PRIMARY KEY AUTOINCREMENT `seq`
|
||||
@@ -33,7 +33,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
|
||||
- Indexes: `(ts DESC, seq DESC)` (primary keyset/sort), `category`, `severity`, `actor`,
|
||||
`(entity_type, entity_id)`. Use `CREATE TABLE/INDEX IF NOT EXISTS` for idempotency.
|
||||
- Append the instance to `ALL_MIGRATIONS` (never reorder existing entries).
|
||||
- [ ] Create `server/src/ledgrab/storage/activity_log_repository.py`:
|
||||
- [x] Create `server/src/ledgrab/storage/activity_log_repository.py`:
|
||||
- `class ActivityLogRepository` taking `db: Database` (NOT subclassing `BaseSqliteStore`).
|
||||
- `record(entry: ActivityLogEntry) -> None`: single parameterized INSERT via
|
||||
`db.execute(...)` (auto-commit). The `seq` is DB-assigned. **Caller guarantees this runs
|
||||
@@ -51,7 +51,7 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
|
||||
(does not load all rows into memory).
|
||||
- Define a small `ActivityLogFilters` dataclass (all-optional fields) in the repository or
|
||||
`activity_log.py` and reuse it across query/count/prune/export.
|
||||
- [ ] Unit tests in `server/tests/storage/test_activity_log_repository.py`:
|
||||
- [x] Unit tests in `server/tests/storage/test_activity_log_repository.py`:
|
||||
- insert + read back round-trip (incl. metadata JSON, UTC ts);
|
||||
- filter by each dimension (category/severity/actor/entity/date/free-text);
|
||||
- keyset pagination stability across two pages with same-`ts` rows (seq tiebreaker);
|
||||
@@ -89,14 +89,89 @@ keyset-paginated filtered query, count, time/count-based prune, and streaming ex
|
||||
|
||||
## Review Checklist
|
||||
|
||||
- [ ] All tasks completed
|
||||
- [ ] Code follows project conventions (dataclass codec style, migration naming)
|
||||
- [ ] No unintended side effects (no startup wiring yet)
|
||||
- [ ] Build passes (ruff + pytest)
|
||||
- [ ] Tests pass (new + existing)
|
||||
- [x] All tasks completed
|
||||
- [x] Code follows project conventions (dataclass codec style, migration naming)
|
||||
- [x] No unintended side effects (no startup wiring yet)
|
||||
- [x] Build passes (ruff + pytest)
|
||||
- [x] Tests pass (new + existing)
|
||||
|
||||
## Handoff to Next Phase
|
||||
|
||||
<!-- Filled in by the implementer: final ActivityLogEntry field list + the ActivityLogFilters
|
||||
shape (Phase 2/4 depend on the frozen schema), the migration name used, and the exact
|
||||
repository method signatures. -->
|
||||
### ActivityLogEntry — final field list and dict shape
|
||||
|
||||
```python
|
||||
@dataclass
|
||||
class ActivityLogEntry:
|
||||
id: str # "al_<uuid8>" — caller-assigned
|
||||
ts: datetime # UTC-aware; stored as ISO-8601 string in DB
|
||||
category: str # ActivityCategory constant
|
||||
action: str # verb-object label, e.g. "entity.created"
|
||||
severity: str # ActivitySeverity constant
|
||||
actor: str # API-key label or "system"
|
||||
message: str # human-readable description
|
||||
entity_type: str | None # e.g. "output_target"
|
||||
entity_id: str | None # stable entity id
|
||||
entity_name: str | None # name at time of event
|
||||
metadata: dict # JSON-serialisable; default {}
|
||||
```
|
||||
|
||||
`to_row()` returns a flat dict with 11 keys (same names); `metadata` is JSON string, `ts` is isoformat string. `seq` is NOT in `to_row()` — it is DB-assigned.
|
||||
|
||||
### ActivityLogFilters — shape (all fields optional, default None)
|
||||
|
||||
```python
|
||||
@dataclass
|
||||
class ActivityLogFilters:
|
||||
categories: Sequence[str] | None # category IN (...)
|
||||
severities: Sequence[str] | None # severity IN (...)
|
||||
actor: str | None # exact match
|
||||
entity_type: str | None # exact match
|
||||
entity_id: str | None # exact match
|
||||
since: datetime | None # ts >= since
|
||||
until: datetime | None # ts <= until
|
||||
message_like: str | None # LIKE %value% (escaped)
|
||||
```
|
||||
|
||||
### Migration name used
|
||||
|
||||
`"002_add_activity_log"` — appended as position [1] in `ALL_MIGRATIONS`.
|
||||
|
||||
### ActivityLogRepository — exact method signatures
|
||||
|
||||
```python
|
||||
class ActivityLogRepository:
|
||||
def __init__(self, db: Database) -> None
|
||||
def record(self, entry: ActivityLogEntry) -> None
|
||||
def query(
|
||||
self,
|
||||
filters: ActivityLogFilters,
|
||||
*,
|
||||
before_seq: int | None = None,
|
||||
limit: int = 50,
|
||||
) -> list[ActivityLogEntry]
|
||||
def count(self, filters: ActivityLogFilters | None = None) -> int
|
||||
def prune(
|
||||
self,
|
||||
*,
|
||||
before_ts: datetime | None = None,
|
||||
max_entries: int | None = None,
|
||||
) -> int
|
||||
def clear(self) -> int
|
||||
def iter_export(
|
||||
self, filters: ActivityLogFilters | None = None
|
||||
) -> Iterator[ActivityLogEntry]
|
||||
```
|
||||
|
||||
### Key behavioural notes for Phase 2/3/4
|
||||
|
||||
- `record()` expects to be called from the event-loop thread (or with `Database` RLock already held). Phase 2 is responsible for thread marshaling via `loop.call_soon_threadsafe`.
|
||||
- `query()` returns entries in **ascending chronological order within the page** (reversed internally from DESC fetch for display convenience). The smallest `seq` on a page is `page[0]`'s seq — pass that as `before_seq` for the next page.
|
||||
- `count(None)` == `count(ActivityLogFilters())` — both count all rows.
|
||||
- `prune(before_ts=X, max_entries=N)` applies both predicates independently (age prune first, then count cap).
|
||||
- `iter_export` holds `db._lock` for the entire iteration. Phase 4 should stream the response and consume promptly.
|
||||
- `ActivityLogCategory` and `ActivityLogSeverity` are plain classes with string class-attributes and an `ALL` tuple — NOT `enum.Enum`.
|
||||
- Imports for Phase 2/3/4:
|
||||
```python
|
||||
from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters, ActivityCategory, ActivitySeverity
|
||||
from ledgrab.storage.activity_log_repository import ActivityLogRepository
|
||||
```
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
"""Activity / audit log data model.
|
||||
|
||||
Defines the ``ActivityLogEntry`` dataclass together with its category and
|
||||
severity enumerations and the ``ActivityLogFilters`` query object. Row-level
|
||||
codec (``to_row`` / ``from_row``) converts between the dataclass and a flat
|
||||
SQLite column dict.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Sequence
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Enumerations (string constants, not enum.Enum, to stay consistent with the
|
||||
# rest of the codebase which uses plain string literals)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class ActivityCategory:
|
||||
"""Valid ``category`` values for an ``ActivityLogEntry``."""
|
||||
|
||||
AUTH = "auth"
|
||||
DEVICE = "device"
|
||||
ENTITY = "entity"
|
||||
CAPTURE = "capture"
|
||||
SYSTEM = "system"
|
||||
|
||||
ALL: tuple[str, ...] = (AUTH, DEVICE, ENTITY, CAPTURE, SYSTEM)
|
||||
|
||||
|
||||
class ActivitySeverity:
|
||||
"""Valid ``severity`` values for an ``ActivityLogEntry``."""
|
||||
|
||||
INFO = "info"
|
||||
WARNING = "warning"
|
||||
ERROR = "error"
|
||||
|
||||
ALL: tuple[str, ...] = (INFO, WARNING, ERROR)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActivityLogEntry:
|
||||
"""One immutable audit record.
|
||||
|
||||
Fields
|
||||
------
|
||||
id Stable application-assigned identifier, e.g. ``al_<uuid8>``.
|
||||
ts UTC timestamp of the event (server-assigned at record time).
|
||||
category Broad bucket — one of :class:`ActivityCategory`.
|
||||
action Verb-object label, e.g. ``"entity.created"`` or ``"auth.rejected"``.
|
||||
severity One of :class:`ActivitySeverity`.
|
||||
actor Who triggered the action, e.g. an API-key label or ``"system"``.
|
||||
entity_type Optional: kind of entity involved, e.g. ``"output_target"``.
|
||||
entity_id Optional: stable entity identifier.
|
||||
entity_name Optional: human-readable entity name at the time of the event.
|
||||
message Human-readable description suitable for display.
|
||||
metadata Small structured context (device address, error code, …).
|
||||
Must be JSON-serialisable; defaults to empty dict.
|
||||
"""
|
||||
|
||||
id: str
|
||||
ts: datetime
|
||||
category: str
|
||||
action: str
|
||||
severity: str
|
||||
actor: str
|
||||
message: str
|
||||
entity_type: str | None = None
|
||||
entity_id: str | None = None
|
||||
entity_name: str | None = None
|
||||
metadata: dict = field(default_factory=dict)
|
||||
|
||||
# -- Row codec -----------------------------------------------------------
|
||||
|
||||
def to_row(self) -> dict:
|
||||
"""Return a flat dict suitable for a parameterised SQLite INSERT.
|
||||
|
||||
``ts`` is stored as an ISO-8601 string (UTC). ``metadata`` is stored
|
||||
as a JSON string. ``seq`` is DB-assigned and is NOT included.
|
||||
"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"ts": self.ts.isoformat(),
|
||||
"category": self.category,
|
||||
"action": self.action,
|
||||
"severity": self.severity,
|
||||
"actor": self.actor,
|
||||
"entity_type": self.entity_type,
|
||||
"entity_id": self.entity_id,
|
||||
"entity_name": self.entity_name,
|
||||
"message": self.message,
|
||||
"metadata": json.dumps(self.metadata, ensure_ascii=False),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def from_row(row: dict) -> "ActivityLogEntry":
|
||||
"""Reconstruct an ``ActivityLogEntry`` from a SQLite row dict.
|
||||
|
||||
``row`` may include the ``seq`` column — it is ignored here (callers
|
||||
that need ``seq`` for keyset pagination access it directly from the
|
||||
row before calling this method).
|
||||
"""
|
||||
raw_meta = row.get("metadata") or "{}"
|
||||
try:
|
||||
metadata = json.loads(raw_meta)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
metadata = {}
|
||||
|
||||
raw_ts = row["ts"]
|
||||
ts = datetime.fromisoformat(raw_ts)
|
||||
# Ensure tz-aware UTC even if stored without offset (legacy rows)
|
||||
if ts.tzinfo is None:
|
||||
ts = ts.replace(tzinfo=timezone.utc)
|
||||
|
||||
return ActivityLogEntry(
|
||||
id=row["id"],
|
||||
ts=ts,
|
||||
category=row["category"],
|
||||
action=row["action"],
|
||||
severity=row["severity"],
|
||||
actor=row["actor"],
|
||||
entity_type=row.get("entity_type"),
|
||||
entity_id=row.get("entity_id"),
|
||||
entity_name=row.get("entity_name"),
|
||||
message=row["message"],
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Filters dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActivityLogFilters:
|
||||
"""Optional query-time filters for :class:`ActivityLogRepository`.
|
||||
|
||||
All fields are optional. ``None`` / empty sequence means "no restriction
|
||||
on this dimension".
|
||||
|
||||
Fields
|
||||
------
|
||||
categories Restrict to entries whose ``category`` is in this set.
|
||||
severities Restrict to entries whose ``severity`` is in this set.
|
||||
actor Exact match on the ``actor`` field.
|
||||
entity_type Exact match on the ``entity_type`` field.
|
||||
entity_id Exact match on the ``entity_id`` field.
|
||||
since Inclusive lower bound on ``ts`` (``ts >= since``).
|
||||
until Inclusive upper bound on ``ts`` (``ts <= until``).
|
||||
message_like Free-text substring match on ``message`` (LIKE ``%value%``).
|
||||
The value is escaped — no SQL injection risk.
|
||||
"""
|
||||
|
||||
categories: Sequence[str] | None = None
|
||||
severities: Sequence[str] | None = None
|
||||
actor: str | None = None
|
||||
entity_type: str | None = None
|
||||
entity_id: str | None = None
|
||||
since: datetime | None = None
|
||||
until: datetime | None = None
|
||||
message_like: str | None = None
|
||||
@@ -0,0 +1,293 @@
|
||||
"""Append-only repository for the persistent activity / audit log.
|
||||
|
||||
Design notes
|
||||
------------
|
||||
* Does NOT subclass ``BaseSqliteStore`` — that base loads every row into an
|
||||
in-memory cache at construction time, which is wrong for an unbounded,
|
||||
append-heavy log.
|
||||
* All SQL parameters are passed as positional ``?`` placeholders — no user
|
||||
input is interpolated into the query string.
|
||||
* Keyset pagination is implemented via the ``seq`` INTEGER PRIMARY KEY
|
||||
AUTOINCREMENT column, which is strictly monotonic and survives rows with
|
||||
identical ``ts`` values.
|
||||
* The repository takes a ``Database`` instance and calls ``db.execute`` /
|
||||
``db.transaction`` directly.
|
||||
* Thread safety: ``Database.execute`` holds the ``RLock`` for the duration of
|
||||
each call. The repository itself adds no extra locking. Callers that
|
||||
originate from non-event-loop threads MUST marshal via
|
||||
``loop.call_soon_threadsafe`` (that is Phase 2's responsibility).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from typing import Iterator
|
||||
|
||||
from ledgrab.storage.activity_log import ActivityLogEntry, ActivityLogFilters
|
||||
from ledgrab.storage.database import Database
|
||||
from ledgrab.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_TABLE = "activity_log"
|
||||
|
||||
|
||||
def _build_filter_clause(
|
||||
filters: ActivityLogFilters,
|
||||
params: list,
|
||||
*,
|
||||
extra_where: str | None = None,
|
||||
) -> str:
|
||||
"""Return a WHERE clause string and append bound parameters to *params*.
|
||||
|
||||
The caller is responsible for providing the opening ``WHERE`` keyword when
|
||||
the returned string is non-empty, or ``AND`` when appending to an existing
|
||||
predicate. This function returns only the condition fragment(s) joined by
|
||||
``AND``, or an empty string when *filters* has no restrictions.
|
||||
|
||||
``extra_where`` is prepended (with AND) before the filter conditions if
|
||||
provided (used for the ``seq < ?`` keyset predicate).
|
||||
"""
|
||||
conditions: list[str] = []
|
||||
|
||||
if extra_where:
|
||||
conditions.append(extra_where)
|
||||
|
||||
if filters.categories:
|
||||
placeholders = ",".join("?" * len(filters.categories))
|
||||
conditions.append(f"category IN ({placeholders})")
|
||||
params.extend(filters.categories)
|
||||
|
||||
if filters.severities:
|
||||
placeholders = ",".join("?" * len(filters.severities))
|
||||
conditions.append(f"severity IN ({placeholders})")
|
||||
params.extend(filters.severities)
|
||||
|
||||
if filters.actor is not None:
|
||||
conditions.append("actor = ?")
|
||||
params.append(filters.actor)
|
||||
|
||||
if filters.entity_type is not None:
|
||||
conditions.append("entity_type = ?")
|
||||
params.append(filters.entity_type)
|
||||
|
||||
if filters.entity_id is not None:
|
||||
conditions.append("entity_id = ?")
|
||||
params.append(filters.entity_id)
|
||||
|
||||
if filters.since is not None:
|
||||
conditions.append("ts >= ?")
|
||||
params.append(filters.since.isoformat())
|
||||
|
||||
if filters.until is not None:
|
||||
conditions.append("ts <= ?")
|
||||
params.append(filters.until.isoformat())
|
||||
|
||||
if filters.message_like is not None:
|
||||
# Escape LIKE special characters in the user-supplied substring so that
|
||||
# a literal '%' or '_' in the message does not act as a wildcard.
|
||||
escaped = filters.message_like.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
|
||||
conditions.append("message LIKE ? ESCAPE '\\'")
|
||||
params.append(f"%{escaped}%")
|
||||
|
||||
return " AND ".join(conditions)
|
||||
|
||||
|
||||
class ActivityLogRepository:
|
||||
"""Purpose-built repository for the ``activity_log`` table.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
db:
|
||||
The shared ``Database`` singleton. The migration that creates the
|
||||
``activity_log`` table must have been applied before the first call to
|
||||
:meth:`record`. Construction triggers migration via
|
||||
``MigrationRunner`` so users can create the repository directly
|
||||
without a separate startup step.
|
||||
"""
|
||||
|
||||
def __init__(self, db: Database) -> None:
|
||||
self._db = db
|
||||
# Apply pending migrations (idempotent; most runs are no-ops)
|
||||
from ledgrab.storage.data_migrations import ALL_MIGRATIONS, MigrationRunner
|
||||
|
||||
MigrationRunner(db).run(ALL_MIGRATIONS)
|
||||
|
||||
# -- Write ---------------------------------------------------------------
|
||||
|
||||
def record(self, entry: ActivityLogEntry) -> None:
|
||||
"""Append *entry* to the log.
|
||||
|
||||
The ``seq`` column is assigned by SQLite (AUTOINCREMENT) — ``entry``
|
||||
does not carry a ``seq`` field.
|
||||
|
||||
Caller contract: this must be called from the event-loop thread (or
|
||||
from a context already serialised through the ``Database`` RLock).
|
||||
Cross-thread callers must marshal via ``loop.call_soon_threadsafe``;
|
||||
that is Phase 2's responsibility.
|
||||
"""
|
||||
row = entry.to_row()
|
||||
self._db.execute(
|
||||
f"INSERT INTO {_TABLE} "
|
||||
f"(id, ts, category, action, severity, actor, "
|
||||
f" entity_type, entity_id, entity_name, message, metadata) "
|
||||
f"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
row["id"],
|
||||
row["ts"],
|
||||
row["category"],
|
||||
row["action"],
|
||||
row["severity"],
|
||||
row["actor"],
|
||||
row["entity_type"],
|
||||
row["entity_id"],
|
||||
row["entity_name"],
|
||||
row["message"],
|
||||
row["metadata"],
|
||||
),
|
||||
)
|
||||
|
||||
# -- Read ----------------------------------------------------------------
|
||||
|
||||
def query(
|
||||
self,
|
||||
filters: ActivityLogFilters,
|
||||
*,
|
||||
before_seq: int | None = None,
|
||||
limit: int = 50,
|
||||
) -> list[ActivityLogEntry]:
|
||||
"""Return the newest matching entries, oldest-first within the page.
|
||||
|
||||
Keyset pagination: pass the smallest ``seq`` seen on the previous page
|
||||
as *before_seq* to get the next page. Entries are fetched in
|
||||
``seq DESC`` order from SQLite and then reversed before returning so
|
||||
that the caller receives them in chronological order within the page.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filters:
|
||||
Optional filter criteria.
|
||||
before_seq:
|
||||
Exclusive upper bound on ``seq``. ``None`` returns the first
|
||||
(newest) page.
|
||||
limit:
|
||||
Maximum number of entries to return.
|
||||
"""
|
||||
params: list = []
|
||||
keyset = "seq < ?" if before_seq is not None else None
|
||||
if before_seq is not None:
|
||||
params.append(before_seq)
|
||||
|
||||
where_fragment = _build_filter_clause(filters, params, extra_where=keyset)
|
||||
where_clause = f"WHERE {where_fragment}" if where_fragment else ""
|
||||
params.append(limit)
|
||||
|
||||
sql = (
|
||||
f"SELECT seq, id, ts, category, action, severity, actor, "
|
||||
f"entity_type, entity_id, entity_name, message, metadata "
|
||||
f"FROM {_TABLE} "
|
||||
f"{where_clause} "
|
||||
f"ORDER BY seq DESC "
|
||||
f"LIMIT ?"
|
||||
)
|
||||
|
||||
cursor = self._db.execute(sql, tuple(params))
|
||||
rows = cursor.fetchall()
|
||||
# Reverse to return chronological order within the page
|
||||
return [ActivityLogEntry.from_row(dict(row)) for row in reversed(rows)]
|
||||
|
||||
def count(self, filters: ActivityLogFilters | None = None) -> int:
|
||||
"""Return the number of entries matching *filters* (or all entries)."""
|
||||
if filters is None:
|
||||
filters = ActivityLogFilters()
|
||||
params: list = []
|
||||
where_fragment = _build_filter_clause(filters, params)
|
||||
where_clause = f"WHERE {where_fragment}" if where_fragment else ""
|
||||
|
||||
sql = f"SELECT COUNT(*) AS cnt FROM {_TABLE} {where_clause}"
|
||||
cursor = self._db.execute(sql, tuple(params))
|
||||
row = cursor.fetchone()
|
||||
return int(row["cnt"])
|
||||
|
||||
# -- Maintenance ---------------------------------------------------------
|
||||
|
||||
def prune(
|
||||
self,
|
||||
*,
|
||||
before_ts: datetime | None = None,
|
||||
max_entries: int | None = None,
|
||||
) -> int:
|
||||
"""Delete old / excess entries; return the total rows deleted.
|
||||
|
||||
Both predicates are applied independently:
|
||||
|
||||
1. Delete all rows where ``ts < before_ts`` (age-based pruning).
|
||||
2. Keep only the newest ``max_entries`` rows, deleting the rest
|
||||
(count-based pruning).
|
||||
|
||||
The two operations run in separate statements so that each can be
|
||||
independently enabled or disabled.
|
||||
"""
|
||||
deleted = 0
|
||||
|
||||
if before_ts is not None:
|
||||
cursor = self._db.execute(
|
||||
f"DELETE FROM {_TABLE} WHERE ts < ?",
|
||||
(before_ts.isoformat(),),
|
||||
)
|
||||
deleted += cursor.rowcount
|
||||
|
||||
if max_entries is not None and max_entries >= 0:
|
||||
# Find the seq of the Nth newest row; delete everything older than it.
|
||||
cursor = self._db.execute(
|
||||
f"SELECT seq FROM {_TABLE} ORDER BY seq DESC LIMIT 1 OFFSET ?",
|
||||
(max_entries,),
|
||||
)
|
||||
cutoff_row = cursor.fetchone()
|
||||
if cutoff_row is not None:
|
||||
cutoff_seq = cutoff_row["seq"]
|
||||
cursor = self._db.execute(
|
||||
f"DELETE FROM {_TABLE} WHERE seq <= ?",
|
||||
(cutoff_seq,),
|
||||
)
|
||||
deleted += cursor.rowcount
|
||||
|
||||
return deleted
|
||||
|
||||
def clear(self) -> int:
|
||||
"""Delete all entries; return the number of rows deleted."""
|
||||
cursor = self._db.execute(f"DELETE FROM {_TABLE}")
|
||||
return cursor.rowcount
|
||||
|
||||
# -- Export --------------------------------------------------------------
|
||||
|
||||
def iter_export(self, filters: ActivityLogFilters | None = None) -> Iterator[ActivityLogEntry]:
|
||||
"""Yield all matching entries in ascending ``seq`` order.
|
||||
|
||||
Uses a server-side cursor so the entire result set is never loaded
|
||||
into memory — safe for large tables. The connection's ``RLock`` is
|
||||
held for the duration of the iteration; callers should consume this
|
||||
iterator promptly.
|
||||
"""
|
||||
if filters is None:
|
||||
filters = ActivityLogFilters()
|
||||
|
||||
params: list = []
|
||||
where_fragment = _build_filter_clause(filters, params)
|
||||
where_clause = f"WHERE {where_fragment}" if where_fragment else ""
|
||||
|
||||
sql = (
|
||||
f"SELECT seq, id, ts, category, action, severity, actor, "
|
||||
f"entity_type, entity_id, entity_name, message, metadata "
|
||||
f"FROM {_TABLE} "
|
||||
f"{where_clause} "
|
||||
f"ORDER BY seq ASC"
|
||||
)
|
||||
|
||||
# Use the raw connection directly to get a streaming cursor.
|
||||
# We borrow the lock for the full iteration.
|
||||
with self._db._lock: # noqa: SLF001 — internal access; no public cursor API
|
||||
cursor: sqlite3.Cursor = self._db._conn.execute(sql, tuple(params)) # noqa: SLF001
|
||||
for row in cursor:
|
||||
yield ActivityLogEntry.from_row(dict(row))
|
||||
@@ -213,7 +213,57 @@ class StaticToSingleColorMigration(DataMigration):
|
||||
return rows_changed
|
||||
|
||||
|
||||
class AddActivityLogTableMigration(DataMigration):
|
||||
"""Create the ``activity_log`` table and its indexes.
|
||||
|
||||
This is a purely additive migration — it does not touch any existing table.
|
||||
``CREATE TABLE / INDEX IF NOT EXISTS`` ensures idempotency if the migration
|
||||
somehow runs twice (e.g. after a partial restore).
|
||||
"""
|
||||
|
||||
name = "002_add_activity_log"
|
||||
|
||||
def apply(self, conn: sqlite3.Connection) -> int:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS activity_log (
|
||||
seq INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
id TEXT UNIQUE NOT NULL,
|
||||
ts TEXT NOT NULL,
|
||||
category TEXT NOT NULL,
|
||||
action TEXT NOT NULL,
|
||||
severity TEXT NOT NULL,
|
||||
actor TEXT NOT NULL,
|
||||
entity_type TEXT,
|
||||
entity_id TEXT,
|
||||
entity_name TEXT,
|
||||
message TEXT NOT NULL,
|
||||
metadata TEXT NOT NULL DEFAULT '{}'
|
||||
)
|
||||
"""
|
||||
)
|
||||
# Primary read path: newest-first keyset pagination
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_activity_log_ts_seq "
|
||||
"ON activity_log (ts DESC, seq DESC)"
|
||||
)
|
||||
# Filter indexes
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_activity_log_category " "ON activity_log (category)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_activity_log_severity " "ON activity_log (severity)"
|
||||
)
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_activity_log_actor " "ON activity_log (actor)")
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_activity_log_entity "
|
||||
"ON activity_log (entity_type, entity_id)"
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
# Master list — ORDER MATTERS. Append new migrations; never reorder.
|
||||
ALL_MIGRATIONS: list[DataMigration] = [
|
||||
StaticToSingleColorMigration(),
|
||||
AddActivityLogTableMigration(),
|
||||
]
|
||||
|
||||
@@ -0,0 +1,884 @@
|
||||
"""Adversarial / edge-case tests for ActivityLogRepository (Phase 1 — storage layer).
|
||||
|
||||
These tests are intentionally skeptical — they derive expected behaviour from the
|
||||
acceptance criteria in plans/activity-log/phase-1-storage.md, NOT from what the
|
||||
code happens to do today. If a test fails, it is a real bug.
|
||||
|
||||
Coverage areas
|
||||
--------------
|
||||
1. SQL-injection / parameterization safety (message_like with %, _, ;, --, quotes, etc.)
|
||||
2. Keyset pagination edge cases (empty table, before_seq bounds, stability,
|
||||
ordering contract, no duplicates/gaps)
|
||||
3. Prune edge cases (before_ts only, max_entries only, both,
|
||||
max_entries=0, larger than count, deleted count,
|
||||
keeps NEWEST entries)
|
||||
4. Filter combination edge cases (AND semantics, empty sequence vs None,
|
||||
since/until inclusive bounds, tz-aware datetimes)
|
||||
5. Codec / data integrity (metadata round-trip: nested, unicode, JSON-escape;
|
||||
entity_* None vs empty string; microsecond ts)
|
||||
6. Migration idempotency (table + all indexes present; double-run is no-op)
|
||||
7. iter_export vs query consistency (same filters yield same rows; empty table; filter)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.activity_log import (
|
||||
ActivityCategory,
|
||||
ActivityLogEntry,
|
||||
ActivityLogFilters,
|
||||
ActivitySeverity,
|
||||
)
|
||||
from ledgrab.storage.activity_log_repository import ActivityLogRepository
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers (mirror the implementer's helpers so tests are self-contained)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _now() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
_SENTINEL = object()
|
||||
|
||||
|
||||
def _entry(
|
||||
*,
|
||||
id: str | None = None,
|
||||
ts: datetime | None = None,
|
||||
category: str = ActivityCategory.ENTITY,
|
||||
action: str = "entity.created",
|
||||
severity: str = ActivitySeverity.INFO,
|
||||
actor: str = "test_actor",
|
||||
entity_type: str | None = "output_target",
|
||||
entity_id: object = _SENTINEL,
|
||||
entity_name: str | None = "My Target",
|
||||
message: str = "Created output target",
|
||||
metadata: dict | None = None,
|
||||
) -> ActivityLogEntry:
|
||||
resolved_entity_id: str | None = (
|
||||
f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id # type: ignore[assignment]
|
||||
)
|
||||
return ActivityLogEntry(
|
||||
id=id or f"al_{uuid.uuid4().hex[:8]}",
|
||||
ts=ts or _now(),
|
||||
category=category,
|
||||
action=action,
|
||||
severity=severity,
|
||||
actor=actor,
|
||||
entity_type=entity_type,
|
||||
entity_id=resolved_entity_id,
|
||||
entity_name=entity_name,
|
||||
message=message,
|
||||
metadata=metadata if metadata is not None else {},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def repo(tmp_db: Database) -> ActivityLogRepository:
|
||||
"""Fresh ActivityLogRepository backed by a temp database."""
|
||||
return ActivityLogRepository(tmp_db)
|
||||
|
||||
|
||||
def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int:
|
||||
cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,))
|
||||
row = cursor.fetchone()
|
||||
assert row is not None, f"No row found for id={entry_id!r}"
|
||||
return int(row["seq"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. SQL-injection / parameterization safety
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSQLInjectionSafety:
|
||||
"""All user-supplied filter values must be treated as literal text, not SQL."""
|
||||
|
||||
def test_message_like_percent_is_literal(self, repo: ActivityLogRepository) -> None:
|
||||
"""A literal '%' in message_like must NOT act as a LIKE wildcard."""
|
||||
repo.record(_entry(message="100% done"))
|
||||
repo.record(_entry(message="all done"))
|
||||
repo.record(_entry(message="percent sign here"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="100%"), limit=10)
|
||||
assert len(results) == 1, "% in message_like should be a literal percent, not a wildcard"
|
||||
assert results[0].message == "100% done"
|
||||
|
||||
def test_message_like_underscore_is_literal(self, repo: ActivityLogRepository) -> None:
|
||||
"""A literal '_' in message_like must NOT act as a single-char wildcard."""
|
||||
repo.record(_entry(message="device_01"))
|
||||
repo.record(_entry(message="device001")) # would match if _ were a wildcard
|
||||
repo.record(_entry(message="some other message"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="device_01"), limit=10)
|
||||
assert (
|
||||
len(results) == 1
|
||||
), "_ in message_like should be a literal underscore, not a single-char wildcard"
|
||||
assert results[0].message == "device_01"
|
||||
|
||||
def test_message_like_single_quote_does_not_break_query(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""A single quote in message_like must not cause a SQL syntax error."""
|
||||
repo.record(_entry(message="it's working"))
|
||||
repo.record(_entry(message="no quote here"))
|
||||
|
||||
# Must not raise
|
||||
results = repo.query(ActivityLogFilters(message_like="it's"), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].message == "it's working"
|
||||
|
||||
def test_message_like_semicolon_does_not_execute_second_statement(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""';' in message_like must not let a second SQL statement execute."""
|
||||
repo.record(_entry(message="a; DROP TABLE activity_log; --"))
|
||||
repo.record(_entry(message="safe message"))
|
||||
|
||||
# If injection succeeded, table would be dropped and next call would error
|
||||
results = repo.query(
|
||||
ActivityLogFilters(message_like="a; DROP TABLE activity_log; --"), limit=10
|
||||
)
|
||||
# Table must still exist
|
||||
assert repo.count() == 2
|
||||
assert len(results) == 1
|
||||
|
||||
def test_message_like_sql_comment_sequence(self, repo: ActivityLogRepository) -> None:
|
||||
"""'--' (SQL comment) in message_like must be treated literally."""
|
||||
repo.record(_entry(message="value -- comment"))
|
||||
repo.record(_entry(message="value no comment"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="value --"), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].message == "value -- comment"
|
||||
|
||||
def test_message_like_backslash_literal(self, repo: ActivityLogRepository) -> None:
|
||||
"""Backslash in message_like must be treated as a literal character."""
|
||||
repo.record(_entry(message="path\\to\\file"))
|
||||
repo.record(_entry(message="path/to/file"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="path\\to"), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].message == "path\\to\\file"
|
||||
|
||||
def test_message_like_classic_injection_pattern(self, repo: ActivityLogRepository) -> None:
|
||||
"""Classic ') OR '1'='1 injection attempt must return no false positives."""
|
||||
repo.record(_entry(message="innocent message"))
|
||||
repo.record(_entry(message="another message"))
|
||||
|
||||
# If injection worked, all rows would match
|
||||
results = repo.query(ActivityLogFilters(message_like="') OR '1'='1"), limit=10)
|
||||
assert (
|
||||
len(results) == 0
|
||||
), "Injection payload matched rows it shouldn't — parameterization may be broken"
|
||||
|
||||
def test_message_like_all_wildcards_returns_nothing_for_no_match(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""'%_%' as a literal search term should return no rows unless that exact
|
||||
substring appears in a message."""
|
||||
repo.record(_entry(message="some message"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="%_%"), limit=10)
|
||||
# '%_%' as literal text does not appear in "some message"
|
||||
assert (
|
||||
len(results) == 0
|
||||
), "% and _ in message_like were treated as SQL wildcards instead of literals"
|
||||
|
||||
def test_actor_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
|
||||
"""actor filter is exact match — SQL wildcards in value must not act as wildcards."""
|
||||
repo.record(_entry(actor="alice"))
|
||||
repo.record(_entry(actor="alice_admin"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(actor="alice"), limit=10)
|
||||
assert (
|
||||
len(results) == 1
|
||||
), "actor filter is exact-match; 'alice' should not match 'alice_admin'"
|
||||
assert results[0].actor == "alice"
|
||||
|
||||
def test_entity_id_exact_match_not_like(self, repo: ActivityLogRepository) -> None:
|
||||
"""entity_id filter is exact match — prefix should not leak."""
|
||||
repo.record(_entry(entity_id="ot_abc"))
|
||||
repo.record(_entry(entity_id="ot_abc_extra"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(entity_id="ot_abc"), limit=10)
|
||||
assert len(results) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Keyset pagination edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestKeysetPaginationEdges:
|
||||
def test_empty_table_returns_empty_list(self, repo: ActivityLogRepository) -> None:
|
||||
"""Query on empty table must return [] not raise."""
|
||||
results = repo.query(ActivityLogFilters(), limit=10)
|
||||
assert results == []
|
||||
|
||||
def test_before_seq_none_is_first_page(self, repo: ActivityLogRepository) -> None:
|
||||
"""before_seq=None must return the newest (first) page."""
|
||||
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
for i in range(5):
|
||||
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
||||
|
||||
page = repo.query(ActivityLogFilters(), before_seq=None, limit=3)
|
||||
assert len(page) == 3
|
||||
# Page should contain the 3 newest entries
|
||||
messages = {e.message for e in page}
|
||||
assert "e4" in messages
|
||||
assert "e3" in messages
|
||||
assert "e2" in messages
|
||||
|
||||
def test_before_seq_smaller_than_all_rows_returns_empty(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""before_seq=1 (smaller than all autoincrement seqs) returns empty page."""
|
||||
for i in range(5):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
# seq starts at 1, so before_seq=1 means seq < 1 — no rows
|
||||
results = repo.query(ActivityLogFilters(), before_seq=1, limit=10)
|
||||
assert (
|
||||
results == []
|
||||
), "before_seq=1 should yield empty page since autoincrement starts at 1 (seq<1 = nothing)"
|
||||
|
||||
def test_before_seq_larger_than_max_returns_full_first_page(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""before_seq larger than any seq in the table behaves like before_seq=None."""
|
||||
for i in range(5):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
page_none = repo.query(ActivityLogFilters(), before_seq=None, limit=5)
|
||||
page_large = repo.query(ActivityLogFilters(), before_seq=999_999, limit=5)
|
||||
|
||||
ids_none = {e.id for e in page_none}
|
||||
ids_large = {e.id for e in page_large}
|
||||
assert ids_none == ids_large
|
||||
|
||||
def test_page_boundary_limit_equals_row_count(self, repo: ActivityLogRepository) -> None:
|
||||
"""When limit == total rows, one page covers all rows and a second page is empty."""
|
||||
for i in range(5):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
page1 = repo.query(ActivityLogFilters(), limit=5)
|
||||
assert len(page1) == 5
|
||||
|
||||
first_seq = _get_seq(repo, page1[0].id)
|
||||
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=5)
|
||||
assert page2 == []
|
||||
|
||||
def test_ordering_contract_page_zero_is_smallest_seq(self, repo: ActivityLogRepository) -> None:
|
||||
"""Within a page, page[0] must have the smallest seq (ascending chrono order).
|
||||
The acceptance criteria state: 'The smallest seq on a page is page[0]'s seq —
|
||||
pass that as before_seq for the next page.'"""
|
||||
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
for i in range(6):
|
||||
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
||||
|
||||
page = repo.query(ActivityLogFilters(), limit=6)
|
||||
seqs = [_get_seq(repo, e.id) for e in page]
|
||||
assert seqs == sorted(
|
||||
seqs
|
||||
), "page must be in ascending seq order (page[0] is oldest/smallest seq)"
|
||||
|
||||
def test_no_duplicates_across_full_walk(self, repo: ActivityLogRepository) -> None:
|
||||
"""Walking the entire table page by page yields each row exactly once."""
|
||||
total = 11
|
||||
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
for i in range(total):
|
||||
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
||||
|
||||
all_ids: list[str] = []
|
||||
before_seq: int | None = None
|
||||
limit = 4
|
||||
|
||||
while True:
|
||||
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
|
||||
if not page:
|
||||
break
|
||||
all_ids.extend(e.id for e in page)
|
||||
before_seq = _get_seq(repo, page[0].id)
|
||||
|
||||
assert len(all_ids) == total, "Total rows from all pages must equal inserted count"
|
||||
assert len(set(all_ids)) == total, "No duplicate IDs across pages"
|
||||
|
||||
def test_no_gaps_across_full_walk(self, repo: ActivityLogRepository) -> None:
|
||||
"""Walking the entire table page by page with limit=1 yields every row."""
|
||||
total = 7
|
||||
for i in range(total):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
all_ids: list[str] = []
|
||||
before_seq: int | None = None
|
||||
while True:
|
||||
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=1)
|
||||
if not page:
|
||||
break
|
||||
all_ids.append(page[0].id)
|
||||
before_seq = _get_seq(repo, page[0].id)
|
||||
|
||||
assert len(all_ids) == total
|
||||
|
||||
def test_many_rows_same_ts_no_duplicates_or_gaps(self, repo: ActivityLogRepository) -> None:
|
||||
"""With many identical timestamps, pagination via seq prevents any dup or gap."""
|
||||
same_ts = datetime(2026, 5, 1, 10, 0, 0, tzinfo=timezone.utc)
|
||||
count = 9
|
||||
for i in range(count):
|
||||
repo.record(_entry(ts=same_ts, message=f"same-ts {i}"))
|
||||
|
||||
all_ids: list[str] = []
|
||||
before_seq: int | None = None
|
||||
limit = 4
|
||||
while True:
|
||||
page = repo.query(ActivityLogFilters(), before_seq=before_seq, limit=limit)
|
||||
if not page:
|
||||
break
|
||||
all_ids.extend(e.id for e in page)
|
||||
before_seq = _get_seq(repo, page[0].id)
|
||||
|
||||
assert len(all_ids) == count
|
||||
assert len(set(all_ids)) == count, "Duplicates found in same-ts pagination walk"
|
||||
|
||||
def test_next_page_cursor_is_page_zero_seq(self, repo: ActivityLogRepository) -> None:
|
||||
"""The documented contract: pass page[0].seq as before_seq for next page.
|
||||
Verify the next page does NOT overlap with the current page."""
|
||||
for i in range(6):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
page1 = repo.query(ActivityLogFilters(), limit=3)
|
||||
cursor = _get_seq(repo, page1[0].id) # page[0] = smallest seq on page
|
||||
page2 = repo.query(ActivityLogFilters(), before_seq=cursor, limit=3)
|
||||
|
||||
ids1 = {e.id for e in page1}
|
||||
ids2 = {e.id for e in page2}
|
||||
assert ids1.isdisjoint(ids2), "Pages overlap — cursor contract broken"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. Prune edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPruneEdgeCases:
|
||||
def test_prune_before_ts_only_no_max_entries(self, repo: ActivityLogRepository) -> None:
|
||||
"""before_ts alone removes only old rows; recent rows untouched."""
|
||||
cutoff = datetime(2026, 3, 1, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=cutoff - timedelta(days=2), message="old"))
|
||||
repo.record(_entry(ts=cutoff + timedelta(days=1), message="new"))
|
||||
|
||||
deleted = repo.prune(before_ts=cutoff)
|
||||
assert deleted == 1
|
||||
remaining = repo.query(ActivityLogFilters(), limit=10)
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].message == "new"
|
||||
|
||||
def test_prune_max_entries_only_no_before_ts(self, repo: ActivityLogRepository) -> None:
|
||||
"""max_entries alone trims to N newest; no age filter applied."""
|
||||
for i in range(6):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
deleted = repo.prune(max_entries=2)
|
||||
assert deleted == 4
|
||||
assert repo.count() == 2
|
||||
|
||||
def test_prune_max_entries_zero_deletes_all(self, repo: ActivityLogRepository) -> None:
|
||||
"""max_entries=0 means keep nothing — all rows deleted."""
|
||||
for i in range(5):
|
||||
repo.record(_entry())
|
||||
|
||||
deleted = repo.prune(max_entries=0)
|
||||
assert deleted == 5
|
||||
assert repo.count() == 0
|
||||
|
||||
def test_prune_max_entries_larger_than_count_is_noop(self, repo: ActivityLogRepository) -> None:
|
||||
"""max_entries > actual count must not delete anything."""
|
||||
for i in range(3):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
deleted = repo.prune(max_entries=100)
|
||||
assert deleted == 0
|
||||
assert repo.count() == 3
|
||||
|
||||
def test_prune_keeps_newest_entries_by_seq(self, repo: ActivityLogRepository) -> None:
|
||||
"""max_entries prune MUST keep the rows with the HIGHEST seq values."""
|
||||
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
all_ids = []
|
||||
for i in range(6):
|
||||
e = _entry(ts=base + timedelta(seconds=i), message=f"e{i}")
|
||||
all_ids.append(e.id)
|
||||
repo.record(e)
|
||||
|
||||
# keep only 2
|
||||
repo.prune(max_entries=2)
|
||||
remaining = repo.query(ActivityLogFilters(), limit=10)
|
||||
remaining_ids = {r.id for r in remaining}
|
||||
|
||||
# Must keep the last two inserted (highest seq = newest)
|
||||
assert all_ids[-1] in remaining_ids, "Newest entry (e5) must be kept"
|
||||
assert all_ids[-2] in remaining_ids, "Second newest entry (e4) must be kept"
|
||||
# Oldest must be gone
|
||||
assert all_ids[0] not in remaining_ids, "Oldest entry (e0) must be pruned"
|
||||
|
||||
def test_prune_both_returns_sum_of_deleted(self, repo: ActivityLogRepository) -> None:
|
||||
"""prune(before_ts, max_entries) returns the TOTAL rows deleted by both steps."""
|
||||
base = datetime(2026, 4, 1, tzinfo=timezone.utc)
|
||||
# 4 old entries (before base)
|
||||
for i in range(4):
|
||||
repo.record(_entry(ts=base - timedelta(hours=i + 1), message=f"old{i}"))
|
||||
# 4 new entries (after base)
|
||||
for i in range(4):
|
||||
repo.record(_entry(ts=base + timedelta(hours=i + 1), message=f"new{i}"))
|
||||
|
||||
# prune old, then keep only 2 new
|
||||
deleted = repo.prune(before_ts=base, max_entries=2)
|
||||
# 4 old + 2 of the 4 new = 6 total
|
||||
assert deleted == 6
|
||||
assert repo.count() == 2
|
||||
|
||||
def test_prune_no_args_is_noop(self, repo: ActivityLogRepository) -> None:
|
||||
"""prune() with no args should delete 0 rows."""
|
||||
for i in range(3):
|
||||
repo.record(_entry())
|
||||
|
||||
deleted = repo.prune()
|
||||
assert deleted == 0
|
||||
assert repo.count() == 3
|
||||
|
||||
def test_prune_before_ts_boundary_is_exclusive(self, repo: ActivityLogRepository) -> None:
|
||||
"""prune(before_ts=X) uses strict < X; a row exactly at X must survive."""
|
||||
ts = datetime(2026, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
|
||||
repo.record(_entry(ts=ts, message="exact boundary"))
|
||||
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
|
||||
|
||||
deleted = repo.prune(before_ts=ts)
|
||||
assert deleted == 1 # only "before" deleted
|
||||
remaining = {r.message for r in repo.query(ActivityLogFilters(), limit=10)}
|
||||
assert "exact boundary" in remaining
|
||||
assert "before" not in remaining
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. Filter combination edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFilterCombinationEdges:
|
||||
def test_multiple_filters_are_anded(self, repo: ActivityLogRepository) -> None:
|
||||
"""All non-None filters must be AND-ed together, not OR-ed."""
|
||||
repo.record(
|
||||
_entry(actor="alice", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
|
||||
)
|
||||
repo.record(
|
||||
_entry(actor="alice", category=ActivityCategory.DEVICE, severity=ActivitySeverity.INFO)
|
||||
)
|
||||
repo.record(
|
||||
_entry(actor="bob", category=ActivityCategory.AUTH, severity=ActivitySeverity.ERROR)
|
||||
)
|
||||
|
||||
results = repo.query(
|
||||
ActivityLogFilters(
|
||||
actor="alice",
|
||||
categories=[ActivityCategory.AUTH],
|
||||
severities=[ActivitySeverity.ERROR],
|
||||
),
|
||||
limit=10,
|
||||
)
|
||||
assert len(results) == 1
|
||||
r = results[0]
|
||||
assert r.actor == "alice"
|
||||
assert r.category == ActivityCategory.AUTH
|
||||
assert r.severity == ActivitySeverity.ERROR
|
||||
|
||||
def test_empty_categories_sequence_means_no_restriction(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""An empty list for categories must behave the same as None (no restriction).
|
||||
The acceptance criteria state empty sequence == None for this dimension."""
|
||||
repo.record(_entry(category=ActivityCategory.AUTH))
|
||||
repo.record(_entry(category=ActivityCategory.DEVICE))
|
||||
|
||||
# empty list
|
||||
results_empty = repo.query(ActivityLogFilters(categories=[]), limit=10)
|
||||
# None
|
||||
results_none = repo.query(ActivityLogFilters(categories=None), limit=10)
|
||||
|
||||
assert len(results_empty) == len(results_none), (
|
||||
"categories=[] and categories=None should behave identically (no restriction); "
|
||||
f"got {len(results_empty)} vs {len(results_none)}"
|
||||
)
|
||||
|
||||
def test_empty_severities_sequence_means_no_restriction(
|
||||
self, repo: ActivityLogRepository
|
||||
) -> None:
|
||||
"""An empty list for severities must behave the same as None (no restriction)."""
|
||||
repo.record(_entry(severity=ActivitySeverity.INFO))
|
||||
repo.record(_entry(severity=ActivitySeverity.ERROR))
|
||||
|
||||
results_empty = repo.query(ActivityLogFilters(severities=[]), limit=10)
|
||||
results_none = repo.query(ActivityLogFilters(severities=None), limit=10)
|
||||
|
||||
assert len(results_empty) == len(
|
||||
results_none
|
||||
), "severities=[] and severities=None should behave identically"
|
||||
|
||||
def test_since_is_inclusive(self, repo: ActivityLogRepository) -> None:
|
||||
"""since is an INCLUSIVE lower bound: ts >= since."""
|
||||
ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
|
||||
repo.record(_entry(ts=ts, message="at boundary"))
|
||||
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(since=ts), limit=10)
|
||||
messages = {r.message for r in results}
|
||||
assert "at boundary" in messages, "since boundary row (ts == since) must be included"
|
||||
assert "before" not in messages
|
||||
|
||||
def test_until_is_inclusive(self, repo: ActivityLogRepository) -> None:
|
||||
"""until is an INCLUSIVE upper bound: ts <= until."""
|
||||
ts = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=ts - timedelta(seconds=1), message="before"))
|
||||
repo.record(_entry(ts=ts, message="at boundary"))
|
||||
repo.record(_entry(ts=ts + timedelta(seconds=1), message="after"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(until=ts), limit=10)
|
||||
messages = {r.message for r in results}
|
||||
assert "at boundary" in messages, "until boundary row (ts == until) must be included"
|
||||
assert "after" not in messages
|
||||
|
||||
def test_since_and_until_define_closed_range(self, repo: ActivityLogRepository) -> None:
|
||||
"""Combining since + until must keep rows in [since, until] inclusive."""
|
||||
base = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=base - timedelta(hours=1), message="out_before"))
|
||||
repo.record(_entry(ts=base, message="in_start"))
|
||||
repo.record(_entry(ts=base + timedelta(hours=1), message="in_middle"))
|
||||
repo.record(_entry(ts=base + timedelta(hours=2), message="in_end"))
|
||||
repo.record(_entry(ts=base + timedelta(hours=3), message="out_after"))
|
||||
|
||||
results = repo.query(
|
||||
ActivityLogFilters(since=base, until=base + timedelta(hours=2)),
|
||||
limit=10,
|
||||
)
|
||||
messages = {r.message for r in results}
|
||||
assert {"in_start", "in_middle", "in_end"} == messages
|
||||
|
||||
def test_tz_aware_datetime_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""UTC-aware datetimes must survive storage and come back tz-aware."""
|
||||
ts = datetime(2026, 1, 15, 8, 30, 0, tzinfo=timezone.utc)
|
||||
e = _entry(ts=ts)
|
||||
repo.record(e)
|
||||
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.ts.tzinfo is not None, "Returned ts must be tz-aware"
|
||||
assert got.ts.utcoffset().total_seconds() == 0, "Returned ts must be UTC" # type: ignore[union-attr]
|
||||
assert got.ts == ts
|
||||
|
||||
def test_count_none_equals_count_empty_filters(self, repo: ActivityLogRepository) -> None:
|
||||
"""count(None) == count(ActivityLogFilters()) per acceptance criteria."""
|
||||
for i in range(4):
|
||||
repo.record(_entry())
|
||||
|
||||
assert repo.count(None) == repo.count(ActivityLogFilters())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. Codec / data integrity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCodecDataIntegrity:
|
||||
def test_metadata_nested_dict_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""Deeply nested metadata survives JSON round-trip."""
|
||||
meta = {
|
||||
"level1": {
|
||||
"level2": {"level3": [1, 2, 3]},
|
||||
"list": [{"a": True}, {"b": None}],
|
||||
},
|
||||
"count": 42,
|
||||
}
|
||||
e = _entry(metadata=meta)
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.metadata == meta
|
||||
|
||||
def test_metadata_unicode_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""Unicode (including emoji and CJK) in metadata survives storage."""
|
||||
meta = {"label": "こんにちは", "emoji": "🎉", "arrow": "→"}
|
||||
e = _entry(metadata=meta)
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.metadata == meta
|
||||
|
||||
def test_metadata_empty_dict_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""An empty {} metadata must come back as {} not None."""
|
||||
e = _entry(metadata={})
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.metadata == {}
|
||||
assert isinstance(got.metadata, dict)
|
||||
|
||||
def test_metadata_json_special_chars(self, repo: ActivityLogRepository) -> None:
|
||||
"""Metadata with JSON-special characters (backslash, quotes) round-trips correctly."""
|
||||
meta = {"path": "C:\\Users\\test", "quoted": '"hello"', "newline": "line1\nline2"}
|
||||
e = _entry(metadata=meta)
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.metadata == meta
|
||||
|
||||
def test_entity_type_none_vs_empty_string(self, repo: ActivityLogRepository) -> None:
|
||||
"""None entity_type must come back as None (not empty string '').
|
||||
These are semantically different — None means 'not applicable'."""
|
||||
e = _entry(entity_type=None, entity_id=None, entity_name=None)
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
# Must be exactly None, not ""
|
||||
assert got.entity_type is None
|
||||
assert got.entity_id is None
|
||||
assert got.entity_name is None
|
||||
|
||||
def test_ts_microsecond_precision_preserved(self, repo: ActivityLogRepository) -> None:
|
||||
"""Microsecond component of ts must survive the isoformat() round-trip."""
|
||||
ts = datetime(2026, 6, 9, 12, 34, 56, 789012, tzinfo=timezone.utc)
|
||||
e = _entry(ts=ts)
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.ts == ts, f"Expected {ts!r}, got {got.ts!r} — microsecond precision may be lost"
|
||||
|
||||
def test_to_row_does_not_include_seq(self) -> None:
|
||||
"""to_row() must NOT include 'seq' (it's DB-assigned)."""
|
||||
e = _entry()
|
||||
row = e.to_row()
|
||||
assert "seq" not in row, "to_row() must not include seq — it is DB-assigned"
|
||||
|
||||
def test_to_row_has_exactly_11_keys(self) -> None:
|
||||
"""Acceptance criteria: to_row() returns 11 keys."""
|
||||
e = _entry()
|
||||
row = e.to_row()
|
||||
expected_keys = {
|
||||
"id",
|
||||
"ts",
|
||||
"category",
|
||||
"action",
|
||||
"severity",
|
||||
"actor",
|
||||
"entity_type",
|
||||
"entity_id",
|
||||
"entity_name",
|
||||
"message",
|
||||
"metadata",
|
||||
}
|
||||
assert set(row.keys()) == expected_keys
|
||||
|
||||
def test_from_row_ignores_seq_column(self) -> None:
|
||||
"""from_row() must not raise or fail when 'seq' is present in the dict."""
|
||||
e = _entry()
|
||||
row = e.to_row()
|
||||
row["seq"] = 42 # inject seq as if from DB
|
||||
recovered = ActivityLogEntry.from_row(row)
|
||||
assert recovered.id == e.id
|
||||
|
||||
def test_from_row_naive_ts_becomes_utc_aware(self) -> None:
|
||||
"""If a stored ts has no timezone offset (legacy row), from_row must attach UTC."""
|
||||
e = _entry()
|
||||
row = e.to_row()
|
||||
# Strip timezone from the isoformat string to simulate a legacy row
|
||||
row["ts"] = datetime(2026, 1, 1, 10, 0, 0).isoformat() # naive
|
||||
recovered = ActivityLogEntry.from_row(row)
|
||||
assert recovered.ts.tzinfo is not None, "Legacy naive ts must become tz-aware (UTC)"
|
||||
|
||||
def test_metadata_with_numeric_keys_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""JSON only supports string keys; numeric keys are coerced to strings."""
|
||||
# This tests that the codec doesn't silently crash on non-string keys
|
||||
# (Python allows them but JSON does not — json.dumps coerces to string)
|
||||
meta = {1: "one", "two": 2}
|
||||
e = _entry(metadata=meta) # type: ignore[arg-type]
|
||||
repo.record(e)
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
# json.dumps coerces int key 1 → "1"
|
||||
assert "1" in got.metadata or 1 in got.metadata
|
||||
|
||||
def test_all_category_values_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""Every ActivityCategory constant must survive storage without corruption."""
|
||||
for cat in ActivityCategory.ALL:
|
||||
repo.record(_entry(category=cat, message=f"cat:{cat}"))
|
||||
|
||||
for cat in ActivityCategory.ALL:
|
||||
results = repo.query(ActivityLogFilters(categories=[cat]), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].category == cat
|
||||
|
||||
def test_all_severity_values_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
"""Every ActivitySeverity constant must survive storage without corruption."""
|
||||
for sev in ActivitySeverity.ALL:
|
||||
repo.record(_entry(severity=sev, message=f"sev:{sev}"))
|
||||
|
||||
for sev in ActivitySeverity.ALL:
|
||||
results = repo.query(ActivityLogFilters(severities=[sev]), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].severity == sev
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. Migration idempotency (additional structural checks)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMigrationIdempotencyExtended:
|
||||
def test_table_has_autoincrement_seq(self, tmp_db: Database) -> None:
|
||||
"""The seq column must be INTEGER PRIMARY KEY AUTOINCREMENT — never reuse deleted seqs."""
|
||||
repo = ActivityLogRepository(tmp_db)
|
||||
e1 = _entry(message="first")
|
||||
e2 = _entry(message="second")
|
||||
repo.record(e1)
|
||||
repo.record(e2)
|
||||
seq1 = _get_seq(repo, e1.id)
|
||||
seq2 = _get_seq(repo, e2.id)
|
||||
assert seq2 > seq1, "AUTOINCREMENT must produce monotonically increasing seqs"
|
||||
|
||||
# After clear, a new record must get a seq higher than the previous max
|
||||
repo.clear()
|
||||
e3 = _entry(message="third")
|
||||
repo.record(e3)
|
||||
seq3 = _get_seq(repo, e3.id)
|
||||
assert seq3 > seq2, "AUTOINCREMENT must not reuse seqs after DELETE"
|
||||
|
||||
def test_all_expected_indexes_present(self, tmp_db: Database) -> None:
|
||||
"""All 5 indexes declared in the acceptance criteria must exist."""
|
||||
ActivityLogRepository(tmp_db)
|
||||
|
||||
cursor = tmp_db.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
|
||||
)
|
||||
index_names = {row["name"] for row in cursor.fetchall()}
|
||||
required = {
|
||||
"idx_activity_log_ts_seq",
|
||||
"idx_activity_log_category",
|
||||
"idx_activity_log_severity",
|
||||
"idx_activity_log_actor",
|
||||
"idx_activity_log_entity",
|
||||
}
|
||||
missing = required - index_names
|
||||
assert not missing, f"Missing indexes: {missing}"
|
||||
|
||||
def test_id_column_has_unique_constraint(self, tmp_db: Database) -> None:
|
||||
"""Inserting duplicate id must raise IntegrityError."""
|
||||
import sqlite3 as sqlite_module
|
||||
|
||||
repo = ActivityLogRepository(tmp_db)
|
||||
fixed_id = f"al_{uuid.uuid4().hex[:8]}"
|
||||
repo.record(_entry(id=fixed_id, message="first"))
|
||||
|
||||
with pytest.raises((Exception, sqlite_module.IntegrityError)):
|
||||
repo.record(_entry(id=fixed_id, message="duplicate id"))
|
||||
|
||||
def test_migration_name_is_002_add_activity_log(self, tmp_db: Database) -> None:
|
||||
"""The migration name must exactly match '002_add_activity_log'."""
|
||||
from ledgrab.storage.data_migrations import AddActivityLogTableMigration
|
||||
|
||||
migration = AddActivityLogTableMigration()
|
||||
assert migration.name == "002_add_activity_log"
|
||||
|
||||
def test_migration_is_second_in_all_migrations(self) -> None:
|
||||
"""AddActivityLogTableMigration must be at index [1] in ALL_MIGRATIONS."""
|
||||
from ledgrab.storage.data_migrations import (
|
||||
ALL_MIGRATIONS,
|
||||
AddActivityLogTableMigration,
|
||||
)
|
||||
|
||||
assert len(ALL_MIGRATIONS) >= 2, "ALL_MIGRATIONS must have at least 2 entries"
|
||||
assert isinstance(
|
||||
ALL_MIGRATIONS[1], AddActivityLogTableMigration
|
||||
), "AddActivityLogTableMigration must be the second migration (index 1)"
|
||||
|
||||
def test_apply_twice_is_noop_no_error(self, tmp_db: Database) -> None:
|
||||
"""Calling apply() on the connection twice must not raise — IF NOT EXISTS ensures this."""
|
||||
from ledgrab.storage.data_migrations import AddActivityLogTableMigration
|
||||
|
||||
migration = AddActivityLogTableMigration()
|
||||
with tmp_db.transaction() as conn:
|
||||
migration.apply(conn)
|
||||
# Second apply — must not raise
|
||||
with tmp_db.transaction() as conn:
|
||||
migration.apply(conn)
|
||||
|
||||
# Table should still be accessible
|
||||
cursor = tmp_db.execute("SELECT COUNT(*) AS cnt FROM activity_log")
|
||||
assert cursor.fetchone()["cnt"] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 7. iter_export vs query consistency
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIterExportConsistency:
|
||||
def test_iter_export_empty_table_yields_nothing(self, repo: ActivityLogRepository) -> None:
|
||||
"""iter_export on empty table must yield nothing, not raise."""
|
||||
exported = list(repo.iter_export())
|
||||
assert exported == []
|
||||
|
||||
def test_iter_export_matches_query_results(self, repo: ActivityLogRepository) -> None:
|
||||
"""iter_export(filters) and query(filters) must return the same entries."""
|
||||
for i in range(8):
|
||||
cat = ActivityCategory.AUTH if i % 2 == 0 else ActivityCategory.DEVICE
|
||||
repo.record(_entry(category=cat, message=f"e{i}"))
|
||||
|
||||
filters = ActivityLogFilters(categories=[ActivityCategory.AUTH])
|
||||
|
||||
exported_ids = {e.id for e in repo.iter_export(filters)}
|
||||
queried_ids = {e.id for e in repo.query(filters, limit=100)}
|
||||
assert (
|
||||
exported_ids == queried_ids
|
||||
), "iter_export and query must return the same set of entries for the same filters"
|
||||
|
||||
def test_iter_export_none_filter_yields_all(self, repo: ActivityLogRepository) -> None:
|
||||
"""iter_export(None) must yield all rows (same as query with no filter)."""
|
||||
for i in range(5):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
all_exported = list(repo.iter_export(None))
|
||||
all_queried = repo.query(ActivityLogFilters(), limit=100)
|
||||
|
||||
assert len(all_exported) == len(all_queried) == 5
|
||||
|
||||
def test_iter_export_ascending_seq_order(self, repo: ActivityLogRepository) -> None:
|
||||
"""iter_export must yield rows in ascending seq order (oldest first)."""
|
||||
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
for i in range(5):
|
||||
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
||||
|
||||
exported = list(repo.iter_export())
|
||||
seqs = [_get_seq(repo, e.id) for e in exported]
|
||||
assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order"
|
||||
|
||||
def test_iter_export_respects_message_like_filter(self, repo: ActivityLogRepository) -> None:
|
||||
"""iter_export should honour message_like just as query does."""
|
||||
repo.record(_entry(message="found: hello world"))
|
||||
repo.record(_entry(message="nothing relevant here"))
|
||||
repo.record(_entry(message="also found: hello there"))
|
||||
|
||||
exported = list(repo.iter_export(ActivityLogFilters(message_like="found")))
|
||||
assert len(exported) == 2
|
||||
assert all("found" in e.message for e in exported)
|
||||
|
||||
def test_iter_export_is_lazy_generator(self, repo: ActivityLogRepository) -> None:
|
||||
"""iter_export must return a generator (lazy), not a list."""
|
||||
import types
|
||||
|
||||
for _ in range(3):
|
||||
repo.record(_entry())
|
||||
|
||||
result = repo.iter_export()
|
||||
assert isinstance(
|
||||
result, types.GeneratorType
|
||||
), "iter_export must return a generator for streaming — not a pre-loaded list"
|
||||
@@ -0,0 +1,609 @@
|
||||
"""Unit tests for ActivityLogRepository (Phase 1 — storage layer).
|
||||
|
||||
Coverage
|
||||
--------
|
||||
* round-trip: record + read back, including metadata JSON and UTC ts
|
||||
* filter by each dimension: category, severity, actor, entity_type/id, date range, message_like
|
||||
* keyset pagination stability with same-ts rows (seq tiebreaker)
|
||||
* prune by age (before_ts) and by max_entries
|
||||
* clear; count (filtered + unfiltered); export iterator
|
||||
* migration idempotency: constructing the repo twice does not re-run the migration
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.activity_log import (
|
||||
ActivityCategory,
|
||||
ActivityLogEntry,
|
||||
ActivityLogFilters,
|
||||
ActivitySeverity,
|
||||
)
|
||||
from ledgrab.storage.activity_log_repository import ActivityLogRepository
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _now() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
_SENTINEL = object() # sentinel for "caller did not pass this kwarg"
|
||||
|
||||
|
||||
def _entry(
|
||||
*,
|
||||
id: str | None = None,
|
||||
ts: datetime | None = None,
|
||||
category: str = ActivityCategory.ENTITY,
|
||||
action: str = "entity.created",
|
||||
severity: str = ActivitySeverity.INFO,
|
||||
actor: str = "test_actor",
|
||||
entity_type: str | None = "output_target",
|
||||
entity_id: object = _SENTINEL,
|
||||
entity_name: str | None = "My Target",
|
||||
message: str = "Created output target",
|
||||
metadata: dict | None = None,
|
||||
) -> ActivityLogEntry:
|
||||
"""Build a test ``ActivityLogEntry``.
|
||||
|
||||
``entity_id`` defaults to a random id when not supplied at all. Pass
|
||||
``entity_id=None`` explicitly to get ``None`` stored in the entry.
|
||||
"""
|
||||
resolved_entity_id: str | None = (
|
||||
f"ot_{uuid.uuid4().hex[:8]}" if entity_id is _SENTINEL else entity_id # type: ignore[assignment]
|
||||
)
|
||||
return ActivityLogEntry(
|
||||
id=id or f"al_{uuid.uuid4().hex[:8]}",
|
||||
ts=ts or _now(),
|
||||
category=category,
|
||||
action=action,
|
||||
severity=severity,
|
||||
actor=actor,
|
||||
entity_type=entity_type,
|
||||
entity_id=resolved_entity_id,
|
||||
entity_name=entity_name,
|
||||
message=message,
|
||||
metadata=metadata if metadata is not None else {},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def repo(tmp_db: Database) -> ActivityLogRepository:
|
||||
"""Fresh ActivityLogRepository backed by a temp database."""
|
||||
return ActivityLogRepository(tmp_db)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round-trip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRoundTrip:
|
||||
def test_record_and_read_back(self, repo: ActivityLogRepository) -> None:
|
||||
e = _entry(message="Hello world", metadata={"key": "value", "n": 42})
|
||||
repo.record(e)
|
||||
|
||||
page = repo.query(ActivityLogFilters(), limit=10)
|
||||
assert len(page) == 1
|
||||
got = page[0]
|
||||
assert got.id == e.id
|
||||
assert got.category == e.category
|
||||
assert got.action == e.action
|
||||
assert got.severity == e.severity
|
||||
assert got.actor == e.actor
|
||||
assert got.entity_type == e.entity_type
|
||||
assert got.entity_id == e.entity_id
|
||||
assert got.entity_name == e.entity_name
|
||||
assert got.message == e.message
|
||||
|
||||
def test_metadata_json_round_trip(self, repo: ActivityLogRepository) -> None:
|
||||
meta = {"device": "wled_01", "led_count": 150, "nested": {"x": True}}
|
||||
e = _entry(metadata=meta)
|
||||
repo.record(e)
|
||||
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.metadata == meta
|
||||
|
||||
def test_utc_timestamp_preserved(self, repo: ActivityLogRepository) -> None:
|
||||
ts = datetime(2026, 1, 15, 12, 30, 45, tzinfo=timezone.utc)
|
||||
e = _entry(ts=ts)
|
||||
repo.record(e)
|
||||
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
# Should round-trip to the same UTC moment
|
||||
assert got.ts.replace(tzinfo=timezone.utc) == ts.replace(tzinfo=timezone.utc)
|
||||
|
||||
def test_none_optional_fields_preserved(self, repo: ActivityLogRepository) -> None:
|
||||
e = _entry(entity_type=None, entity_id=None, entity_name=None)
|
||||
repo.record(e)
|
||||
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.entity_type is None
|
||||
assert got.entity_id is None
|
||||
assert got.entity_name is None
|
||||
|
||||
def test_empty_metadata_default(self, repo: ActivityLogRepository) -> None:
|
||||
e = _entry(metadata={})
|
||||
repo.record(e)
|
||||
|
||||
got = repo.query(ActivityLogFilters(), limit=1)[0]
|
||||
assert got.metadata == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Filters
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFilters:
|
||||
def test_filter_by_category(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(category=ActivityCategory.AUTH, action="auth.rejected"))
|
||||
repo.record(_entry(category=ActivityCategory.DEVICE, action="device.connected"))
|
||||
repo.record(_entry(category=ActivityCategory.ENTITY, action="entity.deleted"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(categories=[ActivityCategory.AUTH]), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].category == ActivityCategory.AUTH
|
||||
|
||||
def test_filter_by_multiple_categories(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(category=ActivityCategory.AUTH))
|
||||
repo.record(_entry(category=ActivityCategory.DEVICE))
|
||||
repo.record(_entry(category=ActivityCategory.SYSTEM))
|
||||
|
||||
results = repo.query(
|
||||
ActivityLogFilters(categories=[ActivityCategory.AUTH, ActivityCategory.DEVICE]),
|
||||
limit=10,
|
||||
)
|
||||
assert len(results) == 2
|
||||
cats = {r.category for r in results}
|
||||
assert cats == {ActivityCategory.AUTH, ActivityCategory.DEVICE}
|
||||
|
||||
def test_filter_by_severity(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(severity=ActivitySeverity.INFO))
|
||||
repo.record(_entry(severity=ActivitySeverity.WARNING))
|
||||
repo.record(_entry(severity=ActivitySeverity.ERROR))
|
||||
|
||||
results = repo.query(ActivityLogFilters(severities=[ActivitySeverity.ERROR]), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].severity == ActivitySeverity.ERROR
|
||||
|
||||
def test_filter_by_multiple_severities(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(severity=ActivitySeverity.INFO))
|
||||
repo.record(_entry(severity=ActivitySeverity.WARNING))
|
||||
repo.record(_entry(severity=ActivitySeverity.ERROR))
|
||||
|
||||
results = repo.query(
|
||||
ActivityLogFilters(severities=[ActivitySeverity.WARNING, ActivitySeverity.ERROR]),
|
||||
limit=10,
|
||||
)
|
||||
assert len(results) == 2
|
||||
|
||||
def test_filter_by_actor(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(actor="alice"))
|
||||
repo.record(_entry(actor="bob"))
|
||||
repo.record(_entry(actor="alice"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(actor="alice"), limit=10)
|
||||
assert len(results) == 2
|
||||
assert all(r.actor == "alice" for r in results)
|
||||
|
||||
def test_filter_by_entity_type(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(entity_type="output_target"))
|
||||
repo.record(_entry(entity_type="device"))
|
||||
repo.record(_entry(entity_type="output_target"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(entity_type="output_target"), limit=10)
|
||||
assert len(results) == 2
|
||||
|
||||
def test_filter_by_entity_id(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(entity_id="ot_aabbccdd"))
|
||||
repo.record(_entry(entity_id="ot_11223344"))
|
||||
repo.record(_entry(entity_id="ot_aabbccdd"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(entity_id="ot_aabbccdd"), limit=10)
|
||||
assert len(results) == 2
|
||||
|
||||
def test_filter_by_since(self, repo: ActivityLogRepository) -> None:
|
||||
base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=base - timedelta(hours=2), message="old"))
|
||||
repo.record(_entry(ts=base, message="boundary"))
|
||||
repo.record(_entry(ts=base + timedelta(hours=1), message="new"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(since=base), limit=10)
|
||||
assert len(results) == 2
|
||||
messages = {r.message for r in results}
|
||||
assert "old" not in messages
|
||||
|
||||
def test_filter_by_until(self, repo: ActivityLogRepository) -> None:
|
||||
base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=base - timedelta(hours=1), message="old"))
|
||||
repo.record(_entry(ts=base, message="boundary"))
|
||||
repo.record(_entry(ts=base + timedelta(hours=2), message="new"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(until=base), limit=10)
|
||||
assert len(results) == 2
|
||||
messages = {r.message for r in results}
|
||||
assert "new" not in messages
|
||||
|
||||
def test_filter_message_like_substring(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(message="Created output target MyStrip"))
|
||||
repo.record(_entry(message="Deleted device sensor-01"))
|
||||
repo.record(_entry(message="Updated output target MyStrip"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="output target"), limit=10)
|
||||
assert len(results) == 2
|
||||
|
||||
def test_filter_message_like_escapes_percent(self, repo: ActivityLogRepository) -> None:
|
||||
"""A literal % in message_like should not act as a SQL wildcard."""
|
||||
repo.record(_entry(message="100% done"))
|
||||
repo.record(_entry(message="partial done"))
|
||||
|
||||
results = repo.query(ActivityLogFilters(message_like="100%"), limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0].message == "100% done"
|
||||
|
||||
def test_combined_filters(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(
|
||||
_entry(
|
||||
actor="alice",
|
||||
category=ActivityCategory.ENTITY,
|
||||
severity=ActivitySeverity.INFO,
|
||||
)
|
||||
)
|
||||
repo.record(
|
||||
_entry(
|
||||
actor="alice",
|
||||
category=ActivityCategory.AUTH,
|
||||
severity=ActivitySeverity.WARNING,
|
||||
)
|
||||
)
|
||||
repo.record(
|
||||
_entry(
|
||||
actor="bob",
|
||||
category=ActivityCategory.ENTITY,
|
||||
severity=ActivitySeverity.INFO,
|
||||
)
|
||||
)
|
||||
|
||||
results = repo.query(
|
||||
ActivityLogFilters(
|
||||
actor="alice",
|
||||
categories=[ActivityCategory.ENTITY],
|
||||
severities=[ActivitySeverity.INFO],
|
||||
),
|
||||
limit=10,
|
||||
)
|
||||
assert len(results) == 1
|
||||
assert results[0].actor == "alice"
|
||||
assert results[0].category == ActivityCategory.ENTITY
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Keyset pagination
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestKeysetPagination:
|
||||
def test_basic_pagination(self, repo: ActivityLogRepository) -> None:
|
||||
"""Records returned across two pages cover the full set without overlap."""
|
||||
for i in range(10):
|
||||
repo.record(_entry(message=f"entry {i}"))
|
||||
|
||||
page1 = repo.query(ActivityLogFilters(), limit=4)
|
||||
assert len(page1) == 4
|
||||
|
||||
# The last entry on page 1 has the smallest seq — use it as the cursor
|
||||
# We need the seq; query internally reverses, so page1[0] is oldest on page
|
||||
# and page1[-1] is newest on page. We need the min seq to paginate.
|
||||
# The repo returns entries in ascending order within a page, so page1[0]
|
||||
# has the smallest seq on the page.
|
||||
first_seq = self._get_seq(repo, page1[0].id)
|
||||
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=4)
|
||||
assert len(page2) == 4
|
||||
|
||||
ids1 = {e.id for e in page1}
|
||||
ids2 = {e.id for e in page2}
|
||||
assert ids1.isdisjoint(ids2), "Pages must not overlap"
|
||||
|
||||
page3 = repo.query(
|
||||
ActivityLogFilters(),
|
||||
before_seq=self._get_seq(repo, page2[0].id),
|
||||
limit=4,
|
||||
)
|
||||
assert len(page3) == 2 # 10 total; 4 + 4 + 2
|
||||
|
||||
def test_same_ts_stability(self, repo: ActivityLogRepository) -> None:
|
||||
"""Rows with identical ts are ordered by seq, not ts — no duplicates across pages."""
|
||||
# Insert 6 rows all sharing the exact same timestamp
|
||||
same_ts = datetime(2026, 3, 10, 15, 0, 0, tzinfo=timezone.utc)
|
||||
entries = [_entry(ts=same_ts, message=f"same-ts {i}") for i in range(6)]
|
||||
for e in entries:
|
||||
repo.record(e)
|
||||
|
||||
page1 = repo.query(ActivityLogFilters(), limit=3)
|
||||
first_seq = self._get_seq(repo, page1[0].id)
|
||||
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=3)
|
||||
|
||||
ids1 = {e.id for e in page1}
|
||||
ids2 = {e.id for e in page2}
|
||||
assert ids1.isdisjoint(ids2), "Same-ts rows leaked across page boundary"
|
||||
assert ids1 | ids2 == {e.id for e in entries}, "All rows covered exactly once"
|
||||
|
||||
def test_empty_page_at_end(self, repo: ActivityLogRepository) -> None:
|
||||
"""Requesting a page beyond the last entry returns an empty list."""
|
||||
for i in range(3):
|
||||
repo.record(_entry(message=f"e{i}"))
|
||||
|
||||
page1 = repo.query(ActivityLogFilters(), limit=3)
|
||||
assert len(page1) == 3
|
||||
first_seq = self._get_seq(repo, page1[0].id)
|
||||
page2 = repo.query(ActivityLogFilters(), before_seq=first_seq, limit=3)
|
||||
assert page2 == []
|
||||
|
||||
@staticmethod
|
||||
def _get_seq(repo: ActivityLogRepository, entry_id: str) -> int:
|
||||
"""Helper: retrieve the seq for an entry by its application id."""
|
||||
cursor = repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (entry_id,))
|
||||
row = cursor.fetchone()
|
||||
assert row is not None, f"No row found for id={entry_id!r}"
|
||||
return int(row["seq"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prune
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPrune:
|
||||
def test_prune_by_age(self, repo: ActivityLogRepository) -> None:
|
||||
base = datetime(2026, 2, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
repo.record(_entry(ts=base - timedelta(days=10), message="old1"))
|
||||
repo.record(_entry(ts=base - timedelta(days=5), message="old2"))
|
||||
repo.record(_entry(ts=base, message="boundary"))
|
||||
repo.record(_entry(ts=base + timedelta(days=1), message="new"))
|
||||
|
||||
# Prune everything strictly older than base
|
||||
deleted = repo.prune(before_ts=base)
|
||||
assert deleted == 2
|
||||
remaining = repo.query(ActivityLogFilters(), limit=20)
|
||||
messages = {r.message for r in remaining}
|
||||
assert "old1" not in messages
|
||||
assert "old2" not in messages
|
||||
assert "boundary" in messages
|
||||
assert "new" in messages
|
||||
|
||||
def test_prune_by_max_entries(self, repo: ActivityLogRepository) -> None:
|
||||
for i in range(10):
|
||||
repo.record(_entry(message=f"entry {i}"))
|
||||
|
||||
deleted = repo.prune(max_entries=3)
|
||||
assert deleted == 7
|
||||
assert repo.count() == 3
|
||||
|
||||
def test_prune_keeps_newest_on_max_entries(self, repo: ActivityLogRepository) -> None:
|
||||
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
ids = []
|
||||
for i in range(5):
|
||||
e = _entry(ts=base + timedelta(hours=i), message=f"entry {i}")
|
||||
ids.append(e.id)
|
||||
repo.record(e)
|
||||
|
||||
# Keep only the 2 newest
|
||||
repo.prune(max_entries=2)
|
||||
remaining = repo.query(ActivityLogFilters(), limit=10)
|
||||
remaining_ids = {r.id for r in remaining}
|
||||
# The 2 newest are the last 2 inserted (highest seq)
|
||||
assert ids[3] in remaining_ids
|
||||
assert ids[4] in remaining_ids
|
||||
assert ids[0] not in remaining_ids
|
||||
|
||||
def test_prune_both_predicates(self, repo: ActivityLogRepository) -> None:
|
||||
base = datetime(2026, 2, 1, 0, 0, 0, tzinfo=timezone.utc)
|
||||
# Insert 6 entries: 3 old, 3 recent
|
||||
for i in range(3):
|
||||
repo.record(_entry(ts=base - timedelta(days=i + 1), message=f"old{i}"))
|
||||
for i in range(3):
|
||||
repo.record(_entry(ts=base + timedelta(hours=i), message=f"new{i}"))
|
||||
|
||||
# Prune old entries AND keep at most 2 of the remaining
|
||||
deleted = repo.prune(before_ts=base, max_entries=2)
|
||||
# 3 age-pruned + 1 count-pruned = 4
|
||||
assert deleted == 4
|
||||
assert repo.count() == 2
|
||||
|
||||
def test_prune_max_entries_zero_clears_all(self, repo: ActivityLogRepository) -> None:
|
||||
for i in range(5):
|
||||
repo.record(_entry())
|
||||
|
||||
repo.prune(max_entries=0)
|
||||
assert repo.count() == 0
|
||||
|
||||
def test_prune_no_op_when_below_max(self, repo: ActivityLogRepository) -> None:
|
||||
for i in range(3):
|
||||
repo.record(_entry())
|
||||
|
||||
deleted = repo.prune(max_entries=10)
|
||||
assert deleted == 0
|
||||
assert repo.count() == 3
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Clear
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestClear:
|
||||
def test_clear_returns_row_count(self, repo: ActivityLogRepository) -> None:
|
||||
for _ in range(5):
|
||||
repo.record(_entry())
|
||||
|
||||
deleted = repo.clear()
|
||||
assert deleted == 5
|
||||
|
||||
def test_clear_empties_table(self, repo: ActivityLogRepository) -> None:
|
||||
for _ in range(3):
|
||||
repo.record(_entry())
|
||||
|
||||
repo.clear()
|
||||
assert repo.count() == 0
|
||||
|
||||
def test_clear_on_empty_table(self, repo: ActivityLogRepository) -> None:
|
||||
deleted = repo.clear()
|
||||
assert deleted == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Count
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCount:
|
||||
def test_count_all(self, repo: ActivityLogRepository) -> None:
|
||||
for _ in range(7):
|
||||
repo.record(_entry())
|
||||
assert repo.count() == 7
|
||||
|
||||
def test_count_filtered(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(category=ActivityCategory.AUTH))
|
||||
repo.record(_entry(category=ActivityCategory.DEVICE))
|
||||
repo.record(_entry(category=ActivityCategory.AUTH))
|
||||
|
||||
n = repo.count(ActivityLogFilters(categories=[ActivityCategory.AUTH]))
|
||||
assert n == 2
|
||||
|
||||
def test_count_empty(self, repo: ActivityLogRepository) -> None:
|
||||
assert repo.count() == 0
|
||||
|
||||
def test_count_no_match(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(category=ActivityCategory.ENTITY))
|
||||
n = repo.count(ActivityLogFilters(categories=[ActivityCategory.AUTH]))
|
||||
assert n == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Export iterator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExportIterator:
|
||||
def test_iter_export_yields_all_rows(self, repo: ActivityLogRepository) -> None:
|
||||
entries = [_entry(message=f"e{i}") for i in range(5)]
|
||||
for e in entries:
|
||||
repo.record(e)
|
||||
|
||||
exported = list(repo.iter_export())
|
||||
assert len(exported) == 5
|
||||
exported_ids = {e.id for e in exported}
|
||||
assert exported_ids == {e.id for e in entries}
|
||||
|
||||
def test_iter_export_ascending_order(self, repo: ActivityLogRepository) -> None:
|
||||
base = datetime(2026, 4, 1, tzinfo=timezone.utc)
|
||||
for i in range(5):
|
||||
repo.record(_entry(ts=base + timedelta(seconds=i), message=f"e{i}"))
|
||||
|
||||
exported = list(repo.iter_export())
|
||||
seqs = [
|
||||
repo._db.execute("SELECT seq FROM activity_log WHERE id = ?", (e.id,)).fetchone()["seq"]
|
||||
for e in exported
|
||||
]
|
||||
assert seqs == sorted(seqs), "iter_export must yield rows in ascending seq order"
|
||||
|
||||
def test_iter_export_with_filter(self, repo: ActivityLogRepository) -> None:
|
||||
repo.record(_entry(category=ActivityCategory.AUTH))
|
||||
repo.record(_entry(category=ActivityCategory.ENTITY))
|
||||
repo.record(_entry(category=ActivityCategory.AUTH))
|
||||
|
||||
exported = list(repo.iter_export(ActivityLogFilters(categories=[ActivityCategory.AUTH])))
|
||||
assert len(exported) == 2
|
||||
assert all(e.category == ActivityCategory.AUTH for e in exported)
|
||||
|
||||
def test_iter_export_streaming_not_all_in_memory(self, repo: ActivityLogRepository) -> None:
|
||||
"""Verify iter_export is a generator (lazy), not a pre-loaded list."""
|
||||
import types
|
||||
|
||||
for _ in range(3):
|
||||
repo.record(_entry())
|
||||
|
||||
result = repo.iter_export()
|
||||
assert isinstance(result, types.GeneratorType)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Migration idempotency
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMigrationIdempotency:
|
||||
def test_construct_repo_twice_is_noop(self, tmp_db: Database) -> None:
|
||||
"""Creating two repos on the same DB does not re-run the migration."""
|
||||
repo1 = ActivityLogRepository(tmp_db)
|
||||
repo1.record(_entry(message="before second construction"))
|
||||
|
||||
# Second construction must not raise or re-apply the migration
|
||||
repo2 = ActivityLogRepository(tmp_db)
|
||||
assert repo2.count() == 1
|
||||
|
||||
# Confirm migration is recorded exactly once
|
||||
# Count how many times our migration name appears (should be 1)
|
||||
cursor = tmp_db.execute(
|
||||
"SELECT COUNT(*) AS cnt FROM data_migrations WHERE name = ?",
|
||||
("002_add_activity_log",),
|
||||
)
|
||||
assert cursor.fetchone()["cnt"] == 1
|
||||
|
||||
def test_running_migrations_twice_is_noop(self, tmp_db: Database) -> None:
|
||||
"""MigrationRunner.run is idempotent for AddActivityLogTableMigration."""
|
||||
from ledgrab.storage.data_migrations import (
|
||||
AddActivityLogTableMigration,
|
||||
MigrationRunner,
|
||||
)
|
||||
|
||||
runner = MigrationRunner(tmp_db)
|
||||
migration = AddActivityLogTableMigration()
|
||||
|
||||
first_run = runner.run([migration])
|
||||
assert len(first_run) == 1
|
||||
assert first_run[0].name == "002_add_activity_log"
|
||||
|
||||
second_run = runner.run([migration])
|
||||
assert second_run == [], "Second run must be a no-op"
|
||||
|
||||
def test_activity_log_table_exists_after_migration(self, tmp_db: Database) -> None:
|
||||
"""The activity_log table is present after the migration runs."""
|
||||
ActivityLogRepository(tmp_db)
|
||||
|
||||
cursor = tmp_db.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='activity_log'"
|
||||
)
|
||||
assert cursor.fetchone() is not None
|
||||
|
||||
def test_activity_log_indexes_exist_after_migration(self, tmp_db: Database) -> None:
|
||||
"""All declared indexes are present after migration."""
|
||||
ActivityLogRepository(tmp_db)
|
||||
|
||||
cursor = tmp_db.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
|
||||
)
|
||||
index_names = {row["name"] for row in cursor.fetchall()}
|
||||
expected = {
|
||||
"idx_activity_log_ts_seq",
|
||||
"idx_activity_log_category",
|
||||
"idx_activity_log_severity",
|
||||
"idx_activity_log_actor",
|
||||
"idx_activity_log_entity",
|
||||
}
|
||||
assert expected.issubset(index_names)
|
||||
Reference in New Issue
Block a user