Files
ledgrab/server/tests/api/routes/test_activity_log_api_adversarial.py
T
alexei.dolgolyov 4a0927521a feat(activity-log): phase 4 - REST API (list/export/settings/clear)
- GET /activity-log: filtered, keyset-paginated list (categories/severities/actor/entity/date/q)
- GET /activity-log/export: streaming CSV/JSON, chunked keyset (releases DB lock per batch), CSV formula-injection guard
- GET/PUT /activity-log/settings: retention config (PUT require_authenticated)
- DELETE /activity-log: clear (require_authenticated, self-audited)
- security: export DoS fix, settings-PUT auth gate, CSV \t/\r guard, metadata-as-JSON
- 122 API tests (auth posture, CSV injection, pagination integrity, filters, settings bounds, clear-audited)
2026-06-09 20:09:46 +03:00

1163 lines
49 KiB
Python

"""Adversarial tests for the activity-log REST API (Phase 4).
These complement the 49 happy-path tests in ``test_activity_log_api.py`` by
probing edge-cases, security boundaries, and invariants the normal tests don't
cover:
1. AUTH POSTURE — require_authenticated vs AuthRequired distinction; bad key.
2. CSV INJECTION — all four trigger chars on every column; quoting of embedded
commas, quotes, and newlines; row count vs. filter.
3. EXPORT JSON — empty → []; matches list endpoint data; content-type.
4. PAGINATION INTEGRITY — full traversal; total stable; limit edge cases;
before_seq beyond range; limit=0 and negative → 422.
5. FILTER EDGE CASES — combined filters AND; bad since/until → 422; q with
SQL metachar literals; unknown category/severity contract.
6. SETTINGS EDGE CASES — boundary values; disabled flag roundtrip; missing
fields → 422.
7. CLEAR IS AUDITED — exactly one post-clear system entry; actor recorded;
count in metadata.
"""
from __future__ import annotations
import csv
import io
import json
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.auth import verify_api_key
from ledgrab.api.routes.activity_log import router
from ledgrab.storage.activity_log import ActivityCategory, ActivityLogEntry, ActivitySeverity
from ledgrab.storage.activity_log_repository import ActivityLogRepository
# ---------------------------------------------------------------------------
# Fixtures (mirrors test_activity_log_api.py exactly)
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_db(tmp_path):
    """Yield a ``Database`` backed by a file under pytest's ``tmp_path``.

    The database is closed once the requesting test has finished.
    """
    # Imported inside the fixture rather than at module level.
    from ledgrab.storage.database import Database

    db_file = tmp_path / "adv_activity_log.db"
    database = Database(db_file)
    yield database
    # Teardown: runs after the test that requested the fixture completes.
    database.close()
@pytest.fixture
def repo(tmp_db) -> ActivityLogRepository:
    """Provide an ``ActivityLogRepository`` built on the per-test ``tmp_db``."""
    repository = ActivityLogRepository(tmp_db)
    return repository
@pytest.fixture
def fake_recorder():
    """Provide an in-memory recorder stand-in that captures ``record()`` calls.

    Every call is appended to ``calls`` as a dict with the keys ``category``,
    ``action``, ``severity``, ``actor``, ``message`` and ``metadata``.  The
    ``entity_*`` arguments and ``_bypass_enabled`` are accepted but not stored.
    """

    class FakeRecorder:
        """Minimal recorder double: remembers what it was asked to record."""

        def __init__(self):
            self.calls: list[dict] = []
            self.enabled = True

        def record(
            self,
            category,
            action,
            *,
            severity="info",
            actor=None,
            entity_type=None,
            entity_id=None,
            entity_name=None,
            message,
            metadata=None,
            _bypass_enabled=False,
        ):
            """Store the call's core fields; a falsy ``metadata`` becomes ``{}``."""
            captured = dict(
                category=category,
                action=action,
                severity=severity,
                actor=actor,
                message=message,
                metadata=metadata or {},
            )
            self.calls.append(captured)

    return FakeRecorder()
@pytest.fixture
def fake_retention_engine():
    """Provide a retention-engine stand-in that keeps its settings in a dict.

    Starts with ``enabled=True``, ``max_days=90``, ``max_entries=20000``.
    Both accessors hand out copies, so callers cannot mutate the stored state.
    """

    class FakeRetentionEngine:
        """In-memory double exposing ``get_settings`` / ``update_settings``."""

        def __init__(self):
            self._settings = dict(enabled=True, max_days=90, max_entries=20000)

        def get_settings(self):
            """Return a copy of the current settings."""
            return dict(self._settings)

        async def update_settings(self, *, enabled, max_days, max_entries):
            """Replace the stored settings and return a copy of the new values."""
            replacement = dict(
                enabled=enabled,
                max_days=max_days,
                max_entries=max_entries,
            )
            self._settings = replacement
            return dict(self._settings)

    return FakeRetentionEngine()
def _make_app(repo, recorder=None, retention_engine=None, auth_label="test-user"):
    """Build a FastAPI app serving the activity-log router with test doubles.

    ``verify_api_key`` is overridden to return ``auth_label`` and the repository
    dependency to return ``repo``.  The recorder and retention-engine
    dependencies are overridden only when a stand-in is supplied.
    """
    application = FastAPI()
    application.include_router(router)

    overrides = application.dependency_overrides
    overrides[verify_api_key] = lambda: auth_label
    overrides[deps.get_activity_log_repo] = lambda: repo
    if recorder is not None:
        overrides[deps.get_activity_recorder] = lambda: recorder
    if retention_engine is not None:
        overrides[deps.get_activity_log_retention_engine] = lambda: retention_engine
    return application
