feat(activity-log): phase 4 - REST API (list/export/settings/clear)
- GET /activity-log: filtered, keyset-paginated list (categories/severities/actor/entity/date/q) - GET /activity-log/export: streaming CSV/JSON, chunked keyset (releases DB lock per batch), CSV formula-injection guard - GET/PUT /activity-log/settings: retention config (PUT require_authenticated) - DELETE /activity-log: clear (require_authenticated, self-audited) - security: export DoS fix, settings-PUT auth gate, CSV \t/\r guard, metadata-as-JSON - 122 API tests (auth posture, CSV injection, pagination integrity, filters, settings bounds, clear-audited)
This commit is contained in:
@@ -38,6 +38,7 @@ from .routes.snapshot import router as snapshot_router
|
||||
from .routes.graph import router as graph_router
|
||||
from .routes.calibration import router as calibration_router
|
||||
from .routes.setup import router as setup_router
|
||||
from .routes.activity_log import router as activity_log_router
|
||||
|
||||
router = APIRouter()
|
||||
router.include_router(system_router)
|
||||
@@ -76,5 +77,6 @@ router.include_router(snapshot_router)
|
||||
router.include_router(graph_router)
|
||||
router.include_router(calibration_router)
|
||||
router.include_router(setup_router)
|
||||
router.include_router(activity_log_router)
|
||||
|
||||
__all__ = ["router"]
|
||||
|
||||
@@ -0,0 +1,436 @@
|
||||
"""Activity-log REST API — query / filter / export / settings / clear.
|
||||
|
||||
Endpoints
|
||||
---------
|
||||
GET /api/v1/activity-log List (filterable, keyset-paginated)
|
||||
GET /api/v1/activity-log/export Streaming CSV or JSON export
|
||||
GET /api/v1/activity-log/settings Retention settings
|
||||
PUT /api/v1/activity-log/settings Update retention settings (requires non-anonymous auth)
|
||||
DELETE /api/v1/activity-log Clear all entries (requires non-anonymous auth)
|
||||
|
||||
Auth posture
|
||||
------------
|
||||
- List + read settings (``GET``): ``AuthRequired`` (loopback-anonymous is fine).
|
||||
- Export, update settings (``PUT``), and clear: ``require_authenticated()``
|
||||
(loopback-anonymous is rejected; mirrors the backup download / secret-reveal
|
||||
pattern from ``backup.py``). Updating settings can disable auditing or prune
|
||||
the trail, so it is gated like the destructive clear.
|
||||
|
||||
CSV injection
|
||||
-------------
|
||||
Cells that begin with =, +, -, @, TAB, or CR can trigger formula execution in
|
||||
spreadsheet apps (OWASP Formula Injection). ``_csv_safe`` prefixes any such cell
|
||||
with a single quote so formulas are inert. Fields already go through
|
||||
``sanitize_display`` in Phase 3 instrumentation, but the CSV writer applies its
|
||||
own guard as defence-in-depth.
|
||||
|
||||
Export generator + lock
|
||||
-----------------------
|
||||
``repo.iter_export()`` fetches rows in bounded batches, holding the DB ``_lock``
|
||||
only around each batch fetch and releasing it before yielding — so a slow or
|
||||
stalled client never blocks other DB operations. The ``StreamingResponse``
|
||||
generator is wrapped in a ``try/finally`` block so the batch generator is closed
|
||||
even when the client disconnects mid-stream.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Annotated, Iterator
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from ledgrab.api.auth import AuthRequired, require_authenticated
|
||||
from ledgrab.api.dependencies import (
|
||||
get_activity_log_repo,
|
||||
get_activity_log_retention_engine,
|
||||
get_activity_recorder,
|
||||
)
|
||||
from ledgrab.api.schemas.activity_log import (
|
||||
ActivityLogPageResponse,
|
||||
ActivityLogSettingsResponse,
|
||||
UpdateActivityLogSettingsRequest,
|
||||
)
|
||||
from ledgrab.core.activity_log.recorder import ActivityRecorder, entry_to_dict
|
||||
from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine
|
||||
from ledgrab.storage.activity_log import ActivityCategory, ActivityLogFilters, ActivitySeverity
|
||||
from ledgrab.storage.activity_log_repository import ActivityLogRepository
|
||||
|
||||
router = APIRouter(prefix="/api/v1/activity-log", tags=["Activity Log"])
|
||||
|
||||
# Hard cap on the per-request limit to prevent runaway queries.
|
||||
_MAX_LIMIT = 200
|
||||
_DEFAULT_LIMIT = 50
|
||||
|
||||
# CSV export columns (matches entry_to_dict key order)
|
||||
_CSV_COLUMNS = [
|
||||
"id",
|
||||
"ts",
|
||||
"category",
|
||||
"action",
|
||||
"severity",
|
||||
"actor",
|
||||
"entity_type",
|
||||
"entity_id",
|
||||
"entity_name",
|
||||
"message",
|
||||
"metadata",
|
||||
]
|
||||
|
||||
# Characters that trigger formula injection in spreadsheet apps (OWASP).
|
||||
# Leading TAB and CR are also recognised triggers by Excel / Google Sheets.
|
||||
_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")
|
||||
|
||||
|
||||
def _csv_safe(value: str) -> str:
|
||||
"""Prefix formula-injection triggers with a literal single-quote.
|
||||
|
||||
A cell starting with =, +, -, or @ can execute as a formula in Excel /
|
||||
Google Sheets. OWASP recommends prepending a single quote to neutralise it.
|
||||
"""
|
||||
if value and value[0] in _FORMULA_PREFIXES:
|
||||
return "'" + value
|
||||
return value
|
||||
|
||||
|
||||
def _build_filters(
    categories: list[str] | None,
    severities: list[str] | None,
    actor: str | None,
    entity_type: str | None,
    entity_id: str | None,
    since: datetime | None,
    until: datetime | None,
    q: str | None,
) -> ActivityLogFilters:
    """Translate raw query parameters into an ``ActivityLogFilters`` value.

    Empty lists and empty strings are normalised to ``None`` so they count as
    "no filter". ``since`` / ``until`` are forwarded untouched, and ``q`` is
    passed on as ``message_like``.
    """

    def _none_if_empty(value):
        # Collapse any falsy input ("" / [] / None) to None.
        return value if value else None

    return ActivityLogFilters(
        categories=_none_if_empty(categories),
        severities=_none_if_empty(severities),
        actor=_none_if_empty(actor),
        entity_type=_none_if_empty(entity_type),
        entity_id=_none_if_empty(entity_id),
        since=since,
        until=until,
        message_like=_none_if_empty(q),
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/v1/activity-log — list
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("", response_model=ActivityLogPageResponse, summary="List activity-log entries")
def list_activity_log(
    auth: AuthRequired,  # noqa: ARG001
    repo: ActivityLogRepository = Depends(get_activity_log_repo),
    # -- Filters --------------------------------------------------------------
    categories: Annotated[
        list[str] | None,
        Query(
            description=(
                "Filter by category (repeatable or comma-separated). "
                "Values: auth, device, entity, capture, system"
            )
        ),
    ] = None,
    severities: Annotated[
        list[str] | None,
        Query(description="Filter by severity (repeatable). Values: info, warning, error"),
    ] = None,
    actor: Annotated[
        str | None,
        Query(description="Filter by actor label (exact match)"),
    ] = None,
    entity_type: Annotated[
        str | None,
        Query(description="Filter by entity type (exact match)"),
    ] = None,
    entity_id: Annotated[
        str | None,
        Query(description="Filter by entity id (exact match)"),
    ] = None,
    since: Annotated[
        datetime | None,
        Query(description="Return entries at or after this ISO-8601 datetime"),
    ] = None,
    until: Annotated[
        datetime | None,
        Query(description="Return entries at or before this ISO-8601 datetime"),
    ] = None,
    q: Annotated[
        str | None,
        Query(description="Free-text search in the message field (substring)"),
    ] = None,
    # -- Pagination -----------------------------------------------------------
    before_seq: Annotated[
        int | None,
        Query(
            description=(
                "Keyset cursor: pass the 'next_before_seq' from the previous page "
                "to get the following (older) page. Omit for the first (newest) page."
            )
        ),
    ] = None,
    limit: Annotated[
        int,
        Query(
            ge=1,
            le=_MAX_LIMIT,
            description=f"Max entries per page (default {_DEFAULT_LIMIT}, max {_MAX_LIMIT})",
        ),
    ] = _DEFAULT_LIMIT,
) -> ActivityLogPageResponse:
    """Return the newest matching entries, oldest-first within the page.

    Keyset pagination: the response includes ``next_before_seq`` — pass it
    as ``before_seq`` in the next request to get the next (older) page.
    The ``total`` field is the count of all entries matching the current
    filters across all pages.
    """
    filters = _build_filters(categories, severities, actor, entity_type, entity_id, since, until, q)

    # Over-fetch by one row to learn whether an older page exists. The page is
    # expected back in ascending order (oldest first), so when the extra row is
    # present it sits at index 0 and is dropped; the newest ``page_size`` rows
    # are what the client receives.
    page_size = min(limit, _MAX_LIMIT)
    fetched = repo.query(filters, before_seq=before_seq, limit=page_size + 1)
    has_more = len(fetched) > page_size
    entries = fetched[1:] if has_more else fetched

    total = repo.count(filters)

    # The cursor for the next (older) page is the seq of the oldest entry that
    # was actually returned: the follow-up request asks for rows with a
    # smaller seq than that.
    cursor: int | None = None
    if has_more and entries:
        cursor = repo.get_seq_for_id(entries[0].id)

    serialised = [entry_to_dict(entry) for entry in entries]
    return ActivityLogPageResponse(
        entries=serialised,  # type: ignore[arg-type]
        next_before_seq=cursor,
        has_more=has_more,
        total=total,
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/v1/activity-log/export — streaming export (CSV or JSON)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _export_csv_generator(
    repo: ActivityLogRepository,
    filters: ActivityLogFilters,
) -> Iterator[bytes]:
    """Yield the CSV export as UTF-8 chunks: the header row, then one row per entry.

    Each entry from ``repo.iter_export(filters)`` is converted with
    ``entry_to_dict`` and written in ``_CSV_COLUMNS`` order. The ``metadata``
    column is serialised as a JSON string; every other column is stringified,
    with missing or falsy values becoming the empty string. All cells pass
    through ``_csv_safe`` before being written.

    The repository iterator is closed in a ``finally`` block, so it is also
    released when the consumer stops early (for example on client disconnect,
    which closes this generator).
    """

    def _encode_row(cells) -> bytes:
        # A fresh buffer per row keeps every yielded chunk exactly one CSV
        # record; csv.writer handles quoting and the line terminator.
        buf = io.StringIO()
        csv.writer(buf).writerow(cells)
        return buf.getvalue().encode("utf-8")

    gen = repo.iter_export(filters)
    try:
        yield _encode_row(_CSV_COLUMNS)

        for entry in gen:
            record = entry_to_dict(entry)
            cells = []
            for col in _CSV_COLUMNS:
                if col == "metadata":
                    # default=str matches the JSON exporter: a metadata value
                    # that is not JSON-native is stringified instead of
                    # raising and aborting a response that is already
                    # streaming. ensure_ascii is left at its default so the
                    # cell stays pure ASCII.
                    cell = json.dumps(record.get(col) or {}, default=str)
                else:
                    cell = str(record.get(col, "") or "")
                cells.append(_csv_safe(cell))
            yield _encode_row(cells)
    finally:
        gen.close()
|
||||
|
||||
|
||||
def _export_json_generator(
    repo: ActivityLogRepository,
    filters: ActivityLogFilters,
) -> Iterator[bytes]:
    """Stream the export as one JSON array, one UTF-8 chunk per entry.

    Output shape: ``[\\n{entry},\\n{entry},\\n...\\n]\\n``. Entries come from
    ``repo.iter_export(filters)`` and are serialised via ``entry_to_dict``.
    The repository iterator is closed in ``finally`` so it is released even
    when the consumer abandons the stream early.
    """
    gen = repo.iter_export(filters)
    try:
        yield b"[\n"
        # Nothing precedes the first element; every later one is introduced by
        # a comma and a newline.
        separator = b""
        for entry in gen:
            payload = json.dumps(entry_to_dict(entry), ensure_ascii=False, default=str)
            yield separator + payload.encode("utf-8")
            separator = b",\n"
        yield b"\n]\n"
    finally:
        gen.close()
|
||||
|
||||
|
||||
@router.get("/export", summary="Export activity-log entries (streaming CSV or JSON)")
def export_activity_log(
    auth: AuthRequired,
    repo: ActivityLogRepository = Depends(get_activity_log_repo),
    # -- Format ---------------------------------------------------------------
    format: Annotated[
        str,
        Query(description="Export format: 'csv' or 'json'"),
    ] = "csv",
    # -- Same filters as the list endpoint --------------------------------------
    categories: Annotated[list[str] | None, Query()] = None,
    severities: Annotated[list[str] | None, Query()] = None,
    actor: Annotated[str | None, Query()] = None,
    entity_type: Annotated[str | None, Query()] = None,
    entity_id: Annotated[str | None, Query()] = None,
    since: Annotated[datetime | None, Query()] = None,
    until: Annotated[datetime | None, Query()] = None,
    q: Annotated[str | None, Query()] = None,
) -> StreamingResponse:
    """Stream all matching entries as CSV or JSON.

    Requires a non-anonymous API key (loopback-anonymous access is rejected
    because the log may contain IP addresses and entity names).
    """
    # The auth gate runs before any parameter validation.
    require_authenticated(auth)

    if format not in ("csv", "json"):
        from fastapi import HTTPException

        raise HTTPException(status_code=422, detail="'format' must be 'csv' or 'json'")

    filters = _build_filters(categories, severities, actor, entity_type, entity_id, since, until, q)
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%S")

    # Pick the stream builder and content type for the validated format; the
    # file extension is the format name itself ("csv" or "json").
    if format == "json":
        make_stream, media_type = _export_json_generator, "application/json"
    else:
        make_stream, media_type = _export_csv_generator, "text/csv; charset=utf-8"
    filename = f"activity-log-{timestamp}.{format}"

    disposition = f'attachment; filename="{filename}"'
    return StreamingResponse(
        make_stream(repo, filters),
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/v1/activity-log/settings
|
||||
# PUT /api/v1/activity-log/settings
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get(
    "/settings",
    summary="Get activity-log retention settings",
    response_model=ActivityLogSettingsResponse,
)
def get_activity_log_settings(
    _: AuthRequired,
    engine: ActivityLogRetentionEngine = Depends(get_activity_log_retention_engine),
) -> ActivityLogSettingsResponse:
    """Return the current activity-log retention settings."""
    # The engine's settings mapping is unpacked into the response model.
    current = engine.get_settings()
    return ActivityLogSettingsResponse(**current)
|
||||
|
||||
|
||||
@router.put(
    "/settings",
    summary="Update activity-log retention settings",
    response_model=ActivityLogSettingsResponse,
)
async def update_activity_log_settings(
    auth: AuthRequired,
    body: UpdateActivityLogSettingsRequest,
    engine: ActivityLogRetentionEngine = Depends(get_activity_log_retention_engine),
) -> ActivityLogSettingsResponse:
    """Update the activity-log retention settings (applied immediately).

    Requires a non-anonymous API key (loopback-anonymous access is rejected)
    because disabling the log or pruning retention is equivalent in impact to
    clearing the audit trail.

    Setting ``enabled=false`` records an audit entry BEFORE the flag takes
    effect so the last entry in the log shows who disabled recording.
    """
    # Reject loopback-anonymous callers before touching the engine.
    require_authenticated(auth)

    requested = {
        "enabled": body.enabled,
        "max_days": body.max_days,
        "max_entries": body.max_entries,
    }
    # The engine returns the resulting settings, which become the response.
    applied = await engine.update_settings(**requested)
    return ActivityLogSettingsResponse(**applied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DELETE /api/v1/activity-log — clear
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.delete("", summary="Clear all activity-log entries")
def clear_activity_log(
    auth: AuthRequired,
    repo: ActivityLogRepository = Depends(get_activity_log_repo),
    recorder: ActivityRecorder = Depends(get_activity_recorder),
) -> dict:
    """Delete all activity-log entries.

    Requires a non-anonymous API key (loopback-anonymous access is rejected).
    The clear operation itself is audited — a ``system/activity_log_cleared``
    entry is recorded AFTER the wipe, so the log shows who cleared it and how
    many rows were removed.

    Returns ``{"deleted": <count>}``.
    """
    require_authenticated(auth)

    removed = repo.clear()

    # Audit the wipe after it happened, so the entry survives it. It is written
    # with category SYSTEM and action "activity_log.cleared". The recorder is
    # treated as best-effort here (its result is not checked).
    audit_entry = {
        "category": ActivityCategory.SYSTEM,
        "action": "activity_log.cleared",
        "severity": ActivitySeverity.INFO,
        "actor": auth,
        "message": f"Activity log cleared ({removed} entries removed)",
        "metadata": {"deleted_count": removed},
    }
    recorder.record(**audit_entry)

    return {"deleted": removed}
|
||||
@@ -0,0 +1,93 @@
|
||||
"""Pydantic schemas for the activity-log API (Phase 4)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry + page response
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class ActivityLogEntryResponse(BaseModel):
    """Single audit-log entry.

    Shape matches ``entry_to_dict()`` from
    ``ledgrab.core.activity_log.recorder`` exactly — that function is the
    single source of truth for serialisation; this schema documents the wire
    format.
    """

    # -- Always present -----------------------------------------------------
    id: str = Field(description="Entry id — 'al_<8-hex>'")
    ts: str = Field(description="ISO-8601 UTC timestamp")
    category: str = Field(description="Broad bucket (auth, device, entity, capture, system)")
    action: str = Field(description="Verb-object label, e.g. 'entity.created'")
    severity: str = Field(description="info | warning | error")
    actor: str = Field(description="API-key label or 'system' / 'anonymous'")

    # -- Optional reference to the affected entity -----------------------------
    entity_type: str | None = Field(
        description="Affected entity type, if applicable",
        default=None,
    )
    entity_id: str | None = Field(
        description="Affected entity id, if applicable",
        default=None,
    )
    entity_name: str | None = Field(
        description="Entity name at time of event, if applicable",
        default=None,
    )

    # -- Payload --------------------------------------------------------------
    message: str = Field(description="Human-readable description")
    metadata: dict[str, Any] = Field(
        description="Extra structured context",
        default_factory=dict,
    )
