7a9ff7ad54
Two paired backends sharing the events.Bus seam:
Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
redaction on read (placeholder echo treated as "no change" on
PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
filters (severity CSV, source CSV, message regex with memoized
compile cache). Structural loop-prevention: never writes to
event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
TierEventTrigger constant, doSendRaw shared with the legacy
Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
sending the real TriggerWebhookPayload shape. SSRF guard
rejects loopback / link-local / unspecified targets. PATCH
uses pointer-typed DTO for partial updates.
Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
per-container token bucket, atomic drop counters), tail
(multiplexed docker frame demuxer with TTY fallback + 16 MiB
payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
timestamp strip + UTF-8-safe message truncation), manager
(5s container polling, atomic.Pointer[Snapshot] hot-reload,
HitEmitter writes event_log + publishes EventLog so the
trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
EffectiveLogScanRules resolver (globals minus per-workload
overrides plus workload-only additions). Transactional
cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
(sample_line → matched/captures) + /stats (drop counters +
active tail count + last-snapshot compile errors) +
GET /api/workloads/{id}/effective-rules.
cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
246 lines
7.8 KiB
Go
246 lines
7.8 KiB
Go
package logscanner
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/alexei/tinyforge/internal/docker"
|
|
)
|
|
|
|
// Safety limits for reading container log streams. Both exist so that a
// hostile or corrupted stream cannot force unbounded memory use.
const (
	// maxFramePayloadBytes caps the payload of a single multiplexed docker
	// log frame. The frame header's 4-byte big-endian length field allows
	// payloads of up to 4 GiB; real container log frames are far smaller,
	// so this limit is purely a safety belt against broken streams.
	maxFramePayloadBytes = 16 * 1024 * 1024 // 16 MiB

	// maxLineBufferBytes caps the buffer used to reassemble a single log
	// line, so a stream that never sends a newline cannot grow it without
	// bound. 1 MiB is far above any reasonable log line.
	maxLineBufferBytes = 1 * 1024 * 1024 // 1 MiB
)
|
|
|
|
// dockerLogger is the minimum surface the tail needs from the
|
|
// docker client. Defined as an interface so tests can stand up a
|
|
// canned log stream without spinning up containerd.
|
|
type dockerLogger interface {
|
|
ContainerLogsOpts(ctx context.Context, containerID string, opts docker.ContainerLogOptions) (io.ReadCloser, error)
|
|
}
|
|
|
|
// HitEmitter is what the tail calls when a rule fires. Implemented
|
|
// by the manager — it writes to event_log and publishes on the bus.
|
|
// Kept as a single-method interface so the tail has zero coupling to
|
|
// store/events.
|
|
type HitEmitter interface {
|
|
EmitHit(ctx context.Context, hit Hit)
|
|
}
|
|
|
|
// tail watches one container. Lifetime is bound to the supplied
|
|
// context — cancellation propagates through docker's log stream so
|
|
// goroutines exit promptly on container stop or manager shutdown.
|
|
type tail struct {
|
|
containerID string
|
|
workloadID string
|
|
docker dockerLogger
|
|
engine *Engine
|
|
emitter HitEmitter
|
|
snapshot *atomic.Pointer[Snapshot]
|
|
}
|
|
|
|
// run is the goroutine body of a tail. It opens a follow-mode log stream
// that skips existing history, chooses the multiplexed or plain reader
// from the stream's first bytes, and returns when the stream ends or ctx
// is cancelled.
//
// Shutdown is quiet. A failure is logged at warn level only while ctx is
// still live, so a cancelled context never surfaces as a warning, even
// when the error it causes does not wrap context.Canceled. Failures are
// logged rather than returned so that one broken container cannot stop
// the whole manager.
func (t *tail) run(ctx context.Context) {
	stream, err := t.docker.ContainerLogsOpts(ctx, t.containerID, docker.ContainerLogOptions{
		Follow:     true,
		Tail:       "0", // no backlog: only lines written from now on; backfill is out of scope
		ShowStdout: true,
		ShowStderr: true,
	})
	if err != nil {
		if ctx.Err() == nil && !errors.Is(err, context.Canceled) {
			slog.Warn("logscanner: open log stream", "container", t.containerID, "error", err)
		}
		return
	}
	defer stream.Close()

	// Docker emits multiplexed frames when the container has no TTY. Each
	// frame starts with a stream-type byte (1=stdout, 2=stderr) and carries
	// a big-endian length in bytes 4..7. With a TTY the stream is plain
	// bytes. Peek at the first header-sized chunk to pick the reader; the
	// peek does not consume anything.
	r := bufio.NewReaderSize(stream, 32*1024)
	if isMultiplexedStream(r) {
		t.readDemuxed(ctx, r)
		return
	}
	t.readPlain(ctx, r)
}
|
|
|
|
// readDemuxed reads docker's multiplexed log frames. Each frame:
|
|
//
|
|
// [type 1 byte][000 3 bytes][len 4 bytes BE][payload len bytes]
|
|
//
|
|
// We track the stream type per-frame and feed payload bytes into a
|
|
// per-stream line buffer so a line split across frames still
|
|
// reassembles cleanly.
|
|
func (t *tail) readDemuxed(ctx context.Context, r *bufio.Reader) {
|
|
header := make([]byte, 8)
|
|
// Two line buffers — stdout vs stderr — so a partial line read
|
|
// across multiple frames doesn't bleed into the other stream.
|
|
buffers := map[string]*bytes.Buffer{
|
|
"stdout": {},
|
|
"stderr": {},
|
|
}
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
if _, err := io.ReadFull(r, header); err != nil {
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
|
slog.Warn("logscanner: read header", "container", t.containerID, "error", err)
|
|
}
|
|
return
|
|
}
|
|
stream := "stdout"
|
|
if header[0] == 2 {
|
|
stream = "stderr"
|
|
}
|
|
size := int(uint32(header[4])<<24 | uint32(header[5])<<16 | uint32(header[6])<<8 | uint32(header[7]))
|
|
if size <= 0 {
|
|
continue
|
|
}
|
|
if size > maxFramePayloadBytes {
|
|
slog.Warn("logscanner: frame payload exceeds cap, dropping tail",
|
|
"container", t.containerID, "size", size, "cap", maxFramePayloadBytes)
|
|
return
|
|
}
|
|
payload := make([]byte, size)
|
|
if _, err := io.ReadFull(r, payload); err != nil {
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
|
slog.Warn("logscanner: read payload", "container", t.containerID, "error", err)
|
|
}
|
|
return
|
|
}
|
|
buf := buffers[stream]
|
|
buf.Write(payload)
|
|
// Reassembly buffer should never accumulate beyond
|
|
// maxLineBufferBytes — a stream with no newline would
|
|
// otherwise grow unbounded. Drop the buffer (and the partial
|
|
// line) when the cap is reached so the tail stays healthy.
|
|
if buf.Len() > maxLineBufferBytes {
|
|
slog.Warn("logscanner: line buffer exceeded cap, resetting",
|
|
"container", t.containerID, "stream", stream, "size", buf.Len())
|
|
buf.Reset()
|
|
continue
|
|
}
|
|
for {
|
|
line, ok := readLineFromBuffer(buf)
|
|
if !ok {
|
|
break
|
|
}
|
|
t.processLine(ctx, stream, line)
|
|
}
|
|
}
|
|
}
|
|
|
|
// readPlain reads a non-multiplexed stream (TTY mode). All lines
|
|
// are reported as "stdout" since the API doesn't separate the
|
|
// streams in this mode.
|
|
func (t *tail) readPlain(ctx context.Context, r *bufio.Reader) {
|
|
scanner := bufio.NewScanner(r)
|
|
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
|
for scanner.Scan() {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
t.processLine(ctx, "stdout", scanner.Text())
|
|
}
|
|
if err := scanner.Err(); err != nil && !errors.Is(err, context.Canceled) {
|
|
slog.Warn("logscanner: plain read", "container", t.containerID, "error", err)
|
|
}
|
|
}
|
|
|
|
// processLine normalises one raw log line and runs it through the rule
// engine, emitting every hit the engine returns.
//
// Lines are ignored while no rule snapshot has been published. Otherwise
// the trailing line ending is trimmed. Then a leading RFC3339Nano
// timestamp (the form docker prepends when Timestamps=true) is removed,
// but only when the 20..40-character word before the first space really
// parses as one. Splitting blindly at the first space would corrupt
// legitimate lines whose first word is long, such as a bare hash.
func (t *tail) processLine(ctx context.Context, stream, line string) {
	snap := t.snapshot.Load()
	if snap == nil {
		return
	}

	text := strings.TrimRight(line, "\r\n")
	if head, rest, found := strings.Cut(text, " "); found && len(head) >= 20 && len(head) <= 40 {
		if _, err := time.Parse(time.RFC3339Nano, head); err == nil {
			text = rest
		}
	}

	for _, hit := range t.engine.Match(t.containerID, t.workloadID, stream, text, snap.EffectiveFor(t.workloadID)) {
		t.emitter.EmitHit(ctx, hit)
	}
}
|
|
|
|
// isMultiplexedStream reports whether r begins with something shaped like
// a docker multiplexed-frame header. That means a stream-type byte of 0,
// 1 or 2 followed by three zero bytes. Detection uses Peek, which does
// not advance the reader, so the caller still reads the stream from its
// first byte. If 8 bytes cannot be peeked (short stream or read error),
// the stream is treated as plain.
func isMultiplexedStream(r *bufio.Reader) bool {
	hdr, err := r.Peek(8)
	switch {
	case err != nil, len(hdr) < 8:
		return false
	case hdr[0] > 2:
		return false
	default:
		return hdr[1] == 0 && hdr[2] == 0 && hdr[3] == 0
	}
}
|
|
|
|
// readLineFromBuffer removes the first newline-terminated line from b
// and returns it without the trailing '\n'. A '\r' before the newline,
// if present, is kept. When b holds no newline it is left untouched and
// the second result is false, so a partial line stays buffered until
// more data arrives.
func readLineFromBuffer(b *bytes.Buffer) (string, bool) {
	idx := bytes.IndexByte(b.Bytes(), '\n')
	if idx < 0 {
		return "", false
	}
	// Next consumes the line plus its newline. Its result aliases b's
	// storage, but the string conversion copies it, so one copy suffices
	// and the returned line stays valid after b is reused.
	return string(b.Next(idx + 1)[:idx]), true
}
|
|
|
|
// validate checks that every dependency a tail needs is present before it
// is launched. It returns a descriptive error rather than letting a nil
// field panic later, so the manager can log the problem. Fields are
// checked in order (containerID, docker, engine, emitter, snapshot), and
// the first missing one is reported. workloadID is not checked.
func (t *tail) validate() error {
	switch {
	case t.containerID == "":
		return fmt.Errorf("tail: container_id required")
	case t.docker == nil:
		return fmt.Errorf("tail: docker client required")
	case t.engine == nil:
		return fmt.Errorf("tail: engine required")
	case t.emitter == nil:
		return fmt.Errorf("tail: emitter required")
	case t.snapshot == nil:
		return fmt.Errorf("tail: snapshot pointer required")
	}
	return nil
}
|