def _make_client(repo, recorder=None, retention_engine=None, auth_label="test-user"):
    """Return a ``TestClient`` for an app built by ``_make_app``.

    ``raise_server_exceptions=False`` is passed so server-side errors are not
    re-raised inside the test.
    """
    application = _make_app(repo, recorder, retention_engine, auth_label=auth_label)
    return TestClient(application, raise_server_exceptions=False)
def _make_entry(
    *,
    id: str | None = None,
    category: str = ActivityCategory.SYSTEM,
    action: str = "test.action",
    severity: str = ActivitySeverity.INFO,
    actor: str = "test-actor",
    message: str = "test message",
    entity_type: str | None = None,
    entity_id: str | None = None,
    entity_name: str | None = None,
    metadata: dict | None = None,
    ts: datetime | None = None,
) -> ActivityLogEntry:
    """Build an ``ActivityLogEntry`` with test-friendly defaults.

    A falsy ``id`` is replaced by ``"al_"`` plus eight random hex characters, a
    falsy ``ts`` by the current UTC time, and a falsy ``metadata`` by ``{}``.
    """
    import uuid

    entry_id = id if id else "al_" + uuid.uuid4().hex[:8]
    timestamp = ts if ts else datetime.now(timezone.utc)
    extra = metadata if metadata else {}
    return ActivityLogEntry(
        id=entry_id,
        ts=timestamp,
        category=category,
        action=action,
        severity=severity,
        actor=actor,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        metadata=extra,
    )
# ---------------------------------------------------------------------------
# 1. AUTH POSTURE
# ---------------------------------------------------------------------------
class TestAuthPosture:
    """Verify the require_authenticated vs AuthRequired distinction is enforced.

    The first groups of tests override ``verify_api_key`` with a fixed label
    (through ``_make_client``).  The last group runs the real dependency against
    a patched config so the genuine 401 path is exercised.
    """

    # --- require_authenticated endpoints (export, clear) ---

    def test_export_anonymous_is_rejected(self, repo):
        """GET /export with anonymous label → 401 (require_authenticated)."""
        client = _make_client(repo, auth_label="anonymous")
        resp = client.get("/api/v1/activity-log/export")
        assert (
            resp.status_code == 401
        ), f"Export must reject anonymous callers; got {resp.status_code}"

    def test_export_authenticated_user_succeeds(self, repo, fake_recorder):
        """GET /export with a non-anonymous label → 200."""
        client = _make_client(repo, recorder=fake_recorder, auth_label="my-key")
        resp = client.get("/api/v1/activity-log/export")
        assert resp.status_code == 200

    def test_clear_anonymous_is_rejected(self, repo, fake_recorder):
        """DELETE / with anonymous label → 401 (require_authenticated)."""
        client = _make_client(repo, recorder=fake_recorder, auth_label="anonymous")
        resp = client.delete("/api/v1/activity-log")
        assert (
            resp.status_code == 401
        ), f"Clear must reject anonymous callers; got {resp.status_code}"

    def test_clear_authenticated_user_succeeds(self, repo, fake_recorder):
        """DELETE / with non-anonymous label → 200."""
        client = _make_client(repo, recorder=fake_recorder, auth_label="my-key")
        resp = client.delete("/api/v1/activity-log")
        assert resp.status_code == 200

    # --- AuthRequired endpoints (list, GET settings) allow anonymous ---

    def test_list_allows_anonymous(self, repo):
        """GET / with anonymous label → 200 (AuthRequired, not require_authenticated)."""
        client = _make_client(repo, auth_label="anonymous")
        resp = client.get("/api/v1/activity-log")
        assert (
            resp.status_code == 200
        ), f"List should allow anonymous (AuthRequired); got {resp.status_code}"

    def test_settings_get_allows_anonymous(self, repo, fake_retention_engine):
        """GET /settings with anonymous → 200 (AuthRequired)."""
        client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="anonymous")
        resp = client.get("/api/v1/activity-log/settings")
        assert (
            resp.status_code == 200
        ), f"GET /settings should allow anonymous; got {resp.status_code}"

    # --- PUT /settings now requires non-anonymous auth (require_authenticated) ---

    def test_settings_put_rejects_anonymous(self, repo, fake_retention_engine):
        """PUT /settings with anonymous label → 401 (require_authenticated).

        Disabling auditing or trimming retention to near-zero is equivalent in
        impact to clearing the audit trail, so the same auth bar applies.
        """
        client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="anonymous")
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 30, "max_entries": 1000},
        )
        assert (
            resp.status_code == 401
        ), f"PUT /settings must reject anonymous callers; got {resp.status_code}"

    def test_settings_put_authenticated_succeeds(self, repo, fake_retention_engine):
        """PUT /settings with a non-anonymous label → 200 (require_authenticated)."""
        client = _make_client(repo, retention_engine=fake_retention_engine, auth_label="my-key")
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 30, "max_entries": 1000},
        )
        assert (
            resp.status_code == 200
        ), f"PUT /settings should succeed for authenticated caller; got {resp.status_code}"

    # --- With api_keys configured: bad key → 401 on every endpoint ---

    def _make_app_with_real_auth(self, repo, recorder=None, retention_engine=None):
        """Build an app that runs the REAL verify_api_key (no override).

        Returns ``(client, mock_config)``.  The caller is expected to patch
        ``ledgrab.api.auth.get_config`` to return ``mock_config`` so that it
        looks as if an API key is configured, which lets a request without the
        right key exercise the real 401 path.
        """
        # Build a lightweight stand-in for the config object so that
        # verify_api_key sees api_keys = {"dev": "correct-key"}.
        # We use a plain object rather than MagicMock(spec=...) to avoid
        # attribute-access restrictions on nested sub-objects.
        class _FakeAuth:
            api_keys = {"dev": "correct-key"}

        class _FakeConfig:
            auth = _FakeAuth()

        mock_config = _FakeConfig()
        app = FastAPI()
        app.include_router(router)
        # Do NOT override verify_api_key — let the real function run.
        app.dependency_overrides[deps.get_activity_log_repo] = lambda: repo
        if recorder is not None:
            app.dependency_overrides[deps.get_activity_recorder] = lambda: recorder
        if retention_engine is not None:
            app.dependency_overrides[deps.get_activity_log_retention_engine] = (
                lambda: retention_engine
            )
        client = TestClient(app, raise_server_exceptions=False)
        return client, mock_config

    def _request_with_real_auth(self, repo, method, path, *, recorder=None, token=None):
        """Send one request through the real ``verify_api_key`` dependency.

        Args:
            repo: Repository injected into the app.
            method: HTTP method name, e.g. ``"GET"`` or ``"DELETE"``.
            path: Request path.
            recorder: Optional recorder stand-in to inject.
            token: Bearer token to send; ``None`` sends no Authorization header.

        Returns:
            The response, obtained while ``get_config`` is patched to report
            one configured API key (``"correct-key"``).
        """
        # Imported here because the module-level imports only bring in MagicMock.
        from unittest.mock import patch

        client, fake_config = self._make_app_with_real_auth(repo, recorder=recorder)
        headers = None if token is None else {"Authorization": f"Bearer {token}"}
        with patch("ledgrab.api.auth.get_config", return_value=fake_config):
            return client.request(method, path, headers=headers)

    def test_list_bad_key_is_401(self, repo):
        """With api_keys configured, a wrong Bearer token → 401 on the list endpoint."""
        resp = self._request_with_real_auth(
            repo, "GET", "/api/v1/activity-log", token="wrong-key"
        )
        assert resp.status_code == 401

    def test_export_bad_key_is_401(self, repo, fake_recorder):
        """With api_keys configured, a wrong Bearer token → 401 on the export endpoint."""
        resp = self._request_with_real_auth(
            repo,
            "GET",
            "/api/v1/activity-log/export",
            recorder=fake_recorder,
            token="wrong-key",
        )
        assert resp.status_code == 401

    def test_clear_bad_key_is_401(self, repo, fake_recorder):
        """With api_keys configured, a wrong Bearer token → 401 on the clear endpoint."""
        resp = self._request_with_real_auth(
            repo,
            "DELETE",
            "/api/v1/activity-log",
            recorder=fake_recorder,
            token="wrong-key",
        )
        assert resp.status_code == 401

    def test_export_no_key_header_is_401(self, repo, fake_recorder):
        """With api_keys configured, missing Authorization header → 401 on export."""
        resp = self._request_with_real_auth(
            repo, "GET", "/api/v1/activity-log/export", recorder=fake_recorder
        )
        assert resp.status_code == 401

    def test_export_correct_key_is_200(self, repo, fake_recorder):
        """With api_keys configured, correct Bearer token → 200 on export."""
        resp = self._request_with_real_auth(
            repo,
            "GET",
            "/api/v1/activity-log/export",
            recorder=fake_recorder,
            token="correct-key",
        )
        assert resp.status_code == 200
