7a9ff7ad54
Two paired backends sharing the events.Bus seam:
Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
redaction on read (placeholder echo treated as "no change" on
PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
filters (severity CSV, source CSV, message regex with memoized
compile cache). Structural loop-prevention: never writes to
event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
TierEventTrigger constant, doSendRaw shared with the legacy
Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
sending the real TriggerWebhookPayload shape. SSRF guard
rejects loopback / link-local / unspecified targets. PATCH
uses pointer-typed DTO for partial updates.
Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
per-container token bucket, atomic drop counters), tail
(multiplexed docker frame demuxer with TTY fallback + 16 MiB
payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
timestamp strip + UTF-8-safe message truncation), manager
(5s container polling, atomic.Pointer[Snapshot] hot-reload,
HitEmitter writes event_log + publishes EventLog so the
trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
EffectiveLogScanRules resolver (globals minus per-workload
overrides plus workload-only additions). Transactional
cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
(sample_line → matched/captures) + /stats (drop counters +
active tail count + last-snapshot compile errors) +
GET /api/workloads/{id}/effective-rules.
cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
157 lines
4.4 KiB
Go
157 lines
4.4 KiB
Go
package logscanner
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
"github.com/alexei/tinyforge/internal/store"
|
|
)
|
|
|
|
// recordingEmitter captures hits from a tail without touching the
|
|
// real store or event bus. Concurrent-safe so we can let the tail
|
|
// goroutine push hits while the test asserts.
|
|
type recordingEmitter struct {
|
|
mu sync.Mutex
|
|
hits []Hit
|
|
}
|
|
|
|
func (r *recordingEmitter) EmitHit(_ context.Context, hit Hit) {
|
|
r.mu.Lock()
|
|
r.hits = append(r.hits, hit)
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
func (r *recordingEmitter) Hits() []Hit {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
out := make([]Hit, len(r.hits))
|
|
copy(out, r.hits)
|
|
return out
|
|
}
|
|
|
|
// snapshotForRule wraps a single rule into an atomic Snapshot
|
|
// pointer so tests can drive processLine without rebuilding from
|
|
// store rows.
|
|
func snapshotForRule(t *testing.T, pattern string) *atomic.Pointer[Snapshot] {
|
|
t.Helper()
|
|
rows := []store.LogScanRule{
|
|
{ID: 1, Name: "t", Pattern: pattern, Severity: "warn", Streams: "all", Enabled: true},
|
|
}
|
|
snap, errs := BuildSnapshot(rows)
|
|
if len(errs) != 0 {
|
|
t.Fatalf("BuildSnapshot: %v", errs)
|
|
}
|
|
p := &atomic.Pointer[Snapshot]{}
|
|
p.Store(snap)
|
|
return p
|
|
}
|
|
|
|
func TestProcessLine_StripsRFC3339Prefix(t *testing.T) {
|
|
emit := &recordingEmitter{}
|
|
snap := snapshotForRule(t, `panic`)
|
|
tl := &tail{
|
|
containerID: "c1",
|
|
workloadID: "w1",
|
|
engine: NewEngine(),
|
|
emitter: emit,
|
|
snapshot: snap,
|
|
}
|
|
tl.processLine(context.Background(), "stderr", "2026-05-11T12:34:56.123456789Z fatal panic in worker")
|
|
hits := emit.Hits()
|
|
if len(hits) != 1 {
|
|
t.Fatalf("want 1 hit, got %d", len(hits))
|
|
}
|
|
if hits[0].Line != "fatal panic in worker" {
|
|
t.Errorf("timestamp not stripped: %q", hits[0].Line)
|
|
}
|
|
}
|
|
|
|
func TestProcessLine_LeavesNonTimestampedLineAlone(t *testing.T) {
|
|
// The previous heuristic stripped the first word of any line
|
|
// whose first space landed past byte 20. A long-hash line with
|
|
// no embedded timestamp must now survive intact.
|
|
emit := &recordingEmitter{}
|
|
snap := snapshotForRule(t, `(?i)hash`)
|
|
tl := &tail{
|
|
containerID: "c1",
|
|
workloadID: "w1",
|
|
engine: NewEngine(),
|
|
emitter: emit,
|
|
snapshot: snap,
|
|
}
|
|
long := "aaaaaaaaaaaaaaaaaaaaaaaa hash payload"
|
|
tl.processLine(context.Background(), "stdout", long)
|
|
hits := emit.Hits()
|
|
if len(hits) != 1 {
|
|
t.Fatalf("want 1 hit, got %d", len(hits))
|
|
}
|
|
if hits[0].Line != long {
|
|
t.Errorf("non-timestamp prefix incorrectly stripped: %q", hits[0].Line)
|
|
}
|
|
}
|
|
|
|
func TestProcessLine_NoSnapshotIsSafe(t *testing.T) {
|
|
// Tail constructed before the manager loads its first snapshot
|
|
// (or after a transient nil) must not crash — processLine
|
|
// returns silently when snapshot.Load() is nil.
|
|
tl := &tail{
|
|
containerID: "c1",
|
|
workloadID: "w1",
|
|
engine: NewEngine(),
|
|
emitter: &recordingEmitter{},
|
|
snapshot: &atomic.Pointer[Snapshot]{}, // empty pointer
|
|
}
|
|
tl.processLine(context.Background(), "stdout", "anything")
|
|
}
|
|
|
|
func TestReadLineFromBuffer(t *testing.T) {
|
|
buf := &bytes.Buffer{}
|
|
buf.WriteString("line one\nline two\npartial")
|
|
|
|
got, ok := readLineFromBuffer(buf)
|
|
if !ok || got != "line one" {
|
|
t.Fatalf("first read: ok=%v got=%q", ok, got)
|
|
}
|
|
got, ok = readLineFromBuffer(buf)
|
|
if !ok || got != "line two" {
|
|
t.Fatalf("second read: ok=%v got=%q", ok, got)
|
|
}
|
|
// Trailing partial line stays in buffer until more data arrives.
|
|
if _, ok := readLineFromBuffer(buf); ok {
|
|
t.Errorf("partial line should NOT yield until newline arrives")
|
|
}
|
|
if buf.String() != "partial" {
|
|
t.Errorf("remainder=%q want %q", buf.String(), "partial")
|
|
}
|
|
}
|
|
|
|
func TestIsMultiplexedStream(t *testing.T) {
|
|
// Valid docker frame header: type=2 (stderr), 3 nulls, then 4-byte length.
|
|
demuxed := []byte{2, 0, 0, 0, 0, 0, 0, 12, 'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'}
|
|
if !isMultiplexedStream(newReader(demuxed)) {
|
|
t.Error("valid demuxed frame should be detected")
|
|
}
|
|
|
|
// Plain text: first byte is a printable letter, header check fails.
|
|
plain := []byte("plain log line without framing\n")
|
|
if isMultiplexedStream(newReader(plain)) {
|
|
t.Error("plain text should not be detected as multiplexed")
|
|
}
|
|
|
|
// Header with type=3 is invalid (docker uses 0,1,2 only).
|
|
bad := []byte{3, 0, 0, 0, 0, 0, 0, 1}
|
|
if isMultiplexedStream(newReader(bad)) {
|
|
t.Error("type=3 header is not a valid docker frame")
|
|
}
|
|
}
|
|
|
|
// newReader wraps b in a *bufio.Reader with a 32-byte buffer, which
// is comfortably larger than the 8 bytes the demuxer detection peeks
// at.
func newReader(b []byte) *bufio.Reader {
	const bufSize = 32

	src := bytes.NewReader(b)
	return bufio.NewReaderSize(src, bufSize)
}
|