package logscanner

import (
	"context"
	"encoding/json"
	"log/slog"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
	"unicode/utf8"

	"github.com/alexei/tinyforge/internal/docker"
	"github.com/alexei/tinyforge/internal/events"
	"github.com/alexei/tinyforge/internal/store"
)

// RuleSource is the read-side seam for fetching the current rule
// rows. Real callers pass *store.Store; tests pass a fake.
type RuleSource interface {
	ListLogScanRules() ([]store.LogScanRule, error)
}

// ContainerLister is what the manager needs from the store to
// discover running container rows. The filter is set by the
// manager (state="running") so adapters don't have to do that.
type ContainerLister interface {
	ListContainers(filter store.ContainerFilter) ([]store.Container, error)
}

// EventStore writes the matched-line entries into event_log.
// Same shape as events.PersistFunc but typed to keep the package
// self-contained.
type EventStore interface {
	InsertEvent(evt store.EventLog) (store.EventLog, error)
}

// Manager owns the lifecycle of per-container tails. It polls the
// container index every PollInterval (default 5s), starts tails for
// new running containers, and stops tails when a container is no
// longer running (state != "running" or row deleted).
//
// Rule changes (CRUD via API) trigger ReloadRules which rebuilds the
// snapshot once and atomically swaps the pointer all tails read.
type Manager struct {
	rules        RuleSource
	containers   ContainerLister
	docker       dockerLogger // declared outside this view; left nil when Config.Docker is nil
	events       EventStore
	bus          *events.Bus
	pollInterval time.Duration

	snapshot atomic.Pointer[Snapshot]
	engine   *Engine

	mu     sync.Mutex
	tails  map[string]context.CancelFunc // containerID -> cancel
	tailWG sync.WaitGroup

	// statsMu guards lastCompileErrors. Compile-error visibility
	// matters because a broken rule silently disappears from the
	// snapshot — without surfacing the message, the operator has
	// no signal beyond a warn-level log line.
	statsMu           sync.RWMutex
	lastCompileErrors []string

	ctx    context.Context
	cancel context.CancelFunc

	// loopDone is closed when the polling goroutine launched by Start
	// has returned. It stays nil until Start succeeds, so Stop only
	// waits on it when there is a loop to wait for.
	loopDone chan struct{}
}

// Stats bundles the operator-facing observability for the log
// scanner. EngineStats covers the hot-path drop counters; compile
// errors are the last snapshot's invalid-pattern messages (helpful
// when a freshly-saved rule "doesn't fire" — the issue is usually
// here). ActiveTails reports how many container goroutines the
// manager is currently driving.
type Stats struct {
	Engine            EngineStats `json:"engine"`
	ActiveTails       int         `json:"active_tails"`
	LastCompileErrors []string    `json:"last_compile_errors"`
}

// Config bundles the constructor inputs.
type Config struct {
	Rules        RuleSource
	Containers   ContainerLister
	Docker       *docker.Client
	Events       EventStore
	Bus          *events.Bus
	PollInterval time.Duration // default 5s
}

// NewManager wires a manager with the supplied dependencies.
// It does not start polling — call Start to begin the lifecycle.
//
// A non-positive cfg.PollInterval is replaced by the 5s default.
func NewManager(cfg Config) *Manager {
	poll := cfg.PollInterval
	if poll <= 0 {
		poll = 5 * time.Second
	}
	m := &Manager{
		rules:        cfg.Rules,
		containers:   cfg.Containers,
		events:       cfg.Events,
		bus:          cfg.Bus,
		pollInterval: poll,
		engine:       NewEngine(),
		tails:        map[string]context.CancelFunc{},
	}
	// Assign the client only when it is non-nil: storing a nil
	// *docker.Client in an interface-typed field would produce a
	// non-nil interface value, so later `== nil` checks on the field
	// would not detect the missing client.
	if cfg.Docker != nil {
		m.docker = cfg.Docker
	}
	return m
}