# ---------------------------------------------------------------------------
# 2. CSV INJECTION / EXPORT SAFETY
# ---------------------------------------------------------------------------
class TestCsvInjection:
    """OWASP CSV formula-injection guards and well-formedness."""

    # The four classic formula-trigger characters these tests guard against.
    _TRIGGERS = ("=", "+", "-", "@")
    _EXPORT_URL = "/api/v1/activity-log/export?format=csv"

    def _csv_rows(self, resp) -> list[dict]:
        """Assert the response is a 200 and parse its body as CSV dict rows."""
        assert resp.status_code == 200
        reader = csv.DictReader(io.StringIO(resp.text))
        return list(reader)

    def _export_single_row(self, repo, recorder, entry) -> dict:
        """Record *entry*, export the log as CSV, and return its only row.

        Asserting the row count here gives a clear failure when the export is
        empty, instead of an IndexError on ``rows[0]`` in the calling test.
        """
        repo.record(entry)
        client = _make_client(repo, recorder=recorder)
        rows = self._csv_rows(client.get(self._EXPORT_URL))
        assert len(rows) == 1, f"Expected exactly 1 CSV row, got {len(rows)}"
        return rows[0]

    # -- message column --

    def test_message_equal_sign_neutralised(self, repo, fake_recorder):
        """A message starting with '=' is prefixed with a single quote."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="=SUM(A1:A10)"))
        assert not row["message"].startswith(
            "="
        ), f"CSV injection not neutralised in 'message': {row['message']!r}"
        # The prefix quote must be there
        assert row["message"].startswith(
            "'"
        ), f"Expected single-quote prefix in 'message': {row['message']!r}"

    def test_message_plus_sign_neutralised(self, repo, fake_recorder):
        """A message starting with '+' is prefixed with a single quote."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="+evil formula"))
        assert not row["message"].startswith("+"), row["message"]
        assert row["message"].startswith("'"), row["message"]

    def test_message_minus_sign_neutralised(self, repo, fake_recorder):
        """A message starting with '-' is prefixed with a single quote."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="-bad"))
        assert not row["message"].startswith("-"), row["message"]
        assert row["message"].startswith("'"), row["message"]

    def test_message_at_sign_neutralised(self, repo, fake_recorder):
        """A message starting with '@' is prefixed with a single quote."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="@SUM(1+1)"))
        assert not row["message"].startswith("@"), row["message"]
        assert row["message"].startswith("'"), row["message"]

    # -- entity_name column (another user-controlled string field) --

    def test_entity_name_injection_neutralised(self, repo, fake_recorder):
        """entity_name starting with = is neutralised in CSV export."""
        entry = _make_entry(
            entity_name="=cmd|' /C calc'!A1",
            entity_type="device",
            entity_id="d1",
        )
        row = self._export_single_row(repo, fake_recorder, entry)
        assert not row["entity_name"].startswith(
            "="
        ), f"CSV injection not neutralised in 'entity_name': {row['entity_name']!r}"
        assert row["entity_name"].startswith("'"), row["entity_name"]

    def test_actor_injection_neutralised(self, repo, fake_recorder):
        """actor field starting with + is neutralised."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(actor="+actor-formula"))
        assert not row["actor"].startswith("+"), row["actor"]
        assert row["actor"].startswith("'"), row["actor"]

    # -- Safe values must NOT be mangled --

    def test_safe_message_not_mangled(self, repo, fake_recorder):
        """A message that does not start with a trigger char must be unchanged."""
        row = self._export_single_row(
            repo, fake_recorder, _make_entry(message="Normal log message")
        )
        assert row["message"] == "Normal log message"

    def test_empty_message_not_mangled(self, repo, fake_recorder):
        """Empty-string fields (None → '') are not prefixed with '."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(entity_name=None))
        assert row["entity_name"] == ""

    # -- CSV well-formedness --

    def test_csv_comma_in_field_properly_quoted(self, repo, fake_recorder):
        """A comma inside a field value must be quoted, not split into two columns."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="left, right"))
        # DictReader must parse it back as a single field
        assert row["message"] == "left, right"

    def test_csv_double_quote_in_field_escaped(self, repo, fake_recorder):
        """A double-quote inside a field must be escaped (RFC 4180: doubled)."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message='He said "hello"'))
        assert row["message"] == 'He said "hello"'

    def test_csv_newline_in_field_handled(self, repo, fake_recorder):
        """A newline embedded in a field must not break the row count."""
        # _export_single_row asserts there is still exactly 1 data row.
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="line1\nline2"))
        assert "line1" in row["message"]

    # -- Row count matches filter --

    def test_csv_row_count_matches_filter(self, repo, fake_recorder):
        """Row count in CSV equals the total from the list endpoint for the same filter."""
        repo.record(_make_entry(category=ActivityCategory.AUTH, message="auth 1"))
        repo.record(_make_entry(category=ActivityCategory.AUTH, message="auth 2"))
        repo.record(_make_entry(category=ActivityCategory.ENTITY, message="entity 1"))
        client = _make_client(repo, recorder=fake_recorder)
        list_resp = client.get("/api/v1/activity-log?categories=auth")
        assert list_resp.status_code == 200
        expected_count = list_resp.json()["total"]
        csv_resp = client.get("/api/v1/activity-log/export?format=csv&categories=auth")
        rows = self._csv_rows(csv_resp)
        assert (
            len(rows) == expected_count
        ), f"CSV row count {len(rows)} != list total {expected_count}"
        for row in rows:
            assert row["category"] == "auth"

    def test_csv_all_trigger_chars_in_one_export(self, repo, fake_recorder):
        """Multiple entries with all four injection chars are all neutralised."""
        payloads = ["=HYPERLINK(url)", "+evil", "-minus", "@at"]
        # Sanity check: the payloads exercise every declared trigger character.
        assert {payload[0] for payload in payloads} == set(self._TRIGGERS)
        for payload in payloads:
            repo.record(_make_entry(message=payload))
        client = _make_client(repo, recorder=fake_recorder)
        rows = self._csv_rows(client.get(self._EXPORT_URL))
        assert len(rows) == len(payloads)
        for row in rows:
            # [0:1] rather than [0] so an empty cell cannot raise IndexError.
            assert (
                row["message"][0:1] not in self._TRIGGERS
            ), f"Unguarded injection trigger in message: {row['message']!r}"

    # -- Leading TAB / CR triggers (MEDIUM-1) --

    def test_tab_formula_prefix_neutralised(self, repo, fake_recorder):
        """A leading TAB before a formula character is a recognised injection trigger."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="\t=SUM(A1:A10)"))
        msg = row["message"]
        assert not msg.startswith("\t"), f"Leading TAB injection trigger not neutralised: {msg!r}"
        assert msg.startswith("'"), f"Expected single-quote prefix for TAB trigger: {msg!r}"

    def test_cr_formula_prefix_neutralised(self, repo, fake_recorder):
        """A leading CR is a recognised injection trigger (used to evade column-A checks)."""
        row = self._export_single_row(repo, fake_recorder, _make_entry(message="\r=bad"))
        msg = row["message"]
        assert not msg.startswith("\r"), f"Leading CR injection trigger not neutralised: {msg!r}"
        assert msg.startswith("'"), f"Expected single-quote prefix for CR trigger: {msg!r}"
# ---------------------------------------------------------------------------
# 3. EXPORT JSON
# ---------------------------------------------------------------------------
class TestExportJson:
    """JSON export: empty results, content type, filters, and schema fields."""

    def test_empty_log_returns_empty_array(self, repo, fake_recorder):
        """An empty log exports as the JSON array ``[]``."""
        http = _make_client(repo, recorder=fake_recorder)
        response = http.get("/api/v1/activity-log/export?format=json")
        assert response.status_code == 200
        payload = json.loads(response.text)
        assert payload == [], f"Expected [] for empty log, got: {payload!r}"

    def test_json_content_type(self, repo, fake_recorder):
        """The JSON export advertises an ``application/json`` content type."""
        http = _make_client(repo, recorder=fake_recorder)
        response = http.get("/api/v1/activity-log/export?format=json")
        content_type = response.headers["content-type"]
        assert "application/json" in content_type

    def test_json_export_matches_list_endpoint_data(self, repo, fake_recorder):
        """Exported JSON entries must match what the list endpoint returns."""
        for index in range(5):
            repo.record(_make_entry(message=f"entry {index}"))
        http = _make_client(repo, recorder=fake_recorder)

        # Collect all entries via list endpoint (5 < default limit 50)
        listed = http.get("/api/v1/activity-log")
        assert listed.status_code == 200
        list_ids = set()
        for item in listed.json()["entries"]:
            list_ids.add(item["id"])

        exported_resp = http.get("/api/v1/activity-log/export?format=json")
        assert exported_resp.status_code == 200
        export_ids = set()
        for item in json.loads(exported_resp.text):
            export_ids.add(item["id"])

        only_in_list = list_ids - export_ids
        only_in_export = export_ids - list_ids
        assert list_ids == export_ids, (
            f"Export IDs differ from list IDs.\n"
            f"Only in list: {only_in_list}\n"
            f"Only in export: {only_in_export}"
        )

    def test_json_export_honours_filter(self, repo, fake_recorder):
        """JSON export with a filter returns only matching entries."""
        seeded = (
            (ActivitySeverity.ERROR, "error"),
            (ActivitySeverity.INFO, "info"),
            (ActivitySeverity.WARNING, "warn"),
        )
        for level, text in seeded:
            repo.record(_make_entry(severity=level, message=text))
        http = _make_client(repo, recorder=fake_recorder)
        response = http.get("/api/v1/activity-log/export?format=json&severities=error")
        assert response.status_code == 200
        payload = json.loads(response.text)
        assert len(payload) == 1
        assert payload[0]["severity"] == "error"

    def test_json_export_all_11_fields_present(self, repo, fake_recorder):
        """Every exported entry must contain all 11 schema fields."""
        populated = _make_entry(
            entity_type="device",
            entity_id="d1",
            entity_name="Test Device",
            metadata={"k": "v"},
        )
        repo.record(populated)
        http = _make_client(repo, recorder=fake_recorder)
        response = http.get("/api/v1/activity-log/export?format=json")
        payload = json.loads(response.text)
        assert len(payload) == 1
        expected_keys = {
            "id",
            "ts",
            "category",
            "action",
            "severity",
            "actor",
            "entity_type",
            "entity_id",
            "entity_name",
            "message",
            "metadata",
        }
        missing = expected_keys.difference(payload[0].keys())
        assert not missing, f"Missing fields in JSON export: {missing}"

    def test_json_export_filtered_empty_returns_empty_array(self, repo, fake_recorder):
        """A filter that matches nothing returns [] (not null, not {})."""
        repo.record(_make_entry(category=ActivityCategory.AUTH))
        http = _make_client(repo, recorder=fake_recorder)
        response = http.get("/api/v1/activity-log/export?format=json&categories=device")
        assert response.status_code == 200
        payload = json.loads(response.text)
        assert payload == []

    def test_json_export_content_disposition_filename(self, repo, fake_recorder):
        """JSON export Content-Disposition must include filename with .json extension."""
        http = _make_client(repo, recorder=fake_recorder)
        response = http.get("/api/v1/activity-log/export?format=json")
        disposition = response.headers.get("content-disposition", "")
        for fragment in ("attachment", "activity-log-", ".json"):
            assert fragment in disposition
# ---------------------------------------------------------------------------
# 4. PAGINATION INTEGRITY
# ---------------------------------------------------------------------------
class TestPaginationIntegrity:
    """Keyset pagination: traversal completeness, cursor validity, limit bounds."""

    @staticmethod
    def _fetch_all_pages(client, *, limit: int, max_pages: int) -> list[dict]:
        """Follow ``next_before_seq`` cursors until ``has_more`` is false.

        Every response must be a 200.  Fails if the server still reports
        ``has_more`` after ``max_pages`` requests, so a cursor that never
        advances cannot go unnoticed.
        """
        pages: list[dict] = []
        before_seq = None
        for _ in range(max_pages):
            url = f"/api/v1/activity-log?limit={limit}"
            if before_seq is not None:
                url += f"&before_seq={before_seq}"
            resp = client.get(url)
            assert resp.status_code == 200
            page = resp.json()
            pages.append(page)
            if not page["has_more"]:
                return pages
            before_seq = page["next_before_seq"]
        raise AssertionError(f"Pagination did not terminate within {max_pages} pages")

    def test_full_traversal_no_gaps_no_dupes(self, repo):
        """Page through all entries; every entry appears exactly once."""
        n = 23
        for i in range(n):
            repo.record(_make_entry(message=f"entry {i:03d}"))
        client = _make_client(repo)
        # n + 1 pages is a safety guard: far more than 23 entries at 7 per page need.
        pages = self._fetch_all_pages(client, limit=7, max_pages=n + 1)
        all_ids = [entry["id"] for page in pages for entry in page["entries"]]

        # Track seen ids in a set so duplicates within a single page are caught too.
        seen: set[str] = set()
        dupes: list[str] = []
        for entry_id in all_ids:
            if entry_id in seen:
                dupes.append(entry_id)
            seen.add(entry_id)
        assert not dupes, f"Duplicate entries across pages: {dupes}"
        assert len(all_ids) == n, f"Expected {n} unique entries, got {len(all_ids)}"

    def test_total_is_stable_across_pages(self, repo):
        """The 'total' field stays constant across all pages for the same filters."""
        for i in range(13):
            repo.record(_make_entry(message=f"msg {i}"))
        client = _make_client(repo)
        pages = self._fetch_all_pages(client, limit=5, max_pages=10)
        totals = [page["total"] for page in pages]
        assert len(set(totals)) == 1, f"'total' changed across pages: {totals}"
        assert totals[0] == 13

    def test_limit_above_200_rejected(self, repo):
        """limit=201 → 422 (hard cap)."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=201")
        assert resp.status_code == 422

    def test_limit_at_cap_200_accepted(self, repo):
        """limit=200 is the maximum allowed value and must be accepted."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=200")
        assert resp.status_code == 200

    def test_limit_1_accepted(self, repo):
        """limit=1 is the minimum allowed value."""
        repo.record(_make_entry())
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=1")
        assert resp.status_code == 200

    def test_limit_0_rejected(self, repo):
        """limit=0 → 422."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=0")
        assert resp.status_code == 422

    def test_limit_negative_rejected(self, repo):
        """limit=-5 → 422."""
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=-5")
        assert resp.status_code == 422

    def test_before_seq_beyond_range_returns_empty(self, repo):
        """A before_seq far above every stored seq is handled gracefully.

        Despite the test name, the page is NOT empty: the cursor is exclusive
        (``before_seq=1`` yields nothing in ``test_before_seq_1_returns_empty_page``),
        so a huge value lies above every stored seq and the single recorded
        entry must come back.
        """
        repo.record(_make_entry())
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?before_seq=999999999")
        assert resp.status_code == 200
        data = resp.json()
        assert isinstance(data["entries"], list)
        assert isinstance(data["has_more"], bool)
        assert len(data["entries"]) == 1, f"Expected the single entry, got {data['entries']!r}"
        assert data["has_more"] is False

    def test_before_seq_1_returns_empty_page(self, repo):
        """before_seq=1 (before any autoincrement seq) → empty page, no error."""
        for _ in range(3):
            repo.record(_make_entry())
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?before_seq=1")
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []
        assert data["has_more"] is False

    def test_has_more_false_next_before_seq_null(self, repo):
        """When has_more is False, next_before_seq must be null (not an int)."""
        for _ in range(3):
            repo.record(_make_entry())
        client = _make_client(repo)
        resp = client.get("/api/v1/activity-log?limit=10")
        assert resp.status_code == 200
        data = resp.json()
        assert data["has_more"] is False
        assert data["next_before_seq"] is None

    def test_second_page_cursor_is_usable(self, repo):
        """next_before_seq from page 1 must work as before_seq on page 2."""
        for i in range(6):
            repo.record(_make_entry(message=f"msg {i}"))
        client = _make_client(repo)
        page1 = client.get("/api/v1/activity-log?limit=3").json()
        assert page1["has_more"] is True
        cursor = page1["next_before_seq"]
        assert cursor is not None
        page2 = client.get(f"/api/v1/activity-log?limit=3&before_seq={cursor}").json()
        ids_p1 = {e["id"] for e in page1["entries"]}
        ids_p2 = {e["id"] for e in page2["entries"]}
        assert ids_p1.isdisjoint(ids_p2), f"Overlap between page1 and page2: {ids_p1 & ids_p2}"
