diff --git a/plans/activity-log/CONTEXT.md b/plans/activity-log/CONTEXT.md index 0776c6a..be800ea 100644 --- a/plans/activity-log/CONTEXT.md +++ b/plans/activity-log/CONTEXT.md @@ -47,7 +47,7 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p - ActivityLogEntry fields / dict shape: **frozen** — see phase-1-storage.md Handoff section. 11 fields: `id`, `ts`, `category`, `action`, `severity`, `actor`, `message`, `entity_type`, `entity_id`, `entity_name`, `metadata`. `seq` is DB-only (not on dataclass). - ActivityLogFilters shape: **frozen** — 8 optional fields: `categories`, `severities`, `actor`, `entity_type`, `entity_id`, `since`, `until`, `message_like`. See phase-1-storage.md Handoff. -- recorder.record(...) signature + actor ContextVar import path: _(Phase 2 handoff)_ +- recorder.record(...) signature + actor ContextVar import path: **frozen** — see phase-2-recorder-retention.md Handoff section. Signature: `record(category, action, *, severity="info", actor=None, entity_type=None, entity_id=None, entity_name=None, message, metadata=None, _bypass_enabled=False)`. ContextVar: `from ledgrab.core.activity_log.context import current_actor`. Module accessor: `from ledgrab.core.activity_log.recorder import get_module_recorder`. Event payload: `{"type": "activity_logged", "entry": {11-field dict with ts as ISO string, metadata as dict}}`. DI getters: `get_activity_recorder()`, `get_activity_log_repo()`, `get_activity_log_retention_engine()`. - API endpoints + query params + page envelope + settings bounds: _(Phase 4 handoff)_ ## Failed approaches / rejected designs @@ -66,3 +66,4 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p ## Phase progress notes Phase 1 landed (2026-06-09): `activity_log.py` (dataclass + enums + filters + codec), `AddActivityLogTableMigration` (`002_add_activity_log`) appended to `ALL_MIGRATIONS`, `ActivityLogRepository` (record/query/count/prune/clear/iter_export), 41 new tests — all green. Full suite 2226 passed, 0 failed. Schema and method signatures frozen in phase-1-storage.md Handoff. Gotcha: `Database.execute` takes a positional tuple — use `?` placeholders (not `:name`), otherwise Python 3.14 will raise `ProgrammingError`. +Phase 2 landed (2026-06-09): `core/activity_log/` package (`context.py`, `recorder.py`, `retention.py`, `__init__.py`); actor ContextVar set in `api/auth.py` (both branches); `ActivityLogRetentionEngine` mirroring AutoBackupEngine; full wiring in `main.py` (repo at module level, recorder+engine in lifespan, `server.shutting_down` first shutdown action, engine stop before db.close); DI getters in `api/dependencies.py`; `activity_logged` added to `_ALLOWED_SERVER_EVENT_TYPES` in `events-ws.ts`; `set_module_recorder` exposes recorder to non-DI sites; 24 new tests — all green. Full suite 2309 passed, 2 skipped, 0 failed. Ruff clean. diff --git a/plans/activity-log/PLAN.md b/plans/activity-log/PLAN.md index b74fc71..fd423dd 100644 --- a/plans/activity-log/PLAN.md +++ b/plans/activity-log/PLAN.md @@ -80,7 +80,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem). | Phase | Domain | Status | Review | Build | Committed | |-------|--------|--------|--------|-------|-----------| | Phase 1: Storage | data | ✅ Done | ✅ Passed | ✅ Passed | ✅ | -| Phase 2: Recorder/Retention | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | +| Phase 2: Recorder/Retention | backend | ✅ Done | ✅ Passed | ✅ Passed | ✅ | | Phase 3: Instrumentation | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 4: REST API | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | | Phase 5: Frontend tab | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ | diff --git a/plans/activity-log/phase-2-recorder-retention.md b/plans/activity-log/phase-2-recorder-retention.md index cc9fea0..fd33c95 100644 --- a/plans/activity-log/phase-2-recorder-retention.md +++ b/plans/activity-log/phase-2-recorder-retention.md @@ -1,6 +1,6 @@ # Phase 2: Recorder, actor context, retention, lifecycle -**Status:** ⬜ Not Started +**Status:** ✅ Done **Parent plan:** [PLAN.md](./PLAN.md) **Domain:** backend @@ -15,7 +15,7 @@ sites) — that is Phase 3 — but the full machinery is live and unit-tested. ## Tasks -- [ ] Create `server/src/ledgrab/core/activity_log/__init__.py` and +- [x] Create `server/src/ledgrab/core/activity_log/__init__.py` and `server/src/ledgrab/core/activity_log/recorder.py`: - `ActivityRecorder(repo: ActivityLogRepository, processor_manager, *, loop=None)`. - `record(category, action, *, severity="info", actor=None, entity_type=None, @@ -35,13 +35,13 @@ sites) — that is Phase 3 — but the full machinery is live and unit-tested. - `enabled` flag honored: when retention settings say `enabled=false`, `record()` is a no-op — EXCEPT the "audit log disabled" event itself, which must be recorded before the flag takes effect (see retention engine). -- [ ] Actor `ContextVar`: +- [x] Actor `ContextVar`: - Add `current_actor: ContextVar[str]` (module-level, e.g. in `core/activity_log/context.py` or `api/auth.py`). In `verify_api_key` (`api/auth.py`), set it next to the existing `request.state.auth_label = ...` (both the authenticated label and the `"anonymous"` branch). Default `"system"` when unset. Ensure no cross-request leakage (set on every auth evaluation). -- [ ] Create `server/src/ledgrab/core/activity_log/retention.py`: +- [x] Create `server/src/ledgrab/core/activity_log/retention.py`: - `ActivityLogRetentionEngine(repo, db, recorder)` mirroring `core/backup/auto_backup.py`: `_load_settings()`/`_save_settings()` via `db.get_setting("activity_log")` / `db.set_setting("activity_log", {...})`, `DEFAULT_SETTINGS = {"enabled": True, @@ -50,7 +50,7 @@ sites) — that is Phase 3 — but the full machinery is live and unit-tested. interval (e.g. hourly) then calls `repo.prune(before_ts=now-max_days, max_entries=...)`. `async stop()` → cancel + await task. `get_settings()` / `async update_settings(...)` that persist and apply (changing `enabled` is logged via the recorder BEFORE disabling). -- [ ] Wiring: +- [x] Wiring: - `main.py`: instantiate `activity_log_repo = ActivityLogRepository(db)` (module level near other stores); in `lifespan` startup build `activity_recorder` + `activity_log_retention_engine`, pass to `init_dependencies(...)`, and `await activity_log_retention_engine.start()`. @@ -61,11 +61,11 @@ sites) — that is Phase 3 — but the full machinery is live and unit-tested. `activity_log_retention_engine` to `_deps`, parameters to `init_dependencies`, and getters `get_activity_recorder()`, `get_activity_log_repo()`, `get_activity_log_retention_engine()`. -- [ ] Realtime allowlist (order matters — do allowlist FIRST so the parity test stays green): +- [x] Realtime allowlist (order matters — do allowlist FIRST so the parity test stays green): - Add `'activity_logged'` to `_ALLOWED_SERVER_EVENT_TYPES` in `server/src/ledgrab/static/js/core/events-ws.ts` (+ a one-line comment naming the source). - Confirm `tests/test_events_ws_parity.py` passes with the new emit type. -- [ ] Unit tests `server/tests/core/test_activity_recorder.py` + +- [x] Unit tests `server/tests/core/test_activity_recorder.py` + `test_activity_log_retention.py`: - recorder persists an entry AND calls `fire_event` with `type=="activity_logged"`; - actor resolves from ContextVar; defaults to `"system"`; failure in repo doesn't raise; @@ -106,13 +106,91 @@ sites) — that is Phase 3 — but the full machinery is live and unit-tested. ## Review Checklist -- [ ] All tasks completed -- [ ] Code follows project conventions (engine/DI patterns) -- [ ] No unintended side effects (no call sites yet; lifespan order correct) -- [ ] Build passes (ruff + pytest, incl. parity test) -- [ ] Tests pass (new + existing) +- [x] All tasks completed +- [x] Code follows project conventions (engine/DI patterns) +- [x] No unintended side effects (no call sites yet; lifespan order correct) +- [x] Build passes (ruff + pytest, incl. parity test) +- [x] Tests pass (new + existing) ## Handoff to Next Phase - +### recorder.record(...) — final signature + +```python +recorder.record( + category: str, # ActivityCategory constant + action: str, # verb-object label + *, + severity: str = "info", # ActivitySeverity constant + actor: str | None = None, # resolved from current_actor ContextVar when None + entity_type: str | None = None, + entity_id: str | None = None, + entity_name: str | None = None, + message: str, + metadata: dict | None = None, + _bypass_enabled: bool = False, # internal: used by retention engine only +) -> None +``` + +### Actor ContextVar import path + +```python +from ledgrab.core.activity_log.context import current_actor +``` + +### Module accessor (for non-DI sites) + +```python +from ledgrab.core.activity_log.recorder import get_module_recorder, set_module_recorder +recorder = get_module_recorder() # returns ActivityRecorder | None +``` + +### entry_to_dict helper (for API response serialisation) + +```python +from ledgrab.core.activity_log.recorder import entry_to_dict +d = entry_to_dict(entry) # returns dict with 11 keys +``` + +### Frozen `activity_logged` event payload shape + +```python +{ + "type": "activity_logged", + "entry": { + "id": str, # "al_<8-hex>" + "ts": str, # ISO-8601 UTC string + "category": str, + "action": str, + "severity": str, + "actor": str, + "entity_type": str | None, + "entity_id": str | None, + "entity_name": str | None, + "message": str, + "metadata": dict, # real dict, not JSON string + } +} +``` + +### DI getter names (in `api/dependencies.py`) + +```python +from ledgrab.api.dependencies import ( + get_activity_recorder, + get_activity_log_repo, + get_activity_log_retention_engine, +) +``` + +### Notes for Phase 3 + +- Phase 3 instruments `fire_entity_event` in `api/dependencies.py` by calling + `get_module_recorder()` there (not via FastAPI DI — it's a plain function). +- The actor ContextVar is already set by `verify_api_key` before any route + handler runs, so entity events carry the correct actor automatically. +- `recorder.record(...)` never raises; Phase 3 call sites need no try/except. + +Phase 2 landed (2026-06-09): ActivityRecorder, actor ContextVar, ActivityLogRetentionEngine, +all wiring in main.py/dependencies.py/auth.py, activity_logged allowlist in events-ws.ts, +24 new tests — all green. Full suite 2309 passed, 2 skipped, 0 failed. Ruff clean. diff --git a/server/src/ledgrab/api/auth.py b/server/src/ledgrab/api/auth.py index 6f5e68c..3102165 100644 --- a/server/src/ledgrab/api/auth.py +++ b/server/src/ledgrab/api/auth.py @@ -10,6 +10,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from starlette.websockets import WebSocket, WebSocketDisconnect from ledgrab.config import get_config +from ledgrab.core.activity_log.context import current_actor from ledgrab.utils import get_logger from ledgrab.utils.net_classify import is_loopback as _classify_is_loopback @@ -81,6 +82,7 @@ def verify_api_key( # No keys configured — allow loopback only. if _is_loopback(client_host): request.state.auth_label = "anonymous" + current_actor.set("anonymous") return "anonymous" # Allow caller to authenticate explicitly even without configured keys? # No — there are no keys to compare against. Reject. @@ -127,6 +129,9 @@ def verify_api_key( # Stash the friendly label so the access-log middleware can attribute the # request to a client without re-running the token comparison. request.state.auth_label = authenticated_as + # Set the actor ContextVar so ActivityRecorder can resolve it without + # threading it through every call site. + current_actor.set(authenticated_as) return authenticated_as diff --git a/server/src/ledgrab/api/dependencies.py b/server/src/ledgrab/api/dependencies.py index 0a1f15d..c218b7d 100644 --- a/server/src/ledgrab/api/dependencies.py +++ b/server/src/ledgrab/api/dependencies.py @@ -42,6 +42,9 @@ from ledgrab.core.mqtt.mqtt_manager import MQTTManager from ledgrab.storage.http_endpoint_store import HTTPEndpointStore from ledgrab.storage.audio_processing_template_store import AudioProcessingTemplateStore from ledgrab.storage.pattern_template_store import PatternTemplateStore +from ledgrab.core.activity_log.recorder import ActivityRecorder +from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine +from ledgrab.storage.activity_log_repository import ActivityLogRepository T = TypeVar("T") @@ -196,6 +199,18 @@ def get_update_service() -> UpdateService: return _get("update_service", "Update service") +def get_activity_recorder() -> ActivityRecorder: + return _get("activity_recorder", "Activity recorder") + + +def get_activity_log_repo() -> ActivityLogRepository: + return _get("activity_log_repo", "Activity log repository") + + +def get_activity_log_retention_engine() -> ActivityLogRetentionEngine: + return _get("activity_log_retention_engine", "Activity log retention engine") + + # ── Event helper ──────────────────────────────────────────────────────── @@ -257,6 +272,9 @@ def init_dependencies( http_endpoint_store: HTTPEndpointStore | None = None, audio_processing_template_store: AudioProcessingTemplateStore | None = None, pattern_template_store: PatternTemplateStore | None = None, + activity_recorder: ActivityRecorder | None = None, + activity_log_repo: ActivityLogRepository | None = None, + activity_log_retention_engine: ActivityLogRetentionEngine | None = None, ): """Initialize global dependencies.""" _deps.update( @@ -295,5 +313,8 @@ def init_dependencies( "http_endpoint_store": http_endpoint_store, "audio_processing_template_store": audio_processing_template_store, "pattern_template_store": pattern_template_store, + "activity_recorder": activity_recorder, + "activity_log_repo": activity_log_repo, + "activity_log_retention_engine": activity_log_retention_engine, } ) diff --git a/server/src/ledgrab/core/activity_log/__init__.py b/server/src/ledgrab/core/activity_log/__init__.py new file mode 100644 index 0000000..8f47584 --- /dev/null +++ b/server/src/ledgrab/core/activity_log/__init__.py @@ -0,0 +1,23 @@ +"""Activity / audit log core — recorder, retention engine, and actor context. + +Public surface +-------------- +``ActivityRecorder`` — thread-safe facade; persists entries and fires live events. +``ActivityLogRetentionEngine`` — background pruning engine (mirrors AutoBackupEngine). +``current_actor`` — ``ContextVar[str]`` set by the auth layer per request. + +Quick start +----------- +Wired in ``main.py`` lifespan; injected via ``api/dependencies.py`` getters. +Phase 3 adds the instrumentation call sites. +""" + +from ledgrab.core.activity_log.context import current_actor +from ledgrab.core.activity_log.recorder import ActivityRecorder +from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine + +__all__ = [ + "ActivityRecorder", + "ActivityLogRetentionEngine", + "current_actor", +] diff --git a/server/src/ledgrab/core/activity_log/context.py b/server/src/ledgrab/core/activity_log/context.py new file mode 100644 index 0000000..657cc85 --- /dev/null +++ b/server/src/ledgrab/core/activity_log/context.py @@ -0,0 +1,21 @@ +"""Actor context variable for the activity log. + +``current_actor`` is set by ``api/auth.py:verify_api_key`` on every request so +that ``ActivityRecorder.record(...)`` can resolve the actor without requiring +every call site to pass it explicitly. + +Default value is ``"system"`` — used by background engines and any code path +that runs outside a request context (e.g. lifespan startup/shutdown, zeroconf +discovery thread). + +Per-request isolation is guaranteed by ASGI's coroutine context: each request +runs in its own coroutine with its own copy of the context inherited from the +server's main task. The auth layer resets it on every request before the route +handler runs, so stale labels from a previous request cannot bleed into a new +one. +""" + +from contextvars import ContextVar + +#: The actor label for the current request — API-key label or ``"system"``. +current_actor: ContextVar[str] = ContextVar("current_actor", default="system") diff --git a/server/src/ledgrab/core/activity_log/recorder.py b/server/src/ledgrab/core/activity_log/recorder.py new file mode 100644 index 0000000..2fc2533 --- /dev/null +++ b/server/src/ledgrab/core/activity_log/recorder.py @@ -0,0 +1,267 @@ +"""Thread-safe ActivityRecorder facade. + +Responsibilities +---------------- +1. Build an ``ActivityLogEntry`` from the caller-supplied fields. +2. Resolve the ``actor`` from the ``current_actor`` ContextVar when not given. +3. Persist the entry via ``ActivityLogRepository.record()`` on the event-loop + thread — inline if already on that thread, via + ``loop.call_soon_threadsafe`` if called from another thread (e.g. the + zeroconf discovery thread that fires ``device_discovered/lost`` events). +4. Push a live ``activity_logged`` event via + ``ProcessorManager.fire_event({"type": "activity_logged", "entry": {...}})``. +5. Never raise into the caller — audit recording is best-effort. Failures are + logged at ``WARNING`` level so operators can diagnose without breaking the + audited action. + +Thread-marshal pattern mirrors ``utils/log_broadcaster.py`` (``ensure_loop`` / +``call_soon_threadsafe``). + +Module accessor +--------------- +A module-level singleton ``_recorder`` is populated by +``set_module_recorder()`` during ``main.py`` lifespan startup and exposed via +``get_module_recorder()``. Background engines and other non-DI sites that need +to call ``record()`` without FastAPI DI can use this accessor. Phase 3 +instrumentation uses it at the ``fire_entity_event`` choke-point. +""" + +from __future__ import annotations + +import asyncio +import uuid +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from ledgrab.core.activity_log.context import current_actor +from ledgrab.storage.activity_log import ActivityLogEntry, ActivitySeverity +from ledgrab.utils import get_logger + +if TYPE_CHECKING: + from ledgrab.core.processing.processor_manager import ProcessorManager + from ledgrab.storage.activity_log_repository import ActivityLogRepository + +logger = get_logger(__name__) + + +def _new_id() -> str: + """Generate a compact activity-log entry id: ``al_<8-hex-chars>``.""" + return "al_" + uuid.uuid4().hex[:8] + + +def entry_to_dict(entry: ActivityLogEntry) -> dict: + """Serialise an ``ActivityLogEntry`` to the canonical API/event payload dict. + + Reused by Phase 4 (API response serialisation) and Phase 5 (frontend). + The shape is identical to the flat row codec minus the DB-only ``seq`` + field, but with ``ts`` kept as an ISO-8601 string and ``metadata`` as a + real ``dict`` (not a JSON string). + """ + return { + "id": entry.id, + "ts": entry.ts.isoformat(), + "category": entry.category, + "action": entry.action, + "severity": entry.severity, + "actor": entry.actor, + "entity_type": entry.entity_type, + "entity_id": entry.entity_id, + "entity_name": entry.entity_name, + "message": entry.message, + "metadata": entry.metadata, + } + + +class ActivityRecorder: + """Thread-safe facade for persisting audit log entries. + + Parameters + ---------- + repo: + ``ActivityLogRepository`` used to persist entries. + processor_manager: + ``ProcessorManager`` whose ``fire_event`` dispatches the live + ``activity_logged`` event to WebSocket subscribers. + loop: + Optional: the running asyncio event loop. If ``None``, it is + captured lazily on the first call that originates from an async + context (mirroring ``LogBroadcaster.ensure_loop``). Pass it + explicitly in tests to avoid depending on a real running loop. + """ + + def __init__( + self, + repo: "ActivityLogRepository", + processor_manager: "ProcessorManager", + *, + loop: asyncio.AbstractEventLoop | None = None, + ) -> None: + self._repo = repo + self._pm = processor_manager + self._loop: asyncio.AbstractEventLoop | None = loop + self._enabled: bool = True + + # ── Loop capture (mirrors LogBroadcaster.ensure_loop) ────────────────── + + def ensure_loop(self) -> None: + """Capture the running event loop if not already stored. + + Call from an async context (e.g. lifespan startup) so that + ``call_soon_threadsafe`` works when ``record()`` is later called from + non-async threads. + """ + if self._loop is None: + try: + self._loop = asyncio.get_running_loop() + except RuntimeError: + pass + + # ── Public API ────────────────────────────────────────────────────────── + + @property + def enabled(self) -> bool: + """Whether recording is currently active.""" + return self._enabled + + @enabled.setter + def enabled(self, value: bool) -> None: + self._enabled = value + + def record( + self, + category: str, + action: str, + *, + severity: str = ActivitySeverity.INFO, + actor: str | None = None, + entity_type: str | None = None, + entity_id: str | None = None, + entity_name: str | None = None, + message: str, + metadata: dict | None = None, + _bypass_enabled: bool = False, + ) -> None: + """Append one audit entry — best-effort, never raises. + + Parameters + ---------- + category: + Broad bucket — one of :class:`~ledgrab.storage.activity_log.ActivityCategory`. + action: + Verb-object label, e.g. ``"entity.created"`` or ``"server.shutting_down"``. + severity: + One of :class:`~ledgrab.storage.activity_log.ActivitySeverity`. Defaults + to ``"info"``. + actor: + Who triggered the action. When ``None`` (the common case), the + value is resolved from :data:`~ledgrab.core.activity_log.context.current_actor` + with a default of ``"system"``. + entity_type / entity_id / entity_name: + Optional entity context for entity-domain events. + message: + Human-readable description suitable for display. + metadata: + Small JSON-serialisable dict with extra context. Defaults to ``{}``. + _bypass_enabled: + Internal flag used by the retention engine to record the + "audit log disabled" event even when ``enabled`` is ``False``. + """ + if not self._enabled and not _bypass_enabled: + return + + # Resolve actor from ContextVar when not explicitly supplied. + resolved_actor = actor if actor is not None else current_actor.get() + + entry = ActivityLogEntry( + id=_new_id(), + ts=datetime.now(timezone.utc), + category=category, + action=action, + severity=severity, + actor=resolved_actor, + entity_type=entity_type, + entity_id=entity_id, + entity_name=entity_name, + message=message, + metadata=metadata or {}, + ) + + # Determine whether we are on the event-loop thread or not. + loop = self._loop + if loop is None: + # Lazy capture — may fail if called before the loop is running. + try: + loop = asyncio.get_running_loop() + self._loop = loop + except RuntimeError: + pass + + if loop is not None and loop.is_running(): + try: + current = asyncio.get_event_loop() + except RuntimeError: + current = None + + # If the current thread IS the event-loop thread, write inline. + if current is loop: + self._write_and_emit(entry) + else: + # Called from a non-loop thread (e.g. zeroconf discovery) — + # marshal onto the event-loop thread. + try: + loop.call_soon_threadsafe(self._write_and_emit, entry) + except RuntimeError: + # Loop has been closed (rare; happens during tests) + logger.warning( + "ActivityRecorder: event loop closed, dropping entry %s", entry.id + ) + else: + # No running loop — fall back to a direct synchronous write. + # This path hits in synchronous unit tests that do not start a loop. + self._write_and_emit(entry) + + def _write_and_emit(self, entry: ActivityLogEntry) -> None: + """Persist *entry* and fire the live event — called on the loop thread.""" + try: + self._repo.record(entry) + except Exception as exc: + logger.warning("ActivityRecorder: failed to persist entry %s: %s", entry.id, exc) + return # don't emit an event for an entry that failed to persist + + try: + self._pm.fire_event( + { + "type": "activity_logged", + "entry": entry_to_dict(entry), + } + ) + except Exception as exc: + logger.warning("ActivityRecorder: failed to fire live event for %s: %s", entry.id, exc) + + +# ── Module-level singleton accessor ──────────────────────────────────────── +# +# Background engines and non-DI call sites (Phase 3's fire_entity_event hook, +# device discovery thread) need ``record()`` without going through FastAPI DI. +# ``set_module_recorder`` is called from ``main.py`` lifespan immediately after +# the recorder is wired into ``init_dependencies``. + +_recorder: ActivityRecorder | None = None + + +def set_module_recorder(recorder: ActivityRecorder) -> None: + """Store the application-level recorder in the module singleton. + + Called once from ``main.py`` lifespan startup. + """ + global _recorder + _recorder = recorder + + +def get_module_recorder() -> ActivityRecorder | None: + """Return the module-level recorder, or ``None`` if not yet initialised. + + Callers must guard against ``None`` — this returns ``None`` during module + import and early startup before ``main.py`` lifespan has run. + """ + return _recorder diff --git a/server/src/ledgrab/core/activity_log/retention.py b/server/src/ledgrab/core/activity_log/retention.py new file mode 100644 index 0000000..e6c0fda --- /dev/null +++ b/server/src/ledgrab/core/activity_log/retention.py @@ -0,0 +1,216 @@ +"""Activity log retention engine. + +Mirrors ``core/backup/auto_backup.py``: +- Settings persisted via ``db.get_setting("activity_log")`` / + ``db.set_setting("activity_log", {...})``. +- ``start()`` / ``stop()`` lifecycle following the engine convention used + throughout the codebase. +- Hourly background loop calling ``repo.prune(before_ts=..., max_entries=...)``. +- ``get_settings()`` / ``async update_settings(...)`` for the Settings API + (Phase 4). + +Changing ``enabled`` to ``False`` records an ``"audit_log.disabled"`` event via +the recorder BEFORE the flag takes effect — so the last action in the log is a +record of the intentional disable. +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING + +from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity +from ledgrab.utils import get_logger + +if TYPE_CHECKING: + from ledgrab.core.activity_log.recorder import ActivityRecorder + from ledgrab.storage.activity_log_repository import ActivityLogRepository + from ledgrab.storage.database import Database + +logger = get_logger(__name__) + +DEFAULT_SETTINGS: dict = { + "enabled": True, + "max_days": 90, + "max_entries": 20000, +} + +# Prune loop interval — run roughly once an hour. +_PRUNE_INTERVAL_SECS = 3600 + + +class ActivityLogRetentionEngine: + """Background engine that prunes old activity log entries. + + Parameters + ---------- + repo: + The ``ActivityLogRepository`` used to prune entries. + db: + The shared ``Database`` singleton for settings persistence. + recorder: + The ``ActivityRecorder`` used to log the "audit log disabled" event + before disabling takes effect. + """ + + def __init__( + self, + repo: "ActivityLogRepository", + db: "Database", + recorder: "ActivityRecorder", + ) -> None: + self._repo = repo + self._db = db + self._recorder = recorder + self._task: asyncio.Task | None = None + self._settings = self._load_settings() + # Rehydrate the recorder's enabled flag from persisted settings so a + # previously-disabled log stays disabled across restarts. + self._recorder.enabled = self._settings["enabled"] + + # ── Settings persistence ─────────────────────────────────────────────── + + def _load_settings(self) -> dict: + data = self._db.get_setting("activity_log") + if data: + return {**DEFAULT_SETTINGS, **data} + return dict(DEFAULT_SETTINGS) + + def _save_settings(self) -> None: + self._db.set_setting( + "activity_log", + { + "enabled": self._settings["enabled"], + "max_days": self._settings["max_days"], + "max_entries": self._settings["max_entries"], + }, + ) + + # ── Lifecycle ────────────────────────────────────────────────────────── + + async def start(self) -> None: + """Start the retention loop if enabled.""" + if self._settings["enabled"]: + self._start_loop() + logger.info( + "Activity log retention engine started " "(max_days=%d, max_entries=%d)", + self._settings["max_days"], + self._settings["max_entries"], + ) + else: + logger.info("Activity log retention engine initialized (disabled)") + + async def stop(self) -> None: + """Cancel the retention loop.""" + self._cancel_loop() + logger.info("Activity log retention engine stopped") + + def _start_loop(self) -> None: + self._cancel_loop() + self._task = asyncio.create_task(self._retention_loop()) + + def _cancel_loop(self) -> None: + if self._task is not None: + self._task.cancel() + self._task = None + + # ── Prune loop ───────────────────────────────────────────────────────── + + async def _retention_loop(self) -> None: + try: + while True: + await asyncio.sleep(_PRUNE_INTERVAL_SECS) + try: + self._prune() + except Exception as exc: + logger.error("Activity log retention prune failed: %s", exc, exc_info=True) + except asyncio.CancelledError: + logger.debug("Activity log retention loop cancelled") + + def _prune(self) -> None: + """Execute one prune pass based on current settings.""" + settings = self._settings + if not settings["enabled"]: + return + + max_days: int = settings["max_days"] + max_entries: int = settings["max_entries"] + + before_ts: datetime | None = None + if max_days and max_days > 0: + before_ts = datetime.now(timezone.utc) - timedelta(days=max_days) + + max_entries_val: int | None = max_entries if max_entries and max_entries > 0 else None + + deleted = self._repo.prune(before_ts=before_ts, max_entries=max_entries_val) + if deleted: + logger.info( + "Activity log pruned %d rows (max_days=%d, max_entries=%d)", + deleted, + max_days, + max_entries, + ) + + # ── Public API ───────────────────────────────────────────────────────── + + def get_settings(self) -> dict: + """Return the current retention settings dict.""" + return { + "enabled": self._settings["enabled"], + "max_days": self._settings["max_days"], + "max_entries": self._settings["max_entries"], + } + + async def update_settings( + self, + *, + enabled: bool, + max_days: int, + max_entries: int, + ) -> dict: + """Persist new settings and apply them immediately. + + If ``enabled`` is changing to ``False``, the disable event is recorded + BEFORE the flag takes effect so there is a final log entry. + + Returns the new settings dict (same as ``get_settings()``). + """ + was_enabled = self._settings["enabled"] + + # Record the disable event before the recorder stops accepting entries. + if was_enabled and not enabled: + self._recorder.record( + category=ActivityCategory.SYSTEM, + action="audit_log.disabled", + severity=ActivitySeverity.WARNING, + actor="system", + message="Activity log recording disabled via settings", + _bypass_enabled=True, + ) + + self._settings["enabled"] = enabled + self._settings["max_days"] = max_days + self._settings["max_entries"] = max_entries + self._save_settings() + + # Propagate enabled flag to the recorder. + self._recorder.enabled = enabled + + if enabled: + self._start_loop() + logger.info( + "Activity log retention enabled (max_days=%d, max_entries=%d)", + max_days, + max_entries, + ) + # Run an immediate prune pass when re-enabling. + try: + self._prune() + except Exception as exc: + logger.error("Activity log immediate prune failed: %s", exc) + else: + self._cancel_loop() + logger.info("Activity log retention disabled") + + return self.get_settings() diff --git a/server/src/ledgrab/main.py b/server/src/ledgrab/main.py index 2a1b08c..a43ee0f 100644 --- a/server/src/ledgrab/main.py +++ b/server/src/ledgrab/main.py @@ -60,6 +60,9 @@ from ledgrab.storage.audio_processing_template_store import AudioProcessingTempl from ledgrab.storage.pattern_template_store import PatternTemplateStore import ledgrab.core.audio.filters # noqa: F401 — trigger audio filter auto-registration from ledgrab.core.backup.auto_backup import AutoBackupEngine +from ledgrab.core.activity_log.recorder import ActivityRecorder, set_module_recorder +from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine +from ledgrab.storage.activity_log_repository import ActivityLogRepository from ledgrab.core.processing.os_notification_listener import OsNotificationListener from ledgrab.core.devices.discovery_watcher import DiscoveryWatcher from ledgrab.core.update.update_service import UpdateService @@ -184,6 +187,10 @@ pattern_template_store = PatternTemplateStore(db) game_event_bus = GameEventBus() register_community_adapters() +# Activity log repository — constructed at module level like other stores so +# it migrates the DB schema (``002_add_activity_log``) on import. +activity_log_repo = ActivityLogRepository(db) + processor_manager = ProcessorManager( ProcessorDependencies( picture_source_store=picture_source_store, @@ -290,6 +297,17 @@ async def lifespan(app: FastAPI): processor_manager=processor_manager, ) + # Create activity recorder + retention engine. The recorder needs the + # processor_manager to fire live events, so it is built after that is + # already constructed at module level. + activity_recorder = ActivityRecorder(activity_log_repo, processor_manager) + activity_recorder.ensure_loop() + activity_log_retention_engine = ActivityLogRetentionEngine( + repo=activity_log_repo, + db=db, + recorder=activity_recorder, + ) + # Create auto-backup engine — derive paths from database location so that # demo mode auto-backups go to data/demo/ instead of data/. _data_dir = Path(config.storage.database_file).parent @@ -347,7 +365,13 @@ async def lifespan(app: FastAPI): http_endpoint_store=http_endpoint_store, audio_processing_template_store=audio_processing_template_store, pattern_template_store=pattern_template_store, + activity_recorder=activity_recorder, + activity_log_repo=activity_log_repo, + activity_log_retention_engine=activity_log_retention_engine, ) + # Expose the recorder via the module singleton so non-DI sites + # (fire_entity_event, device threads) can call record() without FastAPI DI. + set_module_recorder(activity_recorder) # Register devices in processor manager for health monitoring devices = device_store.get_all_devices() @@ -390,6 +414,9 @@ async def lifespan(app: FastAPI): # Start auto-backup engine (periodic configuration backups) await auto_backup_engine.start() + # Start activity log retention engine (hourly prune of old entries) + await activity_log_retention_engine.start() + # Start update checker (periodic release polling) await update_service.start() @@ -438,6 +465,19 @@ async def lifespan(app: FastAPI): except Exception as e: logger.error("Shutdown step '%s' raised: %s", label, e) + # Record the shutdown event FIRST — before any engine teardown — so there + # is always a final log entry on graceful shutdown. + try: + activity_recorder.record( + category="system", + action="server.shutting_down", + severity="info", + actor="system", + message="Server is shutting down", + ) + except Exception as e: + logger.error("Failed to record shutdown event: %s", e) + # Legacy hook — SQLite stores are write-through so this only logs. # Durability comes from PRAGMA synchronous=FULL + the explicit # wal_checkpoint(TRUNCATE) in Database.close() at the end of this block. @@ -510,6 +550,7 @@ async def lifespan(app: FastAPI): await _bounded("update_service.stop", update_service.stop(), timeout=0.5) await _bounded("auto_backup_engine.stop", auto_backup_engine.stop(), timeout=0.5) + await _bounded("activity_log_retention.stop", activity_log_retention_engine.stop(), timeout=0.5) # Close the DB last so it runs a TRUNCATE checkpoint, flushing the WAL # into the main file. Without this, writes can survive a graceful app diff --git a/server/src/ledgrab/static/js/core/events-ws.ts b/server/src/ledgrab/static/js/core/events-ws.ts index 507b8f3..f46f9b4 100644 --- a/server/src/ledgrab/static/js/core/events-ws.ts +++ b/server/src/ledgrab/static/js/core/events-ws.ts @@ -32,6 +32,7 @@ import { openAuthedWs } from './ws-auth.ts'; * update_download_progress — update_service.py (consumed by features/update.ts) * device_discovered — discovery_watcher.py (consumed by features/notifications-watcher.ts) * device_lost — discovery_watcher.py (consumed by features/notifications-watcher.ts) + * activity_logged — core/activity_log/recorder.py (consumed by features/activity-log.ts) * * Missing any of these silently breaks the corresponding UI flow — keep * this list in sync when adding new event types on the server side. @@ -47,6 +48,7 @@ const _ALLOWED_SERVER_EVENT_TYPES: ReadonlySet = new Set([ 'update_download_progress', 'device_discovered', 'device_lost', + 'activity_logged', // source: core/activity_log/recorder.py ]); interface ServerEventEnvelope { diff --git a/server/tests/core/test_activity_log_retention.py b/server/tests/core/test_activity_log_retention.py new file mode 100644 index 0000000..3feab00 --- /dev/null +++ b/server/tests/core/test_activity_log_retention.py @@ -0,0 +1,312 @@ +"""Unit tests for ActivityLogRetentionEngine (Phase 2). + +Coverage targets +---------------- +- Prunes entries older than max_days. +- Prunes entries when count exceeds max_entries. +- Settings round-trip: persist to DB, reload on construction. +- Disabling logs the ``audit_log.disabled`` event via the recorder BEFORE + the flag takes effect. +- ``start()`` / ``stop()`` lifecycle does not raise. +- Loop starts only when ``enabled=True``; does not start when ``enabled=False``. +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock + + +from ledgrab.core.activity_log.recorder import ActivityRecorder +from ledgrab.core.activity_log.retention import ( + DEFAULT_SETTINGS, + ActivityLogRetentionEngine, +) +from ledgrab.storage.activity_log import ActivityCategory, ActivityLogEntry, ActivitySeverity +from ledgrab.storage.activity_log_repository import ActivityLogRepository +from ledgrab.storage.database import Database + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_entry( + *, + action: str = "test.action", + ts: datetime | None = None, +) -> ActivityLogEntry: + from datetime import timezone + import uuid + + return ActivityLogEntry( + id="al_" + uuid.uuid4().hex[:8], + ts=ts or datetime.now(timezone.utc), + category=ActivityCategory.SYSTEM, + action=action, + severity=ActivitySeverity.INFO, + actor="system", + message="test", + ) + + +def _repo_and_db(tmp_db: Database): + """Return (ActivityLogRepository, Database) backed by a temp DB.""" + repo = ActivityLogRepository(tmp_db) + return repo, tmp_db + + +def _mock_recorder() -> ActivityRecorder: + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + return recorder + + +# --------------------------------------------------------------------------- +# Settings persistence +# --------------------------------------------------------------------------- + + +def test_default_settings(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + s = engine.get_settings() + assert s == DEFAULT_SETTINGS + + +def test_settings_round_trip(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + asyncio.run(engine.update_settings(enabled=True, max_days=30, max_entries=5000)) + s = engine.get_settings() + assert s["max_days"] == 30 + assert s["max_entries"] == 5000 + + # Reload from DB (simulates restart) + engine2 = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + s2 = engine2.get_settings() + assert s2["max_days"] == 30 + assert s2["max_entries"] == 5000 + + +# --------------------------------------------------------------------------- +# Pruning +# --------------------------------------------------------------------------- + + +def test_prune_by_age(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + now = datetime.now(timezone.utc) + old_ts = now - timedelta(days=100) + recent_ts = now - timedelta(days=5) + + repo.record(_make_entry(ts=old_ts)) + repo.record(_make_entry(ts=recent_ts)) + assert repo.count() == 2 + + # Simulate prune with max_days=90 + asyncio.run(engine.update_settings(enabled=True, max_days=90, max_entries=0)) + engine._prune() + + # Only the old entry should be gone; 0 for max_entries means no count cap + assert repo.count() == 1 + entries = repo.query( + filters=__import__( + "ledgrab.storage.activity_log", fromlist=["ActivityLogFilters"] + ).ActivityLogFilters() + ) + assert entries[0].ts.date() == recent_ts.date() + + +def test_prune_by_max_entries(tmp_db): + + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + for _ in range(10): + repo.record(_make_entry()) + + assert repo.count() == 10 + + asyncio.run(engine.update_settings(enabled=True, max_days=0, max_entries=5)) + engine._prune() + + assert repo.count() == 5 + + +def test_prune_disabled_is_noop(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + for _ in range(5): + repo.record(_make_entry()) + + # Disable engine then force a prune call + asyncio.run(engine.update_settings(enabled=False, max_days=1, max_entries=1)) + engine._prune() # should be a no-op since enabled=False + + assert repo.count() == 5 + + +# --------------------------------------------------------------------------- +# Disabling records the disable event +# --------------------------------------------------------------------------- + + +def test_disable_records_disable_event(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + # Engine starts enabled (DEFAULT_SETTINGS["enabled"] == True) + asyncio.run(engine.update_settings(enabled=False, max_days=90, max_entries=20000)) + + # recorder.record must have been called with the disable action + recorder.record.assert_called_once() + kwargs = recorder.record.call_args + assert kwargs.kwargs.get("action") == "audit_log.disabled" or ( + len(kwargs.args) > 1 and kwargs.args[1] == "audit_log.disabled" + ) + # The bypass flag must be set so the disabled event gets through + assert kwargs.kwargs.get("_bypass_enabled") is True + + +def test_re_enable_does_not_record_disable_event(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + # Disable first + asyncio.run(engine.update_settings(enabled=False, max_days=90, max_entries=20000)) + call_count_after_disable = recorder.record.call_count + + # Re-enable — should NOT call recorder again + asyncio.run(engine.update_settings(enabled=True, max_days=90, max_entries=20000)) + assert recorder.record.call_count == call_count_after_disable + + +# --------------------------------------------------------------------------- +# Lifecycle: start / stop +# --------------------------------------------------------------------------- + + +def test_start_stop_does_not_raise(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + async def _run(): + await engine.start() + await engine.stop() + + asyncio.run(_run()) + + +def test_start_disabled_does_not_create_task(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + # Persist disabled setting + db.set_setting("activity_log", {**DEFAULT_SETTINGS, "enabled": False}) + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + + async def _run(): + await engine.start() + assert engine._task is None + await engine.stop() + + asyncio.run(_run()) + + +def test_start_enabled_creates_task(tmp_db): + repo, db = _repo_and_db(tmp_db) + recorder = _mock_recorder() + engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder) + # DEFAULT_SETTINGS has enabled=True + + async def _run(): + await engine.start() + assert engine._task is not None + await engine.stop() + assert engine._task is None + + asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# Regression: recorder.enabled rehydrated from persisted settings on __init__ +# --------------------------------------------------------------------------- + + +def test_recorder_enabled_rehydrated_from_persisted_disabled(tmp_db): + """Engine.__init__ must propagate persisted enabled=False to recorder. + + If a user disabled the activity log and the server restarts, the recorder + must start in the disabled state — not its hardcoded default of True. + """ + from unittest.mock import MagicMock + from ledgrab.core.processing.processor_manager import ProcessorManager + + repo, db = _repo_and_db(tmp_db) + + # Persist enabled=False before constructing the engine. + db.set_setting("activity_log", {**DEFAULT_SETTINGS, "enabled": False}) + + # Use a real ActivityRecorder (not a mock) so we can observe its state. + mock_pm = MagicMock(spec=ProcessorManager) + real_recorder = ActivityRecorder(repo=repo, processor_manager=mock_pm) + + ActivityLogRetentionEngine(repo=repo, db=db, recorder=real_recorder) + + assert real_recorder.enabled is False, ( + "recorder.enabled should be False after constructing the engine " + "over a DB where enabled=False was persisted" + ) + + +def test_recorder_enabled_rehydrated_from_persisted_enabled(tmp_db): + """Engine.__init__ leaves recorder enabled when persisted setting is True.""" + from unittest.mock import MagicMock + from ledgrab.core.processing.processor_manager import ProcessorManager + + repo, db = _repo_and_db(tmp_db) + + # Persist enabled=True explicitly. + db.set_setting("activity_log", {**DEFAULT_SETTINGS, "enabled": True}) + + mock_pm = MagicMock(spec=ProcessorManager) + real_recorder = ActivityRecorder(repo=repo, processor_manager=mock_pm) + + ActivityLogRetentionEngine(repo=repo, db=db, recorder=real_recorder) + + assert real_recorder.enabled is True, ( + "recorder.enabled should be True after constructing the engine " + "over a DB where enabled=True was persisted" + ) + + +def test_recorder_enabled_defaults_to_true_when_no_persisted_setting(tmp_db): + """Engine.__init__ leaves recorder enabled when no setting has been persisted.""" + from unittest.mock import MagicMock + from ledgrab.core.processing.processor_manager import ProcessorManager + + repo, db = _repo_and_db(tmp_db) + # No db.set_setting call — DB has no activity_log setting yet. + + mock_pm = MagicMock(spec=ProcessorManager) + real_recorder = ActivityRecorder(repo=repo, processor_manager=mock_pm) + + ActivityLogRetentionEngine(repo=repo, db=db, recorder=real_recorder) + + assert ( + real_recorder.enabled is True + ), "recorder.enabled should default to True when no setting is persisted" diff --git a/server/tests/core/test_activity_recorder.py b/server/tests/core/test_activity_recorder.py new file mode 100644 index 0000000..20db027 --- /dev/null +++ b/server/tests/core/test_activity_recorder.py @@ -0,0 +1,256 @@ +"""Unit tests for ActivityRecorder (Phase 2). + +Coverage targets +---------------- +- record() persists an entry AND fires ``activity_logged`` via fire_event. +- actor resolves from the ``current_actor`` ContextVar; defaults to ``"system"``. +- Failure in repo.record() does NOT raise into the caller. +- Cross-thread record() from a threading.Thread routes through the event loop + and persists. +- ``enabled=False`` makes record() a no-op (except ``_bypass_enabled``). +- ``entry_to_dict`` produces the expected shape. +""" + +from __future__ import annotations + +import asyncio +import threading +from unittest.mock import MagicMock + + +from ledgrab.core.activity_log.context import current_actor +from ledgrab.core.activity_log.recorder import ActivityRecorder, entry_to_dict +from ledgrab.storage.activity_log import ActivityCategory, ActivityLogEntry, ActivitySeverity + + +# --------------------------------------------------------------------------- +# Helpers / fixtures +# --------------------------------------------------------------------------- + + +def _make_recorder( + *, + fail_repo: bool = False, + loop: asyncio.AbstractEventLoop | None = None, +) -> tuple[ActivityRecorder, list, list]: + """Return (recorder, persisted_entries, fired_events).""" + repo = MagicMock() + persisted: list[ActivityLogEntry] = [] + + if fail_repo: + repo.record.side_effect = RuntimeError("DB exploded") + else: + repo.record.side_effect = lambda entry: persisted.append(entry) + + pm = MagicMock() + fired: list[dict] = [] + pm.fire_event.side_effect = lambda evt: fired.append(evt) + + recorder = ActivityRecorder(repo, pm, loop=loop) + return recorder, persisted, fired + + +# --------------------------------------------------------------------------- +# Basic persistence + event emit +# --------------------------------------------------------------------------- + + +def test_record_persists_entry(): + recorder, persisted, fired = _make_recorder() + recorder.record( + category=ActivityCategory.SYSTEM, + action="test.action", + message="hello", + ) + assert len(persisted) == 1 + entry = persisted[0] + assert entry.category == ActivityCategory.SYSTEM + assert entry.action == "test.action" + assert entry.message == "hello" + assert entry.id.startswith("al_") + + +def test_record_fires_activity_logged_event(): + recorder, persisted, fired = _make_recorder() + recorder.record( + category=ActivityCategory.AUTH, + action="auth.login", + message="user signed in", + ) + assert len(fired) == 1 + evt = fired[0] + assert evt["type"] == "activity_logged" + assert "entry" in evt + assert evt["entry"]["action"] == "auth.login" + + +def test_record_default_severity_is_info(): + recorder, persisted, _ = _make_recorder() + recorder.record(category=ActivityCategory.SYSTEM, action="x", message="m") + assert persisted[0].severity == ActivitySeverity.INFO + + +def test_record_custom_fields(): + recorder, persisted, _ = _make_recorder() + recorder.record( + category=ActivityCategory.ENTITY, + action="entity.created", + severity=ActivitySeverity.WARNING, + entity_type="output_target", + entity_id="ot_abc123", + entity_name="My strip", + message="created", + metadata={"key": "val"}, + ) + e = persisted[0] + assert e.severity == ActivitySeverity.WARNING + assert e.entity_type == "output_target" + assert e.entity_id == "ot_abc123" + assert e.entity_name == "My strip" + assert e.metadata == {"key": "val"} + + +# --------------------------------------------------------------------------- +# Actor resolution from ContextVar +# --------------------------------------------------------------------------- + + +def test_actor_defaults_to_system(): + recorder, persisted, _ = _make_recorder() + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + assert persisted[0].actor == "system" + + +def test_actor_resolved_from_contextvar(): + recorder, persisted, _ = _make_recorder() + token = current_actor.set("homeassistant") + try: + recorder.record(category=ActivityCategory.AUTH, action="b", message="m") + finally: + current_actor.reset(token) + assert persisted[0].actor == "homeassistant" + + +def test_actor_explicit_overrides_contextvar(): + recorder, persisted, _ = _make_recorder() + token = current_actor.set("homeassistant") + try: + recorder.record( + category=ActivityCategory.SYSTEM, + action="c", + message="m", + actor="explicit_actor", + ) + finally: + current_actor.reset(token) + assert persisted[0].actor == "explicit_actor" + + +# --------------------------------------------------------------------------- +# Failure isolation — repo failure must not raise into caller +# --------------------------------------------------------------------------- + + +def test_repo_failure_does_not_raise(): + recorder, persisted, fired = _make_recorder(fail_repo=True) + # Must not raise + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + # No event emitted when persist failed + assert fired == [] + + +# --------------------------------------------------------------------------- +# enabled flag +# --------------------------------------------------------------------------- + + +def test_disabled_recorder_is_noop(): + recorder, persisted, fired = _make_recorder() + recorder.enabled = False + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + assert persisted == [] + assert fired == [] + + +def test_bypass_enabled_flag_records_even_when_disabled(): + recorder, persisted, fired = _make_recorder() + recorder.enabled = False + recorder.record( + category=ActivityCategory.SYSTEM, + action="audit_log.disabled", + message="disabled", + _bypass_enabled=True, + ) + assert len(persisted) == 1 + + +# --------------------------------------------------------------------------- +# entry_to_dict helper +# --------------------------------------------------------------------------- + + +def test_entry_to_dict_shape(): + from datetime import datetime, timezone + + entry = ActivityLogEntry( + id="al_aabbccdd", + ts=datetime(2026, 1, 2, 3, 4, 5, tzinfo=timezone.utc), + category="system", + action="test", + severity="info", + actor="system", + message="hello", + metadata={"x": 1}, + ) + d = entry_to_dict(entry) + assert set(d.keys()) == { + "id", + "ts", + "category", + "action", + "severity", + "actor", + "entity_type", + "entity_id", + "entity_name", + "message", + "metadata", + } + assert d["metadata"] == {"x": 1} # real dict, not JSON string + assert d["ts"].startswith("2026-01-02T03:04:05") + + +# --------------------------------------------------------------------------- +# Cross-thread record() — routes through the event loop and persists +# --------------------------------------------------------------------------- + + +def test_cross_thread_record_routes_through_loop(): + """record() called from a non-loop thread marshals via call_soon_threadsafe.""" + + async def _run(): + recorder, persisted, fired = _make_recorder(loop=asyncio.get_running_loop()) + recorder.ensure_loop() + + done = threading.Event() + + def _thread_body(): + recorder.record( + category=ActivityCategory.DEVICE, + action="device.discovered", + message="found it", + ) + done.set() + + t = threading.Thread(target=_thread_body) + t.start() + t.join(timeout=2.0) + # Give the scheduled callback a chance to run on the loop. + await asyncio.sleep(0.05) + + assert len(persisted) == 1, f"Expected 1 entry, got {persisted}" + assert persisted[0].action == "device.discovered" + assert len(fired) == 1 + assert fired[0]["type"] == "activity_logged" + + asyncio.run(_run()) diff --git a/server/tests/core/test_activity_recorder_adversarial.py b/server/tests/core/test_activity_recorder_adversarial.py new file mode 100644 index 0000000..df7af5f --- /dev/null +++ b/server/tests/core/test_activity_recorder_adversarial.py @@ -0,0 +1,778 @@ +"""Adversarial / edge-case tests for ActivityRecorder and ActivityLogRetentionEngine. + +Derive expected behaviour from the acceptance criteria in +plans/activity-log/phase-2-recorder-retention.md — NOT from what the code does. +A failing test is a bug found in the implementation. + +Coverage areas +-------------- +1. Thread-safety / marshaling — record() from a non-loop thread routes via + call_soon_threadsafe; record() from the loop thread writes inline. + Neither path raises into the caller even when the loop is closed. +2. Best-effort / never-raises — repo.record raises → no exception escapes, + no event emitted; fire_event raises → no exception escapes, entry still + persisted (order: persist THEN emit). +3. Actor resolution — defaults "system"; ContextVar wins when set; + explicit actor= overrides ContextVar; no cross-context leakage (fresh + ContextVar copy does not see a previous set). +4. enabled flag — disabled → NO-OP (nothing persisted, no event); + _bypass_enabled=True → still records despite enabled=False. +5. entry_to_dict / payload — exactly 11 keys; ts is ISO-8601 string; + metadata is a real dict; activity_logged payload shape is frozen. +6. Retention engine — update_settings persists and round-trips; + _prune calls repo.prune with correct before_ts / max_entries; + disabling records exactly one disable event BEFORE recording stops; + subsequent normal record after disable is a no-op; + start() then stop() cancels task cleanly; + stop() without prior start() is safe. +7. Lazy loop capture — recorder built before the loop is running still + works after the loop starts (no explicit loop= argument). +""" + +from __future__ import annotations + +import asyncio +import contextvars +import threading +import uuid +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock + + +from ledgrab.core.activity_log.context import current_actor +from ledgrab.core.activity_log.recorder import ( + ActivityRecorder, + entry_to_dict, + get_module_recorder, + set_module_recorder, +) +from ledgrab.core.activity_log.retention import ( + DEFAULT_SETTINGS, + ActivityLogRetentionEngine, +) +from ledgrab.storage.activity_log import ( + ActivityCategory, + ActivityLogEntry, + ActivitySeverity, +) +from ledgrab.storage.activity_log_repository import ActivityLogRepository + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _make_recorder( + *, + fail_repo: bool = False, + fail_pm: bool = False, + loop: asyncio.AbstractEventLoop | None = None, +) -> tuple[ActivityRecorder, list[ActivityLogEntry], list[dict]]: + """Return (recorder, persisted_entries, fired_events) — pure mocks.""" + repo = MagicMock() + persisted: list[ActivityLogEntry] = [] + + if fail_repo: + repo.record.side_effect = RuntimeError("DB exploded") + else: + repo.record.side_effect = lambda entry: persisted.append(entry) + + pm = MagicMock() + fired: list[dict] = [] + + if fail_pm: + pm.fire_event.side_effect = RuntimeError("PM exploded") + else: + pm.fire_event.side_effect = lambda evt: fired.append(evt) + + recorder = ActivityRecorder(repo, pm, loop=loop) + return recorder, persisted, fired + + +def _make_entry(action: str = "test.action") -> ActivityLogEntry: + return ActivityLogEntry( + id="al_" + uuid.uuid4().hex[:8], + ts=datetime.now(timezone.utc), + category=ActivityCategory.SYSTEM, + action=action, + severity=ActivitySeverity.INFO, + actor="system", + message="test", + ) + + +# --------------------------------------------------------------------------- +# 1. Thread-safety / marshaling +# --------------------------------------------------------------------------- + + +def test_cross_thread_write_goes_via_call_soon_threadsafe(): + """record() from a non-loop thread must marshal onto the loop, not write inline.""" + + async def _run(): + loop = asyncio.get_running_loop() + recorder, persisted, fired = _make_recorder(loop=loop) + + # Ensure the recorder knows the loop. + recorder.ensure_loop() + + thread_saw_persisted_count: list[int] = [] + + def _thread_body(): + # Nothing should be persisted synchronously in this thread — + # the write must be deferred to the loop via call_soon_threadsafe. + recorder.record( + category=ActivityCategory.DEVICE, + action="device.discovered", + message="zeroconf found it", + ) + # Capture how many entries are persisted synchronously right after the call. + thread_saw_persisted_count.append(len(persisted)) + + t = threading.Thread(target=_thread_body) + t.start() + t.join(timeout=2.0) + assert not t.is_alive(), "thread did not finish in time" + + # The thread's synchronous view must see 0 — the actual write is deferred. + assert thread_saw_persisted_count[0] == 0, ( + "write was not deferred: record() persisted synchronously on the " + "calling thread instead of marshaling to the loop" + ) + + # Give the loop a tick to drain the call_soon_threadsafe callback. + await asyncio.sleep(0.05) + + assert len(persisted) == 1, f"entry not persisted after loop tick: {persisted}" + assert persisted[0].action == "device.discovered" + assert len(fired) == 1 + assert fired[0]["type"] == "activity_logged" + + asyncio.run(_run()) + + +def test_loop_thread_write_is_inline(): + """record() called from the loop thread writes without call_soon_threadsafe.""" + + call_soon_threadsafe_calls: list = [] + + async def _run(): + loop = asyncio.get_running_loop() + # Monkeypatch call_soon_threadsafe to detect if it is used. + original_csst = loop.call_soon_threadsafe + loop.call_soon_threadsafe = lambda *a, **kw: call_soon_threadsafe_calls.append(a) or original_csst(*a, **kw) # type: ignore[method-assign] + try: + recorder, persisted, fired = _make_recorder(loop=loop) + recorder.ensure_loop() + + recorder.record( + category=ActivityCategory.SYSTEM, + action="system.startup", + message="loop thread write", + ) + # No yield — synchronous within the coroutine. + assert len(persisted) == 1, "inline write did not happen synchronously" + assert call_soon_threadsafe_calls == [], ( + "call_soon_threadsafe was invoked from the loop thread — " + "should write inline instead" + ) + finally: + loop.call_soon_threadsafe = original_csst # type: ignore[method-assign] + + asyncio.run(_run()) + + +def test_cross_thread_closed_loop_does_not_raise(): + """record() from a thread after the loop closes must log a warning, not raise.""" + loop = asyncio.new_event_loop() + recorder, persisted, fired = _make_recorder(loop=loop) + # Close the loop immediately — simulate a test teardown race. + loop.close() + + # This must not raise. + recorder.record( + category=ActivityCategory.SYSTEM, + action="closed.loop.test", + message="should not raise", + ) + # The entry may or may not be persisted (the closed-loop path drops it), + # but the important contract is: no exception propagates. + + +def test_no_loop_falls_back_to_synchronous_write(): + """record() with no loop and no running loop writes synchronously.""" + recorder, persisted, fired = _make_recorder(loop=None) + # No loop is running here (plain synchronous test). + recorder.record( + category=ActivityCategory.SYSTEM, + action="no.loop.write", + message="synchronous fallback", + ) + assert len(persisted) == 1 + assert len(fired) == 1 + + +# --------------------------------------------------------------------------- +# 2. Best-effort / never-raises +# --------------------------------------------------------------------------- + + +def test_repo_failure_does_not_raise_and_suppresses_fire_event(): + """repo.record raises → no exception propagates AND fire_event is NOT called.""" + recorder, persisted, fired = _make_recorder(fail_repo=True) + # Must not raise. + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + # No event must be emitted for an entry that failed to persist. + assert fired == [], ( + "fire_event was called even though repo.record failed — " + "the event would reference an entry that was never stored" + ) + + +def test_fire_event_failure_does_not_raise_and_entry_is_persisted(): + """fire_event raises → no exception propagates AND the entry IS persisted. + + Order: persist THEN emit. An emit failure must not roll back the persist. + """ + recorder, persisted, fired = _make_recorder(fail_pm=True) + # Must not raise. + recorder.record(category=ActivityCategory.SYSTEM, action="b", message="m") + # Entry must have been persisted before the emit attempt. + assert ( + len(persisted) == 1 + ), "entry was not persisted — fire_event failure retroactively erased it" + + +def test_both_failures_do_not_raise(): + """Even when both repo AND fire_event raise, record() must not propagate.""" + recorder, persisted, fired = _make_recorder(fail_repo=True, fail_pm=True) + recorder.record(category=ActivityCategory.SYSTEM, action="c", message="m") + # Nothing persisted, nothing fired — but absolutely no exception. + + +# --------------------------------------------------------------------------- +# 3. Actor resolution / ContextVar isolation +# --------------------------------------------------------------------------- + + +def test_actor_defaults_to_system_when_contextvar_unset(): + """When no actor arg and ContextVar at default, actor must be 'system'.""" + recorder, persisted, _ = _make_recorder() + # Make sure the ContextVar is at its default. + token = current_actor.set("system") + current_actor.reset(token) + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + assert persisted[0].actor == "system" + + +def test_explicit_actor_overrides_contextvar(): + """Explicit actor= argument wins over the ContextVar value.""" + recorder, persisted, _ = _make_recorder() + token = current_actor.set("api_user") + try: + recorder.record( + category=ActivityCategory.AUTH, + action="auth.login", + message="explicit", + actor="override_actor", + ) + finally: + current_actor.reset(token) + assert ( + persisted[0].actor == "override_actor" + ), "explicit actor= was ignored; ContextVar won over explicit arg" + + +def test_contextvar_value_used_when_set(): + """When current_actor ContextVar is set, recorder picks it up.""" + recorder, persisted, _ = _make_recorder() + token = current_actor.set("mobile_client") + try: + recorder.record(category=ActivityCategory.ENTITY, action="e.created", message="m") + finally: + current_actor.reset(token) + assert persisted[0].actor == "mobile_client" + + +def test_no_cross_context_leakage_via_copy_context(): + """ContextVar set in one context does not bleed into an independent copy.""" + # Simulate request-1 setting the actor. + ctx_req1 = contextvars.copy_context() + + def _request1(): + current_actor.set("user_alice") + + ctx_req1.run(_request1) + + # Simulate request-2 in a fresh copy — must not see user_alice. + ctx_req2 = contextvars.copy_context() + + def _request2(): + return current_actor.get() + + actor_in_req2 = ctx_req2.run(_request2) + assert actor_in_req2 == "system", ( + f"Cross-context leakage: request-2 saw actor '{actor_in_req2}' " + "from request-1 instead of the default 'system'" + ) + + +def test_no_cross_request_leakage_sequential_records(): + """Two sequential simulated requests must not share ContextVar state.""" + recorder, persisted, _ = _make_recorder() + + # Request 1: set actor, record, reset. + token1 = current_actor.set("admin_user") + try: + recorder.record(category=ActivityCategory.AUTH, action="login", message="req1") + finally: + current_actor.reset(token1) + + # Request 2: no actor set — must fall back to "system". + recorder.record(category=ActivityCategory.SYSTEM, action="heartbeat", message="req2") + + assert persisted[0].actor == "admin_user" + assert persisted[1].actor == "system", ( + f"Request-2 actor was '{persisted[1].actor}' instead of 'system' — " + "ContextVar from request-1 leaked into request-2" + ) + + +# --------------------------------------------------------------------------- +# 4. enabled flag +# --------------------------------------------------------------------------- + + +def test_disabled_record_is_noop_nothing_persisted(): + """enabled=False → record() returns immediately; nothing persisted.""" + recorder, persisted, fired = _make_recorder() + recorder.enabled = False + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + assert persisted == [], "disabled recorder should not persist" + assert fired == [], "disabled recorder should not fire events" + + +def test_disabled_record_bypass_enabled_still_records(): + """_bypass_enabled=True bypasses the enabled=False guard.""" + recorder, persisted, fired = _make_recorder() + recorder.enabled = False + recorder.record( + category=ActivityCategory.SYSTEM, + action="audit_log.disabled", + message="final entry before disable", + _bypass_enabled=True, + ) + assert len(persisted) == 1, "_bypass_enabled=True should still persist" + assert len(fired) == 1, "_bypass_enabled=True should still fire the event" + + +def test_bypass_enabled_false_with_enabled_true_is_normal_record(): + """_bypass_enabled=False and enabled=True → normal record (regression guard).""" + recorder, persisted, fired = _make_recorder() + recorder.enabled = True + recorder.record( + category=ActivityCategory.SYSTEM, + action="normal", + message="m", + _bypass_enabled=False, + ) + assert len(persisted) == 1 + assert len(fired) == 1 + + +# --------------------------------------------------------------------------- +# 5. entry_to_dict / payload shape +# --------------------------------------------------------------------------- + +_EXPECTED_KEYS = frozenset( + { + "id", + "ts", + "category", + "action", + "severity", + "actor", + "entity_type", + "entity_id", + "entity_name", + "message", + "metadata", + } +) + + +def test_entry_to_dict_has_exactly_11_keys(): + """entry_to_dict must return a dict with exactly the 11 frozen keys.""" + d = entry_to_dict(_make_entry()) + assert set(d.keys()) == _EXPECTED_KEYS, ( + f"Key mismatch.\n Missing: {_EXPECTED_KEYS - set(d.keys())}\n" + f" Extra: {set(d.keys()) - _EXPECTED_KEYS}" + ) + + +def test_entry_to_dict_ts_is_iso8601_string(): + """ts must be an ISO-8601 string, not a datetime object.""" + entry = _make_entry() + d = entry_to_dict(entry) + assert isinstance(d["ts"], str), f"ts is {type(d['ts'])}, expected str" + # Must round-trip through fromisoformat without error. + parsed = datetime.fromisoformat(d["ts"]) + assert parsed.tzinfo is not None, "ts ISO string has no timezone info" + + +def test_entry_to_dict_metadata_is_real_dict_not_json_string(): + """metadata must be a dict, not a JSON-encoded string.""" + entry = ActivityLogEntry( + id="al_aabbccdd", + ts=datetime.now(timezone.utc), + category=ActivityCategory.SYSTEM, + action="test", + severity=ActivitySeverity.INFO, + actor="system", + message="hello", + metadata={"nested": {"key": 42}, "list": [1, 2, 3]}, + ) + d = entry_to_dict(entry) + assert isinstance( + d["metadata"], dict + ), f"metadata is {type(d['metadata'])}, expected dict (not a JSON string)" + assert d["metadata"] == {"nested": {"key": 42}, "list": [1, 2, 3]} + + +def test_activity_logged_event_payload_shape(): + """The fired event must match the frozen payload shape from the handoff doc.""" + recorder, persisted, fired = _make_recorder() + recorder.record( + category=ActivityCategory.AUTH, + action="auth.login", + severity=ActivitySeverity.WARNING, + actor="api_user", + entity_type="session", + entity_id="sess_001", + entity_name="admin session", + message="user signed in", + metadata={"ip": "127.0.0.1"}, + ) + assert len(fired) == 1 + evt = fired[0] + assert evt.get("type") == "activity_logged", f"event type wrong: {evt.get('type')!r}" + entry_dict = evt.get("entry") + assert isinstance(entry_dict, dict), "event 'entry' must be a dict" + assert set(entry_dict.keys()) == _EXPECTED_KEYS + assert entry_dict["actor"] == "api_user" + assert entry_dict["entity_type"] == "session" + assert entry_dict["metadata"] == {"ip": "127.0.0.1"} + assert isinstance(entry_dict["metadata"], dict), "metadata in event must be a dict" + + +def test_entry_id_format(): + """Entry IDs must be 'al_' followed by 8 hex characters.""" + recorder, persisted, _ = _make_recorder() + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + entry_id = persisted[0].id + assert entry_id.startswith("al_"), f"id does not start with 'al_': {entry_id!r}" + suffix = entry_id[3:] + assert len(suffix) == 8, f"id suffix length is {len(suffix)}, expected 8: {entry_id!r}" + assert all(c in "0123456789abcdef" for c in suffix), f"id suffix is not hex: {suffix!r}" + + +def test_entry_ts_is_utc(): + """Recorded entry ts must be timezone-aware UTC.""" + recorder, persisted, _ = _make_recorder() + recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m") + ts = persisted[0].ts + assert ts.tzinfo is not None, "ts has no timezone info" + # Must be within 5 seconds of now (sanity check). + delta = abs((datetime.now(timezone.utc) - ts).total_seconds()) + assert delta < 5, f"ts is {delta:.1f}s away from now — suspiciously stale" + + +# --------------------------------------------------------------------------- +# 6. Retention engine +# --------------------------------------------------------------------------- + + +def test_retention_update_settings_persists_to_db(tmp_db): + """update_settings must persist to DB so a fresh engine reload picks it up.""" + repo = ActivityLogRepository(tmp_db) + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + asyncio.run(engine.update_settings(enabled=True, max_days=14, max_entries=500)) + + # Reload from DB — simulates server restart. + engine2 = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + s = engine2.get_settings() + assert s["max_days"] == 14, f"max_days not persisted: {s}" + assert s["max_entries"] == 500, f"max_entries not persisted: {s}" + assert s["enabled"] is True + + +def test_retention_prune_passes_correct_before_ts(tmp_db): + """_prune must call repo.prune with before_ts ≈ now - max_days.""" + repo = MagicMock() + repo.prune.return_value = 0 + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + # Patch settings directly (don't go through update_settings to avoid task side-effects). + engine._settings = {"enabled": True, "max_days": 30, "max_entries": 0} + + before = datetime.now(timezone.utc) + engine._prune() + after = datetime.now(timezone.utc) + + repo.prune.assert_called_once() + kwargs = repo.prune.call_args.kwargs + before_ts = kwargs.get("before_ts") + assert before_ts is not None, "_prune called repo.prune without before_ts" + expected_min = before - timedelta(days=30, seconds=1) + expected_max = after - timedelta(days=30) + timedelta(seconds=1) + assert expected_min <= before_ts <= expected_max, ( + f"before_ts {before_ts} is not near now - 30 days " + f"(expected [{expected_min}, {expected_max}])" + ) + + +def test_retention_prune_passes_max_entries(tmp_db): + """_prune must forward max_entries from settings to repo.prune.""" + repo = MagicMock() + repo.prune.return_value = 0 + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + engine._settings = {"enabled": True, "max_days": 0, "max_entries": 9999} + engine._prune() + + kwargs = repo.prune.call_args.kwargs + assert kwargs.get("max_entries") == 9999, f"max_entries not forwarded: {kwargs}" + + +def test_retention_prune_zero_max_entries_means_no_count_cap(tmp_db): + """max_entries=0 should pass None (no count cap) to repo.prune.""" + repo = MagicMock() + repo.prune.return_value = 0 + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + engine._settings = {"enabled": True, "max_days": 0, "max_entries": 0} + engine._prune() + + kwargs = repo.prune.call_args.kwargs + assert ( + kwargs.get("max_entries") is None + ), f"max_entries=0 should map to None (no cap), got {kwargs.get('max_entries')!r}" + + +def test_retention_disable_records_exactly_one_disable_event_with_bypass(tmp_db): + """Disabling via update_settings records exactly one 'audit_log.disabled' event + with _bypass_enabled=True BEFORE the enabled flag is set to False.""" + repo = ActivityLogRepository(tmp_db) + # Use a real recorder backed by repo to verify ordering. + pm = MagicMock() + pm.fire_event.return_value = None + real_recorder = ActivityRecorder(repo, pm) + real_recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=real_recorder) + + # Disable — should record the event before flipping the flag. + asyncio.run(engine.update_settings(enabled=False, max_days=90, max_entries=20000)) + + # The disable event must be in the DB (persisted before the flag flipped). + entries = repo.query( + __import__( + "ledgrab.storage.activity_log", + fromlist=["ActivityLogFilters"], + ).ActivityLogFilters() + ) + disable_events = [e for e in entries if e.action == "audit_log.disabled"] + assert len(disable_events) == 1, f"Expected exactly 1 disable event, got {len(disable_events)}" + + # After disabling, a normal record must be a no-op. + real_recorder.record( + category=ActivityCategory.SYSTEM, + action="should.not.appear", + message="this should be dropped", + ) + entries_after = repo.query( + __import__( + "ledgrab.storage.activity_log", + fromlist=["ActivityLogFilters"], + ).ActivityLogFilters() + ) + post_disable_actions = [e.action for e in entries_after if e.action != "audit_log.disabled"] + assert post_disable_actions == [], f"Entries appeared after disable: {post_disable_actions}" + + +def test_retention_disable_does_not_record_event_when_already_disabled(tmp_db): + """update_settings(enabled=False) when already disabled must NOT record a second event.""" + repo = ActivityLogRepository(tmp_db) + pm = MagicMock() + pm.fire_event.return_value = None + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + + # First disable. + asyncio.run(engine.update_settings(enabled=False, max_days=90, max_entries=20000)) + first_count = recorder.record.call_count + + # Second disable — must NOT record another event. + asyncio.run(engine.update_settings(enabled=False, max_days=90, max_entries=20000)) + assert ( + recorder.record.call_count == first_count + ), "A second disable call recorded an extra event when already disabled" + + +async def test_retention_start_stop_cancels_task_cleanly(tmp_db): + """start() then stop() must cancel the task and leave _task=None. + + After stop(), the task has been requested to cancel and _task is None. + The cancellation may still be 'in-flight' on the event loop (status: + 'cancelling') until the next tick; we yield once to let the event loop + process the CancelledError and confirm task.done() is True. + """ + repo = ActivityLogRepository(tmp_db) + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + + await engine.start() + task = engine._task + assert task is not None, "start() did not create a task" + assert not task.done(), "task completed immediately — should be sleeping" + + await engine.stop() + # _task cleared immediately. + assert engine._task is None, "_task was not cleared to None after stop()" + # Give the event loop one tick to process the CancelledError. + await asyncio.sleep(0) + assert task.done(), ( + "task is still running after stop() + one event loop tick — " + "stop() did not cancel the task" + ) + + +async def test_retention_stop_without_start_is_safe(tmp_db): + """stop() without a prior start() must not raise.""" + repo = ActivityLogRepository(tmp_db) + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + assert engine._task is None + + # Must not raise. + await engine.stop() + assert engine._task is None + + +async def test_retention_start_disabled_no_task(tmp_db): + """start() when enabled=False must not create a task.""" + tmp_db.set_setting("activity_log", {**DEFAULT_SETTINGS, "enabled": False}) + repo = ActivityLogRepository(tmp_db) + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = False + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + await engine.start() + assert engine._task is None, "task was created even though engine is disabled" + await engine.stop() + + +async def test_retention_max_days_boundary(tmp_db): + """max_days ≤ 0 should not pass a before_ts to repo.prune (no age cap).""" + repo = MagicMock() + repo.prune.return_value = 0 + recorder = MagicMock(spec=ActivityRecorder) + recorder.enabled = True + + engine = ActivityLogRetentionEngine(repo=repo, db=tmp_db, recorder=recorder) + engine._settings = {"enabled": True, "max_days": 0, "max_entries": 0} + engine._prune() + + kwargs = repo.prune.call_args.kwargs + assert ( + kwargs.get("before_ts") is None + ), f"max_days=0 should map to before_ts=None (no age cap), got {kwargs.get('before_ts')!r}" + + +# --------------------------------------------------------------------------- +# 7. Lazy loop capture (recorder built before loop is running) +# --------------------------------------------------------------------------- + + +def test_lazy_loop_capture_without_explicit_loop(): + """Recorder with loop=None still works when record() is called from a running loop.""" + + async def _run(): + # Build the recorder BEFORE passing the loop explicitly — test lazy capture. + recorder, persisted, fired = _make_recorder(loop=None) + # Do NOT call ensure_loop() explicitly — the lazy path in record() must handle it. + recorder.record( + category=ActivityCategory.SYSTEM, + action="lazy.capture", + message="loop captured lazily", + ) + assert ( + len(persisted) == 1 + ), "Lazy loop capture failed: entry not persisted when loop=None at construction" + + asyncio.run(_run()) + + +def test_lazy_loop_stores_loop_for_subsequent_calls(): + """After the first call from an async context, _loop is populated.""" + + async def _run(): + recorder, persisted, fired = _make_recorder(loop=None) + assert recorder._loop is None + + recorder.record(category=ActivityCategory.SYSTEM, action="first", message="m") + + # The loop must have been captured. + assert ( + recorder._loop is not None + ), "recorder._loop was not set after first record() from async context" + assert recorder._loop is asyncio.get_running_loop() + + asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# 8. Module-level singleton accessor +# --------------------------------------------------------------------------- + + +def test_set_and_get_module_recorder(): + """set_module_recorder / get_module_recorder round-trip.""" + original = get_module_recorder() + try: + recorder, _, _ = _make_recorder() + set_module_recorder(recorder) + assert get_module_recorder() is recorder + finally: + # Restore whatever was there before (may be None in test isolation). + import ledgrab.core.activity_log.recorder as _mod + + _mod._recorder = original # type: ignore[attr-defined] + + +def test_get_module_recorder_returns_none_before_set(): + """get_module_recorder() returns None when not yet initialised.""" + import ledgrab.core.activity_log.recorder as _mod + + original = _mod._recorder # type: ignore[attr-defined] + _mod._recorder = None # type: ignore[attr-defined] + try: + assert get_module_recorder() is None + finally: + _mod._recorder = original # type: ignore[attr-defined]