Files
tiny-forge/internal/api/log_scan_rules.go
T
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

351 lines
10 KiB
Go

// Package api: log-scan rule HTTP handlers. The scanner manager
// lives in internal/logscanner; this file is the REST surface that
// lets operators create, edit, and test rules from the UI.
package api
import (
"errors"
"log/slog"
"net/http"
"regexp"
"strconv"
"strings"
"github.com/go-chi/chi/v5"
"github.com/alexei/tinyforge/internal/logscanner"
"github.com/alexei/tinyforge/internal/store"
)
// LogScanReloader is the narrow view of the log-scanner manager that
// the API layer depends on. Stats feeds the /stats endpoint with the
// manager's runtime counters; ReloadRules is invoked after every
// successful rule mutation so the manager rebuilds its rule snapshot.
// *logscanner.Manager is the intended implementation. The Server
// treats a nil interface value as "scanner disabled" and keeps the
// rule routes working.
type LogScanReloader interface {
	Stats() logscanner.Stats
	ReloadRules() error
}
// SetLogScanReloader installs the manager hook used by the rule CRUD
// handlers (snapshot reload) and by the /stats endpoint. It is meant
// to be called from main once both the API server and the scanner
// manager have been constructed.
func (s *Server) SetLogScanReloader(reloader LogScanReloader) {
	s.logScanReloader = reloader
}
// ruleInput is the JSON body accepted by POST (create) and PATCH
// (update) on /api/log-scan-rules. Every field is a pointer so a
// handler can tell an absent key (nil) apart from an explicit empty
// string, zero, or false.
//
// WorkloadID and OverridesID are read only by createLogScanRule.
// updateLogScanRule ignores them because a rule's scope is immutable
// once created (per store.UpdateLogScanRule).
//
// Create-time defaults applied by createLogScanRule when a field is
// absent: severity store.LogScanSeverityWarn, streams
// store.LogScanStreamAll, cooldown 60 seconds, enabled true. On PATCH
// an empty severity/streams string is treated as "leave unchanged".
type ruleInput struct {
	// Scope (create only). NOTE(review): an empty WorkloadID presumably
	// denotes a global rule and OverridesID the global rule being
	// overridden; confirm against store.LogScanRule.
	WorkloadID  *string `json:"workload_id"`
	OverridesID *int64  `json:"overrides_id"`
	// Rule definition; Pattern must be a valid Go regexp.
	Name            *string `json:"name"`
	Pattern         *string `json:"pattern"`
	Severity        *string `json:"severity"`
	Streams         *string `json:"streams"`
	CooldownSeconds *int    `json:"cooldown_seconds"`
	Enabled         *bool   `json:"enabled"`
}
// listLogScanRules handles GET /api/log-scan-rules.
//
// With no query string it returns every stored rule. With
// `?workload_id=...` it returns only the rows scoped to that workload
// (workload-only rules and override rows). Global rules are NOT
// included; GET /api/workloads/{id}/effective-rules returns the
// resolved set.
//
// Store failures are logged server-side with their cause and answered
// with a generic 500, so driver text never reaches the client.
func (s *Server) listLogScanRules(w http.ResponseWriter, r *http.Request) {
	if wlID := r.URL.Query().Get("workload_id"); wlID != "" {
		out, err := s.store.ListLogScanRulesByWorkload(wlID)
		if err != nil {
			// respondError only carries a fixed message, so log the
			// real cause here or it is lost entirely.
			slog.Error("list log scan rules failed",
				"workload_id", wlID, "error", err)
			respondError(w, http.StatusInternalServerError, "list log scan rules")
			return
		}
		respondJSON(w, http.StatusOK, out)
		return
	}
	out, err := s.store.ListLogScanRules()
	if err != nil {
		slog.Error("list log scan rules failed", "error", err)
		respondError(w, http.StatusInternalServerError, "list log scan rules")
		return
	}
	respondJSON(w, http.StatusOK, out)
}
// getLogScanRule handles GET /api/log-scan-rules/{id} and writes the
// stored rule as JSON. parseRuleID rejects a malformed id (and has
// already written the response in that case). Lookup failures are
// translated to an HTTP status by mapStoreError.
func (s *Server) getLogScanRule(w http.ResponseWriter, r *http.Request) {
	ruleID, valid := parseRuleID(w, r)
	if !valid {
		return
	}
	found, lookupErr := s.store.GetLogScanRule(ruleID)
	if lookupErr == nil {
		respondJSON(w, http.StatusOK, found)
		return
	}
	mapStoreError(w, lookupErr, "log scan rule")
}
// createLogScanRule handles POST /api/log-scan-rules.
//
// Absent fields take these defaults: severity store.LogScanSeverityWarn,
// streams store.LogScanStreamAll, cooldown 60 seconds, enabled true.
// The pattern is checked (non-blank, compiles) before touching the
// store. Store-side validation failures are echoed as 400. Any other
// store error is logged server-side and answered with a generic 500.
// On success the scanner snapshot is reloaded and the created rule is
// returned with 201.
func (s *Server) createLogScanRule(w http.ResponseWriter, r *http.Request) {
	var in ruleInput
	if !decodeJSON(w, r, &in) {
		return
	}
	rule := store.LogScanRule{
		WorkloadID:      derefString(in.WorkloadID),
		OverridesID:     derefInt64(in.OverridesID),
		Name:            derefString(in.Name),
		Pattern:         derefString(in.Pattern),
		Severity:        firstNonEmpty(derefString(in.Severity), store.LogScanSeverityWarn),
		Streams:         firstNonEmpty(derefString(in.Streams), store.LogScanStreamAll),
		CooldownSeconds: derefIntDefault(in.CooldownSeconds, 60),
		Enabled:         in.Enabled == nil || *in.Enabled,
	}
	if msg := validateRulePattern(rule.Pattern); msg != "" {
		respondError(w, http.StatusBadRequest, msg)
		return
	}
	out, err := s.store.CreateLogScanRule(rule)
	if err != nil {
		// Store-side validation errors map to 400; anything else
		// (driver errors) is a 500 without leaking the raw text.
		if isClientValidationErr(err) {
			respondError(w, http.StatusBadRequest, err.Error())
			return
		}
		// The client only sees a fixed message, so log the real cause
		// here or it is lost entirely.
		slog.Error("create log scan rule failed", "error", err)
		respondError(w, http.StatusInternalServerError, "create log scan rule")
		return
	}
	s.reloadLogScan()
	respondJSON(w, http.StatusCreated, out)
}
// updateLogScanRule handles PATCH /api/log-scan-rules/{id}, a partial
// update of an existing rule.
//
// Only fields present in the body change. name, pattern,
// cooldown_seconds and enabled are applied as given. severity and
// streams are applied only when non-empty; an empty string means
// "leave unchanged". Scope fields (workload_id, overrides_id) are
// immutable and ignored here.
//
// The merged pattern is re-validated before writing. A store
// not-found (e.g. the rule vanished after the initial lookup) maps to
// 404, and store validation failures map to 400. Any other store error
// is logged server-side and answered with a generic 500. A successful
// update reloads the scanner snapshot.
func (s *Server) updateLogScanRule(w http.ResponseWriter, r *http.Request) {
	id, ok := parseRuleID(w, r)
	if !ok {
		return
	}
	existing, err := s.store.GetLogScanRule(id)
	if err != nil {
		mapStoreError(w, err, "log scan rule")
		return
	}
	var in ruleInput
	if !decodeJSON(w, r, &in) {
		return
	}
	if in.Name != nil {
		existing.Name = *in.Name
	}
	if in.Pattern != nil {
		existing.Pattern = *in.Pattern
	}
	if in.Severity != nil && *in.Severity != "" {
		existing.Severity = *in.Severity
	}
	if in.Streams != nil && *in.Streams != "" {
		existing.Streams = *in.Streams
	}
	if in.CooldownSeconds != nil {
		existing.CooldownSeconds = *in.CooldownSeconds
	}
	if in.Enabled != nil {
		existing.Enabled = *in.Enabled
	}
	if msg := validateRulePattern(existing.Pattern); msg != "" {
		respondError(w, http.StatusBadRequest, msg)
		return
	}
	out, err := s.store.UpdateLogScanRule(existing)
	if err != nil {
		if errors.Is(err, store.ErrNotFound) {
			respondNotFound(w, "log scan rule")
			return
		}
		if isClientValidationErr(err) {
			respondError(w, http.StatusBadRequest, err.Error())
			return
		}
		// The client only sees a fixed message, so log the real cause
		// here or it is lost entirely.
		slog.Error("update log scan rule failed", "id", id, "error", err)
		respondError(w, http.StatusInternalServerError, "update log scan rule")
		return
	}
	s.reloadLogScan()
	respondJSON(w, http.StatusOK, out)
}
// deleteLogScanRule handles DELETE /api/log-scan-rules/{id}. Per the
// store layer, override rows pointing at this id are cascade-deleted
// along with it. On success the scanner snapshot is reloaded and an
// empty 204 is returned. Store errors go through mapStoreError.
func (s *Server) deleteLogScanRule(w http.ResponseWriter, r *http.Request) {
	ruleID, valid := parseRuleID(w, r)
	if !valid {
		return
	}
	delErr := s.store.DeleteLogScanRule(ruleID)
	if delErr != nil {
		mapStoreError(w, delErr, "log scan rule")
		return
	}
	// Refresh the scanner before acknowledging, so the deleted rule
	// stops firing as soon as the client sees success.
	s.reloadLogScan()
	w.WriteHeader(http.StatusNoContent)
}
// testLogScanRule handles POST /api/log-scan-rules/{id}/test. It loads
// the stored rule, reads a body of the form {"sample_line": "..."},
// and responds with whether the rule's pattern matches that line plus
// any captured groups. This lets operators iterate on a regex from
// the UI without generating real container output. The rule lookup
// happens before the body is decoded, so an unknown id is reported
// even when the body is malformed.
func (s *Server) testLogScanRule(w http.ResponseWriter, r *http.Request) {
	ruleID, valid := parseRuleID(w, r)
	if !valid {
		return
	}
	stored, lookupErr := s.store.GetLogScanRule(ruleID)
	if lookupErr != nil {
		mapStoreError(w, lookupErr, "log scan rule")
		return
	}
	var req struct {
		SampleLine string `json:"sample_line"`
	}
	if !decodeJSON(w, r, &req) {
		return
	}
	result := testRuleAgainstLine(stored, req.SampleLine)
	respondJSON(w, http.StatusOK, result)
}
// getEffectiveLogScanRules handles GET /api/workloads/{id}/effective-rules.
// It returns the rule set the scanner uses for this workload's
// containers, as resolved by store.EffectiveLogScanRules: globals
// minus those the workload overrides, plus workload-only rules and
// the override substitutes.
//
// Store failures are logged server-side with the workload id and
// answered with a generic 500.
func (s *Server) getEffectiveLogScanRules(w http.ResponseWriter, r *http.Request) {
	workloadID := chi.URLParam(r, "id")
	if workloadID == "" {
		respondError(w, http.StatusBadRequest, "workload id required")
		return
	}
	rules, err := s.store.EffectiveLogScanRules(workloadID)
	if err != nil {
		// The client only sees a fixed message, so log the real cause
		// here or it is lost entirely.
		slog.Error("compute effective log scan rules failed",
			"workload_id", workloadID, "error", err)
		respondError(w, http.StatusInternalServerError, "compute effective rules")
		return
	}
	respondJSON(w, http.StatusOK, rules)
}
// ruleTestResult is the JSON body returned by
// POST /api/log-scan-rules/{id}/test. It is deliberately minimal: a
// caller iterating on a regex only needs a yes/no plus the captured
// groups.
//
//   - Matched: whether the rule pattern matched the sample line.
//   - Captures: capture-group key -> captured text; omitted when empty.
//   - Error: set (with Matched false) when the stored pattern fails to
//     compile; omitted otherwise.
type ruleTestResult struct {
	Matched  bool              `json:"matched"`
	Captures map[string]string `json:"captures,omitempty"`
	Error    string            `json:"error,omitempty"`
}
// testRuleAgainstLine compiles rule.Pattern, runs it once against
// line, and reports whether it matched and what each capture group
// captured.
//
// Capture keys are the group name for named groups ((?P<name>...))
// and "$<index>" (1-based) for unnamed groups. A group that took no
// part in the match is reported as "".
//
// Go's regexp allows the same name on several groups, e.g.
// (?P<code>\d+)|(?P<code>[A-Z]+). For such duplicates, a
// non-participating group never overwrites a value already recorded
// under that name, so the caller sees the alternative that actually
// matched. When several same-named groups all participate, the
// rightmost one wins.
//
// An uncompilable pattern is reported via Error. A line that does not
// match yields Matched=false with no captures.
func testRuleAgainstLine(rule store.LogScanRule, line string) ruleTestResult {
	re, err := regexp.Compile(rule.Pattern)
	if err != nil {
		return ruleTestResult{Error: "rule pattern is invalid: " + err.Error()}
	}
	// The index form (rather than FindStringSubmatch) distinguishes a
	// group that matched the empty string from one that did not
	// participate at all (offsets of -1).
	loc := re.FindStringSubmatchIndex(line)
	if loc == nil {
		return ruleTestResult{Matched: false}
	}
	names := re.SubexpNames()
	captures := make(map[string]string, len(names)-1)
	for g := 1; g < len(names); g++ {
		key := names[g]
		if key == "" {
			key = "$" + strconv.Itoa(g)
		}
		start, end := loc[2*g], loc[2*g+1]
		if start < 0 {
			// Non-participating group: report "" only if no same-named
			// group has already supplied a value.
			if _, seen := captures[key]; !seen {
				captures[key] = ""
			}
			continue
		}
		captures[key] = line[start:end]
	}
	return ruleTestResult{Matched: true, Captures: captures}
}
// validateRulePattern checks a user-supplied rule pattern and returns
// a client-facing error message, or "" when the pattern is usable. A
// pattern that is empty or only whitespace is rejected as missing.
// Anything else must compile as a Go regexp.
func validateRulePattern(pattern string) string {
	if len(strings.TrimSpace(pattern)) == 0 {
		return "pattern is required"
	}
	_, compileErr := regexp.Compile(pattern)
	if compileErr == nil {
		return ""
	}
	return "pattern invalid: " + compileErr.Error()
}
// isClientValidationErr reports whether err is one of the input
// validation failures raised by store.CreateLogScanRule /
// store.UpdateLogScanRule: missing name or pattern, unknown severity
// or streams value, bad cooldown, or an override without a workload.
// Those messages are safe to echo back as a 400. Any other error is
// treated as an internal failure whose text must not leak. A nil
// error is never a validation error.
//
// NOTE(review): this matches substrings of the error text, so it
// silently breaks if the store rewords a message. Sentinel errors
// checked with errors.Is would be sturdier.
func isClientValidationErr(err error) bool {
	if err != nil {
		text := err.Error()
		needles := [...]string{
			"name is required",
			"pattern is required",
			"invalid severity",
			"invalid streams",
			"cooldown_seconds must be",
			"override row requires workload_id",
		}
		for i := range needles {
			if strings.Contains(text, needles[i]) {
				return true
			}
		}
	}
	return false
}
// parseRuleID reads the {id} chi URL parameter as a positive base-10
// int64. On failure it writes a 400 "invalid rule id" response itself
// and returns ok=false, so callers just return.
func parseRuleID(w http.ResponseWriter, r *http.Request) (int64, bool) {
	parsed, convErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if convErr == nil && parsed > 0 {
		return parsed, true
	}
	respondError(w, http.StatusBadRequest, "invalid rule id")
	return 0, false
}
// derefInt64 returns the value p points to, or 0 when p is nil.
func derefInt64(p *int64) int64 {
	var v int64
	if p != nil {
		v = *p
	}
	return v
}
// derefIntDefault returns the value p points to, or def when p is nil.
// An explicit zero is returned as-is, not replaced by def.
func derefIntDefault(p *int, def int) int {
	if p != nil {
		return *p
	}
	return def
}
// getLogScanStats handles GET /api/log-scan-rules/stats. It returns
// the scanner manager's runtime counters (engine drop counters,
// active tail count, last-snapshot compile errors) so operators can
// spot greedy or broken patterns. With no manager wired
// (scanner-disabled deployment) it returns a zero-valued Stats with
// 200 rather than 404, so the frontend can render the panel uniformly.
func (s *Server) getLogScanStats(w http.ResponseWriter, r *http.Request) {
	var stats logscanner.Stats
	if s.logScanReloader != nil {
		stats = s.logScanReloader.Stats()
	}
	respondJSON(w, http.StatusOK, stats)
}
// reloadLogScan asks the scanner manager to rebuild its rule snapshot.
// It does nothing when no manager is wired (early startup or a
// scanner-disabled deployment). A reload failure is only logged at
// warn level: the CRUD request that triggered it has already
// succeeded, but operators need a hint when a rule isn't firing
// because the snapshot is stale.
func (s *Server) reloadLogScan() {
	reloader := s.logScanReloader
	if reloader == nil {
		return
	}
	err := reloader.ReloadRules()
	if err == nil {
		return
	}
	slog.Warn("log-scan reload failed; manager snapshot may be stale",
		"error", err)
}