// Package image implements the "image" source: a single container pulled
// from a registry. This is the canonical CI-driven shape — the registry
// trigger feeds it new tags, and Deploy reconciles the running container
// to match the requested tag.
package image

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"regexp"
	"sort"
	"strings"
	"time"

	"github.com/moby/moby/api/types/mount"

	"github.com/alexei/tinyforge/internal/crypto"
	"github.com/alexei/tinyforge/internal/docker"
	"github.com/alexei/tinyforge/internal/proxy"
	"github.com/alexei/tinyforge/internal/store"
	"github.com/alexei/tinyforge/internal/volume"
	"github.com/alexei/tinyforge/internal/workload/plugin"
)

// Config is the per-workload source config blob. Mirrors the deployment
// fields that used to live on the projects + stages tables, less anything
// that is now a Workload-level concern (notification config, webhook
// secrets, public_face, group/parent).
//
// The JSON tags are the stored wire format of the blob; Validate and Deploy
// both decode into this struct.
type Config struct {
	Image        string            `json:"image"`         // fully-qualified, e.g. registry.example.com/owner/app
	RegistryName string            `json:"registry_name"` // FK by name into registries table; "" = public/no auth
	Port         int               `json:"port"`          // container's primary exposed port; also the default target port for public faces
	Healthcheck  string            `json:"healthcheck"`   // HTTP path, e.g. "/healthz"; "" disables
	Env          map[string]string `json:"env"`           // injected as container env; workload_env rows override on key conflict
	Volumes      []VolumeMount     `json:"volumes"`       // inline mounts; persisted workload_volumes win on target conflict
	CpuLimit     float64           `json:"cpu_limit"`     // CPU cores; 0 = unlimited
	MemoryLimit  int               `json:"memory_limit"`  // megabytes; 0 = unlimited
	DefaultTag   string            `json:"default_tag"`   // tag used when intent.Reference is empty; Deploy falls back to "latest" if this is empty too
	MaxInstances int               `json:"max_instances"` // simultaneous containers to keep; 0/1 = strict blue-green
}

