Files
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

157 lines
4.4 KiB
Go

package logscanner
import (
"bufio"
"bytes"
"context"
"sync"
"sync/atomic"
"testing"
"github.com/alexei/tinyforge/internal/store"
)
// recordingEmitter is a test double for the tail's hit sink. Rather than
// touching the real store or event bus, it appends every Hit it receives to
// an in-memory slice. A mutex guards that slice so the tail goroutine can
// keep emitting while the test goroutine reads.
type recordingEmitter struct {
	mu   sync.Mutex // guards hits
	hits []Hit
}

// EmitHit records hit. The context argument is ignored.
func (r *recordingEmitter) EmitHit(_ context.Context, hit Hit) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.hits = append(r.hits, hit)
}

// Hits returns a copy of every hit recorded so far. The returned slice never
// aliases the emitter's internal storage, so it is safe to hold on to while
// further EmitHit calls arrive.
func (r *recordingEmitter) Hits() []Hit {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append(make([]Hit, 0, len(r.hits)), r.hits...)
}
// snapshotForRule builds a Snapshot containing one enabled rule with the
// given pattern (severity "warn", all streams) and publishes it through a
// fresh atomic pointer. Tests can then drive processLine directly, without
// going through the manager or store rows. The test fails immediately if
// BuildSnapshot reports any errors for the pattern.
func snapshotForRule(t *testing.T, pattern string) *atomic.Pointer[Snapshot] {
	t.Helper()

	rule := store.LogScanRule{
		ID:       1,
		Name:     "t",
		Pattern:  pattern,
		Severity: "warn",
		Streams:  "all",
		Enabled:  true,
	}
	snap, errs := BuildSnapshot([]store.LogScanRule{rule})
	if len(errs) > 0 {
		t.Fatalf("BuildSnapshot: %v", errs)
	}

	var ptr atomic.Pointer[Snapshot]
	ptr.Store(snap)
	return &ptr
}
func TestProcessLine_StripsRFC3339Prefix(t *testing.T) {
emit := &recordingEmitter{}
snap := snapshotForRule(t, `panic`)
tl := &tail{
containerID: "c1",
workloadID: "w1",
engine: NewEngine(),
emitter: emit,
snapshot: snap,
}
tl.processLine(context.Background(), "stderr", "2026-05-11T12:34:56.123456789Z fatal panic in worker")
hits := emit.Hits()
if len(hits) != 1 {
t.Fatalf("want 1 hit, got %d", len(hits))
}
if hits[0].Line != "fatal panic in worker" {
t.Errorf("timestamp not stripped: %q", hits[0].Line)
}
}
func TestProcessLine_LeavesNonTimestampedLineAlone(t *testing.T) {
// The previous heuristic stripped the first word of any line
// whose first space landed past byte 20. A long-hash line with
// no embedded timestamp must now survive intact.
emit := &recordingEmitter{}
snap := snapshotForRule(t, `(?i)hash`)
tl := &tail{
containerID: "c1",
workloadID: "w1",
engine: NewEngine(),
emitter: emit,
snapshot: snap,
}
long := "aaaaaaaaaaaaaaaaaaaaaaaa hash payload"
tl.processLine(context.Background(), "stdout", long)
hits := emit.Hits()
if len(hits) != 1 {
t.Fatalf("want 1 hit, got %d", len(hits))
}
if hits[0].Line != long {
t.Errorf("non-timestamp prefix incorrectly stripped: %q", hits[0].Line)
}
}
func TestProcessLine_NoSnapshotIsSafe(t *testing.T) {
// Tail constructed before the manager loads its first snapshot
// (or after a transient nil) must not crash — processLine
// returns silently when snapshot.Load() is nil.
tl := &tail{
containerID: "c1",
workloadID: "w1",
engine: NewEngine(),
emitter: &recordingEmitter{},
snapshot: &atomic.Pointer[Snapshot]{}, // empty pointer
}
tl.processLine(context.Background(), "stdout", "anything")
}
// TestReadLineFromBuffer checks two things. Complete newline-terminated
// lines are returned one at a time with the newline removed. A trailing
// fragment without a newline is not returned and stays in the buffer.
func TestReadLineFromBuffer(t *testing.T) {
	var buf bytes.Buffer
	buf.WriteString("line one\nline two\npartial")

	steps := []struct {
		want   string
		format string
	}{
		{want: "line one", format: "first read: ok=%v got=%q"},
		{want: "line two", format: "second read: ok=%v got=%q"},
	}
	for _, step := range steps {
		got, ok := readLineFromBuffer(&buf)
		if !ok || got != step.want {
			t.Fatalf(step.format, ok, got)
		}
	}

	// The unterminated tail must wait for more data.
	_, ok := readLineFromBuffer(&buf)
	if ok {
		t.Errorf("partial line should NOT yield until newline arrives")
	}
	if rest := buf.String(); rest != "partial" {
		t.Errorf("remainder=%q want %q", rest, "partial")
	}
}
// TestIsMultiplexedStream exercises the docker stream-framing sniffer. A
// well-formed frame header must be recognised as multiplexed. Plain text and
// a header carrying an out-of-range stream type must not.
func TestIsMultiplexedStream(t *testing.T) {
	cases := []struct {
		name string
		in   []byte
		want bool
	}{
		{
			// Multiplexed (not yet demuxed) frame: stream type 2 (stderr),
			// three zero padding bytes, a 4-byte big-endian payload length
			// of 12, then exactly 12 payload bytes.
			name: "valid stderr frame",
			in:   []byte{2, 0, 0, 0, 0, 0, 0, 12, 'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'},
			want: true,
		},
		{
			// Plain text: the first byte is a printable letter, so the
			// header check fails.
			name: "plain text",
			in:   []byte("plain log line without framing\n"),
			want: false,
		},
		{
			// Stream type 3 is invalid; docker only uses 0, 1 and 2.
			name: "invalid stream type",
			in:   []byte{3, 0, 0, 0, 0, 0, 0, 1},
			want: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := isMultiplexedStream(newReader(tc.in)); got != tc.want {
				t.Errorf("isMultiplexedStream = %v, want %v", got, tc.want)
			}
		})
	}
}
// newReader wraps b in a bufio.Reader with a 32-byte buffer. That is
// comfortably larger than the 8-byte Peek the multiplexed-stream detection
// performs, so the peek is never limited by buffer size.
func newReader(b []byte) *bufio.Reader {
	const bufSize = 32
	src := bytes.NewReader(b)
	return bufio.NewReaderSize(src, bufSize)
}