// Package reconciler keeps the normalized containers index in sync with the // Docker daemon. It runs on a tick (and one-shot at boot) — for every // Tinyforge-managed container in `docker ps`, it dispatches to a workload by // labels and upserts a Container row. Rows whose Docker container ID is no // longer present are flipped to state='missing'. // // Dispatch precedence: // 1. tinyforge.workload.id label (canonical) // 2. tinyforge.instance-id label (legacy project — joins via instances row) // 3. tinyforge.static-site label (legacy site) // 4. com.docker.compose.project (stack — joins via Stack.ComposeProjectName) package reconciler import ( "context" "errors" "log/slog" "strings" "sync" "time" "github.com/alexei/tinyforge/internal/docker" "github.com/alexei/tinyforge/internal/store" ) // DockerLister is the subset of docker.Client the reconciler depends on. // Defined here (where it's used) so tests can substitute a fake without // pulling in the full docker package. type DockerLister interface { ListAllForReconciler(ctx context.Context) ([]docker.ReconcileItem, error) } // Reconciler is the background worker that syncs the containers index. type Reconciler struct { store *store.Store docker DockerLister interval time.Duration stop chan struct{} wg sync.WaitGroup } // New constructs a Reconciler. interval is the tick period; values <=0 fall // back to 30s. interval > 5m is clamped to 5m so a manual misconfiguration // can't silently disable timely state updates. func New(st *store.Store, dockerClient DockerLister, interval time.Duration) *Reconciler { if interval <= 0 { interval = 30 * time.Second } if interval > 5*time.Minute { interval = 5 * time.Minute } return &Reconciler{ store: st, docker: dockerClient, interval: interval, stop: make(chan struct{}), } } // Start kicks off the background reconciliation loop. Runs one tick // immediately so startup populates the index without waiting for the first // timer fire. Idempotent: calling Start twice is a programming error. func (r *Reconciler) Start(ctx context.Context) { r.wg.Add(1) go r.loop(ctx) } // Stop signals the loop to exit and waits for the in-flight tick to finish. func (r *Reconciler) Stop() { close(r.stop) r.wg.Wait() } // ReconcileOnce runs a single reconciliation pass. Exposed for tests and for // callers that want to force a sync after a known mutation (e.g., right after // a deploy succeeds, before the next tick). func (r *Reconciler) ReconcileOnce(ctx context.Context) error { items, err := r.docker.ListAllForReconciler(ctx) if err != nil { return err } seen := make(map[string]struct{}, len(items)) // container row IDs we touched for _, item := range items { rowID := r.upsertFromItem(ctx, item) if rowID != "" { seen[rowID] = struct{}{} } } r.markMissingRows(seen) return nil } func (r *Reconciler) loop(ctx context.Context) { defer r.wg.Done() // Boot tick. if err := r.ReconcileOnce(ctx); err != nil { slog.Warn("reconciler: initial pass", "error", err) } ticker := time.NewTicker(r.interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-r.stop: return case <-ticker.C: if err := r.ReconcileOnce(ctx); err != nil { slog.Warn("reconciler: tick", "error", err) } } } } // upsertFromItem dispatches one container to its workload and writes the // Container row. Returns the row ID on success or "" if no dispatch matched. func (r *Reconciler) upsertFromItem(ctx context.Context, item docker.ReconcileItem) string { if id := item.Labels[docker.LabelWorkloadID]; id != "" { return r.upsertByWorkloadLabel(item, id) } if instanceID := item.Labels[docker.LabelInstanceID]; instanceID != "" { return r.upsertByInstanceLabel(item, instanceID) } if siteID := item.Labels["tinyforge.static-site"]; siteID != "" { return r.upsertBySiteLabel(item, siteID) } if cp := item.Labels["com.docker.compose.project"]; cp != "" && strings.HasPrefix(cp, "tinyforge-") { return r.upsertByComposeProject(item, cp) } return "" } // upsertByWorkloadLabel — canonical path. WorkloadID + Role uniquely // identifies the row. ID stays deterministic so re-deploys update in place. func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string) string { role := item.Labels[docker.LabelRole] kind := item.Labels[docker.LabelWorkloadKind] rowID := workloadIDRow(workloadID, kind, role, item.Labels[docker.LabelInstanceID], item.ID) port := 0 if len(item.Ports) > 0 { port = int(item.Ports[0]) } if err := r.store.UpsertContainer(store.Container{ ID: rowID, WorkloadID: workloadID, WorkloadKind: kind, Role: role, ContainerID: item.ID, ImageRef: item.Image, Host: "local", State: normalizeState(item.State), Port: port, LastSeenAt: store.Now(), }); err != nil { slog.Warn("reconciler: upsert by workload label", "container_id", item.ID, "error", err) return "" } return rowID } // upsertByInstanceLabel — legacy project path. Instance ID maps 1:1 to the // container row ID by construction (deployer uses the same UUID for both), // so we can update directly. We still need the workload ID for the row. func (r *Reconciler) upsertByInstanceLabel(item docker.ReconcileItem, instanceID string) string { inst, err := r.store.GetInstanceByID(instanceID) if err != nil { // Container with stale label — instance row gone. Skip silently. if errors.Is(err, store.ErrNotFound) { return "" } slog.Warn("reconciler: lookup instance", "instance_id", instanceID, "error", err) return "" } w, err := r.store.GetWorkloadByRef(store.WorkloadKindProject, inst.ProjectID) if err != nil { return "" } port := inst.Port if port == 0 && len(item.Ports) > 0 { port = int(item.Ports[0]) } if err := r.store.UpsertContainer(store.Container{ ID: inst.ID, WorkloadID: w.ID, WorkloadKind: string(store.WorkloadKindProject), Role: item.Labels[docker.LabelStage], ContainerID: item.ID, ImageRef: item.Image, ImageTag: inst.ImageTag, Host: "local", State: normalizeState(item.State), Port: port, Subdomain: inst.Subdomain, ProxyRouteID: inst.ProxyRouteID, NpmProxyID: inst.NpmProxyID, LastSeenAt: store.Now(), }); err != nil { slog.Warn("reconciler: upsert by instance label", "container_id", item.ID, "error", err) return "" } return inst.ID } func (r *Reconciler) upsertBySiteLabel(item docker.ReconcileItem, siteID string) string { w, err := r.store.GetWorkloadByRef(store.WorkloadKindSite, siteID) if err != nil { return "" } rowID := w.ID + ":site" port := 0 if len(item.Ports) > 0 { port = int(item.Ports[0]) } if err := r.store.UpsertContainer(store.Container{ ID: rowID, WorkloadID: w.ID, WorkloadKind: string(store.WorkloadKindSite), Role: "", ContainerID: item.ID, ImageRef: item.Image, Host: "local", State: normalizeState(item.State), Port: port, LastSeenAt: store.Now(), }); err != nil { slog.Warn("reconciler: upsert by site label", "container_id", item.ID, "error", err) return "" } return rowID } func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composeProject string) string { stack, err := r.findStackByComposeProject(composeProject) if err != nil { return "" } w, err := r.store.GetWorkloadByRef(store.WorkloadKindStack, stack.ID) if err != nil { return "" } role := item.Labels["com.docker.compose.service"] if role == "" { role = item.Name } rowID := w.ID + ":" + role port := 0 if len(item.Ports) > 0 { port = int(item.Ports[0]) } if err := r.store.UpsertContainer(store.Container{ ID: rowID, WorkloadID: w.ID, WorkloadKind: string(store.WorkloadKindStack), Role: role, ContainerID: item.ID, ImageRef: item.Image, Host: "local", State: normalizeState(item.State), Port: port, LastSeenAt: store.Now(), }); err != nil { slog.Warn("reconciler: upsert by compose project", "container_id", item.ID, "error", err) return "" } return rowID } // findStackByComposeProject scans all stacks for a matching ComposeProjectName. // Linear; the stack count is small in practice. func (r *Reconciler) findStackByComposeProject(composeProject string) (store.Stack, error) { stacks, err := r.store.GetAllStacks() if err != nil { return store.Stack{}, err } for _, s := range stacks { if s.ComposeProjectName == composeProject { return s, nil } } return store.Stack{}, store.ErrNotFound } // markMissingRows flips state to 'missing' for any container row whose Docker // container ID was not seen in this pass. Rows with empty container_id are // skipped — the deployer creates them ahead of `docker create` so they're // transient and shouldn't be marked missing on a tick that races the deploy. func (r *Reconciler) markMissingRows(seen map[string]struct{}) { rows, err := r.store.ListContainers(store.ContainerFilter{}) if err != nil { slog.Warn("reconciler: list containers for missing-sweep", "error", err) return } for _, row := range rows { if _, ok := seen[row.ID]; ok { continue } if row.ContainerID == "" { continue // never bound to a real container yet } if row.State == "missing" { continue // already marked } if err := r.store.MarkContainerMissing(row.ID); err != nil { slog.Warn("reconciler: mark missing", "row_id", row.ID, "error", err) } } } // workloadIDRow picks the row ID for a workload-labelled container. // For projects the deployer assigns instance ID = container row ID (via // LabelInstanceID), so we honor that to keep IDs stable. For stack/site // it's the deterministic workloadID:role pattern. func workloadIDRow(workloadID, kind, role, instanceID, containerID string) string { if instanceID != "" && kind == string(store.WorkloadKindProject) { return instanceID } if role != "" { return workloadID + ":" + role } if kind == string(store.WorkloadKindSite) { return workloadID + ":site" } // Last-resort fallback: container ID. Better than ""; uncommon path. return workloadID + ":" + containerID } // normalizeState maps Docker container states to our condensed set: // running | stopped | failed | removing | missing. func normalizeState(dockerState string) string { switch dockerState { case "running": return "running" case "exited", "dead", "stopped": return "stopped" case "created", "restarting", "paused": return dockerState case "removing": return "removing" default: return dockerState } }