Files
tiny-forge/internal/logscanner/engine.go
T
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

228 lines
6.6 KiB
Go

package logscanner
import (
"sync"
"sync/atomic"
"time"
)
// Engine is the matching core of the log scanner. It tests log lines
// against a set of rules, suppresses repeat fires through a per-rule
// cooldown, and caps how many hits a single container may emit per
// window so that one noisy regex cannot flood the event_log table.
type Engine struct {
	// lastFiredAt remembers when each (containerID, ruleID) pair last
	// fired; cooldownMu guards it. Keying on the pair rather than on
	// the rule alone gives every container its own cooldown: an alert
	// on one container never silences the same rule on another.
	cooldownMu  sync.Mutex
	lastFiredAt map[cooldownKey]time.Time

	// tokenBuckets holds the rate-limit state, one bucket per
	// container ID; bucketMu guards it.
	bucketMu     sync.Mutex
	tokenBuckets map[string]*bucket

	// Counts of matches that were suppressed before being returned as
	// hits. droppedByBucket tallies per-container rate-limit drops and
	// droppedByCooldown tallies cooldown drops, which lets an operator
	// distinguish "pattern too greedy" from "cooldown too tight".
	// They are atomics so the hot path can bump them without a lock.
	droppedByBucket, droppedByCooldown atomic.Int64

	// Tunables; NewEngine fills in the defaults.
	tokensPerWindow int              // bucket capacity
	tokenWindow     time.Duration    // bucket refill window
	now             func() time.Time // clock used by Match
}
// EngineStats is the public-facing copy of the engine's drop
// counters, as returned by Stats(). It is surfaced through the
// manager + API for operator visibility — when a noisy regex floods
// the bucket, this is how they discover it. The struct tags fix the
// JSON field names used when the value is serialized.
type EngineStats struct {
// DroppedByBucket is the number of matching lines suppressed because
// the container's token bucket was empty.
DroppedByBucket int64 `json:"dropped_by_bucket"`
// DroppedByCooldown is the number of matching lines suppressed
// because the rule was still inside its cooldown window.
DroppedByCooldown int64 `json:"dropped_by_cooldown"`
}
// cooldownKey identifies one rule on one container. It is the key
// type of Engine.lastFiredAt, so cooldowns are tracked per
// (container, rule) pair rather than per rule.
//
// Field order matters: cooledDown and markFired build keys with
// unkeyed literals (cooldownKey{containerID, ruleID}).
type cooldownKey struct {
ContainerID string
RuleID int64
}
// bucket is the rate-limit state kept for one container. takeToken
// decrements tokens on every accepted hit and restores it to the
// engine's tokensPerWindow once the clock reaches resetsAt.
type bucket struct {
// tokens is the number of hits still allowed in the current window.
tokens int
// resetsAt is the instant at which the current window ends.
resetsAt time.Time
}
// NewEngine constructs an Engine with empty cooldown and bucket state
// and sensible defaults: 10 events / 60s per container, timed by
// time.Now. Options are applied in argument order, so a later option
// overrides an earlier one that sets the same knob.
//
// Misconfiguration is tolerated rather than deferred to a crash: a nil
// Option is skipped instead of being called, and if the options leave
// the clock nil (for example WithNow(nil)) it falls back to time.Now so
// that Match does not panic on its first call.
func NewEngine(opts ...Option) *Engine {
	e := &Engine{
		lastFiredAt:     map[cooldownKey]time.Time{},
		tokenBuckets:    map[string]*bucket{},
		tokensPerWindow: 10,
		tokenWindow:     60 * time.Second,
		now:             time.Now,
	}
	for _, opt := range opts {
		if opt == nil {
			// Calling a nil func value panics; treat it as "no option".
			continue
		}
		opt(e)
	}
	if e.now == nil {
		// Match calls e.now() unconditionally, so never leave it nil.
		e.now = time.Now
	}
	return e
}
// Option mutates an Engine during construction. NewEngine calls each
// supplied Option on the freshly built Engine, after the defaults
// have been set, so an Option can override any default.
type Option func(*Engine)
// WithBucket returns an Option that replaces the per-container token
// bucket settings: tokens becomes the bucket capacity and window the
// refill window. Tests use it to make rate-limit assertions
// deterministic.
func WithBucket(tokens int, window time.Duration) Option {
	return func(eng *Engine) {
		eng.tokensPerWindow, eng.tokenWindow = tokens, window
	}
}
// WithNow returns an Option that swaps the engine's clock for the
// given function. Intended for tests that need to control time.
func WithNow(now func() time.Time) Option {
	return func(eng *Engine) { eng.now = now }
}
// Match is the hot-path evaluation of one log line. Each rule is
// tried in order and yields a Hit only if it passes every gate:
//
//  1. the rule's Streams covers `stream`,
//  2. the rule's pattern matches `line`,
//  3. the rule is outside its cooldown for this container,
//  4. the container still has a token in its bucket.
//
// A match stopped at gate 3 or 4 increments the corresponding drop
// counter. A nil result means "drop this line"; an empty line or an
// empty rule set returns nil without reading the clock.
//
// Side-effect: firing a rule consumes a token and records the fire
// time for its cooldown. Callers must not retry the same line.
func (e *Engine) Match(containerID, workloadID, stream, line string, rules []Rule) []Hit {
	if len(rules) == 0 || line == "" {
		return nil
	}

	var fired []Hit
	ts := e.now()
	for _, rule := range rules {
		// Cases are evaluated top to bottom and stop at the first
		// true one, so a later gate is never consulted (and never
		// mutates state) once an earlier gate has rejected the rule.
		switch {
		case !streamMatches(rule.Streams, stream):
			// Rule is scoped to a different stream.
		case !rule.Pattern.MatchString(line):
			// Pattern does not match this line.
		case !e.cooledDown(containerID, rule, ts):
			// Matched, but still cooling down. Counted so operators
			// can see when cooldowns are eating real signal.
			e.droppedByCooldown.Add(1)
		case !e.takeToken(containerID, ts):
			// Matched, but the container's bucket is empty. Counted
			// so the stats endpoint can expose overly broad rules.
			e.droppedByBucket.Add(1)
		default:
			e.markFired(containerID, rule.ID, ts)
			fired = append(fired, Hit{
				Rule:        rule,
				ContainerID: containerID,
				WorkloadID:  workloadID,
				Stream:      stream,
				Line:        line,
			})
		}
	}
	return fired
}
// Stats returns a copy of the engine's drop counters. Each counter
// is loaded atomically, so calling Stats takes no lock. The counters
// are monotonic over the lifetime of the engine, which makes them
// good for watching trends but not for answering "is it dropping
// right now". They start at zero whenever the engine is recreated,
// i.e. on every process restart.
func (e *Engine) Stats() EngineStats {
	var snap EngineStats
	snap.DroppedByBucket = e.droppedByBucket.Load()
	snap.DroppedByCooldown = e.droppedByCooldown.Load()
	return snap
}
// Hit is one rule fire — the engine returns these for the manager
// to persist + publish on the bus. Kept narrow on purpose so the
// engine has no event_log / bus dependency. Match fills every field
// from its own arguments.
type Hit struct {
// Rule is the rule that fired.
Rule Rule
// ContainerID is the container the line was read from.
ContainerID string
// WorkloadID is the workload ID that was passed to Match.
WorkloadID string
// Stream is the stream name that was passed to Match.
Stream string
// Line is the log line that matched, exactly as passed to Match.
Line string
}
// streamMatches reports whether a rule scoped to ruleStreams accepts
// a line that arrived on lineStream. A scope of "all" accepts every
// stream, and so does an empty scope (kept for forward-compat with
// older rows); any other scope must equal lineStream exactly.
func streamMatches(ruleStreams, lineStream string) bool {
	switch ruleStreams {
	case "", "all", lineStream:
		return true
	}
	return false
}
// cooledDown reports whether rule r is allowed to fire on containerID
// at time now. A rule with a non-positive CooldownSeconds has no
// cooldown and is always allowed, as is a rule that has never fired
// on this container. Otherwise at least CooldownSeconds must have
// elapsed since the last recorded fire. It only reads state; the
// fire time is recorded separately by markFired.
func (e *Engine) cooledDown(containerID string, r Rule, now time.Time) bool {
	if r.CooldownSeconds <= 0 {
		return true
	}
	window := time.Duration(r.CooldownSeconds) * time.Second
	key := cooldownKey{ContainerID: containerID, RuleID: r.ID}

	// Hold the lock only for the map read; prev is a copy, so the
	// comparison below is safe without it.
	e.cooldownMu.Lock()
	prev, seen := e.lastFiredAt[key]
	e.cooldownMu.Unlock()

	return !seen || now.Sub(prev) >= window
}
// markFired records now as the most recent fire time of ruleID on
// containerID, which starts that pair's cooldown window.
func (e *Engine) markFired(containerID string, ruleID int64, now time.Time) {
	key := cooldownKey{ContainerID: containerID, RuleID: ruleID}

	e.cooldownMu.Lock()
	defer e.cooldownMu.Unlock()
	e.lastFiredAt[key] = now
}
// takeToken tries to consume one token from containerID's bucket and
// reports whether it succeeded. A container seen for the first time
// gets a full bucket whose window starts at now. Once now reaches the
// bucket's resetsAt, the bucket is refilled to tokensPerWindow and a
// new window starts at now. Returns false, consuming nothing, when
// the bucket is empty.
func (e *Engine) takeToken(containerID string, now time.Time) bool {
	e.bucketMu.Lock()
	defer e.bucketMu.Unlock()

	cur, found := e.tokenBuckets[containerID]
	switch {
	case !found:
		// First hit for this container: start a fresh window.
		cur = &bucket{tokens: e.tokensPerWindow, resetsAt: now.Add(e.tokenWindow)}
		e.tokenBuckets[containerID] = cur
	case !now.Before(cur.resetsAt):
		// Window elapsed: refill and start the next one.
		cur.tokens = e.tokensPerWindow
		cur.resetsAt = now.Add(e.tokenWindow)
	}

	if cur.tokens > 0 {
		cur.tokens--
		return true
	}
	return false
}
// Forget discards everything the engine remembers about containerID:
// every cooldown entry recorded for that container and its token
// bucket. The manager calls it when a tail exits, so state for
// removed containers does not accumulate. The drop counters are left
// untouched.
func (e *Engine) Forget(containerID string) {
	// The two maps have separate locks; each is held only while its
	// own map is being pruned, one after the other.
	func() {
		e.cooldownMu.Lock()
		defer e.cooldownMu.Unlock()
		for key := range e.lastFiredAt {
			if key.ContainerID != containerID {
				continue
			}
			delete(e.lastFiredAt, key)
		}
	}()

	func() {
		e.bucketMu.Lock()
		defer e.bucketMu.Unlock()
		delete(e.tokenBuckets, containerID)
	}()
}