Files
tiny-forge/internal/metricalert/manager.go
T
alexei.dolgolyov cdb9fd57d1 feat(alerts): metric-threshold alerting (backend + API)
Operators can define metric-threshold alert rules (cpu_percent,
memory_percent, memory_bytes; gt/lt) per-workload or global via
/api/metric-alert-rules. A periodic evaluator (internal/metricalert,
30s tick) checks the freshest container stats sample per container
against enabled rules and, on breach (per-rule-per-workload cooldown),
emits into the existing event_log + bus pipeline (source "metric_alert",
workload_id set). Alerts therefore surface on the global events page,
the per-app activity timeline, and any configured event-trigger webhook
-- no new notification plumbing.

Mirrors the log_scan_rules store/API/route patterns and the
stats.Collector lifecycle. Rule CRUD reads are authed, mutations
AdminOnly. Frontend rule-config UI is a follow-up phase.

Reviewed: go APPROVE (0 CRITICAL/HIGH).
2026-05-29 14:06:23 +03:00

350 lines
9.7 KiB
Go

// Package metricalert implements a background goroutine that
// periodically evaluates operator-configured metric-threshold rules
// against recent container stats samples. On breach (subject to a
// per-rule-per-workload cooldown) it emits an event into the existing
// event_log + event-bus pipeline — the same fan-out used by the
// log-scanner — instead of building any new notification plumbing.
package metricalert
import (
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// EvalInterval is how often the evaluator tick fires.
const EvalInterval = 30 * time.Second
// lookbackSeconds bounds how far back we pull samples each tick. Stats
// are collected at most every few seconds (see internal/stats), so a
// 120s window comfortably captures the latest reading per container
// even if collection briefly stalls.
const lookbackSeconds = 120
// RuleSource is the read-side seam for fetching the current rule rows.
// Real callers pass *store.Store; tests pass a fake.
type RuleSource interface {
ListMetricAlertRules() ([]store.MetricAlertRule, error)
}
// SampleSource fetches the recent container stats samples to evaluate.
type SampleSource interface {
ListAllRecentContainerStatsSamples(sinceTS int64) ([]store.ContainerStatsSample, error)
}
// EventSink writes a breach into event_log.
type EventSink interface {
InsertEvent(store.EventLog) (store.EventLog, error)
}
// Publisher fans the breach out on the event bus. Matches *events.Bus.
type Publisher interface {
Publish(events.Event)
}
// Source identifies metric-alert events in event_log + the bus.
const eventSource = "metric_alert"
// Manager owns the evaluation loop lifecycle. It mirrors
// stats.Collector: a once-guarded Start/Stop pair with stop/done
// channels and a single-goroutine run loop.
type Manager struct {
rules RuleSource
samples SampleSource
sink EventSink
pub Publisher
// now is swappable in tests so cooldown windows can be exercised
// deterministically. Defaults to time.Now.
now func() time.Time
// mu guards lastFired. The run loop is single-goroutine today, but
// Start/Stop and a future ReloadRules may touch shared state; the
// mutex is cheap insurance.
mu sync.Mutex
lastFired map[string]time.Time // "ruleID:ownerID" -> last emit time
startOnce sync.Once
stopOnce sync.Once
started bool
stop chan struct{}
done chan struct{}
}
// New wires a manager with the supplied dependencies. Call Start to
// begin evaluating.
func New(rules RuleSource, samples SampleSource, sink EventSink, pub Publisher) *Manager {
return &Manager{
rules: rules,
samples: samples,
sink: sink,
pub: pub,
now: time.Now,
lastFired: map[string]time.Time{},
stop: make(chan struct{}),
done: make(chan struct{}),
}
}
// Start launches the background loop. Returns immediately. The loop
// exits when Stop is called. Safe to call multiple times — only the
// first call has an effect.
func (m *Manager) Start() {
m.startOnce.Do(func() {
m.started = true
go m.run()
})
}
// Stop signals the loop to exit and blocks until it has finished the
// in-flight tick. If Start was never called, Stop returns immediately.
func (m *Manager) Stop() {
m.stopOnce.Do(func() {
close(m.stop)
if !m.started {
close(m.done)
}
})
<-m.done
}
// run is the main loop. It evaluates once shortly after start, then on
// every EvalInterval tick, until Stop is called.
func (m *Manager) run() {
defer close(m.done)
// Settle delay so the app + first stats samples exist before the
// first evaluation.
select {
case <-time.After(3 * time.Second):
case <-m.stop:
return
}
ticker := time.NewTicker(EvalInterval)
defer ticker.Stop()
m.evaluate(m.now())
for {
select {
case <-m.stop:
return
case <-ticker.C:
m.evaluate(m.now())
}
}
}
// evaluate runs one pass: load rules + recent samples, reduce to the
// freshest sample per (owner, container), and emit on breach subject to
// cooldown. Best-effort throughout — a bad rule or sample never crashes
// the loop.
func (m *Manager) evaluate(now time.Time) {
rules, err := m.rules.ListMetricAlertRules()
if err != nil {
slog.Warn("metricalert: list rules", "error", err)
return
}
if len(rules) == 0 {
return
}
since := now.Unix() - lookbackSeconds
samples, err := m.samples.ListAllRecentContainerStatsSamples(since)
if err != nil {
slog.Warn("metricalert: list samples", "error", err)
return
}
latest := latestPerContainer(samples)
if len(latest) == 0 {
return
}
for _, rule := range rules {
if !rule.Enabled {
continue
}
for _, sample := range latest {
// Per-workload rules only match their workload; "" matches all.
if rule.WorkloadID != "" && rule.WorkloadID != sample.OwnerID {
continue
}
value, ok := metricValue(rule.Metric, sample)
if !ok {
continue // e.g. memory_percent with a zero limit
}
if !breached(rule.Comparator, value, rule.Threshold) {
continue
}
if m.coolingDown(rule, sample.OwnerID, now) {
continue
}
m.emit(rule, sample, value)
m.recordFire(rule, sample.OwnerID, now)
}
}
}
// latestPerContainer keeps only the most recent sample per
// (OwnerID, ContainerID), so each container is judged on its freshest
// reading rather than every historical row in the window.
func latestPerContainer(samples []store.ContainerStatsSample) []store.ContainerStatsSample {
newest := map[string]store.ContainerStatsSample{}
for _, s := range samples {
key := s.OwnerID + "\x00" + s.ContainerID
if prev, ok := newest[key]; !ok || s.TS > prev.TS {
newest[key] = s
}
}
out := make([]store.ContainerStatsSample, 0, len(newest))
for _, s := range newest {
out = append(out, s)
}
return out
}
// metricValue resolves a rule's metric against a sample. The bool is
// false when the sample can't be judged for that metric (memory_percent
// with a zero/unknown limit) so the caller skips it instead of dividing
// by zero.
func metricValue(metric string, s store.ContainerStatsSample) (float64, bool) {
switch metric {
case store.MetricCPUPercent:
return s.CPUPercent, true
case store.MetricMemoryPercent:
if s.MemoryLimit <= 0 {
return 0, false
}
return float64(s.MemoryUsage) / float64(s.MemoryLimit) * 100, true
case store.MetricMemoryBytes:
return float64(s.MemoryUsage), true
default:
return 0, false
}
}
// breached returns whether value crosses threshold per the comparator.
func breached(comparator string, value, threshold float64) bool {
switch comparator {
case store.MetricComparatorGT:
return value > threshold
case store.MetricComparatorLT:
return value < threshold
default:
return false
}
}
// cooldownKey is the per-rule-per-workload cooldown key.
func cooldownKey(ruleID int64, ownerID string) string {
return fmt.Sprintf("%d:%s", ruleID, ownerID)
}
func (m *Manager) coolingDown(rule store.MetricAlertRule, ownerID string, now time.Time) bool {
if rule.CooldownSeconds <= 0 {
return false
}
m.mu.Lock()
defer m.mu.Unlock()
last, ok := m.lastFired[cooldownKey(rule.ID, ownerID)]
if !ok {
return false
}
return now.Sub(last) < time.Duration(rule.CooldownSeconds)*time.Second
}
func (m *Manager) recordFire(rule store.MetricAlertRule, ownerID string, now time.Time) {
m.mu.Lock()
m.lastFired[cooldownKey(rule.ID, ownerID)] = now
m.mu.Unlock()
}
// emit persists the breach as an event_log row and publishes it on the
// bus. WorkloadID routes the alert to that app's activity timeline.
// Metadata is JSON-marshalled (never string-concatenated). Any
// marshal/insert failure is logged and skipped — emitting must never
// crash the loop.
func (m *Manager) emit(rule store.MetricAlertRule, sample store.ContainerStatsSample, value float64) {
message := formatMessage(rule, value)
meta := map[string]any{
"workload_id": sample.OwnerID,
"rule": rule.Name,
"metric": rule.Metric,
"value": value,
"threshold": rule.Threshold,
"comparator": rule.Comparator,
}
metaJSON, err := json.Marshal(meta)
if err != nil {
slog.Error("metricalert: marshal metadata", "rule", rule.Name, "error", err)
return
}
severity := rule.Severity
if severity == "" {
severity = store.LogScanSeverityWarn
}
evt, err := m.sink.InsertEvent(store.EventLog{
Source: eventSource,
Severity: severity,
Message: message,
WorkloadID: sample.OwnerID,
Metadata: string(metaJSON),
})
if err != nil {
slog.Error("metricalert: persist event", "rule", rule.Name, "error", err)
return
}
if m.pub != nil {
m.pub.Publish(events.Event{
Type: events.EventLog,
Payload: events.EventLogPayload{
ID: evt.ID,
Source: eventSource,
WorkloadID: sample.OwnerID,
Severity: severity,
Message: message,
Metadata: string(metaJSON),
CreatedAt: evt.CreatedAt,
},
})
}
}
// formatMessage builds a concise, human, secret-free breach line. The
// only operator-supplied text is rule.Name; the rest are numbers and
// fixed labels.
func formatMessage(rule store.MetricAlertRule, value float64) string {
label, unit := metricLabelUnit(rule.Metric)
word := comparatorWord(rule.Comparator)
return fmt.Sprintf("%s: %s is %.0f%s (threshold %s %.0f%s)",
rule.Name, label, value, unit, word, rule.Threshold, unit)
}
func metricLabelUnit(metric string) (label, unit string) {
switch metric {
case store.MetricCPUPercent:
return "CPU", "%"
case store.MetricMemoryPercent:
return "Memory", "%"
case store.MetricMemoryBytes:
return "Memory", " bytes"
default:
return metric, ""
}
}
func comparatorWord(comparator string) string {
switch comparator {
case store.MetricComparatorGT:
return ">"
case store.MetricComparatorLT:
return "<"
default:
return comparator
}
}