Files
tiny-forge/internal/logscanner/engine_test.go
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

224 lines
6.5 KiB
Go

package logscanner
import (
"regexp"
"testing"
"time"
)
func mustRegexp(t *testing.T, p string) *regexp.Regexp {
t.Helper()
re, err := regexp.Compile(p)
if err != nil {
t.Fatalf("compile %q: %v", p, err)
}
return re
}
func newRule(t *testing.T, id int64, pattern string, opts ...func(*Rule)) Rule {
r := Rule{
ID: id,
Name: "rule" + pattern,
Pattern: mustRegexp(t, pattern),
Severity: "warn",
Streams: "all",
CooldownSeconds: 0,
}
for _, o := range opts {
o(&r)
}
return r
}
func TestEngineMatch_BasicHit(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `panic`)}
hits := e.Match("c1", "w1", "stderr", "fatal panic in worker", rules)
if len(hits) != 1 {
t.Fatalf("want 1 hit, got %d", len(hits))
}
if hits[0].Rule.ID != 1 {
t.Errorf("rule id mismatch: %d", hits[0].Rule.ID)
}
}
func TestEngineMatch_StreamFilter(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `boom`, func(r *Rule) { r.Streams = "stderr" })}
if len(e.Match("c", "w", "stdout", "boom there", rules)) != 0 {
t.Error("stdout line should not match stderr-only rule")
}
if len(e.Match("c", "w", "stderr", "boom there", rules)) != 1 {
t.Error("stderr line should match stderr-only rule")
}
}
func TestEngineMatch_NoMatchNoHit(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `panic`)}
if h := e.Match("c", "w", "stdout", "all good", rules); len(h) != 0 {
t.Errorf("expected no hit, got %d", len(h))
}
}
func TestEngineMatch_Cooldown(t *testing.T) {
now := time.Now()
clock := now
e := NewEngine(WithNow(func() time.Time { return clock }))
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })}
if len(e.Match("c", "w", "stdout", "bad event", rules)) != 1 {
t.Fatal("first fire expected")
}
clock = now.Add(10 * time.Second)
if h := e.Match("c", "w", "stdout", "bad event 2", rules); len(h) != 0 {
t.Errorf("cooled-down rule fired: %+v", h)
}
clock = now.Add(31 * time.Second)
if len(e.Match("c", "w", "stdout", "bad event 3", rules)) != 1 {
t.Error("cooldown expired but rule did not fire")
}
}
func TestEngineMatch_PerContainerCooldownIndependent(t *testing.T) {
// Same rule firing on two different containers should not
// share cooldown — operators expect each container's alerts
// to be independent.
now := time.Now()
clock := now
e := NewEngine(WithNow(func() time.Time { return clock }))
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })}
e.Match("c1", "w", "stdout", "bad on one", rules)
if len(e.Match("c2", "w", "stdout", "bad on two", rules)) != 1 {
t.Error("second container should fire independently")
}
}
func TestEngineMatch_TokenBucket(t *testing.T) {
now := time.Now()
clock := now
e := NewEngine(WithNow(func() time.Time { return clock }), WithBucket(3, time.Minute))
rules := []Rule{newRule(t, 1, `noisy`)}
for i := 0; i < 3; i++ {
if len(e.Match("c", "w", "stdout", "noisy "+string(rune('a'+i)), rules)) != 1 {
t.Errorf("hit %d should fire", i)
}
}
// 4th within window should be dropped.
if h := e.Match("c", "w", "stdout", "noisy d", rules); len(h) != 0 {
t.Errorf("4th hit should be rate-limited, got %d", len(h))
}
// After the window the bucket refills.
clock = now.Add(time.Minute + time.Second)
if len(e.Match("c", "w", "stdout", "noisy e", rules)) != 1 {
t.Error("bucket should have refilled after window")
}
}
func TestEngineForget_DropsState(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 999 })}
e.Match("c", "w", "stdout", "bad once", rules)
e.Forget("c")
// After Forget, the same rule on the same container should
// fire again immediately — cooldown state was cleared.
if len(e.Match("c", "w", "stdout", "bad again", rules)) != 1 {
t.Error("Forget should drop cooldown state")
}
}
func TestEngineStats_CooldownCounter(t *testing.T) {
// Two matches inside the cooldown window should drop once and
// increment DroppedByCooldown by one. Bucket is generous so it
// never participates.
now := time.Now()
clock := now
e := NewEngine(
WithNow(func() time.Time { return clock }),
WithBucket(100, time.Hour),
)
rule := newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 60 })
if len(e.Match("c", "w", "stdout", "bad a", []Rule{rule})) != 1 {
t.Fatal("first fire expected")
}
if len(e.Match("c", "w", "stdout", "bad b", []Rule{rule})) != 0 {
t.Fatal("second fire inside cooldown should drop")
}
stats := e.Stats()
if stats.DroppedByCooldown != 1 {
t.Errorf("DroppedByCooldown = %d, want 1", stats.DroppedByCooldown)
}
if stats.DroppedByBucket != 0 {
t.Errorf("DroppedByBucket = %d, want 0 (bucket should not have fired)",
stats.DroppedByBucket)
}
}
func TestEngineStats_BucketCounter(t *testing.T) {
// Drain a small bucket without any cooldown so every drop
// after the first is bucket-attributable.
now := time.Now()
clock := now
e := NewEngine(
WithNow(func() time.Time { return clock }),
WithBucket(2, time.Hour),
)
rule := newRule(t, 1, `bad`) // cooldown=0
// 2 fires consume both tokens.
for i := 0; i < 2; i++ {
if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 1 {
t.Fatalf("fire %d expected to succeed", i)
}
}
// 3rd and 4th fires hit an empty bucket.
for i := 0; i < 2; i++ {
if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 0 {
t.Fatalf("fire %d expected to be bucket-dropped", i+2)
}
}
stats := e.Stats()
if stats.DroppedByBucket != 2 {
t.Errorf("DroppedByBucket = %d, want 2", stats.DroppedByBucket)
}
if stats.DroppedByCooldown != 0 {
t.Errorf("DroppedByCooldown = %d, want 0 (cooldown should not have fired)",
stats.DroppedByCooldown)
}
}
func TestEngineStats_NoDropsWhenAllowed(t *testing.T) {
e := NewEngine(WithBucket(100, time.Minute))
rule := newRule(t, 1, `x`)
for i := 0; i < 5; i++ {
e.Match("c", "w", "stdout", "x", []Rule{rule})
}
stats := e.Stats()
if stats.DroppedByBucket != 0 || stats.DroppedByCooldown != 0 {
t.Errorf("expected zero drops, got %+v", stats)
}
}
func TestStreamMatches(t *testing.T) {
cases := []struct {
rule, line string
want bool
}{
{"all", "stdout", true},
{"all", "stderr", true},
{"", "stdout", true},
{"stdout", "stdout", true},
{"stdout", "stderr", false},
{"stderr", "stderr", true},
{"stderr", "stdout", false},
}
for _, c := range cases {
got := streamMatches(c.rule, c.line)
if got != c.want {
t.Errorf("streamMatches(%q,%q) = %v want %v", c.rule, c.line, got, c.want)
}
}
}