// Package reconciler keeps the normalized containers index in sync with the // Docker daemon. It runs on a tick (and one-shot at boot) — for every // Tinyforge-managed container in `docker ps`, it resolves a workload by the // canonical workload-id label and writes a Container row through // ReconcileContainer (which only touches Docker-derived fields on conflict, // never deployer-owned columns like subdomain / proxy_route_id / // npm_proxy_id / image_tag / stage_id). Rows whose Docker container ID is no // longer present are flipped to state='missing'. // // Only the tinyforge.workload.id label is honored after the hard cutover — // every Source plugin labels its containers with the workload identity at // create time. The legacy tinyforge.static-site / compose-project paths // were dropped along with the static_sites / stacks tables. package reconciler import ( "context" "errors" "fmt" "log/slog" "sync" "time" "github.com/alexei/tinyforge/internal/docker" "github.com/alexei/tinyforge/internal/store" "github.com/alexei/tinyforge/internal/workload/plugin" ) // DockerLister is the subset of docker.Client the reconciler depends on. // Defined here (where it's used) so tests can substitute a fake without // pulling in the full docker package. type DockerLister interface { ListAllForReconciler(ctx context.Context) ([]docker.ReconcileItem, error) } // PluginReconciler is the optional dispatch surface for per-workload // Source.Reconcile calls. Nil-safe — when unset, the reconciler skips // the plugin pass and only refreshes the containers index from Docker. type PluginReconciler interface { DispatchReconcile(ctx context.Context, w plugin.Workload) error } // Reconciler is the background worker that syncs the containers index. type Reconciler struct { store *store.Store docker DockerLister interval time.Duration plugins PluginReconciler // optional; nil disables the per-workload Source.Reconcile pass. stop chan struct{} cancel context.CancelFunc // populated in Start; invoked by Stop so an in-flight tick is unblocked. wg sync.WaitGroup } // New constructs a Reconciler. interval is the tick period; values <=0 fall // back to 30s. interval > 5m is clamped to 5m so a manual misconfiguration // can't silently disable timely state updates. func New(st *store.Store, dockerClient DockerLister, interval time.Duration) *Reconciler { if interval <= 0 { interval = 30 * time.Second } if interval > 5*time.Minute { interval = 5 * time.Minute } return &Reconciler{ store: st, docker: dockerClient, interval: interval, stop: make(chan struct{}), } } // SetPluginReconciler injects the per-workload Source.Reconcile dispatch. // Safe to call before or after Start; tick uses whatever's set at the // time. func (r *Reconciler) SetPluginReconciler(p PluginReconciler) { r.plugins = p } // Start kicks off the background reconciliation loop. Runs one tick // immediately so startup populates the index without waiting for the first // timer fire. The provided context is wrapped with a child cancel func so // Stop() can unblock an in-flight Docker call. func (r *Reconciler) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancel = cancel r.wg.Add(1) go r.loop(ctx) } // Stop signals the loop to exit. Cancels the child context FIRST so any // in-flight `docker ps` (which can hang on a stuck daemon) returns promptly, // then waits for the goroutine to finish. Idempotent. func (r *Reconciler) Stop() { if r.cancel != nil { r.cancel() } select { case <-r.stop: // already closed default: close(r.stop) } r.wg.Wait() } // ReconcileOnce runs a single reconciliation pass. Exposed for tests and for // callers that want to force a sync after a known mutation (e.g., right after // a deploy succeeds, before the next tick). func (r *Reconciler) ReconcileOnce(ctx context.Context) error { items, err := r.docker.ListAllForReconciler(ctx) if err != nil { return err } // Load every workload ONCE per tick and index by ID. This replaces both // the former N+1 GetWorkloadByID (one DB read per container) in the // upsert loop and the second ListWorkloads("") in the plugin pass: net 1 // query per tick, 0 GetWorkloadByID. // // On error we return BEFORE the upsert loop and leave state untouched // this tick (the next tick retries). We must NOT proceed with an empty // map and fall through to markMissingRows: with no container resolving, // `seen` would be empty and markMissingRows would flip EVERY live row to // 'missing'. Aborting early is the safe choice. rows, err := r.store.ListWorkloads("") if err != nil { return fmt.Errorf("reconciler: list workloads: %w", err) } byID := make(map[string]store.Workload, len(rows)) for _, w := range rows { byID[w.ID] = w } seen := make(map[string]struct{}, len(items)) // container row IDs we touched for _, item := range items { rowID := r.upsertFromItem(item, byID) if rowID != "" { seen[rowID] = struct{}{} } } r.markMissingRows(seen) r.reconcilePluginWorkloads(ctx, rows) return nil } // reconcilePluginWorkloads iterates every workload row that has a // Source plugin and asks the dispatcher to invoke Source.Reconcile. // Failures are logged per-workload — one workload's broken state must // not stop sweeping the rest. // // Trigger configuration is no longer required to reconcile: a workload // with a Source but no trigger bindings is still a deployed thing whose // container state must stay in sync (manual-only deploys are common // during early setup). After the trigger-split refactor triggers live // in their own table, so the only gate here is SourceKind. // // No-op when the plugin dispatcher hasn't been wired (boot-time race, // disabled deployments, tests). // // rows is the workload set already loaded once by ReconcileOnce — passed // through rather than re-queried so a tick costs a single ListWorkloads. func (r *Reconciler) reconcilePluginWorkloads(ctx context.Context, rows []store.Workload) { if r.plugins == nil { return } for _, w := range rows { if w.SourceKind == "" { continue } pw := plugin.WorkloadFromStore(w) if err := r.plugins.DispatchReconcile(ctx, pw); err != nil { slog.Warn("reconciler: plugin reconcile failed", "workload", w.ID, "kind", w.SourceKind, "error", err) } } } func (r *Reconciler) loop(ctx context.Context) { defer r.wg.Done() // Boot tick. if err := r.ReconcileOnce(ctx); err != nil { slog.Warn("reconciler: initial pass", "error", err) } ticker := time.NewTicker(r.interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-r.stop: return case <-ticker.C: if err := r.ReconcileOnce(ctx); err != nil { slog.Warn("reconciler: tick", "error", err) } } } } // upsertFromItem dispatches one container to its workload and writes the // Container row. Returns the row ID on success or "" if no dispatch matched. // After the hard cutover only the canonical tinyforge.workload.id label // path is honored — every Source plugin labels its containers with the // workload identity at create time. func (r *Reconciler) upsertFromItem(item docker.ReconcileItem, byID map[string]store.Workload) string { if id := item.Labels[docker.LabelWorkloadID]; id != "" { return r.upsertByWorkloadLabel(item, id, byID) } return "" } // upsertByWorkloadLabel — canonical path. Project containers are owned by the // deployer: the deployer pre-creates the row with a per-instance UUID and // proxy/subdomain metadata. The reconciler resolves the existing row by // docker container ID and only touches Docker-derived fields. If no existing // row matches and the kind is project, we skip the upsert — inventing a // deterministic-ID row would race with the deployer's UUID rows for stages // with MaxInstances > 1, leaving ghost rows behind. // // Untrusted-label defense: a workload_id label that doesn't resolve to a // known workload row is silently ignored. Anyone with Docker socket access // could otherwise spawn a container with a forged label and steal the // canonical slot for an existing workload. func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string, byID map[string]store.Workload) string { w, ok := byID[workloadID] if !ok { // Forged or stale label — log once at debug; tick rate keeps logs quiet. slog.Debug("reconciler: unknown workload_id label", "workload_id", workloadID, "container_id", item.ID) return "" } role := item.Labels[docker.LabelRole] kind := item.Labels[docker.LabelWorkloadKind] if kind != "" && kind != w.Kind { slog.Warn("reconciler: workload kind mismatch", "label_kind", kind, "stored_kind", w.Kind, "workload_id", workloadID) return "" } if kind == "" { kind = w.Kind } // Resolve to existing row by Docker container ID. existing, lookupErr := r.store.GetContainerByDockerID(item.ID) if lookupErr == nil { port := 0 if len(item.Ports) > 0 { port = int(item.Ports[0]) } if err := r.store.ReconcileContainer(store.Container{ ID: existing.ID, WorkloadID: workloadID, WorkloadKind: kind, Role: role, ContainerID: item.ID, ImageRef: item.Image, Host: "local", State: normalizeState(item.State), Port: port, LastSeenAt: store.Now(), }); err != nil { slog.Warn("reconciler: reconcile by workload label", "container_id", item.ID, "error", err) return "" } return existing.ID } if !errors.Is(lookupErr, store.ErrNotFound) { slog.Warn("reconciler: lookup container by docker id", "container_id", item.ID, "error", lookupErr) return "" } // No row yet. For project workloads, the deployer is the authoritative // writer — wait for the deployer to create the row rather than // inventing one with a deterministic key (which would collide with // MaxInstances > 1 deploys). if kind == string(store.WorkloadKindProject) { return "" } // Site/stack reach this branch only when their plugin hasn't yet // upserted the row (e.g. a boot tick that races the first deploy). // The deterministic ID computed here matches what the static and // compose plugins write in their state-save paths, so a subsequent // plugin write upserts in place rather than creating a sibling row. rowID := workloadIDRow(workloadID, kind, role, item.ID) port := 0 if len(item.Ports) > 0 { port = int(item.Ports[0]) } if err := r.store.ReconcileContainer(store.Container{ ID: rowID, WorkloadID: workloadID, WorkloadKind: kind, Role: role, ContainerID: item.ID, ImageRef: item.Image, Host: "local", State: normalizeState(item.State), Port: port, LastSeenAt: store.Now(), }); err != nil { slog.Warn("reconciler: reconcile by workload label (insert)", "container_id", item.ID, "error", err) return "" } return rowID } // markMissingRows flips state to 'missing' for any container row whose Docker // container ID was not seen in this pass. Uses ListMissingSweepRows to scan // only rows that are bound to a real container and not already missing. func (r *Reconciler) markMissingRows(seen map[string]struct{}) { rows, err := r.store.ListMissingSweepRows() if err != nil { slog.Warn("reconciler: list rows for missing-sweep", "error", err) return } for _, row := range rows { if _, ok := seen[row.ID]; ok { continue } if err := r.store.MarkContainerMissing(row.ID); err != nil { slog.Warn("reconciler: mark missing", "row_id", row.ID, "error", err) } } } // workloadIDRow picks the row ID for a non-project workload-labelled // container that has no existing row. Sites use `:site` // (matches the static plugin's `containerRowID` helper). Stack // services use `:` (matches the compose // plugin). Project rows are never invented here — the deployer // pre-creates per-instance UUID rows so the reconciler must wait. func workloadIDRow(workloadID, kind, role, containerID string) string { if kind == string(store.WorkloadKindSite) { return workloadID + ":site" } if role != "" { return workloadID + ":" + role } return workloadID + ":" + containerID } // normalizeState maps Docker container states to our condensed set: // running | stopped | failed | removing | missing. func normalizeState(dockerState string) string { switch dockerState { case "running": return "running" case "exited", "dead", "stopped": return "stopped" case "created", "restarting", "paused": return dockerState case "removing": return "removing" default: return dockerState } }