feat(triggers): first-class triggers + bindings with fan-out webhook
Build / build (push) Successful in 10m39s
Build / build (push) Successful in 10m39s
Promote triggers from embedded workload fields to standalone records
joined to workloads via workload_trigger_bindings. One trigger (webhook,
registry watcher, git push, manual) now fans out to many workloads with
per-binding config overrides (top-level JSON merge, binding wins).
Backend
- new triggers + workload_trigger_bindings tables with ON DELETE CASCADE
- boot-time backfill of embedded trigger config inside per-workload tx
- store.ErrUnique sentinel translates SQLite UNIQUE at store boundary
- /api/triggers CRUD + /api/triggers/{id}/{webhook,bindings}
- /api/bindings/{id} update/delete; /api/workloads/{id}/triggers list+bind
- bindTriggerToWorkload accepts trigger_id or inline {kind,name,config}
- inline-create uses CreateTriggerWithBindingTx (no orphan triggers)
- validateBindingConfig enforces 8 KiB cap + plugin Validate on merged
- ListTriggersWithBindingCount + ListBindings*WithNames remove N+1
- POST /api/webhook/triggers/{secret} resolves trigger then fans out
- bounded worker pool (4) per request; per-binding error isolation
- outcome accounting: deployed / skipped / no-match / errored
- legacy /api/webhook/workloads/{secret} route removed (clean break;
backfill keeps secrets resolvable at the new /triggers/{secret} path)
- reconciler gate dropped from (Source && Trigger) to Source only
- MergeJSONConfig returns freshly allocated slices (no fan-out aliasing)
- WithEffectiveTrigger lets existing Trigger.Match contract stay unchanged
Frontend
- /triggers list, new wizard, [id] detail (bindings, webhook rotate)
- workload create wizard: NEW / PICK / SKIP trigger modes
- workload detail: bindings panel + Add-trigger modal (inline / pick)
- per-binding override editor with merged-preview + 8 KiB guard
- "OVERRIDES n FIELDS" row badge when binding_config is non-empty
- shared TriggerKindForm component (registry / git / manual + JSON)
- 3 raw <input type=checkbox> replaced with <ToggleSwitch>
- full EN + RU i18n: redeployTriggers.*, apps.detail.bindings.*,
apps.new.triggers.*, nav.triggers; event-triggers nav disambiguated
Doc
- WORKLOAD_REFACTOR_TODO: trigger-split marked DONE; next focus is
the static-source inline port + hard legacy cutover (Priority 1)
This commit is contained in:
@@ -430,6 +430,12 @@ func (s *Server) Router() chi.Router {
|
||||
// running image tag onto this workload's default_tag.
|
||||
r.Get("/chain", s.getWorkloadChain)
|
||||
r.With(auth.AdminOnly).Post("/promote-from/{sourceID}", s.promoteFromWorkload)
|
||||
|
||||
// Trigger bindings on this workload — the symmetric view
|
||||
// of /triggers/{id}/bindings keyed on the workload side
|
||||
// so the workload detail page is one round-trip.
|
||||
r.Get("/triggers", s.listBindingsForWorkload)
|
||||
r.With(auth.AdminOnly).Post("/triggers", s.bindTriggerToWorkload)
|
||||
})
|
||||
|
||||
// Global container index, joined to workload + app names.
|
||||
@@ -446,6 +452,26 @@ func (s *Server) Router() chi.Router {
|
||||
r.Delete("/apps/{id}", s.deleteApp)
|
||||
})
|
||||
|
||||
// First-class Triggers (redeploy signal sources). One trigger
|
||||
// (registry / git / webhook / manual / schedule / log_scan)
|
||||
// fans out to many workloads via workload_trigger_bindings.
|
||||
// Reads are open to authenticated users; mutations + secret
|
||||
// rotation are admin-gated.
|
||||
r.Get("/triggers", s.listTriggers)
|
||||
r.Get("/triggers/{id}", s.getTrigger)
|
||||
r.Get("/triggers/{id}/bindings", s.listBindingsForTrigger)
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(auth.AdminOnly)
|
||||
r.Post("/triggers", s.createTrigger)
|
||||
r.Put("/triggers/{id}", s.updateTrigger)
|
||||
r.Delete("/triggers/{id}", s.deleteTrigger)
|
||||
r.Get("/triggers/{id}/webhook", s.getTriggerWebhook)
|
||||
r.Post("/triggers/{id}/webhook/regenerate", s.regenerateTriggerWebhook)
|
||||
r.Post("/triggers/{id}/bindings", s.bindWorkloadToTrigger)
|
||||
r.Put("/bindings/{bid}", s.updateBinding)
|
||||
r.Delete("/bindings/{bid}", s.deleteBinding)
|
||||
})
|
||||
|
||||
// Event triggers: filter+action rules over the event_log
|
||||
// stream. Read endpoints are available to any authenticated
|
||||
// user; mutations + test-dispatch are admin-gated since they
|
||||
|
||||
@@ -0,0 +1,628 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
"github.com/alexei/tinyforge/internal/workload/plugin"
|
||||
)
|
||||
|
||||
// triggerView is the response shape for /api/triggers. Webhook secrets
|
||||
// are never serialized — read them via the dedicated /webhook subresource
|
||||
// where the canonical URL is composed.
|
||||
type triggerView struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"`
|
||||
Name string `json:"name"`
|
||||
Config json.RawMessage `json:"config"`
|
||||
WebhookEnabled bool `json:"webhook_enabled"`
|
||||
WebhookRequireSignature bool `json:"webhook_require_signature"`
|
||||
BindingCount int `json:"binding_count"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
func (s *Server) toTriggerView(t store.Trigger) triggerView {
|
||||
count, err := s.store.CountBindingsForTrigger(t.ID)
|
||||
if err != nil {
|
||||
slog.Warn("triggerView: count bindings", "trigger", t.ID, "error", err)
|
||||
}
|
||||
return triggerView{
|
||||
ID: t.ID,
|
||||
Kind: t.Kind,
|
||||
Name: t.Name,
|
||||
Config: json.RawMessage(t.Config),
|
||||
WebhookEnabled: t.WebhookSecret != "",
|
||||
WebhookRequireSignature: t.WebhookRequireSignature,
|
||||
BindingCount: count,
|
||||
CreatedAt: t.CreatedAt,
|
||||
UpdatedAt: t.UpdatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
// toTriggerViewWithCount is the join-aware variant used by listTriggers
|
||||
// to avoid one COUNT(*) per row. Kept distinct from toTriggerView so
|
||||
// single-row paths (get/create/update) keep the simple call shape.
|
||||
func toTriggerViewWithCount(row store.TriggerWithBindingCount) triggerView {
|
||||
return triggerView{
|
||||
ID: row.ID,
|
||||
Kind: row.Kind,
|
||||
Name: row.Name,
|
||||
Config: json.RawMessage(row.Config),
|
||||
WebhookEnabled: row.WebhookSecret != "",
|
||||
WebhookRequireSignature: row.WebhookRequireSignature,
|
||||
BindingCount: row.BindingCount,
|
||||
CreatedAt: row.CreatedAt,
|
||||
UpdatedAt: row.UpdatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
// triggerRequest is the create/update body. Config is opaque per kind.
|
||||
// Auto-generates a webhook secret on create when WebhookEnabled is true;
|
||||
// the secret is exposed only via the /webhook subresource.
|
||||
type triggerRequest struct {
|
||||
Kind string `json:"kind"`
|
||||
Name string `json:"name"`
|
||||
Config json.RawMessage `json:"config"`
|
||||
WebhookEnabled bool `json:"webhook_enabled"`
|
||||
WebhookRequireSignature bool `json:"webhook_require_signature"`
|
||||
}
|
||||
|
||||
// Same per-blob caps used on the workload pluginWorkloadRequest path —
|
||||
// triggers and workload trigger configs share the same plugin Validate()
|
||||
// call, so the byte budget should match.
|
||||
const maxTriggerStandaloneConfigBytes = 16 << 10
|
||||
|
||||
func (s *Server) listTriggers(w http.ResponseWriter, r *http.Request) {
|
||||
kind := r.URL.Query().Get("kind")
|
||||
rows, err := s.store.ListTriggersWithBindingCount(kind)
|
||||
if err != nil {
|
||||
slog.Error("list triggers", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "list triggers")
|
||||
return
|
||||
}
|
||||
out := make([]triggerView, 0, len(rows))
|
||||
for _, t := range rows {
|
||||
out = append(out, toTriggerViewWithCount(t))
|
||||
}
|
||||
respondJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
func (s *Server) getTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
t, err := s.store.GetTriggerByID(id)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get trigger")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, s.toTriggerView(t))
|
||||
}
|
||||
|
||||
// buildTriggerFromRequest assembles a store.Trigger ready for insert.
|
||||
// Centralized so the standalone create endpoint and the inline-bind
|
||||
// endpoint cannot drift on secret-generation defaults.
|
||||
func buildTriggerFromRequest(req triggerRequest) store.Trigger {
|
||||
t := store.Trigger{
|
||||
Kind: req.Kind,
|
||||
Name: strings.TrimSpace(req.Name),
|
||||
Config: string(req.Config),
|
||||
WebhookRequireSignature: req.WebhookRequireSignature,
|
||||
}
|
||||
if req.WebhookEnabled {
|
||||
t.WebhookSecret = generateWebhookSecret()
|
||||
t.WebhookSigningSecret = generateWebhookSecret()
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Server) createTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
var req triggerRequest
|
||||
if !decodeJSONStrict(w, r, &req) {
|
||||
return
|
||||
}
|
||||
if err := validateTriggerRequest(req); err != nil {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
created, err := s.store.CreateTrigger(buildTriggerFromRequest(req))
|
||||
if err != nil {
|
||||
slog.Error("create trigger", "error", err)
|
||||
// UNIQUE name collision is the most common user-facing failure.
|
||||
if errors.Is(err, store.ErrUnique) {
|
||||
respondError(w, http.StatusConflict, "a trigger with this name already exists")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "create trigger")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusCreated, s.toTriggerView(created))
|
||||
}
|
||||
|
||||
func (s *Server) updateTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
existing, err := s.store.GetTriggerByID(id)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get trigger")
|
||||
return
|
||||
}
|
||||
var req triggerRequest
|
||||
if !decodeJSONStrict(w, r, &req) {
|
||||
return
|
||||
}
|
||||
// Kind is immutable on update. Mirror the value from the existing
|
||||
// row so validateTriggerRequest can still verify the config blob.
|
||||
req.Kind = existing.Kind
|
||||
if err := validateTriggerRequest(req); err != nil {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if req.Name != "" {
|
||||
existing.Name = strings.TrimSpace(req.Name)
|
||||
}
|
||||
if len(req.Config) > 0 {
|
||||
existing.Config = string(req.Config)
|
||||
}
|
||||
existing.WebhookRequireSignature = req.WebhookRequireSignature
|
||||
wasEnabled := existing.WebhookSecret != ""
|
||||
if req.WebhookEnabled && !wasEnabled {
|
||||
// false→true transition: rotate both secrets so re-enabling
|
||||
// after a disable does not silently revive an old leaked URL.
|
||||
existing.WebhookSecret = generateWebhookSecret()
|
||||
existing.WebhookSigningSecret = generateWebhookSecret()
|
||||
}
|
||||
if !req.WebhookEnabled {
|
||||
existing.WebhookSecret = ""
|
||||
existing.WebhookSigningSecret = ""
|
||||
}
|
||||
if err := s.store.UpdateTrigger(existing); err != nil {
|
||||
slog.Error("update trigger", "error", err)
|
||||
if errors.Is(err, store.ErrUnique) {
|
||||
respondError(w, http.StatusConflict, "a trigger with this name already exists")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "update trigger")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, s.toTriggerView(existing))
|
||||
}
|
||||
|
||||
func (s *Server) deleteTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
if err := s.store.DeleteTrigger(id); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "delete trigger")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, map[string]string{"deleted": id})
|
||||
}
|
||||
|
||||
// triggerWebhookView surfaces the inbound URL for a trigger. Returns
|
||||
// empty path / secret when the trigger has webhook ingress disabled.
|
||||
type triggerWebhookView struct {
|
||||
URL string `json:"url"`
|
||||
Secret string `json:"secret"`
|
||||
WebhookRequireSignature bool `json:"webhook_require_signature"`
|
||||
}
|
||||
|
||||
func (s *Server) getTriggerWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
t, err := s.store.GetTriggerByID(id)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get trigger")
|
||||
return
|
||||
}
|
||||
view := triggerWebhookView{
|
||||
Secret: t.WebhookSecret,
|
||||
WebhookRequireSignature: t.WebhookRequireSignature,
|
||||
}
|
||||
if t.WebhookSecret != "" {
|
||||
view.URL = "/api/webhook/triggers/" + t.WebhookSecret
|
||||
}
|
||||
respondJSON(w, http.StatusOK, view)
|
||||
}
|
||||
|
||||
func (s *Server) regenerateTriggerWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
secret := generateWebhookSecret()
|
||||
if err := s.store.SetTriggerWebhookSecret(id, secret); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "rotate webhook secret")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, map[string]string{
|
||||
"secret": secret,
|
||||
"url": "/api/webhook/triggers/" + secret,
|
||||
})
|
||||
}
|
||||
|
||||
// maxBindingConfigBytes caps a per-binding override blob. Smaller than
|
||||
// the full trigger config — bindings should be lightweight tweaks
|
||||
// (tag pattern, branch filter), not whole replacement configs.
|
||||
const maxBindingConfigBytes = 8 << 10
|
||||
|
||||
// validateBindingConfig enforces the size cap and runs the trigger
|
||||
// plugin's Validate() against the merged (trigger.config + binding)
|
||||
// shape so a malformed override is caught at write time instead of
|
||||
// silently breaking webhook fan-out at deploy time.
|
||||
func validateBindingConfig(trg store.Trigger, bindingConfig json.RawMessage) error {
|
||||
if len(bindingConfig) > maxBindingConfigBytes {
|
||||
return fmt.Errorf("binding_config exceeds %d bytes", maxBindingConfigBytes)
|
||||
}
|
||||
merged, err := plugin.MergeJSONConfig(json.RawMessage(trg.Config), bindingConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("binding_config: %w", err)
|
||||
}
|
||||
tp, err := plugin.GetTrigger(trg.Kind)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tp.Validate(merged)
|
||||
}
|
||||
|
||||
// validateTriggerRequest type-checks the trigger via the registered
|
||||
// plugin. Accepts an empty config only when the plugin allows it (e.g.
|
||||
// the manual trigger).
|
||||
func validateTriggerRequest(req triggerRequest) error {
|
||||
if strings.TrimSpace(req.Kind) == "" {
|
||||
return fmt.Errorf("kind is required")
|
||||
}
|
||||
if strings.TrimSpace(req.Name) == "" {
|
||||
return fmt.Errorf("name is required")
|
||||
}
|
||||
if len(req.Config) > maxTriggerStandaloneConfigBytes {
|
||||
return fmt.Errorf("config exceeds %d bytes", maxTriggerStandaloneConfigBytes)
|
||||
}
|
||||
tp, err := plugin.GetTrigger(req.Kind)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tp.Validate(req.Config)
|
||||
}
|
||||
|
||||
// bindingView shapes one binding for the /api/triggers/{id}/bindings
|
||||
// listing. Includes the workload's name to avoid an N+1 round-trip on
|
||||
// the frontend.
|
||||
type bindingView struct {
|
||||
ID string `json:"id"`
|
||||
WorkloadID string `json:"workload_id"`
|
||||
WorkloadName string `json:"workload_name"`
|
||||
TriggerID string `json:"trigger_id"`
|
||||
BindingConfig json.RawMessage `json:"binding_config"`
|
||||
Enabled bool `json:"enabled"`
|
||||
SortOrder int `json:"sort_order"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
func (s *Server) toBindingView(b store.WorkloadTriggerBinding) bindingView {
|
||||
name := ""
|
||||
if w, err := s.store.GetWorkloadByID(b.WorkloadID); err == nil {
|
||||
name = w.Name
|
||||
}
|
||||
return bindingView{
|
||||
ID: b.ID,
|
||||
WorkloadID: b.WorkloadID,
|
||||
WorkloadName: name,
|
||||
TriggerID: b.TriggerID,
|
||||
BindingConfig: json.RawMessage(b.BindingConfig),
|
||||
Enabled: b.Enabled,
|
||||
SortOrder: b.SortOrder,
|
||||
CreatedAt: b.CreatedAt,
|
||||
UpdatedAt: b.UpdatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) listBindingsForTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
tid := chi.URLParam(r, "id")
|
||||
if _, err := s.store.GetTriggerByID(tid); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get trigger")
|
||||
return
|
||||
}
|
||||
rows, err := s.store.ListBindingsForTriggerWithNames(tid)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "list bindings")
|
||||
return
|
||||
}
|
||||
out := make([]bindingView, 0, len(rows))
|
||||
for _, b := range rows {
|
||||
out = append(out, bindingView{
|
||||
ID: b.ID,
|
||||
WorkloadID: b.WorkloadID,
|
||||
WorkloadName: b.WorkloadName,
|
||||
TriggerID: b.TriggerID,
|
||||
BindingConfig: json.RawMessage(b.BindingConfig),
|
||||
Enabled: b.Enabled,
|
||||
SortOrder: b.SortOrder,
|
||||
CreatedAt: b.CreatedAt,
|
||||
UpdatedAt: b.UpdatedAt,
|
||||
})
|
||||
}
|
||||
respondJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
// bindingRequest is shared by trigger-side bind (POST .../bindings) and
|
||||
// workload-side bind (POST workloads/{id}/triggers).
|
||||
type bindingRequest struct {
|
||||
WorkloadID string `json:"workload_id"`
|
||||
TriggerID string `json:"trigger_id"`
|
||||
BindingConfig json.RawMessage `json:"binding_config"`
|
||||
Enabled *bool `json:"enabled"`
|
||||
SortOrder int `json:"sort_order"`
|
||||
}
|
||||
|
||||
func (s *Server) bindWorkloadToTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
tid := chi.URLParam(r, "id")
|
||||
var req bindingRequest
|
||||
if !decodeJSONStrict(w, r, &req) {
|
||||
return
|
||||
}
|
||||
if req.WorkloadID == "" {
|
||||
respondError(w, http.StatusBadRequest, "workload_id is required")
|
||||
return
|
||||
}
|
||||
trg, err := s.store.GetTriggerByID(tid)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get trigger")
|
||||
return
|
||||
}
|
||||
if _, err := s.store.GetWorkloadByID(req.WorkloadID); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "workload")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get workload")
|
||||
return
|
||||
}
|
||||
if err := validateBindingConfig(trg, req.BindingConfig); err != nil {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
enabled := true
|
||||
if req.Enabled != nil {
|
||||
enabled = *req.Enabled
|
||||
}
|
||||
b := store.WorkloadTriggerBinding{
|
||||
WorkloadID: req.WorkloadID,
|
||||
TriggerID: tid,
|
||||
BindingConfig: string(req.BindingConfig),
|
||||
Enabled: enabled,
|
||||
SortOrder: req.SortOrder,
|
||||
}
|
||||
created, err := s.store.CreateBinding(b)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrUnique) {
|
||||
respondError(w, http.StatusConflict, "this workload is already bound to this trigger")
|
||||
return
|
||||
}
|
||||
slog.Error("create binding", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "create binding")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusCreated, s.toBindingView(created))
|
||||
}
|
||||
|
||||
func (s *Server) updateBinding(w http.ResponseWriter, r *http.Request) {
|
||||
bid := chi.URLParam(r, "bid")
|
||||
existing, err := s.store.GetBindingByID(bid)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "binding")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get binding")
|
||||
return
|
||||
}
|
||||
var req bindingRequest
|
||||
if !decodeJSONStrict(w, r, &req) {
|
||||
return
|
||||
}
|
||||
if len(req.BindingConfig) > 0 {
|
||||
trg, terr := s.store.GetTriggerByID(existing.TriggerID)
|
||||
if terr != nil {
|
||||
slog.Error("update binding: trigger lookup", "trigger", existing.TriggerID, "error", terr)
|
||||
respondError(w, http.StatusInternalServerError, "trigger lookup")
|
||||
return
|
||||
}
|
||||
if err := validateBindingConfig(trg, req.BindingConfig); err != nil {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
existing.BindingConfig = string(req.BindingConfig)
|
||||
}
|
||||
if req.Enabled != nil {
|
||||
existing.Enabled = *req.Enabled
|
||||
}
|
||||
existing.SortOrder = req.SortOrder
|
||||
if err := s.store.UpdateBinding(existing); err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "update binding")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, s.toBindingView(existing))
|
||||
}
|
||||
|
||||
// listBindingsForWorkload is the workload-side mirror of
|
||||
// listBindingsForTrigger. Returns every trigger bound to the workload
|
||||
// in sort_order so the detail page can render them inline.
|
||||
func (s *Server) listBindingsForWorkload(w http.ResponseWriter, r *http.Request) {
|
||||
wid := chi.URLParam(r, "id")
|
||||
if _, err := s.store.GetWorkloadByID(wid); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "workload")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get workload")
|
||||
return
|
||||
}
|
||||
rows, err := s.store.ListBindingsForWorkloadWithNames(wid)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "list bindings")
|
||||
return
|
||||
}
|
||||
type item struct {
|
||||
bindingView
|
||||
TriggerKind string `json:"trigger_kind"`
|
||||
TriggerName string `json:"trigger_name"`
|
||||
}
|
||||
out := make([]item, 0, len(rows))
|
||||
for _, b := range rows {
|
||||
out = append(out, item{
|
||||
bindingView: bindingView{
|
||||
ID: b.ID,
|
||||
WorkloadID: b.WorkloadID,
|
||||
TriggerID: b.TriggerID,
|
||||
BindingConfig: json.RawMessage(b.BindingConfig),
|
||||
Enabled: b.Enabled,
|
||||
SortOrder: b.SortOrder,
|
||||
CreatedAt: b.CreatedAt,
|
||||
UpdatedAt: b.UpdatedAt,
|
||||
},
|
||||
TriggerKind: b.TriggerKind,
|
||||
TriggerName: b.TriggerName,
|
||||
})
|
||||
}
|
||||
respondJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
// workloadBindRequest covers the two UX flows: bind an existing trigger
|
||||
// (TriggerID present) or inline-create one in the same call (TriggerID
|
||||
// empty + Inline populated). The inline form keeps the 1:1 case feeling
|
||||
// unchanged from the embedded-trigger era.
|
||||
type workloadBindRequest struct {
|
||||
TriggerID string `json:"trigger_id"`
|
||||
BindingConfig json.RawMessage `json:"binding_config"`
|
||||
Enabled *bool `json:"enabled"`
|
||||
SortOrder int `json:"sort_order"`
|
||||
Inline *triggerRequest `json:"inline"`
|
||||
}
|
||||
|
||||
func (s *Server) bindTriggerToWorkload(w http.ResponseWriter, r *http.Request) {
|
||||
wid := chi.URLParam(r, "id")
|
||||
if _, err := s.store.GetWorkloadByID(wid); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "workload")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get workload")
|
||||
return
|
||||
}
|
||||
var req workloadBindRequest
|
||||
if !decodeJSONStrict(w, r, &req) {
|
||||
return
|
||||
}
|
||||
if req.TriggerID == "" && req.Inline == nil {
|
||||
respondError(w, http.StatusBadRequest, "either trigger_id or inline trigger is required")
|
||||
return
|
||||
}
|
||||
|
||||
enabled := true
|
||||
if req.Enabled != nil {
|
||||
enabled = *req.Enabled
|
||||
}
|
||||
|
||||
// Inline path: create trigger + binding atomically so a binding
|
||||
// failure cannot leak a half-built trigger row.
|
||||
if req.TriggerID == "" {
|
||||
if err := validateTriggerRequest(*req.Inline); err != nil {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
_, b, err := s.store.CreateTriggerWithBindingTx(
|
||||
buildTriggerFromRequest(*req.Inline),
|
||||
store.WorkloadTriggerBinding{
|
||||
WorkloadID: wid,
|
||||
BindingConfig: string(req.BindingConfig),
|
||||
Enabled: enabled,
|
||||
SortOrder: req.SortOrder,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrUnique) {
|
||||
respondError(w, http.StatusConflict, "a trigger with this name already exists")
|
||||
return
|
||||
}
|
||||
slog.Error("inline trigger+binding tx", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "create inline trigger+binding")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusCreated, s.toBindingView(b))
|
||||
return
|
||||
}
|
||||
|
||||
// Existing-trigger path: just bind.
|
||||
trg, err := s.store.GetTriggerByID(req.TriggerID)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "trigger")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "get trigger")
|
||||
return
|
||||
}
|
||||
if err := validateBindingConfig(trg, req.BindingConfig); err != nil {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
b, err := s.store.CreateBinding(store.WorkloadTriggerBinding{
|
||||
WorkloadID: wid,
|
||||
TriggerID: req.TriggerID,
|
||||
BindingConfig: string(req.BindingConfig),
|
||||
Enabled: enabled,
|
||||
SortOrder: req.SortOrder,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrUnique) {
|
||||
respondError(w, http.StatusConflict, "this workload is already bound to this trigger")
|
||||
return
|
||||
}
|
||||
slog.Error("create binding from workload side", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "create binding")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusCreated, s.toBindingView(b))
|
||||
}
|
||||
|
||||
func (s *Server) deleteBinding(w http.ResponseWriter, r *http.Request) {
|
||||
bid := chi.URLParam(r, "bid")
|
||||
if err := s.store.DeleteBinding(bid); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "binding")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "delete binding")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, map[string]string{"deleted": bid})
|
||||
}
|
||||
@@ -134,10 +134,16 @@ func (r *Reconciler) ReconcileOnce(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconcilePluginWorkloads iterates every workload row that opted into
|
||||
// the plugin pipeline (source_kind + trigger_kind both set) and asks the
|
||||
// dispatcher to invoke Source.Reconcile. Failures are logged per-workload
|
||||
// — one workload's broken state must not stop sweeping the rest.
|
||||
// reconcilePluginWorkloads iterates every workload row that has a
|
||||
// Source plugin and asks the dispatcher to invoke Source.Reconcile.
|
||||
// Failures are logged per-workload — one workload's broken state must
|
||||
// not stop sweeping the rest.
|
||||
//
|
||||
// Trigger configuration is no longer required to reconcile: a workload
|
||||
// with a Source but no trigger bindings is still a deployed thing whose
|
||||
// container state must stay in sync (manual-only deploys are common
|
||||
// during early setup). After the trigger-split refactor triggers live
|
||||
// in their own table, so the only gate here is SourceKind.
|
||||
//
|
||||
// No-op when the plugin dispatcher hasn't been wired (boot-time race,
|
||||
// disabled deployments, tests).
|
||||
@@ -151,7 +157,7 @@ func (r *Reconciler) reconcilePluginWorkloads(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
for _, w := range rows {
|
||||
if w.SourceKind == "" || w.TriggerKind == "" {
|
||||
if w.SourceKind == "" {
|
||||
continue
|
||||
}
|
||||
pw := toPluginWorkload(w)
|
||||
|
||||
@@ -510,6 +510,43 @@ type Container struct {
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// Trigger is a first-class redeploy signal source. Triggers were embedded
|
||||
// in workload rows (workload.trigger_kind / trigger_config) until the
|
||||
// trigger-split refactor; they are now standalone records bound to
|
||||
// workloads via WorkloadTriggerBinding so a single trigger (a webhook,
|
||||
// registry watcher, schedule, git push) can fan out to many workloads.
|
||||
//
|
||||
// Webhook secrets live here, not on the workload — the inbound webhook
|
||||
// URL identifies a trigger, which then resolves its bindings to decide
|
||||
// which workloads to fire.
|
||||
type Trigger struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"` // registry | git | manual | schedule | log_scan | ...
|
||||
Name string `json:"name"` // human-readable, unique
|
||||
Config string `json:"config"` // JSON-encoded, decoded by the matching plugin
|
||||
WebhookSecret string `json:"-"` // URL-identifier secret; never serialized
|
||||
WebhookSigningSecret string `json:"-"` // HMAC key; never serialized
|
||||
WebhookRequireSignature bool `json:"webhook_require_signature"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// WorkloadTriggerBinding joins a Workload to a Trigger. BindingConfig is
|
||||
// the per-binding override applied on top of Trigger.Config (top-level
|
||||
// JSON merge: binding fields win). Empty BindingConfig means "use the
|
||||
// trigger's config verbatim". Enabled false skips the binding without
|
||||
// deleting it (useful for paused stages).
|
||||
type WorkloadTriggerBinding struct {
|
||||
ID string `json:"id"`
|
||||
WorkloadID string `json:"workload_id"`
|
||||
TriggerID string `json:"trigger_id"`
|
||||
BindingConfig string `json:"binding_config"` // JSON-encoded; "{}" = none
|
||||
Enabled bool `json:"enabled"`
|
||||
SortOrder int `json:"sort_order"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// App is an optional grouping of workloads (e.g., "my-saas" = web project + worker stack + redis stack).
|
||||
// Schema lives here from day one so future UI work is unblocked, but no UI is wired in v1.
|
||||
type App struct {
|
||||
|
||||
@@ -4,15 +4,41 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// ErrNotFound is returned when a requested entity does not exist.
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
// ErrUnique is returned when a write violates a UNIQUE constraint.
|
||||
// Translating the driver-specific message at the store boundary lets
|
||||
// callers use errors.Is instead of fragile substring matching on
|
||||
// err.Error(); the SQLite driver's wording is not part of any contract.
|
||||
var ErrUnique = errors.New("unique constraint violation")
|
||||
|
||||
// translateSQLError maps a driver-level error onto one of the store's
|
||||
// sentinel errors when possible. Returns the original error unchanged
|
||||
// when no mapping applies. The returned error wraps the original via
|
||||
// %w so callers that need the raw message still have it.
|
||||
func translateSQLError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
msg := err.Error()
|
||||
// modernc.org/sqlite returns text like
|
||||
// "constraint failed: UNIQUE constraint failed: triggers.name (2067)"
|
||||
// Match case-insensitively in case the driver wording shifts.
|
||||
if strings.Contains(strings.ToUpper(msg), "UNIQUE") {
|
||||
return fmt.Errorf("%w: %v", ErrUnique, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Store wraps the SQLite database connection and provides access to all query methods.
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
@@ -274,6 +300,34 @@ func (s *Store) runMigrations() error {
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
UNIQUE(workload_id, target)
|
||||
)`,
|
||||
// triggers: first-class redeploy signal sources. Webhook secrets
|
||||
// move from workload onto the trigger so one webhook URL can fan
|
||||
// out to multiple workloads via workload_trigger_bindings.
|
||||
`CREATE TABLE IF NOT EXISTS triggers (
|
||||
id TEXT PRIMARY KEY,
|
||||
kind TEXT NOT NULL,
|
||||
name TEXT NOT NULL UNIQUE,
|
||||
config TEXT NOT NULL DEFAULT '{}',
|
||||
webhook_secret TEXT NOT NULL DEFAULT '',
|
||||
webhook_signing_secret TEXT NOT NULL DEFAULT '',
|
||||
webhook_require_signature INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
)`,
|
||||
// workload_trigger_bindings: many-to-many between workloads and
|
||||
// triggers. binding_config is the per-binding override applied on
|
||||
// top of trigger.config (top-level JSON merge, binding wins).
|
||||
`CREATE TABLE IF NOT EXISTS workload_trigger_bindings (
|
||||
id TEXT PRIMARY KEY,
|
||||
workload_id TEXT NOT NULL REFERENCES workloads(id) ON DELETE CASCADE,
|
||||
trigger_id TEXT NOT NULL REFERENCES triggers(id) ON DELETE CASCADE,
|
||||
binding_config TEXT NOT NULL DEFAULT '{}',
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
sort_order INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
UNIQUE(workload_id, trigger_id)
|
||||
)`,
|
||||
}
|
||||
for _, t := range workloadTables {
|
||||
if _, err := s.db.Exec(t); err != nil {
|
||||
@@ -454,6 +508,11 @@ func (s *Store) runMigrations() error {
|
||||
`CREATE INDEX IF NOT EXISTS idx_containers_stage_id ON containers(stage_id) WHERE stage_id != ''`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_workload_env_workload ON workload_env(workload_id)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_workload_volumes_workload ON workload_volumes(workload_id)`,
|
||||
// Trigger-split indexes (2026-05-16).
|
||||
`CREATE INDEX IF NOT EXISTS idx_triggers_kind ON triggers(kind)`,
|
||||
`CREATE UNIQUE INDEX IF NOT EXISTS idx_triggers_webhook_secret ON triggers(webhook_secret) WHERE webhook_secret != ''`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_bindings_workload ON workload_trigger_bindings(workload_id)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_bindings_trigger ON workload_trigger_bindings(trigger_id)`,
|
||||
}
|
||||
for _, idx := range indexes {
|
||||
if _, err := s.db.Exec(idx); err != nil {
|
||||
@@ -474,6 +533,127 @@ func (s *Store) runMigrations() error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.backfillTriggersFromWorkloads(); err != nil {
|
||||
slog.Warn("trigger backfill", "error", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// backfillTriggersFromWorkloads converts embedded trigger config on
|
||||
// workload rows into standalone trigger + binding rows. Runs once per
|
||||
// boot and is idempotent — only workloads with non-empty trigger_kind
|
||||
// AND no existing binding produce a new trigger record.
|
||||
//
|
||||
// Each per-workload backfill runs inside a transaction so a partial
|
||||
// failure (binding insert fails after trigger insert succeeds) rolls
|
||||
// back cleanly; otherwise an orphan trigger row would survive forever
|
||||
// because the next boot's bindings-count check sees zero bindings and
|
||||
// tries to re-insert under the same UNIQUE name.
|
||||
//
|
||||
// Trigger names are unconditionally suffixed with the workload's id
|
||||
// short-prefix to make collisions impossible across two workloads with
|
||||
// identical (name, kind) — the "Foo [registry]" + "Foo [registry]" case
|
||||
// would otherwise have one of them silently dropped.
|
||||
//
|
||||
// Why on every boot: the trigger-split refactor is a clean break (no
|
||||
// formal migration). Existing dev databases have triggers embedded in
|
||||
// workloads.trigger_kind / trigger_config; this lifts them into the new
|
||||
// shape so URLs and behavior survive the upgrade.
|
||||
func (s *Store) backfillTriggersFromWorkloads() error {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT id, name, trigger_kind, trigger_config,
|
||||
webhook_secret, webhook_signing_secret, webhook_require_signature
|
||||
FROM workloads
|
||||
WHERE trigger_kind != ''`,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("scan workloads for backfill: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type embedded struct {
|
||||
id, name, kind, config string
|
||||
webhookSecret, webhookSigningSecret string
|
||||
requireSig int
|
||||
}
|
||||
var pending []embedded
|
||||
for rows.Next() {
|
||||
var e embedded
|
||||
if err := rows.Scan(&e.id, &e.name, &e.kind, &e.config,
|
||||
&e.webhookSecret, &e.webhookSigningSecret, &e.requireSig); err != nil {
|
||||
return fmt.Errorf("scan workload row: %w", err)
|
||||
}
|
||||
pending = append(pending, e)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, e := range pending {
|
||||
if err := s.backfillOneTrigger(e.id, e.name, e.kind, e.config,
|
||||
e.webhookSecret, e.webhookSigningSecret, e.requireSig); err != nil {
|
||||
slog.Warn("trigger backfill: workload skipped",
|
||||
"workload", e.id, "error", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// backfillOneTrigger lifts one embedded trigger into its own row + binding
|
||||
// inside a single transaction. Idempotent: a workload that already has at
|
||||
// least one binding is left alone.
|
||||
func (s *Store) backfillOneTrigger(workloadID, workloadName, kind, config,
|
||||
webhookSecret, webhookSigningSecret string, requireSig int) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
var existing int
|
||||
if err := tx.QueryRow(
|
||||
`SELECT COUNT(*) FROM workload_trigger_bindings WHERE workload_id = ?`,
|
||||
workloadID,
|
||||
).Scan(&existing); err != nil {
|
||||
return fmt.Errorf("count bindings: %w", err)
|
||||
}
|
||||
if existing > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
idShort := workloadID
|
||||
if len(idShort) > 8 {
|
||||
idShort = idShort[:8]
|
||||
}
|
||||
triggerName := fmt.Sprintf("%s [%s] %s", workloadName, kind, idShort)
|
||||
triggerID := uuid.New().String()
|
||||
now := Now()
|
||||
if _, err := tx.Exec(
|
||||
`INSERT INTO triggers (id, kind, name, config,
|
||||
webhook_secret, webhook_signing_secret, webhook_require_signature,
|
||||
created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
triggerID, kind, triggerName, config,
|
||||
webhookSecret, webhookSigningSecret, requireSig,
|
||||
now, now,
|
||||
); err != nil {
|
||||
return fmt.Errorf("insert trigger: %w", err)
|
||||
}
|
||||
|
||||
bindingID := uuid.New().String()
|
||||
if _, err := tx.Exec(
|
||||
`INSERT INTO workload_trigger_bindings
|
||||
(id, workload_id, trigger_id, binding_config, enabled, sort_order, created_at, updated_at)
|
||||
VALUES (?, ?, ?, '{}', 1, 0, ?, ?)`,
|
||||
bindingID, workloadID, triggerID, now, now,
|
||||
); err != nil {
|
||||
return fmt.Errorf("insert binding: %w", err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,303 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const triggerColumns = `id, kind, name, config,
|
||||
webhook_secret, webhook_signing_secret, webhook_require_signature,
|
||||
created_at, updated_at`
|
||||
|
||||
func scanTrigger(s rowScanner) (Trigger, error) {
|
||||
var t Trigger
|
||||
var requireSig int
|
||||
if err := s.Scan(&t.ID, &t.Kind, &t.Name, &t.Config,
|
||||
&t.WebhookSecret, &t.WebhookSigningSecret, &requireSig,
|
||||
&t.CreatedAt, &t.UpdatedAt); err != nil {
|
||||
return Trigger{}, err
|
||||
}
|
||||
t.WebhookRequireSignature = requireSig != 0
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// CreateTrigger inserts a new trigger row. Kind + Name are required.
|
||||
// Config is normalized to "{}" when empty; webhook secret is left empty
|
||||
// unless the caller pre-populates it.
|
||||
func (s *Store) CreateTrigger(t Trigger) (Trigger, error) {
|
||||
if t.ID == "" {
|
||||
t.ID = uuid.New().String()
|
||||
}
|
||||
if t.Config == "" {
|
||||
t.Config = "{}"
|
||||
}
|
||||
t.CreatedAt = Now()
|
||||
t.UpdatedAt = t.CreatedAt
|
||||
_, err := s.db.Exec(
|
||||
`INSERT INTO triggers (`+triggerColumns+`)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
t.ID, t.Kind, t.Name, t.Config,
|
||||
t.WebhookSecret, t.WebhookSigningSecret, BoolToInt(t.WebhookRequireSignature),
|
||||
t.CreatedAt, t.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return Trigger{}, fmt.Errorf("insert trigger: %w", translateSQLError(err))
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// GetTriggerByID returns one trigger.
|
||||
func (s *Store) GetTriggerByID(id string) (Trigger, error) {
|
||||
t, err := scanTrigger(s.db.QueryRow(
|
||||
`SELECT `+triggerColumns+` FROM triggers WHERE id = ?`, id,
|
||||
))
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Trigger{}, fmt.Errorf("trigger %s: %w", id, ErrNotFound)
|
||||
}
|
||||
if err != nil {
|
||||
return Trigger{}, fmt.Errorf("query trigger: %w", err)
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// GetTriggerByWebhookSecret resolves a trigger by its inbound webhook
|
||||
// secret. Empty input is treated as not-found to avoid accidental matches
|
||||
// against rows whose webhook is disabled.
|
||||
func (s *Store) GetTriggerByWebhookSecret(secret string) (Trigger, error) {
|
||||
if secret == "" {
|
||||
return Trigger{}, fmt.Errorf("empty secret: %w", ErrNotFound)
|
||||
}
|
||||
t, err := scanTrigger(s.db.QueryRow(
|
||||
`SELECT `+triggerColumns+` FROM triggers WHERE webhook_secret = ?`, secret,
|
||||
))
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Trigger{}, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return Trigger{}, fmt.Errorf("query trigger by webhook secret: %w", err)
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// GetTriggerByName resolves a trigger by its unique human name.
|
||||
func (s *Store) GetTriggerByName(name string) (Trigger, error) {
|
||||
if name == "" {
|
||||
return Trigger{}, fmt.Errorf("empty name: %w", ErrNotFound)
|
||||
}
|
||||
t, err := scanTrigger(s.db.QueryRow(
|
||||
`SELECT `+triggerColumns+` FROM triggers WHERE name = ?`, name,
|
||||
))
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Trigger{}, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return Trigger{}, fmt.Errorf("query trigger by name: %w", err)
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// ListTriggers returns all triggers, ordered by name. Optional kind
|
||||
// filter — empty string returns everything.
|
||||
func (s *Store) ListTriggers(kind string) ([]Trigger, error) {
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if kind == "" {
|
||||
rows, err = s.db.Query(`SELECT ` + triggerColumns + ` FROM triggers ORDER BY name`)
|
||||
} else {
|
||||
rows, err = s.db.Query(`SELECT `+triggerColumns+` FROM triggers WHERE kind = ? ORDER BY name`, kind)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query triggers: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []Trigger{}
|
||||
for rows.Next() {
|
||||
t, err := scanTrigger(rows)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan trigger: %w", err)
|
||||
}
|
||||
out = append(out, t)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// TriggerWithBindingCount projects a Trigger plus its current binding
|
||||
// count in a single round-trip. Used by /api/triggers list rendering so
|
||||
// the response avoids one COUNT(*) per trigger row.
|
||||
type TriggerWithBindingCount struct {
|
||||
Trigger
|
||||
BindingCount int
|
||||
}
|
||||
|
||||
// ListTriggersWithBindingCount returns every trigger ordered by name
|
||||
// with the count of its bindings joined in. Optional kind filter.
|
||||
func (s *Store) ListTriggersWithBindingCount(kind string) ([]TriggerWithBindingCount, error) {
|
||||
const base = `
|
||||
SELECT t.id, t.kind, t.name, t.config,
|
||||
t.webhook_secret, t.webhook_signing_secret, t.webhook_require_signature,
|
||||
t.created_at, t.updated_at,
|
||||
COALESCE(b.cnt, 0)
|
||||
FROM triggers t
|
||||
LEFT JOIN (
|
||||
SELECT trigger_id, COUNT(*) AS cnt
|
||||
FROM workload_trigger_bindings
|
||||
GROUP BY trigger_id
|
||||
) b ON b.trigger_id = t.id`
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if kind == "" {
|
||||
rows, err = s.db.Query(base + ` ORDER BY t.name`)
|
||||
} else {
|
||||
rows, err = s.db.Query(base+` WHERE t.kind = ? ORDER BY t.name`, kind)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query triggers with binding count: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []TriggerWithBindingCount{}
|
||||
for rows.Next() {
|
||||
var t Trigger
|
||||
var requireSig int
|
||||
var count int
|
||||
if err := rows.Scan(&t.ID, &t.Kind, &t.Name, &t.Config,
|
||||
&t.WebhookSecret, &t.WebhookSigningSecret, &requireSig,
|
||||
&t.CreatedAt, &t.UpdatedAt, &count); err != nil {
|
||||
return nil, fmt.Errorf("scan trigger+count: %w", err)
|
||||
}
|
||||
t.WebhookRequireSignature = requireSig != 0
|
||||
out = append(out, TriggerWithBindingCount{Trigger: t, BindingCount: count})
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// UpdateTrigger updates the mutable fields of a trigger. Kind is
|
||||
// immutable post-create — changing kinds would invalidate every
|
||||
// binding's interpretation of binding_config.
|
||||
func (s *Store) UpdateTrigger(t Trigger) error {
|
||||
t.UpdatedAt = Now()
|
||||
if t.Config == "" {
|
||||
t.Config = "{}"
|
||||
}
|
||||
result, err := s.db.Exec(
|
||||
`UPDATE triggers SET name=?, config=?,
|
||||
webhook_secret=?, webhook_signing_secret=?, webhook_require_signature=?,
|
||||
updated_at=?
|
||||
WHERE id=?`,
|
||||
t.Name, t.Config,
|
||||
t.WebhookSecret, t.WebhookSigningSecret, BoolToInt(t.WebhookRequireSignature),
|
||||
t.UpdatedAt, t.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update trigger: %w", translateSQLError(err))
|
||||
}
|
||||
n, _ := result.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("trigger %s: %w", t.ID, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteTrigger removes a trigger row. Bindings cascade away via FK.
|
||||
func (s *Store) DeleteTrigger(id string) error {
|
||||
result, err := s.db.Exec(`DELETE FROM triggers WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete trigger: %w", err)
|
||||
}
|
||||
n, _ := result.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("trigger %s: %w", id, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateTriggerWithBindingTx atomically creates a trigger row and
|
||||
// binds it to a workload. Used by the workload-side inline-create-and-
|
||||
// bind endpoint so a binding-insert failure does not leave an orphan
|
||||
// trigger row behind. Returns the persisted trigger and binding.
|
||||
func (s *Store) CreateTriggerWithBindingTx(t Trigger, b WorkloadTriggerBinding) (Trigger, WorkloadTriggerBinding, error) {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return Trigger{}, WorkloadTriggerBinding{}, fmt.Errorf("begin: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
if t.ID == "" {
|
||||
t.ID = uuid.New().String()
|
||||
}
|
||||
if t.Config == "" {
|
||||
t.Config = "{}"
|
||||
}
|
||||
t.CreatedAt = Now()
|
||||
t.UpdatedAt = t.CreatedAt
|
||||
if _, err := tx.Exec(
|
||||
`INSERT INTO triggers (`+triggerColumns+`)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
t.ID, t.Kind, t.Name, t.Config,
|
||||
t.WebhookSecret, t.WebhookSigningSecret, BoolToInt(t.WebhookRequireSignature),
|
||||
t.CreatedAt, t.UpdatedAt,
|
||||
); err != nil {
|
||||
return Trigger{}, WorkloadTriggerBinding{}, fmt.Errorf("insert trigger: %w", translateSQLError(err))
|
||||
}
|
||||
|
||||
if b.ID == "" {
|
||||
b.ID = uuid.New().String()
|
||||
}
|
||||
if b.BindingConfig == "" {
|
||||
b.BindingConfig = "{}"
|
||||
}
|
||||
b.TriggerID = t.ID
|
||||
b.CreatedAt = t.CreatedAt
|
||||
b.UpdatedAt = t.UpdatedAt
|
||||
if _, err := tx.Exec(
|
||||
`INSERT INTO workload_trigger_bindings (`+bindingColumns+`)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
b.ID, b.WorkloadID, b.TriggerID, b.BindingConfig,
|
||||
BoolToInt(b.Enabled), b.SortOrder, b.CreatedAt, b.UpdatedAt,
|
||||
); err != nil {
|
||||
return Trigger{}, WorkloadTriggerBinding{}, fmt.Errorf("insert binding: %w", translateSQLError(err))
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return Trigger{}, WorkloadTriggerBinding{}, fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
return t, b, nil
|
||||
}
|
||||
|
||||
// SetTriggerWebhookSecret rotates the inbound webhook URL secret. Pass
|
||||
// empty string to disable webhook ingress for this trigger.
|
||||
func (s *Store) SetTriggerWebhookSecret(id, secret string) error {
|
||||
result, err := s.db.Exec(
|
||||
`UPDATE triggers SET webhook_secret=?, updated_at=? WHERE id=?`,
|
||||
secret, Now(), id,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update trigger webhook_secret: %w", err)
|
||||
}
|
||||
n, _ := result.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("trigger %s: %w", id, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureTriggerWebhookSecret returns the current secret, generating one
|
||||
// lazily for triggers that have none. Mirrors the workload helper.
|
||||
func (s *Store) EnsureTriggerWebhookSecret(id string) (string, error) {
|
||||
t, err := s.GetTriggerByID(id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if t.WebhookSecret != "" {
|
||||
return t.WebhookSecret, nil
|
||||
}
|
||||
secret := generateWebhookSecret()
|
||||
if err := s.SetTriggerWebhookSecret(id, secret); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return secret, nil
|
||||
}
|
||||
@@ -0,0 +1,270 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const bindingColumns = `id, workload_id, trigger_id, binding_config,
|
||||
enabled, sort_order, created_at, updated_at`
|
||||
|
||||
func scanBinding(s rowScanner) (WorkloadTriggerBinding, error) {
|
||||
var b WorkloadTriggerBinding
|
||||
var enabled int
|
||||
if err := s.Scan(&b.ID, &b.WorkloadID, &b.TriggerID, &b.BindingConfig,
|
||||
&enabled, &b.SortOrder, &b.CreatedAt, &b.UpdatedAt); err != nil {
|
||||
return WorkloadTriggerBinding{}, err
|
||||
}
|
||||
b.Enabled = enabled != 0
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// CreateBinding inserts a binding row. The (workload_id, trigger_id) pair
|
||||
// must be unique — re-binding an existing pair is an UpdateBinding call,
|
||||
// not an insert.
|
||||
func (s *Store) CreateBinding(b WorkloadTriggerBinding) (WorkloadTriggerBinding, error) {
|
||||
if b.ID == "" {
|
||||
b.ID = uuid.New().String()
|
||||
}
|
||||
if b.BindingConfig == "" {
|
||||
b.BindingConfig = "{}"
|
||||
}
|
||||
b.CreatedAt = Now()
|
||||
b.UpdatedAt = b.CreatedAt
|
||||
_, err := s.db.Exec(
|
||||
`INSERT INTO workload_trigger_bindings (`+bindingColumns+`)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
b.ID, b.WorkloadID, b.TriggerID, b.BindingConfig,
|
||||
BoolToInt(b.Enabled), b.SortOrder, b.CreatedAt, b.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return WorkloadTriggerBinding{}, fmt.Errorf("insert binding: %w", translateSQLError(err))
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// GetBindingByID returns one binding by its primary key.
|
||||
func (s *Store) GetBindingByID(id string) (WorkloadTriggerBinding, error) {
|
||||
b, err := scanBinding(s.db.QueryRow(
|
||||
`SELECT `+bindingColumns+` FROM workload_trigger_bindings WHERE id = ?`, id,
|
||||
))
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return WorkloadTriggerBinding{}, fmt.Errorf("binding %s: %w", id, ErrNotFound)
|
||||
}
|
||||
if err != nil {
|
||||
return WorkloadTriggerBinding{}, fmt.Errorf("query binding: %w", err)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// ListBindingsForWorkload returns every trigger bound to a workload,
|
||||
// ordered by sort_order then created_at for stable display.
|
||||
func (s *Store) ListBindingsForWorkload(workloadID string) ([]WorkloadTriggerBinding, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT `+bindingColumns+` FROM workload_trigger_bindings
|
||||
WHERE workload_id = ? ORDER BY sort_order, created_at`,
|
||||
workloadID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query bindings for workload: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []WorkloadTriggerBinding{}
|
||||
for rows.Next() {
|
||||
b, err := scanBinding(rows)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan binding: %w", err)
|
||||
}
|
||||
out = append(out, b)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// ListBindingsForTrigger returns every workload bound to a trigger,
|
||||
// ordered by sort_order. Used by the webhook fan-out path.
|
||||
func (s *Store) ListBindingsForTrigger(triggerID string) ([]WorkloadTriggerBinding, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT `+bindingColumns+` FROM workload_trigger_bindings
|
||||
WHERE trigger_id = ? ORDER BY sort_order, created_at`,
|
||||
triggerID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query bindings for trigger: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []WorkloadTriggerBinding{}
|
||||
for rows.Next() {
|
||||
b, err := scanBinding(rows)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan binding: %w", err)
|
||||
}
|
||||
out = append(out, b)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// GetBindingByPair returns the binding for an exact (workload, trigger)
|
||||
// pair. ErrNotFound when missing.
|
||||
func (s *Store) GetBindingByPair(workloadID, triggerID string) (WorkloadTriggerBinding, error) {
|
||||
b, err := scanBinding(s.db.QueryRow(
|
||||
`SELECT `+bindingColumns+` FROM workload_trigger_bindings
|
||||
WHERE workload_id = ? AND trigger_id = ?`,
|
||||
workloadID, triggerID,
|
||||
))
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return WorkloadTriggerBinding{}, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return WorkloadTriggerBinding{}, fmt.Errorf("query binding by pair: %w", err)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// UpdateBinding updates the mutable fields of a binding (binding_config,
|
||||
// enabled, sort_order). The (workload_id, trigger_id) pair is immutable
|
||||
// — to re-target, delete and re-insert.
|
||||
func (s *Store) UpdateBinding(b WorkloadTriggerBinding) error {
|
||||
b.UpdatedAt = Now()
|
||||
if b.BindingConfig == "" {
|
||||
b.BindingConfig = "{}"
|
||||
}
|
||||
result, err := s.db.Exec(
|
||||
`UPDATE workload_trigger_bindings
|
||||
SET binding_config=?, enabled=?, sort_order=?, updated_at=?
|
||||
WHERE id=?`,
|
||||
b.BindingConfig, BoolToInt(b.Enabled), b.SortOrder, b.UpdatedAt, b.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update binding: %w", err)
|
||||
}
|
||||
n, _ := result.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("binding %s: %w", b.ID, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBinding removes a binding row.
|
||||
func (s *Store) DeleteBinding(id string) error {
|
||||
result, err := s.db.Exec(
|
||||
`DELETE FROM workload_trigger_bindings WHERE id = ?`, id,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete binding: %w", err)
|
||||
}
|
||||
n, _ := result.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("binding %s: %w", id, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBindingsForWorkload removes every binding for a workload.
|
||||
// Idempotent — returns nil even if no rows existed.
|
||||
func (s *Store) DeleteBindingsForWorkload(workloadID string) error {
|
||||
_, err := s.db.Exec(
|
||||
`DELETE FROM workload_trigger_bindings WHERE workload_id = ?`,
|
||||
workloadID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete bindings for workload: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BindingWithNames carries a binding plus the human names of its
|
||||
// workload + trigger so the API listing endpoints render in one
|
||||
// round-trip instead of N+1 lookups.
|
||||
type BindingWithNames struct {
|
||||
WorkloadTriggerBinding
|
||||
WorkloadName string
|
||||
TriggerKind string
|
||||
TriggerName string
|
||||
}
|
||||
|
||||
// ListBindingsForTriggerWithNames is the join-aware variant of
|
||||
// ListBindingsForTrigger that also surfaces the workload's name.
|
||||
func (s *Store) ListBindingsForTriggerWithNames(triggerID string) ([]BindingWithNames, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT b.id, b.workload_id, b.trigger_id, b.binding_config,
|
||||
b.enabled, b.sort_order, b.created_at, b.updated_at,
|
||||
COALESCE(w.name, '')
|
||||
FROM workload_trigger_bindings b
|
||||
LEFT JOIN workloads w ON w.id = b.workload_id
|
||||
WHERE b.trigger_id = ?
|
||||
ORDER BY b.sort_order, b.created_at`,
|
||||
triggerID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query bindings+names for trigger: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
out := []BindingWithNames{}
|
||||
for rows.Next() {
|
||||
var b WorkloadTriggerBinding
|
||||
var enabled int
|
||||
var workloadName string
|
||||
if err := rows.Scan(&b.ID, &b.WorkloadID, &b.TriggerID, &b.BindingConfig,
|
||||
&enabled, &b.SortOrder, &b.CreatedAt, &b.UpdatedAt, &workloadName); err != nil {
|
||||
return nil, fmt.Errorf("scan binding+name: %w", err)
|
||||
}
|
||||
b.Enabled = enabled != 0
|
||||
out = append(out, BindingWithNames{WorkloadTriggerBinding: b, WorkloadName: workloadName})
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// ListBindingsForWorkloadWithNames is the join-aware variant of
|
||||
// ListBindingsForWorkload that also surfaces the trigger's kind + name.
|
||||
func (s *Store) ListBindingsForWorkloadWithNames(workloadID string) ([]BindingWithNames, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT b.id, b.workload_id, b.trigger_id, b.binding_config,
|
||||
b.enabled, b.sort_order, b.created_at, b.updated_at,
|
||||
COALESCE(t.kind, ''), COALESCE(t.name, '')
|
||||
FROM workload_trigger_bindings b
|
||||
LEFT JOIN triggers t ON t.id = b.trigger_id
|
||||
WHERE b.workload_id = ?
|
||||
ORDER BY b.sort_order, b.created_at`,
|
||||
workloadID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query bindings+names for workload: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
out := []BindingWithNames{}
|
||||
for rows.Next() {
|
||||
var b WorkloadTriggerBinding
|
||||
var enabled int
|
||||
var kind, name string
|
||||
if err := rows.Scan(&b.ID, &b.WorkloadID, &b.TriggerID, &b.BindingConfig,
|
||||
&enabled, &b.SortOrder, &b.CreatedAt, &b.UpdatedAt, &kind, &name); err != nil {
|
||||
return nil, fmt.Errorf("scan binding+trigger names: %w", err)
|
||||
}
|
||||
b.Enabled = enabled != 0
|
||||
out = append(out, BindingWithNames{
|
||||
WorkloadTriggerBinding: b,
|
||||
TriggerKind: kind,
|
||||
TriggerName: name,
|
||||
})
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// CountBindingsForTrigger returns the number of bindings a trigger has.
|
||||
// Used by the UI to decide whether deleting a trigger is safe.
|
||||
func (s *Store) CountBindingsForTrigger(triggerID string) (int, error) {
|
||||
var n int
|
||||
err := s.db.QueryRow(
|
||||
`SELECT COUNT(*) FROM workload_trigger_bindings WHERE trigger_id = ?`,
|
||||
triggerID,
|
||||
).Scan(&n)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("count bindings for trigger: %w", err)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
+7
-167
@@ -13,7 +13,6 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
@@ -297,11 +296,16 @@ func (h *Handler) Drain() {
|
||||
//
|
||||
// POST /{secret} — per-project deploy trigger (legacy)
|
||||
// POST /sites/{secret} — per-site sync trigger (legacy)
|
||||
// POST /workloads/{secret} — plugin-native workload trigger
|
||||
// POST /triggers/{secret} — first-class trigger fan-out to all bound workloads
|
||||
//
|
||||
// The legacy POST /workloads/{secret} route was dropped in the
|
||||
// trigger-split refactor. Existing inbound webhook secrets were lifted
|
||||
// into trigger rows by the boot backfill, so the same secret value
|
||||
// works at /triggers/{secret} after the upgrade.
|
||||
func (h *Handler) Route() chi.Router {
|
||||
r := chi.NewRouter()
|
||||
r.Post("/sites/{secret}", h.handleSiteWebhook)
|
||||
r.Post("/workloads/{secret}", h.handlePluginWorkloadWebhook)
|
||||
r.Post("/triggers/{secret}", h.handleTriggerWebhook)
|
||||
r.Post("/{secret}", h.handleWebhook)
|
||||
return r
|
||||
}
|
||||
@@ -675,170 +679,6 @@ func (h *Handler) handleSiteWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
}
|
||||
|
||||
// handlePluginWorkloadWebhook processes an inbound webhook for a
|
||||
// plugin-native workload.
|
||||
//
|
||||
// URL: POST /api/webhook/workloads/{secret}
|
||||
//
|
||||
// The secret resolves to exactly one workload row whose Source +
|
||||
// Trigger kinds determine how the payload is interpreted. The body
|
||||
// shape is the same as the legacy project/site webhooks (Image for
|
||||
// registry pushes, Ref for git pushes) — Gitea / GitHub / generic
|
||||
// registry CIs can target this URL without payload changes. The
|
||||
// workload's configured Trigger plugin then decides whether the event
|
||||
// fires a deploy.
|
||||
func (h *Handler) handlePluginWorkloadWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
delivery := store.WebhookDelivery{
|
||||
TargetType: "workload",
|
||||
SourceIP: clientIP(r),
|
||||
SignatureState: sigStateUnconfigured,
|
||||
StatusCode: http.StatusOK,
|
||||
Outcome: outcomeSkip,
|
||||
}
|
||||
defer func() { h.recordDelivery(delivery) }()
|
||||
|
||||
if h.plugins == nil {
|
||||
delivery.StatusCode = http.StatusServiceUnavailable
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "plugin dispatcher not wired"
|
||||
respondWebhookError(w, http.StatusServiceUnavailable, "plugin dispatcher not wired")
|
||||
return
|
||||
}
|
||||
|
||||
secret := chi.URLParam(r, "secret")
|
||||
if secret == "" {
|
||||
delivery.StatusCode = http.StatusNotFound
|
||||
delivery.Outcome = outcomeNotFound
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
wl, err := h.store.GetWorkloadByWebhookSecret(secret)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
delivery.StatusCode = http.StatusNotFound
|
||||
delivery.Outcome = outcomeNotFound
|
||||
delivery.Detail = "unknown webhook secret"
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
slog.Error("webhook: workload lookup failed", "error", err)
|
||||
delivery.StatusCode = http.StatusNotFound
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "lookup failed"
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if wl.SourceKind == "" || wl.TriggerKind == "" {
|
||||
// Legacy workload row whose secret happens to also be valid on the
|
||||
// legacy path. Tell the caller they hit the wrong route rather
|
||||
// than silently 404-ing — avoids head-scratching.
|
||||
delivery.StatusCode = http.StatusBadRequest
|
||||
delivery.Outcome = outcomeBadRequest
|
||||
delivery.Detail = "workload is legacy; use the project or site route"
|
||||
respondWebhookError(w, http.StatusBadRequest, "workload is not plugin-native")
|
||||
return
|
||||
}
|
||||
delivery.TargetID = wl.ID
|
||||
delivery.TargetName = wl.Name
|
||||
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, maxWebhookBodyBytes))
|
||||
if err != nil {
|
||||
delivery.StatusCode = http.StatusBadRequest
|
||||
delivery.Outcome = outcomeBadRequest
|
||||
delivery.Detail = "failed to read request body"
|
||||
respondWebhookError(w, http.StatusBadRequest, "failed to read request body")
|
||||
return
|
||||
}
|
||||
delivery.BodySize = len(body)
|
||||
|
||||
header := r.Header.Get(signatureHeader)
|
||||
verified, attempted := verifyHMAC(wl.WebhookSigningSecret, body, header)
|
||||
delivery.SignatureState = signatureStateFor(wl.WebhookSigningSecret, header, verified, attempted)
|
||||
if wl.WebhookRequireSignature && !verified {
|
||||
slog.Warn("webhook: workload signature required but invalid/missing", "workload", wl.Name)
|
||||
delivery.StatusCode = http.StatusUnauthorized
|
||||
delivery.Outcome = outcomeRejected
|
||||
delivery.Detail = "invalid or missing signature"
|
||||
respondWebhookError(w, http.StatusUnauthorized, "invalid or missing signature")
|
||||
return
|
||||
}
|
||||
if attempted && !verified {
|
||||
slog.Warn("webhook: workload bad signature", "workload", wl.Name)
|
||||
delivery.StatusCode = http.StatusUnauthorized
|
||||
delivery.Outcome = outcomeRejected
|
||||
delivery.Detail = "invalid signature"
|
||||
respondWebhookError(w, http.StatusUnauthorized, "invalid signature")
|
||||
return
|
||||
}
|
||||
|
||||
evt, err := buildInboundEvent(body, r.Header)
|
||||
if err != nil {
|
||||
delivery.StatusCode = http.StatusBadRequest
|
||||
delivery.Outcome = outcomeBadRequest
|
||||
delivery.Detail = err.Error()
|
||||
respondWebhookError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
trig, err := plugin.GetTrigger(wl.TriggerKind)
|
||||
if err != nil {
|
||||
slog.Warn("webhook: trigger plugin not registered",
|
||||
"workload", wl.Name, "trigger", wl.TriggerKind, "error", err)
|
||||
delivery.StatusCode = http.StatusInternalServerError
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "trigger plugin missing"
|
||||
respondWebhookError(w, http.StatusInternalServerError, "trigger plugin missing")
|
||||
return
|
||||
}
|
||||
|
||||
pwl := toPluginWorkload(wl)
|
||||
intent, err := trig.Match(ctx, h.plugins.PluginDeps(), pwl, evt)
|
||||
if err != nil {
|
||||
slog.Warn("webhook: trigger match error",
|
||||
"workload", wl.Name, "trigger", wl.TriggerKind, "error", err)
|
||||
delivery.StatusCode = http.StatusInternalServerError
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "trigger match error"
|
||||
respondWebhookError(w, http.StatusInternalServerError, "trigger match error")
|
||||
return
|
||||
}
|
||||
if intent == nil {
|
||||
delivery.Detail = "trigger declined (no match)"
|
||||
respondWebhookJSON(w, http.StatusOK, map[string]any{
|
||||
"success": true, "deploy": false, "workload": wl.Name,
|
||||
"reason": "trigger declined",
|
||||
})
|
||||
return
|
||||
}
|
||||
if intent.TriggeredAt.IsZero() {
|
||||
intent.TriggeredAt = time.Now().UTC()
|
||||
}
|
||||
if intent.TriggeredBy == "" {
|
||||
intent.TriggeredBy = "webhook"
|
||||
}
|
||||
|
||||
if err := h.plugins.DispatchPlugin(ctx, pwl, *intent); err != nil {
|
||||
slog.Warn("webhook: plugin dispatch failed",
|
||||
"workload", wl.Name, "error", err)
|
||||
delivery.StatusCode = http.StatusInternalServerError
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "dispatch failed; see server logs"
|
||||
respondWebhookError(w, http.StatusInternalServerError, "dispatch failed; see server logs")
|
||||
return
|
||||
}
|
||||
delivery.Outcome = outcomeDeploy
|
||||
delivery.Detail = fmt.Sprintf("reason=%s ref=%s", intent.Reason, intent.Reference)
|
||||
slog.Info("webhook: triggered plugin deploy",
|
||||
"workload", wl.Name, "trigger", wl.TriggerKind, "reason", intent.Reason)
|
||||
respondWebhookJSON(w, http.StatusOK, map[string]any{
|
||||
"success": true, "deploy": true,
|
||||
"workload": wl.Name, "reference": intent.Reference,
|
||||
})
|
||||
}
|
||||
|
||||
// buildInboundEvent normalizes the incoming webhook body into the
|
||||
// plugin.InboundEvent shape. The dispatch order is:
|
||||
//
|
||||
|
||||
@@ -0,0 +1,288 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
"github.com/alexei/tinyforge/internal/workload/plugin"
|
||||
)
|
||||
|
||||
// maxTriggerFanOutConcurrency caps how many bindings dispatch in
|
||||
// parallel for a single trigger webhook. Sequential fan-out would hold
|
||||
// the request goroutine for the sum of every binding's deploy time —
|
||||
// minutes for an N-binding trigger. Bounding to 4 keeps wall-clock
|
||||
// roughly N/4 * deploy_time without saturating the docker daemon (which
|
||||
// already serializes pulls).
|
||||
const maxTriggerFanOutConcurrency = 4
|
||||
|
||||
// bindingResult is the per-binding entry in the trigger fan-out
|
||||
// response body.
|
||||
type bindingResult struct {
|
||||
Workload string `json:"workload"`
|
||||
Deployed bool `json:"deployed"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
}
|
||||
|
||||
// handleTriggerWebhook processes an inbound webhook for a first-class
|
||||
// Trigger record. The secret resolves to one Trigger; the Trigger then
|
||||
// fans out to every enabled workload binding. Each binding gets its
|
||||
// effective config (trigger.config + binding.binding_config merged) and
|
||||
// runs through the trigger plugin's Match independently — one binding
|
||||
// firing does not affect another.
|
||||
//
|
||||
// URL: POST /api/webhook/triggers/{secret}
|
||||
//
|
||||
// Response shape: aggregate counts so a CI can tell at a glance whether
|
||||
// any deploys fired (status 200 + deploys=N) without parsing per-binding
|
||||
// detail. Errors per-binding are logged at warn level but do not fail
|
||||
// the whole request — one broken workload should not block the others.
|
||||
func (h *Handler) handleTriggerWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
delivery := store.WebhookDelivery{
|
||||
TargetType: "trigger",
|
||||
SourceIP: clientIP(r),
|
||||
SignatureState: sigStateUnconfigured,
|
||||
StatusCode: http.StatusOK,
|
||||
Outcome: outcomeSkip,
|
||||
}
|
||||
defer func() { h.recordDelivery(delivery) }()
|
||||
|
||||
if h.plugins == nil {
|
||||
delivery.StatusCode = http.StatusServiceUnavailable
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "plugin dispatcher not wired"
|
||||
respondWebhookError(w, http.StatusServiceUnavailable, "plugin dispatcher not wired")
|
||||
return
|
||||
}
|
||||
|
||||
secret := chi.URLParam(r, "secret")
|
||||
if secret == "" {
|
||||
delivery.StatusCode = http.StatusNotFound
|
||||
delivery.Outcome = outcomeNotFound
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
trg, err := h.store.GetTriggerByWebhookSecret(secret)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
delivery.StatusCode = http.StatusNotFound
|
||||
delivery.Outcome = outcomeNotFound
|
||||
delivery.Detail = "unknown webhook secret"
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
slog.Error("webhook: trigger lookup failed", "error", err)
|
||||
delivery.StatusCode = http.StatusNotFound
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "lookup failed"
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
delivery.TargetID = trg.ID
|
||||
delivery.TargetName = trg.Name
|
||||
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, maxWebhookBodyBytes))
|
||||
if err != nil {
|
||||
delivery.StatusCode = http.StatusBadRequest
|
||||
delivery.Outcome = outcomeBadRequest
|
||||
delivery.Detail = "failed to read request body"
|
||||
respondWebhookError(w, http.StatusBadRequest, "failed to read request body")
|
||||
return
|
||||
}
|
||||
delivery.BodySize = len(body)
|
||||
|
||||
header := r.Header.Get(signatureHeader)
|
||||
verified, attempted := verifyHMAC(trg.WebhookSigningSecret, body, header)
|
||||
delivery.SignatureState = signatureStateFor(trg.WebhookSigningSecret, header, verified, attempted)
|
||||
if trg.WebhookRequireSignature && !verified {
|
||||
slog.Warn("webhook: trigger signature required but invalid/missing", "trigger", trg.Name)
|
||||
delivery.StatusCode = http.StatusUnauthorized
|
||||
delivery.Outcome = outcomeRejected
|
||||
delivery.Detail = "invalid or missing signature"
|
||||
respondWebhookError(w, http.StatusUnauthorized, "invalid or missing signature")
|
||||
return
|
||||
}
|
||||
if attempted && !verified {
|
||||
slog.Warn("webhook: trigger bad signature", "trigger", trg.Name)
|
||||
delivery.StatusCode = http.StatusUnauthorized
|
||||
delivery.Outcome = outcomeRejected
|
||||
delivery.Detail = "invalid signature"
|
||||
respondWebhookError(w, http.StatusUnauthorized, "invalid signature")
|
||||
return
|
||||
}
|
||||
|
||||
evt, err := buildInboundEvent(body, r.Header)
|
||||
if err != nil {
|
||||
delivery.StatusCode = http.StatusBadRequest
|
||||
delivery.Outcome = outcomeBadRequest
|
||||
delivery.Detail = err.Error()
|
||||
respondWebhookError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
trigPlugin, err := plugin.GetTrigger(trg.Kind)
|
||||
if err != nil {
|
||||
slog.Warn("webhook: trigger plugin not registered",
|
||||
"trigger", trg.Name, "kind", trg.Kind, "error", err)
|
||||
delivery.StatusCode = http.StatusInternalServerError
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "trigger plugin missing"
|
||||
respondWebhookError(w, http.StatusInternalServerError, "trigger plugin missing")
|
||||
return
|
||||
}
|
||||
|
||||
bindings, err := h.store.ListBindingsForTrigger(trg.ID)
|
||||
if err != nil {
|
||||
slog.Error("webhook: list bindings failed", "trigger", trg.Name, "error", err)
|
||||
delivery.StatusCode = http.StatusInternalServerError
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = "list bindings failed"
|
||||
respondWebhookError(w, http.StatusInternalServerError, "list bindings failed")
|
||||
return
|
||||
}
|
||||
|
||||
results := h.fanOutBindings(ctx, trg, trigPlugin, bindings, evt)
|
||||
var deployed, skipped, noMatch, errored int
|
||||
for _, r := range results {
|
||||
switch {
|
||||
case r.Deployed:
|
||||
deployed++
|
||||
case r.Reason == "binding disabled":
|
||||
skipped++
|
||||
case r.Reason == "no match":
|
||||
noMatch++
|
||||
default:
|
||||
errored++
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case deployed > 0:
|
||||
delivery.Outcome = outcomeDeploy
|
||||
delivery.Detail = fmt.Sprintf("deployed=%d of %d (errored=%d, skipped=%d)",
|
||||
deployed, len(results), errored, skipped)
|
||||
case errored > 0:
|
||||
delivery.Outcome = outcomeError
|
||||
delivery.Detail = fmt.Sprintf("errored=%d of %d", errored, len(results))
|
||||
case skipped == len(results):
|
||||
delivery.Detail = "all bindings disabled"
|
||||
case noMatch == len(results)-skipped:
|
||||
delivery.Detail = "no binding matched"
|
||||
default:
|
||||
delivery.Detail = fmt.Sprintf("matched=0 skipped=%d errored=%d", skipped, errored)
|
||||
}
|
||||
respondWebhookJSON(w, http.StatusOK, map[string]any{
|
||||
"success": true,
|
||||
"trigger": trg.Name,
|
||||
"deployed": deployed,
|
||||
"bindings": results,
|
||||
})
|
||||
}
|
||||
|
||||
// fanOutBindings dispatches every binding through fireBinding with at
|
||||
// most maxTriggerFanOutConcurrency goroutines in flight. Order of the
|
||||
// returned slice matches the input bindings slice so callers can rely
|
||||
// on positional correlation.
|
||||
//
|
||||
// Disabled bindings short-circuit on the orchestrator goroutine — they
|
||||
// don't take a worker slot, leaving the pool free for real dispatches.
|
||||
// Workload-missing rows are recorded as errors and also skip the pool.
|
||||
func (h *Handler) fanOutBindings(
|
||||
ctx context.Context,
|
||||
trg store.Trigger,
|
||||
trigPlugin plugin.Trigger,
|
||||
bindings []store.WorkloadTriggerBinding,
|
||||
evt plugin.InboundEvent,
|
||||
) []bindingResult {
|
||||
results := make([]bindingResult, len(bindings))
|
||||
concurrency := maxTriggerFanOutConcurrency
|
||||
if len(bindings) < concurrency {
|
||||
concurrency = len(bindings)
|
||||
}
|
||||
if concurrency < 1 {
|
||||
concurrency = 1
|
||||
}
|
||||
sem := make(chan struct{}, concurrency)
|
||||
var wg sync.WaitGroup
|
||||
for i, b := range bindings {
|
||||
if !b.Enabled {
|
||||
results[i] = bindingResult{Workload: b.WorkloadID, Deployed: false, Reason: "binding disabled"}
|
||||
continue
|
||||
}
|
||||
row, lookupErr := h.store.GetWorkloadByID(b.WorkloadID)
|
||||
if lookupErr != nil {
|
||||
slog.Warn("webhook: bound workload missing",
|
||||
"trigger", trg.Name, "workload", b.WorkloadID, "error", lookupErr)
|
||||
results[i] = bindingResult{Workload: b.WorkloadID, Deployed: false, Reason: "workload missing"}
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
sem <- struct{}{}
|
||||
go func(idx int, binding store.WorkloadTriggerBinding, wl store.Workload) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
fired, reason := h.fireBinding(ctx, trg, trigPlugin, wl, binding, evt)
|
||||
results[idx] = bindingResult{Workload: wl.Name, Deployed: fired, Reason: reason}
|
||||
}(i, b, row)
|
||||
}
|
||||
wg.Wait()
|
||||
return results
|
||||
}
|
||||
|
||||
// fireBinding runs Match for one binding and dispatches if intent.
|
||||
// Returns (fired, human-readable reason). Errors are logged but the
|
||||
// reason is kept generic on the wire so a malformed binding does not
|
||||
// leak internals.
|
||||
func (h *Handler) fireBinding(
|
||||
ctx context.Context,
|
||||
trg store.Trigger,
|
||||
trigPlugin plugin.Trigger,
|
||||
row store.Workload,
|
||||
b store.WorkloadTriggerBinding,
|
||||
evt plugin.InboundEvent,
|
||||
) (bool, string) {
|
||||
pwl := toPluginWorkload(row)
|
||||
pwl, err := plugin.WithEffectiveTrigger(pwl, trg.Kind,
|
||||
json.RawMessage(trg.Config), json.RawMessage(b.BindingConfig))
|
||||
if err != nil {
|
||||
slog.Warn("webhook: merge effective trigger config failed",
|
||||
"trigger", trg.Name, "workload", row.Name, "error", err)
|
||||
return false, "config merge error"
|
||||
}
|
||||
intent, err := trigPlugin.Match(ctx, h.plugins.PluginDeps(), pwl, evt)
|
||||
if err != nil {
|
||||
slog.Warn("webhook: trigger match error",
|
||||
"trigger", trg.Name, "workload", row.Name, "error", err)
|
||||
return false, "match error"
|
||||
}
|
||||
if intent == nil {
|
||||
return false, "no match"
|
||||
}
|
||||
if intent.TriggeredAt.IsZero() {
|
||||
intent.TriggeredAt = time.Now().UTC()
|
||||
}
|
||||
if intent.TriggeredBy == "" {
|
||||
intent.TriggeredBy = "trigger-webhook"
|
||||
}
|
||||
if err := h.plugins.DispatchPlugin(ctx, pwl, *intent); err != nil {
|
||||
slog.Warn("webhook: dispatch failed",
|
||||
"trigger", trg.Name, "workload", row.Name, "error", err)
|
||||
return false, "dispatch failed"
|
||||
}
|
||||
slog.Info("webhook: triggered deploy via trigger fan-out",
|
||||
"trigger", trg.Name, "workload", row.Name, "reason", intent.Reason)
|
||||
return true, intent.Reason
|
||||
}
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// MergeJSONConfig merges override on top of base at the top-level
|
||||
// (binding fields win, base fields fill the rest). Both inputs are
|
||||
// expected to be JSON objects ("{}" when empty); arrays or scalars are
|
||||
// rejected because trigger and binding configs are always objects.
|
||||
//
|
||||
// Always returns a freshly allocated slice so callers may freely mutate
|
||||
// the result without affecting base. The fast-path (override empty)
|
||||
// returns a defensive copy of base for the same reason.
|
||||
func MergeJSONConfig(base, override json.RawMessage) (json.RawMessage, error) {
|
||||
if len(override) == 0 || string(override) == "{}" {
|
||||
if len(base) == 0 {
|
||||
return json.RawMessage("{}"), nil
|
||||
}
|
||||
return append(json.RawMessage(nil), base...), nil
|
||||
}
|
||||
if len(base) == 0 || string(base) == "{}" {
|
||||
return append(json.RawMessage(nil), override...), nil
|
||||
}
|
||||
baseMap := map[string]json.RawMessage{}
|
||||
if err := json.Unmarshal(base, &baseMap); err != nil {
|
||||
return nil, fmt.Errorf("merge config: base is not a JSON object: %w", err)
|
||||
}
|
||||
overMap := map[string]json.RawMessage{}
|
||||
if err := json.Unmarshal(override, &overMap); err != nil {
|
||||
return nil, fmt.Errorf("merge config: override is not a JSON object: %w", err)
|
||||
}
|
||||
for k, v := range overMap {
|
||||
baseMap[k] = v
|
||||
}
|
||||
out, err := json.Marshal(baseMap)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("merge config: re-marshal: %w", err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// WithEffectiveTrigger returns a copy of w with TriggerKind and
|
||||
// TriggerConfig set from a resolved (trigger, binding) pair. The
|
||||
// existing Trigger.Match contract reads w.TriggerConfig via
|
||||
// TriggerConfigOf[T], so this is the seam that lets the trigger plugins
|
||||
// stay unchanged after the trigger-split refactor.
|
||||
func WithEffectiveTrigger(w Workload, kind string, triggerConfig, bindingConfig json.RawMessage) (Workload, error) {
|
||||
merged, err := MergeJSONConfig(triggerConfig, bindingConfig)
|
||||
if err != nil {
|
||||
return Workload{}, err
|
||||
}
|
||||
w.TriggerKind = kind
|
||||
w.TriggerConfig = merged
|
||||
return w, nil
|
||||
}
|
||||
Reference in New Issue
Block a user