// VolumeMount mirrors the existing store.Volume scope shape but as a flat
// per-workload list. Future absolute / named-volume scopes can extend
// this without schema changes.
type VolumeMount struct { Source string `json:"source"` Target string `json:"target"` Scope string `json:"scope"` Name string `json:"name"` } type source struct{} func init() { plugin.RegisterSource(&source{}) } func (*source) Kind() string { return "image" } // SchemaSample returns a populated example of Config so the frontend can // render kind-aware forms without hardcoding samples per call-site. Each // Source / Trigger exposes the same hook via plugin.SourceSchemaer / // plugin.TriggerSchemaer below. func (*source) SchemaSample() any { return Config{ Image: "registry.example.com/owner/app", Port: 8080, Healthcheck: "/healthz", Env: map[string]string{}, Volumes: []VolumeMount{}, DefaultTag: "latest", MaxInstances: 1, } } func (*source) Validate(cfg json.RawMessage) error { var c Config if len(cfg) == 0 { return fmt.Errorf("image source: config is required") } if err := json.Unmarshal(cfg, &c); err != nil { return fmt.Errorf("image source: invalid json: %w", err) } if strings.TrimSpace(c.Image) == "" { return fmt.Errorf("image source: image is required") } if c.Port < 0 || c.Port > 65535 { return fmt.Errorf("image source: port must be 0-65535") } for i, v := range c.Volumes { if strings.TrimSpace(v.Target) == "" { return fmt.Errorf("image source: volumes[%d].target is required", i) } if v.Scope == "" { return fmt.Errorf("image source: volumes[%d].scope is required", i) } } return nil } // Deploy executes a blue-green deploy of w against the image tag implied // by intent. The flow: // // 1. Short-circuit if an existing container for this workload is already // running the requested ImageRef (duplicate webhook deliveries). // 2. Pull image, ensure network. // 3. Create + start a NEW container with a unique-per-deploy name (the // old container keeps serving traffic). // 4. Optional in-network healthcheck. Failure rolls back the new // container only — the old container is untouched. // 5. 
Register / update each public face's proxy route to point at the // new container. // 6. Enforce cfg.MaxInstances (default 1) by removing the oldest // surplus containers belonging to this workload. With MaxInstances=1 // this is the "green" cutover — old container is removed only AFTER // the new face is live. // // Any failure between create and face-registration rolls back the new // container + its row; old serving state is preserved. func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload, intent plugin.DeploymentIntent) (err error) { cfg, err := plugin.SourceConfigOf[Config](w) if err != nil { return fmt.Errorf("image source: decode config: %w", err) } if strings.TrimSpace(cfg.Image) == "" { return fmt.Errorf("image source: workload %s has empty image", w.ID) } tag := intent.Reference if tag == "" { tag = cfg.DefaultTag } if tag == "" { tag = "latest" } imageRef := cfg.Image + ":" + tag settings, err := deps.Store.GetSettings() if err != nil { return fmt.Errorf("image source: load settings: %w", err) } if settings.Network == "" { return fmt.Errorf("image source: settings.network is required") } existing, err := deps.Store.ListContainersByWorkload(w.ID) if err != nil { return fmt.Errorf("image source: list existing containers: %w", err) } // Idempotency: if a container is already running the requested // ImageRef, short-circuit. Saves a pull + churn on duplicate webhook // deliveries (Gitea retries on flaky 5xx, etc.). for _, c := range existing { if c.ImageRef == imageRef && c.State == "running" && c.ContainerID != "" { if running, err := deps.Docker.IsContainerRunning(ctx, c.ContainerID); err == nil && running { slog.Info("image source: deploy skipped — already running", "workload", w.ID, "image", imageRef, "trigger", intent.Reason) return nil } } } // Past the idempotency short-circuit: this is a real deploy. Emit a // terminal audit event for the per-app timeline. 
Armed here (not at the // top) so duplicate-webhook no-ops above don't flood the log, and // pre-flight config/settings errors above stay quiet. err is the named // return, so the deferred closure observes the final outcome. defer func() { if err != nil { plugin.EmitDeployEvent(deps, w, "image", "failed: "+err.Error()) } else { plugin.EmitDeployEvent(deps, w, "image", "deployed") } }() authConfig, err := buildRegistryAuth(deps, cfg.RegistryName) if err != nil { return fmt.Errorf("image source: %w", err) } if err := deps.Docker.PullImage(ctx, cfg.Image, tag, authConfig); err != nil { slog.Warn("image source: pull failed", "image", imageRef, "error", err) return fmt.Errorf("image source: pull %s failed", imageRef) } networkID, err := deps.Docker.EnsureNetwork(ctx, settings.Network) if err != nil { return fmt.Errorf("image source: ensure network: %w", err) } // Unique-per-deploy name so the new container can run alongside the // old one. The suffix is monotonic ms; collisions are not a real // concern for human-driven or webhook-driven deploys. containerName := buildContainerName(w.Name, w.ID, tag, time.Now()) cc := docker.ContainerConfig{ Name: containerName, Image: imageRef, Env: buildEnv(deps, w, cfg), ExposedPorts: []string{fmt.Sprintf("%d/tcp", cfg.Port)}, NetworkName: settings.Network, NetworkID: networkID, WorkloadID: w.ID, WorkloadKind: "image", Role: "image", Mounts: computeMounts(deps, w, cfg, tag, settings), CpuLimit: cfg.CpuLimit, MemoryLimit: cfg.MemoryLimit, } // Per-face proxy labels (Traefik picks these up; NPM ignores them). 
primary := primaryFace(w.PublicFaces) for _, face := range w.PublicFaces { if !faceEnabled(face) { continue } port := face.TargetPort if port == 0 { port = cfg.Port } fqdn := fqdnFor(face, settings.Domain) if labels := deps.Proxy.ContainerLabels(fqdn, port); labels != nil { if cc.Labels == nil { cc.Labels = map[string]string{} } for k, v := range labels { cc.Labels[k] = v } } } dockerID, err := deps.Docker.CreateContainer(ctx, cc) if err != nil { return fmt.Errorf("image source: create container: %w", err) } row := store.Container{ WorkloadID: w.ID, WorkloadKind: "image", Role: "image", ContainerID: dockerID, ImageRef: imageRef, ImageTag: tag, Host: "local", State: "stopped", Port: cfg.Port, Subdomain: primary.Subdomain, } created, err := deps.Store.CreateContainer(row) if err != nil { _ = deps.Docker.RemoveContainer(ctx, dockerID, true) return fmt.Errorf("image source: persist container row: %w", err) } // Cleanup helper: roll back only the NEW container we just created. // Old containers are left running so a failed deploy is non-disruptive. rollbackNew := func(reason string, src error) error { _ = deps.Docker.RemoveContainer(ctx, dockerID, true) if delErr := deps.Store.DeleteContainer(created.ID); delErr != nil && !errors.Is(delErr, store.ErrNotFound) { slog.Warn("image source: rollback delete row", "workload", w.ID, "row", created.ID, "stage", reason, "error", delErr) } return fmt.Errorf("image source: %s: %w", reason, src) } if err := deps.Docker.StartContainer(ctx, dockerID); err != nil { return rollbackNew("start container", err) } if err := deps.Store.UpdateContainerState(created.ID, "running"); err != nil { slog.Warn("image source: update container state", "workload", w.ID, "error", err) } // Optional in-network healthcheck. Failure rolls back the new // container; the old one keeps serving via its existing proxy face. 
if cfg.Healthcheck != "" && deps.Health != nil { healthURL := fmt.Sprintf("http://%s:%d%s", containerName, cfg.Port, cfg.Healthcheck) if err := deps.Health.Check(ctx, healthURL); err != nil { return rollbackNew(fmt.Sprintf("health check %s", healthURL), err) } } // Switch each public face to the new container. ConfigureRoute is // upsert-style at the proxy provider, so the old route is replaced // in-place by FQDN — no traffic gap. Per-face route IDs are // collected and stored on the container row's extra_json so Teardown // can drop every route (not just the primary). faceRoutes := map[string]string{} // fqdn → routeID for i, face := range w.PublicFaces { if !faceEnabled(face) { continue } port := face.TargetPort if port == 0 { port = cfg.Port } fqdn := fqdnFor(face, settings.Domain) forwardHost := containerName forwardPort := port if settings.NpmRemote && settings.ProxyProvider == "npm" { if settings.ServerIP == "" { return rollbackNew("configure proxy", fmt.Errorf("NPM remote mode requires settings.server_ip")) } forwardHost = settings.ServerIP hostPort, err := deps.Docker.InspectContainerPort(ctx, dockerID, fmt.Sprintf("%d/tcp", port)) if err != nil { return rollbackNew("inspect host port", err) } forwardPort = int(hostPort) } accessListID := settings.NpmAccessListID if face.AccessListID > 0 { accessListID = face.AccessListID } routeID, err := deps.Proxy.ConfigureRoute(ctx, fqdn, forwardHost, forwardPort, proxy.RouteOptions{ SSLCertificateID: settings.SSLCertificateID, AccessListID: accessListID, }) if err != nil { // Roll back any face routes we've already configured this // deploy so a partial failure doesn't leak orphan rules at // the proxy provider. 
for prevFQDN, prevRouteID := range faceRoutes { _ = prevFQDN if dErr := deps.Proxy.DeleteRoute(ctx, prevRouteID); dErr != nil { slog.Warn("image source: rollback proxy route", "workload", w.ID, "route", prevRouteID, "error", dErr) } } return rollbackNew(fmt.Sprintf("configure proxy face[%d]", i), err) } faceRoutes[fqdn] = routeID if i == 0 { created.ProxyRouteID = routeID created.Subdomain = face.Subdomain } // Best-effort DNS. Skipped under wildcard DNS (deps.DNS == nil). if deps.DNS != nil && settings.PublicIP != "" { if _, err := deps.DNS.EnsureRecord(ctx, fqdn, settings.PublicIP); err != nil { slog.Warn("image source: ensure DNS", "fqdn", fqdn, "error", err) } } } // Persist the per-face route map on the container row so Teardown // and the next blue-green redeploy can find every configured face. if len(faceRoutes) > 0 { extra := containerExtra{ProxyRoutes: faceRoutes} if b, err := json.Marshal(extra); err == nil { created.ExtraJSON = string(b) } } if err := deps.Store.UpdateContainer(created); err != nil { slog.Warn("image source: update container with routes", "workload", w.ID, "error", err) } // Now the new container is live behind the proxy. Enforce // MaxInstances by removing oldest surplus rows (which includes the // pre-deploy "blue" container when MaxInstances=1). maxInstances := cfg.MaxInstances if maxInstances <= 0 { maxInstances = 1 } enforceMaxInstances(ctx, deps, w, created.ID, maxInstances) return nil } // enforceMaxInstances trims older containers down to `keep` total for this // workload, preserving the just-deployed row (justDeployedRowID) at the // top. Best-effort: failures are logged, not propagated — the new deploy // already succeeded and we don't want to roll it back because cleanup of // an old container hiccupped. 
func enforceMaxInstances(ctx context.Context, deps plugin.Deps, w plugin.Workload, justDeployedRowID string, keep int) { rows, err := deps.Store.ListContainersByWorkload(w.ID) if err != nil { slog.Warn("image source: list for max-instances", "workload", w.ID, "error", err) return } // Sort newest first by CreatedAt, with the just-deployed row pinned // at index 0 regardless of clock skew. sort.Slice(rows, func(i, j int) bool { if rows[i].ID == justDeployedRowID { return true } if rows[j].ID == justDeployedRowID { return false } return rows[i].CreatedAt > rows[j].CreatedAt }) if len(rows) <= keep { return } for _, victim := range rows[keep:] { if victim.ID == justDeployedRowID { continue } if victim.ContainerID != "" { if err := deps.Docker.RemoveContainer(ctx, victim.ContainerID, true); err != nil { slog.Warn("image source: remove old container", "workload", w.ID, "container", victim.ContainerID, "error", err) } } // The proxy route was already replaced by ConfigureRoute earlier // (same FQDN, new target). The old route ID, if any, is still // valid in the proxy provider's DB but now points at a removed // container. Delete it to keep the proxy clean. Best-effort. if victim.ProxyRouteID != "" && victim.ProxyRouteID != findCurrentRouteID(rows, justDeployedRowID) { if err := deps.Proxy.DeleteRoute(ctx, victim.ProxyRouteID); err != nil { slog.Warn("image source: delete old proxy route", "workload", w.ID, "route", victim.ProxyRouteID, "error", err) } } if err := deps.Store.DeleteContainer(victim.ID); err != nil && !errors.Is(err, store.ErrNotFound) { slog.Warn("image source: delete old container row", "workload", w.ID, "row", victim.ID, "error", err) } } } // findCurrentRouteID returns the route ID stored on the just-deployed // row, so we don't accidentally delete the live face. 
func findCurrentRouteID(rows []store.Container, justDeployedRowID string) string { for _, r := range rows { if r.ID == justDeployedRowID { return r.ProxyRouteID } } return "" } // Teardown stops and removes every container, proxy route, and DNS // record owned by this workload. Idempotent. Reads extra_json off each // row so non-primary face routes are cleaned up too — without this a // multi-face workload would leak every face beyond the primary at // delete-time. func (*source) Teardown(ctx context.Context, deps plugin.Deps, w plugin.Workload) error { rows, err := deps.Store.ListContainersByWorkload(w.ID) if err != nil { return fmt.Errorf("image source: list containers: %w", err) } settings, _ := deps.Store.GetSettings() for _, c := range rows { if c.ContainerID != "" { if err := deps.Docker.RemoveContainer(ctx, c.ContainerID, true); err != nil { slog.Warn("image source: remove docker container", "workload", w.ID, "container", c.ContainerID, "error", err) } } // Collect every route to delete: the primary (c.ProxyRouteID) // plus any extras stashed under extra_json.proxy_routes. Dedup // because the primary is also re-listed in the extras map. toDelete := map[string]string{} // fqdn → routeID if c.ProxyRouteID != "" { toDelete[c.Subdomain] = c.ProxyRouteID // key is opaque; we only iterate values } if c.ExtraJSON != "" && c.ExtraJSON != "{}" { var ex containerExtra if jErr := json.Unmarshal([]byte(c.ExtraJSON), &ex); jErr == nil { for fqdn, rid := range ex.ProxyRoutes { toDelete[fqdn] = rid } } } seenRoute := map[string]struct{}{} for _, rid := range toDelete { if _, dup := seenRoute[rid]; dup { continue } seenRoute[rid] = struct{}{} if err := deps.Proxy.DeleteRoute(ctx, rid); err != nil { slog.Warn("image source: delete proxy route", "workload", w.ID, "route", rid, "error", err) } } if deps.DNS != nil && c.Subdomain != "" && settings.Domain != "" { fqdn := c.Subdomain + "." 
+ settings.Domain if err := deps.DNS.DeleteRecord(ctx, fqdn); err != nil { slog.Warn("image source: delete DNS", "fqdn", fqdn, "error", err) } } if err := deps.Store.DeleteContainer(c.ID); err != nil && !errors.Is(err, store.ErrNotFound) { slog.Warn("image source: delete container row", "id", c.ID, "error", err) } } return nil } // containerExtra is the shape stored under container.extra_json by the // image source. Kept versionless on purpose — additive only, unknown // keys must be ignored by older deployers reading rows written by newer // ones. type containerExtra struct { ProxyRoutes map[string]string `json:"proxy_routes,omitempty"` } // Reconcile is intentionally a no-op for the image source. // // State sync is fully handled by the generic reconciler pass that runs // EARLIER in the same Reconciler.ReconcileOnce: its upsert loop writes each // present container's State from the single `docker ps -a` snapshot // (ListAllForReconciler), and its markMissing pass flips rows whose container // ID is absent from that snapshot to 'missing'. Every image container carries // the tinyforge.workload.id label (ContainerConfig.WorkloadID at create time), // so the generic pass covers all of them. // // The previous implementation looped this workload's container rows and called // Docker.IsContainerRunning per row — a redundant Docker inspect per container // per tick that duplicated work already done from the snapshot and scaled as N // Docker API calls/tick. Returning nil here drops that cost without changing // observable state. The method stays because the source interface requires it. func (*source) Reconcile(context.Context, plugin.Deps, plugin.Workload) error { return nil } // buildRegistryAuth returns a Docker registry auth string for the named // registry, or "" when no auth is configured. Username is taken from // reg.Owner when present; falls back to the token for registries that // accept token-as-username (Docker Hub PATs, GHCR, etc.). 
func buildRegistryAuth(deps plugin.Deps, registryName string) (string, error) { if registryName == "" { return "", nil } reg, err := deps.Store.GetRegistryByName(registryName) if err != nil { return "", fmt.Errorf("get registry %s: %w", registryName, err) } if reg.Token == "" { return "", nil } token, err := crypto.Decrypt(deps.EncKey, reg.Token) if err != nil { return "", fmt.Errorf("decrypt registry token: %w", err) } username := reg.Owner if username == "" { username = token } return docker.EncodeRegistryAuth(username, token, reg.URL) } // buildEnv flattens cfg.Env plus the workload_env overrides into the // KEY=VALUE list Docker expects. workload_env wins on key conflict and // encrypted rows are decrypted lazily so plaintext never lives in the // store output. If a decrypt fails the value is skipped with a warning — // failing the whole deploy because one rotated key bricked one env entry // would be a worse outcome than the missing variable. func buildEnv(deps plugin.Deps, w plugin.Workload, cfg Config) []string { merged := make(map[string]string, len(cfg.Env)) for k, v := range cfg.Env { merged[k] = v } overrides, err := deps.Store.ListWorkloadEnv(w.ID) if err != nil { slog.Warn("image source: list workload env", "workload", w.ID, "error", err) } else { for _, e := range overrides { value := e.Value if e.Encrypted { decrypted, err := crypto.Decrypt(deps.EncKey, e.Value) if err != nil { slog.Warn("image source: decrypt env value", "workload", w.ID, "key", e.Key, "error", err) continue } value = decrypted } merged[e.Key] = value } } out := make([]string, 0, len(merged)) for k, v := range merged { out = append(out, k+"="+v) } return out } // computeMounts resolves a workload's VolumeMounts into mount.Mount // values. Both inline `cfg.Volumes` and persisted `workload_volumes` are // considered — persisted rows win on target conflict so the operator's // last UI-side edit takes precedence over whatever shipped with the // config blob. 
// // All VolumeScope values are honored: // // - absolute → host bind (validated against settings.AllowedVolumePaths) // - ephemeral → tmpfs (no host path) // - instance → per-tag dir under /instance-/ // - stage → shared per-workload dir (alias of project) // - project → shared per-workload dir // - project_named → workload-scoped Docker named volume // - named → globally-scoped Docker named volume // // Volumes with empty target or unresolvable scope are skipped with a // warning rather than failing the whole deploy — a misconfigured volume // should not brick an otherwise-valid CI push. func computeMounts(deps plugin.Deps, w plugin.Workload, cfg Config, imageTag string, settings store.Settings) []mount.Mount { byTarget := map[string]VolumeMount{} for _, v := range cfg.Volumes { if v.Target == "" { continue } byTarget[v.Target] = v } if persisted, err := deps.Store.ListWorkloadVolumes(w.ID); err == nil { for _, p := range persisted { byTarget[p.Target] = VolumeMount{ Source: p.Source, Target: p.Target, Scope: p.Scope, Name: p.Name, } } } else { slog.Warn("image source: list workload volumes", "workload", w.ID, "error", err) } params := volume.ResolveWorkloadParams{ BasePath: settings.BaseVolumePath, WorkloadID: w.ID, WorkloadName: w.Name, ImageTag: imageTag, AllowedVolumePaths: settings.AllowedVolumePaths, } out := make([]mount.Mount, 0, len(byTarget)) for _, v := range byTarget { if v.Target == "" { continue } switch v.Scope { case string(store.VolumeScopeEphemeral): out = append(out, mount.Mount{Type: mount.TypeTmpfs, Target: v.Target}) continue case string(store.VolumeScopeNamed), string(store.VolumeScopeProjectNamed): // Docker named volumes use the volume name as Source. We // scope project_named entries to the workload by prefixing // the name so two workloads can both claim "data" without // sharing storage. 
name := v.Name if name == "" { slog.Warn("image source: named volume missing name", "workload", w.ID, "target", v.Target) continue } if v.Scope == string(store.VolumeScopeProjectNamed) { name = workloadNamedVolume(w, name) } out = append(out, mount.Mount{Type: mount.TypeVolume, Source: name, Target: v.Target}) continue } // Everything else resolves to a host path (absolute, instance, // stage, project). Empty source on absolute is invalid; for the // others "source" is the per-scope subdirectory. wv := store.WorkloadVolume{ Source: v.Source, Target: v.Target, Scope: v.Scope, Name: v.Name, } path, err := volume.ResolveWorkloadPath(wv, params) if err != nil { slog.Warn("image source: resolve volume", "workload", w.ID, "target", v.Target, "scope", v.Scope, "error", err) continue } out = append(out, mount.Mount{Type: mount.TypeBind, Source: path, Target: v.Target}) } return out } // workloadNamedVolume builds the Docker volume name for a project_named // mount. The "tf-" prefix and short-id suffix keep volumes from one // workload separate from another's, even when they share a logical // volume name. func workloadNamedVolume(w plugin.Workload, name string) string { idShort := w.ID if len(idShort) > 8 { idShort = idShort[:8] } clean := strings.Trim(nameSanitizer.ReplaceAllString(name, "-"), "-") return "tf-" + idShort + "-" + clean } // buildContainerName generates a deterministic container name keyed on // workload + tag. The scheme intentionally diverges from the legacy // "dw-{project}-{stage}-{tag}" scheme so plugin-managed containers are // trivially distinguishable in `docker ps`. 
var nameSanitizer = regexp.MustCompile(`[^a-zA-Z0-9_.-]`) func buildContainerName(workloadName, workloadID, tag string, ts time.Time) string { clean := func(s string) string { return strings.Trim(nameSanitizer.ReplaceAllString(s, "-"), "-") } idShort := workloadID if len(idShort) > 8 { idShort = idShort[:8] } // Suffix is a millisecond-resolution monotonic stamp so two deploys // can never collide on container name (blue-green needs the new // container to start while the old one is still bound to the same // "tf-name-id-tag" prefix). suffix := fmt.Sprintf("%x", ts.UnixMilli()) return fmt.Sprintf("tf-%s-%s-%s-%s", clean(workloadName), idShort, clean(tag), suffix) } // faceEnabled is true for any face that should yield a proxy route. A // face with empty subdomain AND empty domain is treated as disabled. func faceEnabled(f plugin.PublicFace) bool { return f.Subdomain != "" || f.Domain != "" } func fqdnFor(f plugin.PublicFace, defaultDomain string) string { domain := f.Domain if domain == "" { domain = defaultDomain } if f.Subdomain == "" { return domain } return f.Subdomain + "." + domain } func primaryFace(faces []plugin.PublicFace) plugin.PublicFace { for _, f := range faces { if faceEnabled(f) { return f } } return plugin.PublicFace{} }