// Package metricalert implements a background goroutine that
// periodically evaluates operator-configured metric-threshold rules
// against recent container stats samples. On breach (subject to a
// per-rule-per-workload cooldown) it emits an event into the existing
// event_log + event-bus pipeline — the same fan-out used by the
// log-scanner — instead of building any new notification plumbing.
package metricalert

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/alexei/tinyforge/internal/events"
	"github.com/alexei/tinyforge/internal/store"
)

// EvalInterval is how often the evaluator tick fires. The run loop
// builds its ticker from this value.
const EvalInterval = 30 * time.Second

// lookbackSeconds bounds how far back we pull samples each tick. Stats
// are collected at most every few seconds (see internal/stats), so a
// 120s window comfortably captures the latest reading per container
// even if collection briefly stalls. evaluate subtracts it from the
// tick time's Unix seconds to form the sinceTS cutoff.
const lookbackSeconds = 120

// RuleSource is the read-side seam for fetching the current rule rows.
// Real callers pass *store.Store; tests pass a fake.
type RuleSource interface {
	// ListMetricAlertRules returns the configured rules; evaluate skips
	// the tick when it returns an error.
	ListMetricAlertRules() ([]store.MetricAlertRule, error)
}

// SampleSource fetches the recent container stats samples to evaluate.
type SampleSource interface {
	// ListAllRecentContainerStatsSamples is called with a Unix-seconds
	// cutoff (tick time minus lookbackSeconds).
	ListAllRecentContainerStatsSamples(sinceTS int64) ([]store.ContainerStatsSample, error)
}

// EventSink writes a breach into event_log.
type EventSink interface {
	// InsertEvent persists the row and returns it; emit reads ID and
	// CreatedAt from the returned value when publishing.
	InsertEvent(store.EventLog) (store.EventLog, error)
}

// Publisher fans the breach out on the event bus. Matches *events.Bus.
type Publisher interface {
	Publish(events.Event)
}

// eventSource identifies metric-alert events in event_log + the bus.
const eventSource = "metric_alert"

