7a9ff7ad54
Two paired backends sharing the events.Bus seam:
Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
redaction on read (placeholder echo treated as "no change" on
PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
filters (severity CSV, source CSV, message regex with memoized
compile cache). Structural loop-prevention: never writes to
event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
TierEventTrigger constant, doSendRaw shared with the legacy
Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
sending the real TriggerWebhookPayload shape. SSRF guard
rejects loopback / link-local / unspecified targets. PATCH
uses pointer-typed DTO for partial updates.
Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
per-container token bucket, atomic drop counters), tail
(multiplexed docker frame demuxer with TTY fallback + 16 MiB
payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
timestamp strip + UTF-8-safe message truncation), manager
(5s container polling, atomic.Pointer[Snapshot] hot-reload,
HitEmitter writes event_log + publishes EventLog so the
trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
EffectiveLogScanRules resolver (globals minus per-workload
overrides plus workload-only additions). Transactional
cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
(sample_line → matched/captures) + /stats (drop counters +
active tail count + last-snapshot compile errors) +
GET /api/workloads/{id}/effective-rules.
cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
228 lines
6.6 KiB
Go
228 lines
6.6 KiB
Go
package logscanner
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Engine evaluates lines against a snapshot, gates by per-rule
|
|
// cooldown, and rate-limits emissions per container so a noisy regex
|
|
// can't flood the event_log table.
|
|
type Engine struct {
|
|
// cooldownMu guards lastFiredAt. The map is keyed on a
|
|
// composite (containerID, ruleID) so the same rule firing on two
|
|
// different containers gets independent cooldowns — matches the
|
|
// operator intuition that "this container is alerting" doesn't
|
|
// suppress an alert from a different container.
|
|
cooldownMu sync.Mutex
|
|
lastFiredAt map[cooldownKey]time.Time
|
|
|
|
// bucketMu guards tokenBuckets. One bucket per container.
|
|
bucketMu sync.Mutex
|
|
tokenBuckets map[string]*bucket
|
|
|
|
// Drop counters. Incremented when a matching hit is suppressed
|
|
// before reaching the bus. droppedByBucket covers per-container
|
|
// rate-limit drops; droppedByCooldown counts cooldown-suppressed
|
|
// matches so operators can tell whether their patterns are too
|
|
// greedy vs too tightly cooled. Atomic so the hot path doesn't
|
|
// take the lock just to bump the counter.
|
|
droppedByBucket atomic.Int64
|
|
droppedByCooldown atomic.Int64
|
|
|
|
// Configuration knobs. Defaults set in NewEngine.
|
|
tokensPerWindow int // bucket capacity
|
|
tokenWindow time.Duration // bucket refill window
|
|
now func() time.Time
|
|
}
|
|
|
|
// EngineStats is a point-in-time copy of the engine's drop counters,
// as returned by Stats(). It is exposed through the manager and API
// so operators can discover that a noisy regex is exhausting the
// bucket or that a cooldown is swallowing matches.
type EngineStats struct {
	// DroppedByBucket is the number of matches discarded by the
	// per-container rate limit.
	DroppedByBucket int64 `json:"dropped_by_bucket"`
	// DroppedByCooldown is the number of matches discarded because
	// the rule was still inside its cooldown window.
	DroppedByCooldown int64 `json:"dropped_by_cooldown"`
}
|
|
|
|
// cooldownKey is the composite map key for Engine.lastFiredAt: one
// entry per (container, rule) pair.
type cooldownKey struct {
	ContainerID string
	RuleID      int64
}
|
|
|
|
// bucket is the rate-limit state kept for one container: how many
// emissions remain in the current window and when that window ends.
type bucket struct {
	tokens   int       // emissions still allowed in the current window
	resetsAt time.Time // instant from which the bucket is refilled
}
|
|
|
|
// NewEngine constructs an Engine with sensible defaults: 10
|
|
// events / 60s per container. Both knobs can be overridden via
|
|
// the With* options.
|
|
func NewEngine(opts ...Option) *Engine {
|
|
e := &Engine{
|
|
lastFiredAt: map[cooldownKey]time.Time{},
|
|
tokenBuckets: map[string]*bucket{},
|
|
tokensPerWindow: 10,
|
|
tokenWindow: 60 * time.Second,
|
|
now: time.Now,
|
|
}
|
|
for _, opt := range opts {
|
|
opt(e)
|
|
}
|
|
return e
|
|
}
|
|
|
|
// Option mutates an Engine during construction.
|
|
type Option func(*Engine)
|
|
|
|
// WithBucket sets the per-container token bucket capacity and
|
|
// refill window. Used by tests to make rate-limit assertions
|
|
// deterministic.
|
|
func WithBucket(tokens int, window time.Duration) Option {
|
|
return func(e *Engine) {
|
|
e.tokensPerWindow = tokens
|
|
e.tokenWindow = window
|
|
}
|
|
}
|
|
|
|
// WithNow overrides the clock for tests.
|
|
func WithNow(now func() time.Time) Option {
|
|
return func(e *Engine) {
|
|
e.now = now
|
|
}
|
|
}
|
|
|
|
// Match is the hot-path evaluation. For every rule whose Streams
|
|
// covers `stream` and whose pattern matches `line`, returns one
|
|
// Hit — gated by cooldown and per-container token bucket. Empty
|
|
// result means "drop this line."
|
|
//
|
|
// Side-effect: cooldown timestamps + bucket counters are updated
|
|
// when a rule fires. Callers must not retry the same line.
|
|
func (e *Engine) Match(containerID, workloadID, stream, line string, rules []Rule) []Hit {
|
|
if line == "" || len(rules) == 0 {
|
|
return nil
|
|
}
|
|
var hits []Hit
|
|
now := e.now()
|
|
for _, r := range rules {
|
|
if !streamMatches(r.Streams, stream) {
|
|
continue
|
|
}
|
|
if !r.Pattern.MatchString(line) {
|
|
continue
|
|
}
|
|
if !e.cooledDown(containerID, r, now) {
|
|
// Matched but inside the cooldown window — bump the
|
|
// counter so operators can see when their cooldowns
|
|
// are eating real signal.
|
|
e.droppedByCooldown.Add(1)
|
|
continue
|
|
}
|
|
if !e.takeToken(containerID, now) {
|
|
// Bucket exhausted for this container. Bump the
|
|
// counter so the stats endpoint can surface chatty
|
|
// patterns; the operator's signal is "Stats says we
|
|
// dropped N events — your rule is too broad."
|
|
e.droppedByBucket.Add(1)
|
|
continue
|
|
}
|
|
e.markFired(containerID, r.ID, now)
|
|
hits = append(hits, Hit{
|
|
Rule: r,
|
|
ContainerID: containerID,
|
|
WorkloadID: workloadID,
|
|
Stream: stream,
|
|
Line: line,
|
|
})
|
|
}
|
|
return hits
|
|
}
|
|
|
|
// Stats returns a snapshot of the engine's drop counters. The
|
|
// counters are monotonic over the lifetime of the engine — useful
|
|
// for trend observation, but not for instantaneous "is it dropping
|
|
// right now" alerting. Reset semantics: the engine is recreated on
|
|
// every process restart so counters reset to zero with the binary.
|
|
func (e *Engine) Stats() EngineStats {
|
|
return EngineStats{
|
|
DroppedByBucket: e.droppedByBucket.Load(),
|
|
DroppedByCooldown: e.droppedByCooldown.Load(),
|
|
}
|
|
}
|
|
|
|
// Hit is one rule fire — the engine returns these for the manager
|
|
// to persist + publish on the bus. Kept narrow on purpose so the
|
|
// engine has no event_log / bus dependency.
|
|
type Hit struct {
|
|
Rule Rule
|
|
ContainerID string
|
|
WorkloadID string
|
|
Stream string
|
|
Line string
|
|
}
|
|
|
|
// streamMatches reports whether a rule scoped to ruleStreams applies
// to a line read from lineStream. A rule accepts the line when its
// scope is "all", is empty (treated the same as "all" for
// forward-compatibility with older rows), or names exactly that
// stream.
func streamMatches(ruleStreams, lineStream string) bool {
	switch ruleStreams {
	case "", "all", lineStream:
		return true
	default:
		return false
	}
}
|
|
|
|
func (e *Engine) cooledDown(containerID string, r Rule, now time.Time) bool {
|
|
if r.CooldownSeconds <= 0 {
|
|
return true
|
|
}
|
|
e.cooldownMu.Lock()
|
|
defer e.cooldownMu.Unlock()
|
|
last, ok := e.lastFiredAt[cooldownKey{containerID, r.ID}]
|
|
if !ok {
|
|
return true
|
|
}
|
|
return now.Sub(last) >= time.Duration(r.CooldownSeconds)*time.Second
|
|
}
|
|
|
|
func (e *Engine) markFired(containerID string, ruleID int64, now time.Time) {
|
|
e.cooldownMu.Lock()
|
|
e.lastFiredAt[cooldownKey{containerID, ruleID}] = now
|
|
e.cooldownMu.Unlock()
|
|
}
|
|
|
|
func (e *Engine) takeToken(containerID string, now time.Time) bool {
|
|
e.bucketMu.Lock()
|
|
defer e.bucketMu.Unlock()
|
|
b, ok := e.tokenBuckets[containerID]
|
|
if !ok {
|
|
b = &bucket{tokens: e.tokensPerWindow, resetsAt: now.Add(e.tokenWindow)}
|
|
e.tokenBuckets[containerID] = b
|
|
}
|
|
if !now.Before(b.resetsAt) {
|
|
b.tokens = e.tokensPerWindow
|
|
b.resetsAt = now.Add(e.tokenWindow)
|
|
}
|
|
if b.tokens <= 0 {
|
|
return false
|
|
}
|
|
b.tokens--
|
|
return true
|
|
}
|
|
|
|
// Forget drops cooldown + bucket state for a container that has been
|
|
// removed. Called by the manager when a tail exits to reclaim memory.
|
|
func (e *Engine) Forget(containerID string) {
|
|
e.cooldownMu.Lock()
|
|
for k := range e.lastFiredAt {
|
|
if k.ContainerID == containerID {
|
|
delete(e.lastFiredAt, k)
|
|
}
|
|
}
|
|
e.cooldownMu.Unlock()
|
|
e.bucketMu.Lock()
|
|
delete(e.tokenBuckets, containerID)
|
|
e.bucketMu.Unlock()
|
|
}
|