Files
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

346 lines
9.6 KiB
Go

package logscanner
import (
"context"
"encoding/json"
"log/slog"
"strconv"
"sync"
"sync/atomic"
"time"
"unicode/utf8"
"github.com/alexei/tinyforge/internal/docker"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// The manager's dependencies are declared as narrow, consumer-side
// interfaces so tests can substitute in-memory fakes for the store.
type (
	// RuleSource yields the log-scan rule rows the snapshot is built
	// from. Real callers pass *store.Store; tests pass a fake.
	RuleSource interface {
		ListLogScanRules() ([]store.LogScanRule, error)
	}

	// ContainerLister lets the manager discover container rows. The
	// manager supplies the filter itself (state="running"), so
	// implementations need no filtering logic of their own.
	ContainerLister interface {
		ListContainers(f store.ContainerFilter) ([]store.Container, error)
	}

	// EventStore persists matched lines as event_log rows and returns
	// the stored row. It mirrors the shape of events.PersistFunc but is
	// declared here to keep the package self-contained.
	EventStore interface {
		InsertEvent(e store.EventLog) (store.EventLog, error)
	}
)
// Manager owns the lifecycle of per-container tails. It polls the
// container index every pollInterval (NewManager defaults it to 5s),
// starts a tail for each newly running container, and stops the tail
// when a container is no longer running (state != "running" or row
// deleted).
//
// Rule changes (CRUD via API) trigger ReloadRules, which rebuilds the
// snapshot once and atomically swaps the pointer that all tails read.
type Manager struct {
	// Dependencies, assigned once in NewManager.
	rules      RuleSource      // rule rows read by ReloadRules
	containers ContainerLister // queried by reconcile for running containers
	docker     dockerLogger    // passed to every tail
	events     EventStore      // event_log writer used by EmitHit
	bus        *events.Bus     // optional: EmitHit publishes only when non-nil

	pollInterval time.Duration // tick period of the reconcile loop

	// snapshot holds the compiled rule set. ReloadRules stores into it
	// and each tail keeps a pointer to this field, so a swap is visible
	// to all tails without extra locking.
	snapshot atomic.Pointer[Snapshot]
	// engine is shared by all tails; reconcile calls engine.Forget(id)
	// when it stops the tail for container id.
	engine *Engine

	mu     sync.Mutex                    // guards tails
	tails  map[string]context.CancelFunc // containerID -> cancel
	tailWG sync.WaitGroup                // one count per running tail goroutine

	// statsMu guards lastCompileErrors. Compile-error visibility
	// matters because a broken rule silently disappears from the
	// snapshot — without surfacing the message, the operator has
	// no signal beyond a warn-level log line.
	statsMu           sync.RWMutex
	lastCompileErrors []string

	// ctx and cancel are set by Start. ctx is the parent of every tail
	// context (see startTailLocked); cancel is invoked by Stop.
	ctx    context.Context
	cancel context.CancelFunc
}
// Stats bundles the operator-facing observability for the log
// scanner. EngineStats covers the hot-path drop counters; compile
// errors are the last snapshot's invalid-pattern messages (helpful
// when a freshly-saved rule "doesn't fire" — the issue is usually
// here). ActiveTails reports how many container goroutines the
// manager is currently driving.
type Stats struct {
	// Engine is the value returned by the shared Engine's Stats method.
	Engine EngineStats `json:"engine"`
	// ActiveTails is the number of entries in the manager's tails map.
	ActiveTails int `json:"active_tails"`
	// LastCompileErrors holds the messages recorded by the most recent
	// successful ReloadRules. Manager.Stats fills it with a non-nil
	// slice, so an empty set encodes as [] rather than null.
	LastCompileErrors []string `json:"last_compile_errors"`
}
// Config bundles the constructor inputs for NewManager.
type Config struct {
	Rules        RuleSource      // source of log-scan rule rows
	Containers   ContainerLister // lookup of running container rows
	Docker       *docker.Client  // client handed to each tail
	Events       EventStore      // event_log writer used for hits
	Bus          *events.Bus     // optional: when nil, hits are persisted but not published
	PollInterval time.Duration   // default 5s; any value <= 0 selects the default
}
// NewManager wires a manager with the supplied dependencies.
// It does not start polling — call Start to begin the lifecycle.
//
// A non-positive cfg.PollInterval falls back to 5s. cfg.Bus may be
// nil, in which case hits are persisted but not published.
func NewManager(cfg Config) *Manager {
	poll := cfg.PollInterval
	if poll <= 0 {
		poll = 5 * time.Second
	}
	m := &Manager{
		rules:        cfg.Rules,
		containers:   cfg.Containers,
		events:       cfg.Events,
		bus:          cfg.Bus,
		pollInterval: poll,
		engine:       NewEngine(),
		tails:        map[string]context.CancelFunc{},
	}
	// Assign the Docker client only when it is non-nil. Storing a nil
	// *docker.Client in the dockerLogger interface field would yield a
	// non-nil interface wrapping a nil pointer, so any `docker == nil`
	// check downstream (presumably in tail validation — verify) would
	// not fire and the nil client would be used instead.
	if cfg.Docker != nil {
		m.docker = cfg.Docker
	}
	return m
}
// Start builds the initial rule snapshot and then launches the
// background polling loop on a context derived from ctx.
//
// Only a failure of that first rule load is reported; once the loop is
// running, later load failures are logged and the manager keeps going.
func (m *Manager) Start(ctx context.Context) error {
	err := m.ReloadRules()
	if err != nil {
		return err
	}

	loopCtx, stop := context.WithCancel(ctx)
	m.ctx = loopCtx
	m.cancel = stop

	go m.loop()
	return nil
}
// Stop cancels the manager's context — and with it every tail context
// derived from it — and blocks until all tail goroutines have exited.
// When Start was never called there is nothing to cancel and the wait
// returns immediately.
//
// m.mu is held across the wait. Every tailWG.Add happens inside
// startTailLocked under m.mu, so holding the lock guarantees that no
// reconcile pass racing shutdown can Add while this Wait is in progress
// (sync.WaitGroup forbids an Add from a zero counter concurrent with
// Wait). The polling loop simply blocks on m.mu until the wait ends.
// NOTE(review): this assumes tail goroutines never acquire m.mu
// (EmitHit, the Manager method handed to them, does not) — confirm
// against tail.run.
func (m *Manager) Stop() {
	if m.cancel != nil {
		m.cancel()
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.tailWG.Wait()
}
// ReloadRules rebuilds the snapshot from the current store state and
// atomically swaps it in; tails observe the new snapshot through the
// shared atomic pointer without further coordination.
//
// Reloads are serialized on statsMu. The list → build → store sequence
// must not interleave with another reload: otherwise two concurrent
// calls (e.g. back-to-back CRUD requests) can finish out of order,
// leaving an older rule set installed and pairing the snapshot with
// compile errors from a different build. Stats readers wait for at most
// one in-flight reload.
//
// Compile errors are both logged and stored on the manager so the
// API stats endpoint can surface them to operators. The set is
// fully replaced on each successful reload — there's no "and
// previously these other rules were broken" trail. If listing the
// rules fails, the error is returned and both the snapshot and the
// recorded compile errors are left untouched.
func (m *Manager) ReloadRules() error {
	m.statsMu.Lock()
	defer m.statsMu.Unlock()

	rows, err := m.rules.ListLogScanRules()
	if err != nil {
		return err
	}
	snap, compileErrs := BuildSnapshot(rows)
	errMsgs := make([]string, 0, len(compileErrs))
	for _, e := range compileErrs {
		slog.Warn("logscanner: rule compile failed (dropped from snapshot)", "error", e)
		errMsgs = append(errMsgs, e.Error())
	}
	m.snapshot.Store(snap)
	m.lastCompileErrors = errMsgs
	return nil
}
// Stats assembles the operator-facing observability snapshot. The
// compile-error list is copied under statsMu so callers can't alias
// the manager's slice, and the copy is always non-nil (JSON []). The
// tail count is read under mu; the engine reports its own counters.
func (m *Manager) Stats() Stats {
	var out Stats

	m.statsMu.RLock()
	out.LastCompileErrors = append(make([]string, 0, len(m.lastCompileErrors)), m.lastCompileErrors...)
	m.statsMu.RUnlock()

	m.mu.Lock()
	out.ActiveTails = len(m.tails)
	m.mu.Unlock()

	out.Engine = m.engine.Stats()
	return out
}
// EmitHit persists a hit as a "logscan" event_log row and, when a bus
// is configured, publishes the stored row as an events.EventLog event.
// It implements the HitEmitter interface so tails depend on the
// interface, not the concrete manager — which also makes the manager
// easy to unit-test by passing a recording emitter.
//
// The stored message is hit.Line cut to at most maxMessage bytes
// (ellipsis included). Metadata is a JSON object with the workload,
// container, rule and stream, plus a "captures" map when the rule's
// pattern has capture groups and matches the line. Failures are logged
// and the hit is dropped; nothing is reported back to the tail.
func (m *Manager) EmitHit(ctx context.Context, hit Hit) {
	// Cap stored messages so one chatty rule can't blow up event_log.
	// truncateUTF8 appends its 3-byte ellipsis beyond the limit it is
	// given, so reserve room for it to keep the total within maxMessage.
	const maxMessage = 500
	msg := truncateUTF8(hit.Line, maxMessage-len("…"))

	meta := map[string]any{
		"workload_id":  hit.WorkloadID,
		"container_id": hit.ContainerID,
		"rule_id":      hit.Rule.ID,
		"rule_name":    hit.Rule.Name,
		"stream":       hit.Stream,
	}
	// Captures come from the full (untruncated) line. Optional groups
	// that did not participate in the match are recorded as "".
	if subs := hit.Rule.Pattern.FindStringSubmatch(hit.Line); len(subs) > 1 {
		captures := make(map[string]string, len(subs)-1)
		for i, sub := range subs[1:] {
			captures[indexName(hit.Rule.Pattern, i+1)] = sub
		}
		meta["captures"] = captures
	}

	metaJSON, err := json.Marshal(meta)
	if err != nil {
		// Not expected for a map of plain values. Keep the hit anyway
		// (metaJSON is nil, so Metadata is stored as ""), but leave a
		// trace instead of silently dropping the metadata.
		slog.Warn("logscanner: marshal event metadata", "rule", hit.Rule.Name, "error", err)
	}

	evt, err := m.events.InsertEvent(store.EventLog{
		Source:   "logscan",
		Severity: nonEmpty(hit.Rule.Severity, store.LogScanSeverityWarn),
		Message:  msg,
		Metadata: string(metaJSON),
	})
	if err != nil {
		slog.Warn("logscanner: persist event", "rule", hit.Rule.Name, "error", err)
		return
	}

	if m.bus != nil {
		// Publish the persisted row (with its assigned ID and CreatedAt)
		// rather than the input, so subscribers see what was stored.
		m.bus.Publish(events.Event{
			Type: events.EventLog,
			Payload: events.EventLogPayload{
				ID:        evt.ID,
				Source:    evt.Source,
				Severity:  evt.Severity,
				Message:   evt.Message,
				Metadata:  evt.Metadata,
				CreatedAt: evt.CreatedAt,
			},
		})
	}
}
// loop is the manager's background driver. It reconciles once right
// away, then again on every pollInterval tick. When the manager's
// context is cancelled, it stops every tail (waiting for them) and
// returns. All tail start/stop decisions originate here via reconcile,
// which takes m.mu around every change to the tails map.
func (m *Manager) loop() {
	tick := time.NewTicker(m.pollInterval)
	defer tick.Stop()

	m.reconcile()
	for {
		select {
		case <-tick.C:
			m.reconcile()
		case <-m.ctx.Done():
			m.stopAllTails()
			return
		}
	}
}
// reconcile aligns the live tails with the containers the store
// currently reports as running. Rows without a container ID are
// ignored. Tails whose container is no longer running are cancelled,
// unregistered, and forgotten by the engine; running containers
// without a tail get one. If the store query fails, the error is logged
// and this pass is skipped, leaving existing tails untouched.
func (m *Manager) reconcile() {
	runningRows, err := m.containers.ListContainers(store.ContainerFilter{State: "running"})
	if err != nil {
		slog.Warn("logscanner: list containers", "error", err)
		return
	}

	// containerID -> workloadID for every container that can be tailed.
	want := make(map[string]string, len(runningRows))
	for _, row := range runningRows {
		if row.ContainerID != "" {
			want[row.ContainerID] = row.WorkloadID
		}
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Retire tails whose container stopped or vanished. These IDs are
	// absent from want, so the start pass below never revives them.
	for containerID, stop := range m.tails {
		if _, keep := want[containerID]; keep {
			continue
		}
		stop()
		delete(m.tails, containerID)
		m.engine.Forget(containerID)
	}

	// Start tails for containers that don't have one yet.
	for containerID, workloadID := range want {
		if _, active := m.tails[containerID]; !active {
			m.startTailLocked(containerID, workloadID)
		}
	}
}
// startTailLocked builds and launches a tail for containerID whose hits
// are attributed to workloadID. The caller must hold m.mu: this method
// writes m.tails and calls m.tailWG.Add.
//
// The tail runs on a child of the manager's context, so cancelling the
// manager also stops it. A tail that fails validation is logged and not
// registered, so a later reconcile pass will try it again.
//
// Nothing is started once the manager's context has been cancelled. A
// reconcile pass racing shutdown would otherwise register a tail that
// is already dead, and would call tailWG.Add while shutdown is waiting
// on the group.
func (m *Manager) startTailLocked(containerID, workloadID string) {
	if m.ctx.Err() != nil {
		return
	}
	tailCtx, cancel := context.WithCancel(m.ctx)
	t := &tail{
		containerID: containerID,
		workloadID:  workloadID,
		docker:      m.docker,
		engine:      m.engine,
		emitter:     m,
		snapshot:    &m.snapshot,
	}
	if err := t.validate(); err != nil {
		slog.Warn("logscanner: tail validation failed", "container", containerID, "error", err)
		cancel()
		return
	}
	m.tails[containerID] = cancel
	m.tailWG.Add(1)
	go func() {
		defer m.tailWG.Done()
		t.run(tailCtx)
	}()
}
// stopAllTails cancels every registered tail and empties the tails map
// under m.mu. It then waits, with the lock released, for all tail
// goroutines to return.
func (m *Manager) stopAllTails() {
	func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		for containerID, stop := range m.tails {
			stop()
			delete(m.tails, containerID)
		}
	}()
	m.tailWG.Wait()
}
// indexName returns the JSON key for capture group i (1-based) of re.
// The group's own name is used when it has one. Unnamed groups (and
// indexes past the end of SubexpNames) become "$<i>", so every
// unnamed group gets a distinct, stable key.
func indexName(re interface{ SubexpNames() []string }, i int) string {
	if names := re.SubexpNames(); i < len(names) {
		if name := names[i]; name != "" {
			return name
		}
	}
	return "$" + strconv.Itoa(i)
}
// nonEmpty returns a, or the fallback b when a is the empty string.
func nonEmpty(a, b string) string {
	if a == "" {
		return b
	}
	return a
}
// truncateUTF8 keeps at most maxBytes bytes of s, cutting on a rune
// boundary so no partial UTF-8 sequence is left in the output. When
// anything was cut, an ellipsis ("…", 3 bytes) is appended so
// downstream readers can tell. Strings that already fit are returned
// unchanged.
//
// The ellipsis is NOT counted against maxBytes, so a truncated result
// can be up to maxBytes+3 bytes long. Callers that need a hard cap on
// the total length should pass maxBytes-len("…"). A negative maxBytes
// is treated as 0 (a non-empty s yields just the ellipsis) rather than
// panicking on the slice expression.
func truncateUTF8(s string, maxBytes int) string {
	if maxBytes < 0 {
		maxBytes = 0
	}
	if len(s) <= maxBytes {
		return s
	}
	// len(s) > maxBytes, so s[end] is in range. Walk back over
	// continuation bytes to the last rune start so the returned prefix
	// never ends in a partial codepoint.
	end := maxBytes
	for end > 0 && !utf8.RuneStart(s[end]) {
		end--
	}
	return s[:end] + "…"
}