feat(observability): event triggers + log scanner backend

Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-11 22:18:11 +03:00
parent 82d32181ba
commit 7a9ff7ad54
23 changed files with 3974 additions and 19 deletions
+227
View File
@@ -0,0 +1,227 @@
package logscanner
import (
"sync"
"sync/atomic"
"time"
)
// Engine evaluates lines against a snapshot, gates by per-rule
// cooldown, and rate-limits emissions per container so a noisy regex
// can't flood the event_log table.
type Engine struct {
// cooldownMu guards lastFiredAt. The map is keyed on a
// composite (containerID, ruleID) so the same rule firing on two
// different containers gets independent cooldowns — matches the
// operator intuition that "this container is alerting" doesn't
// suppress an alert from a different container.
cooldownMu sync.Mutex
lastFiredAt map[cooldownKey]time.Time
// bucketMu guards tokenBuckets. One bucket per container.
bucketMu sync.Mutex
tokenBuckets map[string]*bucket
// Drop counters. Incremented when a matching hit is suppressed
// before reaching the bus. droppedByBucket covers per-container
// rate-limit drops; droppedByCooldown counts cooldown-suppressed
// matches so operators can tell whether their patterns are too
// greedy vs too tightly cooled. Atomic so the hot path doesn't
// take the lock just to bump the counter.
droppedByBucket atomic.Int64
droppedByCooldown atomic.Int64
// Configuration knobs. Defaults set in NewEngine.
tokensPerWindow int // bucket capacity
tokenWindow time.Duration // bucket refill window
now func() time.Time
}
// EngineStats is the public-facing counter snapshot. Returned by
// Stats() and surfaced through the manager + API for operator
// visibility — when a noisy regex floods the bucket, this is how
// they discover it.
type EngineStats struct {
DroppedByBucket int64 `json:"dropped_by_bucket"`
DroppedByCooldown int64 `json:"dropped_by_cooldown"`
}
type cooldownKey struct {
ContainerID string
RuleID int64
}
type bucket struct {
tokens int
resetsAt time.Time
}
// NewEngine constructs an Engine with sensible defaults: 10
// events / 60s per container. Both knobs can be overridden via
// the With* options.
func NewEngine(opts ...Option) *Engine {
e := &Engine{
lastFiredAt: map[cooldownKey]time.Time{},
tokenBuckets: map[string]*bucket{},
tokensPerWindow: 10,
tokenWindow: 60 * time.Second,
now: time.Now,
}
for _, opt := range opts {
opt(e)
}
return e
}
// Option mutates an Engine during construction.
type Option func(*Engine)
// WithBucket sets the per-container token bucket capacity and
// refill window. Used by tests to make rate-limit assertions
// deterministic.
func WithBucket(tokens int, window time.Duration) Option {
return func(e *Engine) {
e.tokensPerWindow = tokens
e.tokenWindow = window
}
}
// WithNow overrides the clock for tests.
func WithNow(now func() time.Time) Option {
return func(e *Engine) {
e.now = now
}
}
// Match is the hot-path evaluation. For every rule whose Streams
// covers `stream` and whose pattern matches `line`, returns one
// Hit — gated by cooldown and per-container token bucket. Empty
// result means "drop this line."
//
// Side-effect: cooldown timestamps + bucket counters are updated
// when a rule fires. Callers must not retry the same line.
func (e *Engine) Match(containerID, workloadID, stream, line string, rules []Rule) []Hit {
if line == "" || len(rules) == 0 {
return nil
}
var hits []Hit
now := e.now()
for _, r := range rules {
if !streamMatches(r.Streams, stream) {
continue
}
if !r.Pattern.MatchString(line) {
continue
}
if !e.cooledDown(containerID, r, now) {
// Matched but inside the cooldown window — bump the
// counter so operators can see when their cooldowns
// are eating real signal.
e.droppedByCooldown.Add(1)
continue
}
if !e.takeToken(containerID, now) {
// Bucket exhausted for this container. Bump the
// counter so the stats endpoint can surface chatty
// patterns; the operator's signal is "Stats says we
// dropped N events — your rule is too broad."
e.droppedByBucket.Add(1)
continue
}
e.markFired(containerID, r.ID, now)
hits = append(hits, Hit{
Rule: r,
ContainerID: containerID,
WorkloadID: workloadID,
Stream: stream,
Line: line,
})
}
return hits
}
// Stats returns a snapshot of the engine's drop counters. The
// counters are monotonic over the lifetime of the engine — useful
// for trend observation, but not for instantaneous "is it dropping
// right now" alerting. Reset semantics: the engine is recreated on
// every process restart so counters reset to zero with the binary.
func (e *Engine) Stats() EngineStats {
return EngineStats{
DroppedByBucket: e.droppedByBucket.Load(),
DroppedByCooldown: e.droppedByCooldown.Load(),
}
}
// Hit is one rule fire — the engine returns these for the manager
// to persist + publish on the bus. Kept narrow on purpose so the
// engine has no event_log / bus dependency.
type Hit struct {
Rule Rule
ContainerID string
WorkloadID string
Stream string
Line string
}
// streamMatches checks whether a rule that scopes itself to a
// stream subset accepts the given stream. Empty rule.Streams is
// equivalent to "all" for forward-compat with older rows.
func streamMatches(ruleStreams, lineStream string) bool {
if ruleStreams == "" || ruleStreams == "all" {
return true
}
return ruleStreams == lineStream
}
func (e *Engine) cooledDown(containerID string, r Rule, now time.Time) bool {
if r.CooldownSeconds <= 0 {
return true
}
e.cooldownMu.Lock()
defer e.cooldownMu.Unlock()
last, ok := e.lastFiredAt[cooldownKey{containerID, r.ID}]
if !ok {
return true
}
return now.Sub(last) >= time.Duration(r.CooldownSeconds)*time.Second
}
func (e *Engine) markFired(containerID string, ruleID int64, now time.Time) {
e.cooldownMu.Lock()
e.lastFiredAt[cooldownKey{containerID, ruleID}] = now
e.cooldownMu.Unlock()
}
func (e *Engine) takeToken(containerID string, now time.Time) bool {
e.bucketMu.Lock()
defer e.bucketMu.Unlock()
b, ok := e.tokenBuckets[containerID]
if !ok {
b = &bucket{tokens: e.tokensPerWindow, resetsAt: now.Add(e.tokenWindow)}
e.tokenBuckets[containerID] = b
}
if !now.Before(b.resetsAt) {
b.tokens = e.tokensPerWindow
b.resetsAt = now.Add(e.tokenWindow)
}
if b.tokens <= 0 {
return false
}
b.tokens--
return true
}
// Forget drops cooldown + bucket state for a container that has been
// removed. Called by the manager when a tail exits to reclaim memory.
func (e *Engine) Forget(containerID string) {
e.cooldownMu.Lock()
for k := range e.lastFiredAt {
if k.ContainerID == containerID {
delete(e.lastFiredAt, k)
}
}
e.cooldownMu.Unlock()
e.bucketMu.Lock()
delete(e.tokenBuckets, containerID)
e.bucketMu.Unlock()
}
+223
View File
@@ -0,0 +1,223 @@
package logscanner
import (
"regexp"
"testing"
"time"
)
func mustRegexp(t *testing.T, p string) *regexp.Regexp {
t.Helper()
re, err := regexp.Compile(p)
if err != nil {
t.Fatalf("compile %q: %v", p, err)
}
return re
}
func newRule(t *testing.T, id int64, pattern string, opts ...func(*Rule)) Rule {
r := Rule{
ID: id,
Name: "rule" + pattern,
Pattern: mustRegexp(t, pattern),
Severity: "warn",
Streams: "all",
CooldownSeconds: 0,
}
for _, o := range opts {
o(&r)
}
return r
}
func TestEngineMatch_BasicHit(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `panic`)}
hits := e.Match("c1", "w1", "stderr", "fatal panic in worker", rules)
if len(hits) != 1 {
t.Fatalf("want 1 hit, got %d", len(hits))
}
if hits[0].Rule.ID != 1 {
t.Errorf("rule id mismatch: %d", hits[0].Rule.ID)
}
}
func TestEngineMatch_StreamFilter(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `boom`, func(r *Rule) { r.Streams = "stderr" })}
if len(e.Match("c", "w", "stdout", "boom there", rules)) != 0 {
t.Error("stdout line should not match stderr-only rule")
}
if len(e.Match("c", "w", "stderr", "boom there", rules)) != 1 {
t.Error("stderr line should match stderr-only rule")
}
}
func TestEngineMatch_NoMatchNoHit(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `panic`)}
if h := e.Match("c", "w", "stdout", "all good", rules); len(h) != 0 {
t.Errorf("expected no hit, got %d", len(h))
}
}
func TestEngineMatch_Cooldown(t *testing.T) {
now := time.Now()
clock := now
e := NewEngine(WithNow(func() time.Time { return clock }))
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })}
if len(e.Match("c", "w", "stdout", "bad event", rules)) != 1 {
t.Fatal("first fire expected")
}
clock = now.Add(10 * time.Second)
if h := e.Match("c", "w", "stdout", "bad event 2", rules); len(h) != 0 {
t.Errorf("cooled-down rule fired: %+v", h)
}
clock = now.Add(31 * time.Second)
if len(e.Match("c", "w", "stdout", "bad event 3", rules)) != 1 {
t.Error("cooldown expired but rule did not fire")
}
}
func TestEngineMatch_PerContainerCooldownIndependent(t *testing.T) {
// Same rule firing on two different containers should not
// share cooldown — operators expect each container's alerts
// to be independent.
now := time.Now()
clock := now
e := NewEngine(WithNow(func() time.Time { return clock }))
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })}
e.Match("c1", "w", "stdout", "bad on one", rules)
if len(e.Match("c2", "w", "stdout", "bad on two", rules)) != 1 {
t.Error("second container should fire independently")
}
}
func TestEngineMatch_TokenBucket(t *testing.T) {
now := time.Now()
clock := now
e := NewEngine(WithNow(func() time.Time { return clock }), WithBucket(3, time.Minute))
rules := []Rule{newRule(t, 1, `noisy`)}
for i := 0; i < 3; i++ {
if len(e.Match("c", "w", "stdout", "noisy "+string(rune('a'+i)), rules)) != 1 {
t.Errorf("hit %d should fire", i)
}
}
// 4th within window should be dropped.
if h := e.Match("c", "w", "stdout", "noisy d", rules); len(h) != 0 {
t.Errorf("4th hit should be rate-limited, got %d", len(h))
}
// After the window the bucket refills.
clock = now.Add(time.Minute + time.Second)
if len(e.Match("c", "w", "stdout", "noisy e", rules)) != 1 {
t.Error("bucket should have refilled after window")
}
}
func TestEngineForget_DropsState(t *testing.T) {
e := NewEngine()
rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 999 })}
e.Match("c", "w", "stdout", "bad once", rules)
e.Forget("c")
// After Forget, the same rule on the same container should
// fire again immediately — cooldown state was cleared.
if len(e.Match("c", "w", "stdout", "bad again", rules)) != 1 {
t.Error("Forget should drop cooldown state")
}
}
func TestEngineStats_CooldownCounter(t *testing.T) {
// Two matches inside the cooldown window should drop once and
// increment DroppedByCooldown by one. Bucket is generous so it
// never participates.
now := time.Now()
clock := now
e := NewEngine(
WithNow(func() time.Time { return clock }),
WithBucket(100, time.Hour),
)
rule := newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 60 })
if len(e.Match("c", "w", "stdout", "bad a", []Rule{rule})) != 1 {
t.Fatal("first fire expected")
}
if len(e.Match("c", "w", "stdout", "bad b", []Rule{rule})) != 0 {
t.Fatal("second fire inside cooldown should drop")
}
stats := e.Stats()
if stats.DroppedByCooldown != 1 {
t.Errorf("DroppedByCooldown = %d, want 1", stats.DroppedByCooldown)
}
if stats.DroppedByBucket != 0 {
t.Errorf("DroppedByBucket = %d, want 0 (bucket should not have fired)",
stats.DroppedByBucket)
}
}
func TestEngineStats_BucketCounter(t *testing.T) {
// Drain a small bucket without any cooldown so every drop
// after the first is bucket-attributable.
now := time.Now()
clock := now
e := NewEngine(
WithNow(func() time.Time { return clock }),
WithBucket(2, time.Hour),
)
rule := newRule(t, 1, `bad`) // cooldown=0
// 2 fires consume both tokens.
for i := 0; i < 2; i++ {
if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 1 {
t.Fatalf("fire %d expected to succeed", i)
}
}
// 3rd and 4th fires hit an empty bucket.
for i := 0; i < 2; i++ {
if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 0 {
t.Fatalf("fire %d expected to be bucket-dropped", i+2)
}
}
stats := e.Stats()
if stats.DroppedByBucket != 2 {
t.Errorf("DroppedByBucket = %d, want 2", stats.DroppedByBucket)
}
if stats.DroppedByCooldown != 0 {
t.Errorf("DroppedByCooldown = %d, want 0 (cooldown should not have fired)",
stats.DroppedByCooldown)
}
}
func TestEngineStats_NoDropsWhenAllowed(t *testing.T) {
e := NewEngine(WithBucket(100, time.Minute))
rule := newRule(t, 1, `x`)
for i := 0; i < 5; i++ {
e.Match("c", "w", "stdout", "x", []Rule{rule})
}
stats := e.Stats()
if stats.DroppedByBucket != 0 || stats.DroppedByCooldown != 0 {
t.Errorf("expected zero drops, got %+v", stats)
}
}
func TestStreamMatches(t *testing.T) {
cases := []struct {
rule, line string
want bool
}{
{"all", "stdout", true},
{"all", "stderr", true},
{"", "stdout", true},
{"stdout", "stdout", true},
{"stdout", "stderr", false},
{"stderr", "stderr", true},
{"stderr", "stdout", false},
}
for _, c := range cases {
got := streamMatches(c.rule, c.line)
if got != c.want {
t.Errorf("streamMatches(%q,%q) = %v want %v", c.rule, c.line, got, c.want)
}
}
}
+345
View File
@@ -0,0 +1,345 @@
package logscanner
import (
"context"
"encoding/json"
"log/slog"
"strconv"
"sync"
"sync/atomic"
"time"
"unicode/utf8"
"github.com/alexei/tinyforge/internal/docker"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// RuleSource is the read-side seam for fetching the current rule
// rows. Real callers pass *store.Store; tests pass a fake.
type RuleSource interface {
ListLogScanRules() ([]store.LogScanRule, error)
}
// ContainerLister is what the manager needs from the store to
// discover running container rows. The filter is set by the
// manager (state="running") so adapters don't have to do that.
type ContainerLister interface {
ListContainers(filter store.ContainerFilter) ([]store.Container, error)
}
// EventStore writes the matched-line entries into event_log.
// Same shape as events.PersistFunc but typed to keep the package
// self-contained.
type EventStore interface {
InsertEvent(evt store.EventLog) (store.EventLog, error)
}
// Manager owns the lifecycle of per-container tails. It polls the
// container index every PollInterval (default 5s), starts tails for
// new running containers, and stops tails when a container is no
// longer running (state != "running" or row deleted).
//
// Rule changes (CRUD via API) trigger ReloadRules which rebuilds the
// snapshot once and atomically swaps the pointer all tails read.
type Manager struct {
rules RuleSource
containers ContainerLister
docker dockerLogger
events EventStore
bus *events.Bus
pollInterval time.Duration
snapshot atomic.Pointer[Snapshot]
engine *Engine
mu sync.Mutex
tails map[string]context.CancelFunc // containerID -> cancel
tailWG sync.WaitGroup
// statsMu guards lastCompileErrors. Compile-error visibility
// matters because a broken rule silently disappears from the
// snapshot — without surfacing the message, the operator has
// no signal beyond a warn-level log line.
statsMu sync.RWMutex
lastCompileErrors []string
ctx context.Context
cancel context.CancelFunc
}
// Stats bundles the operator-facing observability for the log
// scanner. EngineStats covers the hot-path drop counters; compile
// errors are the last snapshot's invalid-pattern messages (helpful
// when a freshly-saved rule "doesn't fire" — the issue is usually
// here). ActiveTails reports how many container goroutines the
// manager is currently driving.
type Stats struct {
Engine EngineStats `json:"engine"`
ActiveTails int `json:"active_tails"`
LastCompileErrors []string `json:"last_compile_errors"`
}
// Config bundles the constructor inputs.
type Config struct {
Rules RuleSource
Containers ContainerLister
Docker *docker.Client
Events EventStore
Bus *events.Bus
PollInterval time.Duration // default 5s
}
// NewManager wires a manager with the supplied dependencies.
// It does not start polling — call Start to begin the lifecycle.
func NewManager(cfg Config) *Manager {
poll := cfg.PollInterval
if poll <= 0 {
poll = 5 * time.Second
}
return &Manager{
rules: cfg.Rules,
containers: cfg.Containers,
docker: cfg.Docker,
events: cfg.Events,
bus: cfg.Bus,
pollInterval: poll,
engine: NewEngine(),
tails: map[string]context.CancelFunc{},
}
}
// Start kicks off the polling loop and initial snapshot build.
// Returns an error only if the initial rule load fails; subsequent
// load failures are logged but do not stop the manager.
func (m *Manager) Start(ctx context.Context) error {
if err := m.ReloadRules(); err != nil {
return err
}
m.ctx, m.cancel = context.WithCancel(ctx)
go m.loop()
return nil
}
// Stop cancels every tail and waits for them to exit.
func (m *Manager) Stop() {
if m.cancel != nil {
m.cancel()
}
m.tailWG.Wait()
}
// ReloadRules rebuilds the snapshot from the current store state.
// Safe to call concurrently with the polling loop and with tails —
// the atomic pointer swap is the only synchronization required.
//
// Compile errors are both logged and stored on the manager so the
// API stats endpoint can surface them to operators. The set is
// fully replaced on each reload — there's no "and previously
// these other rules were broken" trail.
func (m *Manager) ReloadRules() error {
rows, err := m.rules.ListLogScanRules()
if err != nil {
return err
}
snap, compileErrs := BuildSnapshot(rows)
errMsgs := make([]string, 0, len(compileErrs))
for _, e := range compileErrs {
slog.Warn("logscanner: rule compile failed (dropped from snapshot)", "error", e)
errMsgs = append(errMsgs, e.Error())
}
m.snapshot.Store(snap)
m.statsMu.Lock()
m.lastCompileErrors = errMsgs
m.statsMu.Unlock()
return nil
}
// Stats returns the current operator-facing observability snapshot.
// Read-safe: the mutex protects only the compile-errors slice;
// counters and tail-count are atomic / mutex-guarded internally.
func (m *Manager) Stats() Stats {
m.statsMu.RLock()
errs := make([]string, len(m.lastCompileErrors))
copy(errs, m.lastCompileErrors)
m.statsMu.RUnlock()
m.mu.Lock()
tails := len(m.tails)
m.mu.Unlock()
return Stats{
Engine: m.engine.Stats(),
ActiveTails: tails,
LastCompileErrors: errs,
}
}
// EmitHit persists a hit as an event_log row and publishes it on
// the bus. Implements the HitEmitter interface so tails depend on
// the interface, not the concrete manager — also makes the manager
// easy to unit-test by passing a recording emitter.
func (m *Manager) EmitHit(ctx context.Context, hit Hit) {
const maxMessage = 500 // truncate long lines so one chatty rule can't blow up event_log
msg := truncateUTF8(hit.Line, maxMessage)
meta := map[string]any{
"workload_id": hit.WorkloadID,
"container_id": hit.ContainerID,
"rule_id": hit.Rule.ID,
"rule_name": hit.Rule.Name,
"stream": hit.Stream,
}
if subs := hit.Rule.Pattern.FindStringSubmatch(hit.Line); len(subs) > 1 {
captures := map[string]string{}
for i, sub := range subs[1:] {
captures[indexName(hit.Rule.Pattern, i+1)] = sub
}
meta["captures"] = captures
}
metaJSON, _ := json.Marshal(meta)
evt, err := m.events.InsertEvent(store.EventLog{
Source: "logscan",
Severity: nonEmpty(hit.Rule.Severity, store.LogScanSeverityWarn),
Message: msg,
Metadata: string(metaJSON),
})
if err != nil {
slog.Warn("logscanner: persist event", "rule", hit.Rule.Name, "error", err)
return
}
if m.bus != nil {
m.bus.Publish(events.Event{
Type: events.EventLog,
Payload: events.EventLogPayload{
ID: evt.ID,
Source: evt.Source,
Severity: evt.Severity,
Message: evt.Message,
Metadata: evt.Metadata,
CreatedAt: evt.CreatedAt,
},
})
}
}
// loop runs the lifecycle poll until ctx cancellation. It is single-
// threaded over the tails map — start/stop of individual tails
// happens here so there's no race on the map.
func (m *Manager) loop() {
ticker := time.NewTicker(m.pollInterval)
defer ticker.Stop()
m.reconcile() // run once immediately
for {
select {
case <-m.ctx.Done():
m.stopAllTails()
return
case <-ticker.C:
m.reconcile()
}
}
}
// reconcile diffs the desired set of running container IDs against
// the live tails and starts/stops as needed.
func (m *Manager) reconcile() {
rows, err := m.containers.ListContainers(store.ContainerFilter{State: "running"})
if err != nil {
slog.Warn("logscanner: list containers", "error", err)
return
}
desired := map[string]store.Container{}
for _, c := range rows {
if c.ContainerID == "" {
continue
}
desired[c.ContainerID] = c
}
m.mu.Lock()
defer m.mu.Unlock()
for id, c := range desired {
if _, ok := m.tails[id]; ok {
continue
}
m.startTailLocked(id, c.WorkloadID)
}
for id, cancel := range m.tails {
if _, ok := desired[id]; !ok {
cancel()
delete(m.tails, id)
m.engine.Forget(id)
}
}
}
func (m *Manager) startTailLocked(containerID, workloadID string) {
tailCtx, cancel := context.WithCancel(m.ctx)
t := &tail{
containerID: containerID,
workloadID: workloadID,
docker: m.docker,
engine: m.engine,
emitter: m,
snapshot: &m.snapshot,
}
if err := t.validate(); err != nil {
slog.Warn("logscanner: tail validation failed", "container", containerID, "error", err)
cancel()
return
}
m.tails[containerID] = cancel
m.tailWG.Add(1)
go func() {
defer m.tailWG.Done()
t.run(tailCtx)
}()
}
func (m *Manager) stopAllTails() {
m.mu.Lock()
for id, cancel := range m.tails {
cancel()
delete(m.tails, id)
}
m.mu.Unlock()
m.tailWG.Wait()
}
// indexName returns the name of capture group i (1-based). Falls
// back to "$<i>" when the group is unnamed so JSON keys stay stable
// AND distinct across groups (the previous $N collision-fallback
// silently dropped groups beyond $3 onto the same JSON key).
func indexName(re interface{ SubexpNames() []string }, i int) string {
names := re.SubexpNames()
if i < len(names) && names[i] != "" {
return names[i]
}
return "$" + strconv.Itoa(i)
}
func nonEmpty(a, b string) string {
if a != "" {
return a
}
return b
}
// truncateUTF8 returns s shortened to at most maxBytes, cutting on
// a rune boundary so we never leave a partial UTF-8 codepoint in
// the output. An ellipsis is appended when truncation occurred so
// downstream readers can tell. Callers must size maxBytes to
// include the trailing 3-byte ellipsis.
func truncateUTF8(s string, maxBytes int) string {
if len(s) <= maxBytes {
return s
}
end := maxBytes
// Walk back from the byte cut to the last rune boundary so
// utf8.ValidString stays true on the returned slice.
for end > 0 && !utf8.RuneStart(s[end]) {
end--
}
return s[:end] + "…"
}
+108
View File
@@ -0,0 +1,108 @@
package logscanner
import (
"regexp"
"strings"
"testing"
"unicode/utf8"
)
func TestTruncateUTF8(t *testing.T) {
cases := []struct {
name string
in string
maxBytes int
want string
}{
{
name: "shorter than cap untouched",
in: "hello",
maxBytes: 100,
want: "hello",
},
{
name: "ASCII truncation",
in: "abcdefghij",
maxBytes: 5,
want: "abcde…",
},
{
name: "cuts on rune boundary inside multibyte",
in: "abcdé",
maxBytes: 5,
want: "abcd…",
},
{
name: "preserves valid utf-8 when cap lands mid-codepoint",
in: "ééééééé",
maxBytes: 5,
want: "éé…",
},
{
name: "empty input untouched",
in: "",
maxBytes: 5,
want: "",
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := truncateUTF8(c.in, c.maxBytes)
if got != c.want {
t.Errorf("got %q want %q", got, c.want)
}
if !utf8.ValidString(got) {
t.Errorf("result not valid UTF-8: %q", got)
}
})
}
}
func TestNonEmpty(t *testing.T) {
if nonEmpty("a", "b") != "a" {
t.Error("first non-empty wins")
}
if nonEmpty("", "b") != "b" {
t.Error("fallback when first empty")
}
if nonEmpty("", "") != "" {
t.Error("both empty yields empty")
}
}
func TestIndexName_UnamedGroupsStable(t *testing.T) {
// Each unnamed group should get a distinct fallback name so
// JSON serialization doesn't collapse $4..$N onto a single key.
re := mustCompile(t, `(\w+) (\w+) (\w+) (\w+) (\w+)`)
seen := map[string]bool{}
for i := 1; i <= 5; i++ {
name := indexName(re, i)
if seen[name] {
t.Errorf("indexName(%d) = %q collides with prior group", i, name)
}
seen[name] = true
if !strings.HasPrefix(name, "$") {
t.Errorf("unnamed group %d should fall back to $N form, got %q", i, name)
}
}
}
func TestIndexName_NamedGroupWins(t *testing.T) {
re := mustCompile(t, `(?P<code>\d+) (\w+)`)
if got := indexName(re, 1); got != "code" {
t.Errorf("named group should win: %q", got)
}
if got := indexName(re, 2); got != "$2" {
t.Errorf("second (unnamed) group: %q", got)
}
}
// mustCompile is a local helper so the test file is self-contained.
func mustCompile(t *testing.T, pattern string) interface{ SubexpNames() []string } {
t.Helper()
r, err := regexp.Compile(pattern)
if err != nil {
t.Fatalf("compile %q: %v", pattern, err)
}
return r
}
+143
View File
@@ -0,0 +1,143 @@
// Package logscanner tails running container logs, matches lines
// against operator-configured regex rules, and emits event_log entries
// via the events bus. The package is split into four files:
//
// - rules.go: the rule snapshot — compiled regexes + per-rule
// metadata. Snapshots are immutable; the manager builds a new
// snapshot on every rule change and swaps it atomically.
// - engine.go: rule evaluation per line + cooldown + token bucket.
// - tail.go: per-container goroutine reading docker log stream.
// - manager.go: container lifecycle polling + tail lifecycle.
package logscanner
import (
"fmt"
"regexp"
"github.com/alexei/tinyforge/internal/store"
)
// Rule is a compiled, immutable representation of a store.LogScanRule.
// Built once when the snapshot is loaded; held by every tail goroutine
// via the snapshot atomic pointer.
type Rule struct {
ID int64
WorkloadID string // "" = global
Name string
Pattern *regexp.Regexp
Severity string
Streams string // "all" | "stdout" | "stderr"
CooldownSeconds int
}
// Snapshot is the rule set as seen by tails. Built from a flat slice
// of store.LogScanRule rows: globals are expanded via
// store.EffectiveLogScanRules to yield the per-workload effective
// set at lookup time.
//
// The snapshot is immutable. Hot-reload semantics are achieved by
// building a fresh snapshot in the manager and swapping the atomic
// pointer; in-flight tails finish their current match against the
// old snapshot, then pick up the new one on the next line.
//
// MUST NOT mutate any field (including map values, including slice
// elements) after BuildSnapshot returns — tails read these
// concurrently with no locking and rely on immutability for safety.
type Snapshot struct {
// global is the list of compiled global rules (workload_id == "").
global []Rule
// perWorkload[workloadID] holds workload-only additions AND
// per-workload overrides (already resolved against globals).
// EffectiveFor(id) merges global + perWorkload[id] minus any
// global overridden under that workload.
perWorkload map[string][]Rule
// overrides[id] is the workload's overriding rule (compiled).
// overrides[id][globalID] -> Rule (the override row).
overrides map[string]map[int64]Rule
}
// BuildSnapshot compiles every rule's pattern and returns an
// immutable Snapshot. Rules that fail to compile are skipped with
// their error reported to the caller — store-side validation keeps
// the pattern field as a free-form regex string, so engine-time
// compile failures are an expected, recoverable mode.
//
// The returned error slice is informational, not fatal: bad rules
// are dropped from the snapshot but the rest still work.
func BuildSnapshot(rows []store.LogScanRule) (*Snapshot, []error) {
s := &Snapshot{
perWorkload: map[string][]Rule{},
overrides: map[string]map[int64]Rule{},
}
var errs []error
for _, row := range rows {
if !row.Enabled {
// Skip outright. Overrides with enabled=false still
// need to be recorded so they suppress a global —
// handled by tracking disable separately below.
}
re, err := regexp.Compile(row.Pattern)
if err != nil {
errs = append(errs, fmt.Errorf("rule #%d %q: %w", row.ID, row.Name, err))
continue
}
r := Rule{
ID: row.ID,
WorkloadID: row.WorkloadID,
Name: row.Name,
Pattern: re,
Severity: row.Severity,
Streams: row.Streams,
CooldownSeconds: row.CooldownSeconds,
}
switch {
case row.WorkloadID == "" && row.OverridesID == 0:
if row.Enabled {
s.global = append(s.global, r)
}
case row.WorkloadID != "" && row.OverridesID == 0:
if row.Enabled {
s.perWorkload[row.WorkloadID] = append(s.perWorkload[row.WorkloadID], r)
}
case row.WorkloadID != "" && row.OverridesID != 0:
// Override row: always record so a disabled override
// suppresses the global for this workload.
if _, ok := s.overrides[row.WorkloadID]; !ok {
s.overrides[row.WorkloadID] = map[int64]Rule{}
}
if row.Enabled {
s.overrides[row.WorkloadID][row.OverridesID] = r
} else {
// Encode "disabled override" as the zero rule so
// EffectiveFor can drop it without re-querying.
s.overrides[row.WorkloadID][row.OverridesID] = Rule{ID: row.OverridesID}
}
}
}
return s, errs
}
// EffectiveFor returns the rule list to evaluate against logs of a
// specific workload. Equivalent to store.EffectiveLogScanRules but
// operates on the compiled snapshot, so the hot path is allocation
// free except for the result slice.
func (s *Snapshot) EffectiveFor(workloadID string) []Rule {
if s == nil {
return nil
}
overrides := s.overrides[workloadID]
out := make([]Rule, 0, len(s.global)+len(s.perWorkload[workloadID]))
for _, g := range s.global {
if ov, ok := overrides[g.ID]; ok {
if ov.Pattern == nil {
// Suppressed for this workload — skip.
continue
}
out = append(out, ov)
continue
}
out = append(out, g)
}
out = append(out, s.perWorkload[workloadID]...)
return out
}
+109
View File
@@ -0,0 +1,109 @@
package logscanner
import (
"testing"
"github.com/alexei/tinyforge/internal/store"
)
func TestBuildSnapshot_CompileErrorsReported(t *testing.T) {
rows := []store.LogScanRule{
{ID: 1, Pattern: `valid`, Enabled: true},
{ID: 2, Pattern: `([unclosed`, Enabled: true},
}
snap, errs := BuildSnapshot(rows)
if snap == nil {
t.Fatal("snapshot should not be nil")
}
if len(errs) != 1 {
t.Fatalf("expected 1 compile error, got %d", len(errs))
}
if len(snap.global) != 1 {
t.Errorf("expected 1 global rule (valid one), got %d", len(snap.global))
}
}
func TestEffectiveFor_GlobalOnly(t *testing.T) {
rows := []store.LogScanRule{
{ID: 1, Pattern: `panic`, Enabled: true},
{ID: 2, Pattern: `fatal`, Enabled: true},
}
snap, _ := BuildSnapshot(rows)
out := snap.EffectiveFor("w1")
if len(out) != 2 {
t.Fatalf("expected 2 rules, got %d", len(out))
}
}
func TestEffectiveFor_WorkloadAddition(t *testing.T) {
rows := []store.LogScanRule{
{ID: 1, Pattern: `panic`, Enabled: true},
{ID: 2, Pattern: `slow_query`, WorkloadID: "w1", Enabled: true},
}
snap, _ := BuildSnapshot(rows)
out := snap.EffectiveFor("w1")
if len(out) != 2 {
t.Fatalf("workload w1 should see both: %d", len(out))
}
out2 := snap.EffectiveFor("w2")
if len(out2) != 1 {
t.Errorf("workload w2 should see only the global: %d", len(out2))
}
}
func TestEffectiveFor_OverrideReplacesGlobal(t *testing.T) {
rows := []store.LogScanRule{
{ID: 1, Pattern: `panic`, Severity: "warn", Enabled: true},
{
ID: 2, WorkloadID: "w1", OverridesID: 1,
Pattern: `panic`, Severity: "error", Enabled: true,
},
}
snap, _ := BuildSnapshot(rows)
out := snap.EffectiveFor("w1")
if len(out) != 1 {
t.Fatalf("expected 1 rule, got %d", len(out))
}
if out[0].Severity != "error" {
t.Errorf("override severity should win: %q", out[0].Severity)
}
// Other workloads still see the original.
out2 := snap.EffectiveFor("w2")
if len(out2) != 1 || out2[0].Severity != "warn" {
t.Errorf("w2 should see original severity, got %+v", out2)
}
}
func TestEffectiveFor_DisabledOverrideSuppresses(t *testing.T) {
rows := []store.LogScanRule{
{ID: 1, Pattern: `panic`, Enabled: true},
{
ID: 2, WorkloadID: "w1", OverridesID: 1,
Pattern: `panic`, Enabled: false,
},
}
snap, _ := BuildSnapshot(rows)
if len(snap.EffectiveFor("w1")) != 0 {
t.Errorf("disabled override should suppress global for w1")
}
if len(snap.EffectiveFor("w2")) != 1 {
t.Errorf("w2 should still see the global")
}
}
func TestEffectiveFor_DisabledGlobalSkipped(t *testing.T) {
rows := []store.LogScanRule{
{ID: 1, Pattern: `panic`, Enabled: false},
}
snap, _ := BuildSnapshot(rows)
if len(snap.EffectiveFor("w1")) != 0 {
t.Errorf("disabled global should not appear in effective set")
}
}
func TestEffectiveFor_NilSnapshot(t *testing.T) {
var snap *Snapshot
if out := snap.EffectiveFor("w1"); out != nil {
t.Errorf("nil snapshot should return nil, got %+v", out)
}
}
+88
View File
@@ -0,0 +1,88 @@
package logscanner
import (
"errors"
"strings"
"testing"
"github.com/alexei/tinyforge/internal/store"
)
// fakeRuleSource lets us inject rules into ReloadRules without
// standing up SQLite. Mirrors the fakeTriggerSource pattern from
// the events package.
type fakeRuleSource struct {
rows []store.LogScanRule
err error
}
func (f *fakeRuleSource) ListLogScanRules() ([]store.LogScanRule, error) {
if f.err != nil {
return nil, f.err
}
return f.rows, nil
}
func TestManagerStats_CapturesCompileErrors(t *testing.T) {
rs := &fakeRuleSource{rows: []store.LogScanRule{
{ID: 1, Name: "valid", Pattern: `panic`, Severity: "warn", Streams: "all", Enabled: true},
{ID: 2, Name: "broken", Pattern: `([unclosed`, Severity: "warn", Streams: "all", Enabled: true},
{ID: 3, Name: "also-broken", Pattern: `[`, Severity: "warn", Streams: "all", Enabled: true},
}}
m := NewManager(Config{Rules: rs})
if err := m.ReloadRules(); err != nil {
t.Fatalf("ReloadRules: %v", err)
}
stats := m.Stats()
if len(stats.LastCompileErrors) != 2 {
t.Fatalf("expected 2 compile errors, got %d: %+v", len(stats.LastCompileErrors), stats.LastCompileErrors)
}
// The error messages should mention the rule id/name from the
// BuildSnapshot format so operators can find which rule broke.
joined := strings.Join(stats.LastCompileErrors, "|")
if !strings.Contains(joined, "broken") {
t.Errorf("error messages should reference rule name 'broken': %s", joined)
}
}
func TestManagerStats_CompileErrorsReplacedOnReload(t *testing.T) {
// A broken rule then a reload with all-valid rules should
// clear the error list — operators expect the panel to flip
// from "2 errors" to "all clean" after they fix things.
rs := &fakeRuleSource{rows: []store.LogScanRule{
{ID: 1, Name: "broken", Pattern: `([`, Severity: "warn", Streams: "all", Enabled: true},
}}
m := NewManager(Config{Rules: rs})
_ = m.ReloadRules()
if len(m.Stats().LastCompileErrors) != 1 {
t.Fatal("expected one compile error before fix")
}
rs.rows = []store.LogScanRule{
{ID: 1, Name: "fixed", Pattern: `panic`, Severity: "warn", Streams: "all", Enabled: true},
}
_ = m.ReloadRules()
if len(m.Stats().LastCompileErrors) != 0 {
t.Errorf("expected zero compile errors after reload, got %d",
len(m.Stats().LastCompileErrors))
}
}
func TestManagerStats_ReloadErrorPropagates(t *testing.T) {
rs := &fakeRuleSource{err: errors.New("db down")}
m := NewManager(Config{Rules: rs})
if err := m.ReloadRules(); err == nil {
t.Fatal("expected ReloadRules to propagate the source error")
}
}
func TestManagerStats_ActiveTailsDefaultsZero(t *testing.T) {
// Without Start() and without a docker dependency we can't
// run real tails, but the counter should be a stable 0 read
// rather than panic/uninitialized.
rs := &fakeRuleSource{}
m := NewManager(Config{Rules: rs})
if got := m.Stats().ActiveTails; got != 0 {
t.Errorf("ActiveTails on fresh manager = %d, want 0", got)
}
}
+245
View File
@@ -0,0 +1,245 @@
package logscanner
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"strings"
"sync/atomic"
"time"
"github.com/alexei/tinyforge/internal/docker"
)
// maxFramePayloadBytes caps a single docker log frame payload so a
// hostile or corrupted stream can't force an unbounded allocation.
// Real-world container log frames are well under this; the limit is
// purely a safety belt against the 4 GiB-by-spec upper bound the
// header byte width allows.
const maxFramePayloadBytes = 16 * 1024 * 1024 // 16 MiB
// maxLineBufferBytes caps the per-stream line-reassembly buffer for
// the same reason. A stream that never sends a newline would
// otherwise grow without bound. 1 MiB matches the bufio.Scanner
// default max and is far above any reasonable log line.
const maxLineBufferBytes = 1 * 1024 * 1024 // 1 MiB
// dockerLogger is the minimum surface the tail needs from the
// docker client. Defined as an interface so tests can stand up a
// canned log stream without spinning up containerd.
type dockerLogger interface {
ContainerLogsOpts(ctx context.Context, containerID string, opts docker.ContainerLogOptions) (io.ReadCloser, error)
}
// HitEmitter is what the tail calls when a rule fires. Implemented
// by the manager — it writes to event_log and publishes on the bus.
// Kept as a single-method interface so the tail has zero coupling to
// store/events.
type HitEmitter interface {
EmitHit(ctx context.Context, hit Hit)
}
// tail watches one container. Lifetime is bound to the supplied
// context — cancellation propagates through docker's log stream so
// goroutines exit promptly on container stop or manager shutdown.
type tail struct {
containerID string
workloadID string
docker dockerLogger
engine *Engine
emitter HitEmitter
snapshot *atomic.Pointer[Snapshot]
}
// run is the goroutine body. Opens a follow=true log stream and
// reads lines until the stream EOFs or context is cancelled.
// Tails terminate quietly on context cancel; any other error is
// logged at warn so the operator sees it without it stopping the
// whole manager.
func (t *tail) run(ctx context.Context) {
stream, err := t.docker.ContainerLogsOpts(ctx, t.containerID, docker.ContainerLogOptions{
Follow: true,
Tail: "0", // start from the newest line; backfill is out of scope
ShowStdout: true,
ShowStderr: true,
})
if err != nil {
if !errors.Is(err, context.Canceled) {
slog.Warn("logscanner: open log stream", "container", t.containerID, "error", err)
}
return
}
defer stream.Close()
// Demuxing: docker emits multiplexed frames when TTY is off,
// where each frame's first byte is the stream type (1=stdout,
// 2=stderr) and bytes 4..7 are big-endian length. When TTY is
// on the stream is plain bytes. We try to detect the frame
// header on the first read; if it parses as a valid frame we
// use the demux path, otherwise we fall back to line-by-line.
bufStream := bufio.NewReaderSize(stream, 32*1024)
if isMultiplexedStream(bufStream) {
t.readDemuxed(ctx, bufStream)
} else {
t.readPlain(ctx, bufStream)
}
}
// readDemuxed reads docker's multiplexed log frames. Each frame:
//
// [type 1 byte][000 3 bytes][len 4 bytes BE][payload len bytes]
//
// We track the stream type per-frame and feed payload bytes into a
// per-stream line buffer so a line split across frames still
// reassembles cleanly.
func (t *tail) readDemuxed(ctx context.Context, r *bufio.Reader) {
header := make([]byte, 8)
// Two line buffers — stdout vs stderr — so a partial line read
// across multiple frames doesn't bleed into the other stream.
buffers := map[string]*bytes.Buffer{
"stdout": {},
"stderr": {},
}
for {
if ctx.Err() != nil {
return
}
if _, err := io.ReadFull(r, header); err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
slog.Warn("logscanner: read header", "container", t.containerID, "error", err)
}
return
}
stream := "stdout"
if header[0] == 2 {
stream = "stderr"
}
size := int(uint32(header[4])<<24 | uint32(header[5])<<16 | uint32(header[6])<<8 | uint32(header[7]))
if size <= 0 {
continue
}
if size > maxFramePayloadBytes {
slog.Warn("logscanner: frame payload exceeds cap, dropping tail",
"container", t.containerID, "size", size, "cap", maxFramePayloadBytes)
return
}
payload := make([]byte, size)
if _, err := io.ReadFull(r, payload); err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
slog.Warn("logscanner: read payload", "container", t.containerID, "error", err)
}
return
}
buf := buffers[stream]
buf.Write(payload)
// Reassembly buffer should never accumulate beyond
// maxLineBufferBytes — a stream with no newline would
// otherwise grow unbounded. Drop the buffer (and the partial
// line) when the cap is reached so the tail stays healthy.
if buf.Len() > maxLineBufferBytes {
slog.Warn("logscanner: line buffer exceeded cap, resetting",
"container", t.containerID, "stream", stream, "size", buf.Len())
buf.Reset()
continue
}
for {
line, ok := readLineFromBuffer(buf)
if !ok {
break
}
t.processLine(ctx, stream, line)
}
}
}
// readPlain reads a non-multiplexed stream (TTY mode). All lines
// are reported as "stdout" since the API doesn't separate the
// streams in this mode.
func (t *tail) readPlain(ctx context.Context, r *bufio.Reader) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
if ctx.Err() != nil {
return
}
t.processLine(ctx, "stdout", scanner.Text())
}
if err := scanner.Err(); err != nil && !errors.Is(err, context.Canceled) {
slog.Warn("logscanner: plain read", "container", t.containerID, "error", err)
}
}
func (t *tail) processLine(ctx context.Context, stream, line string) {
line = strings.TrimRight(line, "\r\n")
// Strip the leading RFC3339Nano timestamp docker prepends when
// Timestamps=true. We validate the prefix actually parses as a
// time rather than blindly stripping at the first space — a
// legitimate log line whose first word is 20+ chars (e.g. a long
// hash with no whitespace) would otherwise lose data.
if idx := strings.IndexByte(line, ' '); idx >= 20 && idx <= 40 {
candidate := line[:idx]
if _, err := time.Parse(time.RFC3339Nano, candidate); err == nil {
line = line[idx+1:]
}
}
snap := t.snapshot.Load()
if snap == nil {
return
}
rules := snap.EffectiveFor(t.workloadID)
hits := t.engine.Match(t.containerID, t.workloadID, stream, line, rules)
for _, h := range hits {
t.emitter.EmitHit(ctx, h)
}
}
// isMultiplexedStream peeks at the first 8 bytes to detect docker's
// multiplexed frame header. The frame type byte must be 0..2 and
// bytes 1..3 must be zero. We restore the bytes via UnreadByte
// equivalent (bufio.Peek), so the actual reader is unaffected.
func isMultiplexedStream(r *bufio.Reader) bool {
peeked, err := r.Peek(8)
if err != nil || len(peeked) < 8 {
return false
}
if peeked[0] > 2 || peeked[1] != 0 || peeked[2] != 0 || peeked[3] != 0 {
return false
}
return true
}
func readLineFromBuffer(b *bytes.Buffer) (string, bool) {
idx := bytes.IndexByte(b.Bytes(), '\n')
if idx < 0 {
return "", false
}
line := make([]byte, idx)
copy(line, b.Bytes()[:idx])
b.Next(idx + 1) // consume line + newline
return string(line), true
}
// validate sanity-checks the tail before launch. Returns an error
// the manager can log rather than panicking on a nil dependency.
func (t *tail) validate() error {
if t.containerID == "" {
return fmt.Errorf("tail: container_id required")
}
if t.docker == nil {
return fmt.Errorf("tail: docker client required")
}
if t.engine == nil {
return fmt.Errorf("tail: engine required")
}
if t.emitter == nil {
return fmt.Errorf("tail: emitter required")
}
if t.snapshot == nil {
return fmt.Errorf("tail: snapshot pointer required")
}
return nil
}
+156
View File
@@ -0,0 +1,156 @@
package logscanner
import (
"bufio"
"bytes"
"context"
"sync"
"sync/atomic"
"testing"
"github.com/alexei/tinyforge/internal/store"
)
// recordingEmitter captures hits from a tail without touching the
// real store or event bus. Concurrent-safe so we can let the tail
// goroutine push hits while the test asserts.
type recordingEmitter struct {
mu sync.Mutex
hits []Hit
}
func (r *recordingEmitter) EmitHit(_ context.Context, hit Hit) {
r.mu.Lock()
r.hits = append(r.hits, hit)
r.mu.Unlock()
}
func (r *recordingEmitter) Hits() []Hit {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]Hit, len(r.hits))
copy(out, r.hits)
return out
}
// snapshotForRule wraps a single rule into an atomic Snapshot
// pointer so tests can drive processLine without rebuilding from
// store rows.
func snapshotForRule(t *testing.T, pattern string) *atomic.Pointer[Snapshot] {
t.Helper()
rows := []store.LogScanRule{
{ID: 1, Name: "t", Pattern: pattern, Severity: "warn", Streams: "all", Enabled: true},
}
snap, errs := BuildSnapshot(rows)
if len(errs) != 0 {
t.Fatalf("BuildSnapshot: %v", errs)
}
p := &atomic.Pointer[Snapshot]{}
p.Store(snap)
return p
}
func TestProcessLine_StripsRFC3339Prefix(t *testing.T) {
emit := &recordingEmitter{}
snap := snapshotForRule(t, `panic`)
tl := &tail{
containerID: "c1",
workloadID: "w1",
engine: NewEngine(),
emitter: emit,
snapshot: snap,
}
tl.processLine(context.Background(), "stderr", "2026-05-11T12:34:56.123456789Z fatal panic in worker")
hits := emit.Hits()
if len(hits) != 1 {
t.Fatalf("want 1 hit, got %d", len(hits))
}
if hits[0].Line != "fatal panic in worker" {
t.Errorf("timestamp not stripped: %q", hits[0].Line)
}
}
func TestProcessLine_LeavesNonTimestampedLineAlone(t *testing.T) {
// The previous heuristic stripped the first word of any line
// whose first space landed past byte 20. A long-hash line with
// no embedded timestamp must now survive intact.
emit := &recordingEmitter{}
snap := snapshotForRule(t, `(?i)hash`)
tl := &tail{
containerID: "c1",
workloadID: "w1",
engine: NewEngine(),
emitter: emit,
snapshot: snap,
}
long := "aaaaaaaaaaaaaaaaaaaaaaaa hash payload"
tl.processLine(context.Background(), "stdout", long)
hits := emit.Hits()
if len(hits) != 1 {
t.Fatalf("want 1 hit, got %d", len(hits))
}
if hits[0].Line != long {
t.Errorf("non-timestamp prefix incorrectly stripped: %q", hits[0].Line)
}
}
func TestProcessLine_NoSnapshotIsSafe(t *testing.T) {
// Tail constructed before the manager loads its first snapshot
// (or after a transient nil) must not crash — processLine
// returns silently when snapshot.Load() is nil.
tl := &tail{
containerID: "c1",
workloadID: "w1",
engine: NewEngine(),
emitter: &recordingEmitter{},
snapshot: &atomic.Pointer[Snapshot]{}, // empty pointer
}
tl.processLine(context.Background(), "stdout", "anything")
}
func TestReadLineFromBuffer(t *testing.T) {
buf := &bytes.Buffer{}
buf.WriteString("line one\nline two\npartial")
got, ok := readLineFromBuffer(buf)
if !ok || got != "line one" {
t.Fatalf("first read: ok=%v got=%q", ok, got)
}
got, ok = readLineFromBuffer(buf)
if !ok || got != "line two" {
t.Fatalf("second read: ok=%v got=%q", ok, got)
}
// Trailing partial line stays in buffer until more data arrives.
if _, ok := readLineFromBuffer(buf); ok {
t.Errorf("partial line should NOT yield until newline arrives")
}
if buf.String() != "partial" {
t.Errorf("remainder=%q want %q", buf.String(), "partial")
}
}
func TestIsMultiplexedStream(t *testing.T) {
// Valid docker frame header: type=2 (stderr), 3 nulls, then 4-byte length.
demuxed := []byte{2, 0, 0, 0, 0, 0, 0, 12, 'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'}
if !isMultiplexedStream(newReader(demuxed)) {
t.Error("valid demuxed frame should be detected")
}
// Plain text: first byte is a printable letter, header check fails.
plain := []byte("plain log line without framing\n")
if isMultiplexedStream(newReader(plain)) {
t.Error("plain text should not be detected as multiplexed")
}
// Header with type=3 is invalid (docker uses 0,1,2 only).
bad := []byte{3, 0, 0, 0, 0, 0, 0, 1}
if isMultiplexedStream(newReader(bad)) {
t.Error("type=3 header is not a valid docker frame")
}
}
// newReader returns a *bufio.Reader sized large enough to satisfy
// the Peek(8) the demuxer detection requires.
func newReader(b []byte) *bufio.Reader {
return bufio.NewReaderSize(bytes.NewReader(b), 32)
}