diff --git a/cmd/server/main.go b/cmd/server/main.go index 254ab00..9bce3e1 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -27,6 +27,7 @@ import ( "github.com/alexei/tinyforge/internal/events" "github.com/alexei/tinyforge/internal/health" "github.com/alexei/tinyforge/internal/logging" + "github.com/alexei/tinyforge/internal/logscanner" "github.com/alexei/tinyforge/internal/notify" "github.com/alexei/tinyforge/internal/npm" "github.com/alexei/tinyforge/internal/proxy" @@ -38,6 +39,16 @@ import ( "github.com/alexei/tinyforge/internal/staticsite" "github.com/alexei/tinyforge/internal/store" "github.com/alexei/tinyforge/internal/webhook" + + // Plugin registrations: each blank-import runs its init() and registers + // itself with internal/workload/plugin. Adding a new Source or Trigger + // is a matter of dropping a new package and adding it to this list. + _ "github.com/alexei/tinyforge/internal/workload/plugin/source/compose" + _ "github.com/alexei/tinyforge/internal/workload/plugin/source/image" + _ "github.com/alexei/tinyforge/internal/workload/plugin/source/static" + _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/git" + _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/manual" + _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/registry" ) func main() { @@ -105,6 +116,9 @@ func main() { rec := reconciler.New(db, dockerClient, 30*time.Second) rec.Start(context.Background()) defer rec.Stop() + // The plugin pass is wired after the deployer is constructed (below); + // the reconciler tolerates a nil dispatcher until then. SetPluginReconciler + // is safe to call at any time, including mid-tick. // Read settings for NPM URL and polling interval. settings, err := db.GetSettings() @@ -166,12 +180,24 @@ func main() { }) defer stopLogger() + // Event-trigger dispatcher: consume EventLog publishes off the bus + // and fan out to operator-configured webhook actions. 
Loop-prevention + // is structural — the dispatcher never writes back to event_log; all + // delivery outcomes land in notifier audit logging. + stopTriggerDispatcher := events.RegisterEventTriggerDispatcher(eventBus, db, notifier) + defer stopTriggerDispatcher() + dep := deployer.New(dockerClient, proxyProvider, db, healthChecker, notifier, eventBus, encKey) + rec.SetPluginReconciler(dep) // Initialize webhook handler. Per-project and per-site secrets are stored // on their respective rows; the static-site triggerer is wired in below // once the site manager has been constructed. webhookHandler := webhook.NewHandler(db, dep, nil) + // Plugin-pipeline dispatcher for /api/webhook/workloads/{secret}. + // Wired here so the same *deployer.Deployer serves both legacy and + // plugin-native paths from one place. + webhookHandler.SetPluginDispatcher(dep) // Initialize registry poller. poller := registry.NewPoller(db, dep, encKey) @@ -322,6 +348,11 @@ func main() { // Initialize static site manager and health checker. staticSiteMgr := staticsite.NewManager(db, dockerClient, proxyProvider, eventBus, notifier, encKey) webhookHandler.SetSiteSyncTriggerer(staticSiteMgr) + // Wire the plugin static source's backend to the manager. After this + // call the "static" kind appears in /api/hooks/kinds and the /apps/new + // picker; before it, the source registers no kind, so the frontend + // silently omits it. + wireStaticBackend(db, staticSiteMgr) staticSiteHealth := staticsite.NewHealthChecker(db, dockerClient, staticSiteMgr) if err := staticSiteHealth.Start("2m"); err != nil { slog.Warn("failed to start static site health checker", "error", err) @@ -339,6 +370,26 @@ func main() { stackMgr = nil } + // Log-scan manager: tails running containers and emits event_log + // entries when log lines match operator-configured regex rules. + // Start before the API server is wired so the reload callback can + // be plugged in via SetLogScanReloader. 
+ logScanMgr := logscanner.NewManager(logscanner.Config{ + Rules: db, + Containers: db, + Docker: dockerClient, + Events: db, + Bus: eventBus, + PollInterval: 5 * time.Second, + }) + // Manager owns its own cancellation; Stop() drives the loop and + // every tail to exit. Using Background here matches the + // reconciler + stale-scanner pattern elsewhere in this file. + if err := logScanMgr.Start(context.Background()); err != nil { + slog.Warn("logscanner: initial rule load failed", "error", err) + } + defer logScanMgr.Stop() + // Build API server. apiServer := api.NewServer(db, dockerClient, npmClient, proxyProvider, dep, notifier, webhookHandler, eventBus, encKey) apiServer.SetStaticSiteManager(staticSiteMgr) @@ -346,6 +397,7 @@ func main() { apiServer.SetStackManager(stackMgr) } apiServer.SetStaleScanner(staleScanner) + apiServer.SetLogScanReloader(logScanMgr) apiServer.SetBackupEngine(backupEngine) apiServer.SetDBPath(dbPath) apiServer.SetBackupSettingsChangedCallback(scheduleAutobackup) diff --git a/internal/api/event_triggers.go b/internal/api/event_triggers.go new file mode 100644 index 0000000..1646b1c --- /dev/null +++ b/internal/api/event_triggers.go @@ -0,0 +1,325 @@ +// Package api: event-trigger HTTP handlers. The dispatcher itself +// lives in internal/events; this file is the REST surface that lets +// operators create, edit, and test triggers from the UI. +package api + +import ( + "context" + "errors" + "net" + "net/http" + "net/url" + "regexp" + "strconv" + "time" + + "github.com/go-chi/chi/v5" + + "github.com/alexei/tinyforge/internal/events" + "github.com/alexei/tinyforge/internal/notify" + "github.com/alexei/tinyforge/internal/store" +) + +// triggerInput is the JSON shape accepted by POST + PATCH. Pointers +// distinguish "absent" from a zero/empty value so PATCH can leave a +// field unchanged. Required fields on POST are validated explicitly. 
+type triggerInput struct { + Name *string `json:"name"` + FilterSeverity *string `json:"filter_severity"` + FilterSource *string `json:"filter_source"` + FilterMessageRegex *string `json:"filter_message_regex"` + ActionType *string `json:"action_type"` + ActionTarget *string `json:"action_target"` + ActionSecret *string `json:"action_secret"` // omit = leave unchanged; "" = clear + Enabled *bool `json:"enabled"` +} + +// actionSecretPlaceholder is what we return on read to signal "a secret +// is configured" without exposing the actual value. The edit page +// preserves this placeholder verbatim (or replaces it with a new value) +// — the API treats the placeholder as "no change" on PATCH. This is +// the same shape Stripe / GitHub use for their secret read APIs. +const actionSecretPlaceholder = "********" + +// listEventTriggers handles GET /api/event-triggers. Secrets are +// redacted to avoid exposing them on read; the edit page shows a +// "configured" indicator when a placeholder is present. +func (s *Server) listEventTriggers(w http.ResponseWriter, r *http.Request) { + out, err := s.store.ListEventTriggers() + if err != nil { + respondError(w, http.StatusInternalServerError, "list event triggers") + return + } + for i := range out { + out[i] = redactTriggerSecret(out[i]) + } + respondJSON(w, http.StatusOK, out) +} + +// getEventTrigger handles GET /api/event-triggers/{id}. +func (s *Server) getEventTrigger(w http.ResponseWriter, r *http.Request) { + id, ok := parseTriggerID(w, r) + if !ok { + return + } + t, err := s.store.GetEventTrigger(id) + if err != nil { + mapStoreError(w, err, "event trigger") + return + } + respondJSON(w, http.StatusOK, redactTriggerSecret(t)) +} + +// createEventTrigger handles POST /api/event-triggers. 
+func (s *Server) createEventTrigger(w http.ResponseWriter, r *http.Request) { + var in triggerInput + if !decodeJSON(w, r, &in) { + return + } + t := store.EventTrigger{ + Name: derefString(in.Name), + FilterSeverity: derefString(in.FilterSeverity), + FilterSource: derefString(in.FilterSource), + FilterMessageRegex: derefString(in.FilterMessageRegex), + ActionType: firstNonEmpty(derefString(in.ActionType), store.EventTriggerActionWebhook), + ActionTarget: derefString(in.ActionTarget), + ActionSecret: derefString(in.ActionSecret), + Enabled: in.Enabled == nil || *in.Enabled, + } + if msg := validateTrigger(t); msg != "" { + respondError(w, http.StatusBadRequest, msg) + return + } + out, err := s.store.CreateEventTrigger(t) + if err != nil { + // CreateEventTrigger returns validation-shaped errors plus + // raw DB errors. Validation already ran above, so anything + // here is a server-side problem — surface as 500 and avoid + // echoing driver text to the client. + respondError(w, http.StatusInternalServerError, "create event trigger") + return + } + respondJSON(w, http.StatusCreated, redactTriggerSecret(out)) +} + +// updateEventTrigger handles PATCH /api/event-triggers/{id}. Each +// field on the input is optional (pointer); absent fields are left +// unchanged. ActionSecret receives special treatment so the read-side +// placeholder round-trips safely. 
+func (s *Server) updateEventTrigger(w http.ResponseWriter, r *http.Request) { + id, ok := parseTriggerID(w, r) + if !ok { + return + } + existing, err := s.store.GetEventTrigger(id) + if err != nil { + mapStoreError(w, err, "event trigger") + return + } + + var in triggerInput + if !decodeJSON(w, r, &in) { + return + } + if in.Name != nil { + existing.Name = *in.Name + } + if in.FilterSeverity != nil { + existing.FilterSeverity = *in.FilterSeverity + } + if in.FilterSource != nil { + existing.FilterSource = *in.FilterSource + } + if in.FilterMessageRegex != nil { + existing.FilterMessageRegex = *in.FilterMessageRegex + } + if in.ActionType != nil && *in.ActionType != "" { + existing.ActionType = *in.ActionType + } + if in.ActionTarget != nil { + existing.ActionTarget = *in.ActionTarget + } + // Secret round-trip: the read API returns a placeholder when a + // secret is configured. If the client echoes the placeholder back + // unchanged we leave the stored secret alone; any other value + // (including the empty string) is treated as a deliberate update. + if in.ActionSecret != nil && *in.ActionSecret != actionSecretPlaceholder { + existing.ActionSecret = *in.ActionSecret + } + if in.Enabled != nil { + existing.Enabled = *in.Enabled + } + + if msg := validateTrigger(existing); msg != "" { + respondError(w, http.StatusBadRequest, msg) + return + } + + out, err := s.store.UpdateEventTrigger(existing) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + respondNotFound(w, "event trigger") + return + } + respondError(w, http.StatusInternalServerError, "update event trigger") + return + } + respondJSON(w, http.StatusOK, redactTriggerSecret(out)) +} + +// deleteEventTrigger handles DELETE /api/event-triggers/{id}. 
+func (s *Server) deleteEventTrigger(w http.ResponseWriter, r *http.Request) { + id, ok := parseTriggerID(w, r) + if !ok { + return + } + if err := s.store.DeleteEventTrigger(id); err != nil { + mapStoreError(w, err, "event trigger") + return + } + w.WriteHeader(http.StatusNoContent) +} + +// testEventTrigger handles POST /api/event-triggers/{id}/test. Sends +// a real TriggerWebhookPayload to the action target so receivers see +// the same shape they'll see at runtime. Routes through the dedicated +// SendSyncForTestPayload path that preserves the payload through the +// HMAC+HTTP core unchanged. +func (s *Server) testEventTrigger(w http.ResponseWriter, r *http.Request) { + id, ok := parseTriggerID(w, r) + if !ok { + return + } + t, err := s.store.GetEventTrigger(id) + if err != nil { + mapStoreError(w, err, "event trigger") + return + } + if t.ActionType != store.EventTriggerActionWebhook { + respondError(w, http.StatusBadRequest, "action_type not testable") + return + } + + now := time.Now().UTC().Format(time.RFC3339) + payload := events.TriggerWebhookPayload{ + Type: "event_trigger", + TriggerID: t.ID, + Trigger: t.Name, + Event: events.EventLogPayload{ + ID: -1, + Source: "test", + Severity: "info", + Message: "Test event from Tinyforge — trigger=" + t.Name, + Metadata: `{"synthetic":true}`, + CreatedAt: now, + }, + Timestamp: now, + } + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + result := s.notifier.SendSyncForTestPayload(ctx, t.ActionTarget, t.ActionSecret, + notify.TierEventTrigger, "event_trigger", payload) + respondJSON(w, http.StatusOK, result) +} + +// validateTrigger runs the full set of invariants over a fully-merged +// trigger row. Called by both create and update so the contract is +// enforced once. Returns an empty string for a valid trigger. 
+func validateTrigger(t store.EventTrigger) string { + if t.Name == "" { + return "name is required" + } + if t.ActionType != "" && t.ActionType != store.EventTriggerActionWebhook { + return "action_type must be 'webhook'" + } + if t.ActionTarget == "" { + return "action_target is required" + } + if msg := validateWebhookURL(t.ActionTarget); msg != "" { + return msg + } + if t.FilterMessageRegex != "" { + if _, err := regexp.Compile(t.FilterMessageRegex); err != nil { + return "filter_message_regex invalid: " + err.Error() + } + } + return "" +} + +// validateWebhookURL guards against the most common SSRF vectors that +// admin-controlled webhook URLs enable: non-http(s) schemes, missing +// host, and internal-network targets (loopback / link-local / RFC1918 +// when the hostname resolves to a literal). Hostname-based lookups +// are NOT resolved here — DNS rebinding is out of scope and would +// require enforcement at dispatch time too. Admin gating remains the +// primary control; this is defense-in-depth. +func validateWebhookURL(raw string) string { + u, err := url.Parse(raw) + if err != nil { + return "action_target invalid URL: " + err.Error() + } + if u.Scheme != "http" && u.Scheme != "https" { + return "action_target must be http:// or https://" + } + host := u.Hostname() + if host == "" { + return "action_target missing host" + } + // Literal-IP guard: block loopback / link-local / unspecified + // addresses outright. RFC1918 private ranges are intentionally + // allowed since same-LAN receivers are a legitimate Tinyforge + // deployment pattern. + if ip := net.ParseIP(host); ip != nil { + if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsUnspecified() { + return "action_target points at a reserved/loopback address" + } + } + return "" +} + +// redactTriggerSecret returns a copy of t with ActionSecret replaced +// by the placeholder string when a secret is configured. 
Empty secret +// stays empty so the UI can distinguish "no signing" from "signing +// configured." +func redactTriggerSecret(t store.EventTrigger) store.EventTrigger { + if t.ActionSecret != "" { + t.ActionSecret = actionSecretPlaceholder + } + return t +} + +// mapStoreError translates a store-layer error into an HTTP status + +// generic message. ErrNotFound → 404; everything else → 500 without +// echoing driver text to the client (avoids leaking schema details +// or transient error states to API consumers). +func mapStoreError(w http.ResponseWriter, err error, resource string) { + if errors.Is(err, store.ErrNotFound) { + respondNotFound(w, resource) + return + } + respondError(w, http.StatusInternalServerError, "get "+resource) +} + +func parseTriggerID(w http.ResponseWriter, r *http.Request) (int64, bool) { + raw := chi.URLParam(r, "id") + id, err := strconv.ParseInt(raw, 10, 64) + if err != nil || id <= 0 { + respondError(w, http.StatusBadRequest, "invalid event trigger id") + return 0, false + } + return id, true +} + +func derefString(p *string) string { + if p == nil { + return "" + } + return *p +} + +func firstNonEmpty(a, b string) string { + if a != "" { + return a + } + return b +} diff --git a/internal/api/event_triggers_test.go b/internal/api/event_triggers_test.go new file mode 100644 index 0000000..52b51a4 --- /dev/null +++ b/internal/api/event_triggers_test.go @@ -0,0 +1,143 @@ +package api + +import ( + "strings" + "testing" + + "github.com/alexei/tinyforge/internal/store" +) + +func TestValidateWebhookURL(t *testing.T) { + cases := []struct { + name string + url string + wantErr string // substring; empty = pass + }{ + {"https valid", "https://example.com/hook", ""}, + {"http valid", "http://example.com:8080/hook", ""}, + {"RFC1918 private LAN allowed", "http://192.168.1.50:9090/hook", ""}, + {"loopback rejected", "http://127.0.0.1:8090/hook", "loopback"}, + {"ipv6 loopback rejected", "http://[::1]:9000/hook", "loopback"}, + {"link-local 
rejected", "http://169.254.169.254/latest/meta-data", "reserved"}, + {"unspecified rejected", "http://0.0.0.0:9000/hook", "reserved"}, + {"file scheme rejected", "file:///etc/passwd", "http:// or https://"}, + {"missing host rejected", "https://", "missing host"}, + {"malformed url rejected", "://nope", "invalid URL"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := validateWebhookURL(c.url) + if c.wantErr == "" { + if got != "" { + t.Fatalf("expected pass, got error: %q", got) + } + return + } + if !strings.Contains(got, c.wantErr) { + t.Fatalf("error mismatch:\n got: %q\n want substring: %q", got, c.wantErr) + } + }) + } +} + +func TestValidateTrigger(t *testing.T) { + cases := []struct { + name string + in store.EventTrigger + want string // substring of error; empty = pass + }{ + { + name: "missing name", + in: store.EventTrigger{ActionTarget: "https://x.example.com/h"}, + want: "name is required", + }, + { + name: "missing target", + in: store.EventTrigger{Name: "n"}, + want: "action_target is required", + }, + { + name: "bad scheme", + in: store.EventTrigger{Name: "n", ActionTarget: "ftp://x.example.com/h"}, + want: "http:// or https://", + }, + { + name: "loopback target", + in: store.EventTrigger{Name: "n", ActionTarget: "http://127.0.0.1/hook"}, + want: "loopback", + }, + { + name: "unsupported action_type", + in: store.EventTrigger{Name: "n", ActionType: "email", ActionTarget: "https://x.example.com/h"}, + want: "action_type must be", + }, + { + name: "invalid regex", + in: store.EventTrigger{ + Name: "n", ActionTarget: "https://x.example.com/h", + FilterMessageRegex: "([unclosed", + }, + want: "filter_message_regex invalid", + }, + { + name: "all valid", + in: store.EventTrigger{ + Name: "n", + ActionTarget: "https://x.example.com/h", + FilterSeverity: "warn,error", + FilterMessageRegex: `\bpanic\b`, + }, + want: "", + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := validateTrigger(c.in) + if 
c.want == "" { + if got != "" { + t.Fatalf("expected pass, got error: %q", got) + } + return + } + if !strings.Contains(got, c.want) { + t.Fatalf("error mismatch:\n got: %q\n want substring: %q", got, c.want) + } + }) + } +} + +func TestRedactTriggerSecret(t *testing.T) { + withSecret := store.EventTrigger{Name: "n", ActionSecret: "shh-real-secret"} + got := redactTriggerSecret(withSecret) + if got.ActionSecret != actionSecretPlaceholder { + t.Errorf("expected placeholder, got %q", got.ActionSecret) + } + if withSecret.ActionSecret != "shh-real-secret" { + t.Errorf("original mutated: %q", withSecret.ActionSecret) + } + + noSecret := store.EventTrigger{Name: "n", ActionSecret: ""} + got2 := redactTriggerSecret(noSecret) + if got2.ActionSecret != "" { + t.Errorf("empty secret should stay empty, got %q", got2.ActionSecret) + } +} + +func TestDerefString(t *testing.T) { + if derefString(nil) != "" { + t.Error("nil should deref to empty string") + } + s := "value" + if derefString(&s) != "value" { + t.Error("non-nil should deref to value") + } +} + +func TestFirstNonEmpty(t *testing.T) { + if firstNonEmpty("a", "b") != "a" { + t.Error("non-empty first wins") + } + if firstNonEmpty("", "b") != "b" { + t.Error("fallback when first empty") + } +} diff --git a/internal/api/log_scan_rules.go b/internal/api/log_scan_rules.go new file mode 100644 index 0000000..29edbd7 --- /dev/null +++ b/internal/api/log_scan_rules.go @@ -0,0 +1,350 @@ +// Package api: log-scan rule HTTP handlers. The scanner manager +// lives in internal/logscanner; this file is the REST surface that +// lets operators create, edit, and test rules from the UI. 
+package api + +import ( + "errors" + "log/slog" + "net/http" + "regexp" + "strconv" + "strings" + + "github.com/go-chi/chi/v5" + + "github.com/alexei/tinyforge/internal/logscanner" + "github.com/alexei/tinyforge/internal/store" +) + +// LogScanReloader is what the API calls after any rule CRUD so the +// scanner manager swaps its snapshot, and what the /stats endpoint +// queries for runtime counters. Implemented by *logscanner.Manager; +// nil-tolerant on the API side so the routes still work in a +// scanner-disabled deployment. +type LogScanReloader interface { + ReloadRules() error + Stats() logscanner.Stats +} + +// SetLogScanReloader wires the API → manager reload signal. Called +// from main after both subsystems are constructed. +func (s *Server) SetLogScanReloader(r LogScanReloader) { + s.logScanReloader = r +} + +// ruleInput is the JSON shape accepted by POST + PATCH. Pointers +// distinguish "absent" from explicit empty/zero. WorkloadID and +// OverridesID are immutable on update (per store.UpdateLogScanRule) +// so they only appear here for create. +type ruleInput struct { + WorkloadID *string `json:"workload_id"` + OverridesID *int64 `json:"overrides_id"` + Name *string `json:"name"` + Pattern *string `json:"pattern"` + Severity *string `json:"severity"` + Streams *string `json:"streams"` + CooldownSeconds *int `json:"cooldown_seconds"` + Enabled *bool `json:"enabled"` +} + +// listLogScanRules handles GET /api/log-scan-rules. Optional query +// filter `workload_id=...` returns only rules scoped to that +// workload (workload-only + override rows, NOT globals). 
+func (s *Server) listLogScanRules(w http.ResponseWriter, r *http.Request) { + if wlID := r.URL.Query().Get("workload_id"); wlID != "" { + out, err := s.store.ListLogScanRulesByWorkload(wlID) + if err != nil { + respondError(w, http.StatusInternalServerError, "list log scan rules") + return + } + respondJSON(w, http.StatusOK, out) + return + } + out, err := s.store.ListLogScanRules() + if err != nil { + respondError(w, http.StatusInternalServerError, "list log scan rules") + return + } + respondJSON(w, http.StatusOK, out) +} + +// getLogScanRule handles GET /api/log-scan-rules/{id}. +func (s *Server) getLogScanRule(w http.ResponseWriter, r *http.Request) { + id, ok := parseRuleID(w, r) + if !ok { + return + } + rule, err := s.store.GetLogScanRule(id) + if err != nil { + mapStoreError(w, err, "log scan rule") + return + } + respondJSON(w, http.StatusOK, rule) +} + +// createLogScanRule handles POST /api/log-scan-rules. +func (s *Server) createLogScanRule(w http.ResponseWriter, r *http.Request) { + var in ruleInput + if !decodeJSON(w, r, &in) { + return + } + rule := store.LogScanRule{ + WorkloadID: derefString(in.WorkloadID), + OverridesID: derefInt64(in.OverridesID), + Name: derefString(in.Name), + Pattern: derefString(in.Pattern), + Severity: firstNonEmpty(derefString(in.Severity), store.LogScanSeverityWarn), + Streams: firstNonEmpty(derefString(in.Streams), store.LogScanStreamAll), + CooldownSeconds: derefIntDefault(in.CooldownSeconds, 60), + Enabled: in.Enabled == nil || *in.Enabled, + } + if msg := validateRulePattern(rule.Pattern); msg != "" { + respondError(w, http.StatusBadRequest, msg) + return + } + out, err := s.store.CreateLogScanRule(rule) + if err != nil { + // Store-side validation errors map to 400; anything else + // (driver errors) is a 500 without leaking the raw text. 
+ if isClientValidationErr(err) { + respondError(w, http.StatusBadRequest, err.Error()) + return + } + respondError(w, http.StatusInternalServerError, "create log scan rule") + return + } + s.reloadLogScan() + respondJSON(w, http.StatusCreated, out) +} + +// updateLogScanRule handles PATCH /api/log-scan-rules/{id}. Scope +// fields (workload_id, overrides_id) are immutable; pattern/severity/ +// streams/cooldown/enabled/name are individually overridable. +func (s *Server) updateLogScanRule(w http.ResponseWriter, r *http.Request) { + id, ok := parseRuleID(w, r) + if !ok { + return + } + existing, err := s.store.GetLogScanRule(id) + if err != nil { + mapStoreError(w, err, "log scan rule") + return + } + var in ruleInput + if !decodeJSON(w, r, &in) { + return + } + if in.Name != nil { + existing.Name = *in.Name + } + if in.Pattern != nil { + existing.Pattern = *in.Pattern + } + if in.Severity != nil && *in.Severity != "" { + existing.Severity = *in.Severity + } + if in.Streams != nil && *in.Streams != "" { + existing.Streams = *in.Streams + } + if in.CooldownSeconds != nil { + existing.CooldownSeconds = *in.CooldownSeconds + } + if in.Enabled != nil { + existing.Enabled = *in.Enabled + } + if msg := validateRulePattern(existing.Pattern); msg != "" { + respondError(w, http.StatusBadRequest, msg) + return + } + out, err := s.store.UpdateLogScanRule(existing) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + respondNotFound(w, "log scan rule") + return + } + if isClientValidationErr(err) { + respondError(w, http.StatusBadRequest, err.Error()) + return + } + respondError(w, http.StatusInternalServerError, "update log scan rule") + return + } + s.reloadLogScan() + respondJSON(w, http.StatusOK, out) +} + +// deleteLogScanRule handles DELETE /api/log-scan-rules/{id}. Override +// rows that reference this id are cascade-deleted by the store layer. 
+func (s *Server) deleteLogScanRule(w http.ResponseWriter, r *http.Request) { + id, ok := parseRuleID(w, r) + if !ok { + return + } + if err := s.store.DeleteLogScanRule(id); err != nil { + mapStoreError(w, err, "log scan rule") + return + } + s.reloadLogScan() + w.WriteHeader(http.StatusNoContent) +} + +// testLogScanRule handles POST /api/log-scan-rules/{id}/test. Body +// `{"sample_line": "..."}` returns whether the rule pattern matches + +// any captured subgroups. Lets operators iterate on regexes in the +// UI without spinning up real container traffic. +func (s *Server) testLogScanRule(w http.ResponseWriter, r *http.Request) { + id, ok := parseRuleID(w, r) + if !ok { + return + } + rule, err := s.store.GetLogScanRule(id) + if err != nil { + mapStoreError(w, err, "log scan rule") + return + } + var body struct { + SampleLine string `json:"sample_line"` + } + if !decodeJSON(w, r, &body) { + return + } + respondJSON(w, http.StatusOK, testRuleAgainstLine(rule, body.SampleLine)) +} + +// getEffectiveLogScanRules handles GET /api/workloads/{id}/effective-rules. +// Returns the resolved effective rule set (globals minus overrides + +// workload-only + override-substitutes) that the scanner uses for +// this workload's containers. +func (s *Server) getEffectiveLogScanRules(w http.ResponseWriter, r *http.Request) { + workloadID := chi.URLParam(r, "id") + if workloadID == "" { + respondError(w, http.StatusBadRequest, "workload id required") + return + } + rules, err := s.store.EffectiveLogScanRules(workloadID) + if err != nil { + respondError(w, http.StatusInternalServerError, "compute effective rules") + return + } + respondJSON(w, http.StatusOK, rules) +} + +// testResult is the shape returned by /test. Keeping it focused — +// caller wants a yes/no + captures so they can iterate, nothing more. 
+type ruleTestResult struct { + Matched bool `json:"matched"` + Captures map[string]string `json:"captures,omitempty"` + Error string `json:"error,omitempty"` +} + +func testRuleAgainstLine(rule store.LogScanRule, line string) ruleTestResult { + re, err := regexp.Compile(rule.Pattern) + if err != nil { + return ruleTestResult{Error: "rule pattern is invalid: " + err.Error()} + } + subs := re.FindStringSubmatch(line) + if subs == nil { + return ruleTestResult{Matched: false} + } + captures := map[string]string{} + names := re.SubexpNames() + for i, s := range subs[1:] { + key := names[i+1] + if key == "" { + key = "$" + strconv.Itoa(i+1) + } + captures[key] = s + } + return ruleTestResult{Matched: true, Captures: captures} +} + +func validateRulePattern(pattern string) string { + if strings.TrimSpace(pattern) == "" { + return "pattern is required" + } + if _, err := regexp.Compile(pattern); err != nil { + return "pattern invalid: " + err.Error() + } + return "" +} + +// isClientValidationErr returns true when the store error is one of +// the validation errors raised by CreateLogScanRule / +// UpdateLogScanRule (name/pattern required, invalid enum, negative +// cooldown). Used to map those to 400 rather than 500 without +// exposing driver text. 
+func isClientValidationErr(err error) bool { + if err == nil { + return false + } + msg := err.Error() + for _, needle := range []string{ + "name is required", + "pattern is required", + "invalid severity", + "invalid streams", + "cooldown_seconds must be", + "override row requires workload_id", + } { + if strings.Contains(msg, needle) { + return true + } + } + return false +} + +func parseRuleID(w http.ResponseWriter, r *http.Request) (int64, bool) { + raw := chi.URLParam(r, "id") + id, err := strconv.ParseInt(raw, 10, 64) + if err != nil || id <= 0 { + respondError(w, http.StatusBadRequest, "invalid rule id") + return 0, false + } + return id, true +} + +func derefInt64(p *int64) int64 { + if p == nil { + return 0 + } + return *p +} + +func derefIntDefault(p *int, def int) int { + if p == nil { + return def + } + return *p +} + +// getLogScanStats handles GET /api/log-scan-rules/stats. Returns +// engine drop counters + last-snapshot compile errors + active +// tail count so operators can see when their patterns are too +// greedy or syntactically broken. When the scanner manager is not +// wired (scanner-disabled deployment), returns a zero-valued +// shape rather than 404 so the frontend can render the panel +// uniformly. +func (s *Server) getLogScanStats(w http.ResponseWriter, r *http.Request) { + if s.logScanReloader == nil { + respondJSON(w, http.StatusOK, logscanner.Stats{}) + return + } + respondJSON(w, http.StatusOK, s.logScanReloader.Stats()) +} + +// reloadLogScan fires the manager's snapshot rebuild. Nil-tolerant +// so the API can run before the manager is wired (and in +// scanner-disabled deployments). Failures are logged at warn — +// we don't fail the originating CRUD request because that already +// succeeded, but operators need a signal so they don't chase a +// "why isn't my rule firing?" mystery. 
+func (s *Server) reloadLogScan() { + if s.logScanReloader == nil { + return + } + if err := s.logScanReloader.ReloadRules(); err != nil { + slog.Warn("log-scan reload failed; manager snapshot may be stale", + "error", err) + } +} diff --git a/internal/api/router.go b/internal/api/router.go index 263ebdf..4683595 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -50,6 +50,7 @@ type Server struct { stackManager *stack.Manager backupEngine *backup.Engine sseGate *sseGate + logScanReloader LogScanReloader dbPath string shutdownFunc func() // called after restore to trigger graceful shutdown onBackupSettingsChanged func(enabled bool, intervalHours int) // called when backup settings change @@ -217,13 +218,26 @@ func (s *Server) Router() chi.Router { r.Group(func(r chi.Router) { r.Use(auth.Middleware(s.localAuth)) + // Plugin registry inspection + unified ingress (Workload refactor). + // /hooks/kinds is informational and visible to any authenticated + // caller. /hooks/generic dispatches deploys and is admin-gated — + // vendor-specific webhooks (with their own per-target HMAC + // secrets) live under /webhook/* and remain the only ingress + // reachable by external CI systems until Phase 5 consolidates them. + r.Get("/hooks/kinds", s.listHookKinds) + r.Get("/hooks/kinds/{kind}/schema", s.getHookKindSchema) + r.With(auth.AdminOnly).Post("/hooks/generic", s.dispatchGeneric) + // Read-only endpoints (any authenticated user). r.Get("/health", s.getHealth) r.Get("/auth/me", s.currentUser) r.Post("/auth/logout", s.logout) r.Get("/proxies", s.listProxyRoutes) r.Get("/docker/unused-images", s.unusedImageStats) - r.Get("/projects", s.listProjects) + // Legacy project/stage/site/stack endpoints carry a Deprecation + // header pointing at /api/workloads. Functional behavior is + // unchanged until the hard cutover removes them. 
+ r.With(deprecated("/api/workloads")).Get("/projects", s.listProjects) r.Route("/projects/{id}", func(r chi.Router) { r.Get("/", s.getProject) r.Get("/stages/{stage}/env", s.listStageEnv) @@ -290,7 +304,7 @@ func (s *Server) Router() chi.Router { }) }) // Stacks (docker-compose). - r.Get("/stacks", s.listStacks) + r.With(deprecated("/api/workloads?kind=plugin&source_kind=compose")).Get("/stacks", s.listStacks) r.Route("/stacks/{id}", func(r chi.Router) { r.Get("/", s.getStack) r.Get("/revisions", s.listStackRevisions) @@ -311,7 +325,7 @@ func (s *Server) Router() chi.Router { r.With(auth.AdminOnly).Post("/stacks", s.createStack) // Static sites. - r.Get("/sites", s.listStaticSites) + r.With(deprecated("/api/workloads?kind=plugin&source_kind=static")).Get("/sites", s.listStaticSites) r.Route("/sites/{id}", func(r chi.Router) { r.Get("/", s.getStaticSite) r.Get("/secrets", s.listStaticSiteSecrets) @@ -375,13 +389,47 @@ func (s *Server) Router() chi.Router { r.Get("/containers/stale", s.listStaleContainers) // Workload-shaped endpoints (the unifying layer over project / - // stack / site). Read-only; mutations still go through the - // kind-specific endpoints (POST /projects, PUT /stacks/{id}, …). + // stack / site). Read endpoints are open to any authenticated + // user; create / update / deploy mutate state and are admin-gated. + // Plugin-native workloads (source_kind + trigger_kind set) are + // created here; legacy project / stack / site mutations remain at + // their dedicated endpoints during the cutover. 
r.Get("/workloads", s.listWorkloads) + r.With(auth.AdminOnly).Post("/workloads", s.createPluginWorkload) r.Route("/workloads/{id}", func(r chi.Router) { r.Get("/", s.getWorkload) r.Get("/containers", s.listWorkloadContainers) + r.Get("/containers/{cid}/logs", s.streamWorkloadContainerLogs) r.With(auth.AdminOnly).Patch("/app", s.updateWorkloadAppID) + r.With(auth.AdminOnly).Put("/plugin", s.updatePluginWorkload) + r.With(auth.AdminOnly).Post("/deploy", s.deployPluginWorkload) + r.With(auth.AdminOnly).Delete("/", s.deletePluginWorkload) + + // Per-workload env vars (analog of legacy stage_env). + // Listing is open to authenticated readers; mutations are + // admin-gated. Encrypted values are write-only after store. + r.Get("/env", s.listWorkloadEnv) + r.With(auth.AdminOnly).Put("/env", s.setWorkloadEnv) + r.With(auth.AdminOnly).Delete("/env/{envID}", s.deleteWorkloadEnv) + + // Per-workload inbound webhook URL: rotate the secret + fetch + // the canonical URL. Mirrors the project / site webhook UX. + r.With(auth.AdminOnly).Get("/webhook", s.getWorkloadWebhook) + r.With(auth.AdminOnly).Post("/webhook/regenerate", s.regenerateWorkloadWebhook) + + // Per-workload volume mounts (analog of legacy project volumes). + // Reads are open to authenticated users; mutations admin-gated. + // Source/target paths are validated for traversal safety here; + // host-path allow-listing happens at deploy time. + r.Get("/volumes", s.listWorkloadVolumes) + r.With(auth.AdminOnly).Put("/volumes", s.setWorkloadVolume) + r.With(auth.AdminOnly).Delete("/volumes/{volID}", s.deleteWorkloadVolume) + + // Stages chain: parent + self + direct children, plus a + // promote-from action that copies the source workload's + // running image tag onto this workload's default_tag. + r.Get("/chain", s.getWorkloadChain) + r.With(auth.AdminOnly).Post("/promote-from/{sourceID}", s.promoteFromWorkload) }) // Global container index, joined to workload + app names. 
@@ -398,6 +446,37 @@ func (s *Server) Router() chi.Router { r.Delete("/apps/{id}", s.deleteApp) }) + // Event triggers: filter+action rules over the event_log + // stream. Read endpoints are available to any authenticated + // user; mutations + test-dispatch are admin-gated since they + // can fire arbitrary outbound webhooks. + r.Get("/event-triggers", s.listEventTriggers) + r.Get("/event-triggers/{id}", s.getEventTrigger) + r.Group(func(r chi.Router) { + r.Use(auth.AdminOnly) + r.Post("/event-triggers", s.createEventTrigger) + r.Patch("/event-triggers/{id}", s.updateEventTrigger) + r.Delete("/event-triggers/{id}", s.deleteEventTrigger) + r.Post("/event-triggers/{id}/test", s.testEventTrigger) + }) + + // Log-scan rules: regex patterns the scanner manager + // applies to container log lines. Read endpoints are + // available to any authenticated user; mutations are + // admin-gated since they can change global observability + // behavior across every workload. + r.Get("/log-scan-rules", s.listLogScanRules) + r.Get("/log-scan-rules/stats", s.getLogScanStats) + r.Get("/log-scan-rules/{id}", s.getLogScanRule) + r.Get("/workloads/{id}/effective-rules", s.getEffectiveLogScanRules) + r.Group(func(r chi.Router) { + r.Use(auth.AdminOnly) + r.Post("/log-scan-rules", s.createLogScanRule) + r.Patch("/log-scan-rules/{id}", s.updateLogScanRule) + r.Delete("/log-scan-rules/{id}", s.deleteLogScanRule) + r.Post("/log-scan-rules/{id}/test", s.testLogScanRule) + }) + // System resources (read-only). r.Get("/system/stats", s.getSystemStats) r.Get("/system/stats/history", s.getSystemStatsHistory) diff --git a/internal/docker/container.go b/internal/docker/container.go index f8d88ab..e7a9702 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -359,12 +359,41 @@ func isTinyforgeManaged(labels map[string]string) bool { // ContainerLogs returns a log stream for a container. // If follow is true, the stream stays open for new log lines. 
// tail specifies the number of lines from the end to return (e.g., "200"). +// Both stdout and stderr are streamed. For stream-selective reads +// (e.g. the log scanner narrowing to stderr-only), use ContainerLogsOpts. func (c *Client) ContainerLogs(ctx context.Context, containerID string, follow bool, tail string) (io.ReadCloser, error) { - result, err := c.api.ContainerLogs(ctx, containerID, client.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, + return c.ContainerLogsOpts(ctx, containerID, ContainerLogOptions{ Follow: follow, Tail: tail, + ShowStdout: true, + ShowStderr: true, + }) +} + +// ContainerLogOptions controls which streams + framing are pulled +// from a container. Currently expanded over the legacy ContainerLogs +// shape so the log-scanner can read stderr-only rules without +// post-filtering every line. +type ContainerLogOptions struct { + Follow bool + Tail string + ShowStdout bool + ShowStderr bool +} + +// ContainerLogsOpts is the stream-selectable counterpart to +// ContainerLogs. When both ShowStdout and ShowStderr are false the +// upstream client returns an empty stream — we treat that as caller +// error and return an explicit message rather than a silent no-op. 
+func (c *Client) ContainerLogsOpts(ctx context.Context, containerID string, opts ContainerLogOptions) (io.ReadCloser, error) { + if !opts.ShowStdout && !opts.ShowStderr { + return nil, fmt.Errorf("container logs %s: at least one of ShowStdout/ShowStderr must be true", containerID) + } + result, err := c.api.ContainerLogs(ctx, containerID, client.ContainerLogsOptions{ + ShowStdout: opts.ShowStdout, + ShowStderr: opts.ShowStderr, + Follow: opts.Follow, + Tail: opts.Tail, Timestamps: true, }) if err != nil { diff --git a/internal/events/dispatcher.go b/internal/events/dispatcher.go new file mode 100644 index 0000000..d25b80e --- /dev/null +++ b/internal/events/dispatcher.go @@ -0,0 +1,169 @@ +package events + +import ( + "fmt" + "log/slog" + "regexp" + "strings" + "sync" + "time" + + "github.com/alexei/tinyforge/internal/store" +) + +// TriggerSource is the read-side seam the dispatcher uses to fetch the +// currently-enabled set of triggers. Kept as an interface so tests can +// swap in a static list without spinning up SQLite. The dispatcher +// re-reads triggers from this source on every event so config edits +// take effect within one event without an explicit hot-reload hook. +type TriggerSource interface { + ListEnabledEventTriggers() ([]store.EventTrigger, error) +} + +// TriggerNotifier is what the dispatcher uses to deliver. Real callers +// pass *notify.Notifier; tests pass a recorder. The shape matches the +// notifier method one-to-one so wiring is just a method-value pass. +type TriggerNotifier interface { + SendPayload(webhookURL, secret, eventType string, payload any) +} + +// TriggerWebhookPayload is the JSON shape sent to action_target webhook +// receivers. Includes both the event that fired the trigger and a brief +// trigger descriptor so receivers can route by trigger name or filter +// shape without re-looking-up the rule. 
+type TriggerWebhookPayload struct { + Type string `json:"type"` // "event_trigger" — stable + TriggerID int64 `json:"trigger_id"` + Trigger string `json:"trigger_name"` + Event EventLogPayload `json:"event"` + Timestamp string `json:"timestamp"` +} + +// RegisterEventTriggerDispatcher subscribes to EventLog events on the +// bus and dispatches matching triggers via the supplied notifier. +// +// Loop-prevention is structural: the dispatcher never writes to +// event_log. All delivery outcomes are recorded inside the notifier +// implementation (webhook_deliveries audit trail today). Adding a new +// EventLog row here would cause the dispatcher to re-fire on its own +// emission — a tight feedback loop the design explicitly forbids. +// +// Returns an unsubscribe function. Safe to call multiple times. +func RegisterEventTriggerDispatcher(b *Bus, triggers TriggerSource, notifier TriggerNotifier) func() { + sub := b.Subscribe(func(evt Event) bool { return evt.Type == EventLog }) + d := &dispatcher{ + triggers: triggers, + notifier: notifier, + regexCache: map[string]*regexp.Regexp{}, + } + go func() { + for evt := range sub { + payload, ok := evt.Payload.(EventLogPayload) + if !ok { + continue + } + d.handle(payload) + } + }() + return func() { b.Unsubscribe(sub) } +} + +type dispatcher struct { + triggers TriggerSource + notifier TriggerNotifier + + // regexCache memoizes compiled message-regex patterns so the hot + // path doesn't re-compile on every event. Bounded by the number of + // distinct patterns across all triggers, which is small in practice. 
+ mu sync.Mutex + regexCache map[string]*regexp.Regexp +} + +func (d *dispatcher) handle(p EventLogPayload) { + triggers, err := d.triggers.ListEnabledEventTriggers() + if err != nil { + slog.Warn("event-trigger dispatcher: list failed", "error", err) + return + } + for _, t := range triggers { + ok, err := d.matches(t, p) + if err != nil { + slog.Warn("event-trigger: filter eval failed", + "trigger", t.Name, "error", err) + continue + } + if !ok { + continue + } + switch t.ActionType { + case store.EventTriggerActionWebhook: + d.notifier.SendPayload(t.ActionTarget, t.ActionSecret, "event_trigger", + TriggerWebhookPayload{ + Type: "event_trigger", + TriggerID: t.ID, + Trigger: t.Name, + Event: p, + Timestamp: time.Now().UTC().Format(time.RFC3339), + }) + default: + slog.Warn("event-trigger: unsupported action_type", + "trigger", t.Name, "action_type", t.ActionType) + } + } +} + +// matches evaluates the (severity, source, message-regex) filters +// against an event log payload. AND semantics — every non-empty filter +// must pass. An empty filter is "any" and silently passes. +func (d *dispatcher) matches(t store.EventTrigger, p EventLogPayload) (bool, error) { + if !filterMatchCSV(t.FilterSeverity, p.Severity) { + return false, nil + } + if !filterMatchCSV(t.FilterSource, p.Source) { + return false, nil + } + if t.FilterMessageRegex != "" { + re, err := d.compile(t.FilterMessageRegex) + if err != nil { + return false, fmt.Errorf("invalid regex %q: %w", t.FilterMessageRegex, err) + } + if !re.MatchString(p.Message) { + return false, nil + } + } + return true, nil +} + +// filterMatchCSV returns true when the candidate equals one of the +// comma-separated values in filter, or when filter is empty (no filter +// = match-all). Whitespace around list entries is tolerated so the +// operator's CSV pasting is forgiving. 
+func filterMatchCSV(filter, candidate string) bool { + filter = strings.TrimSpace(filter) + if filter == "" { + return true + } + for _, p := range strings.Split(filter, ",") { + if strings.TrimSpace(p) == candidate { + return true + } + } + return false +} + +func (d *dispatcher) compile(pattern string) (*regexp.Regexp, error) { + d.mu.Lock() + if cached, ok := d.regexCache[pattern]; ok { + d.mu.Unlock() + return cached, nil + } + d.mu.Unlock() + re, err := regexp.Compile(pattern) + if err != nil { + return nil, err + } + d.mu.Lock() + d.regexCache[pattern] = re + d.mu.Unlock() + return re, nil +} diff --git a/internal/events/dispatcher_test.go b/internal/events/dispatcher_test.go new file mode 100644 index 0000000..6d2df45 --- /dev/null +++ b/internal/events/dispatcher_test.go @@ -0,0 +1,252 @@ +package events + +import ( + "errors" + "regexp" + "sync" + "testing" + "time" + + "github.com/alexei/tinyforge/internal/store" +) + +// fakeTriggerSource lets tests inject a static set of enabled triggers +// without standing up SQLite. ListEnabledEventTriggers is the only +// method the dispatcher uses. +type fakeTriggerSource struct { + rows []store.EventTrigger + err error +} + +func (f *fakeTriggerSource) ListEnabledEventTriggers() ([]store.EventTrigger, error) { + if f.err != nil { + return nil, f.err + } + return f.rows, nil +} + +// fakeNotifier captures dispatches in memory so tests can assert +// (URL, secret, eventType, payload) tuples. 
+type fakeNotifier struct { + mu sync.Mutex + calls []fakeNotifierCall +} + +type fakeNotifierCall struct { + URL string + Secret string + EventType string + Payload any +} + +func (f *fakeNotifier) SendPayload(url, secret, eventType string, payload any) { + f.mu.Lock() + defer f.mu.Unlock() + f.calls = append(f.calls, fakeNotifierCall{URL: url, Secret: secret, EventType: eventType, Payload: payload}) +} + +func (f *fakeNotifier) Calls() []fakeNotifierCall { + f.mu.Lock() + defer f.mu.Unlock() + out := make([]fakeNotifierCall, len(f.calls)) + copy(out, f.calls) + return out +} + +func newDispatcher(rows []store.EventTrigger) (*dispatcher, *fakeNotifier) { + n := &fakeNotifier{} + return &dispatcher{ + triggers: &fakeTriggerSource{rows: rows}, + notifier: n, + regexCache: map[string]*regexp.Regexp{}, + }, n +} + +func TestMatches_EmptyFiltersAllowAnything(t *testing.T) { + d, _ := newDispatcher(nil) + tr := store.EventTrigger{Name: "anything"} + got, err := d.matches(tr, EventLogPayload{Severity: "info", Source: "deploy", Message: "hello"}) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !got { + t.Fatal("empty filters should pass") + } +} + +func TestMatches_SeverityCSV(t *testing.T) { + d, _ := newDispatcher(nil) + tr := store.EventTrigger{FilterSeverity: "warn, error"} + cases := []struct { + sev string + want bool + }{ + {"error", true}, + {"warn", true}, + {"info", false}, + } + for _, c := range cases { + got, err := d.matches(tr, EventLogPayload{Severity: c.sev}) + if err != nil { + t.Fatalf("err: %v", err) + } + if got != c.want { + t.Errorf("severity=%q want=%v got=%v", c.sev, c.want, got) + } + } +} + +func TestMatches_SourceCSV(t *testing.T) { + d, _ := newDispatcher(nil) + tr := store.EventTrigger{FilterSource: "logscan,deploy"} + cases := []struct { + src string + want bool + }{ + {"logscan", true}, + {"deploy", true}, + {"reconciler", false}, + {"", false}, + } + for _, c := range cases { + got, _ := d.matches(tr, 
EventLogPayload{Source: c.src}) + if got != c.want { + t.Errorf("source=%q want=%v got=%v", c.src, c.want, got) + } + } +} + +func TestMatches_MessageRegex(t *testing.T) { + d, _ := newDispatcher(nil) + tr := store.EventTrigger{FilterMessageRegex: `(?i)\bpanic\b`} + if got, _ := d.matches(tr, EventLogPayload{Message: "fatal: Panic in worker"}); !got { + t.Error("expected case-insensitive panic to match") + } + if got, _ := d.matches(tr, EventLogPayload{Message: "all good"}); got { + t.Error("expected non-matching message to fail") + } +} + +func TestMatches_InvalidRegexReturnsError(t *testing.T) { + d, _ := newDispatcher(nil) + tr := store.EventTrigger{FilterMessageRegex: "([unclosed"} + _, err := d.matches(tr, EventLogPayload{Message: "x"}) + if err == nil { + t.Fatal("expected compile error on invalid regex") + } +} + +func TestMatches_AllFiltersAND(t *testing.T) { + d, _ := newDispatcher(nil) + tr := store.EventTrigger{ + FilterSeverity: "error", + FilterSource: "logscan", + FilterMessageRegex: "timeout", + } + full := EventLogPayload{Severity: "error", Source: "logscan", Message: "request timeout"} + if got, _ := d.matches(tr, full); !got { + t.Error("all-match payload should pass") + } + missMessage := full + missMessage.Message = "all good" + if got, _ := d.matches(tr, missMessage); got { + t.Error("message mismatch should fail despite severity+source match") + } + missSource := full + missSource.Source = "deploy" + if got, _ := d.matches(tr, missSource); got { + t.Error("source mismatch should fail") + } +} + +func TestHandle_DispatchesMatchingTrigger(t *testing.T) { + rows := []store.EventTrigger{ + {ID: 1, Name: "T1", FilterSeverity: "error", ActionType: store.EventTriggerActionWebhook, + ActionTarget: "https://example.com/hook", ActionSecret: "shh"}, + {ID: 2, Name: "T2", FilterSeverity: "warn", ActionType: store.EventTriggerActionWebhook, + ActionTarget: "https://example.com/other"}, + } + d, notifier := newDispatcher(rows) + 
d.handle(EventLogPayload{Severity: "error", Source: "logscan", Message: "panic", CreatedAt: time.Now().Format(time.RFC3339)}) + + calls := notifier.Calls() + if len(calls) != 1 { + t.Fatalf("expected 1 call, got %d", len(calls)) + } + if calls[0].URL != "https://example.com/hook" { + t.Errorf("URL=%q want https://example.com/hook", calls[0].URL) + } + if calls[0].Secret != "shh" { + t.Errorf("Secret=%q want shh", calls[0].Secret) + } + p, ok := calls[0].Payload.(TriggerWebhookPayload) + if !ok { + t.Fatalf("payload type=%T want TriggerWebhookPayload", calls[0].Payload) + } + if p.TriggerID != 1 || p.Trigger != "T1" { + t.Errorf("payload trigger metadata wrong: %+v", p) + } + if p.Event.Severity != "error" { + t.Errorf("payload event mismatch: %+v", p.Event) + } +} + +func TestHandle_TriggerSourceErrorLogged(t *testing.T) { + n := &fakeNotifier{} + d := &dispatcher{ + triggers: &fakeTriggerSource{err: errors.New("db down")}, + notifier: n, + regexCache: map[string]*regexp.Regexp{}, + } + d.handle(EventLogPayload{Severity: "error"}) + if len(n.Calls()) != 0 { + t.Errorf("dispatcher should not call notifier when trigger source errored") + } +} + +func TestHandle_UnsupportedActionTypeSkipped(t *testing.T) { + rows := []store.EventTrigger{ + {ID: 1, Name: "T1", ActionType: "future-channel", ActionTarget: "x"}, + } + d, n := newDispatcher(rows) + d.handle(EventLogPayload{Severity: "info"}) + if len(n.Calls()) != 0 { + t.Errorf("unsupported action_type should not dispatch") + } +} + +func TestFilterMatchCSV(t *testing.T) { + cases := []struct { + filter, cand string + want bool + }{ + {"", "anything", true}, + {" ", "anything", true}, + {"a", "a", true}, + {"a", "b", false}, + {"a,b,c", "b", true}, + {" a , b , c ", "b", true}, + {"a,b", "c", false}, + } + for _, c := range cases { + got := filterMatchCSV(c.filter, c.cand) + if got != c.want { + t.Errorf("filterMatchCSV(%q, %q) = %v want %v", c.filter, c.cand, got, c.want) + } + } +} + +func 
TestRegexCache_ReusesCompiledPattern(t *testing.T) { + d, _ := newDispatcher(nil) + re1, err := d.compile(`\bfoo\b`) + if err != nil { + t.Fatalf("compile: %v", err) + } + re2, err := d.compile(`\bfoo\b`) + if err != nil { + t.Fatalf("recompile: %v", err) + } + if re1 != re2 { + t.Error("expected cached compile to return the same regexp pointer") + } +} diff --git a/internal/logscanner/engine.go b/internal/logscanner/engine.go new file mode 100644 index 0000000..a72088a --- /dev/null +++ b/internal/logscanner/engine.go @@ -0,0 +1,227 @@ +package logscanner + +import ( + "sync" + "sync/atomic" + "time" +) + +// Engine evaluates lines against a snapshot, gates by per-rule +// cooldown, and rate-limits emissions per container so a noisy regex +// can't flood the event_log table. +type Engine struct { + // cooldownMu guards lastFiredAt. The map is keyed on a + // composite (containerID, ruleID) so the same rule firing on two + // different containers gets independent cooldowns — matches the + // operator intuition that "this container is alerting" doesn't + // suppress an alert from a different container. + cooldownMu sync.Mutex + lastFiredAt map[cooldownKey]time.Time + + // bucketMu guards tokenBuckets. One bucket per container. + bucketMu sync.Mutex + tokenBuckets map[string]*bucket + + // Drop counters. Incremented when a matching hit is suppressed + // before reaching the bus. droppedByBucket covers per-container + // rate-limit drops; droppedByCooldown counts cooldown-suppressed + // matches so operators can tell whether their patterns are too + // greedy vs too tightly cooled. Atomic so the hot path doesn't + // take the lock just to bump the counter. + droppedByBucket atomic.Int64 + droppedByCooldown atomic.Int64 + + // Configuration knobs. Defaults set in NewEngine. + tokensPerWindow int // bucket capacity + tokenWindow time.Duration // bucket refill window + now func() time.Time +} + +// EngineStats is the public-facing counter snapshot. 
Returned by +// Stats() and surfaced through the manager + API for operator +// visibility — when a noisy regex floods the bucket, this is how +// they discover it. +type EngineStats struct { + DroppedByBucket int64 `json:"dropped_by_bucket"` + DroppedByCooldown int64 `json:"dropped_by_cooldown"` +} + +type cooldownKey struct { + ContainerID string + RuleID int64 +} + +type bucket struct { + tokens int + resetsAt time.Time +} + +// NewEngine constructs an Engine with sensible defaults: 10 +// events / 60s per container. Both knobs can be overridden via +// the With* options. +func NewEngine(opts ...Option) *Engine { + e := &Engine{ + lastFiredAt: map[cooldownKey]time.Time{}, + tokenBuckets: map[string]*bucket{}, + tokensPerWindow: 10, + tokenWindow: 60 * time.Second, + now: time.Now, + } + for _, opt := range opts { + opt(e) + } + return e +} + +// Option mutates an Engine during construction. +type Option func(*Engine) + +// WithBucket sets the per-container token bucket capacity and +// refill window. Used by tests to make rate-limit assertions +// deterministic. +func WithBucket(tokens int, window time.Duration) Option { + return func(e *Engine) { + e.tokensPerWindow = tokens + e.tokenWindow = window + } +} + +// WithNow overrides the clock for tests. +func WithNow(now func() time.Time) Option { + return func(e *Engine) { + e.now = now + } +} + +// Match is the hot-path evaluation. For every rule whose Streams +// covers `stream` and whose pattern matches `line`, returns one +// Hit — gated by cooldown and per-container token bucket. Empty +// result means "drop this line." +// +// Side-effect: cooldown timestamps + bucket counters are updated +// when a rule fires. Callers must not retry the same line. 
+func (e *Engine) Match(containerID, workloadID, stream, line string, rules []Rule) []Hit { + if line == "" || len(rules) == 0 { + return nil + } + var hits []Hit + now := e.now() + for _, r := range rules { + if !streamMatches(r.Streams, stream) { + continue + } + if !r.Pattern.MatchString(line) { + continue + } + if !e.cooledDown(containerID, r, now) { + // Matched but inside the cooldown window — bump the + // counter so operators can see when their cooldowns + // are eating real signal. + e.droppedByCooldown.Add(1) + continue + } + if !e.takeToken(containerID, now) { + // Bucket exhausted for this container. Bump the + // counter so the stats endpoint can surface chatty + // patterns; the operator's signal is "Stats says we + // dropped N events — your rule is too broad." + e.droppedByBucket.Add(1) + continue + } + e.markFired(containerID, r.ID, now) + hits = append(hits, Hit{ + Rule: r, + ContainerID: containerID, + WorkloadID: workloadID, + Stream: stream, + Line: line, + }) + } + return hits +} + +// Stats returns a snapshot of the engine's drop counters. The +// counters are monotonic over the lifetime of the engine — useful +// for trend observation, but not for instantaneous "is it dropping +// right now" alerting. Reset semantics: the engine is recreated on +// every process restart so counters reset to zero with the binary. +func (e *Engine) Stats() EngineStats { + return EngineStats{ + DroppedByBucket: e.droppedByBucket.Load(), + DroppedByCooldown: e.droppedByCooldown.Load(), + } +} + +// Hit is one rule fire — the engine returns these for the manager +// to persist + publish on the bus. Kept narrow on purpose so the +// engine has no event_log / bus dependency. +type Hit struct { + Rule Rule + ContainerID string + WorkloadID string + Stream string + Line string +} + +// streamMatches checks whether a rule that scopes itself to a +// stream subset accepts the given stream. 
Empty rule.Streams is +// equivalent to "all" for forward-compat with older rows. +func streamMatches(ruleStreams, lineStream string) bool { + if ruleStreams == "" || ruleStreams == "all" { + return true + } + return ruleStreams == lineStream +} + +func (e *Engine) cooledDown(containerID string, r Rule, now time.Time) bool { + if r.CooldownSeconds <= 0 { + return true + } + e.cooldownMu.Lock() + defer e.cooldownMu.Unlock() + last, ok := e.lastFiredAt[cooldownKey{containerID, r.ID}] + if !ok { + return true + } + return now.Sub(last) >= time.Duration(r.CooldownSeconds)*time.Second +} + +func (e *Engine) markFired(containerID string, ruleID int64, now time.Time) { + e.cooldownMu.Lock() + e.lastFiredAt[cooldownKey{containerID, ruleID}] = now + e.cooldownMu.Unlock() +} + +func (e *Engine) takeToken(containerID string, now time.Time) bool { + e.bucketMu.Lock() + defer e.bucketMu.Unlock() + b, ok := e.tokenBuckets[containerID] + if !ok { + b = &bucket{tokens: e.tokensPerWindow, resetsAt: now.Add(e.tokenWindow)} + e.tokenBuckets[containerID] = b + } + if !now.Before(b.resetsAt) { + b.tokens = e.tokensPerWindow + b.resetsAt = now.Add(e.tokenWindow) + } + if b.tokens <= 0 { + return false + } + b.tokens-- + return true +} + +// Forget drops cooldown + bucket state for a container that has been +// removed. Called by the manager when a tail exits to reclaim memory. 
+func (e *Engine) Forget(containerID string) { + e.cooldownMu.Lock() + for k := range e.lastFiredAt { + if k.ContainerID == containerID { + delete(e.lastFiredAt, k) + } + } + e.cooldownMu.Unlock() + e.bucketMu.Lock() + delete(e.tokenBuckets, containerID) + e.bucketMu.Unlock() +} diff --git a/internal/logscanner/engine_test.go b/internal/logscanner/engine_test.go new file mode 100644 index 0000000..57cf7d7 --- /dev/null +++ b/internal/logscanner/engine_test.go @@ -0,0 +1,223 @@ +package logscanner + +import ( + "regexp" + "testing" + "time" +) + +func mustRegexp(t *testing.T, p string) *regexp.Regexp { + t.Helper() + re, err := regexp.Compile(p) + if err != nil { + t.Fatalf("compile %q: %v", p, err) + } + return re +} + +func newRule(t *testing.T, id int64, pattern string, opts ...func(*Rule)) Rule { + r := Rule{ + ID: id, + Name: "rule" + pattern, + Pattern: mustRegexp(t, pattern), + Severity: "warn", + Streams: "all", + CooldownSeconds: 0, + } + for _, o := range opts { + o(&r) + } + return r +} + +func TestEngineMatch_BasicHit(t *testing.T) { + e := NewEngine() + rules := []Rule{newRule(t, 1, `panic`)} + hits := e.Match("c1", "w1", "stderr", "fatal panic in worker", rules) + if len(hits) != 1 { + t.Fatalf("want 1 hit, got %d", len(hits)) + } + if hits[0].Rule.ID != 1 { + t.Errorf("rule id mismatch: %d", hits[0].Rule.ID) + } +} + +func TestEngineMatch_StreamFilter(t *testing.T) { + e := NewEngine() + rules := []Rule{newRule(t, 1, `boom`, func(r *Rule) { r.Streams = "stderr" })} + if len(e.Match("c", "w", "stdout", "boom there", rules)) != 0 { + t.Error("stdout line should not match stderr-only rule") + } + if len(e.Match("c", "w", "stderr", "boom there", rules)) != 1 { + t.Error("stderr line should match stderr-only rule") + } +} + +func TestEngineMatch_NoMatchNoHit(t *testing.T) { + e := NewEngine() + rules := []Rule{newRule(t, 1, `panic`)} + if h := e.Match("c", "w", "stdout", "all good", rules); len(h) != 0 { + t.Errorf("expected no hit, got %d", len(h)) + } +} 
+ +func TestEngineMatch_Cooldown(t *testing.T) { + now := time.Now() + clock := now + e := NewEngine(WithNow(func() time.Time { return clock })) + rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })} + + if len(e.Match("c", "w", "stdout", "bad event", rules)) != 1 { + t.Fatal("first fire expected") + } + clock = now.Add(10 * time.Second) + if h := e.Match("c", "w", "stdout", "bad event 2", rules); len(h) != 0 { + t.Errorf("cooled-down rule fired: %+v", h) + } + clock = now.Add(31 * time.Second) + if len(e.Match("c", "w", "stdout", "bad event 3", rules)) != 1 { + t.Error("cooldown expired but rule did not fire") + } +} + +func TestEngineMatch_PerContainerCooldownIndependent(t *testing.T) { + // Same rule firing on two different containers should not + // share cooldown — operators expect each container's alerts + // to be independent. + now := time.Now() + clock := now + e := NewEngine(WithNow(func() time.Time { return clock })) + rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 30 })} + + e.Match("c1", "w", "stdout", "bad on one", rules) + if len(e.Match("c2", "w", "stdout", "bad on two", rules)) != 1 { + t.Error("second container should fire independently") + } +} + +func TestEngineMatch_TokenBucket(t *testing.T) { + now := time.Now() + clock := now + e := NewEngine(WithNow(func() time.Time { return clock }), WithBucket(3, time.Minute)) + rules := []Rule{newRule(t, 1, `noisy`)} + for i := 0; i < 3; i++ { + if len(e.Match("c", "w", "stdout", "noisy "+string(rune('a'+i)), rules)) != 1 { + t.Errorf("hit %d should fire", i) + } + } + // 4th within window should be dropped. + if h := e.Match("c", "w", "stdout", "noisy d", rules); len(h) != 0 { + t.Errorf("4th hit should be rate-limited, got %d", len(h)) + } + // After the window the bucket refills. 
+ clock = now.Add(time.Minute + time.Second) + if len(e.Match("c", "w", "stdout", "noisy e", rules)) != 1 { + t.Error("bucket should have refilled after window") + } +} + +func TestEngineForget_DropsState(t *testing.T) { + e := NewEngine() + rules := []Rule{newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 999 })} + e.Match("c", "w", "stdout", "bad once", rules) + e.Forget("c") + // After Forget, the same rule on the same container should + // fire again immediately — cooldown state was cleared. + if len(e.Match("c", "w", "stdout", "bad again", rules)) != 1 { + t.Error("Forget should drop cooldown state") + } +} + +func TestEngineStats_CooldownCounter(t *testing.T) { + // Two matches inside the cooldown window should drop once and + // increment DroppedByCooldown by one. Bucket is generous so it + // never participates. + now := time.Now() + clock := now + e := NewEngine( + WithNow(func() time.Time { return clock }), + WithBucket(100, time.Hour), + ) + rule := newRule(t, 1, `bad`, func(r *Rule) { r.CooldownSeconds = 60 }) + + if len(e.Match("c", "w", "stdout", "bad a", []Rule{rule})) != 1 { + t.Fatal("first fire expected") + } + if len(e.Match("c", "w", "stdout", "bad b", []Rule{rule})) != 0 { + t.Fatal("second fire inside cooldown should drop") + } + stats := e.Stats() + if stats.DroppedByCooldown != 1 { + t.Errorf("DroppedByCooldown = %d, want 1", stats.DroppedByCooldown) + } + if stats.DroppedByBucket != 0 { + t.Errorf("DroppedByBucket = %d, want 0 (bucket should not have fired)", + stats.DroppedByBucket) + } +} + +func TestEngineStats_BucketCounter(t *testing.T) { + // Drain a small bucket without any cooldown so every drop + // after the first is bucket-attributable. + now := time.Now() + clock := now + e := NewEngine( + WithNow(func() time.Time { return clock }), + WithBucket(2, time.Hour), + ) + rule := newRule(t, 1, `bad`) // cooldown=0 + + // 2 fires consume both tokens. 
+ for i := 0; i < 2; i++ { + if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 1 { + t.Fatalf("fire %d expected to succeed", i) + } + } + // 3rd and 4th fires hit an empty bucket. + for i := 0; i < 2; i++ { + if len(e.Match("c", "w", "stdout", "bad", []Rule{rule})) != 0 { + t.Fatalf("fire %d expected to be bucket-dropped", i+2) + } + } + stats := e.Stats() + if stats.DroppedByBucket != 2 { + t.Errorf("DroppedByBucket = %d, want 2", stats.DroppedByBucket) + } + if stats.DroppedByCooldown != 0 { + t.Errorf("DroppedByCooldown = %d, want 0 (cooldown should not have fired)", + stats.DroppedByCooldown) + } +} + +func TestEngineStats_NoDropsWhenAllowed(t *testing.T) { + e := NewEngine(WithBucket(100, time.Minute)) + rule := newRule(t, 1, `x`) + for i := 0; i < 5; i++ { + e.Match("c", "w", "stdout", "x", []Rule{rule}) + } + stats := e.Stats() + if stats.DroppedByBucket != 0 || stats.DroppedByCooldown != 0 { + t.Errorf("expected zero drops, got %+v", stats) + } +} + +func TestStreamMatches(t *testing.T) { + cases := []struct { + rule, line string + want bool + }{ + {"all", "stdout", true}, + {"all", "stderr", true}, + {"", "stdout", true}, + {"stdout", "stdout", true}, + {"stdout", "stderr", false}, + {"stderr", "stderr", true}, + {"stderr", "stdout", false}, + } + for _, c := range cases { + got := streamMatches(c.rule, c.line) + if got != c.want { + t.Errorf("streamMatches(%q,%q) = %v want %v", c.rule, c.line, got, c.want) + } + } +} diff --git a/internal/logscanner/manager.go b/internal/logscanner/manager.go new file mode 100644 index 0000000..29c8cc6 --- /dev/null +++ b/internal/logscanner/manager.go @@ -0,0 +1,345 @@ +package logscanner + +import ( + "context" + "encoding/json" + "log/slog" + "strconv" + "sync" + "sync/atomic" + "time" + "unicode/utf8" + + "github.com/alexei/tinyforge/internal/docker" + "github.com/alexei/tinyforge/internal/events" + "github.com/alexei/tinyforge/internal/store" +) + +// RuleSource is the read-side seam for fetching the 
current rule +// rows. Real callers pass *store.Store; tests pass a fake. +type RuleSource interface { + ListLogScanRules() ([]store.LogScanRule, error) +} + +// ContainerLister is what the manager needs from the store to +// discover running container rows. The filter is set by the +// manager (state="running") so adapters don't have to do that. +type ContainerLister interface { + ListContainers(filter store.ContainerFilter) ([]store.Container, error) +} + +// EventStore writes the matched-line entries into event_log. +// Same shape as events.PersistFunc but typed to keep the package +// self-contained. +type EventStore interface { + InsertEvent(evt store.EventLog) (store.EventLog, error) +} + +// Manager owns the lifecycle of per-container tails. It polls the +// container index every PollInterval (default 5s), starts tails for +// new running containers, and stops tails when a container is no +// longer running (state != "running" or row deleted). +// +// Rule changes (CRUD via API) trigger ReloadRules which rebuilds the +// snapshot once and atomically swaps the pointer all tails read. +type Manager struct { + rules RuleSource + containers ContainerLister + docker dockerLogger + events EventStore + bus *events.Bus + pollInterval time.Duration + + snapshot atomic.Pointer[Snapshot] + engine *Engine + + mu sync.Mutex + tails map[string]context.CancelFunc // containerID -> cancel + tailWG sync.WaitGroup + + // statsMu guards lastCompileErrors. Compile-error visibility + // matters because a broken rule silently disappears from the + // snapshot — without surfacing the message, the operator has + // no signal beyond a warn-level log line. + statsMu sync.RWMutex + lastCompileErrors []string + + ctx context.Context + cancel context.CancelFunc +} + +// Stats bundles the operator-facing observability for the log +// scanner. 
EngineStats covers the hot-path drop counters; compile +// errors are the last snapshot's invalid-pattern messages (helpful +// when a freshly-saved rule "doesn't fire" — the issue is usually +// here). ActiveTails reports how many container goroutines the +// manager is currently driving. +type Stats struct { + Engine EngineStats `json:"engine"` + ActiveTails int `json:"active_tails"` + LastCompileErrors []string `json:"last_compile_errors"` +} + +// Config bundles the constructor inputs. +type Config struct { + Rules RuleSource + Containers ContainerLister + Docker *docker.Client + Events EventStore + Bus *events.Bus + PollInterval time.Duration // default 5s +} + +// NewManager wires a manager with the supplied dependencies. +// It does not start polling — call Start to begin the lifecycle. +func NewManager(cfg Config) *Manager { + poll := cfg.PollInterval + if poll <= 0 { + poll = 5 * time.Second + } + return &Manager{ + rules: cfg.Rules, + containers: cfg.Containers, + docker: cfg.Docker, + events: cfg.Events, + bus: cfg.Bus, + pollInterval: poll, + engine: NewEngine(), + tails: map[string]context.CancelFunc{}, + } +} + +// Start kicks off the polling loop and initial snapshot build. +// Returns an error only if the initial rule load fails; subsequent +// load failures are logged but do not stop the manager. +func (m *Manager) Start(ctx context.Context) error { + if err := m.ReloadRules(); err != nil { + return err + } + m.ctx, m.cancel = context.WithCancel(ctx) + go m.loop() + return nil +} + +// Stop cancels every tail and waits for them to exit. +func (m *Manager) Stop() { + if m.cancel != nil { + m.cancel() + } + m.tailWG.Wait() +} + +// ReloadRules rebuilds the snapshot from the current store state. +// Safe to call concurrently with the polling loop and with tails — +// the atomic pointer swap is the only synchronization required. +// +// Compile errors are both logged and stored on the manager so the +// API stats endpoint can surface them to operators. 
The set is +// fully replaced on each reload — there's no "and previously +// these other rules were broken" trail. +func (m *Manager) ReloadRules() error { + rows, err := m.rules.ListLogScanRules() + if err != nil { + return err + } + snap, compileErrs := BuildSnapshot(rows) + errMsgs := make([]string, 0, len(compileErrs)) + for _, e := range compileErrs { + slog.Warn("logscanner: rule compile failed (dropped from snapshot)", "error", e) + errMsgs = append(errMsgs, e.Error()) + } + m.snapshot.Store(snap) + m.statsMu.Lock() + m.lastCompileErrors = errMsgs + m.statsMu.Unlock() + return nil +} + +// Stats returns the current operator-facing observability snapshot. +// Read-safe: the mutex protects only the compile-errors slice; +// counters and tail-count are atomic / mutex-guarded internally. +func (m *Manager) Stats() Stats { + m.statsMu.RLock() + errs := make([]string, len(m.lastCompileErrors)) + copy(errs, m.lastCompileErrors) + m.statsMu.RUnlock() + + m.mu.Lock() + tails := len(m.tails) + m.mu.Unlock() + + return Stats{ + Engine: m.engine.Stats(), + ActiveTails: tails, + LastCompileErrors: errs, + } +} + +// EmitHit persists a hit as an event_log row and publishes it on +// the bus. Implements the HitEmitter interface so tails depend on +// the interface, not the concrete manager — also makes the manager +// easy to unit-test by passing a recording emitter. 
+func (m *Manager) EmitHit(ctx context.Context, hit Hit) { + const maxMessage = 500 // truncate long lines so one chatty rule can't blow up event_log + msg := truncateUTF8(hit.Line, maxMessage) + meta := map[string]any{ + "workload_id": hit.WorkloadID, + "container_id": hit.ContainerID, + "rule_id": hit.Rule.ID, + "rule_name": hit.Rule.Name, + "stream": hit.Stream, + } + if subs := hit.Rule.Pattern.FindStringSubmatch(hit.Line); len(subs) > 1 { + captures := map[string]string{} + for i, sub := range subs[1:] { + captures[indexName(hit.Rule.Pattern, i+1)] = sub + } + meta["captures"] = captures + } + metaJSON, _ := json.Marshal(meta) + evt, err := m.events.InsertEvent(store.EventLog{ + Source: "logscan", + Severity: nonEmpty(hit.Rule.Severity, store.LogScanSeverityWarn), + Message: msg, + Metadata: string(metaJSON), + }) + if err != nil { + slog.Warn("logscanner: persist event", "rule", hit.Rule.Name, "error", err) + return + } + if m.bus != nil { + m.bus.Publish(events.Event{ + Type: events.EventLog, + Payload: events.EventLogPayload{ + ID: evt.ID, + Source: evt.Source, + Severity: evt.Severity, + Message: evt.Message, + Metadata: evt.Metadata, + CreatedAt: evt.CreatedAt, + }, + }) + } +} + +// loop runs the lifecycle poll until ctx cancellation. It is single- +// threaded over the tails map — start/stop of individual tails +// happens here so there's no race on the map. +func (m *Manager) loop() { + ticker := time.NewTicker(m.pollInterval) + defer ticker.Stop() + m.reconcile() // run once immediately + for { + select { + case <-m.ctx.Done(): + m.stopAllTails() + return + case <-ticker.C: + m.reconcile() + } + } +} + +// reconcile diffs the desired set of running container IDs against +// the live tails and starts/stops as needed. 
+func (m *Manager) reconcile() { + rows, err := m.containers.ListContainers(store.ContainerFilter{State: "running"}) + if err != nil { + slog.Warn("logscanner: list containers", "error", err) + return + } + desired := map[string]store.Container{} + for _, c := range rows { + if c.ContainerID == "" { + continue + } + desired[c.ContainerID] = c + } + + m.mu.Lock() + defer m.mu.Unlock() + + for id, c := range desired { + if _, ok := m.tails[id]; ok { + continue + } + m.startTailLocked(id, c.WorkloadID) + } + for id, cancel := range m.tails { + if _, ok := desired[id]; !ok { + cancel() + delete(m.tails, id) + m.engine.Forget(id) + } + } +} + +func (m *Manager) startTailLocked(containerID, workloadID string) { + tailCtx, cancel := context.WithCancel(m.ctx) + t := &tail{ + containerID: containerID, + workloadID: workloadID, + docker: m.docker, + engine: m.engine, + emitter: m, + snapshot: &m.snapshot, + } + if err := t.validate(); err != nil { + slog.Warn("logscanner: tail validation failed", "container", containerID, "error", err) + cancel() + return + } + m.tails[containerID] = cancel + m.tailWG.Add(1) + go func() { + defer m.tailWG.Done() + t.run(tailCtx) + }() +} + +func (m *Manager) stopAllTails() { + m.mu.Lock() + for id, cancel := range m.tails { + cancel() + delete(m.tails, id) + } + m.mu.Unlock() + m.tailWG.Wait() +} + +// indexName returns the name of capture group i (1-based). Falls +// back to "$" when the group is unnamed so JSON keys stay stable +// AND distinct across groups (the previous $N collision-fallback +// silently dropped groups beyond $3 onto the same JSON key). 
// indexName returns the JSON key for capture group i (1-based): the
// group's own name when it has one, otherwise "$<i>" so unnamed groups
// get stable, distinct keys. An index past the end of SubexpNames is
// treated as unnamed.
func indexName(re interface{ SubexpNames() []string }, i int) string {
	names := re.SubexpNames()
	if i < len(names) && names[i] != "" {
		return names[i]
	}
	return "$" + strconv.Itoa(i)
}

// nonEmpty returns a unless it is the empty string, in which case it
// returns b.
func nonEmpty(a, b string) string {
	if a != "" {
		return a
	}
	return b
}

// truncateUTF8 keeps at most maxBytes bytes of s, cutting on a rune
// boundary so the result never ends in a partial UTF-8 sequence. When
// anything was cut, a "…" marker is appended.
//
// The marker is added on top of the kept prefix, so a truncated result
// can be up to maxBytes+3 bytes long; callers with a hard size limit
// must leave that headroom. A negative maxBytes is treated as 0 rather
// than panicking on an out-of-range slice.
func truncateUTF8(s string, maxBytes int) string {
	if maxBytes < 0 {
		maxBytes = 0
	}
	if len(s) <= maxBytes {
		return s
	}
	end := maxBytes
	// Walk back from the byte cut to the last rune boundary so
	// utf8.ValidString stays true on the returned slice.
	for end > 0 && !utf8.RuneStart(s[end]) {
		end--
	}
	return s[:end] + "…"
}
!utf8.ValidString(got) { + t.Errorf("result not valid UTF-8: %q", got) + } + }) + } +} + +func TestNonEmpty(t *testing.T) { + if nonEmpty("a", "b") != "a" { + t.Error("first non-empty wins") + } + if nonEmpty("", "b") != "b" { + t.Error("fallback when first empty") + } + if nonEmpty("", "") != "" { + t.Error("both empty yields empty") + } +} + +func TestIndexName_UnamedGroupsStable(t *testing.T) { + // Each unnamed group should get a distinct fallback name so + // JSON serialization doesn't collapse $4..$N onto a single key. + re := mustCompile(t, `(\w+) (\w+) (\w+) (\w+) (\w+)`) + seen := map[string]bool{} + for i := 1; i <= 5; i++ { + name := indexName(re, i) + if seen[name] { + t.Errorf("indexName(%d) = %q collides with prior group", i, name) + } + seen[name] = true + if !strings.HasPrefix(name, "$") { + t.Errorf("unnamed group %d should fall back to $N form, got %q", i, name) + } + } +} + +func TestIndexName_NamedGroupWins(t *testing.T) { + re := mustCompile(t, `(?P\d+) (\w+)`) + if got := indexName(re, 1); got != "code" { + t.Errorf("named group should win: %q", got) + } + if got := indexName(re, 2); got != "$2" { + t.Errorf("second (unnamed) group: %q", got) + } +} + +// mustCompile is a local helper so the test file is self-contained. +func mustCompile(t *testing.T, pattern string) interface{ SubexpNames() []string } { + t.Helper() + r, err := regexp.Compile(pattern) + if err != nil { + t.Fatalf("compile %q: %v", pattern, err) + } + return r +} diff --git a/internal/logscanner/rules.go b/internal/logscanner/rules.go new file mode 100644 index 0000000..ced36dc --- /dev/null +++ b/internal/logscanner/rules.go @@ -0,0 +1,143 @@ +// Package logscanner tails running container logs, matches lines +// against operator-configured regex rules, and emits event_log entries +// via the events bus. The package is split into four files: +// +// - rules.go: the rule snapshot — compiled regexes + per-rule +// metadata. 
Snapshots are immutable; the manager builds a new +// snapshot on every rule change and swaps it atomically. +// - engine.go: rule evaluation per line + cooldown + token bucket. +// - tail.go: per-container goroutine reading docker log stream. +// - manager.go: container lifecycle polling + tail lifecycle. +package logscanner + +import ( + "fmt" + "regexp" + + "github.com/alexei/tinyforge/internal/store" +) + +// Rule is a compiled, immutable representation of a store.LogScanRule. +// Built once when the snapshot is loaded; held by every tail goroutine +// via the snapshot atomic pointer. +type Rule struct { + ID int64 + WorkloadID string // "" = global + Name string + Pattern *regexp.Regexp + Severity string + Streams string // "all" | "stdout" | "stderr" + CooldownSeconds int +} + +// Snapshot is the rule set as seen by tails. Built from a flat slice +// of store.LogScanRule rows: globals are expanded via +// store.EffectiveLogScanRules to yield the per-workload effective +// set at lookup time. +// +// The snapshot is immutable. Hot-reload semantics are achieved by +// building a fresh snapshot in the manager and swapping the atomic +// pointer; in-flight tails finish their current match against the +// old snapshot, then pick up the new one on the next line. +// +// MUST NOT mutate any field (including map values, including slice +// elements) after BuildSnapshot returns — tails read these +// concurrently with no locking and rely on immutability for safety. +type Snapshot struct { + // global is the list of compiled global rules (workload_id == ""). + global []Rule + // perWorkload[workloadID] holds workload-only additions AND + // per-workload overrides (already resolved against globals). + // EffectiveFor(id) merges global + perWorkload[id] minus any + // global overridden under that workload. + perWorkload map[string][]Rule + // overrides[id] is the workload's overriding rule (compiled). + // overrides[id][globalID] -> Rule (the override row). 
+ overrides map[string]map[int64]Rule +} + +// BuildSnapshot compiles every rule's pattern and returns an +// immutable Snapshot. Rules that fail to compile are skipped with +// their error reported to the caller — store-side validation keeps +// the pattern field as a free-form regex string, so engine-time +// compile failures are an expected, recoverable mode. +// +// The returned error slice is informational, not fatal: bad rules +// are dropped from the snapshot but the rest still work. +func BuildSnapshot(rows []store.LogScanRule) (*Snapshot, []error) { + s := &Snapshot{ + perWorkload: map[string][]Rule{}, + overrides: map[string]map[int64]Rule{}, + } + var errs []error + for _, row := range rows { + if !row.Enabled { + // Skip outright. Overrides with enabled=false still + // need to be recorded so they suppress a global — + // handled by tracking disable separately below. + } + re, err := regexp.Compile(row.Pattern) + if err != nil { + errs = append(errs, fmt.Errorf("rule #%d %q: %w", row.ID, row.Name, err)) + continue + } + r := Rule{ + ID: row.ID, + WorkloadID: row.WorkloadID, + Name: row.Name, + Pattern: re, + Severity: row.Severity, + Streams: row.Streams, + CooldownSeconds: row.CooldownSeconds, + } + switch { + case row.WorkloadID == "" && row.OverridesID == 0: + if row.Enabled { + s.global = append(s.global, r) + } + case row.WorkloadID != "" && row.OverridesID == 0: + if row.Enabled { + s.perWorkload[row.WorkloadID] = append(s.perWorkload[row.WorkloadID], r) + } + case row.WorkloadID != "" && row.OverridesID != 0: + // Override row: always record so a disabled override + // suppresses the global for this workload. + if _, ok := s.overrides[row.WorkloadID]; !ok { + s.overrides[row.WorkloadID] = map[int64]Rule{} + } + if row.Enabled { + s.overrides[row.WorkloadID][row.OverridesID] = r + } else { + // Encode "disabled override" as the zero rule so + // EffectiveFor can drop it without re-querying. 
+ s.overrides[row.WorkloadID][row.OverridesID] = Rule{ID: row.OverridesID} + } + } + } + return s, errs +} + +// EffectiveFor returns the rule list to evaluate against logs of a +// specific workload. Equivalent to store.EffectiveLogScanRules but +// operates on the compiled snapshot, so the hot path is allocation +// free except for the result slice. +func (s *Snapshot) EffectiveFor(workloadID string) []Rule { + if s == nil { + return nil + } + overrides := s.overrides[workloadID] + out := make([]Rule, 0, len(s.global)+len(s.perWorkload[workloadID])) + for _, g := range s.global { + if ov, ok := overrides[g.ID]; ok { + if ov.Pattern == nil { + // Suppressed for this workload — skip. + continue + } + out = append(out, ov) + continue + } + out = append(out, g) + } + out = append(out, s.perWorkload[workloadID]...) + return out +} diff --git a/internal/logscanner/rules_test.go b/internal/logscanner/rules_test.go new file mode 100644 index 0000000..7c74a6e --- /dev/null +++ b/internal/logscanner/rules_test.go @@ -0,0 +1,109 @@ +package logscanner + +import ( + "testing" + + "github.com/alexei/tinyforge/internal/store" +) + +func TestBuildSnapshot_CompileErrorsReported(t *testing.T) { + rows := []store.LogScanRule{ + {ID: 1, Pattern: `valid`, Enabled: true}, + {ID: 2, Pattern: `([unclosed`, Enabled: true}, + } + snap, errs := BuildSnapshot(rows) + if snap == nil { + t.Fatal("snapshot should not be nil") + } + if len(errs) != 1 { + t.Fatalf("expected 1 compile error, got %d", len(errs)) + } + if len(snap.global) != 1 { + t.Errorf("expected 1 global rule (valid one), got %d", len(snap.global)) + } +} + +func TestEffectiveFor_GlobalOnly(t *testing.T) { + rows := []store.LogScanRule{ + {ID: 1, Pattern: `panic`, Enabled: true}, + {ID: 2, Pattern: `fatal`, Enabled: true}, + } + snap, _ := BuildSnapshot(rows) + out := snap.EffectiveFor("w1") + if len(out) != 2 { + t.Fatalf("expected 2 rules, got %d", len(out)) + } +} + +func TestEffectiveFor_WorkloadAddition(t *testing.T) { + 
rows := []store.LogScanRule{ + {ID: 1, Pattern: `panic`, Enabled: true}, + {ID: 2, Pattern: `slow_query`, WorkloadID: "w1", Enabled: true}, + } + snap, _ := BuildSnapshot(rows) + out := snap.EffectiveFor("w1") + if len(out) != 2 { + t.Fatalf("workload w1 should see both: %d", len(out)) + } + out2 := snap.EffectiveFor("w2") + if len(out2) != 1 { + t.Errorf("workload w2 should see only the global: %d", len(out2)) + } +} + +func TestEffectiveFor_OverrideReplacesGlobal(t *testing.T) { + rows := []store.LogScanRule{ + {ID: 1, Pattern: `panic`, Severity: "warn", Enabled: true}, + { + ID: 2, WorkloadID: "w1", OverridesID: 1, + Pattern: `panic`, Severity: "error", Enabled: true, + }, + } + snap, _ := BuildSnapshot(rows) + out := snap.EffectiveFor("w1") + if len(out) != 1 { + t.Fatalf("expected 1 rule, got %d", len(out)) + } + if out[0].Severity != "error" { + t.Errorf("override severity should win: %q", out[0].Severity) + } + // Other workloads still see the original. + out2 := snap.EffectiveFor("w2") + if len(out2) != 1 || out2[0].Severity != "warn" { + t.Errorf("w2 should see original severity, got %+v", out2) + } +} + +func TestEffectiveFor_DisabledOverrideSuppresses(t *testing.T) { + rows := []store.LogScanRule{ + {ID: 1, Pattern: `panic`, Enabled: true}, + { + ID: 2, WorkloadID: "w1", OverridesID: 1, + Pattern: `panic`, Enabled: false, + }, + } + snap, _ := BuildSnapshot(rows) + if len(snap.EffectiveFor("w1")) != 0 { + t.Errorf("disabled override should suppress global for w1") + } + if len(snap.EffectiveFor("w2")) != 1 { + t.Errorf("w2 should still see the global") + } +} + +func TestEffectiveFor_DisabledGlobalSkipped(t *testing.T) { + rows := []store.LogScanRule{ + {ID: 1, Pattern: `panic`, Enabled: false}, + } + snap, _ := BuildSnapshot(rows) + if len(snap.EffectiveFor("w1")) != 0 { + t.Errorf("disabled global should not appear in effective set") + } +} + +func TestEffectiveFor_NilSnapshot(t *testing.T) { + var snap *Snapshot + if out := snap.EffectiveFor("w1"); 
out != nil { + t.Errorf("nil snapshot should return nil, got %+v", out) + } +} diff --git a/internal/logscanner/stats_test.go b/internal/logscanner/stats_test.go new file mode 100644 index 0000000..6b1d707 --- /dev/null +++ b/internal/logscanner/stats_test.go @@ -0,0 +1,88 @@ +package logscanner + +import ( + "errors" + "strings" + "testing" + + "github.com/alexei/tinyforge/internal/store" +) + +// fakeRuleSource lets us inject rules into ReloadRules without +// standing up SQLite. Mirrors the fakeTriggerSource pattern from +// the events package. +type fakeRuleSource struct { + rows []store.LogScanRule + err error +} + +func (f *fakeRuleSource) ListLogScanRules() ([]store.LogScanRule, error) { + if f.err != nil { + return nil, f.err + } + return f.rows, nil +} + +func TestManagerStats_CapturesCompileErrors(t *testing.T) { + rs := &fakeRuleSource{rows: []store.LogScanRule{ + {ID: 1, Name: "valid", Pattern: `panic`, Severity: "warn", Streams: "all", Enabled: true}, + {ID: 2, Name: "broken", Pattern: `([unclosed`, Severity: "warn", Streams: "all", Enabled: true}, + {ID: 3, Name: "also-broken", Pattern: `[`, Severity: "warn", Streams: "all", Enabled: true}, + }} + m := NewManager(Config{Rules: rs}) + if err := m.ReloadRules(); err != nil { + t.Fatalf("ReloadRules: %v", err) + } + stats := m.Stats() + if len(stats.LastCompileErrors) != 2 { + t.Fatalf("expected 2 compile errors, got %d: %+v", len(stats.LastCompileErrors), stats.LastCompileErrors) + } + // The error messages should mention the rule id/name from the + // BuildSnapshot format so operators can find which rule broke. 
+ joined := strings.Join(stats.LastCompileErrors, "|") + if !strings.Contains(joined, "broken") { + t.Errorf("error messages should reference rule name 'broken': %s", joined) + } +} + +func TestManagerStats_CompileErrorsReplacedOnReload(t *testing.T) { + // A broken rule then a reload with all-valid rules should + // clear the error list — operators expect the panel to flip + // from "2 errors" to "all clean" after they fix things. + rs := &fakeRuleSource{rows: []store.LogScanRule{ + {ID: 1, Name: "broken", Pattern: `([`, Severity: "warn", Streams: "all", Enabled: true}, + }} + m := NewManager(Config{Rules: rs}) + _ = m.ReloadRules() + if len(m.Stats().LastCompileErrors) != 1 { + t.Fatal("expected one compile error before fix") + } + + rs.rows = []store.LogScanRule{ + {ID: 1, Name: "fixed", Pattern: `panic`, Severity: "warn", Streams: "all", Enabled: true}, + } + _ = m.ReloadRules() + if len(m.Stats().LastCompileErrors) != 0 { + t.Errorf("expected zero compile errors after reload, got %d", + len(m.Stats().LastCompileErrors)) + } +} + +func TestManagerStats_ReloadErrorPropagates(t *testing.T) { + rs := &fakeRuleSource{err: errors.New("db down")} + m := NewManager(Config{Rules: rs}) + if err := m.ReloadRules(); err == nil { + t.Fatal("expected ReloadRules to propagate the source error") + } +} + +func TestManagerStats_ActiveTailsDefaultsZero(t *testing.T) { + // Without Start() and without a docker dependency we can't + // run real tails, but the counter should be a stable 0 read + // rather than panic/uninitialized. 
+ rs := &fakeRuleSource{} + m := NewManager(Config{Rules: rs}) + if got := m.Stats().ActiveTails; got != 0 { + t.Errorf("ActiveTails on fresh manager = %d, want 0", got) + } +} diff --git a/internal/logscanner/tail.go b/internal/logscanner/tail.go new file mode 100644 index 0000000..d99a1d2 --- /dev/null +++ b/internal/logscanner/tail.go @@ -0,0 +1,245 @@ +package logscanner + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "strings" + "sync/atomic" + "time" + + "github.com/alexei/tinyforge/internal/docker" +) + +// maxFramePayloadBytes caps a single docker log frame payload so a +// hostile or corrupted stream can't force an unbounded allocation. +// Real-world container log frames are well under this; the limit is +// purely a safety belt against the 4 GiB-by-spec upper bound the +// header byte width allows. +const maxFramePayloadBytes = 16 * 1024 * 1024 // 16 MiB + +// maxLineBufferBytes caps the per-stream line-reassembly buffer for +// the same reason. A stream that never sends a newline would +// otherwise grow without bound. 1 MiB matches the bufio.Scanner +// default max and is far above any reasonable log line. +const maxLineBufferBytes = 1 * 1024 * 1024 // 1 MiB + +// dockerLogger is the minimum surface the tail needs from the +// docker client. Defined as an interface so tests can stand up a +// canned log stream without spinning up containerd. +type dockerLogger interface { + ContainerLogsOpts(ctx context.Context, containerID string, opts docker.ContainerLogOptions) (io.ReadCloser, error) +} + +// HitEmitter is what the tail calls when a rule fires. Implemented +// by the manager — it writes to event_log and publishes on the bus. +// Kept as a single-method interface so the tail has zero coupling to +// store/events. +type HitEmitter interface { + EmitHit(ctx context.Context, hit Hit) +} + +// tail watches one container. 
Lifetime is bound to the supplied +// context — cancellation propagates through docker's log stream so +// goroutines exit promptly on container stop or manager shutdown. +type tail struct { + containerID string + workloadID string + docker dockerLogger + engine *Engine + emitter HitEmitter + snapshot *atomic.Pointer[Snapshot] +} + +// run is the goroutine body. Opens a follow=true log stream and +// reads lines until the stream EOFs or context is cancelled. +// Tails terminate quietly on context cancel; any other error is +// logged at warn so the operator sees it without it stopping the +// whole manager. +func (t *tail) run(ctx context.Context) { + stream, err := t.docker.ContainerLogsOpts(ctx, t.containerID, docker.ContainerLogOptions{ + Follow: true, + Tail: "0", // start from the newest line; backfill is out of scope + ShowStdout: true, + ShowStderr: true, + }) + if err != nil { + if !errors.Is(err, context.Canceled) { + slog.Warn("logscanner: open log stream", "container", t.containerID, "error", err) + } + return + } + defer stream.Close() + + // Demuxing: docker emits multiplexed frames when TTY is off, + // where each frame's first byte is the stream type (1=stdout, + // 2=stderr) and bytes 4..7 are big-endian length. When TTY is + // on the stream is plain bytes. We try to detect the frame + // header on the first read; if it parses as a valid frame we + // use the demux path, otherwise we fall back to line-by-line. + bufStream := bufio.NewReaderSize(stream, 32*1024) + if isMultiplexedStream(bufStream) { + t.readDemuxed(ctx, bufStream) + } else { + t.readPlain(ctx, bufStream) + } +} + +// readDemuxed reads docker's multiplexed log frames. Each frame: +// +// [type 1 byte][000 3 bytes][len 4 bytes BE][payload len bytes] +// +// We track the stream type per-frame and feed payload bytes into a +// per-stream line buffer so a line split across frames still +// reassembles cleanly. 
+func (t *tail) readDemuxed(ctx context.Context, r *bufio.Reader) { + header := make([]byte, 8) + // Two line buffers — stdout vs stderr — so a partial line read + // across multiple frames doesn't bleed into the other stream. + buffers := map[string]*bytes.Buffer{ + "stdout": {}, + "stderr": {}, + } + for { + if ctx.Err() != nil { + return + } + if _, err := io.ReadFull(r, header); err != nil { + if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { + slog.Warn("logscanner: read header", "container", t.containerID, "error", err) + } + return + } + stream := "stdout" + if header[0] == 2 { + stream = "stderr" + } + size := int(uint32(header[4])<<24 | uint32(header[5])<<16 | uint32(header[6])<<8 | uint32(header[7])) + if size <= 0 { + continue + } + if size > maxFramePayloadBytes { + slog.Warn("logscanner: frame payload exceeds cap, dropping tail", + "container", t.containerID, "size", size, "cap", maxFramePayloadBytes) + return + } + payload := make([]byte, size) + if _, err := io.ReadFull(r, payload); err != nil { + if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { + slog.Warn("logscanner: read payload", "container", t.containerID, "error", err) + } + return + } + buf := buffers[stream] + buf.Write(payload) + // Reassembly buffer should never accumulate beyond + // maxLineBufferBytes — a stream with no newline would + // otherwise grow unbounded. Drop the buffer (and the partial + // line) when the cap is reached so the tail stays healthy. + if buf.Len() > maxLineBufferBytes { + slog.Warn("logscanner: line buffer exceeded cap, resetting", + "container", t.containerID, "stream", stream, "size", buf.Len()) + buf.Reset() + continue + } + for { + line, ok := readLineFromBuffer(buf) + if !ok { + break + } + t.processLine(ctx, stream, line) + } + } +} + +// readPlain reads a non-multiplexed stream (TTY mode). All lines +// are reported as "stdout" since the API doesn't separate the +// streams in this mode. 
+func (t *tail) readPlain(ctx context.Context, r *bufio.Reader) { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + if ctx.Err() != nil { + return + } + t.processLine(ctx, "stdout", scanner.Text()) + } + if err := scanner.Err(); err != nil && !errors.Is(err, context.Canceled) { + slog.Warn("logscanner: plain read", "container", t.containerID, "error", err) + } +} + +func (t *tail) processLine(ctx context.Context, stream, line string) { + line = strings.TrimRight(line, "\r\n") + // Strip the leading RFC3339Nano timestamp docker prepends when + // Timestamps=true. We validate the prefix actually parses as a + // time rather than blindly stripping at the first space — a + // legitimate log line whose first word is 20+ chars (e.g. a long + // hash with no whitespace) would otherwise lose data. + if idx := strings.IndexByte(line, ' '); idx >= 20 && idx <= 40 { + candidate := line[:idx] + if _, err := time.Parse(time.RFC3339Nano, candidate); err == nil { + line = line[idx+1:] + } + } + snap := t.snapshot.Load() + if snap == nil { + return + } + rules := snap.EffectiveFor(t.workloadID) + hits := t.engine.Match(t.containerID, t.workloadID, stream, line, rules) + for _, h := range hits { + t.emitter.EmitHit(ctx, h) + } +} + +// isMultiplexedStream peeks at the first 8 bytes to detect docker's +// multiplexed frame header. The frame type byte must be 0..2 and +// bytes 1..3 must be zero. We restore the bytes via UnreadByte +// equivalent (bufio.Peek), so the actual reader is unaffected. 
// isMultiplexedStream reports whether the stream starts with something
// shaped like docker's 8-byte frame header: a type byte of 0, 1 or 2
// followed by three zero bytes. It only peeks, so nothing is consumed
// and the caller can read the same bytes again afterwards. A stream that
// cannot supply 8 bytes is reported as not multiplexed.
func isMultiplexedStream(r *bufio.Reader) bool {
	peeked, err := r.Peek(8)
	if err != nil || len(peeked) < 8 {
		return false
	}
	if peeked[0] > 2 || peeked[1] != 0 || peeked[2] != 0 || peeked[3] != 0 {
		return false
	}
	return true
}

// readLineFromBuffer removes the first newline-terminated line from b
// and returns it without the newline. When b holds no newline it
// returns ("", false) and leaves the buffer untouched, so the partial
// line stays queued until more data arrives.
func readLineFromBuffer(b *bytes.Buffer) (string, bool) {
	idx := bytes.IndexByte(b.Bytes(), '\n')
	if idx < 0 {
		return "", false
	}
	// Converting the slice returned by Next copies the bytes once; the
	// slice itself is only valid until the next buffer operation.
	line := string(b.Next(idx))
	b.Next(1) // drop the newline
	return line, true
}
+type recordingEmitter struct { + mu sync.Mutex + hits []Hit +} + +func (r *recordingEmitter) EmitHit(_ context.Context, hit Hit) { + r.mu.Lock() + r.hits = append(r.hits, hit) + r.mu.Unlock() +} + +func (r *recordingEmitter) Hits() []Hit { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]Hit, len(r.hits)) + copy(out, r.hits) + return out +} + +// snapshotForRule wraps a single rule into an atomic Snapshot +// pointer so tests can drive processLine without rebuilding from +// store rows. +func snapshotForRule(t *testing.T, pattern string) *atomic.Pointer[Snapshot] { + t.Helper() + rows := []store.LogScanRule{ + {ID: 1, Name: "t", Pattern: pattern, Severity: "warn", Streams: "all", Enabled: true}, + } + snap, errs := BuildSnapshot(rows) + if len(errs) != 0 { + t.Fatalf("BuildSnapshot: %v", errs) + } + p := &atomic.Pointer[Snapshot]{} + p.Store(snap) + return p +} + +func TestProcessLine_StripsRFC3339Prefix(t *testing.T) { + emit := &recordingEmitter{} + snap := snapshotForRule(t, `panic`) + tl := &tail{ + containerID: "c1", + workloadID: "w1", + engine: NewEngine(), + emitter: emit, + snapshot: snap, + } + tl.processLine(context.Background(), "stderr", "2026-05-11T12:34:56.123456789Z fatal panic in worker") + hits := emit.Hits() + if len(hits) != 1 { + t.Fatalf("want 1 hit, got %d", len(hits)) + } + if hits[0].Line != "fatal panic in worker" { + t.Errorf("timestamp not stripped: %q", hits[0].Line) + } +} + +func TestProcessLine_LeavesNonTimestampedLineAlone(t *testing.T) { + // The previous heuristic stripped the first word of any line + // whose first space landed past byte 20. A long-hash line with + // no embedded timestamp must now survive intact. 
+ emit := &recordingEmitter{} + snap := snapshotForRule(t, `(?i)hash`) + tl := &tail{ + containerID: "c1", + workloadID: "w1", + engine: NewEngine(), + emitter: emit, + snapshot: snap, + } + long := "aaaaaaaaaaaaaaaaaaaaaaaa hash payload" + tl.processLine(context.Background(), "stdout", long) + hits := emit.Hits() + if len(hits) != 1 { + t.Fatalf("want 1 hit, got %d", len(hits)) + } + if hits[0].Line != long { + t.Errorf("non-timestamp prefix incorrectly stripped: %q", hits[0].Line) + } +} + +func TestProcessLine_NoSnapshotIsSafe(t *testing.T) { + // Tail constructed before the manager loads its first snapshot + // (or after a transient nil) must not crash — processLine + // returns silently when snapshot.Load() is nil. + tl := &tail{ + containerID: "c1", + workloadID: "w1", + engine: NewEngine(), + emitter: &recordingEmitter{}, + snapshot: &atomic.Pointer[Snapshot]{}, // empty pointer + } + tl.processLine(context.Background(), "stdout", "anything") +} + +func TestReadLineFromBuffer(t *testing.T) { + buf := &bytes.Buffer{} + buf.WriteString("line one\nline two\npartial") + + got, ok := readLineFromBuffer(buf) + if !ok || got != "line one" { + t.Fatalf("first read: ok=%v got=%q", ok, got) + } + got, ok = readLineFromBuffer(buf) + if !ok || got != "line two" { + t.Fatalf("second read: ok=%v got=%q", ok, got) + } + // Trailing partial line stays in buffer until more data arrives. + if _, ok := readLineFromBuffer(buf); ok { + t.Errorf("partial line should NOT yield until newline arrives") + } + if buf.String() != "partial" { + t.Errorf("remainder=%q want %q", buf.String(), "partial") + } +} + +func TestIsMultiplexedStream(t *testing.T) { + // Valid docker frame header: type=2 (stderr), 3 nulls, then 4-byte length. 
+ demuxed := []byte{2, 0, 0, 0, 0, 0, 0, 12, 'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'} + if !isMultiplexedStream(newReader(demuxed)) { + t.Error("valid demuxed frame should be detected") + } + + // Plain text: first byte is a printable letter, header check fails. + plain := []byte("plain log line without framing\n") + if isMultiplexedStream(newReader(plain)) { + t.Error("plain text should not be detected as multiplexed") + } + + // Header with type=3 is invalid (docker uses 0,1,2 only). + bad := []byte{3, 0, 0, 0, 0, 0, 0, 1} + if isMultiplexedStream(newReader(bad)) { + t.Error("type=3 header is not a valid docker frame") + } +} + +// newReader returns a *bufio.Reader sized large enough to satisfy +// the Peek(8) the demuxer detection requires. +func newReader(b []byte) *bufio.Reader { + return bufio.NewReaderSize(bytes.NewReader(b), 32) +} diff --git a/internal/notify/notifier.go b/internal/notify/notifier.go index ce423e7..c046d98 100644 --- a/internal/notify/notifier.go +++ b/internal/notify/notifier.go @@ -40,10 +40,11 @@ type Event struct { type Tier string const ( - TierSettings Tier = "settings" - TierProject Tier = "project" - TierStage Tier = "stage" - TierSite Tier = "site" + TierSettings Tier = "settings" + TierProject Tier = "project" + TierStage Tier = "stage" + TierSite Tier = "site" + TierEventTrigger Tier = "event_trigger" ) // Header names for outgoing webhooks. The signature header name matches @@ -145,6 +146,43 @@ func (n *Notifier) SendSigned(webhookURL, secret string, tier Tier, event Event) }() } +// SendPayload dispatches an arbitrary JSON payload to the given URL, +// signed with HMAC-SHA256 when secret is non-empty. Used by the +// event-trigger dispatcher: event-log → trigger filter → webhook +// delivery. The eventType travels in the X-Tinyforge-Event header so +// receivers can route by it without parsing the body. +// +// Fire-and-forget. 
Failures are logged at warn but never propagate; +// trigger reliability is observed via webhook_deliveries (audit trail) +// and the dispatcher remaining bus-driven means delivery hiccups +// cannot back-pressure event publishing. +func (n *Notifier) SendPayload(webhookURL, secret, eventType string, payload any) { + if webhookURL == "" { + return + } + delivery := uuid.NewString() + timestamp := time.Now().UTC().Format(time.RFC3339) + + n.wg.Add(1) + go func() { + defer n.wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err := n.doSendRaw(ctx, webhookURL, secret, TierEventTrigger, delivery, eventType, timestamp, payload) + host := safeHost(webhookURL) + if err != nil { + slog.Warn("notify: trigger webhook send failed", + "tier", TierEventTrigger, "host", host, "delivery", delivery, + "event", eventType, "signed", secret != "", "error", err) + return + } + slog.Info("notify: trigger webhook dispatched", + "tier", TierEventTrigger, "host", host, "delivery", delivery, + "event", eventType, "signed", secret != "") + }() +} + // SendSyncForTest performs a synchronous, single-shot send for the "Send // test" UI button. Returns a TestResult describing what the receiver // answered with so the operator can confirm wiring without watching server @@ -183,6 +221,42 @@ func (n *Notifier) SendSyncForTest(ctx context.Context, webhookURL, secret strin return result } +// SendSyncForTestPayload is the arbitrary-payload counterpart to +// SendSyncForTest. Returns the same TestResult shape but sends an +// arbitrary payload + event-type pair through the shared HTTP+HMAC +// core. Used by the event-trigger /test endpoint so the operator's +// receiver sees the same envelope shape it will receive during normal +// dispatch — verifying a different payload would defeat the test's +// purpose. 
+func (n *Notifier) SendSyncForTestPayload(ctx context.Context, webhookURL, secret string, tier Tier, eventType string, payload any) TestResult { + delivery := uuid.NewString() + timestamp := time.Now().UTC().Format(time.RFC3339) + result := TestResult{ + URL: webhookURL, + Tier: tier, + SignatureSent: secret != "", + DeliveryID: delivery, + } + if webhookURL == "" { + result.Error = "no webhook URL configured for this tier" + return result + } + start := time.Now() + resp, err := n.doSendRaw(ctx, webhookURL, secret, tier, delivery, eventType, timestamp, payload) + result.LatencyMs = time.Since(start).Milliseconds() + if err != nil { + result.Error = err.Error() + if resp != nil { + result.StatusCode = resp.StatusCode + result.ResponseSnippet = resp.BodyPreview + } + return result + } + result.StatusCode = resp.StatusCode + result.ResponseSnippet = resp.BodyPreview + return result +} + // sendResponse captures the small subset of the receiver's response we want // to surface back to the operator (status + a body preview). Distinct from // http.Response so callers don't accidentally hold an unread body. @@ -198,7 +272,16 @@ type sendResponse struct { // exactly what travels on the wire. Receivers MUST verify against the raw // body, not a re-serialised copy. func (n *Notifier) doSend(ctx context.Context, webhookURL, secret string, tier Tier, delivery string, event Event) (*sendResponse, error) { - body, err := json.Marshal(event) + return n.doSendRaw(ctx, webhookURL, secret, tier, delivery, event.Type, event.Timestamp, event) +} + +// doSendRaw is the shared HTTP+HMAC core. It serializes any payload to +// JSON, signs the resulting bytes (if a secret is configured) and +// dispatches with the same Tinyforge headers as the legacy deploy-event +// path. Separated out so SendPayload can reuse it without forcing the +// caller to fit into the Event shape. 
+func (n *Notifier) doSendRaw(ctx context.Context, webhookURL, secret string, tier Tier, delivery, eventType, timestamp string, payload any) (*sendResponse, error) { + body, err := json.Marshal(payload) if err != nil { return nil, fmt.Errorf("marshal notification: %w", err) } @@ -209,9 +292,9 @@ func (n *Notifier) doSend(ctx context.Context, webhookURL, secret string, tier T } req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", userAgent) - req.Header.Set(HeaderEvent, event.Type) + req.Header.Set(HeaderEvent, eventType) req.Header.Set(HeaderDelivery, delivery) - req.Header.Set(HeaderTimestamp, event.Timestamp) + req.Header.Set(HeaderTimestamp, timestamp) req.Header.Set(HeaderTier, string(tier)) if secret != "" { req.Header.Set(HeaderSignature, "sha256="+sign(secret, body)) diff --git a/internal/store/event_triggers.go b/internal/store/event_triggers.go new file mode 100644 index 0000000..94f1e3d --- /dev/null +++ b/internal/store/event_triggers.go @@ -0,0 +1,208 @@ +package store + +import ( + "database/sql" + "errors" + "fmt" + "strings" +) + +// CreateEventTrigger inserts a new trigger row. ID is assigned by the +// auto-increment column and returned on the populated struct. 
+func (s *Store) CreateEventTrigger(t EventTrigger) (EventTrigger, error) { + if strings.TrimSpace(t.Name) == "" { + return EventTrigger{}, fmt.Errorf("event_trigger: name is required") + } + if t.ActionType == "" { + t.ActionType = EventTriggerActionWebhook + } + if t.ActionType != EventTriggerActionWebhook { + return EventTrigger{}, fmt.Errorf("event_trigger: unsupported action_type %q", t.ActionType) + } + if strings.TrimSpace(t.ActionTarget) == "" { + return EventTrigger{}, fmt.Errorf("event_trigger: action_target is required") + } + + now := Now() + t.CreatedAt = now + t.UpdatedAt = now + + res, err := s.db.Exec( + `INSERT INTO event_triggers + (name, filter_severity, filter_source, filter_message_regex, + action_type, action_target, action_secret, enabled, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + t.Name, t.FilterSeverity, t.FilterSource, t.FilterMessageRegex, + t.ActionType, t.ActionTarget, t.ActionSecret, boolToInt(t.Enabled), + t.CreatedAt, t.UpdatedAt, + ) + if err != nil { + return EventTrigger{}, fmt.Errorf("insert event trigger: %w", err) + } + id, err := res.LastInsertId() + if err != nil { + return EventTrigger{}, fmt.Errorf("get event trigger id: %w", err) + } + t.ID = id + return t, nil +} + +// ListEventTriggers returns every trigger row, ordered by id so the UI +// rendering is stable across requests. Trigger counts are expected to +// be small (operator-curated), so unbounded listing is fine. 
+func (s *Store) ListEventTriggers() ([]EventTrigger, error) {
+	return s.queryEventTriggers("event triggers",
+		`SELECT id, name, filter_severity, filter_source, filter_message_regex,
+			action_type, action_target, action_secret, enabled,
+			created_at, updated_at
+		 FROM event_triggers ORDER BY id`,
+	)
+}
+
+// ListEnabledEventTriggers returns only the rows with enabled=1. The
+// dispatcher hot path uses this so a disabled trigger costs nothing.
+func (s *Store) ListEnabledEventTriggers() ([]EventTrigger, error) {
+	return s.queryEventTriggers("enabled event triggers",
+		`SELECT id, name, filter_severity, filter_source, filter_message_regex,
+			action_type, action_target, action_secret, enabled,
+			created_at, updated_at
+		 FROM event_triggers WHERE enabled = 1 ORDER BY id`,
+	)
+}
+
+// GetEventTrigger returns one trigger by ID or ErrNotFound.
+func (s *Store) GetEventTrigger(id int64) (EventTrigger, error) {
+	row := s.db.QueryRow(
+		`SELECT id, name, filter_severity, filter_source, filter_message_regex,
+			action_type, action_target, action_secret, enabled,
+			created_at, updated_at
+		 FROM event_triggers WHERE id = ?`, id,
+	)
+	t, err := scanEventTriggerRow(row)
+	if errors.Is(err, sql.ErrNoRows) {
+		// Translate the driver sentinel into the store's own not-found error.
+		return EventTrigger{}, fmt.Errorf("event trigger %d: %w", id, ErrNotFound)
+	}
+	if err != nil {
+		return EventTrigger{}, fmt.Errorf("query event trigger: %w", err)
+	}
+	return t, nil
+}
+
+// queryEventTriggers runs a multi-row SELECT over event_triggers and
+// scans every row with scanEventTrigger. what names the query in the
+// wrapped error ("query <what>: ..."). The result is a non-nil slice
+// even when no rows match.
+func (s *Store) queryEventTriggers(what, query string, args ...any) ([]EventTrigger, error) {
+	rows, err := s.db.Query(query, args...)
+	if err != nil {
+		return nil, fmt.Errorf("query %s: %w", what, err)
+	}
+	defer rows.Close()
+
+	out := []EventTrigger{}
+	for rows.Next() {
+		t, err := scanEventTrigger(rows)
+		if err != nil {
+			return nil, err
+		}
+		out = append(out, t)
+	}
+	// rows.Err reports any failure that ended iteration early.
+	return out, rows.Err()
+}
+
+// UpdateEventTrigger overwrites the editable columns of an existing row.
+// CreatedAt is preserved; UpdatedAt is refreshed. A blank ActionType
+// defaults to the webhook action. Returns an error wrapping ErrNotFound
+// when no row has t.ID; on success the row is re-read with
+// GetEventTrigger and returned.
+func (s *Store) UpdateEventTrigger(t EventTrigger) (EventTrigger, error) {
+	if t.ID == 0 {
+		return EventTrigger{}, fmt.Errorf("event_trigger: id is required for update")
+	}
+	if strings.TrimSpace(t.Name) == "" {
+		return EventTrigger{}, fmt.Errorf("event_trigger: name is required")
+	}
+	if t.ActionType == "" {
+		t.ActionType = EventTriggerActionWebhook
+	}
+	if t.ActionType != EventTriggerActionWebhook {
+		return EventTrigger{}, fmt.Errorf("event_trigger: unsupported action_type %q", t.ActionType)
+	}
+	if strings.TrimSpace(t.ActionTarget) == "" {
+		return EventTrigger{}, fmt.Errorf("event_trigger: action_target is required")
+	}
+
+	t.UpdatedAt = Now()
+	res, err := s.db.Exec(
+		`UPDATE event_triggers
+		 SET name = ?, filter_severity = ?, filter_source = ?,
+		     filter_message_regex = ?, action_type = ?, action_target = ?,
+		     action_secret = ?, enabled = ?, updated_at = ?
+		 WHERE id = ?`,
+		t.Name, t.FilterSeverity, t.FilterSource, t.FilterMessageRegex,
+		t.ActionType, t.ActionTarget, t.ActionSecret, boolToInt(t.Enabled),
+		t.UpdatedAt, t.ID,
+	)
+	if err != nil {
+		return EventTrigger{}, fmt.Errorf("update event trigger: %w", err)
+	}
+	n, err := res.RowsAffected()
+	if err != nil {
+		// Without a row count "updated" and "missing" are indistinguishable;
+		// surface the driver error instead of misreporting ErrNotFound.
+		return EventTrigger{}, fmt.Errorf("update event trigger rows affected: %w", err)
+	}
+	if n == 0 {
+		return EventTrigger{}, fmt.Errorf("event trigger %d: %w", t.ID, ErrNotFound)
+	}
+	// Re-read so the caller gets the stored row, including CreatedAt.
+	return s.GetEventTrigger(t.ID)
+}
+
+// DeleteEventTrigger removes a trigger by ID. Deleting an ID that is
+// already gone returns ErrNotFound rather than succeeding silently, so
+// a double-click in the UI gives a clean error rather than 500.
+func (s *Store) DeleteEventTrigger(id int64) error { + res, err := s.db.Exec(`DELETE FROM event_triggers WHERE id = ?`, id) + if err != nil { + return fmt.Errorf("delete event trigger: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return fmt.Errorf("event trigger %d: %w", id, ErrNotFound) + } + return nil +} + +func scanEventTrigger(rows *sql.Rows) (EventTrigger, error) { + var t EventTrigger + var enabled int + if err := rows.Scan( + &t.ID, &t.Name, &t.FilterSeverity, &t.FilterSource, &t.FilterMessageRegex, + &t.ActionType, &t.ActionTarget, &t.ActionSecret, &enabled, + &t.CreatedAt, &t.UpdatedAt, + ); err != nil { + return EventTrigger{}, fmt.Errorf("scan event trigger: %w", err) + } + t.Enabled = enabled != 0 + return t, nil +} + +func scanEventTriggerRow(row *sql.Row) (EventTrigger, error) { + var t EventTrigger + var enabled int + if err := row.Scan( + &t.ID, &t.Name, &t.FilterSeverity, &t.FilterSource, &t.FilterMessageRegex, + &t.ActionType, &t.ActionTarget, &t.ActionSecret, &enabled, + &t.CreatedAt, &t.UpdatedAt, + ); err != nil { + return EventTrigger{}, err + } + t.Enabled = enabled != 0 + return t, nil +} + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/internal/store/log_scan_rules.go b/internal/store/log_scan_rules.go new file mode 100644 index 0000000..4e9b9cb --- /dev/null +++ b/internal/store/log_scan_rules.go @@ -0,0 +1,256 @@ +package store + +import ( + "database/sql" + "errors" + "fmt" + "strings" +) + +// CreateLogScanRule inserts a new rule row. Validates severity + +// streams enum membership and rejects negative cooldowns. 
+func (s *Store) CreateLogScanRule(r LogScanRule) (LogScanRule, error) { + if err := validateLogScanRule(r); err != nil { + return LogScanRule{}, err + } + now := Now() + r.CreatedAt = now + r.UpdatedAt = now + res, err := s.db.Exec( + `INSERT INTO log_scan_rules + (workload_id, overrides_id, name, pattern, severity, streams, + cooldown_seconds, enabled, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + r.WorkloadID, r.OverridesID, r.Name, r.Pattern, r.Severity, r.Streams, + r.CooldownSeconds, boolToInt(r.Enabled), r.CreatedAt, r.UpdatedAt, + ) + if err != nil { + return LogScanRule{}, fmt.Errorf("insert log scan rule: %w", err) + } + id, err := res.LastInsertId() + if err != nil { + return LogScanRule{}, fmt.Errorf("get log scan rule id: %w", err) + } + r.ID = id + return r, nil +} + +// ListLogScanRules returns every rule, ordered by id for stable UI +// rendering. +func (s *Store) ListLogScanRules() ([]LogScanRule, error) { + return s.queryLogScanRules( + `SELECT id, workload_id, overrides_id, name, pattern, severity, streams, + cooldown_seconds, enabled, created_at, updated_at + FROM log_scan_rules ORDER BY id`, + ) +} + +// ListLogScanRulesByWorkload returns all rows directly attached to +// the workload (workload-only additions and per-workload overrides), +// excluding global rules. Useful for the workload detail page. +func (s *Store) ListLogScanRulesByWorkload(workloadID string) ([]LogScanRule, error) { + return s.queryLogScanRules( + `SELECT id, workload_id, overrides_id, name, pattern, severity, streams, + cooldown_seconds, enabled, created_at, updated_at + FROM log_scan_rules WHERE workload_id = ? ORDER BY id`, + workloadID, + ) +} + +// GetLogScanRule fetches one rule by id or returns ErrNotFound. 
+func (s *Store) GetLogScanRule(id int64) (LogScanRule, error) {
+	row := s.db.QueryRow(
+		`SELECT id, workload_id, overrides_id, name, pattern, severity, streams,
+			cooldown_seconds, enabled, created_at, updated_at
+		 FROM log_scan_rules WHERE id = ?`, id,
+	)
+	r, err := scanLogScanRuleRow(row)
+	if errors.Is(err, sql.ErrNoRows) {
+		// Translate the driver sentinel into the store's own not-found error.
+		return LogScanRule{}, fmt.Errorf("log scan rule %d: %w", id, ErrNotFound)
+	}
+	if err != nil {
+		return LogScanRule{}, fmt.Errorf("query log scan rule: %w", err)
+	}
+	return r, nil
+}
+
+// UpdateLogScanRule overwrites the editable columns of a rule row.
+// id, workload_id, overrides_id are immutable on update — change the
+// scope of a rule by deleting + recreating, to keep the
+// hot-reload-snapshot semantics simple.
+//
+// Returns an error wrapping ErrNotFound when no row has r.ID; on
+// success the row is re-read with GetLogScanRule and returned.
+func (s *Store) UpdateLogScanRule(r LogScanRule) (LogScanRule, error) {
+	if r.ID == 0 {
+		return LogScanRule{}, fmt.Errorf("log scan rule: id is required for update")
+	}
+	if err := validateLogScanRule(r); err != nil {
+		return LogScanRule{}, err
+	}
+	r.UpdatedAt = Now()
+	res, err := s.db.Exec(
+		`UPDATE log_scan_rules
+		 SET name = ?, pattern = ?, severity = ?, streams = ?,
+		     cooldown_seconds = ?, enabled = ?, updated_at = ?
+		 WHERE id = ?`,
+		r.Name, r.Pattern, r.Severity, r.Streams,
+		r.CooldownSeconds, boolToInt(r.Enabled), r.UpdatedAt, r.ID,
+	)
+	if err != nil {
+		return LogScanRule{}, fmt.Errorf("update log scan rule: %w", err)
+	}
+	n, err := res.RowsAffected()
+	if err != nil {
+		// Without a row count "updated" and "missing" are indistinguishable;
+		// surface the driver error instead of misreporting ErrNotFound.
+		return LogScanRule{}, fmt.Errorf("update log scan rule rows affected: %w", err)
+	}
+	if n == 0 {
+		return LogScanRule{}, fmt.Errorf("log scan rule %d: %w", r.ID, ErrNotFound)
+	}
+	// Re-read so the caller gets the stored row, including the immutable columns.
+	return s.GetLogScanRule(r.ID)
+}
+
+// DeleteLogScanRule removes a rule by id. Override rows referencing
+// this id are cascade-deleted at the application layer because we
+// don't enforce SQLite FK constraints repo-wide. The two DELETEs run
+// inside a single transaction so a mid-cascade failure can't leave
+// overrides orphaned by a vanished global.
+func (s *Store) DeleteLogScanRule(id int64) error {
+	tx, err := s.db.Begin()
+	if err != nil {
+		return fmt.Errorf("begin delete tx: %w", err)
+	}
+	defer tx.Rollback() //nolint:errcheck // commit path returns nil; rollback after commit is a no-op
+	// Remove dependent override rows first; any early return below rolls
+	// this back through the deferred Rollback.
+	if _, err := tx.Exec(`DELETE FROM log_scan_rules WHERE overrides_id = ?`, id); err != nil {
+		return fmt.Errorf("delete dependent log scan overrides: %w", err)
+	}
+	res, err := tx.Exec(`DELETE FROM log_scan_rules WHERE id = ?`, id)
+	if err != nil {
+		return fmt.Errorf("delete log scan rule: %w", err)
+	}
+	n, err := res.RowsAffected()
+	if err != nil {
+		// Without a row count "deleted" and "missing" are indistinguishable;
+		// surface the driver error instead of misreporting ErrNotFound.
+		return fmt.Errorf("delete log scan rule rows affected: %w", err)
+	}
+	if n == 0 {
+		return fmt.Errorf("log scan rule %d: %w", id, ErrNotFound)
+	}
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("commit delete tx: %w", err)
+	}
+	return nil
+}
+
+// EffectiveLogScanRules computes the effective rule set for one
+// workload according to the spec in docs/LOGSCAN_AND_TRIGGERS_TODO.md:
+//
+//  1. All global rules (workload_id == "" AND overrides_id == 0)
+//     minus globals that have a per-workload override row.
+//  2. Plus workload-only rules (workload_id == X AND overrides_id == 0).
+//  3. Plus per-workload override rules (workload_id == X AND overrides_id != 0),
+//     which carry the override's own enabled/pattern/severity.
+//
+// Computed in Go from a single SELECT of every rule (ListLogScanRules),
+// since rule counts will be small (operator-curated, dozens not
+// thousands).
+func (s *Store) EffectiveLogScanRules(workloadID string) ([]LogScanRule, error) { + all, err := s.ListLogScanRules() + if err != nil { + return nil, err + } + overrides := map[int64]LogScanRule{} // globalID -> override row + var workloadOnly []LogScanRule + var globals []LogScanRule + for _, r := range all { + switch { + case r.WorkloadID == "" && r.OverridesID == 0: + globals = append(globals, r) + case r.WorkloadID == workloadID && r.OverridesID == 0: + workloadOnly = append(workloadOnly, r) + case r.WorkloadID == workloadID && r.OverridesID != 0: + overrides[r.OverridesID] = r + } + } + out := make([]LogScanRule, 0, len(globals)+len(workloadOnly)) + for _, g := range globals { + if ov, ok := overrides[g.ID]; ok { + // Override row's fields win — including enabled=false to + // turn off the global for this workload. + out = append(out, ov) + } else { + out = append(out, g) + } + } + out = append(out, workloadOnly...) + return out, nil +} + +func (s *Store) queryLogScanRules(query string, args ...any) ([]LogScanRule, error) { + rows, err := s.db.Query(query, args...) 
+ if err != nil { + return nil, fmt.Errorf("query log scan rules: %w", err) + } + defer rows.Close() + out := []LogScanRule{} + for rows.Next() { + r, err := scanLogScanRuleRows(rows) + if err != nil { + return nil, err + } + out = append(out, r) + } + return out, rows.Err() +} + +func scanLogScanRuleRows(rows *sql.Rows) (LogScanRule, error) { + var r LogScanRule + var enabled int + if err := rows.Scan( + &r.ID, &r.WorkloadID, &r.OverridesID, &r.Name, &r.Pattern, &r.Severity, &r.Streams, + &r.CooldownSeconds, &enabled, &r.CreatedAt, &r.UpdatedAt, + ); err != nil { + return LogScanRule{}, fmt.Errorf("scan log scan rule: %w", err) + } + r.Enabled = enabled != 0 + return r, nil +} + +func scanLogScanRuleRow(row *sql.Row) (LogScanRule, error) { + var r LogScanRule + var enabled int + if err := row.Scan( + &r.ID, &r.WorkloadID, &r.OverridesID, &r.Name, &r.Pattern, &r.Severity, &r.Streams, + &r.CooldownSeconds, &enabled, &r.CreatedAt, &r.UpdatedAt, + ); err != nil { + return LogScanRule{}, err + } + r.Enabled = enabled != 0 + return r, nil +} + +// validateLogScanRule enforces the per-row invariants. Regex +// compilation is intentionally NOT done here — it's a hot-path +// concern owned by the engine snapshot, and engine compile errors +// become engine-side warnings rather than store-side rejections to +// keep the failure mode operator-debuggable. +func validateLogScanRule(r LogScanRule) error { + if strings.TrimSpace(r.Name) == "" { + return fmt.Errorf("log scan rule: name is required") + } + if strings.TrimSpace(r.Pattern) == "" { + return fmt.Errorf("log scan rule: pattern is required") + } + switch r.Severity { + case LogScanSeverityInfo, LogScanSeverityWarn, LogScanSeverityError: + case "": + // Default applied at the caller; allow blank. 
+ default: + return fmt.Errorf("log scan rule: invalid severity %q", r.Severity) + } + switch r.Streams { + case LogScanStreamAll, LogScanStreamStdout, LogScanStreamStderr: + case "": + default: + return fmt.Errorf("log scan rule: invalid streams %q", r.Streams) + } + if r.CooldownSeconds < 0 { + return fmt.Errorf("log scan rule: cooldown_seconds must be >= 0") + } + // An override row must reference an existing global id and live + // under a specific workload. The store doesn't verify the FK + // (no PRAGMA foreign_keys), but we can sanity-check the shape. + if r.OverridesID != 0 && r.WorkloadID == "" { + return fmt.Errorf("log scan rule: override row requires workload_id") + } + return nil +} diff --git a/internal/store/log_scan_rules_test.go b/internal/store/log_scan_rules_test.go new file mode 100644 index 0000000..4dfd46a --- /dev/null +++ b/internal/store/log_scan_rules_test.go @@ -0,0 +1,155 @@ +package store + +import ( + "strings" + "testing" +) + +func TestCreateLogScanRule_Validates(t *testing.T) { + s := newTestStore(t) + cases := []struct { + name string + in LogScanRule + wantErr string + }{ + { + name: "missing name", + in: LogScanRule{Pattern: "x"}, + wantErr: "name is required", + }, + { + name: "missing pattern", + in: LogScanRule{Name: "n"}, + wantErr: "pattern is required", + }, + { + name: "bad severity", + in: LogScanRule{Name: "n", Pattern: "x", Severity: "loud"}, + wantErr: "invalid severity", + }, + { + name: "bad streams", + in: LogScanRule{Name: "n", Pattern: "x", Streams: "both"}, + wantErr: "invalid streams", + }, + { + name: "negative cooldown", + in: LogScanRule{Name: "n", Pattern: "x", CooldownSeconds: -1}, + wantErr: "cooldown_seconds must be", + }, + { + name: "override without workload", + in: LogScanRule{Name: "n", Pattern: "x", OverridesID: 5}, + wantErr: "override row requires workload_id", + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, err := s.CreateLogScanRule(c.in) + if err == nil { + 
t.Fatalf("expected error containing %q, got nil", c.wantErr) + } + if !strings.Contains(err.Error(), c.wantErr) { + t.Fatalf("error mismatch: got %q want substring %q", err.Error(), c.wantErr) + } + }) + } +} + +func TestCreateAndGetLogScanRule(t *testing.T) { + s := newTestStore(t) + r, err := s.CreateLogScanRule(LogScanRule{ + Name: "panics", Pattern: `\bpanic\b`, Severity: "error", Streams: "stderr", + CooldownSeconds: 30, Enabled: true, + }) + if err != nil { + t.Fatalf("create: %v", err) + } + if r.ID == 0 { + t.Fatal("id should be set") + } + got, err := s.GetLogScanRule(r.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.Pattern != `\bpanic\b` { + t.Errorf("pattern mismatch: %q", got.Pattern) + } + if !got.Enabled { + t.Error("enabled lost on round-trip") + } +} + +func TestEffectiveLogScanRules(t *testing.T) { + s := newTestStore(t) + g, _ := s.CreateLogScanRule(LogScanRule{ + Name: "global", Pattern: "panic", Severity: "warn", Streams: "all", Enabled: true, + }) + _, _ = s.CreateLogScanRule(LogScanRule{ + Name: "w1-only", Pattern: "slow_query", WorkloadID: "w1", Severity: "info", Streams: "all", Enabled: true, + }) + _, _ = s.CreateLogScanRule(LogScanRule{ + Name: "override-for-w1", Pattern: "panic", WorkloadID: "w1", OverridesID: g.ID, + Severity: "error", Streams: "all", Enabled: true, + }) + + w1, err := s.EffectiveLogScanRules("w1") + if err != nil { + t.Fatalf("effective w1: %v", err) + } + if len(w1) != 2 { + t.Fatalf("w1 effective should be 2 (override + addition), got %d", len(w1)) + } + // First entry replaces the global with the override (error severity). 
+ if w1[0].Severity != "error" { + t.Errorf("override severity not applied: %q", w1[0].Severity) + } + + w2, err := s.EffectiveLogScanRules("w2") + if err != nil { + t.Fatalf("effective w2: %v", err) + } + if len(w2) != 1 { + t.Fatalf("w2 effective should be 1 (just the global), got %d", len(w2)) + } + if w2[0].Severity != "warn" { + t.Errorf("w2 should see original severity: %q", w2[0].Severity) + } +} + +func TestDeleteLogScanRule_CascadesOverrides(t *testing.T) { + s := newTestStore(t) + g, _ := s.CreateLogScanRule(LogScanRule{ + Name: "global", Pattern: "panic", Severity: "warn", Streams: "all", Enabled: true, + }) + ov, _ := s.CreateLogScanRule(LogScanRule{ + Name: "override", Pattern: "panic", WorkloadID: "w1", OverridesID: g.ID, + Severity: "error", Streams: "all", Enabled: true, + }) + + if err := s.DeleteLogScanRule(g.ID); err != nil { + t.Fatalf("delete: %v", err) + } + if _, err := s.GetLogScanRule(ov.ID); err == nil { + t.Error("override should be cascade-deleted with its global") + } +} + +func TestUpdateLogScanRule(t *testing.T) { + s := newTestStore(t) + r, _ := s.CreateLogScanRule(LogScanRule{ + Name: "n", Pattern: "x", Severity: "warn", Streams: "all", Enabled: true, + }) + r.Pattern = "y" + r.Enabled = false + got, err := s.UpdateLogScanRule(r) + if err != nil { + t.Fatalf("update: %v", err) + } + if got.Pattern != "y" { + t.Errorf("pattern not updated: %q", got.Pattern) + } + if got.Enabled { + t.Error("enabled=false not applied") + } +} diff --git a/internal/store/models.go b/internal/store/models.go index 7e1ea23..77f66f8 100644 --- a/internal/store/models.go +++ b/internal/store/models.go @@ -197,6 +197,34 @@ type StageEnv struct { UpdatedAt string `json:"updated_at"` } +// WorkloadVolume is the plugin-shape equivalent of legacy Volume: a +// per-workload mount declaration. The Scope enum matches the existing +// VolumeScope contract so the legacy resolver can be reused once its +// project_id assumption is loosened. 
+type WorkloadVolume struct { + ID string `json:"id"` + WorkloadID string `json:"workload_id"` + Source string `json:"source"` + Target string `json:"target"` + Scope string `json:"scope"` + Name string `json:"name"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +// WorkloadEnv is the plugin-shape equivalent of StageEnv: per-workload +// environment variable overrides, optionally encrypted at rest. Read by +// the Source plugin at deploy time, merged on top of source_config.env. +type WorkloadEnv struct { + ID string `json:"id"` + WorkloadID string `json:"workload_id"` + Key string `json:"key"` + Value string `json:"value"` + Encrypted bool `json:"encrypted"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + // VolumeScope defines the sharing scope for a volume mount. // Valid scopes: instance, stage, project, project_named, named, ephemeral. type VolumeScope string @@ -333,6 +361,82 @@ type EventLog struct { CreatedAt string `json:"created_at"` } +// EventTrigger is a filter+action rule evaluated against EventLog +// entries published on the bus. When all non-empty filters match, the +// trigger fires its configured action (webhook today, additional action +// types extensible via the ActionType enum). +// +// Filter fields use a comma-separated list shape for multi-value +// filters (severity, source) to keep the schema flat — empty string +// means "no filter on this dimension." FilterMessageRegex is a single +// regex evaluated against EventLog.Message. +// +// Loop-prevention: deliveries are recorded in webhook_deliveries (the +// existing audit trail). The dispatcher MUST NOT write to event_log +// or it will recurse. 
+type EventTrigger struct { + ID int64 `json:"id"` + Name string `json:"name"` + FilterSeverity string `json:"filter_severity"` // comma list: "warn,error"; "" = any + FilterSource string `json:"filter_source"` // comma list: "logscan,deploy"; "" = any + FilterMessageRegex string `json:"filter_message_regex"` // "" = any + ActionType string `json:"action_type"` // "webhook" today + ActionTarget string `json:"action_target"` // URL for webhook + ActionSecret string `json:"action_secret"` // optional HMAC secret for signed delivery + Enabled bool `json:"enabled"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +// EventTriggerActionType enumerates the supported action_type values. +// Adding a new action is additive — old triggers keep working, the +// dispatcher just learns a new branch. +const ( + EventTriggerActionWebhook = "webhook" +) + +// LogScanRule is one regex-based pattern the log scanner evaluates +// against container log lines. The (workload_id, overrides_id) pair +// implements the "global rule with optional per-workload override" +// pattern documented in docs/LOGSCAN_AND_TRIGGERS_TODO.md: +// +// - WorkloadID == "" && OverridesID == 0 → global rule, applies to +// every workload unless overridden. +// - WorkloadID != "" && OverridesID == 0 → workload-only addition. +// - WorkloadID != "" && OverridesID != 0 → override of the named +// global rule for one workload (Enabled=false to disable globally +// for this workload). 
+type LogScanRule struct { + ID int64 `json:"id"` + WorkloadID string `json:"workload_id"` // "" = global + OverridesID int64 `json:"overrides_id"` // 0 = not an override + Name string `json:"name"` + Pattern string `json:"pattern"` // regex, compiled at load + Severity string `json:"severity"` // info|warn|error + Streams string `json:"streams"` // all|stdout|stderr + CooldownSeconds int `json:"cooldown_seconds"` + Enabled bool `json:"enabled"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +// Log scan stream filter values. "all" reads both streams; "stdout" +// or "stderr" filter to one. Used both for store validation and at +// docker-side log read time. +const ( + LogScanStreamAll = "all" + LogScanStreamStdout = "stdout" + LogScanStreamStderr = "stderr" +) + +// Log scan severity values mirror the event_log enum so a matched +// rule lands as an event_log row with the rule's severity verbatim. +const ( + LogScanSeverityInfo = "info" + LogScanSeverityWarn = "warn" + LogScanSeverityError = "error" +) + // WorkloadKind enumerates the kinds of things that own containers. // Each kind has a corresponding row in projects/stacks/static_sites referenced via Workload.RefID. type WorkloadKind string @@ -346,12 +450,24 @@ const ( // Workload is the unifying primitive that abstracts Project, Stack, and StaticSite. // Each row is paired with exactly one project/stack/site via (Kind, RefID). // Notification + webhook config moves here so it lives in one place across kinds. +// +// SourceKind / SourceConfig / TriggerKind / TriggerConfig / PublicFaces / +// ParentWorkloadID populate the unified plugin model from the Workload-first +// refactor. Existing rows keep these empty until they are explicitly migrated +// or replaced — the legacy Kind/RefID columns continue to point at +// project/stack/site rows in parallel during the cutover. 
type Workload struct { ID string `json:"id"` - Kind string `json:"kind"` // project | stack | site + Kind string `json:"kind"` // project | stack | site (legacy discriminator) RefID string `json:"ref_id"` Name string `json:"name"` - AppID string `json:"app_id"` // nullable; "" = unassigned + AppID string `json:"app_id"` // nullable; "" = unassigned (a.k.a. GroupID after rename) + SourceKind string `json:"source_kind"` // "" until plugin-mode populated + SourceConfig string `json:"source_config"` // JSON-encoded, decoded by the matching Source + TriggerKind string `json:"trigger_kind"` + TriggerConfig string `json:"trigger_config"` // JSON-encoded, decoded by the matching Trigger + PublicFaces string `json:"public_faces"` // JSON-encoded []PublicFace + ParentWorkloadID string `json:"parent_workload_id"` // "" = root; non-empty = stage chain NotificationURL string `json:"notification_url"` NotificationSecret string `json:"-"` // never serialized WebhookSecret string `json:"-"` // URL-identifier secret; never serialized @@ -384,8 +500,14 @@ type Container struct { ProxyRouteID string `json:"proxy_route_id"` NpmProxyID int `json:"npm_proxy_id"` LastSeenAt string `json:"last_seen_at"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` + // ExtraJSON carries source-specific metadata that isn't promoted to a + // first-class column — currently per-face proxy route IDs for + // multi-face image deploys. Stored as a JSON object; '{}' on empty + // rows. Sources own the shape; consumers should tolerate unknown + // keys. + ExtraJSON string `json:"extra_json"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` } // App is an optional grouping of workloads (e.g., "my-saas" = web project + worker stack + redis stack). 
diff --git a/internal/store/store.go b/internal/store/store.go index 69b915c..5b9535c 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -181,6 +181,15 @@ func (s *Store) runMigrations() error { // re-write path; the LEFT JOIN in ListContainersByStageID falls back // to (project_id, role=stage_name) so legacy rows still resolve. `ALTER TABLE containers ADD COLUMN stage_id TEXT NOT NULL DEFAULT ''`, + // Workload-first refactor columns (2026-05-10). Land additively so + // the legacy kind/ref_id columns continue to serve existing + // project/stack/site rows during cutover. + `ALTER TABLE workloads ADD COLUMN source_kind TEXT NOT NULL DEFAULT ''`, + `ALTER TABLE workloads ADD COLUMN source_config TEXT NOT NULL DEFAULT '{}'`, + `ALTER TABLE workloads ADD COLUMN trigger_kind TEXT NOT NULL DEFAULT ''`, + `ALTER TABLE workloads ADD COLUMN trigger_config TEXT NOT NULL DEFAULT '{}'`, + `ALTER TABLE workloads ADD COLUMN public_faces TEXT NOT NULL DEFAULT '[]'`, + `ALTER TABLE workloads ADD COLUMN parent_workload_id TEXT NOT NULL DEFAULT ''`, } // Workload refactor tables (2026-05-09). Workload is the unifying primitive @@ -195,6 +204,12 @@ func (s *Store) runMigrations() error { ref_id TEXT NOT NULL, name TEXT NOT NULL, app_id TEXT NOT NULL DEFAULT '', + source_kind TEXT NOT NULL DEFAULT '', + source_config TEXT NOT NULL DEFAULT '{}', + trigger_kind TEXT NOT NULL DEFAULT '', + trigger_config TEXT NOT NULL DEFAULT '{}', + public_faces TEXT NOT NULL DEFAULT '[]', + parent_workload_id TEXT NOT NULL DEFAULT '', notification_url TEXT NOT NULL DEFAULT '', notification_secret TEXT NOT NULL DEFAULT '', webhook_secret TEXT NOT NULL DEFAULT '', @@ -231,6 +246,34 @@ func (s *Store) runMigrations() error { created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) )`, + // workload_env: per-workload env overrides (encrypt-at-rest for + // secrets). Functional analog of stage_env. 
Workload deletion + // cascades through the FK so orphan rows are impossible. + `CREATE TABLE IF NOT EXISTS workload_env ( + id TEXT PRIMARY KEY, + workload_id TEXT NOT NULL REFERENCES workloads(id) ON DELETE CASCADE, + key TEXT NOT NULL, + value TEXT NOT NULL DEFAULT '', + encrypted INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(workload_id, key) + )`, + // workload_volumes: per-workload mount declarations. Mirrors the + // legacy `volumes` table shape (source / target / scope / name) + // but keyed on workload_id. UNIQUE on (workload_id, target) so a + // re-add overwrites instead of duplicating. + `CREATE TABLE IF NOT EXISTS workload_volumes ( + id TEXT PRIMARY KEY, + workload_id TEXT NOT NULL REFERENCES workloads(id) ON DELETE CASCADE, + source TEXT NOT NULL DEFAULT '', + target TEXT NOT NULL, + scope TEXT NOT NULL DEFAULT 'absolute', + name TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(workload_id, target) + )`, } for _, t := range workloadTables { if _, err := s.db.Exec(t); err != nil { @@ -312,6 +355,49 @@ func (s *Store) runMigrations() error { } } + // Observability: event_triggers — consume EventLog entries off the + // bus and dispatch webhook actions. Schema kept flat (comma-list + // filters, single optional regex) — see LOGSCAN_AND_TRIGGERS_TODO.md. 
+ observabilityTables := []string{ + `CREATE TABLE IF NOT EXISTS event_triggers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + filter_severity TEXT NOT NULL DEFAULT '', + filter_source TEXT NOT NULL DEFAULT '', + filter_message_regex TEXT NOT NULL DEFAULT '', + action_type TEXT NOT NULL DEFAULT 'webhook', + action_target TEXT NOT NULL DEFAULT '', + action_secret TEXT NOT NULL DEFAULT '', + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + )`, + // log_scan_rules: regex patterns the log-scanner manager + // applies to container log lines. WorkloadID is nullable (via + // "" sentinel) so a global rule can have OverridesID = 0 and + // per-workload overrides reference the global's id. + `CREATE TABLE IF NOT EXISTS log_scan_rules ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + workload_id TEXT NOT NULL DEFAULT '', + overrides_id INTEGER NOT NULL DEFAULT 0, + name TEXT NOT NULL, + pattern TEXT NOT NULL, + severity TEXT NOT NULL DEFAULT 'warn', + streams TEXT NOT NULL DEFAULT 'all', + cooldown_seconds INTEGER NOT NULL DEFAULT 60, + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + )`, + `CREATE INDEX IF NOT EXISTS idx_log_scan_rules_workload ON log_scan_rules(workload_id)`, + `CREATE INDEX IF NOT EXISTS idx_log_scan_rules_overrides ON log_scan_rules(overrides_id)`, + } + for _, t := range observabilityTables { + if _, err := s.db.Exec(t); err != nil { + return fmt.Errorf("create observability table: %w", err) + } + } + for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { // "duplicate column" / "already exists" are expected when a @@ -366,6 +452,8 @@ func (s *Store) runMigrations() error { `CREATE INDEX IF NOT EXISTS idx_containers_container_id ON containers(container_id) WHERE container_id != ''`, `CREATE INDEX IF NOT EXISTS idx_containers_kind ON 
containers(workload_kind)`, `CREATE INDEX IF NOT EXISTS idx_containers_stage_id ON containers(stage_id) WHERE stage_id != ''`, + `CREATE INDEX IF NOT EXISTS idx_workload_env_workload ON workload_env(workload_id)`, + `CREATE INDEX IF NOT EXISTS idx_workload_volumes_workload ON workload_volumes(workload_id)`, } for _, idx := range indexes { if _, err := s.db.Exec(idx); err != nil {