feat(workload): container index reconciler

Background worker that keeps the containers table in sync with
`docker ps -a` (stopped containers are listed too). Runs one boot
pass, then ticks every 30s.

Dispatch precedence per container:
  1. tinyforge.workload.id label   (canonical, new)
  2. tinyforge.instance-id label   (legacy project — joins via instances)
  3. tinyforge.static-site label   (legacy site)
  4. com.docker.compose.project    (stacks — joins via ComposeProjectName)

Rows whose Docker container ID is no longer present are flipped
to state='missing'. Placeholder rows (empty container_id, e.g.
a deploy mid-flight) are left alone so a tick that races a
deploy doesn't mark them as missing.

DockerLister interface lets tests substitute a fake daemon —
6 unit tests cover the dispatch matrix, missing-sweep, and
state normalization.

Wired into cmd/server/main.go between docker.New and the
existing startup chain. Boot pass populates the containers
table from any pre-refactor running containers.
This commit is contained in:
2026-05-09 13:45:13 +03:00
parent b6f20599d7
commit af82be3fb8
4 changed files with 659 additions and 0 deletions
+12
View File
@@ -30,6 +30,7 @@ import (
"github.com/alexei/tinyforge/internal/notify" "github.com/alexei/tinyforge/internal/notify"
"github.com/alexei/tinyforge/internal/npm" "github.com/alexei/tinyforge/internal/npm"
"github.com/alexei/tinyforge/internal/proxy" "github.com/alexei/tinyforge/internal/proxy"
"github.com/alexei/tinyforge/internal/reconciler"
"github.com/alexei/tinyforge/internal/registry" "github.com/alexei/tinyforge/internal/registry"
"github.com/alexei/tinyforge/internal/stale" "github.com/alexei/tinyforge/internal/stale"
"github.com/alexei/tinyforge/internal/stack" "github.com/alexei/tinyforge/internal/stack"
@@ -94,6 +95,17 @@ func main() {
} }
defer dockerClient.Close() defer dockerClient.Close()
// Start the container index reconciler. Runs one boot pass and then
// ticks every 30s. Boot pass populates the containers table from any
// running containers that predate the workload refactor; subsequent
// ticks catch state drift the deployer didn't witness (e.g., a stack
// service that exited on its own).
reconcilerCtx, reconcilerCancel := context.WithCancel(context.Background())
defer reconcilerCancel()
rec := reconciler.New(db, dockerClient, 30*time.Second)
rec.Start(reconcilerCtx)
defer rec.Stop()
// Read settings for NPM URL and polling interval. // Read settings for NPM URL and polling interval.
settings, err := db.GetSettings() settings, err := db.GetSettings()
if err != nil { if err != nil {
+80
View File
@@ -293,6 +293,86 @@ func (c *Client) ListContainers(ctx context.Context, labelFilters map[string]str
return result, nil return result, nil
} }
// ReconcileItem is a fat container summary aimed at the reconciler — it
// exposes the full label map so the caller can dispatch by workload labels,
// legacy labels, or compose labels without re-inspecting.
type ReconcileItem struct {
// ID is the container ID as reported by the daemon's container list.
ID string
// Name is the first listed container name with the leading "/" trimmed,
// or "" when the daemon reports no names.
Name string
// Image is the image field from the container list entry.
Image string
// State is the daemon's state value for the container, stringified.
State string
// Status is the daemon's status text for the container.
Status string
// Labels is the container's complete, unfiltered label map.
Labels map[string]string
// Ports holds the container's non-zero public ports in the order the
// daemon listed them; nil when nothing is published.
Ports []uint16
}
// ListAllForReconciler asks the daemon for every container it knows about
// (stopped ones included) and keeps only those that isTinyforgeManaged
// accepts, i.e. containers labelled under any supported scheme:
//   - tinyforge.managed (canonical, new)
//   - tinyforge.project / tinyforge.instance-id (legacy project)
//   - tinyforge.static-site (legacy site)
//   - com.docker.compose.project starting with "tinyforge-" (stacks)
//
// Docker label filters cannot be OR'd together, so the full list is fetched
// once and filtered in-process. Each kept container is flattened into a
// ReconcileItem carrying its whole label map and its published ports.
//
// Returns a non-nil (possibly empty) slice on success, or a wrapped error
// when the list call fails.
func (c *Client) ListAllForReconciler(ctx context.Context) ([]ReconcileItem, error) {
	resp, err := c.api.ContainerList(ctx, client.ContainerListOptions{All: true})
	if err != nil {
		return nil, fmt.Errorf("list containers: %w", err)
	}

	managed := make([]ReconcileItem, 0, len(resp.Items))
	for _, summary := range resp.Items {
		if !isTinyforgeManaged(summary.Labels) {
			continue
		}

		entry := ReconcileItem{
			ID:     summary.ID,
			Image:  summary.Image,
			State:  string(summary.State),
			Status: summary.Status,
			Labels: summary.Labels,
		}
		// Docker prefixes container names with "/"; only the first name is kept.
		if len(summary.Names) > 0 {
			entry.Name = strings.TrimPrefix(summary.Names[0], "/")
		}
		// Only ports actually published on the host are of interest.
		for _, binding := range summary.Ports {
			if binding.PublicPort > 0 {
				entry.Ports = append(entry.Ports, binding.PublicPort)
			}
		}
		managed = append(managed, entry)
	}
	return managed, nil
}
// isTinyforgeManaged reports whether a label set identifies a container as
// belonging to Tinyforge under any supported labelling scheme:
//   - LabelManaged equal to "true"
//   - a non-empty LabelProject or LabelInstanceID
//   - the mere presence of the "tinyforge.static-site" key (any value)
//   - a "com.docker.compose.project" value starting with "tinyforge-"
//
// A nil map is handled naturally: lookups on a nil map yield zero values,
// so every check below is false and the function returns false.
func isTinyforgeManaged(labels map[string]string) bool {
	_, hasSiteLabel := labels["tinyforge.static-site"]
	switch {
	case labels[LabelManaged] == "true":
		return true
	case labels[LabelProject] != "", labels[LabelInstanceID] != "":
		return true
	case hasSiteLabel:
		return true
	}
	// A missing compose label reads as "", which never has the prefix.
	return strings.HasPrefix(labels["com.docker.compose.project"], "tinyforge-")
}
// ContainerLogs returns a log stream for a container. // ContainerLogs returns a log stream for a container.
// If follow is true, the stream stays open for new log lines. // If follow is true, the stream stays open for new log lines.
// tail specifies the number of lines from the end to return (e.g., "200"). // tail specifies the number of lines from the end to return (e.g., "200").
+348
View File
@@ -0,0 +1,348 @@
// Package reconciler keeps the normalized containers index in sync with the
// Docker daemon. It runs on a tick (and one-shot at boot) — for every
// Tinyforge-managed container in `docker ps -a` (stopped containers
// included), it dispatches to a workload by labels and upserts a Container
// row. Rows whose Docker container ID is no longer present are flipped to
// state='missing'.
//
// Dispatch precedence:
// 1. tinyforge.workload.id label (canonical)
// 2. tinyforge.instance-id label (legacy project — joins via instances row)
// 3. tinyforge.static-site label (legacy site)
// 4. com.docker.compose.project (stack — joins via Stack.ComposeProjectName)
package reconciler
import (
"context"
"errors"
"log/slog"
"strings"
"sync"
"time"
"github.com/alexei/tinyforge/internal/docker"
"github.com/alexei/tinyforge/internal/store"
)
// DockerLister is the subset of docker.Client the reconciler depends on.
// Defined here (where it's used) so tests can substitute a fake instead of
// talking to a real Docker daemon. The interface still references
// docker.ReconcileItem, so the docker package remains a compile-time
// dependency.
type DockerLister interface {
// ListAllForReconciler returns the containers a reconciliation pass should
// consider, or an error if the listing could not be obtained.
ListAllForReconciler(ctx context.Context) ([]docker.ReconcileItem, error)
}
// Reconciler is the background worker that syncs the containers index.
// Construct it with New; the zero value has a nil stop channel and is not
// usable.
type Reconciler struct {
// store is where container rows are read and written.
store *store.Store
// docker supplies the container listing for each pass.
docker DockerLister
// interval is the period between passes after the boot pass.
interval time.Duration
// stop is closed by Stop to tell the loop goroutine to exit.
stop chan struct{}
// wg tracks the loop goroutine so Stop can wait for it to finish.
wg sync.WaitGroup
}
// New builds a Reconciler that reads containers from dockerClient and writes
// rows to st. interval is the tick period and is sanitized as follows:
//   - values <= 0 fall back to 30s;
//   - values above 5m are clamped to 5m, so a misconfiguration cannot
//     silently stall state updates.
//
// The returned Reconciler is idle until Start is called.
func New(st *store.Store, dockerClient DockerLister, interval time.Duration) *Reconciler {
	const (
		fallbackInterval = 30 * time.Second
		ceilingInterval  = 5 * time.Minute
	)

	switch {
	case interval <= 0:
		interval = fallbackInterval
	case interval > ceilingInterval:
		interval = ceilingInterval
	}

	rec := &Reconciler{stop: make(chan struct{})}
	rec.store = st
	rec.docker = dockerClient
	rec.interval = interval
	return rec
}
// Start launches the background reconciliation loop in its own goroutine and
// returns immediately. The loop runs one pass right away, so startup
// populates the index without waiting for the first timer fire, and then
// reconciles on every tick until ctx is cancelled or Stop is called.
//
// Start is NOT idempotent: every call spawns another loop goroutine, so
// calling it more than once on the same Reconciler is a programming error.
func (r *Reconciler) Start(ctx context.Context) {
// Register with the WaitGroup before spawning so a subsequent Stop cannot
// return from Wait before the goroutine has been accounted for.
r.wg.Add(1)
go r.loop(ctx)
}
// Stop signals the loop to exit and waits for the in-flight tick to finish.
// It blocks until the loop goroutine has returned.
//
// Stop must be called at most once per Reconciler: it closes the stop
// channel, and closing an already-closed channel panics.
func (r *Reconciler) Stop() {
// Closing (rather than sending) wakes the loop's select without needing a
// receiver to be ready at this instant.
close(r.stop)
r.wg.Wait()
}
// ReconcileOnce performs a single reconciliation pass: it lists the managed
// containers, upserts a row for each one that dispatches to a workload, and
// then marks every bound row that was not refreshed as missing.
//
// It is exported for tests and for callers that want to force a sync right
// after a known mutation (for example immediately after a deploy) instead of
// waiting for the next tick.
//
// The only error returned is a failure to list containers; in that case no
// rows are touched. Per-container upsert failures are logged by the helpers
// and do not abort the pass.
//
// NOTE(review): a container whose upsert fails is absent from the refreshed
// set, so the sweep may flag its existing row as missing even though the
// container is still present — confirm whether that is intended.
func (r *Reconciler) ReconcileOnce(ctx context.Context) error {
	listed, err := r.docker.ListAllForReconciler(ctx)
	if err != nil {
		return err
	}

	// Set of container ROW IDs (not Docker IDs) refreshed during this pass.
	refreshed := make(map[string]struct{}, len(listed))
	for idx := range listed {
		if rowID := r.upsertFromItem(ctx, listed[idx]); rowID != "" {
			refreshed[rowID] = struct{}{}
		}
	}

	r.markMissingRows(refreshed)
	return nil
}
// loop is the body of the background goroutine started by Start. It runs
// one reconciliation pass immediately, then one per ticker fire, and exits
// when either ctx is cancelled or the stop channel is closed. Failed passes
// are logged as warnings and never terminate the loop. The WaitGroup is
// released on exit so Stop can return.
func (r *Reconciler) loop(ctx context.Context) {
	defer r.wg.Done()

	// runPass executes one pass and logs a failure under the given message.
	runPass := func(logMsg string) {
		if err := r.ReconcileOnce(ctx); err != nil {
			slog.Warn(logMsg, "error", err)
		}
	}

	// Boot pass: do not wait a full interval before the first sync.
	runPass("reconciler: initial pass")

	timer := time.NewTicker(r.interval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			runPass("reconciler: tick")
		case <-r.stop:
			return
		case <-ctx.Done():
			return
		}
	}
}
// upsertFromItem routes one listed container to the upsert helper for the
// first labelling scheme it matches, in precedence order:
//  1. docker.LabelWorkloadID (canonical)
//  2. docker.LabelInstanceID (legacy project)
//  3. "tinyforge.static-site" with a non-empty value (legacy site)
//  4. "com.docker.compose.project" starting with "tinyforge-" (stack)
//
// It returns the container row ID written by the helper, or "" when no
// scheme matched or the helper could not write the row. ctx is accepted for
// symmetry with the caller but is not currently consulted.
func (r *Reconciler) upsertFromItem(ctx context.Context, item docker.ReconcileItem) string {
	labels := item.Labels
	workloadID := labels[docker.LabelWorkloadID]
	instanceID := labels[docker.LabelInstanceID]
	siteID := labels["tinyforge.static-site"]
	composeProject := labels["com.docker.compose.project"]

	switch {
	case workloadID != "":
		return r.upsertByWorkloadLabel(item, workloadID)
	case instanceID != "":
		return r.upsertByInstanceLabel(item, instanceID)
	case siteID != "":
		return r.upsertBySiteLabel(item, siteID)
	case strings.HasPrefix(composeProject, "tinyforge-"):
		// An empty project name never carries the prefix, so no separate
		// emptiness check is required.
		return r.upsertByComposeProject(item, composeProject)
	}
	return ""
}
// upsertByWorkloadLabel handles the canonical path, where the container is
// stamped with its workload ID. Kind and role come straight from the
// container's labels, and the row ID is derived by workloadIDRow so that a
// re-deploy updates the same row in place rather than creating a new one.
// The first published port, if any, is recorded as the row's port.
//
// Returns the row ID on success, or "" (after logging a warning) when the
// store rejects the upsert.
func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string) string {
	labels := item.Labels

	row := store.Container{
		WorkloadID:   workloadID,
		WorkloadKind: labels[docker.LabelWorkloadKind],
		Role:         labels[docker.LabelRole],
		ContainerID:  item.ID,
		ImageRef:     item.Image,
		Host:         "local",
		State:        normalizeState(item.State),
		LastSeenAt:   store.Now(),
	}
	row.ID = workloadIDRow(workloadID, row.WorkloadKind, row.Role, labels[docker.LabelInstanceID], item.ID)
	if len(item.Ports) > 0 {
		row.Port = int(item.Ports[0])
	}

	err := r.store.UpsertContainer(row)
	if err != nil {
		slog.Warn("reconciler: upsert by workload label", "container_id", item.ID, "error", err)
		return ""
	}
	return row.ID
}
// upsertByInstanceLabel — legacy project path. The container carries an
// instance ID label; the instance row supplies the project reference, image
// tag, subdomain and proxy identifiers, and the instance ID doubles as the
// container row ID (per the original author's note, the deployer uses the
// same UUID for both). The project's workload is looked up to fill in the
// row's workload ID.
//
// The port recorded on the instance wins; the container's first published
// port is only used when the instance has none.
//
// Returns the row ID on success. Returns "" when:
//   - the instance or its workload does not exist (stale label — skipped
//     silently, since this is expected for leftover containers);
//   - either lookup fails for any other reason (logged as a warning, so a
//     real store failure is not mistaken for a stale label);
//   - the upsert itself fails (logged as a warning).
func (r *Reconciler) upsertByInstanceLabel(item docker.ReconcileItem, instanceID string) string {
	inst, err := r.store.GetInstanceByID(instanceID)
	if err != nil {
		if !errors.Is(err, store.ErrNotFound) {
			slog.Warn("reconciler: lookup instance", "instance_id", instanceID, "error", err)
		}
		return ""
	}

	w, err := r.store.GetWorkloadByRef(store.WorkloadKindProject, inst.ProjectID)
	if err != nil {
		// Previously every error here was dropped without a trace; only the
		// not-found case is genuinely uninteresting.
		if !errors.Is(err, store.ErrNotFound) {
			slog.Warn("reconciler: lookup project workload",
				"instance_id", instanceID, "project_id", inst.ProjectID, "error", err)
		}
		return ""
	}

	port := inst.Port
	if port == 0 && len(item.Ports) > 0 {
		port = int(item.Ports[0])
	}

	if err := r.store.UpsertContainer(store.Container{
		ID:           inst.ID,
		WorkloadID:   w.ID,
		WorkloadKind: string(store.WorkloadKindProject),
		Role:         item.Labels[docker.LabelStage],
		ContainerID:  item.ID,
		ImageRef:     item.Image,
		ImageTag:     inst.ImageTag,
		Host:         "local",
		State:        normalizeState(item.State),
		Port:         port,
		Subdomain:    inst.Subdomain,
		ProxyRouteID: inst.ProxyRouteID,
		NpmProxyID:   inst.NpmProxyID,
		LastSeenAt:   store.Now(),
	}); err != nil {
		slog.Warn("reconciler: upsert by instance label", "container_id", item.ID, "error", err)
		return ""
	}
	return inst.ID
}
// upsertBySiteLabel — legacy static-site path. siteID is the value of the
// container's "tinyforge.static-site" label; it is resolved to a site
// workload, and the row ID is the deterministic "<workloadID>:site" so
// repeated passes update the same row. The role is left empty and the first
// published port, if any, is recorded.
//
// Returns the row ID on success. Returns "" when the site workload does not
// exist (stale label — skipped silently), when the lookup fails for another
// reason (logged as a warning rather than silently dropped), or when the
// upsert fails (logged as a warning).
func (r *Reconciler) upsertBySiteLabel(item docker.ReconcileItem, siteID string) string {
	w, err := r.store.GetWorkloadByRef(store.WorkloadKindSite, siteID)
	if err != nil {
		if !errors.Is(err, store.ErrNotFound) {
			slog.Warn("reconciler: lookup site workload", "site_id", siteID, "error", err)
		}
		return ""
	}

	rowID := w.ID + ":site"
	port := 0
	if len(item.Ports) > 0 {
		port = int(item.Ports[0])
	}

	if err := r.store.UpsertContainer(store.Container{
		ID:           rowID,
		WorkloadID:   w.ID,
		WorkloadKind: string(store.WorkloadKindSite),
		Role:         "",
		ContainerID:  item.ID,
		ImageRef:     item.Image,
		Host:         "local",
		State:        normalizeState(item.State),
		Port:         port,
		LastSeenAt:   store.Now(),
	}); err != nil {
		slog.Warn("reconciler: upsert by site label", "container_id", item.ID, "error", err)
		return ""
	}
	return rowID
}
// upsertByComposeProject — stack path for containers that only carry
// Compose's own labels. composeProject is matched against the stacks'
// ComposeProjectName, the stack is resolved to its workload, and the row ID
// is the deterministic "<workloadID>:<role>". The role is the Compose
// service name, falling back to the container name when that label is
// absent. The first published port, if any, is recorded.
//
// Returns the row ID on success. Returns "" when no stack or workload
// matches (skipped silently — the prefix alone does not prove the project is
// ours), when either lookup fails for another reason (logged as a warning
// rather than silently dropped), or when the upsert fails (logged).
func (r *Reconciler) upsertByComposeProject(item docker.ReconcileItem, composeProject string) string {
	stack, err := r.findStackByComposeProject(composeProject)
	if err != nil {
		if !errors.Is(err, store.ErrNotFound) {
			slog.Warn("reconciler: lookup stack by compose project",
				"compose_project", composeProject, "error", err)
		}
		return ""
	}

	w, err := r.store.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
	if err != nil {
		if !errors.Is(err, store.ErrNotFound) {
			slog.Warn("reconciler: lookup stack workload",
				"compose_project", composeProject, "stack_id", stack.ID, "error", err)
		}
		return ""
	}

	role := item.Labels["com.docker.compose.service"]
	if role == "" {
		role = item.Name
	}
	rowID := w.ID + ":" + role

	port := 0
	if len(item.Ports) > 0 {
		port = int(item.Ports[0])
	}

	if err := r.store.UpsertContainer(store.Container{
		ID:           rowID,
		WorkloadID:   w.ID,
		WorkloadKind: string(store.WorkloadKindStack),
		Role:         role,
		ContainerID:  item.ID,
		ImageRef:     item.Image,
		Host:         "local",
		State:        normalizeState(item.State),
		Port:         port,
		LastSeenAt:   store.Now(),
	}); err != nil {
		slog.Warn("reconciler: upsert by compose project", "container_id", item.ID, "error", err)
		return ""
	}
	return rowID
}
// findStackByComposeProject returns the first stack whose ComposeProjectName
// equals composeProject. It loads every stack and scans linearly, on the
// expectation that the number of stacks stays small.
//
// Returns store.ErrNotFound when nothing matches, or the store's own error
// when the stacks cannot be loaded.
func (r *Reconciler) findStackByComposeProject(composeProject string) (store.Stack, error) {
	var none store.Stack

	all, err := r.store.GetAllStacks()
	if err != nil {
		return none, err
	}
	for _, candidate := range all {
		if candidate.ComposeProjectName != composeProject {
			continue
		}
		return candidate, nil
	}
	return none, store.ErrNotFound
}
// markMissingRows is the sweep that ends a pass: every container row that
// was not refreshed is flipped to state 'missing'. seen is keyed by container
// ROW ID (the values returned by the upsert helpers), not by Docker ID.
//
// A row is left untouched when any of these hold:
//   - its ID is in seen (it was refreshed this pass);
//   - its container_id is empty — the deployer creates such placeholder rows
//     ahead of `docker create`, and a tick racing the deploy must not flag
//     them;
//   - it is already in state "missing".
//
// Failures are logged as warnings; a failure on one row does not stop the
// sweep over the remaining rows.
func (r *Reconciler) markMissingRows(seen map[string]struct{}) {
	all, err := r.store.ListContainers(store.ContainerFilter{})
	if err != nil {
		slog.Warn("reconciler: list containers for missing-sweep", "error", err)
		return
	}

	for _, row := range all {
		_, refreshed := seen[row.ID]
		if refreshed || row.ContainerID == "" || row.State == "missing" {
			continue
		}
		markErr := r.store.MarkContainerMissing(row.ID)
		if markErr != nil {
			slog.Warn("reconciler: mark missing", "row_id", row.ID, "error", markErr)
		}
	}
}
// workloadIDRow derives the container row ID for a workload-labelled
// container. Rules, first match wins:
//   - project kind with an instance ID: the instance ID itself (per the
//     original author's note, the deployer assigns instance ID = row ID, so
//     this keeps IDs stable);
//   - non-empty role: "<workloadID>:<role>";
//   - site kind: "<workloadID>:site";
//   - otherwise: "<workloadID>:<containerID>" as a last resort, which is at
//     least non-empty.
func workloadIDRow(workloadID, kind, role, instanceID, containerID string) string {
	suffix := containerID
	switch {
	case kind == string(store.WorkloadKindProject) && instanceID != "":
		return instanceID
	case role != "":
		suffix = role
	case kind == string(store.WorkloadKindSite):
		suffix = "site"
	}
	return workloadID + ":" + suffix
}
// normalizeState translates a Docker container state into the value stored
// on a container row. The not-running states "exited" and "dead", together
// with the literal "stopped", all collapse to "stopped". Every other value —
// "running", "created", "restarting", "paused", "removing", and anything
// unrecognized, including "" — is stored verbatim.
func normalizeState(dockerState string) string {
	if dockerState == "exited" || dockerState == "dead" || dockerState == "stopped" {
		return "stopped"
	}
	return dockerState
}
+219
View File
@@ -0,0 +1,219 @@
package reconciler
import (
"context"
"testing"
"github.com/alexei/tinyforge/internal/docker"
"github.com/alexei/tinyforge/internal/store"
)
// fakeDocker is an in-memory DockerLister: it hands back a canned list of
// containers so the reconciler can be unit-tested without a Docker daemon.
type fakeDocker struct {
	items []docker.ReconcileItem
}

