Files
tiny-forge/internal/logscanner/rules.go
T
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

144 lines
5.0 KiB
Go

// Package logscanner tails running container logs, matches lines
// against operator-configured regex rules, and emits event_log entries
// via the events bus. The package is split into four files:
//
// - rules.go: the rule snapshot — compiled regexes + per-rule
// metadata. Snapshots are immutable; the manager builds a new
// snapshot on every rule change and swaps it atomically.
// - engine.go: rule evaluation per line + cooldown + token bucket.
// - tail.go: per-container goroutine reading docker log stream.
// - manager.go: container lifecycle polling + tail lifecycle.
package logscanner
import (
"fmt"
"regexp"
"github.com/alexei/tinyforge/internal/store"
)
// Rule is a compiled, immutable representation of a store.LogScanRule.
// Built once when the snapshot is loaded; held by every tail goroutine
// via the snapshot atomic pointer.
//
// A Rule whose Pattern is nil is not a matchable rule: BuildSnapshot
// stores such a value in the override table to mark "this global is
// suppressed for this workload", and EffectiveFor drops it.
type Rule struct {
// ID is the store row ID. For a suppression marker it is instead
// the ID of the global rule being suppressed.
ID int64
WorkloadID string // "" = global
// Name is the rule's display name, copied from the store row.
Name string
// Pattern is the compiled form of the row's regex string; nil only
// on suppression markers.
Pattern *regexp.Regexp
// Severity is copied verbatim from the store row.
Severity string
Streams string // "all" | "stdout" | "stderr"
// CooldownSeconds is copied from the store row; presumably the
// minimum gap between hits enforced by the engine — confirm there.
CooldownSeconds int
}
// Snapshot is the rule set as seen by tails. Built from a flat slice
// of store.LogScanRule rows by BuildSnapshot; the per-workload
// effective set is not precomputed but resolved at lookup time by
// EffectiveFor, which combines the three tables below.
//
// The snapshot is immutable. Hot-reload semantics are achieved by
// building a fresh snapshot in the manager and swapping the atomic
// pointer; in-flight tails finish their current match against the
// old snapshot, then pick up the new one on the next line.
//
// MUST NOT mutate any field (including map values, including slice
// elements) after BuildSnapshot returns — tails read these
// concurrently with no locking and rely on immutability for safety.
type Snapshot struct {
// global is the list of compiled, enabled global rules
// (workload_id == "" and not an override row).
global []Rule
// perWorkload[workloadID] holds only the enabled workload-only
// additions (workload_id != "" and not an override row).
// Override rows are NOT stored here; they live in overrides.
// EffectiveFor(id) appends perWorkload[id] after the globals.
perWorkload map[string][]Rule
// overrides[workloadID][globalID] is the row that replaces global
// rule globalID for that workload. An enabled override is stored
// compiled; a disabled override is stored as a Rule with a nil
// Pattern, which EffectiveFor treats as "suppress this global".
overrides map[string]map[int64]Rule
}
// BuildSnapshot compiles every rule's pattern and returns an
// immutable Snapshot. Rules that fail to compile are skipped with
// their error reported to the caller — store-side validation keeps
// the pattern field as a free-form regex string, so engine-time
// compile failures are an expected, recoverable mode.
//
// Rows are classified as follows:
//   - WorkloadID == "" and OverridesID == 0: global rule.
//   - WorkloadID != "" and OverridesID == 0: workload-only addition.
//   - WorkloadID != "" and OverridesID != 0: per-workload override of
//     the global rule OverridesID.
//
// Any other combination is ignored. Disabled globals and additions are
// left out of the snapshot. A disabled override is recorded as a Rule
// with a nil Pattern so EffectiveFor suppresses the overridden global
// for that workload; this marker is recorded even when the override's
// own pattern does not compile, because suppression does not depend on
// the pattern.
//
// The returned error slice is informational, not fatal: bad rules
// are dropped from the snapshot but the rest still work. Every row's
// pattern is compiled, enabled or not, so the slice covers all stored
// patterns.
func BuildSnapshot(rows []store.LogScanRule) (*Snapshot, []error) {
	s := &Snapshot{
		perWorkload: map[string][]Rule{},
		overrides:   map[string]map[int64]Rule{},
	}

	// setOverride stores r as the override of globalID for workloadID,
	// creating the inner map on first use.
	setOverride := func(workloadID string, globalID int64, r Rule) {
		byGlobal, ok := s.overrides[workloadID]
		if !ok {
			byGlobal = map[int64]Rule{}
			s.overrides[workloadID] = byGlobal
		}
		byGlobal[globalID] = r
	}

	var errs []error
	for _, row := range rows {
		isOverride := row.WorkloadID != "" && row.OverridesID != 0

		// Record the suppression before compiling: a disabled override
		// must silence its global even if its pattern is invalid.
		// The nil Pattern is what EffectiveFor keys on.
		if isOverride && !row.Enabled {
			setOverride(row.WorkloadID, row.OverridesID, Rule{ID: row.OverridesID})
		}

		re, err := regexp.Compile(row.Pattern)
		if err != nil {
			errs = append(errs, fmt.Errorf("rule #%d %q: %w", row.ID, row.Name, err))
			continue
		}
		if !row.Enabled {
			// Disabled rows contribute nothing beyond the suppression
			// marker recorded above.
			continue
		}

		r := Rule{
			ID:              row.ID,
			WorkloadID:      row.WorkloadID,
			Name:            row.Name,
			Pattern:         re,
			Severity:        row.Severity,
			Streams:         row.Streams,
			CooldownSeconds: row.CooldownSeconds,
		}
		switch {
		case row.WorkloadID == "" && row.OverridesID == 0:
			s.global = append(s.global, r)
		case row.WorkloadID != "" && row.OverridesID == 0:
			s.perWorkload[row.WorkloadID] = append(s.perWorkload[row.WorkloadID], r)
		case isOverride:
			setOverride(row.WorkloadID, row.OverridesID, r)
		}
	}
	return s, errs
}
// EffectiveFor resolves the rules that apply to one workload's logs.
// It mirrors store.EffectiveLogScanRules but works on the compiled
// snapshot, so the only allocation is the returned slice.
//
// The result lists the global rules in snapshot order — each one
// replaced by the workload's override when one exists, or omitted
// when that override is a suppression marker (nil Pattern) — followed
// by the workload-only additions. A nil receiver yields nil.
func (s *Snapshot) EffectiveFor(workloadID string) []Rule {
	if s == nil {
		return nil
	}

	additions := s.perWorkload[workloadID]
	replaced := s.overrides[workloadID] // nil map is fine: lookups miss

	rules := make([]Rule, 0, len(s.global)+len(additions))
	for i := range s.global {
		ov, overridden := replaced[s.global[i].ID]
		switch {
		case !overridden:
			// No override for this workload: the global applies as-is.
			rules = append(rules, s.global[i])
		case ov.Pattern != nil:
			// Enabled override takes the global's place.
			rules = append(rules, ov)
		}
		// Remaining case: overridden with a nil Pattern, meaning the
		// global is suppressed for this workload — emit nothing.
	}
	return append(rules, additions...)
}