|
||||
|
||||
|
||||
class ActivityLogPageResponse(BaseModel):
    """Paginated list of audit-log entries (keyset cursor)."""

    # Entries belonging to the requested page.
    entries: list[ActivityLogEntryResponse] = Field(description="Entries on this page")

    # Cursor for the following page; left at None on the last page.
    next_before_seq: int | None = Field(
        description=(
            "Pass as 'before_seq' in the next request to get the following page. "
            "None when this is the last page."
        ),
        default=None,
    )

    # Whether another page exists beyond this one.
    has_more: bool = Field(
        description="True when there are more entries before the first entry on this page"
    )

    # Match count over the whole filtered set, not just this page.
    total: int = Field(description="Total entries matching the current filters (all pages)")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Settings
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Sanity upper bounds applied to the retention settings fields.
_MAX_DAYS_CAP = 3650  # 10 years
_MAX_ENTRIES_CAP = 10_000_000  # 10 M rows


class ActivityLogSettingsResponse(BaseModel):
    """Current activity-log retention settings."""

    enabled: bool = Field(description="Whether the activity log is recording")

    # Age-based retention, bounded to [0, _MAX_DAYS_CAP].
    max_days: int = Field(
        description="Retain entries for at most this many days (0 = no age-based pruning)",
        ge=0,
        le=_MAX_DAYS_CAP,
    )

    # Count-based retention, bounded to [0, _MAX_ENTRIES_CAP].
    max_entries: int = Field(
        description="Keep at most this many entries (0 = no count-based pruning)",
        ge=0,
        le=_MAX_ENTRIES_CAP,
    )
