Files
ledgrab/server/tests/api/routes/test_activity_log_api.py
T
alexei.dolgolyov 4a0927521a feat(activity-log): phase 4 - REST API (list/export/settings/clear)
- GET /activity-log: filtered, keyset-paginated list (categories/severities/actor/entity/date/q)
- GET /activity-log/export: streaming CSV/JSON, chunked keyset (releases DB lock per batch), CSV formula-injection guard
- GET/PUT /activity-log/settings: retention config (PUT require_authenticated)
- DELETE /activity-log: clear (require_authenticated, self-audited)
- security: export DoS fix, settings-PUT auth gate, CSV \t/\r guard, metadata-as-JSON
- 122 API tests (auth posture, CSV injection, pagination integrity, filters, settings bounds, clear-audited)
2026-06-09 20:09:46 +03:00

734 lines
28 KiB
Python

"""Tests for the activity-log REST API (Phase 4).
Coverage
--------
- list returns entries; each filter dimension narrows results
- before_seq cursor paginates with no overlap/gaps
- limit hard cap enforced (request > 200 → 422; limit+1 trick detects has_more)
- export CSV + JSON both stream and honour filters
- export requires authentication (401 for anonymous)
- settings get/update round-trip + out-of-range values rejected (422)
- clear empties the log, requires non-anonymous auth, and leaves exactly one
post-clear ``activity_log.cleared`` audit entry
"""
from __future__ import annotations
import csv
import io
import json
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.auth import verify_api_key
from ledgrab.api.routes.activity_log import router
from ledgrab.storage.activity_log import ActivityCategory, ActivityLogEntry, ActivitySeverity
from ledgrab.storage.activity_log_repository import ActivityLogRepository
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_db(tmp_path):
    """Yield a ``Database`` stored under pytest's ``tmp_path``; close it on teardown."""
    # The import stays local to the fixture, as in the original.
    from ledgrab.storage.database import Database

    database = Database(tmp_path / "test_activity_log.db")
    yield database
    # Runs after the requesting test finishes.
    database.close()
@pytest.fixture
def repo(tmp_db) -> ActivityLogRepository:
    """Provide a real ActivityLogRepository on top of the temporary database fixture."""
    repository = ActivityLogRepository(tmp_db)
    return repository
@pytest.fixture
def fake_recorder():
    """Provide a recorder double that keeps a summary of every ``record()`` call."""

    class FakeRecorder:
        """Appends one dict per ``record()`` invocation to ``self.calls``."""

        def __init__(self):
            self.enabled = True
            self.calls: list[dict] = []

        def record(
            self,
            category,
            action,
            *,
            severity="info",
            actor=None,
            entity_type=None,
            entity_id=None,
            entity_name=None,
            message,
            metadata=None,
            _bypass_enabled=False,
        ):
            # The entity_* arguments and _bypass_enabled are accepted but not
            # captured; only the fields below end up in ``calls``.
            captured = dict(
                category=category,
                action=action,
                severity=severity,
                actor=actor,
                message=message,
                metadata=metadata if metadata else {},
            )
            self.calls.append(captured)

    recorder = FakeRecorder()
    return recorder
@pytest.fixture
def fake_retention_engine():
    """Provide an in-memory retention engine double with get/update settings."""

    class FakeRetentionEngine:
        """Holds the retention settings in a private dict."""

        def __init__(self):
            self._settings = dict(enabled=True, max_days=90, max_entries=20000)

        def get_settings(self):
            # Hand out a copy so callers cannot mutate the stored settings.
            return self._settings.copy()

        async def update_settings(self, *, enabled, max_days, max_entries):
            # Replace the stored settings wholesale, then return a copy.
            self._settings = dict(
                enabled=enabled,
                max_days=max_days,
                max_entries=max_entries,
            )
            return self._settings.copy()

    engine = FakeRetentionEngine()
    return engine
