From db235c14120aa68ad48c2fed6a1bf53e9f22d777 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Sat, 9 May 2026 13:28:20 +0300 Subject: [PATCH] feat(workload): write-through workload sync + boot-time backfill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CRUD on Project / Stack / StaticSite now keeps a paired Workload row in sync. Secret setters (webhook secret, signing secret, require-signature toggle, notification secret) all re-sync after mutating the source-of-truth row so the workload row always reflects the canonical state. Delete cascades: DeleteProject/Stack/StaticSite now drop the matching workload row plus any container index entries owned by it, so global views don't show ghost rows. Boot-time BackfillWorkloads scans every project/stack/site and ensures each has a workload row. Idempotent — safe to run on every restart, recovers from a deleted/missing workload row. Behavior unchanged for existing call sites; the workloads table just starts being populated. Deployer / reconciler / consumer switchover land in the next commit. --- cmd/server/main.go | 7 + internal/store/projects.go | 46 ++++++- internal/store/stacks.go | 14 +- internal/store/static_sites.go | 42 +++++- internal/store/workload_sync.go | 119 +++++++++++++++++ internal/store/workload_sync_test.go | 190 +++++++++++++++++++++++++++ internal/store/workloads.go | 10 +- 7 files changed, 410 insertions(+), 18 deletions(-) create mode 100644 internal/store/workload_sync.go create mode 100644 internal/store/workload_sync_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 139e6a3..8a38856 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -73,6 +73,13 @@ func main() { os.Exit(1) } + // Backfill workload rows for any project / stack / static site that + // predates the workload refactor. Idempotent — safe on every boot. + if err := db.BackfillWorkloads(); err != nil { + slog.Error("workload backfill", "error", err) + os.Exit(1) + } + // Ensure default admin user exists on first launch. if err := ensureDefaultAdmin(db); err != nil { slog.Error("ensure default admin", "error", err) diff --git a/internal/store/projects.go b/internal/store/projects.go index d89c21e..3d12d3e 100644 --- a/internal/store/projects.go +++ b/internal/store/projects.go @@ -79,6 +79,9 @@ func (s *Store) CreateProject(p Project) (Project, error) { if err != nil { return Project{}, fmt.Errorf("insert project: %w", err) } + if err := s.SyncProjectWorkload(p); err != nil { + return Project{}, fmt.Errorf("sync project workload: %w", err) + } return p, nil } @@ -173,6 +176,15 @@ func (s *Store) UpdateProject(p Project) error { if n == 0 { return fmt.Errorf("project %s: %w", p.ID, ErrNotFound) } + // Re-read so the workload sync sees the canonical row (e.g. webhook + // secrets that UpdateProject does not write but other call sites do). + current, err := s.GetProjectByID(p.ID) + if err != nil { + return fmt.Errorf("reread project for workload sync: %w", err) + } + if err := s.SyncProjectWorkload(current); err != nil { + return fmt.Errorf("sync project workload: %w", err) + } return nil } @@ -190,7 +202,11 @@ func (s *Store) SetProjectWebhookSecret(id, secret string) error { if n == 0 { return fmt.Errorf("project %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetProjectByID(id) + if err != nil { + return fmt.Errorf("reread project for workload sync: %w", err) + } + return s.SyncProjectWorkload(current) } // SetProjectWebhookSigningSecret assigns the HMAC signing secret used to @@ -208,7 +224,11 @@ func (s *Store) SetProjectWebhookSigningSecret(id, secret string) error { if n == 0 { return fmt.Errorf("project %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetProjectByID(id) + if err != nil { + return fmt.Errorf("reread project for workload sync: %w", err) + } + return s.SyncProjectWorkload(current) } // SetProjectWebhookRequireSignature toggles whether unsigned (or @@ -229,7 +249,11 @@ func (s *Store) SetProjectWebhookRequireSignature(id string, require bool) error if n == 0 { return fmt.Errorf("project %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetProjectByID(id) + if err != nil { + return fmt.Errorf("reread project for workload sync: %w", err) + } + return s.SyncProjectWorkload(current) } // EnsureProjectWebhookSecret returns the current webhook secret for a project, @@ -265,7 +289,11 @@ func (s *Store) SetProjectNotificationSecret(id, secret string) error { if n == 0 { return fmt.Errorf("project %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetProjectByID(id) + if err != nil { + return fmt.Errorf("reread project for workload sync: %w", err) + } + return s.SyncProjectWorkload(current) } // EnsureProjectNotificationSecret returns the current outgoing-webhook signing @@ -287,6 +315,8 @@ func (s *Store) EnsureProjectNotificationSecret(id string) (string, error) { } // DeleteProject removes a project by ID. Cascading deletes handle stages, instances, and deploys. +// Workload row + container index entries are removed too so the global views +// don't show ghost rows after a project is gone. func (s *Store) DeleteProject(id string) error { result, err := s.db.Exec(`DELETE FROM projects WHERE id = ?`, id) if err != nil { @@ -296,5 +326,13 @@ func (s *Store) DeleteProject(id string) error { if n == 0 { return fmt.Errorf("project %s: %w", id, ErrNotFound) } + if w, err := s.GetWorkloadByRef(WorkloadKindProject, id); err == nil { + if err := s.DeleteContainersByWorkload(w.ID); err != nil { + return fmt.Errorf("delete project containers: %w", err) + } + if err := s.DeleteWorkload(w.ID); err != nil { + return fmt.Errorf("delete project workload: %w", err) + } + } return nil } diff --git a/internal/store/stacks.go b/internal/store/stacks.go index eff23f2..deb106e 100644 --- a/internal/store/stacks.go +++ b/internal/store/stacks.go @@ -29,6 +29,9 @@ func (s *Store) CreateStack(st Stack) (Stack, error) { if err != nil { return Stack{}, fmt.Errorf("insert stack: %w", err) } + if err := s.SyncStackWorkload(st); err != nil { + return Stack{}, fmt.Errorf("sync stack workload: %w", err) + } return st, nil } @@ -79,7 +82,7 @@ func (s *Store) UpdateStack(st Stack) error { if n == 0 { return fmt.Errorf("stack %s: %w", st.ID, ErrNotFound) } - return nil + return s.SyncStackWorkload(st) } // UpdateStackStatus updates the deployment status + error fields. @@ -117,6 +120,7 @@ func (s *Store) SetStackCurrentRevision(id, revisionID string) error { } // DeleteStack removes a stack by ID. Cascading deletes handle revisions + deploys. +// Workload row + container index entries are removed too. func (s *Store) DeleteStack(id string) error { result, err := s.db.Exec(`DELETE FROM stacks WHERE id = ?`, id) if err != nil { @@ -126,6 +130,14 @@ func (s *Store) DeleteStack(id string) error { if n == 0 { return fmt.Errorf("stack %s: %w", id, ErrNotFound) } + if w, err := s.GetWorkloadByRef(WorkloadKindStack, id); err == nil { + if err := s.DeleteContainersByWorkload(w.ID); err != nil { + return fmt.Errorf("delete stack containers: %w", err) + } + if err := s.DeleteWorkload(w.ID); err != nil { + return fmt.Errorf("delete stack workload: %w", err) + } + } return nil } diff --git a/internal/store/static_sites.go b/internal/store/static_sites.go index eaca341..9404b36 100644 --- a/internal/store/static_sites.go +++ b/internal/store/static_sites.go @@ -45,6 +45,9 @@ func (s *Store) CreateStaticSite(site StaticSite) (StaticSite, error) { if err != nil { return StaticSite{}, fmt.Errorf("insert static site: %w", err) } + if err := s.SyncStaticSiteWorkload(site); err != nil { + return StaticSite{}, fmt.Errorf("sync static site workload: %w", err) + } return site, nil } @@ -131,7 +134,11 @@ func (s *Store) UpdateStaticSite(site StaticSite) error { if n == 0 { return fmt.Errorf("static site %s: %w", site.ID, ErrNotFound) } - return nil + current, err := s.GetStaticSiteByID(site.ID) + if err != nil { + return fmt.Errorf("reread static site for workload sync: %w", err) + } + return s.SyncStaticSiteWorkload(current) } // UpdateStaticSiteStatus updates the deployment status fields. @@ -214,6 +221,7 @@ func (s *Store) ListStaticSiteProxyRoutes(domain string) ([]ProxyRoute, error) { } // DeleteStaticSite removes a static site by ID. Cascading deletes handle secrets. +// Workload row + container index entries are removed too. func (s *Store) DeleteStaticSite(id string) error { result, err := s.db.Exec(`DELETE FROM static_sites WHERE id = ?`, id) if err != nil { @@ -223,6 +231,14 @@ func (s *Store) DeleteStaticSite(id string) error { if n == 0 { return fmt.Errorf("static site %s: %w", id, ErrNotFound) } + if w, err := s.GetWorkloadByRef(WorkloadKindSite, id); err == nil { + if err := s.DeleteContainersByWorkload(w.ID); err != nil { + return fmt.Errorf("delete static site containers: %w", err) + } + if err := s.DeleteWorkload(w.ID); err != nil { + return fmt.Errorf("delete static site workload: %w", err) + } + } return nil } @@ -286,7 +302,11 @@ func (s *Store) SetStaticSiteWebhookSigningSecret(id, secret string) error { if n == 0 { return fmt.Errorf("static site %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetStaticSiteByID(id) + if err != nil { + return fmt.Errorf("reread static site for workload sync: %w", err) + } + return s.SyncStaticSiteWorkload(current) } // SetStaticSiteWebhookRequireSignature toggles whether unsigned (or @@ -307,7 +327,11 @@ func (s *Store) SetStaticSiteWebhookRequireSignature(id string, require bool) er if n == 0 { return fmt.Errorf("static site %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetStaticSiteByID(id) + if err != nil { + return fmt.Errorf("reread static site for workload sync: %w", err) + } + return s.SyncStaticSiteWorkload(current) } // SetStaticSiteNotificationSecret rotates the static site's outgoing-webhook @@ -325,7 +349,11 @@ func (s *Store) SetStaticSiteNotificationSecret(id, secret string) error { if n == 0 { return fmt.Errorf("static site %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetStaticSiteByID(id) + if err != nil { + return fmt.Errorf("reread static site for workload sync: %w", err) + } + return s.SyncStaticSiteWorkload(current) } // EnsureStaticSiteNotificationSecret returns the static site's outgoing-webhook @@ -394,7 +422,11 @@ func (s *Store) SetStaticSiteWebhookSecret(id, secret string) error { if n == 0 { return fmt.Errorf("static site %s: %w", id, ErrNotFound) } - return nil + current, err := s.GetStaticSiteByID(id) + if err != nil { + return fmt.Errorf("reread static site for workload sync: %w", err) + } + return s.SyncStaticSiteWorkload(current) } // EnsureStaticSiteWebhookSecret returns the current webhook secret for a site, diff --git a/internal/store/workload_sync.go b/internal/store/workload_sync.go new file mode 100644 index 0000000..b448cc4 --- /dev/null +++ b/internal/store/workload_sync.go @@ -0,0 +1,119 @@ +package store + +import "fmt" + +// SyncProjectWorkload upserts the Workload row paired with a project so that +// its name, notification config, and webhook secrets stay in sync. Called from +// CreateProject / UpdateProject / SetProject*Secret paths. Idempotent — safe +// to call when a workload row already exists for the (project, RefID) pair. +func (s *Store) SyncProjectWorkload(p Project) error { + existing, err := s.GetWorkloadByRef(WorkloadKindProject, p.ID) + if err == nil { + existing.Name = p.Name + existing.NotificationURL = p.NotificationURL + existing.NotificationSecret = p.NotificationSecret + existing.WebhookSecret = p.WebhookSecret + existing.WebhookSigningSecret = p.WebhookSigningSecret + existing.WebhookRequireSignature = p.WebhookRequireSignature + return s.UpdateWorkload(existing) + } + _, err = s.CreateWorkload(Workload{ + Kind: string(WorkloadKindProject), + RefID: p.ID, + Name: p.Name, + NotificationURL: p.NotificationURL, + NotificationSecret: p.NotificationSecret, + WebhookSecret: p.WebhookSecret, + WebhookSigningSecret: p.WebhookSigningSecret, + WebhookRequireSignature: p.WebhookRequireSignature, + }) + if err != nil { + return fmt.Errorf("create project workload: %w", err) + } + return nil +} + +// SyncStackWorkload upserts the Workload row paired with a stack. Stacks +// don't (yet) carry their own notification or webhook config — those fields +// stay empty on the workload row until the stack model gains them. +func (s *Store) SyncStackWorkload(st Stack) error { + existing, err := s.GetWorkloadByRef(WorkloadKindStack, st.ID) + if err == nil { + existing.Name = st.Name + return s.UpdateWorkload(existing) + } + _, err = s.CreateWorkload(Workload{ + Kind: string(WorkloadKindStack), + RefID: st.ID, + Name: st.Name, + }) + if err != nil { + return fmt.Errorf("create stack workload: %w", err) + } + return nil +} + +// SyncStaticSiteWorkload upserts the Workload row paired with a static site. +func (s *Store) SyncStaticSiteWorkload(site StaticSite) error { + existing, err := s.GetWorkloadByRef(WorkloadKindSite, site.ID) + if err == nil { + existing.Name = site.Name + existing.NotificationURL = site.NotificationURL + existing.NotificationSecret = site.NotificationSecret + existing.WebhookSecret = site.WebhookSecret + existing.WebhookSigningSecret = site.WebhookSigningSecret + existing.WebhookRequireSignature = site.WebhookRequireSignature + return s.UpdateWorkload(existing) + } + _, err = s.CreateWorkload(Workload{ + Kind: string(WorkloadKindSite), + RefID: site.ID, + Name: site.Name, + NotificationURL: site.NotificationURL, + NotificationSecret: site.NotificationSecret, + WebhookSecret: site.WebhookSecret, + WebhookSigningSecret: site.WebhookSigningSecret, + WebhookRequireSignature: site.WebhookRequireSignature, + }) + if err != nil { + return fmt.Errorf("create static site workload: %w", err) + } + return nil +} + +// BackfillWorkloads scans every project / stack / static_site row and ensures +// each has a matching workload row. Called once at boot before HTTP starts so +// any pre-Workload-refactor data is upgraded transparently. Idempotent. +func (s *Store) BackfillWorkloads() error { + projects, err := s.GetAllProjects() + if err != nil { + return fmt.Errorf("backfill: list projects: %w", err) + } + for _, p := range projects { + if err := s.SyncProjectWorkload(p); err != nil { + return fmt.Errorf("backfill project %s: %w", p.ID, err) + } + } + + stacks, err := s.GetAllStacks() + if err != nil { + return fmt.Errorf("backfill: list stacks: %w", err) + } + for _, st := range stacks { + if err := s.SyncStackWorkload(st); err != nil { + return fmt.Errorf("backfill stack %s: %w", st.ID, err) + } + } + + sites, err := s.GetAllStaticSites() + if err != nil { + return fmt.Errorf("backfill: list static sites: %w", err) + } + for _, site := range sites { + if err := s.SyncStaticSiteWorkload(site); err != nil { + return fmt.Errorf("backfill static site %s: %w", site.ID, err) + } + } + + return nil +} diff --git a/internal/store/workload_sync_test.go b/internal/store/workload_sync_test.go new file mode 100644 index 0000000..dd6b5a3 --- /dev/null +++ b/internal/store/workload_sync_test.go @@ -0,0 +1,190 @@ +package store + +import ( + "errors" + "testing" +) + +func TestCreateProjectAlsoCreatesWorkload(t *testing.T) { + s := newTestStore(t) + + p, err := s.CreateProject(Project{ + Name: "wf-project", Image: "nginx", Port: 80, Env: "{}", Volumes: "{}", + NotificationURL: "https://example.test/hook", + }) + if err != nil { + t.Fatalf("CreateProject: %v", err) + } + + w, err := s.GetWorkloadByRef(WorkloadKindProject, p.ID) + if err != nil { + t.Fatalf("workload should exist after CreateProject: %v", err) + } + if w.Name != "wf-project" { + t.Fatalf("workload name not synced: got %q", w.Name) + } + if w.WebhookSecret == "" { + t.Fatal("webhook secret should be carried into workload row") + } + if w.NotificationURL != "https://example.test/hook" { + t.Fatalf("notification url not synced: got %q", w.NotificationURL) + } +} + +func TestUpdateProjectSyncsWorkload(t *testing.T) { + s := newTestStore(t) + + p, _ := s.CreateProject(Project{ + Name: "before", Image: "i", Env: "{}", Volumes: "{}", + }) + + p.Name = "after" + p.NotificationURL = "https://new.test/hook" + if err := s.UpdateProject(p); err != nil { + t.Fatalf("UpdateProject: %v", err) + } + + w, _ := s.GetWorkloadByRef(WorkloadKindProject, p.ID) + if w.Name != "after" { + t.Fatalf("workload name not updated: got %q", w.Name) + } + if w.NotificationURL != "https://new.test/hook" { + t.Fatalf("workload notification url not updated: got %q", w.NotificationURL) + } +} + +func TestDeleteProjectCascadesWorkload(t *testing.T) { + s := newTestStore(t) + + p, _ := s.CreateProject(Project{Name: "doomed", Image: "i", Env: "{}", Volumes: "{}"}) + w, _ := s.GetWorkloadByRef(WorkloadKindProject, p.ID) + + // Add a container under this workload to verify cascade. + if _, err := s.CreateContainer(Container{ + WorkloadID: w.ID, WorkloadKind: "project", State: "running", + }); err != nil { + t.Fatalf("CreateContainer: %v", err) + } + + if err := s.DeleteProject(p.ID); err != nil { + t.Fatalf("DeleteProject: %v", err) + } + + if _, err := s.GetWorkloadByID(w.ID); !errors.Is(err, ErrNotFound) { + t.Fatalf("workload should be deleted, got %v", err) + } + containers, _ := s.ListContainersByWorkload(w.ID) + if len(containers) != 0 { + t.Fatalf("containers should be deleted, got %d", len(containers)) + } +} + +func TestSetProjectWebhookSecretSyncsWorkload(t *testing.T) { + s := newTestStore(t) + + p, _ := s.CreateProject(Project{Name: "n", Image: "i", Env: "{}", Volumes: "{}"}) + + newSecret := "new-secret-value-with-enough-entropy-1234" + if err := s.SetProjectWebhookSecret(p.ID, newSecret); err != nil { + t.Fatalf("SetProjectWebhookSecret: %v", err) + } + w, _ := s.GetWorkloadByRef(WorkloadKindProject, p.ID) + if w.WebhookSecret != newSecret { + t.Fatalf("workload webhook secret not synced: got %q", w.WebhookSecret) + } +} + +func TestCreateStackAlsoCreatesWorkload(t *testing.T) { + s := newTestStore(t) + + st, err := s.CreateStack(Stack{Name: "wf-stack", ComposeProjectName: "wf-stack"}) + if err != nil { + t.Fatalf("CreateStack: %v", err) + } + + w, err := s.GetWorkloadByRef(WorkloadKindStack, st.ID) + if err != nil { + t.Fatalf("workload should exist after CreateStack: %v", err) + } + if w.Name != "wf-stack" { + t.Fatalf("workload name not synced: got %q", w.Name) + } +} + +func TestUpdateStackSyncsWorkload(t *testing.T) { + s := newTestStore(t) + + st, _ := s.CreateStack(Stack{Name: "before", ComposeProjectName: "before-cp"}) + st.Name = "after" + if err := s.UpdateStack(st); err != nil { + t.Fatalf("UpdateStack: %v", err) + } + + w, _ := s.GetWorkloadByRef(WorkloadKindStack, st.ID) + if w.Name != "after" { + t.Fatalf("workload name not updated: got %q", w.Name) + } +} + +func TestDeleteStackCascadesWorkload(t *testing.T) { + s := newTestStore(t) + + st, _ := s.CreateStack(Stack{Name: "doomed-stack", ComposeProjectName: "doomed-cp"}) + w, _ := s.GetWorkloadByRef(WorkloadKindStack, st.ID) + + if err := s.DeleteStack(st.ID); err != nil { + t.Fatalf("DeleteStack: %v", err) + } + if _, err := s.GetWorkloadByID(w.ID); !errors.Is(err, ErrNotFound) { + t.Fatalf("workload should be deleted, got %v", err) + } +} + +func TestBackfillWorkloadsIdempotent(t *testing.T) { + s := newTestStore(t) + + // Create rows directly via the store (which already auto-syncs), then run + // the backfill twice — it must be a no-op the second time and not error. + p, _ := s.CreateProject(Project{Name: "p1", Image: "i", Env: "{}", Volumes: "{}"}) + st, _ := s.CreateStack(Stack{Name: "s1", ComposeProjectName: "s1-cp"}) + + if err := s.BackfillWorkloads(); err != nil { + t.Fatalf("first backfill: %v", err) + } + if err := s.BackfillWorkloads(); err != nil { + t.Fatalf("second backfill (should be idempotent): %v", err) + } + + all, _ := s.ListWorkloads("") + // Expect exactly 2: one project workload, one stack workload, no duplicates. + if len(all) != 2 { + t.Fatalf("expected 2 workloads after backfill, got %d", len(all)) + } + + // Confirm both refs are findable. + if _, err := s.GetWorkloadByRef(WorkloadKindProject, p.ID); err != nil { + t.Fatalf("project workload not found: %v", err) + } + if _, err := s.GetWorkloadByRef(WorkloadKindStack, st.ID); err != nil { + t.Fatalf("stack workload not found: %v", err) + } +} + +func TestBackfillRecoversFromMissingWorkloads(t *testing.T) { + s := newTestStore(t) + + p, _ := s.CreateProject(Project{Name: "p1", Image: "i", Env: "{}", Volumes: "{}"}) + + // Simulate the legacy state: a project exists but its workload row is gone + // (e.g. the rollout from before the refactor). Backfill must restore it. + w, _ := s.GetWorkloadByRef(WorkloadKindProject, p.ID) + _ = s.DeleteWorkload(w.ID) + + if err := s.BackfillWorkloads(); err != nil { + t.Fatalf("backfill: %v", err) + } + + if _, err := s.GetWorkloadByRef(WorkloadKindProject, p.ID); err != nil { + t.Fatalf("workload should be restored: %v", err) + } +} diff --git a/internal/store/workloads.go b/internal/store/workloads.go index ed2a27a..86e5212 100644 --- a/internal/store/workloads.go +++ b/internal/store/workloads.go @@ -41,7 +41,7 @@ func (s *Store) CreateWorkload(w Workload) (Workload, error) { VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, w.ID, w.Kind, w.RefID, w.Name, w.AppID, w.NotificationURL, w.NotificationSecret, - w.WebhookSecret, w.WebhookSigningSecret, boolToInt(w.WebhookRequireSignature), + w.WebhookSecret, w.WebhookSigningSecret, BoolToInt(w.WebhookRequireSignature), w.CreatedAt, w.UpdatedAt, ) if err != nil { @@ -142,7 +142,7 @@ func (s *Store) UpdateWorkload(w Workload) error { WHERE id=?`, w.Name, w.AppID, w.NotificationURL, w.NotificationSecret, - w.WebhookSecret, w.WebhookSigningSecret, boolToInt(w.WebhookRequireSignature), + w.WebhookSecret, w.WebhookSigningSecret, BoolToInt(w.WebhookRequireSignature), w.UpdatedAt, w.ID, ) if err != nil { @@ -184,9 +184,3 @@ func (s *Store) DeleteWorkloadByRef(kind WorkloadKind, refID string) error { return nil } -func boolToInt(b bool) int { - if b { - return 1 - } - return 0 -}