"""Append-only audit log for sensitive actions (script + callback execution). Writes a single JSONL line per event to ``/audit.log``. The log is write-only from the app's perspective — it never reads back, and rotation is left to the operator (the file size is dominated by stdout/stderr truncation, which is already capped at 10 KB per stream in `_run_script`). Designed to be cheap: the write goes through a small background thread so the hot path never blocks on disk I/O, and a failure to write is logged at WARNING but never raised to callers. """ from __future__ import annotations import json import logging import queue import threading import time from typing import Any from ..auth import token_label_var from ..config import get_config_dir logger = logging.getLogger(__name__) # Cap on stdout/stderr inside the audit record so a chatty script doesn't # explode the log. Mirrors the 10k cap used by _run_script. _OUTPUT_CAP = 2000 _audit_queue: "queue.Queue[dict[str, Any] | None]" = queue.Queue(maxsize=1000) _audit_thread: threading.Thread | None = None _audit_lock = threading.Lock() def _ensure_writer_started() -> None: global _audit_thread with _audit_lock: if _audit_thread is not None and _audit_thread.is_alive(): return _audit_thread = threading.Thread( target=_audit_writer_loop, name="audit-log", daemon=True, ) _audit_thread.start() def _audit_writer_loop() -> None: log_path = get_config_dir() / "audit.log" while True: try: record = _audit_queue.get() except Exception: return if record is None: return try: line = json.dumps(record, ensure_ascii=False, default=str) with open(log_path, "a", encoding="utf-8") as f: f.write(line + "\n") except OSError as e: logger.warning("Failed to write audit record: %s", e) def _truncate(value: str | None) -> str | None: if value is None: return None if len(value) <= _OUTPUT_CAP: return value return value[:_OUTPUT_CAP] + f"\n…[truncated, {len(value) - _OUTPUT_CAP} chars]" def record_script_execution( *, kind: str, name: str, exit_code: int | None, duration: float | None, stdout: str | None = None, stderr: str | None = None, error: str | None = None, ) -> None: """Append a single audit record. Never raises.""" _ensure_writer_started() try: record = { "ts": time.time(), "iso": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()), "token_label": token_label_var.get("unknown"), "kind": kind, "name": name, "exit_code": exit_code, "duration_s": round(duration, 4) if duration is not None else None, "success": exit_code == 0 if exit_code is not None else False, "stdout": _truncate(stdout), "stderr": _truncate(stderr), "error": error, } _audit_queue.put_nowait(record) except queue.Full: # Backpressure: drop oldest record to make room. We'd rather lose an # old entry than block the script that just ran. try: _audit_queue.get_nowait() _audit_queue.put_nowait(record) except queue.Empty: pass except Exception as e: logger.warning("Failed to enqueue audit record: %s", e) def shutdown_audit_log() -> None: """Flush the audit queue on app shutdown.""" try: _audit_queue.put_nowait(None) except queue.Full: pass if _audit_thread is not None: _audit_thread.join(timeout=2)