From af82be3fb8962e246ab22cbac348cab7a9fd5196 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Sat, 9 May 2026 13:45:13 +0300 Subject: [PATCH] feat(workload): container index reconciler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background worker that keeps the containers table in sync with docker ps. Runs one boot pass and ticks every 30s. Dispatch precedence per container: 1. tinyforge.workload.id label (canonical, new) 2. tinyforge.instance-id label (legacy project — joins via instances) 3. tinyforge.static-site label (legacy site) 4. com.docker.compose.project (stacks — joins via ComposeProjectName) Rows whose Docker container ID is no longer present are flipped to state='missing'. Placeholder rows (empty container_id, e.g. a deploy mid-flight) are left alone so a tick that races a deploy doesn't mark them as missing. DockerLister interface lets tests substitute a fake daemon — 6 unit tests cover the dispatch matrix, missing-sweep, and state normalization. Wired into cmd/server/main.go between docker.New and the existing startup chain. Boot pass populates the containers table from any pre-refactor running containers. --- cmd/server/main.go | 12 + internal/docker/container.go | 80 ++++++ internal/reconciler/reconciler.go | 348 +++++++++++++++++++++++++ internal/reconciler/reconciler_test.go | 219 ++++++++++++++++ 4 files changed, 659 insertions(+) create mode 100644 internal/reconciler/reconciler.go create mode 100644 internal/reconciler/reconciler_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 8a38856..c3c247b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -30,6 +30,7 @@ import ( "github.com/alexei/tinyforge/internal/notify" "github.com/alexei/tinyforge/internal/npm" "github.com/alexei/tinyforge/internal/proxy" + "github.com/alexei/tinyforge/internal/reconciler" "github.com/alexei/tinyforge/internal/registry" "github.com/alexei/tinyforge/internal/stale" "github.com/alexei/tinyforge/internal/stack" @@ -94,6 +95,17 @@ func main() { } defer dockerClient.Close() + // Start the container index reconciler. Runs one boot pass and then + // ticks every 30s. Boot pass populates the containers table from any + // running containers that predate the workload refactor; subsequent + // ticks catch state drift the deployer didn't witness (e.g., a stack + // service that exited on its own). + reconcilerCtx, reconcilerCancel := context.WithCancel(context.Background()) + defer reconcilerCancel() + rec := reconciler.New(db, dockerClient, 30*time.Second) + rec.Start(reconcilerCtx) + defer rec.Stop() + // Read settings for NPM URL and polling interval. settings, err := db.GetSettings() if err != nil { diff --git a/internal/docker/container.go b/internal/docker/container.go index a40389e..75af8c3 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -293,6 +293,86 @@ func (c *Client) ListContainers(ctx context.Context, labelFilters map[string]str return result, nil } +// ReconcileItem is a fat container summary aimed at the reconciler — it +// exposes the full label map so the caller can dispatch by workload labels, +// legacy labels, or compose labels without re-inspecting. +type ReconcileItem struct { + ID string + Name string + Image string + State string + Status string + Labels map[string]string + Ports []uint16 +} + +// ListAllForReconciler returns every container the daemon knows about whose +// labels mark it as Tinyforge-managed by ANY of the supported schemes: +// - tinyforge.managed (canonical, new) +// - tinyforge.project / tinyforge.instance-id (legacy project) +// - tinyforge.static-site (legacy site) +// - com.docker.compose.project starting with "tinyforge-" (stacks) +// +// The Docker API does not support OR'd label filters, so we list everything +// and filter in-process. On a small/medium daemon this is cheap; the +// reconciler runs on a 30s tick. +func (c *Client) ListAllForReconciler(ctx context.Context) ([]ReconcileItem, error) { + listResult, err := c.api.ContainerList(ctx, client.ContainerListOptions{All: true}) + if err != nil { + return nil, fmt.Errorf("list containers: %w", err) + } + + out := make([]ReconcileItem, 0, len(listResult.Items)) + for _, ctr := range listResult.Items { + labels := ctr.Labels + if !isTinyforgeManaged(labels) { + continue + } + + name := "" + if len(ctr.Names) > 0 { + name = strings.TrimPrefix(ctr.Names[0], "/") + } + var ports []uint16 + for _, p := range ctr.Ports { + if p.PublicPort > 0 { + ports = append(ports, p.PublicPort) + } + } + out = append(out, ReconcileItem{ + ID: ctr.ID, + Name: name, + Image: ctr.Image, + State: string(ctr.State), + Status: ctr.Status, + Labels: labels, + Ports: ports, + }) + } + return out, nil +} + +// isTinyforgeManaged returns true when a container's labels mark it as +// belonging to Tinyforge under any of the supported labelling schemes. +func isTinyforgeManaged(labels map[string]string) bool { + if labels == nil { + return false + } + if labels[LabelManaged] == "true" { + return true + } + if labels[LabelProject] != "" || labels[LabelInstanceID] != "" { + return true + } + if _, ok := labels["tinyforge.static-site"]; ok { + return true + } + if cp, ok := labels["com.docker.compose.project"]; ok && strings.HasPrefix(cp, "tinyforge-") { + return true + } + return false +} + // ContainerLogs returns a log stream for a container. // If follow is true, the stream stays open for new log lines. // tail specifies the number of lines from the end to return (e.g., "200"). diff --git a/internal/reconciler/reconciler.go b/internal/reconciler/reconciler.go new file mode 100644 index 0000000..216c620 --- /dev/null +++ b/internal/reconciler/reconciler.go @@ -0,0 +1,348 @@ +// Package reconciler keeps the normalized containers index in sync with the +// Docker daemon. It runs on a tick (and one-shot at boot) — for every +// Tinyforge-managed container in `docker ps`, it dispatches to a workload by +// labels and upserts a Container row. Rows whose Docker container ID is no +// longer present are flipped to state='missing'. +// +// Dispatch precedence: +// 1. tinyforge.workload.id label (canonical) +// 2. tinyforge.instance-id label (legacy project — joins via instances row) +// 3. tinyforge.static-site label (legacy site) +// 4. com.docker.compose.project (stack — joins via Stack.ComposeProjectName) +package reconciler + +import ( + "context" + "errors" + "log/slog" + "strings" + "sync" + "time" + + "github.com/alexei/tinyforge/internal/docker" + "github.com/alexei/tinyforge/internal/store" +) + +// DockerLister is the subset of docker.Client the reconciler depends on. +// Defined here (where it's used) so tests can substitute a fake without +// pulling in the full docker package. +type DockerLister interface { + ListAllForReconciler(ctx context.Context) ([]docker.ReconcileItem, error) +} + +// Reconciler is the background worker that syncs the containers index. +type Reconciler struct { + store *store.Store + docker DockerLister + interval time.Duration + + stop chan struct{} + wg sync.WaitGroup +} + +// New constructs a Reconciler. interval is the tick period; values <=0 fall +// back to 30s. interval > 5m is clamped to 5m so a manual misconfiguration +// can't silently disable timely state updates. +func New(st *store.Store, dockerClient DockerLister, interval time.Duration) *Reconciler { + if interval <= 0 { + interval = 30 * time.Second + } + if interval > 5*time.Minute { + interval = 5 * time.Minute + } + return &Reconciler{ + store: st, + docker: dockerClient, + interval: interval, + stop: make(chan struct{}), + } +} + +// Start kicks off the background reconciliation loop. Runs one tick +// immediately so startup populates the index without waiting for the first +// timer fire. Idempotent: calling Start twice is a programming error. +func (r *Reconciler) Start(ctx context.Context) { + r.wg.Add(1) + go r.loop(ctx) +} + +// Stop signals the loop to exit and waits for the in-flight tick to finish. +func (r *Reconciler) Stop() { + close(r.stop) + r.wg.Wait() +} + +// ReconcileOnce runs a single reconciliation pass. Exposed for tests and for +// callers that want to force a sync after a known mutation (e.g., right after +// a deploy succeeds, before the next tick). +func (r *Reconciler) ReconcileOnce(ctx context.Context) error { + items, err := r.docker.ListAllForReconciler(ctx) + if err != nil { + return err + } + seen := make(map[string]struct{}, len(items)) // container row IDs we touched + + for _, item := range items { + rowID := r.upsertFromItem(ctx, item) + if rowID != "" { + seen[rowID] = struct{}{} + } + } + + r.markMissingRows(seen) + return nil +} + +func (r *Reconciler) loop(ctx context.Context) { + defer r.wg.Done() + + // Boot tick. + if err := r.ReconcileOnce(ctx); err != nil { + slog.Warn("reconciler: initial pass", "error", err) + } + + ticker := time.NewTicker(r.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-r.stop: + return + case <-ticker.C: + if err := r.ReconcileOnce(ctx); err != nil { + slog.Warn("reconciler: tick", "error", err) + } + } + } +} + +// upsertFromItem dispatches one container to its workload and writes the +// Container row. Returns the row ID on success or "" if no dispatch matched. +func (r *Reconciler) upsertFromItem(ctx context.Context, item docker.ReconcileItem) string { + if id := item.Labels[docker.LabelWorkloadID]; id != "" { + return r.upsertByWorkloadLabel(item, id) + } + if instanceID := item.Labels[docker.LabelInstanceID]; instanceID != "" { + return r.upsertByInstanceLabel(item, instanceID) + } + if siteID := item.Labels["tinyforge.static-site"]; siteID != "" { + return r.upsertBySiteLabel(item, siteID) + } + if cp := item.Labels["com.docker.compose.project"]; cp != "" && strings.HasPrefix(cp, "tinyforge-") { + return r.upsertByComposeProject(item, cp) + } + return "" +} + +// upsertByWorkloadLabel — canonical path. WorkloadID + Role uniquely +// identifies the row. ID stays deterministic so re-deploys update in place. +func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string) string { + role := item.Labels[docker.LabelRole] + kind := item.Labels[docker.LabelWorkloadKind] + rowID := workloadIDRow(workloadID, kind, role, item.Labels[docker.LabelInstanceID], item.ID) + + port := 0 + if len(item.Ports) > 0 { + port = int(item.Ports[0]) + } + if err := r.store.UpsertContainer(store.Container{ + ID: rowID, + WorkloadID: workloadID, + WorkloadKind: kind, + Role: role, + ContainerID: item.ID, + ImageRef: item.Image, + Host: "local", + State: normalizeState(item.State), + Port: port, + LastSeenAt: store.Now(), + }); err != nil { + slog.Warn("reconciler: upsert by workload label", "container_id", item.ID, "error", err) + return "" + } + return rowID +} + +// upsertByInstanceLabel — legacy project path. Instance ID maps 1:1 to the +// container row ID by construction (deployer uses the same UUID for both), +// so we can update directly. We still need the workload ID for the row. +func (r *Reconciler) upsertByInstanceLabel(item docker.ReconcileItem, instanceID string) string { + inst, err := r.store.GetInstanceByID(instanceID) + if err != nil { + // Container with stale label — instance row gone. Skip silently. + if errors.Is(err, store.ErrNotFound) { + return "" + } + slog.Warn("reconciler: lookup instance", "instance_id", instanceID, "error", err) + return "" + } + w, err := r.store.GetWorkloadByRef(store.WorkloadKindProject, inst.ProjectID) + if err != nil { + return "" + } + port := inst.Port + if port == 0 && len(item.Ports) > 0 { + port = int(item.Ports[0]) + } + if err := r.store.UpsertContainer(store.Container{ + ID: inst.ID, + WorkloadID: w.ID, + WorkloadKind: string(store.WorkloadKindProject), + Role: item.Labels[docker.LabelStage], + ContainerID: item.ID, + ImageRef: item.Image, + ImageTag: inst.ImageTag, + Host: "local", + State: normalizeState(item.State), + Port: port, + Subdomain: inst.Subdomain, + ProxyRouteID: inst.ProxyRouteID, + NpmProxyID: inst.NpmProxyID, + LastSeenAt: store.Now(), + }); err != nil { + slog.Warn("reconciler: upsert by instance label", "container_id", item.ID, "error", err) + return "" + } + return inst.ID +} + +func (r *Reconciler) upsertBySiteLabel(item docker.ReconcileItem, siteID string) string { + w, err := r.store.GetWorkloadByRef(store.WorkloadKindSite, siteID) + if err != nil { + return "" + } + rowID := w.ID + ":site" + port := 0 + if len(item.Ports) > 0 { + port = int(item.Ports[0]) + } + if err := r.store.UpsertContainer(store.Container{ + ID: rowID, + WorkloadID: w.ID, + WorkloadKind: string(store.WorkloadKindSite), + Role: "", + ContainerID: item.ID, + ImageRef: item.Image, + Host: "local", + State: normalizeState(item.State), + Port: port, + LastSeenAt: store.Now(), + }); err != nil { + slog.Warn("reconciler: upsert by site label", "container_id", item.ID, "error", err) + return "" + } + return rowID +} + +func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composeProject string) string { + stack, err := r.findStackByComposeProject(composeProject) + if err != nil { + return "" + } + w, err := r.store.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) + if err != nil { + return "" + } + role := item.Labels["com.docker.compose.service"] + if role == "" { + role = item.Name + } + rowID := w.ID + ":" + role + port := 0 + if len(item.Ports) > 0 { + port = int(item.Ports[0]) + } + if err := r.store.UpsertContainer(store.Container{ + ID: rowID, + WorkloadID: w.ID, + WorkloadKind: string(store.WorkloadKindStack), + Role: role, + ContainerID: item.ID, + ImageRef: item.Image, + Host: "local", + State: normalizeState(item.State), + Port: port, + LastSeenAt: store.Now(), + }); err != nil { + slog.Warn("reconciler: upsert by compose project", "container_id", item.ID, "error", err) + return "" + } + return rowID +} + +// findStackByComposeProject scans all stacks for a matching ComposeProjectName. +// Linear; the stack count is small in practice. +func (r *Reconciler) findStackByComposeProject(composeProject string) (store.Stack, error) { + stacks, err := r.store.GetAllStacks() + if err != nil { + return store.Stack{}, err + } + for _, s := range stacks { + if s.ComposeProjectName == composeProject { + return s, nil + } + } + return store.Stack{}, store.ErrNotFound +} + +// markMissingRows flips state to 'missing' for any container row whose Docker +// container ID was not seen in this pass. Rows with empty container_id are +// skipped — the deployer creates them ahead of `docker create` so they're +// transient and shouldn't be marked missing on a tick that races the deploy. +func (r *Reconciler) markMissingRows(seen map[string]struct{}) { + rows, err := r.store.ListContainers(store.ContainerFilter{}) + if err != nil { + slog.Warn("reconciler: list containers for missing-sweep", "error", err) + return + } + for _, row := range rows { + if _, ok := seen[row.ID]; ok { + continue + } + if row.ContainerID == "" { + continue // never bound to a real container yet + } + if row.State == "missing" { + continue // already marked + } + if err := r.store.MarkContainerMissing(row.ID); err != nil { + slog.Warn("reconciler: mark missing", "row_id", row.ID, "error", err) + } + } +} + +// workloadIDRow picks the row ID for a workload-labelled container. +// For projects the deployer assigns instance ID = container row ID (via +// LabelInstanceID), so we honor that to keep IDs stable. For stack/site +// it's the deterministic workloadID:role pattern. +func workloadIDRow(workloadID, kind, role, instanceID, containerID string) string { + if instanceID != "" && kind == string(store.WorkloadKindProject) { + return instanceID + } + if role != "" { + return workloadID + ":" + role + } + if kind == string(store.WorkloadKindSite) { + return workloadID + ":site" + } + // Last-resort fallback: container ID. Better than ""; uncommon path. + return workloadID + ":" + containerID +} + +// normalizeState maps Docker container states to our condensed set: +// running | stopped | failed | removing | missing. +func normalizeState(dockerState string) string { + switch dockerState { + case "running": + return "running" + case "exited", "dead", "stopped": + return "stopped" + case "created", "restarting", "paused": + return dockerState + case "removing": + return "removing" + default: + return dockerState + } +} diff --git a/internal/reconciler/reconciler_test.go b/internal/reconciler/reconciler_test.go new file mode 100644 index 0000000..1165b6f --- /dev/null +++ b/internal/reconciler/reconciler_test.go @@ -0,0 +1,219 @@ +package reconciler + +import ( + "context" + "testing" + + "github.com/alexei/tinyforge/internal/docker" + "github.com/alexei/tinyforge/internal/store" +) + +// fakeDocker is a tiny stand-in for docker.Client. The reconciler depends on +// the DockerLister interface so we don't need a real daemon for unit tests. +type fakeDocker struct { + items []docker.ReconcileItem +} + +func (f *fakeDocker) ListAllForReconciler(ctx context.Context) ([]docker.ReconcileItem, error) { + return f.items, nil +} + +func newTestStore(t *testing.T) *store.Store { + t.Helper() + s, err := store.New(":memory:") + if err != nil { + t.Fatalf("create store: %v", err) + } + t.Cleanup(func() { s.Close() }) + return s +} + +func TestReconcileWorkloadLabelledStackContainer(t *testing.T) { + st := newTestStore(t) + + // Set up a stack workload (no project/site interaction). + stack, err := st.CreateStack(store.Stack{ + Name: "wf-stack", ComposeProjectName: "tinyforge-wf-stack", + }) + if err != nil { + t.Fatalf("CreateStack: %v", err) + } + w, _ := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) + + // One container with the canonical workload labels stamped. + fake := &fakeDocker{items: []docker.ReconcileItem{{ + ID: "docker-abc", + Name: "wf-stack-web-1", + Image: "nginx:1.27", + State: "running", + Labels: map[string]string{ + docker.LabelManaged: "true", + docker.LabelWorkloadID: w.ID, + docker.LabelWorkloadKind: "stack", + docker.LabelRole: "web", + }, + Ports: []uint16{8080}, + }}} + + r := New(st, fake, 0) + if err := r.ReconcileOnce(context.Background()); err != nil { + t.Fatalf("ReconcileOnce: %v", err) + } + + rows, _ := st.ListContainersByWorkload(w.ID) + if len(rows) != 1 { + t.Fatalf("expected 1 container row, got %d", len(rows)) + } + got := rows[0] + if got.ContainerID != "docker-abc" { + t.Fatalf("container_id not bound: got %q", got.ContainerID) + } + if got.Role != "web" || got.WorkloadKind != "stack" { + t.Fatalf("dispatch wrong: %+v", got) + } + if got.State != "running" || got.Port != 8080 { + t.Fatalf("state/port wrong: %+v", got) + } +} + +func TestReconcileComposeOnlyStackContainer(t *testing.T) { + st := newTestStore(t) + + stack, _ := st.CreateStack(store.Stack{ + Name: "compose-stack", ComposeProjectName: "tinyforge-compose-stack", + }) + w, _ := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) + + // Pre-existing compose container — only carries compose's own labels, + // no tinyforge.* labels at all. + fake := &fakeDocker{items: []docker.ReconcileItem{{ + ID: "docker-xyz", + Name: "tinyforge-compose-stack-worker-1", + Image: "redis:7", + State: "running", + Labels: map[string]string{ + "com.docker.compose.project": "tinyforge-compose-stack", + "com.docker.compose.service": "worker", + }, + }}} + + r := New(st, fake, 0) + if err := r.ReconcileOnce(context.Background()); err != nil { + t.Fatalf("ReconcileOnce: %v", err) + } + + rows, _ := st.ListContainersByWorkload(w.ID) + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + if rows[0].Role != "worker" { + t.Fatalf("role from compose label wrong: %q", rows[0].Role) + } + if rows[0].ContainerID != "docker-xyz" { + t.Fatalf("container_id not bound: %q", rows[0].ContainerID) + } +} + +func TestReconcileMarksMissingRows(t *testing.T) { + st := newTestStore(t) + + stack, _ := st.CreateStack(store.Stack{ + Name: "missing-stack", ComposeProjectName: "tinyforge-missing-stack", + }) + w, _ := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) + + // Pre-existing row with a real container_id that no longer exists. + if err := st.UpsertContainer(store.Container{ + ID: w.ID + ":web", WorkloadID: w.ID, WorkloadKind: "stack", + Role: "web", ContainerID: "docker-gone", State: "running", + }); err != nil { + t.Fatalf("seed: %v", err) + } + + // Reconciler sees nothing. + r := New(st, &fakeDocker{}, 0) + if err := r.ReconcileOnce(context.Background()); err != nil { + t.Fatalf("ReconcileOnce: %v", err) + } + + got, _ := st.GetContainerByID(w.ID + ":web") + if got.State != "missing" { + t.Fatalf("expected state=missing, got %q", got.State) + } +} + +func TestReconcileSkipsRowsAwaitingDocker(t *testing.T) { + st := newTestStore(t) + + stack, _ := st.CreateStack(store.Stack{ + Name: "pending", ComposeProjectName: "tinyforge-pending", + }) + w, _ := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) + + // A row with empty container_id (deployer placeholder, awaiting docker + // create). Reconciler must not mark this as missing. + if err := st.UpsertContainer(store.Container{ + ID: w.ID + ":web", WorkloadID: w.ID, WorkloadKind: "stack", + Role: "web", ContainerID: "", State: "starting", + }); err != nil { + t.Fatalf("seed: %v", err) + } + + r := New(st, &fakeDocker{}, 0) + if err := r.ReconcileOnce(context.Background()); err != nil { + t.Fatalf("ReconcileOnce: %v", err) + } + + got, _ := st.GetContainerByID(w.ID + ":web") + if got.State != "starting" { + t.Fatalf("placeholder row should keep state, got %q", got.State) + } +} + +func TestReconcileIgnoresUnmanagedContainers(t *testing.T) { + // A container without any tinyforge or compose labels would not even be + // returned by ListAllForReconciler in production; but the dispatch must + // be a no-op even if a stray item slips through. + st := newTestStore(t) + fake := &fakeDocker{items: []docker.ReconcileItem{{ + ID: "docker-foreign", Labels: map[string]string{"app": "other"}, + }}} + r := New(st, fake, 0) + if err := r.ReconcileOnce(context.Background()); err != nil { + t.Fatalf("ReconcileOnce: %v", err) + } + rows, _ := st.ListContainers(store.ContainerFilter{}) + if len(rows) != 0 { + t.Fatalf("foreign container should not produce rows, got %d", len(rows)) + } +} + +func TestReconcileNormalizesState(t *testing.T) { + st := newTestStore(t) + stack, _ := st.CreateStack(store.Stack{ + Name: "norm", ComposeProjectName: "tinyforge-norm", + }) + w, _ := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) + + fake := &fakeDocker{items: []docker.ReconcileItem{{ + ID: "docker-1", + Image: "nginx", + State: "exited", + Labels: map[string]string{ + docker.LabelManaged: "true", + docker.LabelWorkloadID: w.ID, + docker.LabelWorkloadKind: "stack", + docker.LabelRole: "web", + }, + }}} + + r := New(st, fake, 0) + if err := r.ReconcileOnce(context.Background()); err != nil { + t.Fatalf("ReconcileOnce: %v", err) + } + + got, _ := st.GetContainerByID(w.ID + ":web") + if got.State != "stopped" { + t.Fatalf("docker 'exited' should normalize to 'stopped', got %q", got.State) + } +}