// Start kicks off the polling loop and initial snapshot build.
// Returns an error only if the initial rule load fails; subsequent
// load failures are logged but do not stop the manager.
//
// The manager derives its own cancellable context from ctx, so the
// loop ends either when ctx is cancelled or when Stop is called.
func (m *Manager) Start(ctx context.Context) error {
	if err := m.ReloadRules(); err != nil {
		return err
	}
	m.ctx, m.cancel = context.WithCancel(ctx)

	done := make(chan struct{})
	m.loopDone = done
	go func() {
		// Signal Stop that no further reconcile (and therefore no
		// further tailWG.Add) can happen.
		defer close(done)
		m.loop()
	}()
	return nil
}

// Stop cancels every tail and waits for them to exit.
//
// It first waits for the polling loop to return. Without that, a
// reconcile pass still in flight could call tailWG.Add while Stop is
// already inside tailWG.Wait, and Stop could return before that tail
// goroutine had even been launched. Stop is a no-op wait when Start
// was never called or failed.
func (m *Manager) Stop() {
	if m.cancel != nil {
		m.cancel()
	}
	if m.loopDone != nil {
		<-m.loopDone
	}
	m.tailWG.Wait()
}

// ReloadRules rebuilds the snapshot from the current store state.
// Safe to call concurrently with the polling loop and with tails —
// the atomic pointer swap is the only synchronization required.
// // Compile errors are both logged and stored on the manager so the // API stats endpoint can surface them to operators. The set is // fully replaced on each reload — there's no "and previously // these other rules were broken" trail. func (m *Manager) ReloadRules() error { rows, err := m.rules.ListLogScanRules() if err != nil { return err } snap, compileErrs := BuildSnapshot(rows) errMsgs := make([]string, 0, len(compileErrs)) for _, e := range compileErrs { slog.Warn("logscanner: rule compile failed (dropped from snapshot)", "error", e) errMsgs = append(errMsgs, e.Error()) } m.snapshot.Store(snap) m.statsMu.Lock() m.lastCompileErrors = errMsgs m.statsMu.Unlock() return nil } // Stats returns the current operator-facing observability snapshot. // Read-safe: the mutex protects only the compile-errors slice; // counters and tail-count are atomic / mutex-guarded internally. func (m *Manager) Stats() Stats { m.statsMu.RLock() errs := make([]string, len(m.lastCompileErrors)) copy(errs, m.lastCompileErrors) m.statsMu.RUnlock() m.mu.Lock() tails := len(m.tails) m.mu.Unlock() return Stats{ Engine: m.engine.Stats(), ActiveTails: tails, LastCompileErrors: errs, } } // EmitHit persists a hit as an event_log row and publishes it on // the bus. Implements the HitEmitter interface so tails depend on // the interface, not the concrete manager — also makes the manager // easy to unit-test by passing a recording emitter. 
func (m *Manager) EmitHit(ctx context.Context, hit Hit) { const maxMessage = 500 // truncate long lines so one chatty rule can't blow up event_log msg := truncateUTF8(hit.Line, maxMessage) meta := map[string]any{ "workload_id": hit.WorkloadID, "container_id": hit.ContainerID, "rule_id": hit.Rule.ID, "rule_name": hit.Rule.Name, "stream": hit.Stream, } if subs := hit.Rule.Pattern.FindStringSubmatch(hit.Line); len(subs) > 1 { captures := map[string]string{} for i, sub := range subs[1:] { captures[indexName(hit.Rule.Pattern, i+1)] = sub } meta["captures"] = captures } metaJSON, _ := json.Marshal(meta) evt, err := m.events.InsertEvent(store.EventLog{ Source: "logscan", Severity: nonEmpty(hit.Rule.Severity, store.LogScanSeverityWarn), Message: msg, Metadata: string(metaJSON), }) if err != nil { slog.Warn("logscanner: persist event", "rule", hit.Rule.Name, "error", err) return } if m.bus != nil { m.bus.Publish(events.Event{ Type: events.EventLog, Payload: events.EventLogPayload{ ID: evt.ID, Source: evt.Source, Severity: evt.Severity, Message: evt.Message, Metadata: evt.Metadata, CreatedAt: evt.CreatedAt, }, }) } } // loop runs the lifecycle poll until ctx cancellation. It is single- // threaded over the tails map — start/stop of individual tails // happens here so there's no race on the map. func (m *Manager) loop() { ticker := time.NewTicker(m.pollInterval) defer ticker.Stop() m.reconcile() // run once immediately for { select { case <-m.ctx.Done(): m.stopAllTails() return case <-ticker.C: m.reconcile() } } } // reconcile diffs the desired set of running container IDs against // the live tails and starts/stops as needed. 
func (m *Manager) reconcile() { rows, err := m.containers.ListContainers(store.ContainerFilter{State: "running"}) if err != nil { slog.Warn("logscanner: list containers", "error", err) return } desired := map[string]store.Container{} for _, c := range rows { if c.ContainerID == "" { continue } desired[c.ContainerID] = c } m.mu.Lock() defer m.mu.Unlock() for id, c := range desired { if _, ok := m.tails[id]; ok { continue } m.startTailLocked(id, c.WorkloadID) } for id, cancel := range m.tails { if _, ok := desired[id]; !ok { cancel() delete(m.tails, id) m.engine.Forget(id) } } } func (m *Manager) startTailLocked(containerID, workloadID string) { tailCtx, cancel := context.WithCancel(m.ctx) t := &tail{ containerID: containerID, workloadID: workloadID, docker: m.docker, engine: m.engine, emitter: m, snapshot: &m.snapshot, } if err := t.validate(); err != nil { slog.Warn("logscanner: tail validation failed", "container", containerID, "error", err) cancel() return } m.tails[containerID] = cancel m.tailWG.Add(1) go func() { defer m.tailWG.Done() t.run(tailCtx) }() } func (m *Manager) stopAllTails() { m.mu.Lock() for id, cancel := range m.tails { cancel() delete(m.tails, id) } m.mu.Unlock() m.tailWG.Wait() } // indexName returns the name of capture group i (1-based). Falls // back to "$" when the group is unnamed so JSON keys stay stable // AND distinct across groups (the previous $N collision-fallback // silently dropped groups beyond $3 onto the same JSON key). func indexName(re interface{ SubexpNames() []string }, i int) string { names := re.SubexpNames() if i < len(names) && names[i] != "" { return names[i] } return "$" + strconv.Itoa(i) } func nonEmpty(a, b string) string { if a != "" { return a } return b } // truncateUTF8 returns s shortened to at most maxBytes, cutting on // a rune boundary so we never leave a partial UTF-8 codepoint in // the output. An ellipsis is appended when truncation occurred so // downstream readers can tell. 
// Callers must size maxBytes to include the trailing 3-byte
// ellipsis: the ellipsis is counted against maxBytes, so the result,
// ellipsis included, never exceeds maxBytes bytes. When maxBytes is
// too small to hold the ellipsis the string is cut without one, and
// a non-positive maxBytes yields "".
func truncateUTF8(s string, maxBytes int) string {
	const ellipsis = "…" // 3 bytes in UTF-8

	if len(s) <= maxBytes {
		return s
	}
	if maxBytes <= 0 {
		// Nothing fits; also avoids slicing with a negative bound.
		return ""
	}

	// Reserve room for the ellipsis when the budget allows it.
	end, suffix := maxBytes, ""
	if maxBytes >= len(ellipsis) {
		end, suffix = maxBytes-len(ellipsis), ellipsis
	}

	// Walk back from the byte cut to the last rune boundary so a
	// multi-byte codepoint is never split. end < len(s) holds here,
	// so s[end] is always in range.
	for end > 0 && !utf8.RuneStart(s[end]) {
		end--
	}
	return s[:end] + suffix
}