# ---------------------------------------------------------------------------
# 5. FILTER EDGE CASES
# ---------------------------------------------------------------------------
class TestFilterEdgeCases:
def test_combined_filters_and_semantics(self, repo):
"""Multiple filters narrow results with AND semantics."""
repo.record(
_make_entry(
category=ActivityCategory.AUTH,
severity=ActivitySeverity.WARNING,
actor="alice",
message="auth warning",
)
)
repo.record(
_make_entry(
category=ActivityCategory.AUTH,
severity=ActivitySeverity.INFO,
actor="alice",
message="auth info",
)
)
repo.record(
_make_entry(
category=ActivityCategory.ENTITY,
severity=ActivitySeverity.WARNING,
actor="alice",
message="entity warning",
)
)
client = _make_client(repo)
# category=auth AND severity=warning AND actor=alice → exactly 1
resp = client.get("/api/v1/activity-log?categories=auth&severities=warning&actor=alice")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1, f"Expected 1 result from combined filters, got {data['total']}"
entry = data["entries"][0]
assert entry["category"] == "auth"
assert entry["severity"] == "warning"
assert entry["actor"] == "alice"
def test_categories_multi_value_or_semantics(self, repo):
"""Multiple 'categories' params → OR within the dimension."""
repo.record(_make_entry(category=ActivityCategory.AUTH))
repo.record(_make_entry(category=ActivityCategory.DEVICE))
repo.record(_make_entry(category=ActivityCategory.ENTITY))
client = _make_client(repo)
resp = client.get("/api/v1/activity-log?categories=auth&categories=device")
assert resp.status_code == 200
assert resp.json()["total"] == 2
def test_severities_multi_value(self, repo):
"""Multiple 'severities' params → OR within the dimension."""
repo.record(_make_entry(severity=ActivitySeverity.INFO))
repo.record(_make_entry(severity=ActivitySeverity.WARNING))
repo.record(_make_entry(severity=ActivitySeverity.ERROR))
client = _make_client(repo)
resp = client.get("/api/v1/activity-log?severities=warning&severities=error")
assert resp.status_code == 200
assert resp.json()["total"] == 2
def test_bad_since_format_is_422(self, repo):
"""Malformed 'since' datetime → 422 (Pydantic validation)."""
client = _make_client(repo)
resp = client.get("/api/v1/activity-log?since=not-a-date")
assert resp.status_code == 422
def test_bad_until_format_is_422(self, repo):
"""Malformed 'until' datetime → 422."""
client = _make_client(repo)
resp = client.get("/api/v1/activity-log?until=2024-99-99")
assert resp.status_code == 422
def test_q_with_percent_literal_treated_literally(self, repo):
"""A literal '%' in q must not act as a LIKE wildcard."""
repo.record(_make_entry(message="100% done"))
repo.record(_make_entry(message="other entry"))
client = _make_client(repo)
# URL-encode % as %25; the server must pass it as a literal to LIKE
resp = client.get("/api/v1/activity-log?q=100%25")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1, (
f"Literal '100%' matched {data['total']} entries (expected 1). "
f"'%' is being used as a wildcard instead of a literal character."
)
def test_q_with_underscore_literal(self, repo):
"""A literal '_' in q must not act as a LIKE single-char wildcard."""
repo.record(_make_entry(message="snake_case message"))
repo.record(_make_entry(message="camelCase message"))
client = _make_client(repo)
resp = client.get("/api/v1/activity-log?q=snake_case")
assert resp.status_code == 200
data = resp.json()
# "snake_case" as a LIKE pattern would match any 10-char sequence;
# as a literal it should only match the entry with "snake_case"
assert data["total"] == 1, (
f"Literal 'snake_case' matched {data['total']} entries (expected 1). "
f"'_' may be acting as a wildcard."
)
def test_q_with_single_quote_no_sql_error(self, repo):
"""A single quote in q must not cause a SQL error."""
repo.record(_make_entry(message="it's working"))
client = _make_client(repo)
resp = client.get("/api/v1/activity-log?q=it%27s")
assert resp.status_code == 200
def test_q_with_backslash_no_sql_error(self, repo):
    """Searching for text containing a backslash (``%5C``) must still answer 200.

    NOTE(review): presumably the backslash doubles as the LIKE escape
    character in the repository query — confirm against the storage layer.
    """
    windows_style_entry = _make_entry(message=r"path\to\file")
    repo.record(windows_style_entry)
    response = _make_client(repo).get("/api/v1/activity-log?q=path%5Cto")
    assert response.status_code == 200