def _make_app(repo, recorder=None, retention_engine=None, auth_label="test-user"):
    """Create a bare FastAPI app exposing only the activity-log router.

    The auth dependency is replaced by one returning ``auth_label`` and the
    repository getter by one returning ``repo``.  The recorder and retention
    engine getters are overridden only when the corresponding argument is
    given.
    """
    application = FastAPI()
    application.include_router(router)

    overrides = application.dependency_overrides
    # Authentication: every request resolves to ``auth_label``.
    overrides[verify_api_key] = lambda: auth_label
    # Dependency-injection getters.
    overrides[deps.get_activity_log_repo] = lambda: repo
    if recorder is not None:
        overrides[deps.get_activity_recorder] = lambda: recorder
    if retention_engine is not None:
        overrides[deps.get_activity_log_retention_engine] = lambda: retention_engine

    return application
def _make_client(repo, recorder=None, retention_engine=None, auth_label="test-user"):
    """Return a TestClient for an app built by ``_make_app`` with the same arguments.

    ``raise_server_exceptions=False`` is passed to the client, matching the
    original helper.
    """
    return TestClient(
        _make_app(repo, recorder, retention_engine, auth_label=auth_label),
        raise_server_exceptions=False,
    )
def _make_entry(
    *,
    id: str | None = None,
    category: str = ActivityCategory.SYSTEM,
    action: str = "test.action",
    severity: str = ActivitySeverity.INFO,
    actor: str = "test-actor",
    message: str = "test message",
    entity_type: str | None = None,
    entity_id: str | None = None,
    entity_name: str | None = None,
    metadata: dict | None = None,
    ts: datetime | None = None,
) -> ActivityLogEntry:
    """Create an ActivityLogEntry for tests, filling in defaults for omitted fields.

    A falsy ``id`` is replaced by ``"al_"`` plus eight random hex characters,
    a falsy ``ts`` by the current UTC time, and a falsy ``metadata`` by a new
    empty dict.
    """
    import uuid

    entry_id = id if id else "al_" + uuid.uuid4().hex[:8]
    timestamp = ts if ts else datetime.now(timezone.utc)
    meta = metadata if metadata else {}

    return ActivityLogEntry(
        id=entry_id,
        ts=timestamp,
        category=category,
        action=action,
        severity=severity,
        actor=actor,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        metadata=meta,
    )
