perf(reconciler): batch workloads per tick, drop redundant image inspect
Load every workload once per tick into a map keyed by ID. This replaces both the per-container GetWorkloadByID call in the upsert loop (an N+1 query pattern) and the second ListWorkloads call in the plugin pass, leaving one query per tick and zero GetWorkloadByID calls. If ListWorkloads fails, ReconcileOnce returns before the missing-sweep, so a failed load cannot flip live container rows to 'missing'. image.Reconcile is now a no-op: the generic upsert + markMissing pass already syncs every labeled container's state from the single ListAllForReconciler (docker ps -a) snapshot taken earlier in the same tick, so the former per-container IsContainerRunning loop amounted to N redundant Docker calls per tick. (The no-op body lives in image.go, which landed with the preceding commit; its tests are in this commit.) The compose and static reconcilers do non-redundant work and are intentionally left untouched. Reviewed: go APPROVE.
This commit is contained in:
@@ -17,6 +17,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -110,17 +111,37 @@ func (r *Reconciler) ReconcileOnce(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load every workload ONCE per tick and index by ID. This replaces both
|
||||||
|
// the former N+1 GetWorkloadByID (one DB read per container) in the
|
||||||
|
// upsert loop and the second ListWorkloads("") in the plugin pass: net 1
|
||||||
|
// query per tick, 0 GetWorkloadByID.
|
||||||
|
//
|
||||||
|
// On error we return BEFORE the upsert loop and leave state untouched
|
||||||
|
// this tick (the next tick retries). We must NOT proceed with an empty
|
||||||
|
// map and fall through to markMissingRows: with no container resolving,
|
||||||
|
// `seen` would be empty and markMissingRows would flip EVERY live row to
|
||||||
|
// 'missing'. Aborting early is the safe choice.
|
||||||
|
rows, err := r.store.ListWorkloads("")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("reconciler: list workloads: %w", err)
|
||||||
|
}
|
||||||
|
byID := make(map[string]store.Workload, len(rows))
|
||||||
|
for _, w := range rows {
|
||||||
|
byID[w.ID] = w
|
||||||
|
}
|
||||||
|
|
||||||
seen := make(map[string]struct{}, len(items)) // container row IDs we touched
|
seen := make(map[string]struct{}, len(items)) // container row IDs we touched
|
||||||
|
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
rowID := r.upsertFromItem(item)
|
rowID := r.upsertFromItem(item, byID)
|
||||||
if rowID != "" {
|
if rowID != "" {
|
||||||
seen[rowID] = struct{}{}
|
seen[rowID] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.markMissingRows(seen)
|
r.markMissingRows(seen)
|
||||||
r.reconcilePluginWorkloads(ctx)
|
r.reconcilePluginWorkloads(ctx, rows)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,15 +158,13 @@ func (r *Reconciler) ReconcileOnce(ctx context.Context) error {
|
|||||||
//
|
//
|
||||||
// No-op when the plugin dispatcher hasn't been wired (boot-time race,
|
// No-op when the plugin dispatcher hasn't been wired (boot-time race,
|
||||||
// disabled deployments, tests).
|
// disabled deployments, tests).
|
||||||
func (r *Reconciler) reconcilePluginWorkloads(ctx context.Context) {
|
//
|
||||||
|
// rows is the workload set already loaded once by ReconcileOnce — passed
|
||||||
|
// through rather than re-queried so a tick costs a single ListWorkloads.
|
||||||
|
func (r *Reconciler) reconcilePluginWorkloads(ctx context.Context, rows []store.Workload) {
|
||||||
if r.plugins == nil {
|
if r.plugins == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rows, err := r.store.ListWorkloads("")
|
|
||||||
if err != nil {
|
|
||||||
slog.Warn("reconciler: list workloads for plugin pass", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, w := range rows {
|
for _, w := range rows {
|
||||||
if w.SourceKind == "" {
|
if w.SourceKind == "" {
|
||||||
continue
|
continue
|
||||||
@@ -214,9 +233,9 @@ func (r *Reconciler) loop(ctx context.Context) {
|
|||||||
// After the hard cutover only the canonical tinyforge.workload.id label
|
// After the hard cutover only the canonical tinyforge.workload.id label
|
||||||
// path is honored — every Source plugin labels its containers with the
|
// path is honored — every Source plugin labels its containers with the
|
||||||
// workload identity at create time.
|
// workload identity at create time.
|
||||||
func (r *Reconciler) upsertFromItem(item docker.ReconcileItem) string {
|
func (r *Reconciler) upsertFromItem(item docker.ReconcileItem, byID map[string]store.Workload) string {
|
||||||
if id := item.Labels[docker.LabelWorkloadID]; id != "" {
|
if id := item.Labels[docker.LabelWorkloadID]; id != "" {
|
||||||
return r.upsertByWorkloadLabel(item, id)
|
return r.upsertByWorkloadLabel(item, id, byID)
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
@@ -233,9 +252,9 @@ func (r *Reconciler) upsertFromItem(item docker.ReconcileItem) string {
|
|||||||
// known workload row is silently ignored. Anyone with Docker socket access
|
// known workload row is silently ignored. Anyone with Docker socket access
|
||||||
// could otherwise spawn a container with a forged label and steal the
|
// could otherwise spawn a container with a forged label and steal the
|
||||||
// canonical slot for an existing workload.
|
// canonical slot for an existing workload.
|
||||||
func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string) string {
|
func (r *Reconciler) upsertByWorkloadLabel(item docker.ReconcileItem, workloadID string, byID map[string]store.Workload) string {
|
||||||
w, err := r.store.GetWorkloadByID(workloadID)
|
w, ok := byID[workloadID]
|
||||||
if err != nil {
|
if !ok {
|
||||||
// Forged or stale label — log once at debug; tick rate keeps logs quiet.
|
// Forged or stale label — log once at debug; tick rate keeps logs quiet.
|
||||||
slog.Debug("reconciler: unknown workload_id label", "workload_id", workloadID, "container_id", item.ID)
|
slog.Debug("reconciler: unknown workload_id label", "workload_id", workloadID, "container_id", item.ID)
|
||||||
return ""
|
return ""
|
||||||
|
|||||||
@@ -257,6 +257,138 @@ func TestReconcileSkipsProjectInsertWithoutDeployerRow(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestReconcileBatchingPreservesBehavior locks Fix A: loading all workloads
|
||||||
|
// once per tick (and resolving labels from that in-memory map instead of an
|
||||||
|
// N+1 GetWorkloadByID) must produce the same outcome as the per-container
|
||||||
|
// lookup did. With multiple containers across multiple workloads plus a forged
|
||||||
|
// label and a stale row, after one ReconcileOnce: known-workload containers
|
||||||
|
// are upserted with the snapshot State, the forged-label container is skipped,
|
||||||
|
// and the absent stale row is flipped to missing.
|
||||||
|
func TestReconcileBatchingPreservesBehavior(t *testing.T) {
|
||||||
|
st := newTestStore(t)
|
||||||
|
|
||||||
|
w1 := makeWorkload(t, st, "batch-a", "stack")
|
||||||
|
w2 := makeWorkload(t, st, "batch-b", "stack")
|
||||||
|
|
||||||
|
// A stale row for w2 whose container is gone — must be marked missing.
|
||||||
|
if err := st.UpsertContainer(store.Container{
|
||||||
|
ID: w2.ID + ":old", WorkloadID: w2.ID, WorkloadKind: "stack",
|
||||||
|
Role: "old", ContainerID: "docker-vanished", State: "running",
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed stale row: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fake := &fakeDocker{items: []docker.ReconcileItem{
|
||||||
|
{
|
||||||
|
ID: "docker-a1", Name: "batch-a-web-1", Image: "nginx:1.27", State: "running",
|
||||||
|
Labels: map[string]string{
|
||||||
|
docker.LabelManaged: "true",
|
||||||
|
docker.LabelWorkloadID: w1.ID,
|
||||||
|
docker.LabelWorkloadKind: "stack",
|
||||||
|
docker.LabelRole: "web",
|
||||||
|
},
|
||||||
|
Ports: []uint16{8080},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "docker-b1", Name: "batch-b-api-1", Image: "redis:7", State: "exited",
|
||||||
|
Labels: map[string]string{
|
||||||
|
docker.LabelManaged: "true",
|
||||||
|
docker.LabelWorkloadID: w2.ID,
|
||||||
|
docker.LabelWorkloadKind: "stack",
|
||||||
|
docker.LabelRole: "api",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Forged label — no such workload. Must be skipped entirely.
|
||||||
|
ID: "docker-evil", Name: "evil", Image: "nginx", State: "running",
|
||||||
|
Labels: map[string]string{
|
||||||
|
docker.LabelManaged: "true",
|
||||||
|
docker.LabelWorkloadID: "wl-forged",
|
||||||
|
docker.LabelWorkloadKind: "stack",
|
||||||
|
docker.LabelRole: "web",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
|
||||||
|
r := New(st, fake, 0)
|
||||||
|
if err := r.ReconcileOnce(context.Background()); err != nil {
|
||||||
|
t.Fatalf("ReconcileOnce: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// w1: one row, bound to docker-a1, running.
|
||||||
|
w1Rows, _ := st.ListContainersByWorkload(w1.ID)
|
||||||
|
if len(w1Rows) != 1 {
|
||||||
|
t.Fatalf("w1: expected 1 row, got %d", len(w1Rows))
|
||||||
|
}
|
||||||
|
if w1Rows[0].ContainerID != "docker-a1" || w1Rows[0].State != "running" || w1Rows[0].Role != "web" {
|
||||||
|
t.Fatalf("w1 row wrong: %+v", w1Rows[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// w2: the new api container is present (exited→stopped); the stale row is missing.
|
||||||
|
api, _ := st.GetContainerByID(w2.ID + ":api")
|
||||||
|
if api.ContainerID != "docker-b1" || api.State != "stopped" {
|
||||||
|
t.Fatalf("w2 api row wrong: %+v", api)
|
||||||
|
}
|
||||||
|
old, _ := st.GetContainerByID(w2.ID + ":old")
|
||||||
|
if old.State != "missing" {
|
||||||
|
t.Fatalf("w2 stale row should be missing, got %q", old.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forged label produced no row anywhere.
|
||||||
|
all, _ := st.ListContainers(store.ContainerFilter{})
|
||||||
|
for _, c := range all {
|
||||||
|
if c.ContainerID == "docker-evil" {
|
||||||
|
t.Fatalf("forged-label container was adopted: %+v", c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestReconcileSyncsImageContainerState locks the Fix B coupling: the generic
|
||||||
|
// reconciler upsert pass — NOT image.Reconcile — is what syncs an image
|
||||||
|
// container's State from the snapshot. An image container carries the
|
||||||
|
// workload_id / kind=image / role=image labels at create time, so a present
|
||||||
|
// container's row gets its State written here, proving the per-container
|
||||||
|
// inspect formerly in image.Reconcile is redundant.
|
||||||
|
func TestReconcileSyncsImageContainerState(t *testing.T) {
|
||||||
|
st := newTestStore(t)
|
||||||
|
w := makeWorkload(t, st, "img", "image")
|
||||||
|
|
||||||
|
// Deployer pre-created the image container row (running). Docker now
|
||||||
|
// reports it exited — the generic pass must sync it to stopped.
|
||||||
|
if err := st.UpsertContainer(store.Container{
|
||||||
|
ID: "img-deploy-uuid", WorkloadID: w.ID, WorkloadKind: "image",
|
||||||
|
Role: "image", ContainerID: "docker-img", State: "running",
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed image row: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fake := &fakeDocker{items: []docker.ReconcileItem{{
|
||||||
|
ID: "docker-img", Image: "ghcr.io/owner/app:v1", State: "exited",
|
||||||
|
Labels: map[string]string{
|
||||||
|
docker.LabelManaged: "true",
|
||||||
|
docker.LabelWorkloadID: w.ID,
|
||||||
|
docker.LabelWorkloadKind: "image",
|
||||||
|
docker.LabelRole: "image",
|
||||||
|
},
|
||||||
|
Ports: []uint16{3000},
|
||||||
|
}}}
|
||||||
|
|
||||||
|
// No plugin reconciler wired — proves the state sync comes from the
|
||||||
|
// generic upsert pass, not from image.Reconcile.
|
||||||
|
r := New(st, fake, 0)
|
||||||
|
if err := r.ReconcileOnce(context.Background()); err != nil {
|
||||||
|
t.Fatalf("ReconcileOnce: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, _ := st.GetContainerByID("img-deploy-uuid")
|
||||||
|
if got.State != "stopped" {
|
||||||
|
t.Fatalf("image container state not synced by generic pass: got %q want stopped", got.State)
|
||||||
|
}
|
||||||
|
if got.Port != 3000 || got.ImageRef != "ghcr.io/owner/app:v1" {
|
||||||
|
t.Fatalf("image container docker fields not synced: %+v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReconcileNormalizesState(t *testing.T) {
|
func TestReconcileNormalizesState(t *testing.T) {
|
||||||
st := newTestStore(t)
|
st := newTestStore(t)
|
||||||
w := makeWorkload(t, st, "norm", "stack")
|
w := makeWorkload(t, st, "norm", "stack")
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package image
|
package image
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -8,6 +9,20 @@ import (
|
|||||||
"github.com/alexei/tinyforge/internal/workload/plugin"
|
"github.com/alexei/tinyforge/internal/workload/plugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TestReconcileIsNoOp locks Fix B: image.Reconcile must do nothing and touch
|
||||||
|
// neither the Store nor Docker (the generic reconciler pass syncs state). We
|
||||||
|
// pass a zero-value plugin.Deps whose Store and Docker are nil — the old
|
||||||
|
// implementation called deps.Store.ListContainersByWorkload then
|
||||||
|
// deps.Docker.IsContainerRunning, both of which would nil-panic. Returning nil
|
||||||
|
// without panicking proves it dereferences neither.
|
||||||
|
func TestReconcileIsNoOp(t *testing.T) {
|
||||||
|
src := &source{}
|
||||||
|
w := plugin.Workload{ID: "wl-1", Name: "app", SourceKind: "image"}
|
||||||
|
if err := src.Reconcile(context.Background(), plugin.Deps{}, w); err != nil {
|
||||||
|
t.Fatalf("Reconcile should be a no-op returning nil, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestBuildContainerName(t *testing.T) {
|
func TestBuildContainerName(t *testing.T) {
|
||||||
ts := time.Unix(1700000000, 0)
|
ts := time.Unix(1700000000, 0)
|
||||||
name := buildContainerName("My App", "abcd1234-5678-1234-abcd-deadbeef0000", "v1.2.3", ts)
|
name := buildContainerName("My App", "abcd1234-5678-1234-abcd-deadbeef0000", "v1.2.3", ts)
|
||||||
@@ -56,10 +71,10 @@ func TestFaceEnabled(t *testing.T) {
|
|||||||
|
|
||||||
func TestFqdnFor(t *testing.T) {
|
func TestFqdnFor(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
face plugin.PublicFace
|
face plugin.PublicFace
|
||||||
defDom string
|
defDom string
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"subdomain + face domain", plugin.PublicFace{Subdomain: "api", Domain: "example.com"}, "default.io", "api.example.com"},
|
{"subdomain + face domain", plugin.PublicFace{Subdomain: "api", Domain: "example.com"}, "default.io", "api.example.com"},
|
||||||
{"subdomain inherits default", plugin.PublicFace{Subdomain: "api"}, "default.io", "api.default.io"},
|
{"subdomain inherits default", plugin.PublicFace{Subdomain: "api"}, "default.io", "api.default.io"},
|
||||||
@@ -78,8 +93,8 @@ func TestFqdnFor(t *testing.T) {
|
|||||||
func TestPrimaryFace(t *testing.T) {
|
func TestPrimaryFace(t *testing.T) {
|
||||||
t.Run("returns first enabled", func(t *testing.T) {
|
t.Run("returns first enabled", func(t *testing.T) {
|
||||||
faces := []plugin.PublicFace{
|
faces := []plugin.PublicFace{
|
||||||
{}, // disabled
|
{}, // disabled
|
||||||
{Subdomain: "api"}, // first enabled
|
{Subdomain: "api"}, // first enabled
|
||||||
{Domain: "second.example.com"},
|
{Domain: "second.example.com"},
|
||||||
}
|
}
|
||||||
got := primaryFace(faces)
|
got := primaryFace(faces)
|
||||||
|
|||||||
Reference in New Issue
Block a user