|
||||
|
||||
|
||||
class UpdateActivityLogSettingsRequest(BaseModel):
    """Request body for PUT /settings."""

    # All three fields are required; the numeric ones are range-checked
    # against the module-level caps.
    enabled: bool = Field(description="Enable or disable activity-log recording")

    max_days: int = Field(
        description="Retain entries for at most this many days (0 = no age-based pruning)",
        ge=0,
        le=_MAX_DAYS_CAP,
    )

    max_entries: int = Field(
        description="Keep at most this many entries (0 = no count-based pruning)",
        ge=0,
        le=_MAX_ENTRIES_CAP,
    )
|
||||
@@ -20,7 +20,6 @@ Design notes
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from typing import Iterator
|
||||
|
||||
@@ -260,34 +259,80 @@ class ActivityLogRepository:
|
||||
cursor = self._db.execute(f"DELETE FROM {_TABLE}")
|
||||
return cursor.rowcount
|
||||
|
||||
def get_seq_for_id(self, entry_id: str) -> int | None:
    """Look up the ``seq`` of the entry whose id is *entry_id*.

    Returns the value as an ``int``, or ``None`` when no row has that id.
    The list endpoint uses this to derive its keyset cursor
    (``next_before_seq``) from the oldest entry on the current page.
    """
    row = self._db.execute(
        f"SELECT seq FROM {_TABLE} WHERE id = ?",
        (entry_id,),
    ).fetchone()
    if row is None:
        return None
    return int(row["seq"])