# ---------------------------------------------------------------------------
# List endpoint
# ---------------------------------------------------------------------------
class TestList:
    """Tests for ``GET /api/v1/activity-log``: listing, filters, limits, keyset paging."""

    def test_empty_log_returns_empty_page(self, repo):
        """An empty repository yields an empty page without a cursor."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log")
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []
        assert data["total"] == 0
        assert data["has_more"] is False
        assert data["next_before_seq"] is None

    def test_returns_entries(self, repo):
        """Recorded entries are returned and counted in ``total``."""
        for i in range(3):
            repo.record(_make_entry(message=f"entry {i}"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 3
        assert len(data["entries"]) == 3

    def test_requires_auth(self, repo):
        """Smoke-test the route with the real ``verify_api_key`` dependency.

        The auth dependency is deliberately NOT overridden here.  Per the
        original author's notes, TestClient connects over loopback and a
        no-keys configuration admits anonymous callers, so the genuine 401
        path cannot be exercised without configured keys.  This test therefore
        only guarantees that the endpoint answers 200 or 401 (never a 5xx);
        anonymous rejection is covered by the ``auth_label="anonymous"`` tests
        on the endpoints that require an authenticated caller.
        """
        app = FastAPI()
        app.include_router(router)
        app.dependency_overrides[deps.get_activity_log_repo] = lambda: repo
        client = TestClient(app, raise_server_exceptions=False)
        resp = client.get("/api/v1/activity-log")
        assert resp.status_code in (200, 401)

    def test_filter_by_category(self, repo):
        """A single ``categories`` value narrows the result to that category."""
        repo.record(_make_entry(category=ActivityCategory.AUTH, action="auth.rejected"))
        repo.record(_make_entry(category=ActivityCategory.ENTITY, action="entity.created"))
        repo.record(_make_entry(category=ActivityCategory.SYSTEM, action="system.event"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?categories=auth")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["category"] == "auth"

    def test_filter_by_multiple_categories(self, repo):
        """Repeated ``categories`` parameters are combined."""
        repo.record(_make_entry(category=ActivityCategory.AUTH))
        repo.record(_make_entry(category=ActivityCategory.ENTITY))
        repo.record(_make_entry(category=ActivityCategory.SYSTEM))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?categories=auth&categories=entity")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 2

    def test_filter_by_severity(self, repo):
        """``severities`` narrows the result to the requested severity."""
        repo.record(_make_entry(severity=ActivitySeverity.INFO))
        repo.record(_make_entry(severity=ActivitySeverity.WARNING))
        repo.record(_make_entry(severity=ActivitySeverity.ERROR))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?severities=warning")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["severity"] == "warning"

    def test_filter_by_actor(self, repo):
        """``actor`` narrows the result to entries by that actor."""
        repo.record(_make_entry(actor="alice"))
        repo.record(_make_entry(actor="bob"))
        repo.record(_make_entry(actor="alice"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?actor=alice")
        assert resp.status_code == 200
        assert resp.json()["total"] == 2

    def test_filter_by_entity_type(self, repo):
        """``entity_type`` narrows the result to entries of that entity type."""
        repo.record(_make_entry(entity_type="device", entity_id="d1"))
        repo.record(_make_entry(entity_type="output_target", entity_id="ot1"))
        repo.record(_make_entry(entity_type="device", entity_id="d2"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?entity_type=device")
        assert resp.status_code == 200
        assert resp.json()["total"] == 2

    def test_filter_by_entity_id(self, repo):
        """``entity_id`` narrows the result to the matching entity."""
        repo.record(_make_entry(entity_type="device", entity_id="d1"))
        repo.record(_make_entry(entity_type="device", entity_id="d2"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?entity_id=d1")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["entity_id"] == "d1"

    def test_filter_by_since_until(self, repo):
        """``since``/``until`` keep only entries inside the time window."""
        from datetime import timedelta

        base = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
        repo.record(_make_entry(ts=base - timedelta(days=2), message="old"))
        repo.record(_make_entry(ts=base, message="now"))
        repo.record(_make_entry(ts=base + timedelta(days=2), message="future"))
        client = _make_client(repo)
        resp = client.get(
            "/api/v1/activity-log?since=2024-01-14T12:00:00Z&until=2024-01-16T12:00:00Z"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["message"] == "now"

    def test_filter_by_free_text(self, repo):
        """``q`` matches entries whose message contains the search text."""
        repo.record(_make_entry(message="Device connected successfully"))
        repo.record(_make_entry(message="Auth failed for user bob"))
        repo.record(_make_entry(message="Device disconnected"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?q=Device")
        assert resp.status_code == 200
        assert resp.json()["total"] == 2

    def test_free_text_like_special_chars_escaped(self, repo):
        """LIKE special chars in q must be escaped (not used as wildcards)."""
        repo.record(_make_entry(message="100% complete"))
        # Decoy: contains "100" followed by other text but no literal "%".
        # If "%" in q were treated as a wildcard, this entry would match too.
        # Without it the test passes whether or not escaping is implemented.
        repo.record(_make_entry(message="1000 items processed"))
        repo.record(_make_entry(message="other entry"))
        client = _make_client(repo)
        # q is the URL-encoded literal "100%".
        resp = client.get("/api/v1/activity-log?q=100%25")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["message"] == "100% complete"

    def test_limit_default_50(self, repo):
        """Without ``limit`` a page holds 50 entries and flags more data."""
        for i in range(60):
            repo.record(_make_entry(message=f"entry {i}"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log")
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["entries"]) == 50
        assert data["has_more"] is True

    def test_limit_custom(self, repo):
        """An explicit ``limit`` bounds the page size."""
        for i in range(10):
            repo.record(_make_entry(message=f"entry {i}"))
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=3")
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["entries"]) == 3
        assert data["has_more"] is True

    def test_limit_hard_cap_rejected(self, repo):
        """limit > 200 should be rejected with 422."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=201")
        assert resp.status_code == 422

    def test_limit_zero_rejected(self, repo):
        """limit=0 should be rejected with 422."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=0")
        assert resp.status_code == 422

    def test_pagination_no_overlap_no_gaps(self, repo):
        """Keyset cursor returns exactly all entries with no overlap or gaps."""
        # Insert 15 entries; page with limit=5 -> 3 pages.
        messages = [f"entry {i:02d}" for i in range(15)]
        for message in messages:
            repo.record(_make_entry(message=message))
        client = _make_client(repo)

        seen_ids: set[str] = set()
        seen_messages: list[str] = []
        before_seq = None
        for _ in range(4):  # up to 4 pages; should need 3
            url = "/api/v1/activity-log?limit=5"
            if before_seq is not None:
                url += f"&before_seq={before_seq}"
            resp = client.get(url)
            assert resp.status_code == 200
            data = resp.json()
            page_ids = [e["id"] for e in data["entries"]]
            # No duplicates inside a single page ...
            assert len(set(page_ids)) == len(page_ids), f"Duplicate ids in page: {page_ids}"
            # ... and no overlap with previously seen pages.
            overlap = seen_ids.intersection(page_ids)
            assert not overlap, f"Overlap detected: page_ids={page_ids}, overlap={overlap}"
            seen_ids.update(page_ids)
            seen_messages.extend(e["message"] for e in data["entries"])
            if not data["has_more"]:
                break
            before_seq = data["next_before_seq"]
            # A page that announces more data must also provide a cursor,
            # otherwise the next request would restart from the first page.
            assert before_seq is not None

        assert len(seen_ids) == 15, f"Expected 15 unique entries, got {len(seen_ids)}"
        # Gap check: every inserted message came back exactly once.
        assert sorted(seen_messages) == messages

    def test_pagination_next_before_seq_when_no_more(self, repo):
        """When has_more is False, next_before_seq is None."""
        repo.record(_make_entry())
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=5")
        assert resp.status_code == 200
        data = resp.json()
        assert data["has_more"] is False
        assert data["next_before_seq"] is None

    def test_pagination_total_is_constant_across_pages(self, repo):
        """total reflects all matching entries, not just the current page."""
        for i in range(7):
            repo.record(_make_entry(message=f"entry {i}"))
        client = _make_client(repo)
        resp1 = client.get("/api/v1/activity-log?limit=3")
        assert resp1.status_code == 200
        data1 = resp1.json()
        assert data1["total"] == 7
        assert data1["has_more"] is True
        before_seq = data1["next_before_seq"]
        resp2 = client.get(f"/api/v1/activity-log?limit=3&before_seq={before_seq}")
        assert resp2.status_code == 200
        data2 = resp2.json()
        assert data2["total"] == 7  # unchanged
# ---------------------------------------------------------------------------
# Export endpoint
# ---------------------------------------------------------------------------
class TestExport:
    """Tests for ``GET /api/v1/activity-log/export`` in CSV and JSON formats."""

    @staticmethod
    def _csv_rows(resp):
        """Parse a CSV response body into a list of row dicts (header excluded)."""
        return list(csv.DictReader(io.StringIO(resp.text)))

    def test_export_requires_auth_anonymous_rejected(self, repo):
        """Export endpoint requires a non-anonymous key; anonymous is rejected."""
        client = _make_client(repo, auth_label="anonymous")
        resp = client.get("/api/v1/activity-log/export")
        assert resp.status_code == 401

    def test_export_csv_returns_200(self, repo, fake_recorder):
        """CSV export answers 200 with a ``text/csv`` content type."""
        repo.record(_make_entry(message="test entry"))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv")
        assert resp.status_code == 200
        assert "text/csv" in resp.headers["content-type"]

    def test_export_csv_has_header_and_rows(self, repo, fake_recorder):
        """CSV export contains a header plus one row per entry."""
        repo.record(_make_entry(message="hello export"))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv")
        assert resp.status_code == 200
        rows = self._csv_rows(resp)
        assert len(rows) == 1
        assert rows[0]["message"] == "hello export"

    def test_export_csv_has_content_disposition(self, repo, fake_recorder):
        """CSV export is served as an attachment named ``activity-log-*.csv``."""
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv")
        assert resp.status_code == 200
        cd = resp.headers.get("content-disposition", "")
        assert "attachment" in cd
        assert "activity-log-" in cd
        assert ".csv" in cd

    def test_export_csv_honours_filters(self, repo, fake_recorder):
        """List filters also apply to the CSV export."""
        repo.record(_make_entry(category=ActivityCategory.AUTH, message="auth event"))
        repo.record(_make_entry(category=ActivityCategory.ENTITY, message="entity event"))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv&categories=auth")
        assert resp.status_code == 200
        rows = self._csv_rows(resp)
        assert len(rows) == 1
        assert rows[0]["category"] == "auth"

    def test_export_json_returns_200(self, repo, fake_recorder):
        """JSON export answers 200 with an ``application/json`` content type."""
        repo.record(_make_entry(message="json entry"))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=json")
        assert resp.status_code == 200
        assert "application/json" in resp.headers["content-type"]

    def test_export_json_is_valid_array(self, repo, fake_recorder):
        """JSON export body parses as an array with one item per entry."""
        for i in range(3):
            repo.record(_make_entry(message=f"entry {i}"))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=json")
        assert resp.status_code == 200
        data = json.loads(resp.text)
        assert isinstance(data, list)
        assert len(data) == 3

    def test_export_json_honours_filters(self, repo, fake_recorder):
        """List filters also apply to the JSON export."""
        repo.record(_make_entry(severity=ActivitySeverity.WARNING, message="warn"))
        repo.record(_make_entry(severity=ActivitySeverity.INFO, message="info"))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=json&severities=warning")
        assert resp.status_code == 200
        data = json.loads(resp.text)
        assert len(data) == 1
        assert data[0]["severity"] == "warning"

    def test_export_empty_log_csv(self, repo, fake_recorder):
        """An empty log exports a CSV with no data rows."""
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv")
        assert resp.status_code == 200
        assert self._csv_rows(resp) == []

    def test_export_empty_log_json(self, repo, fake_recorder):
        """An empty log exports an empty JSON array."""
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=json")
        assert resp.status_code == 200
        data = json.loads(resp.text)
        assert data == []

    def test_export_invalid_format_rejected(self, repo, fake_recorder):
        """An unsupported ``format`` value is rejected with 422."""
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=xml")
        assert resp.status_code == 422

    def test_export_csv_injection_guard(self, repo, fake_recorder):
        """Exported cells must not start with a formula-injection trigger (= + - @)."""
        payloads = ["=SUM(A1)", "+evil", "-bad", "@test"]
        for msg in payloads:
            repo.record(_make_entry(message=msg))
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv")
        assert resp.status_code == 200
        rows = self._csv_rows(resp)
        # Without this check the loop below would pass vacuously if the
        # export dropped the rows instead of guarding them.
        assert len(rows) == len(payloads)
        for row in rows:
            msg = row["message"]
            assert not msg.startswith(("=", "+", "-", "@")), f"CSV injection not guarded: {msg!r}"

    def test_export_has_all_csv_columns(self, repo, fake_recorder):
        """The CSV header contains every expected column."""
        repo.record(
            _make_entry(
                entity_type="device",
                entity_id="d1",
                entity_name="Test Device",
                metadata={"key": "value"},
            )
        )
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.get("/api/v1/activity-log/export?format=csv")
        assert resp.status_code == 200
        reader = csv.DictReader(io.StringIO(resp.text))
        fieldnames = reader.fieldnames or []
        expected = [
            "id",
            "ts",
            "category",
            "action",
            "severity",
            "actor",
            "entity_type",
            "entity_id",
            "entity_name",
            "message",
            "metadata",
        ]
        # Report every missing column at once rather than only the first.
        missing = [col for col in expected if col not in fieldnames]
        assert not missing, f"Missing column(s): {missing}"
# ---------------------------------------------------------------------------
# Settings endpoints
# ---------------------------------------------------------------------------
class TestSettings:
    """Tests for ``GET``/``PUT /api/v1/activity-log/settings``."""

    _URL = "/api/v1/activity-log/settings"

    @staticmethod
    def _settings_body(enabled, max_days, max_entries):
        """Build the JSON payload sent to the settings PUT endpoint."""
        return {"enabled": enabled, "max_days": max_days, "max_entries": max_entries}

    def _put(self, client, enabled, max_days, max_entries):
        """PUT a settings payload and return the response."""
        return client.put(self._URL, json=self._settings_body(enabled, max_days, max_entries))

    def test_get_settings_returns_defaults(self, repo, fake_retention_engine):
        """GET returns the engine's initial settings."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        response = client.get(self._URL)
        assert response.status_code == 200
        payload = response.json()
        assert payload["enabled"] is True
        assert payload["max_days"] == 90
        assert payload["max_entries"] == 20000

    def test_put_settings_round_trip(self, repo, fake_retention_engine):
        """PUT echoes the settings that were submitted."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        response = self._put(client, False, 30, 5000)
        assert response.status_code == 200
        payload = response.json()
        assert payload["enabled"] is False
        assert payload["max_days"] == 30
        assert payload["max_entries"] == 5000

    def test_put_settings_get_reflects_update(self, repo, fake_retention_engine):
        """A subsequent GET returns what the previous PUT stored."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        self._put(client, True, 7, 100)
        payload = client.get(self._URL).json()
        assert payload["max_days"] == 7
        assert payload["max_entries"] == 100

    def test_put_settings_negative_max_days_rejected(self, repo, fake_retention_engine):
        """A negative max_days is rejected with 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        response = self._put(client, True, -1, 100)
        assert response.status_code == 422

    def test_put_settings_negative_max_entries_rejected(self, repo, fake_retention_engine):
        """A negative max_entries is rejected with 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        response = self._put(client, True, 30, -1)
        assert response.status_code == 422

    def test_put_settings_max_days_over_cap_rejected(self, repo, fake_retention_engine):
        """An excessively large max_days is rejected with 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        response = self._put(client, True, 99999, 100)
        assert response.status_code == 422

    def test_put_settings_max_entries_over_cap_rejected(self, repo, fake_retention_engine):
        """An excessively large max_entries is rejected with 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        response = self._put(client, True, 30, 99_000_000)
        assert response.status_code == 422

    def test_put_settings_zero_max_days_allowed(self, repo, fake_retention_engine):
        """max_days=0 means no age-based pruning; should be accepted."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        # max_entries is sent as 0 as well, exactly as in the original request.
        response = self._put(client, True, 0, 0)
        assert response.status_code == 200

    def test_get_settings_allows_anonymous(self, repo, fake_retention_engine):
        """GET /settings allows anonymous (AuthRequired, not require_authenticated)."""
        client = _make_client(
            repo,
            retention_engine=fake_retention_engine,
            auth_label="anonymous",
        )
        response = client.get(self._URL)
        assert response.status_code == 200

    def test_put_settings_rejects_anonymous(self, repo, fake_retention_engine):
        """PUT /settings rejects anonymous callers (require_authenticated)."""
        client = _make_client(
            repo,
            retention_engine=fake_retention_engine,
            auth_label="anonymous",
        )
        response = self._put(client, True, 30, 1000)
        assert response.status_code == 401
# ---------------------------------------------------------------------------
# Clear endpoint
# ---------------------------------------------------------------------------
class TestClear:
    """Tests for ``DELETE /api/v1/activity-log`` (clear the log, self-audited)."""

    def test_clear_requires_non_anonymous_auth(self, repo, fake_recorder):
        """Clear endpoint rejects anonymous (loopback) callers."""
        client = _make_client(repo, recorder=fake_recorder, auth_label="anonymous")
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 401

    def test_clear_empties_log(self, repo, fake_recorder):
        """After a successful clear the repository holds no entries."""
        for _ in range(5):
            repo.record(_make_entry())
        assert repo.count() == 5
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 200
        assert repo.count() == 0

    def test_clear_returns_deleted_count(self, repo, fake_recorder):
        """The response reports how many entries were deleted."""
        for _ in range(3):
            repo.record(_make_entry())
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 200
        data = resp.json()
        assert data["deleted"] == 3

    def test_clear_empty_log_returns_zero(self, repo, fake_recorder):
        """Clearing an already empty log succeeds and reports zero deletions."""
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 200
        assert resp.json()["deleted"] == 0

    def test_clear_records_audit_entry(self, repo, fake_recorder):
        """After clear, the recorder should have recorded activity_log.cleared."""
        for _ in range(4):
            repo.record(_make_entry())
        client = _make_client(repo, recorder=fake_recorder)
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 200
        # Exactly one audit call recorded.
        assert len(fake_recorder.calls) == 1
        audit = fake_recorder.calls[0]
        assert audit["action"] == "activity_log.cleared"
        assert audit["category"] == ActivityCategory.SYSTEM
        assert audit["metadata"]["deleted_count"] == 4

    def test_clear_audit_entry_uses_auth_label_as_actor(self, repo, fake_recorder):
        """The actor in the audit entry should be the authenticated label."""
        client = _make_client(repo, recorder=fake_recorder, auth_label="my-api-key")
        resp = client.delete("/api/v1/activity-log")
        # Check the request and the audit call explicitly, so a failed clear
        # is reported as such instead of as an IndexError on ``calls[0]``.
        assert resp.status_code == 200
        assert len(fake_recorder.calls) == 1
        assert fake_recorder.calls[0]["actor"] == "my-api-key"

    def test_clear_leaves_no_entries_after_audit_record(self, repo):
        """Integration: clear leaves exactly one post-clear entry (via real recorder)."""
        from ledgrab.core.activity_log.recorder import ActivityRecorder
        from ledgrab.storage.activity_log import ActivityLogFilters

        real_recorder = ActivityRecorder(repo, MagicMock())
        # Pre-populate.
        for _ in range(3):
            repo.record(_make_entry())
        assert repo.count() == 3
        # Use the real recorder with the route.
        client = _make_client(repo, recorder=real_recorder)
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 200
        # After clear + audit record: exactly 1 entry remains.
        assert repo.count() == 1
        entries = repo.query(ActivityLogFilters())
        # Check the query result length before indexing into it.
        assert len(entries) == 1
        assert entries[0].action == "activity_log.cleared"
        assert entries[0].category == ActivityCategory.SYSTEM
# ---------------------------------------------------------------------------
# Router registration sanity check
# ---------------------------------------------------------------------------
class TestRouterRegistration:
    """Sanity check that the activity-log routes are mounted on the API router."""

    def test_activity_log_routes_in_api(self):
        """All five activity-log routes (method + path) are registered in the app router."""
        from ledgrab.api import router as api_router

        # Collect (method, path) pairs.  Routes without a ``methods``
        # attribute (or with an empty one) are skipped.
        registered = {
            (method, route.path)  # type: ignore[attr-defined]
            for route in api_router.routes
            for method in (getattr(route, "methods", None) or ())
        }
        # Five routes share three paths, so checking paths alone would not
        # notice a missing PUT /settings or DELETE registration.
        expected = {
            ("GET", "/api/v1/activity-log"),
            ("DELETE", "/api/v1/activity-log"),
            ("GET", "/api/v1/activity-log/export"),
            ("GET", "/api/v1/activity-log/settings"),
            ("PUT", "/api/v1/activity-log/settings"),
        }
        missing = expected - registered
        assert not missing, f"Routes not registered: {sorted(missing)}"