def test_unknown_category_returns_empty(self, repo):
    """An unrecognised category is either rejected (422) or matches nothing.

    Both outcomes are tolerated: a 422 from request validation, or a 200
    whose ``total`` is 0. Any other status fails the test.
    """
    repo.record(_make_entry(category=ActivityCategory.AUTH))
    response = _make_client(repo).get("/api/v1/activity-log?categories=nonexistent_category")
    status = response.status_code
    assert status in (200, 422), f"Unexpected status {status} for unknown category value"
    if status == 422:
        # Rejected up front — there is no result body to inspect.
        return
    assert response.json()["total"] == 0
def test_since_after_until_returns_empty(self, repo):
    """An inverted range (since later than until) yields 422 or an empty 200."""
    repo.record(_make_entry(message="any entry"))
    # since is one day AFTER until, so no timestamp can fall inside the window.
    inverted_range_url = (
        "/api/v1/activity-log?since=2024-06-10T00:00:00Z&until=2024-06-09T00:00:00Z"
    )
    response = _make_client(repo).get(inverted_range_url)
    status = response.status_code
    assert status in (200, 422)
    if status == 422:
        return
    assert response.json()["total"] == 0
def test_actor_empty_string_matches_nothing(self, repo):
    """An empty ``actor=`` query parameter must be answered with 200.

    Only the status code is checked. Whether the server drops the empty
    filter or applies it as an exact match on '' is deliberately left
    unasserted here.
    """
    repo.record(_make_entry(actor="alice"))
    response = _make_client(repo).get("/api/v1/activity-log?actor=")
    assert response.status_code == 200
