package logscanner

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"strings"
	"sync/atomic"
	"time"

	"github.com/alexei/tinyforge/internal/docker"
)

// maxFramePayloadBytes caps a single docker log frame payload so a
// hostile or corrupted stream can't force an unbounded allocation.
// Real-world container log frames are well under this; the limit is
// purely a safety belt against the 4 GiB-by-spec upper bound the
// 4-byte length field in the frame header allows.
const maxFramePayloadBytes = 16 * 1024 * 1024 // 16 MiB

// maxLineBufferBytes caps the per-stream line-reassembly buffer for
// the same reason. A stream that never sends a newline would
// otherwise grow without bound. 1 MiB matches the line limit used
// for plain (TTY) streams and is far above any reasonable log line.
// (Note: bufio.Scanner's own default limit is 64 KiB, not 1 MiB.)
const maxLineBufferBytes = 1 * 1024 * 1024 // 1 MiB

// dockerLogger is the minimum surface the tail needs from the
// docker client. Defined as an interface so tests can stand up a
// canned log stream without spinning up containerd.
type dockerLogger interface {
	// ContainerLogsOpts opens the log stream of containerID using
	// opts. The caller owns the returned stream and must close it.
	ContainerLogsOpts(ctx context.Context, containerID string, opts docker.ContainerLogOptions) (io.ReadCloser, error)
}

// HitEmitter is what the tail calls when a rule fires. Implemented
// by the manager — it writes to event_log and publishes on the bus.
// Kept as a single-method interface so the tail has zero coupling to
// store/events.
type HitEmitter interface {
	// EmitHit is called once per hit the engine returns for a line.
	EmitHit(ctx context.Context, hit Hit)
}

// tail watches one container. Lifetime is bound to the supplied
// context — cancellation propagates through docker's log stream so
// goroutines exit promptly on container stop or manager shutdown.
type tail struct {
	// containerID identifies the container whose logs are followed;
	// it is also passed to the engine and attached to log output.
	containerID string
	// workloadID selects the effective rule set from the snapshot
	// and is passed to the engine alongside each line.
	workloadID string
	// docker opens the log stream.
	docker dockerLogger
	// engine matches each log line against the effective rules.
	engine *Engine
	// emitter receives every hit the engine returns.
	emitter HitEmitter
	// snapshot is loaded atomically for every line; while it holds
	// nil, lines are skipped without matching.
	snapshot *atomic.Pointer[Snapshot]
}

