Files
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

253 lines
6.6 KiB
Go

package events
import (
"errors"
"regexp"
"sync"
"testing"
"time"
"github.com/alexei/tinyforge/internal/store"
)
// fakeTriggerSource is an in-memory stand-in for the trigger store. Tests
// seed it with a fixed slice of enabled triggers (or an error) so the
// dispatcher can be exercised without SQLite. ListEnabledEventTriggers is
// the only store method the dispatcher calls.
type fakeTriggerSource struct {
	rows []store.EventTrigger // returned as-is when err is nil
	err  error                // when non-nil, returned in place of rows
}
// ListEnabledEventTriggers returns the seeded rows. When the fake was
// configured with an error, it returns (nil, err) and never exposes rows.
func (f *fakeTriggerSource) ListEnabledEventTriggers() ([]store.EventTrigger, error) {
	if err := f.err; err != nil {
		return nil, err
	}
	return f.rows, nil
}
// fakeNotifier records every SendPayload invocation in memory so tests can
// inspect the (URL, secret, eventType, payload) tuples the dispatcher sent.
// Read the log through Calls, which copies it under the mutex.
type fakeNotifier struct {
	mu    sync.Mutex         // guards calls
	calls []fakeNotifierCall // appended in the order SendPayload was invoked
}
// fakeNotifierCall is one recorded SendPayload invocation. Each field holds
// the corresponding argument exactly as it was passed.
type fakeNotifierCall struct {
	URL       string // destination URL argument
	Secret    string // secret argument (may be empty)
	EventType string // eventType argument
	Payload   any    // payload argument, keeping its dynamic type for type assertions
}
// SendPayload stands in for the real notifier used by the dispatcher. It
// performs no network I/O; it only appends its arguments to the in-memory
// call log.
func (f *fakeNotifier) SendPayload(url, secret, eventType string, payload any) {
	call := fakeNotifierCall{
		URL:       url,
		Secret:    secret,
		EventType: eventType,
		Payload:   payload,
	}
	f.mu.Lock()
	f.calls = append(f.calls, call)
	f.mu.Unlock()
}
// Calls returns a snapshot copy of the recorded invocations. The copy is
// taken under the lock, so callers can read it without racing concurrent
// SendPayload calls. The result is always non-nil, even when empty.
func (f *fakeNotifier) Calls() []fakeNotifierCall {
	f.mu.Lock()
	snapshot := append(make([]fakeNotifierCall, 0, len(f.calls)), f.calls...)
	f.mu.Unlock()
	return snapshot
}
// newDispatcher builds a dispatcher wired to a fakeTriggerSource seeded with
// rows, a fresh fakeNotifier, and an empty regex cache. It returns the
// notifier alongside the dispatcher so tests can inspect what was sent.
func newDispatcher(rows []store.EventTrigger) (*dispatcher, *fakeNotifier) {
	notifier := new(fakeNotifier)
	d := &dispatcher{
		triggers:   &fakeTriggerSource{rows: rows},
		notifier:   notifier,
		regexCache: make(map[string]*regexp.Regexp),
	}
	return d, notifier
}
// TestMatches_EmptyFiltersAllowAnything verifies that a trigger with no
// severity, source, or message filter matches an arbitrary event.
func TestMatches_EmptyFiltersAllowAnything(t *testing.T) {
	d, _ := newDispatcher(nil)
	ev := EventLogPayload{Severity: "info", Source: "deploy", Message: "hello"}
	ok, err := d.matches(store.EventTrigger{Name: "anything"}, ev)
	switch {
	case err != nil:
		t.Fatalf("unexpected err: %v", err)
	case !ok:
		t.Fatal("empty filters should pass")
	}
}
// TestMatches_SeverityCSV checks that FilterSeverity acts as a
// comma-separated allow-list of severities.
func TestMatches_SeverityCSV(t *testing.T) {
	d, _ := newDispatcher(nil)
	// The space after the comma means "error" only matches if list entries
	// are whitespace-trimmed.
	tr := store.EventTrigger{FilterSeverity: "warn, error"}
	for _, tc := range []struct {
		severity string
		expected bool
	}{
		{"error", true},
		{"warn", true},
		{"info", false},
	} {
		matched, err := d.matches(tr, EventLogPayload{Severity: tc.severity})
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if matched != tc.expected {
			t.Errorf("severity=%q want=%v got=%v", tc.severity, tc.expected, matched)
		}
	}
}
// TestMatches_SourceCSV checks that FilterSource acts as a comma-separated
// allow-list: a payload matches only when its Source is one of the listed
// values, and an empty Source never matches a non-empty filter.
func TestMatches_SourceCSV(t *testing.T) {
	d, _ := newDispatcher(nil)
	tr := store.EventTrigger{FilterSource: "logscan,deploy"}
	cases := []struct {
		src  string
		want bool
	}{
		{"logscan", true},
		{"deploy", true},
		{"reconciler", false},
		{"", false},
	}
	for _, c := range cases {
		got, err := d.matches(tr, EventLogPayload{Source: c.src})
		// Check the error rather than discarding it. Otherwise a failing
		// matches call would masquerade as a legitimate "no match" and the
		// false cases would pass spuriously.
		if err != nil {
			t.Errorf("source=%q: unexpected err: %v", c.src, err)
			continue
		}
		if got != c.want {
			t.Errorf("source=%q want=%v got=%v", c.src, c.want, got)
		}
	}
}
// TestMatches_MessageRegex checks that FilterMessageRegex is applied to the
// event message. The pattern honors inline flags ((?i) for case-insensitive
// matching) and word boundaries (\b).
func TestMatches_MessageRegex(t *testing.T) {
	d, _ := newDispatcher(nil)
	tr := store.EventTrigger{FilterMessageRegex: `(?i)\bpanic\b`}
	// match fails the test on any matches error. That way a compile or
	// evaluation failure cannot silently satisfy a "should not match"
	// assertion.
	match := func(msg string) bool {
		t.Helper()
		got, err := d.matches(tr, EventLogPayload{Message: msg})
		if err != nil {
			t.Fatalf("message=%q: unexpected err: %v", msg, err)
		}
		return got
	}
	if !match("fatal: Panic in worker") {
		t.Error("expected case-insensitive panic to match")
	}
	if match("all good") {
		t.Error("expected non-matching message to fail")
	}
	// "panic" embedded inside a longer word has no \b on its right side
	// ("panicked"), so the word-boundary pattern must reject it.
	if match("worker unpanicked") {
		t.Error("expected word boundary to reject embedded panic")
	}
}
// TestMatches_InvalidRegexReturnsError verifies that an uncompilable
// FilterMessageRegex surfaces as an error from matches.
func TestMatches_InvalidRegexReturnsError(t *testing.T) {
	d, _ := newDispatcher(nil)
	// "([unclosed" leaves a character class (and the group) unterminated,
	// so compilation must fail.
	bad := store.EventTrigger{FilterMessageRegex: "([unclosed"}
	if _, err := d.matches(bad, EventLogPayload{Message: "x"}); err == nil {
		t.Fatal("expected compile error on invalid regex")
	}
}
// TestMatches_AllFiltersAND verifies that severity, source, and message
// filters are AND-composed. A payload passes only when every configured
// filter matches, and a mismatch on any single filter rejects it.
func TestMatches_AllFiltersAND(t *testing.T) {
	d, _ := newDispatcher(nil)
	tr := store.EventTrigger{
		FilterSeverity:     "error",
		FilterSource:       "logscan",
		FilterMessageRegex: "timeout",
	}
	// match fails the test on any matches error, so an error cannot be
	// misread as a deliberate rejection.
	match := func(p EventLogPayload) bool {
		t.Helper()
		got, err := d.matches(tr, p)
		if err != nil {
			t.Fatalf("matches(%+v): unexpected err: %v", p, err)
		}
		return got
	}

	full := EventLogPayload{Severity: "error", Source: "logscan", Message: "request timeout"}
	if !match(full) {
		t.Error("all-match payload should pass")
	}

	missMessage := full
	missMessage.Message = "all good"
	if match(missMessage) {
		t.Error("message mismatch should fail despite severity+source match")
	}

	missSource := full
	missSource.Source = "deploy"
	if match(missSource) {
		t.Error("source mismatch should fail")
	}

	// The remaining leg of the AND: a severity outside the allow-list must
	// reject the event even though source and message both match.
	missSeverity := full
	missSeverity.Severity = "info"
	if match(missSeverity) {
		t.Error("severity mismatch should fail despite source+message match")
	}
}
func TestHandle_DispatchesMatchingTrigger(t *testing.T) {
rows := []store.EventTrigger{
{ID: 1, Name: "T1", FilterSeverity: "error", ActionType: store.EventTriggerActionWebhook,
ActionTarget: "https://example.com/hook", ActionSecret: "shh"},
{ID: 2, Name: "T2", FilterSeverity: "warn", ActionType: store.EventTriggerActionWebhook,
ActionTarget: "https://example.com/other"},
}
d, notifier := newDispatcher(rows)
d.handle(EventLogPayload{Severity: "error", Source: "logscan", Message: "panic", CreatedAt: time.Now().Format(time.RFC3339)})
calls := notifier.Calls()
if len(calls) != 1 {
t.Fatalf("expected 1 call, got %d", len(calls))
}
if calls[0].URL != "https://example.com/hook" {
t.Errorf("URL=%q want https://example.com/hook", calls[0].URL)
}
if calls[0].Secret != "shh" {
t.Errorf("Secret=%q want shh", calls[0].Secret)
}
p, ok := calls[0].Payload.(TriggerWebhookPayload)
if !ok {
t.Fatalf("payload type=%T want TriggerWebhookPayload", calls[0].Payload)
}
if p.TriggerID != 1 || p.Trigger != "T1" {
t.Errorf("payload trigger metadata wrong: %+v", p)
}
if p.Event.Severity != "error" {
t.Errorf("payload event mismatch: %+v", p.Event)
}
}
func TestHandle_TriggerSourceErrorLogged(t *testing.T) {
n := &fakeNotifier{}
d := &dispatcher{
triggers: &fakeTriggerSource{err: errors.New("db down")},
notifier: n,
regexCache: map[string]*regexp.Regexp{},
}
d.handle(EventLogPayload{Severity: "error"})
if len(n.Calls()) != 0 {
t.Errorf("dispatcher should not call notifier when trigger source errored")
}
}
func TestHandle_UnsupportedActionTypeSkipped(t *testing.T) {
rows := []store.EventTrigger{
{ID: 1, Name: "T1", ActionType: "future-channel", ActionTarget: "x"},
}
d, n := newDispatcher(rows)
d.handle(EventLogPayload{Severity: "info"})
if len(n.Calls()) != 0 {
t.Errorf("unsupported action_type should not dispatch")
}
}
// TestFilterMatchCSV pins the CSV allow-list helper's behavior:
//   - an empty or whitespace-only filter matches any candidate;
//   - list entries are whitespace-trimmed;
//   - a candidate matches only if it equals one of the entries.
func TestFilterMatchCSV(t *testing.T) {
	type csvCase struct {
		filter, cand string
		want         bool
	}
	for _, tc := range []csvCase{
		{"", "anything", true},
		{" ", "anything", true},
		{"a", "a", true},
		{"a", "b", false},
		{"a,b,c", "b", true},
		{" a , b , c ", "b", true},
		{"a,b", "c", false},
	} {
		if got := filterMatchCSV(tc.filter, tc.cand); got != tc.want {
			t.Errorf("filterMatchCSV(%q, %q) = %v want %v", tc.filter, tc.cand, got, tc.want)
		}
	}
}
// TestRegexCache_ReusesCompiledPattern verifies the dispatcher's regex memo.
// Compiling the same pattern twice must return the identical *Regexp. A
// different pattern must get its own entry, so a cache that wrongly returns
// one shared regexp for every key is caught.
func TestRegexCache_ReusesCompiledPattern(t *testing.T) {
	d, _ := newDispatcher(nil)
	re1, err := d.compile(`\bfoo\b`)
	if err != nil {
		t.Fatalf("compile: %v", err)
	}
	re2, err := d.compile(`\bfoo\b`)
	if err != nil {
		t.Fatalf("recompile: %v", err)
	}
	if re1 != re2 {
		t.Error("expected cached compile to return the same regexp pointer")
	}

	other, err := d.compile(`\bbar\b`)
	if err != nil {
		t.Fatalf("compile distinct pattern: %v", err)
	}
	if other == re1 {
		t.Error("expected a distinct pattern to get its own compiled regexp")
	}
}