7a9ff7ad54
Two paired backends sharing the events.Bus seam:
Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
redaction on read (placeholder echo treated as "no change" on
PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
filters (severity CSV, source CSV, message regex with memoized
compile cache). Structural loop-prevention: never writes to
event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
TierEventTrigger constant, doSendRaw shared with the legacy
Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
sending the real TriggerWebhookPayload shape. SSRF guard
rejects loopback / link-local / unspecified targets. PATCH
uses pointer-typed DTO for partial updates.
Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
per-container token bucket, atomic drop counters), tail
(multiplexed docker frame demuxer with TTY fallback + 16 MiB
payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
timestamp strip + UTF-8-safe message truncation), manager
(5s container polling, atomic.Pointer[Snapshot] hot-reload,
HitEmitter writes event_log + publishes EventLog so the
trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
EffectiveLogScanRules resolver (globals minus per-workload
overrides plus workload-only additions). Transactional
cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
(sample_line → matched/captures) + /stats (drop counters +
active tail count + last-snapshot compile errors) +
GET /api/workloads/{id}/effective-rules.
cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
346 lines
9.6 KiB
Go
346 lines
9.6 KiB
Go
package logscanner
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log/slog"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/alexei/tinyforge/internal/docker"
|
|
"github.com/alexei/tinyforge/internal/events"
|
|
"github.com/alexei/tinyforge/internal/store"
|
|
)
|
|
|
|
// RuleSource is the read-side seam for fetching the current rule
|
|
// rows. Real callers pass *store.Store; tests pass a fake.
|
|
type RuleSource interface {
|
|
ListLogScanRules() ([]store.LogScanRule, error)
|
|
}
|
|
|
|
// ContainerLister is what the manager needs from the store to
|
|
// discover running container rows. The filter is set by the
|
|
// manager (state="running") so adapters don't have to do that.
|
|
type ContainerLister interface {
|
|
ListContainers(filter store.ContainerFilter) ([]store.Container, error)
|
|
}
|
|
|
|
// EventStore writes the matched-line entries into event_log.
|
|
// Same shape as events.PersistFunc but typed to keep the package
|
|
// self-contained.
|
|
type EventStore interface {
|
|
InsertEvent(evt store.EventLog) (store.EventLog, error)
|
|
}
|
|
|
|
// Manager owns the lifecycle of per-container tails. It polls the
|
|
// container index every PollInterval (default 5s), starts tails for
|
|
// new running containers, and stops tails when a container is no
|
|
// longer running (state != "running" or row deleted).
|
|
//
|
|
// Rule changes (CRUD via API) trigger ReloadRules which rebuilds the
|
|
// snapshot once and atomically swaps the pointer all tails read.
|
|
type Manager struct {
|
|
rules RuleSource
|
|
containers ContainerLister
|
|
docker dockerLogger
|
|
events EventStore
|
|
bus *events.Bus
|
|
pollInterval time.Duration
|
|
|
|
snapshot atomic.Pointer[Snapshot]
|
|
engine *Engine
|
|
|
|
mu sync.Mutex
|
|
tails map[string]context.CancelFunc // containerID -> cancel
|
|
tailWG sync.WaitGroup
|
|
|
|
// statsMu guards lastCompileErrors. Compile-error visibility
|
|
// matters because a broken rule silently disappears from the
|
|
// snapshot — without surfacing the message, the operator has
|
|
// no signal beyond a warn-level log line.
|
|
statsMu sync.RWMutex
|
|
lastCompileErrors []string
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// Stats bundles the operator-facing observability for the log
|
|
// scanner. EngineStats covers the hot-path drop counters; compile
|
|
// errors are the last snapshot's invalid-pattern messages (helpful
|
|
// when a freshly-saved rule "doesn't fire" — the issue is usually
|
|
// here). ActiveTails reports how many container goroutines the
|
|
// manager is currently driving.
|
|
type Stats struct {
|
|
Engine EngineStats `json:"engine"`
|
|
ActiveTails int `json:"active_tails"`
|
|
LastCompileErrors []string `json:"last_compile_errors"`
|
|
}
|
|
|
|
// Config bundles the constructor inputs.
|
|
type Config struct {
|
|
Rules RuleSource
|
|
Containers ContainerLister
|
|
Docker *docker.Client
|
|
Events EventStore
|
|
Bus *events.Bus
|
|
PollInterval time.Duration // default 5s
|
|
}
|
|
|
|
// NewManager wires a manager with the supplied dependencies.
|
|
// It does not start polling — call Start to begin the lifecycle.
|
|
func NewManager(cfg Config) *Manager {
|
|
poll := cfg.PollInterval
|
|
if poll <= 0 {
|
|
poll = 5 * time.Second
|
|
}
|
|
return &Manager{
|
|
rules: cfg.Rules,
|
|
containers: cfg.Containers,
|
|
docker: cfg.Docker,
|
|
events: cfg.Events,
|
|
bus: cfg.Bus,
|
|
pollInterval: poll,
|
|
engine: NewEngine(),
|
|
tails: map[string]context.CancelFunc{},
|
|
}
|
|
}
|
|
|
|
// Start kicks off the polling loop and initial snapshot build.
|
|
// Returns an error only if the initial rule load fails; subsequent
|
|
// load failures are logged but do not stop the manager.
|
|
func (m *Manager) Start(ctx context.Context) error {
|
|
if err := m.ReloadRules(); err != nil {
|
|
return err
|
|
}
|
|
m.ctx, m.cancel = context.WithCancel(ctx)
|
|
go m.loop()
|
|
return nil
|
|
}
|
|
|
|
// Stop cancels every tail and waits for them to exit.
|
|
func (m *Manager) Stop() {
|
|
if m.cancel != nil {
|
|
m.cancel()
|
|
}
|
|
m.tailWG.Wait()
|
|
}
|
|
|
|
// ReloadRules rebuilds the snapshot from the current store state.
|
|
// Safe to call concurrently with the polling loop and with tails —
|
|
// the atomic pointer swap is the only synchronization required.
|
|
//
|
|
// Compile errors are both logged and stored on the manager so the
|
|
// API stats endpoint can surface them to operators. The set is
|
|
// fully replaced on each reload — there's no "and previously
|
|
// these other rules were broken" trail.
|
|
func (m *Manager) ReloadRules() error {
|
|
rows, err := m.rules.ListLogScanRules()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
snap, compileErrs := BuildSnapshot(rows)
|
|
errMsgs := make([]string, 0, len(compileErrs))
|
|
for _, e := range compileErrs {
|
|
slog.Warn("logscanner: rule compile failed (dropped from snapshot)", "error", e)
|
|
errMsgs = append(errMsgs, e.Error())
|
|
}
|
|
m.snapshot.Store(snap)
|
|
m.statsMu.Lock()
|
|
m.lastCompileErrors = errMsgs
|
|
m.statsMu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Stats returns the current operator-facing observability snapshot.
|
|
// Read-safe: the mutex protects only the compile-errors slice;
|
|
// counters and tail-count are atomic / mutex-guarded internally.
|
|
func (m *Manager) Stats() Stats {
|
|
m.statsMu.RLock()
|
|
errs := make([]string, len(m.lastCompileErrors))
|
|
copy(errs, m.lastCompileErrors)
|
|
m.statsMu.RUnlock()
|
|
|
|
m.mu.Lock()
|
|
tails := len(m.tails)
|
|
m.mu.Unlock()
|
|
|
|
return Stats{
|
|
Engine: m.engine.Stats(),
|
|
ActiveTails: tails,
|
|
LastCompileErrors: errs,
|
|
}
|
|
}
|
|
|
|
// EmitHit persists a hit as an event_log row and publishes it on
|
|
// the bus. Implements the HitEmitter interface so tails depend on
|
|
// the interface, not the concrete manager — also makes the manager
|
|
// easy to unit-test by passing a recording emitter.
|
|
func (m *Manager) EmitHit(ctx context.Context, hit Hit) {
|
|
const maxMessage = 500 // truncate long lines so one chatty rule can't blow up event_log
|
|
msg := truncateUTF8(hit.Line, maxMessage)
|
|
meta := map[string]any{
|
|
"workload_id": hit.WorkloadID,
|
|
"container_id": hit.ContainerID,
|
|
"rule_id": hit.Rule.ID,
|
|
"rule_name": hit.Rule.Name,
|
|
"stream": hit.Stream,
|
|
}
|
|
if subs := hit.Rule.Pattern.FindStringSubmatch(hit.Line); len(subs) > 1 {
|
|
captures := map[string]string{}
|
|
for i, sub := range subs[1:] {
|
|
captures[indexName(hit.Rule.Pattern, i+1)] = sub
|
|
}
|
|
meta["captures"] = captures
|
|
}
|
|
metaJSON, _ := json.Marshal(meta)
|
|
evt, err := m.events.InsertEvent(store.EventLog{
|
|
Source: "logscan",
|
|
Severity: nonEmpty(hit.Rule.Severity, store.LogScanSeverityWarn),
|
|
Message: msg,
|
|
Metadata: string(metaJSON),
|
|
})
|
|
if err != nil {
|
|
slog.Warn("logscanner: persist event", "rule", hit.Rule.Name, "error", err)
|
|
return
|
|
}
|
|
if m.bus != nil {
|
|
m.bus.Publish(events.Event{
|
|
Type: events.EventLog,
|
|
Payload: events.EventLogPayload{
|
|
ID: evt.ID,
|
|
Source: evt.Source,
|
|
Severity: evt.Severity,
|
|
Message: evt.Message,
|
|
Metadata: evt.Metadata,
|
|
CreatedAt: evt.CreatedAt,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// loop runs the lifecycle poll until ctx cancellation. It is single-
|
|
// threaded over the tails map — start/stop of individual tails
|
|
// happens here so there's no race on the map.
|
|
func (m *Manager) loop() {
|
|
ticker := time.NewTicker(m.pollInterval)
|
|
defer ticker.Stop()
|
|
m.reconcile() // run once immediately
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
m.stopAllTails()
|
|
return
|
|
case <-ticker.C:
|
|
m.reconcile()
|
|
}
|
|
}
|
|
}
|
|
|
|
// reconcile diffs the desired set of running container IDs against
|
|
// the live tails and starts/stops as needed.
|
|
func (m *Manager) reconcile() {
|
|
rows, err := m.containers.ListContainers(store.ContainerFilter{State: "running"})
|
|
if err != nil {
|
|
slog.Warn("logscanner: list containers", "error", err)
|
|
return
|
|
}
|
|
desired := map[string]store.Container{}
|
|
for _, c := range rows {
|
|
if c.ContainerID == "" {
|
|
continue
|
|
}
|
|
desired[c.ContainerID] = c
|
|
}
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
for id, c := range desired {
|
|
if _, ok := m.tails[id]; ok {
|
|
continue
|
|
}
|
|
m.startTailLocked(id, c.WorkloadID)
|
|
}
|
|
for id, cancel := range m.tails {
|
|
if _, ok := desired[id]; !ok {
|
|
cancel()
|
|
delete(m.tails, id)
|
|
m.engine.Forget(id)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) startTailLocked(containerID, workloadID string) {
|
|
tailCtx, cancel := context.WithCancel(m.ctx)
|
|
t := &tail{
|
|
containerID: containerID,
|
|
workloadID: workloadID,
|
|
docker: m.docker,
|
|
engine: m.engine,
|
|
emitter: m,
|
|
snapshot: &m.snapshot,
|
|
}
|
|
if err := t.validate(); err != nil {
|
|
slog.Warn("logscanner: tail validation failed", "container", containerID, "error", err)
|
|
cancel()
|
|
return
|
|
}
|
|
m.tails[containerID] = cancel
|
|
m.tailWG.Add(1)
|
|
go func() {
|
|
defer m.tailWG.Done()
|
|
t.run(tailCtx)
|
|
}()
|
|
}
|
|
|
|
func (m *Manager) stopAllTails() {
|
|
m.mu.Lock()
|
|
for id, cancel := range m.tails {
|
|
cancel()
|
|
delete(m.tails, id)
|
|
}
|
|
m.mu.Unlock()
|
|
m.tailWG.Wait()
|
|
}
|
|
|
|
// indexName returns the JSON key for capture group i (1-based): the
// group's name when it has one, otherwise "$<i>". The positional fallback
// keeps keys stable and distinct, so unnamed groups never collide on the
// same key.
func indexName(re interface{ SubexpNames() []string }, i int) string {
	if names := re.SubexpNames(); i < len(names) {
		if name := names[i]; name != "" {
			return name
		}
	}
	return "$" + strconv.Itoa(i)
}
|
|
|
|
// nonEmpty returns a, or the fallback b when a is the empty string.
func nonEmpty(a, b string) string {
	if a == "" {
		return b
	}
	return a
}
|
|
|
|
// truncateUTF8 shortens s so that the result is at most maxBytes bytes
// long. It cuts on a rune boundary, so no partial UTF-8 sequence is ever
// emitted.
//
// Strings that already fit are returned unchanged. When truncation
// happens and the budget allows, the 3-byte "…" marker is appended and
// counted inside maxBytes, so len(result) <= maxBytes always holds. If
// maxBytes is too small to hold the marker (< 3), the string is cut
// without one. A negative maxBytes is treated as 0.
func truncateUTF8(s string, maxBytes int) string {
	const ellipsis = "…"
	if len(s) <= maxBytes {
		return s
	}
	limit := maxBytes
	if limit < 0 {
		limit = 0
	}
	marker := ellipsis
	if limit < len(ellipsis) {
		marker = ""
	}
	// end < len(s) here, so s[end] is in range. Walk back to the first
	// byte of the rune straddling the cut so the prefix stays valid UTF-8.
	end := limit - len(marker)
	for end > 0 && !utf8.RuneStart(s[end]) {
		end--
	}
	return s[:end] + marker
}
|