// Compile-time proof that the fake satisfies the reconciler's dependency.
var _ DockerLister = (*fakeDocker)(nil)

// ListAllForReconciler returns the canned items unchanged and never fails.
// The context is ignored.
func (f *fakeDocker) ListAllForReconciler(_ context.Context) ([]docker.ReconcileItem, error) {
	return f.items, nil
}
// newTestStore opens a fresh store at ":memory:" for the calling test and
// registers a cleanup that closes it when the test finishes. The test fails
// immediately if the store cannot be created.
func newTestStore(t *testing.T) *store.Store {
	t.Helper()

	memStore, openErr := store.New(":memory:")
	if openErr != nil {
		t.Fatalf("create store: %v", openErr)
	}

	closeStore := func() { memStore.Close() }
	t.Cleanup(closeStore)
	return memStore
}
// TestReconcileWorkloadLabelledStackContainer covers the canonical dispatch
// path: a container stamped with workload ID/kind/role labels must produce
// exactly one row bound to that container, carrying the labelled role and
// kind, the running state, and the first published port.
func TestReconcileWorkloadLabelledStackContainer(t *testing.T) {
	st := newTestStore(t)

	// Set up a stack workload (no project/site interaction).
	stack, err := st.CreateStack(store.Stack{
		Name: "wf-stack", ComposeProjectName: "tinyforge-wf-stack",
	})
	if err != nil {
		t.Fatalf("CreateStack: %v", err)
	}
	// Fail here rather than continuing with a zero-value workload, which
	// would surface later as a confusing row-count mismatch.
	w, err := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
	if err != nil {
		t.Fatalf("GetWorkloadByRef: %v", err)
	}

	// One container with the canonical workload labels stamped.
	fake := &fakeDocker{items: []docker.ReconcileItem{{
		ID:    "docker-abc",
		Name:  "wf-stack-web-1",
		Image: "nginx:1.27",
		State: "running",
		Labels: map[string]string{
			docker.LabelManaged:      "true",
			docker.LabelWorkloadID:   w.ID,
			docker.LabelWorkloadKind: "stack",
			docker.LabelRole:         "web",
		},
		Ports: []uint16{8080},
	}}}

	r := New(st, fake, 0)
	if err := r.ReconcileOnce(context.Background()); err != nil {
		t.Fatalf("ReconcileOnce: %v", err)
	}

	rows, err := st.ListContainersByWorkload(w.ID)
	if err != nil {
		t.Fatalf("ListContainersByWorkload: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("expected 1 container row, got %d", len(rows))
	}
	got := rows[0]
	if got.ContainerID != "docker-abc" {
		t.Fatalf("container_id not bound: got %q", got.ContainerID)
	}
	if got.Role != "web" || got.WorkloadKind != "stack" {
		t.Fatalf("dispatch wrong: %+v", got)
	}
	if got.State != "running" || got.Port != 8080 {
		t.Fatalf("state/port wrong: %+v", got)
	}
}
// TestReconcileComposeOnlyStackContainer covers the compose fallback: a
// container carrying only Compose's own labels must be matched to its stack
// via the compose project name, with the role taken from the service label.
func TestReconcileComposeOnlyStackContainer(t *testing.T) {
	st := newTestStore(t)

	stack, err := st.CreateStack(store.Stack{
		Name: "compose-stack", ComposeProjectName: "tinyforge-compose-stack",
	})
	if err != nil {
		t.Fatalf("CreateStack: %v", err)
	}
	w, err := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
	if err != nil {
		t.Fatalf("GetWorkloadByRef: %v", err)
	}

	// Pre-existing compose container — only carries compose's own labels,
	// no tinyforge.* labels at all.
	fake := &fakeDocker{items: []docker.ReconcileItem{{
		ID:    "docker-xyz",
		Name:  "tinyforge-compose-stack-worker-1",
		Image: "redis:7",
		State: "running",
		Labels: map[string]string{
			"com.docker.compose.project": "tinyforge-compose-stack",
			"com.docker.compose.service": "worker",
		},
	}}}

	r := New(st, fake, 0)
	if err := r.ReconcileOnce(context.Background()); err != nil {
		t.Fatalf("ReconcileOnce: %v", err)
	}

	rows, err := st.ListContainersByWorkload(w.ID)
	if err != nil {
		t.Fatalf("ListContainersByWorkload: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("expected 1 row, got %d", len(rows))
	}
	if rows[0].Role != "worker" {
		t.Fatalf("role from compose label wrong: %q", rows[0].Role)
	}
	if rows[0].ContainerID != "docker-xyz" {
		t.Fatalf("container_id not bound: %q", rows[0].ContainerID)
	}
}
// TestReconcileMarksMissingRows verifies the missing-sweep: a row bound to
// a container ID that the daemon no longer lists must end the pass in state
// "missing".
func TestReconcileMarksMissingRows(t *testing.T) {
	st := newTestStore(t)

	stack, err := st.CreateStack(store.Stack{
		Name: "missing-stack", ComposeProjectName: "tinyforge-missing-stack",
	})
	if err != nil {
		t.Fatalf("CreateStack: %v", err)
	}
	w, err := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
	if err != nil {
		t.Fatalf("GetWorkloadByRef: %v", err)
	}

	// Pre-existing row with a real container_id that no longer exists.
	if err := st.UpsertContainer(store.Container{
		ID: w.ID + ":web", WorkloadID: w.ID, WorkloadKind: "stack",
		Role: "web", ContainerID: "docker-gone", State: "running",
	}); err != nil {
		t.Fatalf("seed: %v", err)
	}

	// Reconciler sees nothing.
	r := New(st, &fakeDocker{}, 0)
	if err := r.ReconcileOnce(context.Background()); err != nil {
		t.Fatalf("ReconcileOnce: %v", err)
	}

	// A failed readback must not masquerade as a state mismatch.
	got, err := st.GetContainerByID(w.ID + ":web")
	if err != nil {
		t.Fatalf("GetContainerByID: %v", err)
	}
	if got.State != "missing" {
		t.Fatalf("expected state=missing, got %q", got.State)
	}
}
// TestReconcileSkipsRowsAwaitingDocker verifies that placeholder rows — those
// with an empty container_id, created before the container exists — keep
// their state when a pass runs and finds no matching container.
func TestReconcileSkipsRowsAwaitingDocker(t *testing.T) {
	st := newTestStore(t)

	stack, err := st.CreateStack(store.Stack{
		Name: "pending", ComposeProjectName: "tinyforge-pending",
	})
	if err != nil {
		t.Fatalf("CreateStack: %v", err)
	}
	w, err := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
	if err != nil {
		t.Fatalf("GetWorkloadByRef: %v", err)
	}

	// A row with empty container_id (deployer placeholder, awaiting docker
	// create). Reconciler must not mark this as missing.
	if err := st.UpsertContainer(store.Container{
		ID: w.ID + ":web", WorkloadID: w.ID, WorkloadKind: "stack",
		Role: "web", ContainerID: "", State: "starting",
	}); err != nil {
		t.Fatalf("seed: %v", err)
	}

	r := New(st, &fakeDocker{}, 0)
	if err := r.ReconcileOnce(context.Background()); err != nil {
		t.Fatalf("ReconcileOnce: %v", err)
	}

	got, err := st.GetContainerByID(w.ID + ":web")
	if err != nil {
		t.Fatalf("GetContainerByID: %v", err)
	}
	if got.State != "starting" {
		t.Fatalf("placeholder row should keep state, got %q", got.State)
	}
}
// TestReconcileIgnoresUnmanagedContainers verifies that dispatch is a no-op
// for a container with no tinyforge or compose labels. Such a container
// would not even be returned by ListAllForReconciler in production, but a
// stray item slipping through must still produce no rows.
func TestReconcileIgnoresUnmanagedContainers(t *testing.T) {
	st := newTestStore(t)

	fake := &fakeDocker{items: []docker.ReconcileItem{{
		ID: "docker-foreign", Labels: map[string]string{"app": "other"},
	}}}

	r := New(st, fake, 0)
	if err := r.ReconcileOnce(context.Background()); err != nil {
		t.Fatalf("ReconcileOnce: %v", err)
	}

	// Without this check a failed listing would yield an empty result and
	// let the test pass for the wrong reason.
	rows, err := st.ListContainers(store.ContainerFilter{})
	if err != nil {
		t.Fatalf("ListContainers: %v", err)
	}
	if len(rows) != 0 {
		t.Fatalf("foreign container should not produce rows, got %d", len(rows))
	}
}
// TestReconcileNormalizesState verifies end to end that a Docker state of
// "exited" is stored on the container row as "stopped".
func TestReconcileNormalizesState(t *testing.T) {
	st := newTestStore(t)

	stack, err := st.CreateStack(store.Stack{
		Name: "norm", ComposeProjectName: "tinyforge-norm",
	})
	if err != nil {
		t.Fatalf("CreateStack: %v", err)
	}
	w, err := st.GetWorkloadByRef(store.WorkloadKindStack, stack.ID)
	if err != nil {
		t.Fatalf("GetWorkloadByRef: %v", err)
	}

	fake := &fakeDocker{items: []docker.ReconcileItem{{
		ID:    "docker-1",
		Image: "nginx",
		State: "exited",
		Labels: map[string]string{
			docker.LabelManaged:      "true",
			docker.LabelWorkloadID:   w.ID,
			docker.LabelWorkloadKind: "stack",
			docker.LabelRole:         "web",
		},
	}}}

	r := New(st, fake, 0)
	if err := r.ReconcileOnce(context.Background()); err != nil {
		t.Fatalf("ReconcileOnce: %v", err)
	}

	got, err := st.GetContainerByID(w.ID + ":web")
	if err != nil {
		t.Fatalf("GetContainerByID: %v", err)
	}
	if got.State != "stopped" {
		t.Fatalf("docker 'exited' should normalize to 'stopped', got %q", got.State)
	}
}