refactor(workload): finalize containers index + post-review hardening

Wraps up the workload refactor with the fixes that came out of the multi-agent
code review (see docs/plans/workload-refactor.md "What actually shipped").

Backend:
- store.ReconcileContainer: separate write path so the 30s reconciler tick no
  longer overwrites deployer-owned fields (subdomain, proxy_route_id,
  npm_proxy_id, image_tag).
- Container.stage_id column + index; ListProxyRoutes / ListContainersByStageID
  join via stage_id (survives stage rename), with legacy fallback to
  (project_id, role=stage_name).
- Reconciler: workload-existence check (rejects forged tinyforge.workload.id
  labels), skips inventing project-kind rows, child-context cancel before
  wg.Wait() on shutdown.
- Transactional CRUD across projects / stacks / static_sites: parent UPDATE
  and workload sync land in one transaction so secret rotations are durable.
- Webhook routing reads exclusively through workloads.webhook_secret; legacy
  GetProjectByWebhookSecret / GetStaticSiteByWebhookSecret fallback removed.
- store.GetStackByComposeProjectName + indexed lookup (no more full-table
  stack scan per compose container per tick).
- store.ListMissingSweepRows: filtered query for the missing-sweep.
- /api/instances/* handlers verify (workload_id, role) match URL
  (project_id, stage_name) before mutating — closes the cross-project
  hijack the security review flagged.
- extra_json no longer referenced from Go (column kept on disk for now).

Frontend:
- WorkloadContainers.svelte: generic detail-page panel reusable by stack and
  site detail pages.
- Containers page polish: client-side kind/state filters over an unfiltered
  fetch, URL-synced filters, race-safe loads via sequence number, EN+RU i18n,
  sidebar counter via navCounts.containers.

Misc:
- scripts/dev-server.sh: tolerate empty netstat grep result.
- .gitignore: ignore docker-watcher binaries, .claude/worktrees/, .facts-sync.json.
This commit is contained in:
2026-05-09 15:44:41 +03:00
parent d8ab22876f
commit cba2149aa9
30 changed files with 1227 additions and 509 deletions
+68 -20
View File
@@ -119,19 +119,14 @@ func (s *Server) deployInstance(w http.ResponseWriter, r *http.Request) {
// removeInstance handles DELETE /api/projects/{id}/stages/{stage}/instances/{iid}.
// {iid} is the container row ID (same UUID as the legacy instance ID).
// Verifies that the container belongs to the project + stage in the URL —
// without this check, a stale URL could delete an unrelated stack/site row.
func (s *Server) removeInstance(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "iid")
c, err := s.store.GetContainerByID(id)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
respondNotFound(w, "container")
return
}
slog.Error("failed to get container", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
c, ok := s.resolveAndAuthorizeInstance(w, r)
if !ok {
return
}
id := c.ID
// Remove the Docker container if it has one.
if c.ContainerID != "" {
@@ -171,19 +166,14 @@ func (s *Server) restartInstance(w http.ResponseWriter, r *http.Request) {
}
// controlInstance performs a stop/start/restart action on a container.
// The container's ownership of the URL-provided project + stage is verified
// before any Docker call — see resolveAndAuthorizeInstance for rationale.
func (s *Server) controlInstance(w http.ResponseWriter, r *http.Request, action string) {
id := chi.URLParam(r, "iid")
c, err := s.store.GetContainerByID(id)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
respondNotFound(w, "container")
return
}
slog.Error("failed to get container", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
c, ok := s.resolveAndAuthorizeInstance(w, r)
if !ok {
return
}
id := c.ID
if c.ContainerID == "" {
respondError(w, http.StatusBadRequest, "container row has no docker container bound")
@@ -231,3 +221,61 @@ type DeployTriggerer interface {
TriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) error
AsyncTriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) (string, error)
}
// resolveAndAuthorizeInstance loads the container row identified by {iid} and
// verifies it actually belongs to the project + stage in the URL path.
// Without this, a stale or hand-crafted URL like
//
// DELETE /api/projects/<projectA>/stages/<stageA>/instances/<rowOfStackB>
//
// would happily delete an unrelated stack/site container — admin-only doesn't
// excuse the cross-project bypass. Returns the container on success or
// nothing (with the response already written) on failure.
func (s *Server) resolveAndAuthorizeInstance(w http.ResponseWriter, r *http.Request) (store.Container, bool) {
projectID := chi.URLParam(r, "id")
stageName := ""
if stageID := chi.URLParam(r, "stage"); stageID != "" {
st, err := s.store.GetStageByID(stageID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
respondNotFound(w, "stage")
return store.Container{}, false
}
slog.Error("failed to get stage", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return store.Container{}, false
}
if st.ProjectID != projectID {
respondNotFound(w, "stage")
return store.Container{}, false
}
stageName = st.Name
}
id := chi.URLParam(r, "iid")
c, err := s.store.GetContainerByID(id)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
respondNotFound(w, "container")
return store.Container{}, false
}
slog.Error("failed to get container", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return store.Container{}, false
}
w2, err := s.store.GetWorkloadByRef(store.WorkloadKindProject, projectID)
if err != nil {
respondNotFound(w, "container")
return store.Container{}, false
}
if c.WorkloadID != w2.ID {
respondNotFound(w, "container")
return store.Container{}, false
}
if stageName != "" && c.Role != stageName {
respondNotFound(w, "container")
return store.Container{}, false
}
return c, true
}
+16 -1
View File
@@ -1,6 +1,7 @@
package api
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
@@ -136,11 +137,25 @@ func (s *Server) cleanupContainer(r *http.Request, c store.Container) error {
func (s *Server) emitStaleCleanupEvent(c store.Container) {
msg := "Stale container cleaned up: " + c.ID + " (tag: " + c.ImageTag + ")"
// Use json.Marshal — c.Role is reconciler-derived from a Docker label and
// could contain quotes / control chars that break a hand-built JSON string.
metaBytes, err := json.Marshal(map[string]string{
"container_id": c.ID,
"workload_id": c.WorkloadID,
"role": c.Role,
})
if err != nil {
// json.Marshal on a flat string map can only fail in pathological
// circumstances (memory exhaustion); fall back to an empty object so
// the event still records.
metaBytes = []byte(`{}`)
}
evt, err := s.store.InsertEvent(store.EventLog{
Source: "stale_cleanup",
Severity: "info",
Message: msg,
Metadata: `{"container_id":"` + c.ID + `","workload_id":"` + c.WorkloadID + `","role":"` + c.Role + `"}`,
Metadata: string(metaBytes),
})
if err != nil {
slog.Error("stale cleanup: failed to persist event", "error", err)
+1
View File
@@ -117,6 +117,7 @@ func (d *Deployer) blueGreenDeploy(
WorkloadID: workloadID,
WorkloadKind: string(store.WorkloadKindProject),
Role: stage.Name,
StageID: stage.ID,
ContainerID: containerID,
ImageRef: project.Image + ":" + imageTag,
ImageTag: imageTag,
+1
View File
@@ -413,6 +413,7 @@ func (d *Deployer) executeDeploy(
WorkloadID: workloadID,
WorkloadKind: string(store.WorkloadKindProject),
Role: stage.Name,
StageID: stage.ID,
ContainerID: containerID,
ImageRef: project.Image + ":" + imageTag,
ImageTag: imageTag,
+128 -62
View File
@@ -1,10 +1,14 @@
// Package reconciler keeps the normalized containers index in sync with the
// Docker daemon. It runs on a tick (and one-shot at boot) — for every
// Tinyforge-managed container in `docker ps`, it dispatches to a workload by
// labels and upserts a Container row. Rows whose Docker container ID is no
// longer present are flipped to state='missing'.
// labels and writes a Container row through ReconcileContainer (which only
// touches Docker-derived fields on conflict, never deployer-owned columns
// like subdomain / proxy_route_id / npm_proxy_id / image_tag / stage_id).
// Rows whose Docker container ID is no longer present are flipped to
// state='missing'.
//
// Dispatch precedence:
// Dispatch precedence (a container with multiple matching labels is dispatched
// by the first match in this order):
// 1. tinyforge.workload.id label (canonical, new)
// 2. tinyforge.static-site label (legacy site — joins via static_sites)
// 3. com.docker.compose.project (stack — joins via Stack.ComposeProjectName)
@@ -16,6 +20,7 @@ package reconciler
import (
"context"
"errors"
"log/slog"
"strings"
"sync"
@@ -38,8 +43,9 @@ type Reconciler struct {
docker DockerLister
interval time.Duration
stop chan struct{}
wg sync.WaitGroup
stop chan struct{}
cancel context.CancelFunc // populated in Start; invoked by Stop so an in-flight tick is unblocked.
wg sync.WaitGroup
}
// New constructs a Reconciler. interval is the tick period; values <=0 fall
@@ -62,15 +68,28 @@ func New(st *store.Store, dockerClient DockerLister, interval time.Duration) *Re
// Start kicks off the background reconciliation loop. Runs one tick
// immediately so startup populates the index without waiting for the first
// timer fire. Idempotent: calling Start twice is a programming error.
// timer fire. The provided context is wrapped with a child cancel func so
// Stop() can unblock an in-flight Docker call.
func (r *Reconciler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.wg.Add(1)
go r.loop(ctx)
}
// Stop signals the loop to exit and waits for the in-flight tick to finish.
// Stop signals the loop to exit. Cancels the child context FIRST so any
// in-flight `docker ps` (which can hang on a stuck daemon) returns promptly,
// then waits for the goroutine to finish. Idempotent.
func (r *Reconciler) Stop() {
close(r.stop)
if r.cancel != nil {
r.cancel()
}
select {
case <-r.stop:
// already closed
default:
close(r.stop)
}
r.wg.Wait()
}
@@ -84,8 +103,12 @@ func (r *Reconciler) ReconcileOnce(ctx context.Context) error {
}
seen := make(map[string]struct{}, len(items)) // container row IDs we touched
// Build a per-pass cache of compose project name → stack ID so we don't
// hit the DB once per compose container.
stackByCompose := map[string]store.Stack{}
for _, item := range items {
rowID := r.upsertFromItem(ctx, item)
rowID := r.upsertFromItem(item, stackByCompose)
if rowID != "" {
seen[rowID] = struct{}{}
}
@@ -121,7 +144,7 @@ func (r *Reconciler) loop(ctx context.Context) {
// upsertFromItem dispatches one container to its workload and writes the
// Container row. Returns the row ID on success or "" if no dispatch matched.
func (r *Reconciler) upsertFromItem(ctx context.Context, item docker.ReconcileItem) string {
func (r *Reconciler) upsertFromItem(item docker.ReconcileItem, stackCache map[string]store.Stack) string {
if id := item.Labels[docker.LabelWorkloadID]; id != "" {
return r.upsertByWorkloadLabel(item, id)
}
@@ -129,28 +152,86 @@ func (r *Reconciler) upsertFromItem(ctx context.Context, item docker.ReconcileIt
return r.upsertBySiteLabel(item, siteID)
}
if cp := item.Labels["com.docker.compose.project"]; cp != "" && strings.HasPrefix(cp, "tinyforge-") {
return r.upsertByComposeProject(item, cp)
return r.upsertByComposeProject(item, cp, stackCache)
}
return ""
}
// upsertByWorkloadLabel — canonical path. The row may already exist with a
// deployer-assigned UUID (project deploys do this so each blue-green slot
// has a stable handle); look it up by docker container ID first and fall
// back to the deterministic workloadID:role key.
// upsertByWorkloadLabel — canonical path. Project containers are owned by the
// deployer: the deployer pre-creates the row with a per-instance UUID and
// proxy/subdomain metadata. The reconciler resolves the existing row by
// docker container ID and only touches Docker-derived fields. If no existing
// row matches and the kind is project, we skip the upsert — inventing a
// deterministic-ID row would race with the deployer's UUID rows for stages
// with MaxInstances > 1, leaving ghost rows behind.
//
// Untrusted-label defense: a workload_id label that doesn't resolve to a
// known workload row is silently ignored. Anyone with Docker socket access
// could otherwise spawn a container with a forged label and steal the
// canonical slot for an existing workload.
func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string) string {
w, err := r.store.GetWorkloadByID(workloadID)
if err != nil {
// Forged or stale label — log once at debug; tick rate keeps logs quiet.
slog.Debug("reconciler: unknown workload_id label", "workload_id", workloadID, "container_id", item.ID)
return ""
}
role := item.Labels[docker.LabelRole]
kind := item.Labels[docker.LabelWorkloadKind]
rowID := workloadIDRow(workloadID, kind, role, item.ID)
if existing, err := r.store.GetContainerByDockerID(item.ID); err == nil {
rowID = existing.ID
if kind != "" && kind != w.Kind {
slog.Warn("reconciler: workload kind mismatch", "label_kind", kind, "stored_kind", w.Kind, "workload_id", workloadID)
return ""
}
if kind == "" {
kind = w.Kind
}
// Resolve to existing row by Docker container ID.
existing, lookupErr := r.store.GetContainerByDockerID(item.ID)
if lookupErr == nil {
port := 0
if len(item.Ports) > 0 {
port = int(item.Ports[0])
}
if err := r.store.ReconcileContainer(store.Container{
ID: existing.ID,
WorkloadID: workloadID,
WorkloadKind: kind,
Role: role,
ContainerID: item.ID,
ImageRef: item.Image,
Host: "local",
State: normalizeState(item.State),
Port: port,
LastSeenAt: store.Now(),
}); err != nil {
slog.Warn("reconciler: reconcile by workload label", "container_id", item.ID, "error", err)
return ""
}
return existing.ID
}
if !errors.Is(lookupErr, store.ErrNotFound) {
slog.Warn("reconciler: lookup container by docker id", "container_id", item.ID, "error", lookupErr)
return ""
}
// No row yet. For project workloads, the deployer is the authoritative
// writer — wait for the deployer to create the row rather than
// inventing one with a deterministic key (which would collide with
// MaxInstances > 1 deploys).
if kind == string(store.WorkloadKindProject) {
return ""
}
// Site/stack reach this branch only when their kind-specific dispatcher
// hasn't run yet (e.g. boot tick before site row is registered). The
// site/stack dispatchers below own their own deterministic IDs.
rowID := workloadIDRow(workloadID, kind, role, item.ID)
port := 0
if len(item.Ports) > 0 {
port = int(item.Ports[0])
}
if err := r.store.UpsertContainer(store.Container{
if err := r.store.ReconcileContainer(store.Container{
ID: rowID,
WorkloadID: workloadID,
WorkloadKind: kind,
@@ -162,7 +243,7 @@ func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID
Port: port,
LastSeenAt: store.Now(),
}); err != nil {
slog.Warn("reconciler: upsert by workload label", "container_id", item.ID, "error", err)
slog.Warn("reconciler: reconcile by workload label (insert)", "container_id", item.ID, "error", err)
return ""
}
return rowID
@@ -178,7 +259,7 @@ func (r *Reconciler) upsertBySiteLabel(item docker.ReconcileItem, siteID string)
if len(item.Ports) > 0 {
port = int(item.Ports[0])
}
if err := r.store.UpsertContainer(store.Container{
if err := r.store.ReconcileContainer(store.Container{
ID: rowID,
WorkloadID: w.ID,
WorkloadKind: string(store.WorkloadKindSite),
@@ -190,15 +271,24 @@ func (r *Reconciler) upsertBySiteLabel(item docker.ReconcileItem, siteID string)
Port: port,
LastSeenAt: store.Now(),
}); err != nil {
slog.Warn("reconciler: upsert by site label", "container_id", item.ID, "error", err)
slog.Warn("reconciler: reconcile by site label", "container_id", item.ID, "error", err)
return ""
}
return rowID
}
func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composeProject string) string {
stack, err := r.findStackByComposeProject(composeProject)
if err != nil {
func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composeProject string, cache map[string]store.Stack) string {
stack, ok := cache[composeProject]
if !ok {
st, err := r.store.GetStackByComposeProjectName(composeProject)
if err != nil {
cache[composeProject] = store.Stack{} // negative cache for the rest of the pass
return ""
}
stack = st
cache[composeProject] = st
}
if stack.ID == "" {
return ""
}
w, err := r.store.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
@@ -214,7 +304,7 @@ func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composePr
if len(item.Ports) > 0 {
port = int(item.Ports[0])
}
if err := r.store.UpsertContainer(store.Container{
if err := r.store.ReconcileContainer(store.Container{
ID: rowID,
WorkloadID: w.ID,
WorkloadKind: string(store.WorkloadKindStack),
@@ -226,66 +316,42 @@ func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composePr
Port: port,
LastSeenAt: store.Now(),
}); err != nil {
slog.Warn("reconciler: upsert by compose project", "container_id", item.ID, "error", err)
slog.Warn("reconciler: reconcile by compose project", "container_id", item.ID, "error", err)
return ""
}
return rowID
}
// findStackByComposeProject scans all stacks for a matching ComposeProjectName.
// Linear; the stack count is small in practice.
func (r *Reconciler) findStackByComposeProject(composeProject string) (store.Stack, error) {
stacks, err := r.store.GetAllStacks()
if err != nil {
return store.Stack{}, err
}
for _, s := range stacks {
if s.ComposeProjectName == composeProject {
return s, nil
}
}
return store.Stack{}, store.ErrNotFound
}
// markMissingRows flips state to 'missing' for any container row whose Docker
// container ID was not seen in this pass. Rows with empty container_id are
// skipped — the deployer creates them ahead of `docker create` so they're
// transient and shouldn't be marked missing on a tick that races the deploy.
// container ID was not seen in this pass. Uses ListMissingSweepRows to scan
// only rows that are bound to a real container and not already missing.
func (r *Reconciler) markMissingRows(seen map[string]struct{}) {
rows, err := r.store.ListContainers(store.ContainerFilter{})
rows, err := r.store.ListMissingSweepRows()
if err != nil {
slog.Warn("reconciler: list containers for missing-sweep", "error", err)
slog.Warn("reconciler: list rows for missing-sweep", "error", err)
return
}
for _, row := range rows {
if _, ok := seen[row.ID]; ok {
continue
}
if row.ContainerID == "" {
continue // never bound to a real container yet
}
if row.State == "missing" {
continue // already marked
}
if err := r.store.MarkContainerMissing(row.ID); err != nil {
slog.Warn("reconciler: mark missing", "row_id", row.ID, "error", err)
}
}
}
// workloadIDRow picks the row ID for a workload-labelled container.
// Stack rows use the deterministic workloadID:role pattern; sites use
// workloadID:site. Project rows have a per-deploy UUID assigned by the
// deployer and ALSO carry the role label (= stage name), so the same
// pattern resolves to the same row across deployer + reconciler upserts.
// workloadIDRow picks the row ID for a non-project workload-labelled
// container that has no existing row. Stack rows use workloadID:role; sites
// use workloadID:site. Project rows are never invented here — see
// upsertByWorkloadLabel for the rationale.
func workloadIDRow(workloadID, kind, role, containerID string) string {
if role != "" {
return workloadID + ":" + role
}
if kind == string(store.WorkloadKindSite) {
return workloadID + ":site"
}
// Last-resort fallback: container ID. Uncommon path.
if role != "" {
return workloadID + ":" + role
}
return workloadID + ":" + containerID
}
+116
View File
@@ -188,6 +188,122 @@ func TestReconcileIgnoresUnmanagedContainers(t *testing.T) {
}
}
// TestReconcileDoesNotClobberDeployerFields guards against the regression where
// the reconciler's upsert wiped subdomain / proxy_route_id / npm_proxy_id /
// image_tag / stage_id on every tick because those columns were included in
// the ON CONFLICT DO UPDATE SET clause but never populated by the reconciler.
func TestReconcileDoesNotClobberDeployerFields(t *testing.T) {
st := newTestStore(t)
// Project workload — exercises the path most affected by the regression
// (proxies, blue-green slots, image-tag-based stale detection).
project, err := st.CreateProject(store.Project{Name: "p", Image: "nginx"})
if err != nil {
t.Fatalf("CreateProject: %v", err)
}
w, _ := st.GetWorkloadByRef(store.WorkloadKindProject, project.ID)
// Deployer wrote the row with proxy / subdomain / image_tag / stage_id.
deployerRow := store.Container{
ID: "deploy-uuid-1", WorkloadID: w.ID, WorkloadKind: "project",
Role: "prod", StageID: "stage-prod-id", ContainerID: "docker-aaa",
ImageRef: "nginx:1.27", ImageTag: "1.27", State: "running", Port: 8080,
Subdomain: "prod-p", ProxyRouteID: "route-42", NpmProxyID: 7,
}
if err := st.UpsertContainer(deployerRow); err != nil {
t.Fatalf("seed deployer row: %v", err)
}
// Reconciler sees the same docker container — no proxy fields in labels.
fake := &fakeDocker{items: []docker.ReconcileItem{{
ID: "docker-aaa", Image: "nginx:1.27", State: "running",
Labels: map[string]string{
docker.LabelManaged: "true",
docker.LabelWorkloadID: w.ID,
docker.LabelWorkloadKind: "project",
docker.LabelRole: "prod",
},
Ports: []uint16{8080},
}}}
r := New(st, fake, 0)
if err := r.ReconcileOnce(context.Background()); err != nil {
t.Fatalf("ReconcileOnce: %v", err)
}
got, _ := st.GetContainerByID("deploy-uuid-1")
if got.Subdomain != "prod-p" {
t.Fatalf("subdomain wiped: %q", got.Subdomain)
}
if got.ProxyRouteID != "route-42" {
t.Fatalf("proxy_route_id wiped: %q", got.ProxyRouteID)
}
if got.NpmProxyID != 7 {
t.Fatalf("npm_proxy_id wiped: %d", got.NpmProxyID)
}
if got.ImageTag != "1.27" {
t.Fatalf("image_tag wiped: %q", got.ImageTag)
}
if got.StageID != "stage-prod-id" {
t.Fatalf("stage_id wiped: %q", got.StageID)
}
}
// TestReconcileRejectsForgedWorkloadLabel guards C2 — a Docker container
// claiming a non-existent workload_id must be ignored, not adopted into the
// containers index.
func TestReconcileRejectsForgedWorkloadLabel(t *testing.T) {
st := newTestStore(t)
fake := &fakeDocker{items: []docker.ReconcileItem{{
ID: "docker-evil",
Labels: map[string]string{
docker.LabelManaged: "true",
docker.LabelWorkloadID: "wl-does-not-exist",
docker.LabelWorkloadKind: "project",
docker.LabelRole: "prod",
},
}}}
r := New(st, fake, 0)
if err := r.ReconcileOnce(context.Background()); err != nil {
t.Fatalf("ReconcileOnce: %v", err)
}
rows, _ := st.ListContainers(store.ContainerFilter{})
if len(rows) != 0 {
t.Fatalf("forged label should produce no row, got %d", len(rows))
}
}
// TestReconcileSkipsProjectInsertWithoutDeployerRow guards H3 — the reconciler
// must not invent a project container row, since the deployer is the
// authoritative writer and inventing rows races with MaxInstances > 1 deploys.
func TestReconcileSkipsProjectInsertWithoutDeployerRow(t *testing.T) {
st := newTestStore(t)
project, err := st.CreateProject(store.Project{Name: "p2", Image: "nginx"})
if err != nil {
t.Fatalf("CreateProject: %v", err)
}
w, _ := st.GetWorkloadByRef(store.WorkloadKindProject, project.ID)
// Reconciler sees a real container with project labels but no deployer
// row exists yet (race during deploy).
fake := &fakeDocker{items: []docker.ReconcileItem{{
ID: "docker-race", Image: "nginx", State: "running",
Labels: map[string]string{
docker.LabelManaged: "true",
docker.LabelWorkloadID: w.ID,
docker.LabelWorkloadKind: "project",
docker.LabelRole: "prod",
},
}}}
r := New(st, fake, 0)
if err := r.ReconcileOnce(context.Background()); err != nil {
t.Fatalf("ReconcileOnce: %v", err)
}
rows, _ := st.ListContainersByWorkload(w.ID)
if len(rows) != 0 {
t.Fatalf("project insert without deployer row should be skipped, got %d rows", len(rows))
}
}
func TestReconcileNormalizesState(t *testing.T) {
st := newTestStore(t)
stack, _ := st.CreateStack(store.Stack{
+8 -2
View File
@@ -79,7 +79,10 @@ func (s *Store) UpdateApp(a App) error {
if err != nil {
return fmt.Errorf("update app: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("app %s: %w", a.ID, ErrNotFound)
}
@@ -102,7 +105,10 @@ func (s *Store) DeleteApp(id string) error {
if err != nil {
return fmt.Errorf("delete app: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("app %s: %w", id, ErrNotFound)
}
+135 -55
View File
@@ -9,18 +9,21 @@ import (
"github.com/google/uuid"
)
const containerColumns = `id, workload_id, workload_kind, role, container_id,
// containerColumns is the canonical column list for `containers` queries.
// stage_id is populated by the deployer for project containers (so ListProxyRoutes
// survives stage renames) and left empty for stacks and sites.
const containerColumns = `id, workload_id, workload_kind, role, stage_id, container_id,
image_ref, image_tag, host, state, port,
subdomain, proxy_route_id, npm_proxy_id,
last_seen_at, extra_json, created_at, updated_at`
last_seen_at, created_at, updated_at`
func scanContainer(scanner interface{ Scan(...any) error }) (Container, error) {
var c Container
err := scanner.Scan(
&c.ID, &c.WorkloadID, &c.WorkloadKind, &c.Role, &c.ContainerID,
&c.ID, &c.WorkloadID, &c.WorkloadKind, &c.Role, &c.StageID, &c.ContainerID,
&c.ImageRef, &c.ImageTag, &c.Host, &c.State, &c.Port,
&c.Subdomain, &c.ProxyRouteID, &c.NpmProxyID,
&c.LastSeenAt, &c.ExtraJSON, &c.CreatedAt, &c.UpdatedAt,
&c.LastSeenAt, &c.CreatedAt, &c.UpdatedAt,
)
return c, err
}
@@ -34,19 +37,16 @@ func (s *Store) CreateContainer(c Container) (Container, error) {
if c.Host == "" {
c.Host = "local"
}
if c.ExtraJSON == "" {
c.ExtraJSON = "{}"
}
c.CreatedAt = Now()
c.UpdatedAt = c.CreatedAt
_, err := s.db.Exec(
`INSERT INTO containers (`+containerColumns+`)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.ContainerID,
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt,
c.LastSeenAt, c.CreatedAt, c.UpdatedAt,
)
if err != nil {
return Container{}, fmt.Errorf("insert container: %w", err)
@@ -54,9 +54,12 @@ func (s *Store) CreateContainer(c Container) (Container, error) {
return c, nil
}
// UpsertContainer is the reconciler's primary write path. It updates an
// existing row (matched by ID) or inserts a new one. Caller is responsible
// for setting ID — use container_id-based lookup before calling this.
// UpsertContainer is the full-write path used by the deployer paths
// (stack manager, static-site manager) that own all fields of a row. Inserts
// if missing, replaces every column on conflict. The reconciler must NOT call
// this — it would clobber deployer-written subdomain / proxy_route_id /
// npm_proxy_id / image_tag with the empty values it doesn't know about. Use
// ReconcileContainer instead.
func (s *Store) UpsertContainer(c Container) error {
if c.ID == "" {
return fmt.Errorf("UpsertContainer: ID is required")
@@ -64,9 +67,6 @@ func (s *Store) UpsertContainer(c Container) error {
if c.Host == "" {
c.Host = "local"
}
if c.ExtraJSON == "" {
c.ExtraJSON = "{}"
}
c.UpdatedAt = Now()
if c.CreatedAt == "" {
c.CreatedAt = c.UpdatedAt
@@ -80,6 +80,7 @@ func (s *Store) UpsertContainer(c Container) error {
workload_id=excluded.workload_id,
workload_kind=excluded.workload_kind,
role=excluded.role,
stage_id=excluded.stage_id,
container_id=excluded.container_id,
image_ref=excluded.image_ref,
image_tag=excluded.image_tag,
@@ -90,12 +91,11 @@ func (s *Store) UpsertContainer(c Container) error {
proxy_route_id=excluded.proxy_route_id,
npm_proxy_id=excluded.npm_proxy_id,
last_seen_at=excluded.last_seen_at,
extra_json=excluded.extra_json,
updated_at=excluded.updated_at`,
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.ContainerID,
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt,
c.LastSeenAt, c.CreatedAt, c.UpdatedAt,
)
if err != nil {
return fmt.Errorf("upsert container: %w", err)
@@ -103,6 +103,44 @@ func (s *Store) UpsertContainer(c Container) error {
return nil
}
// ReconcileContainer is the reconciler's write path. INSERTs a new row when
// none exists (with all label-derived metadata) and on conflict updates ONLY
// the Docker-derived fields the reconciler can observe — never touching
// subdomain / proxy_route_id / npm_proxy_id / image_tag / stage_id, which are
// owned by the deployer paths and would be wiped to empty if included.
func (s *Store) ReconcileContainer(c Container) error {
if c.ID == "" {
return fmt.Errorf("ReconcileContainer: ID is required")
}
if c.Host == "" {
c.Host = "local"
}
c.UpdatedAt = Now()
if c.CreatedAt == "" {
c.CreatedAt = c.UpdatedAt
}
_, err := s.db.Exec(
`INSERT INTO containers (`+containerColumns+`)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
container_id=excluded.container_id,
image_ref=excluded.image_ref,
state=excluded.state,
port=excluded.port,
last_seen_at=excluded.last_seen_at,
updated_at=excluded.updated_at`,
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
c.LastSeenAt, c.CreatedAt, c.UpdatedAt,
)
if err != nil {
return fmt.Errorf("reconcile container: %w", err)
}
return nil
}
// GetContainerByID returns a single container row.
func (s *Store) GetContainerByID(id string) (Container, error) {
c, err := scanContainer(s.db.QueryRow(
@@ -136,9 +174,8 @@ func (s *Store) GetContainerByDockerID(dockerID string) (Container, error) {
}
// ListProxyRoutes returns proxy-enabled project containers joined with
// project + stage names. Reads from the normalized containers index. Stage
// ID is resolved through a (project_id, role=stage_name) join, which is
// uniquely indexed via UNIQUE(project_id, name) on stages.
// project + stage names. Reads from the normalized containers index and
// joins through stage_id so a stage rename does not orphan the row's view.
//
// Source is reported as "instance" for back-compat with the Proxies page
// filter (the frontend keys off the literal string).
@@ -150,7 +187,7 @@ func (s *Store) ListProxyRoutes(domain string) ([]ProxyRoute, error) {
FROM containers c
JOIN workloads w ON w.id = c.workload_id AND w.kind = 'project'
JOIN projects p ON p.id = w.ref_id
JOIN stages s ON s.project_id = p.id AND s.name = c.role
JOIN stages s ON s.id = c.stage_id OR (c.stage_id = '' AND s.project_id = p.id AND s.name = c.role)
WHERE c.subdomain != '' AND (c.proxy_route_id != '' OR c.npm_proxy_id > 0)
ORDER BY p.name, s.name, c.created_at DESC`,
)
@@ -179,17 +216,23 @@ func (s *Store) ListProxyRoutes(domain string) ([]ProxyRoute, error) {
}
// ListContainersByStageID returns project containers for the given stage,
// newest first. Resolves stage → project_id → workload(kind=project) →
// containers with role = stage.name. Replaces GetInstancesByStageID for
// callers in the deployer / API layer.
// newest first. Resolves via stage_id with a fallback to the legacy
// (stage.name = container.role) join for rows written before the stage_id
// column was populated. Replaces GetInstancesByStageID.
func (s *Store) ListContainersByStageID(stageID string) ([]Container, error) {
rows, err := s.db.Query(`
SELECT `+prefixCols(containerColumns, "c.")+`
FROM containers c
JOIN workloads w ON w.id = c.workload_id AND w.kind = 'project'
JOIN stages s ON s.project_id = w.ref_id AND s.name = c.role
WHERE s.id = ?
ORDER BY c.created_at DESC`, stageID)
LEFT JOIN stages s ON s.id = ?
WHERE c.stage_id = ?
OR (c.stage_id = '' AND s.id IS NOT NULL
AND c.role = s.name
AND EXISTS (
SELECT 1 FROM workloads w
WHERE w.id = c.workload_id
AND w.kind = 'project'
AND w.ref_id = s.project_id))
ORDER BY c.created_at DESC`, stageID, stageID)
if err != nil {
return nil, fmt.Errorf("query containers by stage: %w", err)
}
@@ -244,6 +287,11 @@ func (s *Store) ListContainers(f ContainerFilter) ([]Container, error) {
where []string
args []any
)
needsAppJoin := f.AppID != ""
if needsAppJoin {
where = append(where, "w.app_id = ?")
args = append(args, f.AppID)
}
if f.WorkloadID != "" {
where = append(where, "c.workload_id = ?")
args = append(args, f.WorkloadID)
@@ -256,23 +304,15 @@ func (s *Store) ListContainers(f ContainerFilter) ([]Container, error) {
where = append(where, "c.state = ?")
args = append(args, f.State)
}
var query string
if f.AppID != "" {
query = `SELECT ` + prefixCols(containerColumns, "c.") + `
FROM containers c JOIN workloads w ON w.id = c.workload_id
WHERE w.app_id = ?`
args = append([]any{f.AppID}, args...)
if len(where) > 0 {
query += " AND " + strings.Join(where, " AND ")
}
query += " ORDER BY c.created_at DESC"
} else {
query = `SELECT ` + prefixCols(containerColumns, "c.") + ` FROM containers c`
if len(where) > 0 {
query += " WHERE " + strings.Join(where, " AND ")
}
query += " ORDER BY c.created_at DESC"
query := `SELECT ` + prefixCols(containerColumns, "c.") + ` FROM containers c`
if needsAppJoin {
query += ` JOIN workloads w ON w.id = c.workload_id`
}
if len(where) > 0 {
query += " WHERE " + strings.Join(where, " AND ")
}
query += " ORDER BY c.created_at DESC"
rows, err := s.db.Query(query, args...)
if err != nil {
@@ -295,24 +335,24 @@ func (s *Store) ListContainers(f ContainerFilter) ([]Container, error) {
// Use this from the deployer when proxy / subdomain assignments change.
func (s *Store) UpdateContainer(c Container) error {
c.UpdatedAt = Now()
if c.ExtraJSON == "" {
c.ExtraJSON = "{}"
}
result, err := s.db.Exec(
`UPDATE containers SET workload_id=?, workload_kind=?, role=?, container_id=?,
`UPDATE containers SET workload_id=?, workload_kind=?, role=?, stage_id=?, container_id=?,
image_ref=?, image_tag=?, host=?, state=?, port=?,
subdomain=?, proxy_route_id=?, npm_proxy_id=?,
last_seen_at=?, extra_json=?, updated_at=?
last_seen_at=?, updated_at=?
WHERE id=?`,
c.WorkloadID, c.WorkloadKind, c.Role, c.ContainerID,
c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
c.LastSeenAt, c.ExtraJSON, c.UpdatedAt, c.ID,
c.LastSeenAt, c.UpdatedAt, c.ID,
)
if err != nil {
return fmt.Errorf("update container: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("container %s: %w", c.ID, ErrNotFound)
}
@@ -330,7 +370,10 @@ func (s *Store) UpdateContainerState(id, state string) error {
if err != nil {
return fmt.Errorf("update container state: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("container %s: %w", id, ErrNotFound)
}
@@ -350,7 +393,10 @@ func (s *Store) DeleteContainer(id string) error {
if err != nil {
return fmt.Errorf("delete container: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("container %s: %w", id, ErrNotFound)
}
@@ -367,6 +413,40 @@ func (s *Store) DeleteContainersByWorkload(workloadID string) error {
return nil
}
// ListMissingSweepRows returns rows the reconciler must consider for the
// missing-state sweep — bound to a real Docker container and not already
// flipped to 'missing'. Used in place of a full ListContainers scan to keep
// the per-tick query proportional to the live set.
func (s *Store) ListMissingSweepRows() ([]struct {
ID string
ContainerID string
}, error) {
rows, err := s.db.Query(
`SELECT id, container_id FROM containers
WHERE container_id != '' AND state != 'missing'`,
)
if err != nil {
return nil, fmt.Errorf("query missing-sweep rows: %w", err)
}
defer rows.Close()
out := []struct {
ID string
ContainerID string
}{}
for rows.Next() {
var r struct {
ID string
ContainerID string
}
if err := rows.Scan(&r.ID, &r.ContainerID); err != nil {
return nil, fmt.Errorf("scan missing-sweep row: %w", err)
}
out = append(out, r)
}
return out, rows.Err()
}
// prefixCols rewrites a comma-separated column list to use a table alias prefix.
// Used by ListContainers when joining containers (alias `c`) to workloads.
func prefixCols(cols, prefix string) string {
-3
View File
@@ -22,9 +22,6 @@ func TestCreateAndGetContainer(t *testing.T) {
if c.Host != "local" {
t.Fatalf("default host should be 'local', got %q", c.Host)
}
if c.ExtraJSON != "{}" {
t.Fatalf("default extra_json should be '{}', got %q", c.ExtraJSON)
}
got, err := s.GetContainerByID(c.ID)
if err != nil {
+20 -17
View File
@@ -365,24 +365,27 @@ type Workload struct {
// Replaces the project-specific Instance table after migration. Subdomain/
// proxy fields are hoisted as first-class columns because ListProxyRoutes,
// stale detection, and dashboard queries filter on them frequently.
//
// StageID is populated by the deployer for project containers so ListProxyRoutes
// survives stage renames; it stays empty for stack and site rows.
type Container struct {
ID string `json:"id"`
WorkloadID string `json:"workload_id"`
WorkloadKind string `json:"workload_kind"` // denormalized for filtered queries
Role string `json:"role"` // stage name (project), service name (stack), '' (site)
ContainerID string `json:"container_id"` // Docker container ID; '' between create+start
ImageRef string `json:"image_ref"` // "image:tag" as scheduled
ImageTag string `json:"image_tag"` // just the tag, for ListProxyRoutes
Host string `json:"host"`
State string `json:"state"` // running | stopped | failed | removing | missing
Port int `json:"port"`
Subdomain string `json:"subdomain"`
ProxyRouteID string `json:"proxy_route_id"`
NpmProxyID int `json:"npm_proxy_id"`
LastSeenAt string `json:"last_seen_at"`
ExtraJSON string `json:"extra_json"` // {} default; reserved for kind-specific forward-compat
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
ID string `json:"id"`
WorkloadID string `json:"workload_id"`
WorkloadKind string `json:"workload_kind"` // denormalized for filtered queries
Role string `json:"role"` // stage name (project), service name (stack), '' (site)
StageID string `json:"stage_id"` // project containers only; '' otherwise
ContainerID string `json:"container_id"` // Docker container ID; '' between create+start
ImageRef string `json:"image_ref"` // "image:tag" as scheduled
ImageTag string `json:"image_tag"` // just the tag, for ListProxyRoutes
Host string `json:"host"`
State string `json:"state"` // running | stopped | failed | removing | missing
Port int `json:"port"`
Subdomain string `json:"subdomain"`
ProxyRouteID string `json:"proxy_route_id"`
NpmProxyID int `json:"npm_proxy_id"`
LastSeenAt string `json:"last_seen_at"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// App is an optional grouping of workloads (e.g., "my-saas" = web project + worker stack + redis stack).
+87 -83
View File
@@ -54,7 +54,8 @@ func scanProject(r rowScanner) (Project, error) {
}
// CreateProject inserts a new project and returns it. A webhook secret is
// generated automatically if one is not already set on the input.
// generated automatically if one is not already set on the input. Project
// row + matching workload row are written in a single transaction.
func (s *Store) CreateProject(p Project) (Project, error) {
p.ID = uuid.New().String()
p.CreatedAt = Now()
@@ -69,18 +70,27 @@ func (s *Store) CreateProject(p Project) (Project, error) {
if p.WebhookRequireSignature {
requireSig = 1
}
_, err := s.db.Exec(
tx, err := s.db.Begin()
if err != nil {
return Project{}, fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
if _, err := tx.Exec(
`INSERT INTO projects (`+projectCols+`)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
p.ID, p.Name, p.Registry, p.Image, p.Port, p.Healthcheck, p.Env, p.Volumes,
p.NpmAccessListID, p.WebhookSecret, p.WebhookSigningSecret, requireSig,
p.NotificationURL, p.NotificationSecret, p.CreatedAt, p.UpdatedAt,
)
if err != nil {
); err != nil {
return Project{}, fmt.Errorf("insert project: %w", err)
}
if err := s.SyncProjectWorkload(p); err != nil {
return Project{}, fmt.Errorf("sync project workload: %w", err)
if err := SyncProjectWorkloadTx(tx, p); err != nil {
return Project{}, err
}
if err := tx.Commit(); err != nil {
return Project{}, fmt.Errorf("commit: %w", err)
}
return p, nil
}
@@ -157,78 +167,74 @@ func (s *Store) GetProjectsByImage(image string) ([]Project, error) {
return projects, rows.Err()
}
// updateProjectAndSyncWorkloadTx performs the parent UPDATE + workload sync in
// a single transaction. Used by every Set*Secret / UpdateProject path so the
// project row and the workload row never desync after a partial failure.
// updateSQL must be a parameterized UPDATE on `projects` ending with `WHERE id=?`;
// args are the parameter values in order, with the project ID last.
func (s *Store) updateProjectAndSyncWorkloadTx(id string, updateSQL string, args ...any) error {
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
result, err := tx.Exec(updateSQL, args...)
if err != nil {
return fmt.Errorf("update project: %w", err)
}
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("project %s: %w", id, ErrNotFound)
}
// Re-read the row inside the transaction so the workload sync sees the
// canonical values (the caller may have only updated one column).
row := tx.QueryRow(`SELECT `+projectCols+` FROM projects WHERE id = ?`, id)
p, err := scanProject(row)
if err != nil {
return fmt.Errorf("reread project for workload sync: %w", err)
}
if err := SyncProjectWorkloadTx(tx, p); err != nil {
return err
}
return tx.Commit()
}
// UpdateProject updates an existing project's mutable fields. Webhook secret
// and notification_secret are intentionally not updated here — use the
// dedicated SetProjectWebhookSecret / SetProjectNotificationSecret helpers.
func (s *Store) UpdateProject(p Project) error {
p.UpdatedAt = Now()
result, err := s.db.Exec(
return s.updateProjectAndSyncWorkloadTx(p.ID,
`UPDATE projects SET name=?, registry=?, image=?, port=?, healthcheck=?, env=?, volumes=?,
npm_access_list_id=?, notification_url=?, updated_at=?
WHERE id=?`,
p.Name, p.Registry, p.Image, p.Port, p.Healthcheck, p.Env, p.Volumes,
p.NpmAccessListID, p.NotificationURL, p.UpdatedAt, p.ID,
)
if err != nil {
return fmt.Errorf("update project: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("project %s: %w", p.ID, ErrNotFound)
}
// Re-read so the workload sync sees the canonical row (e.g. webhook
// secrets that UpdateProject does not write but other call sites do).
current, err := s.GetProjectByID(p.ID)
if err != nil {
return fmt.Errorf("reread project for workload sync: %w", err)
}
if err := s.SyncProjectWorkload(current); err != nil {
return fmt.Errorf("sync project workload: %w", err)
}
return nil
}
// SetProjectWebhookSecret assigns a webhook secret to a project.
// Pass an empty string to disable webhook access for the project.
func (s *Store) SetProjectWebhookSecret(id, secret string) error {
result, err := s.db.Exec(
return s.updateProjectAndSyncWorkloadTx(id,
`UPDATE projects SET webhook_secret=?, updated_at=? WHERE id=?`,
secret, Now(), id,
)
if err != nil {
return fmt.Errorf("set project webhook secret: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("project %s: %w", id, ErrNotFound)
}
current, err := s.GetProjectByID(id)
if err != nil {
return fmt.Errorf("reread project for workload sync: %w", err)
}
return s.SyncProjectWorkload(current)
}
// SetProjectWebhookSigningSecret assigns the HMAC signing secret used to
// verify inbound webhook payloads. Pass an empty string to clear it (which
// also implicitly disables signature enforcement on the next request).
func (s *Store) SetProjectWebhookSigningSecret(id, secret string) error {
result, err := s.db.Exec(
return s.updateProjectAndSyncWorkloadTx(id,
`UPDATE projects SET webhook_signing_secret=?, updated_at=? WHERE id=?`,
secret, Now(), id,
)
if err != nil {
return fmt.Errorf("set project webhook signing secret: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("project %s: %w", id, ErrNotFound)
}
current, err := s.GetProjectByID(id)
if err != nil {
return fmt.Errorf("reread project for workload sync: %w", err)
}
return s.SyncProjectWorkload(current)
}
// SetProjectWebhookRequireSignature toggles whether unsigned (or
@@ -238,22 +244,10 @@ func (s *Store) SetProjectWebhookRequireSignature(id string, require bool) error
if require {
v = 1
}
result, err := s.db.Exec(
return s.updateProjectAndSyncWorkloadTx(id,
`UPDATE projects SET webhook_require_signature=?, updated_at=? WHERE id=?`,
v, Now(), id,
)
if err != nil {
return fmt.Errorf("set project webhook require_signature: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("project %s: %w", id, ErrNotFound)
}
current, err := s.GetProjectByID(id)
if err != nil {
return fmt.Errorf("reread project for workload sync: %w", err)
}
return s.SyncProjectWorkload(current)
}
// EnsureProjectWebhookSecret returns the current webhook secret for a project,
@@ -278,22 +272,10 @@ func (s *Store) EnsureProjectWebhookSecret(id string) (string, error) {
// secret. Empty string disables HMAC signing for this project (notifications
// still send unsigned, falling through to the parent tier's secret if any).
func (s *Store) SetProjectNotificationSecret(id, secret string) error {
result, err := s.db.Exec(
return s.updateProjectAndSyncWorkloadTx(id,
`UPDATE projects SET notification_secret=?, updated_at=? WHERE id=?`,
secret, Now(), id,
)
if err != nil {
return fmt.Errorf("set project notification secret: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("project %s: %w", id, ErrNotFound)
}
current, err := s.GetProjectByID(id)
if err != nil {
return fmt.Errorf("reread project for workload sync: %w", err)
}
return s.SyncProjectWorkload(current)
}
// EnsureProjectNotificationSecret returns the current outgoing-webhook signing
@@ -316,23 +298,45 @@ func (s *Store) EnsureProjectNotificationSecret(id string) (string, error) {
// DeleteProject removes a project by ID. Cascading deletes handle stages, instances, and deploys.
// Workload row + container index entries are removed too so the global views
// don't show ghost rows after a project is gone.
// don't show ghost rows after a project is gone. Atomic: the project, its
// container index entries, and its workload row all live or die together.
func (s *Store) DeleteProject(id string) error {
result, err := s.db.Exec(`DELETE FROM projects WHERE id = ?`, id)
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
// Resolve the workload before deleting the project so we have the
// workload ID for the cascade.
var workloadID string
if err := tx.QueryRow(
`SELECT id FROM workloads WHERE kind = ? AND ref_id = ?`,
string(WorkloadKindProject), id,
).Scan(&workloadID); err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("lookup project workload: %w", err)
}
result, err := tx.Exec(`DELETE FROM projects WHERE id = ?`, id)
if err != nil {
return fmt.Errorf("delete project: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("project %s: %w", id, ErrNotFound)
}
if w, err := s.GetWorkloadByRef(WorkloadKindProject, id); err == nil {
if err := s.DeleteContainersByWorkload(w.ID); err != nil {
if workloadID != "" {
if _, err := tx.Exec(`DELETE FROM containers WHERE workload_id = ?`, workloadID); err != nil {
return fmt.Errorf("delete project containers: %w", err)
}
if err := s.DeleteWorkload(w.ID); err != nil {
if _, err := tx.Exec(`DELETE FROM workloads WHERE id = ?`, workloadID); err != nil {
return fmt.Errorf("delete project workload: %w", err)
}
}
return nil
return tx.Commit()
}
+78 -16
View File
@@ -11,7 +11,9 @@ import (
const stackCols = `id, name, description, compose_project_name, status, error,
current_revision_id, created_at, updated_at`
// CreateStack inserts a new stack and returns it.
// CreateStack inserts a new stack and returns it. Stack row + matching
// workload row are written in a single transaction so a partial failure
// leaves no orphan.
func (s *Store) CreateStack(st Stack) (Stack, error) {
st.ID = uuid.New().String()
st.CreatedAt = Now()
@@ -20,17 +22,25 @@ func (s *Store) CreateStack(st Stack) (Stack, error) {
st.Status = "stopped"
}
_, err := s.db.Exec(
tx, err := s.db.Begin()
if err != nil {
return Stack{}, fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
if _, err := tx.Exec(
`INSERT INTO stacks (`+stackCols+`)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
st.ID, st.Name, st.Description, st.ComposeProjectName, st.Status,
st.Error, st.CurrentRevisionID, st.CreatedAt, st.UpdatedAt,
)
if err != nil {
); err != nil {
return Stack{}, fmt.Errorf("insert stack: %w", err)
}
if err := s.SyncStackWorkload(st); err != nil {
return Stack{}, fmt.Errorf("sync stack workload: %w", err)
if err := SyncStackWorkloadTx(tx, st); err != nil {
return Stack{}, err
}
if err := tx.Commit(); err != nil {
return Stack{}, fmt.Errorf("commit: %w", err)
}
return st, nil
}
@@ -49,6 +59,26 @@ func (s *Store) GetStackByID(id string) (Stack, error) {
return st, nil
}
// GetStackByComposeProjectName looks up a stack by its compose project name.
// Compose project names are unique per the stacks table schema, so this is an
// O(1) index lookup. Used by the reconciler to resolve compose-managed
// containers without scanning every stack.
func (s *Store) GetStackByComposeProjectName(name string) (Stack, error) {
if name == "" {
return Stack{}, ErrNotFound
}
st, err := scanStackRow(s.db.QueryRow(
`SELECT `+stackCols+` FROM stacks WHERE compose_project_name = ?`, name,
))
if errors.Is(err, sql.ErrNoRows) {
return Stack{}, ErrNotFound
}
if err != nil {
return Stack{}, fmt.Errorf("query stack by compose project: %w", err)
}
return st, nil
}
// GetAllStacks returns every stack ordered by name.
func (s *Store) GetAllStacks() ([]Stack, error) {
rows, err := s.db.Query(`SELECT ` + stackCols + ` FROM stacks ORDER BY name`)
@@ -69,20 +99,34 @@ func (s *Store) GetAllStacks() ([]Stack, error) {
}
// UpdateStack updates the mutable metadata fields (name, description).
// Atomic: stack row UPDATE and workload row sync share a transaction so the
// workload row's name never lags after a rename.
func (s *Store) UpdateStack(st Stack) error {
st.UpdatedAt = Now()
result, err := s.db.Exec(
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
result, err := tx.Exec(
`UPDATE stacks SET name=?, description=?, updated_at=? WHERE id=?`,
st.Name, st.Description, st.UpdatedAt, st.ID,
)
if err != nil {
return fmt.Errorf("update stack: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("stack %s: %w", st.ID, ErrNotFound)
}
return s.SyncStackWorkload(st)
if err := SyncStackWorkloadTx(tx, st); err != nil {
return err
}
return tx.Commit()
}
// UpdateStackStatus updates the deployment status + error fields.
@@ -120,25 +164,43 @@ func (s *Store) SetStackCurrentRevision(id, revisionID string) error {
}
// DeleteStack removes a stack by ID. Cascading deletes handle revisions + deploys.
// Workload row + container index entries are removed too.
// Stack + workload + container index rows are dropped atomically.
func (s *Store) DeleteStack(id string) error {
result, err := s.db.Exec(`DELETE FROM stacks WHERE id = ?`, id)
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
var workloadID string
if err := tx.QueryRow(
`SELECT id FROM workloads WHERE kind = ? AND ref_id = ?`,
string(WorkloadKindStack), id,
).Scan(&workloadID); err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("lookup stack workload: %w", err)
}
result, err := tx.Exec(`DELETE FROM stacks WHERE id = ?`, id)
if err != nil {
return fmt.Errorf("delete stack: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("stack %s: %w", id, ErrNotFound)
}
if w, err := s.GetWorkloadByRef(WorkloadKindStack, id); err == nil {
if err := s.DeleteContainersByWorkload(w.ID); err != nil {
if workloadID != "" {
if _, err := tx.Exec(`DELETE FROM containers WHERE workload_id = ?`, workloadID); err != nil {
return fmt.Errorf("delete stack containers: %w", err)
}
if err := s.DeleteWorkload(w.ID); err != nil {
if _, err := tx.Exec(`DELETE FROM workloads WHERE id = ?`, workloadID); err != nil {
return fmt.Errorf("delete stack workload: %w", err)
}
}
return nil
return tx.Commit()
}
func scanStackRow(row *sql.Row) (Stack, error) {
+86 -79
View File
@@ -19,7 +19,8 @@ const staticSiteCols = `id, name, provider, gitea_url, repo_owner, repo_name, br
created_at, updated_at`
// CreateStaticSite inserts a new static site and returns it. A webhook secret
// is generated automatically if one is not already set on the input.
// is generated automatically if one is not already set on the input. Site row
// + matching workload row are written in a single transaction.
func (s *Store) CreateStaticSite(site StaticSite) (StaticSite, error) {
site.ID = uuid.New().String()
site.CreatedAt = Now()
@@ -30,7 +31,13 @@ func (s *Store) CreateStaticSite(site StaticSite) (StaticSite, error) {
return StaticSite{}, fmt.Errorf("webhook_secret must be at least %d characters", minWebhookSecretLength)
}
_, err := s.db.Exec(
tx, err := s.db.Begin()
if err != nil {
return StaticSite{}, fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
if _, err := tx.Exec(
`INSERT INTO static_sites (`+staticSiteCols+`)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
site.ID, site.Name, site.Provider, site.GiteaURL, site.RepoOwner, site.RepoName,
@@ -41,12 +48,14 @@ func (s *Store) CreateStaticSite(site StaticSite) (StaticSite, error) {
site.WebhookSecret, site.WebhookSigningSecret, BoolToInt(site.WebhookRequireSignature),
site.NotificationURL, site.NotificationSecret,
site.CreatedAt, site.UpdatedAt,
)
if err != nil {
); err != nil {
return StaticSite{}, fmt.Errorf("insert static site: %w", err)
}
if err := s.SyncStaticSiteWorkload(site); err != nil {
return StaticSite{}, fmt.Errorf("sync static site workload: %w", err)
if err := SyncStaticSiteWorkloadTx(tx, site); err != nil {
return StaticSite{}, err
}
if err := tx.Commit(); err != nil {
return StaticSite{}, fmt.Errorf("commit: %w", err)
}
return site, nil
}
@@ -110,12 +119,52 @@ func (s *Store) GetStaticSitesByRepo(giteaURL, owner, name string) ([]StaticSite
return sites, rows.Err()
}
// updateStaticSiteAndSyncWorkloadTx wraps a parameterized UPDATE on
// static_sites with the workload sync, all inside a single transaction.
// updateSQL must end with `WHERE id=?`; args end with the site ID.
func (s *Store) updateStaticSiteAndSyncWorkloadTx(id string, updateSQL string, args ...any) error {
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
result, err := tx.Exec(updateSQL, args...)
if err != nil {
return fmt.Errorf("update static site: %w", err)
}
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("static site %s: %w", id, ErrNotFound)
}
row := tx.QueryRow(`SELECT `+staticSiteCols+` FROM static_sites WHERE id = ?`, id)
current, err := scanStaticSiteRowFromQuery(row)
if err != nil {
return fmt.Errorf("reread static site for workload sync: %w", err)
}
if err := SyncStaticSiteWorkloadTx(tx, current); err != nil {
return err
}
return tx.Commit()
}
// scanStaticSiteRowFromQuery is a thin wrapper around scanStaticSiteRow that
// accepts a *sql.Row from either s.db or a transaction. Kept private so the
// public surface stays narrow.
func scanStaticSiteRowFromQuery(row *sql.Row) (StaticSite, error) {
return scanStaticSiteRow(row)
}
// UpdateStaticSite updates an existing static site's configuration fields.
// notification_secret is intentionally not updated here — use the dedicated
// SetStaticSiteNotificationSecret rotation helper.
func (s *Store) UpdateStaticSite(site StaticSite) error {
site.UpdatedAt = Now()
result, err := s.db.Exec(
return s.updateStaticSiteAndSyncWorkloadTx(site.ID,
`UPDATE static_sites SET name=?, provider=?, gitea_url=?, repo_owner=?, repo_name=?, branch=?,
folder_path=?, access_token=?, domain=?, mode=?, render_markdown=?,
sync_trigger=?, tag_pattern=?, storage_enabled=?, storage_limit_mb=?,
@@ -127,18 +176,6 @@ func (s *Store) UpdateStaticSite(site StaticSite) error {
BoolToInt(site.StorageEnabled), site.StorageLimitMB,
site.NotificationURL, site.UpdatedAt, site.ID,
)
if err != nil {
return fmt.Errorf("update static site: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("static site %s: %w", site.ID, ErrNotFound)
}
current, err := s.GetStaticSiteByID(site.ID)
if err != nil {
return fmt.Errorf("reread static site for workload sync: %w", err)
}
return s.SyncStaticSiteWorkload(current)
}
// UpdateStaticSiteStatus updates the deployment status fields.
@@ -220,26 +257,44 @@ func (s *Store) ListStaticSiteProxyRoutes(domain string) ([]ProxyRoute, error) {
return routes, rows.Err()
}
// DeleteStaticSite removes a static site by ID. Cascading deletes handle secrets.
// Workload row + container index entries are removed too.
// DeleteStaticSite removes a static site by ID. Cascading deletes handle
// secrets. Site + workload + container index rows are dropped atomically.
func (s *Store) DeleteStaticSite(id string) error {
result, err := s.db.Exec(`DELETE FROM static_sites WHERE id = ?`, id)
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
var workloadID string
if err := tx.QueryRow(
`SELECT id FROM workloads WHERE kind = ? AND ref_id = ?`,
string(WorkloadKindSite), id,
).Scan(&workloadID); err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("lookup site workload: %w", err)
}
result, err := tx.Exec(`DELETE FROM static_sites WHERE id = ?`, id)
if err != nil {
return fmt.Errorf("delete static site: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("static site %s: %w", id, ErrNotFound)
}
if w, err := s.GetWorkloadByRef(WorkloadKindSite, id); err == nil {
if err := s.DeleteContainersByWorkload(w.ID); err != nil {
if workloadID != "" {
if _, err := tx.Exec(`DELETE FROM containers WHERE workload_id = ?`, workloadID); err != nil {
return fmt.Errorf("delete static site containers: %w", err)
}
if err := s.DeleteWorkload(w.ID); err != nil {
if _, err := tx.Exec(`DELETE FROM workloads WHERE id = ?`, workloadID); err != nil {
return fmt.Errorf("delete static site workload: %w", err)
}
}
return nil
return tx.Commit()
}
// scanStaticSiteRow scans a static site from a *sql.Row.
@@ -291,22 +346,10 @@ func scanStaticSiteRows(rows *sql.Rows) (StaticSite, error) {
// SetStaticSiteWebhookSigningSecret assigns the inbound HMAC signing secret.
// Pass an empty string to clear it (also implicitly disables enforcement).
func (s *Store) SetStaticSiteWebhookSigningSecret(id, secret string) error {
result, err := s.db.Exec(
return s.updateStaticSiteAndSyncWorkloadTx(id,
`UPDATE static_sites SET webhook_signing_secret=?, updated_at=? WHERE id=?`,
secret, Now(), id,
)
if err != nil {
return fmt.Errorf("set static site webhook signing secret: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("static site %s: %w", id, ErrNotFound)
}
current, err := s.GetStaticSiteByID(id)
if err != nil {
return fmt.Errorf("reread static site for workload sync: %w", err)
}
return s.SyncStaticSiteWorkload(current)
}
// SetStaticSiteWebhookRequireSignature toggles whether unsigned (or
@@ -316,44 +359,20 @@ func (s *Store) SetStaticSiteWebhookRequireSignature(id string, require bool) er
if require {
v = 1
}
result, err := s.db.Exec(
return s.updateStaticSiteAndSyncWorkloadTx(id,
`UPDATE static_sites SET webhook_require_signature=?, updated_at=? WHERE id=?`,
v, Now(), id,
)
if err != nil {
return fmt.Errorf("set static site webhook require_signature: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("static site %s: %w", id, ErrNotFound)
}
current, err := s.GetStaticSiteByID(id)
if err != nil {
return fmt.Errorf("reread static site for workload sync: %w", err)
}
return s.SyncStaticSiteWorkload(current)
}
// SetStaticSiteNotificationSecret rotates the static site's outgoing-webhook
// signing secret. Empty string disables HMAC signing for this site
// (notifications still send unsigned, falling through to global resolution).
func (s *Store) SetStaticSiteNotificationSecret(id, secret string) error {
result, err := s.db.Exec(
return s.updateStaticSiteAndSyncWorkloadTx(id,
`UPDATE static_sites SET notification_secret=?, updated_at=? WHERE id=?`,
secret, Now(), id,
)
if err != nil {
return fmt.Errorf("set static site notification secret: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("static site %s: %w", id, ErrNotFound)
}
current, err := s.GetStaticSiteByID(id)
if err != nil {
return fmt.Errorf("reread static site for workload sync: %w", err)
}
return s.SyncStaticSiteWorkload(current)
}
// EnsureStaticSiteNotificationSecret returns the static site's outgoing-webhook
@@ -411,22 +430,10 @@ func (s *Store) GetStaticSiteByWebhookSecret(secret string) (StaticSite, error)
// SetStaticSiteWebhookSecret assigns a webhook secret to a static site.
// Pass an empty string to disable webhook access for the site.
func (s *Store) SetStaticSiteWebhookSecret(id, secret string) error {
result, err := s.db.Exec(
return s.updateStaticSiteAndSyncWorkloadTx(id,
`UPDATE static_sites SET webhook_secret=?, updated_at=? WHERE id=?`,
secret, Now(), id,
)
if err != nil {
return fmt.Errorf("set static site webhook secret: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("static site %s: %w", id, ErrNotFound)
}
current, err := s.GetStaticSiteByID(id)
if err != nil {
return fmt.Errorf("reread static site for workload sync: %w", err)
}
return s.SyncStaticSiteWorkload(current)
}
// EnsureStaticSiteWebhookSecret returns the current webhook secret for a site,
+6
View File
@@ -177,6 +177,10 @@ func (s *Store) runMigrations() error {
)`,
`CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_target ON webhook_deliveries(target_type, target_id, received_at)`,
`CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_received_at ON webhook_deliveries(received_at)`,
// Add stage_id to containers (2026-05-09). Backfill via the deployer
// re-write path; the LEFT JOIN in ListContainersByStageID falls back
// to (project_id, role=stage_name) so legacy rows still resolve.
`ALTER TABLE containers ADD COLUMN stage_id TEXT NOT NULL DEFAULT ''`,
}
// Workload refactor tables (2026-05-09). Workload is the unifying primitive
@@ -205,6 +209,7 @@ func (s *Store) runMigrations() error {
workload_id TEXT NOT NULL,
workload_kind TEXT NOT NULL,
role TEXT NOT NULL DEFAULT '',
stage_id TEXT NOT NULL DEFAULT '',
container_id TEXT NOT NULL DEFAULT '',
image_ref TEXT NOT NULL DEFAULT '',
image_tag TEXT NOT NULL DEFAULT '',
@@ -360,6 +365,7 @@ func (s *Store) runMigrations() error {
`CREATE INDEX IF NOT EXISTS idx_containers_state ON containers(state)`,
`CREATE INDEX IF NOT EXISTS idx_containers_container_id ON containers(container_id) WHERE container_id != ''`,
`CREATE INDEX IF NOT EXISTS idx_containers_kind ON containers(workload_kind)`,
`CREATE INDEX IF NOT EXISTS idx_containers_stage_id ON containers(stage_id) WHERE stage_id != ''`,
}
for _, idx := range indexes {
if _, err := s.db.Exec(idx); err != nil {
+102 -71
View File
@@ -1,84 +1,115 @@
package store
import "fmt"
import (
"database/sql"
"errors"
"fmt"
// SyncProjectWorkload upserts the Workload row paired with a project so that
// its name, notification config, and webhook secrets stay in sync. Called from
// CreateProject / UpdateProject / SetProject*Secret paths. Idempotent — safe
// to call when a workload row already exists for the (project, RefID) pair.
"github.com/google/uuid"
)
// dbExec is the subset of *sql.DB and *sql.Tx used by the sync helpers so
// CRUD callers can pass in either a transaction or the raw DB handle. Keeps
// the sync logic atomic with the parent row when wrapped in a Begin/Commit.
type dbExec interface {
Exec(query string, args ...any) (sql.Result, error)
QueryRow(query string, args ...any) *sql.Row
}
// syncWorkloadTx is the shared upsert path used by every kind-specific
// sync helper. Caller passes the kind, ref, and the projection of fields
// that map onto the workload row. Idempotent — uses the (kind, ref_id) UNIQUE
// constraint to decide INSERT vs UPDATE.
func syncWorkloadTx(ex dbExec, kind WorkloadKind, refID, name, notifURL, notifSecret, hookSecret, signSecret string, requireSig bool) error {
now := Now()
requireInt := 0
if requireSig {
requireInt = 1
}
var existingID string
err := ex.QueryRow(
`SELECT id FROM workloads WHERE kind = ? AND ref_id = ?`,
string(kind), refID,
).Scan(&existingID)
if errors.Is(err, sql.ErrNoRows) {
_, err := ex.Exec(
`INSERT INTO workloads (id, kind, ref_id, name, app_id,
notification_url, notification_secret,
webhook_secret, webhook_signing_secret, webhook_require_signature,
created_at, updated_at)
VALUES (?, ?, ?, ?, '', ?, ?, ?, ?, ?, ?, ?)`,
uuid.New().String(), string(kind), refID, name,
notifURL, notifSecret, hookSecret, signSecret, requireInt,
now, now,
)
if err != nil {
return fmt.Errorf("insert %s workload: %w", kind, err)
}
return nil
}
if err != nil {
return fmt.Errorf("lookup %s workload: %w", kind, err)
}
_, err = ex.Exec(
`UPDATE workloads SET name=?,
notification_url=?, notification_secret=?,
webhook_secret=?, webhook_signing_secret=?, webhook_require_signature=?,
updated_at=?
WHERE id=?`,
name, notifURL, notifSecret, hookSecret, signSecret, requireInt, now, existingID,
)
if err != nil {
return fmt.Errorf("update %s workload: %w", kind, err)
}
return nil
}
// SyncProjectWorkloadTx upserts the workload row paired with a project inside
// the caller's transaction. Used by CreateProject / UpdateProject /
// SetProject*Secret so the parent UPDATE and the workload sync share atomicity.
func SyncProjectWorkloadTx(tx *sql.Tx, p Project) error {
return syncWorkloadTx(tx, WorkloadKindProject, p.ID, p.Name,
p.NotificationURL, p.NotificationSecret,
p.WebhookSecret, p.WebhookSigningSecret, p.WebhookRequireSignature)
}
// SyncStackWorkloadTx upserts the workload row paired with a stack inside the
// caller's transaction. Stacks don't carry notification or webhook config yet.
func SyncStackWorkloadTx(tx *sql.Tx, st Stack) error {
return syncWorkloadTx(tx, WorkloadKindStack, st.ID, st.Name, "", "", "", "", false)
}
// SyncStaticSiteWorkloadTx upserts the workload row paired with a static site
// inside the caller's transaction.
func SyncStaticSiteWorkloadTx(tx *sql.Tx, site StaticSite) error {
return syncWorkloadTx(tx, WorkloadKindSite, site.ID, site.Name,
site.NotificationURL, site.NotificationSecret,
site.WebhookSecret, site.WebhookSigningSecret, site.WebhookRequireSignature)
}
// SyncProjectWorkload is the non-transactional convenience used by
// BackfillWorkloads (a boot-time, single-row, idempotent recovery pass).
// CRUD paths must use SyncProjectWorkloadTx instead, with their parent
// UPDATE inside the same transaction.
func (s *Store) SyncProjectWorkload(p Project) error {
existing, err := s.GetWorkloadByRef(WorkloadKindProject, p.ID)
if err == nil {
existing.Name = p.Name
existing.NotificationURL = p.NotificationURL
existing.NotificationSecret = p.NotificationSecret
existing.WebhookSecret = p.WebhookSecret
existing.WebhookSigningSecret = p.WebhookSigningSecret
existing.WebhookRequireSignature = p.WebhookRequireSignature
return s.UpdateWorkload(existing)
}
_, err = s.CreateWorkload(Workload{
Kind: string(WorkloadKindProject),
RefID: p.ID,
Name: p.Name,
NotificationURL: p.NotificationURL,
NotificationSecret: p.NotificationSecret,
WebhookSecret: p.WebhookSecret,
WebhookSigningSecret: p.WebhookSigningSecret,
WebhookRequireSignature: p.WebhookRequireSignature,
})
if err != nil {
return fmt.Errorf("create project workload: %w", err)
}
return nil
return syncWorkloadTx(s.db, WorkloadKindProject, p.ID, p.Name,
p.NotificationURL, p.NotificationSecret,
p.WebhookSecret, p.WebhookSigningSecret, p.WebhookRequireSignature)
}
// SyncStackWorkload upserts the Workload row paired with a stack. Stacks
// don't (yet) carry their own notification or webhook config — those fields
// stay empty on the workload row until the stack model gains them.
// SyncStackWorkload is the non-transactional convenience used by BackfillWorkloads.
func (s *Store) SyncStackWorkload(st Stack) error {
existing, err := s.GetWorkloadByRef(WorkloadKindStack, st.ID)
if err == nil {
existing.Name = st.Name
return s.UpdateWorkload(existing)
}
_, err = s.CreateWorkload(Workload{
Kind: string(WorkloadKindStack),
RefID: st.ID,
Name: st.Name,
})
if err != nil {
return fmt.Errorf("create stack workload: %w", err)
}
return nil
return syncWorkloadTx(s.db, WorkloadKindStack, st.ID, st.Name, "", "", "", "", false)
}
// SyncStaticSiteWorkload upserts the Workload row paired with a static site.
// SyncStaticSiteWorkload is the non-transactional convenience used by BackfillWorkloads.
func (s *Store) SyncStaticSiteWorkload(site StaticSite) error {
existing, err := s.GetWorkloadByRef(WorkloadKindSite, site.ID)
if err == nil {
existing.Name = site.Name
existing.NotificationURL = site.NotificationURL
existing.NotificationSecret = site.NotificationSecret
existing.WebhookSecret = site.WebhookSecret
existing.WebhookSigningSecret = site.WebhookSigningSecret
existing.WebhookRequireSignature = site.WebhookRequireSignature
return s.UpdateWorkload(existing)
}
_, err = s.CreateWorkload(Workload{
Kind: string(WorkloadKindSite),
RefID: site.ID,
Name: site.Name,
NotificationURL: site.NotificationURL,
NotificationSecret: site.NotificationSecret,
WebhookSecret: site.WebhookSecret,
WebhookSigningSecret: site.WebhookSigningSecret,
WebhookRequireSignature: site.WebhookRequireSignature,
})
if err != nil {
return fmt.Errorf("create static site workload: %w", err)
}
return nil
return syncWorkloadTx(s.db, WorkloadKindSite, site.ID, site.Name,
site.NotificationURL, site.NotificationSecret,
site.WebhookSecret, site.WebhookSigningSecret, site.WebhookRequireSignature)
}
// BackfillWorkloads scans every project / stack / static_site row and ensures
+8 -5
View File
@@ -30,9 +30,6 @@ func (s *Store) CreateWorkload(w Workload) (Workload, error) {
if w.ID == "" {
w.ID = uuid.New().String()
}
if w.AppID == "" {
w.AppID = ""
}
w.CreatedAt = Now()
w.UpdatedAt = w.CreatedAt
@@ -148,7 +145,10 @@ func (s *Store) UpdateWorkload(w Workload) error {
if err != nil {
return fmt.Errorf("update workload: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("workload %s: %w", w.ID, ErrNotFound)
}
@@ -163,7 +163,10 @@ func (s *Store) DeleteWorkload(id string) error {
if err != nil {
return fmt.Errorf("delete workload: %w", err)
}
n, _ := result.RowsAffected()
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if n == 0 {
return fmt.Errorf("workload %s: %w", id, ErrNotFound)
}
+17 -10
View File
@@ -319,18 +319,22 @@ func (h *Handler) handleWebhook(w http.ResponseWriter, r *http.Request) {
return
}
// Resolve the secret via the workload row first (canonical path —
// workloads.webhook_secret is kept in sync by the project CRUD path).
// Fall back to the project's own column for any pre-refactor row that
// might not have its workload yet (defensive belt-and-suspenders).
// Resolve the secret via the workload row only. The project's own
// webhook_secret column is the source of truth, but lookups go through
// workloads.webhook_secret which is kept in lock-step by the
// transactional sync in the project CRUD path. Reading from workloads
// alone closes the rotation-durability gap: any rotation that didn't
// commit also didn't update the workload row, so an old secret
// surfaces here as 404 rather than being silently accepted.
var (
project store.Project
err error
)
if wl, wErr := h.store.GetWorkloadByWebhookSecret(secret); wErr == nil && wl.Kind == string(store.WorkloadKindProject) {
wl, wErr := h.store.GetWorkloadByWebhookSecret(secret)
if wErr == nil && wl.Kind == string(store.WorkloadKindProject) {
project, err = h.store.GetProjectByID(wl.RefID)
} else {
project, err = h.store.GetProjectByWebhookSecret(secret)
err = store.ErrNotFound
}
if err != nil {
if errors.Is(err, store.ErrNotFound) {
@@ -514,16 +518,19 @@ func (h *Handler) handleSiteWebhook(w http.ResponseWriter, r *http.Request) {
return
}
// Workload-first lookup, mirroring the project handler. Falls back to the
// site's own webhook_secret column for pre-refactor rows.
// Workload-only lookup, mirroring the project handler. Reading from
// workloads.webhook_secret keeps rotation-durability honest — a
// rotation that didn't commit doesn't update the workload row, so the
// stale secret returns 404 instead of being silently accepted.
var (
site store.StaticSite
err error
)
if wl, wErr := h.store.GetWorkloadByWebhookSecret(secret); wErr == nil && wl.Kind == string(store.WorkloadKindSite) {
wl, wErr := h.store.GetWorkloadByWebhookSecret(secret)
if wErr == nil && wl.Kind == string(store.WorkloadKindSite) {
site, err = h.store.GetStaticSiteByID(wl.RefID)
} else {
site, err = h.store.GetStaticSiteByWebhookSecret(secret)
err = store.ErrNotFound
}
if err != nil {
if errors.Is(err, store.ErrNotFound) {