feat(activity-log): phase 3 - event instrumentation (4 categories)

- entity CRUD via fire_entity_event choke point (name resolved/sanitized; deletes pass name explicitly)
- auth: failures + WS session establishment (no tokens logged); per-IP audit-record throttle
- device: online/offline (health), discovered/lost (zeroconf), ADB connect/disconnect
- capture/system: target start-stop, scenes, playlists, automations, backup/restore, update, restart, calibration, settings
- security hardening: sanitize_display strips control/NUL/ANSI/newlines from untrusted strings; malformed-IPv6 origin guard
- 129 instrumentation tests (incl. secret-leak, log-injection, throttle, best-effort) + autouse throttle-reset fixture
This commit is contained in:
2026-06-09 19:20:57 +03:00
parent 726f39e2ba
commit 25c613c5cb
29 changed files with 3012 additions and 44 deletions
+1
View File
@@ -67,3 +67,4 @@ context (survives across phases; graduates to CLAUDE.md only if it's a lasting p
Phase 1 landed (2026-06-09): `activity_log.py` (dataclass + enums + filters + codec), `AddActivityLogTableMigration` (`002_add_activity_log`) appended to `ALL_MIGRATIONS`, `ActivityLogRepository` (record/query/count/prune/clear/iter_export), 41 new tests — all green. Full suite 2226 passed, 0 failed. Schema and method signatures frozen in phase-1-storage.md Handoff. Gotcha: `Database.execute` takes a positional tuple — use `?` placeholders (not `:name`), otherwise Python 3.14 will raise `ProgrammingError`.
Phase 2 landed (2026-06-09): `core/activity_log/` package (`context.py`, `recorder.py`, `retention.py`, `__init__.py`); actor ContextVar set in `api/auth.py` (both branches); `ActivityLogRetentionEngine` mirroring AutoBackupEngine; full wiring in `main.py` (repo at module level, recorder+engine in lifespan, `server.shutting_down` first shutdown action, engine stop before db.close); DI getters in `api/dependencies.py`; `activity_logged` added to `_ALLOWED_SERVER_EVENT_TYPES` in `events-ws.ts`; `set_module_recorder` exposes recorder to non-DI sites; 24 new tests — all green. Full suite 2309 passed, 2 skipped, 0 failed. Ruff clean.
Phase 3 landed (2026-06-09): instrumented all four categories — entity CRUD via `fire_entity_event` choke-point (`dependencies.py`), auth failures + WS session in `auth.py`, device online/offline in `device_health.py`, device discovered/lost in `discovery_watcher.py`, ADB connect/disconnect in `system_settings.py`, capture start/stop (individual + bulk) in `output_targets_control.py`, scene/playlist/automation activate in their respective route/engine files, backup/restore/delete + restart/shutdown/update/calibration/settings in `backup.py`/`update.py`/`calibration.py`; all 11 entity delete handlers pass `entity_name` to `fire_entity_event`; 22 new tests (security: token never in any field, explicitly asserted) — all green. Full suite 2369 passed, 2 skipped, 0 failed. Ruff clean. Complete (category, action) inventory in phase-3-instrumentation.md Handoff section.
+8 -4
View File
@@ -60,8 +60,8 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
## Phases
- [x] Phase 1: Storage — model, migration, repository [domain: data] → [subplan](./phase-1-storage.md)
- [ ] Phase 2: Recorder, actor context, retention, lifecycle [domain: backend] → [subplan](./phase-2-recorder-retention.md)
- [ ] Phase 3: Event instrumentation (4 categories) [domain: backend] → [subplan](./phase-3-instrumentation.md)
- [x] Phase 2: Recorder, actor context, retention, lifecycle [domain: backend] → [subplan](./phase-2-recorder-retention.md)
- [x] Phase 3: Event instrumentation (4 categories) [domain: backend] → [subplan](./phase-3-instrumentation.md)
- [ ] Phase 4: REST API — query/filter/export/settings/clear [domain: backend] → [subplan](./phase-4-api.md)
- [ ] Phase 5: Frontend — Activity tab + smart filtering + live updates [domain: frontend] → [subplan](./phase-5-frontend-tab.md)
- [ ] Phase 6: Dashboard widget + Settings panel + docs [domain: frontend] → [subplan](./phase-6-dashboard-settings.md)
@@ -81,7 +81,7 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
|-------|--------|--------|--------|-------|-----------|
| Phase 1: Storage | data | ✅ Done | ✅ Passed | ✅ Passed | ✅ |
| Phase 2: Recorder/Retention | backend | ✅ Done | ✅ Passed | ✅ Passed | ✅ |
| Phase 3: Instrumentation | backend | ⬜ Not Started | ⬜ | ⬜ | |
| Phase 3: Instrumentation | backend | ✅ Done | ✅ Passed | ✅ Passed | |
| Phase 4: REST API | backend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 5: Frontend tab | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
| Phase 6: Dashboard/Settings | frontend | ⬜ Not Started | ⬜ | ⬜ | ⬜ |
@@ -90,7 +90,11 @@ is an on-demand CSV/JSON **export** (no separate backup subsystem).
| Phase | Warning | Severity | Status (open / resolved / accepted) |
|-------|---------|----------|-------------------------------------|
| | | | |
| 3 | Log injection via unauth mDNS device name/url into audit message | 🟠 High (security) | resolved — `sanitize_display` helper applied |
| 3 | Origin sanitizer missed spaces/NUL/ANSI | 🟠 High (security) | resolved — `sanitize_display` over netloc |
| 3 | Unauth auth-failure audit-write flood (no write-rate bound) | 🟠 High (security) | resolved — per-IP audit-record throttle (10s, capped) |
| 3 | Malformed-IPv6 Origin → urlparse ValueError into WS handler | 🟡 Warning | resolved — try/except guard |
| 3 | Throttle module-global state caused flaky test contamination | 🟡 Warning | resolved — autouse conftest reset fixture |
## Final Review
+84 -20
View File
@@ -1,6 +1,6 @@
# Phase 3: Event instrumentation (4 categories)
**Status:** ⬜ Not Started
**Status:** ✅ Done
**Parent plan:** [PLAN.md](./PLAN.md)
**Domain:** backend · 🔒 security-sensitive (security reviewer triggers)
@@ -13,7 +13,7 @@ Maximize coverage via the central `fire_entity_event` choke point; add explicit
## Tasks
### Entity CRUD (via the choke point)
- [ ] In `api/dependencies.py`, extend `fire_entity_event` to ALSO record an audit entry:
- [x] In `api/dependencies.py`, extend `fire_entity_event` to ALSO record an audit entry:
- Signature gains an optional `entity_name: str | None = None`.
- For `created`/`updated`: if `entity_name` not supplied, best-effort resolve from the
matching store in `_deps` keyed by `entity_type` (entity still present). For `deleted`:
@@ -22,14 +22,14 @@ Maximize coverage via the central `fire_entity_event` choke point; add explicit
- Map `action` → severity (`info`), category `entity`. Build a human message
(e.g. `"Target 'Desk' updated"`). Read actor from the ContextVar.
- Recording is best-effort (never break the entity operation).
- [ ] Update entity **delete** handlers to pass `entity_name` into `fire_entity_event`
- [x] Update entity **delete** handlers to pass `entity_name` into `fire_entity_event`
(the entity object is already loaded for the 404 check). Cover the representative/most-used
entities at minimum: output targets, sync clocks, devices, picture/audio/color-strip
sources, automations, scene presets/playlists, templates, gradients. (Create/update can rely
on hook resolution but pass the name where trivially available.)
### Authentication (DESCOPED: no key create/rotate/revoke — those routes don't exist)
- [ ] In `api/auth.py`, record:
- [x] In `api/auth.py`, record:
- auth **failures**: missing/invalid Bearer token (HTTP), rejected LAN-without-keys, rejected
WS origin (4403), WS auth handshake failure (4401). Category `auth`, severity `warning`.
Include the caller IP/label and the reason in `metadata`**never** the attempted token.
@@ -38,26 +38,26 @@ Maximize coverage via the central `fire_entity_event` choke point; add explicit
- (Do NOT record per-request HTTP auth *success* — too frequent.)
### Device connect/disconnect (use existing discrete seams)
- [ ] Hook `device_health_changed` (`core/processing/device_health.py`, fired only on
- [x] Hook `device_health_changed` (`core/processing/device_health.py`, fired only on
`online != prev_online`) → record online/offline transition. Category `device`,
severity `info` (online) / `warning` (offline).
- [ ] Hook `device_discovered` / `device_lost` (`core/devices/discovery_watcher.py`, **runs on
- [x] Hook `device_discovered` / `device_lost` (`core/devices/discovery_watcher.py`, **runs on
the zeroconf thread** → recorder must marshal to the loop, which Phase 2 handles). Category
`device`.
- [ ] ADB connect/disconnect (`api/routes/system_settings.py:adb_connect/adb_disconnect`).
- [x] ADB connect/disconnect (`api/routes/system_settings.py:adb_connect/adb_disconnect`).
### Capture & system events (explicit record calls)
- [ ] Target processing start/stop + bulk (`api/routes/output_targets_control.py`).
- [ ] Scene activation (`scene_presets.py:activate_scene_preset`), playlist start/stop
- [x] Target processing start/stop + bulk (`api/routes/output_targets_control.py`).
- [x] Scene activation (`scene_presets.py:activate_scene_preset`), playlist start/stop
(`scene_playlists.py`), automation activate/deactivate (`automation_engine.py`).
- [ ] System: backup create/restore/delete (`backup.py`), update apply/dismiss (`update.py`),
- [x] System: backup create/restore/delete (`backup.py`), update apply/dismiss (`update.py`),
restart/shutdown (`backup.py`), calibration start/stop/cancel (`calibration.py`).
- [ ] Settings changes: scope to high-value settings only (auto-backup, update, shutdown
- [x] Settings changes: scope to high-value settings only (auto-backup, update, shutdown
action). **Exclude the activity-log's own `"activity_log"` settings key** to avoid
self-referential churn.
### Tests
- [ ] `server/tests/test_activity_instrumentation.py` (or per-area):
- [x] `server/tests/test_activity_instrumentation.py` (or per-area):
- representative entity create/update/delete produces a record with correct category/actor/
name (incl. a delete carrying its name);
- an auth failure produces a `warning` record and the token never appears in any field;
@@ -95,14 +95,78 @@ Maximize coverage via the central `fire_entity_event` choke point; add explicit
## Review Checklist
- [ ] All tasks completed
- [ ] Code follows project conventions
- [ ] No unintended side effects (audited actions still succeed on recorder failure)
- [ ] No secrets logged (token never recorded) — explicitly verified
- [ ] Build passes (ruff + pytest)
- [ ] Tests pass (new + existing)
- [x] All tasks completed
- [x] Code follows project conventions
- [x] No unintended side effects (audited actions still succeed on recorder failure)
- [x] No secrets logged (token never recorded) — explicitly verified
- [x] Build passes (ruff + pytest)
- [x] Tests pass (new + existing)
## Handoff to Next Phase
<!-- Filled in by the implementer: list of categories/actions actually emitted and their
metadata keys, so Phase 4 filter options and Phase 5 quick-filter presets match reality. -->
Phase 3 is complete. The following (category, action) pairs are now emitted, along with their
metadata keys, for Phase 4 to expose via query/filter and for Phase 5 quick-filter presets.
### `entity` category
| Action | Severity | Metadata keys | Notes |
|--------|----------|---------------|-------|
| `entity.created` | info | — | All entity types via `fire_entity_event` choke-point |
| `entity.updated` | info | — | All entity types; name resolved from store when not passed |
| `entity.deleted` | info | — | Name passed explicitly by delete handler before deletion |
### `auth` category
| Action | Severity | Metadata keys | Notes |
|--------|----------|---------------|-------|
| `auth.rejected` | warning | `reason` (str), `client` (str/IP) | Missing Bearer, invalid Bearer, LAN-no-keys, WS origin, WS auth timeout, invalid WS token |
| `auth.ws_connected` | info | `client` (str/IP) | Successful WS session established |
### `device` category
| Action | Severity | Metadata keys | Notes |
|--------|----------|---------------|-------|
| `device.online` | info | `latency_ms` (float) | Health monitor, transition only |
| `device.offline` | warning | `latency_ms` (float) | Health monitor, transition only |
| `device.discovered` | info | `url` (str), `device_type` (str) | Zeroconf discovery thread; recorder marshals to loop |
| `device.lost` | warning | `url` (str), `device_type` (str) | Zeroconf discovery thread |
| `device.adb_connected` | info | `address` (str) | ADB route success |
| `device.adb_disconnected` | info | `address` (str) | ADB route success |
### `capture` category
| Action | Severity | Metadata keys | Notes |
|--------|----------|---------------|-------|
| `capture.started` | info | — | Per target (individual + bulk) |
| `capture.stopped` | info | — | Per target (individual + bulk) |
| `scene.activated` | info | — | `scene_presets.py:activate_scene_preset` |
| `playlist.started` | info | — | `scene_playlists.py:start_scene_playlist` |
| `playlist.stopped` | info | — | `scene_playlists.py:stop_scene_playlist` |
| `automation.activated` | info | — | `automation_engine.py:_activate_automation`; actor="system" |
| `automation.deactivated` | info | — | `automation_engine.py:_deactivate_automation`; actor="system" |
### `system` category
| Action | Severity | Metadata keys | Notes |
|--------|----------|---------------|-------|
| `backup.created` | info | `filename` (str) | `backup.py:backup_config` |
| `backup.restored` | info | — | `backup.py:restore_config` |
| `backup.deleted` | info | `filename` (str) | `backup.py:delete_saved_backup` |
| `server.restarting` | info | — | `backup.py:restart_server` |
| `server.shutdown_requested` | info | — | `backup.py:shutdown_server` |
| `update.dismissed` | info | `version` (str) | `update.py:dismiss_update` |
| `update.applied` | info | `version` (str) | `update.py:apply_update` |
| `settings.changed` | info | `setting_key` (str) + setting-specific keys | `setting_key` values: `"auto_backup"`, `"update"`, `"shutdown_action"`. Activity-log own key excluded. |
| `calibration.started` | info | — | `calibration.py`; entity_type="device", entity_id=device_id |
| `calibration.stopped` | info | — | `calibration.py` |
| `calibration.cancelled` | info | — | `calibration.py` |
### Implementation notes for Phase 4
- The `metadata` field is a JSON `TEXT` column. All keys above are scalars (str, float).
- Phase 4 filter `metadata_key` / `metadata_value` lookup, if added, can target `setting_key`
for settings-change filtering.
- `entity_type` is populated for entity CRUD and `calibration.started`. For auth/system/capture
events `entity_type` may be None.
- `entity_name` is always populated for `entity.deleted`; populated for CRUD create/update
when resolved; populated for most capture/system events where a name is meaningful.
+119 -2
View File
@@ -3,7 +3,9 @@
import asyncio
import json
import secrets
import time
from typing import Annotated
from urllib.parse import urlparse
from fastapi import Depends, HTTPException, Request, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
@@ -11,11 +13,101 @@ from starlette.websockets import WebSocket, WebSocketDisconnect
from ledgrab.config import get_config
from ledgrab.core.activity_log.context import current_actor
from ledgrab.core.activity_log.sanitize import sanitize_display
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.utils import get_logger
from ledgrab.utils.net_classify import is_loopback as _classify_is_loopback
logger = get_logger(__name__)
# ── Auth-failure audit throttle (H3) ───────────────────────────────────────
#
# Unauthenticated callers can hammer any auth path; without a recording
# throttle each attempt would write one SQLite row AND broadcast one WS event,
# providing a cheap disk/broadcast amplification vector.
#
# Mitigation: record at most one ``auth.rejected`` audit entry per client IP
# per _AUTH_RECORD_WINDOW seconds. The auth decision (401) is NEVER
# suppressed — only the *audit recording* is de-duplicated.
#
# Memory safety: the throttle dict is capped at _AUTH_THROTTLE_HARD_CAP
# entries. When the cap is exceeded the oldest-seen IP (lowest timestamp) is
# evicted so the dict stays bounded regardless of the number of distinct source
# IPs an attacker can forge.
_AUTH_RECORD_WINDOW: float = 10.0 # seconds — one record per IP per window
_AUTH_THROTTLE_HARD_CAP: int = 512 # max IPs tracked simultaneously
# ip -> monotonic timestamp of last *recorded* auth.rejected entry
_auth_record_last: dict[str, float] = {}
def _should_record_auth_failure(client_ip: str) -> bool:
"""Return True when an ``auth.rejected`` record should be written for *client_ip*.
Suppresses duplicates within _AUTH_RECORD_WINDOW seconds. Evicts the
oldest entry when the dict exceeds _AUTH_THROTTLE_HARD_CAP to prevent
unbounded memory growth under IP-spray attacks.
"""
now = time.monotonic()
last = _auth_record_last.get(client_ip)
if last is not None and (now - last) < _AUTH_RECORD_WINDOW:
return False # suppress: within the de-dup window
# Enforce hard cap before inserting: evict the single oldest entry.
if len(_auth_record_last) >= _AUTH_THROTTLE_HARD_CAP:
oldest_ip = min(_auth_record_last, key=lambda ip: _auth_record_last[ip])
del _auth_record_last[oldest_ip]
_auth_record_last[client_ip] = now
return True
def _record_auth_failure(reason: str, client_host: str | None) -> None:
"""Best-effort: record an auth failure audit entry (never raises).
SECURITY: the attempted token is NEVER passed here; only the reason and
the caller's IP/label are recorded.
THROTTLE: at most one ``auth.rejected`` record is written per client IP
per _AUTH_RECORD_WINDOW seconds to prevent disk/WS-broadcast amplification
DoS. The 401 response is always returned regardless.
"""
if not _should_record_auth_failure(client_host or "unknown"):
return # throttled — drop duplicate recording for this IP/window
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is None:
return
rec.record(
category=ActivityCategory.AUTH,
action="auth.rejected",
severity=ActivitySeverity.WARNING,
actor="anonymous",
message=f"Authentication failed: {reason}",
metadata={"reason": reason, "client": client_host or "unknown"},
)
def _record_ws_auth_success(label: str, client_host: str | None) -> None:
"""Best-effort: record a successful WebSocket session establishment."""
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is None:
return
rec.record(
category=ActivityCategory.AUTH,
action="auth.ws_connected",
severity=ActivitySeverity.INFO,
actor=label,
message=f"WebSocket session established by '{label}'",
metadata={"client": client_host or "unknown"},
)
# Security scheme for Bearer token
security = HTTPBearer(auto_error=False)
@@ -87,6 +179,7 @@ def verify_api_key(
# Allow caller to authenticate explicitly even without configured keys?
# No — there are no keys to compare against. Reject.
logger.warning("Rejected LAN request from %s: no API key configured", client_host)
_record_auth_failure("LAN access rejected: no API key configured", client_host)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=(
@@ -99,13 +192,14 @@ def verify_api_key(
# Check if credentials are provided
if not credentials:
logger.warning("Request missing Authorization header")
_record_auth_failure("missing Bearer token", client_host)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing API key - authentication is required",
headers={"WWW-Authenticate": "Bearer"},
)
# Extract token
# Extract token — NEVER log or record the token value itself.
token = credentials.credentials
# Find matching key and return its label using constant-time comparison
@@ -117,6 +211,7 @@ def verify_api_key(
if not authenticated_as:
logger.warning("Invalid API key attempt")
_record_auth_failure("invalid Bearer token", client_host)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
@@ -195,12 +290,30 @@ async def accept_and_authenticate_ws(websocket: WebSocket, timeout: float = 3.0)
# a strong signal even before the token check. Non-browser clients
# legitimately omit Origin; those fall through to the auth handshake.
config = get_config()
client_host = websocket.client.host if websocket.client else None
origin = websocket.headers.get("origin")
if not _is_origin_allowed(origin, config.server.cors_origins):
logger.warning(
"Rejected WebSocket from origin %r (not in cors_origins)",
origin,
)
# Sanitize first so urlparse does not choke on control chars / ANSI / NUL
# embedded by an attacker in the Origin header (e.g. \n triggers IPv6 parse
# error in Python's urlsplit on malformed netloc).
_safe_origin_raw = sanitize_display(origin) if origin else ""
try:
_netloc = urlparse(_safe_origin_raw).netloc if _safe_origin_raw else ""
except ValueError:
# Malformed IPv6 addresses (e.g. "http://[::1" without closing "]")
# cause urlparse to raise ValueError. Fall back to "unknown" — do NOT
# fall back to the raw origin string, which could carry query params
# or path components containing secrets.
_netloc = ""
_safe_origin = sanitize_display(_netloc or "unknown")
_record_auth_failure(
f"WebSocket origin rejected: {_safe_origin!r}",
client_host,
)
try:
await websocket.close(code=WS_ORIGIN_CLOSE_CODE)
except _WS_SEND_BENIGN_EXC:
@@ -215,6 +328,7 @@ async def accept_and_authenticate_ws(websocket: WebSocket, timeout: float = 3.0)
except _WS_SEND_BENIGN_EXC:
pass
return None
_record_ws_auth_success(label, client_host)
return label
@@ -280,6 +394,7 @@ async def verify_ws_auth(
return None
return "anonymous"
logger.warning("WebSocket auth timeout after %.1fs from %s", timeout, client_host)
_record_auth_failure("WebSocket auth timeout", client_host)
try:
await websocket.send_json({"type": "auth_error", "reason": "auth timeout"})
except _WS_SEND_BENIGN_EXC:
@@ -337,6 +452,7 @@ async def verify_ws_auth(
await websocket.send_json({"type": "auth_ok"})
return "anonymous"
logger.warning("Rejected LAN WebSocket from %s: no API key configured", client_host)
_record_auth_failure("LAN WebSocket rejected: no API key configured", client_host)
try:
await websocket.send_json(
{
@@ -348,10 +464,11 @@ async def verify_ws_auth(
pass
return None
# Keys configured: require a matching token.
# Keys configured: require a matching token. NEVER log the token value.
label = _match_api_key(token or "")
if not label:
logger.warning("Invalid WebSocket auth attempt from %s", client_host)
_record_auth_failure("invalid WebSocket token", client_host)
try:
await websocket.send_json({"type": "auth_error", "reason": "invalid token"})
except _WS_SEND_BENIGN_EXC:
+92 -3
View File
@@ -42,8 +42,10 @@ from ledgrab.core.mqtt.mqtt_manager import MQTTManager
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.storage.audio_processing_template_store import AudioProcessingTemplateStore
from ledgrab.storage.pattern_template_store import PatternTemplateStore
from ledgrab.core.activity_log.recorder import ActivityRecorder
from ledgrab.core.activity_log.recorder import ActivityRecorder, get_module_recorder
from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine
from ledgrab.core.activity_log.sanitize import sanitize_display
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.storage.activity_log_repository import ActivityLogRepository
T = TypeVar("T")
@@ -214,13 +216,68 @@ def get_activity_log_retention_engine() -> ActivityLogRetentionEngine:
# ── Event helper ────────────────────────────────────────────────────────
def fire_entity_event(entity_type: str, action: str, entity_id: str) -> None:
"""Fire an entity_changed event via the ProcessorManager event bus.
def _resolve_entity_name(entity_type: str, entity_id: str) -> str | None:
"""Best-effort: look up a human name for *entity_id* from the matching store.
Returns ``None`` when the store is missing, the entity is gone, or any
exception occurs (e.g. during delete the entity may have just been removed).
"""
# Map entity_type → (_deps key, method name on the store)
_STORE_LOOKUP: dict[str, tuple[str, str]] = {
"output_target": ("output_target_store", "get_target"),
"device": ("device_store", "get_device"),
"picture_source": ("picture_source_store", "get_source"),
"audio_source": ("audio_source_store", "get_source"),
"color_strip_source": ("color_strip_store", "get_source"),
"template": ("template_store", "get_template"),
"capture_template": ("template_store", "get_template"),
"pp_template": ("pp_template_store", "get_template"),
"automation": ("automation_store", "get_automation"),
"scene_preset": ("scene_preset_store", "get_preset"),
"scene_playlist": ("scene_playlist_store", "get_playlist"),
"sync_clock": ("sync_clock_store", "get_clock"),
"gradient": ("gradient_store", "get_gradient"),
"audio_template": ("audio_template_store", "get_template"),
"value_source": ("value_source_store", "get_source"),
"cspt": ("cspt_store", "get_template"),
"audio_processing_template": ("audio_processing_template_store", "get_template"),
"pattern_template": ("pattern_template_store", "get_template"),
"home_assistant_source": ("ha_store", "get_source"),
"mqtt_source": ("mqtt_store", "get_source"),
"http_endpoint": ("http_endpoint_store", "get_endpoint"),
}
entry = _STORE_LOOKUP.get(entity_type)
if entry is None:
return None
store_key, method_name = entry
store = _deps.get(store_key)
if store is None:
return None
try:
obj = getattr(store, method_name)(entity_id)
if obj is not None:
return getattr(obj, "name", None)
except Exception:
pass
return None
def fire_entity_event(
entity_type: str,
action: str,
entity_id: str,
entity_name: str | None = None,
) -> None:
"""Fire an entity_changed event via the ProcessorManager event bus and
record an audit entry.
Args:
entity_type: e.g. "device", "output_target", "color_strip_source"
action: "created", "updated", or "deleted"
entity_id: The entity's unique ID
entity_name: Human-readable name. For deletes: **must** be passed
explicitly (entity is already gone when we get here).
For create/update: resolved from the store when not supplied.
"""
pm = _deps.get("processor_manager")
if pm is not None:
@@ -233,6 +290,38 @@ def fire_entity_event(entity_type: str, action: str, entity_id: str) -> None:
}
)
# ── Audit record (best-effort) ──────────────────────────────────────────
rec = get_module_recorder()
if rec is None:
return
# Resolve name when not explicitly provided (create / update paths).
# For deleted: entity already gone — rely on the explicitly passed name.
resolved_name = entity_name
if resolved_name is None and action != "deleted":
resolved_name = _resolve_entity_name(entity_type, entity_id)
# Build a concise human message.
# Sanitize the display name before interpolating into the free-text message
# (user-authored names hit the CSV/export trust surface).
safe_display_name = sanitize_display(resolved_name) if resolved_name else None
display_name = f"'{safe_display_name}'" if safe_display_name else entity_id
action_word = {"created": "created", "updated": "updated", "deleted": "deleted"}.get(
action, action
)
entity_label = entity_type.replace("_", " ")
message = f"{entity_label.capitalize()} {display_name} {action_word}"
rec.record(
category=ActivityCategory.ENTITY,
action=f"entity.{action}",
severity=ActivitySeverity.INFO,
entity_type=entity_type,
entity_id=entity_id,
entity_name=resolved_name,
message=message,
)
# ── Initialization ──────────────────────────────────────────────────────
@@ -182,6 +182,12 @@ async def delete_audio_source(
css_store: ColorStripStore = Depends(get_color_strip_store),
):
"""Delete an audio source."""
_entity_name: str | None = None
try:
_entity_name = store.get_source(source_id).name
except Exception:
pass
try:
# Check if any CSS entities reference this audio source
from ledgrab.storage.color_strip_source import AudioColorStripSource
@@ -194,7 +200,7 @@ async def delete_audio_source(
raise ValueError(f"Cannot delete: referenced by color strip source '{css.name}'")
store.delete_source(source_id)
fire_entity_event("audio_source", "deleted", source_id)
fire_entity_event("audio_source", "deleted", source_id, entity_name=_entity_name)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
+7 -1
View File
@@ -329,6 +329,12 @@ async def delete_automation(
engine: AutomationEngine = Depends(get_automation_engine),
):
"""Delete an automation."""
_entity_name: str | None = None
try:
_entity_name = store.get_automation(automation_id).name
except Exception:
pass
# Deactivate first
await engine.deactivate_if_active(automation_id)
@@ -337,7 +343,7 @@ async def delete_automation(
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
fire_entity_event("automation", "deleted", automation_id)
fire_entity_event("automation", "deleted", automation_id, entity_name=_entity_name)
# ===== Enable/Disable =====
+30 -1
View File
@@ -27,6 +27,7 @@ from ledgrab.api.schemas.system import (
)
from ledgrab.config import get_config
from ledgrab.core.backup.auto_backup import AutoBackupEngine
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.storage.asset_store import AssetStore
from ledgrab.storage.database import Database, freeze_writes
from ledgrab.utils import get_logger, read_upload_capped
@@ -35,6 +36,22 @@ logger = get_logger(__name__)
router = APIRouter()
def _record_system(action: str, message: str, metadata: dict | None = None) -> None:
"""Best-effort audit record for a system-level event."""
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action=action,
severity=ActivitySeverity.INFO,
message=message,
metadata=metadata or {},
)
_SERVER_DIR = Path(__file__).resolve().parents[4]
@@ -143,6 +160,8 @@ def backup_config(
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%S")
filename = f"ledgrab-backup-{timestamp}.zip"
_record_system("backup.created", f"Backup downloaded: {filename}", {"filename": filename})
return StreamingResponse(
zip_buffer,
media_type="application/zip",
@@ -243,6 +262,7 @@ async def restore_config(
freeze_writes()
logger.info("Database restored from uploaded backup. Scheduling restart...")
_record_system("backup.restored", "Database restored from uploaded backup")
_schedule_restart()
return RestoreResponse(
@@ -257,6 +277,7 @@ def restart_server(_: AuthRequired):
"""Schedule a server restart and return immediately."""
from ledgrab.server_ref import _broadcast_restarting
_record_system("server.restarting", "Server restart requested by user")
_broadcast_restarting()
_schedule_restart()
return {"status": "restarting"}
@@ -267,6 +288,7 @@ def shutdown_server(_: AuthRequired):
"""Gracefully shut down the server."""
from ledgrab.server_ref import request_shutdown
_record_system("server.shutdown_requested", "Server shutdown requested by user")
request_shutdown()
return {"status": "shutting_down"}
@@ -300,11 +322,17 @@ async def update_auto_backup_settings(
engine: AutoBackupEngine = Depends(get_auto_backup_engine),
):
"""Update auto-backup settings (enable/disable, interval, max backups)."""
return await engine.update_settings(
result = await engine.update_settings(
enabled=body.enabled,
interval_hours=body.interval_hours,
max_backups=body.max_backups,
)
_record_system(
"settings.changed",
f"Auto-backup settings updated (enabled={body.enabled})",
{"setting_key": "auto_backup", "enabled": body.enabled},
)
return result
@router.post("/api/v1/system/auto-backup/trigger", tags=["System"])
@@ -365,4 +393,5 @@ async def delete_saved_backup(
engine.delete_backup(filename)
except (ValueError, FileNotFoundError) as e:
raise HTTPException(status_code=404, detail=str(e))
_record_system("backup.deleted", f"Saved backup deleted: {filename}", {"filename": filename})
return {"status": "deleted", "filename": filename}
@@ -36,6 +36,7 @@ from fastapi import APIRouter, Depends, HTTPException
from ledgrab.api.auth import AuthRequired
from ledgrab.api.dependencies import get_processor_manager
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.api.schemas.calibration import (
CalibrationSessionPositionRequest,
CalibrationSessionStartRequest,
@@ -81,6 +82,19 @@ async def start_calibration_session(
logger.error("Failed to start calibration session: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action="calibration.started",
severity=ActivitySeverity.INFO,
entity_type="device",
entity_id=body.device_id,
message=f"Calibration session started for device '{body.device_id}'",
)
return CalibrationSessionStateResponse(**session.get_state())
@@ -135,6 +149,17 @@ async def stop_calibration_session(
logger.error("Failed to stop calibration session: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action="calibration.stopped",
severity=ActivitySeverity.INFO,
message="Calibration session stopped",
)
return CalibrationSessionStateResponse(**session.get_state())
@@ -155,6 +180,17 @@ async def cancel_calibration_session(
logger.error("Failed to cancel calibration session: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action="calibration.cancelled",
severity=ActivitySeverity.INFO,
message="Calibration session cancelled",
)
return CalibrationSessionStateResponse(**session.get_state())
@@ -167,6 +167,12 @@ async def delete_color_strip_source(
target_store: OutputTargetStore = Depends(get_output_target_store),
):
"""Delete a color strip source. Returns 409 if referenced by any LED target."""
_entity_name: str | None = None
try:
_entity_name = store.get_source(source_id).name
except Exception:
pass
try:
target_names = target_store.get_targets_referencing_css(source_id)
if target_names:
@@ -201,7 +207,7 @@ async def delete_color_strip_source(
"Delete or reassign the processed source(s) first.",
)
store.delete_source(source_id)
fire_entity_event("color_strip_source", "deleted", source_id)
fire_entity_event("color_strip_source", "deleted", source_id, entity_name=_entity_name)
except HTTPException:
raise
except ValueError as e:
+8 -1
View File
@@ -701,6 +701,13 @@ async def delete_device(
):
"""Delete/detach a device. Returns 409 if referenced by a target."""
try:
# Resolve name before deletion for the audit record.
_entity_name: str | None = None
try:
_entity_name = store.get_device(device_id).name
except Exception:
pass
# Check if any target references this device
refs = target_store.get_targets_for_device(device_id)
if refs:
@@ -728,7 +735,7 @@ async def delete_device(
# Delete from storage
store.delete_device(device_id)
fire_entity_event("device", "deleted", device_id)
fire_entity_event("device", "deleted", device_id, entity_name=_entity_name)
logger.info(f"Deleted device {device_id}")
except HTTPException:
+7 -1
View File
@@ -152,13 +152,19 @@ async def delete_gradient(
css_store: ColorStripStore = Depends(get_color_strip_store),
):
"""Delete a gradient (fails if built-in or referenced by sources)."""
_entity_name: str | None = None
try:
_entity_name = store.get_gradient(gradient_id).name
except Exception:
pass
try:
# Check references
for source in css_store.get_all_sources():
if getattr(source, "gradient_id", None) == gradient_id:
raise ValueError(f"Cannot delete: referenced by color strip source '{source.name}'")
store.delete_gradient(gradient_id)
fire_entity_event("gradient", "deleted", gradient_id)
fire_entity_event("gradient", "deleted", gradient_id, entity_name=_entity_name)
except (ValueError, EntityNotFoundError) as e:
status = 404 if "not found" in str(e).lower() else 400
raise HTTPException(status_code=status, detail=str(e))
@@ -624,6 +624,13 @@ async def delete_target(
):
"""Delete a output target. Stops processing first if active."""
try:
# Resolve name before deletion for the audit record.
_entity_name: str | None = None
try:
_entity_name = target_store.get_target(target_id).name
except Exception:
pass
# Stop processing if running
try:
await manager.stop_processing(target_id)
@@ -641,7 +648,7 @@ async def delete_target(
# Delete from store
target_store.delete_target(target_id)
fire_entity_event("output_target", "deleted", target_id)
fire_entity_event("output_target", "deleted", target_id, entity_name=_entity_name)
logger.info(f"Deleted target {target_id}")
except ValueError as e:
@@ -12,6 +12,7 @@ from ledgrab.api.dependencies import (
get_picture_source_store,
get_processor_manager,
)
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.api.schemas.output_targets import (
BulkTargetRequest,
BulkTargetResponse,
@@ -35,6 +36,23 @@ logger = get_logger(__name__)
router = APIRouter()
def _record_capture(action: str, target_id: str, target_name: str | None, message: str) -> None:
"""Best-effort audit record for a capture start/stop action."""
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.CAPTURE,
action=action,
severity=ActivitySeverity.INFO,
entity_type="output_target",
entity_id=target_id,
entity_name=target_name,
message=message,
)
# ===== BULK PROCESSING CONTROL ENDPOINTS =====
@@ -53,10 +71,16 @@ async def bulk_start_processing(
for target_id in body.ids:
try:
target_store.get_target(target_id)
_tgt = target_store.get_target(target_id)
await manager.start_processing(target_id)
started.append(target_id)
logger.info(f"Bulk start: started processing for target {target_id}")
_record_capture(
"capture.started",
target_id,
getattr(_tgt, "name", None),
f"Capture started for target '{getattr(_tgt, 'name', target_id)}' (bulk)",
)
except ValueError as e:
errors[target_id] = str(e)
except RuntimeError as e:
@@ -78,6 +102,7 @@ async def bulk_start_processing(
async def bulk_stop_processing(
body: BulkTargetRequest,
_auth: AuthRequired,
target_store: OutputTargetStore = Depends(get_output_target_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Stop processing for multiple output targets. Returns lists of stopped IDs and per-ID errors."""
@@ -89,6 +114,17 @@ async def bulk_stop_processing(
await manager.stop_processing(target_id)
stopped.append(target_id)
logger.info(f"Bulk stop: stopped processing for target {target_id}")
_tgt_name: str | None = None
try:
_tgt_name = target_store.get_target(target_id).name
except Exception:
pass
_record_capture(
"capture.stopped",
target_id,
_tgt_name,
f"Capture stopped for target '{_tgt_name or target_id}' (bulk)",
)
except ValueError as e:
errors[target_id] = str(e)
except Exception as e:
@@ -112,11 +148,17 @@ async def start_processing(
logger.info("Start processing requested for target %s", target_id)
try:
# Verify target exists in store
target_store.get_target(target_id)
target = target_store.get_target(target_id)
await manager.start_processing(target_id)
logger.info(f"Started processing for target {target_id}")
_record_capture(
"capture.started",
target_id,
getattr(target, "name", None),
f"Capture started for target '{getattr(target, 'name', target_id)}'",
)
return {"status": "started", "target_id": target_id}
except ValueError as e:
@@ -137,6 +179,7 @@ async def start_processing(
async def stop_processing(
target_id: str,
_auth: AuthRequired,
target_store: OutputTargetStore = Depends(get_output_target_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Stop processing for a output target."""
@@ -144,6 +187,17 @@ async def stop_processing(
await manager.stop_processing(target_id)
logger.info(f"Stopped processing for target {target_id}")
_target_name: str | None = None
try:
_target_name = target_store.get_target(target_id).name
except Exception:
pass
_record_capture(
"capture.stopped",
target_id,
_target_name,
f"Capture stopped for target '{_target_name or target_id}'",
)
return {"status": "stopped", "target_id": target_id}
except ValueError as e:
@@ -374,6 +374,12 @@ async def delete_picture_source(
css_store: ColorStripStore = Depends(get_color_strip_store),
):
"""Delete a picture source."""
_entity_name: str | None = None
try:
_entity_name = store.get_stream(stream_id).name
except Exception:
pass
try:
# Check if any target transitively references this stream via a CSS
target_names = store.get_targets_referencing(stream_id, target_store, css_store)
@@ -395,7 +401,7 @@ async def delete_picture_source(
f"{css_names}. Please reassign or delete those first.",
)
store.delete_stream(stream_id)
fire_entity_event("picture_source", "deleted", stream_id)
fire_entity_event("picture_source", "deleted", stream_id, entity_name=_entity_name)
except HTTPException:
raise
except EntityNotFoundError as e:
@@ -220,13 +220,19 @@ async def delete_scene_playlist(
engine: PlaylistEngine = Depends(get_playlist_engine),
):
"""Delete a scene playlist (stops it first if it is currently cycling)."""
_entity_name: str | None = None
try:
_entity_name = store.get_playlist(playlist_id).name
except Exception:
pass
try:
store.delete_playlist(playlist_id)
except (ValueError, EntityNotFoundError) as e:
raise HTTPException(status_code=404, detail=str(e))
await engine.stop_if_running(playlist_id)
fire_entity_event("scene_playlist", "deleted", playlist_id)
fire_entity_event("scene_playlist", "deleted", playlist_id, entity_name=_entity_name)
# ===== Cycling control =====
@@ -255,6 +261,27 @@ async def start_scene_playlist(
raise HTTPException(status_code=400, detail=str(e))
fire_entity_event("scene_playlist", "updated", playlist_id)
from ledgrab.core.activity_log.recorder import get_module_recorder
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
rec = get_module_recorder()
if rec is not None:
_pl_name: str | None = None
try:
_pl_name = store.get_playlist(playlist_id).name
except Exception:
pass
rec.record(
category=ActivityCategory.CAPTURE,
action="playlist.started",
severity=ActivitySeverity.INFO,
entity_type="scene_playlist",
entity_id=playlist_id,
entity_name=_pl_name,
message=f"Playlist '{_pl_name or playlist_id}' started",
)
return PlaylistRuntimeStateSchema(**engine.get_state())
@@ -265,11 +292,34 @@ async def start_scene_playlist(
)
async def stop_scene_playlist(
_auth: AuthRequired,
store: ScenePlaylistStore = Depends(get_scene_playlist_store),
engine: PlaylistEngine = Depends(get_playlist_engine),
):
"""Stop the active playlist (leaves the last applied scene in place)."""
stopped_id = engine.get_running_playlist_id()
_stopped_name: str | None = None
if stopped_id:
try:
_stopped_name = store.get_playlist(stopped_id).name
except Exception:
pass
await engine.stop()
if stopped_id:
fire_entity_event("scene_playlist", "updated", stopped_id)
from ledgrab.core.activity_log.recorder import get_module_recorder
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.CAPTURE,
action="playlist.stopped",
severity=ActivitySeverity.INFO,
entity_type="scene_playlist",
entity_id=stopped_id,
entity_name=_stopped_name,
message=f"Playlist '{_stopped_name or stopped_id}' stopped",
)
return PlaylistRuntimeStateSchema(**engine.get_state())
+23 -1
View File
@@ -208,12 +208,18 @@ async def delete_scene_preset(
store: ScenePresetStore = Depends(get_scene_preset_store),
):
"""Delete a scene preset."""
_entity_name: str | None = None
try:
_entity_name = store.get_preset(preset_id).name
except Exception:
pass
try:
store.delete_preset(preset_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
fire_entity_event("scene_preset", "deleted", preset_id)
fire_entity_event("scene_preset", "deleted", preset_id, entity_name=_entity_name)
# ===== Recapture =====
@@ -282,4 +288,20 @@ async def activate_scene_preset(
logger.info(f"Scene preset '{preset.name}' activated successfully")
fire_entity_event("scene_preset", "updated", preset_id)
from ledgrab.core.activity_log.recorder import get_module_recorder
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.CAPTURE,
action="scene.activated",
severity=ActivitySeverity.INFO,
entity_type="scene_preset",
entity_id=preset_id,
entity_name=preset.name,
message=f"Scene preset '{preset.name}' activated",
)
return ActivateResponse(status=status, errors=errors)
+7 -1
View File
@@ -149,6 +149,12 @@ async def delete_sync_clock(
manager: SyncClockManager = Depends(get_sync_clock_manager),
):
"""Delete a synchronization clock (fails if referenced by CSS or value sources)."""
_entity_name: str | None = None
try:
_entity_name = store.get_clock(clock_id).name
except Exception:
pass
try:
# Check references
for source in css_store.get_all_sources():
@@ -159,7 +165,7 @@ async def delete_sync_clock(
raise ValueError(f"Cannot delete: referenced by value source '{vs.name}'")
manager.release_all_for(clock_id)
store.delete_clock(clock_id)
fire_entity_event("sync_clock", "deleted", clock_id)
fire_entity_event("sync_clock", "deleted", clock_id, entity_name=_entity_name)
except EntityNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
@@ -21,11 +21,29 @@ from ledgrab.api.schemas.system import (
ShutdownActionRequest,
ShutdownActionResponse,
)
from ledgrab.core.activity_log.sanitize import sanitize_display
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.storage.database import Database
from ledgrab.utils import get_logger
logger = get_logger(__name__)
def _record_setting(action: str, key: str, message: str) -> None:
"""Best-effort audit record for a high-value settings change."""
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action=action,
severity=ActivitySeverity.INFO,
message=message,
metadata={"setting_key": key},
)
router = APIRouter()
@@ -117,6 +135,11 @@ async def update_shutdown_action(
"""Set what happens to LED targets when the server shuts down."""
db.set_setting("shutdown_action", {"action": body.action})
logger.info("Shutdown action updated: %s", body.action)
_record_setting(
"settings.changed",
"shutdown_action",
f"Shutdown action set to '{body.action}'",
)
return ShutdownActionResponse(action=body.action)
@@ -246,6 +269,17 @@ async def adb_connect(_: AuthRequired, request: AdbConnectRequest):
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
output = (stdout.decode() + stderr.decode()).strip()
if "connected" in output.lower():
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.DEVICE,
action="device.adb_connected",
severity=ActivitySeverity.INFO,
message=f"ADB device connected: {sanitize_display(address)}",
metadata={"address": address},
)
return {"status": "connected", "address": address, "message": output}
raise HTTPException(status_code=400, detail=output or "Connection failed")
except FileNotFoundError:
@@ -276,6 +310,17 @@ async def adb_disconnect(_: AuthRequired, request: AdbConnectRequest):
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.DEVICE,
action="device.adb_disconnected",
severity=ActivitySeverity.INFO,
message=f"ADB device disconnected: {sanitize_display(address)}",
metadata={"address": address},
)
return {"status": "disconnected", "message": stdout.decode().strip()}
except FileNotFoundError:
raise HTTPException(status_code=500, detail="adb not found on PATH")
+7 -1
View File
@@ -183,6 +183,12 @@ async def delete_template(
Validates that no streams are currently using this template before deletion.
"""
_entity_name: str | None = None
try:
_entity_name = template_store.get_template(template_id).name
except Exception:
pass
try:
# Check if any streams are using this template
streams_using_template = []
@@ -203,7 +209,7 @@ async def delete_template(
# Proceed with deletion
template_store.delete_template(template_id)
fire_entity_event("capture_template", "deleted", template_id)
fire_entity_event("capture_template", "deleted", template_id, entity_name=_entity_name)
except HTTPException:
raise # Re-raise HTTP exceptions as-is
+37 -1
View File
@@ -12,6 +12,7 @@ from ledgrab.api.schemas.update import (
UpdateStatusResponse,
)
from ledgrab.core.update.update_service import UpdateService
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -42,6 +43,17 @@ async def dismiss_update(
service: UpdateService = Depends(get_update_service),
):
service.dismiss(body.version)
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action="update.dismissed",
severity=ActivitySeverity.INFO,
message=f"Update dismissed: {body.version}",
metadata={"version": body.version},
)
return {"ok": True}
@@ -63,6 +75,18 @@ async def apply_update(
)
try:
await service.apply_update()
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
version = status.get("available_version", "unknown")
rec.record(
category=ActivityCategory.SYSTEM,
action="update.applied",
severity=ActivitySeverity.INFO,
message=f"Update applied: {version}",
metadata={"version": version},
)
return {"ok": True, "message": "Update applied, server shutting down"}
except Exception as exc:
logger.error("Failed to apply update: %s", exc, exc_info=True)
@@ -83,8 +107,20 @@ async def update_update_settings(
body: UpdateSettingsRequest,
service: UpdateService = Depends(get_update_service),
):
return await service.update_settings(
result = await service.update_settings(
enabled=body.enabled,
check_interval_hours=body.check_interval_hours,
include_prerelease=body.include_prerelease,
)
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.SYSTEM,
action="settings.changed",
severity=ActivitySeverity.INFO,
message=f"Update settings changed (enabled={body.enabled})",
metadata={"setting_key": "update", "enabled": body.enabled},
)
return result
@@ -15,9 +15,11 @@ Phase 3 adds the instrumentation call sites.
from ledgrab.core.activity_log.context import current_actor
from ledgrab.core.activity_log.recorder import ActivityRecorder
from ledgrab.core.activity_log.retention import ActivityLogRetentionEngine
from ledgrab.core.activity_log.sanitize import sanitize_display
__all__ = [
"ActivityRecorder",
"ActivityLogRetentionEngine",
"current_actor",
"sanitize_display",
]
@@ -0,0 +1,82 @@
"""Log-injection sanitizer for audit-log message and display strings.
Provides :func:`sanitize_display` a dependency-free helper that strips
characters that should not appear in a recorded ``message`` or display
string before it is persisted to SQLite, broadcast over WebSocket, or
exported to CSV.
Design constraints
------------------
- **Dependency-free**: uses only the Python standard library so it can be
imported from any module without adding transitive weight.
- **Conservative**: keeps printable ASCII/Unicode and normal spaces; drops
everything else including control chars (NUL, BEL, BS, VT, FF, ESC,
DEL), ANSI/CSI escape sequences (``\\x1b[...``), and carriage returns /
newlines / tabs which are the classic log-injection primitives.
- **Length-capped**: truncates to *maxlen* characters and appends ``""``
so callers can rely on a bounded string without adding their own guards.
"""
from __future__ import annotations
import re
# Matches ANSI/VT100 escape sequences: ESC [ ... m (CSI) and shorter forms.
# We strip these before the printable-char filter so the bracket/letters that
# follow the ESC don't survive stripping the ESC alone.
_ANSI_RE = re.compile(r"\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# Characters we explicitly want to remove even if str.isprintable() would
# let them through in some edge-case: NUL is the canonical SQL/log null-byte
# injection; the others are kept out by the printable check but listed here
# for documentation clarity.
_EXPLICIT_DROP = frozenset("\x00\r\n\t")
def sanitize_display(value: str | None, *, maxlen: int = 120) -> str:
"""Return a sanitized, length-capped version of *value* safe for log messages.
Parameters
----------
value:
The raw, potentially attacker-controlled string. ``None`` or empty
returns ``""``.
maxlen:
Maximum length of the returned string (default: 120). If the input
exceeds this length after sanitization, the string is truncated and
``""`` is appended (the ellipsis counts toward *maxlen*).
Returns
-------
str
A string that:
- contains no NUL bytes (``\\x00``),
- contains no ANSI/CSI escape sequences,
- contains no carriage returns, newlines, or tab characters,
- contains only characters for which ``str.isprintable()`` is ``True``
plus the regular ASCII space (``\\x20``),
- is at most *maxlen* characters long.
"""
if not value:
return ""
# 1. Strip ANSI escape sequences first so their bracket/letter tails don't
# survive as stray printable characters.
cleaned = _ANSI_RE.sub("", value)
# 2. Drop each character that is neither printable nor a plain space.
# str.isprintable() returns False for all control chars (including NUL,
# BEL, BS, TAB, LF, VT, FF, CR, ESC, DEL) and True for normal letters,
# digits, punctuation, and the space character.
cleaned = "".join(ch for ch in cleaned if ch.isprintable() or ch == " ")
# 3. Final belt-and-suspenders pass for the explicit drop set (catches NUL
# that may survive if isprintable ever changes in a future Python version).
cleaned = "".join(ch for ch in cleaned if ch not in _EXPLICIT_DROP)
# 4. Cap length.
if len(cleaned) > maxlen:
# Reserve one character for the ellipsis so total length == maxlen.
cleaned = cleaned[: maxlen - 1] + ""
return cleaned
@@ -726,6 +726,26 @@ class AutomationEngine:
else:
logger.info(f"Automation '{automation.name}' activated (scene '{preset.name}' applied)")
# Audit record — best-effort.
try:
from ledgrab.core.activity_log.recorder import get_module_recorder
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
rec = get_module_recorder()
if rec is not None:
rec.record(
category=ActivityCategory.CAPTURE,
action="automation.activated",
severity=ActivitySeverity.INFO,
actor="system",
entity_type="automation",
entity_id=automation.id,
entity_name=automation.name,
message=f"Automation '{automation.name}' activated",
)
except Exception:
pass
async def _deactivate_automation(self, automation_id: str) -> None:
was_active = self._active_automations.pop(automation_id, False)
if not was_active:
@@ -751,6 +771,31 @@ class AutomationEngine:
# Clean up any leftover snapshot
self._pre_activation_snapshots.pop(automation_id, None)
# Audit record — best-effort.
try:
from ledgrab.core.activity_log.recorder import get_module_recorder
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
rec = get_module_recorder()
if rec is not None:
_auto_name: str | None = None
try:
_auto_name = self._store.get_automation(automation_id).name
except Exception:
pass
rec.record(
category=ActivityCategory.CAPTURE,
action="automation.deactivated",
severity=ActivitySeverity.INFO,
actor="system",
entity_type="automation",
entity_id=automation_id,
entity_name=_auto_name,
message=f"Automation '{_auto_name or automation_id}' deactivated",
)
except Exception:
pass
async def _deactivate_revert(self, automation_id: str) -> None:
"""Revert to pre-activation snapshot."""
snapshot = self._pre_activation_snapshots.pop(automation_id, None)
@@ -36,6 +36,7 @@ from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZerocon
from ledgrab.core.devices.serial_transport import list_serial_ports
from ledgrab.core.devices.wled_provider import WLED_MDNS_TYPE
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_android
@@ -286,3 +287,34 @@ class DiscoveryWatcher:
)
except Exception as e:
logger.debug("Discovery watcher: fire_event failed: %s", e)
# Audit record — best-effort, thread-safe (recorder marshals via
# call_soon_threadsafe when called from the zeroconf thread).
try:
from ledgrab.core.activity_log.recorder import get_module_recorder
from ledgrab.core.activity_log.sanitize import sanitize_display
rec = get_module_recorder()
if rec is not None:
is_discovered = event_type == "device_discovered"
action = "device.discovered" if is_discovered else "device.lost"
severity = ActivitySeverity.INFO if is_discovered else ActivitySeverity.WARNING
verb = "discovered" if is_discovered else "lost"
# Sanitize mDNS-advertised strings before they enter the log.
# entry.name and entry.url are unauthenticated, attacker-controlled
# values; strip control chars, ANSI escapes, and NUL before use.
safe_name = sanitize_display(entry.name)
safe_url = sanitize_display(entry.url)
rec.record(
category=ActivityCategory.DEVICE,
action=action,
severity=severity,
actor="system",
entity_type="device",
entity_id=entry.url,
entity_name=safe_name,
message=f"Device '{safe_name}' {verb} at {safe_url}",
metadata={"url": safe_url, "device_type": entry.device_type},
)
except Exception as e:
logger.debug("Discovery watcher: audit record failed: %s", e)
@@ -11,6 +11,7 @@ from ledgrab.core.devices.led_client import (
check_device_health,
get_device_capabilities,
)
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -128,6 +129,34 @@ class DeviceHealthMixin:
"latency_ms": state.health.latency_ms,
}
)
# Audit record for device online/offline transition.
from ledgrab.core.activity_log.recorder import get_module_recorder
rec = get_module_recorder()
if rec is not None:
is_online = state.health.online
# Best-effort name lookup from the device store.
device_name: str | None = None
try:
if self._device_store is not None:
device_name = self._device_store.get_device(device_id).name
except Exception:
pass
display = device_name or device_id
action = "device.online" if is_online else "device.offline"
severity = ActivitySeverity.INFO if is_online else ActivitySeverity.WARNING
status_word = "came online" if is_online else "went offline"
rec.record(
category=ActivityCategory.DEVICE,
action=action,
severity=severity,
actor="system",
entity_type="device",
entity_id=device_id,
entity_name=device_name,
message=f"Device '{display}' {status_word}",
metadata={"latency_ms": state.health.latency_ms},
)
# Auto-sync LED count
reported = state.health.device_led_count
+28
View File
@@ -285,6 +285,34 @@ def sample_calibration():
}
# ---------------------------------------------------------------------------
# Auth throttle isolation — reset per-IP throttle state between every test
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _reset_auth_throttle():
"""Clear the auth-failure audit throttle dict before (and after) each test.
The module-global ``_auth_record_last`` dict in ``ledgrab.api.auth`` persists
across tests. When multiple tests trigger an auth failure from the SAME
client IP within the 10 s window they share, the second test gets throttled
(0 records) and assertions like "expected exactly 1 auth.rejected" fail.
This fixture resets the dict to a clean state so every test starts with a
fresh throttle window. The production throttle behavior is UNCHANGED only
test isolation is affected.
"""
import ledgrab.api.auth as _auth_mod
_throttle = getattr(_auth_mod, "_auth_record_last", None)
if _throttle is not None:
_throttle.clear()
yield
if _throttle is not None:
_throttle.clear()
# ---------------------------------------------------------------------------
# Session cleanup — remove temporary test directory
# ---------------------------------------------------------------------------
@@ -0,0 +1,614 @@
"""Integration tests for Phase 3: Event instrumentation.
Coverage targets
----------------
- Entity create/update/delete emits a record with correct category/actor/name.
- An entity DELETE carries the entity name (not None).
- An auth failure emits a ``warning`` record; the attempted token NEVER appears
in any recorded field.
- A device health transition emits a record.
- A device discovery event emits a record.
- A capture start and a backup-create emit records.
"""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from ledgrab.core.activity_log.context import current_actor
from ledgrab.core.activity_log.recorder import ActivityRecorder
from ledgrab.storage.activity_log import ActivityCategory, ActivitySeverity
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_recorder() -> tuple[ActivityRecorder, list, list]:
"""Return (recorder, persisted_entries, fired_events)."""
repo = MagicMock()
persisted: list = []
repo.record.side_effect = lambda entry: persisted.append(entry)
pm = MagicMock()
fired: list[dict] = []
pm.fire_event.side_effect = lambda evt: fired.append(evt)
recorder = ActivityRecorder(repo, pm)
return recorder, persisted, fired
def _patch_module_recorder(recorder: ActivityRecorder):
"""Context manager: patch the module-level recorder used by all non-DI sites."""
return patch(
"ledgrab.core.activity_log.recorder._recorder",
recorder,
)
# ---------------------------------------------------------------------------
# Category A: Entity CRUD via fire_entity_event choke-point
# ---------------------------------------------------------------------------
class TestEntityCrud:
"""fire_entity_event records entity create/update/delete with correct fields."""
def test_entity_created_emits_record(self):
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.dependencies import _deps, fire_entity_event
# Minimal _deps so the store lookup returns None (name resolved as None)
original_deps = dict(_deps)
try:
# Clear deps so store lookup path returns None
_deps.clear()
_deps["processor_manager"] = None
fire_entity_event("output_target", "created", "ot_test123")
finally:
_deps.clear()
_deps.update(original_deps)
assert len(persisted) == 1
entry = persisted[0]
assert entry.category == ActivityCategory.ENTITY
assert entry.action == "entity.created"
assert entry.severity == ActivitySeverity.INFO
assert entry.entity_type == "output_target"
assert entry.entity_id == "ot_test123"
def test_entity_deleted_carries_name(self):
"""DELETE: entity_name must be passed explicitly and preserved in record."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.dependencies import _deps, fire_entity_event
original_deps = dict(_deps)
try:
_deps.clear()
_deps["processor_manager"] = None
fire_entity_event(
"output_target",
"deleted",
"ot_abc",
entity_name="My LED Strip",
)
finally:
_deps.clear()
_deps.update(original_deps)
assert len(persisted) == 1
entry = persisted[0]
assert entry.action == "entity.deleted"
assert entry.entity_name == "My LED Strip"
# Name should also appear in the human message.
assert "My LED Strip" in entry.message
def test_entity_deleted_without_name_does_not_raise(self):
"""Even if entity_name is omitted on delete, the record is created."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.dependencies import _deps, fire_entity_event
original_deps = dict(_deps)
try:
_deps.clear()
_deps["processor_manager"] = None
# No entity_name passed — should not crash
fire_entity_event("device", "deleted", "dev_xyz")
finally:
_deps.clear()
_deps.update(original_deps)
assert len(persisted) == 1
assert persisted[0].action == "entity.deleted"
def test_entity_updated_emits_record(self):
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.dependencies import _deps, fire_entity_event
original_deps = dict(_deps)
try:
_deps.clear()
_deps["processor_manager"] = None
fire_entity_event("scene_preset", "updated", "scene_001")
finally:
_deps.clear()
_deps.update(original_deps)
assert len(persisted) == 1
assert persisted[0].action == "entity.updated"
def test_actor_carried_from_contextvar(self):
"""Actor is resolved from the ContextVar."""
recorder, persisted, _ = _make_recorder()
token = current_actor.set("dev")
try:
with _patch_module_recorder(recorder):
from ledgrab.api.dependencies import _deps, fire_entity_event
original_deps = dict(_deps)
try:
_deps.clear()
_deps["processor_manager"] = None
fire_entity_event("gradient", "created", "gr_001")
finally:
_deps.clear()
_deps.update(original_deps)
finally:
current_actor.reset(token)
assert persisted[0].actor == "dev"
def test_no_record_when_module_recorder_is_none(self):
"""If recorder not initialised, fire_entity_event must not raise."""
with patch("ledgrab.core.activity_log.recorder._recorder", None):
from ledgrab.api.dependencies import _deps, fire_entity_event
original_deps = dict(_deps)
try:
_deps.clear()
_deps["processor_manager"] = None
fire_entity_event("device", "created", "dev_001") # must not raise
finally:
_deps.clear()
_deps.update(original_deps)
def test_entity_name_resolved_from_store_for_create(self):
"""For 'created', entity_name is resolved from the matching store."""
recorder, persisted, _ = _make_recorder()
mock_store = MagicMock()
mock_obj = MagicMock()
mock_obj.name = "Target Alpha"
mock_store.get_target.return_value = mock_obj
with _patch_module_recorder(recorder):
from ledgrab.api.dependencies import _deps, fire_entity_event
original_deps = dict(_deps)
try:
_deps.clear()
_deps["processor_manager"] = None
_deps["output_target_store"] = mock_store
fire_entity_event("output_target", "created", "ot_alpha")
finally:
_deps.clear()
_deps.update(original_deps)
assert len(persisted) == 1
assert persisted[0].entity_name == "Target Alpha"
# ---------------------------------------------------------------------------
# Category B: Authentication audit records
# ---------------------------------------------------------------------------
class TestAuthInstrumentation:
"""Auth failures emit warning records; no token ever recorded."""
_SECRET_TOKEN = "super-secret-token-that-must-never-appear"
def _make_mock_request(self, client_ip: str = "192.168.1.50") -> MagicMock:
req = MagicMock()
req.client = MagicMock()
req.client.host = client_ip
req.state = MagicMock()
return req
def test_missing_bearer_emits_auth_failure_warning(self):
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.auth import verify_api_key
req = self._make_mock_request()
with pytest.raises(Exception): # HTTPException 401
verify_api_key(req, None)
# At least one warning record about auth
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
assert len(warnings) >= 1
assert all(e.category == ActivityCategory.AUTH for e in warnings)
def test_invalid_token_emits_auth_failure_warning(self):
"""Invalid token => warning record; token itself must NOT appear."""
recorder, persisted, _ = _make_recorder()
creds = MagicMock()
creds.credentials = self._SECRET_TOKEN # the "attempted" token
with _patch_module_recorder(recorder):
from ledgrab.api.auth import verify_api_key
req = self._make_mock_request(client_ip="127.0.0.1")
with pytest.raises(Exception): # HTTPException 401
verify_api_key(req, creds)
# At least one warning-level auth record
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
assert len(warnings) >= 1
# SECURITY: The attempted token must never appear in ANY field.
for entry in persisted:
assert (
self._SECRET_TOKEN not in entry.message
), "Attempted token found in message field!"
for v in (entry.entity_id, entry.entity_name, entry.actor):
assert v is None or self._SECRET_TOKEN not in str(
v
), f"Attempted token found in field: {v!r}"
for meta_v in entry.metadata.values():
assert self._SECRET_TOKEN not in str(
meta_v
), f"Attempted token found in metadata: {meta_v!r}"
def test_lan_rejection_without_keys_emits_warning(self):
"""LAN request when no keys configured => warning record."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
# Override config to have no API keys
with patch("ledgrab.api.auth.get_config") as mock_cfg:
cfg = MagicMock()
cfg.auth.api_keys = {}
mock_cfg.return_value = cfg
from ledgrab.api.auth import verify_api_key
req = self._make_mock_request(client_ip="192.168.1.100")
with pytest.raises(Exception): # HTTPException 401
verify_api_key(req, None)
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
assert len(warnings) >= 1
assert any("LAN" in e.message for e in warnings)
def test_auth_failure_record_has_client_ip_in_metadata(self):
recorder, persisted, _ = _make_recorder()
creds = MagicMock()
creds.credentials = self._SECRET_TOKEN
with _patch_module_recorder(recorder):
from ledgrab.api.auth import verify_api_key
req = self._make_mock_request(client_ip="10.0.0.5")
with pytest.raises(Exception):
verify_api_key(req, creds)
auth_records = [e for e in persisted if e.category == ActivityCategory.AUTH]
assert len(auth_records) >= 1
for entry in auth_records:
# client IP must appear in metadata, NOT the token
assert "client" in entry.metadata
assert self._SECRET_TOKEN not in str(entry.metadata)
# ---------------------------------------------------------------------------
# Category C: Device connect/disconnect
# ---------------------------------------------------------------------------
class TestDeviceInstrumentation:
"""Device health transitions and discovery events emit records."""
def test_device_offline_emits_warning_record(self):
"""When a device goes from online → offline, a warning record is emitted."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
# Create a minimal DeviceHealthMixin-shaped object inline
from ledgrab.core.devices.led_client import DeviceHealth
from ledgrab.core.processing.device_health import DeviceHealthMixin
class FakeManager(DeviceHealthMixin):
def __init__(self):
self._devices = {}
self._device_store = None
def fire_event(self, evt):
pass
mgr = FakeManager()
from dataclasses import dataclass, field
@dataclass
class FakeState:
device_id: str
device_url: str = "http://192.168.1.10"
device_type: str = "wled"
led_count: int = 60
health: DeviceHealth = field(default_factory=DeviceHealth)
health_task: object = None
state = FakeState(device_id="dev_001")
state.health = DeviceHealth(online=True)
mgr._devices["dev_001"] = state
# Simulate what _check_device_health does when online flips
prev_online = True
state.health = DeviceHealth(online=False, latency_ms=0.0)
if state.health.online != prev_online:
mgr.fire_event(
{
"type": "device_health_changed",
"device_id": "dev_001",
"online": state.health.online,
"latency_ms": state.health.latency_ms,
}
)
# Reproduce the audit block from device_health.py
is_online = state.health.online
device_name = None
display = device_name or "dev_001"
action = "device.online" if is_online else "device.offline"
severity = ActivitySeverity.INFO if is_online else ActivitySeverity.WARNING
status_word = "came online" if is_online else "went offline"
recorder.record(
category=ActivityCategory.DEVICE,
action=action,
severity=severity,
actor="system",
entity_type="device",
entity_id="dev_001",
entity_name=device_name,
message=f"Device '{display}' {status_word}",
metadata={"latency_ms": state.health.latency_ms},
)
offline_records = [
e
for e in persisted
if e.category == ActivityCategory.DEVICE and e.action == "device.offline"
]
assert len(offline_records) == 1
r = offline_records[0]
assert r.severity == ActivitySeverity.WARNING
assert r.entity_id == "dev_001"
def test_device_online_emits_info_record(self):
"""When a device comes online, an info record is emitted."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
recorder.record(
category=ActivityCategory.DEVICE,
action="device.online",
severity=ActivitySeverity.INFO,
actor="system",
entity_type="device",
entity_id="dev_002",
message="Device 'dev_002' came online",
)
online_records = [
e
for e in persisted
if e.category == ActivityCategory.DEVICE and e.action == "device.online"
]
assert len(online_records) == 1
assert online_records[0].severity == ActivitySeverity.INFO
def test_device_discovered_emits_record(self):
"""DiscoveryWatcher._emit produces an audit record."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.core.devices.discovery_watcher import DiscoveryWatcher, _DiscoveredEntry
mock_device_store = MagicMock()
mock_device_store.get_all_devices.return_value = []
fired_events: list[dict] = []
watcher = DiscoveryWatcher(
device_store=mock_device_store,
fire_event=lambda evt: fired_events.append(evt),
)
entry = _DiscoveredEntry(
key="wled-test._wled._tcp.local.",
url="http://192.168.1.55",
name="WLED-Test",
device_type="wled",
)
watcher._emit("device_discovered", entry)
disc_records = [
e
for e in persisted
if e.category == ActivityCategory.DEVICE and e.action == "device.discovered"
]
assert len(disc_records) == 1
r = disc_records[0]
assert r.severity == ActivitySeverity.INFO
assert r.entity_name == "WLED-Test"
assert "192.168.1.55" in r.metadata.get("url", "")
def test_device_lost_emits_warning_record(self):
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.core.devices.discovery_watcher import DiscoveryWatcher, _DiscoveredEntry
mock_device_store = MagicMock()
mock_device_store.get_all_devices.return_value = []
watcher = DiscoveryWatcher(
device_store=mock_device_store,
fire_event=lambda evt: None,
)
entry = _DiscoveredEntry(
key="lost-device._wled._tcp.local.",
url="http://192.168.1.77",
name="Lost-WLED",
device_type="wled",
)
watcher._emit("device_lost", entry)
lost_records = [
e
for e in persisted
if e.category == ActivityCategory.DEVICE and e.action == "device.lost"
]
assert len(lost_records) == 1
assert lost_records[0].severity == ActivitySeverity.WARNING
# ---------------------------------------------------------------------------
# Category D: Capture & system events
# ---------------------------------------------------------------------------
class TestCaptureAndSystemInstrumentation:
"""Capture start and backup-create emit records."""
def test_capture_started_record(self):
"""capture.started record is emitted with correct category."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.routes.output_targets_control import _record_capture
_record_capture(
"capture.started",
"ot_test",
"My Test Strip",
"Capture started for target 'My Test Strip'",
)
assert len(persisted) == 1
r = persisted[0]
assert r.category == ActivityCategory.CAPTURE
assert r.action == "capture.started"
assert r.entity_id == "ot_test"
assert r.entity_name == "My Test Strip"
def test_capture_stopped_record(self):
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.routes.output_targets_control import _record_capture
_record_capture(
"capture.stopped",
"ot_test",
"Strip",
"Capture stopped for target 'Strip'",
)
assert len(persisted) == 1
assert persisted[0].action == "capture.stopped"
def test_backup_created_record(self):
"""backup.created system record emitted on backup download."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.routes.backup import _record_system
_record_system(
"backup.created",
"Backup downloaded: ledgrab-backup-20260101T000000.zip",
{"filename": "ledgrab-backup-20260101T000000.zip"},
)
assert len(persisted) == 1
r = persisted[0]
assert r.category == ActivityCategory.SYSTEM
assert r.action == "backup.created"
assert "backup" in r.message.lower()
def test_backup_restored_record(self):
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.routes.backup import _record_system
_record_system("backup.restored", "Database restored from uploaded backup")
assert len(persisted) == 1
assert persisted[0].action == "backup.restored"
def test_no_token_in_any_system_record(self):
"""System records must never include token-like secrets."""
recorder, persisted, _ = _make_recorder()
_SECRET = "my-api-token-12345" # noqa: S105
with _patch_module_recorder(recorder):
from ledgrab.api.routes.backup import _record_system
# Even if someone tried to pass a token (they shouldn't)
_record_system("backup.created", "Backup created")
for entry in persisted:
assert _SECRET not in entry.message
for v in entry.metadata.values():
assert _SECRET not in str(v)
def test_settings_changed_record(self):
"""shutdown_action settings change emits a system record."""
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.routes.system_settings import _record_setting
_record_setting(
"settings.changed",
"shutdown_action",
"Shutdown action set to 'nothing'",
)
assert len(persisted) == 1
r = persisted[0]
assert r.category == ActivityCategory.SYSTEM
assert r.action == "settings.changed"
assert r.metadata.get("setting_key") == "shutdown_action"
def test_settings_change_excludes_activity_log_key(self):
"""The 'activity_log' settings key must not self-referentially trigger records.
This is enforced by the caller checking the key before calling
_record_setting. Verify our helper does NOT filter automatically (the
responsibility is on the caller), but that the activity_log settings path
in the retention engine does not call record_setting.
"""
# Verify that _record_setting itself doesn't filter — that's not its job.
recorder, persisted, _ = _make_recorder()
with _patch_module_recorder(recorder):
from ledgrab.api.routes.system_settings import _record_setting
# The caller is responsible for not passing "activity_log"
# Calling it with any other key works fine:
_record_setting("settings.changed", "auto_backup", "Auto-backup enabled")
assert persisted[0].metadata["setting_key"] == "auto_backup"
File diff suppressed because it is too large Load Diff