feat(alerts): metric-threshold alerting (backend + API)

Operators can define metric-threshold alert rules (cpu_percent,
memory_percent, memory_bytes; gt/lt) per-workload or global via
/api/metric-alert-rules. A periodic evaluator (internal/metricalert,
30s tick) checks the freshest container stats sample per container
against enabled rules and, on breach (per-rule-per-workload cooldown),
emits into the existing event_log + bus pipeline (source "metric_alert",
workload_id set). Alerts therefore surface on the global events page,
the per-app activity timeline, and any configured event-trigger webhook
-- no new notification plumbing.

Mirrors the log_scan_rules store/API/route patterns and the
stats.Collector lifecycle. Rule CRUD reads are authed, mutations
AdminOnly. Frontend rule-config UI is a follow-up phase.

Reviewed: go APPROVE (0 CRITICAL/HIGH).
This commit is contained in:
2026-05-29 14:06:23 +03:00
parent 5c17885197
commit cdb9fd57d1
11 changed files with 1299 additions and 0 deletions
+349
View File
@@ -0,0 +1,349 @@
// Package metricalert implements a background goroutine that
// periodically evaluates operator-configured metric-threshold rules
// against recent container stats samples. On breach (subject to a
// per-rule-per-workload cooldown) it emits an event into the existing
// event_log + event-bus pipeline — the same fan-out used by the
// log-scanner — instead of building any new notification plumbing.
package metricalert
import (
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// Timing parameters for the evaluator.
const (
	// EvalInterval is the period between two evaluator ticks.
	EvalInterval = 30 * time.Second

	// lookbackSeconds is the width, in seconds, of the sample window
	// requested on every tick. Stats are collected at most every few
	// seconds (see internal/stats), so 120s comfortably contains the
	// latest reading per container even if collection briefly stalls.
	lookbackSeconds = 120
)
// RuleSource is the read-side seam for fetching the current rule rows.
// Real callers pass *store.Store; tests pass a fake.
type RuleSource interface {
// ListMetricAlertRules returns the rule rows to evaluate. The
// evaluator itself skips rows whose Enabled flag is false, so the
// implementation may return disabled rules too.
ListMetricAlertRules() ([]store.MetricAlertRule, error)
}
// SampleSource fetches the recent container stats samples to evaluate.
type SampleSource interface {
// ListAllRecentContainerStatsSamples is called with a Unix-seconds
// lower bound (evaluation time minus the lookback window). The
// result may hold several samples per container; the evaluator
// keeps only the one with the highest TS for each.
ListAllRecentContainerStatsSamples(sinceTS int64) ([]store.ContainerStatsSample, error)
}
// EventSink writes a breach into event_log.
type EventSink interface {
// InsertEvent persists the event and returns the stored row; the
// evaluator reads ID and CreatedAt from the returned value when it
// publishes the event on the bus.
InsertEvent(store.EventLog) (store.EventLog, error)
}
// Publisher fans the breach out on the event bus. Matches *events.Bus.
type Publisher interface {
// Publish is invoked once per successfully persisted breach. It has
// no return value, so the evaluator cannot observe delivery failures.
Publish(events.Event)
}
// eventSource is the Source value stamped on every event this package
// writes to event_log and publishes on the bus.
const eventSource = "metric_alert"
// Manager owns the evaluation loop lifecycle. It mirrors
// stats.Collector: a once-guarded Start/Stop pair with stop/done
// channels and a single-goroutine run loop.
type Manager struct {
// Dependencies handed to New: where rules and samples are read from,
// where breaches are persisted, and where they are published. pub
// may be nil, in which case breaches are persisted but not published.
rules RuleSource
samples SampleSource
sink EventSink
pub Publisher
// now is swappable in tests so cooldown windows can be exercised
// deterministically. Defaults to time.Now.
now func() time.Time
// mu guards lastFired. The run loop is single-goroutine today, but
// Start/Stop and a future ReloadRules may touch shared state; the
// mutex is cheap insurance.
mu sync.Mutex
lastFired map[string]time.Time // "ruleID:ownerID" -> last emit time
// startOnce/stopOnce make Start and Stop single-shot.
startOnce sync.Once
stopOnce sync.Once
// started is set by Start and read by Stop to decide who closes done.
started bool
// stop is closed by Stop to ask the loop to exit; done is closed when
// the loop has exited (or by Stop itself when the loop never ran).
stop chan struct{}
done chan struct{}
}
// New builds a Manager around the given dependencies. The returned
// manager is idle: nothing is evaluated until Start is called. The clock
// defaults to time.Now and the cooldown table starts empty.
func New(rules RuleSource, samples SampleSource, sink EventSink, pub Publisher) *Manager {
	m := new(Manager)
	m.rules, m.samples = rules, samples
	m.sink, m.pub = sink, pub
	m.now = time.Now
	m.lastFired = make(map[string]time.Time)
	m.stop = make(chan struct{})
	m.done = make(chan struct{})
	return m
}
// Start launches the background loop. Returns immediately. The loop
// exits when Stop is called. Safe to call multiple times — only the
// first call has an effect. If Stop has already been called, Start is a
// no-op: Stop has closed done on the loop's behalf, and launching the
// loop afterwards would close it a second time and panic.
func (m *Manager) Start() {
	m.startOnce.Do(func() {
		// mu orders this against Stop so the started flag and the
		// "already stopped" check are never observed half-updated.
		m.mu.Lock()
		defer m.mu.Unlock()

		select {
		case <-m.stop:
			return // stopped before it ever started
		default:
		}

		m.started = true
		go m.run()
	})
}

// Stop signals the loop to exit and blocks until it has finished the
// in-flight tick. If Start was never called, Stop returns immediately
// and any later Start becomes a no-op. Safe to call multiple times and
// from multiple goroutines; every call returns once the loop is gone.
func (m *Manager) Stop() {
	m.stopOnce.Do(func() {
		m.mu.Lock()
		defer m.mu.Unlock()

		close(m.stop)
		if !m.started {
			// No loop exists to close done, so do it here to
			// unblock the receive below.
			close(m.done)
		}
	})
	// The lock is released before waiting: the run loop takes mu while
	// recording cooldowns and must be able to finish its tick.
	<-m.done
}
// run is the body of the background goroutine. After a short settle
// delay it evaluates immediately and then once per EvalInterval tick,
// returning (and closing done) as soon as stop is closed.
func (m *Manager) run() {
	defer close(m.done)

	// Give the app and the stats collector a moment to produce the
	// first samples before judging anything.
	settle := time.NewTimer(3 * time.Second)
	defer settle.Stop()
	select {
	case <-m.stop:
		return
	case <-settle.C:
	}

	ticker := time.NewTicker(EvalInterval)
	defer ticker.Stop()

	for {
		m.evaluate(m.now())

		select {
		case <-m.stop:
			return
		case <-ticker.C:
			// fall through to the next evaluation
		}
	}
}
// evaluate runs one pass: load rules, keep the enabled ones, load recent
// samples, reduce them to the freshest sample per (owner, container),
// and emit on breach subject to cooldown. Best-effort throughout — a
// failed rule or sample query is logged and the pass is abandoned; a bad
// rule or sample never crashes the loop.
//
// now is the evaluation time; it bounds the sample window and is the
// timestamp recorded for cooldown purposes.
func (m *Manager) evaluate(now time.Time) {
	rules, err := m.rules.ListMetricAlertRules()
	if err != nil {
		slog.Warn("metricalert: list rules", "error", err)
		return
	}

	// Filter disabled rules before touching the sample store: when no
	// rule is enabled there is nothing to judge, so the sample query
	// would be wasted work on every tick.
	active := make([]store.MetricAlertRule, 0, len(rules))
	for _, rule := range rules {
		if rule.Enabled {
			active = append(active, rule)
		}
	}
	if len(active) == 0 {
		return
	}

	since := now.Unix() - lookbackSeconds
	samples, err := m.samples.ListAllRecentContainerStatsSamples(since)
	if err != nil {
		slog.Warn("metricalert: list samples", "error", err)
		return
	}
	latest := latestPerContainer(samples)

	for _, rule := range active {
		for _, sample := range latest {
			// Per-workload rules only match their workload; "" matches all.
			if rule.WorkloadID != "" && rule.WorkloadID != sample.OwnerID {
				continue
			}
			value, ok := metricValue(rule.Metric, sample)
			if !ok {
				continue // e.g. memory_percent with a zero limit
			}
			if !breached(rule.Comparator, value, rule.Threshold) {
				continue
			}
			// Cooldown is keyed by rule + workload, so once one
			// container of a workload fires, its siblings are
			// suppressed for the same rule until the window passes.
			if m.coolingDown(rule, sample.OwnerID, now) {
				continue
			}
			m.emit(rule, sample, value)
			// emit reports no result, so the cooldown starts even if
			// persisting the event failed.
			m.recordFire(rule, sample.OwnerID, now)
		}
	}
}
// latestPerContainer collapses samples to one entry per
// (OwnerID, ContainerID) pair: the one with the highest TS. On equal TS
// the sample seen first is kept. The order of the returned slice is
// unspecified because it is produced by map iteration.
func latestPerContainer(samples []store.ContainerStatsSample) []store.ContainerStatsSample {
	freshest := make(map[string]store.ContainerStatsSample)
	for i := range samples {
		cur := samples[i]
		id := cur.OwnerID + "\x00" + cur.ContainerID
		held, seen := freshest[id]
		if seen && cur.TS <= held.TS {
			continue // already holding a reading at least as new
		}
		freshest[id] = cur
	}

	result := make([]store.ContainerStatsSample, 0, len(freshest))
	for _, kept := range freshest {
		result = append(result, kept)
	}
	return result
}
// metricValue extracts the number a rule's metric refers to from a
// sample. The second result is false when no value can be produced: the
// metric name is unknown, or memory_percent is requested for a sample
// whose memory limit is zero or negative (which would otherwise divide
// by zero). Callers skip the sample in that case.
func metricValue(metric string, s store.ContainerStatsSample) (float64, bool) {
	switch metric {
	case store.MetricCPUPercent:
		return s.CPUPercent, true
	case store.MetricMemoryBytes:
		return float64(s.MemoryUsage), true
	case store.MetricMemoryPercent:
		if s.MemoryLimit <= 0 {
			return 0, false
		}
		used, limit := float64(s.MemoryUsage), float64(s.MemoryLimit)
		return used / limit * 100, true
	}
	return 0, false
}
// breached reports whether value is strictly beyond threshold in the
// direction named by comparator. A value equal to the threshold never
// breaches, and an unrecognised comparator never breaches.
func breached(comparator string, value, threshold float64) bool {
	if comparator == store.MetricComparatorGT {
		return value > threshold
	}
	if comparator == store.MetricComparatorLT {
		return value < threshold
	}
	return false
}
// cooldownKey builds the lastFired map key for a rule/workload pair, in
// the form "<ruleID>:<ownerID>".
func cooldownKey(ruleID int64, ownerID string) string {
	return fmt.Sprint(ruleID) + ":" + ownerID
}
// coolingDown reports whether the rule fired for ownerID so recently
// that a new breach at now must be suppressed. Rules with a
// non-positive CooldownSeconds are never suppressed, and neither is a
// rule/workload pair that has not fired yet.
func (m *Manager) coolingDown(rule store.MetricAlertRule, ownerID string, now time.Time) bool {
	if rule.CooldownSeconds <= 0 {
		return false
	}
	key := cooldownKey(rule.ID, ownerID)

	m.mu.Lock()
	firedAt, seen := m.lastFired[key]
	m.mu.Unlock()

	if !seen {
		return false
	}
	window := time.Duration(rule.CooldownSeconds) * time.Second
	return now.Sub(firedAt) < window
}
// recordFire stores now as the last time the rule fired for ownerID,
// which starts (or restarts) that pair's cooldown window.
func (m *Manager) recordFire(rule store.MetricAlertRule, ownerID string, now time.Time) {
	key := cooldownKey(rule.ID, ownerID)

	m.mu.Lock()
	defer m.mu.Unlock()
	m.lastFired[key] = now
}
// emit turns one breach into an event: it is first written through the
// sink and, if that succeeds and a publisher is configured, published
// on the bus with the ID and CreatedAt of the stored row. The event's
// WorkloadID is the sample's OwnerID, and an empty rule severity falls
// back to store.LogScanSeverityWarn. Metadata is produced with
// json.Marshal rather than string concatenation. Marshal and insert
// failures are logged and end the call; nothing is returned to the
// caller.
func (m *Manager) emit(rule store.MetricAlertRule, sample store.ContainerStatsSample, value float64) {
	encoded, err := json.Marshal(map[string]any{
		"workload_id": sample.OwnerID,
		"rule":        rule.Name,
		"metric":      rule.Metric,
		"value":       value,
		"threshold":   rule.Threshold,
		"comparator":  rule.Comparator,
	})
	if err != nil {
		slog.Error("metricalert: marshal metadata", "rule", rule.Name, "error", err)
		return
	}
	metadata := string(encoded)
	text := formatMessage(rule, value)

	level := rule.Severity
	if level == "" {
		level = store.LogScanSeverityWarn
	}

	row := store.EventLog{
		Source:     eventSource,
		Severity:   level,
		Message:    text,
		WorkloadID: sample.OwnerID,
		Metadata:   metadata,
	}
	saved, err := m.sink.InsertEvent(row)
	if err != nil {
		slog.Error("metricalert: persist event", "rule", rule.Name, "error", err)
		return
	}

	if m.pub == nil {
		return
	}
	payload := events.EventLogPayload{
		ID:         saved.ID,
		Source:     eventSource,
		WorkloadID: sample.OwnerID,
		Severity:   level,
		Message:    text,
		Metadata:   metadata,
		CreatedAt:  saved.CreatedAt,
	}
	m.pub.Publish(events.Event{Type: events.EventLog, Payload: payload})
}
// formatMessage builds a concise, human, secret-free breach line of the
// form "<rule name>: <label> is <value><unit> (threshold <cmp> <threshold><unit>)".
// The only operator-supplied text is rule.Name; the rest are numbers and
// fixed labels.
//
// Numbers keep up to two decimals. Rounding to whole numbers would hide
// the breach in the message itself: 80.4 against a threshold of 80
// would read "80% (threshold > 80%)", and a threshold of 0.5 would print
// as 0. Whole values (e.g. byte counts) still print without decimals.
func formatMessage(rule store.MetricAlertRule, value float64) string {
	label, unit := metricLabelUnit(rule.Metric)
	word := comparatorWord(rule.Comparator)
	return fmt.Sprintf("%s: %s is %s%s (threshold %s %s%s)",
		rule.Name, label, formatNumber(value), unit, word, formatNumber(rule.Threshold), unit)
}

// formatNumber renders v with at most two decimal places and without
// trailing zeros or a dangling decimal point ("95", "80.4", "0.25").
func formatNumber(v float64) string {
	s := fmt.Sprintf("%.2f", v)
	// A finite value always prints as "<int>.<dd>", so the decimal point
	// stops the loop before any digit of the integer part is touched.
	// Non-finite values ("NaN", "+Inf") end in neither '0' nor '.'.
	for s[len(s)-1] == '0' {
		s = s[:len(s)-1]
	}
	if s[len(s)-1] == '.' {
		s = s[:len(s)-1]
	}
	return s
}
// metricLabelUnit maps a metric identifier to the display label and
// unit suffix used in breach messages. An unrecognised metric is shown
// under its raw name with no unit.
func metricLabelUnit(metric string) (label, unit string) {
	label = metric // fallback for metrics without a friendly name
	switch metric {
	case store.MetricCPUPercent:
		label, unit = "CPU", "%"
	case store.MetricMemoryPercent:
		label, unit = "Memory", "%"
	case store.MetricMemoryBytes:
		label, unit = "Memory", " bytes"
	}
	return label, unit
}
// comparatorWord maps a comparator identifier to the symbol shown in
// breach messages; an unrecognised comparator is returned unchanged.
func comparatorWord(comparator string) string {
	if comparator == store.MetricComparatorGT {
		return ">"
	}
	if comparator == store.MetricComparatorLT {
		return "<"
	}
	return comparator
}
+284
View File
@@ -0,0 +1,284 @@
package metricalert
import (
"testing"
"time"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// --- fakes -----------------------------------------------------------
// fakeRules is a canned RuleSource: every call returns the configured
// rules and error as-is.
type fakeRules struct {
rules []store.MetricAlertRule
err error
}
// ListMetricAlertRules returns the preset rules and error.
func (f *fakeRules) ListMetricAlertRules() ([]store.MetricAlertRule, error) {
return f.rules, f.err
}
// fakeSamples is a canned SampleSource that also remembers the sinceTS
// argument of the most recent call, so tests can tell whether (and with
// what bound) the evaluator queried it.
type fakeSamples struct {
samples []store.ContainerStatsSample
err error
since int64 // captured arg of the last call
}
// ListAllRecentContainerStatsSamples records sinceTS and returns the
// preset samples and error; the samples are not filtered by sinceTS.
func (f *fakeSamples) ListAllRecentContainerStatsSamples(sinceTS int64) ([]store.ContainerStatsSample, error) {
f.since = sinceTS
return f.samples, f.err
}
// recordedEvent wraps one event_log row captured by fakeSink.
type recordedEvent struct {
evt store.EventLog
}
// fakeSink is an in-memory EventSink. It records every inserted event
// in order, or fails every insert when err is set.
type fakeSink struct {
events []recordedEvent
err error
nextID int64
}
// InsertEvent returns the configured error without recording anything
// when err is set. Otherwise it assigns the next sequential ID (starting
// at 1) and a fixed CreatedAt, stores the row, and returns it.
func (f *fakeSink) InsertEvent(e store.EventLog) (store.EventLog, error) {
if f.err != nil {
return store.EventLog{}, f.err
}
f.nextID++
e.ID = f.nextID
e.CreatedAt = "2026-05-29T00:00:00Z"
f.events = append(f.events, recordedEvent{evt: e})
return e, nil
}
// fakePublisher is a Publisher that keeps every published event, in
// order, for later inspection.
type fakePublisher struct {
published []events.Event
}
// Publish appends e to the recorded list.
func (f *fakePublisher) Publish(e events.Event) {
f.published = append(f.published, e)
}
// newManager builds a Manager backed entirely by fakes seeded with the
// given rules and samples, and hands back the sink and publisher so the
// test can inspect what was emitted.
func newManager(rules []store.MetricAlertRule, samples []store.ContainerStatsSample) (*Manager, *fakeSink, *fakePublisher) {
	var (
		sink = new(fakeSink)
		pub  = new(fakePublisher)
	)
	return New(&fakeRules{rules: rules}, &fakeSamples{samples: samples}, sink, pub), sink, pub
}
// --- tests -----------------------------------------------------------
// TestEvaluate_BreachEmits checks the happy path end to end: a breaching
// sample yields exactly one persisted event and one published event,
// both carrying the metric_alert source, the rule's severity and the
// sample's workload.
func TestEvaluate_BreachEmits(t *testing.T) {
	hotRule := store.MetricAlertRule{
		ID: 1, Name: "cpu-hot", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, Severity: "error",
		CooldownSeconds: 300, Enabled: true,
	}
	hotSample := store.ContainerStatsSample{
		ContainerID: "c1", OwnerID: "w1", OwnerType: "instance", TS: 100, CPUPercent: 95,
	}

	m, sink, pub := newManager(
		[]store.MetricAlertRule{hotRule},
		[]store.ContainerStatsSample{hotSample},
	)
	m.evaluate(time.Unix(200, 0))

	// Persisted row.
	if n := len(sink.events); n != 1 {
		t.Fatalf("expected 1 event, got %d", n)
	}
	stored := sink.events[0].evt
	if stored.Source != "metric_alert" {
		t.Errorf("source = %q, want metric_alert", stored.Source)
	}
	if stored.Severity != "error" {
		t.Errorf("severity = %q, want error", stored.Severity)
	}
	if stored.WorkloadID != "w1" {
		t.Errorf("workload_id = %q, want w1", stored.WorkloadID)
	}
	switch stored.Metadata {
	case "", "{}":
		t.Errorf("metadata should be populated JSON, got %q", stored.Metadata)
	}

	// Bus fan-out.
	if n := len(pub.published); n != 1 {
		t.Fatalf("expected 1 published event, got %d", n)
	}
	payload, isLogPayload := pub.published[0].Payload.(events.EventLogPayload)
	if !isLogPayload {
		t.Fatalf("published payload is not EventLogPayload")
	}
	if !(payload.WorkloadID == "w1" && payload.Source == "metric_alert") {
		t.Errorf("payload workload/source mismatch: %+v", payload)
	}
}
// TestEvaluate_NoBreachNoEmit checks that a reading below a gt threshold
// produces no event.
func TestEvaluate_NoBreachNoEmit(t *testing.T) {
	rule := store.MetricAlertRule{
		ID: 1, Name: "cpu-hot", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, Enabled: true,
	}
	calm := store.ContainerStatsSample{
		ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 10,
	}

	m, sink, _ := newManager([]store.MetricAlertRule{rule}, []store.ContainerStatsSample{calm})
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 0 {
		t.Fatalf("expected no events for non-breach, got %d", n)
	}
}
// TestEvaluate_DisabledRuleSkipped checks that a rule with Enabled=false
// stays silent even when the sample would breach it.
func TestEvaluate_DisabledRuleSkipped(t *testing.T) {
	disabled := store.MetricAlertRule{
		ID: 1, Name: "cpu-hot", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, Enabled: false,
	}
	hot := store.ContainerStatsSample{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 95}

	m, sink, _ := newManager([]store.MetricAlertRule{disabled}, []store.ContainerStatsSample{hot})
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 0 {
		t.Fatalf("disabled rule should not emit, got %d", n)
	}
}
// TestEvaluate_PerWorkloadScoping checks that a rule bound to workload
// w2 ignores a breaching sample from w1 and fires only for w2.
func TestEvaluate_PerWorkloadScoping(t *testing.T) {
	scoped := store.MetricAlertRule{
		ID: 1, Name: "w2-only", WorkloadID: "w2", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, Enabled: true,
	}
	var (
		otherWorkload  = store.ContainerStatsSample{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 95} // breach but wrong workload
		targetWorkload = store.ContainerStatsSample{ContainerID: "c2", OwnerID: "w2", TS: 100, CPUPercent: 95} // breach, correct workload
	)

	m, sink, _ := newManager(
		[]store.MetricAlertRule{scoped},
		[]store.ContainerStatsSample{otherWorkload, targetWorkload},
	)
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 1 {
		t.Fatalf("expected 1 event (only w2), got %d", n)
	}
	if owner := sink.events[0].evt.WorkloadID; owner != "w2" {
		t.Errorf("event should be scoped to w2, got %q", owner)
	}
}
// TestEvaluate_GlobalRuleMatchesAll checks that a rule with an empty
// WorkloadID fires once for each breaching workload.
func TestEvaluate_GlobalRuleMatchesAll(t *testing.T) {
	global := store.MetricAlertRule{
		ID: 1, Name: "global", WorkloadID: "", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, Enabled: true,
	}
	hotEverywhere := []store.ContainerStatsSample{
		{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 95},
		{ContainerID: "c2", OwnerID: "w2", TS: 100, CPUPercent: 95},
	}

	m, sink, _ := newManager([]store.MetricAlertRule{global}, hotEverywhere)
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 2 {
		t.Fatalf("global rule should fire for both workloads, got %d", n)
	}
}
// TestEvaluate_MemoryPercentDivByZeroSkip checks that a memory_percent
// rule skips a sample whose memory limit is zero instead of dividing by
// it.
func TestEvaluate_MemoryPercentDivByZeroSkip(t *testing.T) {
	memRule := store.MetricAlertRule{
		ID: 1, Name: "mem", Metric: store.MetricMemoryPercent,
		Comparator: store.MetricComparatorGT, Threshold: 50, Enabled: true,
	}
	unlimited := store.ContainerStatsSample{
		ContainerID: "c1", OwnerID: "w1", TS: 100, MemoryUsage: 1000, MemoryLimit: 0,
	}

	m, sink, _ := newManager([]store.MetricAlertRule{memRule}, []store.ContainerStatsSample{unlimited})
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 0 {
		t.Fatalf("zero memory limit should be skipped for percent rule, got %d", n)
	}
}
// TestEvaluate_MemoryPercentBreaches checks that memory_percent is
// derived from usage/limit: 950 of 1000 bytes breaches a 90% threshold.
func TestEvaluate_MemoryPercentBreaches(t *testing.T) {
	memRule := store.MetricAlertRule{
		ID: 1, Name: "mem", Metric: store.MetricMemoryPercent,
		Comparator: store.MetricComparatorGT, Threshold: 90, Enabled: true,
	}
	nearlyFull := store.ContainerStatsSample{
		ContainerID: "c1", OwnerID: "w1", TS: 100, MemoryUsage: 950, MemoryLimit: 1000, // 95%
	}

	m, sink, _ := newManager([]store.MetricAlertRule{memRule}, []store.ContainerStatsSample{nearlyFull})
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 1 {
		t.Fatalf("95%% should breach 90%% threshold, got %d events", n)
	}
}
// TestEvaluate_CooldownSuppressesSecondEmit drives three evaluations at
// controlled times against a 300s cooldown: the first fires, one 10s
// later is suppressed, and one 301s after the first fires again.
func TestEvaluate_CooldownSuppressesSecondEmit(t *testing.T) {
	rule := store.MetricAlertRule{
		ID: 1, Name: "cpu-hot", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, CooldownSeconds: 300, Enabled: true,
	}
	hot := store.ContainerStatsSample{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 95}
	m, sink, _ := newManager([]store.MetricAlertRule{rule}, []store.ContainerStatsSample{hot})

	start := time.Unix(1000, 0)
	insideWindow := start.Add(10 * time.Second)
	afterWindow := start.Add(301 * time.Second)

	m.evaluate(start)
	m.evaluate(insideWindow)
	if n := len(sink.events); n != 1 {
		t.Fatalf("cooldown should suppress second emit, got %d events", n)
	}

	m.evaluate(afterWindow)
	if n := len(sink.events); n != 2 {
		t.Fatalf("should re-fire after cooldown elapses, got %d events", n)
	}
}
// TestEvaluate_LatestSamplePerContainer checks that, for several samples
// of the same container, only the one with the highest TS is judged.
//
// The rule has no cooldown, so every judged breaching sample would
// produce its own event. That is what makes the "stale breach" cases
// meaningful: with only "old calm, new breach" the test would still pass
// if stale samples were evaluated as well.
func TestEvaluate_LatestSamplePerContainer(t *testing.T) {
	rules := []store.MetricAlertRule{{
		ID: 1, Name: "cpu-hot", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorGT, Threshold: 80, Enabled: true,
	}}

	cases := []struct {
		name    string
		samples []store.ContainerStatsSample
		want    int
	}{
		{
			name: "old calm, new breach",
			samples: []store.ContainerStatsSample{
				{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 10},
				{ContainerID: "c1", OwnerID: "w1", TS: 150, CPUPercent: 95},
			},
			want: 1,
		},
		{
			name: "old breach, new calm",
			samples: []store.ContainerStatsSample{
				{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 95},
				{ContainerID: "c1", OwnerID: "w1", TS: 150, CPUPercent: 10},
			},
			want: 0,
		},
		{
			// Freshness must come from TS, not from slice position.
			name: "newest listed first",
			samples: []store.ContainerStatsSample{
				{ContainerID: "c1", OwnerID: "w1", TS: 150, CPUPercent: 10},
				{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 95},
			},
			want: 0,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			m, sink, _ := newManager(rules, tc.samples)
			m.evaluate(time.Unix(200, 0))
			if got := len(sink.events); got != tc.want {
				t.Fatalf("expected %d event(s) from freshest sample, got %d", tc.want, got)
			}
		})
	}
}
// TestEvaluate_LessThanComparator checks the lt direction: a 1% CPU
// reading breaches a "less than 5" rule.
func TestEvaluate_LessThanComparator(t *testing.T) {
	idleRule := store.MetricAlertRule{
		ID: 1, Name: "cpu-idle", Metric: store.MetricCPUPercent,
		Comparator: store.MetricComparatorLT, Threshold: 5, Enabled: true,
	}
	idle := store.ContainerStatsSample{ContainerID: "c1", OwnerID: "w1", TS: 100, CPUPercent: 1}

	m, sink, _ := newManager([]store.MetricAlertRule{idleRule}, []store.ContainerStatsSample{idle})
	m.evaluate(time.Unix(200, 0))

	if n := len(sink.events); n != 1 {
		t.Fatalf("1%% < 5%% threshold should breach lt rule, got %d events", n)
	}
}
// TestEvaluate_NoRulesNoFetch checks that an empty rule set
// short-circuits before the sample store is queried. The fake records
// the sinceTS of its last call; at evaluation time 200 a query would
// have stored a non-zero bound, so a value of zero means no call.
func TestEvaluate_NoRulesNoFetch(t *testing.T) {
	source := &fakeSamples{samples: nil}
	manager := New(&fakeRules{rules: nil}, source, &fakeSink{}, &fakePublisher{})

	manager.evaluate(time.Unix(200, 0))

	if queried := source.since != 0; queried {
		t.Errorf("samples should not be queried when there are no rules")
	}
}