feat(activity-log): phase 2 - recorder, actor context, retention, lifecycle

- ActivityRecorder: thread-safe record() (inline on loop, call_soon_threadsafe off-loop), best-effort, fires activity_logged event
- current_actor ContextVar set in verify_api_key (both branches), default system
- ActivityLogRetentionEngine: prune loop (max_days+max_entries), settings persistence, rehydrates recorder.enabled on startup
- lifespan wiring: server.shutting_down recorded first on shutdown, retention stop before db.close
- events-ws.ts allowlist + parity; DI getters + module accessor; 62 new tests
This commit is contained in:
2026-06-09 18:10:27 +03:00
parent 1ac4a0f66d
commit 726f39e2ba
14 changed files with 2037 additions and 16 deletions
+5
View File
@@ -10,6 +10,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette.websockets import WebSocket, WebSocketDisconnect
from ledgrab.config import get_config
from ledgrab.core.activity_log.context import current_actor
from ledgrab.utils import get_logger
from ledgrab.utils.net_classify import is_loopback as _classify_is_loopback
@@ -81,6 +82,7 @@ def verify_api_key(
# No keys configured — allow loopback only.
if _is_loopback(client_host):
request.state.auth_label = "anonymous"
current_actor.set("anonymous")
return "anonymous"
# Allow caller to authenticate explicitly even without configured keys?
# No — there are no keys to compare against. Reject.
@@ -127,6 +129,9 @@ def verify_api_key(
# Stash the friendly label so the access-log middleware can attribute the
# request to a client without re-running the token comparison.
request.state.auth_label = authenticated_as
# Set the actor ContextVar so ActivityRecorder can resolve it without
# threading it through every call site.
current_actor.set(authenticated_as)
return authenticated_as
+21
View File
@@ -42,6 +42,9 @@ from ledgrab.core.mqtt.mqtt_manager import MQTTManager
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.storage.audio_processing_template_store import AudioProcessingTemplateStore
from ledgrab.storage.pattern_template_store import PatternTemplateStore
from ledgrab.core.activity_log.recorder import ActivityRecorder
from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine
from ledgrab.storage.activity_log_repository import ActivityLogRepository
T = TypeVar("T")
@@ -196,6 +199,18 @@ def get_update_service() -> UpdateService:
return _get("update_service", "Update service")
def get_activity_recorder() -> ActivityRecorder:
return _get("activity_recorder", "Activity recorder")
def get_activity_log_repo() -> ActivityLogRepository:
return _get("activity_log_repo", "Activity log repository")
def get_activity_log_retention_engine() -> ActivityLogRetentionEngine:
return _get("activity_log_retention_engine", "Activity log retention engine")
# ── Event helper ────────────────────────────────────────────────────────
@@ -257,6 +272,9 @@ def init_dependencies(
http_endpoint_store: HTTPEndpointStore | None = None,
audio_processing_template_store: AudioProcessingTemplateStore | None = None,
pattern_template_store: PatternTemplateStore | None = None,
activity_recorder: ActivityRecorder | None = None,
activity_log_repo: ActivityLogRepository | None = None,
activity_log_retention_engine: ActivityLogRetentionEngine | None = None,
):
"""Initialize global dependencies."""
_deps.update(
@@ -295,5 +313,8 @@ def init_dependencies(
"http_endpoint_store": http_endpoint_store,
"audio_processing_template_store": audio_processing_template_store,
"pattern_template_store": pattern_template_store,
"activity_recorder": activity_recorder,
"activity_log_repo": activity_log_repo,
"activity_log_retention_engine": activity_log_retention_engine,
}
)
@@ -0,0 +1,23 @@
"""Activity / audit log core — recorder, retention engine, and actor context.
Public surface
--------------
``ActivityRecorder`` — thread-safe facade; persists entries and fires live events.
``ActivityLogRetentionEngine`` — background pruning engine (mirrors AutoBackupEngine).
``current_actor`` — ``ContextVar[str]`` set by the auth layer per request.
Quick start
-----------
Wired in ``main.py`` lifespan; injected via ``api/dependencies.py`` getters.
Phase 3 adds the instrumentation call sites.
"""
from ledgrab.core.activity_log.context import current_actor
from ledgrab.core.activity_log.recorder import ActivityRecorder
from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine
__all__ = [
"ActivityRecorder",
"ActivityLogRetentionEngine",
"current_actor",
]
@@ -0,0 +1,21 @@
"""Actor context variable for the activity log.
``current_actor`` is set by ``api/auth.py:verify_api_key`` on every request so
that ``ActivityRecorder.record(...)`` can resolve the actor without requiring
every call site to pass it explicitly.
Default value is ``"system"`` — used by background engines and any code path
that runs outside a request context (e.g. lifespan startup/shutdown, zeroconf
discovery thread).
Per-request isolation is guaranteed by ASGI's coroutine context: each request
runs in its own coroutine with its own copy of the context inherited from the
server's main task. The auth layer resets it on every request before the route
handler runs, so stale labels from a previous request cannot bleed into a new
one.
"""
from contextvars import ContextVar
#: The actor label for the current request — API-key label or ``"system"``.
current_actor: ContextVar[str] = ContextVar("current_actor", default="system")
@@ -0,0 +1,267 @@
"""Thread-safe ActivityRecorder facade.
Responsibilities
----------------
1. Build an ``ActivityLogEntry`` from the caller-supplied fields.
2. Resolve the ``actor`` from the ``current_actor`` ContextVar when not given.
3. Persist the entry via ``ActivityLogRepository.record()`` on the event-loop
thread — inline if already on that thread, via
``loop.call_soon_threadsafe`` if called from another thread (e.g. the
zeroconf discovery thread that fires ``device_discovered/lost`` events).
4. Push a live ``activity_logged`` event via
``ProcessorManager.fire_event({"type": "activity_logged", "entry": {...}})``.
5. Never raise into the caller — audit recording is best-effort. Failures are
logged at ``WARNING`` level so operators can diagnose without breaking the
audited action.
Thread-marshal pattern mirrors ``utils/log_broadcaster.py`` (``ensure_loop`` /
``call_soon_threadsafe``).
Module accessor
---------------
A module-level singleton ``_recorder`` is populated by
``set_module_recorder()`` during ``main.py`` lifespan startup and exposed via
``get_module_recorder()``. Background engines and other non-DI sites that need
to call ``record()`` without FastAPI DI can use this accessor. Phase 3
instrumentation uses it at the ``fire_entity_event`` choke-point.
"""
from __future__ import annotations
import asyncio
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from ledgrab.core.activity_log.context import current_actor
from ledgrab.storage.activity_log import ActivityLogEntry, ActivitySeverity
from ledgrab.utils import get_logger
if TYPE_CHECKING:
from ledgrab.core.processing.processor_manager import ProcessorManager
from ledgrab.storage.activity_log_repository import ActivityLogRepository
logger = get_logger(__name__)
def _new_id() -> str:
"""Generate a compact activity-log entry id: ``al_<8-hex-chars>``."""
return "al_" + uuid.uuid4().hex[:8]
def entry_to_dict(entry: ActivityLogEntry) -> dict:
"""Serialise an ``ActivityLogEntry`` to the canonical API/event payload dict.
Reused by Phase 4 (API response serialisation) and Phase 5 (frontend).
The shape is identical to the flat row codec minus the DB-only ``seq``
field, but with ``ts`` kept as an ISO-8601 string and ``metadata`` as a
real ``dict`` (not a JSON string).
"""
return {
"id": entry.id,
"ts": entry.ts.isoformat(),
"category": entry.category,
"action": entry.action,
"severity": entry.severity,
"actor": entry.actor,
"entity_type": entry.entity_type,
"entity_id": entry.entity_id,
"entity_name": entry.entity_name,
"message": entry.message,
"metadata": entry.metadata,
}
class ActivityRecorder:
"""Thread-safe facade for persisting audit log entries.
Parameters
----------
repo:
``ActivityLogRepository`` used to persist entries.
processor_manager:
``ProcessorManager`` whose ``fire_event`` dispatches the live
``activity_logged`` event to WebSocket subscribers.
loop:
Optional: the running asyncio event loop. If ``None``, it is
captured lazily on the first call that originates from an async
context (mirroring ``LogBroadcaster.ensure_loop``). Pass it
explicitly in tests to avoid depending on a real running loop.
"""
def __init__(
self,
repo: "ActivityLogRepository",
processor_manager: "ProcessorManager",
*,
loop: asyncio.AbstractEventLoop | None = None,
) -> None:
self._repo = repo
self._pm = processor_manager
self._loop: asyncio.AbstractEventLoop | None = loop
self._enabled: bool = True
# ── Loop capture (mirrors LogBroadcaster.ensure_loop) ──────────────────
def ensure_loop(self) -> None:
"""Capture the running event loop if not already stored.
Call from an async context (e.g. lifespan startup) so that
``call_soon_threadsafe`` works when ``record()`` is later called from
non-async threads.
"""
if self._loop is None:
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
pass
# ── Public API ──────────────────────────────────────────────────────────
@property
def enabled(self) -> bool:
"""Whether recording is currently active."""
return self._enabled
@enabled.setter
def enabled(self, value: bool) -> None:
self._enabled = value
def record(
self,
category: str,
action: str,
*,
severity: str = ActivitySeverity.INFO,
actor: str | None = None,
entity_type: str | None = None,
entity_id: str | None = None,
entity_name: str | None = None,
message: str,
metadata: dict | None = None,
_bypass_enabled: bool = False,
) -> None:
"""Append one audit entry — best-effort, never raises.
Parameters
----------
category:
Broad bucket — one of :class:`~ledgrab.storage.activity_log.ActivityCategory`.
action:
Verb-object label, e.g. ``"entity.created"`` or ``"server.shutting_down"``.
severity:
One of :class:`~ledgrab.storage.activity_log.ActivitySeverity`. Defaults
to ``"info"``.
actor:
Who triggered the action. When ``None`` (the common case), the
value is resolved from :data:`~ledgrab.core.activity_log.context.current_actor`
with a default of ``"system"``.
entity_type / entity_id / entity_name:
Optional entity context for entity-domain events.
message:
Human-readable description suitable for display.
metadata:
Small JSON-serialisable dict with extra context. Defaults to ``{}``.
_bypass_enabled:
Internal flag used by the retention engine to record the
"audit log disabled" event even when ``enabled`` is ``False``.
"""
if not self._enabled and not _bypass_enabled:
return
# Resolve actor from ContextVar when not explicitly supplied.
resolved_actor = actor if actor is not None else current_actor.get()
entry = ActivityLogEntry(
id=_new_id(),
ts=datetime.now(timezone.utc),
category=category,
action=action,
severity=severity,
actor=resolved_actor,
entity_type=entity_type,
entity_id=entity_id,
entity_name=entity_name,
message=message,
metadata=metadata or {},
)
# Determine whether we are on the event-loop thread or not.
loop = self._loop
if loop is None:
# Lazy capture — may fail if called before the loop is running.
try:
loop = asyncio.get_running_loop()
self._loop = loop
except RuntimeError:
pass
if loop is not None and loop.is_running():
try:
current = asyncio.get_event_loop()
except RuntimeError:
current = None
# If the current thread IS the event-loop thread, write inline.
if current is loop:
self._write_and_emit(entry)
else:
# Called from a non-loop thread (e.g. zeroconf discovery) —
# marshal onto the event-loop thread.
try:
loop.call_soon_threadsafe(self._write_and_emit, entry)
except RuntimeError:
# Loop has been closed (rare; happens during tests)
logger.warning(
"ActivityRecorder: event loop closed, dropping entry %s", entry.id
)
else:
# No running loop — fall back to a direct synchronous write.
# This path hits in synchronous unit tests that do not start a loop.
self._write_and_emit(entry)
def _write_and_emit(self, entry: ActivityLogEntry) -> None:
"""Persist *entry* and fire the live event — called on the loop thread."""
try:
self._repo.record(entry)
except Exception as exc:
logger.warning("ActivityRecorder: failed to persist entry %s: %s", entry.id, exc)
return # don't emit an event for an entry that failed to persist
try:
self._pm.fire_event(
{
"type": "activity_logged",
"entry": entry_to_dict(entry),
}
)
except Exception as exc:
logger.warning("ActivityRecorder: failed to fire live event for %s: %s", entry.id, exc)
# ── Module-level singleton accessor ────────────────────────────────────────
#
# Background engines and non-DI call sites (Phase 3's fire_entity_event hook,
# device discovery thread) need ``record()`` without going through FastAPI DI.
# ``set_module_recorder`` is called from ``main.py`` lifespan immediately after
# the recorder is wired into ``init_dependencies``.
_recorder: ActivityRecorder | None = None
def set_module_recorder(recorder: ActivityRecorder) -> None:
"""Store the application-level recorder in the module singleton.
Called once from ``main.py`` lifespan startup.
"""
global _recorder
_recorder = recorder
def get_module_recorder() -> ActivityRecorder | None:
"""Return the module-level recorder, or ``None`` if not yet initialised.
Callers must guard against ``None`` — this returns ``None`` during module
import and early startup before ``main.py`` lifespan has run.
"""
return _recorder
@@ -0,0 +1,216 @@
"""Activity log retention engine.
Mirrors ``core/backup/auto_backup.py``:
- Settings persisted via ``db.get_setting("activity_log")`` /
``db.set_setting("activity_log", {...})``.
- ``start()`` / ``stop()`` lifecycle following the engine convention used
throughout the codebase.
- Hourly background loop calling ``repo.prune(before_ts=..., max_entries=...)``.
- ``get_settings()`` / ``async update_settings(...)`` for the Settings API
(Phase 4).
Changing ``enabled`` to ``False`` records an ``"audit_log.disabled"`` event via
the recorder BEFORE the flag takes effect — so the last action in the log is a
record of the intentional disable.
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.utils import get_logger
if TYPE_CHECKING:
from ledgrab.core.activity_log.recorder import ActivityRecorder
from ledgrab.storage.activity_log_repository import ActivityLogRepository
from ledgrab.storage.database import Database
logger = get_logger(__name__)
DEFAULT_SETTINGS: dict = {
"enabled": True,
"max_days": 90,
"max_entries": 20000,
}
# Prune loop interval — run roughly once an hour.
_PRUNE_INTERVAL_SECS = 3600
class ActivityLogRetentionEngine:
"""Background engine that prunes old activity log entries.
Parameters
----------
repo:
The ``ActivityLogRepository`` used to prune entries.
db:
The shared ``Database`` singleton for settings persistence.
recorder:
The ``ActivityRecorder`` used to log the "audit log disabled" event
before disabling takes effect.
"""
def __init__(
self,
repo: "ActivityLogRepository",
db: "Database",
recorder: "ActivityRecorder",
) -> None:
self._repo = repo
self._db = db
self._recorder = recorder
self._task: asyncio.Task | None = None
self._settings = self._load_settings()
# Rehydrate the recorder's enabled flag from persisted settings so a
# previously-disabled log stays disabled across restarts.
self._recorder.enabled = self._settings["enabled"]
# ── Settings persistence ───────────────────────────────────────────────
def _load_settings(self) -> dict:
data = self._db.get_setting("activity_log")
if data:
return {**DEFAULT_SETTINGS, **data}
return dict(DEFAULT_SETTINGS)
def _save_settings(self) -> None:
self._db.set_setting(
"activity_log",
{
"enabled": self._settings["enabled"],
"max_days": self._settings["max_days"],
"max_entries": self._settings["max_entries"],
},
)
# ── Lifecycle ──────────────────────────────────────────────────────────
async def start(self) -> None:
"""Start the retention loop if enabled."""
if self._settings["enabled"]:
self._start_loop()
logger.info(
"Activity log retention engine started " "(max_days=%d, max_entries=%d)",
self._settings["max_days"],
self._settings["max_entries"],
)
else:
logger.info("Activity log retention engine initialized (disabled)")
async def stop(self) -> None:
"""Cancel the retention loop."""
self._cancel_loop()
logger.info("Activity log retention engine stopped")
def _start_loop(self) -> None:
self._cancel_loop()
self._task = asyncio.create_task(self._retention_loop())
def _cancel_loop(self) -> None:
if self._task is not None:
self._task.cancel()
self._task = None
# ── Prune loop ─────────────────────────────────────────────────────────
async def _retention_loop(self) -> None:
try:
while True:
await asyncio.sleep(_PRUNE_INTERVAL_SECS)
try:
self._prune()
except Exception as exc:
logger.error("Activity log retention prune failed: %s", exc, exc_info=True)
except asyncio.CancelledError:
logger.debug("Activity log retention loop cancelled")
def _prune(self) -> None:
"""Execute one prune pass based on current settings."""
settings = self._settings
if not settings["enabled"]:
return
max_days: int = settings["max_days"]
max_entries: int = settings["max_entries"]
before_ts: datetime | None = None
if max_days and max_days > 0:
before_ts = datetime.now(timezone.utc) - timedelta(days=max_days)
max_entries_val: int | None = max_entries if max_entries and max_entries > 0 else None
deleted = self._repo.prune(before_ts=before_ts, max_entries=max_entries_val)
if deleted:
logger.info(
"Activity log pruned %d rows (max_days=%d, max_entries=%d)",
deleted,
max_days,
max_entries,
)
# ── Public API ─────────────────────────────────────────────────────────
def get_settings(self) -> dict:
"""Return the current retention settings dict."""
return {
"enabled": self._settings["enabled"],
"max_days": self._settings["max_days"],
"max_entries": self._settings["max_entries"],
}
async def update_settings(
self,
*,
enabled: bool,
max_days: int,
max_entries: int,
) -> dict:
"""Persist new settings and apply them immediately.
If ``enabled`` is changing to ``False``, the disable event is recorded
BEFORE the flag takes effect so there is a final log entry.
Returns the new settings dict (same as ``get_settings()``).
"""
was_enabled = self._settings["enabled"]
# Record the disable event before the recorder stops accepting entries.
if was_enabled and not enabled:
self._recorder.record(
category=ActivityCategory.SYSTEM,
action="audit_log.disabled",
severity=ActivitySeverity.WARNING,
actor="system",
message="Activity log recording disabled via settings",
_bypass_enabled=True,
)
self._settings["enabled"] = enabled
self._settings["max_days"] = max_days
self._settings["max_entries"] = max_entries
self._save_settings()
# Propagate enabled flag to the recorder.
self._recorder.enabled = enabled
if enabled:
self._start_loop()
logger.info(
"Activity log retention enabled (max_days=%d, max_entries=%d)",
max_days,
max_entries,
)
# Run an immediate prune pass when re-enabling.
try:
self._prune()
except Exception as exc:
logger.error("Activity log immediate prune failed: %s", exc)
else:
self._cancel_loop()
logger.info("Activity log retention disabled")
return self.get_settings()
+41
View File
@@ -60,6 +60,9 @@ from ledgrab.storage.audio_processing_template_store import AudioProcessingTempl
from ledgrab.storage.pattern_template_store import PatternTemplateStore
import ledgrab.core.audio.filters # noqa: F401 — trigger audio filter auto-registration
from ledgrab.core.backup.auto_backup import AutoBackupEngine
from ledgrab.core.activity_log.recorder import ActivityRecorder, set_module_recorder
from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine
from ledgrab.storage.activity_log_repository import ActivityLogRepository
from ledgrab.core.processing.os_notification_listener import OsNotificationListener
from ledgrab.core.devices.discovery_watcher import DiscoveryWatcher
from ledgrab.core.update.update_service import UpdateService
@@ -184,6 +187,10 @@ pattern_template_store = PatternTemplateStore(db)
game_event_bus = GameEventBus()
register_community_adapters()
# Activity log repository — constructed at module level like other stores so
# it migrates the DB schema (``002_add_activity_log``) on import.
activity_log_repo = ActivityLogRepository(db)
processor_manager = ProcessorManager(
ProcessorDependencies(
picture_source_store=picture_source_store,
@@ -290,6 +297,17 @@ async def lifespan(app: FastAPI):
processor_manager=processor_manager,
)
# Create activity recorder + retention engine. The recorder needs the
# processor_manager to fire live events, so it is built after that is
# already constructed at module level.
activity_recorder = ActivityRecorder(activity_log_repo, processor_manager)
activity_recorder.ensure_loop()
activity_log_retention_engine = ActivityLogRetentionEngine(
repo=activity_log_repo,
db=db,
recorder=activity_recorder,
)
# Create auto-backup engine — derive paths from database location so that
# demo mode auto-backups go to data/demo/ instead of data/.
_data_dir = Path(config.storage.database_file).parent
@@ -347,7 +365,13 @@ async def lifespan(app: FastAPI):
http_endpoint_store=http_endpoint_store,
audio_processing_template_store=audio_processing_template_store,
pattern_template_store=pattern_template_store,
activity_recorder=activity_recorder,
activity_log_repo=activity_log_repo,
activity_log_retention_engine=activity_log_retention_engine,
)
# Expose the recorder via the module singleton so non-DI sites
# (fire_entity_event, device threads) can call record() without FastAPI DI.
set_module_recorder(activity_recorder)
# Register devices in processor manager for health monitoring
devices = device_store.get_all_devices()
@@ -390,6 +414,9 @@ async def lifespan(app: FastAPI):
# Start auto-backup engine (periodic configuration backups)
await auto_backup_engine.start()
# Start activity log retention engine (hourly prune of old entries)
await activity_log_retention_engine.start()
# Start update checker (periodic release polling)
await update_service.start()
@@ -438,6 +465,19 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.error("Shutdown step '%s' raised: %s", label, e)
# Record the shutdown event FIRST — before any engine teardown — so there
# is always a final log entry on graceful shutdown.
try:
activity_recorder.record(
category="system",
action="server.shutting_down",
severity="info",
actor="system",
message="Server is shutting down",
)
except Exception as e:
logger.error("Failed to record shutdown event: %s", e)
# Legacy hook — SQLite stores are write-through so this only logs.
# Durability comes from PRAGMA synchronous=FULL + the explicit
# wal_checkpoint(TRUNCATE) in Database.close() at the end of this block.
@@ -510,6 +550,7 @@ async def lifespan(app: FastAPI):
await _bounded("update_service.stop", update_service.stop(), timeout=0.5)
await _bounded("auto_backup_engine.stop", auto_backup_engine.stop(), timeout=0.5)
await _bounded("activity_log_retention.stop", activity_log_retention_engine.stop(), timeout=0.5)
# Close the DB last so it runs a TRUNCATE checkpoint, flushing the WAL
# into the main file. Without this, writes can survive a graceful app
@@ -32,6 +32,7 @@ import { openAuthedWs } from './ws-auth.ts';
* update_download_progress — update_service.py (consumed by features/update.ts)
* device_discovered — discovery_watcher.py (consumed by features/notifications-watcher.ts)
* device_lost — discovery_watcher.py (consumed by features/notifications-watcher.ts)
* activity_logged — core/activity_log/recorder.py (consumed by features/activity-log.ts)
*
* Missing any of these silently breaks the corresponding UI flow — keep
* this list in sync when adding new event types on the server side.
@@ -47,6 +48,7 @@ const _ALLOWED_SERVER_EVENT_TYPES: ReadonlySet<string> = new Set([
'update_download_progress',
'device_discovered',
'device_lost',
'activity_logged', // source: core/activity_log/recorder.py
]);
interface ServerEventEnvelope {