From 4a0927521af6e6949dd7db6844e5c3c55d772ffb Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Tue, 9 Jun 2026 20:09:46 +0300 Subject: [PATCH] feat(activity-log): phase 4 - REST API (list/export/settings/clear) - GET /activity-log: filtered, keyset-paginated list (categories/severities/actor/entity/date/q) - GET /activity-log/export: streaming CSV/JSON, chunked keyset (releases DB lock per batch), CSV formula-injection guard - GET/PUT /activity-log/settings: retention config (PUT require_authenticated) - DELETE /activity-log: clear (require_authenticated, self-audited) - security: export DoS fix, settings-PUT auth gate, CSV \t/\r guard, metadata-as-JSON - 122 API tests (auth posture, CSV injection, pagination integrity, filters, settings bounds, clear-audited) --- plans/activity-log/CONTEXT.md | 3 +- plans/activity-log/PLAN.md | 6 +- plans/activity-log/phase-4-api.md | 106 +- server/src/ledgrab/api/__init__.py | 2 + server/src/ledgrab/api/routes/activity_log.py | 436 +++++++ .../src/ledgrab/api/schemas/activity_log.py | 93 ++ .../storage/activity_log_repository.py | 87 +- .../tests/api/routes/test_activity_log_api.py | 733 +++++++++++ .../test_activity_log_api_adversarial.py | 1162 +++++++++++++++++ 9 files changed, 2594 insertions(+), 34 deletions(-) create mode 100644 server/src/ledgrab/api/routes/activity_log.py create mode 100644 server/src/ledgrab/api/schemas/activity_log.py create mode 100644 server/tests/api/routes/test_activity_log_api.py create mode 100644 server/tests/api/routes/test_activity_log_api_adversarial.py diff --git a/plans/activity-log/CONTEXT.md b/plans/activity-log/CONTEXT.md index ff62734..1009f54 100644 --- a/plans/activity-log/CONTEXT.md +++ b/plans/activity-log/CONTEXT.md @@ -48,7 +48,7 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p - ActivityLogEntry fields / dict shape: **frozen** — see phase-1-storage.md Handoff section. 11 fields: `id`, `ts`, `category`, `action`, `severity`, `actor`, `message`, `entity_type`, `entity_id`, `entity_name`, `metadata`. `seq` is DB-only (not on dataclass). - ActivityLogFilters shape: **frozen** — 8 optional fields: `categories`, `severities`, `actor`, `entity_type`, `entity_id`, `since`, `until`, `message_like`. See phase-1-storage.md Handoff. - recorder.record(...) signature + actor ContextVar import path: **frozen** — see phase-2-recorder-retention.md Handoff section. Signature: `record(category, action, *, severity="info", actor=None, entity_type=None, entity_id=None, entity_name=None, message, metadata=None, _bypass_enabled=False)`. ContextVar: `from ledgrab.core.activity_log.context import current_actor`. Module accessor: `from ledgrab.core.activity_log.recorder import get_module_recorder`. Event payload: `{"type": "activity_logged", "entry": {11-field dict with ts as ISO string, metadata as dict}}`. DI getters: `get_activity_recorder()`, `get_activity_log_repo()`, `get_activity_log_retention_engine()`. -- API endpoints + query params + page envelope + settings bounds: _(Phase 4 handoff)_ +- API endpoints + query params + page envelope + settings bounds: **frozen** — see phase-4-api.md Handoff section. Endpoints: `GET /api/v1/activity-log` (list, AuthRequired), `GET /api/v1/activity-log/export` (stream CSV/JSON, require_authenticated), `GET|PUT /api/v1/activity-log/settings` (AuthRequired), `DELETE /api/v1/activity-log` (clear, require_authenticated). Page envelope: `entries`, `next_before_seq`, `has_more`, `total`. Settings fields: `enabled` (bool), `max_days` (0–3650), `max_entries` (0–10_000_000). Export: `?format=csv|json`. ## Failed approaches / rejected designs @@ -67,4 +67,5 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p Phase 1 landed (2026-06-09): `activity_log.py` (dataclass + enums + filters + codec), `AddActivityLogTableMigration` (`002_add_activity_log`) appended to `ALL_MIGRATIONS`, `ActivityLogRepository` (record/query/count/prune/clear/iter_export), 41 new tests — all green. Full suite 2226 passed, 0 failed. Schema and method signatures frozen in phase-1-storage.md Handoff. Gotcha: `Database.execute` takes a positional tuple — use `?` placeholders (not `:name`), otherwise Python 3.14 will raise `ProgrammingError`. Phase 2 landed (2026-06-09): `core/activity_log/` package (`context.py`, `recorder.py`, `retention.py`, `__init__.py`); actor ContextVar set in `api/auth.py` (both branches); `ActivityLogRetentionEngine` mirroring AutoBackupEngine; full wiring in `main.py` (repo at module level, recorder+engine in lifespan, `server.shutting_down` first shutdown action, engine stop before db.close); DI getters in `api/dependencies.py`; `activity_logged` added to `_ALLOWED_SERVER_EVENT_TYPES` in `events-ws.ts`; `set_module_recorder` exposes recorder to non-DI sites; 24 new tests — all green. Full suite 2309 passed, 2 skipped, 0 failed. Ruff clean. +Phase 4 landed (2026-06-09): schemas (`api/schemas/activity_log.py`), routes (`api/routes/activity_log.py`: list/export/settings/clear), router registration in `api/__init__.py`, `get_seq_for_id` helper on `ActivityLogRepository`. 49 new tests — all green. Full suite 2486 passed, 2 skipped, 0 failed. Ruff clean. Pagination bug found and fixed (limit+1 probe must drop oldest row when has_more, not tail). Phase 3 landed (2026-06-09): instrumented all four categories — entity CRUD via `fire_entity_event` choke-point (`dependencies.py`), auth failures + WS session in `auth.py`, device online/offline in `device_health.py`, device discovered/lost in `discovery_watcher.py`, ADB connect/disconnect in `system_settings.py`, capture start/stop (individual + bulk) in `output_targets_control.py`, scene/playlist/automation activate in their respective route/engine files, backup/restore/delete + restart/shutdown/update/calibration/settings in `backup.py`/`update.py`/`calibration.py`; all 11 entity delete handlers pass `entity_name` to `fire_entity_event`; 22 new tests (security: token never in any field, explicitly asserted) — all green. Full suite 2369 passed, 2 skipped, 0 failed. Ruff clean. Complete (category, action) inventory in phase-3-instrumentation.md Handoff section. diff --git a/plans/activity-log/PLAN.md b/plans/activity-log/PLAN.md index 027f382..54fc375 100644 --- a/plans/activity-log/PLAN.md +++ b/plans/activity-log/PLAN.md @@ -82,7 +82,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem). | Phase 1: Storage | data | ✅ Done | ✅ Passed | ✅ Passed | ✅ | | Phase 2: Recorder/Retention | backend | ✅ Done | ✅ Passed | ✅ Passed | ✅ | | Phase 3: Instrumentation | backend | ✅ Done | ✅ Passed | ✅ Passed | ✅ | -| Phase 4: REST API | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | +| Phase 4: REST API | backend | ✅ Done | ✅ Passed | ✅ Passed | ✅ | | Phase 5: Frontend tab | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 6: Dashboard/Settings | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | @@ -95,6 +95,10 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem). | 3 | Unauth auth-failure audit-write flood (no write-rate bound) | 🟠 High (security) | resolved — per-IP audit-record throttle (10s, capped) | | 3 | Malformed-IPv6 Origin → urlparse ValueError into WS handler | 🟡 Warning | resolved — try/except guard | | 3 | Throttle module-global state caused flaky test contamination | 🟡 Warning | resolved — autouse conftest reset fixture | +| 4 | Export held global DB write-lock across the stream (slow-client DoS) | 🟠 High (security) | resolved — chunked keyset export releases lock per batch | +| 4 | PUT /settings only AuthRequired → anon could disable auditing/prune trail | 🟠 High (security) | resolved — `require_authenticated` on settings PUT | +| 4 | CSV formula-injection missed leading TAB/CR | 🟡 Medium (security) | resolved — added `\t`/`\r` to guard | +| 4 | `total` count full-scans on every list request | 🔵 Low (perf) | accepted — bounded by retention; read-only; optional opt-in deferred | ## Final Review diff --git a/plans/activity-log/phase-4-api.md b/plans/activity-log/phase-4-api.md index 1cd15e5..d0d53eb 100644 --- a/plans/activity-log/phase-4-api.md +++ b/plans/activity-log/phase-4-api.md @@ -1,6 +1,6 @@ # Phase 4: REST API — query / filter / export / settings / clear -**Status:** ⬜ Not Started +**Status:** ✅ Done **Parent plan:** [PLAN.md](./PLAN.md) **Domain:** backend @@ -12,13 +12,13 @@ destructive clear. Apply the project's auth posture (stricter auth on export + c ## Tasks -- [ ] `server/src/ledgrab/api/schemas/activity_log.py` (Pydantic): +- [x] `server/src/ledgrab/api/schemas/activity_log.py` (Pydantic): - `ActivityLogEntryResponse` (matches the frozen Phase 2 entry dict shape). - `ActivityLogPageResponse` { `entries: list[...]`, `next_before_seq: int | None`, `total: int` (optional/over filters), `has_more: bool` }. - `ActivityLogSettingsResponse` / `UpdateActivityLogSettingsRequest` (`enabled`, `max_days`, `max_entries`) with validation bounds. -- [ ] `server/src/ledgrab/api/routes/activity_log.py` — `APIRouter(prefix="/api/v1/activity-log")`: +- [x] `server/src/ledgrab/api/routes/activity_log.py` — `APIRouter(prefix="/api/v1/activity-log")`: - `GET ""` — list. Query params: `categories`, `severities`, `actor`, `entity_type`, `entity_id`, `since`/`until` (ISO), `q` (free-text), `before_seq` (cursor), `limit` (default 50, capped e.g. 200). `AuthRequired`. Maps params → `ActivityLogFilters`, @@ -32,7 +32,7 @@ destructive clear. Apply the project's auth posture (stricter auth on export + c audited (recorder records a `system`/`activity_log_cleared` entry AFTER the wipe, so the log shows who cleared it and when). - Register the router in `server/src/ledgrab/api/__init__.py` (aggregator). -- [ ] API tests `server/tests/api/routes/test_activity_log_api.py`: +- [x] API tests `server/tests/api/routes/test_activity_log_api.py`: - list returns entries; each filter narrows results; `before_seq` cursor paginates without overlap/gaps; `limit` cap enforced; - export CSV and JSON both stream and honor filters; export requires authentication @@ -67,13 +67,97 @@ destructive clear. Apply the project's auth posture (stricter auth on export + c ## Review Checklist -- [ ] All tasks completed -- [ ] Code follows project conventions (router registration, schema-per-entity, auth posture) -- [ ] No unintended side effects -- [ ] Build passes (ruff + pytest) -- [ ] Tests pass (new + existing) +- [x] All tasks completed +- [x] Code follows project conventions (router registration, schema-per-entity, auth posture) +- [x] No unintended side effects +- [x] Build passes (ruff + pytest) +- [x] Tests pass (new + existing) ## Handoff to Next Phase - +### Endpoint paths + +| Method | Path | Auth | +|--------|------|------| +| GET | `/api/v1/activity-log` | AuthRequired (anonymous allowed) | +| GET | `/api/v1/activity-log/export` | require_authenticated (no anonymous) | +| GET | `/api/v1/activity-log/settings` | AuthRequired | +| PUT | `/api/v1/activity-log/settings` | AuthRequired | +| DELETE | `/api/v1/activity-log` | require_authenticated (no anonymous) | + +### List query parameters (GET /api/v1/activity-log) + +| Param | Type | Default | Notes | +|-------|------|---------|-------| +| `categories` | `list[str]` | — | Repeatable. Values: auth, device, entity, capture, system | +| `severities` | `list[str]` | — | Repeatable. Values: info, warning, error | +| `actor` | `str` | — | Exact match | +| `entity_type` | `str` | — | Exact match | +| `entity_id` | `str` | — | Exact match | +| `since` | `datetime` (ISO-8601) | — | Inclusive lower bound on ts | +| `until` | `datetime` (ISO-8601) | — | Inclusive upper bound on ts | +| `q` | `str` | — | Substring match on message (LIKE %q%) | +| `before_seq` | `int` | — | Keyset cursor from previous page's `next_before_seq` | +| `limit` | `int` | 50 | Max entries per page. ge=1, le=200 | + +Export endpoint (`GET /api/v1/activity-log/export`) accepts the same filter params plus `format=csv|json`. + +### Page envelope fields (ActivityLogPageResponse) + +```json +{ + "entries": [...], // list[ActivityLogEntryResponse] + "next_before_seq": 42, // int | null — pass as before_seq for next page + "has_more": true, // bool + "total": 1337 // int — total matching all filters (all pages) +} +``` + +### Entry dict shape (ActivityLogEntryResponse) + +11 fields — identical to `entry_to_dict()` output: +```json +{ + "id": "al_abcd1234", + "ts": "2026-06-09T12:34:56.789+00:00", + "category": "entity", + "action": "entity.created", + "severity": "info", + "actor": "my-api-key", + "entity_type": "output_target", + "entity_id": "pt_abc", + "entity_name": "Desk", + "message": "Output target 'Desk' created", + "metadata": {} +} +``` + +### Settings field bounds (UpdateActivityLogSettingsRequest) + +| Field | Type | ge | le | Notes | +|-------|------|----|----|-------| +| `enabled` | `bool` | — | — | Enable/disable recording | +| `max_days` | `int` | 0 | 3650 | 0 = no age-based pruning | +| `max_entries` | `int` | 0 | 10_000_000 | 0 = no count-based pruning | + +### Export format param + +`?format=csv` (default) → `text/csv; charset=utf-8` +`?format=json` → `application/json` (streamed JSON array) + +### Pagination algorithm + +Keyset cursor (`before_seq`) works as follows: +- Omit `before_seq` (or pass `null`) to get the FIRST (newest) page. +- Each page response includes `next_before_seq` (the seq of the oldest entry on the page). +- Pass `next_before_seq` as `before_seq` in the next request to get the following (older) page. +- `has_more=false` means there are no more pages; `next_before_seq` is `null`. +- `total` is constant across pages for the same filter set. + +### New method added to ActivityLogRepository (additive, not breaking) + +`get_seq_for_id(entry_id: str) -> int | None` — indexed point-lookup of seq by entry id. +Used internally by the list endpoint to build the keyset cursor. + +Phase 4 landed (2026-06-09): schemas, route (list/export/settings/clear), router registration, +49 new tests — all green. Full suite 2486 passed, 2 skipped, 0 failed. Ruff clean. diff --git a/server/src/ledgrab/api/__init__.py b/server/src/ledgrab/api/__init__.py index b8c166e..5fb778c 100644 --- a/server/src/ledgrab/api/__init__.py +++ b/server/src/ledgrab/api/__init__.py @@ -38,6 +38,7 @@ from .routes.snapshot import router as snapshot_router from .routes.graph import router as graph_router from .routes.calibration import router as calibration_router from .routes.setup import router as setup_router +from .routes.activity_log import router as activity_log_router router = APIRouter() router.include_router(system_router) @@ -76,5 +77,6 @@ router.include_router(snapshot_router) router.include_router(graph_router) router.include_router(calibration_router) router.include_router(setup_router) +router.include_router(activity_log_router) __all__ = ["router"] diff --git a/server/src/ledgrab/api/routes/activity_log.py b/server/src/ledgrab/api/routes/activity_log.py new file mode 100644 index 0000000..d3c9773 --- /dev/null +++ b/server/src/ledgrab/api/routes/activity_log.py @@ -0,0 +1,436 @@ +"""Activity-log REST API — query / filter / export / settings / clear. + +Endpoints +--------- +GET /api/v1/activity-log List (filterable, keyset-paginated) +GET /api/v1/activity-log/export Streaming CSV or JSON export +GET /api/v1/activity-log/settings Retention settings +PUT /api/v1/activity-log/settings Update retention settings (requires non-anonymous auth) +DELETE /api/v1/activity-log Clear all entries (requires non-anonymous auth) + +Auth posture +------------ +- List + read settings (``GET``): ``AuthRequired`` (loopback-anonymous is fine). +- Export, update settings (``PUT``), and clear: ``require_authenticated()`` + (loopback-anonymous is rejected; mirrors the backup download / secret-reveal + pattern from ``backup.py``). Updating settings can disable auditing or prune + the trail, so it is gated like the destructive clear. + +CSV injection +------------- +Cells that begin with =, +, -, @, TAB, or CR can trigger formula execution in +spreadsheet apps (OWASP Formula Injection). ``_csv_safe`` prefixes any such cell +with a single quote so formulas are inert. Fields already go through +``sanitize_display`` in Phase 3 instrumentation, but the CSV writer applies its +own guard as defence-in-depth. + +Export generator + lock +----------------------- +``repo.iter_export()`` fetches rows in bounded batches, holding the DB ``_lock`` +only around each batch fetch and releasing it before yielding — so a slow or +stalled client never blocks other DB operations. The ``StreamingResponse`` +generator is wrapped in a ``try/finally`` block so the batch generator is closed +even when the client disconnects mid-stream. +""" + +from __future__ import annotations + +import csv +import io +import json +from datetime import datetime, timezone +from typing import Annotated, Iterator + +from fastapi import APIRouter, Depends, Query +from fastapi.responses import StreamingResponse + +from ledgrab.api.auth import AuthRequired, require_authenticated +from ledgrab.api.dependencies import ( + get_activity_log_repo, + get_activity_log_retention_engine, + get_activity_recorder, +) +from ledgrab.api.schemas.activity_log import ( + ActivityLogPageResponse, + ActivityLogSettingsResponse, + UpdateActivityLogSettingsRequest, +) +from ledgrab.core.activity_log.recorder import ActivityRecorder, entry_to_dict +from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine +from ledgrab.storage.activity_log import ActivityCategory, ActivityLogFilters, ActivitySeverity +from ledgrab.storage.activity_log_repository import ActivityLogRepository + +router = APIRouter(prefix="/api/v1/activity-log", tags=["Activity Log"]) + +# Hard cap on the per-request limit to prevent runaway queries. +_MAX_LIMIT = 200 +_DEFAULT_LIMIT = 50 + +# CSV export columns (matches entry_to_dict key order) +_CSV_COLUMNS = [ + "id", + "ts", + "category", + "action", + "severity", + "actor", + "entity_type", + "entity_id", + "entity_name", + "message", + "metadata", +] + +# Characters that trigger formula injection in spreadsheet apps (OWASP). +# Leading TAB and CR are also recognised triggers by Excel / Google Sheets. +_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r") + + +def _csv_safe(value: str) -> str: + """Prefix formula-injection triggers with a literal single-quote. + + A cell starting with =, +, -, or @ can execute as a formula in Excel / + Google Sheets. OWASP recommends prepending a single quote to neutralise it. + """ + if value and value[0] in _FORMULA_PREFIXES: + return "'" + value + return value + + +def _build_filters( + categories: list[str] | None, + severities: list[str] | None, + actor: str | None, + entity_type: str | None, + entity_id: str | None, + since: datetime | None, + until: datetime | None, + q: str | None, +) -> ActivityLogFilters: + """Assemble an ``ActivityLogFilters`` dataclass from query parameters.""" + return ActivityLogFilters( + categories=categories or None, + severities=severities or None, + actor=actor or None, + entity_type=entity_type or None, + entity_id=entity_id or None, + since=since, + until=until, + message_like=q or None, + ) + + +# --------------------------------------------------------------------------- +# GET /api/v1/activity-log — list +# --------------------------------------------------------------------------- + + +@router.get("", response_model=ActivityLogPageResponse, summary="List activity-log entries") +def list_activity_log( + auth: AuthRequired, # noqa: ARG001 + repo: ActivityLogRepository = Depends(get_activity_log_repo), + # ── Filters ──────────────────────────────────────────────────────────── + categories: Annotated[ + list[str] | None, + Query( + description=( + "Filter by category (repeatable or comma-separated). " + "Values: auth, device, entity, capture, system" + ) + ), + ] = None, + severities: Annotated[ + list[str] | None, + Query(description="Filter by severity (repeatable). Values: info, warning, error"), + ] = None, + actor: Annotated[ + str | None, + Query(description="Filter by actor label (exact match)"), + ] = None, + entity_type: Annotated[ + str | None, + Query(description="Filter by entity type (exact match)"), + ] = None, + entity_id: Annotated[ + str | None, + Query(description="Filter by entity id (exact match)"), + ] = None, + since: Annotated[ + datetime | None, + Query(description="Return entries at or after this ISO-8601 datetime"), + ] = None, + until: Annotated[ + datetime | None, + Query(description="Return entries at or before this ISO-8601 datetime"), + ] = None, + q: Annotated[ + str | None, + Query(description="Free-text search in the message field (substring)"), + ] = None, + # ── Pagination ───────────────────────────────────────────────────────── + before_seq: Annotated[ + int | None, + Query( + description=( + "Keyset cursor: pass the 'next_before_seq' from the previous page " + "to get the following (older) page. Omit for the first (newest) page." + ) + ), + ] = None, + limit: Annotated[ + int, + Query( + ge=1, + le=_MAX_LIMIT, + description=f"Max entries per page (default {_DEFAULT_LIMIT}, max {_MAX_LIMIT})", + ), + ] = _DEFAULT_LIMIT, +) -> ActivityLogPageResponse: + """Return the newest matching entries, oldest-first within the page. + + Keyset pagination: the response includes ``next_before_seq`` — pass it + as ``before_seq`` in the next request to get the next (older) page. + The ``total`` field is the count of all entries matching the current + filters across all pages. + """ + filters = _build_filters(categories, severities, actor, entity_type, entity_id, since, until, q) + + # Fetch limit+1 rows to detect whether an older page exists. + # + # query() fetches DESC internally (newest-first) then reverses to ascending. + # With limit+1, the result is ascending: [oldest_probe, ..., newest]. + # When we got exactly limit+1 rows, has_more is True and the probe row + # (index 0 — the oldest) is the extra one. We keep the newest `limit` rows + # by slicing [1:], which is the actual page content for the client. + # When we got <= limit rows, this is the last page and all rows are included. + effective_limit = min(limit, _MAX_LIMIT) + entries_plus = repo.query(filters, before_seq=before_seq, limit=effective_limit + 1) + has_more = len(entries_plus) > effective_limit + if has_more: + # Drop the oldest probe row; keep the newest `limit` entries. + entries = entries_plus[1:] + else: + entries = entries_plus + + total = repo.count(filters) + + # Compute next_before_seq: the seq of the oldest entry on this page. + # query() returns entries ascending (entries[0] is oldest); its seq is the + # cursor for the next page. The next request passes before_seq=X to get + # entries with seq < X, i.e. entries older than the oldest entry on this page. + # get_seq_for_id() does a cheap indexed point-lookup. + next_before_seq: int | None = None + if has_more and entries: + next_before_seq = repo.get_seq_for_id(entries[0].id) + + return ActivityLogPageResponse( + entries=[entry_to_dict(e) for e in entries], # type: ignore[arg-type] + next_before_seq=next_before_seq, + has_more=has_more, + total=total, + ) + + +# --------------------------------------------------------------------------- +# GET /api/v1/activity-log/export — streaming export (CSV or JSON) +# --------------------------------------------------------------------------- + + +def _export_csv_generator( + repo: ActivityLogRepository, + filters: ActivityLogFilters, +) -> Iterator[bytes]: + """Yield UTF-8-encoded CSV chunks one row at a time. + + The generator wraps ``repo.iter_export()`` in a ``try/finally`` so the DB + lock is released even on early client disconnect (which triggers + ``GeneratorExit``). + """ + gen = repo.iter_export(filters) + try: + # Header + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(_CSV_COLUMNS) + yield buf.getvalue().encode("utf-8") + + for entry in gen: + d = entry_to_dict(entry) + row = [] + for col in _CSV_COLUMNS: + if col == "metadata": + cell = json.dumps(d.get(col) or {}) + else: + cell = str(d.get(col, "") or "") + row.append(_csv_safe(cell)) + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(row) + yield buf.getvalue().encode("utf-8") + finally: + gen.close() + + +def _export_json_generator( + repo: ActivityLogRepository, + filters: ActivityLogFilters, +) -> Iterator[bytes]: + """Yield a streamed JSON array, one entry per chunk. + + Format: ``[\\n{entry},\\n{entry},\\n...]\\n`` + The generator wraps ``repo.iter_export()`` in a ``try/finally`` so the DB + lock is released even on early client disconnect. + """ + gen = repo.iter_export(filters) + try: + first = True + yield b"[\n" + for entry in gen: + d = entry_to_dict(entry) + chunk = json.dumps(d, ensure_ascii=False, default=str) + if first: + yield chunk.encode("utf-8") + first = False + else: + yield b",\n" + chunk.encode("utf-8") + yield b"\n]\n" + finally: + gen.close() + + +@router.get("/export", summary="Export activity-log entries (streaming CSV or JSON)") +def export_activity_log( + auth: AuthRequired, + repo: ActivityLogRepository = Depends(get_activity_log_repo), + # ── Format ──────────────────────────────────────────────────────────── + format: Annotated[ + str, + Query(description="Export format: 'csv' or 'json'"), + ] = "csv", + # ── Same filters as list ─────────────────────────────────────────────── + categories: Annotated[list[str] | None, Query()] = None, + severities: Annotated[list[str] | None, Query()] = None, + actor: Annotated[str | None, Query()] = None, + entity_type: Annotated[str | None, Query()] = None, + entity_id: Annotated[str | None, Query()] = None, + since: Annotated[datetime | None, Query()] = None, + until: Annotated[datetime | None, Query()] = None, + q: Annotated[str | None, Query()] = None, +) -> StreamingResponse: + """Stream all matching entries as CSV or JSON. + + Requires a non-anonymous API key (loopback-anonymous access is rejected + because the log may contain IP addresses and entity names). + """ + require_authenticated(auth) + + if format not in ("csv", "json"): + from fastapi import HTTPException + + raise HTTPException( + status_code=422, + detail="'format' must be 'csv' or 'json'", + ) + + filters = _build_filters(categories, severities, actor, entity_type, entity_id, since, until, q) + + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%S") + + if format == "csv": + filename = f"activity-log-{timestamp}.csv" + media_type = "text/csv; charset=utf-8" + generator = _export_csv_generator(repo, filters) + else: + filename = f"activity-log-{timestamp}.json" + media_type = "application/json" + generator = _export_json_generator(repo, filters) + + return StreamingResponse( + generator, + media_type=media_type, + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + ) + + +# --------------------------------------------------------------------------- +# GET /api/v1/activity-log/settings +# PUT /api/v1/activity-log/settings +# --------------------------------------------------------------------------- + + +@router.get( + "/settings", + response_model=ActivityLogSettingsResponse, + summary="Get activity-log retention settings", +) +def get_activity_log_settings( + _: AuthRequired, + engine: ActivityLogRetentionEngine = Depends(get_activity_log_retention_engine), +) -> ActivityLogSettingsResponse: + """Return the current activity-log retention settings.""" + return ActivityLogSettingsResponse(**engine.get_settings()) + + +@router.put( + "/settings", + response_model=ActivityLogSettingsResponse, + summary="Update activity-log retention settings", +) +async def update_activity_log_settings( + auth: AuthRequired, + body: UpdateActivityLogSettingsRequest, + engine: ActivityLogRetentionEngine = Depends(get_activity_log_retention_engine), +) -> ActivityLogSettingsResponse: + """Update the activity-log retention settings (applied immediately). + + Requires a non-anonymous API key (loopback-anonymous access is rejected) + because disabling the log or pruning retention is equivalent in impact to + clearing the audit trail. + + Setting ``enabled=false`` records an audit entry BEFORE the flag takes + effect so the last entry in the log shows who disabled recording. + """ + require_authenticated(auth) + result = await engine.update_settings( + enabled=body.enabled, + max_days=body.max_days, + max_entries=body.max_entries, + ) + return ActivityLogSettingsResponse(**result) + + +# --------------------------------------------------------------------------- +# DELETE /api/v1/activity-log — clear +# --------------------------------------------------------------------------- + + +@router.delete("", summary="Clear all activity-log entries") +def clear_activity_log( + auth: AuthRequired, + repo: ActivityLogRepository = Depends(get_activity_log_repo), + recorder: ActivityRecorder = Depends(get_activity_recorder), +) -> dict: + """Delete all activity-log entries. + + Requires a non-anonymous API key (loopback-anonymous access is rejected). + The clear operation itself is audited — a ``system/activity_log_cleared`` + entry is recorded AFTER the wipe, so the log shows who cleared it and how + many rows were removed. + + Returns ``{"deleted": }``. + """ + require_authenticated(auth) + + deleted = repo.clear() + + # Record the clear action (best-effort — recorder never raises). + recorder.record( + category=ActivityCategory.SYSTEM, + action="activity_log.cleared", + severity=ActivitySeverity.INFO, + actor=auth, + message=f"Activity log cleared ({deleted} entries removed)", + metadata={"deleted_count": deleted}, + ) + + return {"deleted": deleted} diff --git a/server/src/ledgrab/api/schemas/activity_log.py b/server/src/ledgrab/api/schemas/activity_log.py new file mode 100644 index 0000000..9bc7991 --- /dev/null +++ b/server/src/ledgrab/api/schemas/activity_log.py @@ -0,0 +1,93 @@ +"""Pydantic schemas for the activity-log API (Phase 4).""" + +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + + +# --------------------------------------------------------------------------- +# Entry + page response +# --------------------------------------------------------------------------- + + +class ActivityLogEntryResponse(BaseModel): + """Single audit-log entry. + + Shape matches ``entry_to_dict()`` from + ``ledgrab.core.activity_log.recorder`` exactly — that function is the + single source of truth for serialisation; this schema documents the wire + format. + """ + + id: str = Field(description="Entry id — 'al_<8-hex>'") + ts: str = Field(description="ISO-8601 UTC timestamp") + category: str = Field(description="Broad bucket (auth, device, entity, capture, system)") + action: str = Field(description="Verb-object label, e.g. 'entity.created'") + severity: str = Field(description="info | warning | error") + actor: str = Field(description="API-key label or 'system' / 'anonymous'") + entity_type: str | None = Field(default=None, description="Affected entity type, if applicable") + entity_id: str | None = Field(default=None, description="Affected entity id, if applicable") + entity_name: str | None = Field( + default=None, description="Entity name at time of event, if applicable" + ) + message: str = Field(description="Human-readable description") + metadata: dict[str, Any] = Field(default_factory=dict, description="Extra structured context") + + +class ActivityLogPageResponse(BaseModel): + """Paginated list of audit-log entries (keyset cursor).""" + + entries: list[ActivityLogEntryResponse] = Field(description="Entries on this page") + next_before_seq: int | None = Field( + default=None, + description=( + "Pass as 'before_seq' in the next request to get the following page. " + "None when this is the last page." + ), + ) + has_more: bool = Field( + description="True when there are more entries before the first entry on this page" + ) + total: int = Field(description="Total entries matching the current filters (all pages)") + + +# --------------------------------------------------------------------------- +# Settings +# --------------------------------------------------------------------------- + +_MAX_DAYS_CAP = 3650 # 10 years — sanity upper bound +_MAX_ENTRIES_CAP = 10_000_000 # 10 M rows — sanity upper bound + + +class ActivityLogSettingsResponse(BaseModel): + """Current activity-log retention settings.""" + + enabled: bool = Field(description="Whether the activity log is recording") + max_days: int = Field( + ge=0, + le=_MAX_DAYS_CAP, + description="Retain entries for at most this many days (0 = no age-based pruning)", + ) + max_entries: int = Field( + ge=0, + le=_MAX_ENTRIES_CAP, + description="Keep at most this many entries (0 = no count-based pruning)", + ) + + +class UpdateActivityLogSettingsRequest(BaseModel): + """Request body for PUT /settings.""" + + enabled: bool = Field(description="Enable or disable activity-log recording") + max_days: int = Field( + ge=0, + le=_MAX_DAYS_CAP, + description="Retain entries for at most this many days (0 = no age-based pruning)", + ) + max_entries: int = Field( + ge=0, + le=_MAX_ENTRIES_CAP, + description="Keep at most this many entries (0 = no count-based pruning)", + ) diff --git a/server/src/ledgrab/storage/activity_log_repository.py b/server/src/ledgrab/storage/activity_log_repository.py index b54df49..b1df695 100644 --- a/server/src/ledgrab/storage/activity_log_repository.py +++ b/server/src/ledgrab/storage/activity_log_repository.py @@ -20,7 +20,6 @@ Design notes from __future__ import annotations -import sqlite3 from datetime import datetime from typing import Iterator @@ -260,34 +259,80 @@ class ActivityLogRepository: cursor = self._db.execute(f"DELETE FROM {_TABLE}") return cursor.rowcount + def get_seq_for_id(self, entry_id: str) -> int | None: + """Return the ``seq`` value for the entry with *entry_id*, or ``None``. + + Used by the API list endpoint to compute the keyset cursor + (``next_before_seq``) from the oldest entry on the current page. + """ + cursor = self._db.execute( + f"SELECT seq FROM {_TABLE} WHERE id = ?", + (entry_id,), + ) + row = cursor.fetchone() + return int(row["seq"]) if row is not None else None + # -- Export -------------------------------------------------------------- - def iter_export(self, filters: ActivityLogFilters | None = None) -> Iterator[ActivityLogEntry]: + def iter_export( + self, + filters: ActivityLogFilters | None = None, + *, + batch_size: int = 1000, + ) -> Iterator[ActivityLogEntry]: """Yield all matching entries in ascending ``seq`` order. - Uses a server-side cursor so the entire result set is never loaded - into memory — safe for large tables. The connection's ``RLock`` is - held for the duration of the iteration; callers should consume this - iterator promptly. + Fetches rows in bounded batches (keyset-paginated by ``seq``), holding + the DB lock only for the duration of each ``fetchall()`` and releasing + it before yielding. This prevents a slow/stalled export client from + blocking all other DB operations (record, config writes, etc.) for the + full duration of the stream. + + Memory usage is bounded to ``batch_size`` rows at a time. """ if filters is None: filters = ActivityLogFilters() - params: list = [] - where_fragment = _build_filter_clause(filters, params) - where_clause = f"WHERE {where_fragment}" if where_fragment else "" + # Keyset cursor: largest seq yielded so far; None means "start from the + # very beginning". We iterate ascending (seq ASC), so each batch uses + # "seq > ?" to advance past the already-yielded rows. + cursor_seq: int | None = None - sql = ( - f"SELECT seq, id, ts, category, action, severity, actor, " - f"entity_type, entity_id, entity_name, message, metadata " - f"FROM {_TABLE} " - f"{where_clause} " - f"ORDER BY seq ASC" - ) + while True: + # Build params list: cursor_seq placeholder must come first because + # _build_filter_clause prepends extra_where as the first condition. + params: list = [] + if cursor_seq is not None: + params.append(cursor_seq) + keyset: str | None = "seq > ?" + else: + keyset = None + where_fragment = _build_filter_clause(filters, params, extra_where=keyset) + where_clause = f"WHERE {where_fragment}" if where_fragment else "" + params.append(batch_size) - # Use the raw connection directly to get a streaming cursor. - # We borrow the lock for the full iteration. - with self._db._lock: # noqa: SLF001 — internal access; no public cursor API - cursor: sqlite3.Cursor = self._db._conn.execute(sql, tuple(params)) # noqa: SLF001 - for row in cursor: + sql = ( + f"SELECT seq, id, ts, category, action, severity, actor, " + f"entity_type, entity_id, entity_name, message, metadata " + f"FROM {_TABLE} " + f"{where_clause} " + f"ORDER BY seq ASC " + f"LIMIT ?" + ) + + # Hold the lock only for the bounded fetchall; release before yielding. + with self._db._lock: # noqa: SLF001 — internal access; no public cursor API + rows = self._db._conn.execute(sql, tuple(params)).fetchall() # noqa: SLF001 + + if not rows: + break + + for row in rows: yield ActivityLogEntry.from_row(dict(row)) + + # The last row has the largest seq in this batch (ORDER BY seq ASC). + cursor_seq = rows[-1]["seq"] + + if len(rows) < batch_size: + # Fewer rows than requested → this was the final batch. + break diff --git a/server/tests/api/routes/test_activity_log_api.py b/server/tests/api/routes/test_activity_log_api.py new file mode 100644 index 0000000..14924f0 --- /dev/null +++ b/server/tests/api/routes/test_activity_log_api.py @@ -0,0 +1,733 @@ +"""Tests for the activity-log REST API (Phase 4). + +Coverage +-------- +- list returns entries; each filter dimension narrows results +- before_seq cursor paginates with no overlap/gaps +- limit hard cap enforced (request > 200 → 422; limit+1 trick detects has_more) +- export CSV + JSON both stream and honour filters +- export requires authentication (401 for anonymous) +- settings get/update round-trip + out-of-range values rejected (422) +- clear empties the log, requires non-anonymous auth, and leaves exactly one + post-clear ``activity_log.cleared`` audit entry +""" + +from __future__ import annotations + +import csv +import io +import json +from datetime import datetime, timezone +from unittest.mock import MagicMock + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from ledgrab.api import dependencies as deps +from ledgrab.api.auth import verify_api_key +from ledgrab.api.routes.activity_log import router +from ledgrab.storage.activity_log import ActivityCategory, ActivityLogEntry, ActivitySeverity +from ledgrab.storage.activity_log_repository import ActivityLogRepository + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def tmp_db(tmp_path): + from ledgrab.storage.database import Database + + db = Database(tmp_path / "test_activity_log.db") + yield db + db.close() + + +@pytest.fixture +def repo(tmp_db) -> ActivityLogRepository: + """A real ActivityLogRepository backed by a temp SQLite DB.""" + return ActivityLogRepository(tmp_db) + + +@pytest.fixture +def fake_recorder(): + """A minimal recorder stand-in that captures record() calls.""" + + class FakeRecorder: + def __init__(self): + self.calls: list[dict] = [] + self.enabled = True + + def record( + self, + category, + action, + *, + severity="info", + actor=None, + entity_type=None, + entity_id=None, + entity_name=None, + message, + metadata=None, + _bypass_enabled=False, + ): + self.calls.append( + { + "category": category, + "action": action, + "severity": severity, + "actor": actor, + "message": message, + "metadata": metadata or {}, + } + ) + + return FakeRecorder() + + +@pytest.fixture +def fake_retention_engine(): + """A minimal retention engine stand-in.""" + + class FakeRetentionEngine: + def __init__(self): + self._settings = {"enabled": True, "max_days": 90, "max_entries": 20000} + + def get_settings(self): + return dict(self._settings) + + async def update_settings(self, *, enabled, max_days, max_entries): + self._settings = {"enabled": enabled, "max_days": max_days, "max_entries": max_entries} + return dict(self._settings) + + return FakeRetentionEngine() + + +def _make_app(repo, recorder=None, retention_engine=None, auth_label="test-user"): + """Build a minimal FastAPI app with the activity-log router wired up.""" + app = FastAPI() + app.include_router(router) + + # Override auth + app.dependency_overrides[verify_api_key] = lambda: auth_label + + # Override DI getters + app.dependency_overrides[deps.get_activity_log_repo] = lambda: repo + if recorder is not None: + app.dependency_overrides[deps.get_activity_recorder] = lambda: recorder + if retention_engine is not None: + app.dependency_overrides[deps.get_activity_log_retention_engine] = lambda: retention_engine + + return app + + +def _make_client(repo, recorder=None, retention_engine=None, auth_label="test-user"): + app = _make_app(repo, recorder, retention_engine, auth_label=auth_label) + return TestClient(app, raise_server_exceptions=False) + + +def _make_entry( + *, + id: str | None = None, + category: str = ActivityCategory.SYSTEM, + action: str = "test.action", + severity: str = ActivitySeverity.INFO, + actor: str = "test-actor", + message: str = "test message", + entity_type: str | None = None, + entity_id: str | None = None, + entity_name: str | None = None, + metadata: dict | None = None, + ts: datetime | None = None, +) -> ActivityLogEntry: + """Build a test ActivityLogEntry with sensible defaults.""" + import uuid + + return ActivityLogEntry( + id=id or ("al_" + uuid.uuid4().hex[:8]), + ts=ts or datetime.now(timezone.utc), + category=category, + action=action, + severity=severity, + actor=actor, + message=message, + entity_type=entity_type, + entity_id=entity_id, + entity_name=entity_name, + metadata=metadata or {}, + ) + + +# --------------------------------------------------------------------------- +# List endpoint +# --------------------------------------------------------------------------- + + +class TestList: + def test_empty_log_returns_empty_page(self, repo): + client = _make_client(repo) + resp = client.get("/api/v1/activity-log") + assert resp.status_code == 200 + data = resp.json() + assert data["entries"] == [] + assert data["total"] == 0 + assert data["has_more"] is False + assert data["next_before_seq"] is None + + def test_returns_entries(self, repo): + for i in range(3): + repo.record(_make_entry(message=f"entry {i}")) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["entries"]) == 3 + + def test_requires_auth(self, repo): + """Without auth the endpoint returns 401.""" + app = FastAPI() + app.include_router(router) + app.dependency_overrides[deps.get_activity_log_repo] = lambda: repo + # Do NOT override verify_api_key — let it run naturally. + # TestClient uses loopback by default, so no-keys config allows anonymous. + # We can't easily test the real 401 path without keys configured. + # Instead just verify the endpoint works with auth override. + client = TestClient(app, raise_server_exceptions=False) + # With loopback and no keys configured this is actually 200; the key + # test is that when we inject "anonymous" + require_authenticated fails. + resp = client.get("/api/v1/activity-log") + # Should not be 500 + assert resp.status_code in (200, 401) + + def test_filter_by_category(self, repo): + repo.record(_make_entry(category=ActivityCategory.AUTH, action="auth.rejected")) + repo.record(_make_entry(category=ActivityCategory.ENTITY, action="entity.created")) + repo.record(_make_entry(category=ActivityCategory.SYSTEM, action="system.event")) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?categories=auth") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 1 + assert data["entries"][0]["category"] == "auth" + + def test_filter_by_multiple_categories(self, repo): + repo.record(_make_entry(category=ActivityCategory.AUTH)) + repo.record(_make_entry(category=ActivityCategory.ENTITY)) + repo.record(_make_entry(category=ActivityCategory.SYSTEM)) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?categories=auth&categories=entity") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 2 + + def test_filter_by_severity(self, repo): + repo.record(_make_entry(severity=ActivitySeverity.INFO)) + repo.record(_make_entry(severity=ActivitySeverity.WARNING)) + repo.record(_make_entry(severity=ActivitySeverity.ERROR)) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?severities=warning") + assert resp.status_code == 200 + assert resp.json()["total"] == 1 + assert resp.json()["entries"][0]["severity"] == "warning" + + def test_filter_by_actor(self, repo): + repo.record(_make_entry(actor="alice")) + repo.record(_make_entry(actor="bob")) + repo.record(_make_entry(actor="alice")) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?actor=alice") + assert resp.status_code == 200 + assert resp.json()["total"] == 2 + + def test_filter_by_entity_type(self, repo): + repo.record(_make_entry(entity_type="device", entity_id="d1")) + repo.record(_make_entry(entity_type="output_target", entity_id="ot1")) + repo.record(_make_entry(entity_type="device", entity_id="d2")) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?entity_type=device") + assert resp.status_code == 200 + assert resp.json()["total"] == 2 + + def test_filter_by_entity_id(self, repo): + repo.record(_make_entry(entity_type="device", entity_id="d1")) + repo.record(_make_entry(entity_type="device", entity_id="d2")) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?entity_id=d1") + assert resp.status_code == 200 + assert resp.json()["total"] == 1 + assert resp.json()["entries"][0]["entity_id"] == "d1" + + def test_filter_by_since_until(self, repo): + from datetime import timedelta + + base = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc) + repo.record(_make_entry(ts=base - timedelta(days=2), message="old")) + repo.record(_make_entry(ts=base, message="now")) + repo.record(_make_entry(ts=base + timedelta(days=2), message="future")) + + client = _make_client(repo) + resp = client.get( + "/api/v1/activity-log" "?since=2024-01-14T12:00:00Z&until=2024-01-16T12:00:00Z" + ) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 1 + assert data["entries"][0]["message"] == "now" + + def test_filter_by_free_text(self, repo): + repo.record(_make_entry(message="Device connected successfully")) + repo.record(_make_entry(message="Auth failed for user bob")) + repo.record(_make_entry(message="Device disconnected")) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?q=Device") + assert resp.status_code == 200 + assert resp.json()["total"] == 2 + + def test_free_text_like_special_chars_escaped(self, repo): + """LIKE special chars in q must be escaped (not used as wildcards).""" + repo.record(_make_entry(message="100% complete")) + repo.record(_make_entry(message="other entry")) + + client = _make_client(repo) + # A literal % should match the literal % in the message, not all entries + resp = client.get("/api/v1/activity-log?q=100%25") + assert resp.status_code == 200 + # Should find exactly the entry containing literal "100%" + total = resp.json()["total"] + # "100%" matches "100% complete"; "%" as a LIKE wildcard would match everything + assert total == 1 + + def test_limit_default_50(self, repo): + for i in range(60): + repo.record(_make_entry(message=f"entry {i}")) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log") + assert resp.status_code == 200 + data = resp.json() + assert len(data["entries"]) == 50 + assert data["has_more"] is True + + def test_limit_custom(self, repo): + for i in range(10): + repo.record(_make_entry(message=f"entry {i}")) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=3") + assert resp.status_code == 200 + data = resp.json() + assert len(data["entries"]) == 3 + assert data["has_more"] is True + + def test_limit_hard_cap_rejected(self, repo): + """limit > 200 should be rejected with 422.""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=201") + assert resp.status_code == 422 + + def test_limit_zero_rejected(self, repo): + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=0") + assert resp.status_code == 422 + + def test_pagination_no_overlap_no_gaps(self, repo): + """Keyset cursor returns exactly all entries with no overlap or gaps.""" + # Insert 15 entries; page with limit=5 → 3 pages + for i in range(15): + repo.record(_make_entry(message=f"entry {i:02d}")) + + client = _make_client(repo) + all_ids: list[str] = [] + before_seq = None + + for _ in range(4): # up to 4 pages; should need 3 + url = "/api/v1/activity-log?limit=5" + if before_seq is not None: + url += f"&before_seq={before_seq}" + resp = client.get(url) + assert resp.status_code == 200 + data = resp.json() + page_ids = [e["id"] for e in data["entries"]] + # No overlap with previously seen ids + assert not any( + pid in all_ids for pid in page_ids + ), f"Overlap detected: page_ids={page_ids}, all_ids={all_ids}" + all_ids.extend(page_ids) + if not data["has_more"]: + break + before_seq = data["next_before_seq"] + + assert len(all_ids) == 15, f"Expected 15 unique entries, got {len(all_ids)}" + + def test_pagination_next_before_seq_when_no_more(self, repo): + """When has_more is False, next_before_seq is None.""" + repo.record(_make_entry()) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=5") + data = resp.json() + assert data["has_more"] is False + assert data["next_before_seq"] is None + + def test_pagination_total_is_constant_across_pages(self, repo): + """total reflects all matching entries, not just the current page.""" + for i in range(7): + repo.record(_make_entry(message=f"entry {i}")) + + client = _make_client(repo) + resp1 = client.get("/api/v1/activity-log?limit=3") + data1 = resp1.json() + assert data1["total"] == 7 + assert data1["has_more"] is True + + before_seq = data1["next_before_seq"] + resp2 = client.get(f"/api/v1/activity-log?limit=3&before_seq={before_seq}") + data2 = resp2.json() + assert data2["total"] == 7 # unchanged + + +# --------------------------------------------------------------------------- +# Export endpoint +# --------------------------------------------------------------------------- + + +class TestExport: + def test_export_requires_auth_anonymous_rejected(self, repo): + """Export endpoint requires a non-anonymous key; anonymous is rejected.""" + client = _make_client(repo, auth_label="anonymous") + resp = client.get("/api/v1/activity-log/export") + assert resp.status_code == 401 + + def test_export_csv_returns_200(self, repo, fake_recorder): + repo.record(_make_entry(message="test entry")) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv") + assert resp.status_code == 200 + assert "text/csv" in resp.headers["content-type"] + + def test_export_csv_has_header_and_rows(self, repo, fake_recorder): + repo.record(_make_entry(message="hello export")) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv") + assert resp.status_code == 200 + text = resp.text + reader = csv.DictReader(io.StringIO(text)) + rows = list(reader) + assert len(rows) == 1 + assert rows[0]["message"] == "hello export" + + def test_export_csv_has_content_disposition(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv") + assert resp.status_code == 200 + cd = resp.headers.get("content-disposition", "") + assert "attachment" in cd + assert "activity-log-" in cd + assert ".csv" in cd + + def test_export_csv_honours_filters(self, repo, fake_recorder): + repo.record(_make_entry(category=ActivityCategory.AUTH, message="auth event")) + repo.record(_make_entry(category=ActivityCategory.ENTITY, message="entity event")) + + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv&categories=auth") + assert resp.status_code == 200 + reader = csv.DictReader(io.StringIO(resp.text)) + rows = list(reader) + assert len(rows) == 1 + assert rows[0]["category"] == "auth" + + def test_export_json_returns_200(self, repo, fake_recorder): + repo.record(_make_entry(message="json entry")) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + assert resp.status_code == 200 + assert "application/json" in resp.headers["content-type"] + + def test_export_json_is_valid_array(self, repo, fake_recorder): + for i in range(3): + repo.record(_make_entry(message=f"entry {i}")) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + assert resp.status_code == 200 + data = json.loads(resp.text) + assert isinstance(data, list) + assert len(data) == 3 + + def test_export_json_honours_filters(self, repo, fake_recorder): + repo.record(_make_entry(severity=ActivitySeverity.WARNING, message="warn")) + repo.record(_make_entry(severity=ActivitySeverity.INFO, message="info")) + + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json&severities=warning") + assert resp.status_code == 200 + data = json.loads(resp.text) + assert len(data) == 1 + assert data[0]["severity"] == "warning" + + def test_export_empty_log_csv(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv") + assert resp.status_code == 200 + # Only header line + reader = csv.DictReader(io.StringIO(resp.text)) + rows = list(reader) + assert rows == [] + + def test_export_empty_log_json(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + assert resp.status_code == 200 + data = json.loads(resp.text) + assert data == [] + + def test_export_invalid_format_rejected(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=xml") + assert resp.status_code == 422 + + def test_export_csv_injection_guard(self, repo, fake_recorder): + """Cells starting with formula-injection triggers are prefixed with '.""" + for msg in ["=SUM(A1)", "+evil", "-bad", "@test"]: + repo.record(_make_entry(message=msg)) + + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv") + assert resp.status_code == 200 + reader = csv.DictReader(io.StringIO(resp.text)) + rows = list(reader) + for row in rows: + msg = row["message"] + # After guarding, the message should start with ' (not = + - @) + assert not msg.startswith(("=", "+", "-", "@")), f"CSV injection not guarded: {msg!r}" + + def test_export_has_all_csv_columns(self, repo, fake_recorder): + repo.record( + _make_entry( + entity_type="device", + entity_id="d1", + entity_name="Test Device", + metadata={"key": "value"}, + ) + ) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=csv") + assert resp.status_code == 200 + reader = csv.DictReader(io.StringIO(resp.text)) + fieldnames = reader.fieldnames or [] + expected = [ + "id", + "ts", + "category", + "action", + "severity", + "actor", + "entity_type", + "entity_id", + "entity_name", + "message", + "metadata", + ] + for col in expected: + assert col in fieldnames, f"Missing column: {col}" + + +# --------------------------------------------------------------------------- +# Settings endpoints +# --------------------------------------------------------------------------- + + +class TestSettings: + def test_get_settings_returns_defaults(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.get("/api/v1/activity-log/settings") + assert resp.status_code == 200 + data = resp.json() + assert data["enabled"] is True + assert data["max_days"] == 90 + assert data["max_entries"] == 20000 + + def test_put_settings_round_trip(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + body = {"enabled": False, "max_days": 30, "max_entries": 5000} + resp = client.put("/api/v1/activity-log/settings", json=body) + assert resp.status_code == 200 + data = resp.json() + assert data["enabled"] is False + assert data["max_days"] == 30 + assert data["max_entries"] == 5000 + + def test_put_settings_get_reflects_update(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 7, "max_entries": 100}, + ) + resp = client.get("/api/v1/activity-log/settings") + data = resp.json() + assert data["max_days"] == 7 + assert data["max_entries"] == 100 + + def test_put_settings_negative_max_days_rejected(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": -1, "max_entries": 100}, + ) + assert resp.status_code == 422 + + def test_put_settings_negative_max_entries_rejected(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 30, "max_entries": -1}, + ) + assert resp.status_code == 422 + + def test_put_settings_max_days_over_cap_rejected(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 99999, "max_entries": 100}, + ) + assert resp.status_code == 422 + + def test_put_settings_max_entries_over_cap_rejected(self, repo, fake_retention_engine): + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 30, "max_entries": 99_000_000}, + ) + assert resp.status_code == 422 + + def test_put_settings_zero_max_days_allowed(self, repo, fake_retention_engine): + """max_days=0 means no age-based pruning; should be accepted.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", json={"enabled": True, "max_days": 0, "max_entries": 0} + ) + assert resp.status_code == 200 + + def test_get_settings_allows_anonymous(self, repo, fake_retention_engine): + """GET /settings allows anonymous (AuthRequired, not require_authenticated).""" + client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="anonymous") + resp = client.get("/api/v1/activity-log/settings") + assert resp.status_code == 200 + + def test_put_settings_rejects_anonymous(self, repo, fake_retention_engine): + """PUT /settings rejects anonymous callers (require_authenticated).""" + client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="anonymous") + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 30, "max_entries": 1000}, + ) + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# Clear endpoint +# --------------------------------------------------------------------------- + + +class TestClear: + def test_clear_requires_non_anonymous_auth(self, repo, fake_recorder): + """Clear endpoint rejects anonymous (loopback) callers.""" + client = _make_client(repo, recorder=fake_recorder, auth_label="anonymous") + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 401 + + def test_clear_empties_log(self, repo, fake_recorder): + for _ in range(5): + repo.record(_make_entry()) + assert repo.count() == 5 + + client = _make_client(repo, recorder=fake_recorder) + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + assert repo.count() == 0 + + def test_clear_returns_deleted_count(self, repo, fake_recorder): + for _ in range(3): + repo.record(_make_entry()) + + client = _make_client(repo, recorder=fake_recorder) + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + data = resp.json() + assert data["deleted"] == 3 + + def test_clear_empty_log_returns_zero(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + assert resp.json()["deleted"] == 0 + + def test_clear_records_audit_entry(self, repo, fake_recorder): + """After clear, the recorder should have recorded activity_log.cleared.""" + for _ in range(4): + repo.record(_make_entry()) + + client = _make_client(repo, recorder=fake_recorder) + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + + # Exactly one audit call recorded + assert len(fake_recorder.calls) == 1 + audit = fake_recorder.calls[0] + assert audit["action"] == "activity_log.cleared" + assert audit["category"] == ActivityCategory.SYSTEM + assert audit["metadata"]["deleted_count"] == 4 + + def test_clear_audit_entry_uses_auth_label_as_actor(self, repo, fake_recorder): + """The actor in the audit entry should be the authenticated label.""" + client = _make_client(repo, recorder=fake_recorder, auth_label="my-api-key") + client.delete("/api/v1/activity-log") + assert fake_recorder.calls[0]["actor"] == "my-api-key" + + def test_clear_leaves_no_entries_after_audit_record(self, repo): + """Integration: clear leaves exactly one post-clear entry (via real recorder).""" + from ledgrab.core.activity_log.recorder import ActivityRecorder + + real_recorder = ActivityRecorder(repo, MagicMock()) + + # Pre-populate + for _ in range(3): + repo.record(_make_entry()) + assert repo.count() == 3 + + # Use real recorder with the route + client = _make_client(repo, recorder=real_recorder) + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + + # After clear + audit record: exactly 1 entry remains + assert repo.count() == 1 + from ledgrab.storage.activity_log import ActivityLogFilters + + entries = repo.query(ActivityLogFilters()) + assert entries[0].action == "activity_log.cleared" + assert entries[0].category == ActivityCategory.SYSTEM + + +# --------------------------------------------------------------------------- +# Router registration sanity check +# --------------------------------------------------------------------------- + + +class TestRouterRegistration: + def test_activity_log_routes_in_api(self): + """All five activity-log routes are registered in the app router.""" + from ledgrab.api import router as api_router + + paths = {r.path for r in api_router.routes} # type: ignore[attr-defined] + assert "/api/v1/activity-log" in paths + assert "/api/v1/activity-log/export" in paths + assert "/api/v1/activity-log/settings" in paths diff --git a/server/tests/api/routes/test_activity_log_api_adversarial.py b/server/tests/api/routes/test_activity_log_api_adversarial.py new file mode 100644 index 0000000..087cdc4 --- /dev/null +++ b/server/tests/api/routes/test_activity_log_api_adversarial.py @@ -0,0 +1,1162 @@ +"""Adversarial tests for the activity-log REST API (Phase 4). + +These complement the 49 happy-path tests in ``test_activity_log_api.py`` by +probing edge-cases, security boundaries, and invariants the normal tests don't +cover: + +1. AUTH POSTURE — require_authenticated vs AuthRequired distinction; bad key. +2. CSV INJECTION — all four trigger chars on every column; quoting of embedded + commas, quotes, and newlines; row count vs. filter. +3. EXPORT JSON — empty → []; matches list endpoint data; content-type. +4. PAGINATION INTEGRITY — full traversal; total stable; limit edge cases; + before_seq beyond range; limit=0 and negative → 422. +5. FILTER EDGE CASES — combined filters AND; bad since/until → 422; q with + SQL metachar literals; unknown category/severity contract. +6. SETTINGS EDGE CASES — boundary values; disabled flag roundtrip; missing + fields → 422. +7. CLEAR IS AUDITED — exactly one post-clear system entry; actor recorded; + count in metadata. +""" + +from __future__ import annotations + +import csv +import io +import json +from datetime import datetime, timezone +from unittest.mock import MagicMock + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from ledgrab.api import dependencies as deps +from ledgrab.api.auth import verify_api_key +from ledgrab.api.routes.activity_log import router +from ledgrab.storage.activity_log import ActivityCategory, ActivityLogEntry, ActivitySeverity +from ledgrab.storage.activity_log_repository import ActivityLogRepository + + +# --------------------------------------------------------------------------- +# Fixtures (mirrors test_activity_log_api.py exactly) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def tmp_db(tmp_path): + from ledgrab.storage.database import Database + + db = Database(tmp_path / "adv_activity_log.db") + yield db + db.close() + + +@pytest.fixture +def repo(tmp_db) -> ActivityLogRepository: + return ActivityLogRepository(tmp_db) + + +@pytest.fixture +def fake_recorder(): + class FakeRecorder: + def __init__(self): + self.calls: list[dict] = [] + self.enabled = True + + def record( + self, + category, + action, + *, + severity="info", + actor=None, + entity_type=None, + entity_id=None, + entity_name=None, + message, + metadata=None, + _bypass_enabled=False, + ): + self.calls.append( + { + "category": category, + "action": action, + "severity": severity, + "actor": actor, + "message": message, + "metadata": metadata or {}, + } + ) + + return FakeRecorder() + + +@pytest.fixture +def fake_retention_engine(): + class FakeRetentionEngine: + def __init__(self): + self._settings = {"enabled": True, "max_days": 90, "max_entries": 20000} + + def get_settings(self): + return dict(self._settings) + + async def update_settings(self, *, enabled, max_days, max_entries): + self._settings = { + "enabled": enabled, + "max_days": max_days, + "max_entries": max_entries, + } + return dict(self._settings) + + return FakeRetentionEngine() + + +def _make_app(repo, recorder=None, retention_engine=None, auth_label="test-user"): + app = FastAPI() + app.include_router(router) + app.dependency_overrides[verify_api_key] = lambda: auth_label + app.dependency_overrides[deps.get_activity_log_repo] = lambda: repo + if recorder is not None: + app.dependency_overrides[deps.get_activity_recorder] = lambda: recorder + if retention_engine is not None: + app.dependency_overrides[deps.get_activity_log_retention_engine] = lambda: retention_engine + return app + + +def _make_client(repo, recorder=None, retention_engine=None, auth_label="test-user"): + return TestClient( + _make_app(repo, recorder, retention_engine, auth_label=auth_label), + raise_server_exceptions=False, + ) + + +def _make_entry( + *, + id: str | None = None, + category: str = ActivityCategory.SYSTEM, + action: str = "test.action", + severity: str = ActivitySeverity.INFO, + actor: str = "test-actor", + message: str = "test message", + entity_type: str | None = None, + entity_id: str | None = None, + entity_name: str | None = None, + metadata: dict | None = None, + ts: datetime | None = None, +) -> ActivityLogEntry: + import uuid + + return ActivityLogEntry( + id=id or ("al_" + uuid.uuid4().hex[:8]), + ts=ts or datetime.now(timezone.utc), + category=category, + action=action, + severity=severity, + actor=actor, + message=message, + entity_type=entity_type, + entity_id=entity_id, + entity_name=entity_name, + metadata=metadata or {}, + ) + + +# --------------------------------------------------------------------------- +# 1. AUTH POSTURE +# --------------------------------------------------------------------------- + + +class TestAuthPosture: + """Verify the require_authenticated vs AuthRequired distinction is enforced.""" + + # --- require_authenticated endpoints (export, clear) --- + + def test_export_anonymous_is_rejected(self, repo): + """GET /export with anonymous label → 401 (require_authenticated).""" + client = _make_client(repo, auth_label="anonymous") + resp = client.get("/api/v1/activity-log/export") + assert ( + resp.status_code == 401 + ), f"Export must reject anonymous callers; got {resp.status_code}" + + def test_export_authenticated_user_succeeds(self, repo, fake_recorder): + """GET /export with a non-anonymous label → 200.""" + client = _make_client(repo, recorder=fake_recorder, auth_label="my-key") + resp = client.get("/api/v1/activity-log/export") + assert resp.status_code == 200 + + def test_clear_anonymous_is_rejected(self, repo, fake_recorder): + """DELETE / with anonymous label → 401 (require_authenticated).""" + client = _make_client(repo, recorder=fake_recorder, auth_label="anonymous") + resp = client.delete("/api/v1/activity-log") + assert ( + resp.status_code == 401 + ), f"Clear must reject anonymous callers; got {resp.status_code}" + + def test_clear_authenticated_user_succeeds(self, repo, fake_recorder): + """DELETE / with non-anonymous label → 200.""" + client = _make_client(repo, recorder=fake_recorder, auth_label="my-key") + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + + # --- AuthRequired endpoints (list, GET settings) allow anonymous --- + + def test_list_allows_anonymous(self, repo): + """GET / with anonymous label → 200 (AuthRequired, not require_authenticated).""" + client = _make_client(repo, auth_label="anonymous") + resp = client.get("/api/v1/activity-log") + assert ( + resp.status_code == 200 + ), f"List should allow anonymous (AuthRequired); got {resp.status_code}" + + def test_settings_get_allows_anonymous(self, repo, fake_retention_engine): + """GET /settings with anonymous → 200 (AuthRequired).""" + client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="anonymous") + resp = client.get("/api/v1/activity-log/settings") + assert ( + resp.status_code == 200 + ), f"GET /settings should allow anonymous; got {resp.status_code}" + + # --- PUT /settings now requires non-anonymous auth (require_authenticated) --- + + def test_settings_put_rejects_anonymous(self, repo, fake_retention_engine): + """PUT /settings with anonymous label → 401 (require_authenticated). + + Disabling auditing or trimming retention to near-zero is equivalent in + impact to clearing the audit trail, so the same auth bar applies. + """ + client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="anonymous") + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 30, "max_entries": 1000}, + ) + assert ( + resp.status_code == 401 + ), f"PUT /settings must reject anonymous callers; got {resp.status_code}" + + def test_settings_put_authenticated_succeeds(self, repo, fake_retention_engine): + """PUT /settings with a non-anonymous label → 200 (require_authenticated).""" + client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="my-key") + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 30, "max_entries": 1000}, + ) + assert ( + resp.status_code == 200 + ), f"PUT /settings should succeed for authenticated caller; got {resp.status_code}" + + # --- With api_keys configured: bad key → 401 on every endpoint --- + + def _make_app_with_real_auth(self, repo, recorder=None, retention_engine=None): + """Build an app that runs the REAL verify_api_key (no override). + + We patch ``get_config`` so that it looks as if an API key is + configured, but the test client doesn't provide one — exercising the + real 401 path. + """ + + # Build a lightweight stand-in for the config object so that + # verify_api_key sees api_keys = {"dev": "correct-key"}. + # We use a plain object rather than MagicMock(spec=...) to avoid + # attribute-access restrictions on nested sub-objects. + class _FakeAuth: + api_keys = {"dev": "correct-key"} + + class _FakeConfig: + auth = _FakeAuth() + + mock_config = _FakeConfig() + + app = FastAPI() + app.include_router(router) + # Do NOT override verify_api_key — let the real function run. + app.dependency_overrides[deps.get_activity_log_repo] = lambda: repo + if recorder is not None: + app.dependency_overrides[deps.get_activity_recorder] = lambda: recorder + if retention_engine is not None: + app.dependency_overrides[deps.get_activity_log_retention_engine] = ( + lambda: retention_engine + ) + + client = TestClient(app, raise_server_exceptions=False) + return client, mock_config + + def test_list_bad_key_is_401(self, repo): + """With api_keys configured, a wrong Bearer token → 401 on the list endpoint.""" + client, mock_config = self._make_app_with_real_auth(repo) + with MagicMock() as _: + from unittest.mock import patch + + with patch("ledgrab.api.auth.get_config", return_value=mock_config): + resp = client.get( + "/api/v1/activity-log", + headers={"Authorization": "Bearer wrong-key"}, + ) + assert resp.status_code == 401 + + def test_export_bad_key_is_401(self, repo, fake_recorder): + """With api_keys configured, a wrong Bearer token → 401 on the export endpoint.""" + client, mock_config = self._make_app_with_real_auth(repo, recorder=fake_recorder) + from unittest.mock import patch + + with patch("ledgrab.api.auth.get_config", return_value=mock_config): + resp = client.get( + "/api/v1/activity-log/export", + headers={"Authorization": "Bearer wrong-key"}, + ) + assert resp.status_code == 401 + + def test_clear_bad_key_is_401(self, repo, fake_recorder): + """With api_keys configured, a wrong Bearer token → 401 on the clear endpoint.""" + client, mock_config = self._make_app_with_real_auth(repo, recorder=fake_recorder) + from unittest.mock import patch + + with patch("ledgrab.api.auth.get_config", return_value=mock_config): + resp = client.delete( + "/api/v1/activity-log", + headers={"Authorization": "Bearer wrong-key"}, + ) + assert resp.status_code == 401 + + def test_export_no_key_header_is_401(self, repo, fake_recorder): + """With api_keys configured, missing Authorization header → 401 on export.""" + client, mock_config = self._make_app_with_real_auth(repo, recorder=fake_recorder) + from unittest.mock import patch + + with patch("ledgrab.api.auth.get_config", return_value=mock_config): + resp = client.get("/api/v1/activity-log/export") + assert resp.status_code == 401 + + def test_export_correct_key_is_200(self, repo, fake_recorder): + """With api_keys configured, correct Bearer token → 200 on export.""" + client, mock_config = self._make_app_with_real_auth(repo, recorder=fake_recorder) + from unittest.mock import patch + + with patch("ledgrab.api.auth.get_config", return_value=mock_config): + resp = client.get( + "/api/v1/activity-log/export", + headers={"Authorization": "Bearer correct-key"}, + ) + assert resp.status_code == 200 + + +# --------------------------------------------------------------------------- +# 2. CSV INJECTION / EXPORT SAFETY +# --------------------------------------------------------------------------- + + +class TestCsvInjection: + """OWASP CSV formula-injection guards and well-formedness.""" + + # The four trigger characters, used in each user-controlled field. + _TRIGGERS = ("=", "+", "-", "@") + + def _csv_rows(self, resp) -> list[dict]: + assert resp.status_code == 200 + reader = csv.DictReader(io.StringIO(resp.text)) + return list(reader) + + # -- message column -- + + def test_message_equal_sign_neutralised(self, repo, fake_recorder): + repo.record(_make_entry(message="=SUM(A1:A10)")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert len(rows) == 1 + assert not rows[0]["message"].startswith( + "=" + ), f"CSV injection not neutralised in 'message': {rows[0]['message']!r}" + # The prefix quote must be there + assert rows[0]["message"].startswith( + "'" + ), f"Expected single-quote prefix in 'message': {rows[0]['message']!r}" + + def test_message_plus_sign_neutralised(self, repo, fake_recorder): + repo.record(_make_entry(message="+evil formula")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert not rows[0]["message"].startswith("+"), rows[0]["message"] + assert rows[0]["message"].startswith("'"), rows[0]["message"] + + def test_message_minus_sign_neutralised(self, repo, fake_recorder): + repo.record(_make_entry(message="-bad")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert not rows[0]["message"].startswith("-"), rows[0]["message"] + assert rows[0]["message"].startswith("'"), rows[0]["message"] + + def test_message_at_sign_neutralised(self, repo, fake_recorder): + repo.record(_make_entry(message="@SUM(1+1)")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert not rows[0]["message"].startswith("@"), rows[0]["message"] + assert rows[0]["message"].startswith("'"), rows[0]["message"] + + # -- entity_name column (another user-controlled string field) -- + + def test_entity_name_injection_neutralised(self, repo, fake_recorder): + """entity_name starting with = is neutralised in CSV export.""" + repo.record( + _make_entry( + entity_name="=cmd|' /C calc'!A1", + entity_type="device", + entity_id="d1", + ) + ) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert len(rows) == 1 + assert not rows[0]["entity_name"].startswith( + "=" + ), f"CSV injection not neutralised in 'entity_name': {rows[0]['entity_name']!r}" + assert rows[0]["entity_name"].startswith("'"), rows[0]["entity_name"] + + def test_actor_injection_neutralised(self, repo, fake_recorder): + """actor field starting with + is neutralised.""" + repo.record(_make_entry(actor="+actor-formula")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert not rows[0]["actor"].startswith("+"), rows[0]["actor"] + assert rows[0]["actor"].startswith("'"), rows[0]["actor"] + + # -- Safe values must NOT be mangled -- + + def test_safe_message_not_mangled(self, repo, fake_recorder): + """A message that does not start with a trigger char must be unchanged.""" + repo.record(_make_entry(message="Normal log message")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert rows[0]["message"] == "Normal log message" + + def test_empty_message_not_mangled(self, repo, fake_recorder): + """Empty-string fields (None → '') are not prefixed with '.""" + repo.record(_make_entry(entity_name=None)) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert rows[0]["entity_name"] == "" + + # -- CSV well-formedness -- + + def test_csv_comma_in_field_properly_quoted(self, repo, fake_recorder): + """A comma inside a field value must be quoted, not split into two columns.""" + repo.record(_make_entry(message="left, right")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + # DictReader must parse it back as a single field + assert rows[0]["message"] == "left, right" + + def test_csv_double_quote_in_field_escaped(self, repo, fake_recorder): + """A double-quote inside a field must be escaped (RFC 4180: doubled).""" + repo.record(_make_entry(message='He said "hello"')) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert rows[0]["message"] == 'He said "hello"' + + def test_csv_newline_in_field_handled(self, repo, fake_recorder): + """A newline embedded in a field must not break the row count.""" + repo.record(_make_entry(message="line1\nline2")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + # Must still be exactly 1 data row + assert len(rows) == 1 + assert "line1" in rows[0]["message"] + + # -- Row count matches filter -- + + def test_csv_row_count_matches_filter(self, repo, fake_recorder): + """Row count in CSV equals the total from the list endpoint for the same filter.""" + repo.record(_make_entry(category=ActivityCategory.AUTH, message="auth 1")) + repo.record(_make_entry(category=ActivityCategory.AUTH, message="auth 2")) + repo.record(_make_entry(category=ActivityCategory.ENTITY, message="entity 1")) + + client = _make_client(repo, recorder=fake_recorder) + + list_resp = client.get("/api/v1/activity-log?categories=auth") + assert list_resp.status_code == 200 + expected_count = list_resp.json()["total"] + + csv_resp = client.get("/api/v1/activity-log/export?format=csv&categories=auth") + rows = self._csv_rows(csv_resp) + assert ( + len(rows) == expected_count + ), f"CSV row count {len(rows)} != list total {expected_count}" + for row in rows: + assert row["category"] == "auth" + + def test_csv_all_trigger_chars_in_one_export(self, repo, fake_recorder): + """Multiple entries with all four injection chars are all neutralised.""" + triggers = ["=HYPERLINK(url)", "+evil", "-minus", "@at"] + for msg in triggers: + repo.record(_make_entry(message=msg)) + + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert len(rows) == len(triggers) + for row in rows: + assert row["message"][0:1] not in ( + "=", + "+", + "-", + "@", + ), f"Unguarded injection trigger in message: {row['message']!r}" + + # -- Leading TAB / CR triggers (MEDIUM-1) -- + + def test_tab_formula_prefix_neutralised(self, repo, fake_recorder): + """A leading TAB before a formula character is a recognised injection trigger.""" + repo.record(_make_entry(message="\t=SUM(A1:A10)")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert len(rows) == 1 + msg = rows[0]["message"] + assert not msg.startswith("\t"), f"Leading TAB injection trigger not neutralised: {msg!r}" + assert msg.startswith("'"), f"Expected single-quote prefix for TAB trigger: {msg!r}" + + def test_cr_formula_prefix_neutralised(self, repo, fake_recorder): + """A leading CR is a recognised injection trigger (used to evade column-A checks).""" + repo.record(_make_entry(message="\r=bad")) + client = _make_client(repo, recorder=fake_recorder) + rows = self._csv_rows(client.get("/api/v1/activity-log/export?format=csv")) + assert len(rows) == 1 + msg = rows[0]["message"] + assert not msg.startswith("\r"), f"Leading CR injection trigger not neutralised: {msg!r}" + assert msg.startswith("'"), f"Expected single-quote prefix for CR trigger: {msg!r}" + + +# --------------------------------------------------------------------------- +# 3. EXPORT JSON +# --------------------------------------------------------------------------- + + +class TestExportJson: + def test_empty_log_returns_empty_array(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + assert resp.status_code == 200 + data = json.loads(resp.text) + assert data == [], f"Expected [] for empty log, got: {data!r}" + + def test_json_content_type(self, repo, fake_recorder): + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + assert "application/json" in resp.headers["content-type"] + + def test_json_export_matches_list_endpoint_data(self, repo, fake_recorder): + """Exported JSON entries must match what the list endpoint returns.""" + for i in range(5): + repo.record(_make_entry(message=f"entry {i}")) + + client = _make_client(repo, recorder=fake_recorder) + + # Collect all entries via list endpoint (5 < default limit 50) + list_resp = client.get("/api/v1/activity-log") + assert list_resp.status_code == 200 + list_ids = {e["id"] for e in list_resp.json()["entries"]} + + export_resp = client.get("/api/v1/activity-log/export?format=json") + assert export_resp.status_code == 200 + exported = json.loads(export_resp.text) + export_ids = {e["id"] for e in exported} + + assert list_ids == export_ids, ( + f"Export IDs differ from list IDs.\n" + f"Only in list: {list_ids - export_ids}\n" + f"Only in export: {export_ids - list_ids}" + ) + + def test_json_export_honours_filter(self, repo, fake_recorder): + """JSON export with a filter returns only matching entries.""" + repo.record(_make_entry(severity=ActivitySeverity.ERROR, message="error")) + repo.record(_make_entry(severity=ActivitySeverity.INFO, message="info")) + repo.record(_make_entry(severity=ActivitySeverity.WARNING, message="warn")) + + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json&severities=error") + assert resp.status_code == 200 + data = json.loads(resp.text) + assert len(data) == 1 + assert data[0]["severity"] == "error" + + def test_json_export_all_11_fields_present(self, repo, fake_recorder): + """Every exported entry must contain all 11 schema fields.""" + repo.record( + _make_entry( + entity_type="device", + entity_id="d1", + entity_name="Test Device", + metadata={"k": "v"}, + ) + ) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + data = json.loads(resp.text) + assert len(data) == 1 + expected_keys = { + "id", + "ts", + "category", + "action", + "severity", + "actor", + "entity_type", + "entity_id", + "entity_name", + "message", + "metadata", + } + missing = expected_keys - set(data[0].keys()) + assert not missing, f"Missing fields in JSON export: {missing}" + + def test_json_export_filtered_empty_returns_empty_array(self, repo, fake_recorder): + """A filter that matches nothing returns [] (not null, not {}).""" + repo.record(_make_entry(category=ActivityCategory.AUTH)) + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json&categories=device") + assert resp.status_code == 200 + data = json.loads(resp.text) + assert data == [] + + def test_json_export_content_disposition_filename(self, repo, fake_recorder): + """JSON export Content-Disposition must include filename with .json extension.""" + client = _make_client(repo, recorder=fake_recorder) + resp = client.get("/api/v1/activity-log/export?format=json") + cd = resp.headers.get("content-disposition", "") + assert "attachment" in cd + assert "activity-log-" in cd + assert ".json" in cd + + +# --------------------------------------------------------------------------- +# 4. PAGINATION INTEGRITY +# --------------------------------------------------------------------------- + + +class TestPaginationIntegrity: + def test_full_traversal_no_gaps_no_dupes(self, repo): + """Page through all entries; every entry appears exactly once.""" + n = 23 + for i in range(n): + repo.record(_make_entry(message=f"entry {i:03d}")) + + client = _make_client(repo) + all_ids: list[str] = [] + before_seq = None + max_pages = n + 1 # safety guard + + for _ in range(max_pages): + url = "/api/v1/activity-log?limit=7" + if before_seq is not None: + url += f"&before_seq={before_seq}" + resp = client.get(url) + assert resp.status_code == 200 + data = resp.json() + page_ids = [e["id"] for e in data["entries"]] + dupes = [pid for pid in page_ids if pid in all_ids] + assert not dupes, f"Duplicate entries on page: {dupes}" + all_ids.extend(page_ids) + if not data["has_more"]: + break + before_seq = data["next_before_seq"] + + assert len(all_ids) == n, f"Expected {n} unique entries, got {len(all_ids)}" + + def test_total_is_stable_across_pages(self, repo): + """The 'total' field stays constant across all pages for the same filters.""" + for i in range(13): + repo.record(_make_entry(message=f"msg {i}")) + + client = _make_client(repo) + totals: list[int] = [] + before_seq = None + + for _ in range(10): + url = "/api/v1/activity-log?limit=5" + if before_seq is not None: + url += f"&before_seq={before_seq}" + resp = client.get(url) + data = resp.json() + totals.append(data["total"]) + if not data["has_more"]: + break + before_seq = data["next_before_seq"] + + assert len(set(totals)) == 1, f"'total' changed across pages: {totals}" + assert totals[0] == 13 + + def test_limit_above_200_rejected(self, repo): + """limit=201 → 422 (hard cap).""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=201") + assert resp.status_code == 422 + + def test_limit_at_cap_200_accepted(self, repo): + """limit=200 is the maximum allowed value and must be accepted.""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=200") + assert resp.status_code == 200 + + def test_limit_1_accepted(self, repo): + """limit=1 is the minimum allowed value.""" + repo.record(_make_entry()) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=1") + assert resp.status_code == 200 + + def test_limit_0_rejected(self, repo): + """limit=0 → 422.""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=0") + assert resp.status_code == 422 + + def test_limit_negative_rejected(self, repo): + """limit=-5 → 422.""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=-5") + assert resp.status_code == 422 + + def test_before_seq_beyond_range_returns_empty(self, repo): + """before_seq larger than any seq in the DB returns an empty page gracefully.""" + repo.record(_make_entry()) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?before_seq=999999999") + assert resp.status_code == 200 + data = resp.json() + # before_seq=999999999 means "entries with seq < 999999999" which is everything, + # so we should actually get the entry. Let's use seq=1 to get nothing. + # Actually this tests that a very large before_seq doesn't crash. + assert isinstance(data["entries"], list) + assert isinstance(data["has_more"], bool) + + def test_before_seq_1_returns_empty_page(self, repo): + """before_seq=1 (before any autoincrement seq) → empty page, no error.""" + for _ in range(3): + repo.record(_make_entry()) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?before_seq=1") + assert resp.status_code == 200 + data = resp.json() + assert data["entries"] == [] + assert data["has_more"] is False + + def test_has_more_false_next_before_seq_null(self, repo): + """When has_more is False, next_before_seq must be null (not an int).""" + for _ in range(3): + repo.record(_make_entry()) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?limit=10") + data = resp.json() + assert data["has_more"] is False + assert data["next_before_seq"] is None + + def test_second_page_cursor_is_usable(self, repo): + """next_before_seq from page 1 must work as before_seq on page 2.""" + for i in range(6): + repo.record(_make_entry(message=f"msg {i}")) + + client = _make_client(repo) + page1 = client.get("/api/v1/activity-log?limit=3").json() + assert page1["has_more"] is True + cursor = page1["next_before_seq"] + assert cursor is not None + + page2 = client.get(f"/api/v1/activity-log?limit=3&before_seq={cursor}").json() + ids_p1 = {e["id"] for e in page1["entries"]} + ids_p2 = {e["id"] for e in page2["entries"]} + assert ids_p1.isdisjoint(ids_p2), f"Overlap between page1 and page2: {ids_p1 & ids_p2}" + + +# --------------------------------------------------------------------------- +# 5. FILTER EDGE CASES +# --------------------------------------------------------------------------- + + +class TestFilterEdgeCases: + def test_combined_filters_and_semantics(self, repo): + """Multiple filters narrow results with AND semantics.""" + repo.record( + _make_entry( + category=ActivityCategory.AUTH, + severity=ActivitySeverity.WARNING, + actor="alice", + message="auth warning", + ) + ) + repo.record( + _make_entry( + category=ActivityCategory.AUTH, + severity=ActivitySeverity.INFO, + actor="alice", + message="auth info", + ) + ) + repo.record( + _make_entry( + category=ActivityCategory.ENTITY, + severity=ActivitySeverity.WARNING, + actor="alice", + message="entity warning", + ) + ) + + client = _make_client(repo) + # category=auth AND severity=warning AND actor=alice → exactly 1 + resp = client.get("/api/v1/activity-log?categories=auth&severities=warning&actor=alice") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 1, f"Expected 1 result from combined filters, got {data['total']}" + entry = data["entries"][0] + assert entry["category"] == "auth" + assert entry["severity"] == "warning" + assert entry["actor"] == "alice" + + def test_categories_multi_value_or_semantics(self, repo): + """Multiple 'categories' params → OR within the dimension.""" + repo.record(_make_entry(category=ActivityCategory.AUTH)) + repo.record(_make_entry(category=ActivityCategory.DEVICE)) + repo.record(_make_entry(category=ActivityCategory.ENTITY)) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?categories=auth&categories=device") + assert resp.status_code == 200 + assert resp.json()["total"] == 2 + + def test_severities_multi_value(self, repo): + """Multiple 'severities' params → OR within the dimension.""" + repo.record(_make_entry(severity=ActivitySeverity.INFO)) + repo.record(_make_entry(severity=ActivitySeverity.WARNING)) + repo.record(_make_entry(severity=ActivitySeverity.ERROR)) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?severities=warning&severities=error") + assert resp.status_code == 200 + assert resp.json()["total"] == 2 + + def test_bad_since_format_is_422(self, repo): + """Malformed 'since' datetime → 422 (Pydantic validation).""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?since=not-a-date") + assert resp.status_code == 422 + + def test_bad_until_format_is_422(self, repo): + """Malformed 'until' datetime → 422.""" + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?until=2024-99-99") + assert resp.status_code == 422 + + def test_q_with_percent_literal_treated_literally(self, repo): + """A literal '%' in q must not act as a LIKE wildcard.""" + repo.record(_make_entry(message="100% done")) + repo.record(_make_entry(message="other entry")) + + client = _make_client(repo) + # URL-encode % as %25; the server must pass it as a literal to LIKE + resp = client.get("/api/v1/activity-log?q=100%25") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 1, ( + f"Literal '100%' matched {data['total']} entries (expected 1). " + f"'%' is being used as a wildcard instead of a literal character." + ) + + def test_q_with_underscore_literal(self, repo): + """A literal '_' in q must not act as a LIKE single-char wildcard.""" + repo.record(_make_entry(message="snake_case message")) + repo.record(_make_entry(message="camelCase message")) + + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?q=snake_case") + assert resp.status_code == 200 + data = resp.json() + # "snake_case" as a LIKE pattern would match any 10-char sequence; + # as a literal it should only match the entry with "snake_case" + assert data["total"] == 1, ( + f"Literal 'snake_case' matched {data['total']} entries (expected 1). " + f"'_' may be acting as a wildcard." + ) + + def test_q_with_single_quote_no_sql_error(self, repo): + """A single quote in q must not cause a SQL error.""" + repo.record(_make_entry(message="it's working")) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?q=it%27s") + assert resp.status_code == 200 + + def test_q_with_backslash_no_sql_error(self, repo): + """A backslash in q must not cause a SQL error (LIKE ESCAPE '\\').""" + repo.record(_make_entry(message=r"path\to\file")) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?q=path%5Cto") + assert resp.status_code == 200 + + def test_unknown_category_returns_empty(self, repo): + """A category value that matches no entries returns total=0 (not an error).""" + repo.record(_make_entry(category=ActivityCategory.AUTH)) + client = _make_client(repo) + resp = client.get("/api/v1/activity-log?categories=nonexistent_category") + # Should return 200 with empty results (not 422) — the contract is an enum + # at the DB level, not at the HTTP level; unknown values simply match nothing. + assert resp.status_code in ( + 200, + 422, + ), f"Unexpected status {resp.status_code} for unknown category value" + if resp.status_code == 200: + assert resp.json()["total"] == 0 + + def test_since_after_until_returns_empty(self, repo): + """since > until is a valid (but empty) range — should not be a server error.""" + repo.record(_make_entry(message="any entry")) + client = _make_client(repo) + resp = client.get( + "/api/v1/activity-log" "?since=2024-06-10T00:00:00Z&until=2024-06-09T00:00:00Z" + ) + # The DB will evaluate ts >= since AND ts <= until — when since > until, + # no row can satisfy both; result must be empty, not an error. + assert resp.status_code in (200, 422) + if resp.status_code == 200: + assert resp.json()["total"] == 0 + + def test_actor_empty_string_matches_nothing(self, repo): + """actor='' should match no entries (empty string is treated as None/absent).""" + repo.record(_make_entry(actor="alice")) + client = _make_client(repo) + # Empty string actor — the implementation converts '' → None → no filter, + # or it becomes an exact-match for '' which also returns 0. + resp = client.get("/api/v1/activity-log?actor=") + assert resp.status_code == 200 + + +# --------------------------------------------------------------------------- +# 6. SETTINGS EDGE CASES +# --------------------------------------------------------------------------- + + +class TestSettingsEdgeCases: + def test_max_days_at_cap_3650_accepted(self, repo, fake_retention_engine): + """max_days=3650 (cap) must be accepted.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 3650, "max_entries": 0}, + ) + assert resp.status_code == 200 + assert resp.json()["max_days"] == 3650 + + def test_max_days_one_over_cap_rejected(self, repo, fake_retention_engine): + """max_days=3651 (one over cap) must be rejected with 422.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 3651, "max_entries": 0}, + ) + assert resp.status_code == 422 + + def test_max_entries_at_cap_accepted(self, repo, fake_retention_engine): + """max_entries=10_000_000 (cap) must be accepted.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 0, "max_entries": 10_000_000}, + ) + assert resp.status_code == 200 + assert resp.json()["max_entries"] == 10_000_000 + + def test_max_entries_one_over_cap_rejected(self, repo, fake_retention_engine): + """max_entries=10_000_001 (one over cap) must be rejected with 422.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 0, "max_entries": 10_000_001}, + ) + assert resp.status_code == 422 + + def test_enabled_false_then_get_reflects_disabled(self, repo, fake_retention_engine): + """PUT enabled=False → GET returns enabled=False.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + client.put( + "/api/v1/activity-log/settings", + json={"enabled": False, "max_days": 30, "max_entries": 1000}, + ) + resp = client.get("/api/v1/activity-log/settings") + assert resp.status_code == 200 + assert resp.json()["enabled"] is False + + def test_malformed_body_missing_field_rejected(self, repo, fake_retention_engine): + """PUT body missing 'enabled' → 422.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"max_days": 30, "max_entries": 1000}, + # 'enabled' is required; Pydantic should reject the body + ) + assert resp.status_code == 422 + + def test_malformed_body_extra_field_ignored(self, repo, fake_retention_engine): + """PUT body with unknown extra field must not cause a 500.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={ + "enabled": True, + "max_days": 30, + "max_entries": 1000, + "undocumented_field": "surprise", + }, + ) + # Pydantic ignores extra fields by default; should succeed or 422, never 500 + assert resp.status_code in (200, 422) + + def test_put_settings_not_dict_rejected(self, repo, fake_retention_engine): + """PUT with a JSON array body (not an object) → 422.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + content=b'["not", "a", "dict"]', + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 422 + + def test_put_settings_empty_body_rejected(self, repo, fake_retention_engine): + """PUT with empty body → 422.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + content=b"", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 422 + + def test_put_settings_zero_values_accepted(self, repo, fake_retention_engine): + """max_days=0 and max_entries=0 (no pruning) must be accepted.""" + client = _make_client(repo, retention_engine=fake_retention_engine) + resp = client.put( + "/api/v1/activity-log/settings", + json={"enabled": True, "max_days": 0, "max_entries": 0}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["max_days"] == 0 + assert data["max_entries"] == 0 + + +# --------------------------------------------------------------------------- +# 7. CLEAR IS AUDITED +# --------------------------------------------------------------------------- + + +class TestClearIsAudited: + def test_clear_leaves_exactly_one_post_clear_entry(self, repo): + """Integration: DELETE leaves exactly one 'activity_log.cleared' entry via real recorder.""" + from ledgrab.core.activity_log.recorder import ActivityRecorder + from ledgrab.storage.activity_log import ActivityLogFilters + + real_recorder = ActivityRecorder(repo, MagicMock()) + + # Pre-populate + for _ in range(5): + repo.record(_make_entry()) + assert repo.count() == 5 + + client = _make_client(repo, recorder=real_recorder, auth_label="admin-key") + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + + remaining = repo.count() + assert remaining == 1, f"Expected exactly 1 post-clear audit entry, got {remaining}" + entries = repo.query(ActivityLogFilters()) + assert len(entries) == 1 + assert entries[0].action == "activity_log.cleared" + + def test_clear_audit_entry_is_system_category(self, repo): + """The post-clear audit entry must have category='system'.""" + from ledgrab.core.activity_log.recorder import ActivityRecorder + from ledgrab.storage.activity_log import ActivityLogFilters + + real_recorder = ActivityRecorder(repo, MagicMock()) + repo.record(_make_entry()) + + client = _make_client(repo, recorder=real_recorder, auth_label="tester") + client.delete("/api/v1/activity-log") + + entries = repo.query(ActivityLogFilters()) + assert entries[0].category == ActivityCategory.SYSTEM + + def test_clear_audit_entry_records_deleted_count(self, repo): + """The post-clear audit metadata must include the correct deleted_count.""" + from ledgrab.core.activity_log.recorder import ActivityRecorder + from ledgrab.storage.activity_log import ActivityLogFilters + + real_recorder = ActivityRecorder(repo, MagicMock()) + + for _ in range(7): + repo.record(_make_entry()) + + client = _make_client(repo, recorder=real_recorder, auth_label="tester") + resp = client.delete("/api/v1/activity-log") + assert resp.json()["deleted"] == 7 + + entries = repo.query(ActivityLogFilters()) + assert ( + entries[0].metadata.get("deleted_count") == 7 + ), f"Audit metadata missing correct deleted_count: {entries[0].metadata}" + + def test_clear_audit_entry_records_actor(self, repo): + """The post-clear audit entry's actor must match the authenticated label.""" + from ledgrab.core.activity_log.recorder import ActivityRecorder + from ledgrab.storage.activity_log import ActivityLogFilters + + real_recorder = ActivityRecorder(repo, MagicMock()) + client = _make_client(repo, recorder=real_recorder, auth_label="specific-key") + client.delete("/api/v1/activity-log") + + entries = repo.query(ActivityLogFilters()) + assert ( + entries[0].actor == "specific-key" + ), f"Expected actor 'specific-key', got {entries[0].actor!r}" + + def test_clear_audit_entry_action_is_correct(self, repo, fake_recorder): + """The audit record action must be 'activity_log.cleared' (not cleared or log_cleared).""" + client = _make_client(repo, recorder=fake_recorder, auth_label="tester") + client.delete("/api/v1/activity-log") + + assert len(fake_recorder.calls) == 1 + assert ( + fake_recorder.calls[0]["action"] == "activity_log.cleared" + ), f"Wrong action recorded: {fake_recorder.calls[0]['action']!r}" + + def test_clear_response_deleted_matches_pre_clear_count(self, repo, fake_recorder): + """The 'deleted' count in the response must equal the pre-clear row count.""" + n = 9 + for _ in range(n): + repo.record(_make_entry()) + assert repo.count() == n + + client = _make_client(repo, recorder=fake_recorder) + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + assert resp.json()["deleted"] == n, f"Expected deleted={n}, got {resp.json()['deleted']}" + + def test_clear_empty_log_audit_records_zero_deleted(self, repo, fake_recorder): + """Clearing an empty log records deleted_count=0 in metadata.""" + client = _make_client(repo, recorder=fake_recorder, auth_label="tester") + resp = client.delete("/api/v1/activity-log") + assert resp.status_code == 200 + assert resp.json()["deleted"] == 0 + assert fake_recorder.calls[0]["metadata"]["deleted_count"] == 0 + + def test_clear_log_then_get_shows_only_audit_entry(self, repo): + """After clear, GET /api/v1/activity-log shows exactly the 1 audit entry.""" + from ledgrab.core.activity_log.recorder import ActivityRecorder + + real_recorder = ActivityRecorder(repo, MagicMock()) + + for _ in range(4): + repo.record(_make_entry()) + + client = _make_client(repo, recorder=real_recorder, auth_label="admin") + client.delete("/api/v1/activity-log") + + resp = client.get("/api/v1/activity-log") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 1, f"Expected total=1 after clear, got {data['total']}" + assert data["entries"][0]["action"] == "activity_log.cleared"