|
||||
|
||||
# -- Export --------------------------------------------------------------
|
||||
|
||||
def iter_export(
    self,
    filters: ActivityLogFilters | None = None,
    *,
    batch_size: int = 1000,
) -> Iterator[ActivityLogEntry]:
    """Yield all matching entries in ascending ``seq`` order.

    Rows are fetched in bounded batches, keyset-paginated by ``seq``. The DB
    lock is held only while one batch is executed and fetched, and is released
    before any row is yielded. A slow or stalled export client therefore does
    not block other DB operations for the duration of the stream.

    Args:
        filters: Restricts the exported rows; ``None`` means no filtering.
        batch_size: Maximum rows fetched per query. Memory use is bounded to
            this many rows at a time.

    Yields:
        One ``ActivityLogEntry`` per matching row, oldest first.
    """
    if filters is None:
        filters = ActivityLogFilters()

    # Keyset cursor: the largest seq yielded so far. None means "start from the
    # very beginning"; later batches add "seq > ?" to skip what was already
    # yielded.
    cursor_seq: int | None = None

    while True:
        # The cursor value goes into params first. _build_filter_clause is
        # expected to emit extra_where as the first condition, so its
        # placeholder precedes the filter placeholders; confirm against the
        # helper if its ordering changes.
        params: list = []
        if cursor_seq is not None:
            params.append(cursor_seq)
            keyset: str | None = "seq > ?"
        else:
            keyset = None
        where_fragment = _build_filter_clause(filters, params, extra_where=keyset)
        where_clause = f"WHERE {where_fragment}" if where_fragment else ""
        params.append(batch_size)  # bound to the trailing LIMIT placeholder

        sql = (
            f"SELECT seq, id, ts, category, action, severity, actor, "
            f"entity_type, entity_id, entity_name, message, metadata "
            f"FROM {_TABLE} "
            f"{where_clause} "
            f"ORDER BY seq ASC "
            f"LIMIT ?"
        )

        # Hold the lock only for the bounded fetchall; release before yielding.
        with self._db._lock:  # noqa: SLF001 — internal access; no public cursor API
            rows = self._db._conn.execute(sql, tuple(params)).fetchall()  # noqa: SLF001

        if not rows:
            break

        for row in rows:
            yield ActivityLogEntry.from_row(dict(row))

        # The last row has the largest seq in this batch (ORDER BY seq ASC).
        cursor_seq = rows[-1]["seq"]

        if len(rows) < batch_size:
            # Fewer rows than requested, so this was the final batch.
            break
|
||||
|
||||
Reference in New Issue
Block a user