package logscanner

import (
	"sync"
	"sync/atomic"
	"time"
)

// Engine evaluates log lines against a caller-supplied rule snapshot,
// gates matches by per-rule cooldown, and rate-limits emissions per
// container so a noisy regex can't flood the event_log table.
//
// An Engine contains mutexes and atomic counters, so it must not be
// copied after first use; share it by pointer. The zero value is not
// usable (its maps are nil) — construct one with NewEngine.
type Engine struct {
	// cooldownMu guards lastFiredAt. The map is keyed on a
	// composite (containerID, ruleID) so the same rule firing on two
	// different containers gets independent cooldowns — matches the
	// operator intuition that "this container is alerting" doesn't
	// suppress an alert from a different container.
	cooldownMu  sync.Mutex
	lastFiredAt map[cooldownKey]time.Time

	// bucketMu guards tokenBuckets. One bucket per container, keyed
	// by container ID.
	bucketMu     sync.Mutex
	tokenBuckets map[string]*bucket

	// Drop counters. Incremented when a matching hit is suppressed
	// before reaching the bus. droppedByBucket covers per-container
	// rate-limit drops; droppedByCooldown counts cooldown-suppressed
	// matches so operators can tell whether their patterns are too
	// greedy vs too tightly cooled. Atomic so the hot path doesn't
	// take the lock just to bump the counter.
	droppedByBucket   atomic.Int64
	droppedByCooldown atomic.Int64

	// Configuration knobs. Defaults set in NewEngine.
	tokensPerWindow int              // bucket capacity
	tokenWindow     time.Duration    // bucket refill window
	now             func() time.Time // clock; replaced via WithNow in tests
}

// EngineStats is the public-facing counter snapshot. Returned by
// Stats() and surfaced through the manager + API for operator
// visibility — when a noisy regex floods the bucket, this is how
// they discover it.
type EngineStats struct {
	// DroppedByBucket is the number of matches suppressed because the
	// container's token bucket was empty.
	DroppedByBucket int64 `json:"dropped_by_bucket"`
	// DroppedByCooldown is the number of matches suppressed because
	// the rule was still inside its cooldown for that container.
	DroppedByCooldown int64 `json:"dropped_by_cooldown"`
}

// cooldownKey identifies one (container, rule) pair in
// Engine.lastFiredAt. Both fields are comparable, so the struct can be
// used directly as a map key.
type cooldownKey struct {
	ContainerID string
	RuleID      int64
}

// bucket is the rate-limit state for a single container.
type bucket struct {
	tokens   int       // emissions still allowed before resetsAt
	resetsAt time.Time // instant at which tokens is restored to capacity
}

// NewEngine constructs an Engine with sensible defaults: 10
// events / 60s per container. Both knobs can be overridden via
// WithBucket; the clock can be overridden via WithNow.
func NewEngine(opts ...Option) *Engine { e := &Engine{ lastFiredAt: map[cooldownKey]time.Time{}, tokenBuckets: map[string]*bucket{}, tokensPerWindow: 10, tokenWindow: 60 * time.Second, now: time.Now, } for _, opt := range opts { opt(e) } return e } // Option mutates an Engine during construction. type Option func(*Engine) // WithBucket sets the per-container token bucket capacity and // refill window. Used by tests to make rate-limit assertions // deterministic. func WithBucket(tokens int, window time.Duration) Option { return func(e *Engine) { e.tokensPerWindow = tokens e.tokenWindow = window } } // WithNow overrides the clock for tests. func WithNow(now func() time.Time) Option { return func(e *Engine) { e.now = now } } // Match is the hot-path evaluation. For every rule whose Streams // covers `stream` and whose pattern matches `line`, returns one // Hit — gated by cooldown and per-container token bucket. Empty // result means "drop this line." // // Side-effect: cooldown timestamps + bucket counters are updated // when a rule fires. Callers must not retry the same line. func (e *Engine) Match(containerID, workloadID, stream, line string, rules []Rule) []Hit { if line == "" || len(rules) == 0 { return nil } var hits []Hit now := e.now() for _, r := range rules { if !streamMatches(r.Streams, stream) { continue } if !r.Pattern.MatchString(line) { continue } if !e.cooledDown(containerID, r, now) { // Matched but inside the cooldown window — bump the // counter so operators can see when their cooldowns // are eating real signal. e.droppedByCooldown.Add(1) continue } if !e.takeToken(containerID, now) { // Bucket exhausted for this container. Bump the // counter so the stats endpoint can surface chatty // patterns; the operator's signal is "Stats says we // dropped N events — your rule is too broad." 
e.droppedByBucket.Add(1) continue } e.markFired(containerID, r.ID, now) hits = append(hits, Hit{ Rule: r, ContainerID: containerID, WorkloadID: workloadID, Stream: stream, Line: line, }) } return hits } // Stats returns a snapshot of the engine's drop counters. The // counters are monotonic over the lifetime of the engine — useful // for trend observation, but not for instantaneous "is it dropping // right now" alerting. Reset semantics: the engine is recreated on // every process restart so counters reset to zero with the binary. func (e *Engine) Stats() EngineStats { return EngineStats{ DroppedByBucket: e.droppedByBucket.Load(), DroppedByCooldown: e.droppedByCooldown.Load(), } } // Hit is one rule fire — the engine returns these for the manager // to persist + publish on the bus. Kept narrow on purpose so the // engine has no event_log / bus dependency. type Hit struct { Rule Rule ContainerID string WorkloadID string Stream string Line string } // streamMatches checks whether a rule that scopes itself to a // stream subset accepts the given stream. Empty rule.Streams is // equivalent to "all" for forward-compat with older rows. 
func streamMatches(ruleStreams, lineStream string) bool { if ruleStreams == "" || ruleStreams == "all" { return true } return ruleStreams == lineStream } func (e *Engine) cooledDown(containerID string, r Rule, now time.Time) bool { if r.CooldownSeconds <= 0 { return true } e.cooldownMu.Lock() defer e.cooldownMu.Unlock() last, ok := e.lastFiredAt[cooldownKey{containerID, r.ID}] if !ok { return true } return now.Sub(last) >= time.Duration(r.CooldownSeconds)*time.Second } func (e *Engine) markFired(containerID string, ruleID int64, now time.Time) { e.cooldownMu.Lock() e.lastFiredAt[cooldownKey{containerID, ruleID}] = now e.cooldownMu.Unlock() } func (e *Engine) takeToken(containerID string, now time.Time) bool { e.bucketMu.Lock() defer e.bucketMu.Unlock() b, ok := e.tokenBuckets[containerID] if !ok { b = &bucket{tokens: e.tokensPerWindow, resetsAt: now.Add(e.tokenWindow)} e.tokenBuckets[containerID] = b } if !now.Before(b.resetsAt) { b.tokens = e.tokensPerWindow b.resetsAt = now.Add(e.tokenWindow) } if b.tokens <= 0 { return false } b.tokens-- return true } // Forget drops cooldown + bucket state for a container that has been // removed. Called by the manager when a tail exits to reclaim memory. func (e *Engine) Forget(containerID string) { e.cooldownMu.Lock() for k := range e.lastFiredAt { if k.ContainerID == containerID { delete(e.lastFiredAt, k) } } e.cooldownMu.Unlock() e.bucketMu.Lock() delete(e.tokenBuckets, containerID) e.bucketMu.Unlock() }