7d6719da12
Replace direct npm.Client usage throughout the codebase with the proxy.Provider interface, enabling pluggable proxy backends. The deployer, API layer, and proxy manager now use provider-agnostic route management (ConfigureRoute/DeleteRoute) instead of NPM-specific API calls. Adds ProxyRouteID (string) to Instance model and ProxyProvider setting to Settings, with SQLite migrations for backward compatibility.
780 lines
24 KiB
Go
780 lines
24 KiB
Go
package deployer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/alexei/docker-watcher/internal/crypto"
|
|
"github.com/alexei/docker-watcher/internal/dns"
|
|
"github.com/alexei/docker-watcher/internal/docker"
|
|
"github.com/alexei/docker-watcher/internal/events"
|
|
"github.com/alexei/docker-watcher/internal/health"
|
|
"github.com/alexei/docker-watcher/internal/notify"
|
|
"github.com/alexei/docker-watcher/internal/proxy"
|
|
"github.com/alexei/docker-watcher/internal/store"
|
|
"github.com/alexei/docker-watcher/internal/volume"
|
|
"github.com/moby/moby/api/types/mount"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Deployer orchestrates the full deployment flow: pull image, create container,
|
|
// start, configure proxy, health check, and handle rollback on failure.
|
|
// It implements both webhook.DeployTriggerer and registry.DeployTriggerer.
|
|
type Deployer struct {
|
|
docker *docker.Client
|
|
proxy proxy.Provider
|
|
store *store.Store
|
|
health *health.Checker
|
|
notifier *notify.Notifier
|
|
eventBus EventPublisher
|
|
encKey [32]byte
|
|
dnsMu sync.RWMutex
|
|
dns dns.Provider // nil when wildcard DNS is active
|
|
|
|
// Graceful shutdown: tracks in-progress deploys.
|
|
activeWg sync.WaitGroup
|
|
shuttingDown atomic.Bool
|
|
}
|
|
|
|
// EventPublisher is the interface for publishing events to the event bus.
|
|
type EventPublisher interface {
|
|
Publish(evt events.Event)
|
|
}
|
|
|
|
// New creates a new Deployer with all required dependencies.
|
|
func New(
|
|
dockerClient *docker.Client,
|
|
proxyProvider proxy.Provider,
|
|
st *store.Store,
|
|
checker *health.Checker,
|
|
notifier *notify.Notifier,
|
|
eventBus EventPublisher,
|
|
encKey [32]byte,
|
|
) *Deployer {
|
|
return &Deployer{
|
|
docker: dockerClient,
|
|
proxy: proxyProvider,
|
|
store: st,
|
|
health: checker,
|
|
notifier: notifier,
|
|
eventBus: eventBus,
|
|
encKey: encKey,
|
|
}
|
|
}
|
|
|
|
// SetDNSProvider sets the DNS provider for managing DNS records during deployments.
|
|
// Pass nil to disable DNS management (wildcard DNS mode).
|
|
func (d *Deployer) SetDNSProvider(provider dns.Provider) {
|
|
d.dnsMu.Lock()
|
|
defer d.dnsMu.Unlock()
|
|
d.dns = provider
|
|
}
|
|
|
|
// getDNS returns the current DNS provider under read lock.
|
|
func (d *Deployer) getDNS() dns.Provider {
|
|
d.dnsMu.RLock()
|
|
defer d.dnsMu.RUnlock()
|
|
return d.dns
|
|
}
|
|
|
|
// Drain waits for all in-progress deploys to complete. Call this during graceful shutdown.
|
|
func (d *Deployer) Drain() {
|
|
d.shuttingDown.Store(true)
|
|
slog.Info("deployer: draining in-progress deploys")
|
|
d.activeWg.Wait()
|
|
slog.Info("deployer: all deploys drained")
|
|
}
|
|
|
|
// AsyncTriggerDeploy creates a deploy record and returns the deploy ID immediately,
|
|
// then runs the full deploy pipeline in a background goroutine. Use this from HTTP handlers
|
|
// to avoid blocking the request. Progress is streamed via SSE.
|
|
func (d *Deployer) AsyncTriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) (string, error) {
|
|
if d.shuttingDown.Load() {
|
|
return "", fmt.Errorf("deployer is shutting down, rejecting new deploy")
|
|
}
|
|
|
|
// Validate inputs synchronously so the caller gets immediate feedback.
|
|
project, err := d.store.GetProjectByID(projectID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get project: %w", err)
|
|
}
|
|
stage, err := d.store.GetStageByID(stageID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get stage: %w", err)
|
|
}
|
|
if err := d.validatePromoteFrom(stage, imageTag); err != nil {
|
|
return "", fmt.Errorf("promote validation: %w", err)
|
|
}
|
|
|
|
// Create deploy record synchronously so caller gets the ID.
|
|
deploy, err := d.store.CreateDeploy(store.Deploy{
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
ImageTag: imageTag,
|
|
Status: "pending",
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("create deploy record: %w", err)
|
|
}
|
|
|
|
// Run the actual deploy in the background.
|
|
d.activeWg.Add(1)
|
|
go func() {
|
|
defer d.activeWg.Done()
|
|
// Use a detached context so client disconnect doesn't abort the deploy.
|
|
bgCtx := context.Background()
|
|
if err := d.runDeploy(bgCtx, project, stage, deploy.ID, imageTag); err != nil {
|
|
slog.Error("async deploy failed", "deploy_id", deploy.ID, "error", err)
|
|
}
|
|
}()
|
|
|
|
return deploy.ID, nil
|
|
}
|
|
|
|
// runDeploy is the internal deploy pipeline used by AsyncTriggerDeploy.
|
|
// It assumes the deploy record already exists and project/stage are validated.
|
|
func (d *Deployer) runDeploy(ctx context.Context, project store.Project, stage store.Stage, deployID string, imageTag string) error {
|
|
settings, err := d.store.GetSettings()
|
|
if err != nil {
|
|
if updateErr := d.store.UpdateDeployStatus(deployID, "failed", err.Error()); updateErr != nil {
|
|
slog.Warn("update deploy status", "error", updateErr)
|
|
}
|
|
return fmt.Errorf("get settings: %w", err)
|
|
}
|
|
|
|
slog.Info("starting deploy",
|
|
"deploy_id", deployID,
|
|
"project", project.Name,
|
|
"stage", stage.Name,
|
|
"tag", imageTag,
|
|
)
|
|
d.logDeploy(deployID, fmt.Sprintf("Starting deploy of %s:%s for project %s, stage %s", project.Image, imageTag, project.Name, stage.Name), "info")
|
|
|
|
// Enforce max_instances before deploying.
|
|
if err := d.enforceMaxInstances(ctx, stage, deployID, settings); err != nil {
|
|
d.logDeploy(deployID, fmt.Sprintf("Failed to enforce max instances: %v", err), "error")
|
|
}
|
|
|
|
var containerID string
|
|
var proxyRouteID string
|
|
var instanceID string
|
|
var deployErr error
|
|
|
|
if stage.MaxInstances == 1 {
|
|
containerID, proxyRouteID, instanceID, deployErr = d.blueGreenDeploy(ctx, project, stage, settings, deployID, imageTag)
|
|
} else {
|
|
containerID, proxyRouteID, instanceID, deployErr = d.executeDeploy(ctx, project, stage, settings, deployID, imageTag)
|
|
}
|
|
|
|
if deployErr != nil {
|
|
d.logDeploy(deployID, fmt.Sprintf("Deploy failed: %v", deployErr), "error")
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "failed", deployErr.Error())
|
|
d.rollback(ctx, deployID, containerID, proxyRouteID, instanceID)
|
|
|
|
d.notifier.Send(settings.NotificationURL, notify.Event{
|
|
Type: "deploy_failure",
|
|
Project: project.Name,
|
|
Stage: stage.Name,
|
|
ImageTag: imageTag,
|
|
Error: deployErr.Error(),
|
|
})
|
|
|
|
return fmt.Errorf("deploy failed: %w", deployErr)
|
|
}
|
|
|
|
if err := d.store.UpdateDeployStatus(deployID, "success", ""); err != nil {
|
|
slog.Warn("update deploy status to success", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "success", "")
|
|
|
|
subdomain := d.buildSubdomain(project, stage, settings, imageTag)
|
|
fullURL := fmt.Sprintf("https://%s.%s", subdomain, settings.Domain)
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Deploy successful: %s", fullURL), "info")
|
|
|
|
d.notifier.Send(settings.NotificationURL, notify.Event{
|
|
Type: "deploy_success",
|
|
Project: project.Name,
|
|
Stage: stage.Name,
|
|
ImageTag: imageTag,
|
|
Subdomain: subdomain,
|
|
URL: fullURL,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// TriggerDeploy is the synchronous entry point for deployments (used by poller and webhook).
|
|
// It validates inputs, creates a deploy record, and delegates to runDeploy.
|
|
func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) error {
|
|
if d.shuttingDown.Load() {
|
|
return fmt.Errorf("deployer is shutting down, rejecting new deploy")
|
|
}
|
|
|
|
d.activeWg.Add(1)
|
|
defer d.activeWg.Done()
|
|
|
|
project, err := d.store.GetProjectByID(projectID)
|
|
if err != nil {
|
|
return fmt.Errorf("get project: %w", err)
|
|
}
|
|
|
|
stage, err := d.store.GetStageByID(stageID)
|
|
if err != nil {
|
|
return fmt.Errorf("get stage: %w", err)
|
|
}
|
|
|
|
if err := d.validatePromoteFrom(stage, imageTag); err != nil {
|
|
return fmt.Errorf("promote validation: %w", err)
|
|
}
|
|
|
|
deploy, err := d.store.CreateDeploy(store.Deploy{
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
ImageTag: imageTag,
|
|
Status: "pending",
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create deploy record: %w", err)
|
|
}
|
|
|
|
if err := d.runDeploy(ctx, project, stage, deploy.ID, imageTag); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// executeDeploy runs the deploy pipeline steps and returns rollback-relevant state.
|
|
// It returns (containerID, proxyRouteID, instanceID, error).
|
|
func (d *Deployer) executeDeploy(
|
|
ctx context.Context,
|
|
project store.Project,
|
|
stage store.Stage,
|
|
settings store.Settings,
|
|
deployID string,
|
|
imageTag string,
|
|
) (string, string, string, error) {
|
|
var containerID string
|
|
var proxyRouteID string
|
|
var instanceID string
|
|
|
|
// Step 1: Pull image.
|
|
if err := d.store.UpdateDeployStatus(deployID, "pulling", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "pulling", "")
|
|
d.logDeploy(deployID, fmt.Sprintf("Pulling image %s:%s", project.Image, imageTag), "info")
|
|
|
|
authConfig, err := d.buildRegistryAuth(project)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("build registry auth: %w", err)
|
|
}
|
|
|
|
if err := d.docker.PullImage(ctx, project.Image, imageTag, authConfig); err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("pull image: %w", err)
|
|
}
|
|
d.logDeploy(deployID, "Image pulled successfully", "info")
|
|
|
|
// Step 2: Ensure network exists.
|
|
networkID, err := d.docker.EnsureNetwork(ctx, settings.Network)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("ensure network: %w", err)
|
|
}
|
|
d.logDeploy(deployID, fmt.Sprintf("Network %s ready (ID: %s)", settings.Network, truncateID(networkID)), "info")
|
|
|
|
// Step 3: Create and start container.
|
|
if err := d.store.UpdateDeployStatus(deployID, "starting", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "starting", "")
|
|
|
|
// Pre-generate instance ID so it can be set as a container label.
|
|
instanceID = uuid.New().String()
|
|
subdomain := d.buildSubdomain(project, stage, settings, imageTag)
|
|
|
|
containerName := docker.ContainerName(project.Name, stage.Name, imageTag)
|
|
portStr := fmt.Sprintf("%d/tcp", project.Port)
|
|
envVars := d.mergeEnvVars(project, stage.ID)
|
|
mounts := d.computeVolumeMounts(project.ID, project.Name, stage.Name, imageTag, settings.BaseVolumePath)
|
|
|
|
containerCfg := docker.ContainerConfig{
|
|
Name: containerName,
|
|
Image: project.Image + ":" + imageTag,
|
|
Env: envVars,
|
|
ExposedPorts: []string{portStr},
|
|
NetworkName: settings.Network,
|
|
NetworkID: networkID,
|
|
Project: project.Name,
|
|
Stage: stage.Name,
|
|
InstanceID: instanceID,
|
|
Mounts: mounts,
|
|
}
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Creating container %s", containerName), "info")
|
|
containerID, err = d.docker.CreateContainer(ctx, containerCfg)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("create container: %w", err)
|
|
}
|
|
d.logDeploy(deployID, fmt.Sprintf("Container created (ID: %s)", truncateID(containerID)), "info")
|
|
|
|
// Create instance record in store with the pre-generated ID.
|
|
inst, err := d.store.CreateInstanceWithID(store.Instance{
|
|
ID: instanceID,
|
|
StageID: stage.ID,
|
|
ProjectID: project.ID,
|
|
ContainerID: containerID,
|
|
ImageTag: imageTag,
|
|
Subdomain: subdomain,
|
|
Status: "stopped",
|
|
Port: project.Port,
|
|
})
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("create instance record: %w", err)
|
|
}
|
|
instanceID = inst.ID
|
|
|
|
// Link deploy to instance.
|
|
if err := d.store.SetDeployInstanceID(deployID, instanceID); err != nil {
|
|
slog.Warn("link deploy to instance", "error", err)
|
|
}
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Starting container %s", containerName), "info")
|
|
if err := d.docker.StartContainer(ctx, containerID); err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("start container: %w", err)
|
|
}
|
|
|
|
if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil {
|
|
slog.Warn("update instance status to running", "error", err)
|
|
}
|
|
if err := d.store.UpdateLastAliveAt(instanceID); err != nil {
|
|
slog.Warn("update last_alive_at on deploy", "instance_id", instanceID, "error", err)
|
|
}
|
|
d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running")
|
|
d.logDeploy(deployID, "Container started", "info")
|
|
|
|
// Step 4: Configure NPM proxy (optional per stage).
|
|
if stage.EnableProxy {
|
|
if err := d.store.UpdateDeployStatus(deployID, "configuring_proxy", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "configuring_proxy", "")
|
|
|
|
proxyRouteID, err = d.configureProxy(ctx, deployID, settings, containerName, project.Port, subdomain)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("configure proxy: %w", err)
|
|
}
|
|
|
|
// Update instance with proxy route ID.
|
|
inst.ProxyRouteID = proxyRouteID
|
|
inst.Subdomain = subdomain
|
|
if err := d.store.UpdateInstance(inst); err != nil {
|
|
slog.Warn("update instance with proxy ID", "error", err)
|
|
}
|
|
|
|
// Create DNS record for this instance.
|
|
fqdn := subdomain + "." + settings.Domain
|
|
d.ensureDNS(ctx, fqdn, "instance", instanceID, deployID)
|
|
} else {
|
|
d.logDeploy(deployID, "Proxy creation skipped (disabled for this stage)", "info")
|
|
inst.Subdomain = subdomain
|
|
if err := d.store.UpdateInstance(inst); err != nil {
|
|
slog.Warn("update instance", "error", err)
|
|
}
|
|
}
|
|
|
|
// Step 5: Health check.
|
|
if project.Healthcheck != "" {
|
|
if err := d.store.UpdateDeployStatus(deployID, "health_checking", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "health_checking", "")
|
|
|
|
healthURL := fmt.Sprintf("http://%s:%d%s", containerName, project.Port, project.Healthcheck)
|
|
d.logDeploy(deployID, fmt.Sprintf("Running health check: %s", healthURL), "info")
|
|
|
|
if err := d.health.Check(ctx, healthURL); err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("health check: %w", err)
|
|
}
|
|
d.logDeploy(deployID, "Health check passed", "info")
|
|
} else {
|
|
d.logDeploy(deployID, "No health check configured, skipping", "info")
|
|
}
|
|
|
|
return containerID, proxyRouteID, instanceID, nil
|
|
}
|
|
|
|
// configureProxy creates or updates a proxy route for the deployed container.
|
|
// Uses the configured proxy.Provider (NPM, Traefik, or None).
|
|
// Returns the proxy route ID string.
|
|
func (d *Deployer) configureProxy(
|
|
ctx context.Context,
|
|
deployID string,
|
|
settings store.Settings,
|
|
containerName string,
|
|
containerPort int,
|
|
subdomain string,
|
|
) (string, error) {
|
|
fqdn := subdomain + "." + settings.Domain
|
|
d.logDeploy(deployID, fmt.Sprintf("Configuring proxy (%s): %s -> %s:%d", d.proxy.Name(), fqdn, containerName, containerPort), "info")
|
|
|
|
routeID, err := d.proxy.ConfigureRoute(ctx, fqdn, containerName, containerPort, proxy.RouteOptions{
|
|
SSLCertificateID: settings.SSLCertificateID,
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("configure proxy route: %w", err)
|
|
}
|
|
|
|
if routeID != "" {
|
|
d.logDeploy(deployID, fmt.Sprintf("Proxy route configured (ID: %s)", routeID), "info")
|
|
}
|
|
return routeID, nil
|
|
}
|
|
|
|
// enforceMaxInstances removes the oldest instances when the stage has reached its limit.
|
|
// This makes room for the new deployment.
|
|
func (d *Deployer) enforceMaxInstances(ctx context.Context, stage store.Stage, deployID string, settings store.Settings) error {
|
|
if stage.MaxInstances <= 0 {
|
|
return nil
|
|
}
|
|
|
|
instances, err := d.store.GetInstancesByStageID(stage.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("get instances for stage: %w", err)
|
|
}
|
|
|
|
// Filter to running/stopped instances (not already failed/removing).
|
|
var active []store.Instance
|
|
for _, inst := range instances {
|
|
if inst.Status == "running" || inst.Status == "stopped" {
|
|
active = append(active, inst)
|
|
}
|
|
}
|
|
|
|
// We need room for one more instance, so remove oldest when at limit.
|
|
removeCount := len(active) - stage.MaxInstances + 1
|
|
if removeCount <= 0 {
|
|
return nil
|
|
}
|
|
|
|
// Sort by created_at ascending (oldest first).
|
|
sort.Slice(active, func(i, j int) bool {
|
|
return active[i].CreatedAt < active[j].CreatedAt
|
|
})
|
|
|
|
for i := 0; i < removeCount && i < len(active); i++ {
|
|
inst := active[i]
|
|
d.logDeploy(deployID, fmt.Sprintf("Removing oldest instance %s (tag: %s) to enforce max_instances=%d", inst.ID, inst.ImageTag, stage.MaxInstances), "info")
|
|
|
|
if err := d.removeInstance(ctx, inst, settings); err != nil {
|
|
d.logDeploy(deployID, fmt.Sprintf("Failed to remove instance %s: %v", inst.ID, err), "warn")
|
|
continue
|
|
}
|
|
d.logDeploy(deployID, fmt.Sprintf("Removed instance %s", inst.ID), "info")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// removeInstance stops and removes a container, deletes its NPM proxy host,
|
|
// and removes the instance record from the store.
|
|
func (d *Deployer) removeInstance(ctx context.Context, inst store.Instance, settings store.Settings) error {
|
|
// Mark as removing.
|
|
if err := d.store.UpdateInstanceStatus(inst.ID, "removing"); err != nil {
|
|
slog.Warn("update instance status to removing", "instance_id", inst.ID, "error", err)
|
|
}
|
|
|
|
// Remove Docker container.
|
|
if inst.ContainerID != "" {
|
|
if err := d.docker.RemoveContainer(ctx, inst.ContainerID, true); err != nil {
|
|
slog.Warn("remove container", "container_id", inst.ContainerID, "error", err)
|
|
}
|
|
}
|
|
|
|
// Delete proxy route.
|
|
if inst.ProxyRouteID != "" {
|
|
if err := d.proxy.DeleteRoute(ctx, inst.ProxyRouteID); err != nil {
|
|
slog.Warn("delete proxy route", "route_id", inst.ProxyRouteID, "error", err)
|
|
}
|
|
|
|
// Remove DNS record for this instance.
|
|
if inst.Subdomain != "" && settings.Domain != "" {
|
|
fqdn := inst.Subdomain + "." + settings.Domain
|
|
d.removeDNS(ctx, fqdn, "")
|
|
}
|
|
}
|
|
|
|
// Delete instance record.
|
|
if err := d.store.DeleteInstance(inst.ID); err != nil {
|
|
return fmt.Errorf("delete instance record: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// buildSubdomain generates the subdomain for an instance based on settings and stage config.
|
|
func (d *Deployer) buildSubdomain(project store.Project, stage store.Stage, settings store.Settings, imageTag string) string {
|
|
return GenerateTaggedSubdomain(settings.SubdomainPattern, project.Name, stage.Name, imageTag, stage.Subdomain)
|
|
}
|
|
|
|
// buildRegistryAuth constructs the Docker registry auth string for pulling images.
|
|
// If the project has a registry configured, it looks up the registry token.
|
|
func (d *Deployer) buildRegistryAuth(project store.Project) (string, error) {
|
|
if project.Registry == "" {
|
|
return "", nil
|
|
}
|
|
|
|
reg, err := d.store.GetRegistryByName(project.Registry)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get registry %s: %w", project.Registry, err)
|
|
}
|
|
|
|
if reg.Token != "" {
|
|
decrypted, err := crypto.Decrypt(d.encKey, reg.Token)
|
|
if err != nil {
|
|
return "", fmt.Errorf("decrypt registry token: %w", err)
|
|
}
|
|
return docker.EncodeRegistryAuth(decrypted, decrypted, reg.URL)
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
// mergeEnvVars builds the final environment variable list for a container:
|
|
// 1. Parse project-level env JSON
|
|
// 2. Overlay with stage-level env overrides (stage wins on key conflict)
|
|
// 3. Decrypt any encrypted (secret) values
|
|
// Returns a []string of KEY=VALUE pairs.
|
|
func (d *Deployer) mergeEnvVars(project store.Project, stageID string) []string {
|
|
// Step 1: Parse project-level env.
|
|
envMap := make(map[string]string)
|
|
if project.Env != "" && project.Env != "{}" {
|
|
var projectEnv map[string]string
|
|
if err := json.Unmarshal([]byte(project.Env), &projectEnv); err != nil {
|
|
slog.Warn("parse project env vars", "error", err)
|
|
} else {
|
|
for k, v := range projectEnv {
|
|
envMap[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2: Overlay with stage-level overrides.
|
|
stageEnvs, err := d.store.GetStageEnvByStageID(stageID)
|
|
if err != nil {
|
|
slog.Warn("get stage env overrides", "stage_id", stageID, "error", err)
|
|
} else {
|
|
for _, se := range stageEnvs {
|
|
value := se.Value
|
|
if se.Encrypted {
|
|
// Step 3: Decrypt secret values.
|
|
decrypted, err := crypto.Decrypt(d.encKey, se.Value)
|
|
if err != nil {
|
|
slog.Warn("decrypt stage env value", "key", se.Key, "error", err)
|
|
continue
|
|
}
|
|
value = decrypted
|
|
}
|
|
envMap[se.Key] = value
|
|
}
|
|
}
|
|
|
|
vars := make([]string, 0, len(envMap))
|
|
for k, v := range envMap {
|
|
vars = append(vars, k+"="+v)
|
|
}
|
|
return vars
|
|
}
|
|
|
|
// computeVolumeMounts builds Docker mount specifications from the project's volume config.
|
|
// Uses the shared volume.ResolvePath for path resolution.
|
|
func (d *Deployer) computeVolumeMounts(projectID, projectName, stageName, imageTag, basePath string) []mount.Mount {
|
|
vols, err := d.store.GetVolumesByProjectID(projectID)
|
|
if err != nil {
|
|
slog.Warn("get project volumes", "project_id", projectID, "error", err)
|
|
return nil
|
|
}
|
|
|
|
if len(vols) == 0 {
|
|
return nil
|
|
}
|
|
|
|
params := volume.ResolveParams{
|
|
BasePath: basePath,
|
|
ProjectName: projectName,
|
|
StageName: stageName,
|
|
ImageTag: imageTag,
|
|
}
|
|
|
|
mounts := make([]mount.Mount, 0, len(vols))
|
|
for _, vol := range vols {
|
|
scope := vol.Scope
|
|
if scope == "" {
|
|
switch vol.Mode {
|
|
case "isolated":
|
|
scope = "instance"
|
|
default:
|
|
scope = "project"
|
|
}
|
|
}
|
|
|
|
// Ephemeral volumes use tmpfs — no host path.
|
|
if scope == "ephemeral" {
|
|
mounts = append(mounts, mount.Mount{
|
|
Type: mount.TypeTmpfs,
|
|
Target: vol.Target,
|
|
})
|
|
continue
|
|
}
|
|
|
|
source, err := volume.ResolvePath(vol, params)
|
|
if err != nil {
|
|
slog.Warn("resolve volume path", "volume_id", vol.ID, "error", err)
|
|
continue
|
|
}
|
|
|
|
mounts = append(mounts, mount.Mount{
|
|
Type: mount.TypeBind,
|
|
Source: source,
|
|
Target: vol.Target,
|
|
})
|
|
}
|
|
return mounts
|
|
}
|
|
|
|
// logDeploy appends a log entry for a deploy and publishes it on the event bus.
|
|
// Errors are logged to stderr but not propagated.
|
|
func (d *Deployer) logDeploy(deployID, message, level string) {
|
|
if err := d.store.AppendDeployLog(deployID, message, level); err != nil {
|
|
slog.Warn("append deploy log", "error", err)
|
|
}
|
|
if d.eventBus != nil {
|
|
d.eventBus.Publish(events.Event{
|
|
Type: events.EventDeployLog,
|
|
Payload: events.DeployLogPayload{
|
|
DeployID: deployID,
|
|
Message: message,
|
|
Level: level,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// publishDeployStatus publishes a deploy status change event on the bus.
|
|
func (d *Deployer) publishDeployStatus(deployID, projectID, stageID, imageTag, status, deployErr string) {
|
|
if d.eventBus != nil {
|
|
d.eventBus.Publish(events.Event{
|
|
Type: events.EventDeployStatus,
|
|
Payload: events.DeployStatusPayload{
|
|
DeployID: deployID,
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
ImageTag: imageTag,
|
|
Status: status,
|
|
Error: deployErr,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// publishInstanceStatus publishes an instance status change event on the bus.
|
|
func (d *Deployer) publishInstanceStatus(instanceID, projectID, stageID, status string) {
|
|
if d.eventBus != nil {
|
|
d.eventBus.Publish(events.Event{
|
|
Type: events.EventInstanceStatus,
|
|
Payload: events.InstanceStatusPayload{
|
|
InstanceID: instanceID,
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
Status: status,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// ensureDNS creates or updates a DNS record for the given FQDN. Best-effort: logs warnings on failure.
|
|
func (d *Deployer) ensureDNS(ctx context.Context, fqdn, consumerType, consumerID, deployID string) {
|
|
dnsProvider := d.getDNS()
|
|
if dnsProvider == nil {
|
|
return
|
|
}
|
|
settings, err := d.store.GetSettings()
|
|
if err != nil {
|
|
slog.Warn("dns: get settings for server IP", "error", err)
|
|
return
|
|
}
|
|
if settings.ServerIP == "" {
|
|
slog.Warn("dns: server IP not configured, skipping DNS record creation", "fqdn", fqdn)
|
|
return
|
|
}
|
|
|
|
recordID, err := dnsProvider.EnsureRecord(ctx, fqdn, settings.ServerIP)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("DNS: failed to create/update record for %s: %v", fqdn, err)
|
|
slog.Warn(msg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, msg, "warn")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Track the record locally.
|
|
if _, err := d.store.CreateDNSRecord(store.DNSRecord{
|
|
FQDN: fqdn,
|
|
ProviderRecordID: recordID,
|
|
ConsumerType: consumerType,
|
|
ConsumerID: consumerID,
|
|
}); err != nil {
|
|
// May already exist — try updating.
|
|
if updateErr := d.store.UpdateDNSRecordProviderID(fqdn, recordID); updateErr != nil {
|
|
slog.Warn("dns: failed to track record", "fqdn", fqdn, "error", updateErr)
|
|
}
|
|
}
|
|
|
|
logMsg := fmt.Sprintf("DNS: record ensured for %s", fqdn)
|
|
slog.Info(logMsg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, logMsg, "info")
|
|
}
|
|
}
|
|
|
|
// removeDNS deletes a DNS record for the given FQDN. Best-effort: logs warnings on failure.
|
|
func (d *Deployer) removeDNS(ctx context.Context, fqdn, deployID string) {
|
|
dnsProvider := d.getDNS()
|
|
if dnsProvider == nil {
|
|
return
|
|
}
|
|
|
|
if err := dnsProvider.DeleteRecord(ctx, fqdn); err != nil {
|
|
msg := fmt.Sprintf("DNS: failed to delete record for %s: %v", fqdn, err)
|
|
slog.Warn(msg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, msg, "warn")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Remove local tracking.
|
|
if err := d.store.DeleteDNSRecord(fqdn); err != nil {
|
|
slog.Warn("dns: failed to remove tracking record", "fqdn", fqdn, "error", err)
|
|
}
|
|
|
|
logMsg := fmt.Sprintf("DNS: record deleted for %s", fqdn)
|
|
slog.Info(logMsg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, logMsg, "info")
|
|
}
|
|
}
|
|
|
|
// truncateID shortens a Docker ID to its first 12 bytes for display. IDs that
// are already 12 bytes or shorter (including the empty string) are returned
// unchanged.
func truncateID(id string) string {
	const displayLen = 12
	if len(id) <= displayLen {
		return id
	}
	return id[:displayLen]
}
|
|
|