Files
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

246 lines
7.8 KiB
Go

package logscanner
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"strings"
"sync/atomic"
"time"
"github.com/alexei/tinyforge/internal/docker"
)
// Safety caps for the docker log readers. Both exist so a hostile or
// corrupted stream can't force unbounded memory use.
const (
	// maxFramePayloadBytes caps a single docker log frame payload. The
	// frame header's 4-byte length field allows payloads up to ~4 GiB;
	// real container log frames are far smaller, so this is purely a
	// safety belt against trusting whatever size a bad header claims.
	maxFramePayloadBytes = 16 * 1024 * 1024 // 16 MiB

	// maxLineBufferBytes caps the per-stream line-reassembly buffer; a
	// stream that never sends a newline would otherwise grow it without
	// bound. 1 MiB is far above any reasonable log line (and 16x the
	// bufio.Scanner default token limit, bufio.MaxScanTokenSize = 64 KiB).
	maxLineBufferBytes = 1 * 1024 * 1024 // 1 MiB
)
// dockerLogger is the narrow slice of the docker client the tail depends
// on: opening a container's log stream with explicit options. Keeping it
// an interface lets tests feed a canned stream instead of talking to a
// real daemon.
type dockerLogger interface {
	ContainerLogsOpts(ctx context.Context, id string, opts docker.ContainerLogOptions) (io.ReadCloser, error)
}
// HitEmitter receives each rule hit a tail detects. The manager is the
// production implementation: it records the hit in event_log and
// publishes it on the events bus. A single-method interface keeps the
// tail free of any store/events dependency.
type HitEmitter interface {
	EmitHit(ctx context.Context, h Hit)
}
// tail follows the log stream of a single container. Its lifetime is
// tied to the context handed to run: cancelling it propagates through
// docker's log stream, so the goroutine exits promptly when the
// container stops or the manager shuts down.
type tail struct {
	// Which container is being watched, and which workload's effective
	// rule set applies to its lines.
	containerID, workloadID string

	// Source of the raw log stream.
	docker dockerLogger
	// Matches each line against the effective rules.
	engine *Engine
	// Receives every hit the engine reports.
	emitter HitEmitter
	// Current rule snapshot; loaded atomically for every line so
	// hot-reloads take effect without restarting the tail.
	snapshot *atomic.Pointer[Snapshot]
}
// run is the tail's goroutine body. It opens a follow-mode log stream
// for t.containerID (stdout and stderr) and consumes it until EOF, a
// read error, or ctx cancellation. A failure to open the stream is
// logged at warn unless it is a context cancellation; run never returns
// an error, so one misbehaving container can't stop the whole manager.
func (t *tail) run(ctx context.Context) {
	opts := docker.ContainerLogOptions{
		Follow:     true,
		Tail:       "0", // start from the newest line; backfill is out of scope
		ShowStdout: true,
		ShowStderr: true,
	}
	stream, err := t.docker.ContainerLogsOpts(ctx, t.containerID, opts)
	if err != nil {
		if errors.Is(err, context.Canceled) {
			return
		}
		slog.Warn("logscanner: open log stream", "container", t.containerID, "error", err)
		return
	}
	defer stream.Close()

	// With TTY off docker multiplexes stdout/stderr into framed chunks
	// ([type][0 0 0][len BE32][payload]); with TTY on it sends raw bytes.
	// Sniff the first header without consuming it to choose a reader.
	buffered := bufio.NewReaderSize(stream, 32*1024)
	if !isMultiplexedStream(buffered) {
		t.readPlain(ctx, buffered)
		return
	}
	t.readDemuxed(ctx, buffered)
}
// readDemuxed reads docker's multiplexed log frames. Each frame:
//
//	[type 1 byte][000 3 bytes][len 4 bytes BE][payload len bytes]
//
// A type byte of 2 marks stderr; every other value is reported as
// "stdout". Payload bytes are appended to a per-stream reassembly
// buffer so a line split across frames still reassembles cleanly, and
// every complete line is passed to processLine.
//
// The loop exits on context cancellation, on clean EOF (after flushing
// any unterminated trailing line, as readPlain does), on a read error,
// or when a frame declares a payload larger than maxFramePayloadBytes.
// Read errors are logged at warn unless the context has been cancelled.
//
// Only the unterminated remainder of a stream is subject to
// maxLineBufferBytes: complete lines are drained first, so a large
// frame carrying many lines is not discarded wholesale.
func (t *tail) readDemuxed(ctx context.Context, r *bufio.Reader) {
	var header [8]byte
	// Separate buffers so a partial line on one stream never bleeds
	// into the other.
	var stdoutBuf, stderrBuf bytes.Buffer

	// warnRead logs an unexpected read failure, staying quiet when the
	// failure is just the tail being shut down (the transport doesn't
	// always wrap context.Canceled, so check ctx directly as well).
	warnRead := func(msg string, err error) {
		if ctx.Err() != nil || errors.Is(err, context.Canceled) {
			return
		}
		slog.Warn(msg, "container", t.containerID, "error", err)
	}

	for {
		if ctx.Err() != nil {
			return
		}
		if _, err := io.ReadFull(r, header[:]); err != nil {
			if errors.Is(err, io.EOF) {
				// Clean end of stream (no partial header): deliver any
				// final line that never got its newline.
				if stdoutBuf.Len() > 0 {
					t.processLine(ctx, "stdout", stdoutBuf.String())
				}
				if stderrBuf.Len() > 0 {
					t.processLine(ctx, "stderr", stderrBuf.String())
				}
				return
			}
			warnRead("logscanner: read header", err)
			return
		}

		stream, buf := "stdout", &stdoutBuf
		if header[0] == 2 {
			stream, buf = "stderr", &stderrBuf
		}

		// Decode and bounds-check the length as uint32 before it ever
		// becomes an int: on 32-bit platforms int(uint32) can go
		// negative, which would skip the payload and desync the stream.
		size := uint32(header[4])<<24 | uint32(header[5])<<16 | uint32(header[6])<<8 | uint32(header[7])
		if size == 0 {
			continue
		}
		if size > maxFramePayloadBytes {
			slog.Warn("logscanner: frame payload exceeds cap, dropping tail",
				"container", t.containerID, "size", size, "cap", maxFramePayloadBytes)
			return
		}
		// Copy the payload straight into the reassembly buffer instead of
		// allocating a fresh slice for every frame.
		if _, err := io.CopyN(buf, r, int64(size)); err != nil {
			if errors.Is(err, io.EOF) {
				// The header promised more bytes than the stream delivered.
				err = io.ErrUnexpectedEOF
			}
			warnRead("logscanner: read payload", err)
			return
		}

		for {
			line, ok := readLineFromBuffer(buf)
			if !ok {
				break
			}
			t.processLine(ctx, stream, line)
		}

		// Only an unterminated remainder is left now. A stream that never
		// sends a newline would grow it without bound, so drop the
		// partial line once it passes the cap and keep the tail healthy.
		if buf.Len() > maxLineBufferBytes {
			slog.Warn("logscanner: line buffer exceeded cap, resetting",
				"container", t.containerID, "stream", stream, "size", buf.Len())
			buf.Reset()
		}
	}
}
// readPlain reads a non-multiplexed stream (TTY mode). All lines are
// reported as "stdout" since the API doesn't separate the streams in
// this mode.
//
// A line longer than maxLineBufferBytes is dropped with a warning and
// reading resumes at the next newline. This is the same policy
// readDemuxed applies to its reassembly buffer. The previous
// bufio.Scanner loop instead aborted the whole tail on
// bufio.ErrTooLong. An unterminated final line is still delivered when
// the stream ends. Read errors are logged at warn unless the context
// has been cancelled.
func (t *tail) readPlain(ctx context.Context, r *bufio.Reader) {
	var line []byte
	// discarding is true while skipping the remainder of an overlong line.
	discarding := false
	for {
		if ctx.Err() != nil {
			return
		}
		// ReadSlice returns at most the reader's buffer size per call;
		// bufio.ErrBufferFull means the line continues in the next chunk.
		chunk, err := r.ReadSlice('\n')
		if !discarding {
			if len(line)+len(chunk) > maxLineBufferBytes {
				slog.Warn("logscanner: line buffer exceeded cap, resetting",
					"container", t.containerID, "stream", "stdout", "size", len(line)+len(chunk))
				line = line[:0]
				discarding = true
			} else {
				// chunk aliases the reader's buffer, so copy it out.
				line = append(line, chunk...)
			}
		}
		switch {
		case err == nil:
			// chunk ended in '\n'; processLine trims the terminator.
			if !discarding {
				t.processLine(ctx, "stdout", string(line))
			}
			line = line[:0]
			discarding = false
		case errors.Is(err, bufio.ErrBufferFull):
			// Line continues; keep accumulating (or skipping).
		default:
			// EOF or read error: flush a trailing unterminated line first.
			if len(line) > 0 && !discarding {
				t.processLine(ctx, "stdout", string(line))
			}
			if !errors.Is(err, io.EOF) && ctx.Err() == nil && !errors.Is(err, context.Canceled) {
				slog.Warn("logscanner: plain read", "container", t.containerID, "error", err)
			}
			return
		}
	}
}
// processLine normalizes one raw log line and runs it through the rule
// engine, handing every resulting hit to the emitter. Trailing CR/LF
// characters are trimmed, and a leading "<timestamp> " prefix is removed
// only when the first space-delimited word (20..40 bytes long) parses as
// RFC3339Nano. Blindly cutting at the first space would mangle legitimate
// lines that start with a long unbroken token such as a hash. Lines are
// ignored while no rule snapshot has been published yet.
func (t *tail) processLine(ctx context.Context, stream, line string) {
	snap := t.snapshot.Load()
	if snap == nil {
		return
	}

	text := strings.TrimRight(line, "\r\n")
	if stamp, rest, found := strings.Cut(text, " "); found && len(stamp) >= 20 && len(stamp) <= 40 {
		if _, perr := time.Parse(time.RFC3339Nano, stamp); perr == nil {
			text = rest
		}
	}

	effective := snap.EffectiveFor(t.workloadID)
	for _, hit := range t.engine.Match(t.containerID, t.workloadID, stream, text, effective) {
		t.emitter.EmitHit(ctx, hit)
	}
}
// isMultiplexedStream reports whether r begins with a docker multiplexed
// frame header: a stream-type byte of 0, 1 or 2 followed by three zero
// padding bytes. It uses Peek, so nothing is consumed and the caller
// reads the stream from the very first byte. A stream that ends before
// 8 bytes are available is treated as plain (non-multiplexed).
func isMultiplexedStream(r *bufio.Reader) bool {
	hdr, err := r.Peek(8)
	if err != nil {
		// Peek only succeeds with exactly 8 bytes, so no length check needed.
		return false
	}
	streamType, pad := hdr[0], hdr[1:4]
	return streamType <= 2 && pad[0] == 0 && pad[1] == 0 && pad[2] == 0
}
// readLineFromBuffer pops the first newline-terminated line off b. It
// returns the line without its '\n' (a preceding '\r' is kept) and true,
// or "" and false, leaving b untouched, when b holds no complete line.
func readLineFromBuffer(b *bytes.Buffer) (string, bool) {
	end := bytes.IndexByte(b.Bytes(), '\n')
	if end == -1 {
		return "", false
	}
	// Next consumes the line plus its terminator; string() copies the
	// bytes out before the buffer can be written to again.
	return string(b.Next(end + 1)[:end]), true
}
// validate reports the first missing dependency of t, so the manager can
// log a misconfigured tail instead of panicking on a nil field later.
// It returns nil when every required field is set.
func (t *tail) validate() error {
	switch {
	case t.containerID == "":
		return fmt.Errorf("tail: container_id required")
	case t.docker == nil:
		return fmt.Errorf("tail: docker client required")
	case t.engine == nil:
		return fmt.Errorf("tail: engine required")
	case t.emitter == nil:
		return fmt.Errorf("tail: emitter required")
	case t.snapshot == nil:
		return fmt.Errorf("tail: snapshot pointer required")
	}
	return nil
}