# ---------------------------------------------------------------------------
# 6. SETTINGS EDGE CASES
# ---------------------------------------------------------------------------
class TestSettingsEdgeCases:
    """Boundary and malformed-body checks for PUT/GET /api/v1/activity-log/settings.

    Every test builds its client with the ``fake_retention_engine`` fixture
    and sends its request to the settings endpoint.
    """

    def test_max_days_at_cap_3650_accepted(self, repo, fake_retention_engine):
        """max_days=3650 (cap) must be accepted."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 3650, "max_entries": 0},
        )
        assert resp.status_code == 200
        assert resp.json()["max_days"] == 3650

    def test_max_days_one_over_cap_rejected(self, repo, fake_retention_engine):
        """max_days=3651 (one over cap) must be rejected with 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 3651, "max_entries": 0},
        )
        assert resp.status_code == 422

    def test_max_entries_at_cap_accepted(self, repo, fake_retention_engine):
        """max_entries=10_000_000 (cap) must be accepted."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 0, "max_entries": 10_000_000},
        )
        assert resp.status_code == 200
        assert resp.json()["max_entries"] == 10_000_000

    def test_max_entries_one_over_cap_rejected(self, repo, fake_retention_engine):
        """max_entries=10_000_001 (one over cap) must be rejected with 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 0, "max_entries": 10_000_001},
        )
        assert resp.status_code == 422

    def test_enabled_false_then_get_reflects_disabled(self, repo, fake_retention_engine):
        """PUT enabled=False → GET returns enabled=False."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        put_resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": False, "max_days": 30, "max_entries": 1000},
        )
        # Fail here, with the real cause, if the write itself was rejected;
        # otherwise the GET below would be checking a value that was never stored.
        assert put_resp.status_code == 200, f"Settings PUT failed: {put_resp.status_code}"
        resp = client.get("/api/v1/activity-log/settings")
        assert resp.status_code == 200
        assert resp.json()["enabled"] is False

    def test_malformed_body_missing_field_rejected(self, repo, fake_retention_engine):
        """PUT body missing 'enabled' → 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        # 'enabled' is required; Pydantic should reject the body
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"max_days": 30, "max_entries": 1000},
        )
        assert resp.status_code == 422

    def test_malformed_body_extra_field_ignored(self, repo, fake_retention_engine):
        """PUT body with unknown extra field must not cause a 500."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={
                "enabled": True,
                "max_days": 30,
                "max_entries": 1000,
                "undocumented_field": "surprise",
            },
        )
        # Pydantic ignores extra fields by default; should succeed or 422, never 500
        assert resp.status_code in (200, 422)

    def test_put_settings_not_dict_rejected(self, repo, fake_retention_engine):
        """PUT with a JSON array body (not an object) → 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            content=b'["not", "a", "dict"]',
            headers={"Content-Type": "application/json"},
        )
        assert resp.status_code == 422

    def test_put_settings_empty_body_rejected(self, repo, fake_retention_engine):
        """PUT with empty body → 422."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            content=b"",
            headers={"Content-Type": "application/json"},
        )
        assert resp.status_code == 422

    def test_put_settings_zero_values_accepted(self, repo, fake_retention_engine):
        """max_days=0 and max_entries=0 (no pruning) must be accepted."""
        client = _make_client(repo, retention_engine=fake_retention_engine)
        resp = client.put(
            "/api/v1/activity-log/settings",
            json={"enabled": True, "max_days": 0, "max_entries": 0},
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["max_days"] == 0
        assert data["max_entries"] == 0
# ---------------------------------------------------------------------------
# 7. CLEAR IS AUDITED
# ---------------------------------------------------------------------------
class TestClearIsAudited:
    """DELETE /api/v1/activity-log must leave behind a self-describing audit entry.

    Tests that inspect the stored audit row wire a real ``ActivityRecorder``
    to the repository; tests that only inspect the recorder call use the
    ``fake_recorder`` fixture instead.
    """

    _CLEAR_URL = "/api/v1/activity-log"

    @staticmethod
    def _recorder_for(repo):
        """Return a real ActivityRecorder bound to *repo* (second ctor argument mocked)."""
        from ledgrab.core.activity_log.recorder import ActivityRecorder

        return ActivityRecorder(repo, MagicMock())

    @staticmethod
    def _stored_entries(repo):
        """Return whatever the repository yields for an unfiltered query."""
        from ledgrab.storage.activity_log import ActivityLogFilters

        return repo.query(ActivityLogFilters())

    @staticmethod
    def _seed(repo, how_many):
        """Record *how_many* default entries into *repo*."""
        for _ in range(how_many):
            repo.record(_make_entry())

    def test_clear_leaves_exactly_one_post_clear_entry(self, repo):
        """Integration: DELETE leaves exactly one 'activity_log.cleared' entry via real recorder."""
        recorder = self._recorder_for(repo)
        # Pre-populate
        self._seed(repo, 5)
        assert repo.count() == 5
        api = _make_client(repo, recorder=recorder, auth_label="admin-key")
        response = api.delete(self._CLEAR_URL)
        assert response.status_code == 200
        remaining = repo.count()
        assert remaining == 1, f"Expected exactly 1 post-clear audit entry, got {remaining}"
        survivors = self._stored_entries(repo)
        assert len(survivors) == 1
        assert survivors[0].action == "activity_log.cleared"

    def test_clear_audit_entry_is_system_category(self, repo):
        """The post-clear audit entry must have category='system'."""
        recorder = self._recorder_for(repo)
        repo.record(_make_entry())
        api = _make_client(repo, recorder=recorder, auth_label="tester")
        api.delete(self._CLEAR_URL)
        audit_entry = self._stored_entries(repo)[0]
        assert audit_entry.category == ActivityCategory.SYSTEM

    def test_clear_audit_entry_records_deleted_count(self, repo):
        """The post-clear audit metadata must include the correct deleted_count."""
        recorder = self._recorder_for(repo)
        self._seed(repo, 7)
        api = _make_client(repo, recorder=recorder, auth_label="tester")
        response = api.delete(self._CLEAR_URL)
        assert response.json()["deleted"] == 7
        entries = self._stored_entries(repo)
        recorded_count = entries[0].metadata.get("deleted_count")
        assert (
            recorded_count == 7
        ), f"Audit metadata missing correct deleted_count: {entries[0].metadata}"

    def test_clear_audit_entry_records_actor(self, repo):
        """The post-clear audit entry's actor must match the authenticated label."""
        recorder = self._recorder_for(repo)
        api = _make_client(repo, recorder=recorder, auth_label="specific-key")
        api.delete(self._CLEAR_URL)
        entries = self._stored_entries(repo)
        assert (
            entries[0].actor == "specific-key"
        ), f"Expected actor 'specific-key', got {entries[0].actor!r}"

    def test_clear_audit_entry_action_is_correct(self, repo, fake_recorder):
        """The audit record action must be 'activity_log.cleared' (not cleared or log_cleared)."""
        api = _make_client(repo, recorder=fake_recorder, auth_label="tester")
        api.delete(self._CLEAR_URL)
        assert len(fake_recorder.calls) == 1
        assert (
            fake_recorder.calls[0]["action"] == "activity_log.cleared"
        ), f"Wrong action recorded: {fake_recorder.calls[0]['action']!r}"

    def test_clear_response_deleted_matches_pre_clear_count(self, repo, fake_recorder):
        """The 'deleted' count in the response must equal the pre-clear row count."""
        n = 9
        self._seed(repo, n)
        assert repo.count() == n
        api = _make_client(repo, recorder=fake_recorder)
        resp = api.delete(self._CLEAR_URL)
        assert resp.status_code == 200
        assert resp.json()["deleted"] == n, f"Expected deleted={n}, got {resp.json()['deleted']}"

    def test_clear_empty_log_audit_records_zero_deleted(self, repo, fake_recorder):
        """Clearing an empty log records deleted_count=0 in metadata."""
        api = _make_client(repo, recorder=fake_recorder, auth_label="tester")
        response = api.delete(self._CLEAR_URL)
        assert response.status_code == 200
        assert response.json()["deleted"] == 0
        first_call = fake_recorder.calls[0]
        assert first_call["metadata"]["deleted_count"] == 0

    def test_clear_log_then_get_shows_only_audit_entry(self, repo):
        """After clear, GET /api/v1/activity-log shows exactly the 1 audit entry."""
        recorder = self._recorder_for(repo)
        self._seed(repo, 4)
        api = _make_client(repo, recorder=recorder, auth_label="admin")
        api.delete(self._CLEAR_URL)
        listing = api.get("/api/v1/activity-log")
        assert listing.status_code == 200
        data = listing.json()
        assert data["total"] == 1, f"Expected total=1 after clear, got {data['total']}"
        assert data["entries"][0]["action"] == "activity_log.cleared"