// Manager owns the evaluation loop lifecycle. It mirrors
// stats.Collector: a once-guarded Start/Stop pair with stop/done
// channels and a single-goroutine run loop.
type Manager struct { rules RuleSource samples SampleSource sink EventSink pub Publisher // now is swappable in tests so cooldown windows can be exercised // deterministically. Defaults to time.Now. now func() time.Time // mu guards lastFired. The run loop is single-goroutine today, but // Start/Stop and a future ReloadRules may touch shared state; the // mutex is cheap insurance. mu sync.Mutex lastFired map[string]time.Time // "ruleID:ownerID" -> last emit time startOnce sync.Once stopOnce sync.Once started bool stop chan struct{} done chan struct{} } // New wires a manager with the supplied dependencies. Call Start to // begin evaluating. func New(rules RuleSource, samples SampleSource, sink EventSink, pub Publisher) *Manager { return &Manager{ rules: rules, samples: samples, sink: sink, pub: pub, now: time.Now, lastFired: map[string]time.Time{}, stop: make(chan struct{}), done: make(chan struct{}), } } // Start launches the background loop. Returns immediately. The loop // exits when Stop is called. Safe to call multiple times — only the // first call has an effect. func (m *Manager) Start() { m.startOnce.Do(func() { m.started = true go m.run() }) } // Stop signals the loop to exit and blocks until it has finished the // in-flight tick. If Start was never called, Stop returns immediately. func (m *Manager) Stop() { m.stopOnce.Do(func() { close(m.stop) if !m.started { close(m.done) } }) <-m.done } // run is the main loop. It evaluates once shortly after start, then on // every EvalInterval tick, until Stop is called. func (m *Manager) run() { defer close(m.done) // Settle delay so the app + first stats samples exist before the // first evaluation. 
select { case <-time.After(3 * time.Second): case <-m.stop: return } ticker := time.NewTicker(EvalInterval) defer ticker.Stop() m.evaluate(m.now()) for { select { case <-m.stop: return case <-ticker.C: m.evaluate(m.now()) } } } // evaluate runs one pass: load rules + recent samples, reduce to the // freshest sample per (owner, container), and emit on breach subject to // cooldown. Best-effort throughout — a bad rule or sample never crashes // the loop. func (m *Manager) evaluate(now time.Time) { rules, err := m.rules.ListMetricAlertRules() if err != nil { slog.Warn("metricalert: list rules", "error", err) return } if len(rules) == 0 { return } since := now.Unix() - lookbackSeconds samples, err := m.samples.ListAllRecentContainerStatsSamples(since) if err != nil { slog.Warn("metricalert: list samples", "error", err) return } latest := latestPerContainer(samples) if len(latest) == 0 { return } for _, rule := range rules { if !rule.Enabled { continue } for _, sample := range latest { // Per-workload rules only match their workload; "" matches all. if rule.WorkloadID != "" && rule.WorkloadID != sample.OwnerID { continue } value, ok := metricValue(rule.Metric, sample) if !ok { continue // e.g. memory_percent with a zero limit } if !breached(rule.Comparator, value, rule.Threshold) { continue } if m.coolingDown(rule, sample.OwnerID, now) { continue } m.emit(rule, sample, value) m.recordFire(rule, sample.OwnerID, now) } } } // latestPerContainer keeps only the most recent sample per // (OwnerID, ContainerID), so each container is judged on its freshest // reading rather than every historical row in the window. 
func latestPerContainer(samples []store.ContainerStatsSample) []store.ContainerStatsSample { newest := map[string]store.ContainerStatsSample{} for _, s := range samples { key := s.OwnerID + "\x00" + s.ContainerID if prev, ok := newest[key]; !ok || s.TS > prev.TS { newest[key] = s } } out := make([]store.ContainerStatsSample, 0, len(newest)) for _, s := range newest { out = append(out, s) } return out } // metricValue resolves a rule's metric against a sample. The bool is // false when the sample can't be judged for that metric (memory_percent // with a zero/unknown limit) so the caller skips it instead of dividing // by zero. func metricValue(metric string, s store.ContainerStatsSample) (float64, bool) { switch metric { case store.MetricCPUPercent: return s.CPUPercent, true case store.MetricMemoryPercent: if s.MemoryLimit <= 0 { return 0, false } return float64(s.MemoryUsage) / float64(s.MemoryLimit) * 100, true case store.MetricMemoryBytes: return float64(s.MemoryUsage), true default: return 0, false } } // breached returns whether value crosses threshold per the comparator. func breached(comparator string, value, threshold float64) bool { switch comparator { case store.MetricComparatorGT: return value > threshold case store.MetricComparatorLT: return value < threshold default: return false } } // cooldownKey is the per-rule-per-workload cooldown key. 
func cooldownKey(ruleID int64, ownerID string) string { return fmt.Sprintf("%d:%s", ruleID, ownerID) } func (m *Manager) coolingDown(rule store.MetricAlertRule, ownerID string, now time.Time) bool { if rule.CooldownSeconds <= 0 { return false } m.mu.Lock() defer m.mu.Unlock() last, ok := m.lastFired[cooldownKey(rule.ID, ownerID)] if !ok { return false } return now.Sub(last) < time.Duration(rule.CooldownSeconds)*time.Second } func (m *Manager) recordFire(rule store.MetricAlertRule, ownerID string, now time.Time) { m.mu.Lock() m.lastFired[cooldownKey(rule.ID, ownerID)] = now m.mu.Unlock() } // emit persists the breach as an event_log row and publishes it on the // bus. WorkloadID routes the alert to that app's activity timeline. // Metadata is JSON-marshalled (never string-concatenated). Any // marshal/insert failure is logged and skipped — emitting must never // crash the loop. func (m *Manager) emit(rule store.MetricAlertRule, sample store.ContainerStatsSample, value float64) { message := formatMessage(rule, value) meta := map[string]any{ "workload_id": sample.OwnerID, "rule": rule.Name, "metric": rule.Metric, "value": value, "threshold": rule.Threshold, "comparator": rule.Comparator, } metaJSON, err := json.Marshal(meta) if err != nil { slog.Error("metricalert: marshal metadata", "rule", rule.Name, "error", err) return } severity := rule.Severity if severity == "" { severity = store.LogScanSeverityWarn } evt, err := m.sink.InsertEvent(store.EventLog{ Source: eventSource, Severity: severity, Message: message, WorkloadID: sample.OwnerID, Metadata: string(metaJSON), }) if err != nil { slog.Error("metricalert: persist event", "rule", rule.Name, "error", err) return } if m.pub != nil { m.pub.Publish(events.Event{ Type: events.EventLog, Payload: events.EventLogPayload{ ID: evt.ID, Source: eventSource, WorkloadID: sample.OwnerID, Severity: severity, Message: message, Metadata: string(metaJSON), CreatedAt: evt.CreatedAt, }, }) } } // formatMessage builds a concise, human, 
secret-free breach line. The // only operator-supplied text is rule.Name; the rest are numbers and // fixed labels. func formatMessage(rule store.MetricAlertRule, value float64) string { label, unit := metricLabelUnit(rule.Metric) word := comparatorWord(rule.Comparator) return fmt.Sprintf("%s: %s is %.0f%s (threshold %s %.0f%s)", rule.Name, label, value, unit, word, rule.Threshold, unit) } func metricLabelUnit(metric string) (label, unit string) { switch metric { case store.MetricCPUPercent: return "CPU", "%" case store.MetricMemoryPercent: return "Memory", "%" case store.MetricMemoryBytes: return "Memory", " bytes" default: return metric, "" } } func comparatorWord(comparator string) string { switch comparator { case store.MetricComparatorGT: return ">" case store.MetricComparatorLT: return "<" default: return comparator } }