7a9ff7ad54
Two paired backends sharing the events.Bus seam:
Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
redaction on read (placeholder echo treated as "no change" on
PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
filters (severity CSV, source CSV, message regex with memoized
compile cache). Structural loop-prevention: never writes to
event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
TierEventTrigger constant, doSendRaw shared with the legacy
Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
sending the real TriggerWebhookPayload shape. SSRF guard
rejects loopback / link-local / unspecified targets. PATCH
uses pointer-typed DTO for partial updates.
Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
per-container token bucket, atomic drop counters), tail
(multiplexed docker frame demuxer with TTY fallback + 16 MiB
payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
timestamp strip + UTF-8-safe message truncation), manager
(5s container polling, atomic.Pointer[Snapshot] hot-reload,
HitEmitter writes event_log + publishes EventLog so the
trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
EffectiveLogScanRules resolver (globals minus per-workload
overrides plus workload-only additions). Transactional
cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
(sample_line → matched/captures) + /stats (drop counters +
active tail count + last-snapshot compile errors) +
GET /api/workloads/{id}/effective-rules.
cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
224 lines
6.5 KiB
Go
224 lines
6.5 KiB
Go
package logscanner
|
|
|
|
import (
|
|
"regexp"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// mustRegexp compiles pattern p for use in a test fixture. If the pattern
// does not compile, the calling test is aborted through t.Fatalf, so a
// caller that gets a value back can use it without a nil check.
func mustRegexp(t *testing.T, p string) *regexp.Regexp {
	t.Helper()
	compiled, compileErr := regexp.Compile(p)
	if compileErr == nil {
		return compiled
	}
	t.Fatalf("compile %q: %v", p, compileErr)
	return compiled
}
|
|
|
|
func newRule(t *testing.T, id int64, pattern string, opts ...func(*Rule)) Rule {
|
|
r := Rule{
|
|
ID: id,
|
|
Name: "rule" + pattern,
|
|
Pattern: mustRegexp(t, pattern),
|
|
Severity: "warn",
|
|
Streams: "all",
|
|
CooldownSeconds: 0,
|
|
}
|
|
for _, o := range opts {
|
|
o(&r)
|
|
}
|
|
return r
|
|
}
|
|
|
|
func TestEngineMatch_BasicHit(t *testing.T) {
|
|
e := NewEngine()
|
|
rules := []Rule{newRule(t, 1, `panic`)}
|
|
hits := e.Match("c1", "w1", "stderr", "fatal panic in worker", rules)
|
|
if len(hits) != 1 {
|
|
t.Fatalf("want 1 hit, got %d", len(hits))
|
|
}
|
|
if hits[0].Rule.ID != 1 {
|
|
t.Errorf("rule id mismatch: %d", hits[0].Rule.ID)
|
|
}
|
|
}
|
|
|
|
func TestEngineMatch_StreamFilter(t *testing.T) {
|
|
e := NewEngine()
|
|
rules := []Rule{newRule(t, 1, `boom`, func(r *Rule) { r.Streams = "stderr" })}
|
|
if len(e.Match("c", "w", "stdout", "boom there", rules)) != 0 {
|
|
t.Error("stdout line should not match stderr-only rule")
|
|
}
|
|
if len(e.Match("c", "w", "stderr", "boom there", rules)) != 1 {
|
|
t.Error("stderr line should match stderr-only rule")
|
|
}
|
|
}
|
|
|
|
func TestEngineMatch_NoMatchNoHit(t *testing.T) {
|
|
e := NewEngine()
|
|
rules := []Rule{newRule(t, 1, `panic`)}
|
|
if h := e.Match("c", "w", "stdout", "all good", rules); len(h) != 0 {
|
|
t.Errorf("expected no hit, got %d", len(h))
|
|
}
|
|
}
|
|
|
|
func TestEngineMatch_Cooldown(t *testing.T) {
|
|
now := time.Now()
|
|
clock := now
|
|
e := NewEngine(WithNow(func() time.Time { return clock }))
|
|
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })}
|
|
|
|
if len(e.Match("c", "w", "stdout", "bad event", rules)) != 1 {
|
|
t.Fatal("first fire expected")
|
|
}
|
|
clock = now.Add(10 * time.Second)
|
|
if h := e.Match("c", "w", "stdout", "bad event 2", rules); len(h) != 0 {
|
|
t.Errorf("cooled-down rule fired: %+v", h)
|
|
}
|
|
clock = now.Add(31 * time.Second)
|
|
if len(e.Match("c", "w", "stdout", "bad event 3", rules)) != 1 {
|
|
t.Error("cooldown expired but rule did not fire")
|
|
}
|
|
}
|
|
|
|
func TestEngineMatch_PerContainerCooldownIndependent(t *testing.T) {
|
|
// Same rule firing on two different containers should not
|
|
// share cooldown — operators expect each container's alerts
|
|
// to be independent.
|
|
now := time.Now()
|
|
clock := now
|
|
e := NewEngine(WithNow(func() time.Time { return clock }))
|
|
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })}
|
|
|
|
e.Match("c1", "w", "stdout", "bad on one", rules)
|
|
if len(e.Match("c2", "w", "stdout", "bad on two", rules)) != 1 {
|
|
t.Error("second container should fire independently")
|
|
}
|
|
}
|
|
|
|
func TestEngineMatch_TokenBucket(t *testing.T) {
|
|
now := time.Now()
|
|
clock := now
|
|
e := NewEngine(WithNow(func() time.Time { return clock }), WithBucket(3, time.Minute))
|
|
rules := []Rule{newRule(t, 1, `noisy`)}
|
|
for i := 0; i < 3; i++ {
|
|
if len(e.Match("c", "w", "stdout", "noisy "+string(rune('a'+i)), rules)) != 1 {
|
|
t.Errorf("hit %d should fire", i)
|
|
}
|
|
}
|
|
// 4th within window should be dropped.
|
|
if h := e.Match("c", "w", "stdout", "noisy d", rules); len(h) != 0 {
|
|
t.Errorf("4th hit should be rate-limited, got %d", len(h))
|
|
}
|
|
// After the window the bucket refills.
|
|
clock = now.Add(time.Minute + time.Second)
|
|
if len(e.Match("c", "w", "stdout", "noisy e", rules)) != 1 {
|
|
t.Error("bucket should have refilled after window")
|
|
}
|
|
}
|
|
|
|
func TestEngineForget_DropsState(t *testing.T) {
|
|
e := NewEngine()
|
|
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 999 })}
|
|
e.Match("c", "w", "stdout", "bad once", rules)
|
|
e.Forget("c")
|
|
// After Forget, the same rule on the same container should
|
|
// fire again immediately — cooldown state was cleared.
|
|
if len(e.Match("c", "w", "stdout", "bad again", rules)) != 1 {
|
|
t.Error("Forget should drop cooldown state")
|
|
}
|
|
}
|
|
|
|
func TestEngineStats_CooldownCounter(t *testing.T) {
|
|
// Two matches inside the cooldown window should drop once and
|
|
// increment DroppedByCooldown by one. Bucket is generous so it
|
|
// never participates.
|
|
now := time.Now()
|
|
clock := now
|
|
e := NewEngine(
|
|
WithNow(func() time.Time { return clock }),
|
|
WithBucket(100, time.Hour),
|
|
)
|
|
rule := newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 60 })
|
|
|
|
if len(e.Match("c", "w", "stdout", "bad a", []Rule{rule})) != 1 {
|
|
t.Fatal("first fire expected")
|
|
}
|
|
if len(e.Match("c", "w", "stdout", "bad b", []Rule{rule})) != 0 {
|
|
t.Fatal("second fire inside cooldown should drop")
|
|
}
|
|
stats := e.Stats()
|
|
if stats.DroppedByCooldown != 1 {
|
|
t.Errorf("DroppedByCooldown = %d, want 1", stats.DroppedByCooldown)
|
|
}
|
|
if stats.DroppedByBucket != 0 {
|
|
t.Errorf("DroppedByBucket = %d, want 0 (bucket should not have fired)",
|
|
stats.DroppedByBucket)
|
|
}
|
|
}
|
|
|
|
func TestEngineStats_BucketCounter(t *testing.T) {
|
|
// Drain a small bucket without any cooldown so every drop
|
|
// after the first is bucket-attributable.
|
|
now := time.Now()
|
|
clock := now
|
|
e := NewEngine(
|
|
WithNow(func() time.Time { return clock }),
|
|
WithBucket(2, time.Hour),
|
|
)
|
|
rule := newRule(t, 1, `bad`) // cooldown=0
|
|
|
|
// 2 fires consume both tokens.
|
|
for i := 0; i < 2; i++ {
|
|
if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 1 {
|
|
t.Fatalf("fire %d expected to succeed", i)
|
|
}
|
|
}
|
|
// 3rd and 4th fires hit an empty bucket.
|
|
for i := 0; i < 2; i++ {
|
|
if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 0 {
|
|
t.Fatalf("fire %d expected to be bucket-dropped", i+2)
|
|
}
|
|
}
|
|
stats := e.Stats()
|
|
if stats.DroppedByBucket != 2 {
|
|
t.Errorf("DroppedByBucket = %d, want 2", stats.DroppedByBucket)
|
|
}
|
|
if stats.DroppedByCooldown != 0 {
|
|
t.Errorf("DroppedByCooldown = %d, want 0 (cooldown should not have fired)",
|
|
stats.DroppedByCooldown)
|
|
}
|
|
}
|
|
|
|
func TestEngineStats_NoDropsWhenAllowed(t *testing.T) {
|
|
e := NewEngine(WithBucket(100, time.Minute))
|
|
rule := newRule(t, 1, `x`)
|
|
for i := 0; i < 5; i++ {
|
|
e.Match("c", "w", "stdout", "x", []Rule{rule})
|
|
}
|
|
stats := e.Stats()
|
|
if stats.DroppedByBucket != 0 || stats.DroppedByCooldown != 0 {
|
|
t.Errorf("expected zero drops, got %+v", stats)
|
|
}
|
|
}
|
|
|
|
func TestStreamMatches(t *testing.T) {
|
|
cases := []struct {
|
|
rule, line string
|
|
want bool
|
|
}{
|
|
{"all", "stdout", true},
|
|
{"all", "stderr", true},
|
|
{"", "stdout", true},
|
|
{"stdout", "stdout", true},
|
|
{"stdout", "stderr", false},
|
|
{"stderr", "stderr", true},
|
|
{"stderr", "stdout", false},
|
|
}
|
|
for _, c := range cases {
|
|
got := streamMatches(c.rule, c.line)
|
|
if got != c.want {
|
|
t.Errorf("streamMatches(%q,%q) = %v want %v", c.rule, c.line, got, c.want)
|
|
}
|
|
}
|
|
}
|