// run is the goroutine body. Opens a follow=true log stream and
// reads lines until the stream EOFs or context is cancelled.
// Tails terminate quietly on context cancel; any other error is // logged at warn so the operator sees it without it stopping the // whole manager. func (t *tail) run(ctx context.Context) { stream, err := t.docker.ContainerLogsOpts(ctx, t.containerID, docker.ContainerLogOptions{ Follow: true, Tail: "0", // start from the newest line; backfill is out of scope ShowStdout: true, ShowStderr: true, }) if err != nil { if !errors.Is(err, context.Canceled) { slog.Warn("logscanner: open log stream", "container", t.containerID, "error", err) } return } defer stream.Close() // Demuxing: docker emits multiplexed frames when TTY is off, // where each frame's first byte is the stream type (1=stdout, // 2=stderr) and bytes 4..7 are big-endian length. When TTY is // on the stream is plain bytes. We try to detect the frame // header on the first read; if it parses as a valid frame we // use the demux path, otherwise we fall back to line-by-line. bufStream := bufio.NewReaderSize(stream, 32*1024) if isMultiplexedStream(bufStream) { t.readDemuxed(ctx, bufStream) } else { t.readPlain(ctx, bufStream) } } // readDemuxed reads docker's multiplexed log frames. Each frame: // // [type 1 byte][000 3 bytes][len 4 bytes BE][payload len bytes] // // We track the stream type per-frame and feed payload bytes into a // per-stream line buffer so a line split across frames still // reassembles cleanly. func (t *tail) readDemuxed(ctx context.Context, r *bufio.Reader) { header := make([]byte, 8) // Two line buffers — stdout vs stderr — so a partial line read // across multiple frames doesn't bleed into the other stream. 
buffers := map[string]*bytes.Buffer{ "stdout": {}, "stderr": {}, } for { if ctx.Err() != nil { return } if _, err := io.ReadFull(r, header); err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { slog.Warn("logscanner: read header", "container", t.containerID, "error", err) } return } stream := "stdout" if header[0] == 2 { stream = "stderr" } size := int(uint32(header[4])<<24 | uint32(header[5])<<16 | uint32(header[6])<<8 | uint32(header[7])) if size <= 0 { continue } if size > maxFramePayloadBytes { slog.Warn("logscanner: frame payload exceeds cap, dropping tail", "container", t.containerID, "size", size, "cap", maxFramePayloadBytes) return } payload := make([]byte, size) if _, err := io.ReadFull(r, payload); err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { slog.Warn("logscanner: read payload", "container", t.containerID, "error", err) } return } buf := buffers[stream] buf.Write(payload) // Reassembly buffer should never accumulate beyond // maxLineBufferBytes — a stream with no newline would // otherwise grow unbounded. Drop the buffer (and the partial // line) when the cap is reached so the tail stays healthy. if buf.Len() > maxLineBufferBytes { slog.Warn("logscanner: line buffer exceeded cap, resetting", "container", t.containerID, "stream", stream, "size", buf.Len()) buf.Reset() continue } for { line, ok := readLineFromBuffer(buf) if !ok { break } t.processLine(ctx, stream, line) } } } // readPlain reads a non-multiplexed stream (TTY mode). All lines // are reported as "stdout" since the API doesn't separate the // streams in this mode. 
func (t *tail) readPlain(ctx context.Context, r *bufio.Reader) { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) for scanner.Scan() { if ctx.Err() != nil { return } t.processLine(ctx, "stdout", scanner.Text()) } if err := scanner.Err(); err != nil && !errors.Is(err, context.Canceled) { slog.Warn("logscanner: plain read", "container", t.containerID, "error", err) } } func (t *tail) processLine(ctx context.Context, stream, line string) { line = strings.TrimRight(line, "\r\n") // Strip the leading RFC3339Nano timestamp docker prepends when // Timestamps=true. We validate the prefix actually parses as a // time rather than blindly stripping at the first space — a // legitimate log line whose first word is 20+ chars (e.g. a long // hash with no whitespace) would otherwise lose data. if idx := strings.IndexByte(line, ' '); idx >= 20 && idx <= 40 { candidate := line[:idx] if _, err := time.Parse(time.RFC3339Nano, candidate); err == nil { line = line[idx+1:] } } snap := t.snapshot.Load() if snap == nil { return } rules := snap.EffectiveFor(t.workloadID) hits := t.engine.Match(t.containerID, t.workloadID, stream, line, rules) for _, h := range hits { t.emitter.EmitHit(ctx, h) } } // isMultiplexedStream peeks at the first 8 bytes to detect docker's // multiplexed frame header. The frame type byte must be 0..2 and // bytes 1..3 must be zero. We restore the bytes via UnreadByte // equivalent (bufio.Peek), so the actual reader is unaffected. 
func isMultiplexedStream(r *bufio.Reader) bool { peeked, err := r.Peek(8) if err != nil || len(peeked) < 8 { return false } if peeked[0] > 2 || peeked[1] != 0 || peeked[2] != 0 || peeked[3] != 0 { return false } return true } func readLineFromBuffer(b *bytes.Buffer) (string, bool) { idx := bytes.IndexByte(b.Bytes(), '\n') if idx < 0 { return "", false } line := make([]byte, idx) copy(line, b.Bytes()[:idx]) b.Next(idx + 1) // consume line + newline return string(line), true } // validate sanity-checks the tail before launch. Returns an error // the manager can log rather than panicking on a nil dependency. func (t *tail) validate() error { if t.containerID == "" { return fmt.Errorf("tail: container_id required") } if t.docker == nil { return fmt.Errorf("tail: docker client required") } if t.engine == nil { return fmt.Errorf("tail: engine required") } if t.emitter == nil { return fmt.Errorf("tail: emitter required") } if t.snapshot == nil { return fmt.Errorf("tail: snapshot pointer required") } return nil }